Phase 1 - Internal reference cleanup:
- Delete dev_docs/meetings/ (internal meeting notes/transcripts)
- Replace hardcoded usernames (padak/matejkys/dasa) with deploy/generic
- Replace "Internal AI Data Analyst" with "AI Data Analyst"
- Replace keboola/internal_ai_data_analyst URLs with your-org/ai-data-analyst
- Replace /tmp/keboola_load/ with /tmp/data_analyst_staging/ in dev_docs
Phase 2 - Deployment hardening:
- Tighten sudoers wildcards to explicit paths (visudo, sudoers cp)
- setup.sh creates all groups (data-ops, dataread, data-private) and deploy user
- webapp-setup.sh copies sudoers-webapp from repo instead of inline definition
- deploy.sh conditional copy for data_description.md (not in git for OSS)
- deploy.sh ownership changed to deploy:data-ops for /data/{scripts,docs,examples}
Phase 3 - Config and misc:
- Add ${ENV_VAR} interpolation to config/loader.py
- Expand config/instance.yaml.example with all sections (admins, deployment, auth, etc.)
- Create config/.env.template for secret values
- Add MIT LICENSE
- Fix .gitignore: add .venv/, docs/data_description.md
- Fix README.md: CSV status Planned, remove metrics/, update license text
- Translate Czech comments in requirements.txt to English
- Fix test_account_service.py: mock username mapping instead of relying on instance config
All 118 tests pass.
893 lines
32 KiB
Python
893 lines
32 KiB
Python
"""Deploy Guard Tests - Pre-merge CI tests to prevent deployment failures.
|
|
|
|
These tests validate consistency between deploy.sh, sudoers files, systemd
|
|
services, and server scripts. They run against the real repository structure
|
|
(no mocks) and automatically discover files/scripts/services.
|
|
|
|
Supports `# deploy-guard: ignore` comments in sudoers files to suppress
|
|
known false positives.
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
# Repository root: two directory levels above this file (this module is
# expected to live directly inside tests/).
REPO_ROOT = Path(__file__).resolve().parent.parent

# Locations inside the repository that the guard tests inspect.
SERVER_DIR = REPO_ROOT.joinpath("server")
DEPLOY_SH = SERVER_DIR.joinpath("deploy.sh")
BIN_DIR = SERVER_DIR.joinpath("bin")
SRC_DIR = REPO_ROOT.joinpath("src")
WEBAPP_DIR = REPO_ROOT.joinpath("webapp")
|
|
|
|
|
|
def _read_deploy_sh() -> str:
    """Return the complete text of the deploy script at ``DEPLOY_SH``."""
    with DEPLOY_SH.open() as handle:
        return handle.read()
|
|
|
|
|
|
def _read_file(path: Path) -> str:
    """Return the text of *path*, or ``""`` when it is not a regular file."""
    return path.read_text() if path.is_file() else ""
|
|
|
|
|
|
def _find_sudoers_files() -> list[Path]:
    """Return every ``sudoers-*`` entry directly under server/, sorted."""
    found = list(SERVER_DIR.glob("sudoers-*"))
    found.sort()
    return found
|
|
|
|
|
|
def _find_service_files() -> list[Path]:
    """Return every ``*.service`` entry directly under server/, sorted."""
    found = list(SERVER_DIR.glob("*.service"))
    found.sort()
    return found
|
|
|
|
|
|
def _find_timer_files() -> list[Path]:
    """Return every ``*.timer`` entry directly under server/, sorted."""
    found = list(SERVER_DIR.glob("*.timer"))
    found.sort()
    return found
|
|
|
|
|
|
def _find_shell_scripts() -> list[Path]:
    """Collect the shell scripts that the hygiene tests should inspect.

    Two sources are combined, in this order:

    1. Regular files in server/bin/ whose text starts with ``#!/bin/bash``
       or ``#!/bin/sh`` (undecodable bytes are replaced while reading).
    2. Regular files matching server/*.sh, regardless of their shebang.

    Each group is sorted by path.
    """
    shebangs = ("#!/bin/bash", "#!/bin/sh")
    bin_scripts = [
        entry
        for entry in sorted(BIN_DIR.glob("*"))
        if entry.is_file()
        and entry.read_text(errors="replace").startswith(shebangs)
    ]
    server_scripts = [
        entry for entry in sorted(SERVER_DIR.glob("*.sh")) if entry.is_file()
    ]
    return bin_scripts + server_scripts
|
|
|
|
|
|
def _parse_sudoers_commands(sudoers_path: Path) -> list[dict]:
    """Parse a sudoers file and extract the allowed commands.

    Only lines of the form ``user ALL=(ALL) NOPASSWD: /path/to/command [args]``
    are recognised; blank lines, comment lines and any other directive are
    skipped.

    Args:
        sudoers_path: Path of the sudoers file to read.

    Returns:
        One dict per recognised rule, in file order, with keys:
        ``user`` (first token of the rule), ``command`` (text after
        ``NOPASSWD:`` with sudoers ``\\:`` escapes turned back into ``:``),
        ``line`` (1-based line number), ``ignored`` (True when the line
        directly above contains ``deploy-guard: ignore``) and ``file``
        (basename of *sudoers_path*).
    """
    rule_re = re.compile(r"^(\S+)\s+ALL=\(ALL\)\s+NOPASSWD:\s+(.+)$")
    # Split once up front; the previous-line lookup below indexes into this
    # list instead of re-splitting the whole file for every matched rule.
    lines = sudoers_path.read_text().splitlines()

    results = []
    for index, raw_line in enumerate(lines):
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        match = rule_re.match(stripped)
        if not match:
            continue

        # Unescape sudoers backslash-colon (deploy\:data-ops -> deploy:data-ops).
        command = match.group(2).strip().replace("\\:", ":")

        # The suppression marker must sit on the line immediately above the rule.
        ignored = index > 0 and "deploy-guard: ignore" in lines[index - 1]

        results.append({
            "user": match.group(1),
            "command": command,
            "line": index + 1,
            "ignored": ignored,
            "file": sudoers_path.name,
        })
    return results
|
|
|
|
|
|
def _resolve_deploy_variables() -> dict[str, str]:
    """Extract simple variable assignments from deploy.sh.

    Recognises whole-line assignments such as::

        APP_DIR="/opt/data-analyst"
        REPO_DIR="${APP_DIR}/repo"

    in double-quoted, single-quoted or unquoted (no whitespace) form.
    References to variables assigned earlier in the file are expanded in the
    stored value; references to names not seen yet are left untouched.

    Returns:
        Mapping of variable name -> value with known references expanded.
    """
    assignment_patterns = (
        re.compile(r'^(\w+)="([^"]*)"$'),
        re.compile(r"^(\w+)='([^']*)'$"),
        re.compile(r"^(\w+)=(\S+)$"),
    )
    # Match ${NAME} or $NAME as a whole token.  Plain substring replacement of
    # "$NAME" would also rewrite the start of a longer name ("$APP" inside
    # "$APP_DIR"), producing a corrupted value.
    reference_re = re.compile(r"\$\{(\w+)\}|\$(\w+)")

    variables: dict[str, str] = {}

    def expand_known(match: re.Match) -> str:
        name = match.group(1) or match.group(2)
        # Unknown names keep their original spelling.
        return variables.get(name, match.group(0))

    for line in _read_deploy_sh().splitlines():
        stripped = line.strip()
        assignment = None
        for pattern in assignment_patterns:
            assignment = pattern.match(stripped)
            if assignment:
                break
        if not assignment:
            continue

        name, value = assignment.group(1), assignment.group(2)
        variables[name] = reference_re.sub(expand_known, value)
    return variables
|
|
|
|
|
|
def _extract_sudo_commands_from_deploy() -> list[str]:
    """Extract all sudo commands from deploy.sh.

    Comment lines and lines containing ``|| true`` (optional commands that
    are allowed to fail) are skipped.  Only invocations with an absolute
    binary path (``sudo /path/...``) are picked up.

    Returns:
        De-duplicated, normalised command strings in first-seen order, with
        known variables resolved and unknown variables replaced with ``*``.
    """
    content = _read_deploy_sh()
    variables = _resolve_deploy_variables()

    # Match ${NAME} or $NAME as a whole token.  Plain substring replacement of
    # "$NAME" would also rewrite the start of a longer name ("$APP" inside
    # "$APP_DIR"), producing a corrupted command.
    reference_re = re.compile(r"\$\{(\w+)\}|\$(\w+)")

    def expand_known(match: re.Match) -> str:
        name = match.group(1) or match.group(2)
        # Unknown names are left for the wildcard pass below.
        return variables.get(name, match.group(0))

    commands = []
    for line in content.splitlines():
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        # Skip lines with || true (optional commands that may fail)
        if "|| true" in stripped:
            continue

        # Find sudo /path/to/command patterns
        for found in re.finditer(r"sudo\s+(/\S+(?:\s+[^|;&\n]+)?)", stripped):
            cmd = found.group(1).strip()
            # Remove trailing comments
            cmd = re.sub(r"\s*#.*$", "", cmd)
            # Remove shell redirections (e.g., > /dev/null, 2>/dev/null)
            cmd = re.sub(r"\s*\d*>.*$", "", cmd)
            cmd = re.sub(r"\s*[|].*$", "", cmd)
            # Remove quotes
            cmd = cmd.replace('"', '').replace("'", "")
            # Resolve known variables first
            cmd = reference_re.sub(expand_known, cmd)
            # Replace remaining unknown variables with *
            cmd = re.sub(r'\$\{?\w+\}?', '*', cmd)
            cmd = cmd.strip()
            if cmd:
                commands.append(cmd)
    return _deduplicate_commands(commands)
|
|
|
|
|
|
def _deduplicate_commands(commands: list[str]) -> list[str]:
    """Drop repeated commands, keeping the first occurrence of each in order."""
    # dict keys are unique and remember insertion order.
    return list(dict.fromkeys(commands))
|
|
|
|
|
|
def _extract_service_cp_from_deploy() -> list[str]:
    """List the unit files deploy.sh copies into /etc/systemd/system/.

    Two spellings of the copy command are recognised:

        sudo /usr/bin/cp "${REPO_DIR}/server/foo.service" /etc/systemd/system/foo.service
        sudo /usr/bin/cp "...anything.../server/foo.timer" /etc/systemd/system/foo.timer

    Returns:
        ``*.service`` / ``*.timer`` names (the path part after ``/server/``).
        Hits of the variable-prefixed form come first; hits of the
        fully-quoted form are added only when not already listed.
    """
    content = _read_deploy_sh()

    variable_form = (
        r'sudo\s+/usr/bin/cp\s+["\']?\$\{?\w+\}?/server/(\S+\.(?:service|timer))["\']?\s+'
        r'/etc/systemd/system/'
    )
    quoted_form = (
        r'sudo\s+/usr/bin/cp\s+"[^"]*?/server/([^"]+\.(?:service|timer))"\s+'
        r'/etc/systemd/system/'
    )

    results = [hit.group(1) for hit in re.finditer(variable_form, content)]

    # Also match quoted form: "...server/foo.service"
    for hit in re.finditer(quoted_form, content):
        unit_name = hit.group(1)
        if unit_name not in results:
            results.append(unit_name)

    return results
|
|
|
|
|
|
def _parse_service_file(path: Path) -> dict:
    """Read a systemd unit file and pick out a few key directives.

    Returns:
        Dict with keys ``User``, ``Group`` and ``ExecStart``.  Each value is
        the stripped text after ``Key=`` from the last line that starts with
        exactly that key, or ``None`` when no such line exists.
    """
    directives = {"User": None, "Group": None, "ExecStart": None}
    for raw_line in path.read_text().splitlines():
        name, separator, value = raw_line.strip().partition("=")
        # A line counts only when the text before the first "=" is exactly
        # one of the tracked keys.
        if separator and name in directives:
            directives[name] = value.strip()
    return directives
|
|
|
|
|
|
# =============================================================================
|
|
# 1. Sudoers <-> Deploy Consistency (P0)
|
|
# =============================================================================
|
|
|
|
|
|
class TestSudoersDeployConsistency:
    """Cross-check sudo usage in deploy.sh against the sudoers-* rule files."""

    def test_deploy_sudo_commands_have_sudoers_rules(self):
        """Every sudo command in deploy.sh must have a matching sudoers rule.

        Catches: New sudo operation in deploy.sh without sudoers rule (#123).
        """
        deploy_commands = _extract_sudo_commands_from_deploy()
        assert deploy_commands, "Should find sudo commands in deploy.sh"

        # Flatten the command part of every rule from every sudoers file.
        sudoers_patterns = [
            entry["command"]
            for sudoers_file in _find_sudoers_files()
            for entry in _parse_sudoers_commands(sudoers_file)
        ]

        missing = [
            cmd
            for cmd in deploy_commands
            if not _command_matches_any_sudoers_rule(cmd, sudoers_patterns)
        ]

        assert not missing, (
            "deploy.sh uses sudo commands without matching sudoers rules:\n"
            + "\n".join(f"  - {cmd}" for cmd in missing)
        )

    def test_sudoers_commands_are_used(self):
        """Each sudoers rule should be referenced somewhere in the codebase.

        Low-confidence test - uses deploy-guard: ignore to suppress false positives.

        Catches: Stale/dead sudoers rules (code hygiene).
        """
        # Haystack: deploy.sh, everything in server/bin/, and webapp/*.py.
        candidates = [DEPLOY_SH, *BIN_DIR.glob("*"), *WEBAPP_DIR.glob("*.py")]
        codebase_content = "".join(
            _read_file(candidate) + "\n"
            for candidate in candidates
            if candidate.is_file()
        )

        unused = []
        for sudoers_file in _find_sudoers_files():
            for entry in _parse_sudoers_commands(sudoers_file):
                if entry["ignored"]:
                    continue

                # The first token of the rule is the binary path.
                binary = entry["command"].split()[0]
                binary_name = Path(binary).name

                # A rule counts as used when either the full path or just the
                # basename of its binary appears anywhere in the haystack.
                referenced = (
                    binary_name in codebase_content or binary in codebase_content
                )
                if not referenced:
                    unused.append(
                        f"{entry['file']}:{entry['line']} -> {entry['command']}"
                    )

        assert not unused, (
            "Sudoers rules not referenced in codebase "
            "(add '# deploy-guard: ignore' to suppress):\n"
            + "\n".join(f"  - {u}" for u in unused)
        )
|
|
|
|
|
|
def _command_matches_any_sudoers_rule(command: str, sudoers_patterns: list[str]) -> bool:
    """Return True when at least one sudoers rule covers *command*.

    Both sides may contain wildcards: sudoers rules use ``*`` for glob
    matching, and deploy.sh commands have shell variables normalized to ``*``.
    """
    return any(
        _sudoers_rule_matches(pattern, command) for pattern in sudoers_patterns
    )
|
|
|
|
|
|
def _sudoers_rule_matches(rule: str, command: str) -> bool:
    """Decide whether a sudoers rule covers a given command.

    Sudoers rules use glob-like wildcards where ``*`` matches any string, and
    the command may itself contain ``*`` left over from variable
    normalization, so every comparison is tried in both directions.

    Matching steps:
      1. Identical strings match.
      2. The binary (first whitespace-separated token) must be equal.
      3. A rule consisting of only the binary matches any arguments.
      4. The joined argument strings are compared as wildcard patterns.
      5. Otherwise arguments are compared position by position; the shorter
         side is padded with ``*`` and a ``*`` on either side matches anything.
    """
    if rule == command:
        return True

    rule_tokens, cmd_tokens = rule.split(), command.split()
    if not (rule_tokens and cmd_tokens):
        return False

    rule_binary, *rule_args = rule_tokens
    cmd_binary, *cmd_args = cmd_tokens
    if rule_binary != cmd_binary:
        return False
    if not rule_args:
        return True

    def wildcard_regex(text: str) -> str:
        # Escape everything, then turn each (escaped) * back into "anything".
        return re.escape(text).replace(r"\*", ".*")

    def matches_either_way(left: str, right: str) -> bool:
        return bool(
            re.fullmatch(wildcard_regex(left), right)
            or re.fullmatch(wildcard_regex(right), left)
        )

    # Whole argument string at once.
    if matches_either_way(" ".join(rule_args), " ".join(cmd_args)):
        return True

    # Positional comparison with the shorter list padded by wildcards.
    width = max(len(rule_args), len(cmd_args))
    padded_rule = rule_args + ["*"] * (width - len(rule_args))
    padded_cmd = cmd_args + ["*"] * (width - len(cmd_args))
    return all(
        r_arg == "*" or c_arg == "*" or matches_either_way(r_arg, c_arg)
        for r_arg, c_arg in zip(padded_rule, padded_cmd)
    )
|
|
|
|
|
|
# =============================================================================
|
|
# 2. Systemd Services (P0/P2)
|
|
# =============================================================================
|
|
|
|
|
|
class TestSystemdServices:
    """Consistency checks for the systemd service and timer files in server/."""

    def test_all_deployed_services_exist(self):
        """Every service/timer that deploy.sh copies must exist in server/.

        Catches: deploy.sh references non-existent service file.
        """
        deployed = _extract_service_cp_from_deploy()
        assert deployed, "Should find service deployments in deploy.sh"

        missing = [
            unit_name
            for unit_name in deployed
            if not (SERVER_DIR / unit_name).is_file()
        ]

        assert not missing, (
            "deploy.sh deploys non-existent service files:\n"
            + "\n".join(f"  - server/{f}" for f in missing)
        )

    def test_services_with_timers_have_both_files(self):
        """For each *.timer, the corresponding *.service must exist.

        If the timer has an explicit Unit= directive, that service is checked
        instead of the default (timer stem + .service).

        Catches: Timer without service file.
        """
        unit_directive = re.compile(r"^Unit=(\S+)", re.MULTILINE)

        missing = []
        for timer in _find_timer_files():
            explicit_unit = unit_directive.search(timer.read_text())
            service_name = (
                explicit_unit.group(1) if explicit_unit else timer.stem + ".service"
            )
            if not (SERVER_DIR / service_name).is_file():
                missing.append(f"{timer.name} -> {service_name}")

        assert not missing, (
            "Timer files without corresponding service:\n"
            + "\n".join(f"  - {m}" for m in missing)
        )

    def test_new_services_have_sudoers_for_systemctl(self):
        """Every service deployed by deploy.sh must have sudoers rules for systemctl.

        Catches: New service without sudoers for restart/start/stop (#123).
        """
        all_sudoers_content = "".join(
            sudoers_file.read_text() + "\n" for sudoers_file in _find_sudoers_files()
        )

        # Service names (extension removed) taken from deploy.sh cp commands;
        # timer units are not checked here.
        service_names = {
            unit_name.replace(".service", "").replace(".timer", "")
            for unit_name in _extract_service_cp_from_deploy()
            if unit_name.endswith(".service")
        }

        # A service passes when any "systemctl <verb> <service>" text appears
        # anywhere in the combined sudoers content.
        missing = [
            svc
            for svc in sorted(service_names)
            if not re.search(
                rf"systemctl\s+\S+\s+{re.escape(svc)}", all_sudoers_content
            )
        ]

        assert not missing, (
            "Services deployed by deploy.sh without systemctl sudoers rules:\n"
            + "\n".join(f"  - {s}" for s in missing)
        )

    def test_service_files_have_valid_structure(self):
        """All service files must have required systemd sections."""
        required = (
            ("[Service]", "missing [Service] section"),
            ("[Unit]", "missing [Unit] section"),
            ("ExecStart=", "missing ExecStart directive"),
        )
        for svc in _find_service_files():
            content = svc.read_text()
            for marker, complaint in required:
                assert marker in content, f"{svc.name} {complaint}"

    def test_exec_start_post_does_not_reference_runtime_files(self):
        """ExecStartPost commands must not operate on files created by the service itself.

        ExecStartPost runs immediately after the process starts, not when it's ready.
        If a service creates files asynchronously (sockets, pidfiles), ExecStartPost
        cannot reliably operate on them.

        Catches: notify-bot startup failure where ExecStartPost tried to chgrp a
        socket that didn't exist yet (#192/#193).
        """
        problems = []
        for svc in _find_service_files():
            content = svc.read_text()

            # Only services that declare a RuntimeDirectory are relevant.
            runtime_dir_match = re.search(r"RuntimeDirectory=(\S+)", content)
            if not runtime_dir_match:
                continue
            runtime_path = f"/run/{runtime_dir_match.group(1)}"

            for post_cmd in re.findall(r"ExecStartPost=(.+)", content):
                # Drop a single leading "-" or "+" prefix before inspecting
                # the command text.
                if post_cmd.startswith(("-", "+")):
                    post_cmd = post_cmd[1:]

                if runtime_path in post_cmd:
                    problems.append(
                        f"{svc.name}: ExecStartPost references {runtime_path}, "
                        f"but files in RuntimeDirectory may not exist yet at startup. "
                        f"Consider using a different approach (os.chown() in application code, "
                        f"RuntimeDirectoryGroup, or adding deploy user to required group)."
                    )

        assert not problems, (
            "Service files with unsafe ExecStartPost:\n"
            + "\n".join(f"  - {p}" for p in problems)
        )

    def test_timer_files_have_valid_structure(self):
        """All timer files must have required systemd sections."""
        required = (
            ("[Timer]", "missing [Timer] section"),
            ("[Install]", "missing [Install] section"),
        )
        for timer in _find_timer_files():
            content = timer.read_text()
            for marker, complaint in required:
                assert marker in content, f"{timer.name} {complaint}"
|
|
|
|
|
|
# =============================================================================
|
|
# 3. File Ownership & Permissions (P0)
|
|
# =============================================================================
|
|
|
|
|
|
class TestFileOwnership:
    """Ownership and permission consistency checks on deploy.sh."""

    # Explicit list of critical directories and their expected ownership.
    # Maintained manually - extend when new critical directories are added.
    CRITICAL_DIRS = {
        "/data/scripts": {"owner": "deploy", "group": "data-ops"},
        "/data/docs": {"owner": "deploy", "group": "data-ops"},
        "/data/examples": {"owner": "deploy", "group": "data-ops"},
        "/data/notifications": {"owner": "deploy", "group": "data-ops"},
        "/data/auth": {"owner": "www-data", "group": "data-ops"},
        "/data/corporate-memory": {"owner": "deploy", "group": "data-ops"},
        "/data/user_sessions": {"owner": "root", "group": "data-ops"},
        "/data/src_data/raw/jira": {"owner": "root", "group": "data-ops"},
        "/opt/data-analyst": {"owner": "root", "group": "data-ops"},
    }

    def test_service_user_matches_file_ownership(self):
        """Verify chown commands in deploy.sh match expectations for critical dirs.

        Directories without any chown command in deploy.sh are not flagged.

        Catches: Service runs as www-data but files owned by root (#108).
        """
        deploy_content = _read_deploy_sh()

        mismatches = []
        for dir_path, expected in self.CRITICAL_DIRS.items():
            # Pattern: chown [-R] owner:group /path
            chown_pattern = re.compile(
                rf"chown\s+(?:-R\s+)?(\S+?)[:\\](\S+?)\s+[\"']?{re.escape(dir_path)}[\"']?"
            )
            for owner, group in chown_pattern.findall(deploy_content):
                # Report owner first, then group, for every chown occurrence.
                for field, found in (("owner", owner), ("group", group)):
                    if found != expected[field]:
                        mismatches.append(
                            f"{dir_path}: expected {field}={expected[field]}, "
                            f"found {field}={found}"
                        )

        assert not mismatches, (
            "Ownership mismatches in deploy.sh:\n"
            + "\n".join(f"  - {m}" for m in mismatches)
        )

    def test_deploy_chmod_sets_required_permissions(self):
        """Paths chowned to www-data must also have a chmod in deploy.sh.

        Only the presence of a chmod for the same path is checked; the mode
        value itself is not inspected.

        Catches: mkstemp creates 600, webapp needs 644 (#108).
        """
        deploy_content = _read_deploy_sh()

        # Paths that are chowned to www-data (quotes still attached).
        www_paths = re.findall(
            r"chown\s+(?:-R\s+)?www-data[:\\]\S+\s+(\S+)",
            deploy_content,
        )

        problems = []
        for raw_path in www_paths:
            path = raw_path.strip("\"'")
            chmod_hit = re.search(
                rf"chmod\s+(?:-R\s+)?\S+\s+[\"']?{re.escape(path)}[\"']?",
                deploy_content,
            )
            if chmod_hit is None:
                problems.append(
                    f"{path}: chown to www-data without corresponding chmod"
                )

        assert not problems, (
            "Missing chmod for www-data owned paths:\n"
            + "\n".join(f"  - {p}" for p in problems)
        )
|
|
|
|
|
|
# =============================================================================
|
|
# 4. Symlinks and Paths (P1)
|
|
# =============================================================================
|
|
|
|
|
|
class TestSymlinksAndPaths:
    """Verify symlink targets and hardcoded paths are consistent."""

    def test_symlink_targets_in_add_analyst(self):
        """All symlink targets in add-analyst must point to paths that deploy.sh mentions.

        For every ``ln -s[f] /absolute/target`` in server/bin/add-analyst, the
        target itself or one of its ancestor directories must appear in
        deploy.sh.  The filesystem root is deliberately not accepted as an
        ancestor: "/" occurs in practically every deploy script, so accepting
        it would make the check pass unconditionally.

        Skipped when server/bin/add-analyst does not exist.

        Catches: Script creates symlink to /data/X, but deploy.sh copies to /data/Y
        (#157, #158).
        """
        add_analyst = BIN_DIR / "add-analyst"
        if not add_analyst.is_file():
            pytest.skip("add-analyst not found")

        content = add_analyst.read_text()
        deploy_content = _read_deploy_sh()

        # Find all ln -sf TARGET patterns (absolute targets only).
        symlinks = re.findall(r"ln\s+-sf?\s+(/\S+)", content)

        missing = []
        for target in symlinks:
            # Split into path segments, ignoring empty ones from leading,
            # trailing or doubled slashes.
            segments = [segment for segment in target.split("/") if segment]
            # Candidates from most to least specific:
            #   /data/scripts/x -> /data/scripts/x, /data/scripts, /data
            candidates = (
                "/" + "/".join(segments[:depth])
                for depth in range(len(segments), 0, -1)
            )
            if not any(candidate in deploy_content for candidate in candidates):
                missing.append(target)

        assert not missing, (
            "Symlink targets in add-analyst not found in deploy.sh:\n"
            + "\n".join(f"  - {t}" for t in missing)
        )

    def test_deploy_copies_match_source_files(self):
        """Every file that deploy.sh copies from repo must exist.

        Looks at ``cp [-r] "${REPO_DIR}/<path>"`` sources.  For sources
        containing ``*`` only the parent directory is required to exist.

        Catches: deploy.sh references files that were deleted or moved.
        """
        deploy_content = _read_deploy_sh()

        # Pattern: cp ... ${REPO_DIR}/path or "${REPO_DIR}/path"
        cp_sources = re.findall(
            r'cp\s+(?:-r\s+)?"?\$\{REPO_DIR\}/([^"}\s]+)',
            deploy_content,
        )

        missing = []
        for rel_path in cp_sources:
            if "*" in rel_path:
                # Glob source (e.g., examples/notifications/*.py): check that
                # the directory part exists.
                present = (REPO_ROOT / rel_path.rsplit("/", 1)[0]).is_dir()
            else:
                present = (REPO_ROOT / rel_path).exists()
            if not present:
                missing.append(rel_path)

        assert not missing, (
            "deploy.sh copies files that don't exist in repo:\n"
            + "\n".join(f"  - {p}" for p in missing)
        )
|
|
|
|
|
|
# =============================================================================
|
|
# 5. Documentation <-> Code (P2)
|
|
# =============================================================================
|
|
|
|
|
|
class TestDocumentation:
    """Check that the documentation mentions what is deployed."""

    def test_server_md_documents_all_services(self):
        """All *.service files should be mentioned in dev_docs/server.md.

        A service counts as documented when its file stem (e.g. "notify-bot")
        appears anywhere in the document.  Skipped when the document is absent.

        Catches: New service without documentation.
        """
        server_md = REPO_ROOT / "dev_docs" / "server.md"
        if not server_md.is_file():
            pytest.skip("dev_docs/server.md not found")

        doc_content = server_md.read_text()
        undocumented = [
            svc.name for svc in _find_service_files() if svc.stem not in doc_content
        ]

        assert not undocumented, (
            "Services not documented in dev_docs/server.md:\n"
            + "\n".join(f"  - {s}" for s in undocumented)
        )
|
|
|
|
|
|
# =============================================================================
|
|
# 6. Shell Script Hygiene (P1)
|
|
# =============================================================================
|
|
|
|
|
|
class TestShellScriptHygiene:
    """Safety best-practice checks for the shell scripts under server/."""

    def test_shell_scripts_use_strict_mode(self):
        """All bash scripts must use set -euo pipefail (or equivalent).

        Exceptions:
        - Trivial scripts with at most two non-blank, non-comment lines
          (e.g. a single exec wrapper)
        - Scripts with only display/query operations (list-*, read-only),
          i.e. none of the known state-changing commands appear in them

        Only the first missing piece (-e, then -u, then pipefail) is reported
        per script.

        Catches: Scripts silently continuing on error, leading to partial deployments.
        """
        scripts = _find_shell_scripts()
        assert scripts, "Should find shell scripts"

        # Commands whose presence marks a script as having side effects.
        side_effect_patterns = [
            r"\buseradd\b", r"\buserdel\b", r"\busermod\b",
            r"\bmkdir\b", r"\bcp\b", r"\bmv\b", r"\brm\b",
            r"\bchmod\b", r"\bchown\b", r"\bln\b", r"\btee\b",
            r"\bsystemctl\b",
        ]

        non_strict = []
        for script in scripts:
            content = script.read_text()

            # Lines that are neither blank nor comments (the shebang is a comment).
            code_lines = [
                line.strip()
                for line in content.splitlines()
                if line.strip() and not line.strip().startswith("#")
            ]
            if len(code_lines) <= 2:
                continue

            # Read-only scripts are safe to run without strict mode.
            if not any(re.search(pattern, content) for pattern in side_effect_patterns):
                continue

            # Check for set -euo pipefail or individual set commands.
            checks = (
                ("'set -e'", bool(re.search(r"set\s+-[a-z]*e", content))),
                ("'set -u'", bool(re.search(r"set\s+-[a-z]*u", content))),
                ("'pipefail'", "pipefail" in content),
            )
            first_missing = next(
                (label for label, present in checks if not present), None
            )
            if first_missing is not None:
                non_strict.append(
                    f"{script.relative_to(REPO_ROOT)}: missing {first_missing}"
                )

        assert not non_strict, (
            "Shell scripts without strict mode:\n"
            + "\n".join(f"  - {s}" for s in non_strict)
        )

    def test_shell_scripts_have_shebang(self):
        """All scripts in server/bin/ must start with a shebang line."""
        problems = []
        for script in sorted(BIN_DIR.glob("*")):
            if not script.is_file():
                continue
            first_line, _, _ = script.read_text(errors="replace").partition("\n")
            if not first_line.startswith("#!"):
                problems.append(str(script.relative_to(REPO_ROOT)))

        assert not problems, (
            "Scripts without shebang line:\n"
            + "\n".join(f"  - {p}" for p in problems)
        )
|
|
|
|
|
|
# =============================================================================
|
|
# 7. Python Modules & Dependencies (P1)
|
|
# =============================================================================
|
|
|
|
|
|
class TestPythonDependencies:
    """Syntax checks for Python sources and sanity checks for requirements.txt."""

    def test_all_src_modules_have_valid_syntax(self):
        """All .py files in src/ and webapp/ must have valid Python syntax.

        Uses py_compile to check syntax without executing module-level code,
        avoiding side effects like DB connections or API calls.  Each file is
        compiled in a separate interpreter process with a 10 second timeout.

        Catches: Syntax errors, missing parentheses, indentation issues.
        """
        problems = []
        for directory in (SRC_DIR, WEBAPP_DIR):
            if not directory.is_dir():
                continue
            for py_file in sorted(directory.rglob("*.py")):
                label = py_file.relative_to(REPO_ROOT)
                try:
                    result = subprocess.run(
                        [sys.executable, "-m", "py_compile", str(py_file)],
                        capture_output=True,
                        text=True,
                        timeout=10,
                    )
                except subprocess.TimeoutExpired:
                    problems.append(f"{label}: compilation timed out")
                else:
                    if result.returncode != 0:
                        problems.append(f"{label}: {result.stderr.strip()}")

        assert not problems, (
            "Python files with syntax errors:\n"
            + "\n".join(f"  - {p}" for p in problems)
        )

    def test_requirements_txt_exists_and_nonempty(self):
        """requirements.txt must exist and contain at least one package."""
        req_file = REPO_ROOT / "requirements.txt"
        assert req_file.is_file(), "requirements.txt not found"

        # Any line that is neither blank nor a comment counts as a package.
        packages = [
            line
            for line in req_file.read_text().strip().splitlines()
            if line.strip() and not line.strip().startswith("#")
        ]
        assert packages, "requirements.txt is empty (no packages declared)"

    def test_no_duplicate_requirements(self):
        """requirements.txt should not have duplicate package declarations.

        Package names are compared case-insensitively, using the text before
        the first version operator or extras bracket.  Skipped when the file
        does not exist.
        """
        req_file = REPO_ROOT / "requirements.txt"
        if not req_file.is_file():
            pytest.skip("requirements.txt not found")

        first_seen = {}
        duplicates = []
        for line_num, line in enumerate(req_file.read_text().splitlines(), 1):
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            # Extract package name (before >=, ==, etc.)
            pkg_name = re.split(r"[><=!~\[]", stripped)[0].strip().lower()
            # setdefault returns the earlier line number when the name is
            # already known, otherwise records and returns this one.
            original_line = first_seen.setdefault(pkg_name, line_num)
            if original_line != line_num:
                duplicates.append(
                    f"{pkg_name} (lines {original_line} and {line_num})"
                )

        assert not duplicates, (
            "Duplicate packages in requirements.txt:\n"
            + "\n".join(f"  - {d}" for d in duplicates)
        )
|