# ============================================================================
# backend/app/models/testing.py  (index 0b41e54..e1ee898, hunk @@ -121,10 +121,10 @@)
#
# The four DimensionScores fields change type from ``int`` to ``float``; the
# ge/le bounds are unchanged. Resulting class body (BaseModel/Field belong to
# that file, so the hunk is recorded here as a comment only):
#
#     class DimensionScores(BaseModel):
#         dimension_1_準確性: float = Field(ge=0, le=40)
#         dimension_2_完整性: float = Field(ge=0, le=25)
#         dimension_3_清晰度: float = Field(ge=0, le=20)
#         dimension_4_簡潔性: float = Field(ge=0, le=15)
#
# ============================================================================
# backend/app/services/cer_wer.py  (new file, index 0000000..d9a59c3)
# ============================================================================
from typing import Any, Dict, List, Sequence, Tuple


def _levenshtein_distance(s1: Sequence[Any], s2: Sequence[Any]) -> Tuple[int, int, int, int]:
    """Align ``s1`` (reference) with ``s2`` (hypothesis) and count edit operations.

    Works on any indexable sequence whose items support ``==``: a ``str`` gives
    a character-level alignment, a list of tokens gives a word-level one.

    Args:
        s1: Reference sequence.
        s2: Hypothesis sequence.

    Returns:
        ``(substitutions, deletions, insertions, hits)`` where a deletion is a
        reference item with no counterpart in the hypothesis and an insertion
        is a hypothesis item with no counterpart in the reference.
    """
    m, n = len(s1), len(s2)

    # Trivial alignments. These must agree with the backtrace below: items
    # present only in the hypothesis are insertions, items present only in
    # the reference are deletions.
    if m == 0:
        return 0, 0, n, 0
    if n == 0:
        return 0, m, 0, 0

    # dp[i][j] = edit distance between s1[:i] and s2[:j].
    # The full table is kept because the backtrace needs it (O(m*n) memory).
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s1[i - 1] == s2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = 1 + min(
                    dp[i - 1][j],      # delete s1[i-1]
                    dp[i][j - 1],      # insert s2[j-1]
                    dp[i - 1][j - 1],  # substitute
                )

    # Walk back from the bottom-right corner, classifying each step.
    substitutions = deletions = insertions = hits = 0
    i, j = m, n
    while i > 0 or j > 0:
        if i > 0 and j > 0 and s1[i - 1] == s2[j - 1]:
            hits += 1
            i -= 1
            j -= 1
        elif i > 0 and j > 0 and dp[i][j] == dp[i - 1][j - 1] + 1:
            substitutions += 1
            i -= 1
            j -= 1
        elif i > 0 and dp[i][j] == dp[i - 1][j] + 1:
            deletions += 1
            i -= 1
        else:
            # Only reachable with j > 0: the remaining optimal move is an insertion.
            insertions += 1
            j -= 1

    return substitutions, deletions, insertions, hits


def _tokenize_words(text: str) -> List[str]:
    """Split ``text`` into whitespace-separated tokens for word-level WER.

    Text without spaces (e.g. unsegmented Chinese) comes back as a single
    token; character-level CER should be computed on the raw string instead.
    """
    return text.split()


def _error_rate_report(rate_key: str, ref_units: Sequence[Any], hyp_units: Sequence[Any]) -> Dict[str, Any]:
    """Build the result dict shared by :func:`calculate_cer` and :func:`calculate_wer`.

    Args:
        rate_key: Name of the rate entry in the result (``"cer"`` or ``"wer"``).
        ref_units: Reference units (characters or word tokens).
        hyp_units: Hypothesis units of the same kind.

    Returns:
        Dict with ``rate_key``, ``reference_length``, ``transcribed_length``,
        ``substitutions``, ``deletions``, ``insertions`` and ``hits``.
    """
    ref_len = len(ref_units)
    hyp_len = len(hyp_units)
    subs = dels = inss = hits = 0

    if ref_len == 0:
        # No reference to compare against: the rate is reported as 0.0 with
        # all counts zero. Callers should check reference_length == 0 if they
        # need to tell "no reference" apart from "perfect transcription".
        rate = 0.0
    elif hyp_len == 0:
        # Nothing was transcribed: every reference unit counts as deleted.
        rate = 1.0
        dels = ref_len
    else:
        subs, dels, inss, hits = _levenshtein_distance(ref_units, hyp_units)
        rate = round((subs + dels + inss) / ref_len, 6)

    return {
        rate_key: rate,
        "reference_length": ref_len,
        "transcribed_length": hyp_len,
        "substitutions": subs,
        "deletions": dels,
        "insertions": inss,
        "hits": hits,
    }


def calculate_cer(reference: str, hypothesis: str) -> dict:
    """Calculate the Character Error Rate (CER) of ``hypothesis`` against ``reference``.

    CER = (substitutions + deletions + insertions) / len(reference), rounded
    to six decimals. It can exceed 1.0 when the hypothesis has many insertions.

    Returns:
        Dict with keys ``cer``, ``reference_length``, ``transcribed_length``,
        ``substitutions``, ``deletions``, ``insertions``, ``hits``. An empty
        reference yields ``cer == 0.0``; an empty hypothesis yields ``cer == 1.0``.
    """
    return _error_rate_report("cer", reference, hypothesis)


