Rifqi Hafizuddin committed on
Commit ·
f273db0
1
Parent(s): abc494f
[NOTICKET] fix delete, now can filter by user
Browse files
src/document/document_service.py
CHANGED
|
@@ -3,6 +3,7 @@
|
|
| 3 |
from sqlalchemy.ext.asyncio import AsyncSession
|
| 4 |
from sqlalchemy import select, delete, text
|
| 5 |
from src.db.postgres.models import Document
|
|
|
|
| 6 |
from src.storage.az_blob.az_blob import blob_storage
|
| 7 |
from src.middlewares.logging import get_logger
|
| 8 |
from typing import List, Optional
|
|
@@ -77,11 +78,20 @@ class DocumentService:
|
|
| 77 |
# Delete from blob storage
|
| 78 |
await blob_storage.delete_file(document.blob_name)
|
| 79 |
|
| 80 |
-
# Delete vector embeddings from pgvector
|
| 81 |
-
|
| 82 |
-
|
| 83 |
-
|
| 84 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 85 |
|
| 86 |
# Delete from database
|
| 87 |
await db.execute(
|
|
|
|
| 3 |
from sqlalchemy.ext.asyncio import AsyncSession
|
| 4 |
from sqlalchemy import select, delete, text
|
| 5 |
from src.db.postgres.models import Document
|
| 6 |
+
from src.db.postgres.connection import _pgvector_engine
|
| 7 |
from src.storage.az_blob.az_blob import blob_storage
|
| 8 |
from src.middlewares.logging import get_logger
|
| 9 |
from typing import List, Optional
|
|
|
|
| 78 |
# Delete from blob storage
|
| 79 |
await blob_storage.delete_file(document.blob_name)
|
| 80 |
|
| 81 |
+
# Delete vector embeddings from pgvector (scoped to user + collection to avoid cross-user over-delete)
|
| 82 |
+
async with _pgvector_engine.begin() as conn:
|
| 83 |
+
await conn.execute(
|
| 84 |
+
text("""
|
| 85 |
+
DELETE FROM langchain_pg_embedding
|
| 86 |
+
WHERE cmetadata->>'user_id' = :user_id
|
| 87 |
+
AND cmetadata->>'source_type' = 'document'
|
| 88 |
+
AND cmetadata->'data'->>'document_id' = :doc_id
|
| 89 |
+
AND collection_id = (
|
| 90 |
+
SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'
|
| 91 |
+
)
|
| 92 |
+
"""),
|
| 93 |
+
{"user_id": document.user_id, "doc_id": document_id},
|
| 94 |
+
)
|
| 95 |
|
| 96 |
# Delete from database
|
| 97 |
await db.execute(
|
src/pipeline/db_pipeline/db_pipeline_service.py
CHANGED
|
@@ -183,35 +183,39 @@ class DbPipelineService:
|
|
| 183 |
vector_store = get_vector_store()
|
| 184 |
logger.info("db pipeline start", user_id=user_id)
|
| 185 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 186 |
async with _pgvector_engine.begin() as conn:
|
| 187 |
result = await conn.execute(
|
| 188 |
text(
|
| 189 |
"DELETE FROM langchain_pg_embedding "
|
| 190 |
"WHERE cmetadata->>'user_id' = :user_id "
|
| 191 |
" AND cmetadata->>'source_type' = 'database' "
|
|
|
|
| 192 |
" AND collection_id = ("
|
| 193 |
" SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'"
|
| 194 |
" )"
|
| 195 |
),
|
| 196 |
-
{"user_id": user_id},
|
| 197 |
)
|
| 198 |
logger.info("cleared old db embeddings", user_id=user_id, deleted=result.rowcount)
|
| 199 |
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
updated_at = datetime.now(timezone(timedelta(hours=7))).isoformat()
|
| 203 |
-
total = 0
|
| 204 |
-
for table_name, columns in schema.items():
|
| 205 |
-
logger.info("profiling table", table=table_name, columns=len(columns))
|
| 206 |
-
entries = await asyncio.to_thread(profile_table, engine, table_name, columns)
|
| 207 |
-
docs = [self._to_document(user_id, client_id, table_name, e, updated_at) for e in entries]
|
| 208 |
-
if docs:
|
| 209 |
-
await vector_store.aadd_documents(docs)
|
| 210 |
-
total += len(docs)
|
| 211 |
-
logger.info("ingested chunks", table=table_name, count=len(docs))
|
| 212 |
|
| 213 |
-
logger.info("db pipeline complete", user_id=user_id, total=
|
| 214 |
-
return
|
| 215 |
|
| 216 |
|
| 217 |
db_pipeline_service = DbPipelineService()
|
|
|
|
| 183 |
vector_store = get_vector_store()
|
| 184 |
logger.info("db pipeline start", user_id=user_id)
|
| 185 |
|
| 186 |
+
# Profile first — if this fails, old embeddings are untouched
|
| 187 |
+
schema = await asyncio.to_thread(get_schema, engine, exclude_tables)
|
| 188 |
+
|
| 189 |
+
updated_at = datetime.now(timezone(timedelta(hours=7))).isoformat()
|
| 190 |
+
all_docs: list = []
|
| 191 |
+
for table_name, columns in schema.items():
|
| 192 |
+
logger.info("profiling table", table=table_name, columns=len(columns))
|
| 193 |
+
entries = await asyncio.to_thread(profile_table, engine, table_name, columns)
|
| 194 |
+
docs = [self._to_document(user_id, client_id, table_name, e, updated_at) for e in entries]
|
| 195 |
+
all_docs.extend(docs)
|
| 196 |
+
logger.info("profiled table", table=table_name, count=len(docs))
|
| 197 |
+
|
| 198 |
+
# Delete only after all docs are ready
|
| 199 |
async with _pgvector_engine.begin() as conn:
|
| 200 |
result = await conn.execute(
|
| 201 |
text(
|
| 202 |
"DELETE FROM langchain_pg_embedding "
|
| 203 |
"WHERE cmetadata->>'user_id' = :user_id "
|
| 204 |
" AND cmetadata->>'source_type' = 'database' "
|
| 205 |
+
" AND cmetadata->>'database_client_id' = :client_id "
|
| 206 |
" AND collection_id = ("
|
| 207 |
" SELECT uuid FROM langchain_pg_collection WHERE name = 'document_embeddings'"
|
| 208 |
" )"
|
| 209 |
),
|
| 210 |
+
{"user_id": user_id, "client_id": client_id},
|
| 211 |
)
|
| 212 |
logger.info("cleared old db embeddings", user_id=user_id, deleted=result.rowcount)
|
| 213 |
|
| 214 |
+
if all_docs:
|
| 215 |
+
await vector_store.aadd_documents(all_docs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 216 |
|
| 217 |
+
logger.info("db pipeline complete", user_id=user_id, total=len(all_docs))
|
| 218 |
+
return len(all_docs)
|
| 219 |
|
| 220 |
|
| 221 |
db_pipeline_service = DbPipelineService()
|