File size: 1,358 Bytes
6cfe55f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | import importlib.util
import pathlib
import threading
import time
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
MODULE_PATH = ROOT / "app" / "services" / "task_serial_executor.py"
spec = importlib.util.spec_from_file_location("task_serial_executor", MODULE_PATH)
if spec is None or spec.loader is None:
raise ImportError("task_serial_executor module spec not found")
task_serial_executor = importlib.util.module_from_spec(spec)
spec.loader.exec_module(task_serial_executor)
SerialTaskExecutor = task_serial_executor.SerialTaskExecutor
class TestTaskSerialExecutor(unittest.TestCase):
def test_executor_runs_tasks_one_by_one(self):
executor = SerialTaskExecutor()
state_lock = threading.Lock()
state = {"active": 0, "peak_active": 0}
def critical_work():
with state_lock:
state["active"] += 1
state["peak_active"] = max(state["peak_active"], state["active"])
time.sleep(0.05)
with state_lock:
state["active"] -= 1
threads = [threading.Thread(target=lambda: executor.run(critical_work)) for _ in range(2)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(state["peak_active"], 1)
if __name__ == "__main__":
unittest.main()
|