def calculate_wer(reference: str, hypothesis: str) -> dict:
    """Calculate the Word Error Rate (WER) of ``hypothesis`` against ``reference``.

    Both strings are split on whitespace and aligned token by token.
    Lengths in the result are token counts, not character counts.

    Returns:
        Dict with keys ``wer``, ``reference_length``, ``transcribed_length``,
        ``substitutions``, ``deletions``, ``insertions``, ``hits``. A reference
        with no tokens yields ``wer == 0.0``; a hypothesis with no tokens
        yields ``wer == 1.0``.
    """
    return _error_rate_report("wer", _tokenize_words(reference), _tokenize_words(hypothesis))


# ----------------------------------------------------------------------------
# Next patch section begins here:
# diff --git a/backend/app/services/chunk_evaluator.py b/backend/app/services/chunk_evaluator.py
# new file mode 100644, index 0000000..81a4373
# ----------------------------------------------------------------------------
# ============================================================================
# backend/app/services/chunk_evaluator.py  (new file, hunk @@ -0,0 +1,169 @@)
# ============================================================================
import asyncio
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional, Set, Tuple

from app.models.testing import (
    ChunkAccuracy,
    EvaluatorConfig,
    GroundTruthInfo,
    SubQuestionChunkEval,
)
from app.services.llm_client import LLMClient

logger = logging.getLogger(__name__)

CHUNK_BATCH_SIZE = 10
CHUNK_MAX_RETRIES = 2
CHUNK_RETRY_DELAY = 2.0

# NOTE(review): the second sentence reads as if markup tag names were stripped
# from this copy ("對於每個,…與相關的信息") — confirm against the committed file.
_CHUNK_EVAL_SYSTEM = """你正在評估文檔塊與關鍵問題的相關性。
對於每個,判斷其是否包含與相關的信息。
返回JSON:{"relevant_chunk_indices": [0, 3, 7]}(僅包含相關的塊索引,0-based,從本批次的第一個塊算起)"""


def _split_into_batches(
    chunks: List[Tuple[str, int, str, Dict[str, Any]]], batch_size: int = CHUNK_BATCH_SIZE
) -> List[List[Tuple[str, int, str, Dict[str, Any]]]]:
    """Split the flat chunk list into consecutive batches of at most ``batch_size`` items."""
    return [chunks[start : start + batch_size] for start in range(0, len(chunks), batch_size)]


def _parse_relevance_response(raw: str) -> Optional[List[int]]:
    """Parse the LLM reply for chunk relevance indices.

    Expects a JSON object of the form ``{"relevant_chunk_indices": [0, 3]}``.

    Returns:
        The indices as a list of ints, or ``None`` when the reply is not valid
        JSON, is not an object, lacks the key, or holds entries that cannot be
        converted to ``int``. Never raises on malformed model output.
    """
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a non-string reply such as None.
        return None
    if not isinstance(data, dict) or "relevant_chunk_indices" not in data:
        return None
    indices = data["relevant_chunk_indices"]
    if not isinstance(indices, list):
        return None
    try:
        return [int(i) for i in indices]
    except (TypeError, ValueError, OverflowError):
        # e.g. ["a"], [null], [[1]] or [Infinity]
        return None


def _build_chunk_batch_prompt(
    sub_question: str, batch: List[Tuple[str, int, str, Dict[str, Any]]]
) -> str:
    """Build XML-format prompt for chunk evaluation.

    Each batch item is ``(doc_id, global_idx, text, meta)``.

    NOTE(review): in this copy the f-strings that should carry the opening and
    closing tags are empty, which leaves ``idx``, ``doc_id``, ``global_idx``
    and ``page`` unused. The markup looks stripped in transit; the strings are
    reproduced exactly as visible — restore them from the committed file.
    """
    parts = []
    parts.append(_CHUNK_EVAL_SYSTEM)
    parts.append("")
    parts.append(f"")
    parts.append(sub_question)
    parts.append(f"")
    parts.append("")

    for idx, (doc_id, global_idx, text, meta) in enumerate(batch):
        page = meta.get("page_number", "?")
        parts.append(f'')
        parts.append(text)
        parts.append(f"")
        parts.append("")

    return "\n".join(parts)


def _make_eval_client(config: EvaluatorConfig, model_idx: int) -> LLMClient:
    """Create an ``LLMClient`` bound to the evaluator endpoint described by ``config``.

    The instance is built with ``LLMClient.__new__`` so ``LLMClient.__init__``
    is skipped and the attributes are assigned by hand.

    NOTE(review): nothing visible here closes the ``httpx.AsyncClient`` that is
    created — confirm who owns its lifetime.
    """
    api_key = os.environ.get(config.api_key_env, "")
    if not api_key:
        # Requests will fail authentication; make the cause visible in logs.
        logger.warning("API key not found for env var: %s", config.api_key_env)

    client = LLMClient.__new__(LLMClient)
    client.settings = type("_Settings", (), {"vllm_engine": False, "llm_enable_thinking": config.enable_thinking})()
    client.model = config.model_name
    client.enable_thinking = config.enable_thinking
    client.logger = logging.getLogger(f"{__name__}.eval_{model_idx}")

    import httpx
    from openai import AsyncOpenAI

    client._client = AsyncOpenAI(
        base_url=config.base_url.rstrip("/"),
        api_key=api_key,
        timeout=120.0,
        http_client=httpx.AsyncClient(headers={"Content-Type": "application/json"}),
    )
    client._langchain_model = None
    return client


