diff --git a/requirements.txt b/requirements.txt index 2c90cc1..da0c809 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,8 @@ python-multipart aiosmtplib geonamescache>=2.0 telethon +# X/Twitter-Scraper (feeds/x_parser.py) +twscrape # Bericht-Export (PDF via WeasyPrint + DOCX via python-docx) Jinja2>=3.1 weasyprint>=68.0 diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 5bdff2c..7a9760f 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -34,6 +34,7 @@ CATEGORY_REPUTATION = { "international": 0.75, # CNN, Guardian, NYT, Al Jazeera, France24 "regional": 0.65, # regionale Tageszeitungen "telegram": 0.5, # OSINT-Kanaele — gemischte Qualitaet + "x": 0.4, # X/Twitter-Accounts, hohes Rauschen "sonstige": 0.4, # unkategorisiert "boulevard": 0.3, # Bild, Sun etc. } @@ -750,6 +751,7 @@ class AgentOrchestrator: # Einschraenkung passiert in get_feeds_with_metadata. # Hinweis: source_lang_whitelist wird weiter unten geladen. include_telegram = bool(incident["include_telegram"]) if "include_telegram" in incident.keys() else False + include_x = bool(incident["include_x"]) if "include_x" in incident.keys() else False visibility = incident["visibility"] if "visibility" in incident.keys() else "public" created_by = incident["created_by"] if "created_by" in incident.keys() else None tenant_id = incident["tenant_id"] if "tenant_id" in incident.keys() else None @@ -1078,20 +1080,67 @@ class AgentOrchestrator: logger.info(f"Telegram-Pipeline: {len(articles)} Nachrichten") return articles, None + async def _x_pipeline(): + """X-Account-Suche (Twitter) mit KI-basierter Account-Selektion.""" + from feeds.x_parser import XParser + x_parser = XParser() + + # Alle X-Accounts laden + all_accounts = await x_parser._get_x_accounts(tenant_id=tenant_id) + if not all_accounts: + logger.info("Keine X-Accounts konfiguriert") + return [], None + + # KI waehlt relevante Accounts aus + x_researcher = ResearcherAgent() + selected_accounts, x_sel_usage = await x_researcher.select_relevant_x_accounts( + title, description, all_accounts + ) + if x_sel_usage: + usage_acc.add(x_sel_usage) + + selected_ids = [acc["id"] for acc in selected_accounts] + logger.info(f"X-Selektion: {len(selected_ids)} von {len(all_accounts)} Accounts") + + # Dynamische Keywords fuer X (eigener Aufruf, da parallel zu RSS) + cursor_x_hl = await db.execute( + """SELECT COALESCE(headline_de, headline) as hl + FROM articles WHERE incident_id = ? + AND COALESCE(headline_de, headline) IS NOT NULL + ORDER BY collected_at DESC LIMIT 30""", + (incident_id,), + ) + x_headlines = [row["hl"] for row in await cursor_x_hl.fetchall() if row["hl"]] + x_keywords, x_kw_usage = await x_researcher.extract_dynamic_keywords(title, x_headlines) + if x_kw_usage: + usage_acc.add(x_kw_usage) + + articles = await x_parser.search_accounts( + title, tenant_id=tenant_id, keywords=x_keywords, account_ids=selected_ids + ) + logger.info(f"X-Pipeline: {len(articles)} Posts") + return articles, None + # Pipeline-Schritt 2: Nachrichten sammeln (Start) await _pipe_start("collect") - # Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram) + # Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram/X) pipelines = [_rss_pipeline(), _web_search_pipeline(), _podcast_pipeline()] + telegram_idx = x_idx = None if include_telegram: + telegram_idx = len(pipelines) pipelines.append(_telegram_pipeline()) + if include_x: + x_idx = len(pipelines) + pipelines.append(_x_pipeline()) pipeline_results = await asyncio.gather(*pipelines) (rss_articles, rss_feed_usage) = pipeline_results[0] (search_results, search_usage, search_parse_failed) = pipeline_results[1] (podcast_articles, _podcast_usage) = pipeline_results[2] - telegram_articles = pipeline_results[3][0] if include_telegram else [] + telegram_articles = pipeline_results[telegram_idx][0] if telegram_idx is not None else [] + x_articles = pipeline_results[x_idx][0] if x_idx is not None else [] # Podcast-Artikel in die RSS-Liste einfuegen (gleicher Downstream-Pfad) if podcast_articles: @@ -1110,7 +1159,7 @@ class AgentOrchestrator: self._check_cancelled(incident_id) # Alle Ergebnisse zusammenführen - all_results = rss_articles + search_results + telegram_articles + all_results = rss_articles + search_results + telegram_articles + x_articles # Pipeline-Schritt 2: Nachrichten sammeln (fertig) try: _delivering_sources = len({a.get("source", "") for a in all_results if a.get("source")}) diff --git a/src/agents/researcher.py b/src/agents/researcher.py index 4845e1e..70e4786 100644 --- a/src/agents/researcher.py +++ b/src/agents/researcher.py @@ -496,6 +496,24 @@ REGELN: Antworte NUR mit einem JSON-Array der Kanal-Nummern, z.B.: [1, 3, 5, 12]""" +X_ACCOUNT_SELECTION_PROMPT = """Du bist ein OSINT-Analyst. Waehle aus dieser Liste von X-Accounts (Twitter) diejenigen aus, die fuer die Lage relevant sein koennten. + +LAGE: {title} +KONTEXT: {description} + +X-ACCOUNTS: +{account_list} + +REGELN: +- Waehle alle Accounts die thematisch relevant sein koennten +- Lieber einen Account zu viel als zu wenig auswaehlen +- Beachte die Kategorie und Beschreibung jedes Accounts +- Allgemeine OSINT-Accounts sind oft relevant +- Bei geopolitischen Themen: Relevante Laender-/Regions-Accounts waehlen + +Antworte NUR mit einem JSON-Array der Account-Nummern, z.B.: [1, 3, 5, 12]""" + + class ResearcherAgent: """Führt OSINT-Recherchen über Claude CLI WebSearch durch.""" @@ -1016,3 +1034,62 @@ class ResearcherAgent: logger.warning("Telegram-Selektion fehlgeschlagen (%s), nutze alle Kanaele", e) return channels_metadata, None + async def select_relevant_x_accounts( + self, + title: str, + description: str, + accounts_metadata: list[dict], + ) -> tuple[list[dict], ClaudeUsage | None]: + """Laesst Claude die relevanten X-Accounts fuer eine Lage vorauswaehlen. + + Nutzt Haiku (CLAUDE_MODEL_FAST) fuer diese einfache Aufgabe. + + Returns: + (ausgewaehlte Accounts, usage) -- Bei Fehler: (alle Accounts, None) + """ + if len(accounts_metadata) <= 10: + logger.info("X-Selektion: Nur %d Accounts, nutze alle", len(accounts_metadata)) + return accounts_metadata, None + + account_lines = [] + for i, acc in enumerate(accounts_metadata, 1): + cat = acc.get("category", "sonstige") + notes = (acc.get("notes") or "")[:100] + account_lines.append(f"{i}. {acc['name']} [{cat}] - {notes}") + + prompt = X_ACCOUNT_SELECTION_PROMPT.format( + title=title, + description=description or "Keine weitere Beschreibung", + account_list="\n".join(account_lines), + ) + + try: + result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) + + indices = _extract_json_array(result) + if not isinstance(indices, list): + logger.warning( + "X-Selektion: Kein JSON in Antwort, nutze alle Accounts. Sample: %s", + _truncate_for_log(result), + ) + return accounts_metadata, usage + + selected = [] + for idx in indices: + if isinstance(idx, int) and 1 <= idx <= len(accounts_metadata): + selected.append(accounts_metadata[idx - 1]) + + if not selected: + logger.warning("X-Selektion: Keine gueltigen Indizes, nutze alle Accounts") + return accounts_metadata, usage + + logger.info( + "X-Selektion: %d von %d Accounts ausgewaehlt", + len(selected), len(accounts_metadata) + ) + return selected, usage + + except Exception as e: + logger.warning("X-Selektion fehlgeschlagen (%s), nutze alle Accounts", e) + return accounts_metadata, None + diff --git a/src/config.py b/src/config.py index 9acf64d..538537c 100644 --- a/src/config.py +++ b/src/config.py @@ -97,6 +97,19 @@ TELEGRAM_API_ID = int(os.environ.get("TELEGRAM_API_ID", "0")) TELEGRAM_API_HASH = os.environ.get("TELEGRAM_API_HASH", "") TELEGRAM_SESSION_PATH = os.environ.get("TELEGRAM_SESSION_PATH", "/home/claude-dev/.telegram/telegram_session") +# X / Twitter (twscrape) -- siehe feeds/x_parser.py +# Scraper liest Account-Timelines konfigurierter X-Quellen (source_type='x_account'). +X_SCRAPER_ENABLED = os.environ.get("X_SCRAPER_ENABLED", "true").lower() == "true" +# twscrape-Account-Store (SQLite). Liegt ausserhalb des Repos. +X_ACCOUNTS_DB_PATH = os.environ.get("X_ACCOUNTS_DB_PATH", "/home/claude-dev/.x-scraper/accounts.db") +# HTTP-Proxy fuer den X-Egress (tinyproxy am RUTX11 ueber WireGuard). +# Leer = direkter Abruf ueber die Server-IP. Bei gesetztem Wert prueft der +# Parser den Proxy vor jedem Lauf und faellt bei Ausfall auf direkt zurueck. +X_PROXY_URL = os.environ.get("X_PROXY_URL", "") +# Max. Posts pro Account-Timeline und Recency-Fenster in Tagen. +X_POST_CAP_PER_ACCOUNT = int(os.environ.get("X_POST_CAP_PER_ACCOUNT", "40")) +X_RECENCY_DAYS = int(os.environ.get("X_RECENCY_DAYS", "14")) + # Health-Check (genutzt von services/source_health.py) HEALTH_CHECK_USER_AGENT = os.environ.get( "HEALTH_CHECK_USER_AGENT", diff --git a/src/database.py b/src/database.py index b4cb242..9f3ae88 100644 --- a/src/database.py +++ b/src/database.py @@ -403,6 +403,11 @@ async def init_db(): await db.commit() logger.info("Migration: include_telegram zu incidents hinzugefuegt") + if "include_x" not in columns: + await db.execute("ALTER TABLE incidents ADD COLUMN include_x INTEGER DEFAULT 0") + await db.commit() + logger.info("Migration: include_x zu incidents hinzugefuegt") + if "telegram_categories" not in columns: await db.execute("ALTER TABLE incidents ADD COLUMN telegram_categories TEXT DEFAULT NULL") await db.commit() diff --git a/src/feeds/x_parser.py b/src/feeds/x_parser.py new file mode 100644 index 0000000..a48cd7d --- /dev/null +++ b/src/feeds/x_parser.py @@ -0,0 +1,320 @@ +"""X (Twitter) Parser: Liest Posts aus konfigurierten X-Accounts via twscrape. + +Egress laeuft -- wenn X_PROXY_URL gesetzt -- ueber den HTTP-Proxy am RUTX11 +(Mobilfunk-IP). Faellt der Proxy aus, wird direkt ueber die Server-IP +abgerufen (Fallback). Gibt Artikel-Dicts im RSS-/Telegram-kompatiblen Format +zurueck. +""" +import asyncio +import logging +import os +from datetime import datetime, timezone, timedelta + +import httpx + +from config import ( + TIMEZONE, X_ACCOUNTS_DB_PATH, X_PROXY_URL, + X_POST_CAP_PER_ACCOUNT, X_RECENCY_DAYS, X_SCRAPER_ENABLED, +) + +logger = logging.getLogger("osint.x") + +# Stoppwoerter (gleich wie RSS-/Telegram-Parser) +STOP_WORDS = { + "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an", + "auf", "fuer", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor", + "ueber", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from", +} + + +def _normalize_handle(raw: str) -> str: + """X-Handle aus URL-/@-Form auf den nackten Benutzernamen normalisieren.""" + h = (raw or "").strip() + for prefix in ("https://", "http://"): + if h.startswith(prefix): + h = h[len(prefix):] + for prefix in ("www.", "x.com/", "twitter.com/", "nitter.net/"): + if h.startswith(prefix): + h = h[len(prefix):] + h = h.lstrip("@").strip("/") + # Pfad-/Query-Reste abschneiden (z.B. handle/status/123 oder handle?lang=de) + for sep in ("/", "?"): + if sep in h: + h = h.split(sep)[0] + return h + + +class XParser: + """Durchsucht konfigurierte X-Accounts nach relevanten Posts.""" + + async def _resolve_proxy(self) -> tuple[str | None, str | None]: + """Proxy-Strategie aufloesen. + + Returns (proxy_url, egress_ip): + - X_PROXY_URL leer -> (None, None): direkter Abruf ueber Server-IP. + - X_PROXY_URL gesetzt und erreichbar -> (proxy, egress_ip). + - X_PROXY_URL gesetzt aber tot -> (None, None): Fallback direkt + Warnung. + """ + if not X_PROXY_URL: + return None, None + try: + async with httpx.AsyncClient(proxy=X_PROXY_URL, timeout=8.0) as client: + resp = await client.get("https://api.ipify.org") + resp.raise_for_status() + egress_ip = resp.text.strip() + logger.info("X-Egress ueber Proxy %s aktiv (IP: %s)", X_PROXY_URL, egress_ip) + return X_PROXY_URL, egress_ip + except Exception as e: + logger.warning( + "X-Proxy %s nicht erreichbar (%s) -- Fallback auf direkte Server-IP", + X_PROXY_URL, e, + ) + return None, None + + async def _get_api(self, proxy: str | None): + """twscrape-API-Objekt erstellen. + + Gibt None zurueck wenn der Account-Store fehlt oder keine + nutzbaren Accounts vorhanden sind. + """ + if not os.path.exists(X_ACCOUNTS_DB_PATH): + logger.error("X-Account-Store nicht gefunden: %s", X_ACCOUNTS_DB_PATH) + return None + try: + from twscrape import API + except ImportError: + logger.error("twscrape nicht installiert: pip install twscrape") + return None + try: + api = API(X_ACCOUNTS_DB_PATH, proxy=proxy) + # Account-Pool pruefen -- ohne aktive Accounts liefert twscrape nichts + try: + accounts = await api.pool.get_all() + active = [a for a in accounts if getattr(a, "active", True)] + if not accounts: + logger.error("X-Account-Pool leer -- keine Accounts konfiguriert") + return None + if not active: + logger.error( + "X-Account-Pool: alle %d Accounts inaktiv/gesperrt", len(accounts) + ) + return None + logger.info("X-Account-Pool: %d/%d Accounts aktiv", len(active), len(accounts)) + except Exception as e: + # Pool-Status nicht ermittelbar -- trotzdem weiterversuchen + logger.debug("X-Account-Pool-Status nicht ermittelbar: %s", e) + return api + except Exception as e: + logger.error("X-API-Initialisierung fehlgeschlagen: %s", e) + return None + + async def search_accounts(self, search_term: str, tenant_id: int = None, + keywords: dict | list = None, + account_ids: list[int] = None) -> list[dict]: + """Liest Posts aus konfigurierten X-Accounts. + + Args: + keywords: Sprach-Dict {iso_lang: [keyword,...]} oder flache Liste. + Match nutzt pro Account die "en"-Universalbegriffe + die + Keywords der Account-Sprache (primary_language aus sources). + + Gibt Artikel-Dicts zurueck (kompatibel mit RSS-/Telegram-Format). + """ + if not X_SCRAPER_ENABLED: + logger.info("X-Scraper deaktiviert (X_SCRAPER_ENABLED=false)") + return [] + + from agents.researcher import keywords_for_language + + accounts = await self._get_x_accounts(tenant_id, account_ids=account_ids) + if not accounts: + logger.info("Keine X-Accounts konfiguriert") + return [] + + proxy, _egress_ip = await self._resolve_proxy() + api = await self._get_api(proxy) + if not api: + logger.warning("X-API nicht verfuegbar, ueberspringe X-Pipeline") + return [] + + # Fallback-Suchwoerter wenn keine Keywords da sind + fallback_words: list[str] | None = None + if not keywords: + fallback_words = [ + w for w in search_term.lower().split() + if w not in STOP_WORDS and len(w) >= 3 + ] + if not fallback_words: + fallback_words = search_term.lower().split()[:2] + + cutoff = datetime.now(timezone.utc) - timedelta(days=X_RECENCY_DAYS) + + # Accounts parallel abrufen + tasks = [] + for acc in accounts: + handle = _normalize_handle(acc["url"] or acc["name"]) + acc_lang = acc.get("primary_language") + if keywords: + search_words = [w.lower() for w in keywords_for_language(keywords, acc_lang)] + else: + search_words = fallback_words or [] + tasks.append(self._fetch_account(api, handle, search_words, cutoff, acc_lang)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + all_articles = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.warning("X-Account %s: %s", accounts[i]["name"], result) + continue + all_articles.extend(result) + + logger.info("X: %d relevante Posts aus %d Accounts", len(all_articles), len(accounts)) + return all_articles + + async def _get_x_accounts(self, tenant_id: int = None, + account_ids: list[int] = None) -> list[dict]: + """Laedt X-Accounts aus der sources-Tabelle.""" + try: + from database import get_db + db = await get_db() + try: + if account_ids and len(account_ids) > 0: + placeholders = ",".join("?" for _ in account_ids) + cursor = await db.execute( + f"""SELECT id, name, url, category, notes, primary_language FROM sources + WHERE source_type = 'x_account' + AND status = 'active' + AND id IN ({placeholders})""", + tuple(account_ids), + ) + else: + cursor = await db.execute( + """SELECT id, name, url, category, notes, primary_language FROM sources + WHERE source_type = 'x_account' + AND status = 'active' + AND (tenant_id IS NULL OR tenant_id = ?)""", + (tenant_id,), + ) + rows = await cursor.fetchall() + return [dict(row) for row in rows] + finally: + await db.close() + except Exception as e: + logger.error("Fehler beim Laden der X-Accounts: %s", e) + return [] + + async def _fetch_account(self, api, handle: str, search_words: list[str], + cutoff: datetime, account_lang: str | None = None) -> list[dict]: + """Letzte Posts eines X-Accounts abrufen und nach Keywords filtern.""" + from twscrape import gather + + articles: list[dict] = [] + if not handle: + return articles + try: + user = await api.user_by_login(handle) + if not user: + logger.warning("X-Account @%s nicht gefunden", handle) + return articles + + tweets = await gather(api.user_tweets(user.id, limit=X_POST_CAP_PER_ACCOUNT)) + + for tw in tweets: + # Reine Retweets ueberspringen (Original wird ohnehin erfasst) + if getattr(tw, "retweetedTweet", None) is not None: + continue + + text = getattr(tw, "rawContent", None) or "" + # Quote-Tweet: zitierten Text anhaengen, damit Kontext erhalten bleibt + quoted = getattr(tw, "quotedTweet", None) + if quoted is not None: + q_text = getattr(quoted, "rawContent", "") or "" + if q_text: + text = "%s\n\n[Zitiert] %s" % (text, q_text) + if not text.strip(): + continue + + # Recency-Fenster + tw_date = getattr(tw, "date", None) + if tw_date is not None: + try: + if tw_date < cutoff: + continue + except TypeError: + pass + + # Keyword-Matching (lockerer als RSS: 1 Match reicht, + # da Accounts bereits thematisch vorselektiert sind) + text_lower = text.lower() + match_count = sum(1 for w in search_words if w in text_lower) + if search_words and match_count < 1: + continue + + lines = text.strip().split("\n") + headline = (lines[0][:200] if lines else text[:200]).strip() + + published = None + if tw_date is not None: + try: + published = tw_date.astimezone(TIMEZONE).isoformat() + except Exception: + published = tw_date.isoformat() + + source_url = getattr(tw, "url", None) or \ + "https://x.com/%s/status/%s" % (handle, getattr(tw, "id", "")) + tw_lang = getattr(tw, "lang", None) + language = account_lang \ + or (tw_lang if tw_lang and tw_lang != "und" else None) \ + or ("de" if self._is_german(text) else "en") + relevance_score = (match_count / len(search_words)) if search_words else 0.0 + + articles.append({ + "headline": headline, + "headline_de": headline if self._is_german(headline) else None, + "source": "X: @%s" % handle, + "source_url": source_url, + "content_original": text[:2000], + "content_de": text[:2000] if self._is_german(text) else None, + "language": language, + "published_at": published, + "relevance_score": relevance_score, + }) + + except Exception as e: + logger.warning("X-Account @%s: %s", handle, e) + + return articles + + async def validate_account(self, handle: str) -> dict | None: + """Prueft ob ein X-Account erreichbar ist und gibt Account-Info zurueck.""" + handle = _normalize_handle(handle) + if not handle: + return None + proxy, _ = await self._resolve_proxy() + api = await self._get_api(proxy) + if not api: + return None + try: + user = await api.user_by_login(handle) + if not user: + return None + return { + "valid": True, + "name": getattr(user, "displayname", None) or handle, + "username": getattr(user, "username", handle), + "description": getattr(user, "rawDescription", "") or "", + "subscribers": getattr(user, "followersCount", None), + } + except Exception as e: + logger.warning("X-Account-Validierung fehlgeschlagen fuer @%s: %s", handle, e) + return None + + def _is_german(self, text: str) -> bool: + """Einfache Heuristik ob ein Text deutsch ist.""" + german_words = {"der", "die", "das", "und", "ist", "von", "mit", "fuer", "auf", "ein", + "eine", "den", "dem", "des", "sich", "wird", "nach", "bei", "auch", + "ueber", "wie", "aus", "hat", "zum", "zur", "als", "noch", "mehr", + "nicht", "aber", "oder", "sind", "vor", "einem", "einer", "wurde"} + words = set(text.lower().split()) + return len(words & german_words) >= 2 diff --git a/src/models.py b/src/models.py index b6486fd..b2cd1c3 100644 --- a/src/models.py +++ b/src/models.py @@ -57,6 +57,7 @@ class IncidentCreate(BaseModel): retention_days: int = Field(default=0, ge=0, le=999) international_sources: bool = False include_telegram: bool = False + include_x: bool = False visibility: str = Field(default="public", pattern="^(public|private)$") @@ -71,6 +72,7 @@ class IncidentUpdate(BaseModel): retention_days: Optional[int] = Field(default=None, ge=0, le=999) international_sources: Optional[bool] = None include_telegram: Optional[bool] = None + include_x: Optional[bool] = None visibility: Optional[str] = Field(default=None, pattern="^(public|private)$") @@ -102,6 +104,7 @@ class IncidentResponse(BaseModel): public_mood_updated_at: Optional[str] = None international_sources: bool = True include_telegram: bool = False + include_x: bool = False created_by: int created_by_username: str = "" created_at: str @@ -130,6 +133,7 @@ class IncidentListItem(BaseModel): visibility: str = "public" international_sources: bool = True include_telegram: bool = False + include_x: bool = False created_by: int created_by_username: str = "" created_at: str diff --git a/src/routers/incidents.py b/src/routers/incidents.py index 4f63220..9debb69 100644 --- a/src/routers/incidents.py +++ b/src/routers/incidents.py @@ -21,7 +21,7 @@ router = APIRouter(prefix="/api/incidents", tags=["incidents"]) INCIDENT_UPDATE_COLUMNS = { "title", "description", "type", "status", "refresh_mode", - "refresh_interval", "refresh_start_time", "retention_days", "international_sources", "include_telegram", "visibility", + "refresh_interval", "refresh_start_time", "retention_days", "international_sources", "include_telegram", "include_x", "visibility", } @@ -89,7 +89,7 @@ async def list_incidents( query = ( "SELECT id, title, description, type, status, refresh_mode, refresh_interval, " "refresh_start_time, retention_days, visibility, " - "international_sources, include_telegram, created_by, created_at, updated_at, " + "international_sources, include_telegram, include_x, created_by, created_at, updated_at, " "CASE WHEN summary IS NOT NULL AND summary != '' THEN 1 ELSE 0 END AS has_summary " "FROM incidents WHERE tenant_id = ? AND (visibility = 'public' OR created_by = ?)" ) @@ -120,9 +120,9 @@ async def create_incident( now = datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S') cursor = await db.execute( """INSERT INTO incidents (title, description, type, refresh_mode, refresh_interval, - refresh_start_time, retention_days, international_sources, include_telegram, visibility, + refresh_start_time, retention_days, international_sources, include_telegram, include_x, visibility, tenant_id, created_by, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( data.title, data.description, @@ -133,6 +133,7 @@ async def create_incident( data.retention_days, 1 if data.international_sources else 0, 1 if data.include_telegram else 0, + 1 if data.include_x else 0, data.visibility, tenant_id, current_user["id"], @@ -385,7 +386,7 @@ async def update_incident( for field, value in data.model_dump(exclude_none=True).items(): if field not in INCIDENT_UPDATE_COLUMNS: continue - if field in ("international_sources", "include_telegram"): + if field in ("international_sources", "include_telegram", "include_x"): updates[field] = 1 if value else 0 else: updates[field] = value diff --git a/src/routers/sources.py b/src/routers/sources.py index b61d0a7..2413307 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -144,6 +144,7 @@ async def get_source_stats( "rss_feed": {"count": 0, "articles": 0}, "web_source": {"count": 0, "articles": 0}, "telegram_channel": {"count": 0, "articles": 0}, + "x_account": {"count": 0, "articles": 0}, "excluded": {"count": 0, "articles": 0}, } for row in rows: @@ -637,6 +638,30 @@ async def validate_telegram_channel( raise HTTPException(status_code=500, detail="Telegram-Validierung fehlgeschlagen") +@router.post("/x/validate") +async def validate_x_account( + data: dict, + current_user: dict = Depends(get_current_user), +): + """Prueft ob ein X-Account (Twitter) erreichbar ist und gibt Account-Info zurueck.""" + handle = data.get("handle", "").strip() + if not handle: + raise HTTPException(status_code=400, detail="handle ist erforderlich") + + try: + from feeds.x_parser import XParser + parser = XParser() + result = await parser.validate_account(handle) + if result: + return result + raise HTTPException(status_code=404, detail="X-Account nicht erreichbar oder nicht gefunden") + except HTTPException: + raise + except Exception as e: + logger.error("X-Validierung fehlgeschlagen: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail="X-Validierung fehlgeschlagen") + + @router.post("/refresh-counts") async def trigger_refresh_counts( current_user: dict = Depends(get_current_user), diff --git a/src/source_rules.py b/src/source_rules.py index 1b5aaed..eeddab2 100644 --- a/src/source_rules.py +++ b/src/source_rules.py @@ -86,6 +86,9 @@ DOMAIN_CATEGORY_MAP = { "merkur.de": "regional", # Telegram "t.me": "telegram", + # X / Twitter + "x.com": "x", + "twitter.com": "x", } # Bekannte Feed-Pfade zum Durchprobieren diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 7d90dc3..6f738f5 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -392,6 +392,13 @@ Telegram-Kanäle einbeziehen + +
+
@@ -484,6 +491,7 @@ + @@ -623,6 +631,7 @@ +
diff --git a/src/static/js/app.js b/src/static/js/app.js index e19e7cd..d97b82a 100644 --- a/src/static/js/app.js +++ b/src/static/js/app.js @@ -1831,6 +1831,7 @@ const App = { retention_days: parseInt(document.getElementById('inc-retention').value) || 0, international_sources: document.getElementById('inc-international').checked, include_telegram: document.getElementById('inc-telegram').checked, + include_x: document.getElementById('inc-x').checked, visibility: document.getElementById('inc-visibility').checked ? 'public' : 'private', }; }, @@ -2266,6 +2267,7 @@ async handleRefresh() { { const _e = document.getElementById('inc-retention'); if (_e) _e.value = incident.retention_days; } { const _e = document.getElementById('inc-international'); if (_e) _e.checked = incident.international_sources !== false && incident.international_sources !== 0; } { const _e = document.getElementById('inc-telegram'); if (_e) _e.checked = !!incident.include_telegram; } + { const _e = document.getElementById('inc-x'); if (_e) _e.checked = !!incident.include_x; } { const _e = document.getElementById('inc-visibility'); if (_e) _e.checked = incident.visibility !== 'private'; } updateVisibilityHint(); @@ -2795,12 +2797,14 @@ async handleRefresh() { const rss = stats.by_type.rss_feed || { count: 0, articles: 0 }; const web = stats.by_type.web_source || { count: 0, articles: 0 }; const tg = stats.by_type.telegram_channel || { count: 0, articles: 0 }; + const x = stats.by_type.x_account || { count: 0, articles: 0 }; const excluded = this._myExclusions.length; bar.innerHTML = ` ${rss.count} ${(typeof T === 'function' ? T('sources_modal.stats.rss', 'RSS-Feeds') : 'RSS-Feeds')} ${web.count} ${(typeof T === 'function' ? T('sources_modal.stats.web', 'Web-Quellen') : 'Web-Quellen')} ${tg.count} Telegram + ${x.count} X ${excluded} ${(typeof T === 'function' ? T('sources_modal.stats.excluded', 'Ausgeschlossen') : 'Ausgeschlossen')} ${stats.total_articles} Artikel gesamt `; @@ -3246,6 +3250,31 @@ async handleRefresh() { if (saveBtn) { saveBtn.disabled = false; saveBtn.textContent = 'Speichern'; } return; } + + // X (Twitter)-URLs direkt behandeln (kein Discovery noetig) + if (urlVal.match(/^(https?:\/\/)?(x\.com|twitter\.com)\//i)) { + const handle = urlVal + .replace(/^(https?:\/\/)?(x\.com|twitter\.com)\//i, '') + .replace(/\/$/, '') + .split(/[/?]/)[0] + .replace(/^@/, ''); + const xUrl = 'x.com/' + handle; + this._discoveredData = { + name: '@' + handle, + domain: xUrl, + source_type: 'x_account', + rss_url: null, + }; + document.getElementById('src-name').value = '@' + handle; + document.getElementById('src-type-select').value = 'x_account'; + document.getElementById('src-type-display').value = 'X (Twitter)'; + document.getElementById('src-domain').value = xUrl; + document.getElementById('src-rss-url-group').style.display = 'none'; + document.getElementById('src-discovery-result').style.display = 'block'; + const saveBtnX = document.querySelector('#src-discovery-result .sources-discovery-actions .btn-primary'); + if (saveBtnX) { saveBtnX.disabled = false; saveBtnX.textContent = 'Speichern'; } + return; + } const url = urlInput.value.trim(); if (!url) { UI.showToast('Bitte URL oder Domain eingeben.', 'warning'); @@ -3365,7 +3394,7 @@ async handleRefresh() { document.getElementById('src-notes').value = source.notes || ''; document.getElementById('src-domain').value = source.domain || ''; - const typeLabel = source.source_type === 'rss_feed' ? 'RSS-Feed' : source.source_type === 'telegram_channel' ? 'Telegram' : 'Web-Quelle'; + const typeLabel = source.source_type === 'rss_feed' ? 'RSS-Feed' : source.source_type === 'telegram_channel' ? 'Telegram' : source.source_type === 'x_account' ? 'X (Twitter)' : 'Web-Quelle'; const typeSelect = document.getElementById('src-type-select'); if (typeSelect) typeSelect.value = source.source_type || 'web_source'; document.getElementById('src-type-display').value = typeLabel; @@ -3409,7 +3438,7 @@ async handleRefresh() { name, source_type: discovered.source_type || 'web_source', category: document.getElementById('src-category').value, - url: discovered.rss_url || (discovered.source_type === 'telegram_channel' ? (document.getElementById('src-domain').value || null) : null), + url: discovered.rss_url || ((discovered.source_type === 'telegram_channel' || discovered.source_type === 'x_account') ? (document.getElementById('src-domain').value || null) : null), domain: document.getElementById('src-domain').value.trim() || discovered.domain || null, notes: document.getElementById('src-notes').value.trim() || null, };