// feat: multi-corpus support (rafmacalaba, commit a2c885c)
import fs from 'fs';
import path from 'path';
import { commit } from '@huggingface/hub';
import { HF_DATASET_ID, HF_DATASET_BASE_URL, getCorpus, getDocRepoPath, getDocLocalPath } from './config.js';
/**
 * Returns true when running inside a HuggingFace Space (token present,
 * not in local development). Coerced with Boolean() so the predicate
 * never leaks the raw HF_TOKEN string as its return value.
 * @returns {boolean}
 */
const isHFSpace = () => {
  return Boolean(process.env.HF_TOKEN) && process.env.NODE_ENV !== 'development';
};
/**
* Reads the full document JSON (all pages) from local file
*/
/**
 * Loads the full document JSON (all pages) from the local annotation file.
 * @param {Object} corpus - Corpus config object.
 * @param {number} docIndex - Document index within the corpus.
 * @returns {Object[]|null} Parsed pages array, or null if the file is absent.
 */
function readDocLocal(corpus, docIndex) {
  const filePath = getDocLocalPath(corpus, docIndex);
  if (!fs.existsSync(filePath)) return null;
  const raw = fs.readFileSync(filePath, 'utf-8');
  return JSON.parse(raw);
}
/**
* Writes the full document JSON (all pages) to local file
*/
/**
 * Persists the full document JSON (all pages) to the local annotation file.
 * @param {Object} corpus - Corpus config object.
 * @param {number} docIndex - Document index within the corpus.
 * @param {Object[]} pagesData - Full pages array to serialize.
 */
function writeDocLocal(corpus, docIndex, pagesData) {
  const filePath = getDocLocalPath(corpus, docIndex);
  const serialized = JSON.stringify(pagesData, null, 2);
  fs.writeFileSync(filePath, serialized);
  console.log(`Saved doc_${docIndex} locally (${corpus.id})`);
}
/**
* Finds the page index in the pages array by page_number
*/
/**
 * Locates the entry in the pages array whose first document page number
 * equals pageNumber.
 * @param {Object[]} pagesData - Pages array (each with document.pages).
 * @param {number} pageNumber - 1-based page number to look up.
 * @returns {number} Index into pagesData, or -1 when not found.
 */
function findPageIndex(pagesData, pageNumber) {
  for (let i = 0; i < pagesData.length; i += 1) {
    if (pagesData[i].document?.pages?.[0] === pageNumber) return i;
  }
  return -1;
}
/**
* Fetches the document JSON from HuggingFace
*/
/**
 * Downloads the document JSON from the HuggingFace dataset repo.
 * @param {Object} corpus - Corpus config object.
 * @param {number} docIndex - Document index within the corpus.
 * @returns {Promise<Object[]>} Parsed pages array.
 * @throws {Error} When the HTTP response is not OK.
 */
async function fetchDocFromHF(corpus, docIndex) {
  const repoPath = getDocRepoPath(corpus, docIndex);
  const response = await fetch(`${HF_DATASET_BASE_URL}/raw/main/${repoPath}`, {
    headers: { 'Authorization': `Bearer ${process.env.HF_TOKEN}` },
  });
  if (!response.ok) {
    throw new Error(`Failed to fetch doc_${docIndex} (${corpus.id}) from HF: ${response.status}`);
  }
  return response.json();
}
/**
* Commits the updated document JSON back to HuggingFace
*/
/**
 * Commits the updated document JSON back to the HuggingFace dataset repo.
 * @param {Object} corpus - Corpus config object.
 * @param {number} docIndex - Document index within the corpus.
 * @param {Object[]} pagesData - Full pages array to serialize and upload.
 * @param {string} commitMessage - Commit title for the HF commit.
 * @throws {Error} When HF_TOKEN is not set.
 */
