# legco_ai_assistant/backend/app/routers/ingest.py
#
# 171 lines
# 5.9 KiB
# Python

"""Document ingestion router."""
import logging
import os
import tempfile
import uuid
from pathlib import Path
from fastapi import APIRouter, UploadFile, File, HTTPException
from app.models.ingest import IngestResponse
logger = logging.getLogger(__name__)
router = APIRouter(tags=["ingest"])
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".txt"}
def _delete_existing_document(rag, filename: str, chunk_dir: str) -> None:
    """Delete every stored document whose filename equals ``filename``.

    For each matching document, the files named by its chunks'
    ``chunk_file_path`` entries are removed from ``chunk_dir`` and the
    document is then deleted through ``rag.delete_document``.  Nothing
    happens when no stored document matches.

    Args:
        rag: Object providing ``list_documents()`` (a 3-tuple whose first
            item is an iterable of dicts with ``"filename"`` and
            ``"document_id"`` keys), ``list_chunks(document_id)`` and
            ``delete_document(document_id)``.
        filename: Original filename to match against stored documents.
        chunk_dir: Directory that ``chunk_file_path`` values are relative to.
    """
    doc_list, _, _ = rag.list_documents()
    # Resolve all matches up front so a malformed entry fails before any
    # deletion has taken place.
    existing = [d for d in doc_list if d["filename"] == filename]

    for doc in existing:
        old_id = doc["document_id"]
        for chunk in rag.list_chunks(old_id):
            chunk_file = chunk.get("chunk_file_path")
            if not chunk_file:
                continue
            # EAFP instead of exists()+unlink(): several chunks may point at
            # the same file, and a concurrent request may remove it between
            # the check and the unlink.
            try:
                os.unlink(os.path.join(chunk_dir, chunk_file))
            except FileNotFoundError:
                pass
        rag.delete_document(old_id)
        logger.info("Deleted existing document %s (filename=%s)", old_id, filename)
_EMPTY_DOCUMENT_DETAIL = "Document appears to be empty or could not be parsed"


def _chunk_upload(temp_path: str, file_ext: str, chunker, overlap_tokens: int):
    """Parse the uploaded temp file and split it into chunks.

    Args:
        temp_path: Path of the temporary copy of the upload.
        file_ext: Lower-cased extension (".pdf", ".docx" or ".txt").
        chunker: Chunking strategy providing ``chunk_pages`` and ``chunk``.
        overlap_tokens: Overlap forwarded to ``chunker.chunk_pages`` for PDFs.

    Returns:
        ``(chunks, page_numbers)``.  ``page_numbers`` is a list parallel to
        ``chunks`` for PDFs and ``None`` for the other formats.

    Raises:
        HTTPException: 400 when the file yields no pages/chunks or a ``.txt``
            upload is not valid UTF-8.
        ValueError: if ``file_ext`` has no parser (the caller validates the
            extension first, so this indicates a programming error).
    """
    page_numbers = None
    if file_ext == ".pdf":
        from app.utils.pdf_parser import parse_pdf_by_page

        pages = parse_pdf_by_page(temp_path)
        if not pages:
            raise HTTPException(status_code=400, detail=_EMPTY_DOCUMENT_DETAIL)
        chunked = chunker.chunk_pages(pages, overlap_tokens=overlap_tokens)
        chunks = [text for text, _ in chunked]
        page_numbers = [pn for _, pn in chunked]
    elif file_ext == ".docx":
        from app.utils.docx_parser import parse_docx

        chunks = chunker.chunk(parse_docx(temp_path))
    elif file_ext == ".txt":
        try:
            with open(temp_path, "r", encoding="utf-8") as f:
                text = f.read()
        except UnicodeDecodeError as exc:
            # A wrongly-encoded upload is a client error, not a server fault.
            raise HTTPException(
                status_code=400,
                detail="Text file could not be decoded as UTF-8",
            ) from exc
        chunks = chunker.chunk(text)
    else:
        raise ValueError(f"No parser available for extension {file_ext!r}")

    # Applied to every format, PDFs included, so an upload that produces no
    # chunks is rejected instead of being ingested as an empty document.
    if not chunks:
        raise HTTPException(status_code=400, detail=_EMPTY_DOCUMENT_DETAIL)
    return chunks, page_numbers


