/*
 * agent01 / src/crawler/api.ts
 * Auto Deployer — Add enterprise agents: log/code/reg endpoints (a273844)
 */
import { Router, Request, Response } from 'express';
import { v4 as uuidv4 } from 'uuid';
import db from './db';
import { fetchPage } from './fetcher';
import { extractAndNormalize } from './extractor';
import { sliceAndDiff } from './differ';
import {
CreateSourceRequest,
CreateSourceResponse,
GetSourcesQuery,
GetSourcesResponse,
CreateJobsRequest,
CreateJobsResponse,
GetJobResponse,
SourceRegistry
} from './models';
// Express router for the crawler API. The source, job, document, diff-event and
// update routes below are all registered on it, and it is the module's default export.
const router = Router();
// API-00-01: register a new crawl source and return its generated source_id.
// Every column is bound explicitly. better-sqlite3 throws when a named parameter
// (e.g. @url_pattern) is missing from the bound object, so spreading the raw body
// turned any omitted optional field into a 500. Missing fields are now stored as
// NULL. A client-supplied source_id is never used; the generated one always wins.
router.post('/sources', (req: Request<{}, {}, CreateSourceRequest>, res: Response<CreateSourceResponse | { error: string }>) => {
  const data = req.body;
  // req.body is not a plain object when the client sent no JSON body (or no body
  // parser ran). That is a client error, so answer 400 instead of crashing into a 500.
  if (data === null || typeof data !== 'object' || Array.isArray(data)) {
    res.status(400).json({ error: 'request body must be a JSON object' });
    return;
  }
  try {
    // Columns copied straight from the request body. Any other body keys are ignored.
    const passthroughColumns = [
      'source_name', 'source_type', 'domain', 'entry_url',
      'url_pattern', 'parser_type', 'crawl_frequency', 'priority'
    ] as const;
    const fields: Record<string, unknown> = { ...data };
    const row: Record<string, unknown> = {};
    for (const column of passthroughColumns) {
      row[column] = fields[column] ?? null;
    }

    // 16 hex characters of a random UUID, prefixed so the id is self-describing.
    const source_id = `source_${uuidv4().replace(/-/g, '').substring(0, 16)}`;
    const stmt = db.prepare(`
      INSERT INTO source_registry (
        source_id, source_name, source_type, domain, entry_url, url_pattern,
        parser_type, crawl_frequency, priority, enabled, topic_tags
      ) VALUES (
        @source_id, @source_name, @source_type, @domain, @entry_url, @url_pattern,
        @parser_type, @crawl_frequency, @priority, @enabled, @topic_tags
      )
    `);
    stmt.run({
      ...row,
      source_id,
      // Stored as 1/0 (SQLite has no boolean type). An omitted flag means disabled.
      enabled: data.enabled ? 1 : 0,
      // Tags are persisted as a JSON array string.
      topic_tags: JSON.stringify(data.topic_tags || [])
    });
    res.json({ source_id, success: true });
  } catch (error: unknown) {
    res.status(500).json({ error: error instanceof Error ? error.message : String(error) });
  }
});
// API-00-02: list registered sources, optionally filtered by source_type, enabled
// and priority. The enabled column (stored as 1/0) and topic_tags (stored as a JSON
// string) are converted back to a boolean and an array for the response.
router.get('/sources', (req: Request<{}, {}, {}, GetSourcesQuery>, res: Response<GetSourcesResponse | { error: string }>) => {
  try {
    const { source_type, enabled, priority } = req.query;
    // Filters are only ever added as bound parameters, never spliced into the SQL text.
    let query = 'SELECT * FROM source_registry WHERE 1=1';
    const params: any[] = [];
    if (source_type) {
      query += ' AND source_type = ?';
      params.push(source_type);
    }
    if (enabled !== undefined) {
      // Query-string values arrive as strings. Accept `1` as well as `true`
      // (case-insensitive). Previously `?enabled=1` was treated as false and
      // returned only the disabled sources.
      const wantEnabled = ['true', '1'].includes(String(enabled).trim().toLowerCase());
      query += ' AND enabled = ?';
      params.push(wantEnabled ? 1 : 0);
    }
    if (priority) {
      query += ' AND priority = ?';
      params.push(priority);
    }
    const items = db.prepare(query).all(...params) as any[];
    const formattedItems: SourceRegistry[] = items.map(item => ({
      ...item,
      enabled: Boolean(item.enabled),
      topic_tags: item.topic_tags ? JSON.parse(item.topic_tags) : []
    }));
    res.json({
      items: formattedItems,
      total: formattedItems.length
    });
  } catch (error: unknown) {
    res.status(500).json({ error: error instanceof Error ? error.message : String(error) });
  }
});
/**
 * Run one crawl job to completion in the background.
 *
 * The job row is marked `running`, the page is fetched, extracted and normalized,
 * then diffed and chunked, and finally the row is marked `success` with an end
 * time. Any error along the way marks the row `failed`, with the end time and
 * error message recorded.
 *
 * @param job_id - id of the `crawl_job` row to update.
 * @param source_id - source to crawl for this job.
 * @returns Resolves once the row has reached `success` or `failed`. Rejects only
 *   if recording the failure itself throws (the caller attaches a `.catch`).
 */
