Zwei Lücken beim Befund Lage 96 (Verfassungsänderung Japan): Die japanische Asahi-Shimbun-Quelle wurde durch das sprachbewusste Keyword-Matching (#27) und Pre-Topic-Translate (#28) erstmals durchgereicht, landete aber mit language='en' und ohne englische Headline in der DB. Damit ist sie im Frontend nur als Kanji-Headline zu lesen, und das Summary-LLM kann den Treffer nicht aussagekräftig referenzieren.

1. INSERT INTO articles erweitert um headline_en und content_en. Werte stammen primär vom Translator (headline_en, falls TRANSLATOR_ENABLED den Pfad einmal in Englisch befüllt), Fallback auf die für den Topic-Filter angefertigte Mini-Übersetzung (headline_en_for_topic / content_en_for_topic). So liegt die englische Variante dauerhaft in der DB statt nur während des Refresh-Laufs im Speicher.

2. RSS- und Telegram-Parser setzen 'language' nun primär aus der Quell-/Kanal-Konfiguration (primary_language). Vorher war es hart 'de', wenn die Headline deutsch wirkte, sonst 'en' - mit dem Resultat, dass ein Kanji-Titel als language='en' landete. Mit dem Fix bekommen Asahi & Co. korrekt language='ja', russische Telegram-Kanäle 'ru' etc.

- src/agents/orchestrator.py: INSERT erweitert, Kommentar zur Fallback-Logik
- src/feeds/rss_parser.py: language aus feed_config.primary_language
- src/feeds/telegram_parser.py: channel_lang durch _fetch_channel reichen

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
277 Zeilen
12 KiB
Python
277 Zeilen
12 KiB
Python
"""Telegram-Kanal Parser: Liest Nachrichten aus konfigurierten Telegram-Kanaelen."""
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
from config import TIMEZONE, TELEGRAM_API_ID, TELEGRAM_API_HASH, TELEGRAM_SESSION_PATH
|
|
|
|
logger = logging.getLogger("osint.telegram")

# Stop words removed from the free-text search term before it is used as a
# fallback keyword list (German and English function words).
# Both spellings of the umlaut words are listed: the ASCII transliterations
# ("fuer", "ueber") and the real forms ("für", "über"). Without the latter, a
# query containing "für" would survive the length >= 3 filter and then
# substring-match almost every German message.
# NOTE(review): the original comment said this list is the same as the RSS
# parser's; if the two must stay in sync, add "für"/"über" there as well.
STOP_WORDS = {
    "und", "oder", "der", "die", "das", "ein", "eine", "in", "im", "am", "an",
    "auf", "fuer", "für", "mit", "von", "zu", "zum", "zur", "bei", "nach", "vor",
    "ueber", "über", "unter", "ist", "sind", "hat", "the", "and", "for", "with", "from",
}
|
|
|
|
|
|
class TelegramParser:
|
|
"""Durchsucht Telegram-Kanaele nach relevanten Nachrichten."""
|
|
|
|
_client = None
|
|
_lock = asyncio.Lock()
|
|
|
|
async def _get_client(self):
|
|
"""Telethon-Client erstellen oder wiederverwenden."""
|
|
if TelegramParser._client is not None:
|
|
if TelegramParser._client.is_connected():
|
|
return TelegramParser._client
|
|
|
|
async with TelegramParser._lock:
|
|
# Double-check nach Lock
|
|
if TelegramParser._client is not None and TelegramParser._client.is_connected():
|
|
return TelegramParser._client
|
|
|
|
try:
|
|
from telethon import TelegramClient
|
|
session_path = TELEGRAM_SESSION_PATH
|
|
if not os.path.exists(session_path + ".session") and not os.path.exists(session_path):
|
|
logger.error("Telegram-Session nicht gefunden: %s", session_path)
|
|
return None
|
|
|
|
client = TelegramClient(session_path, TELEGRAM_API_ID, TELEGRAM_API_HASH)
|
|
await client.connect()
|
|
|
|
if not await client.is_user_authorized():
|
|
logger.error("Telegram-Session nicht autorisiert. Bitte neu einloggen.")
|
|
await client.disconnect()
|
|
return None
|
|
|
|
TelegramParser._client = client
|
|
me = await client.get_me()
|
|
logger.info("Telegram verbunden als: %s (%s)", me.first_name, me.phone)
|
|
return client
|
|
except ImportError:
|
|
logger.error("telethon nicht installiert: pip install telethon")
|
|
return None
|
|
except Exception as e:
|
|
logger.error("Telegram-Verbindung fehlgeschlagen: %s", e)
|
|
return None
|
|
|
|
async def search_channels(self, search_term: str, tenant_id: int = None,
|
|
keywords: dict | list = None, channel_ids: list[int] = None) -> list[dict]:
|
|
"""Liest Nachrichten aus konfigurierten Telegram-Kanaelen.
|
|
|
|
Args:
|
|
keywords: Sprach-Dict {iso_lang: [keyword,...]} oder flache Liste (Backward).
|
|
Match nutzt pro Kanal die "en"-Universalbegriffe + die Keywords der
|
|
Kanalsprache (primary_language aus sources-Tabelle).
|
|
|
|
Gibt Artikel-Dicts zurueck (kompatibel mit RSS-Parser-Format).
|
|
"""
|
|
from agents.researcher import keywords_for_language
|
|
|
|
client = await self._get_client()
|
|
if not client:
|
|
logger.warning("Telegram-Client nicht verfuegbar, ueberspringe Telegram-Pipeline")
|
|
return []
|
|
|
|
# Telegram-Kanaele aus DB laden (inkl. primary_language)
|
|
channels = await self._get_telegram_channels(tenant_id, channel_ids=channel_ids)
|
|
if not channels:
|
|
logger.info("Keine Telegram-Kanaele konfiguriert")
|
|
return []
|
|
|
|
# Fallback-Suchwoerter wenn keine Keywords da sind
|
|
fallback_words: list[str] | None = None
|
|
if not keywords:
|
|
fallback_words = [
|
|
w for w in search_term.lower().split()
|
|
if w not in STOP_WORDS and len(w) >= 3
|
|
]
|
|
if not fallback_words:
|
|
fallback_words = search_term.lower().split()[:2]
|
|
|
|
# Kanaele parallel abrufen
|
|
tasks = []
|
|
for ch in channels:
|
|
channel_id = ch["url"] or ch["name"]
|
|
channel_lang = ch.get("primary_language")
|
|
if keywords:
|
|
search_words = keywords_for_language(keywords, channel_lang)
|
|
search_words = [w.lower() for w in search_words]
|
|
else:
|
|
search_words = fallback_words or []
|
|
tasks.append(self._fetch_channel(client, channel_id, search_words, channel_lang=channel_lang))
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
all_articles = []
|
|
for i, result in enumerate(results):
|
|
if isinstance(result, Exception):
|
|
logger.warning("Telegram-Kanal %s: %s", channels[i]["name"], result)
|
|
continue
|
|
all_articles.extend(result)
|
|
|
|
logger.info("Telegram: %d relevante Nachrichten aus %d Kanaelen", len(all_articles), len(channels))
|
|
return all_articles
|
|
|
|
async def _get_telegram_channels(self, tenant_id: int = None, channel_ids: list[int] = None) -> list[dict]:
|
|
"""Laedt Telegram-Kanaele aus der sources-Tabelle."""
|
|
try:
|
|
from database import get_db
|
|
db = await get_db()
|
|
try:
|
|
if channel_ids and len(channel_ids) > 0:
|
|
placeholders = ",".join("?" for _ in channel_ids)
|
|
cursor = await db.execute(
|
|
f"""SELECT id, name, url, category, notes, primary_language FROM sources
|
|
WHERE source_type = 'telegram_channel'
|
|
AND status = 'active'
|
|
AND id IN ({placeholders})""",
|
|
tuple(channel_ids),
|
|
)
|
|
else:
|
|
cursor = await db.execute(
|
|
"""SELECT id, name, url, category, notes, primary_language FROM sources
|
|
WHERE source_type = 'telegram_channel'
|
|
AND status = 'active'
|
|
AND (tenant_id IS NULL OR tenant_id = ?)""",
|
|
(tenant_id,),
|
|
)
|
|
rows = await cursor.fetchall()
|
|
return [dict(row) for row in rows]
|
|
finally:
|
|
await db.close()
|
|
except Exception as e:
|
|
logger.error("Fehler beim Laden der Telegram-Kanaele: %s", e)
|
|
return []
|
|
|
|
async def _fetch_channel(self, client, channel_id: str, search_words: list[str],
|
|
limit: int = 50, channel_lang: str | None = None) -> list[dict]:
|
|
"""Letzte N Nachrichten eines Kanals abrufen und nach Keywords filtern."""
|
|
articles = []
|
|
try:
|
|
# Kanal-Identifier normalisieren
|
|
identifier = channel_id.strip()
|
|
if identifier.startswith("https://t.me/"):
|
|
identifier = identifier.replace("https://t.me/", "")
|
|
if identifier.startswith("t.me/"):
|
|
identifier = identifier.replace("t.me/", "")
|
|
|
|
# Privater Invite-Link
|
|
if identifier.startswith("+") or identifier.startswith("joinchat/"):
|
|
entity = await client.get_entity(channel_id)
|
|
else:
|
|
# Oeffentlicher Kanal
|
|
if not identifier.startswith("@"):
|
|
identifier = "@" + identifier
|
|
entity = await client.get_entity(identifier)
|
|
|
|
messages = await client.get_messages(entity, limit=limit)
|
|
|
|
channel_title = getattr(entity, "title", identifier)
|
|
channel_username = getattr(entity, "username", identifier.replace("@", ""))
|
|
|
|
for msg in messages:
|
|
if not msg.text:
|
|
continue
|
|
|
|
text = msg.text
|
|
text_lower = text.lower()
|
|
|
|
# Keyword-Matching (lockerer als RSS: 1 Match reicht,
|
|
# da Kanaele bereits thematisch vorselektiert sind)
|
|
match_count = sum(1 for word in search_words if word in text_lower)
|
|
|
|
if match_count < 1:
|
|
continue
|
|
|
|
# Erste Zeile als Headline, Rest als Content
|
|
lines = text.strip().split("\n")
|
|
headline = lines[0][:200] if lines else text[:200]
|
|
content = text
|
|
|
|
# Datum
|
|
published = None
|
|
if msg.date:
|
|
try:
|
|
published = msg.date.astimezone(TIMEZONE).isoformat()
|
|
except Exception:
|
|
published = msg.date.isoformat()
|
|
|
|
# Source-URL: t.me/channel/msg_id
|
|
if channel_username:
|
|
source_url = "https://t.me/%s/%s" % (channel_username, msg.id)
|
|
else:
|
|
source_url = "https://t.me/c/%s/%s" % (entity.id, msg.id)
|
|
|
|
relevance_score = match_count / len(search_words) if search_words else 0.0
|
|
|
|
articles.append({
|
|
"headline": headline,
|
|
"headline_de": headline if self._is_german(headline) else None,
|
|
"source": "Telegram: %s" % channel_title,
|
|
"source_url": source_url,
|
|
"content_original": content[:2000],
|
|
"content_de": content[:2000] if self._is_german(content) else None,
|
|
# Sprache primär aus der Kanal-Konfiguration übernehmen
|
|
# (z.B. "ru" für russische Kanäle). Sonst Fallback auf die
|
|
# de/en-Heuristik. Symmetrisch zur RSS-Pfad-Logik.
|
|
"language": channel_lang or ("de" if self._is_german(content) else "en"),
|
|
"published_at": published,
|
|
"relevance_score": relevance_score,
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.warning("Telegram-Kanal %s: %s", channel_id, e)
|
|
|
|
return articles
|
|
|
|
async def validate_channel(self, channel_id: str) -> Optional[dict]:
|
|
"""Prueft ob ein Telegram-Kanal erreichbar ist und gibt Info zurueck."""
|
|
client = await self._get_client()
|
|
if not client:
|
|
return None
|
|
|
|
try:
|
|
identifier = channel_id.strip()
|
|
if identifier.startswith("https://t.me/"):
|
|
identifier = identifier.replace("https://t.me/", "")
|
|
if identifier.startswith("t.me/"):
|
|
identifier = identifier.replace("t.me/", "")
|
|
|
|
if identifier.startswith("+") or identifier.startswith("joinchat/"):
|
|
return {"valid": True, "name": "Privater Kanal", "description": "Privater Einladungslink", "subscribers": None}
|
|
|
|
if not identifier.startswith("@"):
|
|
identifier = "@" + identifier
|
|
|
|
entity = await client.get_entity(identifier)
|
|
|
|
from telethon.tl.functions.channels import GetFullChannelRequest
|
|
full = await client(GetFullChannelRequest(entity))
|
|
|
|
return {
|
|
"valid": True,
|
|
"name": getattr(entity, "title", identifier),
|
|
"description": getattr(full.full_chat, "about", "") or "",
|
|
"subscribers": getattr(full.full_chat, "participants_count", None),
|
|
"username": getattr(entity, "username", ""),
|
|
}
|
|
except Exception as e:
|
|
logger.warning("Telegram-Kanal-Validierung fehlgeschlagen fuer %s: %s", channel_id, e)
|
|
return None
|
|
|
|
def _is_german(self, text: str) -> bool:
|
|
"""Einfache Heuristik ob ein Text deutsch ist."""
|
|
german_words = {"der", "die", "das", "und", "ist", "von", "mit", "fuer", "auf", "ein",
|
|
"eine", "den", "dem", "des", "sich", "wird", "nach", "bei", "auch",
|
|
"ueber", "wie", "aus", "hat", "zum", "zur", "als", "noch", "mehr",
|
|
"nicht", "aber", "oder", "sind", "vor", "einem", "einer", "wurde"}
|
|
words = set(text.lower().split())
|
|
matches = words & german_words
|
|
return len(matches) >= 2
|