agnes-the-ai-analyst/docs/archive/superpowers/plans/2026-04-10-business-metrics.md
ZdenekSrotyr a48524509a
docs: consolidate and de-clutter the documentation tree (#306)
CLAUDE.md rewritten (708 -> ~320 lines): four overlapping release
sections collapsed to one, stale v1->v35 schema history dropped (it
lives in CHANGELOG), marketplace endpoint internals and verbose
process sections moved out or tightened.

New focused docs:
- docs/RELEASING.md - release process, deploy workflows, CI quirks
  (RELEASE_TEMPLATE.md folded in as an appendix)
- docs/marketplace.md - marketplace ingestion + re-serving internals
- docs/README.md - documentation index by audience, linked from
  README.md and CLAUDE.md

Archived under docs/archive/: docs/superpowers/ (52 historical
planning artifacts), HACKATHON.md, pd-ps-comments.md,
security-audit-2026-04.md, future/NOTIFICATIONS.md.

Removed the docs/auto-install.md stub. Fixed dangling links in
connectors/jira/README.md and dev_docs/README.md, repointed
code/doc references to archived paths.
2026-05-14 18:54:22 +00:00

56 KiB
Raw Permalink Blame History

Business Metrics Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add DuckDB-backed business metrics framework with YAML import/export, CLI, API, profiler integration, and a 10-metric starter pack.

Architecture: New metric_definitions table in system.duckdb (schema v4). Repository pattern matching table_registry.py. CLI commands via Typer, API via FastAPI router. Profiler refactored to read metrics from DuckDB instead of YAML.

Tech Stack: DuckDB, FastAPI, Typer, PyYAML, pytest

Spec: docs/superpowers/specs/2026-04-10-porting-internal-features-design.md — Section 1


Task 1: Schema Migration v3→v4

Files:

  • Modify: src/db.py:19 (SCHEMA_VERSION), src/db.py:258-302 (migrations + _ensure_schema)

  • Test: tests/test_db.py

  • Step 1: Write failing test for v4 schema

In tests/test_db.py, add:

class TestSchemaV4:
    def test_metric_definitions_table_exists(self, tmp_path, monkeypatch):
        _setup_data_dir(tmp_path, monkeypatch)
        from src.db import get_system_db

        conn = get_system_db()
        try:
            tables = {
                row[0]
                for row in conn.execute(
                    "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'"
                ).fetchall()
            }
            assert "metric_definitions" in tables
            assert "column_metadata" in tables
        finally:
            conn.close()

    def test_metric_definitions_columns(self, tmp_path, monkeypatch):
        _setup_data_dir(tmp_path, monkeypatch)
        from src.db import get_system_db

        conn = get_system_db()
        try:
            cols = {
                row[0]
                for row in conn.execute(
                    "SELECT column_name FROM information_schema.columns "
                    "WHERE table_name = 'metric_definitions'"
                ).fetchall()
            }
            expected = {
                "id", "name", "display_name", "category", "description",
                "type", "unit", "grain", "table_name", "tables", "expression",
                "time_column", "dimensions", "filters", "synonyms", "notes",
                "sql", "sql_variants", "validation", "source",
                "created_at", "updated_at",
            }
            assert expected.issubset(cols), f"Missing: {expected - cols}"
        finally:
            conn.close()

    def test_column_metadata_table_exists(self, tmp_path, monkeypatch):
        _setup_data_dir(tmp_path, monkeypatch)
        from src.db import get_system_db

        conn = get_system_db()
        try:
            cols = {
                row[0]
                for row in conn.execute(
                    "SELECT column_name FROM information_schema.columns "
                    "WHERE table_name = 'column_metadata'"
                ).fetchall()
            }
            expected = {"table_id", "column_name", "basetype", "description", "confidence", "source", "updated_at"}
            assert expected.issubset(cols), f"Missing: {expected - cols}"
        finally:
            conn.close()

    def test_v3_to_v4_migration(self, tmp_path, monkeypatch):
        """Simulate a v3 database and verify migration to v4."""
        _setup_data_dir(tmp_path, monkeypatch)
        import duckdb
        from src.db import _SYSTEM_SCHEMA

        db_path = tmp_path / "state" / "system.duckdb"
        db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = duckdb.connect(str(db_path))
        conn.execute(_SYSTEM_SCHEMA)
        conn.execute("INSERT INTO schema_version (version) VALUES (3)")
        conn.close()

        from src.db import get_system_db
        conn = get_system_db()
        try:
            version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
            assert version == 4
            tables = {
                row[0]
                for row in conn.execute(
                    "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'"
                ).fetchall()
            }
            assert "metric_definitions" in tables
            assert "column_metadata" in tables
        finally:
            conn.close()
  • Step 2: Run test to verify it fails

Run: pytest tests/test_db.py::TestSchemaV4 -v Expected: FAIL — metric_definitions table does not exist

  • Step 3: Implement schema v4 migration

In src/db.py, change line 19:

SCHEMA_VERSION = 4

After _V2_TO_V3_MIGRATIONS (line ~260), add:

_V3_TO_V4_MIGRATIONS = [
    """CREATE TABLE IF NOT EXISTS metric_definitions (
        id              VARCHAR PRIMARY KEY,
        name            VARCHAR NOT NULL,
        display_name    VARCHAR NOT NULL,
        category        VARCHAR NOT NULL,
        description     TEXT,
        type            VARCHAR DEFAULT 'sum',
        unit            VARCHAR,
        grain           VARCHAR DEFAULT 'monthly',
        table_name      VARCHAR,
        tables          VARCHAR[],
        expression      VARCHAR,
        time_column     VARCHAR,
        dimensions      VARCHAR[],
        filters         VARCHAR[],
        synonyms        VARCHAR[],
        notes           VARCHAR[],
        sql             TEXT NOT NULL,
        sql_variants    JSON,
        validation      JSON,
        source          VARCHAR DEFAULT 'manual',
        created_at      TIMESTAMP DEFAULT current_timestamp,
        updated_at      TIMESTAMP DEFAULT current_timestamp
    )""",
    """CREATE TABLE IF NOT EXISTS column_metadata (
        table_id        VARCHAR NOT NULL,
        column_name     VARCHAR NOT NULL,
        basetype        VARCHAR,
        description     VARCHAR,
        confidence      VARCHAR DEFAULT 'manual',
        source          VARCHAR DEFAULT 'manual',
        updated_at      TIMESTAMP DEFAULT current_timestamp,
        PRIMARY KEY (table_id, column_name)
    )""",
]

In _ensure_schema(), after the if current < 3: block (line ~298), add:

            if current < 4:
                for sql in _V3_TO_V4_MIGRATIONS:
                    conn.execute(sql)

Also update the test_creates_all_tables test's expected set to include "metric_definitions" and "column_metadata".

  • Step 4: Run test to verify it passes

Run: pytest tests/test_db.py -v Expected: ALL PASS

  • Step 5: Commit
git add src/db.py tests/test_db.py
git commit -m "feat: add schema v4 with metric_definitions and column_metadata tables"

Task 2: MetricRepository — CRUD

Files:

  • Create: src/repositories/metrics.py

  • Test: tests/test_metrics.py

  • Step 1: Write failing tests for MetricRepository CRUD

Create tests/test_metrics.py:

"""Tests for MetricRepository."""

import os
import pytest
import duckdb


@pytest.fixture
def db_conn(tmp_path, monkeypatch):
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    from src.db import get_system_db
    conn = get_system_db()
    yield conn
    conn.close()


SAMPLE_METRIC = {
    "id": "revenue/mrr",
    "name": "mrr",
    "display_name": "Monthly Recurring Revenue",
    "category": "revenue",
    "description": "Total MRR from all subscriptions",
    "type": "sum",
    "unit": "USD",
    "grain": "monthly",
    "table_name": "subscriptions",
    "expression": "SUM(mrr_amount)",
    "time_column": "billing_date",
    "dimensions": ["plan_type", "region"],
    "synonyms": ["monthly_revenue", "recurring_revenue"],
    "notes": ["Excludes one-time fees"],
    "sql": "SELECT DATE_TRUNC('month', billing_date) AS month, SUM(mrr_amount) AS mrr FROM subscriptions GROUP BY 1",
}


class TestMetricRepositoryCreate:
    def test_create_metric(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        result = repo.create(**SAMPLE_METRIC)
        assert result["id"] == "revenue/mrr"
        assert result["name"] == "mrr"

    def test_create_duplicate_upserts(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        updated = {**SAMPLE_METRIC, "display_name": "Updated MRR"}
        result = repo.create(**updated)
        assert result["display_name"] == "Updated MRR"
        assert len(repo.list()) == 1


class TestMetricRepositoryRead:
    def test_get_existing(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        result = repo.get("revenue/mrr")
        assert result is not None
        assert result["name"] == "mrr"
        assert result["dimensions"] == ["plan_type", "region"]

    def test_get_missing(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        assert repo.get("nonexistent/metric") is None

    def test_list_all(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        repo.create(
            id="revenue/arr", name="arr", display_name="ARR",
            category="revenue", sql="SELECT 1",
        )
        results = repo.list()
        assert len(results) == 2

    def test_list_by_category(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        repo.create(
            id="ops/resolution_time", name="resolution_time",
            display_name="Resolution Time", category="ops",
            sql="SELECT 1",
        )
        results = repo.list(category="revenue")
        assert len(results) == 1
        assert results[0]["name"] == "mrr"


class TestMetricRepositoryUpdate:
    def test_update_fields(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        result = repo.update("revenue/mrr", display_name="Gross MRR", unit="EUR")
        assert result["display_name"] == "Gross MRR"
        assert result["unit"] == "EUR"
        assert result["name"] == "mrr"  # unchanged

    def test_update_missing_returns_none(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        assert repo.update("nonexistent/x", name="y") is None


class TestMetricRepositoryDelete:
    def test_delete_existing(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        assert repo.delete("revenue/mrr") is True
        assert repo.get("revenue/mrr") is None

    def test_delete_missing(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        assert repo.delete("nonexistent/x") is False


class TestMetricRepositorySearch:
    def test_find_by_table(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        repo.create(
            id="revenue/arr", name="arr", display_name="ARR",
            category="revenue", table_name="subscriptions",
            sql="SELECT 1",
        )
        repo.create(
            id="ops/tickets", name="tickets", display_name="Tickets",
            category="ops", table_name="tickets",
            sql="SELECT 1",
        )
        results = repo.find_by_table("subscriptions")
        assert len(results) == 2
        names = {r["name"] for r in results}
        assert names == {"mrr", "arr"}

    def test_find_by_synonym(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        results = repo.find_by_synonym("recurring_revenue")
        assert len(results) == 1
        assert results[0]["name"] == "mrr"

    def test_get_table_map(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.create(**SAMPLE_METRIC)
        repo.create(
            id="ops/tickets", name="tickets", display_name="Tickets",
            category="ops", table_name="tickets",
            sql="SELECT 1",
        )
        table_map = repo.get_table_map()
        assert table_map == {"subscriptions": ["mrr"], "tickets": ["tickets"]}
  • Step 2: Run tests to verify they fail

Run: pytest tests/test_metrics.py -v Expected: FAIL — ModuleNotFoundError: No module named 'src.repositories.metrics'

  • Step 3: Implement MetricRepository

Create src/repositories/metrics.py:

"""Repository for metric definitions."""

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import duckdb


class MetricRepository:
    def __init__(self, conn: duckdb.DuckDBPyConnection):
        self.conn = conn

    def create(self, id: str, name: str, display_name: str, category: str,
               sql: str, description: Optional[str] = None,
               type: str = "sum", unit: Optional[str] = None,
               grain: str = "monthly", table_name: Optional[str] = None,
               tables: Optional[List[str]] = None, expression: Optional[str] = None,
               time_column: Optional[str] = None, dimensions: Optional[List[str]] = None,
               filters: Optional[List[str]] = None, synonyms: Optional[List[str]] = None,
               notes: Optional[List[str]] = None, sql_variants: Optional[dict] = None,
               validation: Optional[dict] = None, source: str = "manual",
               **kwargs) -> Dict[str, Any]:
        now = datetime.now(timezone.utc)
        self.conn.execute(
            """INSERT INTO metric_definitions (
                id, name, display_name, category, description, type, unit, grain,
                table_name, tables, expression, time_column, dimensions, filters,
                synonyms, notes, sql, sql_variants, validation, source,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (id) DO UPDATE SET
                name = excluded.name, display_name = excluded.display_name,
                category = excluded.category, description = excluded.description,
                type = excluded.type, unit = excluded.unit, grain = excluded.grain,
                table_name = excluded.table_name, tables = excluded.tables,
                expression = excluded.expression, time_column = excluded.time_column,
                dimensions = excluded.dimensions, filters = excluded.filters,
                synonyms = excluded.synonyms, notes = excluded.notes,
                sql = excluded.sql, sql_variants = excluded.sql_variants,
                validation = excluded.validation, source = excluded.source,
                updated_at = excluded.updated_at""",
            [id, name, display_name, category, description, type, unit, grain,
             table_name, tables, expression, time_column, dimensions, filters,
             synonyms, notes, sql,
             json_dumps(sql_variants), json_dumps(validation),
             source, now, now],
        )
        return self.get(id)

    def get(self, metric_id: str) -> Optional[Dict[str, Any]]:
        result = self.conn.execute(
            "SELECT * FROM metric_definitions WHERE id = ?", [metric_id]
        ).fetchone()
        if not result:
            return None
        columns = [desc[0] for desc in self.conn.description]
        return dict(zip(columns, result))

    def list(self, category: Optional[str] = None) -> List[Dict[str, Any]]:
        if category:
            results = self.conn.execute(
                "SELECT * FROM metric_definitions WHERE category = ? ORDER BY name",
                [category],
            ).fetchall()
        else:
            results = self.conn.execute(
                "SELECT * FROM metric_definitions ORDER BY category, name"
            ).fetchall()
        if not results:
            return []
        columns = [desc[0] for desc in self.conn.description]
        return [dict(zip(columns, row)) for row in results]

    def update(self, metric_id: str, **kwargs) -> Optional[Dict[str, Any]]:
        existing = self.get(metric_id)
        if not existing:
            return None
        kwargs["updated_at"] = datetime.now(timezone.utc)
        # Convert JSON fields
        for json_field in ("sql_variants", "validation"):
            if json_field in kwargs and kwargs[json_field] is not None:
                kwargs[json_field] = json_dumps(kwargs[json_field])
        set_clauses = ", ".join(f"{k} = ?" for k in kwargs)
        values = list(kwargs.values()) + [metric_id]
        self.conn.execute(
            f"UPDATE metric_definitions SET {set_clauses} WHERE id = ?",
            values,
        )
        return self.get(metric_id)

    def delete(self, metric_id: str) -> bool:
        existing = self.get(metric_id)
        if not existing:
            return False
        self.conn.execute("DELETE FROM metric_definitions WHERE id = ?", [metric_id])
        return True

    def find_by_table(self, table_name: str) -> List[Dict[str, Any]]:
        results = self.conn.execute(
            "SELECT * FROM metric_definitions WHERE table_name = ? ORDER BY name",
            [table_name],
        ).fetchall()
        if not results:
            return []
        columns = [desc[0] for desc in self.conn.description]
        return [dict(zip(columns, row)) for row in results]

    def find_by_synonym(self, term: str) -> List[Dict[str, Any]]:
        results = self.conn.execute(
            "SELECT * FROM metric_definitions WHERE list_contains(synonyms, ?) ORDER BY name",
            [term],
        ).fetchall()
        if not results:
            return []
        columns = [desc[0] for desc in self.conn.description]
        return [dict(zip(columns, row)) for row in results]

    def get_table_map(self) -> Dict[str, List[str]]:
        """Return {table_name: [metric_name, ...]} for profiler integration."""
        results = self.conn.execute(
            "SELECT table_name, name FROM metric_definitions WHERE table_name IS NOT NULL ORDER BY table_name"
        ).fetchall()
        table_map: Dict[str, List[str]] = {}
        for table_name, metric_name in results:
            table_map.setdefault(table_name, []).append(metric_name)
        return table_map


def json_dumps(obj) -> Optional[str]:
    """Serialize to JSON string for DuckDB JSON columns, or None."""
    if obj is None:
        return None
    import json
    return json.dumps(obj)
  • Step 4: Run tests to verify they pass

Run: pytest tests/test_metrics.py -v Expected: ALL PASS

  • Step 5: Commit
git add src/repositories/metrics.py tests/test_metrics.py
git commit -m "feat: add MetricRepository with CRUD, search, and table map"

Task 3: YAML Import/Export

Files:

  • Modify: src/repositories/metrics.py (add import_from_yaml, export_to_yaml)

  • Test: tests/test_metrics.py (add import/export tests)

  • Step 1: Write failing tests for YAML import/export

Add to tests/test_metrics.py:

import yaml
from pathlib import Path


@pytest.fixture
def metrics_dir(tmp_path):
    """Create a sample metrics directory with YAML files."""
    revenue_dir = tmp_path / "metrics" / "revenue"
    revenue_dir.mkdir(parents=True)
    ops_dir = tmp_path / "metrics" / "operations"
    ops_dir.mkdir(parents=True)

    # total_revenue.yml — uses 'table' key (YAML format)
    (revenue_dir / "total_revenue.yml").write_text(yaml.dump({
        "name": "total_revenue",
        "display_name": "Total Revenue",
        "category": "revenue",
        "type": "sum",
        "unit": "USD",
        "grain": "monthly",
        "table": "orders",
        "expression": "SUM(total_amount)",
        "time_column": "order_date",
        "dimensions": ["channel", "region"],
        "synonyms": ["gross_revenue"],
        "sql": "SELECT SUM(total_amount) FROM orders",
        "sql_by_channel": "SELECT channel, SUM(total_amount) FROM orders GROUP BY 1",
    }))

    # resolution_time.yml
    (ops_dir / "resolution_time.yml").write_text(yaml.dump({
        "name": "resolution_time",
        "display_name": "Support Resolution Time",
        "category": "operations",
        "type": "avg",
        "unit": "hours",
        "table": "tickets",
        "sql": "SELECT AVG(resolution_hours) FROM tickets",
    }))

    return tmp_path / "metrics"


class TestMetricRepositoryImport:
    def test_import_from_directory(self, db_conn, metrics_dir):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        count = repo.import_from_yaml(metrics_dir)
        assert count == 2
        assert repo.get("revenue/total_revenue") is not None
        assert repo.get("operations/resolution_time") is not None

    def test_import_maps_table_to_table_name(self, db_conn, metrics_dir):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.import_from_yaml(metrics_dir)
        metric = repo.get("revenue/total_revenue")
        assert metric["table_name"] == "orders"

    def test_import_collects_sql_variants(self, db_conn, metrics_dir):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.import_from_yaml(metrics_dir)
        metric = repo.get("revenue/total_revenue")
        variants = metric["sql_variants"]
        # DuckDB returns JSON as string or dict depending on version
        if isinstance(variants, str):
            import json
            variants = json.loads(variants)
        assert "by_channel" in variants

    def test_import_single_file(self, db_conn, metrics_dir):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        count = repo.import_from_yaml(metrics_dir / "revenue" / "total_revenue.yml")
        assert count == 1

    def test_import_idempotent(self, db_conn, metrics_dir):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.import_from_yaml(metrics_dir)
        repo.import_from_yaml(metrics_dir)
        assert len(repo.list()) == 2


class TestMetricRepositoryExport:
    def test_export_to_yaml(self, db_conn, metrics_dir, tmp_path):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        repo.import_from_yaml(metrics_dir)

        export_dir = tmp_path / "export"
        count = repo.export_to_yaml(export_dir)
        assert count == 2

        # Check directory structure
        assert (export_dir / "revenue" / "total_revenue.yml").exists()
        assert (export_dir / "operations" / "resolution_time.yml").exists()

        # Check content
        data = yaml.safe_load((export_dir / "revenue" / "total_revenue.yml").read_text())
        assert data["name"] == "total_revenue"
        assert data["table"] == "orders"  # exported as 'table', not 'table_name'
  • Step 2: Run tests to verify they fail

Run: pytest tests/test_metrics.py::TestMetricRepositoryImport -v Expected: FAIL — AttributeError: 'MetricRepository' object has no attribute 'import_from_yaml'

  • Step 3: Implement import_from_yaml and export_to_yaml

Add to MetricRepository class in src/repositories/metrics.py:

    def import_from_yaml(self, path) -> int:
        """Import metrics from YAML file(s). Returns count imported.

        Args:
            path: Path to a single YAML file or directory containing category/metric.yml files.
        """
        from pathlib import Path
        import yaml

        path = Path(path)
        count = 0

        if path.is_file():
            yml_files = [path]
        elif path.is_dir():
            yml_files = sorted(path.glob("*/*.yml"))
        else:
            return 0

        for yml_file in yml_files:
            try:
                with open(yml_file) as f:
                    data = yaml.safe_load(f)
            except (yaml.YAMLError, OSError):
                continue

            if not data:
                continue

            # Handle list-wrapped metrics (internal repo format)
            metric_list = data if isinstance(data, list) else [data]
            for metric in metric_list:
                if not isinstance(metric, dict) or "name" not in metric:
                    continue

                # Infer category from directory name
                category = metric.get("category", yml_file.parent.name)
                name = metric["name"]
                metric_id = f"{category}/{name}"

                # Map YAML 'table' → DuckDB 'table_name'
                table_name = metric.pop("table", None)

                # Collect sql_by_* variants
                sql_variants = {}
                keys_to_remove = []
                for key in list(metric.keys()):
                    if key.startswith("sql_by_"):
                        variant_name = key[4:]  # "sql_by_channel" → "by_channel"
                        sql_variants[variant_name] = metric[key]
                        keys_to_remove.append(key)
                for key in keys_to_remove:
                    del metric[key]

                self.create(
                    id=metric_id,
                    name=name,
                    display_name=metric.get("display_name", name),
                    category=category,
                    description=metric.get("description"),
                    type=metric.get("type", "sum"),
                    unit=metric.get("unit"),
                    grain=metric.get("grain", "monthly"),
                    table_name=table_name or metric.get("table_name"),
                    tables=metric.get("tables"),
                    expression=metric.get("expression"),
                    time_column=metric.get("time_column"),
                    dimensions=metric.get("dimensions"),
                    filters=metric.get("filters"),
                    synonyms=metric.get("synonyms"),
                    notes=metric.get("notes"),
                    sql=metric.get("sql", ""),
                    sql_variants=sql_variants if sql_variants else None,
                    validation=metric.get("validation"),
                    source="yaml_import",
                )
                count += 1

        return count

    def export_to_yaml(self, output_dir) -> int:
        """Export all metrics to YAML files. Returns count exported."""
        from pathlib import Path
        import yaml
        import json

        output_dir = Path(output_dir)
        count = 0

        for metric in self.list():
            category = metric["category"]
            name = metric["name"]
            cat_dir = output_dir / category
            cat_dir.mkdir(parents=True, exist_ok=True)

            # Build YAML-compatible dict
            data = {
                "name": name,
                "display_name": metric["display_name"],
                "category": category,
            }

            # Map DuckDB 'table_name' back to YAML 'table'
            if metric.get("table_name"):
                data["table"] = metric["table_name"]

            for field in ("description", "type", "unit", "grain", "tables",
                          "expression", "time_column", "dimensions", "filters",
                          "synonyms", "notes"):
                if metric.get(field) is not None:
                    data[field] = metric[field]

            if metric.get("sql"):
                data["sql"] = metric["sql"]

            # Expand sql_variants back to sql_by_* keys
            variants = metric.get("sql_variants")
            if variants:
                if isinstance(variants, str):
                    variants = json.loads(variants)
                for key, val in variants.items():
                    data[f"sql_{key}"] = val

            if metric.get("validation"):
                val = metric["validation"]
                if isinstance(val, str):
                    val = json.loads(val)
                data["validation"] = val

            yml_path = cat_dir / f"{name}.yml"
            yml_path.write_text(yaml.dump(data, default_flow_style=False, allow_unicode=True, sort_keys=False))
            count += 1

        return count
  • Step 4: Run tests to verify they pass

Run: pytest tests/test_metrics.py -v Expected: ALL PASS

  • Step 5: Commit
git add src/repositories/metrics.py tests/test_metrics.py
git commit -m "feat: add YAML import/export to MetricRepository"

Task 4: CLI da metrics

Files:

  • Create: cli/commands/metrics.py

  • Modify: cli/main.py (register metrics_app)

  • Test: tests/test_cli.py (add metrics help test)

  • Step 1: Write failing test for CLI registration

Add to tests/test_cli.py in TestCLIHelp:

    def test_metrics_help(self):
        result = runner.invoke(app, ["metrics", "--help"])
        assert result.exit_code == 0
        assert "list" in result.output
        assert "show" in result.output
        assert "import" in result.output
  • Step 2: Run test to verify it fails

Run: pytest tests/test_cli.py::TestCLIHelp::test_metrics_help -v Expected: FAIL — No such command 'metrics'

  • Step 3: Implement CLI commands

Create cli/commands/metrics.py:

"""Metrics commands — da metrics."""

import json
from pathlib import Path

import typer

from cli.client import api_get

metrics_app = typer.Typer(help="Business metrics — list, show, import, export, validate")


@metrics_app.command("list")
def list_metrics(
    category: str = typer.Option(None, "--category", "-c", help="Filter by category"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """List all metrics."""
    params = {}
    if category:
        params["category"] = category
    resp = api_get("/api/metrics", params=params)
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)

    data = resp.json()
    metrics = data.get("metrics", [])

    if as_json:
        typer.echo(json.dumps(metrics, indent=2))
    else:
        if not metrics:
            typer.echo("No metrics found. Import with: da metrics import docs/metrics/")
            return
        current_cat = None
        for m in metrics:
            if m["category"] != current_cat:
                current_cat = m["category"]
                typer.echo(f"\n  {current_cat.upper()}")
            typer.echo(f"    {m['id']:40s} {m.get('display_name', m['name'])}")
        typer.echo(f"\nTotal: {len(metrics)} metrics")


@metrics_app.command("show")
def show_metric(
    metric_id: str = typer.Argument(..., help="Metric ID (e.g., revenue/mrr)"),
    as_json: bool = typer.Option(False, "--json", help="Output as JSON"),
):
    """Show detail for a metric."""
    resp = api_get(f"/api/metrics/{metric_id}")
    if resp.status_code == 404:
        typer.echo(f"Metric not found: {metric_id}", err=True)
        raise typer.Exit(1)
    if resp.status_code != 200:
        typer.echo(f"Failed: {resp.json().get('detail', resp.text)}", err=True)
        raise typer.Exit(1)

    m = resp.json()
    if as_json:
        typer.echo(json.dumps(m, indent=2))
    else:
        typer.echo(f"  {m['display_name']}  ({m['id']})")
        typer.echo(f"  Type: {m.get('type', 'sum')}  |  Unit: {m.get('unit', '-')}  |  Grain: {m.get('grain', '-')}")
        if m.get("description"):
            typer.echo(f"\n  {m['description']}")
        if m.get("table_name"):
            typer.echo(f"\n  Table: {m['table_name']}")
        if m.get("dimensions"):
            typer.echo(f"  Dimensions: {', '.join(m['dimensions'])}")
        if m.get("notes"):
            typer.echo("\n  Notes:")
            for note in m["notes"]:
                typer.echo(f"    - {note}")
        if m.get("sql"):
            typer.echo(f"\n  SQL:\n    {m['sql']}")


@metrics_app.command("import")
def import_metrics(
    path: str = typer.Argument(..., help="Path to YAML file or directory"),
):
    """Import metrics from YAML into DuckDB."""
    from src.db import get_system_db
    from src.repositories.metrics import MetricRepository

    source = Path(path)
    if not source.exists():
        typer.echo(f"Path not found: {path}", err=True)
        raise typer.Exit(1)

    conn = get_system_db()
    try:
        repo = MetricRepository(conn)
        count = repo.import_from_yaml(source)
        typer.echo(f"Imported {count} metrics into DuckDB")
    finally:
        conn.close()


@metrics_app.command("export")
def export_metrics(
    output_dir: str = typer.Option("./metrics_export", "--dir", "-d", help="Output directory"),
):
    """Export metrics from DuckDB to YAML files."""
    from src.db import get_system_db
    from src.repositories.metrics import MetricRepository

    conn = get_system_db()
    try:
        repo = MetricRepository(conn)
        count = repo.export_to_yaml(output_dir)
        typer.echo(f"Exported {count} metrics to {output_dir}/")
    finally:
        conn.close()


@metrics_app.command("validate")
def validate_metrics():
    """Validate metrics — check that referenced tables exist in analytics DB."""
    from src.db import get_system_db
    from src.repositories.metrics import MetricRepository
    from src.repositories.table_registry import TableRegistryRepository

    conn = get_system_db()
    try:
        metrics_repo = MetricRepository(conn)
        tables_repo = TableRegistryRepository(conn)
        all_tables = {t["id"] for t in tables_repo.list_all()}
        metrics = metrics_repo.list()

        ok = 0
        warnings = 0
        for m in metrics:
            if m.get("table_name") and m["table_name"] not in all_tables:
                typer.echo(f"  WARN  {m['id']}: table '{m['table_name']}' not in registry")
                warnings += 1
            else:
                ok += 1

        typer.echo(f"\nValidated {len(metrics)} metrics: {ok} OK, {warnings} warnings")
    finally:
        conn.close()

Register in cli/main.py — add import and registration:

from cli.commands.metrics import metrics_app
# ...
app.add_typer(metrics_app, name="metrics")
  • Step 4: Run tests to verify they pass

Run: pytest tests/test_cli.py::TestCLIHelp::test_metrics_help -v Expected: PASS

  • Step 5: Commit
git add cli/commands/metrics.py cli/main.py tests/test_cli.py
git commit -m "feat: add da metrics CLI commands (list, show, import, export, validate)"

Task 5: API Endpoints

Files:

  • Create: app/api/metrics.py

  • Modify: app/main.py (register router)

  • Modify: app/api/catalog.py (deprecation redirect)

  • Test: tests/test_api.py (add metrics API tests)

  • Step 1: Write failing tests for metrics API

Add to tests/test_api.py:

class TestMetricsAPI:
    def test_list_metrics_empty(self, seeded_client):
        client, admin_token, _ = seeded_client
        resp = client.get("/api/metrics", headers={"Authorization": f"Bearer {admin_token}"})
        assert resp.status_code == 200
        assert resp.json()["metrics"] == []

    def test_create_and_list_metric(self, seeded_client):
        client, admin_token, _ = seeded_client
        metric = {
            "id": "revenue/mrr",
            "name": "mrr",
            "display_name": "MRR",
            "category": "revenue",
            "sql": "SELECT SUM(amount) FROM subscriptions",
        }
        resp = client.post(
            "/api/admin/metrics",
            json=metric,
            headers={"Authorization": f"Bearer {admin_token}"},
        )
        assert resp.status_code == 201

        resp = client.get("/api/metrics", headers={"Authorization": f"Bearer {admin_token}"})
        assert resp.status_code == 200
        assert len(resp.json()["metrics"]) == 1

    def test_get_metric_detail(self, seeded_client):
        client, admin_token, _ = seeded_client
        client.post(
            "/api/admin/metrics",
            json={"id": "revenue/mrr", "name": "mrr", "display_name": "MRR",
                  "category": "revenue", "sql": "SELECT 1"},
            headers={"Authorization": f"Bearer {admin_token}"},
        )
        resp = client.get("/api/metrics/revenue/mrr", headers={"Authorization": f"Bearer {admin_token}"})
        assert resp.status_code == 200
        assert resp.json()["name"] == "mrr"

    def test_get_metric_not_found(self, seeded_client):
        client, admin_token, _ = seeded_client
        resp = client.get("/api/metrics/nonexistent/x", headers={"Authorization": f"Bearer {admin_token}"})
        assert resp.status_code == 404

    def test_delete_metric(self, seeded_client):
        client, admin_token, _ = seeded_client
        client.post(
            "/api/admin/metrics",
            json={"id": "revenue/mrr", "name": "mrr", "display_name": "MRR",
                  "category": "revenue", "sql": "SELECT 1"},
            headers={"Authorization": f"Bearer {admin_token}"},
        )
        resp = client.delete("/api/admin/metrics/revenue/mrr",
                             headers={"Authorization": f"Bearer {admin_token}"})
        assert resp.status_code == 200

    def test_analyst_cannot_create_metric(self, seeded_client):
        client, _, analyst_token = seeded_client
        resp = client.post(
            "/api/admin/metrics",
            json={"id": "revenue/mrr", "name": "mrr", "display_name": "MRR",
                  "category": "revenue", "sql": "SELECT 1"},
            headers={"Authorization": f"Bearer {analyst_token}"},
        )
        assert resp.status_code == 403

    def test_list_metrics_filter_by_category(self, seeded_client):
        client, admin_token, _ = seeded_client
        for m in [
            {"id": "revenue/mrr", "name": "mrr", "display_name": "MRR", "category": "revenue", "sql": "SELECT 1"},
            {"id": "ops/tickets", "name": "tickets", "display_name": "Tickets", "category": "ops", "sql": "SELECT 1"},
        ]:
            client.post("/api/admin/metrics", json=m, headers={"Authorization": f"Bearer {admin_token}"})
        resp = client.get("/api/metrics?category=revenue", headers={"Authorization": f"Bearer {admin_token}"})
        assert len(resp.json()["metrics"]) == 1
  • Step 2: Run tests to verify they fail

Run: pytest tests/test_api.py::TestMetricsAPI -v Expected: FAIL — 404 on /api/metrics

  • Step 3: Implement API router

Create app/api/metrics.py:

"""Metrics API endpoints."""

from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
import duckdb

from app.auth.dependencies import get_current_user, require_admin, _get_db
from src.repositories.metrics import MetricRepository

router = APIRouter(tags=["metrics"])


class MetricCreate(BaseModel):
    id: str
    name: str
    display_name: str
    category: str
    sql: str
    description: Optional[str] = None
    type: str = "sum"
    unit: Optional[str] = None
    grain: str = "monthly"
    table_name: Optional[str] = None
    tables: Optional[list] = None
    expression: Optional[str] = None
    time_column: Optional[str] = None
    dimensions: Optional[list] = None
    filters: Optional[list] = None
    synonyms: Optional[list] = None
    notes: Optional[list] = None
    sql_variants: Optional[dict] = None
    validation: Optional[dict] = None


@router.get("/api/metrics")
async def list_metrics(
    category: Optional[str] = Query(None),
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = MetricRepository(conn)
    metrics = repo.list(category=category)
    return {"metrics": metrics, "count": len(metrics)}


@router.get("/api/metrics/{metric_id:path}")
async def get_metric(
    metric_id: str,
    user: dict = Depends(get_current_user),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = MetricRepository(conn)
    metric = repo.get(metric_id)
    if not metric:
        raise HTTPException(status_code=404, detail=f"Metric not found: {metric_id}")
    return metric


@router.post("/api/admin/metrics", status_code=201)
async def create_metric(
    body: MetricCreate,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = MetricRepository(conn)
    return repo.create(**body.model_dump())


@router.delete("/api/admin/metrics/{metric_id:path}")
async def delete_metric(
    metric_id: str,
    user: dict = Depends(require_admin),
    conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
    repo = MetricRepository(conn)
    if not repo.delete(metric_id):
        raise HTTPException(status_code=404, detail=f"Metric not found: {metric_id}")
    return {"status": "deleted", "id": metric_id}

Register in app/main.py — add import and include:

from app.api.metrics import router as metrics_router
# ... (add near other router imports at top of file)

# In create_app(), add before web_router:
app.include_router(metrics_router)

Deprecate old endpoint in app/api/catalog.py — replace the get_metric function:

@router.get("/metrics/{metric_path:path}", deprecated=True)
async def get_metric(
    metric_path: str,
    user: dict = Depends(get_current_user),
):
    """Deprecated: Use GET /api/metrics/{metric_id} instead."""
    from fastapi.responses import RedirectResponse
    # Strip .yml extension for the new endpoint
    metric_id = metric_path.replace(".yml", "")
    return RedirectResponse(url=f"/api/metrics/{metric_id}", status_code=301)
  • Step 4: Run tests to verify they pass

Run: pytest tests/test_api.py::TestMetricsAPI -v Expected: ALL PASS

  • Step 5: Commit
git add app/api/metrics.py app/main.py app/api/catalog.py tests/test_api.py
git commit -m "feat: add metrics API endpoints with admin CRUD"

Task 6: Starter Pack Metrics

Files:

  • Create: docs/metrics/metrics.yml (index)

  • Create: docs/metrics/revenue/mrr.yml

  • Create: docs/metrics/revenue/arr.yml

  • Create: docs/metrics/revenue/churn_rate.yml

  • Create: docs/metrics/product_usage/active_users.yml

  • Create: docs/metrics/product_usage/feature_adoption.yml

  • Create: docs/metrics/sales/new_customers.yml

  • Create: docs/metrics/sales/upsell_expansion.yml

  • Create: docs/metrics/sales/pipeline_value.yml

  • Create: docs/metrics/operations/support_resolution_time.yml

  • Create: docs/metrics/operations/infrastructure_cost.yml

  • Existing: docs/metrics/revenue/total_revenue.yml (already exists, no change)

  • Step 1: Create metrics index

Create docs/metrics/metrics.yml:

version: "2.0"
description: "Business metrics starter pack. Import with: da metrics import docs/metrics/"
categories:
  - name: revenue
    folder: revenue/
    metrics: [total_revenue, mrr, arr, churn_rate]
  - name: product_usage
    folder: product_usage/
    metrics: [active_users, feature_adoption]
  - name: sales
    folder: sales/
    metrics: [new_customers, upsell_expansion, pipeline_value]
  - name: operations
    folder: operations/
    metrics: [support_resolution_time, infrastructure_cost]
  • Step 2: Create revenue metrics

Create docs/metrics/revenue/mrr.yml:

- name: mrr
  display_name: Monthly Recurring Revenue
  category: revenue
  type: sum
  unit: USD
  grain: monthly
  table: subscriptions
  expression: "SUM(mrr_amount)"
  time_column: billing_date
  dimensions:
    - plan_type
    - region
  notes:
    - "Aggregated at company level, not per-contract"
    - "Excludes one-time setup fees and overages"
  synonyms:
    - monthly_revenue
    - recurring_revenue
  sql: |
    SELECT
        DATE_TRUNC('month', billing_date) AS month,
        SUM(mrr_amount) AS mrr
    FROM subscriptions
    WHERE status = 'active'
    GROUP BY 1
    ORDER BY 1    
  sql_by_plan: |
    SELECT
        DATE_TRUNC('month', billing_date) AS month,
        plan_type,
        SUM(mrr_amount) AS mrr
    FROM subscriptions
    WHERE status = 'active'
    GROUP BY 1, 2
    ORDER BY 1, 3 DESC    

Create docs/metrics/revenue/arr.yml:

- name: arr
  display_name: Annual Recurring Revenue
  category: revenue
  type: sum
  unit: USD
  grain: monthly
  table: subscriptions
  expression: "SUM(mrr_amount) * 12"
  time_column: billing_date
  notes:
    - "ARR = MRR × 12 (annualized current MRR)"
    - "Point-in-time snapshot, not forward-looking"
  synonyms:
    - annual_revenue
    - annualized_revenue
  sql: |
    SELECT
        DATE_TRUNC('month', billing_date) AS month,
        SUM(mrr_amount) * 12 AS arr
    FROM subscriptions
    WHERE status = 'active'
    GROUP BY 1
    ORDER BY 1    

Create docs/metrics/revenue/churn_rate.yml:

- name: churn_rate
  display_name: Monthly Churn Rate
  category: revenue
  type: ratio
  unit: percentage
  grain: monthly
  table: subscriptions
  expression: "churned_mrr / beginning_mrr * 100"
  time_column: billing_date
  notes:
    - "Gross revenue churn — does not net out expansion"
    - "Beginning MRR is MRR at start of month"
  synonyms:
    - revenue_churn
    - mrr_churn
  sql: |
    WITH monthly AS (
        SELECT
            DATE_TRUNC('month', cancelled_at) AS month,
            SUM(mrr_amount) AS churned_mrr
        FROM subscriptions
        WHERE status = 'cancelled'
        GROUP BY 1
    ),
    beginning AS (
        SELECT
            DATE_TRUNC('month', billing_date) AS month,
            SUM(mrr_amount) AS beginning_mrr
        FROM subscriptions
        WHERE status = 'active'
        GROUP BY 1
    )
    SELECT
        b.month,
        COALESCE(m.churned_mrr, 0) / NULLIF(b.beginning_mrr, 0) * 100 AS churn_rate
    FROM beginning b
    LEFT JOIN monthly m ON b.month = m.month
    ORDER BY 1    
  • Step 3: Create product_usage metrics

Create docs/metrics/product_usage/active_users.yml:

- name: active_users
  display_name: Monthly Active Users
  category: product_usage
  type: count
  unit: count
  grain: monthly
  table: user_events
  expression: "COUNT(DISTINCT user_id)"
  time_column: event_date
  dimensions:
    - feature
    - plan_type
  synonyms:
    - mau
    - monthly_active
  sql: |
    SELECT
        DATE_TRUNC('month', event_date) AS month,
        COUNT(DISTINCT user_id) AS active_users
    FROM user_events
    GROUP BY 1
    ORDER BY 1    

Create docs/metrics/product_usage/feature_adoption.yml:

- name: feature_adoption
  display_name: Feature Adoption Rate
  category: product_usage
  type: ratio
  unit: percentage
  grain: monthly
  table: user_events
  expression: "users_using_feature / total_active_users * 100"
  time_column: event_date
  dimensions:
    - feature
  synonyms:
    - adoption_rate
    - feature_usage
  sql: |
    WITH feature_users AS (
        SELECT
            DATE_TRUNC('month', event_date) AS month,
            feature,
            COUNT(DISTINCT user_id) AS users_using
        FROM user_events
        GROUP BY 1, 2
    ),
    total AS (
        SELECT
            DATE_TRUNC('month', event_date) AS month,
            COUNT(DISTINCT user_id) AS total_users
        FROM user_events
        GROUP BY 1
    )
    SELECT
        f.month, f.feature,
        f.users_using * 100.0 / NULLIF(t.total_users, 0) AS adoption_pct
    FROM feature_users f
    JOIN total t ON f.month = t.month
    ORDER BY 1, 3 DESC    
  • Step 4: Create sales metrics

Create docs/metrics/sales/new_customers.yml:

- name: new_customers
  display_name: New Customers
  category: sales
  type: count
  unit: count
  grain: monthly
  table: orders
  expression: "COUNT(DISTINCT customer_id)"
  time_column: order_date
  dimensions:
    - channel
    - region
  synonyms:
    - customer_acquisition
    - new_logos
  sql: |
    SELECT
        DATE_TRUNC('month', first_order_date) AS month,
        COUNT(DISTINCT customer_id) AS new_customers
    FROM (
        SELECT customer_id, MIN(order_date) AS first_order_date
        FROM orders
        WHERE status = 'completed'
        GROUP BY 1
    )
    GROUP BY 1
    ORDER BY 1    

Create docs/metrics/sales/upsell_expansion.yml:

- name: upsell_expansion
  display_name: Upsell & Expansion Revenue
  category: sales
  type: sum
  unit: USD
  grain: monthly
  table: subscriptions
  expression: "SUM(CASE WHEN change_type IN ('upgrade','expansion') THEN delta_mrr END)"
  time_column: change_date
  synonyms:
    - expansion_revenue
    - upsell
  sql: |
    SELECT
        DATE_TRUNC('month', change_date) AS month,
        SUM(delta_mrr) AS expansion_mrr
    FROM subscription_changes
    WHERE change_type IN ('upgrade', 'expansion')
    GROUP BY 1
    ORDER BY 1    

Create docs/metrics/sales/pipeline_value.yml:

- name: pipeline_value
  display_name: Pipeline Value
  category: sales
  type: sum
  unit: USD
  grain: monthly
  table: opportunities
  expression: "SUM(deal_value * probability / 100)"
  time_column: expected_close_date
  dimensions:
    - stage
    - owner
  synonyms:
    - weighted_pipeline
    - deal_pipeline
  sql: |
    SELECT
        DATE_TRUNC('month', expected_close_date) AS month,
        SUM(deal_value * probability / 100) AS weighted_pipeline
    FROM opportunities
    WHERE stage NOT IN ('closed_won', 'closed_lost')
    GROUP BY 1
    ORDER BY 1    
  • Step 5: Create operations metrics

Create docs/metrics/operations/support_resolution_time.yml:

- name: support_resolution_time
  display_name: Support Resolution Time
  category: operations
  type: avg
  unit: hours
  grain: monthly
  table: tickets
  expression: "AVG(resolution_hours)"
  time_column: created_at
  dimensions:
    - priority
    - category
  synonyms:
    - resolution_time
    - mttr
    - mean_time_to_resolve
  sql: |
    SELECT
        DATE_TRUNC('month', created_at) AS month,
        AVG(resolution_hours) AS avg_resolution_hours,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY resolution_hours) AS median_hours
    FROM tickets
    WHERE status = 'resolved'
    GROUP BY 1
    ORDER BY 1    

Create docs/metrics/operations/infrastructure_cost.yml:

- name: infrastructure_cost
  display_name: Infrastructure Cost
  category: operations
  type: sum
  unit: USD
  grain: monthly
  table: infra_costs
  expression: "SUM(cost_usd)"
  time_column: billing_month
  dimensions:
    - provider
    - service
    - environment
  synonyms:
    - cloud_cost
    - hosting_cost
    - infra_spend
  sql: |
    SELECT
        billing_month AS month,
        SUM(cost_usd) AS total_cost
    FROM infra_costs
    GROUP BY 1
    ORDER BY 1    
  sql_by_provider: |
    SELECT
        billing_month AS month,
        provider,
        SUM(cost_usd) AS cost
    FROM infra_costs
    GROUP BY 1, 2
    ORDER BY 1, 3 DESC    
  • Step 6: Write import test for starter pack

Add to tests/test_metrics.py:

class TestStarterPack:
    def test_import_starter_pack(self, db_conn):
        from src.repositories.metrics import MetricRepository
        repo = MetricRepository(db_conn)
        starter_dir = Path(__file__).parent.parent / "docs" / "metrics"
        if not starter_dir.exists():
            pytest.skip("Starter pack not found")
        count = repo.import_from_yaml(starter_dir)
        assert count >= 11  # 11 metrics (total_revenue + 10 new)
        assert repo.get("revenue/total_revenue") is not None
        assert repo.get("revenue/mrr") is not None
        assert repo.get("operations/infrastructure_cost") is not None
  • Step 7: Run tests

Run: pytest tests/test_metrics.py::TestStarterPack -v Expected: PASS

  • Step 8: Commit
git add docs/metrics/ tests/test_metrics.py
git commit -m "feat: add 10 starter pack metrics (revenue, usage, sales, operations)"

Task 7: Profiler Integration

Files:

  • Modify: src/profiler.py:1154,1248 (replace load_metrics calls)

  • Test: manual verification (profiler is integration-heavy)

  • Step 1: Add get_table_map fallback to profiler

In src/profiler.py, find the two call sites where load_metrics() is called (lines ~1154 and ~1248). In each location, replace:

    metrics_map = load_metrics(METRICS_YML_PATH)

with:

    # Try DuckDB-backed metrics first, fall back to YAML scan
    metrics_map = _load_metrics_from_db()
    if not metrics_map:
        metrics_map = load_metrics(METRICS_YML_PATH)

Add this helper function near the top of the file (after the imports):

def _load_metrics_from_db() -> Dict[str, List[str]]:
    """Load metrics table map from DuckDB. Returns empty dict on failure."""
    try:
        from src.db import get_system_db
        from src.repositories.metrics import MetricRepository
        conn = get_system_db()
        repo = MetricRepository(conn)
        table_map = repo.get_table_map()
        conn.close()
        return table_map
    except Exception as exc:
        logger.debug("Could not load metrics from DuckDB: %s", exc)
        return {}
  • Step 2: Run existing profiler tests

Run: pytest tests/ -k profiler -v Expected: ALL PASS (existing tests should not break)

  • Step 3: Commit
git add src/profiler.py
git commit -m "feat: profiler reads metrics from DuckDB with YAML fallback"

Task 8: CLAUDE.md Update

Files:

  • Modify: CLAUDE.md

  • Step 1: Add metrics workflow section to CLAUDE.md

After the "## Development" section, add:

## Business Metrics

Standardized metric definitions live in DuckDB (`metric_definitions` table). Import starter pack:

```bash
da metrics import docs/metrics/

For AI agents analyzing data:

Before computing any business metric, look up the canonical definition:

  1. da metrics list — find the relevant metric
  2. da metrics show revenue/mrr — read the SQL and business rules
  3. Use the SQL from the metric definition, adapt to the specific question

Never invent metric calculations — always use the canonical definitions.


- [ ] **Step 2: Commit**

```bash
git add CLAUDE.md
git commit -m "docs: add metrics workflow instructions to CLAUDE.md"

Task 9: Migration Script

Files:

  • Create: scripts/migrate_metrics_to_duckdb.py

  • Step 1: Create standalone migration script

Create scripts/migrate_metrics_to_duckdb.py:

"""Migrate metric YAML files to DuckDB metric_definitions table.

Usage:
    python scripts/migrate_metrics_to_duckdb.py [--metrics-dir docs/metrics]

Idempotent — safe to run repeatedly. Uses UPSERT (ON CONFLICT DO UPDATE).
"""

import argparse
import logging
import sys
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


def main():
    parser = argparse.ArgumentParser(description="Migrate metric YAMLs to DuckDB")
    parser.add_argument("--metrics-dir", default="docs/metrics", help="Path to metrics directory")
    args = parser.parse_args()

    metrics_dir = Path(args.metrics_dir)
    if not metrics_dir.is_dir():
        logger.error("Metrics directory not found: %s", metrics_dir)
        sys.exit(1)

    from src.db import get_system_db
    from src.repositories.metrics import MetricRepository

    conn = get_system_db()
    try:
        repo = MetricRepository(conn)
        count = repo.import_from_yaml(metrics_dir)
        logger.info("Imported %d metrics from %s", count, metrics_dir)
    finally:
        conn.close()


if __name__ == "__main__":
    main()
  • Step 2: Run it manually to verify

Run: python scripts/migrate_metrics_to_duckdb.py --metrics-dir docs/metrics Expected: Imported N metrics from docs/metrics

  • Step 3: Commit
git add scripts/migrate_metrics_to_duckdb.py
git commit -m "feat: add standalone metric YAML → DuckDB migration script"

Task 10: Final Integration Test

  • Step 1: Run full test suite

Run: pytest tests/ -v --timeout=60 Expected: ALL PASS (no regressions)

  • Step 2: Run OpenAPI snapshot test (if exists)

Run: pytest tests/ -k openapi -v Expected: May need snapshot update if endpoint list changed. If it fails, regenerate with python scripts/generate_openapi.py

  • Step 3: Final commit if any fixes needed
git add -A
git commit -m "fix: address integration test issues"