Dateien
AegisSight-Monitor/src/routers/public_api.py
Claude Dev 19da099583 feat: Kontextabhängige Karten-Kategorien
4 feste Farbstufen (primary/secondary/tertiary/mentioned) mit
variablen Labels pro Lage, die von Haiku generiert werden.

- DB: category_labels Spalte in incidents, alte Kategorien migriert
  (target->primary, response/retaliation->secondary, actor->tertiary)
- Geoparsing: generate_category_labels() + neuer Prompt mit neuen Keys
- QC: Kategorieprüfung auf neue Keys umgestellt
- Orchestrator: Tuple-Rückgabe + Labels in DB speichern
- API: category_labels im Locations- und Lagebild-Response
- Frontend: Dynamische Legende aus API-Labels mit Fallback-Defaults
- Migrationsskript für bestehende Lagen

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 15:04:02 +01:00

184 Zeilen
6.5 KiB
Python

"""Öffentliche API für die Lagebild-Seite auf aegissight.de.
Authentifizierung via X-API-Key Header (getrennt von der JWT-Auth).
Exponiert den Irankonflikt (alle zugehörigen Incidents) als read-only.
"""
import hmac
import json
import logging
import os
import time
from collections import defaultdict
from datetime import datetime

from fastapi import APIRouter, Depends, Header, HTTPException, Request

from config import TIMEZONE
from database import db_dependency
# Module logger for the public API router.
logger = logging.getLogger("osint.public_api")
# Every route declared on this router is served under the /api/public prefix.
router = APIRouter(prefix="/api/public", tags=["public"])
# Expected API key, read once at import time; None when the variable is unset,
# in which case verify_api_key rejects every request.
VALID_API_KEY = os.environ.get("AEGIS_PUBLIC_API_KEY")
# All Iran incident IDs (main incident #6 plus its offshoots)
IRAN_INCIDENT_IDS = [6, 18, 19, 20]
# Incident whose row supplies the summary, sources and category labels.
PRIMARY_INCIDENT_ID = 6
# Simple in-memory rate limiter: max 120 requests per hour per IP.
# Maps client IP -> timestamps (time.time() values) of its recent requests.
_rate_limit: dict[str, list[float]] = defaultdict(list)
RATE_LIMIT_MAX = 120
RATE_LIMIT_WINDOW = 3600 # window length in seconds (1 hour)
def _check_rate_limit(ip: str):
    """Record one request for ``ip`` and enforce the per-IP request limit.

    Timestamps older than ``RATE_LIMIT_WINDOW`` seconds are discarded first.
    If ``RATE_LIMIT_MAX`` requests remain inside the window, the request is
    rejected and not recorded.

    Raises:
        HTTPException: 429 when the limit for ``ip`` is already reached.
    """
    current = time.time()
    # Keep only the timestamps that still fall inside the sliding window.
    recent = [stamp for stamp in _rate_limit[ip] if current - stamp < RATE_LIMIT_WINDOW]
    _rate_limit[ip] = recent
    if len(recent) < RATE_LIMIT_MAX:
        # Same list object as stored in the dict, so this records the request.
        recent.append(current)
        return
    raise HTTPException(status_code=429, detail="Rate limit exceeded")
async def verify_api_key(request: Request, x_api_key: str = Header(...)):
    """Check the X-API-Key header, then apply the per-IP rate limit.

    Args:
        request: Incoming request; its client address is used for logging
            and as the rate-limit key.
        x_api_key: Value of the ``X-API-Key`` header (required).

    Raises:
        HTTPException: 403 when no key is configured or the supplied key does
            not match; 429 propagates from ``_check_rate_limit``.
    """
    # request.client can be None when the server cannot determine the peer
    # address; such callers share a single "unknown" rate-limit bucket instead
    # of triggering an AttributeError (HTTP 500).
    client_host = request.client.host if request.client else "unknown"

    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments, and header values are caller-controlled.
    key_matches = bool(VALID_API_KEY) and hmac.compare_digest(
        x_api_key.encode("utf-8"), VALID_API_KEY.encode("utf-8")
    )
    if not key_matches:
        logger.warning(f"Ungültiger API-Key von {client_host}")
        raise HTTPException(status_code=403, detail="Invalid API key")

    # Only requests carrying a valid key are counted against the limit.
    _check_rate_limit(client_host)
def _in_clause(ids):
"""Erzeugt sichere IN-Klausel für mehrere IDs."""
return ",".join(str(int(i)) for i in ids)
def _decode_json_column(raw, fallback, column):
    """Decode a JSON text column read from the database.

    Args:
        raw: Column value as stored; may be None or empty.
        fallback: Value returned when ``raw`` is empty or cannot be decoded.
        column: Column name, used only in the warning log.

    Returns:
        The decoded JSON value, or ``fallback``.
    """
    if not raw:
        return fallback
    try:
        return json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Best effort: the endpoint still answers with the fallback, but the
        # corrupt value is no longer dropped silently.
        logger.warning("Ungültiges JSON in Spalte %s", column)
        return fallback


