File size: 4,374 Bytes
5c429d4
 
 
 
 
5c38f67
 
 
 
5c429d4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import sqlite3
import json
from pathlib import Path
import datetime

import os
# Directory holding the jobs database: $OUTPUT_DIR when set, otherwise an
# "output" directory two levels above this file. Created eagerly at import.
_OUTPUT = Path(
    os.getenv(
        'OUTPUT_DIR',
        str(Path(__file__).resolve().parent.parent / 'output'),
    )
)
_OUTPUT.mkdir(exist_ok=True, parents=True)
DB_PATH = _OUTPUT.joinpath('jobs.db')


def _conn():
    """Open a new connection to ``DB_PATH``.

    Rows come back as ``sqlite3.Row`` (indexable by column name), and
    ``PARSE_DECLTYPES`` lets sqlite3 convert columns by their declared type.
    The caller owns the connection and must close it.
    """
    connection = sqlite3.connect(
        str(DB_PATH),
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    connection.row_factory = sqlite3.Row
    return connection


def init_db():
    """Create the ``jobs`` table if it is missing and add optional columns.

    Idempotent: the job helpers in this module call it before every
    operation. The ``user_id`` and ``company_id`` columns are added with
    ``ALTER TABLE`` when absent, so databases created before those columns
    existed are upgraded in place.

    The connection is always closed, even if a statement fails.
    """
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('''
    CREATE TABLE IF NOT EXISTS jobs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT,
        org_name TEXT,
        org_url TEXT,
        max_pages INTEGER,
        runs INTEGER,
        status TEXT,
        progress TEXT,
        result_path TEXT,
        created_at TIMESTAMP,
        updated_at TIMESTAMP
    )
    ''')
        # Add optional columns for user association if missing.
        cur.execute("PRAGMA table_info(jobs)")
        existing = {r[1] for r in cur.fetchall()}  # index 1 of table_info is the column name
        for column in ('user_id', 'company_id'):
            if column in existing:
                continue
            try:
                # `column` comes from the fixed tuple above, never from input.
                cur.execute(f'ALTER TABLE jobs ADD COLUMN {column} INTEGER')
            except sqlite3.OperationalError:
                # Best-effort: another connection may have added the column
                # between the PRAGMA and here ("duplicate column name").
                pass
        c.commit()
    finally:
        c.close()


def enqueue_job(url, org_name, org_url, max_pages=3, runs=1, user_id=None, company_id=None):
    """Insert a new job in the ``'pending'`` state and return its id.

    Args:
        url: Stored in the ``url`` column.
        org_name: Stored in the ``org_name`` column.
        org_url: Stored in the ``org_url`` column.
        max_pages: Stored in the ``max_pages`` column.
        runs: Stored in the ``runs`` column.
        user_id: Optional; stored in the ``user_id`` column.
        company_id: Optional; stored in the ``company_id`` column.

    Returns:
        The integer id of the inserted row (``cursor.lastrowid``).

    ``progress`` starts as the JSON text ``"{}"``. ``created_at`` and
    ``updated_at`` are both set to the current naive UTC datetime. The
    connection is closed even if the insert fails.
    """
    init_db()
    now = datetime.datetime.utcnow()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('''INSERT INTO jobs (url,org_name,org_url,max_pages,runs,status,progress,created_at,updated_at,user_id,company_id) VALUES (?,?,?,?,?,?,?,?,?,?,?)''',
                    (url, org_name, org_url, max_pages, runs, 'pending', json.dumps({}), now, now, user_id, company_id))
        jid = cur.lastrowid
        c.commit()
    finally:
        c.close()
    return jid


def list_jobs(limit=50):
    """Return up to ``limit`` jobs, most recently created first.

    Each job is a plain dict mapping column name to value. The connection is
    closed even if the query fails.
    """
    init_db()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('SELECT * FROM jobs ORDER BY created_at DESC LIMIT ?', (limit,))
        return [dict(r) for r in cur.fetchall()]
    finally:
        c.close()


def get_job(job_id):
    """Return the job with id ``job_id`` as a dict, or None if it does not exist.

    The connection is closed even if the query fails.
    """
    init_db()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('SELECT * FROM jobs WHERE id=?', (job_id,))
        row = cur.fetchone()
    finally:
        c.close()
    return dict(row) if row else None


def update_job(job_id, status=None, progress=None, result_path=None):
    """Update the given fields of job ``job_id`` and bump ``updated_at``.

    Only arguments that are not None are written. ``progress`` is serialized
    with ``json.dumps`` before storage. If every field argument is None,
    nothing is written and ``updated_at`` is left unchanged.

    The connection is closed even if the update fails.
    """
    init_db()
    assignments = []
    params = []
    if status is not None:
        assignments.append('status=?')
        params.append(status)
    if progress is not None:
        assignments.append('progress=?')
        params.append(json.dumps(progress))
    if result_path is not None:
        assignments.append('result_path=?')
        params.append(result_path)
    if not assignments:
        # Nothing to change: skip opening a connection entirely.
        return
    params.append(datetime.datetime.utcnow())
    params.append(job_id)
    # Only fixed "column=?" fragments are interpolated; all values are bound.
    sql = f"UPDATE jobs SET {', '.join(assignments)}, updated_at=? WHERE id=?"
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute(sql, params)
        c.commit()
    finally:
        c.close()


def claim_next_job():
    """Claim the oldest pending job by setting its status to ``'running'``.

    The lookup and the status change run inside one ``BEGIN IMMEDIATE``
    transaction. That takes SQLite's write lock before the SELECT, so no
    other connection can claim the selected row in between.

    Returns:
        The claimed job's row as a dict (all ``jobs`` columns, with status
        already ``'running'``), or None when no pending job exists.
    """
    init_db()
    c = _conn()
    cur = c.cursor()
    try:
        # Acquire the write lock up front. Without it, two workers could
        # select the same id, and the loser would return None even though
        # other pending jobs remain.
        cur.execute('BEGIN IMMEDIATE')
        cur.execute(
            "SELECT id FROM jobs WHERE status='pending' "
            "ORDER BY created_at ASC, id ASC LIMIT 1"
        )
        row = cur.fetchone()
        if row is None:
            c.rollback()
            return None
        jid = row['id']
        now = datetime.datetime.utcnow()
        cur.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
            ('running', now, jid),
        )
        # Defensive: under the write lock this should always match one row.
        claimed = cur.rowcount == 1
        c.commit()
        if not claimed:
            return None
        cur.execute('SELECT * FROM jobs WHERE id=?', (jid,))
        job = cur.fetchone()
        return dict(job) if job else None
    finally:
        # Closing with an open transaction (e.g. after an error) rolls it back.
        c.close()


def claim_job(job_id):
    """Try to move job ``job_id`` from ``'pending'`` to ``'running'``.

    The UPDATE only matches a row whose status is still ``'pending'``, so
    concurrent callers cannot both claim the same pending job.

    Returns:
        True if this call claimed the job. False if the id is unknown or the
        job is no longer pending.
    """
    init_db()
    conn = _conn()
    cursor = conn.cursor()
    try:
        cursor.execute(
            "UPDATE jobs SET status=?, updated_at=? WHERE id=? AND status='pending'",
            ('running', datetime.datetime.utcnow(), job_id),
        )
        was_claimed = cursor.rowcount == 1
        conn.commit()
        return was_claimed
    finally:
        conn.close()