OpenMythos / agent /shell.py
KingNish's picture
Shell Fix
6ee487c
Raw
History Blame Contribute Delete
7.76 kB
"""Shell manager for running interactive subprocesses with streaming output."""
from __future__ import annotations
import shutil
import subprocess
import tempfile
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import IO
IDLE_TIMEOUT_SECONDS = 15 * 60 # 15 minutes
@dataclass
class ShellSession:
"""A running shell session."""
session_id: str
process: subprocess.Popen[str]
temp_dir: Path
output_buffer: list[str] = field(default_factory=list)
last_read_pos: int = 0
started_at: float = field(default_factory=time.time)
last_used_at: float = field(default_factory=time.time)
closed: bool = False
@property
def pid(self) -> int | None:
return self.process.pid
@property
def returncode(self) -> int | None:
return self.process.returncode
def touch(self) -> None:
"""Update last_used_at to current time."""
self.last_used_at = time.time()
def read_new_output(self) -> str:
"""Read new output since last read."""
self.touch()
new_lines = self.output_buffer[self.last_read_pos :]
self.last_read_pos = len(self.output_buffer)
return "".join(new_lines)
def get_full_output(self) -> str:
"""Get all output."""
self.touch()
return "".join(self.output_buffer)
def is_running(self) -> bool:
"""Check if process is still running."""
return self.process.poll() is None
def is_idle_expired(self) -> bool:
"""Check if session has been idle longer than IDLE_TIMEOUT_SECONDS."""
return (time.time() - self.last_used_at) > IDLE_TIMEOUT_SECONDS
def close(self) -> None:
"""Close the session and clean up temp directory."""
if not self.closed:
self.closed = True
try:
self.process.terminate()
self.process.wait(timeout=5)
except Exception:
try:
self.process.kill()
except Exception:
pass
# Clean up temp directory (retry a few times for Windows file locks)
if self.temp_dir.exists():
for _ in range(3):
try:
shutil.rmtree(self.temp_dir, ignore_errors=False)
break
except Exception:
time.sleep(0.1)
class ShellManager:
"""Manages multiple shell sessions with streaming output."""
def __init__(self, default_timeout: float = 15.0) -> None:
self.sessions: dict[str, ShellSession] = {}
self._lock = threading.Lock()
self.default_timeout = default_timeout
self._cleanup_thread = threading.Thread(
target=self._cleanup_loop, daemon=True
)
self._cleanup_thread.start()
def _cleanup_loop(self) -> None:
"""Background thread that cleans up idle sessions every 60 seconds."""
while True:
time.sleep(60)
self._cleanup_idle_sessions()
def _cleanup_idle_sessions(self) -> None:
"""Close sessions that have been idle for more than IDLE_TIMEOUT_SECONDS."""
with self._lock:
idle_sessions = [
sid
for sid, session in self.sessions.items()
if session.is_idle_expired()
]
for sid in idle_sessions:
session = self.sessions.pop(sid, None)
if session:
session.close()
def start(
self,
session_id: str,
command: str,
cwd: str | None = None,
env: dict[str, str] | None = None,
) -> ShellSession:
"""Start a new shell session in a fresh temp directory."""
with self._lock:
# Close existing session with same ID
if session_id in self.sessions:
self.sessions[session_id].close()
# Create a unique temp directory for this session
temp_dir = Path(tempfile.mkdtemp(prefix=f"shell_{session_id}_"))
# Merge env with inherited environment
import os
process_env = os.environ.copy()
if env:
process_env.update(env)
# Use shell=True for interactive commands
process = subprocess.Popen(
command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
cwd=str(temp_dir),
env=process_env,
)
session = ShellSession(
session_id=session_id,
process=process,
temp_dir=temp_dir,
)
self.sessions[session_id] = session
# Start output reader thread
thread = threading.Thread(
target=self._read_output, args=(session,), daemon=True
)
thread.start()
return session
def _read_output(self, session: ShellSession) -> None:
"""Read output from process in background thread."""
assert session.process.stdout is not None
try:
for line in session.process.stdout:
with self._lock:
session.output_buffer.append(line)
except Exception:
pass
finally:
with self._lock:
session.closed = True
def send_input(self, session_id: str, input_text: str) -> bool:
"""Send input to a running session."""
with self._lock:
session = self.sessions.get(session_id)
if session is None or not session.is_running():
return False
if session.process.stdin is None:
return False
try:
session.process.stdin.write(input_text + "\n")
session.process.stdin.flush()
session.touch()
return True
except Exception:
return False
def get_output(self, session_id: str) -> str | None:
"""Get full output from a session."""
with self._lock:
session = self.sessions.get(session_id)
if session is None:
return None
return session.get_full_output()
def poll_output(self, session_id: str) -> str | None:
"""Get new output since last poll."""
with self._lock:
session = self.sessions.get(session_id)
if session is None:
return None
return session.read_new_output()
def is_running(self, session_id: str) -> bool:
"""Check if a session is still running."""
with self._lock:
session = self.sessions.get(session_id)
if session is None:
return False
return session.is_running()
def close(self, session_id: str) -> None:
"""Close a session."""
with self._lock:
session = self.sessions.get(session_id)
if session:
session.close()
def close_all(self) -> None:
"""Close all sessions."""
with self._lock:
for session in self.sessions.values():
session.close()
self.sessions.clear()
# Global shell manager instance
_shell_manager: ShellManager | None = None
def get_shell_manager() -> ShellManager:
"""Get or create the global shell manager."""
global _shell_manager
if _shell_manager is None:
_shell_manager = ShellManager()
return _shell_manager