diff --git a/backend/app/models/documents.py b/backend/app/models/documents.py
new file mode 100644
index 0000000..6477588
--- /dev/null
+++ b/backend/app/models/documents.py
@@ -0,0 +1,33 @@
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+
+class DocumentInfo(BaseModel):
+    """Summary of one stored document and how many chunks it has."""
+    document_id: str
+    filename: str
+    chunk_count: int
+    upload_date: str
+
+
+class ChunkInfo(BaseModel):
+    """Metadata describing a single chunk of a document."""
+    chunk_id: str
+    chunk_index: int
+    content_summary: str
+    page_number: Optional[int] = None
+    chunk_file_path: Optional[str] = None
+
+
+class DocumentListResponse(BaseModel):
+    """List of documents together with overall document and chunk totals."""
+    documents: List[DocumentInfo]
+    total_documents: int
+    total_chunks: int
+
+
+class DeleteResponse(BaseModel):
+    """Outcome of a delete request: success flag plus a message."""
+    deleted: bool
+    message: str
diff --git a/backend/app/services/rag.py b/backend/app/services/rag.py
index be43900..515bba2 100644
--- a/backend/app/services/rag.py
+++ b/backend/app/services/rag.py
@@ -113,3 +113,103 @@ class RAGService:
         )
 
         return await self.llm_client.complete(prompt=prompt, temperature=0.3, step_name="ResponseGeneration")
+
+    @staticmethod
+    def _document_id_of(chunk_id: str) -> str:
+        """Return the document ID encoded in *chunk_id*.
+
+        Chunk IDs are read as ``<document_id>_<suffix>``: everything before the
+        last underscore is the document ID. An ID with no underscore is treated
+        as its own document ID.
+        """
+        head, sep, _ = chunk_id.rpartition("_")
+        return head if sep else chunk_id
+
+    def list_documents(self) -> Tuple[List[Dict[str, Any]], int, int]:
+        """Summarise the stored chunks per document.
+
+        Returns:
+            A ``(documents, total_documents, total_chunks)`` tuple. Each entry
+            of ``documents`` has the keys ``document_id``, ``filename``,
+            ``chunk_count`` and ``upload_date``.
+        """
+        from collections import defaultdict
+
+        all_data = self.collection.get(include=["metadatas"])
+
+        if not all_data["metadatas"]:
+            return [], 0, 0
+
+        docs = defaultdict(lambda: {"filename": "", "chunk_count": 0, "upload_date": ""})
+
+        for chunk_id, meta in zip(all_data["ids"], all_data["metadatas"]):
+            # Guard: the store may hand back None for a chunk without metadata.
+            meta = meta or {}
+            info = docs[self._document_id_of(chunk_id)]
+            info["filename"] = meta.get("filename", "unknown")
+            info["chunk_count"] += 1
+            info["upload_date"] = meta.get("upload_date", "")
+
+        doc_list = [{"document_id": doc_id, **info} for doc_id, info in docs.items()]
+        total_chunks = sum(doc["chunk_count"] for doc in doc_list)
+
+        return doc_list, len(doc_list), total_chunks
+
+    def list_chunks(self, document_id: str) -> List[Dict[str, Any]]:
+        """Return the chunks of *document_id*, sorted by ``chunk_index``.
+
+        A chunk belongs to the document only when its derived document ID
+        matches exactly; a plain prefix test would also return the chunks of
+        ``report_v2`` when asked for ``report``.
+        """
+        all_data = self.collection.get(include=["metadatas"])
+
+        chunks = []
+        for chunk_id, meta in zip(all_data["ids"], all_data["metadatas"]):
+            if self._document_id_of(chunk_id) != document_id:
+                continue
+            meta = meta or {}
+            chunks.append({
+                "chunk_id": chunk_id,
+                "chunk_index": meta.get("chunk_index", 0),
+                "content_summary": meta.get("content_summary", ""),
+                "page_number": meta.get("page_number"),
+                "chunk_file_path": meta.get("chunk_file_path"),
+            })
+
+        chunks.sort(key=lambda chunk: chunk["chunk_index"])
+        return chunks
+
+    def delete_document(self, document_id: str) -> Tuple[bool, int]:
+        """Delete every chunk that belongs to *document_id*.
+
+        Ownership uses the exact document-ID match, so deleting one document
+        cannot remove chunks of another whose ID merely starts the same way.
+
+        Returns:
+            ``(deleted, chunk_count)``; ``(False, 0)`` when nothing matched.
+        """
+        # Only the IDs are needed, so do not fetch metadata.
+        all_data = self.collection.get(include=[])
+
+        ids_to_delete = [
+            chunk_id for chunk_id in all_data["ids"]
+            if self._document_id_of(chunk_id) == document_id
+        ]
+
+        if not ids_to_delete:
+            return False, 0
+
+        self.collection.delete(ids=ids_to_delete)
+        return True, len(ids_to_delete)
+
+    def delete_chunk(self, chunk_id: str) -> bool:
+        """Delete one chunk by ID; return ``False`` when it does not exist."""
+        # Look the chunk up by ID rather than loading the whole collection.
+        existing = self.collection.get(ids=[chunk_id], include=[])
+
+        if not existing["ids"]:
+            return False
+
+        self.collection.delete(ids=[chunk_id])
+        return True