feat(sources): externer Reputations-Layer (IFCN + EUvsDisinfo)

Externe Datenquellen (kostenlos, Open Data) ergaenzen die LLM-geschaetzte
Reliability-Achse mit objektiven Signalen:

- IFCN-Signatories (raw.githubusercontent.com/IFCN/verified-signatories):
  Plain-Text-Liste anerkannter Faktencheck-Organisationen.
- EUvsDisinfo (Zenodo CSV): Pro-Kreml-Desinformations-Datenbank.

Schema-Erweiterung:
- ifcn_signatory, eu_disinfo_listed, eu_disinfo_case_count,
  eu_disinfo_last_seen, external_data_synced_at.

Service src/services/external_reputation.py:
- sync_ifcn_signatories(), sync_eu_disinfo(), apply_reputation_overrides(),
  sync_all() mit Domain-Normalisierung (lowercase, ohne www., ohne Schema).

Reliability-Override-Regeln (laufen nach Approve und manuellem Sync):
- ifcn_signatory=1 -> reliability=sehr_hoch
- eu_disinfo_case_count >= 5 -> reliability=sehr_niedrig
- eu_disinfo_case_count >= 1 -> Reliability eine Stufe runter (max niedrig)

API: POST /api/sources/external-reputation/sync (Admin, BackgroundTask).
Filter: ?ifcn_signatory=true, ?eu_disinfo_listed=true.

UI:
- Filter-Dropdown "Externe Reputation" im Quellen-Modal.
- Badges: gruenes "IFCN" und rotes "EU-Desinfo (n)".
- Tooltip macht Reliability-Quelle transparent: "(IFCN-Faktenchecker)",
  "(EU-Desinfo, n Faelle)" oder "(LLM-Schaetzung)".
- "Externe Daten syncen"-Button im Review-Toolbar (Admin-only).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
Claude Code
2026-05-07 19:40:30 +00:00
Ursprung 48a60d7579
Commit 5fc2467559
9 geänderte Dateien mit 410 neuen und 3 gelöschten Zeilen

Datei anzeigen

@@ -0,0 +1,268 @@
"""Externe Reputations-Daten fuer Quellen.
Synchronisiert Domain-Listen von oeffentlichen Reputations-/Faktencheck-Datenbanken
und schreibt die Treffer in die sources-Spalten:
- IFCN-Signatories (anerkannte Faktenchecker) -> ifcn_signatory
- EUvsDisinfo (pro-Kreml-Desinformation, Zenodo-CSV) -> eu_disinfo_listed,
eu_disinfo_case_count, eu_disinfo_last_seen
Anschliessend wendet apply_reputation_overrides() Override-Regeln auf die
reliability-Spalte an:
- ifcn_signatory=1 -> reliability='sehr_hoch'
- eu_disinfo_case_count >= 5 -> reliability='sehr_niedrig'
- eu_disinfo_case_count >= 1 -> reliability eine Stufe runter (max bis 'niedrig')
"""
import csv
import io
import logging
from collections import defaultdict
from urllib.parse import urlparse
import aiosqlite
import httpx
logger = logging.getLogger("osint.external_reputation")
IFCN_LIST_URL = "https://raw.githubusercontent.com/IFCN/verified-signatories/main/list"
EU_DISINFO_CSV_URL = "https://zenodo.org/records/10514307/files/euvsdisinfo_base.csv?download=1"
HTTP_TIMEOUT = httpx.Timeout(60.0, connect=10.0)
# Reliability-Skala in Stufenfolge (schlecht -> gut)
RELIABILITY_ORDER = ["sehr_niedrig", "niedrig", "gemischt", "hoch", "sehr_hoch"]
def _normalize_domain(raw: str | None) -> str | None:
"""Normalisiert eine Domain: lowercase, ohne www., ohne Schema/Pfad."""
if not raw:
return None
raw = raw.strip().lower()
if not raw:
return None
# Falls eine vollstaendige URL uebergeben wurde
if "://" in raw:
try:
raw = urlparse(raw).netloc or raw
except ValueError:
pass
# Pfad/Query strippen
raw = raw.split("/")[0].split("?")[0].split("#")[0]
if raw.startswith("www."):
raw = raw[4:]
return raw or None
async def _fetch_text(url: str) -> str:
    """Download the body of *url* as text, following redirects.

    Raises ``httpx.HTTPStatusError`` on a non-success status code.
    """
    async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, follow_redirects=True) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.text
