cleanup: Blog-Pipeline entfernt (läuft jetzt auf Dev)

Die Blog-Pipeline wurde auf den Dev-Server migriert und läuft dort
als eigenständiger Service im Blog-Container. Die Monitor-seitige
Implementation wird nicht mehr benötigt.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
Claude Dev
2026-03-29 20:54:54 +02:00
Ursprung b58eee2990
Commit 68c6666d87
4 geänderte Dateien mit 0 neuen und 426 gelöschten Zeilen

Datei anzeigen

@@ -1,149 +0,0 @@
"""BlogCurator -- Wählt tägliche Blog-Themen aus der Monitor-DB."""
import json
import logging
import sqlite3
from datetime import datetime, timedelta, timezone
logger = logging.getLogger("blog.curator")
def _extract_json(text):
"""Extrahiert JSON aus Claude-Antworten (robust)."""
text = text.strip()
# Typographische Anfuehrungszeichen ersetzen (brechen JSON)
text = text.replace("", "'").replace("", "'").replace("", "'")
text = text.replace("«", "'").replace("»", "'")
# 1. Direktes Parsen versuchen
try:
return json.loads(text, strict=False)
except json.JSONDecodeError:
pass
# 2. Erstes JSON-Objekt oder Array finden
for open_c, close_c in [("{", "}"), ("[", "]")]:
start = text.find(open_c)
end = text.rfind(close_c)
if start != -1 and end > start:
candidate = text[start:end+1]
try:
return json.loads(candidate, strict=False)
except json.JSONDecodeError:
pass
raise json.JSONDecodeError("Kein gueltiges JSON gefunden", text, 0)
DB_PATH = "/mnt/gitea/osint-data/osint.db"
def get_recent_data(hours: int = 24) -> dict:
"""Holt aktuelle Artikel und Faktenchecks aus der Monitor-DB."""
cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
# Aktuelle Artikel
cursor = conn.execute(
"""SELECT a.headline_de, a.headline, a.source, a.source_url,
a.content_de, a.published_at, a.collected_at,
i.title as incident_title, i.id as incident_id
FROM articles a
LEFT JOIN incidents i ON a.incident_id = i.id
WHERE a.collected_at > ? OR a.published_at > ?
ORDER BY a.collected_at DESC LIMIT 100""",
(cutoff, cutoff),
)
articles = [dict(r) for r in cursor.fetchall()]
# Aktive Lagen
cursor = conn.execute(
"SELECT id, title, summary, type, status FROM incidents WHERE status = 'active' ORDER BY updated_at DESC LIMIT 10"
)
incidents = [dict(r) for r in cursor.fetchall()]
# Aktuelle Faktenchecks
cursor = conn.execute(
"""SELECT claim, status, evidence, checked_at, incident_id
FROM fact_checks WHERE checked_at > ? ORDER BY checked_at DESC LIMIT 30""",
(cutoff,),
)
fact_checks = [dict(r) for r in cursor.fetchall()]
conn.close()
return {"articles": articles, "incidents": incidents, "fact_checks": fact_checks}
async def curate_topics(call_claude_fn) -> list[dict]:
"""Wählt 2-4 blogwürdige Themen aus."""
data = get_recent_data()
if not data["articles"]:
logger.warning("Keine neuen Artikel in den letzten 24h")
return []
# Zusammenfassung für Claude
article_summary = []
for a in data["articles"][:50]:
title = a["headline_de"] or a["headline"] or ""
source = a["source"] or ""
incident = a["incident_title"] or ""
article_summary.append(f"- [{source}] {title} (Lage: {incident})")
fc_summary = []
for fc in data["fact_checks"][:15]:
fc_summary.append(f"- [{fc['status']}] {fc['claim'][:150]}")
incident_summary = []
for inc in data["incidents"]:
summary_short = (inc["summary"] or "")[:200]
incident_summary.append(f"- Lage #{inc['id']}: {inc['title']} -- {summary_short}")
prompt = f"""Du bist der Redaktionsleiter des OSINT-Blogs "AegisSight Mosaic".
Wähle aus den folgenden aktuellen OSINT-Daten 2-4 Themen aus, die sich für fundierte Blog-Artikel eignen.
KATEGORIEN (wähle passend):
- OSINT: Nachrichtenanalyse, Quellenauswertung, Faktencheck
- GEOINT: Karten, Satellitenbilder, räumliche Analyse
- CYBINT: Cyber-Bedrohungen, digitale Infrastruktur
- SOCMINT: Desinformation, Narrative, Social-Media-Trends
AKTUELLE LAGEN:
{chr(10).join(incident_summary)}
AKTUELLE ARTIKEL (letzte 24h):
{chr(10).join(article_summary[:30])}
AKTUELLE FAKTENCHECKS:
{chr(10).join(fc_summary)}
REGELN:
- Wähle Themen die für ein breites Publikum interessant sind
- Keine rein technischen oder internen Themen
- Bevorzuge Themen mit mehreren Quellen und Faktenchecks
- Jedes Thema muss einer Kategorie zugeordnet werden
- Gib relevante Artikel-IDs und Incident-IDs als Kontext mit
Antworte als JSON-Array:
[
{{
"topic": "Kurzer Thementitel",
"category": "OSINT|GEOINT|CYBINT|SOCMINT",
"angle": "Welcher Blickwinkel/welche Story?",
"key_sources": ["Quellenname 1", "Quellenname 2"],
"incident_ids": [6, 18],
"relevance": "Warum ist das jetzt relevant?"
}}
]"""
result, usage = await call_claude_fn(prompt, tools=None, model="claude-haiku-4-5-20251001")
try:
topics = _extract_json(result)
# Doppelt-encodiertes JSON abfangen
if isinstance(topics, str):
topics = json.loads(topics, strict=False)
if not isinstance(topics, list):
logger.error(f"Curator: Unerwarteter Typ {type(topics).__name__}, erwartet list")
return []
logger.info(f"Curator: {len(topics)} Themen ausgewählt (${usage.cost_usd:.4f})")
return topics
except (json.JSONDecodeError, IndexError, TypeError) as e:
logger.error(f"Curator JSON-Parse-Fehler: {e}\nRaw: {result[:300]}")
return []

