* feat(bq): decouple table_registry bucket from BQ dataset name (#343)

  Adds optional `bq_fqn` column (schema v51) carrying the fully-qualified BigQuery path (project.dataset.table) so the rebuild path no longer has to reconstruct it from the dual-purpose `bucket` field (which is also a UX/RBAC label).

  - Schema v51 migration + _SYSTEM_SCHEMA carry the nullable column; rows without it keep using the legacy bucket+source_table+remote_attach.project path (backwards compat).
  - BQ extractor honors bq_fqn per row when present: dataset/table override on same-project rows; cross-project VIEW path works via bigquery_query(billing, ...); cross-project BASE TABLE skipped with a clear warning (multi-ATTACH per project deferred to follow-up).
  - Orchestrator pre-pass detects drift between extract.duckdb _remote_attach.url and overlay data_source.bigquery.project, and calls rebuild_from_registry to regenerate when they differ. Closes the operational hazard where /admin/server-config edits silently left the on-disk extract pointing at the old project until the next manual sync.
  - Startup config check warns when project ≠ billing_project without location set (the on-disk symptom is a silent "provider returned no data" in the metadata cache), and when a warehouse-like data project has no billing_project override (silent 403 serviceusage path).
  - _resolve_bq_location warning now points at the location config key explicitly so operators see the actionable fix in the log.
  - POST /api/admin/register-table and PUT /api/admin/registry/{id} accept bq_fqn; malformed values rejected at the API boundary (422).
  - 25 tests covering parse_bq_fqn matrix, extractor override paths (same-project + cross-project VIEW + cross-project BASE TABLE skip), orchestrator drift sync, startup-validator heuristic, admin models.

  UI surface for bq_fqn input in /admin/tables intentionally omitted from this PR (3.5k-line template change); admins can register through the REST API or `agnes admin` CLI in the meantime. Multi-project ATTACH support is the same scope deferral as the cross-project BASE TABLE skip; both ride a follow-up PR.

* review fixes: abstract CHANGELOG, merge duplicate Changed, bump docs schema version

  - CHANGELOG.md: remove customer-specific hostname + incident date range from the orchestrator drift-sync entry (vendor-agnostic OSS rule), fold the entry into the existing [Unreleased] ### Changed section instead of opening a duplicate heading.
  - docs/architecture.md: bump 'Current schema version' from 19 to 51 to match SCHEMA_VERSION (per agnes-orchestrator skill rule #4).

* review fixes: vendor-agnostic test fixture + Schema v51 internal bullet

  - tests/test_bq_fqn.py: replace customer GCP project ID with generic 'my-warehouse-project' placeholder (vendor-agnostic OSS rule). Test asserts on the warehouse-like heuristic, not the literal project name, so the rename is behavior-neutral.
  - CHANGELOG.md: add explicit '**Schema v51**' bullet under `### Internal` naming the new version + summarizing the additive nullable column (matches the convention from v47/v48 bullets).

* fix(bq): cross-project _detect_table_type bills against extractor project

  Addresses Devin review on #346. Before the fix, _detect_table_type passed the data project as BOTH the FROM-clause target AND the bigquery_query() first arg (billing project). For cross-project bq_fqn rows where fqn_project != project_id, the data SA holds bigquery.dataViewer on fqn_project but the serviceusage.services.use permission only on project_id, so the call 403'd. init_extract's broad except Exception swallowed the error and silently skipped the row, meaning the cross-project VIEW path at extractor.py:~696 (the PR's primary cross-project use case) never executed.

  - Add optional billing_project kwarg to _detect_table_type; defaults to project for backwards compat (same-project callers unaffected).
  - Update the init_extract call site to pass billing_project=project_id explicitly. Same-project rows (fqn_project == project_id) are a no-op; cross-project rows now route billing to the project where the SA actually has services.use.
  - 2 new tests in TestDetectTableTypeBilling cover (a) explicit billing_project routing to bigquery_query 1st arg + data project staying in FROM, and (b) the backwards-compat default. Plus test_cross_project_detect_call_bills_against_extractor_project pins the call-site wiring: it captures the (project, billing_project) pair the extractor passes for a cross-project bq_fqn row.

* release: 0.54.29 - bq_fqn decoupling + marketplace refactor + setup-script UX

  Accumulated [Unreleased] content from #342 (flea marketplace refactor), #344 (setup script step-2 cwd check), and #346 (this PR: bq_fqn column + orchestrator drift sync + startup config check). Schema v51.
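The override-versus-legacy rule described in the commit message can be sketched as follows. This is a minimal illustration, not the extractor's code: `parse_fqn` and `resolve_source` are hypothetical names, and the two regular expressions are simplified assumptions standing in for the project's real validators.

```python
import re
from typing import Optional, Tuple

# Simplified stand-ins for the project's validators. These patterns are
# assumptions for illustration; the real checks may be stricter or looser.
_PROJECT_RE = re.compile(r"[a-z][a-z0-9-]{4,28}[a-z0-9]")
_SEGMENT_RE = re.compile(r"[A-Za-z0-9_]+")


def parse_fqn(value: Optional[str]) -> Optional[Tuple[str, str, str]]:
    """Return (project, dataset, table), None when unset, else raise ValueError."""
    if not value:
        return None
    parts = value.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"malformed bq_fqn {value!r}: expected project.dataset.table")
    project, dataset, table = parts
    if not _PROJECT_RE.fullmatch(project):
        raise ValueError(f"malformed bq_fqn {value!r}: bad project {project!r}")
    for label, segment in (("dataset", dataset), ("table", table)):
        if not _SEGMENT_RE.fullmatch(segment):
            raise ValueError(f"malformed bq_fqn {value!r}: bad {label} {segment!r}")
    return project, dataset, table


def resolve_source(row: dict, attached_project: str) -> Tuple[str, str, str, bool]:
    """Pick (project, dataset, table, cross_project) for one registry row."""
    parsed = parse_fqn(row.get("bq_fqn"))
    if parsed is None:
        # Legacy path: the bucket label doubles as the dataset name.
        table = row.get("source_table", row["name"])
        return attached_project, row.get("bucket", ""), table, False
    project, dataset, table = parsed
    return project, dataset, table, project != attached_project
```

Keeping "unset" (returns `None`) separate from "set but malformed" (raises) is what lets the API layer answer with a 422 instead of quietly falling back to the legacy path.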
parent bd90485dbd
commit c3e82972c8
15 changed files with 1032 additions and 28 deletions
52  CHANGELOG.md
@@ -10,7 +10,59 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C

## [Unreleased]

## [0.54.29] — 2026-05-19

### Added
- **`table_registry.bq_fqn` column** (schema v51, issue #343) — optional
  fully-qualified BigQuery path (`project.dataset.table`) that decouples
  the UX/RBAC `bucket` label from the physical BQ dataset name. Pre-v51
  the orchestrator constructed the rebuild path as
  `{remote_attach.project}.{bucket}.{source_table}`, which coupled
  package naming to BQ storage layout — renaming a package broke its
  tables and ad-hoc proxy datasets were needed when the UX name
  differed from the dataset name. With `bq_fqn` set, the extractor
  takes the project / dataset / table directly from the field; rows
  without it use the legacy path (backwards-compatible).
- **`data_source.bigquery.location`** is now strongly recommended in
  `instance.yaml` (`/admin/server-config`). When unset on a cross-
  project setup, metadata-cache region resolution falls back to a
  REST `dataset.get()` per metadata refresh that requires
  `bigquery.datasets.get` IAM (often missing from data-viewer-only
  SAs) and silently returns "provider returned no data" when it 404s.
  Setting `location` (e.g. `us-central1` or `EU`) skips the REST hop
  entirely. The `_resolve_bq_location` warning now points at this
  config key explicitly.
- **Startup config check** (`connectors.bigquery.access.validate_bigquery_startup_config`)
  surfaces two common BQ misconfigs in the boot log: cross-project
  setup with `location` unset, and a warehouse-like data project
  with no `billing_project` override (which silently bills to the
  warehouse, where the SA usually lacks `serviceusage.services.use`).
  Non-fatal warnings only — never blocks startup.
- **`POST /api/admin/register-table`** and **`PUT /api/admin/registry/{id}`**
  accept `bq_fqn`. Malformed values are rejected at the API boundary
  (422) instead of landing in the registry and breaking the next
  rebuild silently.

### Internal
- **Schema v51** — adds nullable `table_registry.bq_fqn VARCHAR`;
  existing rows default to `NULL` and use the legacy
  `bucket + source_table` path (backwards-compatible, no backfill).
- New test suite `tests/test_bq_fqn.py` (25 cases): `parse_bq_fqn`
  unit matrix, extractor override paths (same-project VIEW + cross-
  project VIEW success + cross-project BASE TABLE skip), orchestrator
  drift sync, startup-validator heuristic, admin Pydantic models.

### Changed
- **`SyncOrchestrator.rebuild()` self-heals BQ `_remote_attach.url`
  drift**. When an admin edits `data_source.bigquery.project` in
  `/admin/server-config`, the overlay is the source of truth but the
  on-disk `extract.duckdb._remote_attach.url` would stay frozen at
  the old project until the next BQ register/sync trigger — silently
  routing every remote BQ query to the previous project (manifests as
  `Dataset not found in <old project>` errors even though the admin
  UI shows the corrected project). The orchestrator now compares the
  two at every rebuild and, if they differ, calls
  `rebuild_from_registry()` to regenerate the extract.
- Setup script no longer auto-creates the workspace folder. Step 2 of
  the pasted prompt now runs `pwd`, compares it to `$HOME/<workspace_dir>`
  (the folder the /home page's visible Step 3 told the user to create
@@ -1397,6 +1397,22 @@ class RegisterTableRequest(BaseModel):
    partition_by: Optional[str] = None
    partition_granularity: Optional[str] = None
    initial_load_chunk_days: Optional[int] = None
    # v51 — fully-qualified BigQuery path. When set on a BigQuery row,
    # the extractor uses ``project.dataset.table`` from this field instead
    # of constructing the path from ``bucket`` + ``source_table`` against
    # the globally-attached project. Decouples UX/RBAC ``bucket`` label
    # from physical BQ dataset (issue #343). Format ``project.dataset.table``;
    # validated by ``connectors.bigquery.extractor.parse_bq_fqn``.
    bq_fqn: Optional[str] = Field(
        default=None,
        description=(
            "Fully-qualified BigQuery path (``project.dataset.table``). "
            "Only applies to source_type='bigquery'. When set, overrides "
            "the legacy bucket+source_table path construction. Use this "
            "to register a table whose BQ dataset name differs from the "
            "Agnes ``bucket`` label (issue #343)."
        ),
    )

    @model_validator(mode="after")
    def _check_mode_query_coherence(self):
@@ -1911,6 +1927,10 @@ class UpdateTableRequest(BaseModel):
    partition_by: Optional[str] = None
    partition_granularity: Optional[str] = None
    initial_load_chunk_days: Optional[int] = None
    # v51 — see RegisterTableRequest.bq_fqn. PUT lets an admin add or
    # clear bq_fqn on an existing row (cleared via explicit `null`,
    # per the PUT shape contract documented on the handler below).
    bq_fqn: Optional[str] = None

    @field_validator("sync_strategy", mode="before")
    @classmethod
@@ -2454,6 +2474,22 @@ def register_table(
    # deprecated and inert at the runtime layer. The DB column keeps its
    # schema default; the registry response no longer reflects request
    # values for this flag.
    # v51 — validate bq_fqn upfront. The extractor would catch a malformed
    # value at next rebuild and skip the row, but failing at register time
    # gives the admin a clean 422 with the specific complaint instead of
    # a silent "table registered but never materialized" state.
    if request.bq_fqn is not None and request.source_type != "bigquery":
        raise HTTPException(
            status_code=422,
            detail="bq_fqn only applies to source_type='bigquery'",
        )
    if request.bq_fqn is not None:
        from connectors.bigquery.extractor import parse_bq_fqn
        try:
            parse_bq_fqn(request.bq_fqn)
        except ValueError as e:
            raise HTTPException(status_code=422, detail=str(e))

    repo.register(
        id=table_id,
        name=request.name,
@@ -2477,6 +2513,7 @@ def register_table(
        partition_by=request.partition_by,
        partition_granularity=request.partition_granularity,
        initial_load_chunk_days=request.initial_load_chunk_days,
        bq_fqn=request.bq_fqn,
    )

    # Audit entry — masked params; description kept raw (it's documentation).
@@ -2824,6 +2861,25 @@ async def update_table(
        merged["profile_after_sync"] = synthetic.profile_after_sync
        merged["source_query"] = synthetic.source_query

        # v51 — same bq_fqn validation as register-table. PUT can both
        # add a fresh bq_fqn or update an existing one; in either case
        # malformed values should reject at PUT time, not silently
        # land in the DB and break the next rebuild.
        if merged.get("bq_fqn"):
            from connectors.bigquery.extractor import parse_bq_fqn
            try:
                parse_bq_fqn(merged["bq_fqn"])
            except ValueError as e:
                raise HTTPException(status_code=422, detail=str(e))
    else:
        # Non-BQ row carrying bq_fqn is nonsensical — reject the same
        # way register-table does.
        if merged.get("bq_fqn"):
            raise HTTPException(
                status_code=422,
                detail="bq_fqn only applies to source_type='bigquery'",
            )

    repo.register(id=table_id, **merged)

    AuditRepository(conn).log(
12  app/main.py
@@ -199,6 +199,18 @@ async def lifespan(app):
    except Exception:
        logger.exception("startup FTS index rebuild failed; falling back to ILIKE on /api/memory?search=")

    # Surface BQ config gaps at startup so the operator sees them in the
    # boot log instead of as cryptic "provider returned no data" /
    # "403 serviceusage" later. Issue #343 — these are the same gaps that
    # made every remote BQ query on foundryai-prod fail silently mid-May
    # 2026. Non-fatal: warnings only, no startup abort.
    try:
        from connectors.bigquery.access import validate_bigquery_startup_config
        for warning in validate_bigquery_startup_config():
            logger.warning("BQ config check: %s", warning)
    except Exception:
        logger.exception("BQ startup config validation crashed (non-fatal)")

    # Seed admin user (SEED_ADMIN_EMAIL) and add them to the Admin user_group.
    # Optional SEED_ADMIN_PASSWORD lets the seeded user sign in immediately
    # without going through bootstrap; never overwritten if already set.
@@ -12,7 +12,7 @@ import threading
from collections import deque
from contextlib import contextmanager
from dataclasses import dataclass
-from typing import Callable, Iterator, Literal
+from typing import Callable, Iterator, List, Literal

logger = logging.getLogger(__name__)

@@ -738,3 +738,74 @@ def get_bq_access() -> BqAccess:
        billing = data

    return BqAccess(BqProjects(billing=billing, data=data))


def validate_bigquery_startup_config() -> List[str]:
    """Surface common config gaps that only fail at first BQ call (not at boot).

    Returns a list of warning strings (empty when nothing notable). Caller
    typically logs each at WARNING and continues — startup never blocks on
    config quality issues, only on hard schema problems.

    Checks (in order, each independent):

    1. Cross-project setup (``project`` ≠ ``billing_project``) without
       ``location`` set. The region-scoped metadata path
       (``_fetch_via_table_storage`` in metadata.py) falls back to
       ``client.get_dataset()`` per-table on every cache refresh when
       ``location`` is unset, which works for some IAM shapes but silently
       fails with ``"provider returned no data"`` for others (the
       on-disk symptom from issue #343). Setting
       ``data_source.bigquery.location`` to the dataset's region makes the
       fast path deterministic.

    2. ``billing_project`` unset (so billing defaults to ``project``) while
       the project name looks like a shared warehouse (contains
       "dataview", "warehouse", "datalake", "-dw-" or "-data-"; a
       heuristic). Almost always wrong: the SA usually lacks
       ``serviceusage.services.use`` on ``project``, so every query fails
       with 403. We can't be sure, so we warn rather than reject.

    Lives in this module (not app/main.py) so the SDK / CLI / scripts that
    use ``BqAccess`` outside the FastAPI process can call it too.
    """
    warnings: List[str] = []
    try:
        from app.instance_config import get_value
    except Exception:
        return warnings  # config layer not available — likely test harness
    project = (get_value("data_source", "bigquery", "project") or "").strip()
    billing = (get_value("data_source", "bigquery", "billing_project") or "").strip()
    location = (get_value("data_source", "bigquery", "location") or "").strip()
    if not project:
        return warnings  # BQ not configured — nothing to check
    effective_billing = billing or project
    if effective_billing != project and not location:
        warnings.append(
            f"data_source.bigquery.project={project!r} differs from "
            f"billing_project={effective_billing!r} (cross-project setup) "
            f"but data_source.bigquery.location is not set. The metadata "
            f"cache will fall back to per-table REST dataset.get() and may "
            f"silently return 'provider returned no data' for some IAM "
            f"shapes. Set data_source.bigquery.location (e.g. 'us-central1' "
            f"or 'EU') to the region where the dataset lives — see issue "
            f"#343."
        )
    if not billing and project:
        # Heuristic detection of the common cross-project mistake: data
        # project named like a warehouse, project the SA actually lives in
        # named like the app. The user typically wants billing_project to
        # equal the SA's home project.
        proj_low = project.lower()
        warehouse_like = any(s in proj_low for s in ("dataview", "warehouse", "datalake", "-dw-", "-data-"))
        if warehouse_like:
            warnings.append(
                f"data_source.bigquery.project={project!r} looks like a "
                f"shared data warehouse but billing_project is unset, so "
                f"jobs will bill to {project!r}. If the service account "
                f"doesn't have serviceusage.services.use on {project!r}, "
                f"every query will fail with 403. Set "
                f"data_source.bigquery.billing_project to the SA's home "
                f"project — see issue #343."
            )
    return warnings
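The two checks above can also be exercised without the config layer. The sketch below is a hypothetical pure-function variant (`check_bq_config` is not part of the PR) that takes the three values as arguments; the warning texts are shortened placeholders, while the substring list mirrors the one in the diff.

```python
from typing import List

# Substrings copied from the diff's warehouse-like heuristic.
WAREHOUSE_HINTS = ("dataview", "warehouse", "datalake", "-dw-", "-data-")


def check_bq_config(project: str, billing_project: str = "", location: str = "") -> List[str]:
    """Return warnings for the two misconfigurations, given raw config values."""
    project = (project or "").strip()
    billing = (billing_project or "").strip()
    location = (location or "").strip()
    warnings: List[str] = []
    if not project:
        return warnings  # BigQuery not configured, nothing to check
    effective_billing = billing or project
    # Check 1: cross-project setup without an explicit region.
    if effective_billing != project and not location:
        warnings.append("cross-project setup but location is not set")
    # Check 2: warehouse-looking data project billed to itself.
    if not billing and any(hint in project.lower() for hint in WAREHOUSE_HINTS):
        warnings.append("warehouse-like project but billing_project is unset")
    return warnings
```

Passing values in rather than reading them inside the function keeps the heuristic easy to unit-test; the real function reads them through `get_value` so callers need no arguments.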

@@ -26,6 +26,49 @@ from src.identifier_validation import validate_identifier, validate_quoted_ident

logger = logging.getLogger(__name__)


def parse_bq_fqn(value: Optional[str]) -> Optional[tuple]:
    """Parse a ``project.dataset.table`` fully-qualified BigQuery name.

    Returns ``(project, dataset, table)`` on success, ``None`` if ``value``
    is empty / None, or raises ``ValueError`` with a descriptive message
    if the string is non-empty but malformed (wrong segment count, empty
    segments, or any segment that fails the BQ identifier validators).

    Distinguishes "not set" (None / empty -> return None, caller falls
    back to legacy bucket+source_table path) from "set but invalid" (raise
    -> caller surfaces a registration error). Silently treating malformed
    values as missing would let an admin typo land in the registry and
    then rebuild against the legacy path, hiding the typo until query
    time.
    """
    if not value:
        return None
    parts = value.split(".")
    if len(parts) != 3 or not all(p for p in parts):
        raise ValueError(
            f"malformed bq_fqn {value!r}: expected exactly three non-empty "
            f"segments 'project.dataset.table'"
        )
    project, dataset, table = parts
    if not _validate_project_id(project):
        raise ValueError(
            f"malformed bq_fqn {value!r}: project {project!r} fails BQ "
            f"project-id grammar"
        )
    if not validate_quoted_identifier(dataset, "BQ dataset"):
        raise ValueError(
            f"malformed bq_fqn {value!r}: dataset {dataset!r} fails BQ "
            f"identifier grammar"
        )
    if not validate_quoted_identifier(table, "BQ table"):
        raise ValueError(
            f"malformed bq_fqn {value!r}: table {table!r} fails BQ "
            f"identifier grammar"
        )
    return (project, dataset, table)


# Serializes the body of `init_extract` across threads so two concurrent
# materialize calls (e.g. the synchronous timeout-fallback BackgroundTask
# kicking in while the original daemon thread is still running) can't both
@@ -245,6 +288,7 @@ def _detect_table_type(
    project: str,
    dataset: str,
    table: str,
    billing_project: str | None = None,
) -> str | None:
    """Return BQ entity type for `project.dataset.table`.

@@ -252,18 +296,34 @@ def _detect_table_type(
    API — works on tables, views, and materialized views alike. Returns the
    value of INFORMATION_SCHEMA.TABLES.table_type ('BASE TABLE', 'VIEW',
    'MATERIALIZED_VIEW') or None if not found.

    Args:
        project: Data project (where the entity lives — appears in the
            INFORMATION_SCHEMA FROM clause).
        dataset, table: Identify the BQ entity.
        billing_project: Project whose SA quota pays for the lookup job and
            against which `serviceusage.services.use` is checked. When ``None``
            (default), bills against ``project`` — fine for same-project
            lookups. Cross-project callers MUST set ``billing_project`` to
            the extractor's billing project explicitly; the data-side SA
            typically has only ``bigquery.dataViewer`` on ``project`` and lacks
            ``serviceusage.services.use`` there, so reusing ``project`` for
            billing 403s and the caller's broad ``except Exception`` silently
            drops the row.
    """
    if billing_project is None:
        billing_project = project
    bq_sql = (
        f"SELECT table_type FROM `{project}.{dataset}.INFORMATION_SCHEMA.TABLES` "
        f"WHERE table_name = ? LIMIT 1"
    )
-    # Parameter-bind project (1st arg of bigquery_query), the inner BQ SQL
-    # (2nd arg), and the table-name predicate. This avoids the nested-quote
+    # Parameter-bind billing_project (1st arg of bigquery_query), the inner BQ
+    # SQL (2nd arg), and the table-name predicate. This avoids the nested-quote
    # bug where inline `'{table}'` would close the outer `bigquery_query('...')`
    # string. Note: bigquery_query forwards extra positional args as BQ query
    # parameters, bound positionally to the `?` placeholders inside `bq_sql`.
    duck_sql = "SELECT * FROM bigquery_query(?, ?, ?)"
-    row = conn.execute(duck_sql, [project, bq_sql, table]).fetchone()
+    row = conn.execute(duck_sql, [billing_project, bq_sql, table]).fetchone()
    return row[0] if row else None


@@ -548,8 +608,35 @@ def _init_extract_locked(
            continue

        table_name = tc["name"]
        # v51: if ``bq_fqn`` is set on the registry row, it overrides the
        # legacy bucket+source_table+project_id triplet. ``bq_fqn``
        # decouples the UX/RBAC ``bucket`` label from the physical BQ
        # dataset name (issue #343). Missing/empty bq_fqn falls back to
        # the legacy path — backwards compat for pre-v51 registrations.
        raw_fqn = tc.get("bq_fqn")
        try:
            parsed_fqn = parse_bq_fqn(raw_fqn)
        except ValueError as e:
            stats["errors"].append({"table": table_name, "error": str(e)})
            continue
        if parsed_fqn is not None:
            fqn_project, dataset, source_table = parsed_fqn
            # Cross-project bq_fqn: extractor ATTACHed `bq` against
            # `project_id` (the overlay's data project). When
            # bq_fqn.project differs, the BASE TABLE path via the bq
            # alias would silently route to the wrong project. The
            # VIEW path goes through ``bigquery_query(billing, …)``
            # which takes its own billing arg, so cross-project works
            # there — but BASE TABLE we skip with a clear warning
            # rather than serve wrong-source data. Multi-ATTACH per
            # distinct project is the proper fix (follow-up; see PR
            # description).
            cross_project = fqn_project != project_id
        else:
            dataset = tc.get("bucket", "")
            source_table = tc.get("source_table", table_name)
            fqn_project = project_id
            cross_project = False

        # #81 Group D — refuse rows with unsafe identifiers. Same
        # rationale as the keboola extractor: registry is admin-controlled
@@ -573,10 +660,21 @@ def _init_extract_locked(
            continue

        try:
-            entity_type = _detect_table_type(conn, project_id, dataset, source_table)
+            # Cross-project rows MUST bill against ``project_id`` (the
+            # extractor's billing project where the SA has
+            # ``serviceusage.services.use``). Passing ``fqn_project`` for
+            # both data + billing 403s on cross-project setups, the
+            # broad ``except Exception`` below silently drops the row,
+            # and the cross-project VIEW path at line ~696 never
+            # executes. (Same-project rows: ``fqn_project == project_id``
+            # so this is a no-op.)
+            entity_type = _detect_table_type(
+                conn, fqn_project, dataset, source_table,
+                billing_project=project_id,
+            )
            if entity_type is None:
                raise RuntimeError(
-                    f"BQ entity {project_id}.{dataset}.{source_table} not found"
+                    f"BQ entity {fqn_project}.{dataset}.{source_table} not found"
                )

            # Issue #160: always create a master view for query_mode='remote'
@@ -588,6 +686,18 @@ def _init_extract_locked(
            # logged + skipped, with NO _meta row, since orchestrator-side
            # master-view creation requires a corresponding inner view.
            if entity_type == "BASE TABLE":
                if cross_project:
                    # bq_fqn points to a different project than the
                    # ATTACH alias — see comment above. Skip with a
                    # diagnostic instead of serving wrong-source data.
                    logger.warning(
                        "bq_fqn project mismatch for BASE TABLE %s: "
                        "bq_fqn=%s, extractor ATTACHed to %s. Master "
                        "view skipped — multi-ATTACH follow-up needed "
                        "or register a same-project proxy view.",
                        table_name, fqn_project, project_id,
                    )
                    continue
                view_sql = (
                    f'CREATE OR REPLACE VIEW "{table_name}" AS '
                    f'SELECT * FROM bq."{dataset}"."{source_table}"'
@@ -595,12 +705,20 @@ def _init_extract_locked(
                conn.execute(view_sql)
            elif entity_type in ("VIEW", "MATERIALIZED_VIEW"):
                # `dataset` and `source_table` are validated above by
-                # validate_quoted_identifier; project_id is validated at
-                # the entry boundary of init_extract (lines 152-160).
+                # validate_quoted_identifier; ``fqn_project`` is either
+                # the entry-validated ``project_id`` or comes from
+                # ``parse_bq_fqn`` which re-validates the project
+                # segment via ``_validate_project_id``.
                # The .replace("'", "''") is defense-in-depth on the
                # inline literal.
-                bq_inner = f"SELECT * FROM `{project_id}.{dataset}.{source_table}`"
+                bq_inner = f"SELECT * FROM `{fqn_project}.{dataset}.{source_table}`"
                bq_inner_escaped = bq_inner.replace("'", "''")
                # Billing project stays ``project_id`` (the extractor's
                # ATTACH project) — that's the project whose SA quota
                # pays for the job. ``fqn_project`` is the data project
                # (where the table lives); ``bigquery_query`` reads
                # cross-project just fine when the SA on ``project_id``
                # has BQ Data Viewer on ``fqn_project``.
                view_sql = (
                    f'CREATE OR REPLACE VIEW "{table_name}" AS '
                    f"SELECT * FROM bigquery_query('{project_id}', '{bq_inner_escaped}')"
@@ -615,7 +733,7 @@ def _init_extract_locked(
                    "Unverified BQ entity_type %r for %s.%s.%s — master view skipped. "
                    "Use `agnes snapshot create` for this row, or file an issue with "
                    "a repro to request native support.",
-                    entity_type, project_id, dataset, source_table,
+                    entity_type, fqn_project, dataset, source_table,
                )
                continue  # Do NOT insert _meta — no inner view to point at.

@@ -626,7 +744,7 @@ def _init_extract_locked(
            stats["tables_registered"] += 1
            logger.info(
                "Registered remote view: %s -> %s.%s.%s (%s)",
-                table_name, project_id, dataset, source_table, entity_type,
+                table_name, fqn_project, dataset, source_table, entity_type,
            )
        except Exception as e:
            logger.error("Failed to register %s: %s", table_name, e)
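The per-row decision made across the extractor hunks above (same-project BASE TABLE goes through the `bq` alias, cross-project BASE TABLE is skipped, views go through `bigquery_query` billed to the attached project) can be condensed into one function. This is a sketch under assumptions: `plan_master_view` is a hypothetical helper, identifiers are taken as already validated, and the entity-type strings are copied as they appear in the diff.

```python
from typing import Optional


def plan_master_view(
    table_name: str,
    entity_type: str,
    fqn_project: str,
    dataset: str,
    source_table: str,
    attached_project: str,
) -> Optional[str]:
    """Return the CREATE VIEW statement for a row, or None when it is skipped."""
    cross_project = fqn_project != attached_project
    if entity_type == "BASE TABLE":
        if cross_project:
            # The bq alias is attached to attached_project only, so a
            # cross-project base table would resolve against the wrong project.
            return None
        return (
            f'CREATE OR REPLACE VIEW "{table_name}" AS '
            f'SELECT * FROM bq."{dataset}"."{source_table}"'
        )
    if entity_type in ("VIEW", "MATERIALIZED_VIEW"):
        # Data project goes in the FROM clause, billing stays on the attached project.
        inner = f"SELECT * FROM `{fqn_project}.{dataset}.{source_table}`"
        inner_escaped = inner.replace("'", "''")
        return (
            f'CREATE OR REPLACE VIEW "{table_name}" AS '
            f"SELECT * FROM bigquery_query('{attached_project}', '{inner_escaped}')"
        )
    return None  # unverified entity type: no master view
```

Returning `None` for the skipped cases keeps the "no view, no `_meta` row" rule in one place, which is the invariant the surrounding comments insist on.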

@@ -156,7 +156,17 @@ def _fetch_rows_and_size(bq, req: MetadataRequest) -> dict | None:


def _resolve_bq_location(bq, req: MetadataRequest) -> str | None:
-    """instance.yaml.location → REST get_dataset → None."""
+    """instance.yaml.location → REST get_dataset → None.
+
+    The REST fallback is best-effort: it requires the SA to have
+    ``bigquery.datasets.get`` on the data project. Most cross-project
+    setups grant ``bigquery.tables.get`` (data viewer) but NOT dataset-
+    level metadata, so this 404s silently for the exact deployments
+    that most need region detection. Configuring
+    ``data_source.bigquery.location`` skips the REST round-trip entirely
+    and makes the path deterministic — strongly recommended for any
+    non-trivial setup. Issue #343.
+    """
    cfg_location = (get_value("data_source", "bigquery", "location") or "").strip()
    if cfg_location:
        return cfg_location
@@ -167,7 +177,11 @@ def _resolve_bq_location(bq, req: MetadataRequest) -> str | None:
        return ds.location
    except Exception as e:
        logger.warning(
-            "BQ dataset.get failed for %s.%s — falling back to __TABLES__: %s",
+            "BQ dataset.get fell back for %s.%s: %s. To skip this REST "
+            "round-trip on every metadata refresh (and silence cases "
+            "where the SA lacks bigquery.datasets.get), set "
+            "data_source.bigquery.location in /admin/server-config to the "
+            "dataset's region (e.g. 'us-central1' or 'EU').",
            bq.projects.data, req.bucket, e,
        )
        return None
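The resolution order in `_resolve_bq_location` (configured value first, REST lookup second, `None` last) can be shown in isolation. In this sketch `resolve_location` and its `fetch_dataset_location` callback are hypothetical stand-ins for the config read and the REST `get_dataset` call; the real function also logs the failure.

```python
from typing import Callable, Optional


def resolve_location(
    configured: Optional[str],
    fetch_dataset_location: Callable[[], str],
) -> Optional[str]:
    """Resolve a region: configured value, then best-effort lookup, then None."""
    cfg = (configured or "").strip()
    if cfg:
        return cfg  # deterministic path, no network round-trip
    try:
        return fetch_dataset_location()
    except Exception:
        # Best-effort fallback: a missing permission must not break the caller.
        return None
```

With a configured location the callback is never invoked, which is why setting `data_source.bigquery.location` avoids the permission problem entirely.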

@@ -214,7 +214,7 @@ Files NOT to modify: `connectors/jira/file_lock.py`, `connectors/jira/transform.

### system.duckdb — `{DATA_DIR}/state/system.duckdb`

-Current schema version: **19** (auto-migrated from any earlier version on startup — see `src/db.py`).
+Current schema version: **51** (auto-migrated from any earlier version on startup — see `src/db.py`).

| Table | Purpose |
|-------|---------|
@@ -1,6 +1,6 @@
[project]
name = "agnes-the-ai-analyst"
-version = "0.54.28"
+version = "0.54.29"
description = "Agnes — AI Data Analyst platform for AI analytical systems"
requires-python = ">=3.11,<3.14"
license = "MIT"
25  src/db.py
@@ -40,7 +40,7 @@ def _maybe_instrument(con, db_tag: str):

_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")

-SCHEMA_VERSION = 50
+SCHEMA_VERSION = 51

_SYSTEM_SCHEMA = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -313,7 +313,13 @@ CREATE TABLE IF NOT EXISTS table_registry (
    where_filters VARCHAR,
    partition_by VARCHAR,
    partition_granularity VARCHAR,
-    initial_load_chunk_days INTEGER
+    initial_load_chunk_days INTEGER,
+    -- v51: fully-qualified BigQuery path (`project.dataset.table`) for
+    -- BigQuery rows. When set, decouples the UX/RBAC `bucket` label from
+    -- the physical BQ dataset name; rows without it fall back to the
+    -- legacy `<remote_attach.project>.<bucket>.<source_table>` path.
+    -- Issue #343.
+    bq_fqn VARCHAR
);

CREATE TABLE IF NOT EXISTS table_profiles (
@@ -3249,6 +3255,18 @@ def _v49_to_v50_migrate(conn: duckdb.DuckDBPyConnection) -> None:
    )


_V50_TO_V51_MIGRATIONS = [
    # ``bq_fqn`` carries the fully-qualified BigQuery path
    # (``project.dataset.table``) for a registered remote table when set,
    # so the orchestrator's rebuild path no longer has to reconstruct it
    # from the globally-attached ``_remote_attach`` project + the dual-
    # purpose ``bucket`` field (which is also a UX/RBAC label).
    # Nullable for backwards compat — rows without it keep using the
    # legacy ``<remote_attach.project>.<bucket>.<source_table>`` fallback.
    "ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS bq_fqn VARCHAR",
]


_V33_TO_V34_MIGRATIONS = [
    # DuckDB blocks DROP COLUMN while indexes reference the table
    # ("Dependency Error: Cannot alter entry … because there are entries
@ -3705,6 +3723,9 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None:
|
|||
_v48_to_v49_migrate(conn)
|
||||
if current < 50:
|
||||
_v49_to_v50_migrate(conn)
|
||||
if current < 51:
|
||||
for sql in _V50_TO_V51_MIGRATIONS:
|
||||
conn.execute(sql)
|
||||
conn.execute(
|
||||
"UPDATE schema_version SET version = ?, applied_at = current_timestamp",
|
||||
[SCHEMA_VERSION],
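The v50 to v51 step above follows an additive pattern: a list of idempotent statements runs only when the stored version is below the target, then the version row is bumped. The sketch below illustrates that ladder in isolation. The `apply_migrations` helper and the recording `executed` list are hypothetical stand-ins, not code from `src/db.py`; only the `ALTER TABLE` statement is taken from the diff.

```python
from typing import Callable

# Hypothetical ladder of (target_version, statements). Only the v51
# statement comes from the diff; the structure is illustrative.
MIGRATIONS: list[tuple[int, list[str]]] = [
    (51, ["ALTER TABLE table_registry ADD COLUMN IF NOT EXISTS bq_fqn VARCHAR"]),
]


def apply_migrations(current: int, execute: Callable[[str], None]) -> int:
    """Run every step whose target is above `current`; return the new version."""
    for target, statements in sorted(MIGRATIONS):
        if current < target:
            for sql in statements:
                execute(sql)
            current = target
    return current


# Record the SQL instead of talking to a real database.
executed: list[str] = []
new_version = apply_migrations(50, executed.append)
```

Calling `apply_migrations` again with the returned version executes nothing, which is the property that lets fresh installs and upgraded installs converge on the same schema.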
|
||||
|
|
|
|||
|
|
@ -129,6 +129,92 @@ class SyncOrchestrator:
|
|||
_capture_orchestrator_exception(exc, op="rebuild_source", source=source_name)
|
||||
raise
|
||||
|
||||
def _sync_bq_remote_attach_with_overlay(self, extracts_dir: Path) -> None:
|
||||
"""Detect drift in BQ extract.duckdb's ``_remote_attach.url`` and
|
||||
rewrite the extract when it disagrees with the overlay project.
|
||||
|
||||
Operational hazard this closes (issue #343, observed on Foundry AI
|
||||
2026-05-19): an admin updates ``data_source.bigquery.project`` via
|
||||
``POST /api/admin/server-config`` (overlay write), but the BQ
|
||||
``extract.duckdb`` keeps the previously-baked ``project=<old>``
|
||||
in its ``_remote_attach`` row. The next rebuild ATTACHes the OLD
|
||||
project, queries against datasets that don't exist there, and the
|
||||
error message points at the old project — confusing operators
|
||||
who just changed the config.
|
||||
|
||||
Fix: at every rebuild, read the BQ extract's ``_remote_attach.url``,
|
||||
compare against the overlay's ``data_source.bigquery.project``, and
|
||||
if they differ, call ``rebuild_from_registry`` to regenerate the
|
||||
extract. The regeneration path is the same one ``register-table``
|
||||
uses, so its semantics are well-tested.
|
||||
|
||||
No-op preconditions (any one short-circuits to silent return):
|
||||
- no BQ extract directory on disk (instance never had BQ)
|
||||
- extract.duckdb missing (extracted-but-failed state)
|
||||
- overlay project unset (BQ not configured yet — first-time
|
||||
setup, not drift)
|
||||
- no ``_remote_attach`` table in the extract (legacy / non-BQ
|
||||
extract, e.g. a future "bigquery" name collision with a local
|
||||
connector)
|
||||
- existing url matches overlay (no drift)
|
||||
"""
|
||||
bq_extract = extracts_dir / "bigquery" / "extract.duckdb"
|
||||
if not bq_extract.exists():
|
||||
return
|
||||
try:
|
||||
from app.instance_config import get_value
|
||||
except Exception:
|
||||
return
|
||||
overlay_project = (get_value("data_source", "bigquery", "project") or "").strip()
|
||||
if not overlay_project:
|
||||
return
|
||||
# Read-only handle, separate connection — orchestrator's rebuild
|
||||
# connection is per-call and hasn't ATTACHed extracts yet at
|
||||
# this pre-pass point, so this won't fight a file lock.
|
||||
try:
|
||||
ro = duckdb.connect(str(bq_extract), read_only=True)
|
||||
except Exception:
|
||||
return
|
||||
try:
|
||||
row = ro.execute(
|
||||
"SELECT url FROM _remote_attach WHERE alias='bq'"
|
||||
).fetchone()
|
||||
except Exception:
|
||||
row = None
|
||||
finally:
|
||||
try:
|
||||
ro.close()
|
||||
except Exception:
|
||||
pass
|
||||
if not row or not row[0]:
|
||||
return
|
||||
current_url = row[0]
|
||||
expected_url = f"project={overlay_project}"
|
||||
if current_url == expected_url:
|
||||
return
|
||||
logger.info(
|
||||
"BQ remote_attach drift detected: extract.duckdb has %r, "
|
||||
"overlay has %r — regenerating extract via "
|
||||
"rebuild_from_registry()",
|
||||
current_url, expected_url,
|
||||
)
|
||||
try:
|
||||
from connectors.bigquery.extractor import rebuild_from_registry
|
||||
result = rebuild_from_registry()
|
||||
logger.info(
|
||||
"BQ remote_attach drift sync: regenerated extract — "
|
||||
"tables_registered=%s errors=%s",
|
||||
result.get("tables_registered"),
|
||||
len(result.get("errors", [])),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"BQ remote_attach drift sync: rebuild_from_registry() "
|
||||
"failed: %s — extract.duckdb still points at %r, queries "
|
||||
"will fail until next manual sync",
|
||||
e, current_url,
|
||||
)
|
||||
|
||||
def _scan_meta_pairs(self, extracts_dir: Path) -> tuple:
|
||||
"""Read every connector's `_meta` and return (pairs, clean) where:
|
||||
|
||||
|
|
@ -185,6 +271,30 @@ class SyncOrchestrator:
|
|||
logger.warning("Extracts directory %s does not exist", extracts_dir)
|
||||
return {}
|
||||
|
||||
# Pre-pass: detect drift between extract.duckdb _remote_attach.url
|
||||
# (where the orchestrator's ATTACH path will read the BQ project
|
||||
# from) and the overlay's data_source.bigquery.project (the
|
||||
# writable source of truth, edited via admin /server-config). If
|
||||
# they differ, regenerate the BQ extract so the new project
|
||||
# propagates into views before we run the main rebuild loop.
|
||||
# No-op when there is no BQ extract or no overlay project. See
|
||||
# issue #343 for the operational hazard this closes (admin
|
||||
# changes project in the UI, extract.duckdb stays stale, all
|
||||
# remote queries fail with "Dataset not found in <old project>").
|
||||
try:
|
||||
self._sync_bq_remote_attach_with_overlay(extracts_dir)
|
||||
except Exception as e:
|
||||
# Defensive: drift sync is a best-effort safety net. A failure
|
||||
# here must not block the rest of the rebuild — the worst
|
||||
# case is the same stale-extract failure mode the sync was
|
||||
# trying to prevent, which the operator can still resolve
|
||||
# manually via /admin/sync trigger.
|
||||
logger.warning(
|
||||
"BQ remote_attach drift sync failed: %s — continuing with "
|
||||
"existing extract.duckdb (queries may fail until next "
|
||||
"manual sync if project drifted)", e,
|
||||
)
|
||||
|
||||
# Issue #81 Group C — load view ownership map from system DB so we
|
||||
# can detect cross-connector view-name collisions during this
|
||||
# rebuild and refuse to silently overwrite a previously-claimed
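The drift check in `_sync_bq_remote_attach_with_overlay` reduces to a small decision: rebuild only when both the stored `_remote_attach.url` and the overlay project are known and they disagree. The sketch below isolates that comparison as a pure function so it can be reasoned about without DuckDB or config I/O. The name `needs_bq_rebuild` is hypothetical and does not exist in the orchestrator.

```python
from typing import Optional


def needs_bq_rebuild(current_url: Optional[str], overlay_project: Optional[str]) -> bool:
    """Return True only when both sides are known and they disagree."""
    project = (overlay_project or "").strip()
    if not project:
        # Overlay unset: first-time setup, not drift.
        return False
    if not current_url:
        # No _remote_attach row (or empty url): nothing to compare against.
        return False
    # The extract stores the project as a "project=<id>" url string.
    return current_url != f"project={project}"
```

Keeping the comparison separate from the file handling is one way to make the no-op preconditions listed in the docstring easy to unit-test.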
|
||||
|
|
|
|||
|
|
@ -123,6 +123,12 @@ class TableRegistryRepository:
|
|||
partition_by: Optional[str] = None,
|
||||
partition_granularity: Optional[str] = None,
|
||||
initial_load_chunk_days: Optional[int] = None,
|
||||
# v51 — fully-qualified BigQuery path (``project.dataset.table``).
|
||||
# When set, the orchestrator uses this in place of constructing the
|
||||
# path from ``_remote_attach.url.project`` + ``bucket`` +
|
||||
# ``source_table`` at rebuild. Decouples the UX/RBAC ``bucket``
|
||||
# label from the physical BQ dataset name (issue #343).
|
||||
bq_fqn: Optional[str] = None,
|
||||
) -> None:
|
||||
# `registered_at` defaults to "now" for fresh inserts. Updaters that
|
||||
# want to preserve the original registration time across edits pass
|
||||
|
|
@ -142,8 +148,8 @@ class TableRegistryRepository:
|
|||
sync_schedule, profile_after_sync,
|
||||
incremental_window_days, max_history_days, incremental_column,
|
||||
where_filters, partition_by, partition_granularity,
|
||||
-    initial_load_chunk_days)
-    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+    initial_load_chunk_days, bq_fqn)
+    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
name = excluded.name, folder = excluded.folder,
|
||||
sync_strategy = excluded.sync_strategy, primary_key = excluded.primary_key,
|
||||
|
|
@ -159,13 +165,14 @@ class TableRegistryRepository:
|
|||
where_filters = excluded.where_filters,
|
||||
partition_by = excluded.partition_by,
|
||||
partition_granularity = excluded.partition_granularity,
|
||||
-    initial_load_chunk_days = excluded.initial_load_chunk_days""",
+    initial_load_chunk_days = excluded.initial_load_chunk_days,
+    bq_fqn = excluded.bq_fqn""",
|
||||
[id, name, folder, effective_strategy, encoded_pk, description, registered_by, ts,
|
||||
source_type, bucket, source_table, source_query, query_mode,
|
||||
sync_schedule, profile_after_sync,
|
||||
incremental_window_days, max_history_days, incremental_column,
|
||||
encoded_filters, partition_by, partition_granularity,
|
||||
-    initial_load_chunk_days],
+    initial_load_chunk_days, bq_fqn],
|
||||
)
|
||||
|
||||
@staticmethod
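Adding `bq_fqn` to the upsert touches three places that must stay in step: the column list, the `?` placeholders (22 became 23), and the `excluded.*` assignments. One way to keep them aligned is to derive all three from a single column list. The sketch below shows that idea under stated assumptions: it uses the standard-library `sqlite3` module as a stand-in for DuckDB, a trimmed five-column table, and a hypothetical `build_upsert` helper that is not part of `TableRegistryRepository`.

```python
import sqlite3

# Trimmed, illustrative subset of the registry columns.
COLUMNS = ["id", "name", "bucket", "source_table", "bq_fqn"]


def build_upsert(table: str, columns: list[str], key: str = "id") -> str:
    """Build an INSERT ... ON CONFLICT statement from one column list."""
    placeholders = ", ".join("?" for _ in columns)
    updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c != key)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({key}) DO UPDATE SET {updates}"
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE table_registry ("
    "id TEXT PRIMARY KEY, name TEXT, bucket TEXT, source_table TEXT, bq_fqn TEXT)"
)
sql = build_upsert("table_registry", COLUMNS)

# First write: legacy row without bq_fqn.
conn.execute(sql, ["s2", "s2_session_pageviews", "Sessions", "ignored_st", None])
# Second write with the same id updates the row in place.
conn.execute(
    sql,
    ["s2", "s2_session_pageviews", "Sessions", "ignored_st",
     "my-project.product_analytics.S2_pageviews"],
)
row = conn.execute(
    "SELECT bucket, bq_fqn FROM table_registry WHERE id = 's2'"
).fetchone()
conn.close()
```

The hand-written SQL in the diff is equally valid; generating it only removes the need to count placeholders by eye when the next column is added.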
|
||||
|
|
|
|||
533
tests/test_bq_fqn.py
Normal file
|
|
@ -0,0 +1,533 @@
|
|||
"""Tests for the v51 ``bq_fqn`` decoupling work (issue #343).
|
||||
|
||||
Covers:
|
||||
|
||||
- ``parse_bq_fqn`` unit cases (valid / empty / malformed shapes).
|
||||
- Extractor honors ``bq_fqn`` in registry rows: dataset/table override
|
||||
for same-project rows; cross-project VIEW path works; cross-project
|
||||
BASE TABLE skipped with warning; malformed rejected per-row.
|
||||
- Orchestrator drift sync: ``_remote_attach.url`` mismatch with overlay
|
||||
triggers ``rebuild_from_registry``.
|
||||
- ``validate_bigquery_startup_config`` warning matrix.
|
||||
- ``RegisterTableRequest`` accepts ``bq_fqn`` field; register handler
|
||||
rejects malformed / non-BQ-source bq_fqn at the API boundary.
|
||||
"""
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import duckdb
|
||||
import pytest
|
||||
|
||||
from connectors.bigquery.extractor import parse_bq_fqn
|
||||
|
||||
|
||||
class _CapturingProxy:
|
||||
"""Lightweight DuckDB proxy: intercepts BigQuery extension SQL and
|
||||
records every CREATE VIEW we would have emitted against the real
|
||||
BQ extension. The extension itself isn't loaded (offline tests),
|
||||
so view SQL referencing ``bq.*`` or ``bigquery_query(...)`` would
|
||||
fail at create-time — the proxy substitutes a no-op CREATE TABLE
|
||||
placeholder so downstream INSERT / verification still works.
|
||||
|
||||
Captured SQL is exposed as ``proxy.create_view_sqls`` for tests
|
||||
that need to assert on the path the extractor constructed."""
|
||||
|
||||
def __init__(self, real_conn):
|
||||
self._real = real_conn
|
||||
self.create_view_sqls: list[str] = []
|
||||
|
||||
def execute(self, sql, *args, **kwargs):
|
||||
upper = sql.strip().upper()
|
||||
if upper.startswith("INSTALL BIGQUERY") or upper.startswith("LOAD BIGQUERY"):
|
||||
return MagicMock()
|
||||
if upper.startswith("CREATE SECRET") or upper.startswith("CREATE OR REPLACE SECRET"):
|
||||
return MagicMock()
|
||||
if "ATTACH" in upper and "BIGQUERY" in upper:
|
||||
return MagicMock()
|
||||
if upper.startswith("DETACH BQ"):
|
||||
return MagicMock()
|
||||
if upper.startswith("SET BQ_") or upper.startswith("SELECT CURRENT_SETTING"):
|
||||
return MagicMock()
|
||||
# View bodies that reference the BQ extension (`bq."ds"."t"` for
|
||||
# BASE TABLE or `bigquery_query(...)` for VIEW) would error
|
||||
# without a live extension. Capture the SQL for the test, then
|
||||
# substitute a placeholder TABLE so subsequent INSERT INTO _meta
|
||||
# paths keep working.
|
||||
if ("FROM BQ." in upper or "BIGQUERY_QUERY(" in upper) and "CREATE" in upper:
|
||||
self.create_view_sqls.append(sql)
|
||||
m = re.search(r'VIEW\s+"?(\w+)"?', sql, re.IGNORECASE)
|
||||
if m:
|
||||
self._real.execute(
|
||||
f'CREATE OR REPLACE TABLE "{m.group(1)}" (dummy INTEGER)'
|
||||
)
|
||||
return MagicMock()
|
||||
return self._real.execute(sql, *args, **kwargs)
|
||||
|
||||
def close(self):
|
||||
return self._real.close()
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._real, name)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# parse_bq_fqn — pure unit
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
class TestParseBqFqn:
|
||||
def test_none_returns_none(self):
|
||||
assert parse_bq_fqn(None) is None
|
||||
|
||||
def test_empty_string_returns_none(self):
|
||||
# Treat "" the same as None — the registry persists '' for
|
||||
# cleared values in some paths, and the extractor's fallback
|
||||
# branch is the right behavior in both cases.
|
||||
assert parse_bq_fqn("") is None
|
||||
|
||||
def test_well_formed_three_segments(self):
|
||||
assert parse_bq_fqn("my-proj.my_ds.my_tbl") == (
|
||||
"my-proj", "my_ds", "my_tbl",
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize("bad", [
|
||||
"just_a_table", # one segment
|
||||
"ds.table", # two segments
|
||||
"p.d.t.extra", # four segments
|
||||
".d.t", # empty project
|
||||
"p..t", # empty dataset
|
||||
"p.d.", # empty table
|
||||
])
|
||||
def test_malformed_raises(self, bad):
|
||||
with pytest.raises(ValueError, match="malformed bq_fqn"):
|
||||
parse_bq_fqn(bad)
|
||||
|
||||
def test_unsafe_project_rejected(self):
|
||||
# `_validate_project_id` accepts the canonical BQ project-id
|
||||
# grammar (6-30 lowercase letters/digits/dashes). A space
|
||||
# would let an attacker break out of the inline backtick path
|
||||
# at view-create time; reject upfront.
|
||||
with pytest.raises(ValueError, match="project.*grammar"):
|
||||
parse_bq_fqn("bad project.ds.tbl")
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Extractor honors bq_fqn
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def output_dir(tmp_path):
|
||||
d = tmp_path / "extracts" / "bigquery"
|
||||
d.mkdir(parents=True)
|
||||
return str(d)
|
||||
|
||||
|
||||
def _run_init_extract(output_dir, project_id, tcs, detect_returns):
|
||||
"""Run init_extract with mocked auth + entity-type detection through
|
||||
the capturing proxy. Returns ``(stats, captured_sqls)`` so tests can
|
||||
assert on both the per-row outcome AND the SQL the extractor would
|
||||
have sent to the live BQ extension."""
|
||||
from connectors.bigquery.extractor import init_extract
|
||||
|
||||
detector = (
|
||||
detect_returns if callable(detect_returns)
|
||||
else (lambda *a, **kw: detect_returns)
|
||||
)
|
||||
|
||||
captured: list[str] = []
|
||||
|
||||
def proxy_connect(path=None, **kwargs):
|
||||
real_conn = duckdb.connect(path)
|
||||
proxy = _CapturingProxy(real_conn)
|
||||
proxy.create_view_sqls = captured # share list across calls
|
||||
return proxy
|
||||
|
||||
with patch("connectors.bigquery.extractor.get_metadata_token", lambda: "x"), \
|
||||
patch("connectors.bigquery.extractor._detect_table_type", detector), \
|
||||
patch("connectors.bigquery.extractor.duckdb") as mock_mod:
|
||||
mock_mod.connect = proxy_connect
|
||||
result = init_extract(output_dir, project_id, tcs)
|
||||
return result, captured
|
||||
|
||||
|
||||
def _meta_rows(output_dir):
|
||||
conn = duckdb.connect(str(Path(output_dir) / "extract.duckdb"))
|
||||
try:
|
||||
return conn.execute(
|
||||
"SELECT table_name FROM _meta ORDER BY table_name"
|
||||
).fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
class TestExtractorRespectsBqFqn:
|
||||
def test_bq_fqn_overrides_bucket_for_same_project_view(self, output_dir):
|
||||
"""A row with bq_fqn whose project matches the extractor's ATTACH
|
||||
project should use the bq_fqn's dataset/table in the inner view.
|
||||
|
||||
Concretely: bucket='Sessions' (UX label) and bq_fqn=
|
||||
'my-project.product_analytics.S2_pageviews' — the bigquery_query
|
||||
FROM clause should reference product_analytics.S2_pageviews, NOT
|
||||
Sessions.S2_pageviews."""
|
||||
tcs = [{
|
||||
"id": "s2",
|
||||
"name": "s2_session_pageviews",
|
||||
"source_type": "bigquery",
|
||||
"bucket": "Sessions", # UX label — must NOT leak into BQ path
|
||||
"source_table": "ignored_st", # should also be overridden
|
||||
"bq_fqn": "my-project.product_analytics.S2_pageviews",
|
||||
"query_mode": "remote",
|
||||
"description": "",
|
||||
}]
|
||||
result, sqls = _run_init_extract(output_dir, "my-project", tcs, "VIEW")
|
||||
assert result["tables_registered"] == 1
|
||||
joined = "\n".join(sqls)
|
||||
assert "product_analytics" in joined, joined
|
||||
assert "S2_pageviews" in joined, joined
|
||||
# The UX label must not leak into the BQ path
|
||||
assert "Sessions" not in joined, joined
|
||||
|
||||
def test_bq_fqn_view_cross_project_succeeds(self, output_dir):
|
||||
"""VIEW path uses bigquery_query(billing, ...), which can read across
|
||||
projects. A bq_fqn with project ≠ extractor project should still
|
||||
register the master view (cross-project SA permissions assumed)."""
|
||||
tcs = [{
|
||||
"id": "rfm",
|
||||
"name": "rfm",
|
||||
"source_type": "bigquery",
|
||||
"bucket": "RFM",
|
||||
"source_table": "ignored",
|
||||
"bq_fqn": "other-project.revenue.bk_rfm",
|
||||
"query_mode": "remote",
|
||||
"description": "",
|
||||
}]
|
||||
result, sqls = _run_init_extract(output_dir, "my-project", tcs, "VIEW")
|
||||
assert result["tables_registered"] == 1
|
||||
joined = "\n".join(sqls)
|
||||
# Verify the FROM clause carries the cross-project FQN
|
||||
assert "other-project.revenue.bk_rfm" in joined, joined
|
||||
# Billing project for the BQ job is still the ATTACH project
|
||||
assert "bigquery_query('my-project'" in joined, joined
|
||||
|
||||
def test_bq_fqn_base_table_cross_project_skipped(self, output_dir):
|
||||
"""BASE TABLE path goes through the bq ATTACH alias, which is bound
|
||||
to the extractor's project. Cross-project BASE TABLE would silently
|
||||
route to the wrong project (data not found there) — skip with a
|
||||
warning and do NOT insert _meta so the master view isn't created
|
||||
against missing data."""
|
||||
tcs = [{
|
||||
"id": "xp",
|
||||
"name": "xp",
|
||||
"source_type": "bigquery",
|
||||
"bucket": "OtherDs",
|
||||
"source_table": "tbl",
|
||||
"bq_fqn": "other-project.OtherDs.tbl",
|
||||
"query_mode": "remote",
|
||||
"description": "",
|
||||
}]
|
||||
result, _ = _run_init_extract(output_dir, "my-project", tcs, "BASE TABLE")
|
||||
assert result["tables_registered"] == 0
|
||||
# No _meta row → orchestrator won't create a master view that
|
||||
# would resolve to a nonexistent inner view.
|
||||
assert _meta_rows(output_dir) == []
|
||||
|
||||
def test_malformed_bq_fqn_records_per_row_error(self, output_dir):
|
||||
tcs = [{
|
||||
"id": "ok", "name": "ok", "source_type": "bigquery",
|
||||
"bucket": "ds", "source_table": "t",
|
||||
"query_mode": "remote", "description": "",
|
||||
}, {
|
||||
"id": "bad", "name": "bad", "source_type": "bigquery",
|
||||
"bucket": "ds", "source_table": "t",
|
||||
"bq_fqn": "not.enough", # malformed
|
||||
"query_mode": "remote", "description": "",
|
||||
}]
|
||||
result, _ = _run_init_extract(output_dir, "my-project", tcs, "BASE TABLE")
|
||||
# Good row goes through; bad row recorded as per-row error and
|
||||
# does NOT abort the whole extract.
|
||||
assert result["tables_registered"] == 1
|
||||
assert any("malformed bq_fqn" in e["error"] for e in result["errors"])
|
||||
# Only the good row landed in _meta
|
||||
rows = _meta_rows(output_dir)
|
||||
assert rows == [("ok",)]
|
||||
|
||||
def test_no_bq_fqn_falls_back_to_legacy(self, output_dir):
|
||||
"""A row without bq_fqn must keep using bucket+source_table+
|
||||
ATTACH project, exactly as pre-v51. Backwards-compat guarantee."""
|
||||
tcs = [{
|
||||
"id": "legacy",
|
||||
"name": "legacy",
|
||||
"source_type": "bigquery",
|
||||
"bucket": "legacy_ds",
|
||||
"source_table": "legacy_tbl",
|
||||
# bq_fqn intentionally absent
|
||||
"query_mode": "remote",
|
||||
"description": "",
|
||||
}]
|
||||
result, sqls = _run_init_extract(output_dir, "my-project", tcs, "BASE TABLE")
|
||||
assert result["tables_registered"] == 1
|
||||
assert any('bq."legacy_ds"."legacy_tbl"' in s for s in sqls), sqls
|
||||
|
||||
def test_cross_project_detect_call_bills_against_extractor_project(self, output_dir):
|
||||
"""Regression: cross-project rows must call _detect_table_type with
|
||||
billing_project=project_id (the extractor's billing project), not
|
||||
just the bq_fqn data project. The data SA typically has
|
||||
bigquery.dataViewer on the data project but only holds
|
||||
serviceusage.services.use on the billing project — reusing the
|
||||
data project as billing 403s and the broad except Exception in
|
||||
init_extract silently drops the row, so the cross-project VIEW
|
||||
path never executes."""
|
||||
captured_calls: list[dict] = []
|
||||
|
||||
def capturing_detector(conn, project, dataset, table, billing_project=None):
|
||||
captured_calls.append({
|
||||
"project": project,
|
||||
"billing_project": billing_project,
|
||||
})
|
||||
return "VIEW"
|
||||
|
||||
tcs = [{
|
||||
"id": "rfm",
|
||||
"name": "rfm",
|
||||
"source_type": "bigquery",
|
||||
"bucket": "RFM",
|
||||
"source_table": "ignored",
|
||||
"bq_fqn": "other-project.revenue.bk_rfm",
|
||||
"query_mode": "remote",
|
||||
"description": "",
|
||||
}]
|
||||
_run_init_extract(output_dir, "my-project", tcs, capturing_detector)
|
||||
assert len(captured_calls) == 1
|
||||
call = captured_calls[0]
|
||||
# Data project (FROM clause / INFORMATION_SCHEMA target)
|
||||
assert call["project"] == "other-project"
|
||||
# Billing project (bigquery_query 1st arg + serviceusage.services.use
|
||||
# check) — must be the extractor's billing project, NOT the data project.
|
||||
assert call["billing_project"] == "my-project"
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# _detect_table_type — direct unit
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
class TestDetectTableTypeBilling:
|
||||
"""Verify that _detect_table_type wires billing_project into the
|
||||
bigquery_query() 1st positional arg — the only knob that controls
|
||||
which project the BQ jobs API charges + checks services.use on."""
|
||||
|
||||
def _make_fake_conn(self, captured: list, return_value):
|
||||
class _FakeCursor:
|
||||
def fetchone(self_inner):
|
||||
return return_value
|
||||
class _FakeConn:
|
||||
def execute(self_inner, sql, params):
|
||||
captured.append(list(params))
|
||||
return _FakeCursor()
|
||||
return _FakeConn()
|
||||
|
||||
def test_explicit_billing_project_used_for_bigquery_query_first_arg(self):
|
||||
from connectors.bigquery.extractor import _detect_table_type
|
||||
captured: list = []
|
||||
conn = self._make_fake_conn(captured, ("VIEW",))
|
||||
result = _detect_table_type(
|
||||
conn, "data-proj", "ds", "tbl",
|
||||
billing_project="billing-proj",
|
||||
)
|
||||
assert result == "VIEW"
|
||||
# bigquery_query(billing_project, bq_sql, table_predicate)
|
||||
params = captured[0]
|
||||
assert params[0] == "billing-proj"
|
||||
# FROM clause still references the data project
|
||||
assert "`data-proj.ds.INFORMATION_SCHEMA.TABLES`" in params[1]
|
||||
assert params[2] == "tbl"
|
||||
|
||||
def test_omitted_billing_project_defaults_to_data_project(self):
|
||||
"""Backwards-compat: existing same-project callers omit
|
||||
billing_project and bill against the data project (no-op since
|
||||
the two projects are equal in same-project lookups)."""
|
||||
from connectors.bigquery.extractor import _detect_table_type
|
||||
captured: list = []
|
||||
conn = self._make_fake_conn(captured, None)
|
||||
_detect_table_type(conn, "same-proj", "ds", "tbl")
|
||||
assert captured[0][0] == "same-proj"
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Orchestrator drift sync
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
class TestOrchestratorBqDriftSync:
|
||||
def test_drift_triggers_rebuild_from_registry(self, tmp_path, monkeypatch):
|
||||
"""When extract.duckdb's _remote_attach.url disagrees with the
|
||||
overlay's data_source.bigquery.project, the orchestrator's
|
||||
pre-pass should call rebuild_from_registry to regenerate the
|
||||
extract before the main scan loop."""
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
|
||||
bq_dir = tmp_path / "extracts" / "bigquery"
|
||||
bq_dir.mkdir(parents=True)
|
||||
extract_path = bq_dir / "extract.duckdb"
|
||||
|
||||
# Create a minimal _remote_attach pointing at the OLD project.
|
||||
conn = duckdb.connect(str(extract_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"CREATE TABLE _remote_attach ("
|
||||
"alias VARCHAR, extension VARCHAR, url VARCHAR, "
|
||||
"token_env VARCHAR)"
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
|
||||
["bq", "bigquery", "project=stale-project", ""],
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
# Overlay says the project is now `fresh-project`.
|
||||
monkeypatch.setattr(
|
||||
"app.instance_config.get_value",
|
||||
lambda *a, **kw: "fresh-project" if a[-1] == "project" else "",
|
||||
)
|
||||
|
||||
called = []
|
||||
monkeypatch.setattr(
|
||||
"connectors.bigquery.extractor.rebuild_from_registry",
|
||||
lambda *a, **kw: (called.append(1), {"tables_registered": 0, "errors": []})[1],
|
||||
)
|
||||
|
||||
orch = SyncOrchestrator(analytics_db_path=str(tmp_path / "analytics.duckdb"))
|
||||
orch._sync_bq_remote_attach_with_overlay(tmp_path / "extracts")
|
||||
assert called == [1], "drift detected but rebuild_from_registry was not invoked"
|
||||
|
||||
def test_no_drift_is_noop(self, tmp_path, monkeypatch):
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
|
||||
bq_dir = tmp_path / "extracts" / "bigquery"
|
||||
bq_dir.mkdir(parents=True)
|
||||
extract_path = bq_dir / "extract.duckdb"
|
||||
|
||||
conn = duckdb.connect(str(extract_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"CREATE TABLE _remote_attach ("
|
||||
"alias VARCHAR, extension VARCHAR, url VARCHAR, "
|
||||
"token_env VARCHAR)"
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO _remote_attach VALUES (?, ?, ?, ?)",
|
||||
["bq", "bigquery", "project=same-project", ""],
|
||||
)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
monkeypatch.setattr(
|
||||
"app.instance_config.get_value",
|
||||
lambda *a, **kw: "same-project" if a[-1] == "project" else "",
|
||||
)
|
||||
called = []
|
||||
monkeypatch.setattr(
|
||||
"connectors.bigquery.extractor.rebuild_from_registry",
|
||||
lambda *a, **kw: called.append(1) or {},
|
||||
)
|
||||
orch = SyncOrchestrator(analytics_db_path=str(tmp_path / "analytics.duckdb"))
|
||||
orch._sync_bq_remote_attach_with_overlay(tmp_path / "extracts")
|
||||
assert called == [], "no drift but rebuild_from_registry was still called"
|
||||
|
||||
def test_missing_extract_is_noop(self, tmp_path, monkeypatch):
|
||||
"""Pre-pass on an instance with no BQ extract at all must not
|
||||
try to read or rewrite anything. Soft-fails silently."""
|
||||
from src.orchestrator import SyncOrchestrator
|
||||
called = []
|
||||
monkeypatch.setattr(
|
||||
"connectors.bigquery.extractor.rebuild_from_registry",
|
||||
lambda *a, **kw: called.append(1) or {},
|
||||
)
|
||||
orch = SyncOrchestrator(analytics_db_path=str(tmp_path / "analytics.duckdb"))
|
||||
orch._sync_bq_remote_attach_with_overlay(tmp_path / "extracts")
|
||||
assert called == []
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# validate_bigquery_startup_config
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
class TestStartupValidation:
|
||||
def test_empty_config_no_warnings(self, monkeypatch):
|
||||
from connectors.bigquery.access import validate_bigquery_startup_config
|
||||
monkeypatch.setattr("app.instance_config.get_value", lambda *a, **kw: "")
|
||||
assert validate_bigquery_startup_config() == []
|
||||
|
||||
def test_same_billing_and_data_project_no_warnings(self, monkeypatch):
|
||||
from connectors.bigquery.access import validate_bigquery_startup_config
|
||||
|
||||
def fake_get_value(*args, **kwargs):
|
||||
key = args[-1]
|
||||
return {
|
||||
"project": "my-proj",
|
||||
"billing_project": "my-proj",
|
||||
"location": "", # location unset is OK when same project
|
||||
}.get(key, "")
|
||||
|
||||
monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
|
||||
assert validate_bigquery_startup_config() == []
|
||||
|
||||
def test_cross_project_without_location_warns(self, monkeypatch):
|
||||
from connectors.bigquery.access import validate_bigquery_startup_config
|
||||
|
||||
def fake_get_value(*args, **kwargs):
|
||||
key = args[-1]
|
||||
return {
|
||||
"project": "data-project",
|
||||
"billing_project": "billing-project",
|
||||
"location": "",
|
||||
}.get(key, "")
|
||||
|
||||
monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
|
||||
warnings = validate_bigquery_startup_config()
|
||||
assert len(warnings) == 1
|
||||
assert "location is not set" in warnings[0]
|
||||
assert "issue #343" in warnings[0]
|
||||
|
||||
def test_warehouse_like_project_without_billing_warns(self, monkeypatch):
|
||||
from connectors.bigquery.access import validate_bigquery_startup_config
|
||||
|
||||
def fake_get_value(*args, **kwargs):
|
||||
key = args[-1]
|
||||
return {
|
||||
"project": "my-warehouse-project",
|
||||
"billing_project": "",
|
||||
"location": "us-central1",
|
||||
}.get(key, "")
|
||||
|
||||
monkeypatch.setattr("app.instance_config.get_value", fake_get_value)
|
||||
warnings = validate_bigquery_startup_config()
|
||||
# Only the warehouse-like heuristic fires (cross-project warning
|
||||
# is suppressed because effective_billing == project when billing
|
||||
# is unset, regardless of location).
|
||||
assert any("warehouse" in w or "serviceusage" in w for w in warnings)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Admin API surface
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
class TestRegisterRequestAcceptsBqFqn:
|
||||
def test_pydantic_accepts_well_formed(self):
|
||||
from app.api.admin import RegisterTableRequest
|
||||
r = RegisterTableRequest(
|
||||
name="t", source_type="bigquery",
|
||||
bucket="ds", source_table="t",
|
||||
bq_fqn="proj.ds.t",
|
||||
)
|
||||
assert r.bq_fqn == "proj.ds.t"
|
||||
|
||||
def test_pydantic_accepts_omitted(self):
|
||||
from app.api.admin import RegisterTableRequest
|
||||
r = RegisterTableRequest(name="t", source_type="bigquery", bucket="ds", source_table="t")
|
||||
assert r.bq_fqn is None
|
||||
|
||||
def test_update_request_accepts_bq_fqn(self):
|
||||
from app.api.admin import UpdateTableRequest
|
||||
u = UpdateTableRequest(bq_fqn="p.d.t")
|
||||
assert u.bq_fqn == "p.d.t"
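The tests above pin down the contract of `parse_bq_fqn` without showing its body: `None` and `""` map to `None`, exactly three non-empty dot-separated segments are required, and the project segment must satisfy the project-id grammar. The sketch below is one implementation that would satisfy those cases. It is not the code in `connectors/bigquery/extractor.py`; the name `parse_bq_fqn_sketch`, the regex, and the exact error wording are assumptions chosen to match the test expectations.

```python
import re
from typing import Optional

# Assumed project-id grammar: 6-30 characters, lowercase letters, digits
# and dashes, starting with a letter and not ending with a dash.
_PROJECT_RE = re.compile(r"^[a-z][a-z0-9-]{4,28}[a-z0-9]$")


def parse_bq_fqn_sketch(value: Optional[str]) -> Optional[tuple[str, str, str]]:
    """Split 'project.dataset.table' into its parts, or return None when unset."""
    if not value:
        # None and "" both mean "no override": the caller uses the legacy path.
        return None
    parts = value.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            f"malformed bq_fqn: {value!r} (expected project.dataset.table)"
        )
    project, dataset, table = parts
    if not _PROJECT_RE.match(project):
        raise ValueError(
            f"bq_fqn project {project!r} does not match the project-id grammar"
        )
    return project, dataset, table
```

Checking the segment count before the project grammar means a value such as `"p.d."` reports the structural problem first, which matches the `malformed bq_fqn` message the parametrized test expects.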
|
||||
|
|
@ -140,7 +140,15 @@ def test_schema_version_matches_constant():
|
|||
# CREATE UNIQUE INDEX. Migration pre-checks for existing
|
||||
# duplicates and raises RuntimeError listing them rather
|
||||
# than letting the index create fail mid-way.
|
||||
-    assert SCHEMA_VERSION == 50
+    # v51 (#343): nullable bq_fqn column on table_registry — fully-
+    # qualified BigQuery path (project.dataset.table) that
+    # decouples the UX/RBAC `bucket` label from the physical
+    # BQ dataset name. Rows without it fall back to the
+    # legacy bucket+source_table+remote_attach.project path
+    # (backwards-compatible). Both _SYSTEM_SCHEMA (fresh
+    # installs) and _V50_TO_V51_MIGRATIONS (upgrades) carry
+    # the column so post-migration installs converge.
+    assert SCHEMA_VERSION == 51
|
||||
|
||||
|
||||
def test_v37_marketplace_curator_columns(tmp_path):
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ pattern used for primary_key). Other fields are scalar pass-through.
|
|||
import duckdb
|
||||
import pytest
|
||||
|
||||
-from src.db import _V26_TO_V27_MIGRATIONS
+from src.db import _V26_TO_V27_MIGRATIONS, _V50_TO_V51_MIGRATIONS
|
||||
from src.repositories.table_registry import TableRegistryRepository
|
||||
|
||||
|
||||
|
|
@ -28,6 +28,8 @@ def repo(tmp_path):
|
|||
)
|
||||
for sql in _V26_TO_V27_MIGRATIONS:
|
||||
conn.execute(sql)
|
||||
for sql in _V50_TO_V51_MIGRATIONS:
|
||||
conn.execute(sql)
|
||||
return TableRegistryRepository(conn)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ def test_v41_to_v42_is_idempotent(tmp_path):
|
|||
conn = duckdb.connect(str(db_path))
|
||||
init_database(conn)
|
||||
v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
|
||||
-    assert v == 50
+    assert v == 51
|
||||
conn.close()
|
||||
|
||||
|
||||
|
|
@ -87,7 +87,7 @@ def test_v41_db_upgrades_cleanly(tmp_path):
|
|||
conn = duckdb.connect(str(db_path))
|
||||
init_database(conn)
|
||||
v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
|
||||
-    assert v == 50
+    assert v == 51
|
||||
# All 7 new v41 tables exist after the v40→v41 upgrade
|
||||
tables = {
|
||||
row[0]
|
||||
|
|
@ -118,7 +118,7 @@ def test_v30_db_ladders_all_the_way_up(tmp_path):
|
|||
conn = duckdb.connect(str(db_path))
|
||||
init_database(conn)
|
||||
v = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0]
|
||||
-    assert v == 50
+    assert v == 51
|
||||
cnt = conn.execute("SELECT COUNT(*) FROM audit_log WHERE id='vintage'").fetchone()[0]
|
||||
assert cnt == 1
|
||||
# New v41 table exists
|
||||
|
|
|
|||