| | import os |
| | import json |
| | import pandas as pd |
| | import streamlit as st |
| | from collections import defaultdict |
| |
|
def clean_git_patch(git_patch):
    """Drop any text that precedes the first ``'diff'`` in a patch.

    Args:
        git_patch: Patch text to clean.

    Returns:
        The input starting at the first occurrence of ``'diff'``, or the
        input unchanged when ``'diff'`` does not occur in it.
    """
    marker = 'diff'
    # Nothing to trim when the marker is absent.
    if marker not in git_patch:
        return git_patch
    return git_patch[git_patch.index(marker):]
| |
|
| |
|
def _load_report_legacy(instance_id_to_status, report):
    """Fold a legacy-format report into ``instance_id_to_status`` in place.

    The legacy report maps a status name to an iterable of instance ids.
    For each recognised status, the matching flag is set to ``True`` on
    that instance's entry. Unrecognised statuses are iterated but create
    or modify no entries.

    Args:
        instance_id_to_status: Mapping indexed by instance id whose values
            accept item assignment; modified in place.
        report: Mapping of status name -> iterable of instance ids.
    """
    # Status name in the report -> flag name stored per instance.
    # Note that 'no_generation' is recorded under 'empty_generation'.
    status_to_flag = {
        'resolved': 'resolved',
        'applied': 'applied',
        'test_timeout': 'test_timeout',
        'test_errored': 'test_errored',
        'no_generation': 'empty_generation',
    }
    for status, instance_ids in report.items():
        flag = status_to_flag.get(status)
        for instance_id in instance_ids:
            if flag is not None:
                instance_id_to_status[instance_id][flag] = True
| |
|
def _load_report_new(instance_id_to_status, report):
    """Fold a new-format report into ``instance_id_to_status`` in place.

    Ids listed under ``report['resolved_ids']`` get ``'resolved'`` set to
    ``True``; ids listed under ``report['error_ids']`` get ``'error_eval'``
    set to ``True``.

    Args:
        instance_id_to_status: Mapping indexed by instance id whose values
            accept item assignment; modified in place.
        report: Mapping that must contain ``'resolved_ids'`` and
            ``'error_ids'``.

    Raises:
        KeyError: If either of the two id lists is missing from ``report``.
    """
    # (key in the report, flag stored per instance), processed in this order.
    report_key_to_flag = (
        ('resolved_ids', 'resolved'),
        ('error_ids', 'error_eval'),
    )
    for report_key, flag in report_key_to_flag:
        for instance_id in report[report_key]:
            instance_id_to_status[instance_id][flag] = True
| |
|
def load_df_from_selected_filepaths(select_filepaths):
    """Load one or more JSON-lines output files into a single DataFrame.

    For every file, a ``report.json`` in the same directory is consulted
    when it exists. Its per-instance status (new format when it contains
    ``'resolved_ids'``, legacy format otherwise) is attached to matching
    rows under ``'fine_grained_report'``. Rows containing ``'git_patch'``
    have that patch passed through ``clean_git_patch``.

    Args:
        select_filepaths: A single path string or an iterable of paths to
            JSON-lines files, one JSON object per line. Every object must
            contain ``'instance_id'``.

    Returns:
        A ``pandas.DataFrame`` with one row per JSON object, in file order.
    """
    if isinstance(select_filepaths, str):
        select_filepaths = [select_filepaths]

    data = []
    for filepath in select_filepaths:
        # The evaluation report, if any, lives next to the output file.
        report_json = os.path.join(os.path.dirname(filepath), 'report.json')

        # Instances mentioned in the report default to "not resolved".
        instance_id_to_status = defaultdict(lambda: {'resolved': False})
        if os.path.exists(report_json):
            with open(report_json, 'r') as f:
                report = json.load(f)
            if 'resolved_ids' in report:
                _load_report_new(instance_id_to_status, report)
            else:
                _load_report_legacy(instance_id_to_status, report)

        with open(filepath, 'r') as f:
            # Stream the file line by line instead of reading it all at once.
            for line in f:
                # Blank lines (e.g. a trailing newline) would make
                # json.loads raise; they carry no record, so skip them.
                if not line.strip():
                    continue
                d = json.loads(line)
                if 'git_patch' in d:
                    d['git_patch'] = clean_git_patch(d['git_patch'])
                # A membership test does not trigger the defaultdict factory,
                # so only instances the report mentions get a status attached.
                if d['instance_id'] in instance_id_to_status:
                    d['fine_grained_report'] = dict(
                        instance_id_to_status[d['instance_id']]
                    )
                data.append(d)
    return pd.DataFrame(data)
