From 72230c3b51546f6ee87048bf0b587801f261e8d9 Mon Sep 17 00:00:00 2001 From: ZdenekSrotyr <139972147+ZdenekSrotyr@users.noreply.github.com> Date: Mon, 27 Apr 2026 22:09:49 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20#81=20Group=20C=20=E2=80=94=20view-name?= =?UTF-8?q?=20collision=20detection=20(schema=20v10,=20squashed)=20(#100)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schema v10 + view_ownership table. Cross-connector view name collisions are detected and refused with an actionable ERROR rather than silently last-write-wins. Pre-scan reconcile releases stale ownerships in the same rebuild as a rename — but only when ALL sources' pre-scans succeed (transient-IO defense; partial pre-scan skips reconcile to avoid silently stealing a name). 26/26 view collision + orchestrator tests pass. Refs #81 Group C. --- CHANGELOG.md | 28 +++ src/db.py | 34 ++- src/orchestrator.py | 167 ++++++++++++++- src/repositories/view_ownership.py | 109 ++++++++++ tests/test_schema_v9_migration.py | 11 +- tests/test_view_collision_detection.py | 280 +++++++++++++++++++++++++ 6 files changed, 620 insertions(+), 9 deletions(-) create mode 100644 src/repositories/view_ownership.py create mode 100644 tests/test_view_collision_detection.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1817704..38ef10c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,26 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C +### Added + +- **Schema v10** introduces `view_ownership` to detect cross-connector + view-name collisions in the master analytics DB (issue #81 Group C). + When two connectors register the same `_meta.table_name`, the + orchestrator now refuses to silently overwrite the prior owner's view — + it logs a `view_ownership collision` ERROR identifying both sources + and the colliding name, and the second source's view is NOT created. 
+ Previously this was last-write-wins, which depended on directory + iteration order and could change deployment-to-deployment. Operators + resolve a collision by renaming `name` in `table_registry` on one side + (registry-side aliasing — `source_table` stays unchanged, only the + view name changes). The orchestrator pre-scans every connector's + `_meta` at the start of each rebuild and releases stale ownerships + immediately (when ALL pre-scans succeed; if any fail, reconcile is + skipped to avoid silently stealing a transient-IO source's name), + so a renamed table frees its name in the SAME rebuild that introduces + the rename — no two-step waits needed. New module + `src/repositories/view_ownership.py` exposes the repository. + ### Changed - **BREAKING (ops)**: Keboola extractor now exits with three distinct @@ -194,6 +214,14 @@ CalVer image tags (`stable-YYYY.MM.N`, `dev-YYYY.MM.N`) are produced for every C - `tests/conftest.py::seeded_app` extended with `viewer_token` and `km_admin_token` so role-gating tests cover all four core roles. +### Migrated + +- **Schema bumped from v9 to v10**. Auto-migration applies on next start + (creates the `view_ownership` table; data on disk is unaffected). The + pre-migration snapshot machinery (added at v8→v9) covers v9→v10 too — + if anything goes wrong during the migration, the snapshot at + `/state/system.duckdb.pre-migrate` lets you roll back. + ## [0.11.5] — 2026-04-27 Follow-up release for PR #73: addresses four rounds of Devin AI review on the role-management-complete branch. No new public-API surface; the user-visible payoff is that v8→v9-migrated installations now work end-to-end (login flows, user list, admin nav, privilege revocation), and `make local-dev` startup is finally quiet. 
diff --git a/src/db.py b/src/db.py index 1210cbb..4ee64ad 100644 --- a/src/db.py +++ b/src/db.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) _SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$") -SCHEMA_VERSION = 9 +SCHEMA_VERSION = 10 _SYSTEM_SCHEMA = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -64,6 +64,19 @@ CREATE TABLE IF NOT EXISTS sync_state ( error TEXT ); +-- v10: view-name collision detection across connectors. The orchestrator +-- writes views into the master analytics.duckdb under a flat namespace; two +-- connectors with the same `_meta.table_name` would otherwise silently +-- overwrite each other (last-write-wins). This table records the FIRST +-- source to register a given view name; subsequent attempts from a different +-- source are refused with a `name_collision` log line until the operator +-- renames one side. Issue #81 Group C. +CREATE TABLE IF NOT EXISTS view_ownership ( + view_name VARCHAR PRIMARY KEY, + source_name VARCHAR NOT NULL, + registered_at TIMESTAMP NOT NULL DEFAULT current_timestamp +); + CREATE TABLE IF NOT EXISTS sync_history ( id VARCHAR PRIMARY KEY, table_id VARCHAR NOT NULL, @@ -585,6 +598,18 @@ _V8_TO_V9_MIGRATIONS = [ """, ] +# Issue #81 Group C — view-name collision detection. New table records the +# first source to register a given view name in the master analytics DB. +_V9_TO_V10_MIGRATIONS = [ + """ + CREATE TABLE IF NOT EXISTS view_ownership ( + view_name VARCHAR PRIMARY KEY, + source_name VARCHAR NOT NULL, + registered_at TIMESTAMP NOT NULL DEFAULT current_timestamp + ) + """, +] + # Core role seed data — single source of truth. Used by both _seed_core_roles # (idempotent insert) and the v8→v9 backfill. 
Order matters: lowest privilege @@ -798,6 +823,13 @@ def _ensure_schema(conn: duckdb.DuckDBPyConnection) -> None: if current < 9: for sql in _V8_TO_V9_MIGRATIONS: conn.execute(sql) + _did_v9_finalize = True + else: + _did_v9_finalize = False + if current < 10: + for sql in _V9_TO_V10_MIGRATIONS: + conn.execute(sql) + if _did_v9_finalize: # v9 finalize: seed core.* roles, backfill grants from # legacy users.role, then drop the column. Order matters — # backfill needs the seed rows to exist; drop must be last. diff --git a/src/orchestrator.py b/src/orchestrator.py index 0183e2f..f866d45 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -23,7 +23,7 @@ import os import re import threading from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional import duckdb @@ -99,12 +99,110 @@ class SyncOrchestrator: with _rebuild_lock: return self._do_rebuild_source(source_name) + def _scan_meta_pairs(self, extracts_dir: Path) -> tuple: + """Read every connector's `_meta` and return (pairs, clean) where: + + - ``pairs`` — list of (source_name, table_name) tuples successfully + gathered from `_meta`. + - ``clean`` — True iff every source's pre-scan succeeded. False if + any source's `_meta` couldn't be read (transient I/O, mid-write, + corrupt extract.duckdb). A missing extract.duckdb is skipped, not flagged. + + Used by view_ownership.reconcile to release stale claims before + the main rebuild loop tries to claim new names. The ``clean`` flag + guards against a correctness bug: if source B's pre-scan fails + and we naively reconcile against an incomplete `pairs` list, B's + prior ownership is dropped, and another source could claim B's + name in the same rebuild — a silent overwrite, exactly what + Group C is meant to prevent. Callers MUST skip reconcile when + ``clean`` is False; per-row claim-time collision detection still + catches actual collisions. 
+ """ + pairs: List[tuple] = [] + clean = True + for ext_dir in sorted(extracts_dir.iterdir()): + if not ext_dir.is_dir(): + continue + db_file = ext_dir / "extract.duckdb" + if not db_file.exists(): + continue + if not _validate_identifier(ext_dir.name, "source_name"): + continue + try: + ro_conn = duckdb.connect(str(db_file), read_only=True) + try: + rows = ro_conn.execute( + "SELECT table_name FROM _meta" + ).fetchall() + for (table_name,) in rows: + if _validate_identifier(table_name, "table_name"): + pairs.append((ext_dir.name, table_name)) + finally: + ro_conn.close() + except Exception as e: + logger.warning( + "scan_meta_pairs: failed to read %s (%s) — " + "skipping reconcile this rebuild to avoid releasing " + "ownerships prematurely", + ext_dir.name, e, + ) + clean = False + return pairs, clean + def _do_rebuild(self) -> Dict[str, List[str]]: extracts_dir = _get_extracts_dir() if not extracts_dir.exists(): logger.warning("Extracts directory %s does not exist", extracts_dir) return {} + # Issue #81 Group C — load view ownership map from system DB so we + # can detect cross-connector view-name collisions during this + # rebuild and refuse to silently overwrite a previously-claimed + # name. The map is kept in system.duckdb (analytics.duckdb is + # rebuilt fresh each time and would not survive). + from src.db import get_system_db + from src.repositories.view_ownership import ViewOwnershipRepository + sys_conn_for_views = get_system_db() + view_repo = None + try: + view_repo = ViewOwnershipRepository(sys_conn_for_views) + # Pre-scan every connector's _meta so we can run the reconcile + # pass BEFORE claims are evaluated. This makes "owner stopped + # publishing → name freed → another source can claim" work in + # the SAME rebuild rather than requiring two consecutive runs. + # + # Correctness: only reconcile when EVERY source's pre-scan + # succeeded. 
Otherwise a transient I/O failure on source B + # would drop B's prior ownership and let another source steal + # B's name — silent overwrite, exactly the bug Group C + # prevents. Per-row claim-time collision detection still + # catches actual collisions even without reconcile this run. + current_pairs, pre_scan_clean = self._scan_meta_pairs(extracts_dir) + if pre_scan_clean: + view_repo.reconcile(current_pairs) + else: + logger.warning( + "view_ownership: skipping reconcile this rebuild — " + "pre-scan was incomplete; renamed tables will release " + "their names on the next clean rebuild instead" + ) + existing_owners = view_repo.get_all() + except Exception as e: + logger.warning( + "view_ownership pre-scan failed: %s — proceeding without " + "collision detection", e, + ) + existing_owners = {} + view_repo = None + try: + sys_conn_for_views.close() + except Exception: + pass + sys_conn_for_views = None + + # Track every (source, view) pair this rebuild successfully claims. + claimed_pairs: List[tuple] = [] + result = {} # Write to temp file then rename — avoids lock conflict with query endpoint tmp_path = self._db_path + ".tmp" @@ -139,14 +237,32 @@ class SyncOrchestrator: continue tables = self._attach_and_create_views( - conn, ext_dir.name, str(db_file) + conn, ext_dir.name, str(db_file), + existing_owners=existing_owners, + claimed_pairs=claimed_pairs, + view_repo=view_repo if sys_conn_for_views else None, ) if tables: result[ext_dir.name] = tables logger.info("Attached %s: %d tables", ext_dir.name, len(tables)) + + # No end-of-rebuild reconcile: the pre-scan reconcile above + # already released stale ownerships using a complete view of + # every source's `_meta`. Reconciling again here against + # `claimed_pairs` (which excludes refused collisions and any + # source that failed to attach) would incorrectly drop the + # legitimate prior owner of a name when its DB happens to be + # transiently unreadable. 
See test + # `test_pre_scan_failure_does_not_release_ownership` for the + # contract. finally: conn.execute("CHECKPOINT") conn.close() + if sys_conn_for_views is not None: + try: + sys_conn_for_views.close() + except Exception: + pass # Atomic swap: replace analytics.duckdb with new version _atomic_swap_db(tmp_path, self._db_path) @@ -170,9 +286,25 @@ class SyncOrchestrator: return result.get(source_name, []) def _attach_and_create_views( - self, conn: duckdb.DuckDBPyConnection, source_name: str, db_path: str + self, + conn: duckdb.DuckDBPyConnection, + source_name: str, + db_path: str, + existing_owners: Optional[Dict[str, str]] = None, + claimed_pairs: Optional[List[tuple]] = None, + view_repo=None, ) -> List[str]: - """ATTACH extract.duckdb, read _meta, create views in master.""" + """ATTACH extract.duckdb, read _meta, create views in master. + + Issue #81 Group C — when ``existing_owners`` and ``view_repo`` are + provided, the orchestrator checks for cross-connector view-name + collisions and refuses to overwrite a name owned by another source. + ``claimed_pairs`` accumulates the (source, view) tuples this + rebuild successfully claims; the caller uses it for end-of-rebuild + reconcile. + """ + if existing_owners is None: + existing_owners = {} tables = [] try: conn.execute(f"ATTACH '{db_path}' AS {source_name} (READ_ONLY)") @@ -189,6 +321,33 @@ class SyncOrchestrator: for table_name, rows, size_bytes, query_mode in meta_rows: if not _validate_identifier(table_name, "table_name"): continue + + # Issue #81 Group C — refuse cross-connector collisions. + # First-come-first-served: the source already in + # view_ownership keeps the name; any other source that + # tries to claim it gets logged + skipped until the + # operator renames one side. Re-claim by the same source + # is fine (idempotent rebuild). 
+ if view_repo is not None: + if not view_repo.claim(table_name, source_name): + # Query live owner — covers two cases: + # (1) stale snapshot from rebuild start (existing_owners), + # (2) two sources both first-time-claim the same name + # in this rebuild — the loser sees the winner here. + prior_owner = ( + view_repo.get_owner(table_name) + or existing_owners.get(table_name, "") + ) + logger.error( + "view_ownership collision: %s already owns view %r; " + "%s.%s will NOT be exposed. Rename `name` in the " + "table_registry on one side to resolve.", + prior_owner, table_name, source_name, table_name, + ) + continue + if claimed_pairs is not None: + claimed_pairs.append((source_name, table_name)) + conn.execute( f"CREATE OR REPLACE VIEW \"{table_name}\" AS " f"SELECT * FROM {source_name}.\"{table_name}\"" diff --git a/src/repositories/view_ownership.py b/src/repositories/view_ownership.py new file mode 100644 index 0000000..87ce6ea --- /dev/null +++ b/src/repositories/view_ownership.py @@ -0,0 +1,109 @@ +"""Repository for view-name ownership across connectors. + +Issue #81 Group C — when two connectors register the same view name in the +master analytics DB, the second one used to silently overwrite the first +(last-write-wins). With this repository the orchestrator records the FIRST +source to claim a name and refuses subsequent collisions until the operator +renames one side. 
+""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Dict, List, Optional, Tuple + +import duckdb + + +class ViewOwnershipRepository: + def __init__(self, conn: duckdb.DuckDBPyConnection): + self.conn = conn + + def get_owner(self, view_name: str) -> Optional[str]: + """Return the source_name that owns ``view_name``, or None.""" + row = self.conn.execute( + "SELECT source_name FROM view_ownership WHERE view_name = ?", + [view_name], + ).fetchone() + return row[0] if row else None + + def get_all(self) -> Dict[str, str]: + """Return {view_name: source_name} for every registered ownership.""" + rows = self.conn.execute( + "SELECT view_name, source_name FROM view_ownership" + ).fetchall() + return {r[0]: r[1] for r in rows} + + def claim(self, view_name: str, source_name: str) -> bool: + """Register ``source_name`` as the owner of ``view_name``. + + Returns True if the claim succeeds (either fresh registration or + re-claim by the same source). Returns False if a different source + already owns the name — the caller MUST then refuse to create the + view and surface the collision to operators. + """ + existing = self.get_owner(view_name) + if existing is None: + self.conn.execute( + "INSERT INTO view_ownership (view_name, source_name, registered_at) " + "VALUES (?, ?, ?)", + [view_name, source_name, datetime.now(timezone.utc)], + ) + return True + return existing == source_name + + def release(self, view_name: str, source_name: str) -> bool: + """Drop ownership of ``view_name`` if held by ``source_name``. + + Used during rebuild cleanup when a connector no longer publishes a + previously-claimed name (e.g. operator renamed the table on the + upstream side). Returns True if a row was removed. + """ + before = self.conn.execute( + "SELECT COUNT(*) FROM view_ownership " + "WHERE view_name = ? 
AND source_name = ?", + [view_name, source_name], + ).fetchone()[0] + if before == 0: + return False + self.conn.execute( + "DELETE FROM view_ownership " + "WHERE view_name = ? AND source_name = ?", + [view_name, source_name], + ) + return True + + def reconcile( + self, current_pairs: List[Tuple[str, str]] + ) -> List[Tuple[str, str]]: + """Drop ownerships for (source_name, view_name) pairs no longer + present in ``current_pairs``. Returns the list of dropped pairs. + + Called from the orchestrator's pre-scan at the start of a rebuild + so a renamed or removed table releases its name before claims run; + a different source can then claim it in that same rebuild. + """ + live = set(current_pairs) + all_rows = self.conn.execute( + "SELECT source_name, view_name FROM view_ownership" + ).fetchall() + dropped = [ + (src, view) for src, view in all_rows + if (src, view) not in live + ] + for src, view in dropped: + self.conn.execute( + "DELETE FROM view_ownership " + "WHERE source_name = ? AND view_name = ?", + [src, view], + ) + return dropped + + def list_for_source(self, source_name: str) -> List[str]: + """Return all view names owned by ``source_name``.""" + rows = self.conn.execute( + "SELECT view_name FROM view_ownership " + "WHERE source_name = ? 
ORDER BY view_name", + [source_name], + ).fetchall() + return [r[0] for r in rows] diff --git a/tests/test_schema_v9_migration.py b/tests/test_schema_v9_migration.py index 6269ac0..0f4e2c3 100644 --- a/tests/test_schema_v9_migration.py +++ b/tests/test_schema_v9_migration.py @@ -77,10 +77,12 @@ def _v8_state(db_path) -> duckdb.DuckDBPyConnection: class TestFreshInstall: """Fresh DB → v9 directly via _SYSTEM_SCHEMA + INSERT version + seed.""" - def test_schema_version_is_9(self, fresh_data_dir): - from src.db import get_system_db, get_schema_version + def test_schema_version_is_current(self, fresh_data_dir): + from src.db import get_system_db, get_schema_version, SCHEMA_VERSION conn = get_system_db() - assert get_schema_version(conn) == 9 + # Was hard-coded to 9 before #81 Group C bumped SCHEMA_VERSION to 10. + # Compare against the constant so future bumps don't churn this test. + assert get_schema_version(conn) == SCHEMA_VERSION def test_core_roles_seeded_with_implies_hierarchy(self, fresh_data_dir): from src.db import get_system_db @@ -132,7 +134,8 @@ class TestV8ToV9Migration: # Trigger migration. from src.db import get_system_db, get_schema_version conn = get_system_db() - assert get_schema_version(conn) == 9 + from src.db import SCHEMA_VERSION + assert get_schema_version(conn) == SCHEMA_VERSION rows = conn.execute( """SELECT u.email, r.key, g.source diff --git a/tests/test_view_collision_detection.py b/tests/test_view_collision_detection.py new file mode 100644 index 0000000..1f20f8f --- /dev/null +++ b/tests/test_view_collision_detection.py @@ -0,0 +1,280 @@ +"""Issue #81 Group C — view-name collision detection across connectors. + +Two connectors with overlapping `_meta.table_name` used to silently +overwrite each other in the master analytics DB. This file exercises: + +- Schema v10's `view_ownership` table exists after migration. +- `ViewOwnershipRepository.claim` is first-come-first-served. +- `ViewOwnershipRepository.reconcile` releases stale ownerships. 
+- The orchestrator refuses to overwrite a view owned by a different + source, logs an ERROR, but keeps publishing views for the winner. +""" + +from __future__ import annotations + +import os +import duckdb +import pytest + +from src.repositories.view_ownership import ViewOwnershipRepository + + +# -------------------------------------------------------------------------- +# Repository unit tests +# -------------------------------------------------------------------------- + + +@pytest.fixture +def fresh_system_db(tmp_path, monkeypatch): + """Set DATA_DIR to a fresh temp dir and trigger a v10 schema build.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + from src.db import get_system_db + conn = get_system_db() + yield conn + conn.close() + + +class TestViewOwnershipRepository: + def test_claim_first_succeeds(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + assert repo.claim("orders", "keboola") is True + assert repo.get_owner("orders") == "keboola" + + def test_claim_same_source_idempotent(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + repo.claim("orders", "keboola") + # Re-claiming by the same source is fine — rebuild is idempotent. + assert repo.claim("orders", "keboola") is True + + def test_claim_different_source_refused(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + repo.claim("orders", "keboola") + # Second source asks for the same name — refused. + assert repo.claim("orders", "bigquery") is False + # Original owner unchanged. 
+ assert repo.get_owner("orders") == "keboola" + + def test_release(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + repo.claim("orders", "keboola") + assert repo.release("orders", "keboola") is True + assert repo.get_owner("orders") is None + + def test_release_wrong_source_no_op(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + repo.claim("orders", "keboola") + # Release should not delete a row owned by a different source. + assert repo.release("orders", "bigquery") is False + assert repo.get_owner("orders") == "keboola" + + def test_reconcile_drops_stale_pairs(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + repo.claim("orders", "keboola") + repo.claim("users", "keboola") + repo.claim("traffic", "bigquery") + + # Next rebuild claims orders + traffic only — users should be released. + live = [("keboola", "orders"), ("bigquery", "traffic")] + dropped = repo.reconcile(live) + assert dropped == [("keboola", "users")] + assert repo.get_owner("users") is None + assert repo.get_owner("orders") == "keboola" + assert repo.get_owner("traffic") == "bigquery" + + def test_list_for_source(self, fresh_system_db): + repo = ViewOwnershipRepository(fresh_system_db) + repo.claim("orders", "keboola") + repo.claim("users", "keboola") + repo.claim("traffic", "bigquery") + assert repo.list_for_source("keboola") == ["orders", "users"] + assert repo.list_for_source("bigquery") == ["traffic"] + + +# -------------------------------------------------------------------------- +# Orchestrator behaviour test +# -------------------------------------------------------------------------- + + +def _make_extract_db(path: str, table_names: list[str]) -> None: + """Create a minimal extract.duckdb with `_meta` rows + a table per name. + + Returns nothing — the file at `path` is the connector's output. 
+ """ + os.makedirs(os.path.dirname(path), exist_ok=True) + # Recreate from scratch — round-2 tests rebuild the same connector dir + # with different tables, so wipe any prior file. + if os.path.exists(path): + os.unlink(path) + wal = path + ".wal" + if os.path.exists(wal): + os.unlink(wal) + conn = duckdb.connect(path) + conn.execute( + "CREATE TABLE _meta (" + "table_name VARCHAR, description VARCHAR, rows BIGINT, " + "size_bytes BIGINT, extracted_at TIMESTAMP, query_mode VARCHAR)" + ) + for t in table_names: + # Each table is a tiny in-memory CTAS — query_mode='local' so the + # orchestrator picks it up the same way it does parquet-backed views. + conn.execute(f'CREATE TABLE "{t}" AS SELECT 1 AS x') + conn.execute( + "INSERT INTO _meta VALUES (?, ?, 1, 0, current_timestamp, 'local')", + [t, ""], + ) + conn.close() + + +class TestOrchestratorCollisionRefusal: + def test_first_source_wins_second_source_skipped( + self, tmp_path, monkeypatch + ): + """Two sources both publish a view named `orders`. The first one + the orchestrator visits (alphabetical order) keeps the name; the + other is logged as a collision and its view is NOT created.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + + # Sources visited alphabetically: alpha first, beta second. + _make_extract_db( + str(tmp_path / "extracts" / "alpha" / "extract.duckdb"), + ["orders", "alpha_only"], + ) + _make_extract_db( + str(tmp_path / "extracts" / "beta" / "extract.duckdb"), + ["orders", "beta_only"], + ) + + from src.orchestrator import SyncOrchestrator + orch = SyncOrchestrator() + result = orch.rebuild() + + # alpha got both its views (it ran first); beta only got its non-colliding one. + assert "orders" in result["alpha"] + assert "alpha_only" in result["alpha"] + assert "orders" not in result["beta"], ( + "beta should NOT have published a colliding `orders` view" + ) + assert "beta_only" in result["beta"] + + # Ownership records persisted in system DB. 
+ from src.db import get_system_db + sys_conn = get_system_db() + try: + repo = ViewOwnershipRepository(sys_conn) + assert repo.get_owner("orders") == "alpha" + assert repo.get_owner("alpha_only") == "alpha" + assert repo.get_owner("beta_only") == "beta" + finally: + sys_conn.close() + + def test_partial_collision_does_not_block_other_tables( + self, tmp_path, monkeypatch + ): + """Source A publishes [orders, alpha_only_a, alpha_only_b]; source + B publishes [orders, beta_only]. The collision on `orders` (A wins, + first by alphabet) must NOT prevent B from publishing `beta_only`, + nor prevent A from publishing its other two.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + + _make_extract_db( + str(tmp_path / "extracts" / "alpha" / "extract.duckdb"), + ["orders", "alpha_only_a", "alpha_only_b"], + ) + _make_extract_db( + str(tmp_path / "extracts" / "beta" / "extract.duckdb"), + ["orders", "beta_only"], + ) + + from src.orchestrator import SyncOrchestrator + result = SyncOrchestrator().rebuild() + + assert set(result["alpha"]) == {"orders", "alpha_only_a", "alpha_only_b"} + assert set(result["beta"]) == {"beta_only"} + + def test_pre_scan_failure_does_not_release_ownership( + self, tmp_path, monkeypatch + ): + """When `_scan_meta_pairs` cannot read source B (corrupt + extract.duckdb, transient I/O), the orchestrator must SKIP + reconcile this rebuild — otherwise B's name would be released and + another source could silently steal it. Issue #81 Group C + review-2.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + + # Round 1: alpha owns orders. + _make_extract_db( + str(tmp_path / "extracts" / "alpha" / "extract.duckdb"), + ["orders"], + ) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Round 2: alpha's extract.duckdb is unreadable (simulate corrupt + # file by writing garbage). beta now publishes `orders`. The + # reconcile should be SKIPPED because the scan was incomplete; + # alpha keeps ownership; beta is refused. 
+ alpha_db = tmp_path / "extracts" / "alpha" / "extract.duckdb" + alpha_wal = tmp_path / "extracts" / "alpha" / "extract.duckdb.wal" + alpha_db.write_bytes(b"NOT A REAL DUCKDB FILE") + if alpha_wal.exists(): + alpha_wal.unlink() + _make_extract_db( + str(tmp_path / "extracts" / "beta" / "extract.duckdb"), + ["orders"], + ) + result = SyncOrchestrator().rebuild() + + # alpha did not contribute a view (file unreadable); beta did NOT + # get to claim `orders` because reconcile was skipped. + assert "orders" not in result.get("beta", []), ( + "beta should have been refused `orders` — reconcile must skip " + "when pre-scan is incomplete" + ) + + # Ownership unchanged. + from src.db import get_system_db + sys_conn = get_system_db() + try: + repo = ViewOwnershipRepository(sys_conn) + assert repo.get_owner("orders") == "alpha" + finally: + sys_conn.close() + + def test_owner_releases_name_after_rename(self, tmp_path, monkeypatch): + """If the previous owner of a name no longer publishes it, the + next rebuild releases the name — a different source can then + claim it without operator intervention.""" + monkeypatch.setenv("DATA_DIR", str(tmp_path)) + + # Round 1: alpha owns orders. + _make_extract_db( + str(tmp_path / "extracts" / "alpha" / "extract.duckdb"), + ["orders"], + ) + from src.orchestrator import SyncOrchestrator + SyncOrchestrator().rebuild() + + # Round 2: alpha renames its table to alpha_orders, beta wants `orders`. 
+ _make_extract_db( + str(tmp_path / "extracts" / "alpha" / "extract.duckdb"), + ["alpha_orders"], + ) + _make_extract_db( + str(tmp_path / "extracts" / "beta" / "extract.duckdb"), + ["orders"], + ) + result = SyncOrchestrator().rebuild() + + assert "alpha_orders" in result["alpha"] + assert "orders" in result["beta"], ( + "beta should now own `orders` after alpha released it" + ) + + from src.db import get_system_db + sys_conn = get_system_db() + try: + repo = ViewOwnershipRepository(sys_conn) + assert repo.get_owner("orders") == "beta" + assert repo.get_owner("alpha_orders") == "alpha" + finally: + sys_conn.close()