"""Query history CRUD service. Uses sync sqlite3 — all operations are instant local reads/writes. Each method opens its own connection so the service is safe to instantiate once per request without holding open file handles. """ import logging import sqlite3 logger = logging.getLogger(__name__) _SUMMARY_COLUMNS = ( "id", "input_text", "total_time_ms", "chunks_retrieved_count", "chunks_filtered_count", "profile_used", "created_at", ) _INSERT_COLUMNS = ( "input_text", "extracted_questions", "decompose_prompt", "decomposer_time_ms", "retriever_time_ms", "chunks_retrieved", "chunks_retrieved_count", "filter_prompt", "filter_time_ms", "chunks_filtered", "chunks_filtered_count", "generate_prompt", "generator_time_ms", "total_time_ms", "final_answer", "sources", "profile_used", "chunks_retrieved_per_subq_count", "chunks_filtered_per_subq_count", "highlight_prompt", "highlight_response", "highlight_time_ms", ) def _connect(db_path: str) -> sqlite3.Connection: conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row return conn def _row_to_dict(row: sqlite3.Row) -> dict: return dict(row) class HistoryService: def __init__(self, db_path: str) -> None: self._db_path = db_path def record(self, data: dict) -> int: values = [data.get(col) for col in _INSERT_COLUMNS] placeholders = ", ".join("?" for _ in _INSERT_COLUMNS) cols = ", ".join(_INSERT_COLUMNS) with _connect(self._db_path) as conn: cursor = conn.execute( f"INSERT INTO query_history ({cols}) VALUES ({placeholders})", values, ) conn.commit() row_id = cursor.lastrowid assert row_id is not None, "INSERT did not return lastrowid" return row_id def list(self, limit: int = 50, offset: int = 0) -> list[dict]: cols = ", ".join(_SUMMARY_COLUMNS) with _connect(self._db_path) as conn: rows = conn.execute( f"SELECT {cols} FROM query_history ORDER BY id DESC LIMIT ? OFFSET ?", (limit, offset), ).fetchall() return [_row_to_dict(r) for r in rows] def get(self, query_id: int) -> dict | None: with _connect(self._db_path) as conn: row = conn.execute( "SELECT * FROM query_history WHERE id=?", (query_id,), ).fetchone() if row is None: return None return _row_to_dict(row) def delete(self, query_id: int) -> bool: with _connect(self._db_path) as conn: cursor = conn.execute( "DELETE FROM query_history WHERE id=?", (query_id,), ) conn.commit() return cursor.rowcount > 0 def update_highlights(self, query_id: int, highlight_prompt: str, highlight_response: str, highlight_time_ms: int) -> bool: with _connect(self._db_path) as conn: cursor = conn.execute( "UPDATE query_history SET highlight_prompt=?, highlight_response=?, highlight_time_ms=? WHERE id=?", (highlight_prompt, highlight_response, highlight_time_ms, query_id), ) conn.commit() return cursor.rowcount > 0 def clear_all(self) -> int: with _connect(self._db_path) as conn: count = conn.execute("SELECT COUNT(*) FROM query_history").fetchone()[0] conn.execute("DELETE FROM query_history") conn.commit() return count def get_stats(self) -> dict: with _connect(self._db_path) as conn: row = conn.execute( "SELECT COUNT(*) as total_queries, " "COALESCE(AVG(total_time_ms), 0) as avg_time_ms, " "COALESCE(AVG(chunks_retrieved_count), 0) as avg_chunks_retrieved, " "COALESCE(AVG(chunks_filtered_count), 0) as avg_chunks_filtered " "FROM query_history" ).fetchone() profile_row = conn.execute( "SELECT profile_used FROM query_history " "WHERE profile_used IS NOT NULL " "GROUP BY profile_used ORDER BY COUNT(*) DESC LIMIT 1" ).fetchone() return { "total_queries": row["total_queries"], "avg_time_ms": row["avg_time_ms"], "avg_chunks_retrieved": row["avg_chunks_retrieved"], "avg_chunks_filtered": row["avg_chunks_filtered"], "most_used_profile": profile_row["profile_used"] if profile_row else None, }