From 68c6666d870924f3fc8ac94711f64c9a3d4e0652 Mon Sep 17 00:00:00 2001
From: Claude Dev
Date: Sun, 29 Mar 2026 20:54:54 +0200
Subject: [PATCH] cleanup: remove blog pipeline (now runs on dev)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The blog pipeline has been migrated to the dev server, where it runs as a
standalone service inside the blog container. The monitor-side
implementation is no longer needed.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 src/agents/blog/__init__.py      |   0
 src/agents/blog/blog_curator.py  | 149 ----------------------------
 src/agents/blog/blog_pipeline.py | 115 ----------------------
 src/agents/blog/blog_writer.py   | 162 ------------------------------
 4 files changed, 426 deletions(-)
 delete mode 100644 src/agents/blog/__init__.py
 delete mode 100644 src/agents/blog/blog_curator.py
 delete mode 100644 src/agents/blog/blog_pipeline.py
 delete mode 100644 src/agents/blog/blog_writer.py

diff --git a/src/agents/blog/__init__.py b/src/agents/blog/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/src/agents/blog/blog_curator.py b/src/agents/blog/blog_curator.py
deleted file mode 100644
index e2a506a..0000000
--- a/src/agents/blog/blog_curator.py
+++ /dev/null
@@ -1,149 +0,0 @@
-"""BlogCurator -- selects daily blog topics from the monitor DB."""
-import json
-import logging
-import sqlite3
-from datetime import datetime, timedelta, timezone
-
-logger = logging.getLogger("blog.curator")
-
-
-def _extract_json(text):
-    """Extracts JSON from Claude responses (robust)."""
-    text = text.strip()
-    # Replace typographic quotation marks (they break JSON)
-    text = text.replace("„", "'").replace("“", "'").replace("”", "'")
-    text = text.replace("«", "'").replace("»", "'")
-    # 1. Try a direct parse
-    try:
-        return json.loads(text, strict=False)
-    except json.JSONDecodeError:
-        pass
-    # 2. Find the first JSON object or array
-    for open_c, close_c in [("{", "}"), ("[", "]")]:
-        start = text.find(open_c)
-        end = text.rfind(close_c)
-        if start != -1 and end > start:
-            candidate = text[start:end+1]
-            try:
-                return json.loads(candidate, strict=False)
-            except json.JSONDecodeError:
-                pass
-    raise json.JSONDecodeError("No valid JSON found", text, 0)
-
-DB_PATH = "/mnt/gitea/osint-data/osint.db"
-
-
-def get_recent_data(hours: int = 24) -> dict:
-    """Fetches recent articles and fact checks from the monitor DB."""
-    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
-    conn = sqlite3.connect(DB_PATH)
-    conn.row_factory = sqlite3.Row
-
-    # Recent articles
-    cursor = conn.execute(
-        """SELECT a.headline_de, a.headline, a.source, a.source_url,
-                  a.content_de, a.published_at, a.collected_at,
-                  i.title as incident_title, i.id as incident_id
-           FROM articles a
-           LEFT JOIN incidents i ON a.incident_id = i.id
-           WHERE a.collected_at > ? OR a.published_at > ?
-           ORDER BY a.collected_at DESC LIMIT 100""",
-        (cutoff, cutoff),
-    )
-    articles = [dict(r) for r in cursor.fetchall()]
-
-    # Active incidents
-    cursor = conn.execute(
-        "SELECT id, title, summary, type, status FROM incidents WHERE status = 'active' ORDER BY updated_at DESC LIMIT 10"
-    )
-    incidents = [dict(r) for r in cursor.fetchall()]
-
-    # Recent fact checks
-    cursor = conn.execute(
-        """SELECT claim, status, evidence, checked_at, incident_id
-           FROM fact_checks WHERE checked_at > ?
-           ORDER BY checked_at DESC LIMIT 30""",
-        (cutoff,),
-    )
-    fact_checks = [dict(r) for r in cursor.fetchall()]
-
-    conn.close()
-    return {"articles": articles, "incidents": incidents, "fact_checks": fact_checks}
-
-
-async def curate_topics(call_claude_fn) -> list[dict]:
-    """Selects 2-4 blog-worthy topics."""
-    data = get_recent_data()
-
-    if not data["articles"]:
-        logger.warning("No new articles in the last 24h")
-        return []
-
-    # Summary for Claude
-    article_summary = []
-    for a in data["articles"][:50]:
-        title = a["headline_de"] or a["headline"] or ""
-        source = a["source"] or ""
-        incident = a["incident_title"] or ""
-        article_summary.append(f"- [{source}] {title} (incident: {incident})")
-
-    fc_summary = []
-    for fc in data["fact_checks"][:15]:
-        fc_summary.append(f"- [{fc['status']}] {fc['claim'][:150]}")
-
-    incident_summary = []
-    for inc in data["incidents"]:
-        summary_short = (inc["summary"] or "")[:200]
-        incident_summary.append(f"- Incident #{inc['id']}: {inc['title']} -- {summary_short}")
-
-    prompt = f"""You are the editor-in-chief of the OSINT blog "AegisSight Mosaic".
-From the current OSINT data below, pick 2-4 topics that lend themselves to well-researched blog articles.
-
-CATEGORIES (pick as appropriate):
-- OSINT: news analysis, source evaluation, fact-checking
-- GEOINT: maps, satellite imagery, spatial analysis
-- CYBINT: cyber threats, digital infrastructure
-- SOCMINT: disinformation, narratives, social media trends
-
-CURRENT INCIDENTS:
-{chr(10).join(incident_summary)}
-
-RECENT ARTICLES (last 24h):
-{chr(10).join(article_summary[:30])}
-
-RECENT FACT CHECKS:
-{chr(10).join(fc_summary)}
-
-RULES:
-- Pick topics that interest a broad audience
-- No purely technical or internal topics
-- Prefer topics backed by multiple sources and fact checks
-- Every topic must be assigned to one category
-- Include the relevant article IDs and incident IDs as context
-
-Answer as a JSON array:
-[
-  {{
-    "topic": "Short topic title",
-    "category": "OSINT|GEOINT|CYBINT|SOCMINT",
-    "angle": "Which angle/which story?",
-    "key_sources": ["Source name 1", "Source name 2"],
-    "incident_ids": [6, 18],
-    "relevance": "Why is this relevant right now?"
-  }}
-]"""
-
-    result, usage = await call_claude_fn(prompt, tools=None, model="claude-haiku-4-5-20251001")
-
-    try:
-        topics = _extract_json(result)
-        # Catch double-encoded JSON
-        if isinstance(topics, str):
-            topics = json.loads(topics, strict=False)
-        if not isinstance(topics, list):
-            logger.error(f"Curator: unexpected type {type(topics).__name__}, expected list")
-            return []
-        logger.info(f"Curator: {len(topics)} topics selected (${usage.cost_usd:.4f})")
-        return topics
-    except (json.JSONDecodeError, IndexError, TypeError) as e:
-        logger.error(f"Curator JSON parse error: {e}\nRaw: {result[:300]}")
-        return []
diff --git a/src/agents/blog/blog_pipeline.py b/src/agents/blog/blog_pipeline.py
deleted file mode 100644
index 5ca7522..0000000
--- a/src/agents/blog/blog_pipeline.py
+++ /dev/null
@@ -1,115 +0,0 @@
-"""Blog pipeline: curator -> writer -> push to the blog."""
-import asyncio
-import json
-import logging
-import ssl
-import sys
-import urllib.request
-from pathlib import Path
-
-# Add the project root to the path
-sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
-sys.path.insert(0, str(Path(__file__).parent.parent.parent))
-
-from src.agents.claude_client import call_claude
-from src.agents.blog.blog_curator import curate_topics
-from src.agents.blog.blog_writer import write_article
-
-logging.basicConfig(
-    level=logging.INFO,
-    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
-    handlers=[
-        logging.StreamHandler(),
-    ],
-)
-logger = logging.getLogger("blog.pipeline")
-
-BLOG_API_URL = "https://blog.aegis-sight.de/api/ingest/drafts"
-API_KEY_FILE = "/home/claude-dev/.blog-api-key"
-
-
-def read_api_key() -> str:
-    try:
-        with open(API_KEY_FILE) as f:
-            return f.read().strip()
-    except FileNotFoundError:
-        logger.error(f"API key file not found: {API_KEY_FILE}")
-        sys.exit(1)
-
-
-def push_to_blog(articles: list[dict], api_key: str) -> dict:
-    """Pushes article drafts to the blog ingest API (with retry)."""
-    import time
-    data = json.dumps({"articles": articles}).encode("utf-8")
-    ctx = ssl.create_default_context()
-    last_error = None
-    for attempt in range(3):
-        try:
-            req = urllib.request.Request(
-                BLOG_API_URL,
-                data=data,
-                headers={"Content-Type": "application/json", "X-API-Key": api_key},
-                method="POST",
-            )
-            with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
-                return json.loads(resp.read().decode("utf-8"))
-        except Exception as e:
-            last_error = e
-            if attempt < 2:
-                wait = 3 ** attempt  # 1s, 3s
-                logger.warning(f"Push failed (attempt {attempt + 1}/3): {e} -- retrying in {wait}s")
-                time.sleep(wait)
-    raise last_error
-
-
-async def run_pipeline():
-    """Runs the complete blog pipeline."""
-    logger.info("=== Blog pipeline started ===")
-
-    # 1. Select topics
-    logger.info("Step 1: selecting topics...")
-    topics = await curate_topics(call_claude)
-    if not topics:
-        logger.warning("No topics selected -- pipeline finished")
-        return
-
-    logger.info(f"{len(topics)} topics selected: {[t.get('topic', '?') if isinstance(t, dict) else str(t)[:50] for t in topics]}")
-
-    # 2. Write articles
-    logger.info("Step 2: writing articles...")
-    articles = []
-    for topic in topics:
-        topic_title = topic.get('topic', 'Unknown') if isinstance(topic, dict) else str(topic)[:50]
-        topic_cat = topic.get('category', '?') if isinstance(topic, dict) else '?'
-        logger.info(f"Writing: {topic_title} ({topic_cat})")
-        article = await write_article(topic, call_claude)
-        if article:
-            articles.append(article)
-        else:
-            logger.warning(f"Article failed: {topic_title}")
-
-    if not articles:
-        logger.warning("No articles written -- pipeline finished")
-        return
-
-    logger.info(f"{len(articles)} articles written")
-
-    # 3. Push to the blog
-    logger.info("Step 3: pushing to the blog...")
-    api_key = read_api_key()
-    try:
-        result = push_to_blog(articles, api_key)
-        logger.info(f"Push result: {result['accepted']} accepted, {result.get('rejected', 0)} rejected")
-    except Exception as e:
-        logger.error(f"Push failed: {e}")
-        return
-
-    logger.info("=== Blog pipeline finished ===")
-
-
-def main():
-    asyncio.run(run_pipeline())
-
-
-if __name__ == "__main__":
-    main()
diff --git a/src/agents/blog/blog_writer.py b/src/agents/blog/blog_writer.py
deleted file mode 100644
index 5e54acf..0000000
--- a/src/agents/blog/blog_writer.py
+++ /dev/null
@@ -1,162 +0,0 @@
-"""BlogWriter -- writes blog articles from curator topics."""
-import json
-import logging
-import sqlite3
-from datetime import datetime, timedelta, timezone
-
-logger = logging.getLogger("blog.writer")
-
-
-def _extract_json(text):
-    """Extracts JSON from Claude responses (robust)."""
-    text = text.strip()
-    # Replace typographic quotation marks (they break JSON)
-    text = text.replace("„", "'").replace("“", "'").replace("”", "'")
-    text = text.replace("«", "'").replace("»", "'")
-    # 1. Try a direct parse
-    try:
-        return json.loads(text, strict=False)
-    except json.JSONDecodeError:
-        pass
-    # 2. Find the first JSON object or array
-    for open_c, close_c in [("{", "}"), ("[", "]")]:
-        start = text.find(open_c)
-        end = text.rfind(close_c)
-        if start != -1 and end > start:
-            candidate = text[start:end+1]
-            try:
-                return json.loads(candidate, strict=False)
-            except json.JSONDecodeError as e:
-                logging.getLogger("blog.writer.json").warning(f"Parse at {open_c}..{close_c}: {e.msg} at pos {e.pos}, ctx: {repr(candidate[max(0,e.pos-40):e.pos+40])}")
-    raise json.JSONDecodeError("No valid JSON found", text, 0)
-
-DB_PATH = "/mnt/gitea/osint-data/osint.db"
-
-
-def get_context_for_topic(topic: dict) -> str:
-    """Fetches relevant data from the DB for a topic."""
-    conn = sqlite3.connect(DB_PATH)
-    conn.row_factory = sqlite3.Row
-
-    context_parts = []
-
-    # Incident summaries
-    for inc_id in topic.get("incident_ids", []):
-        cursor = conn.execute(
-            "SELECT title, summary, updated_at FROM incidents WHERE id = ?", (inc_id,)
-        )
-        inc = cursor.fetchone()
-        if inc:
-            summary = (dict(inc).get("summary") or "")[:2000]
-            context_parts.append(f"## Incident: {inc['title']}\n{summary}")
-
-    # Relevant articles from the last 48h
-    cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
-    incident_ids = topic.get("incident_ids", [])
-    if incident_ids:
-        placeholders = ",".join("?" for _ in incident_ids)
-        cursor = conn.execute(
-            f"""SELECT headline_de, headline, source, source_url, content_de, published_at
-                FROM articles WHERE incident_id IN ({placeholders}) AND (collected_at > ? OR published_at > ?)
-                ORDER BY collected_at DESC LIMIT 20""",
-            (*incident_ids, cutoff, cutoff),
-        )
-    else:
-        cursor = conn.execute(
-            """SELECT headline_de, headline, source, source_url, content_de, published_at
-               FROM articles WHERE collected_at > ? OR published_at > ?
-               ORDER BY collected_at DESC LIMIT 20""",
-            (cutoff, cutoff),
-        )
-    articles = [dict(r) for r in cursor.fetchall()]
-
-    if articles:
-        art_text = []
-        for a in articles:
-            title = a["headline_de"] or a["headline"] or ""
-            content = (a["content_de"] or "")[:500]
-            source = a["source"] or ""
-            url = a["source_url"] or ""
-            art_text.append(f"### {title}\nSource: {source} ({url})\n{content}")
-        context_parts.append("## Recent reports\n" + "\n\n".join(art_text[:10]))
-
-    # Relevant fact checks
-    if incident_ids:
-        placeholders = ",".join("?" for _ in incident_ids)
-        cursor = conn.execute(
-            f"""SELECT claim, status, evidence FROM fact_checks
-                WHERE incident_id IN ({placeholders}) ORDER BY checked_at DESC LIMIT 10""",
-            incident_ids,
-        )
-        fcs = [dict(r) for r in cursor.fetchall()]
-        if fcs:
-            fc_text = []
-            for fc in fcs:
-                evidence = (fc["evidence"] or "")[:300]
-                fc_text.append(f"- [{fc['status']}] {fc['claim']}\n  Evidence: {evidence}")
-            context_parts.append("## Fact checks\n" + "\n".join(fc_text))
-
-    conn.close()
-    return "\n\n".join(context_parts)
-
-
-async def write_article(topic: dict, call_claude_fn) -> dict | None:
-    """Writes a blog article for a topic."""
-    context = get_context_for_topic(topic)
-    today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
-
-    prompt = f"""You are a journalist at the OSINT blog "AegisSight Mosaic". Write a well-researched article in German.
-
-TOPIC: {topic["topic"]}
-CATEGORY: {topic["category"]}
-ANGLE: {topic.get("angle", "")}
-
-CONTEXT FROM THE OSINT ANALYSIS:
-{context}
-
-RULES FOR THE ARTICLE:
-1. Write like a professional journalist, NOT in situation-report style
-2. Narrative flowing prose with context and interpretation
-3. Use real umlauts (ü, ä, ö, ß)
-4. Markdown format: ## for subheadings, **bold** for emphasis
-5. IMPORTANT: Do NOT use double quotation marks (") in the Markdown text. Use single quotation marks (') or *italics* instead. Double quotation marks break the JSON format.
-6. 800-1500 words, well structured
-7. Name and link sources in the text where possible
-8. Meta description: 1 sentence, max 155 characters, for search engines
-9. At the end: interpretation/outlook (what does this mean?)
-
-Answer as JSON:
-{{
-  "title": "Meaningful title (no clickbait)",
-  "content_markdown": "## Complete article in Markdown...",
-  "meta_description": "Short description for SEO (max 155 characters)",
-  "sources": [
-    {{"title": "Source name", "url": "https://...", "accessed_at": "{today}"}}
-  ],
-  "geo_data": null
-}}
-
-If the topic has a geographic dimension, fill in geo_data:
-{{
-  "center": [lat, lng],
-  "zoom": 5,
-  "markers": [{{"lat": 0, "lng": 0, "label": "Place", "popup": "Description"}}]
-}}"""
-
-    result, usage = await call_claude_fn(prompt, tools="WebSearch,WebFetch", model=None)
-
-    try:
-        article = _extract_json(result)
-        # Catch double-encoded JSON
-        if isinstance(article, str):
-            article = json.loads(article, strict=False)
-        if not isinstance(article, dict):
-            logger.error(f"Writer: unexpected type {type(article).__name__}")
-            return None
-        article["category"] = topic["category"]
-        article["monitor_event_ids"] = topic.get("incident_ids", [])
-        logger.info(f"Writer: article '{article['title']}' written (${usage.cost_usd:.4f})")
-        return article
-    except (json.JSONDecodeError, IndexError, KeyError, TypeError) as e:
-        logger.error(f"Writer JSON parse error: {e} | repr: {repr(result[:200])} | has_brace: {chr(123) in result}")
-        return None
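
Port note: both deleted modules shared the same two-stage JSON extraction (normalize typographic quotes, try a direct parse, then slice out the first object or array), and both call sites guarded against double-encoded JSON. A minimal standalone sketch of that pattern for the dev-side service -- the helper name extract_json is illustrative, not an existing symbol:

    import json

    def extract_json(text: str):
        """Parse a model reply that may wrap its JSON in prose or smart quotes."""
        text = text.strip()
        for quote in ("„", "“", "”", "«", "»"):
            text = text.replace(quote, "'")  # typographic quotes break json.loads
        try:
            return json.loads(text, strict=False)  # stage 1: direct parse
        except json.JSONDecodeError:
            pass
        # stage 2: slice from the first bracket to the last matching counterpart
        for open_c, close_c in (("{", "}"), ("[", "]")):
            start, end = text.find(open_c), text.rfind(close_c)
            if start != -1 and end > start:
                try:
                    return json.loads(text[start:end + 1], strict=False)
                except json.JSONDecodeError:
                    continue
        raise json.JSONDecodeError("no JSON object or array found", text, 0)

    reply = 'Here are the topics:\n[{"topic": "A"}, {"topic": "B"}]'
    topics = extract_json(reply)
    if isinstance(topics, str):  # guard against double-encoded JSON
        topics = json.loads(topics, strict=False)
    assert [t["topic"] for t in topics] == ["A", "B"]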
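
The push step wrapped a plain urllib POST in three attempts with exponential backoff (3 ** attempt, i.e. 1 s then 3 s between tries). A self-contained sketch of that behaviour; the endpoint URL and the X-API-Key header come from the deleted module, while the function name push_drafts is illustrative:

    import json
    import ssl
    import time
    import urllib.request

    BLOG_API_URL = "https://blog.aegis-sight.de/api/ingest/drafts"

    def push_drafts(articles: list[dict], api_key: str, attempts: int = 3) -> dict:
        """POST drafts to the ingest API, retrying with 1s/3s backoff."""
        data = json.dumps({"articles": articles}).encode("utf-8")
        ctx = ssl.create_default_context()
        for attempt in range(attempts):
            try:
                req = urllib.request.Request(
                    BLOG_API_URL,
                    data=data,
                    headers={"Content-Type": "application/json", "X-API-Key": api_key},
                    method="POST",
                )
                with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
                    return json.loads(resp.read().decode("utf-8"))
            except Exception:
                if attempt == attempts - 1:
                    raise  # out of retries: surface the last error
                time.sleep(3 ** attempt)  # 1s after the first failure, 3s after the second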
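
The writer's incident filter built its IN (...) clause by joining one ? placeholder per id and splatting the ids ahead of the cutoff parameter, which keeps the query fully parameterized. A small sqlite3 sketch of that pattern (table and values are illustrative):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE articles (id INTEGER, incident_id INTEGER, collected_at TEXT)")
    conn.executemany(
        "INSERT INTO articles VALUES (?, ?, ?)",
        [(1, 6, "2026-03-28"), (2, 18, "2026-03-29"), (3, 7, "2026-03-29")],
    )

    incident_ids = [6, 18]
    cutoff = "2026-03-27"
    placeholders = ",".join("?" for _ in incident_ids)  # -> "?,?"
    rows = conn.execute(
        f"SELECT id FROM articles WHERE incident_id IN ({placeholders}) AND collected_at > ?",
        (*incident_ids, cutoff),  # ids bind to the IN clause, cutoff binds last
    ).fetchall()
    assert sorted(r[0] for r in rows) == [1, 2]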