|
|
| import logging
|
| import re
|
| import os
|
| import json
|
| from typing import Dict, List, Optional, Any, Union
|
| from enum import Enum
|
| from .registry import get_field_registry_manager
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
class ValidationSeverity(Enum):
    """Severity levels that can be attached to a validation finding."""

    # Member values are the lowercase strings used in serialized reports.
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
|
|
|
|
|
# Registry-driven configuration.
#
# All scoring tables are generated by the field registry manager at import
# time.  If anything in that sequence fails, the error is logged and every
# table is reset to an empty dict so the module can still be imported.
try:
    REGISTRY_MANAGER = get_field_registry_manager()
    FIELD_CLASSIFICATION = REGISTRY_MANAGER.generate_field_classification()
    COMPLETENESS_PROFILES = REGISTRY_MANAGER.generate_completeness_profiles()
    VALIDATION_MESSAGES = REGISTRY_MANAGER.generate_validation_messages()
    SCORING_WEIGHTS = REGISTRY_MANAGER.get_configurable_scoring_weights()
    logger.info(f"✅ Registry-driven configuration loaded: {len(FIELD_CLASSIFICATION)} fields")
except Exception as e:
    logger.error(f"❌ Failed to load registry configuration: {e}")
    # A partial load is discarded: all four tables fall back together.
    FIELD_CLASSIFICATION, COMPLETENESS_PROFILES, VALIDATION_MESSAGES, SCORING_WEIGHTS = (
        {},
        {},
        {},
        {},
    )
|
|
|
|
|
# SPDX license identifiers.
#
# The identifiers come from the "enum" list of schemas/spdx.schema.json, which
# is looked up one directory above this module's directory.  When the file
# cannot be read or parsed, a small built-in set is used instead.
try:
    schema_path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)),
        "schemas",
        "spdx.schema.json",
    )
    with open(schema_path, "r", encoding="utf-8") as f:
        _spdx_schema = json.load(f)
    SPDX_LICENSES = set(_spdx_schema.get("enum", []))
    logger.info(f"✅ SPDX licenses schema loaded: {len(SPDX_LICENSES)} licenses")
except Exception as e:
    logger.error(f"❌ Failed to load SPDX schema: {e}")
    SPDX_LICENSES = {
        "Apache-2.0",
        "BSD-2-Clause",
        "BSD-3-Clause",
        "CC-BY-4.0",
        "CC-BY-SA-4.0",
        "CC0-1.0",
        "GPL-2.0-only",
        "GPL-3.0-only",
        "LGPL-3.0-only",
        "MIT",
        "NONE",
        "Unlicense",
    }
|
|
|
|
|
# Local JSON Schema registry.
#
# Every *.json file in the schemas directory is registered under its "$id"
# (when it declares one) and under its bare file name, so that "$ref"s can be
# resolved from local files.  JSON_SCHEMA_REGISTRY stays None when the
# `referencing` package is unavailable or any schema fails to load.
JSON_SCHEMA_REGISTRY = None
try:
    from referencing import Registry, Resource

    registry = Registry()
    schemas_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schemas")
    if os.path.exists(schemas_dir):
        for filename in os.listdir(schemas_dir):
            if not filename.endswith(".json"):
                continue
            with open(os.path.join(schemas_dir, filename), "r", encoding="utf-8") as schema_file:
                schema_data = json.load(schema_file)
            resource = Resource.from_contents(schema_data)
            schema_id = schema_data.get("$id", "")
            if schema_id:
                registry = registry.with_resource(uri=schema_id, resource=resource)
            registry = registry.with_resource(uri=filename, resource=resource)
    JSON_SCHEMA_REGISTRY = registry
    logger.info("✅ JSON Schema Registry loaded for local ref resolution")
except Exception as e:
    logger.error(f"❌ Failed to build JSON Schema Registry: {e}")
|
|
|
def validate_spdx(license_entry):
    """Return True if every given license identifier is in ``SPDX_LICENSES``.

    ``license_entry`` may be a single identifier or a list of identifiers.
    An empty list is considered valid.
    """
    candidates = license_entry if isinstance(license_entry, list) else [license_entry]
    return all(candidate in SPDX_LICENSES for candidate in candidates)
