feat(x): X (Twitter) als Bezugsquelle pro Lage

X-Accounts werden analog zu Telegram als Quelle (source_type=x_account)
konfiguriert und pro Lage ueber include_x zugeschaltet. Der Scraper
(feeds/x_parser.py, twscrape) liest Account-Timelines, optional ueber
einen HTTP-Proxy mit Fallback auf direkten Abruf ueber die Server-IP.

- DB-Migration include_x, Pydantic-Modelle, incidents-Router
- Orchestrator-X-Pipeline plus Haiku-Account-Vorselektion
- sources-Router /x/validate, x_account-Typ in Stats und Frontend
- Lage-Einstellungen: X-Toggle neben international und Telegram
- twscrape als Abhaengigkeit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
Claude Code
2026-05-22 06:52:19 +00:00
Ursprung f1200743e6
Commit 9c50439785
12 geänderte Dateien mit 547 neuen und 10 gelöschten Zeilen

Datei anzeigen

@@ -34,6 +34,7 @@ CATEGORY_REPUTATION = {
"international": 0.75, # CNN, Guardian, NYT, Al Jazeera, France24
"regional": 0.65, # regionale Tageszeitungen
"telegram": 0.5, # OSINT-Kanaele — gemischte Qualitaet
"x": 0.4, # X/Twitter-Accounts, hohes Rauschen
"sonstige": 0.4, # unkategorisiert
"boulevard": 0.3, # Bild, Sun etc.
}
@@ -750,6 +751,7 @@ class AgentOrchestrator:
# Einschraenkung passiert in get_feeds_with_metadata.
# Hinweis: source_lang_whitelist wird weiter unten geladen.
include_telegram = bool(incident["include_telegram"]) if "include_telegram" in incident.keys() else False
include_x = bool(incident["include_x"]) if "include_x" in incident.keys() else False
visibility = incident["visibility"] if "visibility" in incident.keys() else "public"
created_by = incident["created_by"] if "created_by" in incident.keys() else None
tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None
@@ -1078,20 +1080,67 @@ class AgentOrchestrator:
logger.info(f"Telegram-Pipeline: {len(articles)} Nachrichten")
return articles, None
async def _x_pipeline():
"""X-Account-Suche (Twitter) mit KI-basierter Account-Selektion."""
from feeds.x_parser import XParser
x_parser = XParser()
# Alle X-Accounts laden
all_accounts = await x_parser._get_x_accounts(tenant_id=tenant_id)
if not all_accounts:
logger.info("Keine X-Accounts konfiguriert")
return [], None
# KI waehlt relevante Accounts aus
x_researcher = ResearcherAgent()
selected_accounts, x_sel_usage = await x_researcher.select_relevant_x_accounts(
title, description, all_accounts
)
if x_sel_usage:
usage_acc.add(x_sel_usage)
selected_ids = [acc["id"] for acc in selected_accounts]
logger.info(f"X-Selektion: {len(selected_ids)} von {len(all_accounts)} Accounts")
# Dynamische Keywords fuer X (eigener Aufruf, da parallel zu RSS)
cursor_x_hl = await db.execute(
"""SELECT COALESCE(headline_de, headline) as hl
FROM articles WHERE incident_id = ?
AND COALESCE(headline_de, headline) IS NOT NULL
ORDER BY collected_at DESC LIMIT 30""",
(incident_id,),
)
x_headlines = [row["hl"] for row in await cursor_x_hl.fetchall() if row["hl"]]
x_keywords, x_kw_usage = await x_researcher.extract_dynamic_keywords(title, x_headlines)
if x_kw_usage:
usage_acc.add(x_kw_usage)
articles = await x_parser.search_accounts(
title, tenant_id=tenant_id, keywords=x_keywords, account_ids=selected_ids
)
logger.info(f"X-Pipeline: {len(articles)} Posts")
return articles, None
# Pipeline-Schritt 2: Nachrichten sammeln (Start)
await _pipe_start("collect")
# Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram)
# Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram/X)
pipelines = [_rss_pipeline(), _web_search_pipeline(), _podcast_pipeline()]
telegram_idx = x_idx = None
if include_telegram:
telegram_idx = len(pipelines)
pipelines.append(_telegram_pipeline())
if include_x:
x_idx = len(pipelines)
pipelines.append(_x_pipeline())
pipeline_results = await asyncio.gather(*pipelines)
(rss_articles, rss_feed_usage) = pipeline_results[0]
(search_results, search_usage, search_parse_failed) = pipeline_results[1]
(podcast_articles, _podcast_usage) = pipeline_results[2]
telegram_articles = pipeline_results[3][0] if include_telegram else []
telegram_articles = pipeline_results[telegram_idx][0] if telegram_idx is not None else []
x_articles = pipeline_results[x_idx][0] if x_idx is not None else []
# Podcast-Artikel in die RSS-Liste einfuegen (gleicher Downstream-Pfad)
if podcast_articles:
@@ -1110,7 +1159,7 @@ class AgentOrchestrator:
self._check_cancelled(incident_id)
# Alle Ergebnisse zusammenführen
all_results = rss_articles + search_results + telegram_articles
all_results = rss_articles + search_results + telegram_articles + x_articles
# Pipeline-Schritt 2: Nachrichten sammeln (fertig)
try:
_delivering_sources = len({a.get("source", "") for a in all_results if a.get("source")})

