agnes-the-ai-analyst/tests/test_duckdb_manager.py
Petr 8bb46a9e0a Add per-partition streaming sync and hybrid query architecture
Partitioned sync: iterates day-by-day instead of loading full dataset.
Each partition: query BQ -> stream to disk -> free RAM. Peak ~50 MB.
New helpers: _sync_single_partition, _cleanup_old_partitions, _generate_partition_dates.

Config: added partition_column_type (DATE/TIMESTAMP/DATETIME), query_mode (local/remote/hybrid).
DuckDB manager: hybrid architecture support (local Parquet + remote BQ tables).
Data sync: skips remote tables, filters by query_mode.

Tests: 113 passing (adapter, client, config, data_sync, duckdb_manager).
2026-03-12 13:20:41 +01:00

488 lines
16 KiB
Python

"""Tests for DuckDB Manager - query_mode classification and BQ registration.
Tests cover:
- _get_bq_project_from_table_id: extracting BQ project from table IDs
- get_remote_tables: filtering tables by query_mode
- register_bq_table: registering BQ query results in DuckDB
- init_duckdb: table classification by query_mode, local view creation,
remote table logging
"""
import os
from pathlib import Path
from unittest.mock import MagicMock, patch
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from scripts.duckdb_manager import (
_get_bq_project_from_table_id,
get_remote_tables,
init_duckdb,
register_bq_table,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def tmp_project(tmp_path):
"""Create a minimal project layout with docs/data_description.md and a parquet file.
Returns (project_root, db_path, data_dir) tuple.
"""
docs_dir = tmp_path / "docs"
docs_dir.mkdir()
data_description = """\
# Data Description
```yaml
folder_mapping:
in.c-crm: crm_data
tables:
- id: "in.c-crm.company"
name: "company"
description: "Company master data"
primary_key: "id"
sync_strategy: "full_refresh"
```
"""
(docs_dir / "data_description.md").write_text(data_description)
# Create parquet directory and a minimal parquet file
data_dir = tmp_path / "server" / "parquet" / "crm_data"
data_dir.mkdir(parents=True)
table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})
pq.write_table(table, data_dir / "company.parquet")
db_dir = tmp_path / "user" / "duckdb"
db_dir.mkdir(parents=True)
db_path = str(db_dir / "test.duckdb")
return tmp_path, db_path, str(tmp_path / "server")
@pytest.fixture
def tmp_project_mixed(tmp_path):
"""Project with local, remote, and hybrid tables."""
docs_dir = tmp_path / "docs"
docs_dir.mkdir()
data_description = """\
# Data Description
```yaml
folder_mapping:
in.c-crm: crm_data
tables:
- id: "in.c-crm.company"
name: "company"
description: "Local table"
primary_key: "id"
sync_strategy: "full_refresh"
- id: "prj-grp-dataview-prod-1ff9.finance.revenue"
name: "revenue"
description: "Remote BQ table"
primary_key: "id"
query_mode: "remote"
- id: "prj-grp-dataview-prod-1ff9.marketing.campaigns"
name: "campaigns"
description: "Hybrid table"
primary_key: "id"
sync_strategy: "full_refresh"
query_mode: "hybrid"
```
"""
(docs_dir / "data_description.md").write_text(data_description)
# Create parquet files for local and hybrid tables
crm_dir = tmp_path / "server" / "parquet" / "crm_data"
crm_dir.mkdir(parents=True)
table = pa.table({"id": [1, 2], "name": ["a", "b"]})
pq.write_table(table, crm_dir / "company.parquet")
marketing_dir = tmp_path / "server" / "parquet" / "prj-grp-dataview-prod-1ff9.marketing"
marketing_dir.mkdir(parents=True)
campaigns_table = pa.table({"id": [10], "campaign": ["test"]})
pq.write_table(campaigns_table, marketing_dir / "campaigns.parquet")
db_dir = tmp_path / "user" / "duckdb"
db_dir.mkdir(parents=True)
db_path = str(db_dir / "test.duckdb")
return tmp_path, db_path, str(tmp_path / "server")
@pytest.fixture
def tmp_project_remote_only(tmp_path):
"""Project with only remote tables (no local parquet needed)."""
docs_dir = tmp_path / "docs"
docs_dir.mkdir()
data_description = """\
# Data Description
```yaml
tables:
- id: "prj-grp-dataview-prod-1ff9.finance.revenue"
name: "revenue"
description: "Remote BQ table"
primary_key: "id"
query_mode: "remote"
- id: "prj-grp-dataview-prod-1ff9.finance.costs"
name: "costs"
description: "Remote BQ table"
primary_key: "id"
query_mode: "remote"
```
"""
(docs_dir / "data_description.md").write_text(data_description)
db_dir = tmp_path / "user" / "duckdb"
db_dir.mkdir(parents=True)
db_path = str(db_dir / "test.duckdb")
return tmp_path, db_path, str(tmp_path / "server")
# ---------------------------------------------------------------------------
# Tests: _get_bq_project_from_table_id
# ---------------------------------------------------------------------------
class TestGetBqProjectFromTableId:
"""Test extracting BigQuery project ID from fully-qualified table IDs."""
def test_valid_bq_table_id(self):
result = _get_bq_project_from_table_id(
"prj-grp-dataview-prod-1ff9.finance.table"
)
assert result == "prj-grp-dataview-prod-1ff9"
def test_valid_bq_table_id_different_project(self):
result = _get_bq_project_from_table_id(
"my-gcp-project.dataset_name.table_name"
)
assert result == "my-gcp-project"
def test_keboola_format_returns_none(self):
result = _get_bq_project_from_table_id("in.c-crm.table")
assert result is None
def test_two_part_id_returns_none(self):
result = _get_bq_project_from_table_id("dataset.table")
assert result is None
def test_single_part_returns_none(self):
result = _get_bq_project_from_table_id("table_only")
assert result is None
def test_four_parts_returns_none(self):
result = _get_bq_project_from_table_id("a-b.c.d.e")
assert result is None
def test_empty_string_returns_none(self):
result = _get_bq_project_from_table_id("")
assert result is None
def test_three_parts_no_hyphen_returns_none(self):
result = _get_bq_project_from_table_id("project.dataset.table")
assert result is None
def test_hyphen_in_first_part_is_key(self):
result = _get_bq_project_from_table_id("a-b.dataset.table")
assert result == "a-b"
# ---------------------------------------------------------------------------
# Tests: get_remote_tables
# ---------------------------------------------------------------------------
class TestGetRemoteTables:
"""Test filtering table configs by query_mode."""
def test_returns_remote_tables(self):
configs = [
{"name": "local", "query_mode": "local"},
{"name": "remote1", "query_mode": "remote"},
{"name": "hybrid1", "query_mode": "hybrid"},
]
result = get_remote_tables(configs)
names = [tc["name"] for tc in result]
assert "remote1" in names
assert "hybrid1" in names
assert "local" not in names
def test_returns_empty_when_all_local(self):
configs = [
{"name": "t1", "query_mode": "local"},
{"name": "t2"}, # defaults to local (no query_mode key)
]
result = get_remote_tables(configs)
assert result == []
def test_missing_query_mode_treated_as_local(self):
configs = [{"name": "t1"}] # no query_mode
result = get_remote_tables(configs)
assert result == []
# ---------------------------------------------------------------------------
# Tests: register_bq_table
# ---------------------------------------------------------------------------
class TestRegisterBqTable:
"""Test registering BQ query results as DuckDB views."""
@staticmethod
def _make_factory(arrow_table, side_effect=None):
"""Create a mock BQ client factory returning a client that yields arrow_table."""
mock_job = MagicMock()
if side_effect:
mock_job.to_arrow.side_effect = side_effect
else:
mock_job.to_arrow.return_value = arrow_table
mock_client = MagicMock()
mock_client.query.return_value = mock_job
factory = MagicMock(return_value=mock_client)
factory._mock_client = mock_client
factory._mock_job = mock_job
return factory
def test_registers_arrow_table_in_duckdb(self):
"""Result from BQ should be queryable in DuckDB after registration."""
arrow_table = pa.table({"id": [1, 2], "val": [10.0, 20.0]})
factory = self._make_factory(arrow_table)
conn = duckdb.connect()
rows = register_bq_table(
conn=conn,
table_id="proj.dataset.table",
view_name="test_view",
sql="SELECT * FROM table",
bq_project="test-project",
_bq_client_factory=factory,
)
assert rows == 2
result = conn.execute("SELECT SUM(val) FROM test_view").fetchone()[0]
assert result == 30.0
conn.close()
def test_raises_without_bq_project(self):
conn = duckdb.connect()
with patch.dict(os.environ, {}, clear=True):
with pytest.raises(ValueError, match="BigQuery project not set"):
register_bq_table(
conn=conn,
table_id="proj.ds.tbl",
view_name="v",
sql="SELECT 1",
)
conn.close()
def test_uses_env_var_when_no_project_arg(self):
arrow_table = pa.table({"x": [1]})
factory = self._make_factory(arrow_table)
conn = duckdb.connect()
with patch.dict(os.environ, {"BIGQUERY_PROJECT": "env-proj"}):
register_bq_table(
conn=conn,
table_id="p.d.t",
view_name="v",
sql="SELECT 1",
_bq_client_factory=factory,
)
factory.assert_called_once_with("env-proj")
conn.close()
def test_storage_api_fallback(self):
"""Falls back to REST when Storage API permission denied."""
arrow_table = pa.table({"x": [1]})
factory = self._make_factory(
arrow_table,
side_effect=[
Exception("PERMISSION_DENIED readsessions"),
arrow_table,
],
)
conn = duckdb.connect()
rows = register_bq_table(
conn=conn,
table_id="p.d.t",
view_name="v",
sql="SELECT 1",
bq_project="proj",
_bq_client_factory=factory,
)
assert rows == 1
factory._mock_job.to_arrow.assert_called_with(create_bqstorage_client=False)
conn.close()
# ---------------------------------------------------------------------------
# Tests: init_duckdb - table classification
# ---------------------------------------------------------------------------
class TestInitDuckdbClassification:
"""Test that tables are correctly classified by query_mode."""
def test_local_tables_create_parquet_views(self, tmp_project):
project_root, db_path, data_dir = tmp_project
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
result = init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=False
)
assert result is True
conn = duckdb.connect(db_path, read_only=True)
tables = [row[0] for row in conn.execute("SHOW TABLES").fetchall()]
assert "company" in tables
row_count = conn.execute("SELECT COUNT(*) FROM company").fetchone()[0]
assert row_count == 3
conn.close()
def test_remote_tables_not_created_as_local_views(self, tmp_project_mixed):
project_root, db_path, data_dir = tmp_project_mixed
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
result = init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=False
)
assert result is True
conn = duckdb.connect(db_path, read_only=True)
tables = [row[0] for row in conn.execute("SHOW TABLES").fetchall()]
assert "revenue" not in tables
assert "company" in tables
conn.close()
def test_hybrid_tables_create_local_views(self, tmp_project_mixed):
project_root, db_path, data_dir = tmp_project_mixed
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
result = init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=False
)
assert result is True
conn = duckdb.connect(db_path, read_only=True)
tables = [row[0] for row in conn.execute("SHOW TABLES").fetchall()]
assert "campaigns" in tables
conn.close()
def test_default_query_mode_is_local(self, tmp_project):
project_root, db_path, data_dir = tmp_project
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
result = init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=False
)
assert result is True
conn = duckdb.connect(db_path, read_only=True)
tables = [row[0] for row in conn.execute("SHOW TABLES").fetchall()]
assert "company" in tables
conn.close()
# ---------------------------------------------------------------------------
# Tests: init_duckdb - remote table logging
# ---------------------------------------------------------------------------
class TestInitDuckdbRemoteLogging:
"""Test that remote tables are logged correctly."""
def test_remote_tables_logged(self, tmp_project_remote_only, capsys):
project_root, db_path, data_dir = tmp_project_remote_only
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=True,
)
output = capsys.readouterr().out
assert "revenue" in output
assert "costs" in output
assert "[BQ]" in output
def test_remote_only_project_succeeds(self, tmp_project_remote_only):
project_root, db_path, data_dir = tmp_project_remote_only
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
result = init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=False,
)
assert result is True
# ---------------------------------------------------------------------------
# Tests: init_duckdb - missing parquet handling
# ---------------------------------------------------------------------------
class TestInitDuckdbMissingParquet:
"""Test behavior when parquet files are missing."""
def test_missing_parquet_skips_view(self, tmp_path):
docs_dir = tmp_path / "docs"
docs_dir.mkdir()
data_description = """\
# Data Description
```yaml
tables:
- id: "in.c-crm.missing_table"
name: "missing_table"
description: "No parquet exists"
primary_key: "id"
sync_strategy: "full_refresh"
```
"""
(docs_dir / "data_description.md").write_text(data_description)
db_dir = tmp_path / "user" / "duckdb"
db_dir.mkdir(parents=True)
db_path = str(db_dir / "test.duckdb")
with patch("scripts.duckdb_manager.find_project_root", return_value=tmp_path):
result = init_duckdb(
db_path=db_path, data_dir=str(tmp_path / "server"), verbose=False
)
assert result is True
conn = duckdb.connect(db_path, read_only=True)
tables = [row[0] for row in conn.execute("SHOW TABLES").fetchall()]
assert "missing_table" not in tables
conn.close()
def test_remote_table_no_local_parquet_needed(self, tmp_project_remote_only):
project_root, db_path, data_dir = tmp_project_remote_only
with patch("scripts.duckdb_manager.find_project_root", return_value=project_root):
result = init_duckdb(
db_path=db_path, data_dir=data_dir, verbose=False,
)
assert result is True