Datei anzeigen

@@ -1,115 +0,0 @@
"""Blog-Pipeline: Curator -> Writer -> Push zum Blog."""
import asyncio
import json
import logging
import ssl
import sys
import urllib.request
from pathlib import Path
# Projekt-Root zum Path hinzufügen
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.agents.claude_client import call_claude
from src.agents.blog.blog_curator import curate_topics
from src.agents.blog.blog_writer import write_article
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
handlers=[
logging.StreamHandler(),
],
)
logger = logging.getLogger("blog.pipeline")
BLOG_API_URL = "https://blog.aegis-sight.de/api/ingest/drafts"
API_KEY_FILE = "/home/claude-dev/.blog-api-key"
def read_api_key() -> str:
try:
with open(API_KEY_FILE) as f:
return f.read().strip()
except FileNotFoundError:
logger.error(f"API-Key-Datei nicht gefunden: {API_KEY_FILE}")
sys.exit(1)
def push_to_blog(articles: list[dict], api_key: str) -> dict:
"""Pushed Artikel-Entwürfe an die Blog Ingest API (mit Retry)."""
import time
data = json.dumps({"articles": articles}).encode("utf-8")
ctx = ssl.create_default_context()
last_error = None
for attempt in range(3):
try:
req = urllib.request.Request(
BLOG_API_URL,
data=data,
headers={"Content-Type": "application/json", "X-API-Key": api_key},
method="POST",
)
with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception as e:
last_error = e
if attempt < 2:
wait = 3 ** attempt # 1s, 3s
logger.warning(f"Push fehlgeschlagen (Versuch {attempt + 1}/3): {e} -- Retry in {wait}s")
time.sleep(wait)
raise last_error
async def run_pipeline():
"""Führt die komplette Blog-Pipeline aus."""
logger.info("=== Blog-Pipeline gestartet ===")
# 1. Themen auswählen
logger.info("Schritt 1: Themen auswählen...")
topics = await curate_topics(call_claude)
if not topics:
logger.warning("Keine Themen ausgewählt -- Pipeline beendet")
return
logger.info(f"{len(topics)} Themen ausgewählt: {[t.get('topic', '?') if isinstance(t, dict) else str(t)[:50] for t in topics]}")
# 2. Artikel schreiben
logger.info("Schritt 2: Artikel schreiben...")
articles = []
for topic in topics:
topic_title = topic.get('topic', 'Unbekannt') if isinstance(topic, dict) else str(topic)[:50]
topic_cat = topic.get('category', '?') if isinstance(topic, dict) else '?'
logger.info(f"Schreibe: {topic_title} ({topic_cat})")
article = await write_article(topic, call_claude)
if article:
articles.append(article)
else:
logger.warning(f"Artikel fehlgeschlagen: {topic_title}")
if not articles:
logger.warning("Keine Artikel geschrieben -- Pipeline beendet")
return
logger.info(f"{len(articles)} Artikel geschrieben")
# 3. An Blog pushen
logger.info("Schritt 3: An Blog pushen...")
api_key = read_api_key()
try:
result = push_to_blog(articles, api_key)
logger.info(f"Push-Ergebnis: {result['accepted']} akzeptiert, {result.get('rejected', 0)} abgelehnt")
except Exception as e:
logger.error(f"Push fehlgeschlagen: {e}")
return
logger.info("=== Blog-Pipeline abgeschlossen ===")
def main():
asyncio.run(run_pipeline())
if __name__ == "__main__":
main()

