| | |
| | |
| |
|
"""
Security Scanner Service

This module provides functionality for scanning code for security vulnerabilities.
"""
| |
|
| | import os |
| | import subprocess |
| | import logging |
| | import json |
| | import tempfile |
| | import concurrent.futures |
| | from collections import defaultdict |
| |
|
logger = logging.getLogger(__name__)


class SecurityScanner:
    """
    Service for scanning code for security vulnerabilities.

    Every scanner shells out to an external command-line tool (safety, npm,
    govulncheck, cargo-audit, bandit, ESLint, gosec) and converts its JSON
    output into plain dictionaries. All result dictionaries carry at least a
    'status' key and a 'vulnerabilities' list.
    """

    # Directories holding installed third-party JavaScript packages. They are
    # pruned from JavaScript walks so vendored code is neither linted nor
    # audited as if it were part of the repository.
    _VENDORED_JS_DIRS = frozenset({'node_modules'})

    # Substrings of ESLint rule ids that are reported as security findings.
    _SECURITY_RULE_MARKERS = ('security', 'no-eval', 'no-implied-eval')

    # ESLint configuration written to a temporary file for each JS/TS scan.
    _ESLINT_CONFIG = {
        "env": {
            "browser": True,
            "es2021": True,
            "node": True
        },
        "extends": [
            "eslint:recommended",
            "plugin:security/recommended"
        ],
        "plugins": [
            "security"
        ],
        "parserOptions": {
            "ecmaVersion": 12,
            "sourceType": "module",
            "ecmaFeatures": {
                "jsx": True
            }
        },
        "rules": {}
    }

    def __init__(self):
        """
        Initialize the SecurityScanner.

        Builds the mapping from language name to the bound method that scans
        source code of that language.
        """
        logger.info("Initialized SecurityScanner")
        self.scanners = {
            'Python': self._scan_python,
            'JavaScript': self._scan_javascript,
            'TypeScript': self._scan_javascript,
            'Java': self._scan_java,
            'Go': self._scan_go,
            'Rust': self._scan_rust,
        }

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_files(repo_path, names=(), suffixes=(), skip_dirs=()):
        """
        Collect files below ``repo_path`` by exact name or by suffix.

        Args:
            repo_path (str): Directory to walk.
            names (collection): File names that match exactly.
            suffixes (tuple): File-name suffixes that match.
            skip_dirs (collection): Directory names that are not descended into.

        Returns:
            list: Matching paths in ``os.walk`` order.
        """
        suffixes = tuple(suffixes)
        found = []
        for root, dirs, files in os.walk(repo_path):
            if skip_dirs:
                # Pruning in place stops os.walk from entering these directories.
                dirs[:] = [d for d in dirs if d not in skip_dirs]
            for name in files:
                if name in names or name.endswith(suffixes):
                    found.append(os.path.join(root, name))
        return found

    @staticmethod
    def _run_tool(cmd, cwd=None):
        """
        Run an external tool and capture its output as text.

        A non-zero exit code does not raise (``check=False``): the callers
        inspect stdout regardless of the exit status.

        Args:
            cmd (list): Command and arguments.
            cwd (str): Optional working directory.

        Returns:
            subprocess.CompletedProcess: The finished process.
        """
        return subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
            cwd=cwd,
        )

    @staticmethod
    def _group_by_severity(vulnerabilities):
        """
        Group findings by their 'severity' value.

        Args:
            vulnerabilities (list): Finding dictionaries.

        Returns:
            dict: Severity -> list of findings ('unknown' when the key is absent).
        """
        grouped = defaultdict(list)
        for vuln in vulnerabilities:
            grouped[vuln.get('severity', 'unknown')].append(vuln)
        return dict(grouped)

    @staticmethod
    def _iter_json_documents(text):
        """
        Yield each JSON document from a stream of concatenated documents.

        Handles both one-document-per-line output and pretty-printed documents
        that span several lines. Text that cannot be decoded is skipped up to
        the next newline.

        Args:
            text (str): Raw tool output.

        Yields:
            object: Each decoded JSON value, in order.
        """
        decoder = json.JSONDecoder()
        pos, end = 0, len(text)
        while pos < end:
            # raw_decode does not accept leading whitespace.
            if text[pos].isspace():
                pos += 1
                continue
            try:
                document, pos = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                newline = text.find('\n', pos)
                if newline == -1:
                    return
                pos = newline + 1
                continue
            yield document

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    def scan_repository(self, repo_path, languages):
        """
        Scan a repository for security vulnerabilities in the specified languages using parallel processing.

        Dependencies are scanned first, on the calling thread. Source scans
        for the requested languages then run on a thread pool of at most five
        workers.

        Args:
            repo_path (str): The path to the repository.
            languages (list): A list of programming languages to scan. May be
                empty or None, in which case only dependencies are scanned.

        Returns:
            dict: A dictionary containing the dependency results under
                'dependencies' and one entry per requested language.
        """
        logger.info(f"Scanning repository at {repo_path} for security vulnerabilities in languages: {languages}")

        results = {'dependencies': self._scan_dependencies(repo_path)}

        # Drop duplicates (keeping order) so no language is scanned twice.
        unique_languages = list(dict.fromkeys(languages or ()))
        if not unique_languages:
            # ThreadPoolExecutor rejects max_workers=0, so skip the pool entirely.
            return results

        worker_count = min(len(unique_languages), 5)
        with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as executor:
            future_to_language = {
                executor.submit(self._scan_language, repo_path, language): language
                for language in unique_languages
            }

            for future in concurrent.futures.as_completed(future_to_language):
                language = future_to_language[future]
                try:
                    lang, result = future.result()
                    results[lang] = result
                    logger.info(f"Completed security scanning for {lang}")
                except Exception as e:
                    logger.error(f"Exception occurred during security scanning of {language}: {e}")
                    results[language] = {
                        'status': 'error',
                        'error': str(e),
                        'vulnerabilities': [],
                    }

        return results

    def _scan_language(self, repo_path, language):
        """
        Run the registered source scanner for one language.

        Args:
            repo_path (str): The path to the repository.
            language (str): Language name as used in ``self.scanners``.

        Returns:
            tuple: ``(language, result_dict)``. Unsupported languages yield a
                'not_supported' result; scanner exceptions yield an 'error' result.
        """
        if language not in self.scanners:
            logger.warning(f"No security scanner available for {language}")
            return language, {
                'status': 'not_supported',
                'message': f"Security scanning for {language} is not supported yet.",
                'vulnerabilities': [],
            }

        try:
            logger.info(f"Scanning {language} code in {repo_path} for security vulnerabilities")
            return language, self.scanners[language](repo_path)
        except Exception as e:
            logger.error(f"Error scanning {language} code for security vulnerabilities: {e}")
            return language, {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    # ------------------------------------------------------------------
    # Dependency scanners
    # ------------------------------------------------------------------

    def _scan_dependencies(self, repo_path):
        """
        Scan dependencies for known vulnerabilities.

        Runs every per-ecosystem dependency scanner and merges their findings.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Dependency scan results: the merged 'vulnerabilities' list,
                its length, and the per-ecosystem results under 'language_results'.
        """
        logger.info(f"Scanning dependencies in {repo_path}")

        results = {
            'python': self._scan_python_dependencies(repo_path),
            'javascript': self._scan_javascript_dependencies(repo_path),
            'java': self._scan_java_dependencies(repo_path),
            'go': self._scan_go_dependencies(repo_path),
            'rust': self._scan_rust_dependencies(repo_path),
        }

        all_vulnerabilities = []
        for lang_result in results.values():
            all_vulnerabilities.extend(lang_result.get('vulnerabilities', []))

        return {
            'status': 'success',
            'vulnerabilities': all_vulnerabilities,
            'vulnerability_count': len(all_vulnerabilities),
            'language_results': results,
        }

    def _scan_python_dependencies(self, repo_path):
        """
        Scan Python dependencies for known vulnerabilities using safety.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Python dependencies. 'files_scanned' is the
                list of dependency files found.
        """
        logger.info(f"Scanning Python dependencies in {repo_path}")

        requirements_files = self._find_files(
            repo_path,
            names={'requirements.txt', 'Pipfile', 'Pipfile.lock', 'setup.py'},
        )

        if not requirements_files:
            return {
                'status': 'no_dependencies',
                'message': 'No Python dependency files found.',
                'vulnerabilities': [],
            }

        vulnerabilities = []

        for req_file in requirements_files:
            # Best effort per file: a failure is logged and the next file is tried.
            try:
                process = self._run_tool(['safety', 'check', '--file', req_file, '--json'])
                if not process.stdout.strip():
                    continue

                try:
                    safety_results = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing safety output: {process.stdout}")
                    continue

                for vuln in safety_results.get('vulnerabilities', []):
                    vulnerabilities.append({
                        'package': vuln.get('package_name', ''),
                        'installed_version': vuln.get('installed_version', ''),
                        'affected_versions': vuln.get('vulnerable_spec', ''),
                        'description': vuln.get('advisory', ''),
                        'severity': vuln.get('severity', ''),
                        'file': req_file,
                        'language': 'Python',
                    })
            except Exception as e:
                logger.error(f"Error running safety on {req_file}: {e}")

        return {
            'status': 'success',
            'vulnerabilities': vulnerabilities,
            'vulnerability_count': len(vulnerabilities),
            'files_scanned': requirements_files,
        }

    def _scan_javascript_dependencies(self, repo_path):
        """
        Scan JavaScript/TypeScript dependencies for known vulnerabilities using npm audit.

        ``npm audit`` is run once in the directory of every package.json found
        outside ``node_modules``.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for JavaScript dependencies. 'files_scanned' is
                the list of package.json files found.
        """
        logger.info(f"Scanning JavaScript dependencies in {repo_path}")

        # Manifests of installed packages are not the project's own manifests.
        package_files = self._find_files(
            repo_path,
            names={'package.json'},
            skip_dirs=self._VENDORED_JS_DIRS,
        )

        if not package_files:
            return {
                'status': 'no_dependencies',
                'message': 'No JavaScript dependency files found.',
                'vulnerabilities': [],
            }

        vulnerabilities = []

        for pkg_file in package_files:
            pkg_dir = os.path.dirname(pkg_file)
            # Best effort per manifest: a failure is logged and the next one is tried.
            try:
                process = self._run_tool(['npm', 'audit', '--json'], cwd=pkg_dir)
                if not process.stdout.strip():
                    continue

                try:
                    audit_results = json.loads(process.stdout)
                except json.JSONDecodeError:
                    logger.error(f"Error parsing npm audit output: {process.stdout}")
                    continue

                for vuln_info in audit_results.get('vulnerabilities', {}).values():
                    vulnerabilities.append({
                        'package': vuln_info.get('name', ''),
                        'installed_version': vuln_info.get('version', ''),
                        'affected_versions': vuln_info.get('range', ''),
                        'description': vuln_info.get('overview', ''),
                        'severity': vuln_info.get('severity', ''),
                        'file': pkg_file,
                        'language': 'JavaScript',
                        'cwe': vuln_info.get('cwe', ''),
                        'recommendation': vuln_info.get('recommendation', ''),
                    })
            except Exception as e:
                logger.error(f"Error running npm audit on {pkg_file}: {e}")

        return {
            'status': 'success',
            'vulnerabilities': vulnerabilities,
            'vulnerability_count': len(vulnerabilities),
            'files_scanned': package_files,
        }

    def _scan_java_dependencies(self, repo_path):
        """
        Scan Java dependencies for known vulnerabilities.

        Only discovers pom.xml / build.gradle files; no tool is run yet.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Java dependencies ('not_implemented' when
                dependency files exist).
        """
        logger.info(f"Scanning Java dependencies in {repo_path}")

        dependency_files = self._find_files(repo_path, names={'pom.xml', 'build.gradle'})

        if not dependency_files:
            return {
                'status': 'no_dependencies',
                'message': 'No Java dependency files found.',
                'vulnerabilities': [],
            }

        return {
            'status': 'not_implemented',
            'message': 'Java dependency scanning is not fully implemented yet.',
            'vulnerabilities': [],
            'files_scanned': dependency_files,
        }

    def _scan_go_dependencies(self, repo_path):
        """
        Scan Go dependencies for known vulnerabilities using govulncheck.

        Only a go.mod at the repository root is considered.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Go dependencies.
        """
        logger.info(f"Scanning Go dependencies in {repo_path}")

        go_mod_path = os.path.join(repo_path, 'go.mod')
        if not os.path.exists(go_mod_path):
            return {
                'status': 'no_dependencies',
                'message': 'No Go dependency files found.',
                'vulnerabilities': [],
            }

        try:
            process = self._run_tool(['govulncheck', '-json', './...'], cwd=repo_path)

            vulnerabilities = []
            # The output is a stream of JSON objects that may each span several
            # lines, so it is decoded document by document rather than per line.
            for result in self._iter_json_documents(process.stdout):
                # NOTE(review): assumes findings arrive under a 'vulnerability'
                # key - confirm against the installed govulncheck version.
                if not isinstance(result, dict) or 'vulnerability' not in result:
                    continue
                vuln = result['vulnerability']
                vulnerabilities.append({
                    'package': vuln.get('package', ''),
                    'description': vuln.get('details', ''),
                    'severity': 'high',
                    'file': go_mod_path,
                    'language': 'Go',
                    'cve': vuln.get('osv', {}).get('id', ''),
                    'affected_versions': vuln.get('osv', {}).get('affected', ''),
                })

            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': [go_mod_path],
            }

        except Exception as e:
            logger.error(f"Error running govulncheck: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    def _scan_rust_dependencies(self, repo_path):
        """
        Scan Rust dependencies for known vulnerabilities using cargo-audit.

        Only a Cargo.toml at the repository root is considered.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Rust dependencies.
        """
        logger.info(f"Scanning Rust dependencies in {repo_path}")

        cargo_toml_path = os.path.join(repo_path, 'Cargo.toml')
        if not os.path.exists(cargo_toml_path):
            return {
                'status': 'no_dependencies',
                'message': 'No Rust dependency files found.',
                'vulnerabilities': [],
            }

        try:
            process = self._run_tool(['cargo', 'audit', '--json'], cwd=repo_path)

            vulnerabilities = []
            if process.stdout.strip():
                try:
                    audit_results = json.loads(process.stdout)

                    for vuln in audit_results.get('vulnerabilities', {}).get('list', []):
                        package = vuln.get('package', {})
                        advisory = vuln.get('advisory', {})
                        vulnerabilities.append({
                            'package': package.get('name', ''),
                            'installed_version': package.get('version', ''),
                            'description': advisory.get('description', ''),
                            'severity': advisory.get('severity', ''),
                            'file': cargo_toml_path,
                            'language': 'Rust',
                            'cve': advisory.get('id', ''),
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing cargo-audit output: {process.stdout}")

            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': [cargo_toml_path],
            }

        except Exception as e:
            logger.error(f"Error running cargo-audit: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    # ------------------------------------------------------------------
    # Source-code scanners
    # ------------------------------------------------------------------

    def _scan_python(self, repo_path):
        """
        Scan Python code for security vulnerabilities using bandit.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Python code. 'files_scanned' is the number
                of .py files found.
        """
        logger.info(f"Scanning Python code in {repo_path} for security vulnerabilities")

        python_files = self._find_files(repo_path, suffixes=('.py',))

        if not python_files:
            return {
                'status': 'no_files',
                'message': 'No Python files found in the repository.',
                'vulnerabilities': [],
            }

        try:
            process = self._run_tool(['bandit', '-r', '-f', 'json', repo_path])

            vulnerabilities = []
            if process.stdout.strip():
                try:
                    bandit_results = json.loads(process.stdout)

                    for result in bandit_results.get('results', []):
                        vulnerabilities.append({
                            'file': result.get('filename', ''),
                            'line': result.get('line_number', 0),
                            'code': result.get('code', ''),
                            'issue': result.get('issue_text', ''),
                            'severity': result.get('issue_severity', ''),
                            'confidence': result.get('issue_confidence', ''),
                            'cwe': result.get('cwe', ''),
                            'test_id': result.get('test_id', ''),
                            'test_name': result.get('test_name', ''),
                            'language': 'Python',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing bandit output: {process.stdout}")

            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': self._group_by_severity(vulnerabilities),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(python_files),
            }

        except Exception as e:
            logger.error(f"Error running bandit: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    def _scan_javascript(self, repo_path):
        """
        Scan JavaScript/TypeScript code for security vulnerabilities using ESLint
        with the security plugin.

        A temporary ESLint configuration file is written for the run and
        removed afterwards, whether or not the scan succeeds.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for JavaScript/TypeScript code. 'files_scanned'
                is the number of source files found outside node_modules.
        """
        logger.info(f"Scanning JavaScript/TypeScript code in {repo_path} for security vulnerabilities")

        js_files = self._find_files(
            repo_path,
            suffixes=('.js', '.jsx', '.ts', '.tsx'),
            skip_dirs=self._VENDORED_JS_DIRS,
        )

        if not js_files:
            return {
                'status': 'no_files',
                'message': 'No JavaScript/TypeScript files found in the repository.',
                'vulnerabilities': [],
            }

        temp_config_path = None
        try:
            # Text mode is required: json.dump writes str, while
            # NamedTemporaryFile opens in binary mode by default.
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.json', delete=False, encoding='utf-8'
            ) as temp_config:
                # Record the path first so cleanup runs even if the dump fails.
                temp_config_path = temp_config.name
                json.dump(self._ESLINT_CONFIG, temp_config)

            cmd = [
                'npx',
                'eslint',
                '--config', temp_config_path,
                '--format', 'json',
                '--plugin', 'security',
            ] + js_files

            process = self._run_tool(cmd)

            vulnerabilities = []
            if process.stdout.strip():
                try:
                    eslint_results = json.loads(process.stdout)

                    for result in eslint_results:
                        file_path = result.get('filePath', '')
                        for message in result.get('messages', []):
                            rule_id = message.get('ruleId', '')
                            # Keep only messages raised by security-related rules.
                            if not rule_id or not any(
                                marker in rule_id for marker in self._SECURITY_RULE_MARKERS
                            ):
                                continue
                            vulnerabilities.append({
                                'file': file_path,
                                'line': message.get('line', 0),
                                'column': message.get('column', 0),
                                'issue': message.get('message', ''),
                                # ESLint severity 2 means "error"; anything else maps to medium.
                                'severity': 'high' if message.get('severity', 0) == 2 else 'medium',
                                'rule': rule_id,
                                'language': 'JavaScript',
                            })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing ESLint output: {process.stdout}")

            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': self._group_by_severity(vulnerabilities),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(js_files),
            }

        except Exception as e:
            logger.error(f"Error scanning JavaScript/TypeScript code: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

        finally:
            if temp_config_path and os.path.exists(temp_config_path):
                os.unlink(temp_config_path)

    def _scan_java(self, repo_path):
        """
        Scan Java code for security vulnerabilities.

        Only discovers .java files; no tool is run yet.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Java code ('not_implemented' when Java
                files exist).
        """
        logger.info(f"Scanning Java code in {repo_path} for security vulnerabilities")

        java_files = self._find_files(repo_path, suffixes=('.java',))

        if not java_files:
            return {
                'status': 'no_files',
                'message': 'No Java files found in the repository.',
                'vulnerabilities': [],
            }

        return {
            'status': 'not_implemented',
            'message': 'Java security scanning is not fully implemented yet.',
            'vulnerabilities': [],
            'files_scanned': java_files,
        }

    def _scan_go(self, repo_path):
        """
        Scan Go code for security vulnerabilities using gosec.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Go code. 'files_scanned' is the number of
                .go files found.
        """
        logger.info(f"Scanning Go code in {repo_path} for security vulnerabilities")

        go_files = self._find_files(repo_path, suffixes=('.go',))

        if not go_files:
            return {
                'status': 'no_files',
                'message': 'No Go files found in the repository.',
                'vulnerabilities': [],
            }

        try:
            process = self._run_tool(['gosec', '-fmt', 'json', '-quiet', './...'], cwd=repo_path)

            vulnerabilities = []
            if process.stdout.strip():
                try:
                    gosec_results = json.loads(process.stdout)

                    for issue in gosec_results.get('Issues', []):
                        vulnerabilities.append({
                            'file': issue.get('file', ''),
                            'line': issue.get('line', ''),
                            'code': issue.get('code', ''),
                            'issue': issue.get('details', ''),
                            'severity': issue.get('severity', ''),
                            'confidence': issue.get('confidence', ''),
                            'cwe': issue.get('cwe', {}).get('ID', ''),
                            'rule_id': issue.get('rule_id', ''),
                            'language': 'Go',
                        })
                except json.JSONDecodeError:
                    logger.error(f"Error parsing gosec output: {process.stdout}")

            return {
                'status': 'success',
                'vulnerabilities': vulnerabilities,
                'vulnerabilities_by_severity': self._group_by_severity(vulnerabilities),
                'vulnerability_count': len(vulnerabilities),
                'files_scanned': len(go_files),
            }

        except Exception as e:
            logger.error(f"Error running gosec: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'vulnerabilities': [],
            }

    def _scan_rust(self, repo_path):
        """
        Scan Rust code for security vulnerabilities.

        Only discovers .rs files; no tool is run yet.

        Args:
            repo_path (str): The path to the repository.

        Returns:
            dict: Scan results for Rust code ('not_implemented' when Rust
                files exist).
        """
        logger.info(f"Scanning Rust code in {repo_path} for security vulnerabilities")

        rust_files = self._find_files(repo_path, suffixes=('.rs',))

        if not rust_files:
            return {
                'status': 'no_files',
                'message': 'No Rust files found in the repository.',
                'vulnerabilities': [],
            }

        return {
            'status': 'not_implemented',
            'message': 'Rust security scanning is not fully implemented yet.',
            'vulnerabilities': [],
            'files_scanned': rust_files,
        }