"""Telegram-Kanal Parser: Liest Nachrichten aus konfigurierten Telegram-Kanaelen.""" import asyncio import logging import os from datetime import datetime, timezone from typing import Optional from config import TIMEZONE, TELEGRAM_API_ID, TELEGRAM_API_HASH, TELEGRAM_SESSION_PATH logger = logging.getLogger("osint.telegram") # Stoppwoerter (gleich wie RSS-Parser) STOP_WORDS = { "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an", "auf", "fuer", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor", "ueber", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from", } class TelegramParser: """Durchsucht Telegram-Kanaele nach relevanten Nachrichten.""" _client = None _lock = asyncio.Lock() async def _get_client(self): """Telethon-Client erstellen oder wiederverwenden.""" if TelegramParser._client is not None: if TelegramParser._client.is_connected(): return TelegramParser._client async with TelegramParser._lock: # Double-check nach Lock if TelegramParser._client is not None and TelegramParser._client.is_connected(): return TelegramParser._client try: from telethon import TelegramClient session_path = TELEGRAM_SESSION_PATH if not os.path.exists(session_path + ".session") and not os.path.exists(session_path): logger.error("Telegram-Session nicht gefunden: %s", session_path) return None client = TelegramClient(session_path, TELEGRAM_API_ID, TELEGRAM_API_HASH) await client.connect() if not await client.is_user_authorized(): logger.error("Telegram-Session nicht autorisiert. Bitte neu einloggen.") await client.disconnect() return None TelegramParser._client = client me = await client.get_me() logger.info("Telegram verbunden als: %s (%s)", me.first_name, me.phone) return client except ImportError: logger.error("telethon nicht installiert: pip install telethon") return None except Exception as e: logger.error("Telegram-Verbindung fehlgeschlagen: %s", e) return None async def search_channels(self, search_term: str, tenant_id: int = None, keywords: list[str] = None) -> list[dict]: """Liest Nachrichten aus konfigurierten Telegram-Kanaelen. Gibt Artikel-Dicts zurueck (kompatibel mit RSS-Parser-Format). """ client = await self._get_client() if not client: logger.warning("Telegram-Client nicht verfuegbar, ueberspringe Telegram-Pipeline") return [] # Telegram-Kanaele aus DB laden channels = await self._get_telegram_channels(tenant_id) if not channels: logger.info("Keine Telegram-Kanaele konfiguriert") return [] # Suchwoerter vorbereiten if keywords: search_words = [w.lower().strip() for w in keywords if w.strip()] else: search_words = [ w for w in search_term.lower().split() if w not in STOP_WORDS and len(w) >= 3 ] if not search_words: search_words = search_term.lower().split()[:2] # Kanaele parallel abrufen tasks = [] for ch in channels: channel_id = ch["url"] or ch["name"] tasks.append(self._fetch_channel(client, channel_id, search_words)) results = await asyncio.gather(*tasks, return_exceptions=True) all_articles = [] for i, result in enumerate(results): if isinstance(result, Exception): logger.warning("Telegram-Kanal %s: %s", channels[i]["name"], result) continue all_articles.extend(result) logger.info("Telegram: %d relevante Nachrichten aus %d Kanaelen", len(all_articles), len(channels)) return all_articles async def _get_telegram_channels(self, tenant_id: int = None) -> list[dict]: """Laedt Telegram-Kanaele aus der sources-Tabelle.""" try: from database import get_db db = await get_db() try: cursor = await db.execute( """SELECT id, name, url FROM sources WHERE source_type = 'telegram_channel' AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)""", (tenant_id,), ) rows = await cursor.fetchall() return [dict(row) for row in rows] finally: await db.close() except Exception as e: logger.error("Fehler beim Laden der Telegram-Kanaele: %s", e) return [] async def _fetch_channel(self, client, channel_id: str, search_words: list[str], limit: int = 50) -> list[dict]: """Letzte N Nachrichten eines Kanals abrufen und nach Keywords filtern.""" articles = [] try: # Kanal-Identifier normalisieren identifier = channel_id.strip() if identifier.startswith("https://t.me/"): identifier = identifier.replace("https://t.me/", "") if identifier.startswith("t.me/"): identifier = identifier.replace("t.me/", "") # Privater Invite-Link if identifier.startswith("+") or identifier.startswith("joinchat/"): entity = await client.get_entity(channel_id) else: # Oeffentlicher Kanal if not identifier.startswith("@"): identifier = "@" + identifier entity = await client.get_entity(identifier) messages = await client.get_messages(entity, limit=limit) channel_title = getattr(entity, "title", identifier) channel_username = getattr(entity, "username", identifier.replace("@", "")) for msg in messages: if not msg.text: continue text = msg.text text_lower = text.lower() # Keyword-Matching (gleiche Logik wie RSS-Parser) min_matches = min(2, max(1, (len(search_words) + 1) // 2)) match_count = sum(1 for word in search_words if word in text_lower) if match_count < min_matches: continue # Erste Zeile als Headline, Rest als Content lines = text.strip().split("\n") headline = lines[0][:200] if lines else text[:200] content = text # Datum published = None if msg.date: try: published = msg.date.astimezone(TIMEZONE).isoformat() except Exception: published = msg.date.isoformat() # Source-URL: t.me/channel/msg_id if channel_username: source_url = "https://t.me/%s/%s" % (channel_username, msg.id) else: source_url = "https://t.me/c/%s/%s" % (entity.id, msg.id) relevance_score = match_count / len(search_words) if search_words else 0.0 articles.append({ "headline": headline, "headline_de": headline if self._is_german(headline) else None, "source": "Telegram: %s" % channel_title, "source_url": source_url, "content_original": content[:2000], "content_de": content[:2000] if self._is_german(content) else None, "language": "de" if self._is_german(content) else "en", "published_at": published, "relevance_score": relevance_score, }) except Exception as e: logger.warning("Telegram-Kanal %s: %s", channel_id, e) return articles async def validate_channel(self, channel_id: str) -> Optional[dict]: """Prueft ob ein Telegram-Kanal erreichbar ist und gibt Info zurueck.""" client = await self._get_client() if not client: return None try: identifier = channel_id.strip() if identifier.startswith("https://t.me/"): identifier = identifier.replace("https://t.me/", "") if identifier.startswith("t.me/"): identifier = identifier.replace("t.me/", "") if identifier.startswith("+") or identifier.startswith("joinchat/"): return {"valid": True, "name": "Privater Kanal", "description": "Privater Einladungslink", "subscribers": None} if not identifier.startswith("@"): identifier = "@" + identifier entity = await client.get_entity(identifier) from telethon.tl.functions.channels import GetFullChannelRequest full = await client(GetFullChannelRequest(entity)) return { "valid": True, "name": getattr(entity, "title", identifier), "description": getattr(full.full_chat, "about", "") or "", "subscribers": getattr(full.full_chat, "participants_count", None), "username": getattr(entity, "username", ""), } except Exception as e: logger.warning("Telegram-Kanal-Validierung fehlgeschlagen fuer %s: %s", channel_id, e) return None def _is_german(self, text: str) -> bool: """Einfache Heuristik ob ein Text deutsch ist.""" german_words = {"der", "die", "das", "und", "ist", "von", "mit", "fuer", "auf", "ein", "eine", "den", "dem", "des", "sich", "wird", "nach", "bei", "auch", "ueber", "wie", "aus", "hat", "zum", "zur", "als", "noch", "mehr", "nicht", "aber", "oder", "sind", "vor", "einem", "einer", "wurde"} words = set(text.lower().split()) matches = words & german_words return len(matches) >= 2