async def _evaluate_batch(
    client: LLMClient, prompt: str, retries: int = CHUNK_MAX_RETRIES
) -> Optional[List[int]]:
    """Ask the evaluator which chunks of one batch are relevant.

    Makes up to ``retries + 1`` attempts, sleeping ``CHUNK_RETRY_DELAY``
    seconds between them.

    Returns:
        Batch-local indices of relevant chunks, or ``None`` when every attempt
        raised or produced an unparseable reply.
    """
    for attempt in range(retries + 1):
        try:
            raw = await client.complete(prompt=prompt, temperature=0.1, step_name="ChunkEval")
        except Exception as exc:  # retry boundary: any provider/transport error
            logger.warning("Chunk batch eval attempt %d failed: %s", attempt + 1, exc)
        else:
            result = _parse_relevance_response(raw)
            if result is not None:
                return result
            logger.warning("Chunk batch eval attempt %d returned an unparseable response", attempt + 1)

        if attempt < retries:
            await asyncio.sleep(CHUNK_RETRY_DELAY)

    return None


async def _determine_ground_truth_chunks(
    sub_question: str,
    all_chunks: List[Tuple[str, int, str, Dict[str, Any]]],
    config: EvaluatorConfig,
    semaphore: asyncio.Semaphore,
    model_idx: int = 0,
    batch_size: int = CHUNK_BATCH_SIZE,
) -> Tuple[Set[Tuple[str, int]], int, int]:
    """Determine which chunks are relevant to a key question.

    Chunks are evaluated in batches; ``semaphore`` bounds how many batch
    requests are in flight at once. Batches that fail every attempt add
    nothing to the result, so the returned set can be incomplete.

    Returns:
        ``(ground_truth_set, total_chunks, elapsed_ms)`` where the set holds
        ``(doc_id, chunk_global_idx)`` pairs.
    """
    start = time.perf_counter()
    batches = _split_into_batches(all_chunks, batch_size)

    client = _make_eval_client(config, model_idx)

    async def _eval_with_limit(batch):
        async with semaphore:
            prompt = _build_chunk_batch_prompt(sub_question, batch)
            return await _evaluate_batch(client, prompt)

    batch_results = await asyncio.gather(*[_eval_with_limit(b) for b in batches])

    ground_truth: Set[Tuple[str, int]] = set()
    for batch_no, (batch, result) in enumerate(zip(batches, batch_results)):
        if result is None:
            logger.warning(
                "Chunk batch %d/%d could not be evaluated; its chunks are excluded from the ground truth",
                batch_no + 1,
                len(batches),
            )
            continue
        for batch_local_idx in result:
            # Ignore indices the model invented outside this batch.
            if 0 <= batch_local_idx < len(batch):
                doc_id, chunk_global_idx = batch[batch_local_idx][0], batch[batch_local_idx][1]
                ground_truth.add((doc_id, chunk_global_idx))

    elapsed_ms = int((time.perf_counter() - start) * 1000)
    return ground_truth, len(all_chunks), elapsed_ms


def _calculate_accuracy(
    pipeline_chunks: Set[Tuple[str, int]], ground_truth: Set[Tuple[str, int]]
) -> ChunkAccuracy:
    """Calculate precision, recall and F1 of the pipeline's chunks against the ground truth.

    All three metrics are 0.0 when ``pipeline_chunks`` is empty; recall is 0.0
    when ``ground_truth`` is empty. Values are rounded to four decimals.
    """
    if not pipeline_chunks:
        return ChunkAccuracy(precision=0.0, recall=0.0, f1=0.0, pipeline_chunks=0, relevant_in_pipeline=0)

    tp = len(pipeline_chunks & ground_truth)
    precision = tp / len(pipeline_chunks)
    recall = tp / len(ground_truth) if ground_truth else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    return ChunkAccuracy(
        precision=round(precision, 4),
        recall=round(recall, 4),
        f1=round(f1, 4),
        pipeline_chunks=len(pipeline_chunks),
        relevant_in_pipeline=tp,
    )


# ============================================================================
# backend/app/services/key_questions_evaluator.py  (new file, index 0000000..fad2cff,
# hunk @@ -0,0 +1,220 @@)
# ============================================================================
import asyncio
import json
import logging
import os
import time
from typing import List, Optional

from app.models.testing import (
    DimensionScores,
    EvaluatorConfig,
    KeyQuestionsEvalEntry,
    KeyQuestionsEvalResult,
)
from app.services.llm_client import LLMClient

logger = logging.getLogger(__name__)

MAX_RETRIES = 3
RETRY_DELAYS = [2.0, 4.0, 8.0]

# Score fields every evaluator reply must contain (same names as DimensionScores).
_DIMENSION_FIELDS = (
    "dimension_1_準確性",
    "dimension_2_完整性",
    "dimension_3_清晰度",
    "dimension_4_簡潔性",
)