async function commitDocToHF(corpus, docIndex, pagesData, commitMessage) {
  const token = process.env.HF_TOKEN;
  if (!token) throw new Error("Missing HF_TOKEN");
  const repoPath = getDocRepoPath(corpus, docIndex);
  const payload = new Blob([JSON.stringify(pagesData, null, 2)], {
    type: 'application/json',
  });
  await commit({
    repo: { type: 'dataset', name: HF_DATASET_ID },
    credentials: { accessToken: token },
    title: commitMessage,
    operations: [
      { operation: 'addOrUpdate', path: repoPath, content: payload },
    ],
  });
  console.log(`Committed ${repoPath} to HF dataset ${HF_DATASET_ID}`);
}
// ─── Public API ────────────────────────────────────
/**
* Saves an annotation by appending it to the page's datasets array.
* @param {Object} annotation - Must include corpus (optional, defaults to first), document_index, page_number
*/
/**
 * Saves an annotation by appending it to the page's datasets array,
 * persisting to the HF dataset when running in a Space, otherwise to the
 * local annotation file.
 *
 * Nullable metadata fields fall back to null only when absent (`??`), so
 * valid falsy values such as `is_used: false` or a `0` year are preserved
 * (the previous `||` fallback silently dropped them to null).
 *
 * @param {Object} annotation - Must include document_index and page_number;
 *   corpus is optional (resolved via getCorpus).
 * @throws {Error} When the document or the page cannot be found.
 */
export async function saveAnnotation(annotation) {
  const corpus = getCorpus(annotation.corpus);
  const { document_index: docIndex, page_number: pageNumber } = annotation;
  const datasetEntry = {
    dataset_name: annotation.dataset_name,
    dataset_tag: annotation.dataset_tag,
    source: annotation.source || 'human', // intentional: empty source means human
    annotator: annotation.annotator,
    timestamp: annotation.timestamp,
    description: annotation.description ?? null,
    data_type: annotation.data_type ?? null,
    acronym: annotation.acronym ?? null,
    author: annotation.author ?? null,
    producer: annotation.producer ?? null,
    geography: annotation.geography ?? null,
    publication_year: annotation.publication_year ?? null,
    reference_year: annotation.reference_year ?? null,
    reference_population: annotation.reference_population ?? null,
    is_used: annotation.is_used ?? null, // boolean: false must survive
    usage_context: annotation.usage_context ?? null,
  };
  const remote = isHFSpace();
  const pagesData = remote
    ? await fetchDocFromHF(corpus, docIndex)
    : readDocLocal(corpus, docIndex);
  if (!pagesData) throw new Error(`doc_${docIndex} not found locally (${corpus.id})`);
  const pageIdx = findPageIndex(pagesData, pageNumber);
  if (pageIdx === -1) throw new Error(`Page ${pageNumber} not found in doc_${docIndex} (${corpus.id})`);
  // Pages may lack a datasets array (getAnnotations tolerates its absence);
  // initialize it instead of crashing on push.
  (pagesData[pageIdx].datasets ??= []).push(datasetEntry);
  if (remote) {
    await commitDocToHF(corpus, docIndex, pagesData,
      `Add annotation to ${corpus.id}/doc_${docIndex} page ${pageNumber}`);
  } else {
    writeDocLocal(corpus, docIndex, pagesData);
  }
}
/**
* Deletes an annotation by timestamp
*/
/**
 * Deletes an annotation identified by its timestamp from the given page.
 * @param {string} timestamp - Timestamp key of the annotation to remove.
 * @param {number} docIndex - Document index within the corpus.
 * @param {number} pageNumber - Page the annotation belongs to.
 * @param {string} [corpusId] - Corpus id (resolved via getCorpus).
 * @returns {Promise<boolean>} true when an entry was removed and persisted.
 */
