File size: 3,919 Bytes
027123c
2ba0613
 
027123c
 
 
 
 
6bff5d9
027123c
 
2ba0613
027123c
2ba0613
027123c
2ba0613
 
027123c
 
 
 
 
 
 
2ba0613
 
6bff5d9
b1f1ccd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2ba0613
 
027123c
 
 
 
 
 
 
 
 
 
 
2ba0613
 
 
 
 
 
027123c
 
 
 
 
 
 
 
2ba0613
6bff5d9
 
2ba0613
 
 
027123c
 
 
 
 
 
 
 
2ba0613
6bff5d9
 
 
 
 
 
 
 
2ba0613
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""Document management API endpoints."""
 
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, File
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.postgres.connection import get_db
from src.document.document_service import document_service
from src.middlewares.logging import get_logger, log_execution
from src.middlewares.rate_limit import limiter
from src.pipeline.document_pipeline import document_pipeline
 
# Module-level logger; it is passed to the log_execution decorator on every endpoint below.
logger = get_logger("document_api")

# Router holding all endpoints of this module: paths are prefixed with /api/v1
# and the routes are tagged "Documents".
router = APIRouter(prefix="/api/v1", tags=["Documents"])
 
 
class DocumentResponse(BaseModel):
    """Summary of a single document as returned by the document listing endpoint."""

    id: str
    filename: str
    status: str
    file_size: int  # list_documents reports 0 when the stored size is missing/falsy
    file_type: str
    created_at: str  # list_documents fills this with created_at.isoformat()
 
 
# NOTE: Keep in sync with SUPPORTED_FILE_TYPES in src/pipeline/document_pipeline.py
# Every supported type currently shares the same size limit, status and (absent)
# message, so the table is generated from the ordered list of type names.
# NOTE(review): the unit of "max_size" is not stated here (presumably MB) - confirm.
_DOC_TYPES = [
    {"doc_type": extension, "max_size": 10, "status": "active", "message": None}
    for extension in ("pdf", "docx", "txt", "csv", "xlsx")
]


# Declared ahead of the "/documents/{user_id}" route so that the literal
# "doctypes" segment is matched here rather than being taken as a user id.
@router.get(
    "/documents/doctypes",
    summary="List supported document types",
    response_description="All document types supported by DataEyond with their size limits and status.",
)
@log_execution(logger)
async def get_document_types():
    """List the document types DataEyond can process, each with its max file size and status."""
    # The table is static module data; it is wrapped in the same
    # {"status", "data"} envelope the other endpoints in this module use.
    payload = {"status": "success", "data": _DOC_TYPES}
    return payload


@router.get("/documents/{user_id}", response_model=List[DocumentResponse])
@log_execution(logger)
async def list_documents(
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Return every document that belongs to the given user."""

    def to_response(record):
        # A missing (or zero) size is reported as 0, and the creation
        # timestamp is sent as ISO-8601 text.
        return DocumentResponse(
            id=record.id,
            filename=record.filename,
            status=record.status,
            file_size=record.file_size or 0,
            file_type=record.file_type,
            created_at=record.created_at.isoformat(),
        )

    records = await document_service.get_user_documents(db, user_id)
    return [to_response(record) for record in records]
 
 
@router.post("/document/upload")
@limiter.limit("10/minute")
@log_execution(logger)
async def upload_document(
    request: Request,
    file: UploadFile = File(...),
    user_id: Optional[str] = None,
    db: AsyncSession = Depends(get_db)
):
    """Upload a document for the given user.

    Responds with HTTP 400 when ``user_id`` is missing or empty.
    """
    # `request` is not used in the body; presumably the rate-limit decorator
    # needs it in the signature - confirm before removing.
    #
    # `user_id` defaults to None (and is annotated Optional to match) so that
    # an absent value reaches this check and gets the explicit 400 below.
    if not user_id:
        raise HTTPException(status_code=400, detail="user_id is required")

    data = await document_pipeline.upload(file, user_id, db)
    return {"status": "success", "message": "Document uploaded successfully", "data": data}
 
 
@router.delete("/document/delete")
@log_execution(logger)
async def delete_document(
    document_id: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Delete a document, then run the tabular-deletion trigger for it."""
    await document_pipeline.delete(document_id, user_id, db)

    # Imported lazily, as process_document does for its trigger
    # (presumably to avoid an import cycle - confirm).
    from src.pipeline.triggers import on_tabular_deleted

    # The document is already gone at this point, so a failing trigger must not
    # turn a successful deletion into an error response. The failure is logged
    # instead, mirroring how process_document treats its post-processing trigger.
    try:
        await on_tabular_deleted(document_id, user_id)
    except Exception as e:
        logger.error("on_tabular_deleted trigger failed after delete", document_id=document_id, error=str(e))

    return {"status": "success", "message": "Document deleted successfully"}
 
 
@router.post("/document/process")
@log_execution(logger)
async def process_document(
    document_id: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    """Run the processing pipeline for a document; csv/xlsx files also fire the tabular-upload trigger."""
    result = await document_pipeline.process(document_id, user_id, db)

    is_tabular = result["file_type"] in ("csv", "xlsx")
    if is_tabular:
        # The trigger module is only imported when it is actually needed.
        from src.pipeline.triggers import on_tabular_uploaded

        # Best-effort: processing has already succeeded, so a trigger failure
        # is logged and the success response is still returned.
        try:
            await on_tabular_uploaded(document_id, user_id)
        except Exception as exc:
            logger.error("catalog ingestion failed after process", document_id=document_id, error=str(exc))

    return {
        "status": "success",
        "message": "Document processed successfully",
        "data": result,
    }