119 lines
4.0 KiB
Python
119 lines
4.0 KiB
Python
"""Query history CRUD service.
|
|
|
|
Uses sync sqlite3 — all operations are instant local reads/writes.
|
|
Each method opens its own connection so the service is safe to
|
|
instantiate once per request without holding open file handles.
|
|
"""
|
|
|
|
import contextlib
import logging
import sqlite3
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
# Columns selected by ``HistoryService.list``; the tuple order is the
# order in which they appear in the generated SELECT statement.
_SUMMARY_COLUMNS = (
    "id",
    "input_text",
    "total_time_ms",
    "chunks_retrieved_count",
    "chunks_filtered_count",
    "profile_used",
    "created_at",
)
|
|
|
|
# Columns written by ``HistoryService.record``. Each name is used both as
# the target column in the INSERT statement and as the key looked up in the
# ``data`` dict, so the tuple order fixes the order of the bound values.
_INSERT_COLUMNS = (
    "input_text",
    "extracted_questions",
    "decompose_prompt",
    "decomposer_time_ms",
    "retriever_time_ms",
    "chunks_retrieved",
    "chunks_retrieved_count",
    "filter_prompt",
    "filter_time_ms",
    "chunks_filtered",
    "chunks_filtered_count",
    "generate_prompt",
    "generator_time_ms",
    "total_time_ms",
    "final_answer",
    "sources",
    "profile_used",
    "chunks_retrieved_per_subq_count",
    "chunks_filtered_per_subq_count",
)
|
|
|
|
|
|
def _connect(db_path: str) -> sqlite3.Connection:
    """Open a SQLite connection to *db_path* that yields ``sqlite3.Row`` rows.

    The connection is returned open; closing it is left to the caller.
    """
    connection = sqlite3.connect(db_path)
    # sqlite3.Row lets callers read columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def _row_to_dict(row: sqlite3.Row) -> dict:
    """Return a plain ``dict`` copy of *row*, keyed by column name."""
    return {column: row[column] for column in row.keys()}
|
|
|
|
|
|
class HistoryService:
    """CRUD access to the ``query_history`` table of a SQLite database.

    Every public method opens a fresh connection through ``_connect`` and
    closes it before returning, so an instance never keeps a database file
    handle open between calls.
    """

    def __init__(self, db_path: str) -> None:
        """Store *db_path*; no connection is opened at construction time."""
        self._db_path = db_path

    def _open(self) -> contextlib.AbstractContextManager[sqlite3.Connection]:
        """Return a context manager that yields a connection and closes it.

        ``sqlite3.Connection`` used directly in a ``with`` statement only
        commits or rolls back the pending transaction; it does not close the
        connection. Wrapping it in ``contextlib.closing`` guarantees the
        handle is released when the block exits, even on error.
        """
        return contextlib.closing(_connect(self._db_path))

    def record(self, data: dict) -> int:
        """Insert one history row and return its row id.

        Args:
            data: Mapping of column name to value. Only keys listed in
                ``_INSERT_COLUMNS`` are read; missing keys are stored as NULL.

        Returns:
            The ``lastrowid`` reported by SQLite for the inserted row.

        Raises:
            RuntimeError: If SQLite reports no ``lastrowid`` for the INSERT.
        """
        values = [data.get(col) for col in _INSERT_COLUMNS]
        placeholders = ", ".join("?" for _ in _INSERT_COLUMNS)
        cols = ", ".join(_INSERT_COLUMNS)
        # Outer manager closes the connection; the inner ``conn`` manager
        # commits on success and rolls back if the INSERT raises.
        with self._open() as conn, conn:
            cursor = conn.execute(
                f"INSERT INTO query_history ({cols}) VALUES ({placeholders})",
                values,
            )
            row_id = cursor.lastrowid
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if row_id is None:
            raise RuntimeError("INSERT did not return lastrowid")
        return row_id

    # NOTE: this method name shadows the builtin ``list`` inside the class
    # body. Its own ``list[dict]`` annotation is evaluated before the name is
    # bound, but methods defined below must not use ``list[...]`` annotations.
    def list(self, limit: int = 50, offset: int = 0) -> list[dict]:
        """Return summary rows ordered by descending ``id``.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip before the first returned row.

        Returns:
            One dict per row containing only the ``_SUMMARY_COLUMNS`` fields.
        """
        cols = ", ".join(_SUMMARY_COLUMNS)
        with self._open() as conn:
            rows = conn.execute(
                f"SELECT {cols} FROM query_history ORDER BY id DESC LIMIT ? OFFSET ?",
                (limit, offset),
            ).fetchall()
        return [_row_to_dict(r) for r in rows]

    def get(self, query_id: int) -> dict | None:
        """Return the full row whose ``id`` is *query_id*, or ``None``."""
        with self._open() as conn:
            row = conn.execute(
                "SELECT * FROM query_history WHERE id=?",
                (query_id,),
            ).fetchone()
        if row is None:
            return None
        return _row_to_dict(row)

    def delete(self, query_id: int) -> bool:
        """Delete the row with ``id`` *query_id*.

        Returns:
            ``True`` if a row was deleted, ``False`` if no row matched.
        """
        with self._open() as conn, conn:
            cursor = conn.execute(
                "DELETE FROM query_history WHERE id=?",
                (query_id,),
            )
            removed = cursor.rowcount
        return removed > 0

    def clear_all(self) -> int:
        """Delete every row in ``query_history``.

        Returns:
            The number of rows deleted, taken from the DELETE statement's own
            ``rowcount`` so the figure matches what was actually removed.
        """
        with self._open() as conn, conn:
            removed = conn.execute("DELETE FROM query_history").rowcount
        return removed

    def get_stats(self) -> dict:
        """Return aggregate statistics over all history rows.

        Returns:
            A dict with ``total_queries``, ``avg_time_ms``,
            ``avg_chunks_retrieved``, ``avg_chunks_filtered`` (averages are 0
            when the table is empty) and ``most_used_profile`` (the non-NULL
            ``profile_used`` value with the most rows, or ``None`` if there is
            none).
        """
        with self._open() as conn:
            row = conn.execute(
                "SELECT COUNT(*) as total_queries, "
                "COALESCE(AVG(total_time_ms), 0) as avg_time_ms, "
                "COALESCE(AVG(chunks_retrieved_count), 0) as avg_chunks_retrieved, "
                "COALESCE(AVG(chunks_filtered_count), 0) as avg_chunks_filtered "
                "FROM query_history"
            ).fetchone()

            profile_row = conn.execute(
                "SELECT profile_used FROM query_history "
                "WHERE profile_used IS NOT NULL "
                "GROUP BY profile_used ORDER BY COUNT(*) DESC LIMIT 1"
            ).fetchone()

        return {
            "total_queries": row["total_queries"],
            "avg_time_ms": row["avg_time_ms"],
            "avg_chunks_retrieved": row["avg_chunks_retrieved"],
            "avg_chunks_filtered": row["avg_chunks_filtered"],
            "most_used_profile": profile_row["profile_used"] if profile_row else None,
        }
|