"""Q&A-pair chunking utilities for Package 8. Provides section detection (LLM + regex), text preprocessing, and chunk building for LegCo documents with Q&A structure. """ from __future__ import annotations import json import logging import re from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple logger = logging.getLogger(__name__) @dataclass class Section: """A detected section within a LegCo document.""" type: str # "qa" | "narrative" | "speaking_notes" | "table" | "toc" | "heading_only" heading: str = "" qa_id: Optional[str] = None question: Optional[str] = None answer: Optional[str] = None content: str = "" start_page: int = 1 end_page: int = 1 has_table: bool = False parent_topic: str = "" _FOOTER_RE = re.compile(r"^[A-Z]-\d+\s*$", re.MULTILINE) _FOOTER_DATE_RE = re.compile(r"^[A-Z]-\d+\s*\n\d{4}-\d{2}-\d{2}$", re.MULTILINE) _HEADER_LETTER_RE = re.compile(r"^(\([A-Z]\))\s*$", re.MULTILINE) _FULLWIDTH_COLON_RE = re.compile("[︰:]") def preprocess_text(pages: List[Tuple[int, str]]) -> str: """Concatenate pages, strip footers/headers, normalize colons, insert [PAGE_BREAK: N] markers.""" parts: List[str] = [] for idx, (page_num, page_text) in enumerate(pages): text = _FOOTER_DATE_RE.sub("", page_text) text = _FOOTER_RE.sub("", text) if idx > 0: text = _HEADER_LETTER_RE.sub("", text) text = _FULLWIDTH_COLON_RE.sub(":", text) parts.append(f"[PAGE_BREAK: {page_num}]\n{text}") return "\n".join(parts) _STRUCTURE_PROMPT_TEMPLATE = """You are analyzing a Hong Kong Legislative Council document. The text has page markers like [PAGE_BREAK: N] showing where pages begin. For each distinct section in this document, identify: 1. The section type: - "qa": a question-and-answer pair (問/答 or Q1/Q2 format) - "narrative": policy text, explanatory paragraphs, section content with bullets - "speaking_notes": briefing points (發言要點) with bullet markers - "table": standalone data tables (not embedded in answers) - "toc": table of contents - "heading_only": a section heading with no following content 2. For "qa" sections: - The question text (exact) - The answer text (exact, including tables, bullet lists, and [內部參考] content) - The question ID if present (e.g. "A1", "Q3") - The start page and end page 3. For all sections: - The section heading (e.g. "(A) 排水系統", "(1) 住戶的安置補償") - The start page and end page - Whether the section contains tables Return JSON: {{ "sections": [ {{ "type": "qa", "heading": "(A) 排水系統", "qa_id": "A1", "question": "...", "answer": "...", "start_page": 2, "end_page": 3, "has_table": true, "parent_topic": "排水系統" }}, {{ "type": "narrative", "heading": "(1) 住戶的安置補償", "content": "...", "start_page": 2, "end_page": 5, "has_table": false }} ] }} DOCUMENT TEXT: {document_text}""" def build_structure_detection_prompt(text: str) -> str: """Construct the LLM prompt for section classification.""" return _STRUCTURE_PROMPT_TEMPLATE.format(document_text=text) _MARKDOWN_FENCE_RE = re.compile(r"```(?:json)?\s*\n?(.*?)\n?```", re.DOTALL) def parse_llm_structure_response(response_text: str) -> List[Section]: """Parse the JSON returned by the LLM. Handle markdown code fences. Raises ValueError if response is not valid JSON. """ cleaned = response_text.strip() fence_match = _MARKDOWN_FENCE_RE.search(cleaned) if fence_match: cleaned = fence_match.group(1).strip() try: data = json.loads(cleaned) except json.JSONDecodeError as exc: raise ValueError(f"Invalid JSON from LLM structure detection: {exc}") from exc sections_raw = data.get("sections", []) sections: List[Section] = [] for raw in sections_raw: sections.append(Section( type=raw.get("type", "narrative"), heading=raw.get("heading", ""), qa_id=raw.get("qa_id"), question=raw.get("question"), answer=raw.get("answer"), content=raw.get("content", ""), start_page=raw.get("start_page", 1), end_page=raw.get("end_page", 1), has_table=raw.get("has_table", False), parent_topic=raw.get("parent_topic", ""), )) return sections _CN_QA_RE = re.compile( r"問\s*([A-Z]\d+)\s*[︰::]\s*(.*?)\s*" r"(?:\n\s*答\s*\1\s*[︰::]\s*(.*?)\s*)" r"(?=\n\s*(?:問\s*[A-Z]\d+\s*[︰::]|$))", re.DOTALL, ) def split_chinese_qa(text: str) -> List[Section]: """Regex fast-pass for 問/答 format. Returns empty list if no matches found.""" sections: List[Section] = [] for m in _CN_QA_RE.finditer(text): qa_id = m.group(1) question = m.group(2).strip() answer = (m.group(3) or "").strip() sections.append(Section( type="qa", qa_id=qa_id, question=question, answer=answer, )) return sections _EN_QA_RE = re.compile( r"^(Q\d+)\s+(.*?)\s*$\n((?:(?!^Q\d+).+(?:\n|$))*)", re.MULTILINE, ) def split_english_qa(text: str) -> List[Section]: """Regex fast-pass for Q-number format. Returns empty list if no matches found.""" sections: List[Section] = [] for m in _EN_QA_RE.finditer(text): qa_id = m.group(1) question = m.group(2).strip() answer = m.group(3).strip() sections.append(Section( type="qa", qa_id=qa_id, question=question, answer=answer, )) return sections def _estimate_tokens(text: str) -> int: """Rough token estimate: ~1.3 tokens per CJK char, ~1 token per 4 chars for Latin.""" cjk_count = 0 latin_len = 0 for ch in text: if "\u4e00" <= ch <= "\u9fff": cjk_count += 1 else: latin_len += 1 return int(cjk_count * 1.3 + latin_len / 4) def _split_oversized_qa( question: str, answer: str, page: int, heading: str, qa_id: Optional[str], question_index: int, has_table: bool, parent_topic: str, start_page: int, end_page: int, max_tokens: int, ) -> List[Tuple[str, int, dict]]: """Recursively split an oversized Q&A answer with question prepended to each sub-chunk.""" # Try paragraph boundaries first parts = answer.split("\n\n") if len(parts) <= 1: parts = answer.split("\n") # Group parts into sub-chunks that fit within max_tokens sub_chunks: List[str] = [] current = "" for part in parts: candidate = (current + "\n\n" + part) if current else part if _estimate_tokens(f"Question: {question}\n\nAnswer (part 1/N): {candidate}") > max_tokens and current: sub_chunks.append(current) current = part else: current = candidate if current: sub_chunks.append(current) total = len(sub_chunks) results: List[Tuple[str, int, dict]] = [] for i, sub in enumerate(sub_chunks): chunk_text = f"Question: {question}\n\nAnswer (part {i + 1}/{total}): {sub}" meta = { "strategy_type": "question", "section_type": "qa", "question_index": question_index, "question_id": qa_id, "question_text": question, "section_heading": heading, "answer_contains_table": has_table, "source_page_range": [start_page, end_page], "parent_topic": parent_topic, } results.append((chunk_text, page, meta)) return results def build_chunks_from_sections( sections: List[Section], max_tokens: int = 3000, ) -> List[Tuple[str, int, dict]]: """Build chunk texts + page refs + metadata from sections. Returns List[(chunk_text, page_number, metadata_dict)]. """ chunks: List[Tuple[str, int, dict]] = [] qa_index = 0 for section in sections: if section.type in ("toc", "heading_only"): continue if section.type == "qa": question_text = section.question or "" answer_text = section.answer or "" chunk_text = f"Question: {question_text}\n\nAnswer: {answer_text}" if section.heading: chunk_text = f"[{section.heading}]\n{chunk_text}" page = section.start_page meta: Dict = { "strategy_type": "question", "section_type": "qa", "question_index": qa_index, "question_id": section.qa_id, "question_text": question_text, "section_heading": section.heading, "answer_contains_table": section.has_table, "source_page_range": [section.start_page, section.end_page], "parent_topic": section.parent_topic, } if _estimate_tokens(chunk_text) > max_tokens: chunks.extend(_split_oversized_qa( question=question_text, answer=answer_text, page=page, heading=section.heading, qa_id=section.qa_id, question_index=qa_index, has_table=section.has_table, parent_topic=section.parent_topic, start_page=section.start_page, end_page=section.end_page, max_tokens=max_tokens, )) else: chunks.append((chunk_text, page, meta)) qa_index += 1 elif section.type == "narrative": content = section.content if not content.strip(): continue prefix = f"[{section.heading}]\n" if section.heading else "" chunk_text = f"{prefix}{content}" meta = { "strategy_type": "question", "section_type": "narrative", "section_heading": section.heading, "source_page_range": [section.start_page, section.end_page], } if _estimate_tokens(chunk_text) <= max_tokens: chunks.append((chunk_text, section.start_page, meta)) else: paragraphs = content.split("\n\n") current = "" for para in paragraphs: candidate = (current + "\n\n" + para) if current else para full = f"{prefix}{candidate}" if _estimate_tokens(full) > max_tokens and current: chunks.append((f"{prefix}{current}", section.start_page, dict(meta))) current = para else: current = candidate if current: chunks.append((f"{prefix}{current}", section.start_page, dict(meta))) elif section.type == "speaking_notes": content = section.content if not content.strip(): continue bullets = re.split(r"(?=⚫)", content) bullets = [b.strip() for b in bullets if b.strip()] if not bullets: bullets = [content] prefix = f"[{section.heading}]\n" if section.heading else "" for bullet in bullets: chunk_text = f"{prefix}{bullet}" meta = { "strategy_type": "question", "section_type": "speaking_notes", "section_heading": section.heading, "source_page_range": [section.start_page, section.end_page], } chunks.append((chunk_text, section.start_page, meta)) elif section.type == "table": content = section.content if not content.strip(): continue chunk_text = f"[{section.heading}]\n{content}" if section.heading else content meta = { "strategy_type": "question", "section_type": "table", "section_heading": section.heading, "answer_contains_table": True, "source_page_range": [section.start_page, section.end_page], } chunks.append((chunk_text, section.start_page, meta)) return chunks