async def sync_ifcn_signatories(db: aiosqlite.Connection) -> dict:
    """Load the IFCN domain list and match it against ``sources.domain``.

    Sets ``ifcn_signatory=1`` where the normalized domain appears in the
    list, otherwise 0 (only rows with a non-empty domain are touched).

    Returns a summary dict (``list_size``, ``sources_checked``, ``matched``).
    Propagates ``httpx.HTTPStatusError`` if the download fails.
    """
    text = await _fetch_text(IFCN_LIST_URL)
    domains: set[str] = set()
    for line in text.splitlines():
        d = _normalize_domain(line)
        if d:
            domains.add(d)
    logger.info("IFCN-Liste geladen: %d Domains", len(domains))
    # Load current sources that have a domain set.
    cursor = await db.execute(
        "SELECT id, domain FROM sources WHERE domain IS NOT NULL AND domain != ''"
    )
    sources = [dict(r) for r in await cursor.fetchall()]
    matched_ids: list[int] = []
    unmatched_ids: list[int] = []
    for s in sources:
        nd = _normalize_domain(s["domain"])
        if nd and nd in domains:
            matched_ids.append(s["id"])
        else:
            unmatched_ids.append(s["id"])
    # Fix: chunk the IN (...) updates so a large sources table cannot
    # exceed SQLite's bound-parameter limit (SQLITE_MAX_VARIABLE_NUMBER,
    # 999 by default in older builds).
    chunk_size = 500
    for flag_value, ids in ((1, matched_ids), (0, unmatched_ids)):
        for start in range(0, len(ids), chunk_size):
            chunk = ids[start:start + chunk_size]
            placeholders = ",".join("?" for _ in chunk)
            await db.execute(
                f"UPDATE sources SET ifcn_signatory = {flag_value} "
                f"WHERE id IN ({placeholders})",
                chunk,
            )
    await db.commit()
    logger.info("IFCN-Sync: %d Quellen als Faktenchecker markiert (von %d)",
                len(matched_ids), len(sources))
    return {
        "list_size": len(domains),
        "sources_checked": len(sources),
        "matched": len(matched_ids),
    }
async def sync_eu_disinfo(db: aiosqlite.Connection) -> dict:
    """Load the EUvsDisinfo CSV from Zenodo, aggregate per domain, update sources.

    - ``eu_disinfo_listed``: 1 if the domain was debunked as 'disinformation'
      at least once
    - ``eu_disinfo_case_count``: number of disinformation cases
    - ``eu_disinfo_last_seen``: latest ``debunk_date`` (lexicographic max,
      assumes ISO-formatted dates in the CSV)

    Returns a summary dict. Propagates ``httpx.HTTPStatusError`` on
    download failure.
    """
    text = await _fetch_text(EU_DISINFO_CSV_URL)
    reader = csv.DictReader(io.StringIO(text))
    # Aggregate per domain (only rows with class='disinformation').
    counts: dict[str, int] = defaultdict(int)
    last_seen: dict[str, str] = {}
    total_rows = 0
    for row in reader:
        total_rows += 1
        if (row.get("class") or "").strip().lower() != "disinformation":
            continue
        d = _normalize_domain(row.get("article_domain"))
        if not d:
            continue
        counts[d] += 1
        debunk_date = (row.get("debunk_date") or "").strip()
        if debunk_date:
            prev = last_seen.get(d)
            if not prev or debunk_date > prev:
                last_seen[d] = debunk_date
    logger.info("EUvsDisinfo-CSV: %d Zeilen, %d Domains mit Desinformation",
                total_rows, len(counts))
    # Load sources and partition into matched / unmatched parameter sets.
    cursor = await db.execute(
        "SELECT id, domain FROM sources WHERE domain IS NOT NULL AND domain != ''"
    )
    sources = [dict(r) for r in await cursor.fetchall()]
    matched_params: list[tuple] = []
    unmatched_params: list[tuple] = []
    for s in sources:
        nd = _normalize_domain(s["domain"])
        if nd and nd in counts:
            matched_params.append((counts[nd], last_seen.get(nd), s["id"]))
        else:
            unmatched_params.append((s["id"],))
    # Improvement: batch the per-row updates with executemany instead of
    # one execute() round-trip per source.
    if matched_params:
        await db.executemany(
            """UPDATE sources SET
                    eu_disinfo_listed = 1,
                    eu_disinfo_case_count = ?,
                    eu_disinfo_last_seen = ?
               WHERE id = ?""",
            matched_params,
        )
    if unmatched_params:
        await db.executemany(
            """UPDATE sources SET
                    eu_disinfo_listed = 0,
                    eu_disinfo_case_count = 0,
                    eu_disinfo_last_seen = NULL
               WHERE id = ?""",
            unmatched_params,
        )
    matched = len(matched_params)
    await db.commit()
    logger.info("EUvsDisinfo-Sync: %d Quellen als Desinformations-Quelle markiert (von %d)",
                matched, len(sources))
    return {
        "rows_in_csv": total_rows,
        "domains_with_disinfo_in_csv": len(counts),
        "sources_checked": len(sources),
        "matched": matched,
    }
