import json
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Sequence
# Hardened whitelist: the only command names SecurityCommandExecutor will run.
# Matching is exact string membership, so a path such as "/sbin/iptables" is
# rejected even though its basename is listed.
ALLOWED_COMMANDS = {"iptables", "systemctl", "auditctl", "notify"}


class SecurityCommandExecutor:
    """Run whitelisted commands and audit every rejected attempt.

    Attributes:
        audit_log: File that receives one JSON line per rejected command.
    """

    def __init__(self, audit_log: Path):
        """Store the path of the audit log; the file is not touched here."""
        self.audit_log = audit_log

    def execute(self, command: str, args: Sequence[str]) -> Dict[str, Any]:
        """Run ``command`` with ``args`` if the command is whitelisted.

        Args:
            command: Executable name; must be a member of ALLOWED_COMMANDS.
            args: Arguments passed to the executable (list or tuple). They
                are handed over as an argv vector, never through a shell.

        Returns:
            A dict that always has ``"status"`` and ``"command"``:

            * ``"DENIED"`` - command not whitelisted; the attempt was
              appended to the audit log and nothing was executed.
            * ``"SUCCESS"`` - the process exited with return code 0.
            * ``"FAIL"`` - non-zero return code, or the process could not
              be started at all (missing binary, no permission).

            ``"SUCCESS"`` and ``"FAIL"`` results also carry ``"output"``
            (captured stdout) and ``"stderr"`` (captured stderr, or the OS
            error message when the process could not be started).

        Raises:
            TypeError: If ``args`` is a bare string for a whitelisted
                command; unpacking it would pass one argument per character.
            OSError: If a denial cannot be written to the audit log.
        """
        if command not in ALLOWED_COMMANDS:
            self._log_violation(command, args)
            return {"status": "DENIED", "command": command}

        if isinstance(args, str):
            raise TypeError("args must be a sequence of strings, not a single str")

        try:
            proc = subprocess.run(
                [command, *args],
                capture_output=True,
                text=True,
                shell=False,
                check=False,
            )
        except OSError as exc:
            # A whitelisted command may still be absent or not executable on
            # this host; report it through the status dict like any other
            # failure instead of crashing the caller.
            return {
                "status": "FAIL",
                "command": command,
                "output": "",
                "stderr": str(exc),
            }

        return {
            "status": "SUCCESS" if proc.returncode == 0 else "FAIL",
            "command": command,
            "output": proc.stdout,
            "stderr": proc.stderr,
        }

    def _log_violation(self, cmd: str, args: Sequence[str]) -> None:
        """Append one JSON line describing a rejected command to the audit log."""
        record = {"event": "UNAUTHORIZED_EXECUTION_ATTEMPT", "cmd": cmd, "args": args}
        with open(self.audit_log, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")