# Spaces: Running
# File size: 4,374 Bytes
# Revisions: 5c429d4 5c38f67 5c429d4
import sqlite3
import json
from pathlib import Path
import datetime
import os
# Output directory: taken from the OUTPUT_DIR environment variable when it is set,
# otherwise an 'output' folder inside the directory one level above this file's directory.
_OUTPUT = Path(os.environ.get('OUTPUT_DIR', str(Path(__file__).resolve().parent.parent / 'output')))
# Runs at import time; creates the directory (and any missing parents) and is a no-op if it exists.
_OUTPUT.mkdir(parents=True, exist_ok=True)
# Path of the SQLite database file that _conn() opens.
DB_PATH = _OUTPUT / 'jobs.db'
def _conn():
    """Open a new SQLite connection to ``DB_PATH``.

    Declared column types are parsed (``PARSE_DECLTYPES``) and rows are
    returned as ``sqlite3.Row`` so columns can be accessed by name.
    The caller is responsible for closing the returned connection.
    """
    connection = sqlite3.connect(
        str(DB_PATH),
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``jobs`` table if it is missing and add optional columns.

    The base table is created with ``CREATE TABLE IF NOT EXISTS``; the
    ``user_id`` and ``company_id`` columns are then added with ``ALTER TABLE``
    when ``PRAGMA table_info`` does not list them. Calling this repeatedly is
    harmless. The connection is always closed, even if a statement fails.
    """
    conn = _conn()
    try:
        cur = conn.cursor()
        cur.execute('''
            CREATE TABLE IF NOT EXISTS jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                org_name TEXT,
                org_url TEXT,
                max_pages INTEGER,
                runs INTEGER,
                status TEXT,
                progress TEXT,
                result_path TEXT,
                created_at TIMESTAMP,
                updated_at TIMESTAMP
            )
        ''')
        # Add optional columns for user association if missing.
        cur.execute("PRAGMA table_info(jobs)")
        existing = {row[1] for row in cur.fetchall()}  # index 1 is the column name
        for column in ('user_id', 'company_id'):
            if column in existing:
                continue
            try:
                cur.execute(f'ALTER TABLE jobs ADD COLUMN {column} INTEGER')
            except sqlite3.OperationalError:
                # Best effort: another connection may have added the column
                # between the PRAGMA check and this ALTER ("duplicate column name").
                pass
        conn.commit()
    finally:
        conn.close()
def enqueue_job(url, org_name, org_url, max_pages=3, runs=1, user_id=None, company_id=None):
    """Insert a new job with status ``'pending'`` and return its row id.

    Args:
        url: Value stored in the ``url`` column.
        org_name: Value stored in the ``org_name`` column.
        org_url: Value stored in the ``org_url`` column.
        max_pages: Value stored in the ``max_pages`` column.
        runs: Value stored in the ``runs`` column.
        user_id: Optional value for the ``user_id`` column.
        company_id: Optional value for the ``company_id`` column.

    Returns:
        The ``lastrowid`` reported by SQLite for the inserted row.

    ``progress`` starts as an empty JSON object and both timestamps are set to
    the current naive UTC time. The connection is closed even if the insert fails.
    """
    init_db()
    now = datetime.datetime.utcnow()
    conn = _conn()
    try:
        cur = conn.cursor()
        cur.execute(
            '''INSERT INTO jobs (url,org_name,org_url,max_pages,runs,status,progress,created_at,updated_at,user_id,company_id) VALUES (?,?,?,?,?,?,?,?,?,?,?)''',
            (url, org_name, org_url, max_pages, runs, 'pending', json.dumps({}), now, now, user_id, company_id),
        )
        jid = cur.lastrowid
        conn.commit()
        return jid
    finally:
        conn.close()
def list_jobs(limit=50):
    """Return up to ``limit`` jobs, newest first, as a list of dicts.

    Args:
        limit: Maximum number of rows to return (bound to the SQL ``LIMIT``).

    Returns:
        A list of dicts, one per row, ordered by ``created_at`` descending.

    The connection is closed even if the query fails.
    """
    init_db()
    conn = _conn()
    try:
        cur = conn.cursor()
        cur.execute('SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?', (limit,))
        return [dict(row) for row in cur.fetchall()]
    finally:
        conn.close()
def get_job(job_id):
    """Fetch a single job by id.

    Args:
        job_id: Value matched against the ``id`` column.

    Returns:
        The row as a dict, or ``None`` when no row has that id.

    The connection is closed even if the query fails.
    """
    init_db()
    conn = _conn()
    try:
        cur = conn.cursor()
        cur.execute('SELECT * FROM jobs WHERE id=?', (job_id,))
        row = cur.fetchone()
        return dict(row) if row else None
    finally:
        conn.close()
def update_job(job_id, status=None, progress=None, result_path=None):
    """Update selected fields of a job and refresh its ``updated_at``.

    Args:
        job_id: Id of the row to update.
        status: New ``status`` value; left unchanged when ``None``.
        progress: JSON-serialisable object stored (via ``json.dumps``) in
            ``progress``; left unchanged when ``None``.
        result_path: New ``result_path`` value; left unchanged when ``None``.

    When all three fields are ``None`` nothing is written (``updated_at``
    included). The connection is closed even if the update fails.
    """
    init_db()
    assignments = []
    params = []
    if status is not None:
        assignments.append('status=?')
        params.append(status)
    if progress is not None:
        assignments.append('progress=?')
        params.append(json.dumps(progress))
    if result_path is not None:
        assignments.append('result_path=?')
        params.append(result_path)
    if not assignments:
        # Nothing to change: skip opening a connection just to commit a no-op.
        return
    params.append(datetime.datetime.utcnow())
    params.append(job_id)
    sql = f"UPDATE jobs SET {', '.join(assignments)}, updated_at=? WHERE id=?"
    conn = _conn()
    try:
        cur = conn.cursor()
        cur.execute(sql, params)
        conn.commit()
    finally:
        conn.close()
def claim_next_job():
    """Claim the oldest pending job by switching its status to ``'running'``.

    A candidate is selected by ``created_at`` ascending and then updated with
    a ``status='pending'`` guard, so only one caller can win a given job. If
    the guarded update touches no row (the job was claimed elsewhere between
    the SELECT and the UPDATE), the next pending job is tried instead of
    giving up.

    Returns:
        The claimed job row as a dict (all columns), or ``None`` when no
        pending job is left.
    """
    init_db()
    conn = _conn()
    cur = conn.cursor()
    try:
        while True:
            cur.execute("SELECT id FROM jobs WHERE status='pending' ORDER BY created_at ASC LIMIT 1")
            row = cur.fetchone()
            if row is None:
                return None
            jid = row['id']
            now = datetime.datetime.utcnow()
            cur.execute(
                "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
                ('running', now, jid),
            )
            claimed = cur.rowcount == 1
            # Commit in both cases so the implicit transaction opened by the
            # UPDATE is closed before the next statement.
            conn.commit()
            if claimed:
                cur.execute('SELECT * FROM jobs WHERE id=?', (jid,))
                job = cur.fetchone()
                return dict(job) if job else None
            # Lost the race for this job; loop to look for another pending one.
    finally:
        conn.close()
def claim_job(job_id):
    """Try to move the given job from ``'pending'`` to ``'running'``.

    Args:
        job_id: Id of the job to claim.

    Returns:
        ``True`` if exactly one row was updated (the job was pending and is
        now running), ``False`` otherwise.
    """
    init_db()
    connection = _conn()
    cursor = connection.cursor()
    try:
        timestamp = datetime.datetime.utcnow()
        cursor.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
            ('running', timestamp, job_id),
        )
        was_claimed = cursor.rowcount == 1
        # The transaction is committed whether or not a row was updated.
        connection.commit()
        return was_claimed
    finally:
        connection.close()
|