_MARKING_SCHEME_PROMPT = """你正在評估從文件中提取的關鍵問題的質量。

原文/轉錄文本:
{original_text}

提取的關鍵問題:
{extracted_questions}

請根據以下評分標準評估這些關鍵問題的質量:

| 維度 | 權重 | 滿分描述 | 扣分指引 |
|------|------|---------|---------|
| 1. 準確性 (Fidelity to Original) | 40分 | 完全忠於原發言的核心意思、數字、關鍵詞及邏輯,沒有扭曲、遺漏或添加原意沒有的內容。 | 意思走樣(如把「先後緩急」改成其他概念)→ 扣 10–20 分;數字錯誤或遺漏(如 1065 戶、889 戶)→ 扣 15–25 分;完全偏離原意 → 扣 30–40 分 |
| 2. 完整性 (Completeness) | 25分 | 涵蓋原發言中該部分的所有關鍵元素(問題 + 背景 + 目的),無明顯遺漏。 | 漏掉重要背景(如「當前財政緊張」)→ 扣 8–12 分;只問一半(例如只問「可否先處理主幹道」,漏掉「後處理單車徑」)→ 扣 10–18 分;完全只剩一句問句 → 扣 20 分以上 |
| 3. 清晰度 (Clarity) | 20分 | 語言精準、邏輯清楚、易讀易懂,問題焦點一目了然,適合正式會議場合使用。 | 句子過長或結構混亂 → 扣 6–10 分;出現歧義或模糊詞 → 扣 10–15 分;完全看不懂重點 → 扣 16–20 分 |
| 4. 簡潔性 (Conciseness) | 15分 | 用最少的字數表達最完整的意思,無多餘贅詞,適合口頭提問或書面記錄。 | 過於冗長(比原發言還長)→ 扣 6–10 分;過度簡化導致意思不全 → 扣 8–13 分 |

請返回JSON格式,包含以下字段:
- dimension_1_準確性: 整數 (0-40)
- dimension_2_完整性: 整數 (0-25)
- dimension_3_清晰度: 整數 (0-20)
- dimension_4_簡潔性: 整數 (0-15)
- comments: 簡要評語
"""


def _build_eval_prompt(original_text: str, extracted_questions: List[str]) -> str:
    """Fill the marking-scheme prompt with the source text and a numbered question list."""
    questions_str = "\n".join(
        f" {i + 1}. {q}" for i, q in enumerate(extracted_questions)
    )
    return _MARKING_SCHEME_PROMPT.format(
        original_text=original_text,
        extracted_questions=questions_str,
    )


def _parse_score_response(raw: str, model_name: str) -> Optional[dict]:
    """Parse an evaluator reply into a dict that contains all four score keys.

    Args:
        raw: Raw text returned by the evaluator model.
        model_name: Used only in log messages.

    Returns:
        The decoded JSON object, or ``None`` (with a warning logged) when the
        reply is not valid JSON, is not a JSON object, or lacks a score key.
    """
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a non-string reply such as None.
        logger.warning("Evaluator %s returned invalid JSON: %.200s", model_name, raw)
        return None

    if not isinstance(data, dict):
        # A bare list/string/number is valid JSON but has no score keys.
        logger.warning("Evaluator %s returned invalid JSON: %.200s", model_name, raw)
        return None

    missing = set(_DIMENSION_FIELDS) - set(data.keys())
    if missing:
        logger.warning("Evaluator %s missing required keys: %s", model_name, missing)
        return None

    return data


async def _run_single_evaluator(
    config: EvaluatorConfig,
    prompt: str,
    model_idx: int,
) -> Optional[dict]:
    """Score the extracted key questions with one evaluator model.

    Makes up to ``MAX_RETRIES`` attempts. Call failures, unparseable replies
    and replies rejected by ``DimensionScores`` validation are all retried.

    Returns:
        ``KeyQuestionsEvalEntry(...).model_dump()`` for the first usable reply,
        or ``None`` when the API key is missing or every attempt failed.
    """
    api_key = os.environ.get(config.api_key_env, "")
    if not api_key:
        logger.error("API key not found for env var: %s", config.api_key_env)
        return None

    # Built via __new__ so LLMClient.__init__ is skipped; attributes set by hand.
    client = LLMClient.__new__(LLMClient)
    client.settings = type("_Settings", (), {"vllm_engine": False, "llm_enable_thinking": config.enable_thinking})()
    client.model = config.model_name
    client.enable_thinking = config.enable_thinking
    client.logger = logging.getLogger(f"{__name__}.evaluator_{model_idx}")

    import httpx
    from openai import AsyncOpenAI

    # NOTE(review): this httpx.AsyncClient is never closed in the code visible
    # here — confirm who owns its lifetime.
    client._client = AsyncOpenAI(
        base_url=config.base_url.rstrip("/"),
        api_key=api_key,
        timeout=60.0,
        http_client=httpx.AsyncClient(
            headers={"Content-Type": "application/json"},
        ),
    )
    client._langchain_model = None

    step_name = f"Eval-{config.model_name}"
    for attempt in range(MAX_RETRIES):
        try:
            start = time.perf_counter()
            raw = await client.complete(
                prompt=prompt,
                temperature=0.3,
                step_name=step_name,
            )
            elapsed_ms = int((time.perf_counter() - start) * 1000)

            parsed = _parse_score_response(raw, config.model_name)
            if parsed is not None:
                # The prompt asks for integers; int() also normalizes numeric
                # strings and truncates stray decimals.
                scores = DimensionScores(
                    **{field: int(parsed[field]) for field in _DIMENSION_FIELDS}
                )
                total = sum(getattr(scores, field) for field in _DIMENSION_FIELDS)
                return KeyQuestionsEvalEntry(
                    model_name=config.model_name,
                    scores=scores,
                    total_score=total,
                    max_score=100,
                    # A JSON null would otherwise arrive as None.
                    comments=parsed.get("comments") or "",
                    thinking_trace="",
                    time_ms=elapsed_ms,
                ).model_dump()

        except Exception as exc:  # retry boundary: transport, conversion or validation error
            logger.warning(
                "Evaluator %s attempt %d/%d failed: %s",
                config.model_name,
                attempt + 1,
                MAX_RETRIES,
                exc,
            )

        if attempt < MAX_RETRIES - 1:
            # Clamp so that raising MAX_RETRIES cannot index past the schedule.
            delay = RETRY_DELAYS[min(attempt, len(RETRY_DELAYS) - 1)]
            await asyncio.sleep(delay)

    return None