async function runCrawlerJob(job_id: string, source_id: string): Promise<void> {
  try {
    // 1. Update job to running
    db.prepare("UPDATE crawl_job SET status = 'running' WHERE job_id = ?").run(job_id);
    // 2. Fetch page
    const snapshot = await fetchPage(job_id, source_id);
    // 3. Extract and normalize
    const newDoc = extractAndNormalize(snapshot);
    // 4. Diff and clause chunking. A falsy result means there is nothing to diff,
    //    and the job still counts as successful.
    if (newDoc) {
      sliceAndDiff(newDoc);
    }
    // 5. Update job to success
    db.prepare("UPDATE crawl_job SET status = 'success', ended_at = ? WHERE job_id = ?").run(new Date().toISOString(), job_id);
  } catch (error: unknown) {
    // 6. Update job to failed
    console.error(`Job ${job_id} failed:`, error);
    // Take the message from Error instances or any object with a string `message`,
    // and use a thrown string as-is. Previously a thrown string was stored as
    // 'Unknown error', and a null/undefined rejection crashed inside this handler.
    const raw = typeof error === 'object' && error !== null
      ? (error as { message?: unknown }).message
      : error;
    const message = typeof raw === 'string' && raw !== '' ? raw : 'Unknown error';
    db.prepare("UPDATE crawl_job SET status = 'failed', ended_at = ?, error_message = ? WHERE job_id = ?")
      .run(new Date().toISOString(), message, job_id);
  }
}
// API-00-03: manually trigger crawls for a list of sources.
// One `queued` crawl_job row is inserted per source id inside a single transaction.
// The crawl workers are then started without being awaited, and the response returns
// the new job ids immediately.
router.post('/jobs', (req: Request<{}, {}, CreateJobsRequest>, res: Response<CreateJobsResponse | { error: string }>) => {
  try {
    const source_ids = req.body?.source_ids;
    const trigger_type = req.body?.trigger_type;
    if (!Array.isArray(source_ids)) {
      res.status(400).json({ error: 'source_ids is required and must be an array' });
      return;
    }
    // Every id must be a non-empty string. Anything else would either fail parameter
    // binding (surfacing as a 500) or queue a job for a source that cannot exist.
    if (!source_ids.every((id: unknown) => typeof id === 'string' && id.trim() !== '')) {
      res.status(400).json({ error: 'source_ids must contain only non-empty strings' });
      return;
    }

    const now = new Date().toISOString();
    const stmt = db.prepare(`
      INSERT INTO crawl_job (
        job_id, source_id, trigger_type, status, started_at
      ) VALUES (
        @job_id, @source_id, @trigger_type, @status, @started_at
      )
    `);
    // Keep each job paired with its source so workers start without index juggling.
    const jobs: { job_id: string; source_id: string }[] = [];
    const insertMany = db.transaction((sources: string[]) => {
      for (const source_id of sources) {
        const job_id = `job_${uuidv4().replace(/-/g, '').substring(0, 16)}`;
        stmt.run({
          job_id,
          source_id,
          trigger_type: trigger_type || 'manual',
          status: 'queued',
          started_at: now
        });
        jobs.push({ job_id, source_id });
      }
    });
    insertMany(source_ids);

    // Fire-and-forget. runCrawlerJob records its own failures on the job row, so this
    // catch only logs errors raised while recording them.
    for (const { job_id, source_id } of jobs) {
      void runCrawlerJob(job_id, source_id).catch(err => console.error("Worker error:", err));
    }
    res.json({ job_ids: jobs.map(job => job.job_id), status: 'queued' });
  } catch (error: unknown) {
    res.status(500).json({ error: error instanceof Error ? error.message : String(error) });
  }
});
// API-00-04: fetch the status of one crawl job by id. Responds 404 when no row matches.
// NOTE(review): runCrawlerJob records failures in the `error_message` column, which this
// response does not include. `error_code` is read here but is never written in this file.
router.get('/jobs/:jobId', (req: Request<{ jobId: string }>, res: Response<GetJobResponse | { error: string }>) => {
  try {
    const record = db.prepare('SELECT * FROM crawl_job WHERE job_id = ?').get(req.params.jobId) as any;
    if (record) {
      const { job_id, source_id, status, started_at, ended_at, error_code, retry_count } = record;
      res.json({ job_id, source_id, status, started_at, ended_at, error_code, retry_count });
    } else {
      res.status(404).json({ error: 'Job not found' });
    }
  } catch (failure: any) {
    res.status(500).json({ error: failure.message });
  }
});
// API-00-05: list normalized document versions (doc id, version date, effective date,
// normalized hash). Pass ?source_id= to restrict the list to a single source.
router.get('/documents', (req: Request<{}, {}, {}, { source_id?: string }>, res: Response) => {
  try {
    const sourceFilter = req.query.source_id;
    const baseSql = 'SELECT doc_id, version_date, effective_date, normalized_hash FROM normalized_document WHERE 1=1';
    const sql = sourceFilter ? `${baseSql} AND source_id = ?` : baseSql;
    const bindings: any[] = sourceFilter ? [sourceFilter] : [];
    res.json({ items: db.prepare(sql).all(...bindings) });
  } catch (failure: any) {
    res.status(500).json({ error: failure.message });
  }
});
// API-00-06: list diff (change) events. Pass ?source_id= to restrict them to a single
// source. topic_tags is stored as a JSON string and returned as an array, with an
// empty array when the column is empty.
router.get('/diff-events', (req: Request<{}, {}, {}, { source_id?: string }>, res: Response) => {
  try {
    const sourceFilter = req.query.source_id;
    const baseSql = 'SELECT event_id, section_title, change_type, impact_level, topic_tags FROM diff_event WHERE 1=1';
    const sql = sourceFilter ? `${baseSql} AND source_id = ?` : baseSql;
    const bindings: any[] = sourceFilter ? [sourceFilter] : [];
    const rows = db.prepare(sql).all(...bindings) as any[];
    const withParsedTags = (row: any) => ({
      ...row,
      topic_tags: row.topic_tags ? JSON.parse(row.topic_tags) : []
    });
    res.json({ items: rows.map(withParsedTags) });
  } catch (failure: any) {
    res.status(500).json({ error: failure.message });
  }
});
// API-00-07: build the change bundle consumed by AGENT-01.
// Diff events are read newest-first. They can be restricted to events detected at or
// after `since`, and to events tagged with at least one of the comma-separated `topics`.
// At most `limit` matching events (default 50) are grouped per
// (source_name, version_date) and then split into peer, regulatory and SDK buckets
// by the source's source_type.
router.get('/updates', (req: Request<{}, {}, {}, { app_name?: string, business_line?: string, topics?: string, since?: string, limit?: string }>, res: Response) => {
  try {
    // NOTE(review): app_name and business_line are accepted by this endpoint but are not
    // applied as filters anywhere yet.
    const { topics, since, limit } = req.query;

    // A missing, non-numeric, zero or negative `limit` falls back to 50. Previously a
    // non-numeric value bound NaN into LIMIT (which SQLite rejects), and a negative value
    // removed SQLite's upper bound on rows entirely.
    const parsedLimit = parseInt(limit || '50', 10);
    const limitNum = Number.isInteger(parsedLimit) && parsedLimit > 0 ? parsedLimit : 50;

    // Blank entries (e.g. "a,,b" or a whitespace-only `topics`) are dropped instead of
    // being matched as a literal empty tag.
    const topicFilters: string[] = topics
      ? topics.split(',').map((t: string) => t.trim()).filter((t: string) => t !== '')
      : [];

    let query = `
      SELECT
        d.event_id, d.section_title, d.change_type, d.new_excerpt as excerpt, d.topic_tags as diff_topic_tags,
        s.source_name, s.source_type,
        n.version_date
      FROM diff_event d
      JOIN source_registry s ON d.source_id = s.source_id
      JOIN normalized_document n ON d.to_doc_id = n.doc_id
      WHERE 1=1
    `;
    const params: any[] = [];
    if (since) {
      query += ' AND d.detected_at >= ?';
      params.push(since);
    }
    query += ' ORDER BY d.detected_at DESC';
    // The topic filter runs in JS over the JSON-encoded tags, so the limit must count
    // *matching* rows. A SQL LIMIT is only correct when there is no topic filter.
    // Previously LIMIT was always applied first, so a topic query could return fewer
    // than `limit` events (or none) even though older matching events existed.
    if (topicFilters.length === 0) {
      query += ' LIMIT ?';
      params.push(limitNum);
    }

    type Bucket = 'peer_bank' | 'regulator' | 'sdk_vendor';
    const updatesMap: Record<Bucket, Record<string, any>> = {
      peer_bank: {},
      regulator: {},
      sdk_vendor: {}
    };

    let matched = 0;
    // iterate() streams rows, so reading stops as soon as `limitNum` matches are collected.
    for (const row of db.prepare(query).iterate(...params) as IterableIterator<any>) {
      const rowTopics: string[] = row.diff_topic_tags ? JSON.parse(row.diff_topic_tags) : [];
      if (topicFilters.length > 0 && !topicFilters.some((t: string) => rowTopics.includes(t))) {
        continue;
      }

      // Any source_type other than regulator / sdk_vendor is reported as a peer update.
      const stype = row.source_type;
      const bucket: Bucket = stype === 'regulator' || stype === 'sdk_vendor' ? stype : 'peer_bank';
      const groups = updatesMap[bucket];
      const key = `${row.source_name}_${row.version_date}`;
      let group = groups[key];
      if (!group) {
        group = {
          source_name: row.source_name,
          source_type: row.source_type,
          version_date: row.version_date,
          topic_tags: new Set<string>(),
          changed_sections: []
        };
        groups[key] = group;
      }
      rowTopics.forEach((t: string) => group.topic_tags.add(t));
      group.changed_sections.push({
        section_title: row.section_title,
        change_type: row.change_type,
        excerpt: row.excerpt
      });

      matched += 1;
      if (matched >= limitNum) {
        break;
      }
    }

    // Sets are not JSON-serializable, so convert each group's tag set to an array.
    const formatUpdates = (groups: Record<string, any>) => Object.values(groups).map(g => ({
      ...g,
      topic_tags: Array.from(g.topic_tags)
    }));
    res.json({
      peer_updates: formatUpdates(updatesMap.peer_bank),
      regulatory_updates: formatUpdates(updatesMap.regulator),
      sdk_updates: formatUpdates(updatesMap.sdk_vendor)
    });
  } catch (error: unknown) {
    res.status(500).json({ error: error instanceof Error ? error.message : String(error) });
  }
});
// Default export: the router with every crawler API route above registered on it.
export default router;