| | import os |
| | import sys |
| | from ..subprocess_code_interpreter import SubprocessCodeInterpreter |
| | import ast |
| | import re |
| | import shlex |
| |
|
class Python(SubprocessCodeInterpreter):
    """
    Code interpreter that drives a Python subprocess.

    Submitted code is instrumented by ``preprocess_python`` and the
    subprocess output is scanned for the ``## active_line N ##`` and
    ``## end_of_execution ##`` markers.
    """

    file_extension = "py"
    proper_name = "Python"

    def __init__(self):
        """Build the command line used to launch the Python subprocess."""
        super().__init__()
        # Run the same interpreter that is executing this process.
        # The path is shell-quoted everywhere except on Windows (os.name == 'nt').
        interpreter_path = (
            sys.executable if os.name == 'nt' else shlex.quote(sys.executable)
        )
        # -i: interactive mode, -q: no startup banner, -u: unbuffered streams.
        self.start_cmd = f"{interpreter_path} -i -q -u"

    def preprocess_code(self, code):
        """Return *code* instrumented by ``preprocess_python``."""
        return preprocess_python(code)

    def line_postprocessor(self, line):
        """
        Filter one line of subprocess output.

        Returns None for lines that begin (after optional whitespace) with
        a ``>>>`` or ``...`` prompt, otherwise returns the line unchanged.
        """
        is_prompt = re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*)', line)
        return None if is_prompt else line

    def detect_active_line(self, line):
        """
        Extract the line number from an active-line marker.

        Returns the integer found between ``## active_line `` and `` ##``,
        or None when the line contains no such marker.
        """
        marker = "## active_line "
        if marker not in line:
            return None
        after_marker = line.split(marker)[1]
        return int(after_marker.split(" ##")[0])

    def detect_end_of_execution(self, line):
        """Return True when *line* contains the end-of-execution marker."""
        return "## end_of_execution ##" in line
| | |
| |
|
def preprocess_python(code):
    """
    Instrument *code* before it is sent to the interpreter.

    Steps, in order:
      1. insert active-line marker prints (``add_active_line_prints``),
      2. wrap everything in a try/except (``wrap_in_try_except``),
      3. drop lines that are empty or whitespace-only,
      4. append a print of the end-of-execution marker.

    Returns the instrumented source as a single string.
    """
    instrumented = wrap_in_try_except(add_active_line_prints(code))

    # Remove blank lines. NOTE(review): presumably needed because the code is
    # fed to an interactive interpreter, where an empty line terminates a
    # compound statement -- confirm against the caller.
    compact = "\n".join(
        source_line
        for source_line in instrumented.split("\n")
        if source_line.strip()
    )

    # The blank line before the marker is intentional and must be kept.
    return compact + '\n\nprint("## end_of_execution ##")'
| |
|
| |
|
def add_active_line_prints(code):
    """
    Return *code* with active-line marker prints inserted.

    The source is parsed into an AST, rewritten by ``AddLinePrints`` and
    converted back to source text with ``ast.unparse``.
    """
    instrumented_tree = AddLinePrints().visit(ast.parse(code))
    return ast.unparse(instrumented_tree)
| |
|
| |
|
class AddLinePrints(ast.NodeTransformer):
    """
    Transformer that inserts a ``print("## active_line N ##")`` statement
    before every statement that carries a line number.

    Only attributes that actually hold a *list of statements* are
    instrumented. Expression nodes such as ``ast.Lambda`` and ``ast.IfExp``
    also expose ``body`` / ``orelse`` attributes, but those are single
    expressions and must be left untouched, otherwise the tree can no
    longer be unparsed.
    """

    # Names of the attributes that hold statement lists on compound nodes
    # (modules, functions, classes, loops, if/with/try blocks, except
    # handlers, match cases, ...).
    _STATEMENT_BLOCKS = ("body", "orelse", "finalbody")

    def insert_print_statement(self, line_number):
        """Return an expression statement printing the marker for *line_number*."""
        return ast.Expr(
            value=ast.Call(
                func=ast.Name(id='print', ctx=ast.Load()),
                args=[ast.Constant(value=f"## active_line {line_number} ##")],
                keywords=[],
            )
        )

    def process_body(self, body):
        """
        Return a new statement list with a marker print before each statement.

        *body* may be a list of nodes or a single node (which is treated as a
        one-element list). Nodes without a ``lineno`` attribute are copied
        through without a marker.
        """
        if not isinstance(body, list):
            body = [body]

        new_body = []
        for sub_node in body:
            if hasattr(sub_node, 'lineno'):
                new_body.append(self.insert_print_statement(sub_node.lineno))
            new_body.append(sub_node)
        return new_body

    def visit(self, node):
        """
        Visit *node* and instrument each statement list it owns.

        Children are transformed first (via the base-class ``visit``), then
        every list-valued ``body`` / ``orelse`` / ``finalbody`` attribute of
        the resulting node is instrumented exactly once. Except-handler
        bodies are covered when the ``ExceptHandler`` node itself is visited,
        so they need no special handling on the enclosing try node.
        """
        new_node = super().visit(node)

        for block_name in self._STATEMENT_BLOCKS:
            block = getattr(new_node, block_name, None)
            # Skip expression-valued attributes (lambda / conditional
            # expression) and empty blocks.
            if isinstance(block, list) and block:
                setattr(new_node, block_name, self.process_body(block))

        return new_node
| | |
| |
|
def wrap_in_try_except(code):
    """
    Return *code* wrapped in a single try/except block.

    ``import traceback`` is prepended to the code, and the whole thing
    becomes the body of a ``try`` whose ``except Exception`` handler calls
    ``traceback.print_exc()``. The result is returned as source text.
    """
    # Statements that will form the try body; the import comes first so the
    # handler can always reference ``traceback``.
    module = ast.parse("import traceback\n" + code)

    # Parse the try/except skeleton from source and swap in the real body.
    skeleton = ast.parse(
        "try:\n"
        "    pass\n"
        "except Exception:\n"
        "    traceback.print_exc()\n"
    )
    guarded = skeleton.body[0]
    guarded.body = module.body

    # The module now consists solely of the try/except statement.
    module.body = [guarded]
    return ast.unparse(module)
| |
|