| |
|
| |
|
def agg_stats(df):
    """Aggregate per-instance outcome statistics from a loaded results frame.

    For each row this collects the instance id, agent class, model name,
    the row's test-result fields, error flags and accumulated cost. When
    the row has a ``'fine_grained_report'`` (preferred) or ``'report'``
    column, the ``resolved`` / ``test_timeout`` / ``test_errored`` /
    ``patch_applied`` flags are taken from it.

    Cells that are not dicts are treated as absent instead of raising.
    This covers the NaN that pandas fills in when only some rows of the
    frame carry ``metrics``, ``report``, ``fine_grained_report`` or
    ``swe_instance``.

    Note:
        The flags are written into the row's existing ``test_result`` dict,
        so the dict objects stored in ``df`` are updated in place.

    Args:
        df: DataFrame whose rows provide ``'instance_id'``,
            ``'test_result'`` and ``'metadata'``. ``'error'``,
            ``'metrics'``, ``'report'``, ``'fine_grained_report'`` and
            ``'swe_instance'`` are optional.

    Returns:
        A ``pandas.DataFrame`` with one row of statistics per input row.
    """
    stats = []
    for idx, entry in df.iterrows():
        raw_result = entry['test_result']
        # Some outputs nest the actual result under a 'result' key.
        test_result = raw_result['result'] if 'result' in raw_result else raw_result

        error = entry.get('error', None)
        if isinstance(error, str):
            agent_stuck_in_loop = 'Agent got stuck in a loop' in error
            # A loop is reported separately, not counted as a generic error.
            contains_error = bool(error) and not agent_stuck_in_loop
        else:
            agent_stuck_in_loop = False
            contains_error = False

        if 'fine_grained_report' in entry:
            report = entry['fine_grained_report']
            # Rows absent from the report hold NaN here, not a dict.
            if not isinstance(report, dict):
                report = {}
            test_result['resolved'] = report.get('resolved', False)
            test_result['test_timeout'] = report.get('test_timeout', False)
            test_result['test_errored'] = report.get('test_errored', False)
            test_result['patch_applied'] = report.get('applied', False)
        elif 'report' in entry:
            report = entry['report']
            # Same NaN guard as above: a non-dict report means "no report".
            if not isinstance(report, dict):
                report = {}
            test_result['resolved'] = bool(report.get('resolved', False))
            test_result['test_timeout'] = bool(report.get('test_timeout', False))
            test_result['test_errored'] = bool(report.get('test_errored', False))
            test_result['patch_applied'] = bool(
                report.get('apply_test_patch_success', False)
            )

        # entry.get() returns the cell itself (possibly NaN/None) whenever the
        # column exists, so the type is checked instead of relying on a default.
        metrics = entry.get('metrics', None)
        cost = metrics.get('accumulated_cost', None) if isinstance(metrics, dict) else None

        metadata = entry['metadata']
        if 'llm_config' in metadata:
            model_name = metadata['llm_config']['model']
        else:
            model_name = metadata['model_name']

        d = {
            'idx': idx,
            'instance_id': entry['instance_id'],
            'agent_class': metadata['agent_class'],
            'model_name': model_name,
            **test_result,
            'agent_stuck_in_loop': agent_stuck_in_loop,
            'contains_error': contains_error,
            'cost': cost,
        }
        swe_instance = entry.get('swe_instance', None)
        if isinstance(swe_instance, dict):
            d['repo'] = swe_instance['repo']
        stats.append(d)
    return pd.DataFrame(stats)
| |
|
@st.cache_data
def get_resolved_stats_from_filepath(filepath):
    """Summarise the outcome of one results file.

    Loads the file with ``load_df_from_selected_filepaths``, aggregates it
    with ``agg_stats`` and returns overall counts. The result is cached
    through ``st.cache_data``.

    Args:
        filepath: Path of the results file to summarise.

    Returns:
        A dict with keys ``'success_rate'``, ``'n_solved'``, ``'n_error'``,
        ``'n_stuck_in_loop'``, ``'total'`` and ``'total_cost'``. Every
        value is ``None`` when the file yields no rows.
    """
    df = load_df_from_selected_filepaths(filepath)
    stats = agg_stats(df)
    # Only the aggregated stats are needed from here on.
    del df

    if not len(stats):
        # Same keys as the populated result, so consumers see a stable schema.
        return {
            'success_rate': None,
            'n_solved': None,
            'n_error': None,
            'n_stuck_in_loop': None,
            'total': None,
            'total_cost': None,
        }

    tot_instances = len(stats)
    n_solved = stats['resolved'].sum()
    return {
        'success_rate': n_solved / tot_instances,
        'n_solved': n_solved,
        'n_error': stats['contains_error'].sum(),
        'n_stuck_in_loop': stats['agent_stuck_in_loop'].sum(),
        'total': tot_instances,
        'total_cost': stats['cost'].sum(),
    }
| |
|