def _extract_chunk_pdfs(
    temp_path: str, filename: str, page_numbers: list, chunk_dir: str
) -> list[str | None]:
    """Write one single-page PDF per distinct page and map chunks to them.

    Returns a list parallel to ``page_numbers`` holding the chunk file name
    (``"<stem>_page_<n>.pdf"``, relative to ``chunk_dir``) or ``None`` when
    extracting that page failed.  Extraction failures are logged and
    tolerated (best effort).
    """
    from app.utils.pdf_extractor import extract_page_as_pdf

    os.makedirs(chunk_dir, exist_ok=True)
    stem = Path(filename).stem
    # Several chunks can come from the same page; extract each page once.
    by_page: dict = {}
    for page_num in page_numbers:
        if page_num in by_page:
            continue
        chunk_filename = f"{stem}_page_{page_num}.pdf"
        output_path = os.path.join(chunk_dir, chunk_filename)
        try:
            extract_page_as_pdf(temp_path, page_num, output_path)
        except Exception as exc:
            logger.warning(
                "Failed to extract page %d PDF for %s: %s",
                page_num, filename, exc,
            )
            by_page[page_num] = None
        else:
            by_page[page_num] = chunk_filename
    return [by_page[page_num] for page_num in page_numbers]


@router.post("/ingest", response_model=IngestResponse)
async def ingest_document(file: UploadFile = File(...)):
    """Ingest an uploaded document into the RAG system.

    The upload is copied to a temporary file, parsed and chunked according
    to its extension, and only once it is known to contain chunks is any
    previously stored document with the same filename removed.  The new
    chunks are then stored under a fresh UUID document id.  For PDFs a
    single-page PDF is additionally written to the configured chunk
    directory for each page referenced by a chunk.

    Returns:
        IngestResponse with the new ``document_id``, the ``chunk_count`` and
        the original ``filename``.

    Raises:
        HTTPException: 400 for an unsupported extension or an empty /
            unparseable document; 500 for any other failure.
    """
    from app.core.config import get_settings
    from app.services.rag import RAGService
    from app.utils.chunking import TokenChunkingStrategy
    from app.utils.metadata import extract_metadata

    filename = file.filename or "unknown"
    file_ext = Path(filename).suffix.lower()
    if file_ext not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_ext}. Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}",
        )

    settings = get_settings()
    temp_path = None
    try:
        # Read before creating the temp file, and record its path before
        # writing, so the `finally` clause can always clean it up.
        content = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
            temp_path = tmp.name
            tmp.write(content)
        logger.info("Ingesting file: %s (%d bytes)", filename, len(content))

        rag = RAGService(settings=settings)
        chunk_dir = settings.document_chunk_path
        chunker = TokenChunkingStrategy(
            chunk_size=settings.chunk_size, overlap=settings.chunk_overlap
        )

        # Validate the new upload BEFORE touching stored data: a rejected
        # upload must not delete the existing document of the same name.
        chunks, page_numbers = _chunk_upload(
            temp_path, file_ext, chunker, settings.chunk_overlap
        )

        # Must run before the page PDFs are written: the old version's chunk
        # files presumably share the "<stem>_page_<n>.pdf" names and would
        # otherwise be removed right after being recreated.
        # NOTE(review): if ingestion fails after this point the old document
        # is already gone; the RAG API visible here offers no transaction.
        _delete_existing_document(rag, filename, chunk_dir)

        document_id = str(uuid.uuid4())
        if page_numbers is not None:
            chunk_file_paths = _extract_chunk_pdfs(
                temp_path, filename, page_numbers, chunk_dir
            )
            metadata = extract_metadata(
                temp_path,
                chunks,
                original_filename=filename,
                page_numbers=page_numbers,
                chunk_file_paths=chunk_file_paths,
                document_id=document_id,
            )
        else:
            metadata = extract_metadata(
                temp_path, chunks, original_filename=filename, document_id=document_id
            )
        rag.ingest_document(temp_path, chunks, metadata, document_id=document_id)

        logger.info("Ingested %s: doc_id=%s", filename, document_id)
        return IngestResponse(
            document_id=document_id,
            chunk_count=len(chunks),
            filename=filename,
        )
    except HTTPException:
        # Deliberate client-facing errors pass through unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the message and adds the traceback.
        logger.exception("Ingestion failed for %s: %s", filename, str(e))
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {str(e)}") from e
    finally:
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)