Datei anzeigen

@@ -496,6 +496,24 @@ REGELN:
Antworte NUR mit einem JSON-Array der Kanal-Nummern, z.B.: [1, 3, 5, 12]"""
X_ACCOUNT_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von X-Accounts (Twitter) diejenigen aus, die fuer die Lage relevant sein koennten.
LAGE: {title}
KONTEXT: {description}
X-ACCOUNTS:
{account_list}
REGELN:
- Waehle alle Accounts die thematisch relevant sein koennten
- Lieber einen Account zu viel als zu wenig auswaehlen
- Beachte die Kategorie und Beschreibung jedes Accounts
- Allgemeine OSINT-Accounts sind oft relevant
- Bei geopolitischen Themen: Relevante Laender-/Regions-Accounts waehlen
Antworte NUR mit einem JSON-Array der Account-Nummern, z.B.: [1, 3, 5, 12]"""
class ResearcherAgent:
"""Führt OSINT-Recherchen über Claude CLI WebSearch durch."""
@@ -1016,3 +1034,62 @@ class ResearcherAgent:
logger.warning("Telegram-Selektion fehlgeschlagen (%s), nutze alle Kanaele", e)
return channels_metadata, None
async def select_relevant_x_accounts(
self,
title: str,
description: str,
accounts_metadata: list[dict],
) -> tuple[list[dict], ClaudeUsage | None]:
"""Laesst Claude die relevanten X-Accounts fuer eine Lage vorauswaehlen.
Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe.
Returns:
(ausgewaehlte Accounts, usage) -- Bei Fehler: (alle Accounts, None)
"""
if len(accounts_metadata) <= 10:
logger.info("X-Selektion: Nur %d Accounts, nutze alle", len(accounts_metadata))
return accounts_metadata, None
account_lines = []
for i, acc in enumerate(accounts_metadata, 1):
cat = acc.get("category", "sonstige")
notes = (acc.get("notes") or "")[:100]
account_lines.append(f"{i}. {acc['name']} [{cat}] - {notes}")
prompt = X_ACCOUNT_SELECTION_PROMPT.format(
title=title,
description=description or "Keine weitere Beschreibung",
account_list="\n".join(account_lines),
)
try:
result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
indices = _extract_json_array(result)
if not isinstance(indices, list):
logger.warning(
"X-Selektion: Kein JSON in Antwort, nutze alle Accounts. Sample: %s",
_truncate_for_log(result),
)
return accounts_metadata, usage
selected = []
for idx in indices:
if isinstance(idx, int) and 1 <= idx <= len(accounts_metadata):
selected.append(accounts_metadata[idx - 1])
if not selected:
logger.warning("X-Selektion: Keine gueltigen Indizes, nutze alle Accounts")
return accounts_metadata, usage
logger.info(
"X-Selektion: %d von %d Accounts ausgewaehlt",
len(selected), len(accounts_metadata)
)
return selected, usage
except Exception as e:
logger.warning("X-Selektion fehlgeschlagen (%s), nutze alle Accounts", e)
return accounts_metadata, None