feat: add POST /api/query/hybrid endpoint for two-phase BQ+DuckDB queries
This commit is contained in:
parent
d605e7d95f
commit
ed43feb4e6
3 changed files with 96 additions and 1 deletions
43
app/api/query_hybrid.py
Normal file
43
app/api/query_hybrid.py
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
"""Hybrid query endpoint — two-phase BQ registration + DuckDB execution."""
|
||||||
|
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from app.auth.dependencies import require_admin
|
||||||
|
from src.db import get_analytics_db_readonly
|
||||||
|
from src.remote_query import RemoteQueryEngine, RemoteQueryError, load_config
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/api/query", tags=["query"])
|
||||||
|
|
||||||
|
|
||||||
|
class HybridQueryRequest(BaseModel):
|
||||||
|
sql: str
|
||||||
|
register_bq: Dict[str, str] = {}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/hybrid")
|
||||||
|
async def hybrid_query(request: HybridQueryRequest, user: dict = Depends(require_admin)):
|
||||||
|
config = load_config()
|
||||||
|
analytics = get_analytics_db_readonly()
|
||||||
|
try:
|
||||||
|
engine = RemoteQueryEngine(
|
||||||
|
analytics,
|
||||||
|
max_bq_registration_rows=config.get("max_bq_registration_rows", 500_000),
|
||||||
|
max_memory_mb=config.get("max_memory_mb", 2048),
|
||||||
|
max_result_rows=config.get("max_result_rows", 100_000),
|
||||||
|
timeout_seconds=config.get("timeout_seconds", 300),
|
||||||
|
)
|
||||||
|
for alias, bq_sql in request.register_bq.items():
|
||||||
|
try:
|
||||||
|
engine.register_bq(alias, bq_sql)
|
||||||
|
except RemoteQueryError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=f"BQ '{alias}': {e.error_type}: {e}")
|
||||||
|
try:
|
||||||
|
result = engine.execute(request.sql)
|
||||||
|
except RemoteQueryError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=f"Query: {e.error_type}: {e}")
|
||||||
|
return result
|
||||||
|
finally:
|
||||||
|
analytics.close()
|
||||||
|
|
@ -29,6 +29,7 @@ from app.api.access_requests import router as access_requests_router
|
||||||
from app.api.jira_webhooks import router as jira_webhooks_router
|
from app.api.jira_webhooks import router as jira_webhooks_router
|
||||||
from app.api.metrics import router as metrics_router
|
from app.api.metrics import router as metrics_router
|
||||||
from app.api.metadata import router as metadata_router
|
from app.api.metadata import router as metadata_router
|
||||||
|
from app.api.query_hybrid import router as query_hybrid_router
|
||||||
from app.web.router import router as web_router
|
from app.web.router import router as web_router
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -137,6 +138,7 @@ def create_app() -> FastAPI:
|
||||||
app.include_router(jira_webhooks_router)
|
app.include_router(jira_webhooks_router)
|
||||||
app.include_router(metrics_router)
|
app.include_router(metrics_router)
|
||||||
app.include_router(metadata_router)
|
app.include_router(metadata_router)
|
||||||
|
app.include_router(query_hybrid_router)
|
||||||
|
|
||||||
# Web UI router (must be last — has catch-all routes)
|
# Web UI router (must be last — has catch-all routes)
|
||||||
app.include_router(web_router)
|
app.include_router(web_router)
|
||||||
|
|
|
||||||
|
|
@ -385,7 +385,7 @@ class TestMetadataAPI:
|
||||||
# 'orders' is not in table_registry — expect 404 or 400
|
# 'orders' is not in table_registry — expect 404 or 400
|
||||||
assert resp.status_code in (400, 404)
|
assert resp.status_code in (400, 404)
|
||||||
|
|
||||||
def test_push_keboola_table(self, seeded_client, monkeypatch):
|
def test_push_keboola_table(self, seeded_client, monkeypatch): # noqa: F811
|
||||||
client, admin_token, _ = seeded_client
|
client, admin_token, _ = seeded_client
|
||||||
|
|
||||||
# 1. Register a keboola table
|
# 1. Register a keboola table
|
||||||
|
|
@ -451,3 +451,53 @@ class TestMetadataAPI:
|
||||||
called_json = call_args.kwargs.get("json", {})
|
called_json = call_args.kwargs.get("json", {})
|
||||||
assert called_json.get("provider") == "ai-metadata-enrichment"
|
assert called_json.get("provider") == "ai-metadata-enrichment"
|
||||||
assert isinstance(called_json.get("metadata"), list)
|
assert isinstance(called_json.get("metadata"), list)
|
||||||
|
|
||||||
|
|
||||||
|
# ---- Hybrid Query ----
|
||||||
|
|
||||||
|
class TestHybridQueryAPI:
|
||||||
|
def test_hybrid_query_requires_admin(self, seeded_client):
|
||||||
|
client, _, analyst_token = seeded_client
|
||||||
|
resp = client.post(
|
||||||
|
"/api/query/hybrid",
|
||||||
|
json={"sql": "SELECT 1 AS val", "register_bq": {}},
|
||||||
|
headers={"Authorization": f"Bearer {analyst_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 403
|
||||||
|
|
||||||
|
def test_hybrid_query_local_only(self, seeded_client):
|
||||||
|
client, admin_token, _ = seeded_client
|
||||||
|
resp = client.post(
|
||||||
|
"/api/query/hybrid",
|
||||||
|
json={"sql": "SELECT 1 AS val", "register_bq": {}},
|
||||||
|
headers={"Authorization": f"Bearer {admin_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 200
|
||||||
|
data = resp.json()
|
||||||
|
assert "columns" in data
|
||||||
|
assert "rows" in data
|
||||||
|
assert data["columns"] == ["val"]
|
||||||
|
assert data["rows"] == [[1]]
|
||||||
|
|
||||||
|
def test_hybrid_query_blocked_sql(self, seeded_client):
|
||||||
|
client, admin_token, _ = seeded_client
|
||||||
|
resp = client.post(
|
||||||
|
"/api/query/hybrid",
|
||||||
|
json={"sql": "DROP TABLE users", "register_bq": {}},
|
||||||
|
headers={"Authorization": f"Bearer {admin_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 400
|
||||||
|
assert "query_error" in resp.json()["detail"]
|
||||||
|
|
||||||
|
def test_hybrid_query_blocked_bq_sql(self, seeded_client):
|
||||||
|
client, admin_token, _ = seeded_client
|
||||||
|
resp = client.post(
|
||||||
|
"/api/query/hybrid",
|
||||||
|
json={
|
||||||
|
"sql": "SELECT 1",
|
||||||
|
"register_bq": {"bad_alias": "DROP TABLE sensitive"},
|
||||||
|
},
|
||||||
|
headers={"Authorization": f"Bearer {admin_token}"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 400
|
||||||
|
assert "query_error" in resp.json()["detail"]
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue