docs: add implementation plans for porting internal features

Three separate plans following a TDD approach (plans 2 and 3 depend on tasks in plan 1):
1. Business metrics (10 tasks) — schema v4, repository, CLI, API, starter pack, profiler integration
2. Analyst bootstrap (4 tasks) — da analyst setup, CLAUDE.md template, freshness check
3. Metadata writer (4 tasks) — column metadata repo, CLI, API, Keboola push

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
ZdenekSrotyr 2026-04-10 19:08:55 +02:00
parent c57e195932
commit 06ac937f8b
3 changed files with 2977 additions and 0 deletions


@@ -0,0 +1,602 @@
# Analyst Bootstrap Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add `da analyst setup` command that onboards analysts to a remote Agnes instance — connects, downloads data, initializes local DuckDB, generates CLAUDE.md.
**Architecture:** New top-level Typer command `da analyst` with `setup` subcommand. Uses existing `cli/client.py` HTTP helpers. Generates CLAUDE.md from template with instance-specific placeholders.
**Tech Stack:** Typer, httpx (via cli/client.py), DuckDB, Rich (progress bars), plain `{placeholder}` string substitution via `str.replace` (no template engine)
**Spec:** `docs/superpowers/specs/2026-04-10-porting-internal-features-design.md` — Section 2
**Depends on:** Business Metrics plan (Task 5 — metrics API must exist for Step 4)
---
### Task 1: CLAUDE.md Template
**Files:**
- Create: `config/claude_md_template.txt`
- [ ] **Step 1: Create the template file**
Create `config/claude_md_template.txt`:
```
# {instance_name} — AI Data Analyst
This workspace is connected to {server_url}.
## Rules
- Before computing any business metric: run `da metrics show <category>/<name>`
- For current schema: read `data/metadata/schema.json`
- Do not use DESCRIBE/SHOW COLUMNS — read metadata files instead
- Save work output to `user/artifacts/`
- Sync data regularly with `da sync`
## Metrics Workflow
1. `da metrics list` — find the relevant metric
2. `da metrics show revenue/mrr` — read SQL and business rules
3. Use the canonical SQL from the metric definition, adapt to the question
4. Never invent metric calculations — always check existing definitions first
## Data Sync
- `da sync` — download current data from server
- `da sync --docs-only` — just metadata and metrics (fast refresh)
- `da sync --upload-only` — upload sessions and local notes to server
- Data on the server refreshes every {sync_interval}
## Directory Structure
- `data/` — read-only data downloaded from server
- `data/parquet/` — table data in Parquet format
- `data/duckdb/` — local analytics DuckDB database
- `data/metadata/` — profiles, schema, metrics cache
- `user/` — your workspace (persistent across syncs)
- `user/artifacts/` — analysis outputs, reports, charts
- `user/sessions/` — Claude Code session logs
- `.claude/CLAUDE.local.md` — your personal notes (never overwritten, uploaded on sync)
```
- [ ] **Step 2: Commit**
```bash
git add config/claude_md_template.txt
git commit -m "feat: add CLAUDE.md template for analyst bootstrap"
```
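The template relies on three placeholders: `{instance_name}`, `{server_url}` and `{sync_interval}`. The following is a minimal sketch of how rendering could work with plain string replacement. The helper name `render_claude_md` and the sample template are hypothetical and not part of the plan. Replacing only known placeholders means any other braces in the template pass through untouched, whereas `str.format` would raise `KeyError` on them.

```python
# Hypothetical helper: the plan's own implementation lives in _generate_claude_md.
PLACEHOLDERS = ("instance_name", "server_url", "sync_interval")


def render_claude_md(template: str, **values: str) -> str:
    """Replace each known {placeholder}; leave every other character untouched."""
    missing = [name for name in PLACEHOLDERS if name not in values]
    if missing:
        raise ValueError(f"missing template values: {', '.join(missing)}")
    content = template
    for name in PLACEHOLDERS:
        content = content.replace("{" + name + "}", values[name])
    return content


# Sample template text invented for this sketch.
template = (
    "# {instance_name}\n"
    "Connected to {server_url}.\n"
    "Data refreshes every {sync_interval}.\n"
    "Literal braces {like_this} survive.\n"
)
rendered = render_claude_md(
    template,
    instance_name="Acme Analytics",
    server_url="https://data.acme.com",
    sync_interval="15 minutes",
)
print(rendered)
```

Failing fast on a missing value keeps a half-rendered `CLAUDE.md` from being written.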
---
### Task 2: `da analyst setup` — Core Command
**Files:**
- Create: `cli/commands/analyst.py`
- Modify: `cli/main.py` (register analyst_app)
- Test: `tests/test_cli.py` (help test)
- Test: `tests/test_analyst_bootstrap.py`
- [ ] **Step 1: Write failing tests**
Add to `tests/test_cli.py` in `TestCLIHelp`:
```python
    def test_analyst_help(self):
        result = runner.invoke(app, ["analyst", "--help"])
        assert result.exit_code == 0
        assert "setup" in result.output
```
Create `tests/test_analyst_bootstrap.py`:
```python
"""Tests for analyst bootstrap flow."""
import json
import os
from pathlib import Path
from unittest.mock import patch, MagicMock

import pytest
from typer.testing import CliRunner

from cli.main import app

runner = CliRunner()


@pytest.fixture(autouse=True)
def tmp_workspace(tmp_path, monkeypatch):
    monkeypatch.setenv("DATA_DIR", str(tmp_path / "data"))
    monkeypatch.setenv("DA_CONFIG_DIR", str(tmp_path / "config"))
    monkeypatch.setenv("DA_LOCAL_DIR", str(tmp_path / "local"))
    monkeypatch.setenv("JWT_SECRET_KEY", "test-secret")
    (tmp_path / "data").mkdir()
    (tmp_path / "config").mkdir()
    (tmp_path / "local").mkdir()
    # Create the workspace before chdir: changing into a missing directory raises
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    monkeypatch.chdir(workspace)
    yield workspace


class TestDetectExistingProject:
    def test_detects_existing_claude_md(self, tmp_workspace):
        (tmp_workspace / "CLAUDE.md").write_text("# Acme — AI Data Analyst\n")
        result = runner.invoke(app, ["analyst", "setup", "--server-url", "http://localhost:8000"])
        # Both must hold; joined with `or`, the exit code alone would satisfy the test
        assert result.exit_code == 0
        assert "already set up" in result.output.lower()

    def test_no_detection_with_force(self, tmp_workspace):
        (tmp_workspace / "CLAUDE.md").write_text("# Acme — AI Data Analyst\n")
        with patch("cli.commands.analyst._connect_to_instance") as mock_connect:
            mock_connect.side_effect = SystemExit(1)  # Will fail at connect step
            result = runner.invoke(app, ["analyst", "setup", "--force",
                                         "--server-url", "http://localhost:8000"])
        # Should have passed detection and attempted connect
        mock_connect.assert_called_once()


class TestCreateWorkspace:
    def test_creates_directory_structure(self, tmp_workspace):
        from cli.commands.analyst import _create_workspace
        _create_workspace(tmp_workspace)
        assert (tmp_workspace / "data" / "parquet").is_dir()
        assert (tmp_workspace / "data" / "duckdb").is_dir()
        assert (tmp_workspace / "data" / "metadata").is_dir()
        assert (tmp_workspace / "user" / "artifacts").is_dir()
        assert (tmp_workspace / "user" / "sessions").is_dir()
        assert (tmp_workspace / ".claude").is_dir()


class TestGenerateClaudeMd:
    def test_generates_from_template(self, tmp_workspace):
        from cli.commands.analyst import _generate_claude_md
        _generate_claude_md(
            workspace=tmp_workspace,
            instance_name="Acme Analytics",
            server_url="https://data.acme.com",
            sync_interval="15 minutes",
        )
        claude_md = tmp_workspace / "CLAUDE.md"
        assert claude_md.exists()
        content = claude_md.read_text()
        assert "Acme Analytics" in content
        assert "https://data.acme.com" in content
        assert "15 minutes" in content

    def test_creates_claude_local_md(self, tmp_workspace):
        from cli.commands.analyst import _generate_claude_md
        (tmp_workspace / ".claude").mkdir(parents=True, exist_ok=True)
        _generate_claude_md(
            workspace=tmp_workspace,
            instance_name="Test",
            server_url="http://localhost",
            sync_interval="1 hour",
        )
        assert (tmp_workspace / ".claude" / "CLAUDE.local.md").exists()

    def test_does_not_overwrite_existing_local_md(self, tmp_workspace):
        (tmp_workspace / ".claude").mkdir(parents=True, exist_ok=True)
        local_md = tmp_workspace / ".claude" / "CLAUDE.local.md"
        local_md.write_text("my notes")
        from cli.commands.analyst import _generate_claude_md
        _generate_claude_md(
            workspace=tmp_workspace,
            instance_name="Test",
            server_url="http://localhost",
            sync_interval="1 hour",
        )
        assert local_md.read_text() == "my notes"
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest tests/test_analyst_bootstrap.py -v`
Expected: FAIL — `No such command 'analyst'`
- [ ] **Step 3: Implement `cli/commands/analyst.py`**
Create `cli/commands/analyst.py`:
```python
"""Analyst commands — da analyst."""
import json
import logging
from pathlib import Path
from typing import Optional

import typer

logger = logging.getLogger(__name__)

analyst_app = typer.Typer(help="Analyst workspace — setup, connect to a remote instance")

AGNES_IDENTIFIER = "AI Data Analyst"


@analyst_app.command("setup")
def setup(
    server_url: str = typer.Option(None, "--server-url", "-s", help="Agnes instance URL"),
    force: bool = typer.Option(False, "--force", help="Re-run from scratch, clean partial state"),
):
    """Set up a local analyst workspace connected to a remote Agnes instance."""
    workspace = Path.cwd()

    # Step 1: Detect existing project
    if not force:
        claude_md = workspace / "CLAUDE.md"
        if claude_md.exists() and AGNES_IDENTIFIER in claude_md.read_text():
            typer.echo("Project already set up. Use 'da sync' to refresh data, or --force to re-setup.")
            return

    # Step 2: Connect
    if not server_url:
        server_url = typer.prompt("Agnes instance URL (e.g., https://data.acme.com)")
    token = _connect_to_instance(server_url)

    # Step 3: Create workspace
    _create_workspace(workspace)

    # Step 4: Download schema and metrics
    _download_metadata(workspace, server_url, token)

    # Step 5: Download data
    table_count = _download_data(workspace, server_url, token)

    # Step 6: Initialize DuckDB
    row_count = _initialize_duckdb(workspace)

    # Step 7: Generate CLAUDE.md
    instance_name = _get_instance_name(server_url, token)
    _generate_claude_md(
        workspace=workspace,
        instance_name=instance_name,
        server_url=server_url,
        sync_interval="15 minutes",
    )

    # Step 8: Verify
    typer.echo(f"\nSetup complete. {table_count} tables, {row_count} total rows.")
    typer.echo("Start analyzing with Claude Code, or run 'da sync' to refresh data.")


def _connect_to_instance(server_url: str) -> str:
    """Connect to Agnes instance, authenticate, return JWT token."""
    import httpx

    # Health check
    try:
        resp = httpx.get(f"{server_url}/api/health", timeout=10)
        resp.raise_for_status()
    except Exception as e:
        typer.echo(f"Cannot reach {server_url}: {e}", err=True)
        raise typer.Exit(1)

    # Authenticate
    email = typer.prompt("Email")
    password = typer.prompt("Password", hide_input=True)
    try:
        resp = httpx.post(
            f"{server_url}/auth/token",
            data={"username": email, "password": password},
            timeout=10,
        )
        resp.raise_for_status()
        token = resp.json().get("access_token")
        if not token:
            typer.echo("Authentication failed: no token in response", err=True)
            raise typer.Exit(1)
    except httpx.HTTPStatusError as e:
        typer.echo(f"Authentication failed: {e.response.text}", err=True)
        raise typer.Exit(1)
    except httpx.RequestError as e:
        # Network failure between the health check and the login request
        typer.echo(f"Cannot reach {server_url}: {e}", err=True)
        raise typer.Exit(1)

    # Save credentials
    from cli.config import save_config
    save_config({"server_url": server_url, "token": token})
    typer.echo(f"Connected to {server_url}")
    return token


def _create_workspace(workspace: Path) -> None:
    """Create analyst directory structure."""
    dirs = [
        workspace / "data" / "parquet",
        workspace / "data" / "duckdb",
        workspace / "data" / "metadata",
        workspace / "user" / "artifacts",
        workspace / "user" / "sessions",
        workspace / ".claude",
    ]
    for d in dirs:
        d.mkdir(parents=True, exist_ok=True)


def _download_metadata(workspace: Path, server_url: str, token: str) -> None:
    """Download table list and metrics to local cache."""
    import httpx

    headers = {"Authorization": f"Bearer {token}"}
    metadata_dir = workspace / "data" / "metadata"

    # Tables
    try:
        resp = httpx.get(f"{server_url}/api/catalog/tables", headers=headers, timeout=30)
        resp.raise_for_status()
        (metadata_dir / "tables.json").write_text(json.dumps(resp.json(), indent=2))
        typer.echo(f"Downloaded table catalog ({resp.json().get('count', '?')} tables)")
    except Exception as e:
        typer.echo(f"Warning: could not download table catalog: {e}", err=True)

    # Metrics
    try:
        resp = httpx.get(f"{server_url}/api/metrics", headers=headers, timeout=30)
        resp.raise_for_status()
        (metadata_dir / "metrics.json").write_text(json.dumps(resp.json(), indent=2))
        typer.echo(f"Downloaded metrics ({resp.json().get('count', '?')} metrics)")
    except Exception as e:
        typer.echo(f"Warning: could not download metrics: {e}", err=True)


def _download_data(workspace: Path, server_url: str, token: str) -> int:
    """Download parquet files for all accessible tables. Returns count."""
    import httpx

    metadata_dir = workspace / "data" / "metadata"
    parquet_dir = workspace / "data" / "parquet"
    tables_file = metadata_dir / "tables.json"
    if not tables_file.exists():
        return 0

    tables_data = json.loads(tables_file.read_text())
    tables = tables_data.get("tables", [])
    parquet_dir.mkdir(parents=True, exist_ok=True)

    count = 0
    # One client for all tables so connections are reused
    with httpx.Client(base_url=server_url, headers={"Authorization": f"Bearer {token}"},
                      timeout=300) as client:
        for table in tables:
            tid = table["id"]
            target = parquet_dir / f"{tid}.parquet"
            # Resume: skip if already downloaded
            if target.exists() and target.stat().st_size > 0:
                count += 1
                continue
            # Write to a .part file and rename on success, so an interrupted
            # download is never mistaken for a complete file by the resume check
            partial = parquet_dir / f"{tid}.parquet.part"
            try:
                with client.stream("GET", f"/api/data/{tid}/download") as resp:
                    if resp.status_code == 404:
                        continue
                    resp.raise_for_status()
                    with open(partial, "wb") as f:
                        for chunk in resp.iter_bytes(65536):
                            f.write(chunk)
                partial.replace(target)
                count += 1
                typer.echo(f" Downloaded {tid}")
            except Exception as e:
                partial.unlink(missing_ok=True)
                typer.echo(f" Failed {tid}: {e}", err=True)
    typer.echo(f"Downloaded {count}/{len(tables)} tables")
    return count


def _initialize_duckdb(workspace: Path) -> int:
    """Create local analytics.duckdb with views over downloaded parquets. Returns total rows."""
    import duckdb

    parquet_dir = workspace / "data" / "parquet"
    db_path = workspace / "data" / "duckdb" / "analytics.duckdb"
    db_path.parent.mkdir(parents=True, exist_ok=True)

    conn = duckdb.connect(str(db_path))
    total_rows = 0
    view_count = 0
    try:
        for pq in sorted(parquet_dir.glob("*.parquet")):
            # Double any quotes so unusual table IDs or paths cannot break the SQL
            view_name = pq.stem.replace('"', '""')
            source = pq.as_posix().replace("'", "''")
            try:
                conn.execute(f"CREATE OR REPLACE VIEW \"{view_name}\" AS SELECT * FROM read_parquet('{source}')")
                row_count = conn.execute(f"SELECT count(*) FROM \"{view_name}\"").fetchone()[0]
                total_rows += row_count
                view_count += 1
            except Exception as e:
                logger.warning("Could not create view for %s: %s", pq.name, e)
    finally:
        conn.close()
    # Report only the views that were actually created
    typer.echo(f"Initialized DuckDB with {view_count} views")
    return total_rows


def _get_instance_name(server_url: str, token: str) -> str:
    """Get instance name from server, fallback to URL hostname."""
    import httpx

    try:
        resp = httpx.get(f"{server_url}/api/health", headers={"Authorization": f"Bearer {token}"}, timeout=10)
        data = resp.json()
        return data.get("instance_name", server_url.split("//")[-1].split("/")[0])
    except Exception:
        return server_url.split("//")[-1].split("/")[0]


def _generate_claude_md(
    workspace: Path,
    instance_name: str,
    server_url: str,
    sync_interval: str,
) -> None:
    """Generate CLAUDE.md from template."""
    template_path = Path(__file__).parent.parent.parent / "config" / "claude_md_template.txt"
    if template_path.exists():
        # The template contains non-ASCII characters, so fix the encoding explicitly
        template = template_path.read_text(encoding="utf-8")
    else:
        # Inline fallback
        template = "# {instance_name} — AI Data Analyst\n\nConnected to {server_url}.\n"

    content = template.replace("{instance_name}", instance_name)
    content = content.replace("{server_url}", server_url)
    content = content.replace("{sync_interval}", sync_interval)
    (workspace / "CLAUDE.md").write_text(content, encoding="utf-8")

    # Create CLAUDE.local.md if it doesn't exist
    local_md = workspace / ".claude" / "CLAUDE.local.md"
    local_md.parent.mkdir(parents=True, exist_ok=True)
    if not local_md.exists():
        local_md.write_text("# Personal Notes\n\nAdd your learnings and insights here.\n")
```
Register in `cli/main.py`:
```python
from cli.commands.analyst import analyst_app
# ...
app.add_typer(analyst_app, name="analyst")
```
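`_initialize_duckdb` builds its `CREATE VIEW` statements by interpolating a file stem and a file path into SQL text. The sketch below shows one way to keep that interpolation safe, assuming DuckDB follows the standard SQL convention of doubling embedded quotes inside identifiers and string literals. The helper names `quote_ident`, `quote_literal` and `build_view_sql` are hypothetical and not part of the plan.

```python
from pathlib import Path


def quote_ident(name: str) -> str:
    """Wrap an identifier in double quotes, doubling any embedded double quote."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value: str) -> str:
    """Wrap a string literal in single quotes, doubling any embedded single quote."""
    return "'" + value.replace("'", "''") + "'"


def build_view_sql(parquet_file: Path) -> str:
    """Build the view statement for one Parquet file; the view is named after the file stem."""
    view = quote_ident(parquet_file.stem)
    source = quote_literal(parquet_file.as_posix())
    return f"CREATE OR REPLACE VIEW {view} AS SELECT * FROM read_parquet({source})"


# A deliberately awkward file name to exercise both quoting rules.
sql = build_view_sql(Path("data/parquet/o'brien orders.parquet"))
print(sql)
```

Building the statement as a pure function also makes it testable without a DuckDB connection.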
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest tests/test_analyst_bootstrap.py -v && pytest tests/test_cli.py::TestCLIHelp::test_analyst_help -v`
Expected: ALL PASS
- [ ] **Step 5: Commit**
```bash
git add cli/commands/analyst.py cli/main.py config/claude_md_template.txt tests/test_analyst_bootstrap.py tests/test_cli.py
git commit -m "feat: add da analyst setup command with bootstrap flow"
```
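The resume check in `_download_data` skips any non-empty Parquet file, which is only safe if a file reaches its final path once it is complete. Below is a minimal sketch of that write-then-rename pattern, isolated from httpx so it can run on its own. The helper `write_atomically` and the simulated `failing_stream` are hypothetical names used for illustration.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable, Iterator


def write_atomically(target: Path, chunks: Iterable[bytes]) -> int:
    """Stream chunks into '<name>.part', then rename onto the target. Returns bytes written."""
    target.parent.mkdir(parents=True, exist_ok=True)
    partial = target.with_name(target.name + ".part")
    written = 0
    try:
        with open(partial, "wb") as f:
            for chunk in chunks:
                f.write(chunk)
                written += len(chunk)
        os.replace(partial, target)  # the final path appears only after a full write
    except BaseException:
        partial.unlink(missing_ok=True)  # drop the incomplete file, then re-raise
        raise
    return written


def failing_stream() -> Iterator[bytes]:
    """Simulate a connection that drops after the first chunk."""
    yield b"PAR1"
    raise ConnectionError("connection dropped")


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "parquet" / "orders.parquet"
    try:
        write_atomically(target, failing_stream())
    except ConnectionError:
        pass
    interrupted_left_file = target.exists()
    size = write_atomically(target, [b"PAR1", b"data"])
    completed_bytes = target.read_bytes()

print(interrupted_left_file, size)
```

With this pattern, a rerun after an interrupted download sees no file at the final path and fetches the table again.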
---
### Task 3: Returning-Session Detection
**Files:**
- Modify: `cli/commands/analyst.py` (add `da analyst status` command)
- Test: `tests/test_analyst_bootstrap.py`
- [ ] **Step 1: Write failing test**
Add to `tests/test_analyst_bootstrap.py`:
```python
import json
from datetime import datetime, timedelta, timezone


class TestReturningSession:
    def test_stale_data_warning(self, tmp_workspace):
        from cli.commands.analyst import _check_data_freshness
        metadata_dir = tmp_workspace / "data" / "metadata"
        metadata_dir.mkdir(parents=True, exist_ok=True)
        # Write last_sync.json with old timestamp
        old_time = (datetime.now(timezone.utc) - timedelta(hours=25)).isoformat()
        (metadata_dir / "last_sync.json").write_text(json.dumps({"last_sync": old_time}))
        result = _check_data_freshness(tmp_workspace)
        assert result == "stale"

    def test_fresh_data_ok(self, tmp_workspace):
        from cli.commands.analyst import _check_data_freshness
        metadata_dir = tmp_workspace / "data" / "metadata"
        metadata_dir.mkdir(parents=True, exist_ok=True)
        now = datetime.now(timezone.utc).isoformat()
        (metadata_dir / "last_sync.json").write_text(json.dumps({"last_sync": now}))
        result = _check_data_freshness(tmp_workspace)
        assert result == "fresh"

    def test_no_data_returns_missing(self, tmp_workspace):
        from cli.commands.analyst import _check_data_freshness
        result = _check_data_freshness(tmp_workspace)
        assert result == "missing"
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest tests/test_analyst_bootstrap.py::TestReturningSession -v`
Expected: FAIL — `_check_data_freshness` not found
- [ ] **Step 3: Implement freshness check and status command**
Add to `cli/commands/analyst.py`:
```python
@analyst_app.command("status")
def status():
    """Check workspace status and data freshness."""
    workspace = Path.cwd()
    claude_md = workspace / "CLAUDE.md"
    if not claude_md.exists() or AGNES_IDENTIFIER not in claude_md.read_text():
        typer.echo("No analyst workspace detected. Run 'da analyst setup' first.")
        raise typer.Exit(1)

    freshness = _check_data_freshness(workspace)
    if freshness == "stale":
        typer.echo("Data is stale (>24h old). Run 'da sync' to refresh.")
    elif freshness == "missing":
        typer.echo("No data found. Run 'da analyst setup' to download data.")
    else:
        typer.echo("Data is fresh.")


def _check_data_freshness(workspace: Path) -> str:
    """Check data freshness. Returns 'fresh', 'stale', or 'missing'."""
    last_sync_file = workspace / "data" / "metadata" / "last_sync.json"
    if not last_sync_file.exists():
        return "missing"
    try:
        data = json.loads(last_sync_file.read_text())
        last_sync_str = data.get("last_sync")
        if not last_sync_str:
            return "missing"
        from datetime import datetime, timezone, timedelta
        last_sync = datetime.fromisoformat(last_sync_str)
        if last_sync.tzinfo is None:
            last_sync = last_sync.replace(tzinfo=timezone.utc)
        age = datetime.now(timezone.utc) - last_sync
        if age > timedelta(hours=24):
            return "stale"
        return "fresh"
    except Exception:
        return "missing"
```
Also update `_download_metadata` to write `last_sync.json`:
At the end of the `_download_metadata` function, add:
```python
    # Record sync timestamp
    from datetime import datetime, timezone
    sync_record = {"last_sync": datetime.now(timezone.utc).isoformat(), "server_url": server_url}
    (metadata_dir / "last_sync.json").write_text(json.dumps(sync_record))
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest tests/test_analyst_bootstrap.py -v`
Expected: ALL PASS
- [ ] **Step 5: Commit**
```bash
git add cli/commands/analyst.py tests/test_analyst_bootstrap.py
git commit -m "feat: add da analyst status and returning-session freshness check"
```
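The freshness rule is a pure comparison between a stored timestamp and the current time, so it can be separated from file access and tested with a fixed clock. The following is a sketch under that assumption. `classify_freshness` and `STALE_AFTER` are hypothetical names, and the 24-hour threshold is taken from `_check_data_freshness`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

STALE_AFTER = timedelta(hours=24)  # threshold used by _check_data_freshness


def classify_freshness(last_sync: Optional[str], now: datetime) -> str:
    """Return 'fresh', 'stale', or 'missing' for an ISO-8601 timestamp string."""
    if not last_sync:
        return "missing"
    try:
        parsed = datetime.fromisoformat(last_sync)
    except ValueError:
        return "missing"
    if parsed.tzinfo is None:
        # Naive timestamps are interpreted as UTC, as in the plan's implementation
        parsed = parsed.replace(tzinfo=timezone.utc)
    return "stale" if now - parsed > STALE_AFTER else "fresh"


# A fixed clock makes the boundary cases deterministic.
now = datetime(2026, 4, 10, 12, 0, tzinfo=timezone.utc)
results = {
    "just over 24h": classify_freshness("2026-04-09T11:59:00+00:00", now),
    "exactly 24h": classify_freshness("2026-04-09T12:00:00+00:00", now),
    "naive, 1h old": classify_freshness("2026-04-10T11:00:00", now),
    "no timestamp": classify_freshness(None, now),
    "unparseable": classify_freshness("not-a-date", now),
}
for label, value in results.items():
    print(f"{label}: {value}")
```

Because the comparison is strict (`>`), data that is exactly 24 hours old still counts as fresh.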
---
### Task 4: Final Integration
- [ ] **Step 1: Run full test suite**
Run: `pytest tests/ -v --timeout=60`
Expected: ALL PASS
- [ ] **Step 2: Commit if any fixes needed**
```bash
git add -A
git commit -m "fix: address analyst bootstrap integration issues"
```

File diff suppressed because it is too large.


@@ -0,0 +1,567 @@
# Metadata Writer Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add column metadata management — discover basetypes/descriptions, store in DuckDB, push back to Keboola Storage API.
**Architecture:** `column_metadata` table (created in schema v4 by the metrics plan). New `ColumnMetadataRepository` following `table_registry.py` pattern. CLI commands `metadata-show` and `metadata-apply` under `da admin`. API endpoints under `/api/admin/metadata/`. Keboola push uses Storage API v2.
**Tech Stack:** DuckDB, FastAPI, Typer, httpx (for Keboola API push), PyArrow (for schema introspection)
**Spec:** `docs/superpowers/specs/2026-04-10-porting-internal-features-design.md` — Section 3
**Depends on:** Business Metrics plan (Task 1 — schema v4 creates `column_metadata` table)
---
### Task 1: ColumnMetadataRepository
**Files:**
- Create: `src/repositories/column_metadata.py`
- Test: `tests/test_column_metadata.py`
- [ ] **Step 1: Write failing tests**
Create `tests/test_column_metadata.py`:
```python
"""Tests for ColumnMetadataRepository."""
import os
import json
from pathlib import Path

import pytest
import duckdb


@pytest.fixture
def db_conn(tmp_path, monkeypatch):
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    from src.db import get_system_db
    conn = get_system_db()
    yield conn
    conn.close()


class TestColumnMetadataCreate:
    def test_save_single_column(self, db_conn):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        repo.save("orders", "total_amount", basetype="NUMERIC", description="Order total in USD")
        result = repo.get("orders", "total_amount")
        assert result is not None
        assert result["basetype"] == "NUMERIC"
        assert result["description"] == "Order total in USD"
        assert result["confidence"] == "manual"

    def test_upsert_overwrites(self, db_conn):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        repo.save("orders", "total_amount", basetype="NUMERIC", description="v1")
        repo.save("orders", "total_amount", basetype="FLOAT", description="v2")
        result = repo.get("orders", "total_amount")
        assert result["basetype"] == "FLOAT"
        assert result["description"] == "v2"


class TestColumnMetadataRead:
    def test_list_for_table(self, db_conn):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        repo.save("orders", "id", basetype="STRING")
        repo.save("orders", "total", basetype="NUMERIC")
        repo.save("users", "email", basetype="STRING")
        results = repo.list_for_table("orders")
        assert len(results) == 2
        names = {r["column_name"] for r in results}
        assert names == {"id", "total"}

    def test_get_missing(self, db_conn):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        assert repo.get("x", "y") is None


class TestColumnMetadataDelete:
    def test_delete_column(self, db_conn):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        repo.save("orders", "total", basetype="NUMERIC")
        assert repo.delete("orders", "total") is True
        assert repo.get("orders", "total") is None

    def test_delete_missing(self, db_conn):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        assert repo.delete("x", "y") is False


class TestColumnMetadataProposal:
    def test_import_proposal(self, db_conn, tmp_path):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        proposal = {
            "project": {"name": "sales"},
            "generated_at": "2026-04-10T12:00:00",
            "tables": {
                "orders": {
                    "columns": {
                        "id": {"basetype": "STRING", "description": "Order ID", "confidence": "high"},
                        "total": {"basetype": "NUMERIC", "description": "Total amount", "confidence": "medium"},
                    }
                }
            },
        }
        proposal_path = tmp_path / "proposal.json"
        proposal_path.write_text(json.dumps(proposal))
        count = repo.import_proposal(proposal_path)
        assert count == 2
        assert repo.get("orders", "id")["basetype"] == "STRING"
        assert repo.get("orders", "total")["confidence"] == "medium"

    def test_import_proposal_sets_source(self, db_conn, tmp_path):
        from src.repositories.column_metadata import ColumnMetadataRepository
        repo = ColumnMetadataRepository(db_conn)
        proposal = {
            "tables": {
                "orders": {
                    "columns": {
                        "id": {"basetype": "STRING", "description": "test", "confidence": "high"},
                    }
                }
            },
        }
        (tmp_path / "p.json").write_text(json.dumps(proposal))
        repo.import_proposal(tmp_path / "p.json")
        assert repo.get("orders", "id")["source"] == "ai_enrichment"
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest tests/test_column_metadata.py -v`
Expected: FAIL — `ModuleNotFoundError`
- [ ] **Step 3: Implement ColumnMetadataRepository**
Create `src/repositories/column_metadata.py`:
```python
"""Repository for column metadata (descriptions, basetypes)."""
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import duckdb

logger = logging.getLogger(__name__)


class ColumnMetadataRepository:
    def __init__(self, conn: duckdb.DuckDBPyConnection):
        self.conn = conn

    def save(self, table_id: str, column_name: str,
             basetype: Optional[str] = None,
             description: Optional[str] = None,
             confidence: str = "manual",
             source: str = "manual") -> Dict[str, Any]:
        now = datetime.now(timezone.utc)
        self.conn.execute(
            """INSERT INTO column_metadata (table_id, column_name, basetype, description, confidence, source, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (table_id, column_name) DO UPDATE SET
                basetype = excluded.basetype,
                description = excluded.description,
                confidence = excluded.confidence,
                source = excluded.source,
                updated_at = excluded.updated_at""",
            [table_id, column_name, basetype, description, confidence, source, now],
        )
        return self.get(table_id, column_name)

    def get(self, table_id: str, column_name: str) -> Optional[Dict[str, Any]]:
        result = self.conn.execute(
            "SELECT * FROM column_metadata WHERE table_id = ? AND column_name = ?",
            [table_id, column_name],
        ).fetchone()
        if not result:
            return None
        columns = [desc[0] for desc in self.conn.description]
        return dict(zip(columns, result))

    def list_for_table(self, table_id: str) -> List[Dict[str, Any]]:
        results = self.conn.execute(
            "SELECT * FROM column_metadata WHERE table_id = ? ORDER BY column_name",
            [table_id],
        ).fetchall()
        if not results:
            return []
        columns = [desc[0] for desc in self.conn.description]
        return [dict(zip(columns, row)) for row in results]

    def delete(self, table_id: str, column_name: str) -> bool:
        existing = self.get(table_id, column_name)
        if not existing:
            return False
        self.conn.execute(
            "DELETE FROM column_metadata WHERE table_id = ? AND column_name = ?",
            [table_id, column_name],
        )
        return True

    def import_proposal(self, proposal_path) -> int:
        """Import a metadata proposal JSON file. Returns count of columns imported."""
        path = Path(proposal_path)
        data = json.loads(path.read_text())
        count = 0
        tables = data.get("tables", {})
        for table_id, table_data in tables.items():
            columns = table_data.get("columns", {})
            for col_name, col_data in columns.items():
                self.save(
                    table_id=table_id,
                    column_name=col_name,
                    basetype=col_data.get("basetype"),
                    description=col_data.get("description"),
                    confidence=col_data.get("confidence", "medium"),
                    source="ai_enrichment",
                )
                count += 1
        return count
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest tests/test_column_metadata.py -v`
Expected: ALL PASS
- [ ] **Step 5: Commit**
```bash
git add src/repositories/column_metadata.py tests/test_column_metadata.py
git commit -m "feat: add ColumnMetadataRepository with CRUD and proposal import"
```
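`ColumnMetadataRepository.save` depends on `INSERT ... ON CONFLICT ... DO UPDATE`, which requires a primary key or unique constraint on `(table_id, column_name)`. The sketch below demonstrates the same upsert semantics using the standard-library `sqlite3` module as a stand-in for DuckDB, so it runs without extra dependencies. The reduced table definition is an assumption for illustration, since the real `column_metadata` schema is created by the metrics plan and is not shown here.

```python
import sqlite3

# sqlite3 is a stand-in for DuckDB in this sketch; the table shape is assumed.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE column_metadata (
        table_id TEXT NOT NULL,
        column_name TEXT NOT NULL,
        basetype TEXT,
        description TEXT,
        PRIMARY KEY (table_id, column_name)
    )"""
)

UPSERT = """INSERT INTO column_metadata (table_id, column_name, basetype, description)
VALUES (?, ?, ?, ?)
ON CONFLICT (table_id, column_name) DO UPDATE SET
    basetype = excluded.basetype,
    description = excluded.description"""

# The second save targets the same key, so it updates the row instead of adding one.
conn.execute(UPSERT, ("orders", "total_amount", "NUMERIC", "v1"))
conn.execute(UPSERT, ("orders", "total_amount", "FLOAT", "v2"))

rows = conn.execute(
    "SELECT table_id, column_name, basetype, description FROM column_metadata"
).fetchall()
conn.close()
print(rows)
```

This mirrors what `test_upsert_overwrites` expects: one row per column, holding the most recent values.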
---
### Task 2: CLI Commands `da admin metadata-show` and `da admin metadata-apply`
**Files:**
- Modify: `cli/commands/admin.py` (add metadata subcommands)
- Test: `tests/test_cli.py`
- [ ] **Step 1: Write failing test**
Add to `tests/test_cli.py` in `TestCLIHelp`:
```python
    def test_admin_metadata_help(self):
        result = runner.invoke(app, ["admin", "metadata-show", "--help"])
        assert result.exit_code == 0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest tests/test_cli.py::TestCLIHelp::test_admin_metadata_help -v`
Expected: FAIL — `No such command 'metadata-show'`
- [ ] **Step 3: Add metadata commands to admin.py**
Add to `cli/commands/admin.py`:
```python
@admin_app.command("metadata-show")
def metadata_show(
    table_id: str = typer.Argument(..., help="Table ID"),
    as_json: bool = typer.Option(False, "--json"),
):
    """Show column metadata for a table."""
    resp = api_get(f"/api/admin/metadata/{table_id}")
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)
    columns = resp.json().get("columns", [])
    if as_json:
        typer.echo(json.dumps(columns, indent=2))
    else:
        if not columns:
            typer.echo(f"No metadata for table '{table_id}'")
            return
        typer.echo(f"\n Metadata for {table_id}:")
        for c in columns:
            # basetype and description can be null; `or` covers None, which the
            # :12s format spec would otherwise reject with a TypeError
            basetype = c.get("basetype") or "?"
            desc = c.get("description") or "-"
            typer.echo(f" {c['column_name']:30s} {basetype:12s} {desc}")


@admin_app.command("metadata-apply")
def metadata_apply(
    proposal_path: str = typer.Argument(..., help="Path to proposal JSON file"),
    push_to_source: bool = typer.Option(False, "--push-to-source", help="Push to Keboola Storage API"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Show changes without applying"),
):
    """Apply a metadata proposal (JSON) to DuckDB and optionally push to source."""
    from pathlib import Path
    path = Path(proposal_path)
    if not path.exists():
        typer.echo(f"File not found: {proposal_path}", err=True)
        raise typer.Exit(1)

    import json as json_mod
    data = json_mod.loads(path.read_text())
    tables = data.get("tables", {})

    if dry_run:
        for table_id, td in tables.items():
            for col, cd in td.get("columns", {}).items():
                typer.echo(f" {table_id}.{col}: {cd.get('basetype', '?')} — {cd.get('description', '-')}")
        typer.echo(f"\nDry run: {sum(len(td.get('columns', {})) for td in tables.values())} columns would be applied")
        return

    from src.db import get_system_db
    from src.repositories.column_metadata import ColumnMetadataRepository
    conn = get_system_db()
    try:
        repo = ColumnMetadataRepository(conn)
        count = repo.import_proposal(path)
        typer.echo(f"Applied {count} column metadata entries to DuckDB")
    finally:
        conn.close()

    if push_to_source:
        # The push endpoint is per table: POST /api/admin/metadata/{table_id}/push
        for table_id in tables:
            resp = api_post(f"/api/admin/metadata/{table_id}/push", json={})
            if resp.status_code == 200:
                typer.echo(f"Pushed metadata for {table_id} to source system")
            else:
                typer.echo(f"Push failed for {table_id}: {resp.json().get('detail', resp.text)}", err=True)
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest tests/test_cli.py::TestCLIHelp::test_admin_metadata_help -v`
Expected: PASS
- [ ] **Step 5: Commit**
```bash
git add cli/commands/admin.py tests/test_cli.py
git commit -m "feat: add da admin metadata-show and metadata-apply commands"
```
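Both the dry run and the import walk the same nested proposal shape: `tables`, then `columns`, then per-column fields. As a rough sketch, that traversal can live in one generator that the preview and the import could share. The names `iter_proposal_columns` and `format_row` are hypothetical, and the sample proposal is invented for illustration.

```python
from typing import Any, Dict, Iterator, Tuple


def iter_proposal_columns(
    proposal: Dict[str, Any],
) -> Iterator[Tuple[str, str, Dict[str, Any]]]:
    """Yield (table_id, column_name, column_data) for every column in a proposal."""
    for table_id, table_data in proposal.get("tables", {}).items():
        for column_name, column_data in table_data.get("columns", {}).items():
            yield table_id, column_name, column_data


def format_row(table_id: str, column_name: str, column_data: Dict[str, Any]) -> str:
    """Render one preview line; `or` supplies a fallback for missing and null values alike."""
    basetype = column_data.get("basetype") or "?"
    description = column_data.get("description") or "-"
    return f"{table_id}.{column_name}: {basetype:12s} {description}"


# Invented sample data; the second column has a null basetype and no description.
proposal = {
    "tables": {
        "orders": {
            "columns": {
                "id": {"basetype": "STRING", "description": "Order ID"},
                "total": {"basetype": None},
            }
        }
    }
}
lines = [format_row(*entry) for entry in iter_proposal_columns(proposal)]
for line in lines:
    print(line)
```

Using `or` rather than a `.get` default matters here because a JSON `null` yields `None`, which the `:12s` format spec cannot render.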
---
### Task 3: API Endpoints
**Files:**
- Create: `app/api/metadata.py`
- Modify: `app/main.py` (register router)
- Test: `tests/test_api.py`
- [ ] **Step 1: Write failing tests**
Add to `tests/test_api.py`:
```python
class TestMetadataAPI:
    def test_get_metadata_empty(self, seeded_client):
        client, admin_token, _ = seeded_client
        resp = client.get("/api/admin/metadata/orders",
                          headers={"Authorization": f"Bearer {admin_token}"})
        assert resp.status_code == 200
        assert resp.json()["columns"] == []

    def test_save_and_get_metadata(self, seeded_client):
        client, admin_token, _ = seeded_client
        resp = client.post(
            "/api/admin/metadata/orders",
            json={"columns": [
                {"column_name": "id", "basetype": "STRING", "description": "Order ID"},
                {"column_name": "total", "basetype": "NUMERIC", "description": "Total amount"},
            ]},
            headers={"Authorization": f"Bearer {admin_token}"},
        )
        assert resp.status_code == 200
        assert resp.json()["count"] == 2
        resp = client.get("/api/admin/metadata/orders",
                          headers={"Authorization": f"Bearer {admin_token}"})
        assert len(resp.json()["columns"]) == 2

    def test_analyst_cannot_save_metadata(self, seeded_client):
        client, _, analyst_token = seeded_client
        resp = client.post(
            "/api/admin/metadata/orders",
            json={"columns": [{"column_name": "id", "basetype": "STRING"}]},
            headers={"Authorization": f"Bearer {analyst_token}"},
        )
        assert resp.status_code == 403
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest tests/test_api.py::TestMetadataAPI -v`
Expected: FAIL — 404 on `/api/admin/metadata/orders`
- [ ] **Step 3: Implement API router**
Create `app/api/metadata.py`:
```python
"""Column metadata API endpoints."""
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import duckdb

from app.auth.dependencies import get_current_user, require_admin, _get_db
from src.repositories.column_metadata import ColumnMetadataRepository

router = APIRouter(tags=["metadata"])


class ColumnMetadataItem(BaseModel):
    column_name: str
    basetype: Optional[str] = None
    description: Optional[str] = None
    confidence: str = "manual"


class ColumnMetadataSave(BaseModel):
    columns: List[ColumnMetadataItem]


@router.get("/api/admin/metadata/{table_id}")
async def get_table_metadata(
    table_id: str,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = ColumnMetadataRepository(conn)
    columns = repo.list_for_table(table_id)
    return {"table_id": table_id, "columns": columns}


@router.post("/api/admin/metadata/{table_id}")
async def save_table_metadata(
    table_id: str,
    body: ColumnMetadataSave,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = ColumnMetadataRepository(conn)
    for col in body.columns:
        repo.save(
            table_id=table_id,
            column_name=col.column_name,
            basetype=col.basetype,
            description=col.description,
            confidence=col.confidence,
            source="api",
        )
    return {"status": "ok", "table_id": table_id, "count": len(body.columns)}


@router.post("/api/admin/metadata/{table_id}/push")
async def push_metadata_to_source(
    table_id: str,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    """Push column metadata to the source system (Keboola only)."""
    from src.repositories.table_registry import TableRegistryRepository
    table_repo = TableRegistryRepository(conn)
    table = table_repo.get(table_id)
    if not table:
        raise HTTPException(status_code=404, detail=f"Table not found: {table_id}")
    if table.get("source_type") != "keboola":
        raise HTTPException(status_code=400, detail="Push only supported for Keboola source tables")

    meta_repo = ColumnMetadataRepository(conn)
    columns = meta_repo.list_for_table(table_id)
    if not columns:
        raise HTTPException(status_code=400, detail="No metadata to push")

    # Build Keboola API payload
    import os
    import httpx
    stack_url = os.environ.get("KBC_STACK_URL", "")
    token = os.environ.get("KBC_STORAGE_TOKEN", "")
    if not stack_url or not token:
        raise HTTPException(status_code=400, detail="KBC_STACK_URL and KBC_STORAGE_TOKEN must be set")

    source_table = table.get("source_table", table_id)
    columns_metadata = {}
    for col in columns:
        entries = []
        if col.get("basetype"):
            entries.append({"key": "KBC.datatype.basetype", "value": col["basetype"]})
        if col.get("description"):
            entries.append({"key": "KBC.description", "value": col["description"]})
        if entries:
            columns_metadata[col["column_name"]] = entries

    try:
        # Async client so the outbound request does not block the event loop
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                f"{stack_url}/v2/storage/tables/{source_table}/metadata",
                headers={"X-StorageApi-Token": token},
                json={"provider": "ai-metadata-enrichment", "columnsMetadata": columns_metadata},
            )
        resp.raise_for_status()
        return {"status": "pushed", "table_id": table_id, "columns": len(columns_metadata)}
    except httpx.HTTPStatusError as e:
        raise HTTPException(status_code=502, detail=f"Keboola API error: {e.response.text}")
    except httpx.RequestError as e:
        # Connection and timeout failures carry no response object
        raise HTTPException(status_code=502, detail=f"Cannot reach Keboola API: {e}")
```
Register in `app/main.py`:
```python
from app.api.metadata import router as metadata_router
# ... (add near other router imports)
# In create_app(), add before web_router:
app.include_router(metadata_router)
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest tests/test_api.py::TestMetadataAPI -v`
Expected: ALL PASS
- [ ] **Step 5: Commit**
```bash
git add app/api/metadata.py app/main.py tests/test_api.py
git commit -m "feat: add column metadata API with Keboola push support"
```
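The payload-building loop in `push_metadata_to_source` has no dependency on FastAPI or httpx, so it can be extracted into a pure function and unit-tested without mocking the Keboola API. Here is a minimal sketch of that extraction. `build_columns_metadata` is a hypothetical name, and the metadata keys are the ones used in the endpoint above.

```python
from typing import Any, Dict, List


def build_columns_metadata(columns: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, str]]]:
    """Map each column to its key/value metadata entries; skip columns with nothing to push."""
    payload: Dict[str, List[Dict[str, str]]] = {}
    for col in columns:
        entries = []
        if col.get("basetype"):
            entries.append({"key": "KBC.datatype.basetype", "value": col["basetype"]})
        if col.get("description"):
            entries.append({"key": "KBC.description", "value": col["description"]})
        if entries:
            payload[col["column_name"]] = entries
    return payload


# Invented rows in the shape returned by ColumnMetadataRepository.list_for_table.
columns = [
    {"column_name": "id", "basetype": "STRING", "description": "Order ID"},
    {"column_name": "total", "basetype": "NUMERIC", "description": None},
    {"column_name": "notes", "basetype": None, "description": None},
]
payload = build_columns_metadata(columns)
print(payload)
```

Columns with neither a basetype nor a description are omitted, so the returned count reflects only columns that carry metadata.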
---
### Task 4: Final Integration
- [ ] **Step 1: Run full test suite**
Run: `pytest tests/ -v --timeout=60`
Expected: ALL PASS
- [ ] **Step 2: Commit if any fixes needed**
```bash
git add -A
git commit -m "fix: address metadata writer integration issues"
```