import { Router, Request, Response } from 'express';
import { v4 as uuidv4 } from 'uuid';
import db from './db';
import { fetchPage } from './fetcher';
import { extractAndNormalize } from './extractor';
import { sliceAndDiff } from './differ';
import {
  CreateSourceRequest,
  CreateSourceResponse,
  GetSourcesQuery,
  GetSourcesResponse,
  CreateJobsRequest,
  CreateJobsResponse,
  GetJobResponse,
  SourceRegistry,
} from './models';

const router = Router();

/** Page size used by GET /updates when `limit` is absent or not a positive integer. */
const DEFAULT_UPDATES_LIMIT = 50;

/** Builds an identifier of the form `<prefix>_<first 16 chars of a dash-less v4 UUID>`. */
function makeId(prefix: string): string {
  return `${prefix}_${uuidv4().replace(/-/g, '').substring(0, 16)}`;
}

/** Extracts a printable message from any thrown value (not only `Error` instances). */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Parses a JSON-encoded tag column into an array of strings.
 *
 * NULL/empty columns, malformed JSON and non-array JSON all yield `[]`, and
 * non-string array entries are dropped, so a single bad row cannot fail a
 * whole listing request.
 */
function parseTags(raw: unknown): string[] {
  if (typeof raw !== 'string' || raw === '') return [];
  try {
    const parsed: unknown = JSON.parse(raw);
    return Array.isArray(parsed)
      ? parsed.filter((tag): tag is string => typeof tag === 'string')
      : [];
  } catch {
    return [];
  }
}

/**
 * Parses a positive integer query parameter.
 *
 * Falls back to `fallback` for missing, non-numeric, zero or negative input.
 * A negative SQLite LIMIT would otherwise mean "no limit", and NaN would make
 * the query fail.
 */
