From ed43feb4e697cf73dac3718743ad60c4e111cbb3 Mon Sep 17 00:00:00 2001
From: ZdenekSrotyr
Date: Sat, 11 Apr 2026 11:09:42 +0200
Subject: [PATCH] feat: add POST /api/query/hybrid endpoint for two-phase BQ+DuckDB queries

---
 app/api/query_hybrid.py | 61 +++++++++++++++++++++++++++++++++++++++++
 app/main.py             |  2 ++
 tests/test_api.py       | 52 ++++++++++++++++++++++++++++++++++-
 3 files changed, 114 insertions(+), 1 deletion(-)
 create mode 100644 app/api/query_hybrid.py

diff --git a/app/api/query_hybrid.py b/app/api/query_hybrid.py
new file mode 100644
index 0000000..08178a0
--- /dev/null
+++ b/app/api/query_hybrid.py
@@ -0,0 +1,61 @@
+"""Hybrid query endpoint — two-phase BQ registration + DuckDB execution."""
+
+from typing import Dict
+
+from fastapi import APIRouter, Depends, HTTPException
+from pydantic import BaseModel, Field
+
+from app.auth.dependencies import require_admin
+from src.db import get_analytics_db_readonly
+from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config
+
+router = APIRouter(prefix="/api/query", tags=["query"])
+
+
+class HybridQueryRequest(BaseModel):
+    """Request body for POST /api/query/hybrid."""
+
+    # Final SQL passed to RemoteQueryEngine.execute().
+    sql: str
+    # Alias -> BQ SQL; each pair goes to register_bq() before `sql` runs.
+    register_bq: Dict[str, str] = Field(default_factory=dict)
+
+
+@router.post("/hybrid")
+def hybrid_query(request: HybridQueryRequest, user: dict = Depends(require_admin)):
+    """Register the requested BQ results, then run the final SQL.
+
+    Declared with plain ``def`` (not ``async def``) because none of the calls
+    below are awaited; FastAPI runs sync path operations in its threadpool,
+    so a long-running query does not stall the event loop.
+
+    Raises:
+        HTTPException: 400 when a registration or the final query raises
+            RemoteQueryError; the detail names the failing phase.
+    """
+    config = load_config()
+    analytics = get_analytics_db_readonly()
+    try:
+        engine = RemoteQueryEngine(
+            analytics,
+            max_bq_registration_rows=config.get("max_bq_registration_rows", 500_000),
+            max_memory_mb=config.get("max_memory_mb", 2048),
+            max_result_rows=config.get("max_result_rows", 100_000),
+            timeout_seconds=config.get("timeout_seconds", 300),
+        )
+        for alias, bq_sql in request.register_bq.items():
+            try:
+                engine.register_bq(alias, bq_sql)
+            except RemoteQueryError as e:
+                raise HTTPException(
+                    status_code=400, detail=f"BQ '{alias}': {e.error_type}: {e}"
+                ) from e
+        try:
+            return engine.execute(request.sql)
+        except RemoteQueryError as e:
+            raise HTTPException(
+                status_code=400, detail=f"Query: {e.error_type}: {e}"
+            ) from e
+    finally:
+        # Always release the analytics connection, including on HTTP errors.
+        analytics.close()
diff --git a/app/main.py b/app/main.py
index 78c235a..504cb16 100644
--- a/app/main.py
+++ b/app/main.py
@@ -29,6 +29,7 @@ from app.api.access_requests import router as access_requests_router
 from app.api.jira_webhooks import router as jira_webhooks_router
 from app.api.metrics import router as metrics_router
 from app.api.metadata import router as metadata_router
+from app.api.query_hybrid import router as query_hybrid_router
 from app.web.router import router as web_router
 
 logger = logging.getLogger(__name__)
@@ -137,6 +138,7 @@ def create_app() -> FastAPI:
     app.include_router(jira_webhooks_router)
     app.include_router(metrics_router)
     app.include_router(metadata_router)
+    app.include_router(query_hybrid_router)
 
     # Web UI router (must be last — has catch-all routes)
     app.include_router(web_router)
diff --git a/tests/test_api.py b/tests/test_api.py
index 77894f3..e74e752 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -385,7 +385,7 @@ class TestMetadataAPI:
         # 'orders' is not in table_registry — expect 404 or 400
         assert resp.status_code in (400, 404)
 
-    def test_push_keboola_table(self, seeded_client, monkeypatch):
+    def test_push_keboola_table(self, seeded_client, monkeypatch):  # noqa: F811
         client, admin_token, _ = seeded_client
 
         # 1. Register a keboola table
@@ -451,3 +451,53 @@ class TestMetadataAPI:
         called_json = call_args.kwargs.get("json", {})
         assert called_json.get("provider") == "ai-metadata-enrichment"
         assert isinstance(called_json.get("metadata"), list)
+
+
+# ---- Hybrid Query ----
+
+class TestHybridQueryAPI:
+    def test_hybrid_query_requires_admin(self, seeded_client):
+        client, _, analyst_token = seeded_client
+        resp = client.post(
+            "/api/query/hybrid",
+            json={"sql": "SELECT 1 AS val", "register_bq": {}},
+            headers={"Authorization": f"Bearer {analyst_token}"},
+        )
+        assert resp.status_code == 403
+
+    def test_hybrid_query_local_only(self, seeded_client):
+        client, admin_token, _ = seeded_client
+        resp = client.post(
+            "/api/query/hybrid",
+            json={"sql": "SELECT 1 AS val", "register_bq": {}},
+            headers={"Authorization": f"Bearer {admin_token}"},
+        )
+        assert resp.status_code == 200
+        data = resp.json()
+        assert "columns" in data
+        assert "rows" in data
+        assert data["columns"] == ["val"]
+        assert data["rows"] == [[1]]
+
+    def test_hybrid_query_blocked_sql(self, seeded_client):
+        client, admin_token, _ = seeded_client
+        resp = client.post(
+            "/api/query/hybrid",
+            json={"sql": "DROP TABLE users", "register_bq": {}},
+            headers={"Authorization": f"Bearer {admin_token}"},
+        )
+        assert resp.status_code == 400
+        assert "query_error" in resp.json()["detail"]
+
+    def test_hybrid_query_blocked_bq_sql(self, seeded_client):
+        client, admin_token, _ = seeded_client
+        resp = client.post(
+            "/api/query/hybrid",
+            json={
+                "sql": "SELECT 1",
+                "register_bq": {"bad_alias": "DROP TABLE sensitive"},
+            },
+            headers={"Authorization": f"Bearer {admin_token}"},
+        )
+        assert resp.status_code == 400
+        assert "query_error" in resp.json()["detail"]