"""
MnemoCore REST API
==================
FastAPI server for MnemoCore (Phase 3.5.1+).
Fully Async I/O with Redis backing.
"""
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
import sys
import os
import secrets
from fastapi import FastAPI, HTTPException, Request, Security, Depends
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security.api_key import APIKeyHeader
from starlette.middleware.base import BaseHTTPMiddleware
from pydantic import BaseModel, Field
from loguru import logger
# Add the parent directory to sys.path so `mnemocore` imports resolve when run directly
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from mnemocore.core.engine import HAIMEngine
from mnemocore.core.config import get_config
from mnemocore.core.container import build_container, Container
from mnemocore.api.middleware import (
SecurityHeadersMiddleware,
RateLimiter,
StoreRateLimiter,
QueryRateLimiter,
ConceptRateLimiter,
AnalogyRateLimiter,
rate_limit_exception_handler,
RATE_LIMIT_CONFIGS
)
from mnemocore.api.models import (
StoreRequest,
QueryRequest,
ConceptRequest,
AnalogyRequest,
StoreResponse,
QueryResponse,
QueryResult,
DeleteResponse,
ConceptResponse,
AnalogyResponse,
AnalogyResult,
HealthResponse,
RootResponse,
ErrorResponse
)
from mnemocore.core.logging_config import configure_logging
from mnemocore.core.exceptions import (
MnemoCoreError,
RecoverableError,
IrrecoverableError,
ValidationError,
NotFoundError,
MemoryNotFoundError,
is_debug_mode,
)
# Configure logging
configure_logging()
# --- Observability ---
from prometheus_client import make_asgi_app
from mnemocore.core.metrics import (
API_REQUEST_COUNT,
API_REQUEST_LATENCY,
track_async_latency,
STORAGE_OPERATION_COUNT,
extract_trace_context,
get_trace_id,
init_opentelemetry,
update_memory_count,
update_queue_length,
OTEL_AVAILABLE
)
# Initialize OpenTelemetry (optional, gracefully degrades if not installed)
if OTEL_AVAILABLE:
init_opentelemetry(service_name="mnemocore", exporter="console")
logger.info("OpenTelemetry tracing initialized")
metrics_app = make_asgi_app()
# --- Trace Context Middleware ---
class TraceContextMiddleware(BaseHTTPMiddleware):
"""
Middleware to extract and propagate trace context via X-Trace-ID header.
Integrates with OpenTelemetry for distributed tracing.
"""
async def dispatch(self, request: Request, call_next):
# Extract trace context from incoming headers
headers = dict(request.headers)
trace_id = headers.get("x-trace-id")
if trace_id:
# Set trace ID in context for downstream operations
from mnemocore.core.metrics import set_trace_id
set_trace_id(trace_id)
else:
# Try to extract from W3C Trace Context format
extracted_id = extract_trace_context(headers)
if extracted_id:
trace_id = extracted_id
# Process request
response = await call_next(request)
# Add trace ID to response headers for debugging
if trace_id:
response.headers["X-Trace-ID"] = trace_id
return response
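# Example (illustrative, not executed): a caller can pin its own trace ID and
# correlate logs across services; the middleware echoes it back in the
# X-Trace-ID response header. Assuming the default port from the __main__
# block at the bottom of this file:
#
#   curl -i -H "X-Trace-ID: 4bf92f3577b34da6a3ce929d0e0e4736" \
#        http://localhost:8100/health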
# --- Lifecycle Management ---
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Security Check
config = get_config()
security = config.security if config else None
_api_key = (security.api_key if security else None) or os.getenv("HAIM_API_KEY", "")
if not _api_key:
logger.critical("No API Key configured! Set HAIM_API_KEY env var or security.api_key in config.")
sys.exit(1)
# Startup: Build dependency container
logger.info("Building dependency container...")
container = build_container(config)
app.state.container = container
# Check Redis health
logger.info("Checking Redis connection...")
if container.redis_storage:
if not await container.redis_storage.check_health():
logger.warning("Redis connection failed. Running in degraded mode (local only).")
else:
logger.warning("Redis storage not available.")
    # Initialize the engine with its injected dependencies
logger.info("Initializing HAIMEngine...")
from mnemocore.core.tier_manager import TierManager
tier_manager = TierManager(config=config, qdrant_store=container.qdrant_store)
engine = HAIMEngine(
persist_path=config.paths.memory_file,
config=config,
tier_manager=tier_manager,
working_memory=container.working_memory,
episodic_store=container.episodic_store,
semantic_store=container.semantic_store,
)
await engine.initialize()
app.state.engine = engine
# Also expose the cognitive client to app state for agentic frameworks
from mnemocore.agent_interface import CognitiveMemoryClient
app.state.cognitive_client = CognitiveMemoryClient(
engine=engine,
wm=container.working_memory,
episodic=container.episodic_store,
semantic=container.semantic_store,
procedural=container.procedural_store,
meta=container.meta_memory,
)
yield
# Shutdown: Clean up
logger.info("Closing HAIMEngine...")
await app.state.engine.close()
logger.info("Closing Redis...")
if container.redis_storage:
await container.redis_storage.close()
app = FastAPI(
title="MnemoCore API",
description="MnemoCore - Infrastructure for Persistent Cognitive Memory - REST API (Async)",
version="3.5.2",
lifespan=lifespan
)
from mnemocore.core.reliability import (
CircuitBreakerError,
storage_circuit_breaker,
vector_circuit_breaker
)
@app.exception_handler(CircuitBreakerError)
async def circuit_breaker_exception_handler(request: Request, exc: CircuitBreakerError):
logger.error(f"Service Unavailable (Circuit Open): {exc}")
return JSONResponse(
status_code=503,
content={"detail": "Service Unavailable: Storage backend is down or overloaded.", "error": str(exc)},
)
@app.exception_handler(MnemoCoreError)
async def mnemocore_exception_handler(request: Request, exc: MnemoCoreError):
"""
Centralized exception handler for all MnemoCore errors.
Returns JSON with error details and stacktrace only in DEBUG mode.
"""
# Log the error with appropriate level
if exc.recoverable:
logger.warning(f"Recoverable error: {exc}")
else:
logger.error(f"Irrecoverable error: {exc}")
# Determine HTTP status code based on error type
if isinstance(exc, NotFoundError):
status_code = 404
elif isinstance(exc, ValidationError):
status_code = 400
elif isinstance(exc, RecoverableError):
status_code = 503 # Service Unavailable
else:
status_code = 500
# Build response
response_data = exc.to_dict(include_traceback=is_debug_mode())
return JSONResponse(
status_code=status_code,
content=response_data,
)
# Security Headers
app.add_middleware(SecurityHeadersMiddleware)
# Trace Context Middleware (for OpenTelemetry distributed tracing)
app.add_middleware(TraceContextMiddleware)
# CORS
config = get_config()
cors_origins = config.security.cors_origins if (config and config.security) else ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount Prometheus metrics
app.mount("/metrics", metrics_app)
# --- Security ---
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def get_api_key(api_key: str = Security(api_key_header)):
config = get_config()
    # Phase 3.5.1 security: prefer config.security.api_key, fall back to the HAIM_API_KEY env var
security = config.security if config else None
expected_key = (security.api_key if security else None) or os.getenv("HAIM_API_KEY", "")
if not expected_key:
        # Should be caught by the startup check, but double-check defensively
logger.error("API Key not configured during request processing.")
raise HTTPException(
status_code=500,
detail="Server Misconfiguration: API Key not set"
)
if not api_key or not secrets.compare_digest(api_key, expected_key):
raise HTTPException(
status_code=403,
detail="Invalid or missing API Key"
)
return api_key
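# Example (illustrative): every protected endpoint reads the key from the
# X-API-Key header and compares it in constant time. Assuming HAIM_API_KEY is
# exported in the calling shell:
#
#   curl -H "X-API-Key: $HAIM_API_KEY" http://localhost:8100/stats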
def get_engine(request: Request) -> HAIMEngine:
return request.app.state.engine
def get_container(request: Request) -> Container:
return request.app.state.container
# --- Endpoints ---
@app.get("/", response_model=RootResponse)
async def root():
return {
"status": "ok",
"service": "MnemoCore",
"version": "3.5.1",
"phase": "Async I/O",
"timestamp": datetime.now(timezone.utc).isoformat()
}
@app.get("/health", response_model=HealthResponse)
async def health(container: Container = Depends(get_container), engine: HAIMEngine = Depends(get_engine)):
# Check Redis connectivity
redis_connected = False
if container.redis_storage:
redis_connected = await container.redis_storage.check_health()
# Check Circuit Breaker States (native implementation uses string state)
storage_cb_state = storage_circuit_breaker.state
vector_cb_state = vector_circuit_breaker.state
is_healthy = redis_connected and storage_cb_state == "closed" and vector_cb_state == "closed"
return {
"status": "healthy" if is_healthy else "degraded",
"redis_connected": redis_connected,
"storage_circuit_breaker": storage_cb_state,
"qdrant_circuit_breaker": vector_cb_state,
"engine_ready": engine is not None,
"timestamp": datetime.now(timezone.utc).isoformat()
}
@app.post(
"/store",
response_model=StoreResponse,
dependencies=[Depends(get_api_key), Depends(StoreRateLimiter())]
)
@track_async_latency(API_REQUEST_LATENCY, {"method": "POST", "endpoint": "/store"})
async def store_memory(
req: StoreRequest,
engine: HAIMEngine = Depends(get_engine),
container: Container = Depends(get_container)
):
"""Store a new memory (Async + Dual Write). Rate limit: 100/minute."""
API_REQUEST_COUNT.labels(method="POST", endpoint="/store", status="200").inc()
metadata = req.metadata or {}
if req.agent_id:
metadata["agent_id"] = req.agent_id
# 1. Run Core Engine (now Async)
mem_id = await engine.store(req.content, metadata=metadata)
# 2. Async Write to Redis (Metadata & LTP Index)
# Get the node details we just created
node = await engine.get_memory(mem_id)
if node and container.redis_storage:
redis_data = {
"id": node.id,
"content": node.content,
"metadata": node.metadata,
"ltp_strength": node.ltp_strength,
"created_at": node.created_at.isoformat()
}
try:
await container.redis_storage.store_memory(mem_id, redis_data, ttl=req.ttl)
# PubSub Event
await container.redis_storage.publish_event("memory.created", {"id": mem_id})
        except Exception:
logger.exception(f"Failed async write for {mem_id}")
# Non-blocking failure for Redis write
return {
"ok": True,
"memory_id": mem_id,
"message": f"Stored memory: {mem_id}"
}
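# Example (illustrative) dual-write store call. Field names follow the handler
# above (content, metadata, agent_id, ttl); the authoritative schema is
# StoreRequest in mnemocore.api.models:
#
#   curl -X POST http://localhost:8100/store \
#        -H "X-API-Key: $HAIM_API_KEY" -H "Content-Type: application/json" \
#        -d '{"content": "Redis listens on port 6379", "agent_id": "agent-1"}'
#
#   -> {"ok": true, "memory_id": "<id>", "message": "Stored memory: <id>"}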
@app.post(
"/query",
response_model=QueryResponse,
dependencies=[Depends(get_api_key), Depends(QueryRateLimiter())]
)
@track_async_latency(API_REQUEST_LATENCY, {"method": "POST", "endpoint": "/query"})
async def query_memory(
req: QueryRequest,
engine: HAIMEngine = Depends(get_engine)
):
"""Query memories by semantic similarity (Async Wrapper). Rate limit: 500/minute."""
API_REQUEST_COUNT.labels(method="POST", endpoint="/query", status="200").inc()
# CPU heavy vector search (offloaded inside engine)
metadata_filter = {"agent_id": req.agent_id} if req.agent_id else None
results = await engine.query(req.query, top_k=req.top_k, metadata_filter=metadata_filter)
formatted = []
for mem_id, score in results:
# Check Redis first? For now rely on Engine's TierManager (which uses RAM/File)
# because Engine has the object cache + Hashing logic.
node = await engine.get_memory(mem_id)
if node:
formatted.append({
"id": mem_id,
"content": node.content,
"score": float(score),
"metadata": node.metadata,
"tier": getattr(node, "tier", "unknown")
})
return {
"ok": True,
"query": req.query,
"results": formatted
}
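# Example (illustrative) semantic query, optionally scoped to one agent via
# the agent_id metadata filter applied above:
#
#   curl -X POST http://localhost:8100/query \
#        -H "X-API-Key: $HAIM_API_KEY" -H "Content-Type: application/json" \
#        -d '{"query": "which port does Redis use?", "top_k": 5, "agent_id": "agent-1"}'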
@app.get("/memory/{memory_id}", dependencies=[Depends(get_api_key)])
async def get_memory(
memory_id: str,
engine: HAIMEngine = Depends(get_engine),
container: Container = Depends(get_container)
):
"""Get a specific memory by ID."""
# Validate memory_id format
if not memory_id or len(memory_id) > 256:
raise ValidationError(
field="memory_id",
reason="Memory ID must be between 1 and 256 characters",
value=memory_id
)
# Try Redis first (L2 cache)
cached = None
if container.redis_storage:
cached = await container.redis_storage.retrieve_memory(memory_id)
if cached:
return {
"source": "redis",
**cached
}
# Fallback to Engine (TierManager)
node = await engine.get_memory(memory_id)
if not node:
raise MemoryNotFoundError(memory_id)
return {
"source": "engine",
"id": node.id,
"content": node.content,
"metadata": node.metadata,
"created_at": node.created_at.isoformat(),
"epistemic_value": getattr(node, "epistemic_value", 0.0),
"ltp_strength": getattr(node, "ltp_strength", 0.0),
"tier": getattr(node, "tier", "unknown")
}
@app.delete(
"/memory/{memory_id}",
response_model=DeleteResponse,
dependencies=[Depends(get_api_key)]
)
async def delete_memory(
memory_id: str,
engine: HAIMEngine = Depends(get_engine),
container: Container = Depends(get_container)
):
"""Delete a memory via Engine."""
# Validate memory_id format
if not memory_id or len(memory_id) > 256:
raise ValidationError(
field="memory_id",
reason="Memory ID must be between 1 and 256 characters",
value=memory_id
)
# Check if exists first for 404
node = await engine.get_memory(memory_id)
if not node:
raise MemoryNotFoundError(memory_id)
# Engine delete (handles HOT/WARM)
await engine.delete_memory(memory_id)
# Also Redis
if container.redis_storage:
await container.redis_storage.delete_memory(memory_id)
return {"ok": True, "deleted": memory_id}
# --- Phase 5: Cognitive Client Endpoints ---
class ObserveRequest(BaseModel):
agent_id: str
content: str
kind: str = "observation"
importance: float = 0.5
tags: Optional[List[str]] = None
@app.post("/wm/observe", dependencies=[Depends(get_api_key)])
async def observe_context(req: ObserveRequest, request: Request):
"""Push an observation explicitly into Working Memory."""
client = request.app.state.cognitive_client
if not client:
raise HTTPException(status_code=503, detail="Cognitive Client unavailable")
item_id = client.observe(
agent_id=req.agent_id,
content=req.content,
kind=req.kind,
importance=req.importance,
tags=req.tags
)
return {"ok": True, "item_id": item_id}
@app.get("/wm/context/{agent_id}", dependencies=[Depends(get_api_key)])
async def get_working_context(agent_id: str, limit: int = 16, request: Request = None):
"""Read active Working Memory context."""
client = request.app.state.cognitive_client
items = client.get_working_context(agent_id, limit=limit)
return {"ok": True, "items": [
{"id": i.id, "content": i.content, "kind": i.kind, "importance": i.importance}
for i in items
]}
class EpisodeStartRequest(BaseModel):
agent_id: str
goal: str
context: Optional[str] = None
@app.post("/episodes/start", dependencies=[Depends(get_api_key)])
async def start_episode(req: EpisodeStartRequest, request: Request):
"""Start a new episode chain."""
    client = request.app.state.cognitive_client
    if not client:
        raise HTTPException(status_code=503, detail="Cognitive Client unavailable")
    ep_id = client.start_episode(req.agent_id, goal=req.goal, context=req.context)
return {"ok": True, "episode_id": ep_id}
# --- Conceptual Endpoints ---
@app.post(
"/concept",
response_model=ConceptResponse,
dependencies=[Depends(get_api_key), Depends(ConceptRateLimiter())]
)
async def define_concept(req: ConceptRequest, engine: HAIMEngine = Depends(get_engine)):
"""Define a concept with attributes. Rate limit: 100/minute."""
await engine.define_concept(req.name, req.attributes)
return {"ok": True, "concept": req.name}
@app.post(
"/analogy",
response_model=AnalogyResponse,
dependencies=[Depends(get_api_key), Depends(AnalogyRateLimiter())]
)
async def solve_analogy(req: AnalogyRequest, engine: HAIMEngine = Depends(get_engine)):
"""Solve an analogy. Rate limit: 100/minute."""
results = await engine.reason_by_analogy(
req.source_concept,
req.source_value,
req.target_concept
)
return {
"ok": True,
"analogy": f"{req.source_concept}:{req.source_value} :: {req.target_concept}:?",
"results": [{"value": v, "score": float(s)} for v, s in results[:10]]
}
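# Example (illustrative): the response renders the analogy as
# "source_concept:source_value :: target_concept:?". Concrete values depend on
# the concepts previously defined via /concept:
#
#   curl -X POST http://localhost:8100/analogy \
#        -H "X-API-Key: $HAIM_API_KEY" -H "Content-Type: application/json" \
#        -d '{"source_concept": "country", "source_value": "france", "target_concept": "city"}'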
@app.get("/stats", dependencies=[Depends(get_api_key)])
async def get_stats(engine: HAIMEngine = Depends(get_engine)):
"""Get aggregate engine stats."""
return await engine.get_stats()
# ─────────────────────────────────────────────────────────────────────────────
# Maintenance Endpoints
# ─────────────────────────────────────────────────────────────────────────────
@app.post("/maintenance/cleanup", dependencies=[Depends(get_api_key)])
async def cleanup_maintenance(threshold: float = 0.1, engine: HAIMEngine = Depends(get_engine)):
"""Remove decayed synapses and stale index nodes."""
await engine.cleanup_decay(threshold=threshold)
return {"ok": True, "message": f"Synapse cleanup triggered with threshold {threshold}"}
@app.post("/maintenance/consolidate", dependencies=[Depends(get_api_key)])
async def consolidate_maintenance(engine: HAIMEngine = Depends(get_engine)):
"""Trigger manual semantic consolidation pulse."""
if not engine._semantic_worker:
raise HTTPException(status_code=503, detail="Consolidation worker not initialized")
stats = await engine._semantic_worker.run_once()
return {"ok": True, "stats": stats}
@app.post("/maintenance/sweep", dependencies=[Depends(get_api_key)])
async def sweep_maintenance(engine: HAIMEngine = Depends(get_engine)):
"""Trigger manual immunology sweep."""
if not engine._immunology:
raise HTTPException(status_code=503, detail="Immunology loop not initialized")
stats = await engine._immunology.sweep()
return {"ok": True, "stats": stats}
# Rate limit info endpoint
@app.get("/rate-limits")
async def get_rate_limits():
"""Get current rate limit configuration."""
return {
"limits": {
category: {
"requests": cfg["requests"],
"window_seconds": cfg["window"],
"requests_per_minute": cfg["requests"],
"description": cfg["description"]
}
for category, cfg in RATE_LIMIT_CONFIGS.items()
}
}
# ─────────────────────────────────────────────────────────────────────────────
# Phase 4.5: Recursive Synthesis Engine Endpoint
# ─────────────────────────────────────────────────────────────────────────────
class RLMQueryRequest(BaseModel):
"""Request model for Phase 4.5 recursive memory query."""
query: str = Field(..., min_length=1, max_length=4096, description="The query to synthesize (can be complex/multi-topic)")
context_text: Optional[str] = Field(None, max_length=500000, description="Optional large external text (Ripple environment)")
project_id: Optional[str] = Field(None, max_length=128, description="Optional project scope for isolation masking")
max_depth: Optional[int] = Field(None, ge=0, le=5, description="Max recursion depth (0-5, default 3)")
max_sub_queries: Optional[int] = Field(None, ge=1, le=10, description="Max sub-queries to decompose into (1-10, default 5)")
top_k: Optional[int] = Field(None, ge=1, le=50, description="Final results to return (default 10)")
class RLMQueryResponse(BaseModel):
"""Response model for Phase 4.5 recursive memory query."""
ok: bool
query: str
sub_queries: List[str]
results: List[Dict[str, Any]]
synthesis: str
max_depth_hit: int
elapsed_ms: float
ripple_snippets: List[str]
stats: Dict[str, Any]
@app.post(
"/rlm/query",
response_model=RLMQueryResponse,
dependencies=[Depends(get_api_key), Depends(QueryRateLimiter())],
tags=["Phase 4.5"],
summary="Recursive Synthesis Query",
description=(
"Phase 4.5: Recursive Language Model (RLM) query. "
"Decomposes complex queries into sub-questions, searches MnemoCore in parallel, "
"recursively analyzes low-confidence clusters, and synthesizes a final answer. "
"Implements the MIT CSAIL RLM paradigm to eliminate Context Rot."
),
)
@track_async_latency(API_REQUEST_LATENCY, {"method": "POST", "endpoint": "/rlm/query"})
async def rlm_query(
req: RLMQueryRequest,
engine: HAIMEngine = Depends(get_engine),
):
"""
Phase 4.5 Recursive Synthesis Engine.
Instead of a single flat search, this endpoint:
1. Decomposes your query into focused sub-questions
2. Searches MnemoCore in PARALLEL for each sub-question
3. Recursively drills into low-confidence clusters
4. Synthesizes all results into a coherent answer
Rate limit: 500/minute (shared with /query).
"""
API_REQUEST_COUNT.labels(method="POST", endpoint="/rlm/query", status="200").inc()
from mnemocore.core.recursive_synthesizer import RecursiveSynthesizer, SynthesizerConfig
from mnemocore.core.ripple_context import RippleContext
# Build config from request overrides
synth_config = SynthesizerConfig(
max_depth=req.max_depth if req.max_depth is not None else 3,
max_sub_queries=req.max_sub_queries if req.max_sub_queries is not None else 5,
final_top_k=req.top_k if req.top_k is not None else 10,
)
# Build RippleContext if external text provided
ripple_ctx = None
if req.context_text and req.context_text.strip():
ripple_ctx = RippleContext(text=req.context_text, source_label="api_context")
    # Run recursive synthesis (no LLM wired at API level — use heuristic mode)
# To enable LLM synthesis, configure via RLMIntegrator in your application code
synthesizer = RecursiveSynthesizer(engine=engine, config=synth_config)
result = await synthesizer.synthesize(
query=req.query,
ripple_context=ripple_ctx,
project_id=req.project_id,
)
return {
"ok": True,
"query": result.query,
"sub_queries": result.sub_queries,
"results": result.results,
"synthesis": result.synthesis,
"max_depth_hit": result.max_depth_hit,
"elapsed_ms": result.total_elapsed_ms,
"ripple_snippets": result.ripple_snippets,
"stats": result.stats,
}
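# Example (illustrative) recursive synthesis call with an external "Ripple"
# text and project scoping; all fields follow RLMQueryRequest above:
#
#   curl -X POST http://localhost:8100/rlm/query \
#        -H "X-API-Key: $HAIM_API_KEY" -H "Content-Type: application/json" \
#        -d '{"query": "compare our Redis and Qdrant failure handling",
#             "context_text": "<large pasted document>",
#             "project_id": "infra", "max_depth": 2, "top_k": 5}'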
# ─────────────────────────────────────────────────────────────────────────────
# Phase 5.0 — Agent 1: Trust & Provenance Endpoints
# ─────────────────────────────────────────────────────────────────────────────
@app.get(
"/memory/{memory_id}/lineage",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Trust"],
summary="Get full provenance lineage for a memory",
)
async def get_memory_lineage(
memory_id: str,
engine: HAIMEngine = Depends(get_engine),
):
"""
Return the complete provenance lineage of a memory:
origin (who created it, how, when) and all transformation events
(consolidated, verified, contradicted, archived, …).
"""
node = await engine.get_memory(memory_id)
if not node:
raise MemoryNotFoundError(memory_id)
prov = getattr(node, "provenance", None)
if prov is None:
return {
"ok": True,
"memory_id": memory_id,
"provenance": None,
"message": "No provenance record attached to this memory.",
}
return {
"ok": True,
"memory_id": memory_id,
"provenance": prov.to_dict(),
}
@app.get(
"/memory/{memory_id}/confidence",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Trust"],
summary="Get confidence envelope for a memory",
)
async def get_memory_confidence(
memory_id: str,
engine: HAIMEngine = Depends(get_engine),
):
"""
Return a structured confidence envelope for a memory, combining:
- Bayesian reliability (BayesianLTP posterior mean)
- access_count (evidence strength)
- staleness (days since last verification)
- source_type trust weight
- contradiction flag
Level: high | medium | low | contradicted | stale
"""
from mnemocore.core.confidence import build_confidence_envelope
node = await engine.get_memory(memory_id)
if not node:
raise MemoryNotFoundError(memory_id)
prov = getattr(node, "provenance", None)
envelope = build_confidence_envelope(node, prov)
return {
"ok": True,
"memory_id": memory_id,
"confidence": envelope,
}
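# Example (illustrative) response shape. The exact envelope fields are defined
# by mnemocore.core.confidence.build_confidence_envelope; per the docstring it
# resolves to one of high | medium | low | contradicted | stale:
#
#   GET /memory/<id>/confidence
#   -> {"ok": true, "memory_id": "<id>", "confidence": {...}}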
# ─────────────────────────────────────────────────────────────────────────────
# Phase 5.0 — Agent 3 stub: Proactive Recall
# (Full implementation added by Agent 3 workstream)
# ─────────────────────────────────────────────────────────────────────────────
@app.get(
"/proactive",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Autonomy"],
summary="Retrieve contextually relevant memories without explicit query",
)
async def get_proactive_memories(
agent_id: Optional[str] = None,
limit: int = 10,
engine: HAIMEngine = Depends(get_engine),
):
"""
Proactive recall stub (Phase 5.0 / Agent 3).
Returns the most recently active high-LTP memories as a stand-in
until the full ProactiveRecallDaemon is implemented.
"""
    nodes = (await engine.tier_manager.get_hot_snapshot()) if hasattr(engine, "tier_manager") else []
sorted_nodes = sorted(nodes, key=lambda n: n.ltp_strength, reverse=True)[:limit]
from mnemocore.core.confidence import build_confidence_envelope
results = []
for n in sorted_nodes:
prov = getattr(n, "provenance", None)
results.append({
"id": n.id,
"content": n.content,
"ltp_strength": round(n.ltp_strength, 4),
"confidence": build_confidence_envelope(n, prov),
"tier": getattr(n, "tier", "hot"),
})
return {"ok": True, "proactive_results": results, "count": len(results)}
# ─────────────────────────────────────────────────────────────────────────────
# Phase 5.0 — Agent 2: Memory Lifecycle Endpoints
# ─────────────────────────────────────────────────────────────────────────────
@app.get(
"/contradictions",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Lifecycle"],
summary="List active contradiction groups requiring resolution",
)
async def list_contradictions(
unresolved_only: bool = True,
):
"""
Returns all detected contradiction groups from the ContradictionRegistry.
By default only unresolved contradictions are returned.
"""
from mnemocore.core.contradiction import get_contradiction_detector
detector = get_contradiction_detector()
records = detector.registry.list_all(unresolved_only=unresolved_only)
return {
"ok": True,
"count": len(records),
"contradictions": [r.to_dict() for r in records],
}
class ResolveContradictionRequest(BaseModel):
note: Optional[str] = None
@app.post(
"/contradictions/{group_id}/resolve",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Lifecycle"],
summary="Mark a contradiction group as resolved",
)
async def resolve_contradiction(group_id: str, req: ResolveContradictionRequest):
"""Manually resolve a detected contradiction."""
from mnemocore.core.contradiction import get_contradiction_detector
detector = get_contradiction_detector()
success = detector.registry.resolve(group_id, note=req.note)
if not success:
raise HTTPException(status_code=404, detail=f"Contradiction group {group_id!r} not found.")
return {"ok": True, "resolved_group_id": group_id}
# ─────────────────────────────────────────────────────────────────────────────
# Phase 5.0 — Agent 3: Autonomous Cognition Endpoints
# ─────────────────────────────────────────────────────────────────────────────
@app.get(
"/memory/{memory_id}/emotional-tag",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Autonomy"],
summary="Get emotional (valence/arousal) tag for a memory",
)
async def get_emotional_tag_ep(
memory_id: str,
engine: HAIMEngine = Depends(get_engine),
):
"""Return the valence/arousal emotional metadata for a memory."""
from mnemocore.core.emotional_tag import get_emotional_tag
node = await engine.get_memory(memory_id)
if not node:
raise MemoryNotFoundError(memory_id)
tag = get_emotional_tag(node)
return {
"ok": True,
"memory_id": memory_id,
"emotional_tag": {
"valence": tag.valence,
"arousal": tag.arousal,
"salience": round(tag.salience(), 4),
},
}
class EmotionalTagPatchRequest(BaseModel):
valence: float
arousal: float
@app.patch(
"/memory/{memory_id}/emotional-tag",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Autonomy"],
summary="Attach or update emotional tag on a memory",
)
async def patch_emotional_tag(
memory_id: str,
req: EmotionalTagPatchRequest,
engine: HAIMEngine = Depends(get_engine),
):
from mnemocore.core.emotional_tag import EmotionalTag, attach_emotional_tag
node = await engine.get_memory(memory_id)
if not node:
raise MemoryNotFoundError(memory_id)
tag = EmotionalTag(valence=req.valence, arousal=req.arousal)
attach_emotional_tag(node, tag)
return {"ok": True, "memory_id": memory_id, "emotional_tag": tag.to_metadata_dict()}
# ─────────────────────────────────────────────────────────────────────────────
# Phase 5.0 — Agent 4: Prediction Endpoints
# ─────────────────────────────────────────────────────────────────────────────
# Lazily-created PredictionStore singleton, injected into the endpoints below
# via Depends(_get_prediction_store) so every handler shares one store bound
# to the engine.
_prediction_store_instance = None
def _get_prediction_store(engine: HAIMEngine = Depends(get_engine)):
    from mnemocore.core.prediction_store import PredictionStore
    global _prediction_store_instance
    if _prediction_store_instance is None:
        _prediction_store_instance = PredictionStore(engine=engine)
    return _prediction_store_instance
class CreatePredictionRequest(BaseModel):
content: str
confidence: float = 0.5
deadline_days: Optional[float] = None
related_memory_ids: Optional[List[str]] = None
tags: Optional[List[str]] = None
class VerifyPredictionRequest(BaseModel):
success: bool
notes: Optional[str] = None
@app.post(
"/predictions",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Prediction"],
summary="Store a new forward-looking prediction",
)
async def create_prediction(
    req: CreatePredictionRequest,
    store=Depends(_get_prediction_store),
):
    pred_id = store.create(
        content=req.content,
        confidence=req.confidence,
        deadline_days=req.deadline_days,
        related_memory_ids=req.related_memory_ids,
        tags=req.tags,
    )
    pred = store.get(pred_id)
    return {"ok": True, "prediction": pred.to_dict()}
@app.get(
"/predictions",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Prediction"],
summary="List all predictions",
)
async def list_predictions(status: Optional[str] = None, store=Depends(_get_prediction_store)):
    return {
        "ok": True,
        "predictions": [p.to_dict() for p in store.list_all(status=status)],
    }
@app.post(
"/predictions/{pred_id}/verify",
dependencies=[Depends(get_api_key)],
tags=["Phase 5.0 β€” Prediction"],
summary="Verify or falsify a prediction",
)
async def verify_prediction(
    pred_id: str,
    req: VerifyPredictionRequest,
    store=Depends(_get_prediction_store),
):
    pred = await store.verify(pred_id, success=req.success, notes=req.notes)
    if pred is None:
        raise HTTPException(status_code=404, detail=f"Prediction {pred_id!r} not found.")
    return {"ok": True, "prediction": pred.to_dict()}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100)