| from typing import List, Dict, Tuple |
| import os |
| import json |
| from copy import deepcopy |
| import re |
| import sys |
| import time |
|
|
| import xml.etree.ElementTree as ET |
|
|
|
|
|
|
| |
| try: |
| from models.Pangu import Pangu |
| except ImportError: |
| print("Warning: Could not import Pangu model. Debugging might fail if Pangu is not defined.") |
|
|
| from .Base import BaseStrategy |
| from models.Base import BaseModel |
| from results.Results import Results |
| from datasets.Dataset import Dataset |
| from datasets.APPSDataset import APPSDataset |
| from datasets.XCodeDataset import XCodeDataset |
| from datasets.HumanEvalDataset import HumanDataset |
| from datasets.CodeContestDataset import CodeContestDataset |
| from datasets.MBPPDataset import MBPPDataset |
|
|
|
|
| class DebateCoder(BaseStrategy): |
| """ |
| Multi-Agent Debate-Based Planning Strategy |
| |
| 基于多智能体辩论的规划策略,通过三个不同角色的代理进行多轮辩论来生成高质量的代码规划。 |
| """ |
| |
| def __init__( |
| self, |
| rounds: int = 3, |
| early_stop_threshold: float = 95.0, |
| t: int = 3, |
| *args, |
| **kwargs |
| ): |
| super().__init__(*args, **kwargs) |
| self.rounds = rounds |
| self.early_stop_threshold = early_stop_threshold |
| self.t = t |
| self.log_dir = "./outputs/responses" |
| |
| |
| self.pr_tok = 0 |
| self.com_tok = 0 |
| |
| os.makedirs(self.log_dir, exist_ok=True) |
| |
| def log_response(self, response: str, stage: str, item: dict): |
| """记录响应到日志文件""" |
| log_file = os.path.join(self.log_dir, f"DebateCoder_{self.model.__class__.__name__}_responses.log") |
| |
| try: |
| with open(log_file, "a", encoding="utf-8") as f: |
| from datetime import datetime |
| f.write(f"\n---\n") |
| f.write(f"# timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") |
| f.write(f"# dataset: {self.data.__class__.__name__}\n") |
| f.write(f"# id: {item.get('task_id', item.get('name', 'unknown'))}\n") |
| f.write(f"# kind: {stage}\n") |
| f.write(response) |
| f.write(f"\n") |
| except Exception as e: |
| print(f"Failed to log response: {e}") |
| |
| def get_agent_role_prompt(self, role: str) -> str: |
| """获取不同角色的系统提示""" |
| role_prompts = { |
| "UA": """You are a User Agent (UA) focusing on functionality completeness and usability. |
| Your responsibility is to ensure: |
| - The solution meets all user requirements |
| - The code is easy to understand and use |
| - Edge cases from a user perspective are handled |
| - The interface is intuitive and clear""", |
| |
| "TA": """You are a Technical Agent (TA) focusing on technical feasibility and performance efficiency. |
| Your responsibility is to ensure: |
| - The solution is technically sound and implementable |
| - The algorithm is efficient with optimal time/space complexity |
| - Best practices and design patterns are followed |
| - The code is maintainable and scalable""", |
| |
| "QA": """You are a QA Agent (QA) focusing on robustness with boundary conditions and exception handling. |
| Your responsibility is to ensure: |
| - All edge cases and boundary conditions are handled |
| - Exception handling is comprehensive |
| - Input validation is thorough |
| - The solution is stable and reliable""" |
| } |
| return role_prompts.get(role, "") |
| |
| def generate_initial_plan(self, item: dict, role: str) -> str: |
| """生成初始计划(第一轮)""" |
| role_prompt = self.get_agent_role_prompt(role) |
| problem_description = self.data.get_prompt(item) |
| messages = [ |
| { |
| "role": "system", |
| "content": role_prompt |
| }, |
| { |
| "role": "user", |
| "content": f"""Given the following programming problem, create a detailed step-by-step plan to solve it from your perspective as {role}. |
| |
| # Problem: |
| {problem_description} |
| |
| # Your Task: |
| Generate a clear, actionable plan with specific steps. Focus on aspects that align with your role's responsibilities. |
| |
| # Output Format: |
| Provide a numbered list of steps to solve this problem.""" |
| } |
| ] |
| |
| print(f"\n{'='*60}") |
| print(f"[Round 1] {role} - Initial Planning") |
| print(f"{'='*60}") |
| print("messages:", messages) |
| response, pr_tok, com_tok = self.gpt_chat(messages) |
| item['api_calls'] = item.get('api_calls', 0) + 1 |
| |
| |
| self.pr_tok += pr_tok |
| self.com_tok += com_tok |
| |
| self.log_response(response, f"Round-1-{role}-Plan", item) |
| |
| print(f"\n{role} Plan:\n{response[:300]}...") |
| |
| return response |
| |
| def generate_debate_plan(self, item: dict, role: str, round_num: int, |
| own_prev_plan: str, other_plans: Dict[str, str]) -> str: |
| """生成辩论后的计划(第2轮及之后)""" |
| role_prompt = self.get_agent_role_prompt(role) |
| problem_description = self.data.get_prompt(item) |
| |
| |
| other_plans_text = "" |
| for other_role, other_plan in other_plans.items(): |
| other_plans_text += f"\n## {other_role}'s Previous Plan:\n{other_plan}\n" |
| |
| messages = [ |
| { |
| "role": "system", |
| "content": role_prompt |
| }, |
| { |
| "role": "user", |
| "content": f"""Given the following programming problem, you previously created a plan. Now, review the plans from other agents and refine your plan. |
| |
| # Problem: |
| {problem_description} |
| |
| # Your Previous Plan: |
| {own_prev_plan} |
| |
| # Other Agents' Plans: |
| {other_plans_text} |
| |
| # Your Task: |
| 1. Analyze the strengths and weaknesses of other agents' plans |
| 2. Compare them with your previous plan |
| 3. Refine your plan by: |
| - Incorporating good ideas from others |
| - Addressing issues you identified |
| - Maintaining focus on your role's responsibilities ({role}) |
| |
| # Output Format: |
| Provide an improved, numbered list of steps to solve this problem.""" |
| } |
| ] |
| |
| print(f"\n{'='*60}") |
| print(f"[Round {round_num}] {role} - Refining Plan") |
| print(f"{'='*60}") |
| |
| response, pr_tok, com_tok = self.gpt_chat(messages) |
| item['api_calls'] = item.get('api_calls', 0) + 1 |
| |
| |
| self.pr_tok += pr_tok |
| self.com_tok += com_tok |
| |
| self.log_response(response, f"Round-{round_num}-{role}-Plan", item) |
| |
| print(f"\n{role} Refined Plan:\n{response[:300]}...") |
| |
| return response |
| |
| def fuse_plans(self, item: dict, plans: Dict[str, str]) -> str: |
| """融合三个代理的最终计划""" |
| problem_description = self.data.get_prompt(item) |
| |
| plans_text = "" |
| for role, plan in plans.items(): |
| plans_text += f"\n## {role}'s Final Plan:\n{plan}\n" |
| |
| messages = [ |
| { |
| "role": "user", |
| "content": f"""Given the following programming problem and three different planning perspectives, create a comprehensive final plan that integrates the strengths of all three approaches. |
| |
| # Problem: |
| {problem_description} |
| |
| # Three Agents' Final Plans: |
| {plans_text} |
| |
| # Your Task: |
| Synthesize these three plans into ONE cohesive, comprehensive plan that: |
| 1. Ensures functionality completeness and usability (from UA) |
| 2. Maintains technical feasibility and efficiency (from TA) |
| 3. Handles edge cases and exceptions robustly (from QA) |
| |
| # Output Format: |
| Provide a clear, numbered list of steps that represents the best synthesis of all three perspectives.""" |
| } |
| ] |
| |
| print(f"\n{'='*60}") |
| print(f"[Fusion] Combining Final Plans") |
| print(f"{'='*60}") |
| |
| fused_plan, pr_tok, com_tok = self.gpt_chat(messages) |
| item['api_calls'] = item.get('api_calls', 0) + 1 |
| |
| |
| self.pr_tok += pr_tok |
| self.com_tok += com_tok |
| |
| self.log_response(fused_plan, "Fused-Plan", item) |
| |
| print(f"\nFused Plan:\n{fused_plan[:300]}...") |
| |
| return fused_plan |
| |
| def generate_code_from_plan(self, item: dict, final_plan: str) -> str: |
| """根据融合后的计划生成代码""" |
| problem_description = self.data.get_prompt(item) |
| |
| messages = [ |
| { |
| "role": "user", |
| "content": f"""Given the following programming problem and a detailed plan, implement the solution in {self.language}. |
| |
| # Problem: |
| {problem_description} |
| |
| # Detailed Plan: |
| {final_plan} |
| |
| # Your Task: |
| Write complete, working code that implements this plan. Ensure: |
| - The code follows the plan's steps |
| - All edge cases are handled |
| - The code is clean and well-structured |
| - It passes all test cases |
| |
| # Output Format: |
| Provide only the code implementation, without additional explanations.""" |
| } |
| ] |
| |
| print(f"\n{'='*60}") |
| print(f"[Code Generation] Implementing Solution") |
| print(f"{'='*60}") |
| |
| code, pr_tok, com_tok = self.gpt_chat(messages) |
| item['api_calls'] = item.get('api_calls', 0) + 1 |
| |
| |
| self.pr_tok += pr_tok |
| self.com_tok += com_tok |
| |
| try: |
| code = self.parse_code(code) |
| except IndexError as e: |
| print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True) |
| max_code_retries = 2 |
| parsed_success = False |
| for cretry in range(1, max_code_retries + 1): |
| retry_raw, pr_tok_r, com_tok_r = self.gpt_chat( |
| messages |
| ) |
| item['api_calls'] = item.get('api_calls', 0) + 1 |
| self.pr_tok += pr_tok_r |
| self.com_tok += com_tok_r |
|
|
| try: |
| retry_parsed = self.parse_code(retry_raw) |
| code = retry_parsed |
| parsed_success = True |
| self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item) |
| break |
| except Exception as e2: |
| print(f"Retry {cretry} parse_code failed: {e2}", flush=True) |
| self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item) |
|
|
| if not parsed_success: |
| print("Final code generation: retries exhausted, using default fallback code.", flush=True) |
| lang = (self.language or "").lower() |
| if 'python' in lang: |
| code = 'print("")' |
| elif 'java' in lang: |
| code = 'public class Main { public static void main(String[] args) { } }' |
| elif 'c++' in lang or 'cpp' in lang: |
| code = 'int main() { return 0; }' |
| elif re.search(r"\bc\b", lang): |
| code = 'int main() { return 0; }' |
| elif 'js' in lang or 'node' in lang or 'javascript' in lang: |
| code = 'console.log("")' |
| else: |
| code = '' |
| |
| self.log_response(code, "final_code_fallback", item) |
|
|
| self.log_response(code, "Generated-Code", item) |
| |
| print(f"\nGenerated Code:\n{code[:300]}...") |
| |
| return code |
| |
| def evaluate_plan_confidence(self, item: dict, plan: str) -> float: |
| """评估计划的置信度""" |
| prompt = f"""Given a programming problem and a plan to solve it, evaluate the quality and completeness of the plan. |
| |
| # Problem: |
| {item.get('prompt', item.get('description', ''))} |
| |
| # Plan: |
| {plan} |
| |
| Rate the plan's quality on a scale of 0-100, where: |
| - 90-100: Excellent plan, covers all cases, clear implementation steps |
| - 70-89: Good plan, minor gaps but workable |
| - 50-69: Acceptable plan, has some issues |
| - 0-49: Poor plan, major gaps or errors |
| |
| Output ONLY a number between 0-100, nothing else.""" |
|
|
| messages = [{"role": "user", "content": prompt}] |
| |
| try: |
| response, pr_tok, com_tok = self.gpt_chat(messages) |
| |
| |
| self.pr_tok += pr_tok |
| self.com_tok += com_tok |
|
|
| |
| import re |
| match = re.search(r'\b(\d+(?:\.\d+)?)\b', response) |
| if match: |
| confidence = float(match.group(1)) |
| return min(100.0, max(0.0, confidence)) |
| return 50.0 |
| except Exception as e: |
| if hasattr(self, 'verbose') and self.verbose: |
| print(f"Failed to evaluate confidence: {e}") |
| return 50.0 |
|
|
| def parse_code(self, response: str) -> str: |
| if "```" not in response: |
| return response |
|
|
| code_pattern = r'```((.|\n)*?)```' |
| if "```Python" in response: |
| code_pattern = r'```Python((.|\n)*?)```' |
| if "```Python3" in response: |
| code_pattern = r'```Python3((.|\n)*?)```' |
| if "```python" in response: |
| code_pattern = r'```python((.|\n)*?)```' |
| if "```python3" in response: |
| code_pattern = r'```python3((.|\n)*?)```' |
| if "```C" in response: |
| code_pattern = r'```C((.|\n)*?)```' |
| if "```c" in response: |
| code_pattern = r'```c((.|\n)*?)```' |
| if "```C++" in response: |
| code_pattern = r'```C\+\+((.|\n)*?)```' |
| if "```c++" in response: |
| code_pattern = r'```c\+\+((.|\n)*?)```' |
| if "```Java" in response: |
| code_pattern = r'```Java((.|\n)*?)```' |
| if "```java" in response: |
| code_pattern = r'```java((.|\n)*?)```' |
| if "```Node" in response: |
| code_pattern = r'```Node((.|\n)*?)```' |
| if "```node" in response: |
| code_pattern = r'```node((.|\n)*?)```' |
| if "```Rust" in response: |
| code_pattern = r'```Rust((.|\n)*?)```' |
| if "```rust" in response: |
| code_pattern = r'```rust((.|\n)*?)```' |
| if "```PHP" in response: |
| code_pattern = r'```PHP((.|\n)*?)```' |
| if "```php" in response: |
| code_pattern = r'```php((.|\n)*?)```' |
| if "```Go" in response: |
| code_pattern = r'```Go((.|\n)*?)```' |
| if "```go" in response: |
| code_pattern = r'```go((.|\n)*?)```' |
| if "```Ruby" in response: |
| code_pattern = r'```Ruby((.|\n)*?)```' |
| if "```ruby" in response: |
| code_pattern = r'```ruby((.|\n)*?)```' |
| if "```C#" in response: |
| code_pattern = r'```C#((.|\n)*?)```' |
| if "```c#" in response: |
| code_pattern = r'```c#((.|\n)*?)```' |
| if "```csharp" in response: |
| code_pattern = r'```csharp((.|\n)*?)```' |
|
|
| code_blocks = re.findall(code_pattern, response, re.DOTALL) |
|
|
| if type(code_blocks[-1]) == tuple or type(code_blocks[-1]) == list: |
| code_str = "\n".join(code_blocks[-1]) |
| elif type(code_blocks[-1]) == str: |
| code_str = code_blocks[-1] |
| else: |
| code_str = response |
|
|
| return code_str |
|
|
|
|
| def Reviewer_pangu1b(self,problem_prompt: str, plan: str, code: str, test_log: str, task_id: str = "unknown") -> Tuple[str, int, int]: |
| """ |
| Reviewer 角色:分析代码失败原因并提供修复计划。 |
| |
| Args: |
| problem_prompt (str): 题目描述。 |
| code (str):生成的代码。 |
| test_log (str): 测试失败的日志报告。 |
| task_id (str): 当前任务的ID,用于日志打印 (对应原代码中的 i)。 |
| |
| Returns: |
| Tuple[str, int, int]: 返回 (分析结果, prompt_token消耗, completion_token消耗) |
| """ |
| |
| |
| reviewer_input = [ |
| { |
| "role": "user", |
| "content": f"You are an expert programmer. The following code was generated to solve a problem but failed sample test cases.\n\n## Problem:\n{problem_prompt}\n\n## Plan:\n{plan}\n \n## Generated Code:\n```\n{code}\n```\n\n## Test Report:\n{test_log}\n\nPlease analyze why the code failed and provide a specific plan to fix it. Do not generate the full code, just the analysis and fix plan." |
| } |
| ] |
| |
| print(f"Input for Reviewer analysis: {task_id}") |
|
|
| try: |
| |
| |
| pangu_model = Pangu() |
| analysis, q_pr_tok, q_com_tok = pangu_model.prompt(reviewer_input) |
| |
| print(f"Reviewer Analysis: {analysis}", flush=True) |
| return analysis, q_pr_tok, q_com_tok |
|
|
| except NameError: |
| print("Error: Pangu model class not found. Skipping detailed analysis.") |
| return "Code failed sample tests. Please check logic and edge cases.", 0, 0 |
| |
| except Exception as e: |
| print(f"Error during Reviewer analysis: {e}") |
| return "Code failed sample tests. Please check logic and edge cases.", 0, 0 |
|
|
|
|
| def debugging(self, plan: list, code: str, item: dict, algorithm_prompt: str) -> str: |
| passed = False |
| planning, _, _ = plan |
| |
| |
|
|
| if type(self.data) == APPSDataset or type(self.data) == CodeContestDataset or type(self.data) == XCodeDataset: |
| std_input_prompt = "## Note: Strictly follow the input and output format. The input should be taken from Standard input and output should be given to standard output. If you are writing a function then after the function definition take input using `input()` function then call the function with specified parameters and finally print the output of the function. Do not add extra print statement otherwise it will failed the test cases." |
| else: |
| std_input_prompt = "" |
| |
| for i in range(1, self.t + 1): |
| passed, test_log = self.data.evaluate_sample_io( |
| item, |
| code, |
| self.language |
| ) |
|
|
| if passed: |
| print(f"DEBUGGING: Test passed at round {i}") |
| break |
| |
| problem_prompt = self.data.get_prompt(item) |
|
|
| |
| analysis, pr_cost, com_cost = self.Reviewer_pangu1b( |
| problem_prompt=problem_prompt, |
| plan = planning, |
| code=code, |
| test_log=test_log, |
| task_id=i |
| ) |
| print(f"pr_cost: {pr_cost}, com_cost: {com_cost}") |
| |
| self.pr_tok += pr_cost |
| self.com_tok += com_cost |
|
|
| print(f"Input for improving code generation: {i}") |
| input_for_improving_code = [ |
| { |
| "role": "user", |
| "content": f" You are an expert competitive programmer. Your task is to fix the provided {self.language} code based on the Expert Analysis. The original code failed sample test cases.\n\n### Problem Description: \n{self.data.get_prompt(item)}\n ### Original Code (Buggy): \n```\n{code}\n```\n### Expert Analysis & Fix Plan:\n{analysis}\n\nImprove your code to solve the problem correctly based on this analysis.\n### Requirement: 1. Read the Expert Analysis carefully. 2. {std_input_prompt} 3. Generate the corrected code.\n\n----------------\nImportant:\n{std_input_prompt}\n## Your response must contain the modified planning and then the {self.language} code inside ``` block to solve this problem." } |
| ] |
|
|
| print("\n\n________________________") |
| print("Input for improving code generation: ") |
| print(input_for_improving_code[0]['content'], flush=True) |
|
|
| response, pr_tok_1, com_tok_1 = self.gpt_chat( |
| input_for_improving_code |
| ) |
| item['api_calls'] += 1 |
|
|
| self.pr_tok += pr_tok_1 |
| self.com_tok += com_tok_1 |
|
|
| raw_code = deepcopy(code) |
| try: |
| |
| code = self.parse_code(response) |
| except IndexError as e: |
| print(f"parse_code raised IndexError: {e}. Will retry final code generation.", flush=True) |
| max_code_retries = 2 |
| parsed_success = False |
| for cretry in range(1, max_code_retries + 1): |
| retry_raw, pr_tok_r, com_tok_r = self.gpt_chat( |
| input_for_improving_code |
| ) |
| item['api_calls'] = item.get('api_calls', 0) + 1 |
| self.pr_tok += pr_tok_r |
| self.com_tok += com_tok_r |
|
|
| try: |
| retry_parsed = self.parse_code(retry_raw) |
| code = retry_parsed |
| parsed_success = True |
| self.log_response(retry_raw, f"final_code_retry_success-{cretry}", item) |
| break |
| except Exception as e2: |
| print(f"Retry {cretry} parse_code failed: {e2}", flush=True) |
| self.log_response(retry_raw, f"final_code_retry_failed-{cretry}", item) |
|
|
| if not parsed_success: |
| print("Final code generation: retries exhausted, using raw code.", flush=True) |
| code = raw_code |
| self.log_response(code, "final_code_fallback", item) |
| |
| return code |
|
|
| def run_single_pass(self, item: dict): |
| """执行单个问题的多智能体辩论流程""" |
| self.pr_tok = 0 |
| self.com_tok = 0 |
| |
| print("\n" + "="*80) |
| print(f"Processing: {item.get('task_id', item.get('name', 'unknown'))}") |
| print("="*80) |
| |
| agents = ["UA", "TA", "QA"] |
| |
| plans_history = {agent: [] for agent in agents} |
| actual_rounds = 1 |
| |
| |
| print(f"\n{'#'*80}") |
| print(f"# ROUND 1: Initial Planning") |
| print(f"{'#'*80}") |
| |
| current_plans = {} |
| for agent in agents: |
| plan = self.generate_initial_plan(item, agent) |
| plans_history[agent].append(plan) |
| current_plans[agent] = plan |
| |
| |
| if self.rounds > 1: |
| confidences = {} |
| print(f"\n{'='*60}") |
| print("Evaluating Round 1 Plan Quality...") |
| print(f"{'='*60}") |
| |
| for agent in agents: |
| conf = self.evaluate_plan_confidence(item, current_plans[agent]) |
| confidences[agent] = conf |
| print(f"{agent} Plan Confidence: {conf:.1f}%") |
| |
| avg_confidence = sum(confidences.values()) / len(confidences) |
| print(f"Average Confidence: {avg_confidence:.1f}%") |
| |
| if avg_confidence >= self.early_stop_threshold: |
| print(f"\n✓ High confidence achieved! Skipping remaining rounds.") |
| item['actual_rounds'] = actual_rounds |
| item['early_stopped'] = True |
| item['final_confidence'] = avg_confidence |
| |
| final_plan = self.fuse_plans(item, current_plans) |
| code = self.generate_code_from_plan(item, final_plan) |
| |
| |
| print(f"\n{'#'*80}") |
| print(f"# DEBUGGING (Early Stop)") |
| print(f"{'#'*80}") |
| code = self.debugging([final_plan, None, None], code, item, "Please strictly follow the plan.") |
| |
| return code, self.pr_tok, self.com_tok |
| |
| |
| for round_num in range(2, self.rounds + 1): |
| actual_rounds = round_num |
| print(f"\n{'#'*80}") |
| print(f"# ROUND {round_num}: Debate and Refinement") |
| print(f"{'#'*80}") |
| |
| new_plans = {} |
| for agent in agents: |
| other_plans = {a: current_plans[a] for a in agents if a != agent} |
| refined_plan = self.generate_debate_plan( |
| item, agent, round_num, |
| current_plans[agent], other_plans |
| ) |
| plans_history[agent].append(refined_plan) |
| new_plans[agent] = refined_plan |
| |
| current_plans = new_plans |
| |
| if round_num < self.rounds: |
| confidences = {} |
| print(f"\n{'='*60}") |
| print(f"Evaluating Round {round_num} Plan Quality...") |
| print(f"{'='*60}") |
| |
| for agent in agents: |
| conf = self.evaluate_plan_confidence(item, current_plans[agent]) |
| confidences[agent] = conf |
| print(f"{agent} Plan Confidence: {conf:.1f}%") |
| |
| avg_confidence = sum(confidences.values()) / len(confidences) |
| if avg_confidence >= self.early_stop_threshold: |
| print(f"\n✓ High confidence achieved! Skipping remaining rounds.") |
| item['actual_rounds'] = actual_rounds |
| item['early_stopped'] = True |
| item['final_confidence'] = avg_confidence |
| break |
| |
| if 'actual_rounds' not in item: |
| item['actual_rounds'] = actual_rounds |
| item['early_stopped'] = False |
| |
| |
| print(f"\n{'#'*80}") |
| print(f"# FUSION: Combining All Perspectives") |
| print(f"{'#'*80}") |
| final_plan = self.fuse_plans(item, current_plans) |
| |
| |
| print(f"\n{'#'*80}") |
| print(f"# CODE GENERATION") |
| print(f"{'#'*80}") |
| code = self.generate_code_from_plan(item, final_plan) |
|
|
| |
| print(f"\n{'#'*80}") |
| print(f"# DEBUGGING") |
| print(f"{'#'*80}") |
| code = self.debugging([final_plan, None, None], code, item, "Please strictly follow the plan.") |
| |
| print(f"\n{'='*80}") |
| print(f"Completed: {item.get('task_id', item.get('name', 'unknown'))}") |
| |
| |
| return code, self.pr_tok, self.com_tok |