| |
|
| |
|
| | import subprocess |
| | import threading |
| | import queue |
| | import time |
| | import traceback |
| | from .base_code_interpreter import BaseCodeInterpreter |
| |
|
class SubprocessCodeInterpreter(BaseCodeInterpreter):
    """Run code by writing it to the stdin of a long-lived child process.

    The child is launched from ``start_cmd`` and its stdout/stderr are read
    by two daemon threads that push parsed events onto ``output_queue``.
    ``run()`` is a generator that yields those events as dictionaries of
    the form ``{"output": str}`` or ``{"active_line": value_or_None}``.

    The hook methods (``preprocess_code``, ``line_postprocessor``,
    ``detect_active_line``, ``detect_end_of_execution``) are no-ops here;
    they are presumably overridden by language-specific subclasses, which
    would also be expected to set ``start_cmd``.
    """

    def __init__(self):
        # Command used to launch the child; split on whitespace in
        # start_process().
        self.start_cmd = ""
        # The subprocess.Popen handle, or None until start_process() runs.
        self.process = None
        # When true, code sent and lines received are printed to stdout.
        self.debug_mode = False
        # Events produced by the reader threads and consumed by run().
        self.output_queue = queue.Queue()
        # Set by a reader thread when the end of an execution is detected.
        self.done = threading.Event()

    def detect_active_line(self, line):
        """Return the active-line value encoded in ``line``, or a falsy value.

        The base implementation recognises nothing and returns ``None``.
        """
        return None

    def detect_end_of_execution(self, line):
        """Return a truthy value if ``line`` marks the end of an execution.

        The base implementation recognises nothing and returns ``None``.
        """
        return None

    def line_postprocessor(self, line):
        """Transform one raw output line before it is interpreted.

        Returning ``None`` causes the line to be dropped entirely.
        The base implementation returns ``line`` unchanged.
        """
        return line

    def preprocess_code(self, code):
        """
        This needs to insert an end_of_execution marker of some kind,
        which can be detected by detect_end_of_execution.

        Optionally, add active line markers for detect_active_line.
        """
        return code

    def terminate(self):
        """Terminate the child process, if one has been started."""
        # Guard against being called before start_process(): previously this
        # raised AttributeError on the initial ``None`` value.
        if self.process is not None:
            self.process.terminate()

    def start_process(self):
        """(Re)start the child process and its two output-reader threads.

        Any existing child is terminated first. The reader threads are
        daemons, so they do not keep the interpreter alive on exit.
        """
        if self.process:
            self.terminate()

        # ``text=True`` already implies universal-newlines mode, so the
        # redundant ``universal_newlines=True`` alias is not passed.
        self.process = subprocess.Popen(
            self.start_cmd.split(),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=0,
        )
        for stream, is_error_stream in (
            (self.process.stdout, False),
            (self.process.stderr, True),
        ):
            threading.Thread(
                target=self.handle_stream_output,
                args=(stream, is_error_stream),
                daemon=True,
            ).start()

    def run(self, code):
        """Execute ``code`` in the child process and yield its output events.

        This is a generator. It yields dictionaries placed on
        ``output_queue`` by the reader threads, plus ``{"output": ...}``
        messages describing start-up or write failures.

        Writing to the child's stdin is retried up to ``max_retries`` times,
        restarting the process after each failure.

        NOTE: completion is signalled only by ``self.done``, which the
        reader threads set on an end-of-execution marker or a
        ``KeyboardInterrupt`` line on stderr. If the child exits without
        producing either, this generator keeps polling indefinitely.
        """
        retry_count = 0
        max_retries = 3

        try:
            code = self.preprocess_code(code)
            if not self.process:
                self.start_process()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate to the caller.
            yield {"output": traceback.format_exc()}
            return

        while retry_count <= max_retries:
            if self.debug_mode:
                print(f"Running code:\n{code}\n---")

            self.done.clear()

            try:
                self.process.stdin.write(code + "\n")
                self.process.stdin.flush()
                break
            except Exception:
                # The first failure is deliberately not reported; only
                # subsequent failures surface the traceback and retry notice.
                if retry_count != 0:
                    yield {"output": traceback.format_exc()}
                    yield {"output": f"Retrying... ({retry_count}/{max_retries})"}
                    yield {"output": "Restarting process."}

                self.start_process()

                retry_count += 1
                if retry_count > max_retries:
                    yield {"output": "Maximum retries reached. Could not execute code."}
                    return

        while True:
            if not self.output_queue.empty():
                yield self.output_queue.get()
            else:
                time.sleep(0.1)
            try:
                output = self.output_queue.get(timeout=0.3)
                yield output
            except queue.Empty:
                if self.done.is_set():
                    # Poll a few more times so that items queued just after
                    # the done signal are still delivered.
                    for _ in range(3):
                        if not self.output_queue.empty():
                            yield self.output_queue.get()
                        time.sleep(0.2)
                    break

    def handle_stream_output(self, stream, is_error_stream):
        """Read ``stream`` line by line and queue the resulting events.

        Runs in a reader thread until ``stream.readline`` returns ``''``.

        Args:
            stream: A text stream of the child process (stdout or stderr).
            is_error_stream: True when ``stream`` is the child's stderr;
                only then is a ``KeyboardInterrupt`` line treated as the
                end of the execution.
        """
        for line in iter(stream.readline, ""):
            if self.debug_mode:
                print(f"Received output line:\n{line}\n---")

            line = self.line_postprocessor(line)

            if line is None:
                continue

            # Evaluate the hook once and reuse the result instead of calling
            # it a second time to fetch the value.
            active_line = self.detect_active_line(line)
            if active_line:
                self.output_queue.put({"active_line": active_line})
            elif self.detect_end_of_execution(line):
                self.output_queue.put({"active_line": None})
                # Brief pause before signalling completion to run().
                time.sleep(0.1)
                self.done.set()
            elif is_error_stream and "KeyboardInterrupt" in line:
                self.output_queue.put({"output": "KeyboardInterrupt"})
                time.sleep(0.1)
                self.done.set()
            else:
                self.output_queue.put({"output": line})
| |
|
| |
|