Datei anzeigen

@@ -1,162 +0,0 @@
"""BlogWriter -- Schreibt Blog-Artikel aus Curator-Themen."""
import json
import logging
import sqlite3
from datetime import datetime, timedelta, timezone
logger = logging.getLogger("blog.writer")
def _extract_json(text):
"""Extrahiert JSON aus Claude-Antworten (robust)."""
text = text.strip()
# Typographische Anfuehrungszeichen ersetzen (brechen JSON)
text = text.replace("", "'").replace("", "'").replace("", "'")
text = text.replace("«", "'").replace("»", "'")
# 1. Direktes Parsen versuchen
try:
return json.loads(text, strict=False)
except json.JSONDecodeError:
pass
# 2. Erstes JSON-Objekt oder Array finden
for open_c, close_c in [("{", "}"), ("[", "]")]:
start = text.find(open_c)
end = text.rfind(close_c)
if start != -1 and end > start:
candidate = text[start:end+1]
try:
return json.loads(candidate, strict=False)
except json.JSONDecodeError as e:
import logging; logging.getLogger("blog.writer.json").warning(f"Parse at {open_c}..{close_c}: {e.msg} at pos {e.pos}, ctx: {repr(candidate[max(0,e.pos-40):e.pos+40])}")
raise json.JSONDecodeError("Kein gueltiges JSON gefunden", text, 0)
DB_PATH = "/mnt/gitea/osint-data/osint.db"
def get_context_for_topic(topic: dict) -> str:
"""Holt relevante Daten aus der DB für ein Thema."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
context_parts = []
# Incident-Zusammenfassungen
for inc_id in topic.get("incident_ids", []):
cursor = conn.execute(
"SELECT title, summary, updated_at FROM incidents WHERE id = ?", (inc_id,)
)
inc = cursor.fetchone()
if inc:
summary = (dict(inc).get("summary") or "")[:2000]
context_parts.append(f"## Lage: {inc['title']}\n{summary}")
# Relevante Artikel der letzten 48h
cutoff = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
incident_ids = topic.get("incident_ids", [])
if incident_ids:
placeholders = ",".join("?" for _ in incident_ids)
cursor = conn.execute(
f"""SELECT headline_de, headline, source, source_url, content_de, published_at
FROM articles WHERE incident_id IN ({placeholders}) AND (collected_at > ? OR published_at > ?)
ORDER BY collected_at DESC LIMIT 20""",
(*incident_ids, cutoff, cutoff),
)
else:
cursor = conn.execute(
"""SELECT headline_de, headline, source, source_url, content_de, published_at
FROM articles WHERE collected_at > ? OR published_at > ?
ORDER BY collected_at DESC LIMIT 20""",
(cutoff, cutoff),
)
articles = [dict(r) for r in cursor.fetchall()]
if articles:
art_text = []
for a in articles:
title = a["headline_de"] or a["headline"] or ""
content = (a["content_de"] or "")[:500]
source = a["source"] or ""
url = a["source_url"] or ""
art_text.append(f"### {title}\nQuelle: {source} ({url})\n{content}")
context_parts.append("## Aktuelle Berichte\n" + "\n\n".join(art_text[:10]))
# Relevante Faktenchecks
if incident_ids:
placeholders = ",".join("?" for _ in incident_ids)
cursor = conn.execute(
f"""SELECT claim, status, evidence FROM fact_checks
WHERE incident_id IN ({placeholders}) ORDER BY checked_at DESC LIMIT 10""",
incident_ids,
)
fcs = [dict(r) for r in cursor.fetchall()]
if fcs:
fc_text = []
for fc in fcs:
evidence = (fc["evidence"] or "")[:300]
fc_text.append(f"- [{fc['status']}] {fc['claim']}\n Evidenz: {evidence}")
context_parts.append("## Faktenchecks\n" + "\n".join(fc_text))
conn.close()
return "\n\n".join(context_parts)
async def write_article(topic: dict, call_claude_fn) -> dict | None:
"""Schreibt einen Blog-Artikel zu einem Thema."""
context = get_context_for_topic(topic)
today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
prompt = f"""Du bist Journalist beim OSINT-Blog "AegisSight Mosaic". Schreibe einen fundierten Artikel auf Deutsch.
THEMA: {topic["topic"]}
KATEGORIE: {topic["category"]}
BLICKWINKEL: {topic.get("angle", "")}
KONTEXT AUS DER OSINT-ANALYSE:
{context}
REGELN FÜR DEN ARTIKEL:
1. Schreibe wie ein professioneller Journalist, KEIN Lagebericht-Stil
2. Erzählerischer Fließtext mit Kontext und Einordnung
3. Verwende echte Umlaute (ü, ä, ö, ß)
4. Markdown-Format: ## für Zwischenüberschriften, **fett** für Betonung
5. WICHTIG: Verwende im Markdown-Text KEINE doppelten Anführungszeichen ("). Nutze stattdessen einfache Anführungszeichen (') oder *kursiv*. Doppelte Anführungszeichen brechen das JSON-Format.
6. 800-1500 Wörter, gut strukturiert
7. Nenne und verlinke Quellen im Text wo möglich
8. Meta-Description: 1 Satz, max 155 Zeichen, für Suchmaschinen
9. Am Ende: Einordnung/Ausblick (was bedeutet das?)
Antworte als JSON:
{{
"title": "Aussagekräftiger Titel (kein Clickbait)",
"content_markdown": "## Kompletter Artikel in Markdown...",
"meta_description": "Kurze Beschreibung für SEO (max 155 Zeichen)",
"sources": [
{{"title": "Quellenname", "url": "https://...", "accessed_at": "{today}"}}
],
"geo_data": null
}}
Falls das Thema einen geographischen Bezug hat, fülle geo_data:
{{
"center": [lat, lng],
"zoom": 5,
"markers": [{{"lat": 0, "lng": 0, "label": "Ort", "popup": "Beschreibung"}}]
}}"""
result, usage = await call_claude_fn(prompt, tools="WebSearch,WebFetch", model=None)
try:
article = _extract_json(result)
# Doppelt-encodiertes JSON abfangen
if isinstance(article, str):
article = json.loads(article, strict=False)
if not isinstance(article, dict):
logger.error(f"Writer: Unerwarteter Typ {type(article).__name__}")
return None
article["category"] = topic["category"]
article["monitor_event_ids"] = topic.get("incident_ids", [])
logger.info(f"Writer: Artikel '{article['title']}' geschrieben (${usage.cost_usd:.4f})")
return article
except (json.JSONDecodeError, IndexError, KeyError, TypeError) as e:
logger.error(f"Writer JSON-Parse-Fehler: {e} | repr: {repr(result[:200])} | has_brace: {chr(123) in result}")
return None