@router.get("/lagebild", dependencies=[Depends(verify_api_key)])
async def get_lagebild(db=Depends(db_dependency)):
    """Return the current situation report (Iran conflict) with all its data.

    The incident header, summary, sources and category labels come from the
    primary incident row. Articles, fact checks, the distinct-source count,
    snapshots and locations are collected across all ``IRAN_INCIDENT_IDS``.

    Raises:
        HTTPException: 404 when the primary incident row does not exist.
    """
    # Interpolated into the SQL below; _in_clause int-casts every element and
    # the IDs come from a module constant, not from the request.
    ids = _in_clause(IRAN_INCIDENT_IDS)

    # Load the primary incident (summary, sources, category labels).
    cursor = await db.execute(
        "SELECT * FROM incidents WHERE id = ?", (PRIMARY_INCIDENT_ID,)
    )
    incident = await cursor.fetchone()
    if not incident:
        raise HTTPException(status_code=404, detail="Incident not found")
    incident = dict(incident)

    # Stays None when the column is empty or malformed.
    category_labels = _decode_json_column(
        incident.get("category_labels"), None, "incidents.category_labels"
    )

    # All articles from all Iran incidents.
    cursor = await db.execute(
        f"""SELECT id, headline, headline_de, source, source_url, language,
        published_at, collected_at, verification_status, incident_id
        FROM articles WHERE incident_id IN ({ids})
        ORDER BY published_at DESC, collected_at DESC"""
    )
    articles = [dict(r) for r in await cursor.fetchall()]

    # All fact checks from all Iran incidents; status_history is stored as
    # JSON text and returned decoded.
    cursor = await db.execute(
        f"""SELECT id, claim, status, sources_count, evidence, status_history, checked_at, incident_id
        FROM fact_checks WHERE incident_id IN ({ids})
        ORDER BY checked_at DESC"""
    )
    fact_checks = []
    for r in await cursor.fetchall():
        fc = dict(r)
        fc["status_history"] = _decode_json_column(
            fc.get("status_history"), [], "fact_checks.status_history"
        )
        fact_checks.append(fc)

    # Number of distinct sources across all incidents.
    cursor = await db.execute(
        f"SELECT COUNT(DISTINCT source) as cnt FROM articles WHERE incident_id IN ({ids})"
    )
    source_count = (await cursor.fetchone())["cnt"]

    # Snapshot metadata from all Iran incidents (bodies are served by the
    # snapshot endpoint).
    cursor = await db.execute(
        f"""SELECT id, incident_id, article_count, fact_check_count, created_at
        FROM incident_snapshots WHERE incident_id IN ({ids})
        ORDER BY created_at DESC"""
    )
    available_snapshots = [dict(r) for r in await cursor.fetchall()]

    # Sources JSON of the primary incident.
    sources_json = _decode_json_column(
        incident.get("sources_json"), [], "incidents.sources_json"
    )

    # Locations aggregated by normalized place name.
    # NOTE(review): lat, lon, country_code and category are neither aggregated
    # nor part of GROUP BY, so which row supplies them is engine-dependent —
    # confirm this is intended.
    cursor = await db.execute(
        f"""SELECT
        al.location_name_normalized as name,
        al.latitude as lat,
        al.longitude as lon,
        al.country_code,
        al.category,
        COUNT(*) as article_count,
        MAX(al.confidence) as confidence
        FROM article_locations al
        WHERE al.incident_id IN ({ids})
        GROUP BY al.location_name_normalized
        ORDER BY article_count DESC"""
    )
    locations = [dict(r) for r in await cursor.fetchall()]

    return {
        "generated_at": datetime.now(TIMEZONE).isoformat(),
        "incident": {
            "id": incident["id"],
            "title": incident["title"],
            "description": incident.get("description", ""),
            "status": incident["status"],
            "type": incident.get("type", "adhoc"),
            "created_at": incident["created_at"],
            "updated_at": incident["updated_at"],
            # Counts cover all Iran incidents, not only the primary one.
            "article_count": len(articles),
            "source_count": source_count,
            "factcheck_count": len(fact_checks),
        },
        "current_lagebild": {
            "summary_markdown": incident.get("summary", ""),
            "sources_json": sources_json,
            "updated_at": incident["updated_at"],
        },
        "articles": articles,
        "fact_checks": fact_checks,
        "available_snapshots": available_snapshots,
        "locations": locations,
        "category_labels": category_labels,
    }
@router.get("/lagebild/snapshot/{snapshot_id}", dependencies=[Depends(verify_api_key)])
async def get_snapshot(snapshot_id: int, db=Depends(db_dependency)):
    """Return one historical snapshot of the situation report.

    Only snapshots belonging to one of ``IRAN_INCIDENT_IDS`` are served. The
    ``sources_json`` column is returned decoded, or as an empty list when it
    is empty or not valid JSON.

    Raises:
        HTTPException: 404 when no matching snapshot exists.
    """
    allowed_ids = _in_clause(IRAN_INCIDENT_IDS)
    query = f"""SELECT id, summary, sources_json, article_count, fact_check_count, created_at
        FROM incident_snapshots
        WHERE id = ? AND incident_id IN ({allowed_ids})"""
    result = await db.execute(query, (snapshot_id,))
    row = await result.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Snapshot not found")

    snapshot = dict(row)
    # An empty or NULL column is treated as an empty JSON array.
    raw_sources = snapshot.get("sources_json") or "[]"
    try:
        parsed_sources = json.loads(raw_sources)
    except (json.JSONDecodeError, TypeError):
        parsed_sources = []
    snapshot["sources_json"] = parsed_sources
    return snapshot