Damit die Pipeline das aktuelle Bild einfaengt, nicht nur das relevanteste (oft Monate alt). Bei der Test-Lage Qilin war der neueste Artikel 7 Wochen alt, die Masse 6-7 Monate — weil Google-News-Volltextsuche nach Relevanz rankt, nicht nach Datum. - build_news_search_feeds: neuer Parameter recency_days. Wenn gesetzt, wird der Google-News-Operator "when:Nd" an die Query gehaengt — der Feed liefert nur Artikel der letzten N Tage. Eigene Domain-Gruppe '...-recent'. - orchestrator._rss_pipeline: baut jetzt ZWEI Suchfeed-Saetze — einen Kontext-Feed (alle Zeiten) und einen Frische-Feed (when:14d). Beide laufen durch dieselbe Pipeline, Dedup entfernt Ueberschneidungen. - rss_parser._fetch_feed: relevance_score bekommt einen Aktualitaets-Bonus (<=3d +0.35, <=14d +0.20, <=60d +0.05) bzw. -Malus (>180d -0.15, >365d -0.30). Damit ueberleben frische Artikel den Domain-Cap statt von alten verdraengt zu werden. Nur adhoc-Pfad betroffen — research-Lagen ueberspringen die RSS-Pipeline ohnehin und behalten ihre volle historische Tiefe. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
350 Zeilen
17 KiB
Python
350 Zeilen
17 KiB
Python
"""RSS-Feed Parser: Durchsucht vorkonfigurierte Feeds nach relevanten Meldungen."""
|
|
import asyncio
|
|
import logging
|
|
import feedparser
|
|
import httpx
|
|
from datetime import datetime, timezone
|
|
from config import TIMEZONE, MAX_ARTICLES_PER_DOMAIN_RSS
|
|
from source_rules import _extract_domain
|
|
|
|
# Cap fuer dynamische Google-News-Suchfeeds — hoeher als der normale Domain-Cap,
|
|
# weil ein Suchfeed gezielt fuer breiten Recall gebaut wird. Topic-Filter
|
|
# entscheidet danach ueber die Precision.
|
|
MAX_ARTICLES_PER_DOMAIN_RSS_SEARCH = 25
|
|
from feeds.transcript_extractors._common import html_to_text
|
|
from services.post_refresh_qc import normalize_german_umlauts
|
|
from agents.researcher import keywords_for_language, flatten_keywords
|
|
|
|
logger = logging.getLogger("osint.rss")
|
|
|
|
|
|
def _is_specific_word(w: str) -> bool:
|
|
"""Spezifisches Keyword = 1-Treffer reicht für Match.
|
|
|
|
- Lateinisch: ab 7 Zeichen (alte Heuristik).
|
|
- Nicht-ASCII (CJK, Arabisch, Hebräisch, Kyrillisch etc.): ab 3 Zeichen.
|
|
Beispiel: '自衛隊' (3 Kanji) oder 'путин' (5 Kyrillisch) sind spezifisch genug.
|
|
"""
|
|
if not w:
|
|
return False
|
|
if any(ord(c) > 127 for c in w):
|
|
return len(w) >= 3
|
|
return len(w) >= 7
|
|
|
|
|
|
class RSSParser:
|
|
"""Durchsucht RSS-Feeds nach relevanten Artikeln."""
|
|
|
|
# Stoppwörter die bei der RSS-Suche ignoriert werden
|
|
STOP_WORDS = {
|
|
"und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an",
|
|
"auf", "für", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor",
|
|
"über", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from",
|
|
}
|
|
|
|
@staticmethod
|
|
def _clean_search_words(words: list[str]) -> list[str]:
|
|
"""Entfernt rein-numerische Wörter (Jahreszahlen etc.) aus Suchbegriffen."""
|
|
cleaned = [w for w in words if not w.isdigit()]
|
|
return cleaned if cleaned else words
|
|
|
|
def _fallback_search_words(self, search_term: str) -> list[str]:
|
|
words = [
|
|
w for w in search_term.lower().split()
|
|
if w not in self.STOP_WORDS and len(w) >= 3
|
|
]
|
|
if not words:
|
|
words = search_term.lower().split()[:2]
|
|
return self._clean_search_words(words)
|
|
|
|
async def search_feeds(self, search_term: str, international: bool = True, tenant_id: int = None, keywords: dict | list | None = None, user_id: int = None) -> list[dict]:
|
|
"""Durchsucht RSS-Feeds nach einem Suchbegriff.
|
|
|
|
Args:
|
|
search_term: Suchbegriff
|
|
international: Wenn False, nur Feeds in der Org-Sprache + Behoerden (keine internationalen)
|
|
tenant_id: Optionale Org-ID fuer tenant-spezifische Quellen
|
|
keywords: Sprach-Dict {iso_lang: [keyword, ...]} oder flache Liste (Backward).
|
|
"""
|
|
all_articles = []
|
|
if keywords:
|
|
logger.info(f"RSS-Suche mit Claude-Keywords (Sprachen): "
|
|
f"{ {k: len(v) for k, v in keywords.items()} if isinstance(keywords, dict) else len(keywords) }")
|
|
fallback_words = None
|
|
else:
|
|
fallback_words = self._fallback_search_words(search_term)
|
|
|
|
rss_feeds = await self._get_rss_feeds(tenant_id=tenant_id)
|
|
|
|
# User-spezifische Ausschluesse anwenden
|
|
if user_id:
|
|
try:
|
|
from source_rules import get_user_excluded_domains
|
|
user_excluded = await get_user_excluded_domains(user_id)
|
|
if user_excluded:
|
|
for cat in rss_feeds:
|
|
rss_feeds[cat] = [f for f in rss_feeds[cat]
|
|
if not any(excl in (f.get("url", "") + f.get("name", "")).lower()
|
|
for excl in user_excluded)]
|
|
except Exception as e:
|
|
logger.warning(f"User-Ausschluesse konnten nicht geladen werden: {e}")
|
|
|
|
# Feed-Kategorien filtern
|
|
if international:
|
|
categories = rss_feeds.keys()
|
|
else:
|
|
categories = [c for c in rss_feeds.keys() if c != "international"]
|
|
|
|
tasks = []
|
|
for category in categories:
|
|
for feed_config in rss_feeds.get(category, []):
|
|
feed_lang = feed_config.get("primary_language")
|
|
if keywords:
|
|
words = keywords_for_language(keywords, feed_lang)
|
|
words = [w.lower() for w in words]
|
|
else:
|
|
words = fallback_words
|
|
tasks.append(self._fetch_feed(feed_config, words))
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
logger.warning(f"Feed-Fehler: {result}")
|
|
continue
|
|
all_articles.extend(result)
|
|
|
|
cat_info = "alle" if international else "nur primary + behörden"
|
|
logger.info(f"RSS-Suche nach '{search_term}' ({cat_info}): {len(all_articles)} Treffer")
|
|
all_articles = self._apply_domain_cap(all_articles)
|
|
return all_articles
|
|
|
|
async def search_feeds_selective(self, search_term: str, selected_feeds: list[dict], keywords: dict | list | None = None) -> list[dict]:
|
|
"""Durchsucht nur die übergebenen Feeds (vorselektiert durch Claude).
|
|
|
|
Args:
|
|
search_term: Suchbegriff
|
|
selected_feeds: Liste von Feed-Dicts mit mindestens {"name", "url"} und idealerweise "primary_language"
|
|
keywords: Sprach-Dict {iso_lang: [keyword, ...]} oder flache Liste (Backward).
|
|
"""
|
|
all_articles = []
|
|
if keywords:
|
|
if isinstance(keywords, dict):
|
|
logger.info(f"RSS-Selektiv mit Claude-Keywords (Sprachen): "
|
|
f"{ {k: len(v) for k, v in keywords.items()} }")
|
|
else:
|
|
logger.info(f"RSS-Selektiv mit Claude-Keywords (flach): {keywords}")
|
|
fallback_words = None
|
|
else:
|
|
fallback_words = self._fallback_search_words(search_term)
|
|
|
|
tasks = []
|
|
for feed_config in selected_feeds:
|
|
feed_lang = feed_config.get("primary_language")
|
|
if keywords:
|
|
words = keywords_for_language(keywords, feed_lang)
|
|
words = [w.lower() for w in words]
|
|
else:
|
|
words = fallback_words
|
|
tasks.append(self._fetch_feed(feed_config, words))
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
logger.warning(f"Feed-Fehler: {result}")
|
|
continue
|
|
all_articles.extend(result)
|
|
|
|
logger.info(f"RSS-Selektiv nach '{search_term}': {len(all_articles)} Treffer aus {len(selected_feeds)} Feeds")
|
|
all_articles = self._apply_domain_cap(all_articles)
|
|
return all_articles
|
|
|
|
async def _get_rss_feeds(self, tenant_id: int = None) -> dict:
|
|
"""Laedt RSS-Feeds aus der Datenbank (global + org-spezifisch)."""
|
|
try:
|
|
from source_rules import get_source_rules
|
|
rules = await get_source_rules(tenant_id=tenant_id)
|
|
return rules.get("rss_feeds", {})
|
|
except Exception as e:
|
|
logger.warning(f"Fallback auf config.py fuer RSS-Feeds: {e}")
|
|
from config import RSS_FEEDS
|
|
return dict(RSS_FEEDS)
|
|
|
|
async def _fetch_feed(self, feed_config: dict, search_words: list[str]) -> list[dict]:
|
|
"""Einzelnen RSS-Feed abrufen und durchsuchen."""
|
|
name = feed_config["name"]
|
|
url = feed_config["url"]
|
|
articles = []
|
|
# Google-News-Feeds (Site-Search ODER Volltext-Suche) buendeln Artikel
|
|
# vieler echter Publisher. Pro Item steht der echte Publisher im
|
|
# <source>-Tag — den nutzen wir als source-Name, sonst zaehlt der
|
|
# Faktencheck 25 Artikel als "eine Quelle".
|
|
_is_google_news = "news.google.com" in (url or "")
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
|
|
response = await client.get(url, headers={
|
|
"User-Agent": "OSINT-Monitor/1.0 (News Aggregator)"
|
|
})
|
|
response.raise_for_status()
|
|
|
|
feed = await asyncio.to_thread(feedparser.parse, response.text)
|
|
|
|
for entry in feed.entries[:50]:
|
|
title = entry.get("title", "")
|
|
# RSS-summary ist bei vielen Quellen HTML (Guardian, AP, SZ, ...).
|
|
# Vor weiterer Verwendung strippen, sonst landet HTML in DB
|
|
# und KI-Agenten und Sprach-Heuristik werden gestoert.
|
|
summary_raw = entry.get("summary", "")
|
|
summary = html_to_text(summary_raw) if summary_raw else ""
|
|
# ASCII-Umlaut-Normalisierung (z.B. dpa-AFX schreibt "Gespraeche").
|
|
# Dictionary-basiert, sicher gegen englische Woerter wie "Boeing".
|
|
title, _ = normalize_german_umlauts(title)
|
|
summary, _ = normalize_german_umlauts(summary)
|
|
text = f"{title} {summary}".lower()
|
|
|
|
# Adaptive Match-Schwelle:
|
|
# - Bei mindestens einem spezifischen Keyword (Latin ≥7 Zeichen oder
|
|
# CJK/Arabisch/Hebräisch/Kyrillisch ≥3 Zeichen) im Text reicht 1 Treffer.
|
|
# Damit matched z.B. "自衛隊" (3 Kanji) wie "buckelwal" (9 Zeichen).
|
|
# - Sonst: alte Heuristik (mindestens halb der Wörter, max. 2).
|
|
specific_in_text = any(w in text for w in search_words if _is_specific_word(w))
|
|
if specific_in_text:
|
|
min_matches = 1
|
|
else:
|
|
min_matches = min(2, max(1, (len(search_words) + 1) // 2))
|
|
match_count = sum(1 for word in search_words if word in text)
|
|
|
|
if match_count >= min_matches:
|
|
published = None
|
|
published_dt = None
|
|
if hasattr(entry, "published_parsed") and entry.published_parsed:
|
|
try:
|
|
published_dt = datetime(*entry.published_parsed[:6], tzinfo=timezone.utc)
|
|
published = published_dt.astimezone(TIMEZONE).isoformat()
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
# Relevanz-Score: Anteil der gematchten Suchworte (0.0-1.0)
|
|
relevance_score = match_count / len(search_words) if search_words else 0.0
|
|
# Aktualitaets-Bonus/Malus: frische Artikel sollen den
|
|
# Domain-Cap (sortiert nach relevance_score) ueberleben und
|
|
# nicht von Monate alten verdraengt werden. Damit faengt die
|
|
# Pipeline das aktuelle Bild ein. Nur adhoc-Pfad — research
|
|
# nutzt diesen Code nicht.
|
|
if published_dt is not None:
|
|
age_days = (datetime.now(timezone.utc) - published_dt).days
|
|
if age_days <= 3:
|
|
relevance_score += 0.35
|
|
elif age_days <= 14:
|
|
relevance_score += 0.20
|
|
elif age_days <= 60:
|
|
relevance_score += 0.05
|
|
elif age_days > 365:
|
|
relevance_score -= 0.30
|
|
elif age_days > 180:
|
|
relevance_score -= 0.15
|
|
|
|
# Bei Google-News-Feeds: echten Publisher aus <source>-Tag holen
|
|
article_source = name
|
|
if _is_google_news:
|
|
src_obj = entry.get("source")
|
|
src_title = ""
|
|
if isinstance(src_obj, dict):
|
|
src_title = (src_obj.get("title") or "").strip()
|
|
elif src_obj:
|
|
src_title = str(getattr(src_obj, "title", "") or "").strip()
|
|
if src_title:
|
|
article_source = src_title
|
|
else:
|
|
# Google-News-Titel enden oft mit " - Publishername"
|
|
if " - " in title:
|
|
article_source = title.rsplit(" - ", 1)[-1].strip() or name
|
|
|
|
articles.append({
|
|
"headline": title,
|
|
"headline_de": title if self._is_german(title) else None,
|
|
"source": article_source,
|
|
"source_url": entry.get("link", ""),
|
|
# Die Quell-Domain aus der DB (z.B. "mod.go.jp"), nicht aus
|
|
# der URL — relevant für Google-News-RSS-Quellen, deren URLs
|
|
# alle "news.google.com" sind, obwohl sie für 14 verschiedene
|
|
# Behörden/Zeitungen stehen. Wird vom Domain-Cap genutzt.
|
|
"source_domain": feed_config.get("domain") or "",
|
|
# media_type aus dem Feed-Eintrag (z.B. "forum" fuer 5ch/Hatena/Note)
|
|
# damit downstream Pipeline-Schritte (Faktencheck, Geoparsing,
|
|
# Topic-Filter, Stimmungs-Kachel) Foren-Quellen erkennen koennen.
|
|
"media_type": feed_config.get("media_type") or "",
|
|
"content_original": summary[:1000] if summary else None,
|
|
"content_de": summary[:1000] if summary and self._is_german(summary) else None,
|
|
# Sprache primär aus der Quell-Konfiguration übernehmen
|
|
# (z.B. "ja" für Asahi Shimbun, "ru" für TASS). Nur wenn
|
|
# die Quelle kein primary_language gesetzt hat, auf die
|
|
# alte de/en-Heuristik zurückfallen. Sonst landen
|
|
# CJK/kyrillische Headlines fälschlich als language="en"
|
|
# und verlieren Pre-Topic-Übersetzung + Translator-Pfad.
|
|
"language": feed_config.get("primary_language") or ("de" if self._is_german(title) else "en"),
|
|
"published_at": published,
|
|
"relevance_score": relevance_score,
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Feed {name} ({url}): {e}")
|
|
|
|
return articles
|
|
|
|
def _apply_domain_cap(self, articles: list[dict]) -> list[dict]:
|
|
"""Begrenzt die Anzahl der Artikel pro Domain auf MAX_ARTICLES_PER_DOMAIN_RSS.
|
|
|
|
Gruppiert nach Domain, sortiert pro Domain nach relevance_score (beste zuerst),
|
|
behält nur die Top-N pro Domain.
|
|
"""
|
|
if not articles:
|
|
return articles
|
|
|
|
# Nach Domain gruppieren. Bevorzugt source_domain (aus dem Feed-Eintrag,
|
|
# z.B. "mod.go.jp" bei einer Google-News-Site-Search-RSS-Quelle), fällt
|
|
# erst dann auf die URL-Domain zurück. Sonst landen alle Google-News-
|
|
# Feeds (14 ja-Quellen) im selben "news.google.com"-Topf und werden
|
|
# vom Cap auf 10 begrenzt.
|
|
by_domain: dict[str, list[dict]] = {}
|
|
for article in articles:
|
|
domain = (article.get("source_domain") or "").strip().lower()
|
|
if not domain:
|
|
domain = _extract_domain(article.get("source_url", ""))
|
|
if not domain:
|
|
domain = "__unknown__"
|
|
by_domain.setdefault(domain, []).append(article)
|
|
|
|
capped = []
|
|
for domain, domain_articles in by_domain.items():
|
|
# Nach Relevanz sortieren (beste zuerst)
|
|
domain_articles.sort(key=lambda a: a.get("relevance_score", 0), reverse=True)
|
|
# Dynamische Google-News-Suchfeeds ("google-news-search-<lang>") sind
|
|
# der Recall-Treiber und bekommen einen hoeheren Cap als feste Feeds.
|
|
cap = (MAX_ARTICLES_PER_DOMAIN_RSS_SEARCH
|
|
if domain.startswith("google-news-search-")
|
|
else MAX_ARTICLES_PER_DOMAIN_RSS)
|
|
kept = domain_articles[:cap]
|
|
if len(domain_articles) > cap:
|
|
logger.info(
|
|
f"Domain-Cap: {domain} von {len(domain_articles)} auf {cap} Artikel begrenzt"
|
|
)
|
|
capped.extend(kept)
|
|
|
|
if len(capped) < len(articles):
|
|
logger.info(f"Domain-Cap gesamt: {len(articles)} → {len(capped)} Artikel")
|
|
|
|
return capped
|
|
|
|
def _is_german(self, text: str) -> bool:
|
|
"""Einfache Heuristik ob ein Text deutsch ist."""
|
|
german_words = {"der", "die", "das", "und", "ist", "von", "mit", "für", "auf", "ein",
|
|
"eine", "den", "dem", "des", "sich", "wird", "nach", "bei", "auch",
|
|
"über", "wie", "aus", "hat", "zum", "zur", "als", "noch", "mehr",
|
|
"nicht", "aber", "oder", "sind", "vor", "einem", "einer", "wurde"}
|
|
words = set(text.lower().split())
|
|
matches = words & german_words
|
|
return len(matches) >= 2
|