"""
Pricing endpoints – integrates the ARF Bayesian pricing calculator.
"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
import logging
from arf_pricing_calculator.core.pricing_engine import PricingEngine
from arf_pricing_calculator.ingestion.questionnaire_parser import parse_input_dict
from arf_pricing_calculator.types import PricingOutput
from app.core.usage_tracker import enforce_quota
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
# Router collecting the pricing endpoints; mounted by the app elsewhere.
router = APIRouter()
class PricingEstimateRequest(BaseModel):
    """Request body for single pricing estimate."""
    # Raw questionnaire answers; converted to a PricingInput via parse_input_dict.
    input: dict
    # Identifier used for quota/cooldown bookkeeping; "default" is a shared bucket.
    customer_id: str = "default"
    # NOTE(review): declared but not consumed by the /pricing/estimate handler —
    # presumably reserved to mirror PricingRunRequest; confirm before removing.
    force: bool = False
class PricingRunRequest(BaseModel):
    """Request body for multi‑run pricing with learning."""
    # Raw questionnaire answers; converted to a PricingInput via parse_input_dict.
    input: dict
    # Identifier used for quota/cooldown bookkeeping; "default" is a shared bucket.
    customer_id: str = "default"
    # Number of sequential estimate/learn iterations to perform.
    runs: int = 1
    # Cooldown window passed to the cooldown checks between runs.
    cooldown_hours: int = 24
    # When True, skips the cooldown check entirely.
    force: bool = False
@router.post("/pricing/estimate", response_model=PricingOutput)
async def estimate_pricing(
    req: PricingEstimateRequest,
    quota: dict = Depends(enforce_quota),  # optional: enforce usage tracking
):
    """
    Single pricing estimate – no learning, no buffer update.

    Parses the raw questionnaire dict into a PricingInput, runs the
    pricing engine with an empty calibration buffer (so no historical
    learning influences the estimate), and returns the engine output.

    Raises:
        HTTPException 400: the input dict could not be parsed (client error).
        HTTPException 500: the pricing engine itself failed (server error).
    """
    try:
        # Convert the input dict to a PricingInput object.
        pricing_input = parse_input_dict(req.input)
    except Exception as e:
        # Malformed questionnaire input is the caller's fault -> 400.
        logger.exception("Pricing input parsing failed")
        raise HTTPException(status_code=400, detail=str(e)) from e
    try:
        # Create engine without buffer (no learning).
        engine = PricingEngine(calibration_buffer=[])
        return engine.estimate(pricing_input)
    except Exception as e:
        # An engine failure is a server-side problem, not a bad request;
        # previously this was misreported as a 400 client error.
        logger.exception("Pricing estimate failed")
        raise HTTPException(status_code=500, detail="Pricing engine error") from e
@router.post("/pricing/run", response_model=list[PricingOutput])
async def run_pricing(
    req: PricingRunRequest,
    quota: dict = Depends(enforce_quota),
):
    """
    Multi‑run pricing with cooldown and buffer persistence.

    Each run’s simulated outcome is added to the buffer, so subsequent runs
    see an updated posterior.

    Raises:
        HTTPException 429: a cooldown is active and ``req.force`` is False.
            Note: runs completed before the 429 have already persisted
            their events to the buffer, but their outputs are discarded.
    """
    # Local imports keep the buffer/cooldown machinery off the module
    # import path; loaded only when this endpoint is hit.
    from arf_pricing_calculator.storage.buffer import load_buffer, add_event
    from arf_pricing_calculator.orchestration.cooldown import enforce_cooldown, is_cooldown_active
    import random  # hoisted out of the loop; was re-imported every iteration

    # The questionnaire input does not change between runs: parse it once
    # instead of once per iteration (loop-invariant work).
    pricing_input = parse_input_dict(req.input)

    outputs: list[PricingOutput] = []
    buffer = load_buffer()  # loads from calibration_buffer.json
    for run_index in range(req.runs):
        if not req.force and is_cooldown_active(req.customer_id, req.cooldown_hours):
            raise HTTPException(
                status_code=429,
                detail=f"Cooldown active after {run_index} runs",
            )
        # Fresh engine per run so it is seeded with the latest buffer state.
        engine = PricingEngine(calibration_buffer=buffer)
        out = engine.estimate(pricing_input)
        # Simulate an outcome (in real use, this would come from the actual
        # deal). Non-cryptographic randomness is fine for a simulation.
        outcome = "success" if random.random() > out.risk_score else "failure"  # nosec B311
        event = {
            "run_id": out.run_history_id,
            "customer_id": req.customer_id,
            "outcome": outcome,
            "price": out.recommended_price,
            "value": out.expected_value,
            "risk_score": out.risk_score,
            "run_number": run_index + 1,
        }
        add_event(event)
        buffer = load_buffer()  # reload so the next run sees the updated posterior
        outputs.append(out)
        # Start the cooldown between runs, but not after the final one.
        if run_index < req.runs - 1:
            enforce_cooldown(req.customer_id, req.cooldown_hours)
    return outputs