function parseLimit(raw: unknown, fallback: number): number {
  const parsed = typeof raw === 'string' ? Number.parseInt(raw, 10) : Number.NaN;
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// API-00-01: register a new source.
/**
 * POST /sources — inserts a row into `source_registry` with a freshly
 * generated `source_id`.
 *
 * The request body is bound directly as named parameters. `enabled` is
 * stored as 1/0 and `topic_tags` as a JSON array string. A client-supplied
 * `source_id` is ignored because the generated one is spread last.
 * Responds `{ source_id, success: true }`, 400 for a missing/non-object body,
 * or 500 with the database error message.
 */
router.post('/sources', (req: Request<{}, {}, CreateSourceRequest>, res: Response) => {
  try {
    const data = req.body;
    if (!data || typeof data !== 'object') {
      res.status(400).json({ error: 'request body must be a JSON object' });
      return;
    }

    // Random id; an alternative would be deriving it from domain + type.
    const source_id = makeId('source');
    const stmt = db.prepare(`
      INSERT INTO source_registry (
        source_id, source_name, source_type, domain, entry_url, url_pattern,
        parser_type, crawl_frequency, priority, enabled, topic_tags
      ) VALUES (
        @source_id, @source_name, @source_type, @domain, @entry_url, @url_pattern,
        @parser_type, @crawl_frequency, @priority, @enabled, @topic_tags
      )
    `);

    stmt.run({
      ...data,
      source_id,
      enabled: data.enabled ? 1 : 0,
      topic_tags: JSON.stringify(data.topic_tags || []),
    });

    res.json({ source_id, success: true });
  } catch (error: unknown) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

// API-00-02: list sources.
/**
 * GET /sources — lists `source_registry` rows, optionally filtered by
 * `source_type`, `enabled` ('true' means 1, anything else means 0) and
 * `priority`.
 *
 * `enabled` is converted back to a boolean and `topic_tags` is decoded from
 * JSON. Responds `{ items, total }`.
 */
router.get('/sources', (req: Request<{}, {}, {}, GetSourcesQuery>, res: Response) => {
  try {
    const { source_type, enabled, priority } = req.query;
    let query = 'SELECT * FROM source_registry WHERE 1=1';
    const params: any[] = [];

    if (source_type) {
      query += ' AND source_type = ?';
      params.push(source_type);
    }
    if (enabled !== undefined) {
      query += ' AND enabled = ?';
      params.push(String(enabled) === 'true' ? 1 : 0);
    }
    if (priority) {
      query += ' AND priority = ?';
      params.push(priority);
    }

    const items = db.prepare(query).all(...params) as any[];
    const formattedItems: SourceRegistry[] = items.map((item) => ({
      ...item,
      enabled: Boolean(item.enabled),
      topic_tags: parseTags(item.topic_tags),
    }));

    res.json({ items: formattedItems, total: formattedItems.length });
  } catch (error: unknown) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

/**
 * Executes one crawl job end to end and records its outcome in `crawl_job`.
 *
 * Steps:
 * 1. Mark the job `running`.
 * 2. Fetch the page.
 * 3. Extract and normalize it.
 * 4. Run diffing/clause slicing when extraction produced a document.
 * 5. Mark the job `success` with `ended_at`.
 *
 * Any error thrown along the way marks the job `failed` with `ended_at` and
 * `error_message`. The returned promise rejects only if that failure-status
 * UPDATE itself throws.
 */
async function runCrawlerJob(job_id: string, source_id: string): Promise<void> {
  try {
    db.prepare("UPDATE crawl_job SET status = 'running' WHERE job_id = ?").run(job_id);

    const snapshot = await fetchPage(job_id, source_id);
    const newDoc = extractAndNormalize(snapshot);
    if (newDoc) {
      sliceAndDiff(newDoc);
    }

    db.prepare("UPDATE crawl_job SET status = 'success', ended_at = ? WHERE job_id = ?")
      .run(new Date().toISOString(), job_id);
  } catch (error: unknown) {
    console.error(`Job ${job_id} failed:`, error);
    db.prepare("UPDATE crawl_job SET status = 'failed', ended_at = ?, error_message = ? WHERE job_id = ?")
      .run(new Date().toISOString(), errorMessage(error) || 'Unknown error', job_id);
  }
}

// API-00-03: manually trigger crawling.
/**
 * POST /jobs — creates one `queued` crawl job per entry of `source_ids`.
 *
 * All rows are inserted inside a single `db.transaction` call. After that
 * call returns, each job is started in the background; the HTTP response
 * does not wait for any of them.
 * `trigger_type` defaults to 'manual'. Responds `{ job_ids, status: 'queued' }`,
 * with `job_ids` in the same order as `source_ids`.
 */
router.post('/jobs', (req: Request<{}, {}, CreateJobsRequest>, res: Response) => {
  try {
    const body: Partial<CreateJobsRequest> = req.body ?? {};
    const { source_ids, trigger_type } = body;

    if (!source_ids || !Array.isArray(source_ids)) {
      res.status(400).json({ error: 'source_ids is required and must be an array' });
      return;
    }
    if (!source_ids.every((id: unknown) => typeof id === 'string' && id !== '')) {
      res.status(400).json({ error: 'source_ids must contain only non-empty strings' });
      return;
    }

    const now = new Date().toISOString();
    const stmt = db.prepare(`
      INSERT INTO crawl_job (
        job_id, source_id, trigger_type, status, started_at
      ) VALUES (
        @job_id, @source_id, @trigger_type, @status, @started_at
      )
    `);

    // Keep each job paired with its source explicitly instead of relying on parallel indices.
    const queued: Array<{ job_id: string; source_id: string }> = [];
    const insertMany = db.transaction((sources: string[]) => {
      for (const source_id of sources) {
        const job_id = makeId('job');
        stmt.run({
          job_id,
          source_id,
          trigger_type: trigger_type || 'manual',
          status: 'queued',
          started_at: now,
        });
        queued.push({ job_id, source_id });
      }
    });
    insertMany(source_ids);

    // Fire-and-forget: workers run after the response is sent.
    for (const { job_id, source_id } of queued) {
      void runCrawlerJob(job_id, source_id).catch((err: unknown) =>
        console.error('Worker error:', err),
      );
    }

    res.json({ job_ids: queued.map((job) => job.job_id), status: 'queued' });
  } catch (error: unknown) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

// API-00-04: get a crawl job.
/**
 * GET /jobs/:jobId — returns the status fields of one `crawl_job` row, or 404.
 *
 * `error_message` is the column runCrawlerJob writes on failure. It is
 * included alongside `error_code` so callers can see why a job failed.
 */
router.get('/jobs/:jobId', (req: Request<{ jobId: string }>, res: Response) => {
  try {
    const { jobId } = req.params;
    const job = db.prepare('SELECT * FROM crawl_job WHERE job_id = ?').get(jobId) as any;

    if (!job) {
      res.status(404).json({ error: 'Job not found' });
      return;
    }

    res.json({
      job_id: job.job_id,
      source_id: job.source_id,
      status: job.status,
      started_at: job.started_at,
      ended_at: job.ended_at,
      error_code: job.error_code,
      error_message: job.error_message,
      retry_count: job.retry_count,
    });
  } catch (error: unknown) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

// API-00-05: list document versions.
/**
 * GET /documents — lists `normalized_document` version metadata, optionally
 * restricted to one `source_id`. Responds `{ items }`.
 */
router.get('/documents', (req: Request<{}, {}, {}, { source_id?: string }>, res: Response) => {
  try {
    const { source_id } = req.query;
    let query =
      'SELECT doc_id, version_date, effective_date, normalized_hash FROM normalized_document WHERE 1=1';
    const params: any[] = [];

    if (source_id) {
      query += ' AND source_id = ?';
      params.push(source_id);
    }

    const items = db.prepare(query).all(...params);
    res.json({ items });
  } catch (error: unknown) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

// API-00-06: list change (diff) events.
/**
 * GET /diff-events — lists `diff_event` rows, optionally restricted to one
 * `source_id`, with `topic_tags` decoded from JSON. Responds `{ items }`.
 */
router.get('/diff-events', (req: Request<{}, {}, {}, { source_id?: string }>, res: Response) => {
  try {
    const { source_id } = req.query;
    let query =
      'SELECT event_id, section_title, change_type, impact_level, topic_tags FROM diff_event WHERE 1=1';
    const params: any[] = [];

    if (source_id) {
      query += ' AND source_id = ?';
      params.push(source_id);
    }

    const items = db.prepare(query).all(...params) as any[];
    const formattedItems = items.map((item) => ({
      ...item,
      topic_tags: parseTags(item.topic_tags),
    }));

    res.json({ items: formattedItems });
  } catch (error: unknown) {
    res.status(500).json({ error: errorMessage(error) });
  }
});

/** Response buckets of GET /updates. */
type UpdateBucket = 'peer_bank' | 'regulator' | 'sdk_vendor';

/** Change events of one source at one version date, grouped for GET /updates. */
interface UpdateGroup {
  source_name: unknown;
  source_type: unknown;
  version_date: unknown;
  topic_tags: Set<string>;
  changed_sections: Array<{ section_title: unknown; change_type: unknown; excerpt: unknown }>;
}

/** Maps a `source_type` column value to its bucket; unrecognized types go to 'peer_bank'. */
function bucketFor(sourceType: unknown): UpdateBucket {
  if (sourceType === 'regulator') return 'regulator';
  if (sourceType === 'sdk_vendor') return 'sdk_vendor';
  return 'peer_bank';
}

// API-00-07: get the change bundle consumed by AGENT-01.
/**
 * GET /updates — returns recent diff events grouped by (source_name,
 * version_date) and bucketed by source type.
 *
 * Query parameters:
 * - `since`: only events with `detected_at >= since`.
 * - `limit`: maximum number of events read, default 50.
 * - `topics`: comma-separated tags; an event is kept if it carries any of them.
 *
 * NOTE: LIMIT is applied in SQL before the topic filter runs, so fewer than
 * `limit` events can be returned.
 * NOTE(review): `app_name` and `business_line` are accepted by the query type
 * but are not used for filtering yet.
 *
 * Responds `{ peer_updates, regulatory_updates, sdk_updates }`.
 */
router.get(
  '/updates',
  (
    req: Request<
      {},
      {},
      {},
      { app_name?: string; business_line?: string; topics?: string; since?: string; limit?: string }
    >,
    res: Response,
  ) => {
    try {
      const { topics, since, limit } = req.query;
      const limitNum = parseLimit(limit, DEFAULT_UPDATES_LIMIT);

      let query = `
        SELECT
          d.event_id, d.section_title, d.change_type, d.new_excerpt as excerpt,
          d.topic_tags as diff_topic_tags,
          s.source_name, s.source_type,
          n.version_date
        FROM diff_event d
        JOIN source_registry s ON d.source_id = s.source_id
        JOIN normalized_document n ON d.to_doc_id = n.doc_id
        WHERE 1=1
      `;
      const params: any[] = [];

      if (since) {
        query += ' AND d.detected_at >= ?';
        params.push(since);
      }
      query += ' ORDER BY d.detected_at DESC LIMIT ?';
      params.push(limitNum);

      const rows = db.prepare(query).all(...params) as any[];

      // A repeated query parameter arrives as an array; only a plain string is treated as a filter.
      const topicFilters =
        typeof topics === 'string'
          ? topics
              .split(',')
              .map((topic) => topic.trim())
              .filter((topic) => topic !== '')
          : [];

      const buckets: Record<UpdateBucket, Map<string, UpdateGroup>> = {
        peer_bank: new Map(),
        regulator: new Map(),
        sdk_vendor: new Map(),
      };

      for (const row of rows) {
        const rowTopics = parseTags(row.diff_topic_tags);
        if (topicFilters.length > 0 && !topicFilters.some((topic) => rowTopics.includes(topic))) {
          continue;
        }

        const bucket = buckets[bucketFor(row.source_type)];
        const key = `${row.source_name}_${row.version_date}`;
        let group = bucket.get(key);
        if (!group) {
          group = {
            source_name: row.source_name,
            source_type: row.source_type,
            version_date: row.version_date,
            topic_tags: new Set<string>(),
            changed_sections: [],
          };
          bucket.set(key, group);
        }

        for (const topic of rowTopics) group.topic_tags.add(topic);
        group.changed_sections.push({
          section_title: row.section_title,
          change_type: row.change_type,
          excerpt: row.excerpt,
        });
      }

      // Sets are not JSON-serializable, so convert the tag sets to arrays.
      const toPayload = (groups: Map<string, UpdateGroup>) =>
        Array.from(groups.values(), (group) => ({
          ...group,
          topic_tags: Array.from(group.topic_tags),
        }));

      res.json({
        peer_updates: toPayload(buckets.peer_bank),
        regulatory_updates: toPayload(buckets.regulator),
        sdk_updates: toPayload(buckets.sdk_vendor),
      });
    } catch (error: unknown) {
      res.status(500).json({ error: errorMessage(error) });
    }
  },
);

export default router;