|
|
|
def _aibom_section(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _aibom_has_property(properties: Any, field: str) -> bool:
    """Return True if a property list names *field*, plain or ``spdx:``-prefixed."""
    if not isinstance(properties, (list, tuple)):
        return False
    wanted = (field, f"spdx:{field}")
    return any(
        isinstance(prop, dict) and prop.get("name") in wanted
        for prop in properties
    )


def check_field_in_aibom(aibom: Dict[str, Any], field: str) -> bool:
    """
    Check if a field is present in the AIBOM (Legacy/Standard Layout check).

    The field is looked up, in order, in:

    1. the top level of the document;
    2. ``metadata`` and ``metadata.properties``;
    3. the first component, its ``properties``, its ``modelCard``,
       ``modelCard.modelParameters`` and ``modelCard.considerations``
       (including a few alias keys for considerations);
    4. the first component's ``externalReferences`` for the special fields
       ``downloadLocation``, ``vcs``, ``website`` and ``paper``.

    Sections that are missing, ``null`` or not of the expected JSON type are
    treated as empty instead of raising, so malformed documents yield
    ``False`` rather than an exception.

    Args:
        aibom: Parsed AIBOM document.
        field: Name of the field to look for.

    Returns:
        True if the field was found in any of the locations above.
    """
    root = _aibom_section(aibom)
    if field in root:
        return True

    metadata = _aibom_section(root.get("metadata"))
    if field in metadata or _aibom_has_property(metadata.get("properties"), field):
        return True

    components = root.get("components")
    if not isinstance(components, (list, tuple)) or not components:
        return False

    # Only the first component is inspected.
    component = _aibom_section(components[0])
    if field in component or _aibom_has_property(component.get("properties"), field):
        return True

    model_card = _aibom_section(component.get("modelCard"))
    if field in model_card:
        return True
    if field in _aibom_section(model_card.get("modelParameters")):
        return True

    considerations = _aibom_section(model_card.get("considerations"))
    consideration_aliases = {
        "technicalLimitations": ["technicalLimitations", "limitations"],
        "safetyRiskAssessment": ["ethicalConsiderations", "safetyRiskAssessment"],
        "energyConsumption": ["environmentalConsiderations", "energyConsumption"],
    }
    # Aliased sections only count when they hold a non-empty value.
    if any(considerations.get(alias) for alias in consideration_aliases.get(field, ())):
        return True
    if field in considerations:
        return True

    # Some fields are represented as typed external references with a URL.
    reference_types = {
        "downloadLocation": ("distribution", "website"),
        "vcs": ("vcs",),
        "website": ("website",),
        "paper": ("documentation",),
    }
    accepted_types = reference_types.get(field)
    if accepted_types is None:
        return False

    ext_refs = component.get("externalReferences")
    if not isinstance(ext_refs, (list, tuple)):
        return False
    return any(
        isinstance(ref, dict)
        and ref.get("type") in accepted_types
        and bool(ref.get("url"))
        for ref in ext_refs
    )
|
|
|
def check_field_with_enhanced_results(aibom: Dict[str, Any], field: str, extraction_results: Optional[Dict[str, Any]] = None) -> bool:
    """
    Decide whether ``field`` counts as present, preferring registry-based detection.

    Resolution order:

    1. If the registry defines a ``jsonpath`` for the field and the registry
       manager detects it in the AIBOM, the field is present.
    2. Otherwise, if ``extraction_results`` has an entry for the field, that
       entry decides: a ``confidence`` of ``medium``/``high`` means present;
       without a confidence, a ``value`` other than a placeholder means
       present; an entry with neither attribute counts as present.
    3. Otherwise the structural check ``check_field_in_aibom`` is used.

    Any exception is logged and answered with the structural check.
    """
    try:
        registry_manager = get_field_registry_manager()

        # Step 1: registry-defined JSONPath lookup.
        definitions = registry_manager.get_field_definitions()
        if field in definitions:
            json_path = definitions[field].get('jsonpath', '')
            if json_path:
                found, _detected_value = registry_manager.detect_field_presence(aibom, json_path)
                if found:
                    return True

        # Step 3 (no extraction entry to consult): structural fallback.
        if not extraction_results or field not in extraction_results:
            return check_field_in_aibom(aibom, field)

        # Step 2: let the extraction entry decide.
        outcome = extraction_results[field]
        if hasattr(outcome, 'confidence'):
            level = outcome.confidence
            # Confidence may be an enum-like object or a plain value.
            level = level.value if hasattr(level, 'value') else level
            return not (level == 'none') and level in ['medium', 'high']
        if hasattr(outcome, 'value'):
            return outcome.value not in ['NOASSERTION', 'NOT_FOUND', None, '']
        return True

    except Exception as e:
        logger.error(f"Error in enhanced field detection for {field}: {e}")
        return check_field_in_aibom(aibom, field)
|
|
|
def determine_completeness_profile(aibom: Dict[str, Any], score: float) -> Dict[str, Any]:
    """Return the highest completeness profile the AIBOM satisfies.

    A profile from ``COMPLETENESS_PROFILES`` is satisfied when all of its
    ``required_fields`` are found by ``check_field_in_aibom`` and ``score``
    is at least its ``minimum_score``.  Profiles are ranked advanced >
    standard > basic; when none is satisfied an "incomplete" result is
    returned with ``satisfied`` set to False.
    """
    satisfied = []
    for key, spec in COMPLETENESS_PROFILES.items():
        has_required = all(check_field_in_aibom(aibom, name) for name in spec["required_fields"])
        meets_score = score >= spec["minimum_score"]
        if has_required and meets_score:
            satisfied.append(key)

    # Report the best tier first.
    ranked = (("advanced", "Advanced"), ("standard", "Standard"), ("basic", "Basic"))
    for key, label in ranked:
        if key in satisfied:
            description = COMPLETENESS_PROFILES.get(key, {}).get("description", "")
            return {"name": label, "description": description, "satisfied": True}

    return {"name": "incomplete", "description": "Does not satisfy any completeness profile", "satisfied": False}
|
|
|
def generate_field_recommendations(missing_fields: Dict[str, List[str]]) -> List[Dict[str, Any]]:
    """Build a prioritized list of recommendations for missing fields.

    Critical fields yield "high" priority entries and important fields
    "medium" ones; both use the texts from ``VALIDATION_MESSAGES`` when the
    field has an entry there and generic wording otherwise.  At most five
    supplementary fields are reported, always with generic "low" priority
    wording.
    """
    recommendations = []

    for tier, priority in (("critical", "high"), ("important", "medium")):
        for name in missing_fields.get(tier, []):
            if name in VALIDATION_MESSAGES:
                texts = VALIDATION_MESSAGES[name]
                message = texts["missing"]
                advice = texts["recommendation"]
            elif tier == "critical":
                message = f"Missing critical field: {name}"
                advice = f"Add {name} to improve documentation completeness"
            else:
                message = f"Missing field: {name}"
                advice = f"Consider adding {name}"
            recommendations.append({
                "priority": priority,
                "field": name,
                "message": message,
                "recommendation": advice,
            })

    # Cap the supplementary suggestions at five entries.
    for position, name in enumerate(missing_fields.get("supplementary", [])):
        if position >= 5:
            break
        recommendations.append({
            "priority": "low",
            "field": name,
            "message": f"Missing supplementary field: {name}",
            "recommendation": f"Consider adding {name}",
        })

    return recommendations
|
|
|
|
|
def _resolve_reference_url(ref_urls: Any, aibom: Dict[str, Any]) -> str:
    """Pick the documentation URL for a field from its ``reference_urls`` entry.

    A plain string is returned as is.  For a dict, the CycloneDX 1.7 URL is
    used when the AIBOM's ``specVersion`` is "1.7" and such a URL exists; next
    comes the CycloneDX 1.6 URL (rewritten to 1.7 for 1.7 documents when it
    points at the 1.6 docs), then the GenAI AIBOM taxonomy URL, then the
    SPDX 3.1 URL.  Anything else yields an empty string.
    """
    if isinstance(ref_urls, str):
        return ref_urls
    if not isinstance(ref_urls, dict):
        return ""

    spec_version = aibom.get("specVersion", "1.6")
    if spec_version == "1.7" and "cyclonedx_1.7" in ref_urls:
        return ref_urls["cyclonedx_1.7"]
    if "cyclonedx_1.6" in ref_urls:
        url = ref_urls["cyclonedx_1.6"]
        if spec_version == "1.7" and "cyclonedx.org/docs/1.6" in url:
            url = url.replace("1.6", "1.7")
        return url
    for key in ("genai_aibom_taxonomy", "spdx_3.1"):
        if key in ref_urls:
            return ref_urls[key]
    return ""


def calculate_completeness_score(aibom: Dict[str, Any], validate: bool = True, extraction_results: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Calculate completeness score using registry-defined weights and rules.

    Each field in ``FIELD_CLASSIFICATION`` is checked for presence and counted
    towards its category.  A category's score is its share of present fields
    multiplied by the category weight.  The summed score is reduced when many
    critical or important fields are missing and finally clamped to 0..100.

    Args:
        aibom: Parsed AIBOM document.
        validate: When True, a schema validation report is added under the
            ``validation`` key.
        extraction_results: Optional per-field extraction results passed on to
            ``check_field_with_enhanced_results``.

    Returns:
        A dict with the total and per-category scores, per-field checklist,
        types and reference URLs, the missing fields by tier, the satisfied
        completeness profile, penalty information and recommendations.
    """
    configured_weights = SCORING_WEIGHTS.get("category_weights", {})
    default_weights = (
        ("required_fields", 20),
        ("metadata", 20),
        ("component_basic", 20),
        ("component_model_card", 30),
        ("external_references", 10),
    )
    max_scores = {name: configured_weights.get(name, fallback) for name, fallback in default_weights}

    missing_fields = {"critical": [], "important": [], "supplementary": []}
    fields_by_category = {name: {"total": 0, "present": 0} for name in max_scores}
    category_fields_list = {name: [] for name in max_scores}
    field_checklist = {}
    field_types = {}
    field_reference_urls = {}

    for field, classification in FIELD_CLASSIFICATION.items():
        tier = classification["tier"]
        category = classification["category"]
        is_gguf = classification.get("is_gguf", False)
        jsonpath = classification.get("jsonpath", "")

        # Categories not covered by the weight table are still tracked.
        if category not in fields_by_category:
            fields_by_category[category] = {"total": 0, "present": 0}
            category_fields_list[category] = []

        is_present = check_field_with_enhanced_results(aibom, field, extraction_results)
        counts = fields_by_category[category]

        # GGUF-specific fields only count towards the total when present.
        if is_present or not is_gguf:
            counts["total"] += 1

            display_path = jsonpath.replace("$.components[0].", "")
            if display_path.startswith("$."):
                display_path = display_path[2:]

            tier_display = {"critical": "Critical", "important": "Important", "supplementary": "Supplementary"}.get(tier, "Unknown")
            category_fields_list[category].append({
                "name": field,
                "tier": tier_display,
                "path": display_path,
            })

        if is_present:
            counts["present"] += 1
        elif not is_gguf and tier in missing_fields:
            missing_fields[tier].append(field)

        if tier == "critical":
            importance_indicator = "★★★"
        elif tier == "important":
            importance_indicator = "★★"
        else:
            importance_indicator = "★"
        status_mark = '✔' if is_present else '✘'
        field_checklist[field] = f"{status_mark} {importance_indicator}"

        field_types[field] = classification.get("parameter_type", "CDX")
        field_reference_urls[field] = _resolve_reference_url(
            classification.get("reference_urls", {}), aibom
        )

    # Per-category scores: share of present fields scaled by the weight.
    category_details = {}
    category_scores = {}
    for category, counts in fields_by_category.items():
        weight = max_scores.get(category, 0)
        total = counts["total"]
        present = counts["present"]
        if total > 0:
            percentage = (present / total) * 100
            category_scores[category] = round((percentage / 100) * weight, 1)
        else:
            percentage = 0
            category_scores[category] = 0.0

        category_details[category] = {
            "present_fields": present,
            "total_fields": total,
            "max_points": weight,
            "percentage": round(percentage, 1),
        }

    subtotal_score = sum(category_scores.values())

    # Penalties for missing critical / important fields.
    missing_critical = len(missing_fields["critical"])
    missing_important = len(missing_fields["important"])

    penalty_factor = 1.0
    penalty_reasons = []

    if missing_critical > 3:
        penalty_factor *= 0.8
        penalty_reasons.append("Multiple critical fields missing")
    elif missing_critical >= 2:
        penalty_factor *= 0.9
        penalty_reasons.append("Some critical fields missing")

    if missing_important >= 5:
        penalty_factor *= 0.95
        penalty_reasons.append("Several important fields missing")

    final_score = round(subtotal_score * penalty_factor, 1)
    final_score = max(0.0, min(final_score, 100.0))

    result = {
        "total_score": final_score,
        "subtotal_score": subtotal_score,
        "section_scores": category_scores,
        "category_details": category_details,
        "max_scores": max_scores,
        "field_checklist": field_checklist,
        "field_types": field_types,
        "reference_urls": field_reference_urls,
        "missing_fields": missing_fields,
        "category_fields_list": category_fields_list,
        "completeness_profile": determine_completeness_profile(aibom, final_score),
        "penalty_applied": penalty_factor < 1.0,
        "penalty_reason": " and ".join(penalty_reasons) if penalty_reasons else None,
        "recommendations": generate_field_recommendations(missing_fields),
    }

    if validate:
        result["validation"] = validate_aibom(aibom)

    return result
|
|
|
| def _validate_ai_requirements(aibom: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
|
|
|
|
|
| issues = []
|
| if "bomFormat" in aibom and aibom["bomFormat"] != "CycloneDX":
|
| issues.append({"severity": "error", "code": "INVALID_BOM_FORMAT", "message": "Must be CycloneDX", "path": "$.bomFormat"})
|
|
|
| return issues
|
|
|
def validate_aibom(aibom: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validate the AIBOM against the appropriate CycloneDX schema.

    The schema file ``bom-<specVersion>.schema.json`` (``specVersion``
    defaults to "1.6") is loaded from the ``schemas`` directory next to this
    module's parent directory.  When the file is missing, strict validation
    is skipped with a warning.  The custom checks from
    ``_validate_ai_requirements`` always run afterwards.

    Args:
        aibom: Parsed AIBOM document.

    Returns:
        A dict with ``valid`` (True when no issue has severity "error"),
        ``issues`` (list of issue dicts) and ``error_count``.
    """
    issues: List[Dict[str, Any]] = []

    # Import separately from the validation itself: if the import failed
    # inside the same ``try`` whose handler names ``jsonschema.ValidationError``,
    # evaluating that handler would raise UnboundLocalError and escape.
    try:
        import jsonschema
    except ImportError as e:
        issues.append({"severity": "error", "message": f"Validation error: {str(e)}"})
    else:
        try:
            spec_version = aibom.get("specVersion", "1.6")
            schema_file = f"bom-{spec_version}.schema.json"
            schema_path = os.path.join(os.path.dirname(__file__), '..', 'schemas', schema_file)

            if os.path.exists(schema_path):
                with open(schema_path, 'r', encoding="utf-8") as f:
                    schema = json.load(f)
                if JSON_SCHEMA_REGISTRY is not None:
                    # Resolve "$ref"s against the locally loaded schemas.
                    jsonschema.validate(instance=aibom, schema=schema, registry=JSON_SCHEMA_REGISTRY)
                else:
                    jsonschema.validate(instance=aibom, schema=schema)
            else:
                issues.append({"severity": "warning", "message": f"Schema file not found: {schema_file}, skipping strict validation."})

        except jsonschema.ValidationError as e:
            issues.append({"severity": "error", "message": e.message, "path": getattr(e, "json_path", "unknown")})
        except Exception as e:
            # Best effort: any other failure is reported, not raised.
            issues.append({"severity": "error", "message": f"Validation error: {str(e)}"})

    issues.extend(_validate_ai_requirements(aibom))

    error_count = sum(1 for issue in issues if issue["severity"] == "error")
    return {
        "valid": error_count == 0,
        "issues": issues,
        "error_count": error_count,
    }
|
|
|