Spaces:
Sleeping
Sleeping
File size: 1,489 Bytes
240c946 bbbfba8 240c946 bbbfba8 240c946 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | """API layer — FastAPI dependencies and shared state.
This module provides dependency injection for the DB, FileStore, and
JobService, scoped to the application lifespan.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from src.app.jobs.service import JobService
from src.app.persistence.db import Database
from src.app.persistence.file_store import FileStore
if TYPE_CHECKING:
from src.app.settings import Settings
# Module-level singletons, initialized during lifespan startup.
_db: Database | None = None
_file_store: FileStore | None = None
_job_service: JobService | None = None
def init_services(settings: Settings) -> None:
"""Initialize singletons. Called once during app startup."""
global _db, _file_store, _job_service
_db = Database(settings.db_path)
_db.connect()
_file_store = FileStore(settings.storage_root)
_file_store.ensure_dirs()
_job_service = JobService(_db, _file_store)
def shutdown_services() -> None:
"""Clean up. Called during app shutdown."""
global _db
if _db:
_db.close()
_db = None
def get_db() -> Database:
assert _db is not None, "Database not initialized — call init_services first"
return _db
def get_file_store() -> FileStore:
assert _file_store is not None, "FileStore not initialized"
return _file_store
def get_job_service() -> JobService:
assert _job_service is not None, "JobService not initialized"
return _job_service
|