agnes-the-ai-analyst/app/api/telegram.py
ZdenekSrotyr 1287e63ed9 feat: complete system — web UI, all API endpoints, governance, admin, CLI commands
Major additions:
- Web UI: Jinja2 templates in FastAPI (login, dashboard, catalog, corporate memory, admin)
- API: catalog profiles/metrics, telegram verify/unlink/status, admin table registry CRUD
- Corporate memory governance: approve/reject/mandate/revoke/edit/batch + audit log
- Sync: real DataSyncManager trigger, sync-settings, table-subscriptions
- CLI: setup (init/test/deploy/verify), server (logs/restart/deploy/backup), explore
- Instance config integration (instance.yaml loaded at startup)
- 140 tests passing (25 new)
2026-03-27 16:52:22 +01:00

55 lines
1.7 KiB
Python

"""Telegram integration endpoints — verify, unlink, status."""
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import duckdb
from app.auth.dependencies import get_current_user, _get_db
from src.repositories.notifications import TelegramRepository, PendingCodeRepository
router = APIRouter(prefix="/api/telegram", tags=["telegram"])
class VerifyRequest(BaseModel):
code: str
@router.post("/verify")
async def telegram_verify(
request: VerifyRequest,
user: dict = Depends(get_current_user),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Verify a code to link Telegram account."""
code_repo = PendingCodeRepository(conn)
code_data = code_repo.verify_code(request.code)
if not code_data:
raise HTTPException(status_code=400, detail="Invalid or expired code")
tg_repo = TelegramRepository(conn)
tg_repo.link_user(user["id"], chat_id=code_data["chat_id"])
return {"status": "linked", "chat_id": code_data["chat_id"]}
@router.post("/unlink")
async def telegram_unlink(
user: dict = Depends(get_current_user),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Unlink Telegram account."""
tg_repo = TelegramRepository(conn)
tg_repo.unlink_user(user["id"])
return {"status": "unlinked"}
@router.get("/status")
async def telegram_status(
user: dict = Depends(get_current_user),
conn: duckdb.DuckDBPyConnection = Depends(_get_db),
):
"""Get current Telegram link status."""
tg_repo = TelegramRepository(conn)
link = tg_repo.get_link(user["id"])
if link:
return {"linked": True, "chat_id": link["chat_id"], "linked_at": str(link.get("linked_at", ""))}
return {"linked": False}