Spaces:
Build error
Build error
| """ | |
| Pricing endpoints – integrates the ARF Bayesian pricing calculator. | |
| """ | |
| from fastapi import APIRouter, HTTPException, Depends | |
| from pydantic import BaseModel | |
| import logging | |
| from arf_pricing_calculator.core.pricing_engine import PricingEngine | |
| from arf_pricing_calculator.ingestion.questionnaire_parser import parse_input_dict | |
| from arf_pricing_calculator.types import PricingOutput | |
| from app.core.usage_tracker import enforce_quota | |
# Module-scoped logger; the handlers below use it to record failures.
logger = logging.getLogger(__name__)

# Router object for this module's pricing endpoints.
router = APIRouter()
class PricingEstimateRequest(BaseModel):
    """Request body for single pricing estimate."""

    # Raw pricing input; the estimate handler passes it to parse_input_dict.
    input: dict
    # Customer identifier, "default" when the caller omits it.
    customer_id: str = "default"
    # NOTE(review): not read by estimate_pricing in the visible code —
    # presumably kept for parity with PricingRunRequest; confirm.
    force: bool = False
class PricingRunRequest(BaseModel):
    """Request body for multi-run pricing with learning."""

    # Raw pricing input; parsed with parse_input_dict on every run.
    input: dict
    # Customer identifier used for cooldown checks and stored on each event.
    customer_id: str = "default"
    # Number of loop iterations run_pricing performs.
    runs: int = 1
    # Cooldown window handed to is_cooldown_active / enforce_cooldown.
    cooldown_hours: int = 24
    # When true, run_pricing skips the is_cooldown_active check.
    force: bool = False
async def estimate_pricing(
    req: PricingEstimateRequest,
    quota: dict = Depends(enforce_quota),  # optional: enforce usage tracking
):
    """
    Single pricing estimate – no learning, no buffer update.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text — confirm it is registered on ``router`` somewhere.

    Args:
        req: Request body; ``req.input`` is parsed with ``parse_input_dict``.
        quota: Value produced by the ``enforce_quota`` dependency. It is not
            read in the body; it is declared so the dependency is evaluated.

    Returns:
        The object returned by ``PricingEngine.estimate`` for the parsed input.

    Raises:
        HTTPException: status 400 carrying ``str(error)`` when parsing or
            estimation raises any exception.
    """
    try:
        # Turn the raw request dict into the engine's input object.
        pricing_input = parse_input_dict(req.input)
        # An empty calibration buffer means the engine has nothing to learn from.
        engine = PricingEngine(calibration_buffer=[])
        return engine.estimate(pricing_input)
    except Exception as e:
        logger.exception("Pricing estimate failed")
        # Chain the original error so its traceback stays attached to the 400.
        raise HTTPException(status_code=400, detail=str(e)) from e
async def run_pricing(
    req: PricingRunRequest,
    quota: dict = Depends(enforce_quota),
):
    """
    Multi‑run pricing with cooldown and buffer persistence.

    Each run’s simulated outcome is added to the buffer, so subsequent runs
    see an updated posterior.

    NOTE(review): no route decorator is visible on this function in the
    reviewed text — confirm it is registered on ``router`` somewhere.

    Args:
        req: Request body. ``req.runs`` iterations are executed; a value of
            zero or less yields an empty result list.
        quota: Value produced by the ``enforce_quota`` dependency. It is not
            read in the body; it is declared so the dependency is evaluated.

    Returns:
        A list with one ``PricingEngine.estimate`` result per completed run.

    Raises:
        HTTPException: status 429 when ``req.force`` is false and
            ``is_cooldown_active`` reports an active cooldown before a run;
            status 400 carrying ``str(error)`` when parsing or estimation
            raises (the same handling ``estimate_pricing`` applies).
    """
    # Function-scope imports, resolved once per call instead of once per run.
    import random

    # We need to reuse the same buffer across runs; we'll load it per request.
    # For simplicity, we'll load from the default location.
    from arf_pricing_calculator.storage.buffer import load_buffer, add_event
    from arf_pricing_calculator.orchestration.cooldown import (
        enforce_cooldown,
        is_cooldown_active,
    )

    outputs = []
    buffer = load_buffer()  # default location (calibration_buffer.json per original note)
    for i in range(req.runs):
        # The cooldown gate stays outside the try block so the 429 is never
        # rewritten into a 400.
        if not req.force and is_cooldown_active(
            req.customer_id, req.cooldown_hours
        ):
            raise HTTPException(
                status_code=429,
                detail=f"Cooldown active after {i} runs",
            )

        try:
            pricing_input = parse_input_dict(req.input)
            engine = PricingEngine(calibration_buffer=buffer)
            out = engine.estimate(pricing_input)
        except Exception as e:
            # Same mapping as estimate_pricing: log, then report a chained 400.
            logger.exception("Pricing run failed")
            raise HTTPException(status_code=400, detail=str(e)) from e

        # Simulate an outcome (in real use, this would come from the actual
        # deal)
        outcome = "success" if random.random() > out.risk_score else "failure"  # nosec B311
        event = {
            "run_id": out.run_history_id,
            "customer_id": req.customer_id,
            "outcome": outcome,
            "price": out.recommended_price,
            "value": out.expected_value,
            "risk_score": out.risk_score,
            "run_number": i + 1,
        }
        add_event(event)
        buffer = load_buffer()  # reload so the next run sees the new event
        outputs.append(out)

        # No cooldown call after the final run.
        if i < req.runs - 1:
            enforce_cooldown(req.customer_id, req.cooldown_hours)
    return outputs