| | |
| | |
| |
|
| | """ |
| | Agent Manager |
| | |
| | This module provides the main orchestrator for the Code Review Agent. |
| | It coordinates the review process and manages the state of the application. |
| | """ |
| |
|
| | import os |
| | import time |
| | import logging |
| | import tempfile |
| | import json |
| | import threading |
| | import concurrent.futures |
| | from datetime import datetime |
| | import gradio as gr |
| |
|
| | from src.core.language_detector import LanguageDetector |
| | from src.services.code_analyzer import CodeAnalyzer |
| | from src.services.report_generator import ReportGenerator |
| | from src.services.repository_service import RepositoryService |
| | from src.services.security_scanner import SecurityScanner |
| | from src.services.performance_analyzer import PerformanceAnalyzer |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class AgentManager: |
| | """ |
| | Main orchestrator for the Code Review Agent. |
| | |
| | This class coordinates the review process, manages the application state, |
| | and provides the interface between the UI and the business logic. |
| | """ |
| | |
| | def __init__(self): |
| | """ |
| | Initialize the AgentManager. |
| | """ |
| | |
| | self.state = { |
| | 'repo_url': None, |
| | 'progress': {}, |
| | 'results': {}, |
| | 'current_step': None |
| | } |
| | |
| | |
| | self.language_detector = LanguageDetector() |
| | self.code_analyzer = CodeAnalyzer() |
| | self.report_generator = ReportGenerator() |
| | self.repository_service = RepositoryService() |
| | self.security_scanner = SecurityScanner() |
| | self.performance_analyzer = PerformanceAnalyzer() |
| | self.temp_dir = tempfile.mkdtemp(prefix="code_review_agent_") |
| | |
| | logger.info(f"Initialized AgentManager with temp directory: {self.temp_dir}") |
| | |
| | def start_review(self, repo_url, github_token=None, selected_languages=None, progress_components=None): |
| | """ |
| | Start the code review process for a GitHub repository. |
| | |
| | Args: |
| | repo_url (str): The URL of the GitHub repository to review. |
| | github_token (str, optional): GitHub authentication token for private repositories. |
| | selected_languages (list, optional): List of languages to analyze. If None, |
| | languages will be auto-detected. |
| | progress_components (tuple, optional): Tuple containing (progress_group, overall_progress, status_message, step_progress) |
| | from create_progress_tracker(). |
| | |
| | Returns: |
| | tuple: (progress_group, overall_progress, status_message, results_dashboard) - Updated UI components. |
| | """ |
| | |
| | if progress_components: |
| | progress_group, overall_progress, status_message, step_progress = progress_components |
| | else: |
| | progress_group = gr.Group(visible=True) |
| | overall_progress = gr.Slider(value=0) |
| | status_message = gr.Markdown("*Starting review...*") |
| | step_progress = {} |
| | |
| | try: |
| | |
| | self.state = { |
| | 'repo_url': repo_url, |
| | 'progress': {}, |
| | 'results': {}, |
| | 'current_step': None |
| | } |
| | |
| | self.step_progress = step_progress |
| | |
| | |
| | self._update_progress("Repository Cloning", 0, overall_progress, status_message) |
| | repo_path = self._clone_repository(repo_url, github_token) |
| | self._update_progress("Repository Cloning", 100, overall_progress, status_message) |
| | |
| | |
| | self._update_progress("Language Detection", 0, overall_progress, status_message) |
| | if selected_languages and len(selected_languages) > 0: |
| | languages = selected_languages |
| | logger.info(f"Using selected languages: {languages}") |
| | else: |
| | languages = self.language_detector.detect_languages(repo_path) |
| | logger.info(f"Auto-detected languages: {languages}") |
| | |
| | self.state['languages'] = languages |
| | self._update_progress("Language Detection", 100, overall_progress, status_message) |
| | |
| | |
| | self._update_progress("Code Analysis", 0, overall_progress, status_message) |
| | self._update_progress("Security Scanning", 0, overall_progress, status_message) |
| | self._update_progress("Performance Analysis", 0, overall_progress, status_message) |
| | self._update_progress("AI Review", 0, overall_progress, status_message) |
| | |
| | |
| | lock = threading.Lock() |
| | results = {} |
| | |
| | |
| | def run_code_analysis(): |
| | try: |
| | code_results = self.code_analyzer.analyze_repository(repo_path, languages) |
| | with lock: |
| | results['code_analysis'] = code_results |
| | self._update_progress("Code Analysis", 100, overall_progress, status_message) |
| | except Exception as e: |
| | logger.error(f"Error in code analysis thread: {e}") |
| | with lock: |
| | results['code_analysis'] = {'status': 'error', 'error': str(e)} |
| | self._update_progress("Code Analysis", 100, overall_progress, status_message) |
| | |
| | def run_security_scan(): |
| | try: |
| | security_results = self.security_scanner.scan_repository(repo_path, languages) |
| | with lock: |
| | results['security'] = security_results |
| | self._update_progress("Security Scanning", 100, overall_progress, status_message) |
| | except Exception as e: |
| | logger.error(f"Error in security scanning thread: {e}") |
| | with lock: |
| | results['security'] = {'status': 'error', 'error': str(e)} |
| | self._update_progress("Security Scanning", 100, overall_progress, status_message) |
| | |
| | def run_performance_analysis(): |
| | try: |
| | perf_results = self.performance_analyzer.analyze_repository(repo_path, languages) |
| | with lock: |
| | results['performance'] = perf_results |
| | self._update_progress("Performance Analysis", 100, overall_progress, status_message) |
| | except Exception as e: |
| | logger.error(f"Error in performance analysis thread: {e}") |
| | with lock: |
| | results['performance'] = {'status': 'error', 'error': str(e)} |
| | self._update_progress("Performance Analysis", 100, overall_progress, status_message) |
| | |
| | def run_ai_review(): |
| | try: |
| | ai_results = self._perform_ai_review(repo_path, languages) |
| | with lock: |
| | results['ai_review'] = ai_results |
| | self._update_progress("AI Review", 100, overall_progress, status_message) |
| | except Exception as e: |
| | logger.error(f"Error in AI review thread: {e}") |
| | with lock: |
| | results['ai_review'] = {'status': 'error', 'error': str(e)} |
| | self._update_progress("AI Review", 100, overall_progress, status_message) |
| | |
| | |
| | with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: |
| | executor.submit(run_code_analysis) |
| | executor.submit(run_security_scan) |
| | executor.submit(run_performance_analysis) |
| | executor.submit(run_ai_review) |
| | |
| | |
| | executor.shutdown(wait=True) |
| | |
| | |
| | with lock: |
| | self.state['results'].update(results) |
| | |
| | |
| | repo_info = self.repository_service.get_repository_info(repo_path) |
| | self.state['results']['repository_info'] = repo_info |
| | |
| | |
| | self._update_progress("Report Generation", 0, overall_progress, status_message) |
| | repo_name = repo_url.split('/')[-1].replace('.git', '') |
| | report_paths = self.report_generator.generate_report( |
| | repo_name, self.state['results'] |
| | ) |
| | self.state['report_paths'] = report_paths |
| | self._update_progress("Report Generation", 100, overall_progress, status_message) |
| | |
| | |
| | results_dashboard = self._create_results_dashboard(self.state['results']) |
| | results_dashboard.visible = True |
| | |
| | return progress_group, overall_progress, status_message, results_dashboard |
| | |
| | except Exception as e: |
| | logger.exception(f"Error during code review: {e}") |
| | |
| | status_message.value = f"*Error: {str(e)}*" |
| | return progress_group, overall_progress, status_message, None |
| | |
| | def export_report(self, results_dashboard, export_format): |
| | """ |
| | Export the code review report in the specified format. |
| | |
| | Args: |
| | results_dashboard: The results dashboard component. |
| | export_format (str): The format to export the report in ('pdf', 'json', 'html', 'csv'). |
| | |
| | Returns: |
| | str: The path to the exported file. |
| | """ |
| | try: |
| | if not self.state.get('results'): |
| | logger.warning("No results available to export") |
| | return None |
| | |
| | |
| | format_value = export_format.value if hasattr(export_format, 'value') else export_format |
| | |
| | |
| | exports_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'exports') |
| | os.makedirs(exports_dir, exist_ok=True) |
| | |
| | |
| | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| | repo_name = self.state['repo_url'].split('/')[-1].replace('.git', '') |
| | filename = f"{repo_name}_review_{timestamp}.{format_value}" |
| | filepath = os.path.join(exports_dir, filename) |
| | |
| | |
| | report_paths = self.report_generator.generate_report( |
| | repo_name, self.state['results'], format_value |
| | ) |
| | |
| | if format_value in report_paths: |
| | return report_paths[format_value] |
| | else: |
| | logger.warning(f"Unsupported export format: {format_value}") |
| | return None |
| | |
| | logger.info(f"Exported report to {filepath}") |
| | return filepath |
| | |
| | except Exception as e: |
| | logger.exception(f"Error exporting report: {e}") |
| | return None |
| | |
| | def _clone_repository(self, repo_url, github_token=None): |
| | """ |
| | Clone the GitHub repository to a temporary directory. |
| | |
| | Args: |
| | repo_url (str): The URL of the GitHub repository to clone. |
| | github_token (str, optional): GitHub authentication token for private repositories. |
| | |
| | Returns: |
| | str: The path to the cloned repository. |
| | """ |
| | |
| | from src.services.repository_service import RepositoryService |
| | |
| | |
| | repo_service = RepositoryService(base_temp_dir=self.temp_dir) |
| | |
| | |
| | try: |
| | |
| | if github_token and github_token.strip(): |
| | |
| | auth_url = repo_url.replace('https://', f'https://{github_token}@') |
| | repo_path = repo_service.clone_repository(auth_url) |
| | logger.info(f"Cloned repository using GitHub token authentication") |
| | else: |
| | |
| | repo_path = repo_service.clone_repository(repo_url) |
| | logger.info(f"Cloned repository without authentication") |
| | |
| | return repo_path |
| | except Exception as e: |
| | logger.error(f"Error cloning repository: {e}") |
| | raise |
| | |
| | def _perform_ai_review(self, repo_path, languages): |
| | """ |
| | Perform AI-powered code review with parallel processing. |
| | |
| | Args: |
| | repo_path (str): The path to the repository. |
| | languages (list): List of programming languages to analyze. |
| | |
| | Returns: |
| | dict: AI review results. |
| | """ |
| | try: |
| | |
| | from src.mcp.ai_review import AIReviewService |
| | import os |
| | |
| | ai_reviewer = AIReviewService() |
| | |
| | |
| | if not ai_reviewer.is_available(): |
| | logger.warning("AI review service is not available. Please set NEBIUS_API_KEY in environment variables.") |
| | return { |
| | 'error': 'AI review service is not available. Please set NEBIUS_API_KEY in environment variables.', |
| | 'suggestions': [], |
| | 'issues': [] |
| | } |
| | |
| | |
| | all_files = [] |
| | language_extensions = { |
| | 'Python': ['.py'], |
| | 'JavaScript': ['.js'], |
| | 'TypeScript': ['.ts', '.tsx'], |
| | 'Java': ['.java'], |
| | 'Go': ['.go'], |
| | 'Rust': ['.rs'] |
| | } |
| | |
| | |
| | extensions_to_check = [] |
| | for lang in languages: |
| | if lang in language_extensions: |
| | extensions_to_check.extend(language_extensions[lang]) |
| | |
| | |
| | for root, _, files in os.walk(repo_path): |
| | for file in files: |
| | file_path = os.path.join(root, file) |
| | _, ext = os.path.splitext(file_path) |
| | if ext in extensions_to_check: |
| | all_files.append(file_path) |
| | |
| | |
| | max_files = 20 |
| | if len(all_files) > max_files: |
| | logger.warning(f"Too many files to review ({len(all_files)}). Limiting to {max_files} files.") |
| | all_files = all_files[:max_files] |
| | |
| | |
| | |
| | results = ai_reviewer.review_repository(repo_path, all_files, languages, None) |
| | |
| | logger.info(f"AI review completed for {len(all_files)} files across {len(languages)} languages") |
| | return results |
| | except Exception as e: |
| | logger.error(f"Error during AI review: {e}") |
| | return { |
| | 'error': str(e), |
| | 'suggestions': [], |
| | 'issues': [] |
| | } |
| | |
| | def _update_progress(self, step, value, overall_progress=None, status_message=None): |
| | """Update progress for a specific step and overall progress.""" |
| | |
| | self.state['current_step'] = step |
| | self.state['progress'][step] = value |
| | |
| | |
| | total_steps = len(self.state['progress']) |
| | if total_steps > 0: |
| | overall = sum(self.state['progress'].values()) / total_steps |
| | else: |
| | overall = 0 |
| | |
| | |
| | if overall_progress is not None: |
| | overall_progress.value = overall |
| | if status_message is not None: |
| | status_message.value = f"*Progress update: {step} - {value}% (Overall: {overall:.1f}%)*" |
| | |
| | |
| | if hasattr(self, 'step_progress') and step in self.step_progress: |
| | self.step_progress[step].value = value |
| | |
| | |
| | logger.info(f"Progress update: {step} - {value}% (Overall: {overall:.1f}%)") |
| | |
| | def _create_results_dashboard(self, report): |
| | """ |
| | Create a results dashboard component for the UI. |
| | |
| | Args: |
| | report (dict): The code review report. |
| | |
| | Returns: |
| | gr.Tabs: A Gradio results dashboard component. |
| | |
| | """ |
| | |
| | from src.ui.components.results_dashboard import create_results_dashboard |
| | |
| | |
| | results_dashboard = create_results_dashboard() |
| | |
| | |
| | results_dashboard.visible = True |
| | |
| | |
| | |
| | |
| | return results_dashboard |