def _empty_eval_result() -> KeyQuestionsEvalResult:
    """Return the all-zero result used when no evaluation is available."""
    return KeyQuestionsEvalResult(
        evaluations=[],
        average_scores=DimensionScores(
            dimension_1_準確性=0,
            dimension_2_完整性=0,
            dimension_3_清晰度=0,
            dimension_4_簡潔性=0,
        ),
        average_total=0.0,
    )


async def evaluate_key_questions(
    original_text: str,
    extracted_questions: List[str],
    evaluator_configs: List[EvaluatorConfig],
) -> KeyQuestionsEvalResult:
    """Score extracted key questions with every configured evaluator and average the results.

    Evaluators run concurrently on the same prompt. Evaluators that return
    nothing are left out of both the list and the averages.

    Returns:
        The per-model evaluations plus per-dimension and total averages
        rounded to one decimal. An all-zero result with no evaluations is
        returned when no evaluator is configured or none succeeded.
    """
    if not evaluator_configs:
        return _empty_eval_result()

    prompt = _build_eval_prompt(original_text, extracted_questions)

    results_raw = await asyncio.gather(
        *[
            _run_single_evaluator(cfg, prompt, i)
            for i, cfg in enumerate(evaluator_configs)
        ]
    )

    evaluations = [
        KeyQuestionsEvalEntry.model_validate(r) for r in results_raw if r is not None
    ]
    if not evaluations:
        return _empty_eval_result()

    n = len(evaluations)
    avg_scores = DimensionScores(
        **{
            field: round(sum(getattr(e.scores, field) for e in evaluations) / n, 1)
            for field in _DIMENSION_FIELDS
        }
    )
    avg_total = round(sum(e.total_score for e in evaluations) / n, 1)

    return KeyQuestionsEvalResult(
        evaluations=evaluations,
        average_scores=avg_scores,
        average_total=avg_total,
    )


# ============================================================================
# backend/app/services/response_evaluator.py  (new file, index 0000000..aa34736,
# hunk @@ -0,0 +1,119 @@)
# ============================================================================
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional, Tuple

from app.models.testing import (
    EvaluatorConfig,
    SubQuestionResponseEval,
)
from app.services.llm_client import LLMClient

logger = logging.getLogger(__name__)

_RESPONSE_GEN_PROMPT = """使用以下文檔塊回答關鍵問題。僅使用提供的文檔塊信息,不要使用外部知識。在答案中引用來源。

關鍵問題:{key_question}

文檔塊:
{chunks}

回答:"""

_RESPONSE_COMPARE_PROMPT = """比較以下兩個回答的完整性和事實準確性。

關鍵問題:{key_question}

回答 A(基準答案,從相關塊生成):
{ground_truth_response}

回答 B(要評估的答案):
{pipeline_response}

請評估回答 B 是否包含回答 A 中的所有關鍵信息。返回JSON格式:
{{"completeness_score": 0.0-1.0, "factual_accuracy_score": 0.0-1.0, "comments": "簡要評語"}}"""


def _make_eval_client(config: EvaluatorConfig) -> LLMClient:
    """Create an ``LLMClient`` bound to the evaluator endpoint described by ``config``.

    The instance is built with ``LLMClient.__new__`` so ``LLMClient.__init__``
    is skipped and the attributes are assigned by hand.

    NOTE(review): nothing visible here closes the ``httpx.AsyncClient`` that is
    created — confirm who owns its lifetime.
    """
    api_key = os.environ.get(config.api_key_env, "")
    if not api_key:
        # Requests will fail authentication; make the cause visible in logs.
        logger.warning("API key not found for env var: %s", config.api_key_env)

    client = LLMClient.__new__(LLMClient)
    client.settings = type("_Settings", (), {"vllm_engine": False, "llm_enable_thinking": config.enable_thinking})()
    client.model = config.model_name
    client.enable_thinking = config.enable_thinking
    client.logger = logging.getLogger(f"{__name__}.resp_eval")

    import httpx
    from openai import AsyncOpenAI

    client._client = AsyncOpenAI(
        base_url=config.base_url.rstrip("/"),
        api_key=api_key,
        timeout=120.0,
        http_client=httpx.AsyncClient(headers={"Content-Type": "application/json"}),
    )
    client._langchain_model = None
    return client


async def evaluate_response(
    key_question: str,
    ground_truth_chunks: List[Tuple[str, Dict[str, Any]]],
    pipeline_response: str,
    evaluator_config: EvaluatorConfig,
) -> Optional[SubQuestionResponseEval]:
    """Compare the pipeline's answer with a reference answer generated from the relevant chunks.

    Step 1 asks the evaluator model to answer ``key_question`` using only
    ``ground_truth_chunks`` (``(text, meta)`` pairs). Step 2 asks it to grade
    ``pipeline_response`` against that reference.

    Returns:
        A ``SubQuestionResponseEval`` with scores rounded to four decimals, or
        ``None`` when either model call fails or the comparison reply is not a
        JSON object with numeric scores.
    """
    client = _make_eval_client(evaluator_config)

    # Step 1: Generate ground truth response from relevant chunks
    gen_start = time.perf_counter()
    chunks_text = "\n\n".join(
        f"[{meta.get('filename', 'unknown')}, page {meta.get('page_number', '?')}]\n{text}"
        for text, meta in ground_truth_chunks
    )
    gen_prompt = _RESPONSE_GEN_PROMPT.format(key_question=key_question, chunks=chunks_text)

    try:
        ground_truth_response = await client.complete(
            prompt=gen_prompt, temperature=0.3, step_name="ResponseGen-GroundTruth"
        )
    except Exception as exc:
        logger.warning("Failed to generate ground truth response: %s", exc)
        return None

    gen_time_ms = int((time.perf_counter() - gen_start) * 1000)

    # Step 2: Compare responses
    comp_start = time.perf_counter()
    comp_prompt = _RESPONSE_COMPARE_PROMPT.format(
        key_question=key_question,
        ground_truth_response=ground_truth_response,
        pipeline_response=pipeline_response,
    )

    try:
        raw = await client.complete(
            prompt=comp_prompt, temperature=0.3, step_name="ResponseCompare"
        )
        data = json.loads(raw)
        if not isinstance(data, dict):
            raise ValueError("comparison response is not a JSON object")
        # Converted inside the try so a malformed score ("high", null, …)
        # yields None instead of an unhandled exception.
        completeness = float(data.get("completeness_score", 0.0))
        factual = float(data.get("factual_accuracy_score", 0.0))
    except Exception as exc:
        logger.warning("Failed to compare responses: %s", exc)
        return None

    comp_time_ms = int((time.perf_counter() - comp_start) * 1000)

    comments = data.get("comments") or ""

    return SubQuestionResponseEval(
        # NOTE(review): always 0 here — presumably the caller sets the real
        # index afterwards; verify against the call site.
        sub_question_index=0,
        sub_question_text=key_question,
        ground_truth_response=ground_truth_response,
        pipeline_response_section=pipeline_response,
        completeness_score=round(completeness, 4),
        factual_accuracy_score=round(factual, 4),
        comments=comments,
        ground_truth_generation_time_ms=gen_time_ms,
        comparison_time_ms=comp_time_ms,
    )


# ----------------------------------------------------------------------------
# Next patch section begins here:
# diff --git a/backend/app/test/test_phase9_cer_wer.py
# ----------------------------------------------------------------------------
# ============================================================================
# backend/app/test/test_phase9_cer_wer.py  (new file, index 0000000..1c22020,
# hunk @@ -0,0 +1,83 @@)
# ============================================================================
"""Phase 9 tests: CER/WER calculation for transcription accuracy (Sub-Phase 9.2).

Covers:
- CER for identical Chinese text returns 0.0
- CER for single-character substitution
- CER for deletions and insertions
- WER for space-separated Chinese text (word-level)
- Mixed Chinese/English text
- Empty reference and empty hypothesis edge cases

Not covered here: the "N/A" status reported when the reference transcript is
missing; no test in this module exercises it.
"""
import pytest

from app.services.cer_wer import calculate_cer, calculate_wer


class TestCER:
    """Character-level error rate on raw strings."""

    def test_identical_returns_zero(self):
        result = calculate_cer("立法會今日討論", "立法會今日討論")
        assert result["cer"] == 0.0
        assert result["substitutions"] == 0
        assert result["deletions"] == 0
        assert result["insertions"] == 0
        assert result["hits"] == 7

    def test_single_substitution(self):
        result = calculate_cer("立法會今日討論", "立法會昨日討論")
        assert result["cer"] > 0.0
        assert result["substitutions"] == 1
        assert result["hits"] == 6

    def test_deletion(self):
        result = calculate_cer("立法會討論議題", "立法會討論")
        assert result["deletions"] >= 1
        assert result["cer"] > 0.0

    def test_insertion(self):
        result = calculate_cer("立法會討論", "立法會今日討論")
        assert result["insertions"] >= 1
        assert result["cer"] > 0.0

    def test_empty_reference(self):
        result = calculate_cer("", "something")
        assert result["cer"] == 0.0
        assert result["reference_length"] == 0

    def test_empty_hypothesis(self):
        result = calculate_cer("立法會", "")
        assert result["cer"] == 1.0
        assert result["deletions"] == 3

    def test_both_empty(self):
        result = calculate_cer("", "")
        assert result["cer"] == 0.0

    def test_returns_all_fields(self):
        result = calculate_cer("立法會討論", "立法會討論")
        for key in ("cer", "reference_length", "transcribed_length",
                    "substitutions", "deletions", "insertions", "hits"):
            assert key in result


class TestWER:
    """Word-level error rate on whitespace-separated tokens."""

    def test_identical_returns_zero(self):
        result = calculate_wer("立法會 今日 討論", "立法會 今日 討論")
        assert result["wer"] == 0.0

    def test_word_substitution(self):
        result = calculate_wer("立法會 今日 討論", "立法會 昨日 討論")
        assert result["wer"] > 0.0
        assert result["substitutions"] == 1

    def test_mixed_cn_en(self):
        result = calculate_wer("LegCo 討論 議題", "LegCo 討論 政策")
        assert result["substitutions"] == 1

    def test_empty_reference(self):
        result = calculate_wer("", "something")
        assert result["wer"] == 0.0

    def test_empty_hypothesis(self):
        result = calculate_wer("立法會 討論", "")
        assert result["wer"] == 1.0


