Podcast integration phase 1: feed tag + broadcaster pages
Podcasts are treated like regular RSS sources (source_type=podcast_feed).
No external paid service and no local transcription: the monitor uses
only transcripts that already exist.
Cascade for obtaining a transcript:
1. Podcasting 2.0 tag <podcast:transcript> in the feed (SRT/VTT/HTML/JSON)
2. Editorial manuscript on the episode page
   (adapters: Dlf, SZ, Spiegel, NDR)
3. YouTube captions (phase 2, optional via yt-dlp)
If no stage hits, the episode is discarded (gracefully, no error). A sketch
of the dispatch follows below.
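A minimal sketch of how the dispatcher in src/feeds/transcript_extractors/__init__.py
could walk this cascade; only the call shape of fetch_transcript() and the .text
attribute are taken from the parser below, while TranscriptResult, the per-module
extract() helpers and the omitted cache lookup are assumptions for illustration:

# Hypothetical dispatcher sketch; adapter module names follow the list of new files,
# extract() and TranscriptResult are assumed, the podcast_transcripts cache lookup is omitted.
from dataclasses import dataclass

from . import rss_native, website_dlf, website_ndr, website_spiegel, website_sz


@dataclass
class TranscriptResult:
    text: str   # full transcript text, never truncated
    stage: str  # which cascade stage produced it


_WEBSITE_ADAPTERS = (website_dlf, website_sz, website_spiegel, website_ndr)


async def fetch_transcript(entry: dict, feed_url: str, episode_url: str | None) -> TranscriptResult | None:
    # feed_url is kept for parity with the real call site in podcast_parser.py
    # Stage 1: Podcasting 2.0 <podcast:transcript> tag in the feed entry (SRT/VTT/HTML/JSON)
    text = await rss_native.extract(entry)
    if text:
        return TranscriptResult(text, "rss_native")
    # Stage 2: editorial manuscript on the episode page, picked by domain match
    if episode_url:
        for adapter in _WEBSITE_ADAPTERS:
            if adapter.can_handle(episode_url):
                text = await adapter.extract(episode_url)
                if text:
                    return TranscriptResult(text, adapter.__name__)
    # No stage hit: return None; the caller discards the episode without raising
    return None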
New:
- src/feeds/podcast_parser.py (dedicated parser, RSS hot path unchanged)
- src/feeds/transcript_extractors/ (plugin pattern; adapter interface sketched after this list):
  __init__.py         dispatcher, cache lookup against podcast_transcripts
  _common.py          HTML extraction, domain matching, httpx helpers
  rss_native.py       stage 1: feed-tag parser (SRT/VTT/JSON/HTML)
  website_dlf.py      stage 2: deutschlandfunk.de + sister domains
  website_sz.py       stage 2: sz.de / sueddeutsche.de
  website_spiegel.py  stage 2: spiegel.de / manager-magazin.de
  website_ndr.py      stage 2: ndr.de
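Each stage-2 adapter exposes the same small surface. A hedged sketch of what
website_dlf.py might look like; only can_handle() is confirmed by the
verification section below, while the domain tuple, extract(), the selector
and the BeautifulSoup dependency are illustrative assumptions:

# Hypothetical adapter shape; selector and sister-domain list are placeholders.
import httpx
from bs4 import BeautifulSoup

_DOMAINS = ("deutschlandfunk.de", "deutschlandfunkkultur.de")  # sister domains: assumption


def can_handle(url: str) -> bool:
    # Domain match decides whether this adapter is responsible for the episode URL.
    return any(domain in url for domain in _DOMAINS)


async def extract(url: str) -> str | None:
    # Fetch the episode page and pull the editorial manuscript out of the HTML.
    async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
        response = await client.get(url)
        response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    node = soup.select_one("article")  # placeholder selector
    return node.get_text(" ", strip=True) if node else None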
Changed:
- src/database.py: idempotent migration; table podcast_transcripts as a URL
  cache against repeated scraping across situation reports ("Lagen"), see the
  sketch after this list
- src/models.py: Pydantic pattern for source_type extended with podcast_feed
- src/source_rules.py: get_feeds_with_metadata() takes a source_type parameter,
  default rss_feed (RSS path unchanged)
- src/agents/orchestrator.py: new _podcast_pipeline() alongside RSS, WebSearch
  and Telegram; only for ad-hoc situation reports; dormant without podcast sources
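A minimal sketch of the idempotent migration; only the table name
podcast_transcripts is fixed by this commit, the column set and the
sqlite3-style connection are assumptions:

# Hypothetical migration sketch for src/database.py; columns are assumed.
CREATE_PODCAST_TRANSCRIPTS = """
CREATE TABLE IF NOT EXISTS podcast_transcripts (
    episode_url TEXT PRIMARY KEY,  -- cache key: scrape each episode URL only once
    transcript  TEXT NOT NULL,
    fetched_at  TEXT NOT NULL      -- ISO timestamp of the scrape
)
"""


def migrate(conn) -> None:
    # CREATE TABLE IF NOT EXISTS keeps the migration safe to run on every start.
    conn.execute(CREATE_PODCAST_TRANSCRIPTS)
    conn.commit()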
Verification:
- Migration on the live DB successful (log: table podcast_transcripts created)
- Import/instantiation test of all modules passed
- can_handle tests per broadcaster adapter, positive and negative cases OK
  (reproduced in the sketch after this list)
- Live scrape against Dlf: 22,710 characters, against SZ: 24,918 characters
- Dormant test: 0 podcast sources -> none of the new code runs during refresh
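The can_handle checks can be reproduced with a handful of asserts; the URLs
below are examples, not the actual test fixtures:

# Hypothetical reproduction of the positive/negative can_handle checks.
from feeds.transcript_extractors import website_dlf, website_sz

assert website_dlf.can_handle("https://www.deutschlandfunk.de/example-episode-100.html")
assert not website_dlf.can_handle("https://www.ndr.de/podcast/example-episode.html")
assert website_sz.can_handle("https://www.sueddeutsche.de/podcast/example-episode")
assert not website_sz.can_handle("https://www.spiegel.de/podcast/example-episode")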
Revertability: purely additive, RSS path untouched, rollback in three steps
(disable the sources, git revert, DROP TABLE podcast_transcripts).
src/feeds/podcast_parser.py (new file, 184 lines)
@@ -0,0 +1,184 @@
"""Podcast feed parser: like RSSParser, but with a transcript cascade.

Deliberately kept copy-light relative to rss_parser.py: same public
signature `search_feeds_selective()`, but its own code path with a pre-filter
followed by the transcript cascade via `transcript_extractors`.

Plan requirements:
- No paid API, no local transcription
- Episodes without a discoverable transcript are discarded
- content_original is NOT truncated to 1000 characters (full transcript text)
- Duplicate protection across situation reports (Lagen) via the cache table podcast_transcripts
"""
from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone

import feedparser
import httpx

from config import TIMEZONE, MAX_ARTICLES_PER_DOMAIN_RSS
from source_rules import _extract_domain
from feeds.transcript_extractors import fetch_transcript

logger = logging.getLogger("osint.podcast")


class PodcastFeedParser:
    """Searches podcast feeds for relevant episodes (with transcript)."""

    STOP_WORDS = {
        "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an",
        "auf", "für", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor",
        "über", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from",
    }

    # Pre-filter: as in RSSParser; at least half of the keywords, at most 2 required
    @staticmethod
    def _prefilter_match(title: str, summary: str, keywords: list[str]) -> tuple[bool, float]:
        text = f"{title} {summary}".lower()
        if not keywords:
            return True, 0.0
        min_matches = min(2, max(1, (len(keywords) + 1) // 2))
        match_count = sum(1 for kw in keywords if kw and kw in text)
        if match_count >= min_matches:
            return True, match_count / len(keywords)
        return False, 0.0

    async def search_feeds_selective(
        self,
        search_term: str,
        selected_feeds: list[dict],
        keywords: list[str] | None = None,
    ) -> list[dict]:
        """Searches the given podcast feeds for relevant episodes.

        Signature deliberately identical to RSSParser.search_feeds_selective so
        the orchestrator logic can be built analogously.
        """
        if not selected_feeds:
            return []

        if keywords:
            search_words = [w.lower().strip() for w in keywords if w.strip()]
        else:
            search_words = [w.lower() for w in search_term.split() if len(w) > 2 and w.lower() not in self.STOP_WORDS]
        search_words = self._clean_search_words(search_words)
        if not search_words:
            return []

        # Query all feeds in parallel
        tasks = [self._fetch_feed(feed, search_words) for feed in selected_feeds]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_articles: list[dict] = []
        for feed, r in zip(selected_feeds, results):
            if isinstance(r, Exception):
                logger.debug(f"Podcast feed {feed.get('name')} failed: {r}")
                continue
            all_articles.extend(r)

        all_articles = self._apply_domain_cap(all_articles)
        logger.info(f"Podcast parser: {len(all_articles)} episodes with transcript found")
        return all_articles

    @staticmethod
    def _clean_search_words(words: list[str]) -> list[str]:
        cleaned = [w for w in words if not w.isdigit()]
        return cleaned if cleaned else words

    async def _fetch_feed(self, feed_config: dict, search_words: list[str]) -> list[dict]:
        """Fetch a single podcast feed, apply the pre-filter and the transcript cascade."""
        name = feed_config["name"]
        url = feed_config["url"]
        articles: list[dict] = []

        try:
            async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
                response = await client.get(url, headers={"User-Agent": "OSINT-Monitor/1.0 (Podcast Aggregator)"})
                response.raise_for_status()
                feed = await asyncio.to_thread(feedparser.parse, response.text)
        except Exception as e:
            logger.debug(f"Podcast feed {name} ({url}): {e}")
            return articles

        # Consider at most the 20 newest episodes per feed.
        # Podcasts publish less often than RSS feeds; 20 is enough for a
        # look-back of several months and bounds the scraping effort.
        entries = list(feed.entries[:20])

        # Collect candidates via the pre-filter (no transcript fetch yet).
        candidates = []
        for entry in entries:
            title = entry.get("title", "")
            summary = entry.get("summary", "") or entry.get("description", "")
            passed, score = self._prefilter_match(title, summary, search_words)
            if passed:
                candidates.append((entry, title, summary, score))

        if not candidates:
            return articles

        # Run the transcript cascade in parallel, but only for the candidates
        transcript_tasks = [fetch_transcript(e, url, e.get("link")) for e, _t, _s, _r in candidates]
        transcript_results = await asyncio.gather(*transcript_tasks, return_exceptions=True)

        for (entry, title, summary, score), t_result in zip(candidates, transcript_results):
            if isinstance(t_result, Exception):
                logger.debug(f"Transcript cascade for {entry.get('link')}: {t_result}")
                continue
            if not t_result or not t_result.text:
                # No transcript, no ingestion (plan requirement)
                continue

            # Post-transcript filter: a pre-filter hit on the show notes alone is
            # not enough; the transcript itself must also contain at least one
            # keyword, otherwise the episode is not actually relevant
            # (accidental show-notes hit).
            if not self._transcript_confirms(t_result.text, search_words):
                continue

            published = None
            if hasattr(entry, "published_parsed") and entry.published_parsed:
                try:
                    published = datetime(*entry.published_parsed[:6], tzinfo=timezone.utc).astimezone(TIMEZONE).isoformat()
                except (TypeError, ValueError):
                    pass

            # IMPORTANT: full transcript text, NO 1000-character truncation as with RSS.
            articles.append({
                "headline": title,
                "headline_de": title,
                "source": name,
                "source_url": entry.get("link", ""),
                "content_original": t_result.text,
                "content_de": t_result.text,
                "language": "de",
                "published_at": published,
                "relevance_score": score,
            })

        return articles

    @staticmethod
    def _transcript_confirms(transcript: str, keywords: list[str]) -> bool:
        """Checks that at least one keyword also occurs in the transcript."""
        if not keywords:
            return True
        text = transcript.lower()
        return any(kw in text for kw in keywords if kw)

    def _apply_domain_cap(self, articles: list[dict]) -> list[dict]:
        """Caps the number of episodes per domain (analogous to RSSParser)."""
        if not articles:
            return articles
        by_domain: dict[str, list[dict]] = {}
        for a in articles:
            dom = _extract_domain(a.get("source_url", "")) or "_unknown"
            by_domain.setdefault(dom, []).append(a)
        out: list[dict] = []
        for dom, items in by_domain.items():
            items.sort(key=lambda x: x.get("relevance_score", 0.0), reverse=True)
            out.extend(items[:MAX_ARTICLES_PER_DOMAIN_RSS])
        return out
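
For reference, a hedged sketch of how a caller such as the orchestrator's
_podcast_pipeline() might drive this parser; the feed entry and keywords are
illustrative, not taken from the repository:

# Hypothetical call site, mirroring how the orchestrator uses RSSParser.
import asyncio

from feeds.podcast_parser import PodcastFeedParser


async def main() -> None:
    parser = PodcastFeedParser()
    feeds = [{"name": "Dlf Hintergrund", "url": "https://example.org/podcast/feed.xml"}]  # example feed
    episodes = await parser.search_feeds_selective(
        search_term="Nord Stream Pipeline",
        selected_feeds=feeds,
        keywords=["nord stream", "pipeline"],
    )
    for episode in episodes:
        print(episode["headline"], len(episode["content_original"]))


asyncio.run(main())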