"""Document ingestion router.""" import logging import os import tempfile import uuid from pathlib import Path from fastapi import APIRouter, UploadFile, File, HTTPException, Query from app.models.ingest import IngestResponse, VALID_CHUNKING_STRATEGIES logger = logging.getLogger(__name__) router = APIRouter(tags=["ingest"]) SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".txt"} def _delete_existing_document(rag, filename: str, chunk_dir: str) -> None: """Delete existing document with same filename from ChromaDB and chunk PDFs.""" doc_list, _, _ = rag.list_documents() existing = [d for d in doc_list if d["filename"] == filename] if not existing: return for doc in existing: old_id = doc["document_id"] chunks_info = rag.list_chunks(old_id) for chunk in chunks_info: chunk_file = chunk.get("chunk_file_path") if chunk_file: full_path = os.path.join(chunk_dir, chunk_file) if os.path.exists(full_path): os.unlink(full_path) rag.delete_document(old_id) logger.info("Deleted existing document %s (filename=%s)", old_id, filename) @router.post("/ingest", response_model=IngestResponse) async def ingest_document( file: UploadFile = File(...), strategy: str = Query("token"), ): """Ingest a document into the RAG system.""" from app.core.config import get_settings from app.services.rag import RAGService from app.utils.chunking import get_chunking_strategy from app.utils.metadata import extract_metadata filename = file.filename or "unknown" file_ext = Path(filename).suffix.lower() if file_ext not in SUPPORTED_EXTENSIONS: raise HTTPException( status_code=400, detail=f"Unsupported file format: {file_ext}. Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}", ) if strategy not in VALID_CHUNKING_STRATEGIES: raise HTTPException( status_code=400, detail=f"Invalid chunking strategy: {strategy}. Valid: {', '.join(sorted(VALID_CHUNKING_STRATEGIES))}", ) settings = get_settings() temp_path = None try: with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp: content = await file.read() tmp.write(content) temp_path = tmp.name logger.info("Ingesting file: %s (%d bytes)", filename, len(content)) rag = RAGService(settings=settings) chunk_dir = settings.document_chunk_path _delete_existing_document(rag, filename, chunk_dir) document_id = str(uuid.uuid4()) chunker = get_chunking_strategy(strategy, settings) if file_ext == ".pdf": from app.utils.pdf_parser import parse_pdf_by_page pages = parse_pdf_by_page(temp_path) if not pages: raise HTTPException( status_code=400, detail="Document appears to be empty or could not be parsed", ) chunked = await chunker.chunk_pages(pages, overlap_tokens=settings.chunk_overlap) chunk_texts = [text for text, _ in chunked] page_numbers = [pn for _, pn in chunked] os.makedirs(chunk_dir, exist_ok=True) stem = Path(filename).stem chunk_file_paths: list[str | None] = [] for page_num in page_numbers: from app.utils.pdf_extractor import extract_page_as_pdf chunk_filename = f"{stem}_page_{page_num}.pdf" output_path = os.path.join(chunk_dir, chunk_filename) try: extract_page_as_pdf(temp_path, page_num, output_path) chunk_file_paths.append(chunk_filename) except Exception as exc: logger.warning( "Failed to extract page %d PDF for %s: %s", page_num, filename, exc, ) chunk_file_paths.append(None) chunk_metadata = chunker._chunk_metadata if hasattr(chunker, '_chunk_metadata') else None metadata = extract_metadata( temp_path, chunk_texts, original_filename=filename, page_numbers=page_numbers, chunk_file_paths=chunk_file_paths, document_id=document_id, strategy_type=strategy, chunk_metadata=chunk_metadata, ) rag.ingest_document(temp_path, chunk_texts, metadata, document_id=document_id) elif file_ext == ".docx": from app.utils.docx_parser import parse_docx text = parse_docx(temp_path) chunks = chunker.chunk(text) if not chunks: raise HTTPException( status_code=400, detail="Document appears to be empty or could not be parsed", ) os.makedirs(chunk_dir, exist_ok=True) stem = Path(filename).stem chunk_file_paths: list[str | None] = [] for idx in range(len(chunks)): chunk_filename = f"{stem}_chunk_{idx}.pdf" output_path = os.path.join(chunk_dir, chunk_filename) try: from app.utils.text_to_pdf import generate_text_pdf generate_text_pdf(chunks[idx], output_path) chunk_file_paths.append(chunk_filename) except Exception as exc: logger.warning( "Failed to generate chunk %d PDF for %s: %s", idx, filename, exc, ) chunk_file_paths.append(None) chunk_metadata = chunker._chunk_metadata if hasattr(chunker, '_chunk_metadata') else None metadata = extract_metadata( temp_path, chunks, original_filename=filename, chunk_file_paths=chunk_file_paths, document_id=document_id, strategy_type=strategy, chunk_metadata=chunk_metadata, ) rag.ingest_document(temp_path, chunks, metadata, document_id=document_id) elif file_ext == ".txt": with open(temp_path, "r", encoding="utf-8") as f: text = f.read() chunks = chunker.chunk(text) if not chunks: raise HTTPException( status_code=400, detail="Document appears to be empty or could not be parsed", ) os.makedirs(chunk_dir, exist_ok=True) stem = Path(filename).stem chunk_file_paths: list[str | None] = [] for idx in range(len(chunks)): chunk_filename = f"{stem}_chunk_{idx}.pdf" output_path = os.path.join(chunk_dir, chunk_filename) try: from app.utils.text_to_pdf import generate_text_pdf generate_text_pdf(chunks[idx], output_path) chunk_file_paths.append(chunk_filename) except Exception as exc: logger.warning( "Failed to generate chunk %d PDF for %s: %s", idx, filename, exc, ) chunk_file_paths.append(None) chunk_metadata = chunker._chunk_metadata if hasattr(chunker, '_chunk_metadata') else None metadata = extract_metadata( temp_path, chunks, original_filename=filename, chunk_file_paths=chunk_file_paths, document_id=document_id, strategy_type=strategy, chunk_metadata=chunk_metadata, ) rag.ingest_document(temp_path, chunks, metadata, document_id=document_id) logger.info("Ingested %s: doc_id=%s", filename, document_id) chunk_count = len(chunk_texts) if file_ext == ".pdf" else len(chunks) return IngestResponse( document_id=document_id, chunk_count=chunk_count, filename=filename, strategy=strategy, ) except HTTPException: raise except Exception as e: logger.error("Ingestion failed for %s: %s", filename, str(e)) raise HTTPException(status_code=500, detail=f"Ingestion failed: {str(e)}") finally: if temp_path and os.path.exists(temp_path): os.unlink(temp_path)