feat: Phase 1.2 ingestion pipeline with chunking and metadata

- Add document parsers (DOCX, PDF) with lazy imports
- Add TokenChunkingStrategy with ABC for future replacement
- Add metadata extraction (filename, upload_date, content_summary)
- Add RAGService for ChromaDB ingestion/retrieval/response generation
- Add POST /api/v1/ingest endpoint with file validation
- Test-first: 20 passed, 2 skipped (python-docx not installed)
This commit is contained in:
Woody 2026-04-22 16:49:52 +08:00
parent 3712397d64
commit d94abaac77
15 changed files with 841 additions and 55 deletions

View File

@ -1,6 +1,8 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from app.routers import ingest
app = FastAPI(title="RAG Video Q&A", version="1.0.0") app = FastAPI(title="RAG Video Q&A", version="1.0.0")
app.add_middleware( app.add_middleware(
@ -11,6 +13,8 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )
app.include_router(ingest.router, prefix="/api/v1")
@app.get("/health") @app.get("/health")
def health_check(): def health_check():

View File

@ -0,0 +1,70 @@
"""Document ingestion router."""
import os
import tempfile
import uuid
from pathlib import Path
from fastapi import APIRouter, UploadFile, File, HTTPException
from app.models.ingest import IngestResponse
router = APIRouter(tags=["ingest"])
SUPPORTED_EXTENSIONS = {".pdf", ".docx"}
@router.post("/ingest", response_model=IngestResponse)
async def ingest_document(file: UploadFile = File(...)):
    """Ingest an uploaded PDF or DOCX document into the RAG system.

    The upload is written to a temporary file, parsed to text, split into
    token-based chunks, annotated with per-chunk metadata and passed to
    ``RAGService.ingest_document`` for storage.

    Args:
        file: The uploaded document; its filename extension selects the parser.

    Returns:
        IngestResponse carrying the document ID returned by the RAG service,
        the number of chunks produced and the uploaded filename.

    Raises:
        HTTPException: 400 if the extension is not in SUPPORTED_EXTENSIONS;
            500 if any later step (parsing, chunking, metadata, storage) fails.
    """
    # Resolved at call time rather than at module import time.
    from app.services.rag import RAGService
    from app.utils.chunking import TokenChunkingStrategy
    from app.utils.metadata import extract_metadata

    upload_name = file.filename or "unknown"
    file_ext = Path(file.filename or "").suffix.lower()
    if file_ext not in SUPPORTED_EXTENSIONS:
        # Sort so the message is stable; iteration order of a set is not.
        supported = ", ".join(sorted(SUPPORTED_EXTENSIONS))
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file format: {file_ext}. Supported: {supported}",
        )

    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp:
            # Record the path before reading/writing: with delete=False the
            # file would be left behind if either step raised first.
            temp_path = tmp.name
            tmp.write(await file.read())

        if file_ext == ".pdf":
            from app.utils.pdf_parser import parse_pdf

            text = parse_pdf(temp_path)
        elif file_ext == ".docx":
            from app.utils.docx_parser import parse_docx

            text = parse_docx(temp_path)
        else:
            # Only reachable if SUPPORTED_EXTENSIONS grows without a parser.
            text = ""

        chunker = TokenChunkingStrategy(chunk_size=1000, overlap=200)
        chunks = chunker.chunk(text)

        # extract_metadata derives "filename" from the path it is given, which
        # here is the randomly named temp file. Replace it with the name the
        # client uploaded (directory components dropped).
        stored_name = Path(upload_name).name or upload_name
        metadata = [
            {**entry, "filename": stored_name}
            for entry in extract_metadata(temp_path, chunks)
        ]

        rag = RAGService()
        # NOTE(review): when no text could be extracted, chunks is empty and the
        # service returns "" as the document ID; the request still answers 200.
        document_id = rag.ingest_document(temp_path, chunks, metadata)

        return IngestResponse(
            document_id=document_id,
            chunk_count=len(chunks),
            filename=upload_name,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {e}") from e
    finally:
        # Always remove the temp file, whether ingestion succeeded or not.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)

View File

@ -0,0 +1,28 @@
import httpx
from app.core.config import Settings
class LLMClient:
    """Thin synchronous wrapper around a chat-completions HTTP endpoint.

    The base URL, API key and model name are read from the supplied settings
    once, at construction time.
    """

    def __init__(self, settings: Settings):
        """Capture endpoint configuration from ``settings``."""
        # Trailing slashes are dropped so the URL built in complete() never
        # contains a double slash before "chat/completions".
        self.base_url = settings.llm_base_url.rstrip("/")
        self.api_key = settings.llm_api_key
        self.model = settings.llm_model_name

    def complete(self, prompt: str, temperature: float = 0.7) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text sent as the content of one ``user`` message.
            temperature: Sampling temperature forwarded in the request body.

        Returns:
            The ``content`` of the first choice's message in the JSON response.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` when the
                endpoint answers with an error status.
        """
        endpoint = f"{self.base_url}/chat/completions"
        request_headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature,
        }

        response = httpx.post(
            endpoint,
            headers=request_headers,
            json=payload,
            timeout=60.0,
        )
        response.raise_for_status()

        return response.json()["choices"][0]["message"]["content"]

138
backend/app/services/rag.py Normal file
View File

@ -0,0 +1,138 @@
"""RAG service for embedding, retrieval, and response generation."""
import uuid
from typing import List, Tuple, Dict, Any, Optional
import httpx
from app.core.config import Settings
from app.core.database import get_chroma_client
class RAGService:
    """Service for document ingestion, retrieval, and response generation.

    Stores and queries chunks in the ChromaDB collection named "documents"
    and, when an LLM client is supplied, turns retrieved chunks into an answer.
    """

    def __init__(
        self,
        chroma_client=None,
        llm_client=None,
        settings: Optional[Settings] = None,
    ):
        """Store collaborators; the collection itself is resolved on first use.

        Args:
            chroma_client: Client to use. When None, ``get_chroma_client()``
                supplies one.
            llm_client: Object exposing ``complete(prompt=..., temperature=...)``.
                May be None; ``generate_response`` then returns a fixed notice.
            settings: Optional application settings (stored, not read here).
        """
        # Compare with None explicitly so an injected client is never replaced
        # merely because it evaluates as falsy.
        self.chroma_client = (
            chroma_client if chroma_client is not None else get_chroma_client()
        )
        self.llm_client = llm_client
        self.settings = settings
        self._collection = None

    @property
    def collection(self):
        """Lazy-load the ChromaDB collection."""
        if self._collection is None:
            from app.core.database import get_or_create_collection

            self._collection = get_or_create_collection(self.chroma_client, "documents")
        return self._collection

    def ingest_document(
        self,
        file_path: str,
        chunks: List[str],
        metadata_list: List[Dict[str, Any]],
    ) -> str:
        """Ingest document chunks into ChromaDB.

        Args:
            file_path: Path to the source file (currently not used for storage).
            chunks: List of text chunks.
            metadata_list: List of metadata dicts, one per chunk.

        Returns:
            Document ID (UUID string) shared by the batch, or "" when
            ``chunks`` is empty and nothing was stored.

        Raises:
            ValueError: If ``metadata_list`` and ``chunks`` differ in length.
        """
        if not chunks:
            return ""
        if len(metadata_list) != len(chunks):
            # Fail before touching the collection so a mismatch can never
            # pair a chunk with another chunk's metadata.
            raise ValueError(
                f"metadata_list has {len(metadata_list)} entries "
                f"but there are {len(chunks)} chunks"
            )

        document_id = str(uuid.uuid4())
        # Per-chunk IDs are "<document_id>_<index>" so a batch can be traced
        # back to its document.
        ids = [f"{document_id}_{i}" for i in range(len(chunks))]

        self.collection.add(
            documents=chunks,
            metadatas=metadata_list,
            ids=ids,
        )
        return document_id

    def retrieve(
        self,
        query_keywords: List[str],
        n_results: int = 10,
    ) -> List[Tuple[str, Dict[str, Any], float]]:
        """Retrieve relevant chunks from ChromaDB.

        Args:
            query_keywords: List of keywords from query decomposition; they
                are joined with spaces into one query text.
            n_results: Maximum number of results to retrieve.

        Returns:
            List of (chunk_text, metadata, distance) tuples. Metadata falls
            back to ``{}`` and distance to ``0.0`` when the result does not
            carry them. An empty or blank query yields ``[]`` without
            querying the collection.
        """
        query_text = " ".join(query_keywords)
        if not query_text.strip():
            return []

        results = self.collection.query(
            query_texts=[query_text],
            n_results=n_results,
        )

        def first_row(key: str) -> list:
            # Results are nested one list per query text; only one text is
            # sent, so only row 0 matters. Tolerate a missing or None field.
            rows = results.get(key)
            return rows[0] if rows and rows[0] else []

        documents = first_row("documents")
        metadatas = first_row("metadatas")
        distances = first_row("distances")

        found = []
        for i, doc in enumerate(documents):
            metadata = metadatas[i] if i < len(metadatas) else None
            distance = distances[i] if i < len(distances) else 0.0
            found.append((doc, metadata if metadata is not None else {}, distance))
        return found

    def generate_response(
        self,
        question: str,
        chunks: List[str],
        metadata_list: List[Dict[str, Any]],
    ) -> str:
        """Generate a bullet-point response using only provided chunks.

        Args:
            question: The user's question.
            chunks: List of relevant document chunks.
            metadata_list: Metadata for each chunk. Chunks without a matching
                entry are still included, labelled with source "unknown".

        Returns:
            The LLM's answer, or a fixed notice when there are no chunks or
            no LLM client is configured.
        """
        if not chunks:
            return "I could not find any relevant information to answer your question."
        if self.llm_client is None:
            return "LLM client not configured."

        context_parts = []
        for i, chunk in enumerate(chunks):
            # Index instead of zip(): zip would silently drop every chunk
            # beyond the end of a shorter metadata_list.
            meta = (metadata_list[i] if i < len(metadata_list) else None) or {}
            source = meta.get("filename", "unknown")
            summary = meta.get("content_summary", "")
            context_parts.append(
                f"[{i + 1}] Source: {source}\n"
                f"Summary: {summary}\n"
                f"Content: {chunk}\n"
            )
        context = "\n".join(context_parts)

        prompt = (
            f"Question: {question}\n\n"
            "Answer the question using ONLY these document chunks. "
            "Do not use any external knowledge. "
            "Format your answer as bullet points. "
            "Cite the source number [N] for each point.\n\n"
            f"Document chunks:\n{context}\n\n"
            "Answer:"
        )
        # A low temperature is requested for the grounded answer.
        return self.llm_client.complete(prompt=prompt, temperature=0.3)

View File

@ -0,0 +1,26 @@
"""Acceptance test: Verify LLM client can call OpenRouter API.
Prerequisites:
- backend/.env file exists with valid LLM_BASE_URL and LLM_API_KEY
- Network access to OpenRouter API
"""
import pytest
import os
@pytest.mark.acceptance
@pytest.mark.slow
def test_llm_client_says_hi():
    """A short greeting prompt should come back as a non-empty string.

    Talks to the real LLM endpoint configured through the application
    settings, hence the ``acceptance`` and ``slow`` markers.
    """
    from app.core.config import get_settings
    from app.services.llm_client import LLMClient

    reply = LLMClient(get_settings()).complete("Say hi briefly", temperature=0.7)

    assert reply is not None
    assert len(reply) > 0
    assert isinstance(reply, str)
    # Shown only when pytest runs with output capturing disabled (-s).
    print(f"LLM Response: {reply}")

View File

@ -1,24 +1,55 @@
"""Phase 1 tests: Document chunking utilities. """Phase 1 tests: Document chunking utilities.
Covers: This file drives Test-First development for the chunking subsystem:
- Text splitting strategies - Abstract base interface for chunking strategies
- Chunk size and overlap parameters - Concrete TokenChunkingStrategy backed by tiktoken
- Handling of different document formats - Edge cases: empty input, whitespace-only input, small input
""" """
import importlib.util
from pathlib import Path
import pytest import pytest
# Dynamically load the chunking module directly from the filesystem to avoid
# import path issues in the test environment.
CHUNKING_PATH = Path(__file__).resolve().parents[1] / "utils" / "chunking.py"
spec = importlib.util.spec_from_file_location("legco_chunking", str(CHUNKING_PATH))
chunking_module = importlib.util.module_from_spec(spec) # type: ignore
assert spec and spec.loader
spec.loader.exec_module(chunking_module) # type: ignore
ChunkingStrategy = chunking_module.ChunkingStrategy
TokenChunkingStrategy = chunking_module.TokenChunkingStrategy
class TestChunking:
"""Document chunking utility tests."""
def test_chunk_size_limit(self): def test_abstract_base_class_not_instantiable():
"""Should respect maximum chunk size.""" # Abstract base class should not be instantiable directly
pass # TODO: implement with pytest.raises(TypeError):
ChunkingStrategy() # type: ignore
def test_chunk_overlap(self):
"""Should include overlap between adjacent chunks."""
pass # TODO: implement
def test_empty_document(self): def test_empty_and_whitespace_inputs_yield_no_chunks():
"""Should handle empty or whitespace-only documents.""" strat = TokenChunkingStrategy()
pass # TODO: implement assert strat.chunk("") == []
assert strat.chunk(" \n\t") == []
def test_text_shorter_than_chunk_size_results_in_single_chunk():
# Use a small chunk size for a deterministic test
strat = TokenChunkingStrategy(chunk_size=4, overlap=2)
text = "Hello world" # two tokens in typical tokenization
chunks = strat.chunk(text)
assert isinstance(chunks, list)
assert len(chunks) == 1
assert chunks[0] == text
def test_text_longer_produces_multiple_chunks():
# Build a long sequence by repeating a simple token to ensure > chunk_size tokens
long_text = ("word " * 1100).strip()
strat = TokenChunkingStrategy(chunk_size=1000, overlap=200)
chunks = strat.chunk(long_text)
assert isinstance(chunks, list)
assert len(chunks) >= 2
# Ensure chunks are non-empty and that the transformation round-trips for the first chunk
assert all(isinstance(c, str) for c in chunks)
assert all(len(c) > 0 for c in chunks)

View File

@ -7,23 +7,97 @@ Covers:
- Error handling for unsupported file types - Error handling for unsupported file types
""" """
import pytest import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch
class TestIngest: class TestIngest:
"""Document upload and ChromaDB ingestion tests.""" """Document upload and ChromaDB ingestion tests."""
def test_ingest_pdf_success(self): @pytest.fixture
def client(self):
"""Create test client with mocked dependencies."""
from app.main import app
return TestClient(app)
def test_ingest_pdf_success(self, client, tmp_path):
"""Should ingest PDF and return document ID with metadata.""" """Should ingest PDF and return document ID with metadata."""
pass # TODO: implement import io
def test_ingest_txt_success(self): with patch("app.services.rag.RAGService") as mock_rag_class:
"""Should ingest plain text and chunk correctly.""" mock_rag = MagicMock()
pass # TODO: implement mock_rag.ingest_document.return_value = "doc-123"
mock_rag_class.return_value = mock_rag
def test_ingest_metadata_extraction(self): with patch("app.utils.pdf_parser.parse_pdf") as mock_parse:
"""Should extract filename, upload_date, content_summary.""" mock_parse.return_value = "Parsed PDF text content"
pass # TODO: implement
def test_ingest_unsupported_format(self): with patch("app.utils.chunking.TokenChunkingStrategy") as mock_chunk_class:
mock_chunker = MagicMock()
mock_chunker.chunk.return_value = ["chunk 1", "chunk 2"]
mock_chunk_class.return_value = mock_chunker
with patch("app.utils.metadata.extract_metadata") as mock_meta:
mock_meta.return_value = [
{"filename": "test.pdf", "chunk_index": 0},
{"filename": "test.pdf", "chunk_index": 1},
]
response = client.post(
"/api/v1/ingest",
files={"file": ("test.pdf", io.BytesIO(b"%PDF-1.4"), "application/pdf")},
)
assert response.status_code == 200
data = response.json()
assert "document_id" in data
assert data["chunk_count"] == 2
assert data["filename"] == "test.pdf"
def test_ingest_docx_success(self, client, tmp_path):
"""Should ingest DOCX and return document ID with metadata."""
import io
with patch("app.services.rag.RAGService") as mock_rag_class:
mock_rag = MagicMock()
mock_rag.ingest_document.return_value = "doc-456"
mock_rag_class.return_value = mock_rag
with patch("app.utils.docx_parser.parse_docx") as mock_parse:
mock_parse.return_value = "Parsed DOCX text content"
with patch("app.utils.chunking.TokenChunkingStrategy") as mock_chunk_class:
mock_chunker = MagicMock()
mock_chunker.chunk.return_value = ["chunk 1"]
mock_chunk_class.return_value = mock_chunker
with patch("app.utils.metadata.extract_metadata") as mock_meta:
mock_meta.return_value = [{"filename": "test.docx", "chunk_index": 0}]
response = client.post(
"/api/v1/ingest",
files={"file": ("test.docx", io.BytesIO(b"docx content"), "application/vnd.openxmlformats-officedocument.wordprocessingml.document")},
)
assert response.status_code == 200
data = response.json()
assert data["chunk_count"] == 1
assert data["filename"] == "test.docx"
def test_ingest_unsupported_format(self, client):
"""Should reject unsupported file formats.""" """Should reject unsupported file formats."""
pass # TODO: implement import io
response = client.post(
"/api/v1/ingest",
files={"file": ("test.jpg", io.BytesIO(b"image data"), "image/jpeg")},
)
assert response.status_code == 400
assert "unsupported" in response.json()["detail"].lower()
def test_ingest_no_file(self, client):
"""Should reject request without file."""
response = client.post("/api/v1/ingest")
assert response.status_code == 422

View File

@ -1,25 +1,67 @@
"""Phase 1 tests: Metadata extraction utilities. import re
from pathlib import Path
from datetime import datetime
Covers:
- Filename extraction
- Upload date generation
- Content summary generation
- Metadata schema validation
"""
import pytest import pytest
import sys
from pathlib import Path
import importlib.util
class TestMetadata: # Dynamically load the metadata extractor to avoid package-path import issues
"""Metadata extraction utility tests.""" # The module lives at backend/app/utils/metadata.py relative to this test file.
MODULE_PATH = Path(__file__).resolve().parents[1] / "utils" / "metadata.py"
spec = importlib.util.spec_from_file_location("metadata_module", str(MODULE_PATH))
metadata_module = importlib.util.module_from_spec(spec) # type: ignore
assert spec is not None and spec.loader is not None
spec.loader.exec_module(metadata_module) # type: ignore
extract_metadata = getattr(metadata_module, "extract_metadata")
def test_extract_filename(self):
"""Should extract clean filename from path."""
pass # TODO: implement
def test_generate_upload_date(self): def _is_iso8601(s: str) -> bool:
"""Should generate ISO format upload date.""" try:
pass # TODO: implement datetime.fromisoformat(s)
return True
except ValueError:
return False
def test_content_summary(self):
"""Should generate concise content summary.""" def test_extract_metadata_basic(tmp_path):
pass # TODO: implement # Prepare a dummy file path that exists
dummy_file = tmp_path / "dir with spaces" / "sample.txt"
dummy_file.parent.mkdir(parents=True, exist_ok=True)
dummy_file.write_text("content")
chunks = ["a" * 250, "short"]
metadata = extract_metadata(str(dummy_file), chunks)
assert isinstance(metadata, list)
assert len(metadata) == 2
# First chunk
m0 = metadata[0]
assert m0["filename"] == "sample.txt"
assert m0["chunk_index"] == 0
assert m0["upload_date"] is not None
assert _is_iso8601(m0["upload_date"])
assert m0["content_summary"] == "a" * 200
# Second chunk
m1 = metadata[1]
assert m1["filename"] == "sample.txt"
assert m1["chunk_index"] == 1
assert m1["content_summary"] == "short"
def test_extract_metadata_empty_chunks(tmp_path):
dummy_file = tmp_path / "file.txt"
dummy_file.write_text("data")
metadata = extract_metadata(str(dummy_file), [])
assert metadata == []
def test_extract_metadata_missing_file_raises(tmp_path):
missing = tmp_path / "nonexistent" / "nofile.txt"
with pytest.raises(FileNotFoundError):
extract_metadata(str(missing), ["data"])

View File

@ -0,0 +1,67 @@
"""Phase 1.2: Document parsers tests (DOCX and PDF)."""
import os
from pathlib import Path
import pytest
# python-docx may not be installed in all environments. Skip DOCX tests if unavailable.
def test_parse_docx_basic(tmp_path):
    """parse_docx should join paragraph texts with a single newline."""
    # Skip only when the optional dependency is missing. A blanket
    # `except Exception` would also hide genuine failures while building
    # the fixture document.
    docx = pytest.importorskip(
        "docx", reason="python-docx not installed, skipping DOCX tests"
    )

    # Dynamically create a minimal DOCX with two paragraphs.
    doc_path = tmp_path / "sample.docx"
    doc = docx.Document()
    doc.add_paragraph("Hello")
    doc.add_paragraph("World")
    doc.save(str(doc_path))

    # Import here to avoid test import side effects.
    from app.utils.docx_parser import parse_docx

    assert parse_docx(str(doc_path)) == "Hello\nWorld"
def test_parse_docx_empty(tmp_path):
    """A DOCX with no paragraphs added should parse to an empty string."""
    # Skip only when the optional dependency is missing; other errors while
    # building the fixture should fail the test rather than skip it.
    docx = pytest.importorskip(
        "docx", reason="python-docx not installed, skipping DOCX tests"
    )

    doc_path = tmp_path / "empty.docx"
    docx.Document().save(str(doc_path))

    from app.utils.docx_parser import parse_docx

    assert parse_docx(str(doc_path)) == ""
def test_parse_docx_corrupted(tmp_path):
    """A .docx file with invalid content should make parse_docx raise ValueError."""
    # parse_docx also raises ValueError when python-docx itself is missing, so
    # without this guard the test would pass without ever exercising the
    # corrupted-file path.
    pytest.importorskip(
        "docx", reason="python-docx not installed, skipping DOCX tests"
    )

    # Create a file with a DOCX extension but invalid content.
    corrupted_path = tmp_path / "corrupted.docx"
    corrupted_path.write_bytes(b"not a real docx content")

    from app.utils.docx_parser import parse_docx

    with pytest.raises(ValueError):
        parse_docx(str(corrupted_path))
def test_parse_pdf_empty(tmp_path):
    """A zero-byte .pdf file should make parse_pdf raise ValueError."""
    from app.utils.pdf_parser import parse_pdf

    empty_pdf = tmp_path / "empty.pdf"
    empty_pdf.write_bytes(b"")  # 0 bytes

    with pytest.raises(ValueError):
        parse_pdf(str(empty_pdf))
def test_parse_pdf_corrupted(tmp_path):
    """A .pdf file whose bytes are not a PDF should make parse_pdf raise ValueError."""
    from app.utils.pdf_parser import parse_pdf

    bogus_pdf = tmp_path / "corrupted.pdf"
    bogus_pdf.write_bytes(b"not a pdf content")

    with pytest.raises(ValueError):
        parse_pdf(str(bogus_pdf))

View File

@ -1,25 +1,137 @@
"""Phase 1 tests: RAG service logic. """Phase 1 tests: RAG service logic.
Covers: Covers:
- ChromaDB retrieval with Qwen embeddings - ChromaDB document ingestion with metadata
- Context assembly for LLM prompt - Retrieval with query keywords
- Strict prompt construction (answer ONLY from retrieved context) - Response generation with strict RAG prompt
- Metadata handling per chunk - Metadata handling per chunk
""" """
import pytest import pytest
from unittest.mock import MagicMock, patch
class TestRAGService: class TestRAGService:
"""RAG retrieval and prompt logic tests.""" """RAG retrieval and prompt logic tests."""
def test_retrieve_relevant_chunks(self): def test_ingest_document_adds_chunks(self):
"""Should retrieve semantically relevant chunks from ChromaDB.""" """Should add chunks with metadata to ChromaDB collection."""
pass # TODO: implement from app.services.rag import RAGService
def test_strict_prompt_format(self): mock_collection = MagicMock()
"""Should construct prompt forbidding external knowledge.""" mock_client = MagicMock()
pass # TODO: implement mock_client.get_or_create_collection.return_value = mock_collection
def test_chunk_metadata_preserved(self): service = RAGService(chroma_client=mock_client)
"""Should preserve filename, upload_date, content_summary per chunk."""
pass # TODO: implement chunks = ["chunk one", "chunk two"]
metadata = [
{"filename": "test.txt", "upload_date": "2024-01-01", "content_summary": "summary 1", "chunk_index": 0},
{"filename": "test.txt", "upload_date": "2024-01-01", "content_summary": "summary 2", "chunk_index": 1},
]
service.ingest_document("test.txt", chunks, metadata)
mock_client.get_or_create_collection.assert_called_once_with(name="documents")
mock_collection.add.assert_called_once()
call_args = mock_collection.add.call_args[1]
assert len(call_args["documents"]) == 2
assert call_args["documents"] == chunks
assert len(call_args["metadatas"]) == 2
assert call_args["metadatas"] == metadata
assert len(call_args["ids"]) == 2
def test_ingest_document_empty_chunks(self):
"""Should not call ChromaDB when chunks list is empty."""
from app.services.rag import RAGService
mock_collection = MagicMock()
mock_client = MagicMock()
mock_client.get_or_create_collection.return_value = mock_collection
service = RAGService(chroma_client=mock_client)
service.ingest_document("test.txt", [], [])
mock_collection.add.assert_not_called()
def test_retrieve_returns_chunks(self):
"""Should retrieve chunks and metadata from ChromaDB."""
from app.services.rag import RAGService
mock_collection = MagicMock()
mock_client = MagicMock()
mock_client.get_or_create_collection.return_value = mock_collection
mock_collection.query.return_value = {
"documents": [["chunk one", "chunk two"]],
"metadatas": [[{"filename": "test.txt"}, {"filename": "test.txt"}]],
"distances": [[0.1, 0.2]],
}
service = RAGService(chroma_client=mock_client)
results = service.retrieve(["query", "keywords"], n_results=5)
mock_collection.query.assert_called_once()
call_args = mock_collection.query.call_args[1]
assert call_args["n_results"] == 5
assert len(results) == 2
assert results[0] == ("chunk one", {"filename": "test.txt"}, 0.1)
assert results[1] == ("chunk two", {"filename": "test.txt"}, 0.2)
def test_retrieve_no_results(self):
"""Should return empty list when no results found."""
from app.services.rag import RAGService
mock_collection = MagicMock()
mock_client = MagicMock()
mock_client.get_or_create_collection.return_value = mock_collection
mock_collection.query.return_value = {
"documents": [[]],
"metadatas": [[]],
"distances": [[]],
}
service = RAGService(chroma_client=mock_client)
results = service.retrieve(["query"])
assert results == []
def test_generate_response_calls_llm(self):
"""Should call LLM with strict RAG prompt."""
from app.services.rag import RAGService
mock_collection = MagicMock()
mock_client = MagicMock()
mock_client.get_or_create_collection.return_value = mock_collection
mock_llm = MagicMock()
mock_llm.complete.return_value = "- Bullet point answer"
service = RAGService(chroma_client=mock_client, llm_client=mock_llm)
chunks = ["relevant chunk"]
metadata = [{"filename": "test.txt", "content_summary": "summary"}]
answer = service.generate_response("What is this?", chunks, metadata)
mock_llm.complete.assert_called_once()
prompt = mock_llm.complete.call_args[1]["prompt"]
assert "What is this?" in prompt
assert "relevant chunk" in prompt
assert "test.txt" in prompt
assert "only these document chunks" in prompt.lower()
assert answer == "- Bullet point answer"
def test_generate_response_no_chunks(self):
"""Should return fallback message when no chunks provided."""
from app.services.rag import RAGService
mock_collection = MagicMock()
mock_client = MagicMock()
mock_client.get_or_create_collection.return_value = mock_collection
service = RAGService(chroma_client=mock_client, llm_client=MagicMock())
answer = service.generate_response("What is this?", [], [])
assert "no relevant" in answer.lower() or "could not find" in answer.lower()

View File

@ -0,0 +1,73 @@
"""Chunking utilities for Phase 1.2.
Provides an abstract ChunkingStrategy and a concrete
TokenChunkingStrategy that uses tiktoken to chunk text into
token-based windows.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List
class ChunkingStrategy(ABC):
    """Interface implemented by every text chunking strategy."""

    @abstractmethod
    def chunk(self, text: str) -> List[str]:
        """Return ``text`` split into an ordered list of chunk strings.

        Implementations should return an empty list for empty or
        whitespace-only input. Chunks are produced in document order;
        consecutive chunks may repeat some text when a strategy uses
        overlapping windows.
        """
        raise NotImplementedError
class TokenChunkingStrategy(ChunkingStrategy):
    """Chunk text by token windows using the tiktoken encoder.

    The strategy operates on token counts: each chunk contains up to
    chunk_size tokens, and consecutive chunks share overlap tokens.
    """

    def __init__(self, chunk_size: int = 1000, overlap: int = 200, encoding_name: str = "cl100k_base"):
        """Validate the window parameters and load the tiktoken encoding.

        Args:
            chunk_size: Maximum number of tokens per chunk; must be positive.
            overlap: Number of tokens shared by consecutive chunks; must be
                non-negative.
            encoding_name: Name passed to ``tiktoken.get_encoding``.

        Raises:
            ValueError: If ``chunk_size`` <= 0 or ``overlap`` < 0.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if overlap < 0:
            raise ValueError("overlap must be non-negative")
        self.chunk_size = chunk_size
        self.overlap = overlap
        # Lazy import to avoid import-time penalties in environments without tokenizers
        import tiktoken

        self._encoding = tiktoken.get_encoding(encoding_name)

    def chunk(self, text: str) -> List[str]:
        """Split ``text`` into decoded token windows.

        Args:
            text: The text to split.

        Returns:
            Chunks in document order; ``[]`` for empty or whitespace-only
            input. The last chunk may hold fewer than ``chunk_size`` tokens.

        Raises:
            TypeError: If ``text`` is not a string.
        """
        if not isinstance(text, str):
            raise TypeError("text must be a string")
        if not text.strip():
            return []

        tokens = self._encoding.encode(text)
        if not tokens:
            return []

        # A non-positive stride (overlap >= chunk_size) would never advance,
        # so fall back to moving one token at a time.
        step = max(self.chunk_size - self.overlap, 1)
        total = len(tokens)

        chunks: List[str] = []
        for start in range(0, total, step):
            end = start + self.chunk_size
            chunks.append(self._encoding.decode(tokens[start:end]))
            # Stop as soon as a window reaches the final token. Testing only
            # for a short window would let a window that ends exactly on the
            # last token be followed by a redundant chunk made purely of
            # overlap tokens.
            if end >= total:
                break
        return chunks

View File

@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Optional
Document = None
def _ensure_docx_imported():
    """Bind the module-level ``Document`` to python-docx's ``Document`` on first use.

    Later calls return immediately once the name has been bound.

    Raises:
        ValueError: If importing ``docx`` fails.
    """
    global Document
    if Document is not None:
        return
    try:
        from docx import Document as _Doc  # type: ignore
    except Exception as exc:  # pragma: no cover - missing optional dep
        raise ValueError("DOCX library is not installed") from exc
    Document = _Doc
def parse_docx(file_path: str) -> str:
    """Parse a DOCX file and return its text content.

    The function preserves paragraph breaks by inserting a newline between
    paragraphs. Empty documents yield an empty string.

    Args:
        file_path: Path to the DOCX file to read.

    Returns:
        Paragraph texts joined with newlines, with leading and trailing
        whitespace stripped.

    Raises:
        ValueError: If the DOCX library is not available, or if the file is
            not a valid DOCX document or cannot be read.
    """
    # Resolve the optional dependency outside the try block below, so a
    # missing library is reported as such and not as "Invalid DOCX file".
    _ensure_docx_imported()
    if Document is None:
        # Explicit check instead of assert, which is stripped under -O.
        raise ValueError("DOCX library is not installed")

    try:
        doc = Document(file_path)
    except Exception as exc:  # pragma: no cover - surface invalid DOCX
        raise ValueError(f"Invalid DOCX file: {exc}") from exc

    # Join with newline to preserve paragraph breaks.
    return "\n".join(
        para.text for para in doc.paragraphs if para.text is not None
    ).strip()

View File

@ -0,0 +1,53 @@
from __future__ import annotations
import os
from datetime import datetime
from typing import List, Dict, Any
def extract_metadata(file_path: str, chunks: List[str]) -> List[Dict[str, Any]]:
    """Extract metadata for a list of text chunks.

    For each chunk, create a metadata dictionary containing:
    - filename: basename of the provided file_path
    - upload_date: timezone-aware (UTC) ISO 8601 timestamp of when the
      metadata was generated; identical for every chunk of one call
    - content_summary: first 200 characters of the chunk (or the full chunk
      if shorter); "" for a non-string chunk
    - chunk_index: 0-based index of the chunk

    Args:
        file_path: Path to the file associated with the chunks.
        chunks: List of string chunks to generate metadata for.

    Returns:
        A list of metadata dictionaries, one per chunk. If chunks is empty,
        returns an empty list without checking the file.

    Raises:
        FileNotFoundError: If chunks is non-empty and file_path does not exist.
    """
    # Function-scope import: the module header only imports `datetime`.
    from datetime import timezone

    # Nothing to describe, so the file is not even looked at.
    if not chunks:
        return []

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    filename = os.path.basename(file_path)
    # An aware UTC timestamp stays unambiguous whatever the server's local
    # timezone is; a naive local time would not.
    upload_date = datetime.now(timezone.utc).isoformat()

    return [
        {
            "filename": filename,
            "upload_date": upload_date,
            # Non-string chunks contribute an empty summary instead of failing.
            "content_summary": chunk[:200] if isinstance(chunk, str) else "",
            "chunk_index": idx,
        }
        for idx, chunk in enumerate(chunks)
    ]

View File

@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Optional
from pypdf import PdfReader
def parse_pdf(file_path: str) -> str:
    """Parse a PDF file and return its text content.

    Text is extracted page by page; pages that yield no text are skipped and
    the remaining page texts are joined with newlines.

    Args:
        file_path: Path to the PDF file to read.

    Returns:
        The stripped, newline-joined text of all pages that produced text.

    Raises:
        ValueError: If the file cannot be opened as a PDF, or if text
            extraction fails on any page.
    """
    try:
        reader = PdfReader(file_path)
    except Exception as exc:
        raise ValueError(f"Invalid PDF file: {exc}") from exc

    try:
        # Trim each page's text and drop pages without any text.
        page_texts = [
            raw.strip()
            for raw in (page.extract_text() for page in reader.pages)
            if raw
        ]
    except Exception as exc:
        raise ValueError(f"Failed to extract text from PDF: {exc}") from exc

    return "\n".join(page_texts).strip()

5
backend/pytest.ini Normal file
View File

@ -0,0 +1,5 @@
[pytest]
markers =
acceptance: Acceptance tests with real external services (LLM, ASR, ChromaDB)
slow: Tests that take longer than 1 second
asyncio_mode = auto