"""Migration 2026-05-09: Magic-Link-Auth für Verwaltungsportal. Erstellt zwei Tabellen: - portal_magic_links: Token-Speicher (E-Mail, Token, Ablauf, used_at) - portal_magic_link_attempts: Brute-Force-/Rate-Limit-Tracking (IP, E-Mail, ts) Außerdem: - portal_login_attempts wird gedroppt (alte Passwort-Login-Tabelle, obsolet) - portal_admins.password_hash wird auf '' gesetzt (Spalten bleiben für Audit-Spur erhalten) Ausführung: DB_PATH=/home/claude-dev/osint-data/osint.db python3 migrations/2026-05-09_portal_magic_link.py DB_PATH=/home/claude-dev/AegisSight-Monitor-staging/data/osint.db python3 migrations/2026-05-09_portal_magic_link.py """ import os import sqlite3 import sys def main(db_path: str) -> int: if not os.path.exists(db_path): print(f"FEHLER: DB nicht gefunden: {db_path}", file=sys.stderr) return 1 conn = sqlite3.connect(db_path, timeout=60) conn.execute("PRAGMA busy_timeout = 60000") conn.execute("PRAGMA journal_mode = WAL") print(f"Migration auf {db_path}") # 1. Magic-Link-Tabellen anlegen conn.executescript(""" CREATE TABLE IF NOT EXISTS portal_magic_links ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL, token TEXT UNIQUE NOT NULL, expires_at TIMESTAMP NOT NULL, used_at TIMESTAMP, ip_address TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_portal_magic_links_token ON portal_magic_links(token); CREATE INDEX IF NOT EXISTS idx_portal_magic_links_email ON portal_magic_links(email); CREATE TABLE IF NOT EXISTS portal_magic_link_attempts ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip TEXT NOT NULL, email TEXT NOT NULL, ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_portal_magic_link_attempts_lookup ON portal_magic_link_attempts(email, ip, ts); """) print(" + portal_magic_links angelegt (oder vorhanden)") print(" + portal_magic_link_attempts angelegt (oder vorhanden)") # 2. Alte Brute-Force-Tabelle für Passwort-Login droppen cur = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='portal_login_attempts'" ) if cur.fetchone(): conn.execute("DROP TABLE portal_login_attempts") print(" - portal_login_attempts gedroppt (Passwort-Login obsolet)") else: print(" = portal_login_attempts war bereits weg") # 3. portal_admins.email-Spalte hinzufügen (falls noch nicht da) - für künftige Mehr-Admin-Erweiterung cols = [c[1] for c in conn.execute("PRAGMA table_info(portal_admins)")] if "email" not in cols: conn.execute("ALTER TABLE portal_admins ADD COLUMN email TEXT") print(" + portal_admins.email Spalte hinzugefügt") else: print(" = portal_admins.email war bereits da") # 4. password_hash auf leeren String setzen (Spalte bleibt für Audit, aber unbenutzt) cur = conn.execute("SELECT COUNT(*) FROM portal_admins WHERE password_hash != ''") if cur.fetchone()[0] > 0: conn.execute("UPDATE portal_admins SET password_hash = ''") print(" ~ portal_admins.password_hash geleert (Auth ab jetzt nur per Magic-Link)") else: print(" = portal_admins.password_hash war bereits leer") conn.commit() conn.close() print("Migration abgeschlossen.") return 0 if __name__ == "__main__": db_path = os.environ.get("DB_PATH", "/home/claude-dev/osint-data/osint.db") sys.exit(main(db_path))