feat: Blog-Pipeline (Curator + Writer + Push)

Dieser Commit ist enthalten in:
Claude Dev
2026-03-29 03:35:43 +02:00
Ursprung cb851ee72d
Commit b3c8cf2676
4 geänderte Dateien mit 364 neuen und 0 gelöschten Zeilen

Datei anzeigen

@@ -0,0 +1,103 @@
"""Blog-Pipeline: Curator -> Writer -> Push zum Blog."""
import asyncio
import json
import logging
import ssl
import sys
import urllib.request
from pathlib import Path
# Projekt-Root zum Path hinzufügen
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from src.agents.claude_client import call_claude
from src.agents.blog.blog_curator import curate_topics
from src.agents.blog.blog_writer import write_article
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
handlers=[
logging.StreamHandler(),
],
)
logger = logging.getLogger("blog.pipeline")
BLOG_API_URL = "https://blog.aegis-sight.de/api/ingest/drafts"
API_KEY_FILE = "/home/claude-dev/.blog-api-key"
def read_api_key() -> str:
try:
return open(API_KEY_FILE).read().strip()
except FileNotFoundError:
logger.error(f"API-Key-Datei nicht gefunden: {API_KEY_FILE}")
sys.exit(1)
def push_to_blog(articles: list[dict], api_key: str) -> dict:
"""Pushed Artikel-Entwürfe an die Blog Ingest API."""
data = json.dumps({"articles": articles}).encode("utf-8")
req = urllib.request.Request(
BLOG_API_URL,
data=data,
headers={
"Content-Type": "application/json",
"X-API-Key": api_key,
},
method="POST",
)
ctx = ssl.create_default_context()
with urllib.request.urlopen(req, timeout=30, context=ctx) as resp:
return json.loads(resp.read().decode("utf-8"))
async def run_pipeline():
"""Führt die komplette Blog-Pipeline aus."""
logger.info("=== Blog-Pipeline gestartet ===")
# 1. Themen auswählen
logger.info("Schritt 1: Themen auswählen...")
topics = await curate_topics(call_claude)
if not topics:
logger.warning("Keine Themen ausgewählt -- Pipeline beendet")
return
logger.info(f"{len(topics)} Themen ausgewählt: {[t['topic'] for t in topics]}")
# 2. Artikel schreiben
logger.info("Schritt 2: Artikel schreiben...")
articles = []
for topic in topics:
logger.info(f"Schreibe: {topic['topic']} ({topic['category']})")
article = await write_article(topic, call_claude)
if article:
articles.append(article)
else:
logger.warning(f"Artikel fehlgeschlagen: {topic['topic']}")
if not articles:
logger.warning("Keine Artikel geschrieben -- Pipeline beendet")
return
logger.info(f"{len(articles)} Artikel geschrieben")
# 3. An Blog pushen
logger.info("Schritt 3: An Blog pushen...")
api_key = read_api_key()
try:
result = push_to_blog(articles, api_key)
logger.info(f"Push-Ergebnis: {result['accepted']} akzeptiert, {result.get('rejected', 0)} abgelehnt")
except Exception as e:
logger.error(f"Push fehlgeschlagen: {e}")
return
logger.info("=== Blog-Pipeline abgeschlossen ===")
def main():
asyncio.run(run_pipeline())
if __name__ == "__main__":
main()