File size: 1,818 Bytes
54ca85f
a9334c7
54ca85f
 
 
 
 
 
 
 
 
 
 
 
 
a9334c7
 
 
 
54ca85f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a9334c7
 
54ca85f
 
 
 
 
 
a9334c7
 
54ca85f
 
 
a9334c7
54ca85f
a9334c7
54ca85f
a9334c7
 
 
 
54ca85f
a9334c7
 
54ca85f
a9334c7
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
queue_manager.py — Thread-pool job queue with status tracking.
"""

from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable


class State(str, Enum):
    """Lifecycle state of a job.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (for example ``State.DONE == "done"``).
    """

    # States that JobQueue.is_full() counts as active.
    QUEUED = "queued"
    RUNNING = "running"

    # States that JobQueue.is_full() does not count.
    DONE = "done"
    FAILED = "failed"


@dataclass
class JobStatus:
    """Mutable record describing the current state of a single job.

    Both timestamps are seconds since the epoch as returned by
    ``time.time()``. Each default comes from its own ``time.time()`` call,
    so the two values may differ marginally on a freshly created instance.
    """

    job_id: str  # identifier of the job this record describes
    state: State = State.QUEUED  # new records start out queued
    message: str = ""  # free-form text stored alongside the state
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)  # refreshed by update()

    def update(self, state: State, message: str = "") -> None:
        """Store a new state and message, then refresh ``updated_at``.

        Omitting *message* resets the stored message to the empty string.
        """
        self.state, self.message = state, message
        self.updated_at = time.time()


class JobQueue:
    """Registry of job statuses paired with a thread pool.

    Every access to the internal ``job_id -> JobStatus`` mapping made by the
    methods below happens while holding a single ``threading.Lock``.

    The ``ThreadPoolExecutor`` is created in ``__init__``; none of the
    methods defined in this class submit work to it or shut it down.
    """

    def __init__(self, max_workers: int = 8, max_jobs: int = 100) -> None:
        """Create an empty queue.

        Args:
            max_workers: Passed straight to ``ThreadPoolExecutor``.
            max_jobs: Number of active (queued or running) jobs at which
                ``is_full()`` starts returning ``True``.
        """
        self._max_jobs = max_jobs
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._jobs: dict[str, JobStatus] = {}
        self._lock = threading.Lock()

    def is_full(self) -> bool:
        """Return whether the number of active jobs has reached ``max_jobs``.

        Only jobs whose state is QUEUED or RUNNING are counted; DONE and
        FAILED entries stay in the mapping but do not count.
        """
        active = 0
        with self._lock:
            for status in self._jobs.values():
                if status.state in (State.QUEUED, State.RUNNING):
                    active += 1
        return active >= self._max_jobs

    def get_status(self, job_id: str) -> JobStatus | None:
        """Return the status registered under *job_id*, or ``None`` if absent.

        The stored ``JobStatus`` object itself is returned, not a copy.
        """
        with self._lock:
            found = self._jobs.get(job_id)
        return found

    def _set_state(self, job_id: str, state: State, message: str = "") -> None:
        """Update the status of *job_id*; do nothing if it is not registered."""
        with self._lock:
            try:
                status = self._jobs[job_id]
            except KeyError:
                return
            status.update(state, message)

    def register(self, job_id: str) -> JobStatus:
        """Create a fresh QUEUED status for *job_id*, store it, and return it.

        An existing entry under the same *job_id* is replaced. This method
        does not consult ``is_full()``; callers that want a capacity limit
        must check it themselves.
        """
        fresh = JobStatus(job_id=job_id)
        with self._lock:
            self._jobs[job_id] = fresh
        return fresh