# ============================================================================
# backend/app/test/test_phase9_chunk_response_eval.py  (new file,
# index 0000000..4200b7b, hunk @@ -0,0 +1,141 @@)
# ============================================================================
"""Phase 9 tests: Chunk and response evaluation (Sub-Phase 9.2)."""
import json
from unittest.mock import AsyncMock, patch

import pytest

from app.models.testing import (
    ChunkAccuracy,
    EvaluatorConfig,
    GroundTruthInfo,
    SubQuestionChunkEval,
    SubQuestionResponseEval,
)


@pytest.fixture(autouse=True)
def _set_api_keys(monkeypatch):
    """Provide the API key env var that the evaluator configs in this module name."""
    monkeypatch.setenv("LLM_API_KEY", "test-key")


@pytest.fixture
def chunk_evaluator_config():
    return EvaluatorConfig(
        model_name="qwen/qwen3.6-35b-a3b",
        base_url="https://test.example.com/v1",
        api_key_env="LLM_API_KEY",
        enable_thinking=True,
    )


@pytest.fixture
def sample_chunks_by_doc():
    """Three ``(text, meta)`` chunks: two in doc-1, one in doc-2."""
    return {
        "doc-1": [
            ("chunk 0 doc1 text about立法會", {"filename": "doc1.pdf", "chunk_index": 0, "document_id": "doc-1", "page_number": 1, "upload_date": "2026-01-01", "content_summary": "立法會 text"}),
            ("chunk 1 doc1 irrelevant", {"filename": "doc1.pdf", "chunk_index": 1, "document_id": "doc-1", "page_number": 2, "upload_date": "2026-01-01", "content_summary": "irrelevant"}),
        ],
        "doc-2": [
            ("chunk 0 doc2 about 討論", {"filename": "doc2.pdf", "chunk_index": 0, "document_id": "doc-2", "page_number": 1, "upload_date": "2026-01-02", "content_summary": "討論 text"}),
        ],
    }


class TestChunkEvaluator:
    def test_batch_splitting(self, chunk_evaluator_config, sample_chunks_by_doc):
        # _split_into_batches is synchronous and never touches the LLM, so no
        # event loop or LLMClient mock is needed here.
        from app.services.chunk_evaluator import _split_into_batches

        all_chunks = [(doc_id, i, text, meta) for doc_id, chunks in sample_chunks_by_doc.items() for i, (text, meta) in enumerate(chunks)]
        batches = _split_into_batches(all_chunks, batch_size=2)
        assert len(batches) == 2
        # Order is preserved and the last batch holds the remainder.
        assert [len(b) for b in batches] == [2, 1]
        assert batches[0][0][:2] == ("doc-1", 0)
        assert batches[1][0][:2] == ("doc-2", 0)

    @pytest.mark.asyncio
    async def test_relevance_from_json(self):
        from app.services.chunk_evaluator import _parse_relevance_response
        result = _parse_relevance_response('{"relevant_chunk_indices": [0, 2, 5]}')
        assert result == [0, 2, 5]

    @pytest.mark.asyncio
    async def test_relevance_empty_response(self):
        from app.services.chunk_evaluator import _parse_relevance_response
        result = _parse_relevance_response('{"relevant_chunk_indices": []}')
        assert result == []

    @pytest.mark.asyncio
    async def test_relevance_invalid_json(self):
        from app.services.chunk_evaluator import _parse_relevance_response
        result = _parse_relevance_response("not json")
        assert result is None

    @pytest.mark.asyncio
    async def test_precision_recall_f1_calculation(self):
        from app.services.chunk_evaluator import _calculate_accuracy

        retrieved = {("doc-1", 0), ("doc-1", 1)}
        ground_truth = {("doc-1", 0), ("doc-2", 0)}

        # One of two retrieved chunks is relevant; one of two relevant chunks was retrieved.
        result = _calculate_accuracy(retrieved, ground_truth)
        assert result.precision == 0.5
        assert result.recall == 0.5
        assert result.f1 == 0.5

    @pytest.mark.asyncio
    async def test_perfect_accuracy(self):
        from app.services.chunk_evaluator import _calculate_accuracy

        result = _calculate_accuracy(
            {("doc-1", 0), ("doc-1", 1)},
            {("doc-1", 0), ("doc-1", 1)},
        )
        assert result.precision == 1.0
        assert result.recall == 1.0
        assert result.f1 == 1.0

    @pytest.mark.asyncio
    async def test_zero_precision(self):
        from app.services.chunk_evaluator import _calculate_accuracy

        # Empty ground truth: nothing retrieved can be relevant.
        result = _calculate_accuracy(
            {("doc-1", 0)},
            set(),
        )
        assert result.precision == 0.0
        assert result.recall == 0.0


