"""X (Twitter) parser: reads posts from configured X accounts via twscrape.

Egress runs -- when X_PROXY_URL is set -- through the HTTP proxy on the
RUTX11 (mobile-network IP). If the proxy is down, posts are fetched directly
over the server IP (fallback). Returns article dicts in the RSS-/Telegram-
compatible format.
"""
import asyncio
import logging
import os
from datetime import datetime, timezone, timedelta
from urllib.parse import urlsplit, urlunsplit

import httpx

from config import (
    TIMEZONE,
    X_ACCOUNTS_DB_PATH,
    X_PROXY_URL,
    X_POST_CAP_PER_ACCOUNT,
    X_RECENCY_DAYS,
    X_SCRAPER_ENABLED,
)

logger = logging.getLogger("osint.x")

# Stop words (same as in the RSS/Telegram parsers); dropped from the search
# term when fallback keywords are derived from it.
STOP_WORDS = {
    "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an",
    "auf", "fuer", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor",
    "ueber", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from",
}

# URL prefixes removed by _normalize_handle. They are matched
# case-insensitively (hosts are case-insensitive, e.g. "https://X.com/Foo")
# and applied in order, each at most once, so "https://mobile.twitter.com/Foo"
# is fully stripped.
_SCHEME_PREFIXES = ("https://", "http://")
_HOST_PREFIXES = ("www.", "mobile.", "x.com/", "twitter.com/", "nitter.net/")
# Characters that end the handle part of a URL (path, query, fragment).
_HANDLE_TERMINATORS = ("/", "?", "#")


def _normalize_handle(raw: str) -> str:
    """Normalize an X handle from URL or ``@`` form to the bare user name.

    ``"https://x.com/Foo/status/1"``, ``"twitter.com/Foo?lang=de"`` and
    ``"@Foo"`` all become ``"Foo"``. Only the URL prefixes are matched
    case-insensitively; the handle keeps its original case. ``None`` or
    empty input yields ``""``.
    """
    h = (raw or "").strip()
    for prefix in _SCHEME_PREFIXES + _HOST_PREFIXES:
        if h.lower().startswith(prefix):
            h = h[len(prefix):]
    h = h.lstrip("@").strip("/")
    # Cut off path/query/fragment remainders (e.g. handle/status/123, handle?lang=de)
    for sep in _HANDLE_TERMINATORS:
        h = h.split(sep, 1)[0]
    return h


def _redact_proxy_url(url: str | None) -> str | None:
    """Return *url* with any embedded credentials masked, for safe logging.

    ``"http://user:secret@10.0.0.1:3128"`` -> ``"http://***@10.0.0.1:3128"``.
    URLs without an ``@`` are returned unchanged.
    """
    if not url or "@" not in url:
        return url
    try:
        parts = urlsplit(url)
    except ValueError:
        parts = None
    if parts is not None and "@" in parts.netloc:
        host = parts.netloc.rpartition("@")[2]
        return urlunsplit(parts._replace(netloc="***@" + host))
    # No parseable authority (e.g. scheme missing): mask everything up to the last "@".
    return "***@" + url.rpartition("@")[2]


def _tweet_datetime(value) -> datetime | None:
    """Return a tweet timestamp as a timezone-aware datetime, or None.

    Naive datetimes are assumed to be UTC so they can be compared with the
    aware recency cutoff. NOTE(review): twscrape normally yields aware UTC
    timestamps -- confirm for the installed version. Values that are not
    datetimes yield None, meaning "no usable date".
    """
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


