"""Stufe 1: Podcasting-2.0-Tag im Feed-Entry. Wenn der Podcast-Herausgeber den offenen Podcasting-2.0-Standard nutzt, liegt im Feed-Entry ein oder mehrere -Tags mit Link zu SRT/VTT/HTML/JSON. Das ist die zuverlaessigste Quelle ueberhaupt und verursacht nur einen HTTP-Request. """ from __future__ import annotations import logging import re from typing import Optional import httpx from . import TranscriptResult logger = logging.getLogger("osint.podcast.extractors.rss_native") # Reihenfolge der akzeptierten Formate (mehr Struktur bevorzugt) _PREFERRED_MIME = ["application/json", "text/vtt", "application/x-subrip", "text/srt", "text/html", "text/plain"] def can_handle(feed_entry: dict, feed_url: str) -> bool: """Greift immer, wenn feedparser einen podcast:transcript-Link erkannt hat.""" return bool(_find_transcript_links(feed_entry)) async def fetch(feed_entry: dict, feed_url: str) -> Optional[TranscriptResult]: links = _find_transcript_links(feed_entry) if not links: return None # Bestes Format auswaehlen (nach _PREFERRED_MIME) links_sorted = sorted( links, key=lambda l: _PREFERRED_MIME.index(l.get("type", "")) if l.get("type") in _PREFERRED_MIME else 99, ) async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client: for link in links_sorted: url = link.get("url") if not url: continue try: resp = await client.get(url, headers={"User-Agent": "OSINT-Monitor/1.0 (Podcast-Transcript)"}) resp.raise_for_status() raw = resp.text mime = (link.get("type") or "").lower() text, segments = _parse_by_mime(raw, mime) if text and text.strip(): return TranscriptResult(text=text.strip(), source="rss_native", segments=segments) except Exception as e: logger.debug(f"Link {url} fehlgeschlagen: {e}") continue return None def _find_transcript_links(feed_entry: dict) -> list[dict]: """Findet -Angaben im feedparser-Entry. feedparser bildet Namespace-Tags als Dicts mit 'url' und 'type' ab (z. B. entry.podcast_transcript oder entry['podcast_transcript']). Je nach feedparser-Version kann das ein einzelnes Dict oder eine Liste sein. """ candidates = [] for key in ("podcast_transcript", "podcast_transcripts", "transcripts"): val = feed_entry.get(key) if isinstance(feed_entry, dict) else getattr(feed_entry, key, None) if not val: continue if isinstance(val, list): candidates.extend([v for v in val if isinstance(v, dict)]) elif isinstance(val, dict): candidates.append(val) # Zusaetzlich: manche Feeds schreiben die Tags ins links-Array mit rel="transcript" links = feed_entry.get("links") if isinstance(feed_entry, dict) else getattr(feed_entry, "links", None) or [] for link in links or []: if isinstance(link, dict) and link.get("rel") == "transcript" and link.get("href"): candidates.append({"url": link["href"], "type": link.get("type", "")}) return candidates def _parse_by_mime(raw: str, mime: str) -> tuple[str, Optional[list]]: """Extrahiert Plaintext und (wenn moeglich) Segmente nach MIME-Typ.""" if "json" in mime: return _parse_json(raw) if "vtt" in mime: return _parse_vtt(raw) if "subrip" in mime or "srt" in mime: return _parse_srt(raw) if "html" in mime: return _parse_html(raw), None # Fallback: Plaintext return raw, None def _parse_json(raw: str) -> tuple[str, Optional[list]]: """Podcasting-2.0 JSON-Transcript-Format.""" import json try: data = json.loads(raw) segments_raw = data.get("segments", []) texts = [] segments = [] for seg in segments_raw: body = seg.get("body", "").strip() if body: texts.append(body) segments.append({ "start": seg.get("startTime"), "end": seg.get("endTime"), "text": body, }) return "\n".join(texts), segments or None except Exception: return "", None def _parse_vtt(raw: str) -> tuple[str, Optional[list]]: """WebVTT-Parser (ohne externe Abhaengigkeiten).""" lines = raw.splitlines() blocks = [] current = [] time_re = re.compile(r"(\d{2}:)?(\d{2}):(\d{2})\.(\d{3})\s*-->\s*(\d{2}:)?(\d{2}):(\d{2})\.(\d{3})") def finalize_block(block: list) -> Optional[dict]: if len(block) < 2: return None time_line = next((l for l in block if time_re.search(l)), None) text_lines = [l for l in block if not time_re.search(l) and l.strip() and not l.strip().isdigit()] if not time_line or not text_lines: return None m = time_re.search(time_line) start = _time_to_sec(m.group(1), m.group(2), m.group(3), m.group(4)) end = _time_to_sec(m.group(5), m.group(6), m.group(7), m.group(8)) return {"start": start, "end": end, "text": " ".join(text_lines).strip()} for line in lines: if line.strip() == "": b = finalize_block(current) if b: blocks.append(b) current = [] else: current.append(line) b = finalize_block(current) if b: blocks.append(b) text = " ".join(b["text"] for b in blocks) return text, blocks or None def _parse_srt(raw: str) -> tuple[str, Optional[list]]: """SubRip-Parser (Timecodes mit Komma statt Punkt).""" return _parse_vtt(raw.replace(",", ".")) def _parse_html(raw: str) -> str: """HTML → Plaintext. Entfernt Tags simpel via Regex (genuegt fuer Transcript-HTML).""" no_tags = re.sub(r"", " ", raw, flags=re.DOTALL | re.IGNORECASE) no_tags = re.sub(r"", " ", no_tags, flags=re.DOTALL | re.IGNORECASE) no_tags = re.sub(r"<[^>]+>", " ", no_tags) # HTML-Entitys grob zuruecksetzen no_tags = (no_tags .replace(" ", " ") .replace("&", "&") .replace(""", '"') .replace("'", "'") .replace("<", "<") .replace(">", ">")) no_tags = re.sub(r"\s+", " ", no_tags) return no_tags.strip() def _time_to_sec(h: Optional[str], m: str, s: str, ms: str) -> float: """Konvertiert VTT-Timecode in Sekunden.""" hours = int(h.rstrip(":")) if h else 0 return hours * 3600 + int(m) * 60 + int(s) + int(ms) / 1000.0