AegisSight-Monitor/src/source_rules.py
"""Dynamische Quellen-Regeln aus der Datenbank."""
import logging
import re
import json
import asyncio
from urllib.parse import urlparse
import httpx
import feedparser
from config import CLAUDE_PATH, CLAUDE_TIMEOUT
logger = logging.getLogger("osint.source_rules")
# Domain -> category mapping for auto-detection
DOMAIN_CATEGORY_MAP = {
    # News agencies
"reuters.com": "nachrichtenagentur",
"apnews.com": "nachrichtenagentur",
"dpa.com": "nachrichtenagentur",
"afp.com": "nachrichtenagentur",
    # Public-service broadcasters
"tagesschau.de": "oeffentlich-rechtlich",
"zdf.de": "oeffentlich-rechtlich",
"dw.com": "oeffentlich-rechtlich",
"br.de": "oeffentlich-rechtlich",
"ndr.de": "oeffentlich-rechtlich",
"wdr.de": "oeffentlich-rechtlich",
"mdr.de": "oeffentlich-rechtlich",
"swr.de": "oeffentlich-rechtlich",
"hr.de": "oeffentlich-rechtlich",
"rbb24.de": "oeffentlich-rechtlich",
"ard.de": "oeffentlich-rechtlich",
"orf.at": "oeffentlich-rechtlich",
"srf.ch": "oeffentlich-rechtlich",
    # Quality newspapers
"spiegel.de": "qualitaetszeitung",
"zeit.de": "qualitaetszeitung",
"faz.net": "qualitaetszeitung",
"sueddeutsche.de": "qualitaetszeitung",
"nzz.ch": "qualitaetszeitung",
"welt.de": "qualitaetszeitung",
"tagesspiegel.de": "qualitaetszeitung",
"fr.de": "qualitaetszeitung",
"stern.de": "qualitaetszeitung",
"focus.de": "qualitaetszeitung",
    # Government agencies
"bmi.bund.de": "behoerde",
"europol.europa.eu": "behoerde",
"bka.de": "behoerde",
"bsi.bund.de": "behoerde",
"verfassungsschutz.de": "behoerde",
"bpb.de": "behoerde",
    # Trade media
"netzpolitik.org": "fachmedien",
"handelsblatt.com": "fachmedien",
"heise.de": "fachmedien",
"golem.de": "fachmedien",
"t3n.de": "fachmedien",
"wiwo.de": "fachmedien",
# Think Tanks
"swp-berlin.org": "think-tank",
"iiss.org": "think-tank",
"brookings.edu": "think-tank",
"rand.org": "think-tank",
"dgap.org": "think-tank",
"chathamhouse.org": "think-tank",
# International
"bbc.co.uk": "international",
"bbc.com": "international",
"aljazeera.com": "international",
"france24.com": "international",
"cnn.com": "international",
"theguardian.com": "international",
"nytimes.com": "international",
"washingtonpost.com": "international",
"lemonde.fr": "international",
"elpais.com": "international",
# Regional
"berliner-zeitung.de": "regional",
"hamburger-abendblatt.de": "regional",
"stuttgarter-zeitung.de": "regional",
"ksta.de": "regional",
"rp-online.de": "regional",
"merkur.de": "regional",
}
# Known feed paths to probe
_FEED_PATHS = ["/feed", "/rss", "/rss.xml", "/atom.xml", "/feed.xml", "/index.xml", "/feed/rss", "/feeds/posts/default"]
# Extended news-specific feed paths for multi-discovery
_NEWS_FEED_PATHS = [
"/world/rss", "/world/rss.xml", "/world/feed",
"/politics/rss", "/politics/rss.xml", "/politics/feed",
"/business/rss", "/business/rss.xml", "/business/feed",
"/technology/rss", "/technology/rss.xml", "/technology/feed",
"/environment/rss", "/environment/rss.xml", "/environment/feed",
"/science/rss", "/science/rss.xml", "/science/feed",
"/europe/rss", "/europe/rss.xml", "/europe/feed",
"/security/rss", "/security/rss.xml", "/security/feed",
"/international/rss", "/international/rss.xml", "/international/feed",
"/economy/rss", "/economy/rss.xml", "/economy/feed",
"/defence/rss", "/defence/rss.xml", "/defence/feed",
"/middle-east/rss", "/middle-east/rss.xml",
"/asia/rss", "/asia/rss.xml",
"/africa/rss", "/africa/rss.xml",
"/americas/rss", "/americas/rss.xml",
"/uk-news/rss", "/us-news/rss",
"/commentisfree/rss", "/opinion/rss",
"/law/rss", "/media/rss",
"/global-development/rss",
"/news/feed", "/news/rss", "/news/rss.xml",
"/politik/rss", "/politik/rss.xml",
"/wirtschaft/rss", "/wirtschaft/rss.xml",
"/panorama/rss", "/panorama/rss.xml",
"/wissen/rss", "/wissen/rss.xml",
"/ausland/rss", "/ausland/rss.xml",
"/inland/rss", "/inland/rss.xml",
"/netzwelt/rss", "/netzwelt/rss.xml",
"/kultur/rss", "/kultur/rss.xml",
]
# Known feed URLs for portals that host their feeds on a separate domain
_DOMAIN_FEED_URLS = {
"bbc.com": [
"https://feeds.bbci.co.uk/news/rss.xml",
"https://feeds.bbci.co.uk/news/world/rss.xml",
"https://feeds.bbci.co.uk/news/business/rss.xml",
"https://feeds.bbci.co.uk/news/politics/rss.xml",
"https://feeds.bbci.co.uk/news/technology/rss.xml",
"https://feeds.bbci.co.uk/news/science_and_environment/rss.xml",
"https://feeds.bbci.co.uk/news/health/rss.xml",
"https://feeds.bbci.co.uk/news/education/rss.xml",
"https://feeds.bbci.co.uk/news/world/middle_east/rss.xml",
"https://feeds.bbci.co.uk/news/world/europe/rss.xml",
"https://feeds.bbci.co.uk/news/world/africa/rss.xml",
"https://feeds.bbci.co.uk/news/world/asia/rss.xml",
"https://feeds.bbci.co.uk/news/world/us_and_canada/rss.xml",
"https://feeds.bbci.co.uk/news/world/latin_america/rss.xml",
"https://feeds.bbci.co.uk/news/entertainment_and_arts/rss.xml",
],
"bbc.co.uk": "bbc.com", # Alias
"reuters.com": [
"https://www.reutersagency.com/feed/",
],
"aljazeera.com": [
"https://www.aljazeera.com/xml/rss/all.xml",
],
}
def _get_extra_feed_urls(domain: str) -> list[str]:
"""Gibt bekannte Feed-URLs für Domains mit separater Feed-Subdomain zurück."""
entry = _DOMAIN_FEED_URLS.get(domain)
if isinstance(entry, str):
        # Alias — points to another domain's entry
entry = _DOMAIN_FEED_URLS.get(entry)
if isinstance(entry, list):
return entry
return []
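# A hedged usage sketch of the alias resolution above (the first two domains are
# taken from _DOMAIN_FEED_URLS; "example.org" is a made-up unknown domain):
#   _get_extra_feed_urls("bbc.co.uk") == _get_extra_feed_urls("bbc.com")  # alias
#   _get_extra_feed_urls("example.org") == []  # unknown domain -> empty list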
def _normalize_url(url: str) -> str:
"""URL normalisieren (https:// ergänzen falls fehlend)."""
url = url.strip()
if not url.startswith(("http://", "https://")):
url = "https://" + url
return url
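# Illustrative examples (hypothetical inputs):
#   _normalize_url(" tagesschau.de ")    -> "https://tagesschau.de"
#   _normalize_url("http://example.org") -> "http://example.org"  (unchanged)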
def _extract_domain(url: str) -> str:
"""Domain aus URL extrahieren (ohne www.)."""
parsed = urlparse(url)
domain = parsed.hostname or ""
if domain.startswith("www."):
domain = domain[4:]
return domain
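# Illustrative example (hypothetical input):
#   _extract_domain("https://www.spiegel.de/politik/") -> "spiegel.de"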
def _detect_category(domain: str) -> str:
"""Kategorie anhand der Domain erkennen."""
if domain in DOMAIN_CATEGORY_MAP:
return DOMAIN_CATEGORY_MAP[domain]
    # Subdomain match: e.g. feeds.reuters.com -> reuters.com
parts = domain.split(".")
if len(parts) > 2:
parent = ".".join(parts[-2:])
if parent in DOMAIN_CATEGORY_MAP:
return DOMAIN_CATEGORY_MAP[parent]
return "sonstige"
# Known domain → display name mappings
DOMAIN_DISPLAY_NAMES = {
"tagesschau.de": "tagesschau",
"zdf.de": "ZDF heute",
"spiegel.de": "Spiegel",
"zeit.de": "Zeit",
"newsfeed.zeit.de": "Zeit",
"faz.net": "FAZ",
"sueddeutsche.de": "Süddeutsche Zeitung",
"rss.sueddeutsche.de": "Süddeutsche Zeitung",
"nzz.ch": "NZZ",
"dw.com": "Deutsche Welle",
"rss.dw.com": "Deutsche Welle",
"reuters.com": "Reuters",
"reutersagency.com": "Reuters",
"rsshub.app": "RSSHub",
"apnews.com": "AP News",
"bbc.com": "BBC",
"bbc.co.uk": "BBC",
"feeds.bbci.co.uk": "BBC",
"aljazeera.com": "Al Jazeera",
"france24.com": "France24",
"theguardian.com": "The Guardian",
"nytimes.com": "New York Times",
"washingtonpost.com": "Washington Post",
"cnn.com": "CNN",
"bmi.bund.de": "BMI",
"europol.europa.eu": "Europol",
"handelsblatt.com": "Handelsblatt",
"wiwo.de": "WirtschaftsWoche",
"heise.de": "Heise Online",
"golem.de": "Golem",
"netzpolitik.org": "netzpolitik.org",
"t3n.de": "t3n",
"welt.de": "Welt",
"tagesspiegel.de": "Tagesspiegel",
"stern.de": "Stern",
"focus.de": "Focus",
"n-tv.de": "n-tv",
"bild.de": "BILD",
"tarnkappe.info": "Tarnkappe",
"bleepingcomputer.com": "BleepingComputer",
"techcrunch.com": "TechCrunch",
"theverge.com": "The Verge",
"wired.com": "WIRED",
"tomshardware.com": "Tom's Hardware",
"finanzen.net": "Finanzen.net",
"404media.co": "404 Media",
"medium.com": "Medium",
"swp-berlin.org": "SWP Berlin",
"dgap.org": "DGAP",
"brookings.edu": "Brookings",
"rand.org": "RAND",
"lemonde.fr": "Le Monde",
"elpais.com": "El País",
"orf.at": "ORF",
"srf.ch": "SRF",
"br.de": "BR",
"ndr.de": "NDR",
"wdr.de": "WDR",
"mdr.de": "MDR",
"swr.de": "SWR",
"hr.de": "hr",
"rbb24.de": "rbb24",
"fr.de": "Frankfurter Rundschau",
"rp-online.de": "Rheinische Post",
"ksta.de": "Kölner Stadt-Anzeiger",
"berliner-zeitung.de": "Berliner Zeitung",
"stuttgarter-zeitung.de": "Stuttgarter Zeitung",
"hamburger-abendblatt.de": "Hamburger Abendblatt",
"merkur.de": "Münchner Merkur",
"bsi.bund.de": "BSI",
"bpb.de": "bpb",
"bka.de": "BKA",
"verfassungsschutz.de": "Verfassungsschutz",
"bashinho.de": "Bashinho",
}
def domain_to_display_name(domain: str) -> str:
"""Wandelt eine Domain in einen lesbaren Anzeigenamen um.
Prüft erst die bekannte Zuordnung, dann leitet einen sinnvollen
Namen aus der Domain ab (erster Teil, kapitalisiert).
"""
if domain in DOMAIN_DISPLAY_NAMES:
return DOMAIN_DISPLAY_NAMES[domain]
    # Subdomain match: feeds.reuters.com -> reuters.com
parts = domain.split(".")
if len(parts) > 2:
parent = ".".join(parts[-2:])
if parent in DOMAIN_DISPLAY_NAMES:
return DOMAIN_DISPLAY_NAMES[parent]
    # Fallback: extract the domain core and capitalize it,
    # e.g. "example-news.de" → "Example News"
core = parts[-2] if len(parts) >= 2 else parts[0]
return core.replace("-", " ").title()
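# Hedged examples ("nzz.ch" is in the mapping above; "example-news.de" is made up):
#   domain_to_display_name("nzz.ch")          -> "NZZ"           (known mapping)
#   domain_to_display_name("example-news.de") -> "Example News"  (fallback)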
async def _validate_feed(client: httpx.AsyncClient, url: str) -> dict | None:
"""Prüft ob eine URL ein gültiger RSS/Atom-Feed ist. Gibt Feed-Info zurück oder None."""
try:
resp = await client.get(url)
if resp.status_code != 200:
return None
        text = resp.text
        head = text[:10000]  # Sniff only the beginning
        # Must look like XML
        if "<rss" not in head and "<feed" not in head and "<channel" not in head:
            return None
        # Parse the full body, not just the sniffed prefix, so large feeds keep their entries
        feed = await asyncio.to_thread(feedparser.parse, text)
if feed.get("bozo") and not feed.entries:
return None
if feed.feed.get("title") or feed.entries:
return {
"url": str(resp.url), # Finale URL nach Redirects
"title": feed.feed.get("title", ""),
}
except Exception:
pass
return None
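# Hedged usage sketch (requires an open httpx.AsyncClient `client`; the URL is one
# of the known feeds listed in _DOMAIN_FEED_URLS above):
#   info = await _validate_feed(client, "https://www.aljazeera.com/xml/rss/all.xml")
#   # -> {"url": "<final URL>", "title": "<feed title>"} on success, None otherwise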
async def discover_source(url: str) -> dict:
"""Erkennt RSS-Feed, Name, Domain und Kategorie einer URL automatisch.
Returns:
dict mit: name, domain, rss_url, category, source_type
"""
url = _normalize_url(url)
domain = _extract_domain(url)
category = _detect_category(domain)
result = {
"name": domain_to_display_name(domain),
"domain": domain,
"rss_url": None,
"category": category,
"source_type": "web_source",
}
async with httpx.AsyncClient(
timeout=12.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
        # 1. Fetch the page and look for RSS links
page_title = None
try:
resp = await client.get(url)
if resp.status_code == 200:
html = resp.text[:50000]
                # Extract <title>
title_match = re.search(r"<title[^>]*>([^<]+)</title>", html, re.IGNORECASE)
if title_match:
page_title = title_match.group(1).strip()
                # Collect RSS/Atom <link> tags and extract their hrefs; searching
                # for href inside each matched tag tolerates either attribute order
                feed_urls = []
                for tag in re.finditer(
                    r'<link[^>]+type=["\']application/(?:rss|atom)\+xml["\'][^>]*>',
                    html,
                    re.IGNORECASE,
                ):
                    href_match = re.search(r'href=["\']([^"\']+)["\']', tag.group(0))
                    if href_match:
                        href = href_match.group(1)
                        # Resolve relative URLs
                        if href.startswith("/"):
                            parsed = urlparse(url)
                            href = f"{parsed.scheme}://{parsed.netloc}{href}"
                        elif not href.startswith("http"):
                            href = url.rstrip("/") + "/" + href
                        feed_urls.append(href)
                # Validate the discovered feed URLs
for feed_url in feed_urls:
feed_info = await _validate_feed(client, feed_url)
if feed_info:
result["rss_url"] = feed_info["url"]
result["source_type"] = "rss_feed"
if feed_info["title"]:
result["name"] = feed_info["title"]
elif page_title:
result["name"] = page_title
return result
except Exception as e:
logger.debug(f"Fehler beim Abrufen von {url}: {e}")
        # 2. Probe known feed paths
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in _FEED_PATHS:
feed_url = base_url + path
feed_info = await _validate_feed(client, feed_url)
if feed_info:
result["rss_url"] = feed_info["url"]
result["source_type"] = "rss_feed"
if feed_info["title"]:
result["name"] = feed_info["title"]
elif page_title:
result["name"] = page_title
return result
        # No feed found — use the page title as name
if page_title:
result["name"] = page_title
return result
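# Hedged usage sketch ("tagesschau.de" is listed in DOMAIN_CATEGORY_MAP; the
# rss_url depends on what discovery finds at runtime):
#   result = await discover_source("tagesschau.de")
#   # -> {"name": "...", "domain": "tagesschau.de", "rss_url": "<feed URL or None>",
#   #     "category": "oeffentlich-rechtlich", "source_type": "rss_feed" | "web_source"}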
async def discover_all_feeds(url: str) -> dict:
"""Findet ALLE RSS/Atom-Feeds einer Domain.
Returns:
dict mit: domain, category, page_title, feeds: [{"url", "title"}, ...]
"""
url = _normalize_url(url)
domain = _extract_domain(url)
category = _detect_category(domain)
result = {
"domain": domain,
"category": category,
"page_title": None,
"feeds": [],
}
seen_urls = set()
async with httpx.AsyncClient(
timeout=15.0,
follow_redirects=True,
headers={"User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0)"},
) as client:
        # 1. Fetch the HTML page and collect ALL RSS link tags
candidate_urls = []
try:
resp = await client.get(url)
if resp.status_code == 200:
html = resp.text[:100000]
title_match = re.search(r"<title[^>]*>([^<]+)</title>", html, re.IGNORECASE)
if title_match:
result["page_title"] = title_match.group(1).strip()
parsed = urlparse(url)
base = f"{parsed.scheme}://{parsed.netloc}"
for tag in re.finditer(
r'<link[^>]+type=["\']application/(?:rss|atom)\+xml["\'][^>]*>',
html,
re.IGNORECASE,
):
href_match = re.search(r'href=["\']([^"\']+)["\']', tag.group(0))
if href_match:
href = href_match.group(1)
if href.startswith("/"):
href = base + href
elif not href.startswith("http"):
href = url.rstrip("/") + "/" + href
candidate_urls.append(href)
except Exception as e:
logger.debug(f"Fehler beim Abrufen von {url}: {e}")
        # 2. Add known feed paths (standard + news-specific)
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in _FEED_PATHS + _NEWS_FEED_PATHS:
candidate_urls.append(base_url + path)
        # 2b. Known feed URLs for domains with a separate feed subdomain (e.g. BBC)
extra_urls = _get_extra_feed_urls(domain)
candidate_urls.extend(extra_urls)
        # 3. Validate all candidates in parallel (in batches of 10)
async def _validate_and_collect(feed_url: str):
try:
return await _validate_feed(client, feed_url)
except Exception:
return None
for i in range(0, len(candidate_urls), 10):
batch = candidate_urls[i:i + 10]
results = await asyncio.gather(*[_validate_and_collect(u) for u in batch])
for feed_info in results:
if feed_info and feed_info["url"] not in seen_urls:
seen_urls.add(feed_info["url"])
result["feeds"].append(feed_info)
logger.info(f"discover_all_feeds({domain}): {len(result['feeds'])} Feeds gefunden")
return result
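# Hedged usage sketch (the feed list depends on what the site exposes at runtime):
#   result = await discover_all_feeds("bbc.com")
#   # -> {"domain": "bbc.com", "category": "international", "page_title": "...",
#   #     "feeds": [{"url": "...", "title": "..."}, ...]}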
async def evaluate_feeds_with_claude(domain: str, feeds: list[dict]) -> list[dict]:
"""Lässt Claude die OSINT-Relevanz der Feeds bewerten.
Args:
domain: Domain-Name
feeds: Liste von {"url", "title"} Dicts
Returns:
Liste von {"url", "title", "name"} Dicts (nur relevante Feeds)
"""
if not feeds:
return []
feed_list = "\n".join(
f" {i+1}. {f['title'] or f['url']}{f['url']}"
for i, f in enumerate(feeds)
)
prompt = f"""Du bist ein OSINT-Analyst. Bewerte diese RSS-Feeds der Domain "{domain}" nach OSINT-Relevanz.
OSINT-relevante Themen: Politik, Sicherheit, Wirtschaft, Internationale Beziehungen, Verteidigung, Konflikte, Terrorismus, Cybersecurity, Umweltkatastrophen, Technologie, Wissenschaft, Nachrichten allgemein.
NICHT relevant: Sport, Lifestyle, Rezepte, Unterhaltung, Reisen, Mode, Kultur/Kunst, Wetter, Kreuzworträtsel, Podcasts (allgemein), Leserbriefe, Kommentare/Meinung.
Feeds:
{feed_list}
Antworte AUSSCHLIESSLICH mit einem JSON-Array. Jedes Element:
{{"index": <1-basiert>, "relevant": true/false, "name": "<Anzeigename für OSINT-Monitor, z.B. 'Guardian World' oder 'Spiegel Politik'>"}}
Nur das JSON-Array, kein anderer Text."""
try:
cmd = [
CLAUDE_PATH,
"-p", prompt,
"--output-format", "text",
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env={"PATH": "/usr/local/bin:/usr/bin:/bin", "HOME": "/home/claude-dev"},
)
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=min(CLAUDE_TIMEOUT, 120)
)
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()  # Reap the killed process
            logger.warning(f"Claude evaluation timed out for {domain}, using fallback")
            return _fallback_all_feeds(domain, feeds)
        if process.returncode != 0:
            logger.warning(f"Claude evaluation failed for {domain}, using fallback")
            return _fallback_all_feeds(domain, feeds)
response = stdout.decode("utf-8", errors="replace").strip()
        # Extract the JSON from the response (Claude sometimes returns Markdown blocks)
json_match = re.search(r'\[.*\]', response, re.DOTALL)
if not json_match:
logger.warning(f"Kein JSON in Claude-Antwort für {domain}, nutze Fallback")
return _fallback_all_feeds(domain, feeds)
evaluations = json.loads(json_match.group(0))
relevant = []
for ev in evaluations:
idx = ev.get("index", 0) - 1
if ev.get("relevant") and 0 <= idx < len(feeds):
feed = feeds[idx]
relevant.append({
"url": feed["url"],
"title": feed["title"],
"name": ev.get("name", feed["title"] or domain),
})
logger.info(f"Claude-Bewertung für {domain}: {len(relevant)}/{len(feeds)} relevant")
return relevant
    except json.JSONDecodeError:
        logger.warning(f"JSON parse error in Claude response for {domain}, using fallback")
        return _fallback_all_feeds(domain, feeds)
    except Exception as e:
        logger.warning(f"Claude evaluation error for {domain}: {e}, using fallback")
return _fallback_all_feeds(domain, feeds)
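# Illustrative shape of the reply the prompt above asks for (values invented):
#   [{"index": 1, "relevant": true,  "name": "Guardian World"},
#    {"index": 2, "relevant": false, "name": "Guardian Football"}]
# Only entries with "relevant": true and a valid 1-based index are kept.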
def _fallback_all_feeds(domain: str, feeds: list[dict]) -> list[dict]:
"""Fallback: Alle Feeds übernehmen mit Feed-Titel als Name."""
return [
{
"url": f["url"],
"title": f["title"],
"name": f["title"] or domain,
}
for f in feeds
]
async def get_feeds_with_metadata(tenant_id: int | None = None) -> list[dict]:
    """All active RSS feeds with metadata for Claude selection (global + org-specific)."""
from database import get_db
db = await get_db()
try:
if tenant_id:
cursor = await db.execute(
"SELECT name, url, domain, category FROM sources "
"WHERE source_type = 'rss_feed' AND status = 'active' "
"AND (tenant_id IS NULL OR tenant_id = ?)",
(tenant_id,),
)
else:
cursor = await db.execute(
"SELECT name, url, domain, category FROM sources "
"WHERE source_type = 'rss_feed' AND status = 'active'"
)
return [dict(row) for row in await cursor.fetchall()]
except Exception as e:
logger.error(f"Fehler beim Laden der Feed-Metadaten: {e}")
return []
finally:
await db.close()
async def get_source_rules(tenant_id: int | None = None) -> dict:
    """Read the source configuration from the DB (global + org-specific).
    Returns:
        dict with:
        - excluded_domains: list of excluded domains
        - rss_feeds: dict with categories deutsch/international/behoerden
    """
from database import get_db
db = await get_db()
try:
if tenant_id:
cursor = await db.execute(
"SELECT * FROM sources WHERE status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)",
(tenant_id,),
)
else:
cursor = await db.execute(
"SELECT * FROM sources WHERE status = 'active'"
)
sources = [dict(row) for row in await cursor.fetchall()]
excluded_domains = []
rss_feeds = {"deutsch": [], "international": [], "behoerden": []}
for source in sources:
if source["source_type"] == "excluded":
excluded_domains.append(source["domain"] or source["name"])
elif source["source_type"] == "rss_feed" and source["url"]:
feed_entry = {"name": source["name"], "url": source["url"]}
cat = source["category"]
if cat == "behoerde":
rss_feeds["behoerden"].append(feed_entry)
elif cat == "international":
rss_feeds["international"].append(feed_entry)
else:
                    # All other categories → deutsch
rss_feeds["deutsch"].append(feed_entry)
return {
"excluded_domains": excluded_domains,
"rss_feeds": rss_feeds,
}
except Exception as e:
logger.error(f"Fehler beim Laden der Quellen-Regeln: {e}")
# Fallback auf config.py
from config import RSS_FEEDS, EXCLUDED_SOURCES
return {
"excluded_domains": list(EXCLUDED_SOURCES),
"rss_feeds": dict(RSS_FEEDS),
}
finally:
await db.close()
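# Illustrative shape of the dict returned by get_source_rules (entries invented;
# "example.com" stands in for whatever excluded sources are stored in the DB):
#   {"excluded_domains": ["example.com", ...],
#    "rss_feeds": {"deutsch":       [{"name": "tagesschau", "url": "..."}, ...],
#                  "international": [...],
#                  "behoerden":     [...]}}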