class XParser:
    """Scans configured X accounts for relevant posts."""

    # Common German function words used by the _is_german heuristic.
    _GERMAN_WORDS = frozenset({
        "der", "die", "das", "und", "ist", "von", "mit", "fuer", "auf", "ein",
        "eine", "den", "dem", "des", "sich", "wird", "nach", "bei", "auch",
        "ueber", "wie", "aus", "hat", "zum", "zur", "als", "noch", "mehr",
        "nicht", "aber", "oder", "sind", "vor", "einem", "einer", "wurde",
    })

    async def _resolve_proxy(self) -> tuple[str | None, str | None]:
        """Resolve the proxy strategy.

        Returns ``(proxy_url, egress_ip)``:
        - X_PROXY_URL empty -> ``(None, None)``: direct fetch over the server IP.
        - X_PROXY_URL set and reachable -> ``(proxy, egress_ip)``.
        - X_PROXY_URL set but dead -> ``(None, None)``: direct fallback plus
          a warning.

        Reachability is probed by fetching https://api.ipify.org through the
        proxy (8 s timeout); the response body is taken as the egress IP.
        Credentials embedded in the proxy URL are masked before logging.
        """
        if not X_PROXY_URL:
            return None, None
        proxy_for_log = _redact_proxy_url(X_PROXY_URL)
        try:
            async with httpx.AsyncClient(proxy=X_PROXY_URL, timeout=8.0) as client:
                resp = await client.get("https://api.ipify.org")
                resp.raise_for_status()
                egress_ip = resp.text.strip()
            logger.info("X-Egress ueber Proxy %s aktiv (IP: %s)", proxy_for_log, egress_ip)
            return X_PROXY_URL, egress_ip
        except Exception as e:
            # Best effort: any failure (connect, timeout, HTTP status) means "proxy dead".
            logger.warning(
                "X-Proxy %s nicht erreichbar (%s) -- Fallback auf direkte Server-IP",
                proxy_for_log, e,
            )
            return None, None

    async def _get_api(self, proxy: str | None):
        """Create the twscrape API object.

        Args:
            proxy: Proxy URL handed to twscrape, or None for a direct connection.

        Returns:
            The API object, or None if the account store file is missing,
            twscrape is not installed, initialization fails, or the account
            pool is empty or contains no active accounts. If the pool status
            itself cannot be read, the API is returned anyway.
        """
        if not os.path.exists(X_ACCOUNTS_DB_PATH):
            logger.error("X-Account-Store nicht gefunden: %s", X_ACCOUNTS_DB_PATH)
            return None
        try:
            from twscrape import API
        except ImportError:
            logger.error("twscrape nicht installiert: pip install twscrape")
            return None
        try:
            api = API(X_ACCOUNTS_DB_PATH, proxy=proxy)
            # Check the account pool -- without active accounts twscrape returns nothing
            try:
                accounts = await api.pool.get_all()
                active = [a for a in accounts if getattr(a, "active", True)]
                if not accounts:
                    logger.error("X-Account-Pool leer -- keine Accounts konfiguriert")
                    return None
                if not active:
                    logger.error(
                        "X-Account-Pool: alle %d Accounts inaktiv/gesperrt", len(accounts)
                    )
                    return None
                logger.info("X-Account-Pool: %d/%d Accounts aktiv", len(active), len(accounts))
            except Exception as e:
                # Pool status cannot be determined -- try anyway
                logger.debug("X-Account-Pool-Status nicht ermittelbar: %s", e)
            return api
        except Exception as e:
            logger.error("X-API-Initialisierung fehlgeschlagen: %s", e)
            return None

    async def search_accounts(self, search_term: str, tenant_id: int | None = None,
                              keywords: dict | list | None = None,
                              account_ids: list[int] | None = None) -> list[dict]:
        """Read posts from the configured X accounts.

        Args:
            search_term: Free-text term; only used to derive fallback keywords
                when *keywords* is empty.
            tenant_id: Tenant whose accounts (plus global ones) are loaded.
            keywords: Language dict ``{iso_lang: [keyword, ...]}`` or a flat
                list. Per account, matching uses the "en" universal terms plus
                the keywords of the account language (primary_language from
                sources), as resolved by ``keywords_for_language``.
            account_ids: If given, only these source ids are read.

        Returns:
            Article dicts (compatible with the RSS/Telegram format). Empty if
            the scraper is disabled, no accounts are configured or the API is
            unavailable.
        """
        if not X_SCRAPER_ENABLED:
            logger.info("X-Scraper deaktiviert (X_SCRAPER_ENABLED=false)")
            return []

        from agents.researcher import keywords_for_language

        accounts = await self._get_x_accounts(tenant_id, account_ids=account_ids)
        if not accounts:
            logger.info("Keine X-Accounts konfiguriert")
            return []

        proxy, _egress_ip = await self._resolve_proxy()
        api = await self._get_api(proxy)
        if not api:
            logger.warning("X-API nicht verfuegbar, ueberspringe X-Pipeline")
            return []

        # Fallback search words when no keywords were supplied
        fallback_words: list[str] | None = None
        if not keywords:
            fallback_words = [
                w for w in search_term.lower().split()
                if w not in STOP_WORDS and len(w) >= 3
            ]
            if not fallback_words:
                fallback_words = search_term.lower().split()[:2]

        cutoff = datetime.now(timezone.utc) - timedelta(days=X_RECENCY_DAYS)

        # Fetch all accounts concurrently
        tasks = []
        for acc in accounts:
            handle = _normalize_handle(acc["url"] or acc["name"])
            acc_lang = acc.get("primary_language")
            if keywords:
                search_words = [w.lower() for w in keywords_for_language(keywords, acc_lang)]
            else:
                search_words = fallback_words or []
            tasks.append(self._fetch_account(api, handle, search_words, cutoff, acc_lang))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_articles = []
        for acc, result in zip(accounts, results):
            if isinstance(result, Exception):
                logger.warning("X-Account %s: %s", acc["name"], result)
                continue
            all_articles.extend(result)

        logger.info("X: %d relevante Posts aus %d Accounts", len(all_articles), len(accounts))
        return all_articles

    async def _get_x_accounts(self, tenant_id: int | None = None,
                              account_ids: list[int] | None = None) -> list[dict]:
        """Load active X accounts (source_type 'x_account') from the sources table.

        With *account_ids*, exactly those ids are loaded; otherwise all global
        accounts (tenant_id IS NULL) plus those of *tenant_id*. Any error is
        logged and yields an empty list.

        NOTE(review): the *account_ids* branch applies no tenant filter, so a
        caller can load another tenant's accounts by id -- confirm that callers
        validate the ids.
        """
        try:
            from database import get_db
            db = await get_db()
            try:
                if account_ids:
                    # One "?" per id keeps the IN-list parameterized
                    placeholders = ",".join("?" for _ in account_ids)
                    cursor = await db.execute(
                        f"""SELECT id, name, url, category, notes, primary_language
                            FROM sources
                            WHERE source_type = 'x_account' AND status = 'active'
                            AND id IN ({placeholders})""",
                        tuple(account_ids),
                    )
                else:
                    cursor = await db.execute(
                        """SELECT id, name, url, category, notes, primary_language
                           FROM sources
                           WHERE source_type = 'x_account' AND status = 'active'
                           AND (tenant_id IS NULL OR tenant_id = ?)""",
                        (tenant_id,),
                    )
                rows = await cursor.fetchall()
                return [dict(row) for row in rows]
            finally:
                await db.close()
        except Exception as e:
            logger.error("Fehler beim Laden der X-Accounts: %s", e)
            return []

    async def _fetch_account(self, api, handle: str, search_words: list[str],
                             cutoff: datetime, account_lang: str | None = None) -> list[dict]:
        """Fetch the latest posts of one X account and filter them by keywords.

        Args:
            api: twscrape API object.
            handle: Bare X user name; empty -> no posts.
            search_words: Lower-cased keywords; a post needs at least one
                substring hit. Empty list -> no keyword filtering.
            cutoff: Aware datetime; posts dated before it are dropped.
            account_lang: Configured account language, preferred over the
                tweet's own language tag.

        Returns:
            Article dicts. Errors are logged and end processing of this account;
            posts collected up to that point are still returned.
        """
        from twscrape import gather

        articles: list[dict] = []
        if not handle:
            return articles
        try:
            user = await api.user_by_login(handle)
            if not user:
                logger.warning("X-Account @%s nicht gefunden", handle)
                return articles

            tweets = await gather(api.user_tweets(user.id, limit=X_POST_CAP_PER_ACCOUNT))
            for tw in tweets:
                # Skip plain retweets (the original is captured anyway)
                if getattr(tw, "retweetedTweet", None) is not None:
                    continue

                text = getattr(tw, "rawContent", None) or ""
                # Quote tweet: append the quoted text so the context is preserved
                quoted = getattr(tw, "quotedTweet", None)
                if quoted is not None:
                    q_text = getattr(quoted, "rawContent", "") or ""
                    if q_text:
                        text = "%s\n\n[Zitiert] %s" % (text, q_text)
                if not text.strip():
                    continue

                # Recency window. Naive timestamps are normalized to UTC so the
                # comparison with the aware cutoff cannot raise TypeError (which
                # previously disabled this filter silently).
                tw_date = _tweet_datetime(getattr(tw, "date", None))
                if tw_date is not None and tw_date < cutoff:
                    continue

                # Keyword matching (looser than RSS: one hit suffices, since the
                # accounts are already thematically preselected)
                text_lower = text.lower()
                match_count = sum(1 for w in search_words if w in text_lower)
                if search_words and match_count < 1:
                    continue

                # First non-empty line (text is stripped), capped at 200 chars
                headline = text.strip().split("\n", 1)[0][:200].strip()

                published = None
                if tw_date is not None:
                    try:
                        published = tw_date.astimezone(TIMEZONE).isoformat()
                    except Exception:
                        published = tw_date.isoformat()

                source_url = getattr(tw, "url", None) or \
                    "https://x.com/%s/status/%s" % (handle, getattr(tw, "id", ""))

                # Language: account setting > tweet tag ("und" = undetermined) > heuristic
                text_is_german = self._is_german(text)
                tw_lang = getattr(tw, "lang", None)
                language = (
                    account_lang
                    or (tw_lang if tw_lang and tw_lang != "und" else None)
                    or ("de" if text_is_german else "en")
                )

                relevance_score = (match_count / len(search_words)) if search_words else 0.0

                articles.append({
                    "headline": headline,
                    "headline_de": headline if self._is_german(headline) else None,
                    "source": "X: @%s" % handle,
                    "source_url": source_url,
                    "content_original": text[:2000],
                    "content_de": text[:2000] if text_is_german else None,
                    "language": language,
                    "published_at": published,
                    "relevance_score": relevance_score,
                })
        except Exception as e:
            logger.warning("X-Account @%s: %s", handle, e)

        return articles

    async def validate_account(self, handle: str) -> dict | None:
        """Check whether an X account is reachable and return its account info.

        Returns a dict with ``valid``, ``name``, ``username``, ``description``
        and ``subscribers`` (follower count), or None if the handle is empty,
        the API is unavailable, the user is not found or the lookup fails.
        """
        handle = _normalize_handle(handle)
        if not handle:
            return None

        proxy, _ = await self._resolve_proxy()
        api = await self._get_api(proxy)
        if not api:
            return None

        try:
            user = await api.user_by_login(handle)
            if not user:
                return None
            return {
                "valid": True,
                "name": getattr(user, "displayname", None) or handle,
                "username": getattr(user, "username", handle),
                "description": getattr(user, "rawDescription", "") or "",
                "subscribers": getattr(user, "followersCount", None),
            }
        except Exception as e:
            logger.warning("X-Account-Validierung fehlgeschlagen fuer @%s: %s", handle, e)
            return None

    def _is_german(self, text: str) -> bool:
        """Cheap heuristic: True if *text* contains at least two distinct
        common German function words (lower-cased, whitespace-split tokens)."""
        return len(set(text.lower().split()) & self._GERMAN_WORDS) >= 2