def _override_reliability(current: str | None, ifcn: bool, eu_count: int) -> str | None:
"""Wendet Override-Regeln auf eine reliability-Stufe an.
Rueckgabe: neue Stufe (oder None, wenn unveraendert).
"""
cur = current or "na"
# IFCN gewinnt: zertifizierter Faktenchecker -> sehr_hoch (immer)
if ifcn:
return "sehr_hoch" if cur != "sehr_hoch" else None
# EUvsDisinfo: Downgrade
if eu_count >= 5:
return "sehr_niedrig" if cur != "sehr_niedrig" else None
if eu_count >= 1:
# Eine Stufe runter, mindestens bis 'niedrig'
if cur == "na":
return "niedrig"
if cur in RELIABILITY_ORDER:
idx = RELIABILITY_ORDER.index(cur)
new_idx = max(0, idx - 1)
new = RELIABILITY_ORDER[new_idx]
# Mindeststufe 'niedrig' bei eu_count >= 1
if RELIABILITY_ORDER.index(new) > RELIABILITY_ORDER.index("niedrig"):
new = "niedrig"
return new if new != cur else None
return None
async def apply_reputation_overrides(db: aiosqlite.Connection, source_id: int | None = None) -> dict:
    """Apply the reliability override rules.

    With *source_id* set, only that source is processed; otherwise every
    source is checked. Returns ``{"checked": ..., "changed": ...}``.
    """
    query = "SELECT id, reliability, ifcn_signatory, eu_disinfo_case_count FROM sources"
    params: tuple = ()
    if source_id is not None:
        query += " WHERE id = ?"
        params = (source_id,)
    cursor = await db.execute(query, params)
    rows = [dict(r) for r in await cursor.fetchall()]

    changed = 0
    for row in rows:
        updated = _override_reliability(
            row.get("reliability"),
            bool(row.get("ifcn_signatory")),
            int(row.get("eu_disinfo_case_count") or 0),
        )
        if updated is None:
            continue
        await db.execute(
            "UPDATE sources SET reliability = ? WHERE id = ?",
            (updated, row["id"]),
        )
        changed += 1
    await db.commit()
    logger.info("Reliability-Override: %d Quellen angepasst (von %d gepruefte)",
                changed, len(rows))
    return {"checked": len(rows), "changed": changed}
async def sync_all(db: aiosqlite.Connection) -> dict:
    """Run the full sync: IFCN, EUvsDisinfo, then reliability overrides.

    Finally stamps ``external_data_synced_at`` for every source that has
    a domain. Returns the three sub-results keyed by step.
    """
    # Dict literals evaluate in order, so the steps run IFCN -> EU -> override.
    results = {
        "ifcn": await sync_ifcn_signatories(db),
        "eu_disinfo": await sync_eu_disinfo(db),
        "override": await apply_reputation_overrides(db),
    }
    await db.execute(
        "UPDATE sources SET external_data_synced_at = CURRENT_TIMESTAMP "
        "WHERE domain IS NOT NULL AND domain != ''"
    )
    await db.commit()
    return results