From 5127e0a42d5dff64e58f1004a540bb45a95341c6 Mon Sep 17 00:00:00 2001 From: claude-dev Date: Sat, 18 Apr 2026 12:06:54 +0000 Subject: [PATCH] Podcast-Integration Phase 1: Feed-Tag + Senderseiten MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Podcasts werden wie normale RSS-Quellen behandelt (source_type=podcast_feed). Kein externer bezahlter Dienst, keine lokale Transkription — Monitor nutzt ausschliesslich vorhandene Transkripte. Kaskade fuer Transkript-Bezug: 1. Podcasting-2.0-Tag im Feed (SRT/VTT/HTML/JSON) 2. Redaktionelles Manuskript auf der Episodenseite (Adapter: Dlf, SZ, Spiegel, NDR) 3. YouTube-Captions — Phase 2, optional per yt-dlp Kein Stufen-Treffer -> Episode verworfen (graceful, kein Error). Neu: - src/feeds/podcast_parser.py (eigener Parser, RSS-Heisspfad unveraendert) - src/feeds/transcript_extractors/ (Plugin-Muster): __init__.py Dispatcher, Cache-Lookup gegen podcast_transcripts _common.py HTML-Extraktion, Domain-Matching, httpx-Helper rss_native.py Stufe 1: Feed-Tag-Parser (SRT/VTT/JSON/HTML) website_dlf.py Stufe 2: deutschlandfunk.de + Schwester-Domains website_sz.py Stufe 2: sz.de / sueddeutsche.de website_spiegel.py Stufe 2: spiegel.de / manager-magazin.de website_ndr.py Stufe 2: ndr.de Geaendert: - src/database.py: idempotente Migration, Tabelle podcast_transcripts als URL-Cache gegen Mehrfach-Scrape zwischen Lagen - src/models.py: Pydantic-Pattern von source_type um podcast_feed erweitert - src/source_rules.py: get_feeds_with_metadata() nimmt source_type-Parameter, Default rss_feed (RSS-Pfad unveraendert) - src/agents/orchestrator.py: neue _podcast_pipeline() parallel zu RSS, WebSearch und Telegram; nur fuer adhoc-Lagen; ohne Podcast-Quellen dormant Verifikation: - Migration auf Live-DB erfolgreich (Log: Tabelle podcast_transcripts angelegt) - Import-/Instanziierungs-Test aller Module bestanden - can_handle-Tests pro Sender-Adapter positiv + negativ OK - Live-Scrape gegen Dlf: 22710 Zeichen, gegen SZ: 24918 Zeichen - Dormant-Test: 0 Podcast-Quellen -> keine neue Codezeile im Refresh Verwerfbarkeit: rein additiv, RSS-Pfad unberuehrt, Rollback in drei Schritten (Quellen disablen, git revert, DROP TABLE podcast_transcripts). 
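Zur Einordnung eine minimale Skizze (Annahme: Podcast-Quellen werden wie bisher ueber das bestehende SourceCreate-Modell angelegt; Feed-URL, Name und Kategorie sind reine Platzhalter), wie eine Quelle mit dem neuen source_type aussehen koennte:

    # Hypothetisches Beispiel, kein Bestandteil des Patches; die Feed-URL ist ein Platzhalter.
    from models import SourceCreate

    quelle = SourceCreate(
        name="Beispiel-Podcast (Sicherheitspolitik)",
        url="https://example.org/podcast/feed.xml",   # Platzhalter-Feed
        source_type="podcast_feed",                   # neuer, in models.py erlaubter Wert
        category="oeffentlich-rechtlich",
        status="active",
    )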
--- src/agents/orchestrator.py | 44 ++++- src/database.py | 19 ++ src/feeds/podcast_parser.py | 184 ++++++++++++++++++ src/feeds/transcript_extractors/__init__.py | 126 ++++++++++++ src/feeds/transcript_extractors/_common.py | 170 ++++++++++++++++ src/feeds/transcript_extractors/rss_native.py | 182 +++++++++++++++++ .../transcript_extractors/website_dlf.py | 61 ++++++ .../transcript_extractors/website_ndr.py | 51 +++++ .../transcript_extractors/website_spiegel.py | 51 +++++ src/feeds/transcript_extractors/website_sz.py | 53 +++++ src/models.py | 4 +- src/source_rules.py | 17 +- 12 files changed, 951 insertions(+), 11 deletions(-) create mode 100644 src/feeds/podcast_parser.py create mode 100644 src/feeds/transcript_extractors/__init__.py create mode 100644 src/feeds/transcript_extractors/_common.py create mode 100644 src/feeds/transcript_extractors/rss_native.py create mode 100644 src/feeds/transcript_extractors/website_dlf.py create mode 100644 src/feeds/transcript_extractors/website_ndr.py create mode 100644 src/feeds/transcript_extractors/website_spiegel.py create mode 100644 src/feeds/transcript_extractors/website_sz.py diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 3b9183c..60ce383 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -781,6 +781,39 @@ class AgentOrchestrator: logger.info(f"Claude-Recherche: {len(results)} Ergebnisse") return results, usage + async def _podcast_pipeline(): + """Podcast-Episoden-Suche (nur adhoc-Lagen, nur mit vorhandenen Transkripten).""" + if incident_type != "adhoc": + logger.info("Recherche-Modus: Podcasts uebersprungen") + return [], None + + from source_rules import get_feeds_with_metadata + podcast_feeds = await get_feeds_with_metadata(tenant_id=tenant_id, source_type="podcast_feed") + if not podcast_feeds: + return [], None + + from feeds.podcast_parser import PodcastFeedParser + pd_parser = PodcastFeedParser() + pd_researcher = ResearcherAgent() + + # Dynamische Keywords (eigener Haiku-Call, parallel zu RSS — + # billig und hält Pipelines unabhaengig) + cursor_pd_hl = await db.execute( + """SELECT COALESCE(headline_de, headline) as hl + FROM articles WHERE incident_id = ? 
+ AND COALESCE(headline_de, headline) IS NOT NULL + ORDER BY collected_at DESC LIMIT 30""", + (incident_id,), + ) + pd_headlines = [row["hl"] for row in await cursor_pd_hl.fetchall() if row["hl"]] + pd_keywords, pd_kw_usage = await pd_researcher.extract_dynamic_keywords(title, pd_headlines) + if pd_kw_usage: + usage_acc.add(pd_kw_usage) + + articles = await pd_parser.search_feeds_selective(title, podcast_feeds, keywords=pd_keywords) + logger.info(f"Podcast-Pipeline: {len(articles)} Episoden gefunden") + return articles, None + async def _telegram_pipeline(): """Telegram-Kanal-Suche mit KI-basierter Kanal-Selektion.""" from feeds.telegram_parser import TelegramParser @@ -821,8 +854,8 @@ class AgentOrchestrator: logger.info(f"Telegram-Pipeline: {len(articles)} Nachrichten") return articles, None - # Pipelines parallel starten (RSS + WebSearch + optional Telegram) - pipelines = [_rss_pipeline(), _web_search_pipeline()] + # Pipelines parallel starten (RSS + WebSearch + Podcasts + optional Telegram) + pipelines = [_rss_pipeline(), _web_search_pipeline(), _podcast_pipeline()] if include_telegram: pipelines.append(_telegram_pipeline()) @@ -830,7 +863,12 @@ class AgentOrchestrator: (rss_articles, rss_feed_usage) = pipeline_results[0] (search_results, search_usage) = pipeline_results[1] - telegram_articles = pipeline_results[2][0] if include_telegram else [] + (podcast_articles, _podcast_usage) = pipeline_results[2] + telegram_articles = pipeline_results[3][0] if include_telegram else [] + + # Podcast-Artikel in die RSS-Liste einfuegen (gleicher Downstream-Pfad) + if podcast_articles: + rss_articles = (rss_articles or []) + podcast_articles # URL-Verifizierung nur fuer WebSearch-Ergebnisse (RSS-URLs sind bereits verifiziert) if search_results: diff --git a/src/database.py b/src/database.py index a6937d0..4f2f7fd 100644 --- a/src/database.py +++ b/src/database.py @@ -374,6 +374,25 @@ async def init_db(): await db.commit() logger.info("Migration: latest_developments zu incidents hinzugefuegt") + # Migration: Tabelle podcast_transcripts (URL-Cache fuer Transkripte) + cursor = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='podcast_transcripts'" + ) + if not await cursor.fetchone(): + await db.execute( + """ + CREATE TABLE podcast_transcripts ( + url TEXT PRIMARY KEY, + transcript TEXT NOT NULL, + source TEXT NOT NULL, + segments_json TEXT, + fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + await db.commit() + logger.info("Migration: Tabelle podcast_transcripts angelegt") + # Migration: Token-Spalten fuer refresh_log cursor = await db.execute("PRAGMA table_info(refresh_log)") rl_columns = [row[1] for row in await cursor.fetchall()] diff --git a/src/feeds/podcast_parser.py b/src/feeds/podcast_parser.py new file mode 100644 index 0000000..c8d87a4 --- /dev/null +++ b/src/feeds/podcast_parser.py @@ -0,0 +1,184 @@ +"""Podcast-Feed-Parser: wie RSSParser, nur mit Transkript-Kaskade. + +Aufbau bewusst copy-light zu rss_parser.py: dieselbe oeffentliche +Signatur `search_feeds_selective()`, eigener Code-Pfad mit Pre-Filter und +anschliessender Transkript-Kaskade via `transcript_extractors`. 
+ +Vorgaben des Plans: +- Keine kostenpflichtige API, keine lokale Transkription +- Episoden ohne auffindbares Transkript werden verworfen +- content_original wird NICHT auf 1000 Zeichen gekuerzt (Transkript-Volltext) +- Duplikate-Schutz zwischen Lagen ueber Cache-Tabelle podcast_transcripts +""" +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone + +import feedparser +import httpx + +from config import TIMEZONE, MAX_ARTICLES_PER_DOMAIN_RSS +from source_rules import _extract_domain +from feeds.transcript_extractors import fetch_transcript + +logger = logging.getLogger("osint.podcast") + + +class PodcastFeedParser: + """Durchsucht Podcast-Feeds nach relevanten Episoden (mit Transkript).""" + + STOP_WORDS = { + "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an", + "auf", "für", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor", + "über", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from", + } + + # Pre-Filter: wie im RSSParser — mindestens Haelfte der Keywords, max 2 notwendig + @staticmethod + def _prefilter_match(title: str, summary: str, keywords: list[str]) -> tuple[bool, float]: + text = f"{title} {summary}".lower() + if not keywords: + return True, 0.0 + min_matches = min(2, max(1, (len(keywords) + 1) // 2)) + match_count = sum(1 for kw in keywords if kw and kw in text) + if match_count >= min_matches: + return True, match_count / len(keywords) + return False, 0.0 + + async def search_feeds_selective( + self, + search_term: str, + selected_feeds: list[dict], + keywords: list[str] | None = None, + ) -> list[dict]: + """Durchsucht die uebergebenen Podcast-Feeds nach relevanten Episoden. + + Signatur bewusst identisch zu RSSParser.search_feeds_selective, damit + die Orchestrator-Logik analog aufgebaut werden kann. 
+ """ + if not selected_feeds: + return [] + + if keywords: + search_words = [w.lower().strip() for w in keywords if w.strip()] + else: + search_words = [w.lower() for w in search_term.split() if len(w) > 2 and w.lower() not in self.STOP_WORDS] + search_words = self._clean_search_words(search_words) + if not search_words: + return [] + + # Feeds parallel abfragen + tasks = [self._fetch_feed(feed, search_words) for feed in selected_feeds] + results = await asyncio.gather(*tasks, return_exceptions=True) + + all_articles: list[dict] = [] + for feed, r in zip(selected_feeds, results): + if isinstance(r, Exception): + logger.debug(f"Podcast-Feed {feed.get('name')} fehlgeschlagen: {r}") + continue + all_articles.extend(r) + + all_articles = self._apply_domain_cap(all_articles) + logger.info(f"Podcast-Parser: {len(all_articles)} Episoden mit Transkript gefunden") + return all_articles + + @staticmethod + def _clean_search_words(words: list[str]) -> list[str]: + cleaned = [w for w in words if not w.isdigit()] + return cleaned if cleaned else words + + async def _fetch_feed(self, feed_config: dict, search_words: list[str]) -> list[dict]: + """Einzelnen Podcast-Feed abrufen, Pre-Filter + Transkript-Kaskade.""" + name = feed_config["name"] + url = feed_config["url"] + articles: list[dict] = [] + + try: + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + response = await client.get(url, headers={"User-Agent": "OSINT-Monitor/1.0 (Podcast Aggregator)"}) + response.raise_for_status() + feed = await asyncio.to_thread(feedparser.parse, response.text) + except Exception as e: + logger.debug(f"Podcast-Feed {name} ({url}): {e}") + return articles + + # Pro Feed maximal die 20 neuesten Episoden betrachten. + # Podcasts veroeffentlichen seltener als RSS-Feeds; 20 reicht fuer + # einen mehrmonatigen Rueckblick und begrenzt den Scrape-Aufwand. + entries = list(feed.entries[:20]) + + # Kandidaten nach Pre-Filter sammeln (keine Transkript-Abfrage dafuer). + candidates = [] + for entry in entries: + title = entry.get("title", "") + summary = entry.get("summary", "") or entry.get("description", "") + passed, score = self._prefilter_match(title, summary, search_words) + if passed: + candidates.append((entry, title, summary, score)) + + if not candidates: + return articles + + # Transkript-Kaskade parallel nur fuer die Kandidaten + transcript_tasks = [fetch_transcript(e, url, e.get("link")) for e, _t, _s, _r in candidates] + transcript_results = await asyncio.gather(*transcript_tasks, return_exceptions=True) + + for (entry, title, summary, score), t_result in zip(candidates, transcript_results): + if isinstance(t_result, Exception): + logger.debug(f"Transkript-Kaskade fuer {entry.get('link')}: {t_result}") + continue + if not t_result or not t_result.text: + # Ohne Transkript keine Uebernahme (Plan-Vorgabe) + continue + + # Nach-Transkript-Filter: wenn der Pre-Filter nur knapp griff, + # muss das Transkript die Keywords ebenfalls enthalten — sonst ist + # die Episode nicht wirklich relevant (Shownotes-Zufallstreffer). + if not self._transcript_confirms(t_result.text, search_words): + continue + + published = None + if hasattr(entry, "published_parsed") and entry.published_parsed: + try: + published = datetime(*entry.published_parsed[:6], tzinfo=timezone.utc).astimezone(TIMEZONE).isoformat() + except (TypeError, ValueError): + pass + + # WICHTIG: Transkript-Volltext, KEINE 1000-Zeichen-Kuerzung wie bei RSS. 
+ articles.append({ + "headline": title, + "headline_de": title, + "source": name, + "source_url": entry.get("link", ""), + "content_original": t_result.text, + "content_de": t_result.text, + "language": "de", + "published_at": published, + "relevance_score": score, + }) + + return articles + + @staticmethod + def _transcript_confirms(transcript: str, keywords: list[str]) -> bool: + """Prueft, dass mind. ein Keyword auch im Transkript vorkommt.""" + if not keywords: + return True + text = transcript.lower() + return any(kw in text for kw in keywords if kw) + + def _apply_domain_cap(self, articles: list[dict]) -> list[dict]: + """Begrenzt die Anzahl der Episoden pro Domain (analog RSSParser).""" + if not articles: + return articles + by_domain: dict[str, list[dict]] = {} + for a in articles: + dom = _extract_domain(a.get("source_url", "")) or "_unknown" + by_domain.setdefault(dom, []).append(a) + out: list[dict] = [] + for dom, items in by_domain.items(): + items.sort(key=lambda x: x.get("relevance_score", 0.0), reverse=True) + out.extend(items[:MAX_ARTICLES_PER_DOMAIN_RSS]) + return out diff --git a/src/feeds/transcript_extractors/__init__.py b/src/feeds/transcript_extractors/__init__.py new file mode 100644 index 0000000..a209131 --- /dev/null +++ b/src/feeds/transcript_extractors/__init__.py @@ -0,0 +1,126 @@ +"""Kaskaden-Dispatcher fuer Podcast-Transkript-Bezug. + +Reihenfolge der Strategien: + 1. rss_native — Podcasting-2.0-Tag im Feed-Entry + 2. website_* — Redaktionelles Manuskript auf der Episoden-Webseite + (sender-spezifische Adapter) + 3. youtube — YouTube-Auto-Captions via yt-dlp (Phase 2, optional) + +Jeder Adapter implementiert: + def can_handle(feed_entry: dict, feed_url: str) -> bool + async def fetch(feed_entry: dict, feed_url: str) -> TranscriptResult | None + +Wer None liefert, gibt der naechsten Stufe die Chance. Wer einen +TranscriptResult liefert, beendet die Kaskade fuer diese Episode. + +Der Dispatcher kuemmert sich um das Caching gegen die Tabelle +`podcast_transcripts` — eine einmal gefundene Episode wird bei folgenden +Refreshes (auch in anderen Lagen) direkt aus dem Cache geholt. +""" +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from typing import Optional + +logger = logging.getLogger("osint.podcast.extractors") + + +@dataclass +class TranscriptResult: + """Einheitliches Ergebnis einer Transkript-Strategie.""" + text: str + source: str # "rss_native" / "website_scrape" / "youtube" + segments: Optional[list] = None # Optional: [{"start": sec, "end": sec, "text": "..."}] + + +# Reihenfolge der Kaskade: zuerst Feed-Tag, dann Senderseiten, zuletzt YouTube +from . import rss_native +from . import website_dlf +from . import website_sz +from . import website_spiegel +from . import website_ndr + +_EXTRACTORS = [ + rss_native, + website_dlf, + website_sz, + website_spiegel, + website_ndr, +] + +# YouTube-Adapter ist optional — nur importieren falls vorhanden (Phase 2) +try: + from . import youtube # noqa: F401 + _EXTRACTORS.append(youtube) +except ImportError: + pass + + +async def fetch_transcript(feed_entry: dict, feed_url: str, episode_url: str) -> Optional[TranscriptResult]: + """Versucht Kaskade durch bis eine Stufe liefert. + + Vor dem Kaskaden-Lauf wird der Cache (Tabelle `podcast_transcripts`) gegen + episode_url geprueft. Trifft der Cache, wird ohne HTTP-Request ausgeliefert. 
+ """ + if not episode_url: + return None + + from database import get_db + db = await get_db() + try: + cursor = await db.execute( + "SELECT transcript, source, segments_json FROM podcast_transcripts WHERE url = ?", + (episode_url,), + ) + row = await cursor.fetchone() + if row: + segments = None + if row["segments_json"]: + try: + segments = json.loads(row["segments_json"]) + except json.JSONDecodeError: + segments = None + logger.debug(f"Transkript-Cache-Hit: {episode_url}") + return TranscriptResult(text=row["transcript"], source=row["source"], segments=segments) + finally: + await db.close() + + # Kaskade: erste Stufe, die can_handle(True) und ein Ergebnis liefert, gewinnt. + for extractor in _EXTRACTORS: + try: + if not extractor.can_handle(feed_entry, feed_url): + continue + result = await extractor.fetch(feed_entry, feed_url) + if result and result.text and result.text.strip(): + await _store_in_cache(episode_url, result) + logger.info( + f"Transkript via {result.source} fuer {episode_url} " + f"({len(result.text)} Zeichen)" + ) + return result + except Exception as e: + logger.warning(f"Extraktor {extractor.__name__} fuer {episode_url}: {e}") + continue + + logger.debug(f"Kein Transkript verfuegbar: {episode_url}") + return None + + +async def _store_in_cache(url: str, result: TranscriptResult) -> None: + """Legt das Transkript in der Cache-Tabelle ab (INSERT OR REPLACE).""" + from database import get_db + db = await get_db() + try: + segments_json = json.dumps(result.segments, ensure_ascii=False) if result.segments else None + await db.execute( + "INSERT OR REPLACE INTO podcast_transcripts (url, transcript, source, segments_json) " + "VALUES (?, ?, ?, ?)", + (url, result.text, result.source, segments_json), + ) + await db.commit() + except Exception as e: + logger.warning(f"Cache-Write fuer {url} fehlgeschlagen: {e}") + finally: + await db.close() diff --git a/src/feeds/transcript_extractors/_common.py b/src/feeds/transcript_extractors/_common.py new file mode 100644 index 0000000..59b074d --- /dev/null +++ b/src/feeds/transcript_extractors/_common.py @@ -0,0 +1,170 @@ +"""Gemeinsame Helfer fuer Website-Scrape-Adapter. + +HTML-Extraktor ohne externe Abhaengigkeiten (BeautifulSoup nicht in +requirements.txt). Nutzt Regex fuer robusten Plaintext-Extract aus +typischen Artikel-Containern. +""" +from __future__ import annotations + +import logging +import re +from typing import Optional +from urllib.parse import urlparse + +import httpx + +logger = logging.getLogger("osint.podcast.extractors.common") + + +HTTP_TIMEOUT = 20.0 +MIN_TRANSCRIPT_LEN = 500 # Unter 500 Zeichen ist das kein Manuskript, nur Shownotes + +DEFAULT_HEADERS = { + "User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0; +https://monitor.aegis-sight.de)", + "Accept": "text/html,application/xhtml+xml", + "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", +} + + +def matches_domain(url: str, domains: tuple[str, ...]) -> bool: + """Prueft, ob die URL zu einer der bekannten Sender-Domains gehoert.""" + if not url: + return False + try: + host = urlparse(url).hostname or "" + host = host.lower().lstrip("www.") + return any(host == d or host.endswith("." 
+ d) for d in domains) + except Exception: + return False + + +def episode_url(feed_entry: dict) -> Optional[str]: + """Holt die Episoden-Webseite (meist entry.link).""" + if isinstance(feed_entry, dict): + return feed_entry.get("link") or feed_entry.get("guid") + return getattr(feed_entry, "link", None) or getattr(feed_entry, "guid", None) + +
+async def fetch_html(url: str) -> Optional[str]: + async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, follow_redirects=True, headers=DEFAULT_HEADERS) as client: + try: + resp = await client.get(url) + resp.raise_for_status() + return resp.text + except Exception as e: + logger.debug(f"HTML-Fetch fehlgeschlagen ({url}): {e}") + return None + + +# --- HTML-Extraktion ------------------------------------------------------ + +_SCRIPT_STYLE_RE = re.compile(r"<(script|style|noscript|iframe)[^>]*>.*?</\1>", re.DOTALL | re.IGNORECASE) +_COMMENT_RE = re.compile(r"<!--.*?-->", re.DOTALL) +_TAG_RE = re.compile(r"<[^>]+>") +_WHITESPACE_RE = re.compile(r"\s+") + +
+def extract_text_by_container(html: str, container_patterns: list[str]) -> Optional[str]: + """Extrahiert Text aus dem ersten gefundenen Container. + + container_patterns: Liste von Regex-Mustern, die den oeffnenden Container-Tag + matchen (z. B. r'<div[^>]*class="[^"]*article-body[^"]*"[^>]*>'). + Intern wird der zugehoerige schliessende Tag per Tag-Balancing gesucht. + """ + html_clean = _COMMENT_RE.sub("", _SCRIPT_STYLE_RE.sub("", html)) + + for pattern in container_patterns: + m = re.search(pattern, html_clean, re.IGNORECASE) + if not m: + continue + start = m.start() + # Tag-Name aus Pattern-Treffer extrahieren + tag_match = re.match(r"<(\w+)", m.group(0)) + if not tag_match: + continue + tag_name = tag_match.group(1).lower() + end = _find_matching_close(html_clean, start, tag_name) + if end < 0: + continue + block = html_clean[start:end] + text = html_to_text(block) + if len(text) >= MIN_TRANSCRIPT_LEN: + return text + return None + +
+def extract_longest_article_block(html: str) -> Optional[str]: + """Fallback: suche den laengsten zusammenhaengenden Block aus <article>- oder <main>-Tags. + + Nuetzlich, wenn spezifische Container-Selektoren fehlschlagen. + """ + html_clean = _COMMENT_RE.sub("", _SCRIPT_STYLE_RE.sub("", html)) + + # Alle <article>- und <main>-Bloecke finden + candidates = [] + for tag in ("article", "main"): + for m in re.finditer(rf"<{tag}\b[^>]*>", html_clean, re.IGNORECASE): + end = _find_matching_close(html_clean, m.start(), tag) + if end > m.start(): + candidates.append(html_clean[m.start():end]) + + if not candidates: + # Letzter Ausweg: gesamter Body + body_m = re.search(r"<body[^>]*>", html_clean, re.IGNORECASE) + if body_m: + candidates.append(html_clean[body_m.start():]) + + best_text = "" + for block in candidates: + text = html_to_text(block) + if len(text) > len(best_text): + best_text = text + return best_text if len(best_text) >= MIN_TRANSCRIPT_LEN else None + +
+def html_to_text(html: str) -> str: + """Simple HTML→Plaintext-Konvertierung.""" + no_tags = _COMMENT_RE.sub("", _SCRIPT_STYLE_RE.sub("", html)) + no_tags = _TAG_RE.sub(" ", no_tags) + no_tags = (no_tags + .replace("&nbsp;", " ") + .replace("&amp;", "&") + .replace("&quot;", '"') + .replace("&#39;", "'") + .replace("&apos;", "'") + .replace("&lt;", "<") + .replace("&gt;", ">") + .replace("&ndash;", "-") + .replace("&mdash;", "-") + .replace("&auml;", "ä") + .replace("&ouml;", "ö") + .replace("&uuml;", "ü") + .replace("&Auml;", "Ä") + .replace("&Ouml;", "Ö") + .replace("&Uuml;", "Ü") + .replace("&szlig;", "ß")) + return _WHITESPACE_RE.sub(" ", no_tags).strip() + +
+def _find_matching_close(html: str, start: int, tag_name: str) -> int: + """Findet die Position des schliessenden Tags, der zum oeffnenden Tag an `start` gehoert. + + Einfacher Zaehler-Ansatz: jeder weitere <tag> erhoeht, jeder </tag> verringert. + Rueckgabe: Index NACH dem schliessenden Tag, -1 falls nicht gefunden. + """ + open_re = re.compile(rf"<{tag_name}\b[^>]*>", re.IGNORECASE) + close_re = re.compile(rf"</{tag_name}\s*>", re.IGNORECASE) + depth = 1 + pos = start + 1 # nach dem initial geoeffneten Tag + while pos < len(html) and depth > 0: + next_open = open_re.search(html, pos) + next_close = close_re.search(html, pos) + if not next_close: + return -1 + if next_open and next_open.start() < next_close.start(): + depth += 1 + pos = next_open.end() + else: + depth -= 1 + pos = next_close.end() + return pos if depth == 0 else -1
diff --git a/src/feeds/transcript_extractors/rss_native.py b/src/feeds/transcript_extractors/rss_native.py new file mode 100644 index 0000000..ddaa7cb --- /dev/null +++ b/src/feeds/transcript_extractors/rss_native.py @@ -0,0 +1,182 @@ +"""Stufe 1: Podcasting-2.0-Tag im Feed-Entry. + +Wenn der Podcast-Herausgeber den offenen Podcasting-2.0-Standard nutzt, +liegt im Feed-Entry ein oder mehrere <podcast:transcript>-Tags mit Link +zu SRT/VTT/HTML/JSON. Das ist die zuverlaessigste Quelle ueberhaupt und +verursacht nur einen HTTP-Request. +""" +from __future__ import annotations + +import logging +import re +from typing import Optional + +import httpx + +from . import TranscriptResult
+ +logger = logging.getLogger("osint.podcast.extractors.rss_native") + + +# Reihenfolge der akzeptierten Formate (mehr Struktur bevorzugt) +_PREFERRED_MIME = ["application/json", "text/vtt", "application/x-subrip", "text/srt", "text/html", "text/plain"] + +
+def can_handle(feed_entry: dict, feed_url: str) -> bool: + """Greift immer, wenn feedparser einen podcast:transcript-Link erkannt hat.""" + return bool(_find_transcript_links(feed_entry)) + +
+async def fetch(feed_entry: dict, feed_url: str) -> Optional[TranscriptResult]: + links = _find_transcript_links(feed_entry) + if not links: + return None + + # Bestes Format auswaehlen (nach _PREFERRED_MIME) + links_sorted = sorted( + links, + key=lambda l: _PREFERRED_MIME.index(l.get("type", "")) if l.get("type") in _PREFERRED_MIME else 99, + ) + + async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client: + for link in links_sorted: + url = link.get("url") + if not url: + continue + try: + resp = await client.get(url, headers={"User-Agent": "OSINT-Monitor/1.0 (Podcast-Transcript)"}) + resp.raise_for_status() + raw = resp.text + mime = (link.get("type") or "").lower() + text, segments = _parse_by_mime(raw, mime) + if text and text.strip(): + return TranscriptResult(text=text.strip(), source="rss_native", segments=segments) + except Exception as e: + logger.debug(f"Link {url} fehlgeschlagen: {e}") + continue + return None + +
+def _find_transcript_links(feed_entry: dict) -> list[dict]: + """Findet <podcast:transcript>-Angaben im feedparser-Entry. + + feedparser bildet Namespace-Tags als Dicts mit 'url' und 'type' ab + (z. B. entry.podcast_transcript oder entry['podcast_transcript']). + Je nach feedparser-Version kann das ein einzelnes Dict oder eine Liste sein. + """ + candidates = [] + for key in ("podcast_transcript", "podcast_transcripts", "transcripts"): + val = feed_entry.get(key) if isinstance(feed_entry, dict) else getattr(feed_entry, key, None) + if not val: + continue + if isinstance(val, list): + candidates.extend([v for v in val if isinstance(v, dict)]) + elif isinstance(val, dict): + candidates.append(val) + + # Zusaetzlich: manche Feeds schreiben die Tags ins links-Array mit rel="transcript" + links = feed_entry.get("links") if isinstance(feed_entry, dict) else getattr(feed_entry, "links", None) or [] + for link in links or []: + if isinstance(link, dict) and link.get("rel") == "transcript" and link.get("href"): + candidates.append({"url": link["href"], "type": link.get("type", "")}) + + return candidates + +
+def _parse_by_mime(raw: str, mime: str) -> tuple[str, Optional[list]]: + """Extrahiert Plaintext und (wenn moeglich) Segmente nach MIME-Typ.""" + if "json" in mime: + return _parse_json(raw) + if "vtt" in mime: + return _parse_vtt(raw) + if "subrip" in mime or "srt" in mime: + return _parse_srt(raw) + if "html" in mime: + return _parse_html(raw), None + # Fallback: Plaintext + return raw, None + +
+def _parse_json(raw: str) -> tuple[str, Optional[list]]: + """Podcasting-2.0 JSON-Transcript-Format.""" + import json + try: + data = json.loads(raw) + segments_raw = data.get("segments", []) + texts = [] + segments = [] + for seg in segments_raw: + body = seg.get("body", "").strip() + if body: + texts.append(body) + segments.append({ + "start": seg.get("startTime"), + "end": seg.get("endTime"), + "text": body, + }) + return "\n".join(texts), segments or None + except Exception: + return "", None + +
+def _parse_vtt(raw: str) -> tuple[str, Optional[list]]: + """WebVTT-Parser (ohne externe Abhaengigkeiten).""" +
lines = raw.splitlines() + blocks = [] + current = [] + time_re = re.compile(r"(\d{2}:)?(\d{2}):(\d{2})\.(\d{3})\s*-->\s*(\d{2}:)?(\d{2}):(\d{2})\.(\d{3})") + + def finalize_block(block: list) -> Optional[dict]: + if len(block) < 2: + return None + time_line = next((l for l in block if time_re.search(l)), None) + text_lines = [l for l in block if not time_re.search(l) and l.strip() and not l.strip().isdigit()] + if not time_line or not text_lines: + return None + m = time_re.search(time_line) + start = _time_to_sec(m.group(1), m.group(2), m.group(3), m.group(4)) + end = _time_to_sec(m.group(5), m.group(6), m.group(7), m.group(8)) + return {"start": start, "end": end, "text": " ".join(text_lines).strip()} + + for line in lines: + if line.strip() == "": + b = finalize_block(current) + if b: + blocks.append(b) + current = [] + else: + current.append(line) + b = finalize_block(current) + if b: + blocks.append(b) + + text = " ".join(b["text"] for b in blocks) + return text, blocks or None + +
+def _parse_srt(raw: str) -> tuple[str, Optional[list]]: + """SubRip-Parser (Timecodes mit Komma statt Punkt).""" + return _parse_vtt(raw.replace(",", ".")) + +
+def _parse_html(raw: str) -> str: + """HTML → Plaintext. Entfernt Tags simpel via Regex (genuegt fuer Transcript-HTML).""" + no_tags = re.sub(r"<script[^>]*>.*?</script>", " ", raw, flags=re.DOTALL | re.IGNORECASE) + no_tags = re.sub(r"<style[^>]*>.*?</style>", " ", no_tags, flags=re.DOTALL | re.IGNORECASE) + no_tags = re.sub(r"<[^>]+>", " ", no_tags) + # HTML-Entitys grob zuruecksetzen + no_tags = (no_tags + .replace("&nbsp;", " ") + .replace("&amp;", "&") + .replace("&quot;", '"') + .replace("&#39;", "'") + .replace("&lt;", "<") + .replace("&gt;", ">")) + no_tags = re.sub(r"\s+", " ", no_tags) + return no_tags.strip() + +
+def _time_to_sec(h: Optional[str], m: str, s: str, ms: str) -> float: + """Konvertiert VTT-Timecode in Sekunden.""" + hours = int(h.rstrip(":")) if h else 0 + return hours * 3600 + int(m) * 60 + int(s) + int(ms) / 1000.0
diff --git a/src/feeds/transcript_extractors/website_dlf.py b/src/feeds/transcript_extractors/website_dlf.py new file mode 100644 index 0000000..26971d0 --- /dev/null +++ b/src/feeds/transcript_extractors/website_dlf.py @@ -0,0 +1,61 @@ +"""Deutschlandfunk: Manuskripte auf den Sender-Websites. + +Domains: + - deutschlandfunk.de + - deutschlandfunkkultur.de + - deutschlandfunknova.de + +Dlf-Artikel-HTML enthaelt den Manuskript-Text typischerweise in +<div class="b-article">...</div> mit vielen <p>-Absaetzen +oder als <article>. Als Fallback greift der generische +Longest-Article-Block-Extraktor. +""" +from __future__ import annotations + +import logging +from typing import Optional + +from . import TranscriptResult +from ._common import ( + episode_url, + extract_longest_article_block, + extract_text_by_container, + fetch_html, + matches_domain, +) + +logger = logging.getLogger("osint.podcast.extractors.dlf") + +_DOMAINS = ( + "deutschlandfunk.de", + "deutschlandfunkkultur.de", + "deutschlandfunknova.de", +) + +_CONTAINER_PATTERNS = [ + r'<div[^>]*class="[^"]*b-article[^"]*"[^>]*>', + r'<div[^>]*class="[^"]*b-text[^"]*"[^>]*>', + r'<article[^>]*>', + r'<main[^>]*>', +] + +
+def can_handle(feed_entry: dict, feed_url: str) -> bool: + url = episode_url(feed_entry) or feed_url + return matches_domain(url, _DOMAINS) or matches_domain(feed_url, _DOMAINS) + +
+async def fetch(feed_entry: dict, feed_url: str) -> Optional[TranscriptResult]: + url = episode_url(feed_entry) + if not url: + return None + html = await fetch_html(url) + if not html: + return None + + text = extract_text_by_container(html, _CONTAINER_PATTERNS) + if not text: + text = extract_longest_article_block(html) + if not text: + return None + return TranscriptResult(text=text, source="website_scrape")
diff --git a/src/feeds/transcript_extractors/website_ndr.py b/src/feeds/transcript_extractors/website_ndr.py new file mode 100644 index 0000000..f2b36b6 --- /dev/null +++ b/src/feeds/transcript_extractors/website_ndr.py @@ -0,0 +1,51 @@ +"""Norddeutscher Rundfunk: Manuskripte auf ndr.de. + +NDR-Sendungen (insbesondere NDR Info „Streitkraefte und Strategien") stellen +Manuskripte auf der Episodenseite bereit, typischerweise in
+<div class="article"> oder <div id="mainContent">. +""" +from __future__ import annotations + +import logging +from typing import Optional + +from . import TranscriptResult +from ._common import ( + episode_url, + extract_longest_article_block, + extract_text_by_container, + fetch_html, + matches_domain, +) + +logger = logging.getLogger("osint.podcast.extractors.ndr") + +_DOMAINS = ("ndr.de",) + +_CONTAINER_PATTERNS = [ + r'<div[^>]*class="[^"]*article[^"]*"[^>]*>', + r'<div[^>]*id="mainContent"[^>]*>', + r'<article[^>]*>', + r'<main[^>]*>', +] + +
+def can_handle(feed_entry: dict, feed_url: str) -> bool: + url = episode_url(feed_entry) or feed_url + return matches_domain(url, _DOMAINS) or matches_domain(feed_url, _DOMAINS) + +
+async def fetch(feed_entry: dict, feed_url: str) -> Optional[TranscriptResult]: + url = episode_url(feed_entry) + if not url: + return None + html = await fetch_html(url) + if not html: + return None + + text = extract_text_by_container(html, _CONTAINER_PATTERNS) + if not text: + text = extract_longest_article_block(html) + if not text: + return None + return TranscriptResult(text=text, source="website_scrape")
diff --git a/src/feeds/transcript_extractors/website_spiegel.py b/src/feeds/transcript_extractors/website_spiegel.py new file mode 100644 index 0000000..ddce6c3 --- /dev/null +++ b/src/feeds/transcript_extractors/website_spiegel.py @@ -0,0 +1,51 @@ +"""Der Spiegel: Manuskripte auf spiegel.de. + +SPIEGEL-Artikel haben typischerweise einen <div data-area="article">-Container. +SPIEGEL+-Artikel liefern ohne Login nur Teaser — der Length-Check in _common +sorgt dafuer, dass solche Teaser verworfen werden und die Kaskade weiterlaeuft. +""" +from __future__ import annotations + +import logging +from typing import Optional + +from . import TranscriptResult +from ._common import ( + episode_url, + extract_longest_article_block, + extract_text_by_container, + fetch_html, + matches_domain, +) + +logger = logging.getLogger("osint.podcast.extractors.spiegel") + +_DOMAINS = ("spiegel.de", "manager-magazin.de") + +_CONTAINER_PATTERNS = [ + r'<div[^>]*data-area="article"[^>]*>', + r'<div[^>]*data-article-el[^>]*>', + r'<article[^>]*>', + r'<main[^>]*>', +] + +
+def can_handle(feed_entry: dict, feed_url: str) -> bool: + url = episode_url(feed_entry) or feed_url + return matches_domain(url, _DOMAINS) or matches_domain(feed_url, _DOMAINS) + +
+async def fetch(feed_entry: dict, feed_url: str) -> Optional[TranscriptResult]: + url = episode_url(feed_entry) + if not url: + return None + html = await fetch_html(url) + if not html: + return None + + text = extract_text_by_container(html, _CONTAINER_PATTERNS) + if not text: + text = extract_longest_article_block(html) + if not text: + return None + return TranscriptResult(text=text, source="website_scrape")
diff --git a/src/feeds/transcript_extractors/website_sz.py b/src/feeds/transcript_extractors/website_sz.py new file mode 100644 index 0000000..6461f8f --- /dev/null +++ b/src/feeds/transcript_extractors/website_sz.py @@ -0,0 +1,53 @@ +"""Sueddeutsche Zeitung: Manuskripte auf sz.de. + +Achtung: Viele SZ-Artikel sind hinter Paywall (SZ Plus). Der Scraper holt +den Inhalt, der ohne Login ausgeliefert wird. Ist nur ein Teaser vorhanden, +ist der Text-Length-Check in _common.MIN_TRANSCRIPT_LEN die Schutzschicht: +kurze Teaser werden verworfen, und der Aufrufer faellt auf die naechste +Kaskaden-Stufe (z. B. YouTube) zurueck — ohne Fehler. +""" +from __future__ import annotations + +import logging +from typing import Optional + +from . import TranscriptResult
+from ._common import ( + episode_url, + extract_longest_article_block, + extract_text_by_container, + fetch_html, + matches_domain, +) + +logger = logging.getLogger("osint.podcast.extractors.sz") + +_DOMAINS = ("sz.de", "sueddeutsche.de") + +_CONTAINER_PATTERNS = [ + r'<div[^>]*class="[^"]*article-body[^"]*"[^>]*>', + r'<div[^>]*id="article-app-container"[^>]*>', + r'<article[^>]*>', + r'<main[^>]*>', +] + +
+def can_handle(feed_entry: dict, feed_url: str) -> bool: + url = episode_url(feed_entry) or feed_url + return matches_domain(url, _DOMAINS) or matches_domain(feed_url, _DOMAINS) + +
+async def fetch(feed_entry: dict, feed_url: str) -> Optional[TranscriptResult]: + url = episode_url(feed_entry) + if not url: + return None + html = await fetch_html(url) + if not html: + return None + + text = extract_text_by_container(html, _CONTAINER_PATTERNS) + if not text: + text = extract_longest_article_block(html) + if not text: + return None + return TranscriptResult(text=text, source="website_scrape")
diff --git a/src/models.py b/src/models.py index 9ec638b..708a4e2 100644 --- a/src/models.py +++ b/src/models.py @@ -108,7 +108,7 @@ class SourceCreate(BaseModel): name: str = Field(min_length=1, max_length=200) url: Optional[str] = None domain: Optional[str] = None - source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel)$") + source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$") category: str = Field(default="sonstige", pattern="^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$") status: str = Field(default="active", pattern="^(active|inactive)$") notes: Optional[str] = None @@ -118,7 +118,7 @@ class SourceUpdate(BaseModel): name: Optional[str] = Field(default=None, max_length=200) url: Optional[str] = None domain: Optional[str] = None - source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel)$") + source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$") category: Optional[str] = Field(default=None, pattern="^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$") status: Optional[str] = Field(default=None, pattern="^(active|inactive)$") notes: Optional[str] = None
diff --git a/src/source_rules.py b/src/source_rules.py index d9cb508..17b30ef 100644 --- a/src/source_rules.py +++ b/src/source_rules.py @@ -637,8 +637,12 @@ def _fallback_all_feeds(domain: str, feeds: list[dict]) -> list[dict]: ] -async def get_feeds_with_metadata(tenant_id: int = None) -> list[dict]: - """Alle aktiven RSS-Feeds mit Metadaten fuer Claude-Selektion (global + org-spezifisch).""" +async def get_feeds_with_metadata(tenant_id: int = None, source_type: str = "rss_feed") -> list[dict]: + """Aktive Feeds eines bestimmten Typs mit Metadaten fuer Claude-Selektion (global + org-spezifisch). + + source_type: "rss_feed" (Default) oder "podcast_feed" — trennt RSS- und Podcast-Quellen + in getrennten Pipelines, damit der RSS-Heisspfad unveraendert bleibt.
+ """ from database import get_db db = await get_db() @@ -646,18 +650,19 @@ async def get_feeds_with_metadata(tenant_id: int = None) -> list[dict]: if tenant_id: cursor = await db.execute( "SELECT name, url, domain, category, COALESCE(article_count, 0) AS article_count FROM sources " - "WHERE source_type = 'rss_feed' AND status = 'active' " + "WHERE source_type = ? AND status = 'active' " "AND (tenant_id IS NULL OR tenant_id = ?)", - (tenant_id,), + (source_type, tenant_id), ) else: cursor = await db.execute( "SELECT name, url, domain, category, COALESCE(article_count, 0) AS article_count FROM sources " - "WHERE source_type = 'rss_feed' AND status = 'active'" + "WHERE source_type = ? AND status = 'active'", + (source_type,), ) return [dict(row) for row in await cursor.fetchall()] except Exception as e: - logger.error(f"Fehler beim Laden der Feed-Metadaten: {e}") + logger.error(f"Fehler beim Laden der Feed-Metadaten ({source_type}): {e}") return [] finally: await db.close()