Spaces:
Running
Running
| import sqlite3 | |
| import json | |
| from pathlib import Path | |
| import datetime | |
| import os | |
# Directory that holds the job database. The OUTPUT_DIR environment variable
# wins when it is set; otherwise fall back to an `output` directory located
# next to the directory that contains this file.
_OUTPUT = Path(
    os.environ.get(
        'OUTPUT_DIR',
        str(Path(__file__).resolve().parent.parent / 'output'),
    )
)
# Create the directory at import time so sqlite3 can create the file inside it.
_OUTPUT.mkdir(parents=True, exist_ok=True)

# Location of the SQLite file used by every function in this module.
DB_PATH = _OUTPUT / 'jobs.db'
def _conn():
    """Open and return a new SQLite connection to ``DB_PATH``.

    Declared column types are used for value conversion
    (``PARSE_DECLTYPES``) and rows are returned as ``sqlite3.Row`` so
    callers can index them by column name or turn them into dicts.
    The caller is responsible for closing the connection.
    """
    connection = sqlite3.connect(
        str(DB_PATH),
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``jobs`` table if needed and add any missing optional columns.

    Safe to call repeatedly: the table is created with ``IF NOT EXISTS`` and
    the ``user_id`` / ``company_id`` columns are only added when
    ``PRAGMA table_info`` does not already list them.

    Raises:
        sqlite3.Error: if the schema cannot be created or migrated for any
            reason other than the column having been added concurrently.
    """
    conn = _conn()
    try:
        cur = conn.cursor()
        cur.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                org_name TEXT,
                org_url TEXT,
                max_pages INTEGER,
                runs INTEGER,
                status TEXT,
                progress TEXT,
                result_path TEXT,
                created_at TIMESTAMP,
                updated_at TIMESTAMP
            )
        ''')

        # Add optional columns for user association if missing.
        cur.execute("PRAGMA table_info(jobs)")
        existing = {row[1] for row in cur.fetchall()}  # row[1] is the column name
        for column in ('user_id', 'company_id'):
            if column in existing:
                continue
            try:
                # Column names come from the fixed tuple above, never from
                # caller input, so interpolating them into the SQL is safe.
                cur.execute(f'ALTER TABLE jobs ADD COLUMN {column} INTEGER')
            except sqlite3.OperationalError as exc:
                # Another connection may have added the column between the
                # PRAGMA and the ALTER; that is fine. Anything else is a
                # real failure and must not be hidden.
                if 'duplicate column name' not in str(exc).lower():
                    raise
        conn.commit()
    finally:
        # Always release the connection, even when a statement fails.
        conn.close()
def enqueue_job(url, org_name, org_url, max_pages=3, runs=1, user_id=None, company_id=None):
    """Insert a new job with status ``'pending'`` and return its row id.

    Args:
        url, org_name, org_url: stored as given in the matching columns.
        max_pages, runs: stored as given; not interpreted here.
        user_id, company_id: optional association ids, stored as given.

    Returns:
        The integer ``id`` of the newly inserted row.

    The ``progress`` column is initialised to an empty JSON object and both
    ``created_at`` and ``updated_at`` are set to the same timestamp.
    """
    init_db()
    # Naive UTC timestamp, matching what the other writers in this module store.
    now = datetime.datetime.utcnow()
    conn = _conn()
    try:
        cur = conn.execute(
            '''INSERT INTO jobs (url,org_name,org_url,max_pages,runs,status,progress,created_at,updated_at,user_id,company_id) VALUES (?,?,?,?,?,?,?,?,?,?,?)''',
            (url, org_name, org_url, max_pages, runs, 'pending', json.dumps({}), now, now, user_id, company_id),
        )
        job_id = cur.lastrowid
        conn.commit()
        return job_id
    finally:
        # Close even when the INSERT or commit fails so the handle is not leaked.
        conn.close()
def list_jobs(limit=50):
    """Return up to ``limit`` jobs, newest first, as a list of dicts.

    Ordering is by ``created_at`` descending. Each dict maps column names
    to the row's values.
    """
    init_db()
    conn = _conn()
    try:
        cur = conn.execute(
            'SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?',
            (limit,),
        )
        return [dict(row) for row in cur.fetchall()]
    finally:
        # Close even when the query fails so the handle is not leaked.
        conn.close()
def get_job(job_id):
    """Return the job with the given id as a dict, or ``None`` if it does not exist."""
    init_db()
    conn = _conn()
    try:
        row = conn.execute('SELECT * FROM jobs WHERE id=?', (job_id,)).fetchone()
        return dict(row) if row else None
    finally:
        # Close even when the query fails so the handle is not leaked.
        conn.close()
def update_job(job_id, status=None, progress=None, result_path=None):
    """Update selected fields of a job and refresh its ``updated_at``.

    Args:
        job_id: id of the row to update.
        status: new status string, or ``None`` to leave it unchanged.
        progress: JSON-serialisable object stored via ``json.dumps``, or
            ``None`` to leave it unchanged.
        result_path: new result path, or ``None`` to leave it unchanged.

    Only arguments that are not ``None`` are written, so a column cannot be
    reset to NULL through this function. When every field is ``None`` the
    row is left untouched (``updated_at`` included).
    """
    init_db()

    # Build the SET clause before touching the database so that a
    # serialisation error in json.dumps cannot leave a connection open.
    assignments = []
    params = []
    if status is not None:
        assignments.append('status=?')
        params.append(status)
    if progress is not None:
        assignments.append('progress=?')
        params.append(json.dumps(progress))
    if result_path is not None:
        assignments.append('result_path=?')
        params.append(result_path)

    if not assignments:
        return

    params.append(datetime.datetime.utcnow())  # value for updated_at
    params.append(job_id)
    sql = f"UPDATE jobs SET {', '.join(assignments)}, updated_at=? WHERE id=?"

    conn = _conn()
    try:
        conn.execute(sql, params)
        conn.commit()
    finally:
        # Close even when the UPDATE or commit fails so the handle is not leaked.
        conn.close()
def claim_next_job():
    """Claim the oldest pending job by switching its status to ``'running'``.

    Returns:
        The claimed job as a dict (all columns, read back after the
        update), or ``None`` when there is no pending job left.

    The claim is a conditional UPDATE (``... AND status='pending'``), so two
    workers can never both claim the same row. If another worker takes the
    selected row first, the next pending job is tried instead of giving up.
    """
    init_db()
    conn = _conn()
    try:
        cur = conn.cursor()
        while True:
            cur.execute("SELECT id FROM jobs WHERE status='pending' ORDER BY created_at ASC LIMIT 1")
            row = cur.fetchone()
            if row is None:
                return None

            job_id = row['id']
            now = datetime.datetime.utcnow()
            # Only succeeds if the row is still pending at write time.
            cur.execute(
                "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
                ('running', now, job_id),
            )
            claimed = cur.rowcount == 1
            conn.commit()
            if not claimed:
                # Lost the race for this row; it is no longer pending, so the
                # next SELECT returns a different job (or nothing).
                continue

            cur.execute('SELECT * FROM jobs WHERE id=?', (job_id,))
            job = cur.fetchone()
            return dict(job) if job else None
    finally:
        conn.close()
def claim_job(job_id):
    """Mark the given job as ``'running'`` if it is currently ``'pending'``.

    Returns ``True`` when exactly one row was updated (the job was pending
    and is now claimed), ``False`` otherwise.
    """
    init_db()
    conn = _conn()
    cursor = conn.cursor()
    try:
        timestamp = datetime.datetime.utcnow()
        cursor.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
            ('running', timestamp, job_id),
        )
        was_claimed = cursor.rowcount == 1
        # Commit in both outcomes so the implicit transaction is ended.
        conn.commit()
        return was_claimed
    finally:
        conn.close()