class TestResponseEvaluator:
    @pytest.mark.asyncio
    async def test_response_comparison(self):
        mock_gen_response = "## Sub-question 0\n\n- Test answer with citation [doc1.pdf, page 1]"

        async def _mock_complete(*args, **kwargs):
            # The comparison prompt is the only one that mentions
            # "completeness" (in its JSON template); every other prompt is
            # treated as the generation step.
            prompt = kwargs.get("prompt", "")
            if "compare" in prompt.lower() or "completeness" in prompt.lower():
                return json.dumps({"completeness_score": 0.85, "factual_accuracy_score": 0.92, "comments": "good"})
            return mock_gen_response

        with patch("app.services.llm_client.LLMClient.complete", side_effect=_mock_complete):
            from app.services.response_evaluator import evaluate_response

            result = await evaluate_response(
                key_question="test question",
                ground_truth_chunks=[("relevant chunk text", {"filename": "doc1.pdf", "chunk_index": 0})],
                pipeline_response="pipeline answer",
                evaluator_config=EvaluatorConfig(
                    model_name="test", base_url="https://test.example.com", api_key_env="LLM_API_KEY", enable_thinking=True,
                ),
            )

        assert result is not None
        assert result.completeness_score == 0.85
        assert result.factual_accuracy_score == 0.92


# ============================================================================
# backend/app/test/test_phase9_key_questions_eval.py  (new file,
# index 0000000..c15b4b4, hunk @@ -0,0 +1,118 @@)
# ============================================================================
"""Phase 9 tests: Key questions evaluation with dual-model scoring (Sub-Phase 9.2)."""
import json
from unittest.mock import AsyncMock, patch

import pytest

from app.models.testing import (
    EvaluatorConfig,
    KeyQuestionsEvalResult,
)


@pytest.fixture
def evaluator_configs():
    """Two evaluator configs, each reading its key from a different env var."""
    return [
        EvaluatorConfig(
            model_name="deepseek-v4-pro",
            base_url="https://api.deepseek.com",
            api_key_env="DP_API_KEY",
            enable_thinking=True,
        ),
        EvaluatorConfig(
            model_name="qwen3-7b-max",
            base_url="https://dashscope.example.com/v1",
            api_key_env="DASHSCOPE_API_KEY",
            enable_thinking=True,
        ),
    ]


@pytest.fixture(autouse=True)
def _set_api_keys(monkeypatch):
    monkeypatch.setenv("DP_API_KEY", "test-dp-key")
    monkeypatch.setenv("DASHSCOPE_API_KEY", "test-dashscope-key")


@pytest.fixture
def mock_successful_complete(monkeypatch):
    """Replace ``LLMClient.complete`` with a coroutine that returns one fixed, valid score JSON."""
    valid_scores = json.dumps({
        "dimension_1_準確性": 35,
        "dimension_2_完整性": 22,
        "dimension_3_清晰度": 18,
        "dimension_4_簡潔性": 13,
    })

    async def _mock(*args, **kwargs):
        return valid_scores

    monkeypatch.setattr(
        "app.services.llm_client.LLMClient.complete", _mock
    )


class TestKeyQuestionsEvaluator:
    @pytest.mark.asyncio
    async def test_both_evaluators_succeed(self, evaluator_configs, mock_successful_complete):
        from app.services.key_questions_evaluator import evaluate_key_questions

        result = await evaluate_key_questions(
            original_text="test text",
            extracted_questions=["test q"],
            evaluator_configs=evaluator_configs,
        )

        assert isinstance(result, KeyQuestionsEvalResult)
        assert len(result.evaluations) == 2

    @pytest.mark.asyncio
    async def test_average_calculation(self, evaluator_configs):
        call_count = 0
        # First reply totals 75, second totals 100.
        scores_sequence = [
            json.dumps({"dimension_1_準確性": 30, "dimension_2_完整性": 20, "dimension_3_清晰度": 15, "dimension_4_簡潔性": 10}),
            json.dumps({"dimension_1_準確性": 40, "dimension_2_完整性": 25, "dimension_3_清晰度": 20, "dimension_4_簡潔性": 15}),
        ]

        async def _mock_complete(**kwargs):
            nonlocal call_count
            result = scores_sequence[call_count]
            call_count += 1
            return result

        with patch("app.services.llm_client.LLMClient.complete", side_effect=_mock_complete):
            from app.services.key_questions_evaluator import evaluate_key_questions

            result = await evaluate_key_questions(
                original_text="test", extracted_questions=["q1", "q2"], evaluator_configs=evaluator_configs,
            )

        assert result.average_scores.dimension_1_準確性 == 35.0
        assert result.average_scores.dimension_2_完整性 == 22.5
        assert result.average_scores.dimension_3_清晰度 == 17.5
        assert result.average_scores.dimension_4_簡潔性 == 12.5
        assert result.average_total == 87.5

    @pytest.mark.asyncio
    async def test_empty_evaluators(self):
        from app.services.key_questions_evaluator import evaluate_key_questions

        result = await evaluate_key_questions(
            original_text="test", extracted_questions=["test"], evaluator_configs=[],
        )
        assert result.evaluations == []
        assert result.average_total == 0.0

    @pytest.mark.asyncio
    async def test_prompt_contains_marking_scheme(self, evaluator_configs):
        captured_prompts = []

        async def _capture(**kwargs):
            captured_prompts.append(kwargs.get("prompt", ""))
            return json.dumps({"dimension_1_準確性": 30, "dimension_2_完整性": 20, "dimension_3_清晰度": 15, "dimension_4_簡潔性": 10})

        with patch("app.services.llm_client.LLMClient.complete", side_effect=_capture):
            from app.services.key_questions_evaluator import evaluate_key_questions

            await evaluate_key_questions(
                original_text="立法會今日討論", extracted_questions=["test q"], evaluator_configs=evaluator_configs,
            )

        # One prompt per configured evaluator, each carrying the rubric text.
        assert len(captured_prompts) == 2
        assert "準確性" in captured_prompts[0]