From 706d0b49d67a87752d1262d90c0e62e00ae092e4 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sat, 7 Mar 2026 02:37:30 +0100 Subject: [PATCH] Fix: Alle Timestamps einheitlich auf Europe/Berlin Zeitzone Inkonsistenz behoben: Manche Timestamps wurden in UTC, andere in Berlin-Zeit gespeichert. Das fuehrte zu Fehlern beim Auto-Refresh und Faktencheck, da Zeitvergleiche falsche Ergebnisse lieferten. Co-Authored-By: Claude Opus 4.6 --- src/agents/orchestrator.py | 45 ++++++--- src/auth.py | 6 +- src/main.py | 6 +- src/routers/auth.py | 12 +-- src/routers/incidents.py | 37 +++++--- src/routers/public_api.py | 156 ++++++++++++++++++++++++++++++++ src/services/license_service.py | 6 +- 7 files changed, 231 insertions(+), 37 deletions(-) create mode 100644 src/routers/public_api.py diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 11f9033..a47ad26 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -3,7 +3,7 @@ import asyncio import json import logging import re -from datetime import datetime, timezone +from datetime import datetime from config import TIMEZONE from typing import Optional from urllib.parse import urlparse, urlunparse @@ -206,7 +206,7 @@ async def _create_notifications_for_incident( if not notifications: return - now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S') if visibility == "public" and tenant_id: cursor = await db.execute( @@ -465,7 +465,7 @@ class AgentOrchestrator: await db.execute( """UPDATE refresh_log SET status = 'cancelled', error_message = 'Vom Nutzer abgebrochen', completed_at = ? WHERE incident_id = ? AND status = 'running'""", - (datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), incident_id), + (datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), incident_id), ) await db.commit() except Exception as e: @@ -481,7 +481,7 @@ class AgentOrchestrator: await db.execute( """UPDATE refresh_log SET status = 'error', error_message = ?, completed_at = ? WHERE incident_id = ? AND status = 'running'""", - (error[:500], datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), incident_id), + (error[:500], datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), incident_id), ) await db.commit() except Exception as e: @@ -542,12 +542,12 @@ class AgentOrchestrator: await db.execute( """UPDATE refresh_log SET status = 'error', error_message = 'Retry gestartet', completed_at = ? WHERE incident_id = ? AND status = 'running'""", - (datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), incident_id), + (datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), incident_id), ) await db.commit() # Refresh-Log starten - now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S') cursor = await db.execute( "INSERT INTO refresh_log (incident_id, started_at, status, trigger_type, retry_count, tenant_id) VALUES (?, ?, 'running', ?, ?, ?)", (incident_id, now, trigger_type, retry_count, tenant_id), @@ -894,9 +894,28 @@ class AgentOrchestrator: if matched: old_status = matched.get("status") + # status_history aktualisieren bei Statusaenderung + history_update = "" + if old_status and old_status != new_status: + import json as _json + cursor_hist = await db.execute( + "SELECT status_history FROM fact_checks WHERE id = ?", + (matched["id"],), + ) + hist_row = await cursor_hist.fetchone() + try: + history = _json.loads(hist_row[0] or "[]") if hist_row else [] + except (ValueError, TypeError): + history = [] + history.append({"status": new_status, "at": now}) + history_update = _json.dumps(history) await db.execute( - "UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ? WHERE id = ?", - (new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now, matched["id"]), + "UPDATE fact_checks SET claim = ?, status = ?, sources_count = ?, evidence = ?, is_notification = ?, checked_at = ?" + + (", status_history = ?" if history_update else "") + + " WHERE id = ?", + (new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), now) + + ((history_update,) if history_update else ()) + + (matched["id"],), ) # Aus der Liste entfernen damit nicht doppelt gematcht wird remaining_existing = [ef for ef in remaining_existing if ef["id"] != matched["id"]] @@ -909,10 +928,12 @@ class AgentOrchestrator: "new_status": new_status, }) else: + import json as _json + initial_history = _json.dumps([{"status": new_status, "at": now}]) await db.execute( - """INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - (incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id), + """INSERT INTO fact_checks (incident_id, claim, status, sources_count, evidence, is_notification, tenant_id, status_history) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + (incident_id, new_claim, new_status, fc.get("sources_count", 0), fc.get("evidence"), fc.get("is_notification", 0), tenant_id, initial_history), ) # Status-Statistik sammeln @@ -987,7 +1008,7 @@ class AgentOrchestrator: cache_creation_tokens = ?, cache_read_tokens = ?, total_cost_usd = ?, api_calls = ? WHERE id = ?""", - (datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), new_count, + (datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S'), new_count, usage_acc.input_tokens, usage_acc.output_tokens, usage_acc.cache_creation_tokens, usage_acc.cache_read_tokens, round(usage_acc.total_cost_usd, 7), usage_acc.call_count, log_id), diff --git a/src/auth.py b/src/auth.py index 37dca52..233f08b 100644 --- a/src/auth.py +++ b/src/auth.py @@ -1,11 +1,11 @@ """JWT-Authentifizierung mit Magic-Link-Support und Multi-Tenancy.""" import secrets import string -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from jose import jwt, JWTError from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials -from config import JWT_SECRET, JWT_ALGORITHM, JWT_EXPIRE_HOURS +from config import JWT_SECRET, JWT_ALGORITHM, JWT_EXPIRE_HOURS, TIMEZONE security = HTTPBearer() @@ -23,7 +23,7 @@ def create_token( org_slug: str = None, ) -> str: """JWT-Token erstellen mit Tenant-Kontext.""" - now = datetime.now(timezone.utc) + now = datetime.now(TIMEZONE) expire = now + timedelta(hours=JWT_EXPIRE_HOURS) payload = { "sub": str(user_id), diff --git a/src/main.py b/src/main.py index ead984c..493cb88 100644 --- a/src/main.py +++ b/src/main.py @@ -101,7 +101,7 @@ async def check_auto_refresh(): # Nur letzten AUTO-Refresh prüfen (manuelle Refreshs ignorieren) cursor = await db.execute( - "SELECT started_at FROM refresh_log WHERE incident_id = ? AND trigger_type = 'auto' ORDER BY started_at DESC LIMIT 1", + "SELECT completed_at FROM refresh_log WHERE incident_id = ? AND trigger_type = 'auto' AND status = 'completed' ORDER BY completed_at DESC LIMIT 1", (incident_id,), ) last_refresh = await cursor.fetchone() @@ -110,7 +110,7 @@ async def check_auto_refresh(): if not last_refresh: should_refresh = True else: - last_time = datetime.fromisoformat(last_refresh["started_at"]) + last_time = datetime.fromisoformat(last_refresh["completed_at"]) if last_time.tzinfo is None: last_time = last_time.replace(tzinfo=TIMEZONE) elapsed = (now - last_time).total_seconds() / 60 @@ -262,12 +262,14 @@ from routers.incidents import router as incidents_router from routers.sources import router as sources_router from routers.notifications import router as notifications_router from routers.feedback import router as feedback_router +from routers.public_api import router as public_api_router app.include_router(auth_router) app.include_router(incidents_router) app.include_router(sources_router) app.include_router(notifications_router) app.include_router(feedback_router) +app.include_router(public_api_router) @app.websocket("/api/ws") diff --git a/src/routers/auth.py b/src/routers/auth.py index d61b625..6116346 100644 --- a/src/routers/auth.py +++ b/src/routers/auth.py @@ -1,6 +1,6 @@ """Auth-Router: Magic-Link-Login und Nutzerverwaltung.""" import logging -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from fastapi import APIRouter, Depends, HTTPException, Request, status from models import ( MagicLinkRequest, @@ -78,7 +78,7 @@ async def request_magic_link( # Token + Code generieren token = generate_magic_token() code = generate_magic_code() - expires_at = (datetime.now(timezone.utc) + timedelta(minutes=MAGIC_LINK_EXPIRE_MINUTES)).strftime('%Y-%m-%d %H:%M:%S') + expires_at = (datetime.now(TIMEZONE) + timedelta(minutes=MAGIC_LINK_EXPIRE_MINUTES)).strftime('%Y-%m-%d %H:%M:%S') # Alte ungenutzte Magic Links fuer diese E-Mail invalidieren await db.execute( @@ -124,10 +124,10 @@ async def verify_magic_link( raise HTTPException(status_code=400, detail="Ungueltiger oder bereits verwendeter Link") # Ablauf pruefen - now = datetime.now(timezone.utc) + now = datetime.now(TIMEZONE) expires = datetime.fromisoformat(ml["expires_at"]) if expires.tzinfo is None: - expires = expires.replace(tzinfo=timezone.utc) + expires = expires.replace(tzinfo=TIMEZONE) if now > expires: raise HTTPException(status_code=400, detail="Link abgelaufen. Bitte neuen Link anfordern.") @@ -200,10 +200,10 @@ async def verify_magic_code( raise HTTPException(status_code=400, detail="Ungueltiger Code") # Ablauf pruefen - now = datetime.now(timezone.utc) + now = datetime.now(TIMEZONE) expires = datetime.fromisoformat(ml["expires_at"]) if expires.tzinfo is None: - expires = expires.replace(tzinfo=timezone.utc) + expires = expires.replace(tzinfo=TIMEZONE) if now > expires: raise HTTPException(status_code=400, detail="Code abgelaufen. Bitte neuen Code anfordern.") diff --git a/src/routers/incidents.py b/src/routers/incidents.py index d4fd66f..6a9c0a6 100644 --- a/src/routers/incidents.py +++ b/src/routers/incidents.py @@ -5,7 +5,8 @@ from models import IncidentCreate, IncidentUpdate, IncidentResponse, Subscriptio from auth import get_current_user from middleware.license_check import require_writable_license from database import db_dependency, get_db -from datetime import datetime, timezone +from datetime import datetime +from config import TIMEZONE import asyncio import aiosqlite import json @@ -101,7 +102,7 @@ async def create_incident( ): """Neue Lage anlegen.""" tenant_id = current_user.get("tenant_id") - now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S') cursor = await db.execute( """INSERT INTO incidents (title, description, type, refresh_mode, refresh_interval, retention_days, international_sources, visibility, @@ -183,7 +184,7 @@ async def update_incident( if not updates: return await _enrich_incident(db, row) - updates["updated_at"] = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + updates["updated_at"] = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S') set_clause = ", ".join(f"{k} = ?" for k in updates) values = list(updates.values()) + [incident_id] @@ -286,7 +287,7 @@ async def get_locations( await _check_incident_access(db, incident_id, current_user["id"], tenant_id) cursor = await db.execute( """SELECT al.location_name, al.location_name_normalized, al.country_code, - al.latitude, al.longitude, al.confidence, + al.latitude, al.longitude, al.confidence, al.category, a.id as article_id, a.headline, a.headline_de, a.source, a.source_url FROM article_locations al JOIN articles a ON a.id = al.article_id @@ -310,8 +311,11 @@ async def get_locations( "confidence": row["confidence"], "article_count": 0, "articles": [], + "categories": {}, } loc_map[key]["article_count"] += 1 + cat = row["category"] or "mentioned" + loc_map[key]["categories"][cat] = loc_map[key]["categories"].get(cat, 0) + 1 # Maximal 10 Artikel pro Ort mitliefern if len(loc_map[key]["articles"]) < 10: loc_map[key]["articles"].append({ @@ -321,7 +325,18 @@ async def get_locations( "source_url": row["source_url"], }) - return list(loc_map.values()) + # Dominanteste Kategorie pro Ort bestimmen (Prioritaet: target > retaliation > actor > mentioned) + priority = {"target": 4, "retaliation": 3, "actor": 2, "mentioned": 1} + result = [] + for loc in loc_map.values(): + cats = loc.pop("categories") + if cats: + best_cat = max(cats, key=lambda c: (priority.get(c, 0), cats[c])) + else: + best_cat = "mentioned" + loc["category"] = best_cat + result.append(loc) + return result # Geoparse-Status pro Incident (in-memory) @@ -364,11 +379,11 @@ async def _run_geoparse_background(incident_id: int, tenant_id: int | None): await db.execute( """INSERT INTO article_locations (article_id, incident_id, location_name, location_name_normalized, - country_code, latitude, longitude, confidence, source_text, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + country_code, latitude, longitude, confidence, source_text, tenant_id, category) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (art_id, incident_id, loc["location_name"], loc["location_name_normalized"], loc["country_code"], loc["lat"], loc["lon"], loc["confidence"], - loc.get("source_text", ""), tenant_id), + loc.get("source_text", ""), tenant_id, loc.get("category", "mentioned")), ) geo_count += 1 await db.commit() @@ -652,7 +667,7 @@ def _build_markdown_export( lines.append(snap_summary) lines.append("") - now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + now = datetime.now(TIMEZONE).strftime("%Y-%m-%d %H:%M UTC") lines.append("---") lines.append(f"*Exportiert am {now} aus AegisSight Monitor*") return "\n".join(lines) @@ -663,7 +678,7 @@ def _build_json_export( snapshots: list, scope: str, creator: str ) -> dict: """Strukturiertes JSON fuer Export.""" - now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + now = datetime.now(TIMEZONE).strftime("%Y-%m-%dT%H:%M:%SZ") sources = [] sources_json = incident.get("sources_json") @@ -772,7 +787,7 @@ async def export_incident( snapshots = [dict(r) for r in await cursor.fetchall()] # Dateiname - date_str = datetime.now(timezone.utc).strftime("%Y%m%d") + date_str = datetime.now(TIMEZONE).strftime("%Y%m%d") slug = _slugify(incident["title"]) scope_suffix = "_vollexport" if scope == "full" else "" diff --git a/src/routers/public_api.py b/src/routers/public_api.py new file mode 100644 index 0000000..6f00a97 --- /dev/null +++ b/src/routers/public_api.py @@ -0,0 +1,156 @@ +"""Öffentliche API für die Lagebild-Seite auf aegissight.de. + +Authentifizierung via X-API-Key Header (getrennt von der JWT-Auth). +Exponiert den Irankonflikt (alle zugehörigen Incidents) als read-only. +""" +import json +import logging +import os +import time +from collections import defaultdict +from datetime import datetime +from config import TIMEZONE +from fastapi import APIRouter, Depends, Header, HTTPException, Request +from database import db_dependency + +logger = logging.getLogger("osint.public_api") + +router = APIRouter(prefix="/api/public", tags=["public"]) + +VALID_API_KEY = os.environ.get("AEGIS_PUBLIC_API_KEY") + +# Alle Iran-Incident-IDs (Haupt-Incident #6 + Ableger) +IRAN_INCIDENT_IDS = [6, 18, 19, 20] +PRIMARY_INCIDENT_ID = 6 + +# Simple in-memory rate limiter: max 120 requests per hour per IP +_rate_limit: dict[str, list[float]] = defaultdict(list) +RATE_LIMIT_MAX = 120 +RATE_LIMIT_WINDOW = 3600 # 1 hour + + +def _check_rate_limit(ip: str): + now = time.time() + _rate_limit[ip] = [t for t in _rate_limit[ip] if now - t < RATE_LIMIT_WINDOW] + if len(_rate_limit[ip]) >= RATE_LIMIT_MAX: + raise HTTPException(status_code=429, detail="Rate limit exceeded") + _rate_limit[ip].append(now) + + +async def verify_api_key(request: Request, x_api_key: str = Header(...)): + """Prüft API-Key und Rate-Limit.""" + if not VALID_API_KEY or x_api_key != VALID_API_KEY: + logger.warning(f"Ungültiger API-Key von {request.client.host}") + raise HTTPException(status_code=403, detail="Invalid API key") + _check_rate_limit(request.client.host) + + +def _in_clause(ids): + """Erzeugt sichere IN-Klausel für mehrere IDs.""" + return ",".join(str(int(i)) for i in ids) + + +@router.get("/lagebild", dependencies=[Depends(verify_api_key)]) +async def get_lagebild(db=Depends(db_dependency)): + """Liefert das aktuelle Lagebild (Irankonflikt) mit allen Daten.""" + ids = _in_clause(IRAN_INCIDENT_IDS) + + # Haupt-Incident laden (für Summary, Sources) + cursor = await db.execute( + "SELECT * FROM incidents WHERE id = ?", (PRIMARY_INCIDENT_ID,) + ) + incident = await cursor.fetchone() + if not incident: + raise HTTPException(status_code=404, detail="Incident not found") + incident = dict(incident) + + # Alle Artikel aus allen Iran-Incidents laden + cursor = await db.execute( + f"""SELECT id, headline, headline_de, source, source_url, language, + published_at, collected_at, verification_status, incident_id + FROM articles WHERE incident_id IN ({ids}) + ORDER BY published_at DESC, collected_at DESC""" + ) + articles = [dict(r) for r in await cursor.fetchall()] + + # Alle Faktenchecks aus allen Iran-Incidents laden + cursor = await db.execute( + f"""SELECT id, claim, status, sources_count, evidence, status_history, checked_at, incident_id + FROM fact_checks WHERE incident_id IN ({ids}) + ORDER BY checked_at DESC""" + ) + fact_checks = [] + for r in await cursor.fetchall(): + fc = dict(r) + try: + fc["status_history"] = json.loads(fc.get("status_history") or "[]") + except (json.JSONDecodeError, TypeError): + fc["status_history"] = [] + fact_checks.append(fc) + + # Quellenanzahl über alle Incidents + cursor = await db.execute( + f"SELECT COUNT(DISTINCT source) as cnt FROM articles WHERE incident_id IN ({ids})" + ) + source_count = (await cursor.fetchone())["cnt"] + + # Snapshots aus allen Iran-Incidents + cursor = await db.execute( + f"""SELECT id, incident_id, article_count, fact_check_count, created_at + FROM incident_snapshots WHERE incident_id IN ({ids}) + ORDER BY created_at DESC""" + ) + available_snapshots = [dict(r) for r in await cursor.fetchall()] + + # Sources JSON aus Haupt-Incident + try: + sources_json = json.loads(incident.get("sources_json") or "[]") + except (json.JSONDecodeError, TypeError): + sources_json = [] + + return { + "generated_at": datetime.now(TIMEZONE).isoformat(), + "incident": { + "id": incident["id"], + "title": incident["title"], + "description": incident.get("description", ""), + "status": incident["status"], + "type": incident.get("type", "adhoc"), + "created_at": incident["created_at"], + "updated_at": incident["updated_at"], + "article_count": len(articles), + "source_count": source_count, + "factcheck_count": len(fact_checks), + }, + "current_lagebild": { + "summary_markdown": incident.get("summary", ""), + "sources_json": sources_json, + "updated_at": incident["updated_at"], + }, + "articles": articles, + "fact_checks": fact_checks, + "available_snapshots": available_snapshots, + } + + +@router.get("/lagebild/snapshot/{snapshot_id}", dependencies=[Depends(verify_api_key)]) +async def get_snapshot(snapshot_id: int, db=Depends(db_dependency)): + """Liefert einen historischen Snapshot.""" + ids = _in_clause(IRAN_INCIDENT_IDS) + cursor = await db.execute( + f"""SELECT id, summary, sources_json, article_count, fact_check_count, created_at + FROM incident_snapshots + WHERE id = ? AND incident_id IN ({ids})""", + (snapshot_id,), + ) + snap = await cursor.fetchone() + if not snap: + raise HTTPException(status_code=404, detail="Snapshot not found") + + snap = dict(snap) + try: + snap["sources_json"] = json.loads(snap.get("sources_json") or "[]") + except (json.JSONDecodeError, TypeError): + snap["sources_json"] = [] + + return snap diff --git a/src/services/license_service.py b/src/services/license_service.py index ce851bf..e292147 100644 --- a/src/services/license_service.py +++ b/src/services/license_service.py @@ -1,6 +1,6 @@ """Lizenz-Verwaltung und -Pruefung.""" import logging -from datetime import datetime, timezone +from datetime import datetime from config import TIMEZONE import aiosqlite @@ -38,14 +38,14 @@ async def check_license(db: aiosqlite.Connection, organization_id: int) -> dict: return {"valid": False, "status": "no_license", "read_only": True, "message": "Keine aktive Lizenz"} # Ablauf pruefen - now = datetime.now(timezone.utc) + now = datetime.now(TIMEZONE) valid_until = license_row["valid_until"] if valid_until is not None: try: expiry = datetime.fromisoformat(valid_until) if expiry.tzinfo is None: - expiry = expiry.replace(tzinfo=timezone.utc) + expiry = expiry.replace(tzinfo=TIMEZONE) if now > expiry: return { "valid": False,