"""Podcast-Feed-Parser: wie RSSParser, nur mit Transkript-Kaskade. Aufbau bewusst copy-light zu rss_parser.py: dieselbe oeffentliche Signatur `search_feeds_selective()`, eigener Code-Pfad mit Pre-Filter und anschliessender Transkript-Kaskade via `transcript_extractors`. Vorgaben des Plans: - Keine kostenpflichtige API, keine lokale Transkription - Episoden ohne auffindbares Transkript werden verworfen - content_original wird NICHT auf 1000 Zeichen gekuerzt (Transkript-Volltext) - Duplikate-Schutz zwischen Lagen ueber Cache-Tabelle podcast_transcripts """ from __future__ import annotations import asyncio import logging from datetime import datetime, timezone import feedparser import httpx from config import TIMEZONE, MAX_ARTICLES_PER_DOMAIN_RSS from source_rules import _extract_domain from feeds.transcript_extractors import fetch_transcript logger = logging.getLogger("osint.podcast") class PodcastFeedParser: """Durchsucht Podcast-Feeds nach relevanten Episoden (mit Transkript).""" STOP_WORDS = { "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an", "auf", "für", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor", "über", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from", } # Pre-Filter: wie im RSSParser — mindestens Haelfte der Keywords, max 2 notwendig @staticmethod def _prefilter_match(title: str, summary: str, keywords: list[str]) -> tuple[bool, float]: text = f"{title} {summary}".lower() if not keywords: return True, 0.0 min_matches = min(2, max(1, (len(keywords) + 1) // 2)) match_count = sum(1 for kw in keywords if kw and kw in text) if match_count >= min_matches: return True, match_count / len(keywords) return False, 0.0 async def search_feeds_selective( self, search_term: str, selected_feeds: list[dict], keywords: list[str] | None = None, ) -> list[dict]: """Durchsucht die uebergebenen Podcast-Feeds nach relevanten Episoden. Signatur bewusst identisch zu RSSParser.search_feeds_selective, damit die Orchestrator-Logik analog aufgebaut werden kann. 
""" if not selected_feeds: return [] if keywords: search_words = [w.lower().strip() for w in keywords if w.strip()] else: search_words = [w.lower() for w in search_term.split() if len(w) > 2 and w.lower() not in self.STOP_WORDS] search_words = self._clean_search_words(search_words) if not search_words: return [] # Feeds parallel abfragen tasks = [self._fetch_feed(feed, search_words) for feed in selected_feeds] results = await asyncio.gather(*tasks, return_exceptions=True) all_articles: list[dict] = [] for feed, r in zip(selected_feeds, results): if isinstance(r, Exception): logger.debug(f"Podcast-Feed {feed.get('name')} fehlgeschlagen: {r}") continue all_articles.extend(r) all_articles = self._apply_domain_cap(all_articles) logger.info(f"Podcast-Parser: {len(all_articles)} Episoden mit Transkript gefunden") return all_articles @staticmethod def _clean_search_words(words: list[str]) -> list[str]: cleaned = [w for w in words if not w.isdigit()] return cleaned if cleaned else words async def _fetch_feed(self, feed_config: dict, search_words: list[str]) -> list[dict]: """Einzelnen Podcast-Feed abrufen, Pre-Filter + Transkript-Kaskade.""" name = feed_config["name"] url = feed_config["url"] articles: list[dict] = [] try: async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: response = await client.get(url, headers={"User-Agent": "OSINT-Monitor/1.0 (Podcast Aggregator)"}) response.raise_for_status() feed = await asyncio.to_thread(feedparser.parse, response.text) except Exception as e: logger.debug(f"Podcast-Feed {name} ({url}): {e}") return articles # Pro Feed maximal die 20 neuesten Episoden betrachten. # Podcasts veroeffentlichen seltener als RSS-Feeds; 20 reicht fuer # einen mehrmonatigen Rueckblick und begrenzt den Scrape-Aufwand. entries = list(feed.entries[:20]) # Kandidaten nach Pre-Filter sammeln (keine Transkript-Abfrage dafuer). candidates = [] for entry in entries: title = entry.get("title", "") summary = entry.get("summary", "") or entry.get("description", "") passed, score = self._prefilter_match(title, summary, search_words) if passed: candidates.append((entry, title, summary, score)) if not candidates: return articles # Transkript-Kaskade parallel nur fuer die Kandidaten transcript_tasks = [fetch_transcript(e, url, e.get("link")) for e, _t, _s, _r in candidates] transcript_results = await asyncio.gather(*transcript_tasks, return_exceptions=True) for (entry, title, summary, score), t_result in zip(candidates, transcript_results): if isinstance(t_result, Exception): logger.debug(f"Transkript-Kaskade fuer {entry.get('link')}: {t_result}") continue if not t_result or not t_result.text: # Ohne Transkript keine Uebernahme (Plan-Vorgabe) continue # Nach-Transkript-Filter: wenn der Pre-Filter nur knapp griff, # muss das Transkript die Keywords ebenfalls enthalten — sonst ist # die Episode nicht wirklich relevant (Shownotes-Zufallstreffer). if not self._transcript_confirms(t_result.text, search_words): continue published = None if hasattr(entry, "published_parsed") and entry.published_parsed: try: published = datetime(*entry.published_parsed[:6], tzinfo=timezone.utc).astimezone(TIMEZONE).isoformat() except (TypeError, ValueError): pass # WICHTIG: Transkript-Volltext, KEINE 1000-Zeichen-Kuerzung wie bei RSS. 
articles.append({ "headline": title, "headline_de": title, "source": name, "source_url": entry.get("link", ""), "content_original": t_result.text, "content_de": t_result.text, "language": "de", "published_at": published, "relevance_score": score, }) return articles @staticmethod def _transcript_confirms(transcript: str, keywords: list[str]) -> bool: """Prueft, dass mind. ein Keyword auch im Transkript vorkommt.""" if not keywords: return True text = transcript.lower() return any(kw in text for kw in keywords if kw) def _apply_domain_cap(self, articles: list[dict]) -> list[dict]: """Begrenzt die Anzahl der Episoden pro Domain (analog RSSParser).""" if not articles: return articles by_domain: dict[str, list[dict]] = {} for a in articles: dom = _extract_domain(a.get("source_url", "")) or "_unknown" by_domain.setdefault(dom, []).append(a) out: list[dict] = [] for dom, items in by_domain.items(): items.sort(key=lambda x: x.get("relevance_score", 0.0), reverse=True) out.extend(items[:MAX_ARTICLES_PER_DOMAIN_RSS]) return out
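

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal example of how an orchestrator
# might call this parser. The feed name, URL, and keywords below are
# hypothetical placeholders, not real project configuration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo() -> None:
        parser = PodcastFeedParser()
        feeds = [{"name": "Example Podcast", "url": "https://example.org/feed.xml"}]
        episodes = await parser.search_feeds_selective(
            "semiconductor supply chains",
            feeds,
            keywords=["halbleiter", "lieferkette"],
        )
        for ep in episodes:
            print(ep["headline"], ep["published_at"], ep["relevance_score"])

    asyncio.run(_demo())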