export async function deleteAnnotation(timestamp, docIndex, pageNumber, corpusId) {
  const corpus = getCorpus(corpusId);
  const remote = isHFSpace();
  const pagesData = remote
    ? await fetchDocFromHF(corpus, docIndex)
    : readDocLocal(corpus, docIndex);
  if (!pagesData) return false;
  const pageIdx = findPageIndex(pagesData, pageNumber);
  if (pageIdx === -1) return false;
  const page = pagesData[pageIdx];
  const remaining = page.datasets.filter((ds) => ds.timestamp !== timestamp);
  // Nothing matched the timestamp: skip the write entirely.
  if (remaining.length === page.datasets.length) return false;
  page.datasets = remaining;
  if (remote) {
    await commitDocToHF(corpus, docIndex, pagesData,
      `Delete annotation from ${corpus.id}/doc_${docIndex} page ${pageNumber}`);
  } else {
    writeDocLocal(corpus, docIndex, pagesData);
  }
  return true;
}
/**
* Updates an annotation by timestamp
*/
/**
 * Updates an annotation identified by its timestamp, shallow-merging the
 * given fields over the existing entry.
 * @param {string} timestamp - Timestamp key of the annotation to update.
 * @param {number} docIndex - Document index within the corpus.
 * @param {number} pageNumber - Page the annotation belongs to.
 * @param {Object} updates - Fields to merge into the existing entry.
 * @param {string} [corpusId] - Corpus id (resolved via getCorpus).
 * @returns {Promise<Object|null>} The updated entry, or null when not found.
 */
export async function updateAnnotation(timestamp, docIndex, pageNumber, updates, corpusId) {
  const corpus = getCorpus(corpusId);
  const remote = isHFSpace();
  const pagesData = remote
    ? await fetchDocFromHF(corpus, docIndex)
    : readDocLocal(corpus, docIndex);
  if (!pagesData) return null;
  const pageIdx = findPageIndex(pagesData, pageNumber);
  if (pageIdx === -1) return null;
  const datasets = pagesData[pageIdx].datasets;
  const dsIdx = datasets.findIndex((ds) => ds.timestamp === timestamp);
  if (dsIdx === -1) return null;
  datasets[dsIdx] = { ...datasets[dsIdx], ...updates };
  if (remote) {
    await commitDocToHF(corpus, docIndex, pagesData,
      `Update annotation in ${corpus.id}/doc_${docIndex} page ${pageNumber}`);
  } else {
    writeDocLocal(corpus, docIndex, pagesData);
  }
  return datasets[dsIdx];
}
/**
* Retrieves all human annotations from local files.
*/
/**
 * Retrieves all human annotations (entries with an annotator) from the
 * local extraction files, optionally filtered by document and/or corpus.
 * @param {number|null} [docIndex] - Restrict to one document index.
 * @param {string|null} [corpusId] - Restrict to one corpus.
 * @returns {Promise<Object[]>} Flat list of annotation entries, each tagged
 *   with corpus, document_index and page_number.
 */
export async function getAnnotations(docIndex = null, corpusId = null) {
  // Imported lazily; config.js is also statically imported above.
  const { getCorpora } = await import('./config.js');
  const corpora = corpusId ? [getCorpus(corpusId)] : getCorpora();
  const annotations = [];
  for (const corpus of corpora) {
    const extractionsDir = path.join(process.cwd(), 'annotation_data', corpus.extractions_dir);
    if (!fs.existsSync(extractionsDir)) continue;
    const docDirs = fs.readdirSync(extractionsDir).filter((name) => name.startsWith('doc_'));
    for (const docDir of docDirs) {
      const idx = Number.parseInt(docDir.replace('doc_', ''), 10);
      if (docIndex !== null && idx !== docIndex) continue;
      const filePath = path.join(extractionsDir, docDir, 'raw', `${docDir}_direct_judged.jsonl`);
      if (!fs.existsSync(filePath)) continue;
      try {
        const pagesData = JSON.parse(fs.readFileSync(filePath, 'utf-8'));
        for (const page of pagesData) {
          const pageNum = page.document?.pages?.[0];
          for (const ds of page.datasets || []) {
            // Only human annotations carry an annotator field.
            if (!ds.annotator) continue;
            annotations.push({
              ...ds,
              corpus: corpus.id,
              document_index: idx,
              page_number: pageNum,
            });
          }
        }
      } catch (e) {
        console.error(`Error reading ${filePath}:`, e);
      }
    }
  }
  return annotations;
}