diff --git a/CHANGELOG.md b/CHANGELOG.md index 468870f..6c2ab3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,59 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C ## [Unreleased] +## [0.54.29] — 2026-05-19 + +### Added +- **`table_registry.bq_fqn` column** (schema v51, issue #343) — optional + fully-qualified BigQuery path (`project.dataset.table`) that decouples + the UX/RBAC `bucket` label from the physical BQ dataset name. Pre-v51 + the orchestrator constructed the rebuild path as + `{remote_attach.project}.{bucket}.{source_table}`, which coupled + package naming to BQ storage layout — renaming a package broke its + tables and ad-hoc proxy datasets were needed when the UX name + differed from the dataset name. With `bq_fqn` set, the extractor + takes the project / dataset / table directly from the field; rows + without it use the legacy path (backwards-compatible). +- **`data_source.bigquery.location`** is now strongly recommended in + `instance.yaml` (`/admin/server-config`). When unset on a cross- + project setup, metadata-cache region resolution falls back to a + REST `dataset.get()` per metadata refresh that requires + `bigquery.datasets.get` IAM (often missing from data-viewer-only + SAs) and silently returns "provider returned no data" when it 404s. + Setting `location` (e.g. `us-central1` or `EU`) skips the REST hop + entirely. The `_resolve_bq_location` warning now points at this + config key explicitly. +- **Startup config check** (`connectors.bigquery.access.validate_bigquery_startup_config`) + surfaces two common BQ misconfigs in the boot log: cross-project + setup with `location` unset, and a warehouse-like data project + with no `billing_project` override (which silently bills to the + warehouse, where the SA usually lacks `serviceusage.services.use`). + Non-fatal warnings only — never blocks startup. +- **`POST /api/admin/register-table`** and **`PUT /api/admin/registry/{id}`** + accept `bq_fqn`. 
Malformed values are rejected at the API boundary + (422) instead of landing in the registry and breaking the next + rebuild silently. + +### Internal +- **Schema v51** — adds nullable `table_registry.bq_fqn VARCHAR`; + existing rows default to `NULL` and use the legacy + `bucket + source_table` path (backwards-compatible, no backfill). +- New test suite `tests/test_bq_fqn.py` (25 cases): `parse_bq_fqn` + unit matrix, extractor override paths (same-project VIEW + cross- + project VIEW success + cross-project BASE TABLE skip), orchestrator + drift sync, startup-validator heuristic, admin Pydantic models. + ### Changed +- **`SyncOrchestrator.rebuild()` self-heals BQ `_remote_attach.url` + drift**. When an admin edits `data_source.bigquery.project` in + `/admin/server-config`, the overlay is the source of truth but the + on-disk `extract.duckdb._remote_attach.url` would stay frozen at + the old project until the next BQ register/sync trigger — silently + routing every remote BQ query to the previous project (manifests as + `Dataset not found in {old_project}` errors even though the admin + UI shows the corrected project). The orchestrator now compares the + two at every rebuild and, if they differ, calls + `rebuild_from_registry()` to regenerate the extract. - Setup script no longer auto-creates the workspace folder. Step 2 of the pasted prompt now runs `pwd`, compares it to `$HOME/` (the folder the /home page's visible Step 3 told the user to create diff --git a/app/api/admin.py b/app/api/admin.py index 697853b..5d7eaed 100644 --- a/app/api/admin.py +++ b/app/api/admin.py @@ -1397,6 +1397,22 @@ class RegisterTableRequest(BaseModel): partition_by: Optional[str] = None partition_granularity: Optional[str] = None initial_load_chunk_days: Optional[int] = None + # v51 — fully-qualified BigQuery path. 
When set on a BigQuery row, + # the extractor uses ``project.dataset.table`` from this field instead + # of constructing the path from ``bucket`` + ``source_table`` against + # the globally-attached project. Decouples UX/RBAC ``bucket`` label + # from physical BQ dataset (issue #343). Format ``project.dataset.table``; + # validated by ``connectors.bigquery.extractor.parse_bq_fqn``. + bq_fqn: Optional[str] = Field( + default=None, + description=( + "Fully-qualified BigQuery path (``project.dataset.table``). " + "Only applies to source_type='bigquery'. When set, overrides " + "the legacy bucket+source_table path construction. Use this " + "to register a table whose BQ dataset name differs from the " + "Agnes ``bucket`` label (issue #343)." + ), + ) @model_validator(mode="after") def _check_mode_query_coherence(self): @@ -1911,6 +1927,10 @@ class UpdateTableRequest(BaseModel): partition_by: Optional[str] = None partition_granularity: Optional[str] = None initial_load_chunk_days: Optional[int] = None + # v51 — see RegisterTableRequest.bq_fqn. PUT lets an admin add or + # clear bq_fqn on an existing row (cleared via explicit `null`, + # per the PUT shape contract documented on the handler below). + bq_fqn: Optional[str] = None @field_validator("sync_strategy", mode="before") @classmethod @@ -2454,6 +2474,22 @@ def register_table( # deprecated and inert at the runtime layer. The DB column keeps its # schema default; the registry response no longer reflects request # values for this flag. + # v51 — validate bq_fqn upfront. The extractor would catch a malformed + # value at next rebuild and skip the row, but failing at register time + # gives the admin a clean 422 with the specific complaint instead of + # a silent "table registered but never materialized" state. 
+ if request.bq_fqn is not None and request.source_type != "bigquery": + raise HTTPException( + status_code=422, + detail="bq_fqn only applies to source_type='bigquery'", + ) + if request.bq_fqn is not None: + from connectors.bigquery.extractor import parse_bq_fqn + try: + parse_bq_fqn(request.bq_fqn) + except ValueError as e: + raise HTTPException(status_code=422, detail=str(e)) + repo.register( id=table_id, name=request.name, @@ -2477,6 +2513,7 @@ def register_table( partition_by=request.partition_by, partition_granularity=request.partition_granularity, initial_load_chunk_days=request.initial_load_chunk_days, + bq_fqn=request.bq_fqn, ) # Audit entry — masked params; description kept raw (it's documentation). @@ -2824,6 +2861,25 @@ async def update_table( merged["profile_after_sync"] = synthetic.profile_after_sync merged["source_query"] = synthetic.source_query + # v51 — same bq_fqn validation as register-table. PUT can both + # add a fresh bq_fqn or update an existing one; in either case + # malformed values should reject at PUT time, not silently + # land in the DB and break the next rebuild. + if merged.get("bq_fqn"): + from connectors.bigquery.extractor import parse_bq_fqn + try: + parse_bq_fqn(merged["bq_fqn"]) + except ValueError as e: + raise HTTPException(status_code=422, detail=str(e)) + else: + # Non-BQ row carrying bq_fqn is nonsensical — reject the same + # way register-table does. 
+ if merged.get("bq_fqn"): + raise HTTPException( + status_code=422, + detail="bq_fqn only applies to source_type='bigquery'", + ) + repo.register(id=table_id, **merged) AuditRepository(conn).log( diff --git a/app/main.py b/app/main.py index 2835547..cbe95c4 100644 --- a/app/main.py +++ b/app/main.py @@ -199,6 +199,18 @@ async def lifespan(app): except Exception: logger.exception("startup FTS index rebuild failed; falling back to ILIKE on /api/memory?search=") + # Surface BQ config gaps at startup so the operator sees them in the + # boot log instead of as cryptic "provider returned no data" / + # "403 serviceusage" later. Issue #343 — these are the same gaps that + # made every remote BQ query on foundryai-prod fail silently mid-May + # 2026. Non-fatal: warnings only, no startup abort. + try: + from connectors.bigquery.access import validate_bigquery_startup_config + for warning in validate_bigquery_startup_config(): + logger.warning("BQ config check: %s", warning) + except Exception: + logger.exception("BQ startup config validation crashed (non-fatal)") + # Seed admin user (SEED_ADMIN_EMAIL) and add them to the Admin user_group. # Optional SEED_ADMIN_PASSWORD lets the seeded user sign in immediately # without going through bootstrap; never overwritten if already set. 
diff --git a/connectors/bigquery/access.py b/connectors/bigquery/access.py index a2e7338..08dbc05 100644 --- a/connectors/bigquery/access.py +++ b/connectors/bigquery/access.py @@ -12,7 +12,7 @@ import threading from collections import deque from contextlib import contextmanager from dataclasses import dataclass -from typing import Callable, Iterator, Literal +from typing import Callable, Iterator, List, Literal logger = logging.getLogger(__name__) @@ -738,3 +738,74 @@ def get_bq_access() -> BqAccess: billing = data return BqAccess(BqProjects(billing=billing, data=data)) + + +def validate_bigquery_startup_config() -> List[str]: + """Surface common config gaps that only fail at first BQ call (not at boot). + + Returns a list of warning strings (empty when nothing notable). Caller + typically logs each at WARNING and continues — startup never blocks on + config quality issues, only on hard schema problems. + + Checks (in order, each independent): + + 1. Cross-project setup (``project`` ≠ ``billing_project``) without + ``location`` set. The region-scoped metadata path + (``_fetch_via_table_storage`` in metadata.py) falls back to + ``client.get_dataset()`` per-table on every cache refresh when + ``location`` is unset, which works for some IAM shapes but silently + fails with ``"provider returned no data"`` for others (the + on-disk symptom from issue #343). Setting + ``data_source.bigquery.location`` to the dataset's region makes the + fast path deterministic. + + 2. ``billing_project`` unset (so it defaults to ``project``) while the + ``project`` name looks like a shared data warehouse (lower-cased name + contains "dataview", "warehouse", "datalake", "-dw-" or "-data-" — + heuristic). Almost-always-wrong combo: jobs then bill to ``project``, + where the SA usually lacks ``serviceusage.services.use``, so every + query fails (403). We can't be sure, so we warn rather than reject. + + Lives in this module (not app/main.py) so the SDK / CLI / scripts that + use ``BqAccess`` outside the FastAPI process can call it too. 
+ """ + warnings: List[str] = [] + try: + from app.instance_config import get_value + except Exception: + return warnings # config layer not available — likely test harness + project = (get_value("data_source", "bigquery", "project") or "").strip() + billing = (get_value("data_source", "bigquery", "billing_project") or "").strip() + location = (get_value("data_source", "bigquery", "location") or "").strip() + if not project: + return warnings # BQ not configured — nothing to check + effective_billing = billing or project + if effective_billing != project and not location: + warnings.append( + f"data_source.bigquery.project={project!r} differs from " + f"billing_project={effective_billing!r} (cross-project setup) " + f"but data_source.bigquery.location is not set. The metadata " + f"cache will fall back to per-table REST dataset.get() and may " + f"silently return 'provider returned no data' for some IAM " + f"shapes. Set data_source.bigquery.location (e.g. 'us-central1' " + f"or 'EU') to the region where the dataset lives — see issue " + f"#343." + ) + if not billing and project: + # Heuristic detection of the common cross-project mistake: data + # project named like a warehouse, project the SA actually lives in + # named like the app. The user typically wants billing_project to + # equal the SA's home project. + proj_low = project.lower() + warehouse_like = any(s in proj_low for s in ("dataview", "warehouse", "datalake", "-dw-", "-data-")) + if warehouse_like: + warnings.append( + f"data_source.bigquery.project={project!r} looks like a " + f"shared data warehouse but billing_project is unset, so " + f"jobs will bill to {project!r}. If the service account " + f"doesn't have serviceusage.services.use on {project!r}, " + f"every query will fail with 403. Set " + f"data_source.bigquery.billing_project to the SA's home " + f"project — see issue #343." 
+ ) + return warnings diff --git a/connectors/bigquery/extractor.py b/connectors/bigquery/extractor.py index 96d4e60..f011cf5 100644 --- a/connectors/bigquery/extractor.py +++ b/connectors/bigquery/extractor.py @@ -26,6 +26,49 @@ from src.identifier_validation import validate_identifier, validate_quoted_ident logger = logging.getLogger(__name__) + +def parse_bq_fqn(value: Optional[str]) -> Optional[tuple]: + """Parse a ``project.dataset.table`` fully-qualified BigQuery name. + + Returns ``(project, dataset, table)`` on success, ``None`` if ``value`` + is empty / None, or raises ``ValueError`` with a descriptive message + if the string is non-empty but malformed (wrong segment count, empty + segments, or any segment that fails the BQ identifier validators). + + Distinguishes "not set" (None / empty -> return None, caller falls + back to legacy bucket+source_table path) from "set but invalid" (raise + -> caller surfaces a registration error). Silently treating malformed + values as missing would let an admin typo land in the registry and + then rebuild against the legacy path, hiding the typo until query + time. 
+ """ + if not value: + return None + parts = value.split(".") + if len(parts) != 3 or not all(p for p in parts): + raise ValueError( + f"malformed bq_fqn {value!r}: expected exactly three non-empty " + f"segments 'project.dataset.table'" + ) + project, dataset, table = parts + if not _validate_project_id(project): + raise ValueError( + f"malformed bq_fqn {value!r}: project {project!r} fails BQ " + f"project-id grammar" + ) + if not validate_quoted_identifier(dataset, "BQ dataset"): + raise ValueError( + f"malformed bq_fqn {value!r}: dataset {dataset!r} fails BQ " + f"identifier grammar" + ) + if not validate_quoted_identifier(table, "BQ table"): + raise ValueError( + f"malformed bq_fqn {value!r}: table {table!r} fails BQ " + f"identifier grammar" + ) + return (project, dataset, table) + + # Serializes the body of `init_extract` across threads so two concurrent # materialize calls (e.g. the synchronous timeout-fallback BackgroundTask # kicking in while the original daemon thread is still running) can't both @@ -245,6 +288,7 @@ def _detect_table_type( project: str, dataset: str, table: str, + billing_project: str | None = None, ) -> str | None: """Return BQ entity type for `project.dataset.table`. @@ -252,18 +296,34 @@ def _detect_table_type( API — works on tables, views, and materialized views alike. Returns the value of INFORMATION_SCHEMA.TABLES.table_type ('BASE TABLE', 'VIEW', 'MATERIALIZED_VIEW') or None if not found. + + Args: + project: Data project (where the entity lives — appears in the + INFORMATION_SCHEMA FROM clause). + dataset, table: Identify the BQ entity. + billing_project: Project whose SA quota pays for the lookup job and + against which `serviceusage.services.use` is checked. When ``None`` + (default), bills against ``project`` — fine for same-project + lookups. 
Cross-project callers MUST pass ``billing_project`` to + the extractor's billing project explicitly; the data-side SA + typically has only ``bigquery.dataViewer`` on ``project`` and lacks + ``serviceusage.services.use`` there, so reusing ``project`` for + billing 403s and the caller's broad ``except Exception`` silently + drops the row. """ + if billing_project is None: + billing_project = project bq_sql = ( f"SELECT table_type FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES` " f"WHERE table_name = ? LIMIT 1" ) - # Parameter-bind project (1st arg of bigquery_query), the inner BQ SQL - # (2nd arg), and the table-name predicate. This avoids the nested-quote + # Parameter-bind billing_project (1st arg of bigquery_query), the inner BQ + # SQL (2nd arg), and the table-name predicate. This avoids the nested-quote # bug where inline `'{table}'` would close the outer `bigquery_query('...')` # string. Note: bigquery_query forwards extra positional args as BQ query # parameters, bound positionally to the `?` placeholders inside `bq_sql`. duck_sql = "SELECT * FROM bigquery_query(?, ?, ?)" - row = conn.execute(duck_sql, [project, bq_sql, table]).fetchone() + row = conn.execute(duck_sql, [billing_project, bq_sql, table]).fetchone() return row[0] if row else None @@ -548,8 +608,35 @@ def _init_extract_locked( continue table_name = tc["name"] - dataset = tc.get("bucket", "") - source_table = tc.get("source_table", table_name) + # v51: if ``bq_fqn`` is set on the registry row, it overrides the + # legacy bucket+source_table+project_id triplet. ``bq_fqn`` + # decouples the UX/RBAC ``bucket`` label from the physical BQ + # dataset name (issue #343). Missing/empty bq_fqn falls back to + # the legacy path — backwards compat for pre-v51 registrations. 
+ raw_fqn = tc.get("bq_fqn") + try: + parsed_fqn = parse_bq_fqn(raw_fqn) + except ValueError as e: + stats["errors"].append({"table": table_name, "error": str(e)}) + continue + if parsed_fqn is not None: + fqn_project, dataset, source_table = parsed_fqn + # Cross-project bq_fqn: extractor ATTACHed `bq` against + # `project_id` (the overlay's data project). When + # bq_fqn.project differs, the BASE TABLE path via the bq + # alias would silently route to the wrong project. The + # VIEW path goes through ``bigquery_query(billing, …)`` + # which takes its own billing arg, so cross-project works + # there — but BASE TABLE we skip with a clear warning + # rather than serve wrong-source data. Multi-ATTACH per + # distinct project is the proper fix (follow-up; see PR + # description). + cross_project = fqn_project != project_id + else: + dataset = tc.get("bucket", "") + source_table = tc.get("source_table", table_name) + fqn_project = project_id + cross_project = False # #81 Group D — refuse rows with unsafe identifiers. Same # rationale as the keboola extractor: registry is admin-controlled @@ -573,10 +660,21 @@ def _init_extract_locked( continue try: - entity_type = _detect_table_type(conn, project_id, dataset, source_table) + # Cross-project rows MUST bill against ``project_id`` (the + # extractor's billing project where the SA has + # ``serviceusage.services.use``). Passing ``fqn_project`` for + # both data + billing 403s on cross-project setups, the + # broad ``except Exception`` below silently drops the row, + # and the cross-project VIEW path at line ~696 never + # executes. (Same-project rows: ``fqn_project == project_id`` + # so this is a no-op.) 
+ entity_type = _detect_table_type( + conn, fqn_project, dataset, source_table, + billing_project=project_id, + ) if entity_type is None: raise RuntimeError( - f"BQ entity {project_id}.{dataset}.{source_table} not found" + f"BQ entity {fqn_project}.{dataset}.{source_table} not found" ) # Issue #160: always create a master view for query_mode='remote' @@ -588,6 +686,18 @@ def _init_extract_locked( # logged + skipped, with NO _meta row, since orchestrator-side # master-view creation requires a corresponding inner view. if entity_type == "BASE TABLE": + if cross_project: + # bq_fqn points to a different project than the + # ATTACH alias — see comment above. Skip with a + # diagnostic instead of serving wrong-source data. + logger.warning( + "bq_fqn project mismatch for BASE TABLE %s: " + "bq_fqn=%s, extractor ATTACHed to %s. Master " + "view skipped — multi-ATTACH follow-up needed " + "or register a same-project proxy view.", + table_name, fqn_project, project_id, + ) + continue view_sql = ( f'CREATE OR REPLACE VIEW "{table_name}" AS ' f'SELECT * FROM bq."{dataset}"."{source_table}"' @@ -595,12 +705,20 @@ def _init_extract_locked( conn.execute(view_sql) elif entity_type in ("VIEW", "MATERIALIZED_VIEW"): # `dataset` and `source_table` are validated above by - # validate_quoted_identifier; project_id is validated at - # the entry boundary of init_extract (lines 152-160). + # validate_quoted_identifier; ``fqn_project`` is either + # the entry-validated ``project_id`` or comes from + # ``parse_bq_fqn`` which re-validates the project + # segment via ``_validate_project_id``. # The .replace("'", "''") is defense-in-depth on the # inline literal. - bq_inner = f"SELECT * FROM `{project_id}.{dataset}.{source_table}`" + bq_inner = f"SELECT * FROM `{fqn_project}.{dataset}.{source_table}`" bq_inner_escaped = bq_inner.replace("'", "''") + # Billing project stays ``project_id`` (the extractor's + # ATTACH project) — that's the project whose SA quota + # pays for the job. 
``fqn_project`` is the data project + # (where the table lives); ``bigquery_query`` reads + # cross-project just fine when the SA on ``project_id`` + # has BQ Data Viewer on ``fqn_project``. view_sql = ( f'CREATE OR REPLACE VIEW "{table_name}" AS ' f"SELECT * FROM bigquery_query('{project_id}', '{bq_inner_escaped}')" @@ -615,7 +733,7 @@ def _init_extract_locked( "Unverified BQ entity_type %r for %s.%s.%s — master view skipped. " "Use `agnes snapshot create` for this row, or file an issue with " "a repro to request native support.", - entity_type, project_id, dataset, source_table, + entity_type, fqn_project, dataset, source_table, ) continue # Do NOT insert _meta — no inner view to point at. @@ -626,7 +744,7 @@ def _init_extract_locked( stats["tables_registered"] += 1 logger.info( "Registered remote view: %s -> %s.%s.%s (%s)", - table_name, project_id, dataset, source_table, entity_type, + table_name, fqn_project, dataset, source_table, entity_type, ) except Exception as e: logger.error("Failed to register %s: %s", table_name, e) diff --git a/connectors/bigquery/metadata.py b/connectors/bigquery/metadata.py index d67c866..ab0071f 100644 --- a/connectors/bigquery/metadata.py +++ b/connectors/bigquery/metadata.py @@ -156,7 +156,17 @@ def _fetch_rows_and_size(bq, req: MetadataRequest) -> dict | None: def _resolve_bq_location(bq, req: MetadataRequest) -> str | None: - """instance.yaml.location → REST get_dataset → None.""" + """instance.yaml.location → REST get_dataset → None. + + The REST fallback is best-effort: it requires the SA to have + ``bigquery.datasets.get`` on the data project. Most cross-project + setups grant ``bigquery.tables.get`` (data viewer) but NOT dataset- + level metadata, so this 404s silently for the exact deployments + that most need region detection. Configuring + ``data_source.bigquery.location`` skips the REST round-trip entirely + and makes the path deterministic — strongly recommended for any + non-trivial setup. Issue #343. 
+ """ cfg_location = (get_value("data_source", "bigquery", "location") or "").strip() if cfg_location: return cfg_location @@ -167,7 +177,11 @@ def _resolve_bq_location(bq, req: MetadataRequest) -> str | None: return ds.location except Exception as e: logger.warning( - "BQ dataset.get failed for %s.%s — falling back to __TABLES__: %s", + "BQ dataset.get fell back for %s.%s: %s. To skip this REST " + "round-trip on every metadata refresh (and silence cases " + "where the SA lacks bigquery.datasets.get), set " + "data_source.bigquery.location in /admin/server-config to the " + "dataset's region (e.g. 'us-central1' or 'EU').", bq.projects.data, req.bucket, e, ) return None diff --git a/docs/architecture.md b/docs/architecture.md index 27fa604..8141994 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -214,7 +214,7 @@ Files NOT to modify: `connectors/jira/file_lock.py`, `connectors/jira/transform. ### system.duckdb — `{DATA_DIR}/state/system.duckdb` -Current schema version: **19** (auto-migrated from any earlier version on startup — see `src/db.py`). +Current schema version: **51** (auto-migrated from any earlier version on startup — see `src/db.py`). 
| Table | Purpose | |-------|---------| diff --git a/pyproject.toml b/pyproject.toml index af4be5d..1be6d70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "agnes-the-ai-analyst" -version = "0.54.28" +version = "0.54.29" description = "Agnes — AI Data Analyst platform for AI analytical systems" requires-python = ">=3.11,<3.14" license = "MIT" diff --git a/src/db.py b/src/db.py index 19259b4..9b127e5 100644 --- a/src/db.py +++ b/src/db.py @@ -40,7 +40,7 @@ def _maybe_instrument(con, db_tag: str): _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 50 +SCHEMA_VERSION = 51 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -313,7 +313,13 @@ CREATE TABLE IF NOT EXISTS table_registry ( where_filters VARCHAR, partition_by VARCHAR, partition_granularity VARCHAR, - initial_load_chunk_days INTEGER + initial_load_chunk_days INTEGER, + -- v51: fully-qualified BigQuery path (`project.dataset.table`) for + -- BigQuery rows. When set, decouples the UX/RBAC `bucket` label from + -- the physical BQ dataset name; rows without it fall back to the + -- legacy `project.bucket.source_table` path. + -- Issue #343. + bq_fqn VARCHAR ); CREATE TABLE IF NOT EXISTS table_profiles ( @@ -3249,6 +3255,18 @@ def _v49_to_v50_migrate(conn: duckdb.DuckDBPyConnection) -> None: ) +_V50_TO_V51_MIGRATIONS = [ + # ``bq_fqn`` carries the fully-qualified BigQuery path + # (``project.dataset.table``) for a registered remote table when set, + # so the orchestrator's rebuild path no longer has to reconstruct it + # from the globally-attached ``_remote_attach`` project + the dual- + # purpose ``bucket`` field (which is also a UX/RBAC label). + # Nullable for backwards compat — rows without it keep using the + # legacy ``project.bucket.source_table`` fallback. 
+ "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS bq_fqn VARCHAR", +] + + _V33_TO_V34_MIGRATIONS = [ # DuckDB blocks DROP COLUMN while indexes reference the table # ("Dependency Error: Cannot alter entry … because there are entries @@ -3705,6 +3723,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: _v48_to_v49_migrate(conn) if current < 50: _v49_to_v50_migrate(conn) + if current < 51: + for sql in _V50_TO_V51_MIGRATIONS: + conn.execute(sql) conn.execute( "UPDATE schema_version SET version = ?, applied_at = current_timestamp", [SCHEMA_VERSION], diff --git a/src/orchestrator.py b/src/orchestrator.py index 1e95556..fa48976 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -129,6 +129,92 @@ class SyncOrchestrator: _capture_orchestrator_exception(exc, op="rebuild_source", source=source_name) raise + def _sync_bq_remote_attach_with_overlay(self, extracts_dir: Path) -> None: + """Detect drift in BQ extract.duckdb's ``_remote_attach.url`` and + rewrite the extract when it disagrees with the overlay project. + + Operational hazard this closes (issue #343, observed on Foundry AI + 2026-05-19): an admin updates ``data_source.bigquery.project`` via + ``POST /api/admin/server-config`` (overlay write), but the BQ + ``extract.duckdb`` keeps the previously-baked ``project=`` + in its ``_remote_attach`` row. The next rebuild ATTACHes the OLD + project, queries against datasets that don't exist there, and the + error message points at the old project — confusing operators + who just changed the config. + + Fix: at every rebuild, read the BQ extract's ``_remote_attach.url``, + compare against the overlay's ``data_source.bigquery.project``, and + if they differ, call ``rebuild_from_registry`` to regenerate the + extract. The regeneration path is the same one ``register-table`` + uses, so its semantics are well-tested. 
+ + No-op preconditions (any one short-circuits to silent return): + - no BQ extract directory on disk (instance never had BQ) + - extract.duckdb missing (extracted-but-failed state) + - overlay project unset (BQ not configured yet — first-time + setup, not drift) + - no ``_remote_attach`` table in the extract (legacy / non-BQ + extract, e.g. a future "bigquery" name collision with a local + connector) + - existing url matches overlay (no drift) + """ + bq_extract = extracts_dir / "bigquery" / "extract.duckdb" + if not bq_extract.exists(): + return + try: + from app.instance_config import get_value + except Exception: + return + overlay_project = (get_value("data_source", "bigquery", "project") or "").strip() + if not overlay_project: + return + # Read-only handle, separate connection — orchestrator's rebuild + # connection is per-call and hasn't ATTACHed extracts yet at + # this pre-pass point, so this won't fight a file lock. + try: + ro = duckdb.connect(str(bq_extract), read_only=True) + except Exception: + return + try: + row = ro.execute( + "SELECT url FROM _remote_attach WHERE alias='bq'" + ).fetchone() + except Exception: + row = None + finally: + try: + ro.close() + except Exception: + pass + if not row or not row[0]: + return + current_url = row[0] + expected_url = f"project={overlay_project}" + if current_url == expected_url: + return + logger.info( + "BQ remote_attach drift detected: extract.duckdb has %r, " + "overlay has %r — regenerating extract via " + "rebuild_from_registry()", + current_url, expected_url, + ) + try: + from connectors.bigquery.extractor import rebuild_from_registry + result = rebuild_from_registry() + logger.info( + "BQ remote_attach drift sync: regenerated extract — " + "tables_registered=%s errors=%s", + result.get("tables_registered"), + len(result.get("errors", [])), + ) + except Exception as e: + logger.warning( + "BQ remote_attach drift sync: rebuild_from_registry() " + "failed: %s — extract.duckdb still points at %r, queries 
" + "will fail until next manual sync", + e, current_url, + ) + def _scan_meta_pairs(self, extracts_dir: Path) -> tuple: """Read every connector's `_meta` and return (pairs, clean) where: @@ -185,6 +271,30 @@ class SyncOrchestrator: logger.warning("Extracts directory %s does not exist", extracts_dir) return {} + # Pre-pass: detect drift between extract.duckdb _remote_attach.url + # (where the orchestrator's ATTACH path will read the BQ project + # from) and the overlay's data_source.bigquery.project (the + # writable source of truth, edited via admin /server-config). If + # they differ, regenerate the BQ extract so the new project + # propagates into views before we run the main rebuild loop. + # No-op when there is no BQ extract or no overlay project. See + # issue #343 for the operational hazard this closes (admin + # changes project in the UI, extract.duckdb stays stale, all + # remote queries fail with "Dataset not found in "). + try: + self._sync_bq_remote_attach_with_overlay(extracts_dir) + except Exception as e: + # Defensive: drift sync is a best-effort safety net. A failure + # here must not block the rest of the rebuild — the worst + # case is the same stale-extract failure mode the sync was + # trying to prevent, which the operator can still resolve + # manually via /admin/sync trigger. 
+ logger.warning( + "BQ remote_attach drift sync failed: %s — continuing with " + "existing extract.duckdb (queries may fail until next " + "manual sync if project drifted)", e, + ) + # Issue #81 Group C — load view ownership map from system DB so we # can detect cross-connector view-name collisions during this # rebuild and refuse to silently overwrite a previously-claimed diff --git a/src/repositories/table_registry.py b/src/repositories/table_registry.py index ac6776d..d382190 100644 --- a/src/repositories/table_registry.py +++ b/src/repositories/table_registry.py @@ -123,6 +123,12 @@ class TableRegistryRepository: partition_by: Optional[str] = None, partition_granularity: Optional[str] = None, initial_load_chunk_days: Optional[int] = None, + # v51 — fully-qualified BigQuery path (``project.dataset.table``). + # When set, the orchestrator uses this in place of constructing the + # path from ``_remote_attach.url.project`` + ``bucket`` + + # ``source_table`` at rebuild. Decouples the UX/RBAC ``bucket`` + # label from the physical BQ dataset name (issue #343). + bq_fqn: Optional[str] = None, ) -> None: # `registered_at` defaults to "now" for fresh inserts. Updaters that # want to preserve the original registration time across edits pass @@ -142,8 +148,8 @@ class TableRegistryRepository: sync_schedule, profile_after_sync, incremental_window_days, max_history_days, incremental_column, where_filters, partition_by, partition_granularity, - initial_load_chunk_days) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + initial_load_chunk_days, bq_fqn) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT (id) DO UPDATE SET name = excluded.name, folder = excluded.folder, sync_strategy = excluded.sync_strategy, primary_key = excluded.primary_key, @@ -159,13 +165,14 @@ class TableRegistryRepository: where_filters = excluded.where_filters, partition_by = excluded.partition_by, partition_granularity = excluded.partition_granularity, - initial_load_chunk_days = excluded.initial_load_chunk_days""", + initial_load_chunk_days = excluded.initial_load_chunk_days, + bq_fqn = excluded.bq_fqn""", [id, name, folder, effective_strategy, encoded_pk, description, registered_by, ts, source_type, bucket, source_table, source_query, query_mode, sync_schedule, profile_after_sync, incremental_window_days, max_history_days, incremental_column, encoded_filters, partition_by, partition_granularity, - initial_load_chunk_days], + initial_load_chunk_days, bq_fqn], ) @staticmethod diff --git a/tests/test_bq_fqn.py b/tests/test_bq_fqn.py new file mode 100644 index 0000000..cd7cdbd --- /dev/null +++ b/tests/test_bq_fqn.py @@ -0,0 +1,533 @@ +"""Tests for the v51 ``bq_fqn`` decoupling work (issue #343). + +Covers: + +- ``parse_bq_fqn`` unit cases (valid / empty / malformed shapes). +- Extractor honors ``bq_fqn`` in registry rows: dataset/table override + for same-project rows; cross-project VIEW path works; cross-project + BASE TABLE skipped with warning; malformed rejected per-row. +- Orchestrator drift sync: ``_remote_attach.url`` mismatch with overlay + triggers ``rebuild_from_registry``. +- ``validate_bigquery_startup_config`` warning matrix. +- ``RegisterTableRequest`` / ``UpdateTableRequest`` accept ``bq_fqn`` (models + only; handler-side rejection of malformed bq_fqn is not exercised here). 
+""" + +import re +from pathlib import Path +from unittest.mock import MagicMock, patch + +import duckdb +import pytest + +from connectors.bigquery.extractor import parse_bq_fqn + + +class _CapturingProxy: + """Lightweight DuckDB proxy: intercepts BigQuery extension SQL and + records every CREATE VIEW we would have emitted against the real + BQ extension. The extension itself isn't loaded (offline tests), + so view SQL referencing ``bq.*`` or ``bigquery_query(...)`` would + fail at create-time — the proxy substitutes a no-op CREATE TABLE + placeholder so downstream INSERT / verification still works. + + Captured SQL is exposed as ``proxy.create_view_sqls`` for tests + that need to assert on the path the extractor constructed.""" + + def __init__(self, real_conn): + self._real = real_conn + self.create_view_sqls: list[str] = [] + + def execute(self, sql, *args, **kwargs): + upper = sql.strip().upper() + if upper.startswith("INSTALL BIGQUERY") or upper.startswith("LOAD BIGQUERY"): + return MagicMock() + if upper.startswith("CREATE SECRET") or upper.startswith("CREATE OR REPLACE SECRET"): + return MagicMock() + if "ATTACH" in upper and "BIGQUERY" in upper: + return MagicMock() + if upper.startswith("DETACH BQ"): + return MagicMock() + if upper.startswith("SET BQ_") or upper.startswith("SELECT CURRENT_SETTING"): + return MagicMock() + # View bodies that reference the BQ extension (`bq."ds"."t"` for + # BASE TABLE or `bigquery_query(...)` for VIEW) would error + # without a live extension. Capture the SQL for the test, then + # substitute a placeholder TABLE so subsequent INSERT INTO _meta + # paths keep working. + if ("FROM BQ." 
in upper or "BIGQUERY_QUERY(" in upper) and "CREATE" in upper: + self.create_view_sqls.append(sql) + m = re.search(r'VIEW\s+"?(\w+)"?', sql, re.IGNORECASE) + if m: + self._real.execute( + f'CREATE OR REPLACE TABLE "{m.group(1)}" (dummy INTEGER)' + ) + return MagicMock() + return self._real.execute(sql, *args, **kwargs) + + def close(self): + return self._real.close() + + def __getattr__(self, name): + return getattr(self._real, name) + + +# ---------------------------------------------------------------------- +# parse_bq_fqn — pure unit +# ---------------------------------------------------------------------- + +class TestParseBqFqn: + def test_none_returns_none(self): + assert parse_bq_fqn(None) is None + + def test_empty_string_returns_none(self): + # Treat "" the same as None — the registry persists '' for + # cleared values in some paths, and the extractor's fallback + # branch is the right behavior in both cases. + assert parse_bq_fqn("") is None + + def test_well_formed_three_segments(self): + assert parse_bq_fqn("my-proj.my_ds.my_tbl") == ( + "my-proj", "my_ds", "my_tbl", + ) + + @pytest.mark.parametrize("bad", [ + "just_a_table", # one segment + "ds.table", # two segments + "p.d.t.extra", # four segments + ".d.t", # empty project + "p..t", # empty dataset + "p.d.", # empty table + ]) + def test_malformed_raises(self, bad): + with pytest.raises(ValueError, match="malformed bq_fqn"): + parse_bq_fqn(bad) + + def test_unsafe_project_rejected(self): + # `_validate_project_id` accepts the canonical BQ project-id + # grammar (6-30 lowercase letters/digits/dashes). A space + # would let an attacker break out of the inline backtick path + # at view-create time; reject upfront. 
+ with pytest.raises(ValueError, match="project.*grammar"): + parse_bq_fqn("bad project.ds.tbl") + + +# ---------------------------------------------------------------------- +# Extractor honors bq_fqn +# ---------------------------------------------------------------------- + +@pytest.fixture +def output_dir(tmp_path): + d = tmp_path / "extracts" / "bigquery" + d.mkdir(parents=True) + return str(d) + + +def _run_init_extract(output_dir, project_id, tcs, detect_returns): + """Run init_extract with mocked auth + entity-type detection through + the capturing proxy. Returns ``(stats, captured_sqls)`` so tests can + assert on both the per-row outcome AND the SQL the extractor would + have sent to the live BQ extension.""" + from connectors.bigquery.extractor import init_extract + + detector = ( + detect_returns if callable(detect_returns) + else (lambda *a, **kw: detect_returns) + ) + + captured: list[str] = [] + + def proxy_connect(path=None, **kwargs): + real_conn = duckdb.connect(path) + proxy = _CapturingProxy(real_conn) + proxy.create_view_sqls = captured # share list across calls + return proxy + + with patch("connectors.bigquery.extractor.get_metadata_token", lambda: "x"), \ + patch("connectors.bigquery.extractor._detect_table_type", detector), \ + patch("connectors.bigquery.extractor.duckdb") as mock_mod: + mock_mod.connect = proxy_connect + result = init_extract(output_dir, project_id, tcs) + return result, captured + + +def _meta_rows(output_dir): + conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb")) + try: + return conn.execute( + "SELECT table_name FROM _meta ORDER BY table_name" + ).fetchall() + finally: + conn.close() + + +class TestExtractorRespectsBqFqn: + def test_bq_fqn_overrides_bucket_for_same_project_view(self, output_dir): + """A row with bq_fqn whose project matches the extractor's ATTACH + project should use the bq_fqn's dataset/table in the inner view. 
+ + Concretely: bucket='Sessions' (UX label) and bq_fqn= + 'my-project.product_analytics.S2_pageviews' — the bigquery_query + FROM clause should reference product_analytics.S2_pageviews, NOT + Sessions.S2_pageviews.""" + tcs = [{ + "id": "s2", + "name": "s2_session_pageviews", + "source_type": "bigquery", + "bucket": "Sessions", # UX label — must NOT leak into BQ path + "source_table": "ignored_st", # should also be overridden + "bq_fqn": "my-project.product_analytics.S2_pageviews", + "query_mode": "remote", + "description": "", + }] + result, sqls = _run_init_extract(output_dir, "my-project", tcs, "VIEW") + assert result["tables_registered"] == 1 + joined = "\n".join(sqls) + assert "product_analytics" in joined, joined + assert "S2_pageviews" in joined, joined + # The UX label must not leak into the BQ path + assert "Sessions" not in joined, joined + + def test_bq_fqn_view_cross_project_succeeds(self, output_dir): + """VIEW path uses bigquery_query(billing, ...), which can read across + projects. A bq_fqn with project ≠ extractor project should still + register the master view (cross-project SA permissions assumed).""" + tcs = [{ + "id": "rfm", + "name": "rfm", + "source_type": "bigquery", + "bucket": "RFM", + "source_table": "ignored", + "bq_fqn": "other-project.revenue.bk_rfm", + "query_mode": "remote", + "description": "", + }] + result, sqls = _run_init_extract(output_dir, "my-project", tcs, "VIEW") + assert result["tables_registered"] == 1 + joined = "\n".join(sqls) + # Verify the FROM clause carries the cross-project FQN + assert "other-project.revenue.bk_rfm" in joined, joined + # Billing project for the BQ job is still the ATTACH project + assert "bigquery_query('my-project'" in joined, joined + + def test_bq_fqn_base_table_cross_project_skipped(self, output_dir): + """BASE TABLE path goes through the bq ATTACH alias, which is bound + to the extractor's project. 
Cross-project BASE TABLE would silently + route to the wrong project (data not found there) — skip with a + warning and do NOT insert _meta so the master view isn't created + against missing data.""" + tcs = [{ + "id": "xp", + "name": "xp", + "source_type": "bigquery", + "bucket": "OtherDs", + "source_table": "tbl", + "bq_fqn": "other-project.OtherDs.tbl", + "query_mode": "remote", + "description": "", + }] + result, _ = _run_init_extract(output_dir, "my-project", tcs, "BASE TABLE") + assert result["tables_registered"] == 0 + # No _meta row → orchestrator won't create a master view that + # would resolve to a nonexistent inner view. + assert _meta_rows(output_dir) == [] + + def test_malformed_bq_fqn_records_per_row_error(self, output_dir): + tcs = [{ + "id": "ok", "name": "ok", "source_type": "bigquery", + "bucket": "ds", "source_table": "t", + "query_mode": "remote", "description": "", + }, { + "id": "bad", "name": "bad", "source_type": "bigquery", + "bucket": "ds", "source_table": "t", + "bq_fqn": "not.enough", # malformed + "query_mode": "remote", "description": "", + }] + result, _ = _run_init_extract(output_dir, "my-project", tcs, "BASE TABLE") + # Good row goes through; bad row recorded as per-row error and + # does NOT abort the whole extract. + assert result["tables_registered"] == 1 + assert any("malformed bq_fqn" in e["error"] for e in result["errors"]) + # Only the good row landed in _meta + rows = _meta_rows(output_dir) + assert rows == [("ok",)] + + def test_no_bq_fqn_falls_back_to_legacy(self, output_dir): + """A row without bq_fqn must keep using bucket+source_table+ + ATTACH project, exactly as pre-v51. 
Backwards-compat guarantee.""" + tcs = [{ + "id": "legacy", + "name": "legacy", + "source_type": "bigquery", + "bucket": "legacy_ds", + "source_table": "legacy_tbl", + # bq_fqn intentionally absent + "query_mode": "remote", + "description": "", + }] + result, sqls = _run_init_extract(output_dir, "my-project", tcs, "BASE TABLE") + assert result["tables_registered"] == 1 + assert any('bq."legacy_ds"."legacy_tbl"' in s for s in sqls), sqls + + def test_cross_project_detect_call_bills_against_extractor_project(self, output_dir): + """Regression: cross-project rows must call _detect_table_type with + billing_project=project_id (the extractor's billing project), not + just the bq_fqn data project. The data SA typically has + bigquery.dataViewer on the data project but only holds + serviceusage.services.use on the billing project — reusing the + data project as billing 403s and the broad except Exception in + init_extract silently drops the row, so the cross-project VIEW + path never executes.""" + captured_calls: list[dict] = [] + + def capturing_detector(conn, project, dataset, table, billing_project=None): + captured_calls.append({ + "project": project, + "billing_project": billing_project, + }) + return "VIEW" + + tcs = [{ + "id": "rfm", + "name": "rfm", + "source_type": "bigquery", + "bucket": "RFM", + "source_table": "ignored", + "bq_fqn": "other-project.revenue.bk_rfm", + "query_mode": "remote", + "description": "", + }] + _run_init_extract(output_dir, "my-project", tcs, capturing_detector) + assert len(captured_calls) == 1 + call = captured_calls[0] + # Data project (FROM clause / INFORMATION_SCHEMA target) + assert call["project"] == "other-project" + # Billing project (bigquery_query 1st arg + serviceusage.services.use + # check) — must be the extractor's billing project, NOT the data project. 
+ assert call["billing_project"] == "my-project" + + +# ---------------------------------------------------------------------- +# _detect_table_type — direct unit +# ---------------------------------------------------------------------- + +class TestDetectTableTypeBilling: + """Verify that _detect_table_type wires billing_project into the + bigquery_query() 1st positional arg — the only knob that controls + which project the BQ jobs API charges + checks services.use on.""" + + def _make_fake_conn(self, captured: list, return_value): + class _FakeCursor: + def fetchone(self_inner): + return return_value + class _FakeConn: + def execute(self_inner, sql, params): + captured.append(list(params)) + return _FakeCursor() + return _FakeConn() + + def test_explicit_billing_project_used_for_bigquery_query_first_arg(self): + from connectors.bigquery.extractor import _detect_table_type + captured: list = [] + conn = self._make_fake_conn(captured, ("VIEW",)) + result = _detect_table_type( + conn, "data-proj", "ds", "tbl", + billing_project="billing-proj", + ) + assert result == "VIEW" + # bigquery_query(billing_project, bq_sql, table_predicate) + params = captured[0] + assert params[0] == "billing-proj" + # FROM clause still references the data project + assert "`data-proj.ds.INFORMATION_SCHEMA.TABLES`" in params[1] + assert params[2] == "tbl" + + def test_omitted_billing_project_defaults_to_data_project(self): + """Backwards-compat: existing same-project callers omit + billing_project and bill against the data project (no-op since + the two projects are equal in same-project lookups).""" + from connectors.bigquery.extractor import _detect_table_type + captured: list = [] + conn = self._make_fake_conn(captured, None) + _detect_table_type(conn, "same-proj", "ds", "tbl") + assert captured[0][0] == "same-proj" + + +# ---------------------------------------------------------------------- +# Orchestrator drift sync +# 
---------------------------------------------------------------------- + +class TestOrchestratorBqDriftSync: + def test_drift_triggers_rebuild_from_registry(self, tmp_path, monkeypatch): + """When extract.duckdb's _remote_attach.url disagrees with the + overlay's data_source.bigquery.project, the orchestrator's + pre-pass should call rebuild_from_registry to regenerate the + extract before the main scan loop.""" + from src.orchestrator import SyncOrchestrator + + bq_dir = tmp_path / "extracts" / "bigquery" + bq_dir.mkdir(parents=True) + extract_path = bq_dir / "extract.duckdb" + + # Create a minimal _remote_attach pointing at the OLD project. + conn = duckdb.connect(str(extract_path)) + try: + conn.execute( + "CREATE TABLE _remote_attach (" + "alias VARCHAR, extension VARCHAR, url VARCHAR, " + "token_env VARCHAR)" + ) + conn.execute( + "INSERT INTO _remote_attach VALUES (?, ?, ?, ?)", + ["bq", "bigquery", "project=stale-project", ""], + ) + finally: + conn.close() + + # Overlay says the project is now `fresh-project`. 
+ monkeypatch.setattr( + "app.instance_config.get_value", + lambda *a, **kw: "fresh-project" if a[-1] == "project" else "", + ) + + called = [] + monkeypatch.setattr( + "connectors.bigquery.extractor.rebuild_from_registry", + lambda *a, **kw: (called.append(1), {"tables_registered": 0, "errors": []})[1], + ) + + orch = SyncOrchestrator(analytics_db_path=str(tmp_path / "analytics.duckdb")) + orch._sync_bq_remote_attach_with_overlay(tmp_path / "extracts") + assert called == [1], "drift detected but rebuild_from_registry was not invoked" + + def test_no_drift_is_noop(self, tmp_path, monkeypatch): + from src.orchestrator import SyncOrchestrator + + bq_dir = tmp_path / "extracts" / "bigquery" + bq_dir.mkdir(parents=True) + extract_path = bq_dir / "extract.duckdb" + + conn = duckdb.connect(str(extract_path)) + try: + conn.execute( + "CREATE TABLE _remote_attach (" + "alias VARCHAR, extension VARCHAR, url VARCHAR, " + "token_env VARCHAR)" + ) + conn.execute( + "INSERT INTO _remote_attach VALUES (?, ?, ?, ?)", + ["bq", "bigquery", "project=same-project", ""], + ) + finally: + conn.close() + + monkeypatch.setattr( + "app.instance_config.get_value", + lambda *a, **kw: "same-project" if a[-1] == "project" else "", + ) + called = [] + monkeypatch.setattr( + "connectors.bigquery.extractor.rebuild_from_registry", + lambda *a, **kw: called.append(1) or {}, + ) + orch = SyncOrchestrator(analytics_db_path=str(tmp_path / "analytics.duckdb")) + orch._sync_bq_remote_attach_with_overlay(tmp_path / "extracts") + assert called == [], "no drift but rebuild_from_registry was still called" + + def test_missing_extract_is_noop(self, tmp_path, monkeypatch): + """Pre-pass on an instance with no BQ extract at all must not + try to read or rewrite anything. 
Soft-fails silently.""" + from src.orchestrator import SyncOrchestrator + called = [] + monkeypatch.setattr( + "connectors.bigquery.extractor.rebuild_from_registry", + lambda *a, **kw: called.append(1) or {}, + ) + orch = SyncOrchestrator(analytics_db_path=str(tmp_path / "analytics.duckdb")) + orch._sync_bq_remote_attach_with_overlay(tmp_path / "extracts") + assert called == [] + + +# ---------------------------------------------------------------------- +# validate_bigquery_startup_config +# ---------------------------------------------------------------------- + +class TestStartupValidation: + def test_empty_config_no_warnings(self, monkeypatch): + from connectors.bigquery.access import validate_bigquery_startup_config + monkeypatch.setattr("app.instance_config.get_value", lambda *a, **kw: "") + assert validate_bigquery_startup_config() == [] + + def test_same_billing_and_data_project_no_warnings(self, monkeypatch): + from connectors.bigquery.access import validate_bigquery_startup_config + + def fake_get_value(*args, **kwargs): + key = args[-1] + return { + "project": "my-proj", + "billing_project": "my-proj", + "location": "", # location unset is OK when same project + }.get(key, "") + + monkeypatch.setattr("app.instance_config.get_value", fake_get_value) + assert validate_bigquery_startup_config() == [] + + def test_cross_project_without_location_warns(self, monkeypatch): + from connectors.bigquery.access import validate_bigquery_startup_config + + def fake_get_value(*args, **kwargs): + key = args[-1] + return { + "project": "data-project", + "billing_project": "billing-project", + "location": "", + }.get(key, "") + + monkeypatch.setattr("app.instance_config.get_value", fake_get_value) + warnings = validate_bigquery_startup_config() + assert len(warnings) == 1 + assert "location is not set" in warnings[0] + assert "issue #343" in warnings[0] + + def test_warehouse_like_project_without_billing_warns(self, monkeypatch): + from connectors.bigquery.access import 
validate_bigquery_startup_config + + def fake_get_value(*args, **kwargs): + key = args[-1] + return { + "project": "my-warehouse-project", + "billing_project": "", + "location": "us-central1", + }.get(key, "") + + monkeypatch.setattr("app.instance_config.get_value", fake_get_value) + warnings = validate_bigquery_startup_config() + # Only the warehouse-like heuristic fires (cross-project warning + # is suppressed because effective_billing == project when billing + # is unset, regardless of location). + assert any("warehouse" in w or "serviceusage" in w for w in warnings) + + +# ---------------------------------------------------------------------- +# Admin API surface +# ---------------------------------------------------------------------- + +class TestRegisterRequestAcceptsBqFqn: + def test_pydantic_accepts_well_formed(self): + from app.api.admin import RegisterTableRequest + r = RegisterTableRequest( + name="t", source_type="bigquery", + bucket="ds", source_table="t", + bq_fqn="proj.ds.t", + ) + assert r.bq_fqn == "proj.ds.t" + + def test_pydantic_accepts_omitted(self): + from app.api.admin import RegisterTableRequest + r = RegisterTableRequest(name="t", source_type="bigquery", bucket="ds", source_table="t") + assert r.bq_fqn is None + + def test_update_request_accepts_bq_fqn(self): + from app.api.admin import UpdateTableRequest + u = UpdateTableRequest(bq_fqn="p.d.t") + assert u.bq_fqn == "p.d.t" diff --git a/tests/test_db_schema_version.py b/tests/test_db_schema_version.py index f4c489a..dea014c 100644 --- a/tests/test_db_schema_version.py +++ b/tests/test_db_schema_version.py @@ -140,7 +140,15 @@ def test_schema_version_matches_constant(): # CREATE UNIQUE INDEX. Migration pre-checks for existing # duplicates and raises RuntimeError listing them rather # than letting the index create fail mid-way. 
- assert SCHEMA_VERSION == 50 + # v51 (#343): nullable bq_fqn column on table_registry — fully- + # qualified BigQuery path (project.dataset.table) that + # decouples the UX/RBAC `bucket` label from the physical + # BQ dataset name. Rows without it fall back to the + # legacy bucket+source_table+remote_attach.project path + # (backwards-compatible). Both _SYSTEM_SCHEMA (fresh + # installs) and _V50_TO_V51_MIGRATIONS (upgrades) carry + # the column so post-migration installs converge. + assert SCHEMA_VERSION == 51 def test_v37_marketplace_curator_columns(tmp_path): diff --git a/tests/test_keboola_registry_extended.py b/tests/test_keboola_registry_extended.py index 69ef489..0d8e1e0 100644 --- a/tests/test_keboola_registry_extended.py +++ b/tests/test_keboola_registry_extended.py @@ -6,7 +6,7 @@ pattern used for primary_key). Other fields are scalar pass-through. import duckdb import pytest -from src.db import _V26_TO_V27_MIGRATIONS +from src.db import _V26_TO_V27_MIGRATIONS, _V50_TO_V51_MIGRATIONS from src.repositories.table_registry import TableRegistryRepository @@ -28,6 +28,8 @@ def repo(tmp_path): ) for sql in _V26_TO_V27_MIGRATIONS: conn.execute(sql) + for sql in _V50_TO_V51_MIGRATIONS: + conn.execute(sql) return TableRegistryRepository(conn) diff --git a/tests/test_schema_v42_migration.py b/tests/test_schema_v42_migration.py index a1f9e4b..130a0e7 100644 --- a/tests/test_schema_v42_migration.py +++ b/tests/test_schema_v42_migration.py @@ -66,7 +66,7 @@ def test_v41_to_v42_is_idempotent(tmp_path): conn = duckdb.connect(str(db_path)) init_database(conn) v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert v == 50 + assert v == 51 conn.close() @@ -87,7 +87,7 @@ def test_v41_db_upgrades_cleanly(tmp_path): conn = duckdb.connect(str(db_path)) init_database(conn) v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert v == 50 + assert v == 51 # All 7 new v41 tables exist after the v40→v41 upgrade tables = { 
row[0] @@ -118,7 +118,7 @@ def test_v30_db_ladders_all_the_way_up(tmp_path): conn = duckdb.connect(str(db_path)) init_database(conn) v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert v == 50 + assert v == 51 cnt = conn.execute("SELECT COUNT(*) FROM audit_log WHERE id='vintage'").fetchone()[0] assert cnt == 1 # New v41 table exists