diff --git a/app/api/query.py b/app/api/query.py index 5ea2179..5cb67fb 100644 --- a/app/api/query.py +++ b/app/api/query.py @@ -149,7 +149,13 @@ async def execute_query( # implementation released the slot before execute. Use a context # manager so dry-run + cap check + execute + record_bytes all run # inside the slot. - user_id = user.get("id") or user.get("email") or "anon" + # Match /api/v2/scan's user_id key shape (`email or "anon"`) so the + # shared QuotaTracker singleton sees the SAME key for both endpoints. + # Earlier `id or email` ordering keyed BQ bytes on UUID for /api/query + # vs email for /api/v2/scan — the per-user daily cap was effectively + # doubled because the two paths tracked under different keys. + # Devin Review #2 caught this on PR #168. + user_id = user.get("email") or user.get("id") or "anon" guard = ( _bq_quota_and_cap_guard( user_id=user_id, dry_run_set=dry_run_set, sql=request.sql, @@ -420,35 +426,56 @@ def _bq_quota_and_cap_guard(*, user_id: str, dry_run_set: list, sql: str): cap_bytes = _default_remote_query_cap_bytes() - with quota.acquire(user_id): - total_bytes = 0 - for i, (bucket, source_table, _) in enumerate(dry_run_set): - bq_sql = f"SELECT * FROM `{bq.projects.data}.{bucket}.{source_table}`" - try: - est = _bq_dry_run_bytes(bq, bq_sql) - except BqAccessError as exc: - raise HTTPException(status_code=502, detail={ - "kind": exc.kind, - "message": exc.message, - **(exc.details or {}), + # `quota.acquire(user_id)` raises QuotaExceededError(KIND_CONCURRENT) + # via __enter__ when the per-user concurrent-scan slot is at cap. + # Catch around the `with` and map to HTTP 429 with the typed detail + # shape — same shape as the daily-budget rejection above. Without + # this, the exception propagates through @contextlib.contextmanager + # and is caught by execute_query's generic `except Exception` → + # returns HTTP 400 with a flattened "Query error: concurrent_scans: + # N/M" string, dropping the typed retry_after_seconds field. + # Devin Review #2 on PR #168. + try: + with quota.acquire(user_id): + total_bytes = 0 + for i, (bucket, source_table, _) in enumerate(dry_run_set): + bq_sql = f"SELECT * FROM `{bq.projects.data}.{bucket}.{source_table}`" + try: + est = _bq_dry_run_bytes(bq, bq_sql) + except BqAccessError as exc: + raise HTTPException(status_code=502, detail={ + "kind": exc.kind, + "message": exc.message, + **(exc.details or {}), + }) + dry_run_set[i] = (bucket, source_table, est) + total_bytes += est + + if cap_bytes > 0 and total_bytes > cap_bytes: + tables = [f"{b}.{t}" for b, t, _ in dry_run_set] + raise HTTPException(status_code=400, detail={ + "reason": "remote_scan_too_large", + "scan_bytes": total_bytes, + "limit_bytes": cap_bytes, + "tables": tables, + "suggestion": ( + "Use `da fetch --select --where " + "--estimate` to materialize a filtered subset, then query " + "the snapshot locally." + ), }) - dry_run_set[i] = (bucket, source_table, est) - total_bytes += est - if cap_bytes > 0 and total_bytes > cap_bytes: - tables = [f"{b}.{t}" for b, t, _ in dry_run_set] - raise HTTPException(status_code=400, detail={ - "reason": "remote_scan_too_large", - "scan_bytes": total_bytes, - "limit_bytes": cap_bytes, - "tables": tables, - "suggestion": ( - "Use `da fetch --select --where " - "--estimate` to materialize a filtered subset, then query " - "the snapshot locally." - ), - }) - - # Yield control to the handler — slot stays acquired while the - # caller runs analytics.execute() + record_bytes(). - yield total_bytes + # Yield control to the handler — slot stays acquired while the + # caller runs analytics.execute() + record_bytes(). + yield total_bytes + except QuotaExceededError as exc: + # Only KIND_CONCURRENT can land here (daily-budget already mapped + # above; record_bytes never raises). Map to 429 with structured + # detail consistent with the daily-budget shape. + raise HTTPException(status_code=429, detail={ + "reason": "concurrent_slot_exceeded", + "kind": exc.kind, + "current": exc.current, + "limit": exc.limit, + "retry_after_seconds": exc.retry_after_seconds, + }) diff --git a/tests/test_api_query_quota.py b/tests/test_api_query_quota.py index 702b9fd..a37d191 100644 --- a/tests/test_api_query_quota.py +++ b/tests/test_api_query_quota.py @@ -68,7 +68,7 @@ def test_query_records_bytes_against_shared_quota(seeded_app, fresh_quota, mock_ # Pre-flight: tracker has zero usage for this user. tracker = fresh_quota._build_quota_tracker() - user_id = "admin1" # seeded_app's admin user id + user_id = "admin@test.com" # email-keyed per parity with /api/v2/scan (#168 review) # seeded_app's admin user id before = tracker.bytes_used_today(user_id) r = c.post( @@ -93,7 +93,7 @@ def test_query_pre_flight_rejects_user_over_daily_cap(seeded_app, fresh_quota, m # Plant the user's daily counter already at the cap by injecting bytes. tracker = fresh_quota._build_quota_tracker() - user_id = "admin1" + user_id = "admin@test.com" # email-keyed per parity with /api/v2/scan (#168 review) # Push counter past the cap (default 50 GiB). tracker.record_bytes(user_id, tracker._max_daily_bytes + 1) @@ -111,7 +111,7 @@ def test_non_bq_query_skips_quota_path(seeded_app, fresh_quota, mock_dry_run): """A query that doesn't touch any registered remote BQ row must NOT decrement quota. Quota wiring runs only when dry_run_set is non-empty.""" tracker = fresh_quota._build_quota_tracker() - user_id = "admin1" + user_id = "admin@test.com" # email-keyed per parity with /api/v2/scan (#168 review) before = tracker.bytes_used_today(user_id) c = seeded_app["client"]