legco_ai_assistant/backend/app/routers/ingest.py

225 lines
8.3 KiB
Python

"""Document ingestion router."""
import logging
import os
import tempfile
import uuid
from pathlib import Path
from fastapi import APIRouter, UploadFile, File, HTTPException, Query
from app.models.ingest import IngestResponse, VALID_CHUNKING_STRATEGIES
# Module logger for ingestion events and failures.
logger = logging.getLogger(__name__)
# Router for the document-ingestion endpoints; tagged "ingest" for grouping in the OpenAPI docs.
router = APIRouter(tags=["ingest"])
# File extensions accepted by /ingest. Compared against the lower-cased suffix
# (including the leading dot) of the uploaded filename.
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".txt"}
def _delete_existing_document(rag, filename: str, chunk_dir: str) -> None:
    """Remove every previously ingested document whose filename equals *filename*.

    For each matching document, the chunk PDF files recorded in its chunks'
    ``chunk_file_path`` entries (relative to *chunk_dir*) are unlinked first,
    then the document is removed via ``rag.delete_document``.

    Args:
        rag: RAG service exposing ``list_documents()`` (returning a 3-tuple whose
            first item is a list of dicts with ``filename`` / ``document_id``),
            ``list_chunks(document_id)`` and ``delete_document(document_id)``.
        filename: Original upload filename to match against stored documents.
        chunk_dir: Directory the chunk PDF files were written to.

    Raises:
        OSError: If a chunk file exists but cannot be removed (e.g. permissions).
            Files that are already missing are ignored.
    """
    doc_list, _, _ = rag.list_documents()
    existing = [doc for doc in doc_list if doc["filename"] == filename]
    for doc in existing:
        old_id = doc["document_id"]
        # dict.fromkeys dedupes while keeping order: several chunks can point at
        # the same file (e.g. PDF chunks that share a page).
        chunk_files = dict.fromkeys(
            chunk.get("chunk_file_path") for chunk in rag.list_chunks(old_id)
        )
        for chunk_file in chunk_files:
            if not chunk_file:
                continue
            try:
                os.unlink(os.path.join(chunk_dir, chunk_file))
            except FileNotFoundError:
                # Already gone. Unlinking directly (rather than exists() + unlink())
                # avoids a race that would otherwise fail the whole ingest.
                pass
        rag.delete_document(old_id)
        logger.info("Deleted existing document %s (filename=%s)", old_id, filename)
_EMPTY_DOCUMENT_DETAIL = "Document appears to be empty or could not be parsed"


def _chunk_pdf(temp_path: str, chunker, overlap_tokens) -> "tuple[list[str], list[int]]":
    """Parse a PDF page by page and chunk it, keeping each chunk's page number.

    Returns:
        ``(chunk_texts, page_numbers)``: parallel lists with one entry per chunk.

    Raises:
        HTTPException: 400 if the PDF yields no pages or no chunks.
    """
    from app.utils.pdf_parser import parse_pdf_by_page

    pages = parse_pdf_by_page(temp_path)
    if not pages:
        raise HTTPException(status_code=400, detail=_EMPTY_DOCUMENT_DETAIL)
    chunked = chunker.chunk_pages(pages, overlap_tokens=overlap_tokens)
    if not chunked:
        # Same contract as the text formats: never hand an empty chunk list to the store.
        raise HTTPException(status_code=400, detail=_EMPTY_DOCUMENT_DETAIL)
    chunk_texts = [text for text, _ in chunked]
    page_numbers = [page_num for _, page_num in chunked]
    return chunk_texts, page_numbers


def _read_text_document(temp_path: str, file_ext: str) -> str:
    """Return the plain text of a ``.docx`` or ``.txt`` upload saved at *temp_path*.

    Raises:
        HTTPException: 400 if a ``.txt`` upload is not valid UTF-8.
    """
    if file_ext == ".docx":
        from app.utils.docx_parser import parse_docx

        return parse_docx(temp_path)
    try:
        # "utf-8-sig" also accepts (and drops) a leading byte-order mark, which
        # would otherwise end up as "\ufeff" at the start of the first chunk.
        with open(temp_path, "r", encoding="utf-8-sig") as f:
            return f.read()
    except UnicodeDecodeError as exc:
        # Client error, not a server failure.
        raise HTTPException(status_code=400, detail="Text file is not valid UTF-8") from exc


def _write_page_chunk_pdfs(
    temp_path: str, filename: str, page_numbers, chunk_dir: str
) -> "list[str | None]":
    """Extract each referenced PDF page to ``<chunk_dir>/<stem>_page_<n>.pdf``.

    Returns one entry per element of *page_numbers*: the chunk file name
    (relative to *chunk_dir*), or ``None`` if extracting that page failed.
    Each distinct page is extracted only once, even when several chunks share it.
    """
    from app.utils.pdf_extractor import extract_page_as_pdf

    stem = Path(filename).stem
    extracted = {}
    for page_num in page_numbers:
        if page_num in extracted:
            continue
        chunk_filename = f"{stem}_page_{page_num}.pdf"
        try:
            extract_page_as_pdf(temp_path, page_num, os.path.join(chunk_dir, chunk_filename))
        except Exception as exc:
            # Best effort: a missing page PDF must not fail the ingestion.
            logger.warning(
                "Failed to extract page %d PDF for %s: %s",
                page_num, filename, exc,
            )
            extracted[page_num] = None
        else:
            extracted[page_num] = chunk_filename
    return [extracted[page_num] for page_num in page_numbers]


