Spaces:
Sleeping
Sleeping
File size: 1,818 Bytes
54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 54ca85f a9334c7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | """
queue_manager.py — Thread-pool job queue with status tracking.
"""
from __future__ import annotations
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
class State(str, Enum):
    """Lifecycle states a tracked job can be in.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (``State.DONE == "done"``).
    """

    # Not finished yet.
    QUEUED = "queued"
    RUNNING = "running"

    # Finished, one way or the other.
    DONE = "done"
    FAILED = "failed"
@dataclass
class JobStatus:
    """Mutable record of a single job's state, message and timestamps."""

    # Identifier supplied by whoever creates the record.
    job_id: str
    # Current lifecycle state; a fresh record starts out as QUEUED.
    state: State = State.QUEUED
    # Free-form text stored alongside the state; empty by default.
    message: str = ""
    # time.time() taken when the record is constructed.
    created_at: float = field(default_factory=time.time)
    # time.time() taken at construction, then refreshed by update().
    updated_at: float = field(default_factory=time.time)

    def update(self, state: State, message: str = "") -> None:
        """Overwrite the state and message and refresh ``updated_at``.

        Args:
            state: The new state to store.
            message: The new message; omitting it resets the stored message
                to the empty string.
        """
        self.state, self.message = state, message
        self.updated_at = time.time()
class JobQueue:
    """Registry of job statuses paired with a worker thread pool.

    The queue owns a ``ThreadPoolExecutor`` and a dictionary that maps job
    IDs to their ``JobStatus`` record.  Every read or write of that
    dictionary happens while holding an internal lock.

    The executor holds worker threads, so the queue should be released when
    it is no longer needed, either by calling ``shutdown()`` or by using the
    instance as a context manager::

        with JobQueue(max_workers=4) as queue:
            ...
    """

    def __init__(self, max_workers: int = 8, max_jobs: int = 100) -> None:
        """Create the executor and an empty job registry.

        Args:
            max_workers: Passed to ``ThreadPoolExecutor`` as its pool size.
            max_jobs: Number of queued-or-running jobs at which ``is_full``
                starts returning ``True``.
        """
        self._max_jobs = max_jobs
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._jobs: dict[str, JobStatus] = {}
        self._lock = threading.Lock()

    def __enter__(self) -> "JobQueue":
        """Return the queue itself so it can be bound in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Shut the executor down when the ``with`` block is left."""
        self.shutdown()

    def shutdown(self, wait: bool = True) -> None:
        """Shut down the underlying executor.

        Args:
            wait: Forwarded to ``ThreadPoolExecutor.shutdown``; when true the
                call blocks until pending work has finished.
        """
        self._executor.shutdown(wait=wait)

    def is_full(self) -> bool:
        """Report whether the number of unfinished jobs has reached the limit.

        Only jobs whose state is QUEUED or RUNNING are counted; DONE and
        FAILED entries stay in the registry but do not count.

        Returns:
            ``True`` when the count is greater than or equal to ``max_jobs``.
        """
        with self._lock:
            active = sum(
                1
                for status in self._jobs.values()
                if status.state in (State.QUEUED, State.RUNNING)
            )
        return active >= self._max_jobs

    def get_status(self, job_id: str) -> JobStatus | None:
        """Look up the status record for ``job_id``.

        Returns:
            The stored ``JobStatus`` object itself (not a copy), or ``None``
            when the ID has not been registered.
        """
        with self._lock:
            return self._jobs.get(job_id)

    def _set_state(self, job_id: str, state: State, message: str = "") -> None:
        """Update a registered job's state and message under the lock.

        An unknown ``job_id`` is ignored silently.
        """
        with self._lock:
            status = self._jobs.get(job_id)
            if status is not None:
                status.update(state, message)

    def register(self, job_id: str) -> JobStatus:
        """Create a new QUEUED status record for ``job_id`` and store it.

        An existing record with the same ID is replaced.  This method does
        not consult ``max_jobs``; callers that want the limit enforced must
        check ``is_full()`` themselves.

        Returns:
            The newly created ``JobStatus``.
        """
        status = JobStatus(job_id=job_id)
        with self._lock:
            self._jobs[job_id] = status
        return status
|