diff --git a/CHANGELOG.md b/CHANGELOG.md index 9677991..7a0b2a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.54.5] — 2026-05-13 + +### Internal + +- **`get_analytics_db()` is a singleton — mirrors `get_system_db()`** (#163). Pre-fix the function opened a fresh `duckdb.connect()` on every call; most callers don't `.close()` the returned handle, so each leaked connection held a WAL ref + FD until GC kicked in. Under load this manifested as "too many open files" or DuckDB lock contention on the analytics DB. Singleton + cursor-per-call (matches the system-DB pattern) keeps one underlying connection alive while letting callers safely close the cursor handle. New `close_analytics_db()` mirrors `close_system_db()` (best-effort CHECKPOINT then close); both are wired into the FastAPI shutdown hook in `app/main.py`. `get_analytics_db_readonly()` deliberately stays per-call — each invocation re-ATTACHes extract.duckdb files into a fresh read-only context. 5 tests in `tests/test_analytics_db_singleton.py` pin the contract: cache, cursor-close-safe, DATA_DIR-change reopen, thread safety (16 concurrent calls share the singleton), close + reopen. + ## [0.54.4] — 2026-05-13 Three LOW hygiene fixes from the takeover-review on PR #276 (closed via #277). diff --git a/app/main.py b/app/main.py index a02a65c..31b0ca3 100644 --- a/app/main.py +++ b/app/main.py @@ -202,8 +202,9 @@ async def lifespan(app): get_posthog().shutdown() except Exception: logger.exception("PostHog shutdown failed") - from src.db import close_system_db + from src.db import close_analytics_db, close_system_db close_system_db() + close_analytics_db() def _is_truthy_env(name: str) -> bool: diff --git a/pyproject.toml b/pyproject.toml index 6304e98..f3a10e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.54.4" +version = "0.54.5" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/src/db.py b/src/db.py index 8b1b6f3..c1d0545 100644 --- a/src/db.py +++ b/src/db.py @@ -804,6 +804,18 @@ _system_db_lock = threading.Lock() _system_db_conn: duckdb.DuckDBPyConnection | None = None _system_db_path: str | None = None +# Mirror the system-DB singleton pattern for the analytics DB. Pre-#163, +# `get_analytics_db()` opened a fresh `duckdb.connect()` on every call — +# most callers don't `.close()` the returned handle, so each leaked +# connection held a WAL ref + FD until GC kicked in. Under load this +# manifested as "too many open files" or DuckDB lock contention on the +# analytics DB. Singleton + cursor-per-call (mirrors `get_system_db()` +# above) means callers that close the cursor only close the cursor — +# the underlying connection stays. +_analytics_db_lock = threading.Lock() +_analytics_db_conn: duckdb.DuckDBPyConnection | None = None +_analytics_db_path: str | None = None + def _get_data_dir() -> Path: return Path(os.environ.get("DATA_DIR", "./data")) @@ -905,10 +917,34 @@ def get_system_db() -> duckdb.DuckDBPyConnection: def get_analytics_db() -> duckdb.DuckDBPyConnection: - """Get a connection to the analytics database (parquet views).""" - db_path = _get_data_dir() / "analytics" / "server.duckdb" - db_path.parent.mkdir(parents=True, exist_ok=True) - return _maybe_instrument(duckdb.connect(str(db_path)), "analytics") + """Get a connection to the analytics database (parquet views). + + Singleton — mirrors `get_system_db()` above. Returns a cursor on the + shared connection so callers can `.close()` the handle without + closing the underlying connection. Re-opens transparently when + `DATA_DIR` changes (test fixtures that swap data dirs across cases). + + Pre-#163 this opened a fresh connection on every call and most + callers leaked it; see the rationale block at the module-level + `_analytics_db_*` globals. `get_analytics_db_readonly()` deliberately + stays per-call because each invocation re-ATTACHes extract.duckdb + files into a fresh read-only context. + """ + global _analytics_db_conn, _analytics_db_path + db_path = str(_get_data_dir() / "analytics" / "server.duckdb") + + with _analytics_db_lock: + if _analytics_db_conn is None or _analytics_db_path != db_path: + # Close stale connection if DATA_DIR changed (test fixtures) + if _analytics_db_conn is not None: + try: + _analytics_db_conn.close() + except Exception: + pass + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + _analytics_db_conn = duckdb.connect(db_path) + _analytics_db_path = db_path + return _maybe_instrument(_analytics_db_conn.cursor(), "analytics") def _reattach_remote_extensions( @@ -3355,3 +3391,28 @@ def close_system_db() -> None: logger.debug("close_system_db: close raised (%s); ignoring", exc) _system_db_conn = None _system_db_path = None + + +def close_analytics_db() -> None: + """Close the shared analytics DB connection. Called on app shutdown. + + Mirrors `close_system_db()` above (best-effort CHECKPOINT then + close, swallow exceptions). Analytics DB is the parquet-views layer; + a dirty WAL on it is less consequential than on the system DB + (read-only views can be rebuilt by the orchestrator on next start) + but the CHECKPOINT keeps the file on-disk clean for any operator + poking at it with the duckdb CLI. + """ + global _analytics_db_conn, _analytics_db_path + if _analytics_db_conn: + try: + _analytics_db_conn.execute("CHECKPOINT") + logger.debug("close_analytics_db: CHECKPOINT ok") + except Exception as exc: + logger.warning("close_analytics_db: CHECKPOINT failed (%s); proceeding to close", exc) + try: + _analytics_db_conn.close() + except Exception as exc: + logger.debug("close_analytics_db: close raised (%s); ignoring", exc) + _analytics_db_conn = None + _analytics_db_path = None diff --git a/tests/test_analytics_db_singleton.py b/tests/test_analytics_db_singleton.py new file mode 100644 index 0000000..987f24e --- /dev/null +++ b/tests/test_analytics_db_singleton.py @@ -0,0 +1,153 @@ +"""`get_analytics_db()` is a singleton mirroring `get_system_db()` (#163). + +Pre-#163 every call opened a fresh `duckdb.connect()` — most callers +don't `.close()` the returned handle, so each leaked connection held a +WAL ref + FD until GC kicked in. Under load this manifested as "too +many open files" or DuckDB lock contention on the analytics DB. + +These tests pin the new contract so any regression to per-call +`duckdb.connect()` is loud: + +1. Two consecutive calls return cursors backed by the same connection. +2. Closing one cursor does NOT close the underlying connection. +3. `DATA_DIR` change → fresh connection on next call. +4. Concurrent calls don't race (the lock serializes init). +5. `close_analytics_db()` clears the singleton + a subsequent call + reopens cleanly. +""" + +from __future__ import annotations + +import threading + +import pytest + + +@pytest.fixture(autouse=True) +def _reset_singleton(monkeypatch, tmp_path): + """Each test gets its own DATA_DIR + clean singleton state. + + Reset both globals before AND after the test so a leak from a + previous test (this file or anywhere else in the suite) doesn't + pollute the case under inspection. + """ + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + import src.db as db_mod + db_mod._analytics_db_conn = None + db_mod._analytics_db_path = None + yield + db_mod._analytics_db_conn = None + db_mod._analytics_db_path = None + + +def test_get_analytics_db_caches_connection(): + """Two consecutive calls must share the same underlying DuckDB + connection object — not open a fresh one each time.""" + from src.db import get_analytics_db + import src.db as db_mod + cur1 = get_analytics_db() + cur2 = get_analytics_db() + # Cursors are different objects (DuckDB returns a fresh cursor each + # call) but they're both backed by `_analytics_db_conn` — only one + # underlying connection should have been opened. + assert db_mod._analytics_db_conn is not None + assert cur1 is not cur2 # cursors differ + # Sanity: both cursors execute against the same DB by writing + + # reading via the shared connection. + cur1.execute("CREATE TABLE singleton_probe (x INTEGER)") + cur2.execute("INSERT INTO singleton_probe VALUES (42)") + rows = cur1.execute("SELECT x FROM singleton_probe").fetchall() + assert rows == [(42,)] + + +def test_closing_cursor_does_not_close_connection(): + """The whole point of `.cursor()` indirection — close the cursor + handle, the underlying connection stays usable for the next call.""" + from src.db import get_analytics_db + import src.db as db_mod + cur1 = get_analytics_db() + cur1.execute("CREATE TABLE probe (x INTEGER)") + cur1.close() # caller is allowed to do this; mustn't break #2 call + # The connection itself must still be alive on the singleton. + assert db_mod._analytics_db_conn is not None + cur2 = get_analytics_db() + rows = cur2.execute("SELECT COUNT(*) FROM probe").fetchall() + assert rows == [(0,)] + + +def test_get_analytics_db_reopens_on_data_dir_change(tmp_path, monkeypatch): + """When DATA_DIR (the resolved path) changes, the singleton must + drop the old connection and open a fresh one against the new path. + This is the test-fixture path — production never moves DATA_DIR + mid-process, but pytest fixtures do.""" + import src.db as db_mod + from src.db import get_analytics_db + cur1 = get_analytics_db() + cur1.execute("CREATE TABLE marker_a (x INTEGER)") + conn_a = db_mod._analytics_db_conn + + # Move to a new DATA_DIR — singleton must reopen. + new_dir = tmp_path.parent / "alt-data" + new_dir.mkdir(exist_ok=True) + monkeypatch.setenv("DATA_DIR", str(new_dir)) + cur2 = get_analytics_db() + conn_b = db_mod._analytics_db_conn + + assert conn_a is not conn_b, "singleton should have reopened on DATA_DIR change" + # The new DB doesn't have marker_a — confirms it's a fresh DB at the new path. + with pytest.raises(Exception): + cur2.execute("SELECT * FROM marker_a") + + +def test_get_analytics_db_thread_safe(): + """Concurrent calls from N threads must produce exactly ONE + underlying connection (the lock serializes the init branch).""" + from src.db import get_analytics_db + import src.db as db_mod + + errors: list[BaseException] = [] + cursors: list = [] + + def worker(): + try: + cur = get_analytics_db() + cursors.append(cur) + except BaseException as e: + errors.append(e) + + threads = [threading.Thread(target=worker) for _ in range(16)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + assert errors == [], errors + assert len(cursors) == 16 + # All cursors share one connection. + assert db_mod._analytics_db_conn is not None + # Any race-induced second connection would be re-assigned and the + # first would orphan; we can't probe that directly, but functionally + # all 16 threads must see the SAME singleton state. + cursors[0].execute("CREATE TABLE thread_probe (x INTEGER)") + rows = cursors[15].execute("SELECT COUNT(*) FROM thread_probe").fetchall() + assert rows == [(0,)], "16th thread's cursor doesn't see the 1st's table — race" + + +def test_close_analytics_db_clears_singleton_and_reopen_works(): + """Shutdown hook clears the singleton; a subsequent call after + re-init (test process keeps running) must reopen cleanly.""" + import src.db as db_mod + from src.db import close_analytics_db, get_analytics_db + cur1 = get_analytics_db() + cur1.execute("CREATE TABLE probe (x INTEGER)") + assert db_mod._analytics_db_conn is not None + + close_analytics_db() + assert db_mod._analytics_db_conn is None + assert db_mod._analytics_db_path is None + + # Re-open after close: fresh cursor, table from previous session + # PERSISTS on disk (we close, not nuke). + cur2 = get_analytics_db() + rows = cur2.execute("SELECT COUNT(*) FROM probe").fetchall() + assert rows == [(0,)]