def _write_text_chunk_pdfs(chunks, filename: str, chunk_dir: str) -> "list[str | None]":
    """Render each text chunk to ``<chunk_dir>/<stem>_chunk_<i>.pdf`` (best effort).

    Returns one entry per chunk: the chunk file name (relative to *chunk_dir*),
    or ``None`` if rendering that chunk failed.
    """
    # NOTE(review): .docx and .txt uploads sharing a stem (report.docx / report.txt)
    # produce identical chunk file names and overwrite/delete each other's PDFs.
    # Consider including the extension in the name.
    stem = Path(filename).stem
    chunk_file_paths = []
    for idx, chunk in enumerate(chunks):
        chunk_filename = f"{stem}_chunk_{idx}.pdf"
        try:
            # The import stays inside the try, so a renderer import failure is
            # handled like a render failure: logged, and the chunk gets no PDF.
            from app.utils.text_to_pdf import generate_text_pdf

            generate_text_pdf(chunk, os.path.join(chunk_dir, chunk_filename))
        except Exception as exc:
            logger.warning(
                "Failed to generate chunk %d PDF for %s: %s",
                idx, filename, exc,
            )
            chunk_file_paths.append(None)
        else:
            chunk_file_paths.append(chunk_filename)
    return chunk_file_paths


@router.post("/ingest", response_model=IngestResponse)
async def ingest_document(
    file: UploadFile = File(...),
    strategy: str = Query("token"),
):
    """Ingest an uploaded PDF/DOCX/TXT document into the RAG system.

    Processing steps:

    1. Spool the upload to a temp file.
    2. Parse the document and chunk it with *strategy*.
    3. Only once chunking succeeds, remove any previously ingested document
       with the same filename (store entries and chunk PDFs). A bad re-upload
       therefore no longer destroys the existing copy.
    4. Write per-chunk PDFs under ``settings.document_chunk_path`` and store
       the chunks.

    Raises:
        HTTPException: 400 for an unsupported extension, an unknown strategy,
            an empty/unparseable document, or a non-UTF-8 text file. 500 for
            any other failure.
    """
    from app.core.config import get_settings
    from app.services.rag import RAGService
    from app.utils.chunking import get_chunking_strategy
    from app.utils.metadata import extract_metadata

    filename = file.filename or "unknown"
    file_ext = Path(filename).suffix.lower()
    if file_ext not in SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_ext}. Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}",
        )
    if strategy not in VALID_CHUNKING_STRATEGIES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid chunking strategy: {strategy}. Valid: {', '.join(sorted(VALID_CHUNKING_STRATEGIES))}",
        )
    settings = get_settings()
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
            # Record the path before writing so `finally` removes the file
            # even if reading the upload or writing it fails.
            temp_path = tmp.name
            content = await file.read()
            tmp.write(content)
        logger.info("Ingesting file: %s (%d bytes)", filename, len(content))

        # NOTE(review): parsing, PDF rendering and the RAG calls below are
        # synchronous and run on the event loop. Consider run_in_threadpool
        # for large uploads.
        chunker = get_chunking_strategy(strategy, settings)
        page_numbers = None
        if file_ext == ".pdf":
            chunk_texts, page_numbers = _chunk_pdf(temp_path, chunker, settings.chunk_overlap)
        else:
            chunk_texts = chunker.chunk(_read_text_document(temp_path, file_ext))
            if not chunk_texts:
                raise HTTPException(status_code=400, detail=_EMPTY_DOCUMENT_DETAIL)
        chunk_metadata = getattr(chunker, "_chunk_metadata", None)

        rag = RAGService(settings=settings)
        chunk_dir = settings.document_chunk_path
        # Must run before the new chunk PDFs are written: old and new chunk
        # files share names (<stem>_page_<n>.pdf / <stem>_chunk_<i>.pdf).
        _delete_existing_document(rag, filename, chunk_dir)
        os.makedirs(chunk_dir, exist_ok=True)
        document_id = str(uuid.uuid4())

        if page_numbers is not None:
            chunk_file_paths = _write_page_chunk_pdfs(temp_path, filename, page_numbers, chunk_dir)
            # Only PDFs carry page numbers; other formats use extract_metadata's default.
            page_kwargs = {"page_numbers": page_numbers}
        else:
            chunk_file_paths = _write_text_chunk_pdfs(chunk_texts, filename, chunk_dir)
            page_kwargs = {}

        metadata = extract_metadata(
            temp_path,
            chunk_texts,
            original_filename=filename,
            chunk_file_paths=chunk_file_paths,
            document_id=document_id,
            strategy_type=strategy,
            chunk_metadata=chunk_metadata,
            **page_kwargs,
        )
        rag.ingest_document(temp_path, chunk_texts, metadata, document_id=document_id)
        logger.info("Ingested %s: doc_id=%s", filename, document_id)
        return IngestResponse(
            document_id=document_id,
            chunk_count=len(chunk_texts),
            filename=filename,
            strategy=strategy,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error discarded.
        logger.exception("Ingestion failed for %s: %s", filename, e)
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {str(e)}") from e
    finally:
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)