Backend:
- GET /{id}/articles paginiert jetzt per limit/offset (Default 500,
Max 1000) und unterstuetzt optionalen search-Parameter (LIKE ueber
headline/source/content). Response-Shape: {total, articles}.
- Neuer Endpunkt GET /{id}/articles/sources-summary liefert pro Quelle
{source, article_count, languages} sowie language_counts gesamt —
serverseitige Aggregation, unabhaengig von Artikel-Paginierung.
- Neuer Endpunkt GET /{id}/articles/timeline-buckets?granularity=hour|day|week|month
aggregiert Artikel + Snapshot-Counts pro Zeitbucket (fuer spaetere
Timeline-Zaehler ueber die volle Historie).
- database.py: Index idx_articles_incident_collected auf
(incident_id, collected_at DESC) fuer schnelleres ORDER BY + Pagination.
Frontend:
- api.js: getArticles({limit, offset, search}),
getArticlesSourcesSummary(), getArticlesTimelineBuckets().
- app.js: loadIncidentDetail laedt erste Seite (500 Artikel), startet
_loadSourcesSummary parallel und zieht restliche Artikel
batchweise (500er Bloecke) im Hintergrund nach, bis _currentArticlesTotal
erreicht ist. rerenderTimeline nach jedem Batch.
- components.js: renderSourceOverviewFromSummary(data) rendert aus
Aggregat-Daten (ersetzt clientseitige Zaehlung ueber geladene Artikel).
Hintergrund: /articles lieferte bei der Iran-Lage 22 MB (17.286 Artikel
mit SELECT *). Die Erstantwort sinkt auf ~650 KB (500 Artikel), weitere
werden progressiv im Hintergrund nachgeladen. Quellenuebersicht zeigt
dank Aggregat-Endpunkt sofort alle Quellen + Sprachen komplett.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
760 Zeilen
32 KiB
Python
"""SQLite Datenbank-Setup und Zugriff."""
|
|
import aiosqlite
|
|
import logging
|
|
import os
|
|
from config import DB_PATH, DATA_DIR
|
|
|
|
logger = logging.getLogger("osint.database")
|
|
|
|
SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS organizations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
slug TEXT UNIQUE NOT NULL,
|
|
is_active INTEGER DEFAULT 1,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS licenses (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
organization_id INTEGER NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
|
|
license_type TEXT NOT NULL DEFAULT 'trial',
|
|
max_users INTEGER NOT NULL DEFAULT 5,
|
|
valid_from TIMESTAMP NOT NULL,
|
|
valid_until TIMESTAMP,
|
|
status TEXT NOT NULL DEFAULT 'active',
|
|
notes TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
email TEXT UNIQUE NOT NULL,
|
|
username TEXT NOT NULL,
|
|
password_hash TEXT,
|
|
organization_id INTEGER NOT NULL REFERENCES organizations(id),
|
|
role TEXT NOT NULL DEFAULT 'member',
|
|
is_active INTEGER DEFAULT 1,
|
|
last_login_at TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
CREATE TABLE IF NOT EXISTS magic_links (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
email TEXT NOT NULL,
|
|
token TEXT UNIQUE NOT NULL,
|
|
code TEXT NOT NULL,
|
|
purpose TEXT NOT NULL DEFAULT 'login',
|
|
user_id INTEGER REFERENCES users(id),
|
|
is_used INTEGER DEFAULT 0,
|
|
expires_at TIMESTAMP NOT NULL,
|
|
ip_address TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
-- Hinweis: portal_admins wird von der Verwaltungs-App (Admin-Portal) genutzt, die dieselbe DB teilt.
|
|
CREATE TABLE IF NOT EXISTS portal_admins (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
password_hash TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS incidents (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
title TEXT NOT NULL,
|
|
description TEXT,
|
|
status TEXT DEFAULT 'active',
|
|
type TEXT DEFAULT 'adhoc',
|
|
refresh_mode TEXT DEFAULT 'manual',
|
|
refresh_interval INTEGER DEFAULT 15,
|
|
refresh_start_time TEXT,
|
|
retention_days INTEGER DEFAULT 0,
|
|
visibility TEXT DEFAULT 'public',
|
|
summary TEXT,
|
|
sources_json TEXT,
|
|
international_sources INTEGER DEFAULT 1,
|
|
category_labels TEXT,
|
|
tenant_id INTEGER REFERENCES organizations(id),
|
|
created_by INTEGER REFERENCES users(id),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
CREATE TABLE IF NOT EXISTS articles (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
headline TEXT NOT NULL,
|
|
headline_de TEXT,
|
|
source TEXT NOT NULL,
|
|
source_url TEXT,
|
|
content_original TEXT,
|
|
content_de TEXT,
|
|
language TEXT DEFAULT 'de',
|
|
published_at TIMESTAMP,
|
|
collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
verification_status TEXT DEFAULT 'unverified',
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
CREATE TABLE IF NOT EXISTS fact_checks (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
claim TEXT NOT NULL,
|
|
status TEXT DEFAULT 'developing',
|
|
sources_count INTEGER DEFAULT 0,
|
|
evidence TEXT,
|
|
is_notification INTEGER DEFAULT 0,
|
|
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS refresh_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
completed_at TIMESTAMP,
|
|
articles_found INTEGER DEFAULT 0,
|
|
status TEXT DEFAULT 'running',
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS incident_snapshots (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
summary TEXT,
|
|
sources_json TEXT,
|
|
article_count INTEGER DEFAULT 0,
|
|
fact_check_count INTEGER DEFAULT 0,
|
|
refresh_log_id INTEGER REFERENCES refresh_log(id),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS sources (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
url TEXT,
|
|
domain TEXT,
|
|
source_type TEXT NOT NULL DEFAULT 'rss_feed',
|
|
category TEXT NOT NULL DEFAULT 'sonstige',
|
|
status TEXT NOT NULL DEFAULT 'active',
|
|
notes TEXT,
|
|
added_by TEXT,
|
|
article_count INTEGER DEFAULT 0,
|
|
last_seen_at TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS notifications (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
type TEXT NOT NULL DEFAULT 'refresh_summary',
|
|
title TEXT NOT NULL,
|
|
text TEXT NOT NULL,
|
|
icon TEXT DEFAULT 'info',
|
|
is_read INTEGER DEFAULT 0,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_notifications_user_read ON notifications(user_id, is_read);
|
|
|
|
CREATE TABLE IF NOT EXISTS incident_subscriptions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
|
incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE,
|
|
notify_email_summary INTEGER DEFAULT 0,
|
|
notify_email_new_articles INTEGER DEFAULT 0,
|
|
notify_email_status_change INTEGER DEFAULT 0,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
UNIQUE(user_id, incident_id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS article_locations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
location_name TEXT NOT NULL,
|
|
location_name_normalized TEXT,
|
|
country_code TEXT,
|
|
latitude REAL NOT NULL,
|
|
longitude REAL NOT NULL,
|
|
confidence REAL DEFAULT 0.0,
|
|
source_text TEXT,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id);
|
|
CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id);
|
|
|
|
|
|
CREATE TABLE IF NOT EXISTS source_health_checks (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
|
|
check_type TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
message TEXT,
|
|
details TEXT,
|
|
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_source_health_source ON source_health_checks(source_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS source_suggestions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
suggestion_type TEXT NOT NULL,
|
|
title TEXT NOT NULL,
|
|
description TEXT,
|
|
source_id INTEGER REFERENCES sources(id) ON DELETE SET NULL,
|
|
suggested_data TEXT,
|
|
priority TEXT DEFAULT 'medium',
|
|
status TEXT DEFAULT 'pending',
|
|
reviewed_at TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
CREATE TABLE IF NOT EXISTS user_excluded_domains (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
|
domain TEXT NOT NULL,
|
|
notes TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
UNIQUE(user_id, domain)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS network_analyses (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
status TEXT DEFAULT 'pending',
|
|
entity_count INTEGER DEFAULT 0,
|
|
relation_count INTEGER DEFAULT 0,
|
|
data_hash TEXT,
|
|
last_generated_at TIMESTAMP,
|
|
tenant_id INTEGER REFERENCES organizations(id),
|
|
created_by INTEGER REFERENCES users(id),
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS network_analysis_incidents (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
|
|
incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE,
|
|
UNIQUE(network_analysis_id, incident_id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_network_analysis_incidents_analysis ON network_analysis_incidents(network_analysis_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS network_entities (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
|
|
name TEXT NOT NULL,
|
|
name_normalized TEXT NOT NULL,
|
|
entity_type TEXT NOT NULL,
|
|
description TEXT DEFAULT '',
|
|
aliases TEXT DEFAULT '[]',
|
|
metadata TEXT DEFAULT '{}',
|
|
mention_count INTEGER DEFAULT 0,
|
|
corrected_by_opus INTEGER DEFAULT 0,
|
|
tenant_id INTEGER REFERENCES organizations(id),
|
|
UNIQUE(network_analysis_id, name_normalized, entity_type)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_network_entities_analysis ON network_entities(network_analysis_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS network_entity_mentions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
entity_id INTEGER NOT NULL REFERENCES network_entities(id) ON DELETE CASCADE,
|
|
article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE,
|
|
incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
|
|
source_text TEXT,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_network_entity_mentions_entity ON network_entity_mentions(entity_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS network_relations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
|
|
source_entity_id INTEGER NOT NULL REFERENCES network_entities(id) ON DELETE CASCADE,
|
|
target_entity_id INTEGER NOT NULL REFERENCES network_entities(id) ON DELETE CASCADE,
|
|
category TEXT NOT NULL,
|
|
label TEXT NOT NULL,
|
|
description TEXT DEFAULT '',
|
|
weight INTEGER DEFAULT 1,
|
|
status TEXT DEFAULT '',
|
|
evidence TEXT DEFAULT '[]',
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_network_relations_analysis ON network_relations(network_analysis_id);
|
|
CREATE INDEX IF NOT EXISTS idx_network_relations_source ON network_relations(source_entity_id);
|
|
CREATE INDEX IF NOT EXISTS idx_network_relations_target ON network_relations(target_entity_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS network_generation_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
network_analysis_id INTEGER NOT NULL REFERENCES network_analyses(id) ON DELETE CASCADE,
|
|
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
completed_at TIMESTAMP,
|
|
status TEXT DEFAULT 'running',
|
|
input_tokens INTEGER DEFAULT 0,
|
|
output_tokens INTEGER DEFAULT 0,
|
|
cache_creation_tokens INTEGER DEFAULT 0,
|
|
cache_read_tokens INTEGER DEFAULT 0,
|
|
total_cost_usd REAL DEFAULT 0.0,
|
|
api_calls INTEGER DEFAULT 0,
|
|
entity_count INTEGER DEFAULT 0,
|
|
relation_count INTEGER DEFAULT 0,
|
|
error_message TEXT,
|
|
tenant_id INTEGER REFERENCES organizations(id)
|
|
);
|
|
"""
|
|
|
|
|
|
async def get_db() -> aiosqlite.Connection:
    """Open a fresh SQLite connection with the project's standard settings.

    Ensures the data directory exists, enables dict-like row access, and
    applies the WAL / foreign-key / busy-timeout PRAGMAs. The caller owns
    the connection and is responsible for closing it.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    conn = await aiosqlite.connect(DB_PATH)
    conn.row_factory = aiosqlite.Row
    for pragma in (
        "PRAGMA journal_mode=WAL",
        "PRAGMA foreign_keys=ON",
        "PRAGMA busy_timeout=5000",
    ):
        await conn.execute(pragma)
    return conn
|
|
|
|
|
|
async def init_db():
    """Initialize the database: apply the schema, then run in-place migrations.

    The schema only uses CREATE ... IF NOT EXISTS, so executescript() is safe
    on an existing database. Each migration afterwards inspects the live
    schema (PRAGMA table_info / sqlite_master) and applies only what is still
    missing, which makes the whole function idempotent. Migration order
    matters (some indexes depend on columns added earlier), so blocks must
    not be reordered. The connection is always closed, even on failure.
    """
    db = await get_db()
    try:
        await db.executescript(SCHEMA)
        await db.commit()

        # --- Migrations for existing databases ---

        # Check incidents columns
        cursor = await db.execute("PRAGMA table_info(incidents)")
        columns = [row[1] for row in await cursor.fetchall()]

        if "type" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN type TEXT DEFAULT 'adhoc'")
            await db.commit()

        if "sources_json" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN sources_json TEXT")
            await db.commit()

        if "international_sources" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN international_sources INTEGER DEFAULT 1")
            await db.commit()

        if "visibility" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN visibility TEXT DEFAULT 'public'")
            await db.commit()

        if "include_telegram" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN include_telegram INTEGER DEFAULT 0")
            await db.commit()
            logger.info("Migration: include_telegram zu incidents hinzugefuegt")

        if "telegram_categories" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN telegram_categories TEXT DEFAULT NULL")
            await db.commit()
            logger.info("Migration: telegram_categories zu incidents hinzugefuegt")

        if "category_labels" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN category_labels TEXT")
            await db.commit()
            logger.info("Migration: category_labels zu incidents hinzugefuegt")

        if "tenant_id" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()
            logger.info("Migration: tenant_id zu incidents hinzugefuegt")

        if "refresh_start_time" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN refresh_start_time TEXT")
            # Existing auto-refresh incidents get the historical default start time
            await db.execute("UPDATE incidents SET refresh_start_time = '07:00' WHERE refresh_mode = 'auto'")
            await db.commit()
            logger.info("Migration: refresh_start_time zu incidents hinzugefuegt (bestehende Auto-Lagen auf 07:00)")

        if "latest_developments" not in columns:
            await db.execute("ALTER TABLE incidents ADD COLUMN latest_developments TEXT")
            await db.commit()
            logger.info("Migration: latest_developments zu incidents hinzugefuegt")

        # Migration: podcast_transcripts table (URL cache for transcripts)
        cursor = await db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='podcast_transcripts'"
        )
        if not await cursor.fetchone():
            await db.execute(
                """
                CREATE TABLE podcast_transcripts (
                    url TEXT PRIMARY KEY,
                    transcript TEXT NOT NULL,
                    source TEXT NOT NULL,
                    segments_json TEXT,
                    fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            await db.commit()
            logger.info("Migration: Tabelle podcast_transcripts angelegt")

        # Migration: token accounting columns for refresh_log
        cursor = await db.execute("PRAGMA table_info(refresh_log)")
        rl_columns = [row[1] for row in await cursor.fetchall()]
        if "input_tokens" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN input_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN output_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN cache_creation_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN cache_read_tokens INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN total_cost_usd REAL DEFAULT 0.0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN api_calls INTEGER DEFAULT 0")
            await db.commit()

        if "trigger_type" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN trigger_type TEXT DEFAULT 'manual'")
            await db.commit()

        if "retry_count" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN retry_count INTEGER DEFAULT 0")
            await db.execute("ALTER TABLE refresh_log ADD COLUMN error_message TEXT")
            await db.commit()

        if "tenant_id" not in rl_columns:
            await db.execute("ALTER TABLE refresh_log ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: notifications table (for existing DBs)
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='notifications'")
        if not await cursor.fetchone():
            await db.executescript("""
                CREATE TABLE IF NOT EXISTS notifications (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
                    type TEXT NOT NULL DEFAULT 'refresh_summary',
                    title TEXT NOT NULL,
                    text TEXT NOT NULL,
                    icon TEXT DEFAULT 'info',
                    is_read INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    tenant_id INTEGER REFERENCES organizations(id)
                );
                CREATE INDEX IF NOT EXISTS idx_notifications_user_read ON notifications(user_id, is_read);
            """)
            await db.commit()

        # Migration: incident_subscriptions table (for existing DBs)
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='incident_subscriptions'")
        if not await cursor.fetchone():
            await db.executescript("""
                CREATE TABLE IF NOT EXISTS incident_subscriptions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
                    incident_id INTEGER NOT NULL REFERENCES incidents(id) ON DELETE CASCADE,
                    notify_email_summary INTEGER DEFAULT 0,
                    notify_email_new_articles INTEGER DEFAULT 0,
                    notify_email_status_change INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(user_id, incident_id)
                );
            """)
            await db.commit()
            logger.info("Migration: incident_subscriptions-Tabelle erstellt")
        else:
            # Migration: rename column contradiction -> new_articles
            cursor = await db.execute("PRAGMA table_info(incident_subscriptions)")
            sub_columns = [row[1] for row in await cursor.fetchall()]
            if "notify_email_contradiction" in sub_columns:
                await db.execute("ALTER TABLE incident_subscriptions RENAME COLUMN notify_email_contradiction TO notify_email_new_articles")
                await db.commit()
                logger.info("Migration: notify_email_contradiction -> notify_email_new_articles umbenannt")

        # Migration: role column for users
        cursor = await db.execute("PRAGMA table_info(users)")
        user_columns = [row[1] for row in await cursor.fetchall()]
        if "role" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN role TEXT DEFAULT 'member'")
            # Pre-role installs: promote every existing user to org_admin
            await db.execute("UPDATE users SET role = 'org_admin'")
            await db.commit()
            logger.info("Migration: role-Spalte zu users hinzugefuegt")

        # Migration: email, organization_id, is_active, last_login_at for users
        if "email" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN email TEXT")
            await db.commit()
            logger.info("Migration: email zu users hinzugefuegt")

        if "organization_id" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN organization_id INTEGER REFERENCES organizations(id)")
            await db.commit()
            logger.info("Migration: organization_id zu users hinzugefuegt")

        # Create the index only after the column migrations above
        try:
            await db.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_org_username ON users(organization_id, username)")
            await db.commit()
        except Exception:
            pass  # Index already exists or the column is still missing

        if "is_active" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN is_active INTEGER DEFAULT 1")
            await db.commit()

        # Migration: per-user tutorial progress
        if "tutorial_step" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN tutorial_step INTEGER DEFAULT NULL")
            await db.execute("ALTER TABLE users ADD COLUMN tutorial_completed INTEGER DEFAULT 0")
            await db.commit()
            logger.info("Migration: tutorial_step + tutorial_completed zu users hinzugefuegt")

        if "last_login_at" not in user_columns:
            await db.execute("ALTER TABLE users ADD COLUMN last_login_at TIMESTAMP")
            await db.commit()

        # Migration: tenant_id for articles
        cursor = await db.execute("PRAGMA table_info(articles)")
        art_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in art_columns:
            await db.execute("ALTER TABLE articles ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: tenant_id for fact_checks
        cursor = await db.execute("PRAGMA table_info(fact_checks)")
        fc_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in fc_columns:
            await db.execute("ALTER TABLE fact_checks ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: status_history for fact_checks (fact-check timeline)
        if "status_history" not in fc_columns:
            await db.execute("ALTER TABLE fact_checks ADD COLUMN status_history TEXT DEFAULT '[]'")
            # Initialize existing entries.
            # Fix: the json import used to sit inside the loop below and was
            # re-executed per row; hoisted out of the loop.
            import json as _json
            cursor2 = await db.execute("SELECT id, status, checked_at FROM fact_checks")
            for row2 in await cursor2.fetchall():
                initial_history = _json.dumps([{"status": row2[1], "at": str(row2[2])}])
                await db.execute("UPDATE fact_checks SET status_history = ? WHERE id = ?", (initial_history, row2[0]))
            await db.commit()
            logger.info("Migration: status_history zu fact_checks hinzugefuegt")

        # Migration: category for article_locations (marker classification)
        cursor = await db.execute("PRAGMA table_info(article_locations)")
        al_columns = [row[1] for row in await cursor.fetchall()]
        if "category" not in al_columns:
            await db.execute("ALTER TABLE article_locations ADD COLUMN category TEXT DEFAULT 'mentioned'")
            await db.commit()
            logger.info("Migration: category zu article_locations hinzugefuegt")

        # Migration: rename old category values to the new keys
        try:
            await db.execute(
                "UPDATE article_locations SET category = 'primary' WHERE category = 'target'"
            )
            await db.execute(
                "UPDATE article_locations SET category = 'secondary' WHERE category IN ('response', 'retaliation')"
            )
            await db.execute(
                "UPDATE article_locations SET category = 'tertiary' WHERE category IN ('actor', 'context')"
            )
            changed = db.total_changes
            await db.commit()
            if changed > 0:
                logger.info("Migration: article_locations Kategorien umbenannt (target->primary, response/retaliation->secondary, actor->tertiary)")
        except Exception:
            pass  # Already migrated or no data

        # Migration: tenant_id for incident_snapshots
        cursor = await db.execute("PRAGMA table_info(incident_snapshots)")
        snap_columns2 = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in snap_columns2:
            await db.execute("ALTER TABLE incident_snapshots ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: tenant_id for sources
        cursor = await db.execute("PRAGMA table_info(sources)")
        src_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in src_columns:
            await db.execute("ALTER TABLE sources ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Migration: tenant_id for notifications
        cursor = await db.execute("PRAGMA table_info(notifications)")
        notif_columns = [row[1] for row in await cursor.fetchall()]
        if "tenant_id" not in notif_columns:
            await db.execute("ALTER TABLE notifications ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
            await db.commit()

        # Create indexes (after the column migrations above)
        for idx_sql in [
            "CREATE INDEX IF NOT EXISTS idx_incidents_tenant_status ON incidents(tenant_id, status)",
            "CREATE INDEX IF NOT EXISTS idx_articles_tenant_incident ON articles(tenant_id, incident_id)",
            "CREATE INDEX IF NOT EXISTS idx_articles_incident_collected ON articles(incident_id, collected_at DESC)",
        ]:
            try:
                await db.execute(idx_sql)
                await db.commit()
            except Exception:
                pass

        # Migration: article_locations table (for existing DBs)
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='article_locations'")
        if not await cursor.fetchone():
            await db.executescript("""
                CREATE TABLE IF NOT EXISTS article_locations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    article_id INTEGER REFERENCES articles(id) ON DELETE CASCADE,
                    incident_id INTEGER REFERENCES incidents(id) ON DELETE CASCADE,
                    location_name TEXT NOT NULL,
                    location_name_normalized TEXT,
                    country_code TEXT,
                    latitude REAL NOT NULL,
                    longitude REAL NOT NULL,
                    confidence REAL DEFAULT 0.0,
                    source_text TEXT,
                    tenant_id INTEGER REFERENCES organizations(id)
                );
                CREATE INDEX IF NOT EXISTS idx_article_locations_incident ON article_locations(incident_id);
                CREATE INDEX IF NOT EXISTS idx_article_locations_article ON article_locations(article_id);
            """)
            await db.commit()
            logger.info("Migration: article_locations-Tabelle erstellt")

        # Migration: credit system for licenses
        cursor = await db.execute("PRAGMA table_info(licenses)")
        columns = [row[1] for row in await cursor.fetchall()]
        if "token_budget_usd" not in columns:
            await db.execute("ALTER TABLE licenses ADD COLUMN token_budget_usd REAL")
            await db.execute("ALTER TABLE licenses ADD COLUMN credits_total INTEGER")
            await db.execute("ALTER TABLE licenses ADD COLUMN credits_used REAL DEFAULT 0")
            await db.execute("ALTER TABLE licenses ADD COLUMN cost_per_credit REAL")
            await db.execute("ALTER TABLE licenses ADD COLUMN budget_warning_percent INTEGER DEFAULT 80")
            await db.commit()
            logger.info("Migration: Credits-System zu Lizenzen hinzugefuegt")

        # Migration: monthly token-usage table
        cursor = await db.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='token_usage_monthly'")
        if not await cursor.fetchone():
            await db.execute("""
                CREATE TABLE token_usage_monthly (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    organization_id INTEGER REFERENCES organizations(id),
                    year_month TEXT NOT NULL,
                    input_tokens INTEGER DEFAULT 0,
                    output_tokens INTEGER DEFAULT 0,
                    cache_creation_tokens INTEGER DEFAULT 0,
                    cache_read_tokens INTEGER DEFAULT 0,
                    total_cost_usd REAL DEFAULT 0.0,
                    api_calls INTEGER DEFAULT 0,
                    refresh_count INTEGER DEFAULT 0,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(organization_id, year_month)
                )
            """)
            await db.commit()
            logger.info("Migration: token_usage_monthly Tabelle erstellt")

        # Mark orphaned 'running' entries as errors at startup (older than 15 min)
        await db.execute(
            """UPDATE refresh_log SET status = 'error', error_message = 'Verwaist beim Neustart',
               completed_at = CURRENT_TIMESTAMP
               WHERE status = 'running'
               AND started_at < datetime('now', '-15 minutes')"""
        )
        await db.commit()

        # Seed the sources table (only when empty)
        cursor = await db.execute("SELECT COUNT(*) as cnt FROM sources")
        row = await cursor.fetchone()
        if row["cnt"] == 0:
            await _seed_sources(db)

    finally:
        await db.close()
|
|
|
|
|
|
async def _seed_sources(db: aiosqlite.Connection):
    """Populate the sources table from the config.py configuration.

    Inserts every RSS feed from config.RSS_FEEDS plus every entry of
    config.EXCLUDED_SOURCES (as source_type='excluded'), commits once, and
    then recomputes article counters via refresh_source_counts(). Intended
    to run only against an empty sources table (init_db guards this).
    """
    from config import RSS_FEEDS, EXCLUDED_SOURCES
    # Fix: urlparse used to be imported inside the per-feed loop; hoisted here
    # so the import statement runs once per call.
    from urllib.parse import urlparse

    # Well-known feed names mapped to their editorial category; anything not
    # listed falls back to 'sonstige'.
    category_map = {
        "tagesschau": "oeffentlich-rechtlich",
        "ZDF heute": "oeffentlich-rechtlich",
        "Deutsche Welle": "oeffentlich-rechtlich",
        "Spiegel": "qualitaetszeitung",
        "Zeit": "qualitaetszeitung",
        "FAZ": "qualitaetszeitung",
        "Süddeutsche": "qualitaetszeitung",
        "NZZ": "qualitaetszeitung",
        "Reuters": "nachrichtenagentur",
        "AP News": "nachrichtenagentur",
        "BBC World": "international",
        "Al Jazeera": "international",
        "France24": "international",
        "BMI": "behoerde",
        "Europol": "behoerde",
    }

    for _rss_category, feeds in RSS_FEEDS.items():
        for feed in feeds:
            name = feed["name"]
            url = feed["url"]
            try:
                # Normalize "www.example.com" -> "example.com"; fall back to
                # an empty domain on any malformed URL.
                domain = urlparse(url).netloc.lower().replace("www.", "")
            except Exception:
                domain = ""

            category = category_map.get(name, "sonstige")
            await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""",
                (name, url, domain, category),
            )

    for excl in EXCLUDED_SOURCES:
        await db.execute(
            """INSERT INTO sources (name, domain, source_type, category, status, added_by, tenant_id)
            VALUES (?, ?, 'excluded', 'sonstige', 'active', 'system', NULL)""",
            (excl, excl),
        )

    await db.commit()
    await refresh_source_counts(db)

    logger.info(f"Sources-Tabelle geseeded: {len(RSS_FEEDS.get('deutsch', []))+len(RSS_FEEDS.get('international', []))+len(RSS_FEEDS.get('behoerden', []))} RSS-Feeds, {len(EXCLUDED_SOURCES)} ausgeschlossene Quellen")
|
|
|
|
|
|
async def refresh_source_counts(db: aiosqlite.Connection):
    """Recompute article_count and last_seen_at for every non-excluded source."""
    cursor = await db.execute("SELECT id, name, domain FROM sources WHERE source_type != 'excluded'")
    all_sources = await cursor.fetchall()

    for src in all_sources:
        src_name = src["name"]
        src_domain = src["domain"] or ""

        # Match articles by source name; when a domain is known, also count
        # articles whose URL contains that domain.
        if src_domain:
            stats_cursor = await db.execute(
                """SELECT COUNT(*) as cnt, MAX(collected_at) as last_seen
                FROM articles WHERE source = ? OR source_url LIKE ?""",
                (src_name, f"%{src_domain}%"),
            )
        else:
            stats_cursor = await db.execute(
                "SELECT COUNT(*) as cnt, MAX(collected_at) as last_seen FROM articles WHERE source = ?",
                (src_name,),
            )
        stats = await stats_cursor.fetchone()

        await db.execute(
            "UPDATE sources SET article_count = ?, last_seen_at = ? WHERE id = ?",
            (stats["cnt"], stats["last_seen"], src["id"]),
        )

    await db.commit()
|
|
|
|
|
|
async def db_dependency():
    """FastAPI dependency: yield a connection and guarantee it is closed."""
    connection = await get_db()
    try:
        yield connection
    finally:
        await connection.close()
|