fix: validate extract dir name in get_analytics_db_readonly to prevent SQL injection

Adds _SAFE_IDENTIFIER regex guard before ATTACHing extract.duckdb files in the
read-only analytics connection, matching the same fix already applied in the
orchestrator. Adds test coverage for malicious directory names.
This commit is contained in:
ZdenekSrotyr 2026-04-09 06:57:31 +02:00
parent e425d4baa5
commit 3e9c347cf1
2 changed files with 43 additions and 0 deletions

View file

@ -5,10 +5,13 @@ and get_analytics_db() for the analytics database with parquet views.
"""
import os
import re
from pathlib import Path
import duckdb
_SAFE_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]{0,63}$")
SCHEMA_VERSION = 3
_SYSTEM_SCHEMA = """
@ -233,6 +236,8 @@ def get_analytics_db_readonly() -> duckdb.DuckDBPyConnection:
for ext_dir in sorted(extracts_dir.iterdir()):
db_file = ext_dir / "extract.duckdb"
if db_file.exists() and ext_dir.is_dir():
if not _SAFE_IDENTIFIER.match(ext_dir.name):
continue
try:
conn.execute(f"ATTACH '{db_file}' AS {ext_dir.name} (READ_ONLY)")
except Exception:

View file

@ -142,3 +142,41 @@ class TestGetAnalyticsDb:
assert (tmp_path / "analytics" / "server.duckdb").exists()
finally:
conn.close()
class TestGetAnalyticsDbReadonly:
def test_analytics_readonly_rejects_malicious_dir_name(self, tmp_path):
"""Directories with SQL-injection chars in their name are skipped."""
_setup_data_dir(tmp_path)
import importlib
import src.db as db_module
importlib.reload(db_module)
# Create the analytics DB first so get_analytics_db_readonly takes the read_only path
analytics_dir = tmp_path / "analytics"
analytics_dir.mkdir(parents=True, exist_ok=True)
import duckdb as _duckdb
seed_conn = _duckdb.connect(str(analytics_dir / "server.duckdb"))
seed_conn.close()
# Create a malicious extract directory whose name contains SQL injection chars
malicious_name = "foo) AS x; DROP TABLE users; --"
ext_dir = tmp_path / "extracts" / malicious_name
ext_dir.mkdir(parents=True, exist_ok=True)
# Place a real (empty) extract.duckdb inside it
mal_conn = _duckdb.connect(str(ext_dir / "extract.duckdb"))
mal_conn.close()
# get_analytics_db_readonly must not raise and must skip the malicious dir
conn = db_module.get_analytics_db_readonly()
try:
# Verify no attachment was made for the malicious source name
attached = {
row[0]
for row in conn.execute(
"SELECT database_name FROM duckdb_databases()"
).fetchall()
}
assert malicious_name not in attached
finally:
conn.close()