#!/usr/bin/env python3 """List or run notification scripts for the calling user. This helper is designed to be invoked via sudo -u , so it runs with the target user's permissions and can access their home directory. Callers (www-data, deploy) never need to traverse the user's home. Usage: notify-scripts list - JSON array of scripts with last_run info notify-scripts run - execute script, print its JSON output notify-scripts sync-status - JSON with ~/server/ mtime (last sync) """ import grp import json import os import pwd import subprocess import sys import time from pathlib import Path home = Path.home() SCRIPTS_DIR = home / "user" / "notifications" STATE_DIR = home / ".notifications" / "state" VENV_PYTHON = home / ".venv" / "bin" / "python" SCRIPT_TIMEOUT_SECONDS = 60 def validate_username(username): """Ensure username is a member of dataread group (authorized analyst).""" try: # Get dataread group members dataread_group = grp.getgrnam('dataread') dataread_members = dataread_group.gr_mem # Also check users whose primary group is dataread user_info = pwd.getpwnam(username) if user_info.pw_gid == dataread_group.gr_gid: return True # Check if user is in dataread supplementary groups return username in dataread_members except KeyError: return False def cmd_list(): """Print JSON array of notification scripts with last_run metadata.""" if not SCRIPTS_DIR.is_dir(): print("[]") return result = [] for script in sorted(SCRIPTS_DIR.glob("*.py")): state_file = STATE_DIR / f"{script.stem}.json" last_run = _read_last_run(state_file) result.append({ "name": script.name, "stem": script.stem, "last_run": last_run, }) print(json.dumps(result)) def cmd_run(script_name: str): """Execute a notification script and relay its stdout.""" # Validate that current user is an authorized analyst current_user = pwd.getpwuid(os.getuid()).pw_name if not validate_username(current_user): print(json.dumps({"error": f"User '{current_user}' is not authorized"})) sys.exit(1) script_path = SCRIPTS_DIR / script_name if not script_name.endswith(".py") or not script_path.is_file(): print(json.dumps({"error": "Script not found"})) sys.exit(1) if not VENV_PYTHON.is_file(): print(json.dumps({"error": "No .venv found"})) sys.exit(1) env = os.environ.copy() env["MPLCONFIGDIR"] = f"/tmp/mpl_{pwd.getpwuid(os.getuid()).pw_name}" try: proc = subprocess.run( [str(VENV_PYTHON), str(script_path)], capture_output=True, text=True, timeout=SCRIPT_TIMEOUT_SECONDS, cwd=str(home), env=env, ) except subprocess.TimeoutExpired: print(json.dumps({"error": f"Script timed out after {SCRIPT_TIMEOUT_SECONDS}s"})) sys.exit(1) if proc.returncode != 0: print(json.dumps({"error": proc.stderr[:500]})) sys.exit(1) sys.stdout.write(proc.stdout) def cmd_sync_status(): """Print JSON with last sync time based on ~/server/ directory mtime.""" server_dir = home / "server" if not server_dir.is_dir(): print(json.dumps({"synced": False, "elapsed_seconds": None})) return try: mtime = server_dir.stat().st_mtime elapsed = time.time() - mtime print(json.dumps({ "synced": True, "elapsed_seconds": round(elapsed), "elapsed_display": _format_elapsed(elapsed) + " ago", })) except OSError: print(json.dumps({"synced": False, "elapsed_seconds": None})) def _read_last_run(state_file: Path) -> str | None: """Read state file and return human-readable elapsed time, or None.""" if not state_file.exists(): return None try: state = json.loads(state_file.read_text()) ts = state.get("last_sent", 0) if ts <= 0: return None elapsed = time.time() - ts return _format_elapsed(elapsed) + " ago" except Exception: return None def _format_elapsed(seconds: float) -> str: """Format elapsed seconds into a compact string.""" if seconds < 60: return f"{int(seconds)}s" if seconds < 3600: return f"{int(seconds / 60)}m" if seconds < 86400: return f"{int(seconds / 3600)}h" return f"{int(seconds / 86400)}d" if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: notify-scripts [script_name]", file=sys.stderr) sys.exit(1) command = sys.argv[1] if command == "list": cmd_list() elif command == "run" and len(sys.argv) >= 3: cmd_run(sys.argv[2]) elif command == "sync-status": cmd_sync_status() else: print(f"Unknown command: {command}", file=sys.stderr) sys.exit(1)