""" Computer Auditor — Python backend (FastAPI). Hot paths: directory listing with recursive sizes, large-file scan, processes, drives, system, network. Binds to 127.0.0.1 only. Started by Electron with: python -m uvicorn main:app --host 127.0.0.1 --port """ from __future__ import annotations import os import platform import socket import sys import tempfile import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any import psutil from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field MAX_FILES_WALK = 200_000 app = FastAPI(title="Computer Auditor API", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def measure_folder_bytes(root: str) -> tuple[int, int, bool]: """Recursive file byte total under root. Returns (bytes, file_count, truncated).""" total = 0 n = 0 truncated = False for dirpath, _dirnames, filenames in os.walk(root): for fn in filenames: if n >= MAX_FILES_WALK: return total, n, True fp = os.path.join(dirpath, fn) try: total += os.path.getsize(fp) n += 1 except OSError: pass return total, n, False class ListDirBody(BaseModel): path: str max_entries: int = Field(default=800, ge=1, le=5000) class FolderSizeBody(BaseModel): path: str class LargeFilesBody(BaseModel): path: str min_bytes: int = Field(ge=0) max_results: int = Field(default=80, ge=1, le=500) @app.get("/health") def health() -> dict[str, str]: return {"status": "ok"} @app.post("/api/list_dir") def api_list_dir(body: ListDirBody) -> list[dict[str, Any]]: try: root = os.path.abspath(os.path.normpath(body.path)) except Exception as e: raise HTTPException(400, str(e)) from e if not os.path.isdir(root): raise HTTPException(400, "not a directory") max_e = min(body.max_entries, 5000) try: with os.scandir(root) as it: raw = list(it)[:max_e] except OSError as e: return [ { "name": os.path.basename(root), "fullPath": root, "isDirectory": False, "sizeBytes": 0, "mtimeMs": 0, "error": str(e), } ] dirs = [e for e in raw if e.is_dir(follow_symlinks=False)] file_entries = [e for e in raw if not e.is_dir(follow_symlinks=False)] out: list[dict[str, Any]] = [] for e in file_entries: try: st = e.stat(follow_symlinks=False) out.append( { "name": e.name, "fullPath": e.path, "isDirectory": False, "sizeBytes": st.st_size, "mtimeMs": int(st.st_mtime * 1000), } ) except OSError as ex: out.append( { "name": e.name, "fullPath": e.path, "isDirectory": False, "sizeBytes": 0, "mtimeMs": 0, "error": str(ex), } ) workers = min(8, max(1, len(dirs))) if dirs: with ThreadPoolExecutor(max_workers=workers) as ex: future_to_ent = {ex.submit(measure_folder_bytes, d.path): d for d in dirs} for fut in as_completed(future_to_ent): d = future_to_ent[fut] try: bytes_total, _n, truncated = fut.result() try: st = os.stat(d.path) mtime_ms = int(st.st_mtime * 1000) except OSError: mtime_ms = 0 out.append( { "name": d.name, "fullPath": d.path, "isDirectory": True, "sizeBytes": bytes_total, "mtimeMs": mtime_ms, "sizeTruncated": truncated, } ) except Exception as ex: # noqa: BLE001 out.append( { "name": d.name, "fullPath": d.path, "isDirectory": True, "sizeBytes": 0, "mtimeMs": 0, "error": str(ex), } ) out.sort( key=lambda x: ( 0 if x.get("isDirectory") else 1, -int(x.get("sizeBytes") or 0), str(x.get("name") or ""), ) ) return out @app.post("/api/folder_size") def api_folder_size(body: FolderSizeBody) -> dict[str, Any]: try: root = os.path.abspath(os.path.normpath(body.path)) except Exception as e: raise HTTPException(400, str(e)) from e if not os.path.isdir(root): raise HTTPException(400, "not a directory") b, n, t = measure_folder_bytes(root) return {"bytes": b, "files": n, "truncated": t} @app.post("/api/large_files") def api_large_files(body: LargeFilesBody) -> list[dict[str, Any]]: try: root = os.path.abspath(os.path.normpath(body.path)) except Exception as e: raise HTTPException(400, str(e)) from e if not os.path.isdir(root): raise HTTPException(400, "not a directory") cap = min(body.max_results, 500) results: list[dict[str, Any]] = [] for dirpath, _dn, filenames in os.walk(root): for fn in filenames: if len(results) >= cap: break fp = os.path.join(dirpath, fn) try: sz = os.path.getsize(fp) if sz >= body.min_bytes: results.append({"path": fp, "sizeBytes": sz}) except OSError: pass if len(results) >= cap: break results.sort(key=lambda x: -x["sizeBytes"]) return results[:cap] @app.get("/api/processes") def api_processes() -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] for p in psutil.process_iter( ["pid", "name", "memory_info", "cpu_times", "cmdline"] ): try: info = p.info mi = info.get("memory_info") rss = mi.rss if mi else 0 ct = info.get("cpu_times") cpu_s = None if ct: cpu_s = round(ct.user + ct.system, 2) cmd = info.get("cmdline") or [] cmdline = " ".join(cmd) if isinstance(cmd, list) else str(cmd) if len(cmdline) > 8000: cmdline = cmdline[:8000] + "…" rows.append( { "pid": int(info["pid"]), "name": str(info.get("name") or ""), "memoryBytes": int(rss), "cpuSeconds": cpu_s, "commandLine": cmdline, } ) except (psutil.NoSuchProcess, psutil.AccessDenied): continue return rows @app.get("/api/drives") def api_drives() -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for part in psutil.disk_partitions(all=False): if os.name == "nt" and "cdrom" in (part.opts or "").lower(): continue try: u = psutil.disk_usage(part.mountpoint) dev = part.device.rstrip("\\/") out.append( { "letter": dev if dev else part.mountpoint, "mount": part.mountpoint, "label": part.fstype or "", "totalBytes": int(u.total), "freeBytes": int(u.free), "usedBytes": int(u.used), } ) except OSError: continue return out @app.get("/api/system") def api_system() -> dict[str, Any]: vm = psutil.virtual_memory() boot = psutil.boot_time() try: user = os.getlogin() except OSError: user = os.environ.get("USERNAME", os.environ.get("USER", "")) cpus = os.cpu_count() or 1 model = "" if platform.system() == "Windows": try: import ctypes # noqa: PLC0415 buf = ctypes.create_unicode_buffer(256) if ctypes.windll.kernel32.GetEnvironmentVariableW("PROCESSOR_IDENTIFIER", buf, 256): model = buf.value except Exception: # noqa: BLE001 model = platform.processor() or "" else: model = platform.processor() or "" return { "hostname": socket.gethostname(), "platform": sys.platform, "release": platform.release(), "arch": platform.machine(), "uptimeSec": time.time() - boot, "totalMem": int(vm.total), "freeMem": int(vm.available), "cpuModel": model, "cpuCount": cpus, "load1": 0.0, "load5": 0.0, "load15": 0.0, "userInfo": user, "homedir": str(os.path.expanduser("~")), "tmpdir": tempfile.gettempdir(), } @app.get("/api/network") def api_network() -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] link_fam = getattr(psutil, "AF_LINK", None) for name, addrs in psutil.net_if_addrs().items(): for a in addrs: if a.family == socket.AF_INET: fam_s = "IPv4" elif a.family == socket.AF_INET6: fam_s = "IPv6" elif link_fam is not None and a.family == link_fam: fam_s = "MAC" else: fam_s = str(a.family) addr = a.address internal = addr.startswith("127.") or addr == "::1" mac = addr if fam_s == "MAC" else None rows.append( { "name": name, "address": addr, "family": fam_s, "internal": internal, "mac": mac, } ) return rows @app.get("/api/env") def api_env() -> dict[str, str]: return dict(os.environ) if __name__ == "__main__": import uvicorn # noqa: PLC0415 port = int(os.environ.get("AUDITOR_PY_PORT", "54789")) uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning")