Merge branch 'worktree-agent-afb2461f' into feature/v2-fastapi-duckdb-docker-cli

This commit is contained in:
ZdenekSrotyr 2026-04-12 11:15:35 +02:00
commit d70d645902
9 changed files with 1096 additions and 0 deletions

180
tests/test_cli_admin.py Normal file
View file

@ -0,0 +1,180 @@
"""Tests for da admin subcommands."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
(tmp_path / "config").mkdir()
(tmp_path / "data").mkdir()
yield tmp_path
def _resp(status_code=200, json_data=None, text=""):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
r.text = text
return r
class TestListUsers:
def test_list_users_text(self):
users = [
{"email": "alice@x.com", "role": "admin", "id": "aaa00001"},
{"email": "bob@x.com", "role": "analyst", "id": "bbb00002"},
]
with patch("cli.commands.admin.api_get", return_value=_resp(200, users)):
result = runner.invoke(app, ["admin", "list-users"])
assert result.exit_code == 0
assert "alice@x.com" in result.output
assert "bob@x.com" in result.output
def test_list_users_json(self):
users = [{"email": "alice@x.com", "role": "admin", "id": "aaa00001"}]
with patch("cli.commands.admin.api_get", return_value=_resp(200, users)):
result = runner.invoke(app, ["admin", "list-users", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data[0]["email"] == "alice@x.com"
def test_list_users_api_error(self):
with patch("cli.commands.admin.api_get", return_value=_resp(500, {"detail": "Server error"}, "Server error")):
result = runner.invoke(app, ["admin", "list-users"])
assert result.exit_code == 1
class TestAddUser:
def test_add_user_success(self):
created = {"email": "newuser@x.com", "id": "uid-1", "role": "analyst"}
with patch("cli.commands.admin.api_post", return_value=_resp(201, created)):
result = runner.invoke(app, ["admin", "add-user", "newuser@x.com", "--role", "analyst"])
assert result.exit_code == 0
assert "newuser@x.com" in result.output
def test_add_user_failure(self):
with patch("cli.commands.admin.api_post", return_value=_resp(400, {"detail": "Already exists"})):
result = runner.invoke(app, ["admin", "add-user", "dup@x.com"])
assert result.exit_code == 1
class TestRemoveUser:
def test_remove_user_success(self):
with patch("cli.commands.admin.api_delete", return_value=_resp(204)):
result = runner.invoke(app, ["admin", "remove-user", "uid-1"])
assert result.exit_code == 0
assert "removed" in result.output.lower()
def test_remove_user_not_found(self):
with patch("cli.commands.admin.api_delete", return_value=_resp(404, text="Not found")):
result = runner.invoke(app, ["admin", "remove-user", "nonexistent"])
assert result.exit_code == 1
class TestRegisterTable:
def test_register_table_success(self):
with patch("cli.commands.admin.api_post", return_value=_resp(201, {"id": "t1", "name": "orders"})):
result = runner.invoke(app, [
"admin", "register-table", "orders",
"--source-type", "keboola",
"--bucket", "in.c-crm",
"--query-mode", "local",
])
assert result.exit_code == 0
assert "Registered: orders" in result.output
def test_register_table_already_exists(self):
with patch("cli.commands.admin.api_post", return_value=_resp(409, {"detail": "exists"})):
result = runner.invoke(app, ["admin", "register-table", "orders"])
assert result.exit_code == 0
assert "Already exists" in result.output
def test_register_table_failure(self):
with patch("cli.commands.admin.api_post", return_value=_resp(500, {"detail": "error"})):
result = runner.invoke(app, ["admin", "register-table", "bad_table"])
assert result.exit_code == 1
class TestListTables:
def test_list_tables_text(self):
payload = {
"count": 2,
"tables": [
{"name": "orders", "source_type": "keboola", "query_mode": "local", "bucket": "in.c-crm"},
{"name": "customers", "source_type": "keboola", "query_mode": "local", "bucket": "in.c-crm"},
],
}
with patch("cli.commands.admin.api_get", return_value=_resp(200, payload)):
result = runner.invoke(app, ["admin", "list-tables"])
assert result.exit_code == 0
assert "Registered tables: 2" in result.output
assert "orders" in result.output
def test_list_tables_json(self):
payload = {"count": 1, "tables": [{"name": "orders"}]}
with patch("cli.commands.admin.api_get", return_value=_resp(200, payload)):
result = runner.invoke(app, ["admin", "list-tables", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["count"] == 1
class TestMetadataShow:
def test_metadata_show_columns(self):
payload = {
"columns": [
{"column_name": "id", "basetype": "INTEGER", "confidence": "high", "description": "PK"},
{"column_name": "name", "basetype": "VARCHAR", "confidence": "high", "description": ""},
]
}
with patch("cli.commands.admin.api_get", return_value=_resp(200, payload)):
result = runner.invoke(app, ["admin", "metadata-show", "orders"])
assert result.exit_code == 0
assert "id" in result.output
assert "name" in result.output
def test_metadata_show_json(self):
payload = {"columns": [{"column_name": "id", "basetype": "INTEGER"}]}
with patch("cli.commands.admin.api_get", return_value=_resp(200, payload)):
result = runner.invoke(app, ["admin", "metadata-show", "orders", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert "columns" in data
def test_metadata_show_not_found(self):
with patch("cli.commands.admin.api_get", return_value=_resp(404, {"detail": "Not found"})):
result = runner.invoke(app, ["admin", "metadata-show", "nonexistent"])
assert result.exit_code == 1
class TestMetadataApply:
def test_metadata_apply_dry_run(self, tmp_path):
proposal = {
"tables": {
"orders": {
"columns": {
"id": {"basetype": "INTEGER", "description": "Primary key"},
}
}
}
}
proposal_file = tmp_path / "proposal.json"
proposal_file.write_text(json.dumps(proposal))
result = runner.invoke(app, ["admin", "metadata-apply", str(proposal_file), "--dry-run"])
assert result.exit_code == 0
assert "DRY RUN" in result.output
assert "orders.id" in result.output
def test_metadata_apply_file_not_found(self):
result = runner.invoke(app, ["admin", "metadata-apply", "/nonexistent/proposal.json"])
assert result.exit_code == 1
assert "not found" in result.output.lower()

124
tests/test_cli_analyst.py Normal file
View file

@ -0,0 +1,124 @@
"""Tests for da analyst setup/status commands."""
import json
import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
(tmp_path / "config").mkdir()
yield tmp_path
def _httpx_resp(status_code=200, json_data=None):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
r.raise_for_status = MagicMock()
return r
class TestAnalystStatus:
def test_status_uninitialized(self, tmp_path):
"""Status shows 'no' for uninitialized workspace."""
workspace = tmp_path / "workspace"
workspace.mkdir()
result = runner.invoke(app, ["analyst", "status", "--workspace", str(workspace)])
assert result.exit_code == 0
assert "no" in result.output.lower() or "missing" in result.output.lower()
def test_status_initialized(self, tmp_path):
"""Status shows initialized when CLAUDE.md with marker exists."""
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "CLAUDE.md").write_text("# AI Data Analyst\nHello")
(workspace / "data" / "parquet").mkdir(parents=True)
(workspace / "data" / "metadata").mkdir(parents=True)
result = runner.invoke(app, ["analyst", "status", "--workspace", str(workspace)])
assert result.exit_code == 0
assert "yes" in result.output.lower() or "initialized" in result.output.lower()
def test_status_json_output(self, tmp_path):
"""--json flag produces valid JSON with expected keys."""
workspace = tmp_path / "workspace"
workspace.mkdir()
result = runner.invoke(app, ["analyst", "status", "--workspace", str(workspace), "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert "initialized" in data
assert "freshness" in data
assert "parquet_tables" in data
def test_status_fresh_data(self, tmp_path):
"""Status shows 'fresh' when last_sync is recent."""
from datetime import datetime, timezone
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "CLAUDE.md").write_text("# AI Data Analyst\n")
meta_dir = workspace / "data" / "metadata"
meta_dir.mkdir(parents=True)
(meta_dir / "last_sync.json").write_text(
json.dumps({"synced_at": datetime.now(timezone.utc).isoformat()})
)
result = runner.invoke(app, ["analyst", "status", "--workspace", str(workspace), "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["freshness"] == "fresh"
def test_status_stale_data(self, tmp_path):
"""Status shows 'stale' when last_sync is >24 h ago."""
from datetime import datetime, timezone, timedelta
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "CLAUDE.md").write_text("# AI Data Analyst\n")
meta_dir = workspace / "data" / "metadata"
meta_dir.mkdir(parents=True)
old_ts = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
(meta_dir / "last_sync.json").write_text(json.dumps({"synced_at": old_ts}))
result = runner.invoke(app, ["analyst", "status", "--workspace", str(workspace), "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["freshness"] == "stale"
class TestAnalystSetup:
def test_setup_existing_workspace_blocked(self, tmp_path):
"""Setup fails if workspace already initialized and --force not given."""
workspace = tmp_path / "workspace"
workspace.mkdir()
(workspace / "CLAUDE.md").write_text("# AI Data Analyst\nInitialized")
result = runner.invoke(app, [
"analyst", "setup", "--server-url", "http://server", "--workspace", str(workspace),
])
assert result.exit_code == 1
assert "force" in result.output.lower() or "existing" in result.output.lower()
def test_setup_server_unreachable(self, tmp_path):
"""Setup exits cleanly when server cannot be reached.
httpx is imported inside _connect_to_instance, so patch the module reference
that the function will use at call time.
"""
workspace = tmp_path / "workspace"
workspace.mkdir()
import httpx as _httpx
mock_httpx = MagicMock(spec=_httpx)
mock_httpx.get.side_effect = Exception("Connection refused")
with patch.dict("sys.modules", {"httpx": mock_httpx}):
result = runner.invoke(
app,
["analyst", "setup", "--server-url", "http://unreachable:9999",
"--workspace", str(workspace)],
)
assert result.exit_code == 1
assert "Cannot reach" in result.output

99
tests/test_cli_auth.py Normal file
View file

@ -0,0 +1,99 @@
"""Tests for da auth login/logout/whoami commands."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
(tmp_path / "config").mkdir()
(tmp_path / "local").mkdir()
yield tmp_path
def _make_response(status_code=200, json_data=None, text=""):
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = json_data or {}
resp.text = text
return resp
class TestAuthLogin:
def test_login_success(self):
"""Login with valid credentials saves token and shows confirmation."""
mock_resp = _make_response(200, {
"access_token": "tok123",
"email": "alice@example.com",
"role": "analyst",
})
with patch("cli.commands.auth.api_post", return_value=mock_resp):
with patch("cli.commands.auth.save_token") as mock_save:
result = runner.invoke(app, ["auth", "login", "--email", "alice@example.com"])
assert result.exit_code == 0
assert "alice@example.com" in result.output
mock_save.assert_called_once_with("tok123", "alice@example.com", "analyst")
def test_login_invalid_credentials(self):
"""Login with bad credentials exits with error."""
mock_resp = _make_response(401, {"detail": "Invalid credentials"})
with patch("cli.commands.auth.api_post", return_value=mock_resp):
result = runner.invoke(app, ["auth", "login", "--email", "bad@example.com"])
assert result.exit_code == 1
assert "Login failed" in result.output
def test_login_connection_error(self):
"""Login propagates connection errors cleanly."""
with patch("cli.commands.auth.api_post", side_effect=Exception("Connection refused")):
result = runner.invoke(app, ["auth", "login", "--email", "alice@example.com"])
assert result.exit_code == 1
assert "Connection error" in result.output
class TestAuthLogout:
def test_logout(self):
"""Logout clears token and confirms."""
with patch("cli.commands.auth.clear_token") as mock_clear:
result = runner.invoke(app, ["auth", "logout"])
assert result.exit_code == 0
assert "Logged out" in result.output
mock_clear.assert_called_once()
class TestAuthWhoami:
def test_whoami_no_token(self):
"""Whoami exits when no token is stored."""
with patch("cli.commands.auth.get_token", return_value=None):
result = runner.invoke(app, ["auth", "whoami"])
assert result.exit_code == 1
assert "Not logged in" in result.output
def test_whoami_valid_token(self):
"""Whoami decodes JWT and shows user info."""
import jwt as pyjwt
token = pyjwt.encode(
{"email": "alice@example.com", "role": "analyst"},
"secret",
algorithm="HS256",
)
with patch("cli.commands.auth.get_token", return_value=token):
with patch("cli.commands.auth.get_server_url", return_value="http://localhost:8000"):
result = runner.invoke(app, ["auth", "whoami"])
assert result.exit_code == 0
assert "alice@example.com" in result.output
assert "analyst" in result.output
def test_whoami_invalid_token(self):
"""Whoami with garbled token exits with error."""
with patch("cli.commands.auth.get_token", return_value="not.a.jwt"):
result = runner.invoke(app, ["auth", "whoami"])
# May succeed or fail depending on jwt decode — either way no traceback
assert result.exit_code in (0, 1)

View file

@ -0,0 +1,98 @@
"""Tests for da diagnose command."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
(tmp_path / "config").mkdir()
yield tmp_path
def _resp(status_code=200, json_data=None):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
r.elapsed = MagicMock()
r.elapsed.total_seconds.return_value = 0.042
return r
HEALTHY_HEALTH = {
"status": "ok",
"instance_name": "Test Instance",
"services": {
"duckdb": {"status": "ok", "tables": 5},
"scheduler": {"status": "ok"},
},
}
class TestDiagnoseText:
def test_diagnose_healthy(self):
"""Diagnose healthy system shows 'healthy' overall."""
with patch("cli.commands.diagnose.api_get", return_value=_resp(200, HEALTHY_HEALTH)):
result = runner.invoke(app, ["diagnose"])
assert result.exit_code == 0
assert "healthy" in result.output.lower()
assert "api" in result.output
assert "duckdb" in result.output
def test_diagnose_api_unreachable(self):
"""Diagnose marks overall as unhealthy when API is down."""
with patch("cli.commands.diagnose.api_get", side_effect=Exception("Connection refused")):
result = runner.invoke(app, ["diagnose"])
assert result.exit_code == 0
assert "unhealthy" in result.output.lower()
assert "api" in result.output
assert "Server unreachable" in result.output or "Suggested actions" in result.output
def test_diagnose_warning_service(self):
"""Diagnose shows 'degraded' when a service reports warning."""
health = {
"services": {
"duckdb": {"status": "warning", "stale_tables": ["orders"]},
}
}
with patch("cli.commands.diagnose.api_get", return_value=_resp(200, health)):
result = runner.invoke(app, ["diagnose"])
assert result.exit_code == 0
assert "degraded" in result.output.lower()
class TestDiagnoseJson:
def test_diagnose_json_output(self):
"""--json flag produces parseable JSON with expected structure."""
with patch("cli.commands.diagnose.api_get", return_value=_resp(200, HEALTHY_HEALTH)):
result = runner.invoke(app, ["diagnose", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert "overall" in data
assert "checks" in data
assert "suggested_actions" in data
def test_diagnose_json_api_down(self):
"""--json flag still emits valid JSON when API is unreachable."""
with patch("cli.commands.diagnose.api_get", side_effect=Exception("timeout")):
result = runner.invoke(app, ["diagnose", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["overall"] == "unhealthy"
assert any(c["name"] == "api" and c["status"] == "error" for c in data["checks"])
def test_diagnose_json_has_latency(self):
"""Healthy API check includes latency_ms in JSON output."""
with patch("cli.commands.diagnose.api_get", return_value=_resp(200, HEALTHY_HEALTH)):
result = runner.invoke(app, ["diagnose", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
api_check = next(c for c in data["checks"] if c["name"] == "api")
assert "latency_ms" in api_check

111
tests/test_cli_explore.py Normal file
View file

@ -0,0 +1,111 @@
"""Tests for da explore command."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
(tmp_path / "config").mkdir()
(tmp_path / "local").mkdir()
yield tmp_path
def _resp(status_code=200, json_data=None, text=""):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
r.text = text
return r
def _make_local_db(tmp_config):
"""Create a local DuckDB with a sample table for exploration tests."""
import duckdb
db_dir = tmp_config / "local" / "user" / "duckdb"
db_dir.mkdir(parents=True)
conn = duckdb.connect(str(db_dir / "analytics.duckdb"))
conn.execute("CREATE TABLE orders (id INTEGER, amount DOUBLE, status VARCHAR)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
(1, 99.5, "shipped"),
(2, 200.0, "pending"),
(3, 50.0, "shipped"),
])
conn.close()
return db_dir / "analytics.duckdb"
class TestExploreLocal:
def test_explore_existing_table(self, tmp_config):
"""Exploring an existing local table shows row count and columns."""
_make_local_db(tmp_config)
result = runner.invoke(app, ["explore", "orders"])
assert result.exit_code == 0
assert "orders" in result.output
assert "3" in result.output # row count
def test_explore_no_db(self, tmp_config):
"""Exploring without local DB exits with guidance."""
result = runner.invoke(app, ["explore", "orders"])
assert result.exit_code == 1
assert "not found" in result.output.lower() or "sync" in result.output.lower()
def test_explore_missing_table(self, tmp_config):
"""Exploring a non-existent table exits with error."""
_make_local_db(tmp_config)
result = runner.invoke(app, ["explore", "nonexistent_xyz"])
assert result.exit_code == 1
assert "not found" in result.output.lower() or "nonexistent" in result.output.lower()
def test_explore_json_flag(self, tmp_config):
"""--json flag produces valid JSON with table info.
Note: with explore's callback pattern, options must precede positional args.
"""
_make_local_db(tmp_config)
result = runner.invoke(app, ["explore", "--json", "orders"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["table"] == "orders"
assert data["row_count"] == 3
assert len(data["columns"]) == 3
assert len(data["sample_rows"]) <= 5
class TestExploreRemote:
def test_explore_remote_success(self):
"""--remote flag fetches catalog profile from server.
api_get is imported inside _explore_remote so mock the source module.
"""
profile = {
"table": "orders",
"row_count": 1000,
"columns": [{"name": "id"}, {"name": "amount"}],
}
with patch("cli.client.api_get", return_value=_resp(200, profile)):
result = runner.invoke(app, ["explore", "--remote", "orders"])
assert result.exit_code == 0
assert "orders" in result.output
def test_explore_remote_not_found(self):
"""--remote with unknown table exits with error."""
with patch("cli.client.api_get", return_value=_resp(404, {"detail": "Not found"}, "Not found")):
result = runner.invoke(app, ["explore", "--remote", "unknown_table"])
assert result.exit_code == 1
assert "not found" in result.output.lower() or "Profile not found" in result.output
def test_explore_remote_json_flag(self):
"""--remote --json outputs raw API JSON."""
profile = {"table": "orders", "row_count": 500}
with patch("cli.client.api_get", return_value=_resp(200, profile)):
result = runner.invoke(app, ["explore", "--remote", "--json", "orders"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["table"] == "orders"

130
tests/test_cli_metrics.py Normal file
View file

@ -0,0 +1,130 @@
"""Tests for da metrics list/show commands."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
(tmp_path / "config").mkdir()
(tmp_path / "data").mkdir()
yield tmp_path
def _resp(status_code=200, json_data=None):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
return r
METRICS_LIST = [
{"id": "revenue/mrr", "name": "mrr", "display_name": "Monthly Recurring Revenue",
"category": "revenue", "unit": "USD"},
{"id": "revenue/arr", "name": "arr", "display_name": "Annual Recurring Revenue",
"category": "revenue", "unit": "USD"},
{"id": "product/dau", "name": "dau", "display_name": "Daily Active Users",
"category": "product", "unit": "users"},
]
MRR_DETAIL = {
"id": "revenue/mrr",
"name": "mrr",
"display_name": "Monthly Recurring Revenue",
"category": "revenue",
"type": "sum",
"unit": "USD",
"grain": "monthly",
"table_name": "subscriptions",
"sql": "SELECT SUM(amount) FROM subscriptions WHERE status='active'",
"description": "Total monthly recurring revenue from active subscriptions.",
"synonyms": ["MRR", "monthly revenue"],
}
class TestMetricsList:
def test_list_metrics_text(self):
"""list command groups metrics by category."""
with patch("cli.commands.metrics.api_get", return_value=_resp(200, METRICS_LIST)):
result = runner.invoke(app, ["metrics", "list"])
assert result.exit_code == 0
assert "revenue" in result.output
assert "mrr" in result.output
assert "dau" in result.output
def test_list_metrics_json(self):
"""--json flag outputs raw metric list as JSON."""
with patch("cli.commands.metrics.api_get", return_value=_resp(200, METRICS_LIST)):
result = runner.invoke(app, ["metrics", "list", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert len(data) == 3
assert data[0]["id"] == "revenue/mrr"
def test_list_metrics_empty(self):
"""Empty metric list shows 'No metrics found'."""
with patch("cli.commands.metrics.api_get", return_value=_resp(200, [])):
result = runner.invoke(app, ["metrics", "list"])
assert result.exit_code == 0
assert "No metrics" in result.output
def test_list_metrics_api_failure(self):
"""API error exits with non-zero code."""
with patch("cli.commands.metrics.api_get", return_value=_resp(500, {"detail": "Server error"})):
result = runner.invoke(app, ["metrics", "list"])
assert result.exit_code == 1
def test_list_metrics_category_filter_passed(self):
"""--category passes query param to API."""
with patch("cli.commands.metrics.api_get", return_value=_resp(200, [])) as mock_get:
runner.invoke(app, ["metrics", "list", "--category", "revenue"])
mock_get.assert_called_once_with("/api/metrics", params={"category": "revenue"})
def test_list_metrics_dict_response(self):
"""Response wrapped in {metrics: [...]} dict is handled."""
wrapped = {"metrics": METRICS_LIST}
with patch("cli.commands.metrics.api_get", return_value=_resp(200, wrapped)):
result = runner.invoke(app, ["metrics", "list"])
assert result.exit_code == 0
assert "mrr" in result.output
class TestMetricsShow:
def test_show_metric_text(self):
"""show command displays metric details in text format."""
with patch("cli.commands.metrics.api_get", return_value=_resp(200, MRR_DETAIL)):
result = runner.invoke(app, ["metrics", "show", "revenue/mrr"])
assert result.exit_code == 0
assert "Monthly Recurring Revenue" in result.output
assert "SELECT SUM(amount)" in result.output
assert "subscriptions" in result.output
def test_show_metric_json(self):
"""--json flag outputs full metric detail as JSON."""
with patch("cli.commands.metrics.api_get", return_value=_resp(200, MRR_DETAIL)):
result = runner.invoke(app, ["metrics", "show", "revenue/mrr", "--json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data["id"] == "revenue/mrr"
assert "sql" in data
def test_show_metric_not_found(self):
"""Missing metric ID exits with 404 error."""
with patch("cli.commands.metrics.api_get", return_value=_resp(404, {})):
result = runner.invoke(app, ["metrics", "show", "nonexistent/metric"])
assert result.exit_code == 1
assert "not found" in result.output.lower()
def test_show_metric_api_error(self):
"""Non-404 API error also exits with error."""
with patch("cli.commands.metrics.api_get", return_value=_resp(500, {"detail": "error"})):
result = runner.invoke(app, ["metrics", "show", "revenue/mrr"])
assert result.exit_code == 1

131
tests/test_cli_query.py Normal file
View file

@ -0,0 +1,131 @@
"""Tests for da query command."""
import json
import pytest
from unittest.mock import patch, MagicMock
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
(tmp_path / "config").mkdir()
(tmp_path / "local").mkdir()
yield tmp_path
def _resp(status_code=200, json_data=None, text=""):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
r.text = text
return r
class TestRemoteQuery:
def test_remote_query_success(self):
"""--remote sends SQL to server and prints results."""
# api_post is imported inside _query_remote so mock the source module
payload = {"columns": ["id", "name"], "rows": [[1, "Alice"]], "truncated": False}
with patch("cli.client.api_post", return_value=_resp(200, payload)):
result = runner.invoke(app, ["query", "SELECT * FROM users", "--remote"])
assert result.exit_code == 0
def test_remote_query_failure(self):
"""--remote prints error message on API failure."""
with patch("cli.client.api_post", return_value=_resp(400, {"detail": "bad SQL"})):
result = runner.invoke(app, ["query", "SELECT bad", "--remote"])
assert result.exit_code == 1
assert "Query failed" in result.output
def test_remote_query_truncated(self):
"""Truncated result shows warning."""
payload = {"columns": ["id"], "rows": [[i] for i in range(5)], "truncated": True}
with patch("cli.client.api_post", return_value=_resp(200, payload)):
result = runner.invoke(app, ["query", "SELECT id FROM t", "--remote", "--limit", "5"])
assert result.exit_code == 0
assert "truncated" in result.output
class TestLocalQuery:
def test_local_query_no_db(self, tmp_config):
"""Local query without DuckDB exits with guidance."""
result = runner.invoke(app, ["query", "SELECT 1"])
assert result.exit_code == 1
assert "not found" in result.output.lower()
def test_local_query_with_real_db(self, tmp_config):
"""Local query executes against real DuckDB."""
import duckdb
db_dir = tmp_config / "local" / "user" / "duckdb"
db_dir.mkdir(parents=True)
conn = duckdb.connect(str(db_dir / "analytics.duckdb"))
conn.execute("CREATE TABLE nums (n INTEGER)")
conn.execute("INSERT INTO nums VALUES (1), (2), (3)")
conn.close()
result = runner.invoke(app, ["query", "SELECT SUM(n) as total FROM nums", "--format", "json"])
assert result.exit_code == 0
data = json.loads(result.output)
assert data[0]["total"] == 6
def test_local_query_csv_format(self, tmp_config):
"""--format csv produces CSV output."""
import duckdb
db_dir = tmp_config / "local" / "user" / "duckdb"
db_dir.mkdir(parents=True)
conn = duckdb.connect(str(db_dir / "analytics.duckdb"))
conn.execute("CREATE TABLE t (a INTEGER, b VARCHAR)")
conn.execute("INSERT INTO t VALUES (1, 'x')")
conn.close()
result = runner.invoke(app, ["query", "SELECT a, b FROM t", "--format", "csv"])
assert result.exit_code == 0
lines = result.output.strip().splitlines()
assert lines[0] == "a,b"
assert "1,x" in lines[1]
def test_local_query_table_format(self, tmp_config):
"""Default table format renders without crash."""
import duckdb
db_dir = tmp_config / "local" / "user" / "duckdb"
db_dir.mkdir(parents=True)
conn = duckdb.connect(str(db_dir / "analytics.duckdb"))
conn.execute("CREATE TABLE t (id INTEGER)")
conn.execute("INSERT INTO t VALUES (42)")
conn.close()
result = runner.invoke(app, ["query", "SELECT id FROM t"])
assert result.exit_code == 0
assert "42" in result.output
def test_local_query_limit(self, tmp_config):
"""--limit restricts rows returned."""
import duckdb
db_dir = tmp_config / "local" / "user" / "duckdb"
db_dir.mkdir(parents=True)
conn = duckdb.connect(str(db_dir / "analytics.duckdb"))
conn.execute("CREATE TABLE big (n INTEGER)")
conn.executemany("INSERT INTO big VALUES (?)", [(i,) for i in range(100)])
conn.close()
result = runner.invoke(app, ["query", "SELECT n FROM big", "--format", "json", "--limit", "5"])
assert result.exit_code == 0
data = json.loads(result.output)
assert len(data) == 5
def test_local_query_sql_error(self, tmp_config):
"""SQL syntax error exits with error."""
import duckdb
db_dir = tmp_config / "local" / "user" / "duckdb"
db_dir.mkdir(parents=True)
duckdb.connect(str(db_dir / "analytics.duckdb")).close()
result = runner.invoke(app, ["query", "SELECT * FROM nonexistent_table_xyz"])
assert result.exit_code == 1
assert "Query error" in result.output

107
tests/test_cli_server.py Normal file
View file

@ -0,0 +1,107 @@
"""Tests for da server subcommands (delegate to subprocess)."""
import subprocess
import pytest
from unittest.mock import patch, MagicMock, call
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
def _subprocess_result(returncode=0):
r = MagicMock()
r.returncode = returncode
return r
class TestServerStatus:
def test_server_status_runs_docker_compose_ps(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "status"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "docker compose ps" in cmd
def test_server_status_nonzero_exit(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(1)):
result = runner.invoke(app, ["server", "status"])
assert result.exit_code != 0
class TestServerLogs:
def test_server_logs_default_service(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "logs"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "docker compose logs" in cmd
assert "app" in cmd
def test_server_logs_custom_service(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "logs", "scheduler"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "scheduler" in cmd
def test_server_logs_with_tail(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "logs", "--tail", "50"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "50" in cmd
class TestServerRestart:
def test_server_restart_default_service(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "restart"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "docker compose restart" in cmd
assert "app" in cmd
assert "Restarted" in result.output
def test_server_restart_named_service(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "restart", "scheduler"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "scheduler" in cmd
class TestServerDeploy:
def test_server_deploy_production(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "deploy"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "kamal deploy" in cmd
def test_server_deploy_staging(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "deploy", "--staging"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "staging" in cmd
class TestServerRollback:
def test_server_rollback(self):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "rollback"])
assert result.exit_code == 0
cmd = mock_run.call_args[0][0]
assert "rollback" in cmd
class TestServerBackup:
def test_server_backup(self, tmp_path):
with patch("cli.commands.server.subprocess.run", return_value=_subprocess_result(0)) as mock_run:
result = runner.invoke(app, ["server", "backup", "--output", str(tmp_path)])
assert result.exit_code == 0
assert "Backup saved" in result.output
cmd = mock_run.call_args[0][0]
assert "docker compose cp" in cmd

116
tests/test_cli_sync.py Normal file
View file

@ -0,0 +1,116 @@
"""Tests for da sync command."""
import json
import pytest
from unittest.mock import patch, MagicMock, call
from typer.testing import CliRunner
from cli.main import app
runner = CliRunner()
@pytest.fixture(autouse=True)
def tmp_config(tmp_path, monkeypatch):
monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
(tmp_path / "config").mkdir()
(tmp_path / "local").mkdir()
yield tmp_path
def _resp(status_code=200, json_data=None):
r = MagicMock()
r.status_code = status_code
r.json.return_value = json_data if json_data is not None else {}
r.raise_for_status = MagicMock()
return r
MANIFEST = {
"tables": {
"orders": {"hash": "abc123", "rows": 100, "size_bytes": 2048},
"customers": {"hash": "def456", "rows": 50, "size_bytes": 1024},
}
}
class TestSyncHappyPath:
def test_sync_downloads_all_tables(self, tmp_config):
"""Sync with no local state downloads all tables."""
with patch("cli.commands.sync.api_get", return_value=_resp(200, MANIFEST)):
with patch("cli.commands.sync.stream_download") as mock_dl:
with patch("cli.commands.sync._rebuild_duckdb_views"):
result = runner.invoke(app, ["sync"])
assert result.exit_code == 0
assert mock_dl.call_count == 2
assert "Downloaded: 2" in result.output
def test_sync_specific_table(self, tmp_config):
"""--table flag limits download to one table."""
with patch("cli.commands.sync.api_get", return_value=_resp(200, MANIFEST)):
with patch("cli.commands.sync.stream_download") as mock_dl:
with patch("cli.commands.sync._rebuild_duckdb_views"):
result = runner.invoke(app, ["sync", "--table", "orders"])
assert result.exit_code == 0
assert mock_dl.call_count == 1
call_path = mock_dl.call_args[0][0]
assert "orders" in call_path
def test_sync_json_output(self, tmp_config):
"""--json flag produces valid JSON output (rich spinner may precede JSON)."""
with patch("cli.commands.sync.api_get", return_value=_resp(200, MANIFEST)):
with patch("cli.commands.sync.stream_download"):
with patch("cli.commands.sync._rebuild_duckdb_views"):
result = runner.invoke(app, ["sync", "--json"])
assert result.exit_code == 0
# Rich Progress may output a spinner line before the JSON block
output = result.output
json_start = output.find("{")
assert json_start >= 0, f"No JSON found in output: {output!r}"
data = json.loads(output[json_start:])
assert "downloaded" in data
assert "errors" in data
def test_sync_upload_only(self, tmp_config):
"""--upload-only skips download and calls upload."""
with patch("cli.commands.sync.api_post", return_value=_resp(200)):
result = runner.invoke(app, ["sync", "--upload-only"])
assert result.exit_code == 0
assert "session" in result.output.lower() or "upload" in result.output.lower()
class TestSyncErrors:
def test_sync_manifest_failure(self, tmp_config):
"""Manifest fetch failure exits with error."""
r = _resp(500)
r.raise_for_status.side_effect = Exception("Server error")
with patch("cli.commands.sync.api_get", return_value=r):
result = runner.invoke(app, ["sync"])
assert result.exit_code == 1
assert "Failed to fetch manifest" in result.output
def test_sync_download_error_recorded(self, tmp_config):
"""Download error is recorded in results but does not abort sync."""
with patch("cli.commands.sync.api_get", return_value=_resp(200, MANIFEST)):
with patch("cli.commands.sync.stream_download", side_effect=Exception("timeout")):
result = runner.invoke(app, ["sync"])
assert result.exit_code == 0
assert "Errors" in result.output
def test_sync_skips_unchanged_tables(self, tmp_config, monkeypatch):
"""Tables with matching hashes are not re-downloaded."""
state = {
"tables": {
"orders": {"hash": "abc123"},
"customers": {"hash": "def456"},
}
}
with patch("cli.commands.sync.get_sync_state", return_value=state):
with patch("cli.commands.sync.api_get", return_value=_resp(200, MANIFEST)):
with patch("cli.commands.sync.stream_download") as mock_dl:
result = runner.invoke(app, ["sync"])
assert result.exit_code == 0
# Nothing to download — both hashes match
assert mock_dl.call_count == 0
assert "Downloaded: 0" in result.output