"""Gemeinsame Helfer fuer Website-Scrape-Adapter. HTML-Extraktor ohne externe Abhaengigkeiten (BeautifulSoup nicht in requirements.txt). Nutzt Regex fuer robusten Plaintext-Extract aus typischen Artikel-Containern. """ from __future__ import annotations import logging import re from typing import Optional from urllib.parse import urlparse import httpx logger = logging.getLogger("osint.podcast.extractors.common") HTTP_TIMEOUT = 20.0 MIN_TRANSCRIPT_LEN = 500 # Unter 500 Zeichen ist das kein Manuskript, nur Shownotes DEFAULT_HEADERS = { "User-Agent": "Mozilla/5.0 (compatible; OSINT-Monitor/1.0; +https://monitor.aegis-sight.de)", "Accept": "text/html,application/xhtml+xml", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", } def matches_domain(url: str, domains: tuple[str, ...]) -> bool: """Prueft, ob die URL zu einer der bekannten Sender-Domains gehoert.""" if not url: return False try: host = urlparse(url).hostname or "" host = host.lower().lstrip("www.") return any(host == d or host.endswith("." + d) for d in domains) except Exception: return False def episode_url(feed_entry: dict) -> Optional[str]: """Holt die Episoden-Webseite (meist entry.link).""" if isinstance(feed_entry, dict): return feed_entry.get("link") or feed_entry.get("guid") return getattr(feed_entry, "link", None) or getattr(feed_entry, "guid", None) async def fetch_html(url: str) -> Optional[str]: async with httpx.AsyncClient(timeout=HTTP_TIMEOUT, follow_redirects=True, headers=DEFAULT_HEADERS) as client: try: resp = await client.get(url) resp.raise_for_status() return resp.text except Exception as e: logger.debug(f"HTML-Fetch fehlgeschlagen ({url}): {e}") return None # --- HTML-Extraktion ------------------------------------------------------ _SCRIPT_STYLE_RE = re.compile(r"<(script|style|noscript|iframe)[^>]*>.*?", re.DOTALL | re.IGNORECASE) _COMMENT_RE = re.compile(r"", re.DOTALL) _TAG_RE = re.compile(r"<[^>]+>") _WHITESPACE_RE = re.compile(r"\s+") def extract_text_by_container(html: str, container_patterns: list[str]) -> Optional[str]: """Extrahiert Text aus dem ersten gefundenen Container. container_patterns: Liste von Regex-Mustern, die den oeffnenden Container-Tag matchen (z. B. r']*class="[^"]*article-body[^"]*"[^>]*>'). Intern wird der zugehoerige schliessende Tag per Tag-Balancing gesucht. """ html_clean = _COMMENT_RE.sub("", _SCRIPT_STYLE_RE.sub("", html)) for pattern in container_patterns: m = re.search(pattern, html_clean, re.IGNORECASE) if not m: continue start = m.start() # Tag-Name aus Pattern-Treffer extrahieren tag_match = re.match(r"<(\w+)", m.group(0)) if not tag_match: continue tag_name = tag_match.group(1).lower() end = _find_matching_close(html_clean, start, tag_name) if end < 0: continue block = html_clean[start:end] text = html_to_text(block) if len(text) >= MIN_TRANSCRIPT_LEN: return text return None def extract_longest_article_block(html: str) -> Optional[str]: """Fallback: suche den laengsten zusammenhaengenden Block aus

def extract_longest_article_block(html: str) -> Optional[str]:
    """Fallback: find the longest contiguous <article> or <main> block.

    Useful when the site-specific container selectors fail.
    """
    html_clean = _COMMENT_RE.sub("", _SCRIPT_STYLE_RE.sub("", html))
    # Collect all <article> and <main> blocks
    candidates = []
    for tag in ("article", "main"):
        for m in re.finditer(rf"<{tag}\b[^>]*>", html_clean, re.IGNORECASE):
            end = _find_matching_close(html_clean, m.start(), tag)
            if end > m.start():
                candidates.append(html_clean[m.start():end])
    if not candidates:
        # Last resort: the whole body
        body_m = re.search(r"<body[^>]*>", html_clean, re.IGNORECASE)
        if body_m:
            candidates.append(html_clean[body_m.start():])
    best_text = ""
    for block in candidates:
        text = html_to_text(block)
        if len(text) > len(best_text):
            best_text = text
    return best_text if len(best_text) >= MIN_TRANSCRIPT_LEN else None
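
# Illustrative behavior of html_to_text below (a sketch derived from its
# replacement table, not an exhaustive spec): tags become spaces, a small
# fixed set of entities is decoded, and whitespace is collapsed:
#
#     html_to_text("<p>Krieg &amp; Frieden &ndash; Folge&nbsp;12</p>")
#     -> "Krieg & Frieden - Folge 12"
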
def html_to_text(html: str) -> str:
    """Simple HTML -> plaintext conversion."""
    no_tags = _COMMENT_RE.sub("", _SCRIPT_STYLE_RE.sub("", html))
    no_tags = _TAG_RE.sub(" ", no_tags)
    no_tags = (no_tags
               .replace("&nbsp;", " ")
               .replace("&quot;", '"')
               .replace("&#39;", "'")
               .replace("&apos;", "'")
               .replace("&lt;", "<")
               .replace("&gt;", ">")
               .replace("&ndash;", "-")
               .replace("&mdash;", "-")
               .replace("&auml;", "ä")
               .replace("&ouml;", "ö")
               .replace("&uuml;", "ü")
               .replace("&Auml;", "Ä")
               .replace("&Ouml;", "Ö")
               .replace("&Uuml;", "Ü")
               .replace("&szlig;", "ß")
               # decode &amp; last so "&amp;lt;" is not double-decoded to "<"
               .replace("&amp;", "&"))
    return _WHITESPACE_RE.sub(" ", no_tags).strip()


def _find_matching_close(html: str, start: int, tag_name: str) -> int:
    """Finds the position of the closing tag matching the opening tag at `start`.

    Simple counter approach: every further <tag> increments the depth, every
    </tag> decrements it.
    Returns the index AFTER the closing tag, or -1 if not found.
    """
    open_re = re.compile(rf"<{tag_name}\b[^>]*>", re.IGNORECASE)
    close_re = re.compile(rf"</{tag_name}\s*>", re.IGNORECASE)
    depth = 1
    pos = start + 1  # search just past the '<' of the initial opening tag
    while pos < len(html) and depth > 0:
        next_open = open_re.search(html, pos)
        next_close = close_re.search(html, pos)
        if not next_close:
            return -1
        if next_open and next_open.start() < next_close.start():
            depth += 1
            pos = next_open.end()
        else:
            depth -= 1
            pos = next_close.end()
    return pos if depth == 0 else -1
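

# Minimal offline smoke test (a sketch, not part of the adapter API): builds a
# small page in memory so the fallback extractor can be exercised without any
# network access or real broadcaster page.
if __name__ == "__main__":
    sample_page = (
        "<html><body><nav>Menu</nav><article><h1>Testfolge</h1><p>"
        + "Example transcript text for the extractor. " * 20
        + "</p></article></body></html>"
    )
    text = extract_longest_article_block(sample_page)
    print(text[:100] if text else "no block above MIN_TRANSCRIPT_LEN found")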