* fix(rbac): stack-gate analyst table access via data_packages exclusively
Previously analysts could see a table in ``agnes catalog`` /
``/api/sync/manifest`` either by:
1. being in a group with ``resource_grants(group, 'table', id)``, or
2. being in a group with ``resource_grants(group, 'data_package', …)``
for a package containing the table.
Path 1 leaked: admins who minted a per-table grant without ever
wrapping the table in a data_package still shipped the table to
analysts — directly contradicting the unified-stack mental model
("the stack is the unit of access"). User report:
"i když to admin nedal do data package tak to by default uživatelé
dostali to by se nemělo stát".
New policy: analyst visibility is strictly stack-gated. A table is
visible iff at least one data_package containing it is in the
analyst's stack (required ∪ subscribed). Admin god-mode and the three
internal data-source tables (agnes_sessions / _telemetry / _audit
with row-level RBAC) keep their existing carve-outs.
Touched surfaces:
* ``src/rbac.can_access_table`` + ``get_accessible_tables`` —
routed through ``StackResolver.stack(user, DATA_PACKAGE)`` +
``data_package_tables`` join instead of ``resource_grants(table)``.
* ``app/api/sync._build_direct_tables_section`` — always returns
``[]`` (key kept for older CLI destructuring); per-table grants
no longer manifest.
* Standardised 403 detail across ``/api/data/*``, ``/api/query``,
``/api/v2/sample``, ``/api/v2/scan``, ``/api/v2/schema``:
``Table 'X' is not in your stack. Ask an admin to add it to a
Data Package you have access to (Required or in your stack),
then run `agnes pull` to refresh.`` Single source of truth lives
in ``src.rbac.table_not_in_stack_message`` so the wording stays
consistent across CLI surfaces.
UX side: ``/catalog/t/<id>`` (table detail page) dropped the four
editorial sections (Sample questions, What's inside, Things to know,
Pairs well with) per user feedback — the page's job is now
"what is this table, where do I find it" (hero + parent packages).
Tests:
* ``tests/conftest.grant_table_via_package`` / ``revoke_table_via_package``
— shared helpers that wrap a table in an auto-named data_package +
grant the package required to a custom group. Replaces the legacy
per-test ``_grant_table_to_analyst`` table-grant pattern.
* All 17 previously-failing legacy tests (test_access_control,
test_journey_rbac, test_audit_gap_*, test_rbac, …) migrated to use
the new helper; logic stays the same.
* ``tests/fixtures/analyst_bootstrap._grant_table_access`` updated
to wrap via data_package so the ``test_pat`` fixture's "two table
grants" semantics still ship parquets through ``agnes init``.
* New ``tests/test_table_not_in_stack_message.py`` locks in the
standardised 403 detail across the data + check-access endpoints.
5204 tests passing (added 1).
* fix(catalog): first-demo UX feedback — required-first grouping + longer card description
Two minor polish items from the 2026-05-19 stakeholder demo:
1. Required packages cluster at the top of the Browse grid instead of
being interleaved by ``created_at``. Sort key
``(requirement != 'required', name)`` runs before the adapter
call in both /catalog (data_packages) and /corporate-memory
(memory_domains) so the required block is visible without
scrolling. Regression test pins the order via
``data-id="…"`` position in rendered HTML.
2. ``.stack-card__desc`` line clamp bumped 2 → 4 lines. Two-line clamp
trailed almost every admin-authored description off in "…" before
the second clause, forcing a click-through to read it. The detail
page (/catalog/p/<slug>) keeps the unclamped body for longer
content.
* release: 0.55.3 — stack-gated analyst RBAC (BREAKING) + first-demo UX polish + #345 A/B/C/D + #347 UI consistency
277 lines
10 KiB
Python
277 lines
10 KiB
Python
"""GET /api/v2/schema/{table_id} — table column metadata (spec §3.2)."""
|
|
|
|
from __future__ import annotations
|
|
import logging
|
|
import time
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
import duckdb
|
|
|
|
from app.auth.dependencies import get_current_user, _get_db
|
|
from src.audit_helpers import client_kind_from_user
|
|
from src.rbac import can_access_table
|
|
from src.repositories.table_registry import TableRegistryRepository
|
|
from src.repositories.audit import AuditRepository
|
|
from app.api.v2_cache import TTLCache
|
|
from connectors.bigquery.access import BqAccess, BqAccessError, get_bq_access
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/api/v2", tags=["v2"])
|
|
|
|
_schema_cache = TTLCache(maxsize=512, ttl_seconds=3600)
|
|
|
|
|
|
class NotFound(Exception):
|
|
pass
|
|
|
|
|
|
_BQ_DIALECT_HINTS = {
|
|
"date_literal": "DATE '2026-01-01'",
|
|
"timestamp_literal": "TIMESTAMP '2026-01-01 00:00:00 UTC'",
|
|
"interval_subtract": "DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)",
|
|
"regex": "REGEXP_CONTAINS(field, r'pattern')",
|
|
"cast": "CAST(x AS INT64)",
|
|
}
|
|
|
|
|
|
def _fetch_bq_schema(bq, dataset: str, table: str) -> list[dict]:
|
|
"""Fetch column list via the shared ``_fetch_bq_columns_full_impl`` helper.
|
|
|
|
Pre-#155 this had its own INFORMATION_SCHEMA.COLUMNS query; consolidating
|
|
with ``_fetch_bq_table_options`` (now also delegating to the same shared
|
|
SQL) halves the BQ job count on cache miss. Returns the schema-endpoint
|
|
column shape: name / type / nullable / description.
|
|
|
|
Calls the raising variant so BQ exceptions reach ``translate_bq_error``
|
|
with their original type (Forbidden → 502, BadRequest → 400, etc.).
|
|
"""
|
|
from connectors.bigquery.access import _fetch_bq_columns_full_impl, translate_bq_error, BqAccessError
|
|
|
|
try:
|
|
rows = _fetch_bq_columns_full_impl(bq, dataset, table)
|
|
except (ValueError, BqAccessError):
|
|
# ValueError ("unsafe identifier") and BqAccessError propagate
|
|
# unchanged — the endpoint's existing handlers expect those types.
|
|
raise
|
|
except Exception as e:
|
|
# Any other BQ-side exception goes through translate_bq_error so
|
|
# the response status is classified correctly.
|
|
raise translate_bq_error(e, bq.projects, bad_request_status="upstream_error")
|
|
|
|
return [
|
|
{
|
|
"name": r["name"],
|
|
"type": r["type"],
|
|
"nullable": r["nullable"],
|
|
"description": "",
|
|
}
|
|
for r in rows
|
|
]
|
|
|
|
|
|
def _fetch_bq_table_options(bq, dataset: str, table: str) -> dict:
|
|
"""Best-effort fetch of partition/cluster info via the shared
|
|
`fetch_bq_columns_full` helper.
|
|
|
|
Returns ``{}`` on ANY failure (best-effort). Same load-bearing
|
|
contract as before: the /schema endpoint must keep returning 200
|
|
with empty partition info when this fails.
|
|
"""
|
|
from connectors.bigquery.access import fetch_bq_columns_full
|
|
|
|
rows = fetch_bq_columns_full(bq, dataset, table)
|
|
if not rows:
|
|
return {}
|
|
|
|
partition_by = next(
|
|
(r["name"] for r in rows if r["is_partitioning_column"]),
|
|
None,
|
|
)
|
|
clustered_rows = [r for r in rows if r["clustering_ordinal_position"] is not None]
|
|
clustered_rows.sort(key=lambda r: r["clustering_ordinal_position"])
|
|
clustered_by = [r["name"] for r in clustered_rows]
|
|
return {"partition_by": partition_by, "clustered_by": clustered_by}
|
|
|
|
|
|
def build_schema(
|
|
conn: duckdb.DuckDBPyConnection,
|
|
user: dict,
|
|
table_id: str,
|
|
*,
|
|
bq: BqAccess,
|
|
) -> dict:
|
|
# RBAC + existence check MUST run before cache lookup — otherwise an
|
|
# unauthorized user can read cached schema fetched by an authorized one.
|
|
repo = TableRegistryRepository(conn)
|
|
row = repo.get(table_id)
|
|
if not row:
|
|
raise NotFound(table_id)
|
|
|
|
if not can_access_table(user, table_id, conn):
|
|
raise PermissionError(table_id)
|
|
|
|
cached = _schema_cache.get(table_id)
|
|
if cached is not None:
|
|
return cached
|
|
|
|
return build_schema_uncached(conn, table_id, bq=bq, row=row)
|
|
|
|
|
|
def build_schema_uncached(
|
|
conn: duckdb.DuckDBPyConnection,
|
|
table_id: str,
|
|
*,
|
|
bq: BqAccess,
|
|
row: dict | None = None,
|
|
) -> dict:
|
|
"""Build the schema response and populate `_schema_cache`. **Skips
|
|
RBAC and cache-hit short-circuit** — call only from contexts where
|
|
those are unnecessary (warmup) or already enforced upstream
|
|
(`build_schema`).
|
|
|
|
Pass `row` from the upstream caller's `repo.get(table_id)` to avoid
|
|
a redundant DB round-trip; if not provided, `build_schema_uncached`
|
|
fetches it itself (the warmup-direct call site).
|
|
"""
|
|
if row is None:
|
|
repo = TableRegistryRepository(conn)
|
|
row = repo.get(table_id)
|
|
if not row:
|
|
raise NotFound(table_id)
|
|
|
|
source_type = row.get("source_type") or ""
|
|
query_mode = row.get("query_mode") or ""
|
|
if source_type == "internal":
|
|
# Internal data source — schema lives in system.duckdb, not in the
|
|
# parquet/BQ paths the other branches use. Delegate to the
|
|
# connector's own introspector so /api/v2/schema/agnes_sessions
|
|
# returns the same shape as /api/v2/schema/<keboola-table>.
|
|
from connectors.internal.access import get_schema as _get_internal_schema
|
|
from src.db import _get_state_dir
|
|
system_db_path = str(_get_state_dir() / "system.duckdb")
|
|
cols = _get_internal_schema(system_db_path, table_id)
|
|
payload = {
|
|
"table_id": table_id,
|
|
"source_type": source_type,
|
|
"sql_flavor": "duckdb",
|
|
"columns": [
|
|
{"name": c["name"], "type": c["type"], "nullable": c["nullable"], "description": ""}
|
|
for c in cols
|
|
],
|
|
"partition_by": None,
|
|
"clustered_by": [],
|
|
"where_dialect_hints": {},
|
|
}
|
|
# Issue #261: a `source_type='bigquery'` row with `query_mode='materialized'`
|
|
# has the data on local disk as a parquet — same shape as Keboola local
|
|
# tables. Hitting BigQuery INFORMATION_SCHEMA on every schema call was
|
|
# the root cause of the materialized-schema cold-start anomaly observed
|
|
# in the 0.51.0 perf tests (4.6 s vs 1.0 s for remote VIEW). Use the
|
|
# local-parquet branch for any materialized source regardless of
|
|
# `source_type` — the parquet is the source of truth.
|
|
elif source_type == "bigquery" and query_mode != "materialized":
|
|
dataset = row.get("bucket") or ""
|
|
source_table = row.get("source_table") or table_id
|
|
columns = _fetch_bq_schema(bq, dataset, source_table)
|
|
opts = _fetch_bq_table_options(bq, dataset, source_table)
|
|
payload = {
|
|
"table_id": table_id,
|
|
"source_type": source_type,
|
|
"sql_flavor": "bigquery",
|
|
"columns": columns,
|
|
"partition_by": opts.get("partition_by"),
|
|
"clustered_by": opts.get("clustered_by", []),
|
|
"where_dialect_hints": _BQ_DIALECT_HINTS,
|
|
}
|
|
else:
|
|
# Local source — read schema from the parquet via DuckDB
|
|
from app.utils import get_data_dir
|
|
parquet = (
|
|
get_data_dir() / "extracts" / source_type / "data" / f"{table_id}.parquet"
|
|
)
|
|
local_conn = duckdb.connect(":memory:")
|
|
try:
|
|
cols = local_conn.execute(
|
|
"DESCRIBE SELECT * FROM read_parquet(?)", [str(parquet)]
|
|
).fetchall()
|
|
finally:
|
|
local_conn.close()
|
|
payload = {
|
|
"table_id": table_id,
|
|
"source_type": source_type,
|
|
"sql_flavor": "duckdb",
|
|
"columns": [
|
|
{"name": c[0], "type": c[1], "nullable": c[2] == "YES", "description": ""}
|
|
for c in cols
|
|
],
|
|
"partition_by": None,
|
|
"clustered_by": [],
|
|
"where_dialect_hints": {},
|
|
}
|
|
|
|
_schema_cache.set(table_id, payload)
|
|
return payload
|
|
|
|
|
|
@router.get("/schema/{table_id}")
|
|
def schema(
|
|
table_id: str,
|
|
user: dict = Depends(get_current_user),
|
|
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
|
|
bq: BqAccess = Depends(get_bq_access),
|
|
):
|
|
# Plain ``def`` — opens a `bq.duckdb_session()` and runs sync metadata
|
|
# queries through the BQ extension. See PR #188 Tier 1 entry.
|
|
t0 = time.monotonic()
|
|
resource = f"table:{table_id}"[:256]
|
|
try:
|
|
result = build_schema(conn, user, table_id, bq=bq)
|
|
try:
|
|
AuditRepository(conn).log(
|
|
user_id=user.get("id"),
|
|
action="catalog.schema",
|
|
resource=resource,
|
|
params={"duration_ms": int((time.monotonic() - t0) * 1000)},
|
|
result="success",
|
|
client_kind=client_kind_from_user(user),
|
|
)
|
|
except Exception:
|
|
logger.exception("audit_log write failed for catalog.schema; continuing")
|
|
return result
|
|
except (NotFound, PermissionError, ValueError, BqAccessError) as exc:
|
|
try:
|
|
if isinstance(exc, NotFound):
|
|
status_code = 404
|
|
elif isinstance(exc, PermissionError):
|
|
status_code = 403
|
|
elif isinstance(exc, ValueError):
|
|
status_code = 400
|
|
else:
|
|
status_code = BqAccessError.HTTP_STATUS.get(exc.kind, 500) # type: ignore[union-attr]
|
|
AuditRepository(conn).log(
|
|
user_id=user.get("id"),
|
|
action="catalog.schema",
|
|
resource=resource,
|
|
params={"duration_ms": int((time.monotonic() - t0) * 1000),
|
|
"error": str(exc)[:200]},
|
|
result=f"error.{status_code}",
|
|
client_kind=client_kind_from_user(user),
|
|
)
|
|
except Exception:
|
|
logger.exception("audit_log write failed on error path for catalog.schema; continuing")
|
|
if isinstance(exc, NotFound):
|
|
raise HTTPException(status_code=404, detail=f"table {table_id!r} not found")
|
|
if isinstance(exc, PermissionError):
|
|
from src.rbac import table_not_in_stack_message
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail=table_not_in_stack_message(table_id),
|
|
)
|
|
if isinstance(exc, ValueError):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={"error": "unsafe_identifier", "message": str(exc), "details": {}},
|
|
)
|
|
raise HTTPException(
|
|
status_code=BqAccessError.HTTP_STATUS.get(exc.kind, 500), # type: ignore[union-attr]
|
|
detail={"error": exc.kind, "message": exc.message, "details": exc.details}, # type: ignore[union-attr]
|
|
)
|