Export-System: PDF/Word mit Executive Summary, Deckblatt, Klassifizierung

- Neuer report_generator.py: WeasyPrint (PDF) + python-docx (Word)
- 3 Stufen: Executive Summary (KI-generiert), Lagebericht, Vollständiger Bericht
- 3 Klassifizierungsstufen: Offen, Nur für den Dienstgebrauch, Vertraulich
- Deckblatt mit AegisSight Logo, Titel, Typ, Klassifizierung
- Executive Summary: Claude Haiku verdichtet Lagebild auf 3-5 Kernpunkte
- Jinja2 HTML-Template für PDF (A4-optimiert)
- Alte Exporte entfernt (Markdown, JSON, Browser-Print)
- Neues Export-Modal im Dashboard (Umfang/Format/Stufe)
Dieser Commit ist enthalten in:
Claude Dev
2026-03-25 01:28:47 +01:00
Ursprung 8feaac3320
Commit f7deafd14a
6 geänderte Dateien mit 678 neuen und 458 gelöschten Zeilen

387
src/report_generator.py Normale Datei
Datei anzeigen

@@ -0,0 +1,387 @@
"""Report-Generator: PDF und Word Berichte aus Lage-Daten."""
import base64
import io
import json
import logging
import re
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from weasyprint import HTML
from docx import Document
from docx.shared import Inches, Pt, Cm, RGBColor
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_TABLE_ALIGNMENT
from config import TIMEZONE, CLAUDE_MODEL_FAST
# Module-level logger used for all report-generation messages.
logger = logging.getLogger("osint.report")

# Directory with the Jinja2 report templates, located next to this module.
TEMPLATE_DIR = Path(__file__).parent.joinpath("report_templates")
# Logo file that is Base64-embedded into the HTML report.
LOGO_PATH = Path(__file__).parent.joinpath("static", "favicon.svg")

# Classification key -> German display label.
CLASSIFICATION_LABELS = {
    "offen": "Offen",
    "dienstgebrauch": "Nur für den Dienstgebrauch",
    "vertraulich": "Vertraulich",
}

# Fact-check status key -> German display label.
FC_STATUS_LABELS = {
    "confirmed": "Bestätigt",
    "unconfirmed": "Unbestätigt",
    "disputed": "Umstritten",
    "false": "Falsch",
}
def _get_logo_base64() -> str:
    """Return the logo file's bytes Base64-encoded for HTML embedding.

    Returns:
        The Base64 text of the file at ``LOGO_PATH``, or an empty string
        when the file cannot be read.
    """
    try:
        return base64.b64encode(LOGO_PATH.read_bytes()).decode("ascii")
    except OSError as exc:
        # A missing or unreadable logo must not abort report generation, but
        # it should show up in the logs instead of vanishing silently.
        logger.warning("Logo konnte nicht geladen werden (%s): %s", LOGO_PATH, exc)
        return ""
def _prepare_sources(incident: dict) -> list:
"""Quellenverzeichnis aus sources_json parsen."""
raw = incident.get("sources_json")
if not raw:
return []
try:
return json.loads(raw) if isinstance(raw, str) else raw
except (json.JSONDecodeError, TypeError):
return []
def _prepare_source_stats(articles: list) -> list:
"""Quellenstatistik: Artikel pro Quelle + Sprachen."""
source_map = defaultdict(lambda: {"count": 0, "langs": set()})
for art in articles:
name = art.get("source") or "Unbekannt"
source_map[name]["count"] += 1
source_map[name]["langs"].add((art.get("language") or "de").upper())
stats = []
for name, data in sorted(source_map.items(), key=lambda x: -x[1]["count"]):
stats.append({"name": name, "count": data["count"], "languages": ", ".join(sorted(data["langs"]))})
return stats
def _prepare_fact_checks(fact_checks: list) -> list:
    """Return copies of the fact checks with a ``status_label`` key added.

    Args:
        fact_checks: Fact-check records; only their ``"status"`` key is read.

    Returns:
        A new list of shallow copies. ``status_label`` is the label from
        ``FC_STATUS_LABELS`` for a known status, the raw status for an
        unknown one, and "Unbekannt" when the status is missing or empty.
    """
    prepared = []
    for fc in fact_checks:
        status = fc.get("status")
        # Fall back step by step so that a status that is present but None
        # or "" also ends up as "Unbekannt" instead of an empty label.
        label = FC_STATUS_LABELS.get(status) or status or "Unbekannt"
        fc_copy = dict(fc)
        fc_copy["status_label"] = label
        prepared.append(fc_copy)
    return prepared
def _prepare_timeline(articles: list) -> list:
"""Timeline aus Artikeln: sortiert nach Datum."""
timeline = []
for art in articles:
pub = art.get("published_at") or art.get("collected_at") or ""
headline = art.get("headline_de") or art.get("headline") or "Ohne Titel"
source = art.get("source") or ""
if pub:
try:
dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
date_str = dt.strftime("%d.%m.%Y %H:%M")
except Exception:
date_str = pub[:16]
else:
date_str = ""
timeline.append({"date": date_str, "headline": headline, "source": source, "sort_key": pub})
timeline.sort(key=lambda x: x["sort_key"], reverse=True)
return timeline[:100] # Max 100 Einträge
def _markdown_to_html(text: str) -> str:
"""Einfache Markdown -> HTML Konvertierung für Lagebild."""
if not text:
return "<p><em>Kein Lagebild verfügbar.</em></p>"
# Basic Markdown -> HTML
html = text
# Headlines
html = re.sub(r'^### (.+)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)
html = re.sub(r'^## (.+)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)
# Bold
html = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html)
# Links [text](url)
html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', html)
# Bullet lists
html = re.sub(r'^- (.+)$', r'<li>\1</li>', html, flags=re.MULTILINE)
html = re.sub(r'(<li>.*</li>\n?)+', lambda m: '<ul>' + m.group(0) + '</ul>', html)
# Paragraphs
paragraphs = html.split('\n\n')
result = []
for p in paragraphs:
p = p.strip()
if not p:
continue
if p.startswith('<h') or p.startswith('<ul') or p.startswith('<ol'):
result.append(p)
else:
result.append(f'<p>{p}</p>')
return '\n'.join(result)
def _summary_bullets_to_html(result: str) -> str:
    """Turn the model's bullet lines into an HTML ``<ul>``; "" if nothing usable."""
    items = []
    for raw_line in result.strip().split("\n"):
        # Remove exactly one leading bullet marker. str.lstrip("* ") strips a
        # character *set* and would also eat the opening "**" of bold text;
        # likewise a line that merely starts with "**" is not a bullet.
        match = re.match(r"(?:[-•]|\*(?!\*))\s*(.+)$", raw_line.strip())
        if match:
            items.append(match.group(1).strip())
    if not items:
        # No recognisable bullets: use the whole answer as a single item.
        items = [result.strip()]
    # Render Markdown bold the same way the situation summary does.
    items = [re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", item) for item in items if item]
    if not items:
        return ""
    return "<ul>\n" + "\n".join(f"<li>{item}</li>" for item in items) + "\n</ul>"


async def generate_executive_summary(summary_text: str) -> str:
    """Condense the situation summary into an AI-generated executive summary.

    Args:
        summary_text: The situation summary text to condense.

    Returns:
        An HTML ``<ul>`` list. A fixed single-item list is returned when the
        input is empty or shorter than 50 characters (after stripping), and
        another one when the model call fails or yields no content.
    """
    if not summary_text or len(summary_text.strip()) < 50:
        return "<ul><li>Kein Lagebild verfügbar. Executive Summary kann nicht erstellt werden.</li></ul>"

    from agents.claude_client import call_claude

    failure_html = "<ul><li>Executive Summary konnte nicht generiert werden.</li></ul>"
    prompt = f"""Du bist ein Intelligence-Analyst für ein OSINT-Lagemonitoring-System.
Verdichte das folgende Lagebild auf genau 3-5 Kernpunkte.
REGELN:
- Jeder Punkt: 1-2 Sätze, faktenbasiert
- Fokus: Was ist passiert? Was bedeutet es? Was ist die aktuelle Dynamik?
- Sprache: Deutsch, sachlich, prägnant
- Format: Gib NUR die Bullet Points aus, einen pro Zeile, mit "- " am Anfang
- KEINE Einleitung, KEINE Überschrift, NUR die Punkte
LAGEBILD:
{summary_text}"""
    try:
        result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
        # NOTE(review): the model output is inserted into HTML unescaped.
        html = _summary_bullets_to_html(result)
    except Exception as e:
        # Best effort: a failed summary must not abort the whole export.
        logger.error("Executive Summary Generierung fehlgeschlagen: %s", e)
        return failure_html
    if not html:
        # An empty answer would otherwise render as an empty list.
        logger.error("Executive Summary Generierung fehlgeschlagen: %s", "leere Antwort")
        return failure_html
    return html
def _pdf_article_date(article: dict) -> str:
    """Format an article's published/collected timestamp as DD.MM.YYYY."""
    pub = article.get("published_at") or article.get("collected_at") or ""
    try:
        # "Z" suffix is rewritten so fromisoformat() accepts it.
        return datetime.fromisoformat(pub.replace("Z", "+00:00")).strftime("%d.%m.%Y")
    except (ValueError, TypeError, AttributeError):
        # Unparseable value: fall back to its first 10 characters.
        return pub[:10] if pub else ""


async def generate_pdf(
    incident: dict, articles: list, fact_checks: list, snapshots: list,
    scope: str, classification: str, creator: str, executive_summary_html: str,
) -> bytes:
    """Render ``report.html`` with the incident data and convert it to PDF.

    Args:
        incident: Incident record; ``type``, ``summary``, ``updated_at`` and
            ``sources_json`` are read here, and the whole dict is passed to
            the template.
        articles: Article dicts. Each one gets a ``pub_date`` key written
            in place before rendering.
        fact_checks: Fact-check dicts, passed on with status labels added.
        snapshots: Not used by this function.
        scope: Report scope; timeline and article list are only passed to
            the template when this is ``"full"``.
        classification: Classification key (see ``CLASSIFICATION_LABELS``);
            unknown keys are shown verbatim.
        creator: Name shown as the report's creator.
        executive_summary_html: Pre-rendered executive summary HTML.

    Returns:
        The PDF document as bytes.
    """
    # Must run BEFORE rendering: this loop used to run after
    # template.render(), so the "pub_date" values never reached the template
    # (presumably read by report.html for the article list).
    for art in articles:
        art["pub_date"] = _pdf_article_date(art)

    env = Environment(loader=FileSystemLoader(str(TEMPLATE_DIR)))
    template = env.get_template("report.html")
    now = datetime.now(TIMEZONE)
    incident_type_label = "Hintergrundrecherche" if incident.get("type") == "research" else "Live-Monitoring"
    full_scope = scope == "full"
    html_content = template.render(
        incident=incident,
        incident_type_label=incident_type_label,
        classification=classification,
        classification_label=CLASSIFICATION_LABELS.get(classification, classification),
        report_date=now.strftime("%d.%m.%Y, %H:%M Uhr"),
        creator=creator,
        logo_base64=_get_logo_base64(),
        executive_summary=executive_summary_html,
        scope=scope,
        lagebild_html=_markdown_to_html(incident.get("summary", "")),
        lagebild_timestamp=(incident.get("updated_at") or "")[:16].replace("T", " "),
        sources=_prepare_sources(incident),
        fact_checks=_prepare_fact_checks(fact_checks),
        source_stats=_prepare_source_stats(articles),
        timeline=_prepare_timeline(articles) if full_scope else [],
        articles=articles if full_scope else [],
    )
    return HTML(string=html_content).write_pdf()
def _docx_strip_bullet(line: str) -> str:
    """Remove one leading list marker ("-", "*" or "•") and outer whitespace."""
    # str.lstrip("- ") strips a character *set*, which also eats leading
    # dashes/asterisks that belong to the text; strip a single marker only.
    return re.sub(r"^\s*(?:[-•]|\*(?!\*))\s*", "", line).strip()


def _docx_centered_run(doc, text, size_pt, color, bold=False):
    """Append a centered paragraph holding one run with the given font setup."""
    para = doc.add_paragraph()
    para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = para.add_run(text)
    run.font.size = Pt(size_pt)
    if bold:
        run.font.bold = True
    run.font.color.rgb = color
    return run


def _docx_add_table(doc, titles, header_size_pt):
    """Append a centered grid table whose bold header row shows ``titles``."""
    table = doc.add_table(rows=1, cols=len(titles))
    table.style = 'Table Grid'
    table.alignment = WD_TABLE_ALIGNMENT.CENTER
    for cell, title in zip(table.rows[0].cells, titles):
        cell.text = title
        for para in cell.paragraphs:
            for run in para.runs:
                run.font.bold = True
                run.font.size = Pt(header_size_pt)
    return table


def _docx_article_date(article: dict) -> str:
    """Format an article's published/collected timestamp as DD.MM.YYYY."""
    pub = article.get("published_at") or article.get("collected_at") or ""
    try:
        # "Z" suffix is rewritten so fromisoformat() accepts it.
        return datetime.fromisoformat(pub.replace("Z", "+00:00")).strftime("%d.%m.%Y")
    except (ValueError, TypeError, AttributeError):
        # Unparseable value: fall back to its first 10 characters.
        return pub[:10] if pub else ""


async def generate_docx(
    incident: dict, articles: list, fact_checks: list, snapshots: list,
    scope: str, classification: str, creator: str, executive_summary_text: str,
) -> bytes:
    """Build the Word report with python-docx.

    Layout: cover page, executive summary; for scope ``"report"`` and
    ``"full"`` additionally situation summary, fact-check table and source
    statistics; for scope ``"full"`` additionally the article index; then a
    footer line.

    Args:
        incident: Incident record; ``type``, ``title``, ``description`` and
            ``summary`` are read.
        articles: Article dicts for the statistics and the article index.
        fact_checks: Fact-check dicts (``claim``, ``status``,
            ``sources_count`` are read).
        snapshots: Not used by this function.
        scope: Report scope, see layout above.
        classification: Classification key (see ``CLASSIFICATION_LABELS``);
            unknown keys are shown verbatim in grey.
        creator: Name shown as the report's creator.
        executive_summary_text: Executive summary; HTML tags are stripped
            and every non-empty line becomes one bullet.

    Returns:
        The .docx document as bytes.
    """
    grey = RGBColor(0x88, 0x88, 0x88)
    doc = Document()

    # Base style
    style = doc.styles['Normal']
    style.font.size = Pt(10)
    style.font.name = 'Calibri'

    # --- Cover page ---
    for _ in range(6):
        doc.add_paragraph()
    _docx_centered_run(doc, "AegisSight Monitor", 12, grey)
    doc.add_paragraph()
    type_label = "Hintergrundrecherche" if incident.get("type") == "research" else "Live-Monitoring"
    _docx_centered_run(doc, type_label, 10, grey)
    _docx_centered_run(doc, incident.get("title") or "", 24, RGBColor(0x0a, 0x18, 0x32), bold=True)
    if incident.get("description"):
        _docx_centered_run(doc, incident["description"], 11, RGBColor(0x66, 0x66, 0x66))
    doc.add_paragraph()

    # Classification line, coloured by level.
    class_label = CLASSIFICATION_LABELS.get(classification, classification)
    colors = {
        "offen": RGBColor(0x22, 0xc5, 0x5e),
        "dienstgebrauch": RGBColor(0xf0, 0xb4, 0x29),
        "vertraulich": RGBColor(0xef, 0x44, 0x44),
    }
    _docx_centered_run(doc, f"{class_label}", 11, colors.get(classification, grey), bold=True)
    for _ in range(3):
        doc.add_paragraph()
    now = datetime.now(TIMEZONE)
    _docx_centered_run(
        doc,
        f"Stand: {now.strftime('%d.%m.%Y, %H:%M Uhr')}\nErstellt von: {creator}",
        9,
        grey,
    )
    doc.add_page_break()

    # --- Executive Summary ---
    doc.add_heading("Executive Summary", level=1)
    # Strip HTML tags and Markdown bold markers, then one bullet per line.
    # ``or ""`` keeps a missing summary from crashing re.sub().
    clean_text = re.sub(r'<[^>]+>', '', executive_summary_text or "")
    clean_text = re.sub(r'\*\*(.+?)\*\*', r'\1', clean_text)
    for line in clean_text.strip().split("\n"):
        item = _docx_strip_bullet(line)
        if item:
            doc.add_paragraph(item, style='List Bullet')

    if scope in ("report", "full"):
        # --- Situation summary ---
        doc.add_heading("Lagebild", level=1)
        summary = incident.get("summary") or "Kein Lagebild verfügbar."
        # Remove Markdown formatting (bold, links, heading markers).
        clean_summary = re.sub(r'\*\*(.+?)\*\*', r'\1', summary)
        clean_summary = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', clean_summary)
        clean_summary = re.sub(r'^#{1,3}\s+', '', clean_summary, flags=re.MULTILINE)
        for para_text in clean_summary.split("\n\n"):
            para_text = para_text.strip()
            if not para_text:
                continue
            if para_text.startswith("- "):
                for bullet in para_text.split("\n"):
                    bullet = _docx_strip_bullet(bullet)
                    if bullet:
                        doc.add_paragraph(bullet, style='List Bullet')
            else:
                doc.add_paragraph(para_text)

        # --- Fact check ---
        if fact_checks:
            doc.add_heading("Faktencheck", level=1)
            table = _docx_add_table(doc, ["Behauptung", "Status", "Quellen"], 9)
            for fc in fact_checks:
                status = fc.get("status")
                row = table.add_row().cells
                # Cell text must be a string; guard against None values.
                row[0].text = str(fc.get("claim") or "")
                row[1].text = str(FC_STATUS_LABELS.get(status) or status or "")
                row[2].text = str(fc.get("sources_count") or 0)

        # --- Source statistics ---
        source_stats = _prepare_source_stats(articles)
        if source_stats:
            doc.add_heading("Quellenstatistik", level=1)
            table = _docx_add_table(doc, ["Quelle", "Artikel", "Sprache"], 9)
            for stat in source_stats:
                row = table.add_row().cells
                row[0].text = stat["name"]
                row[1].text = str(stat["count"])
                row[2].text = stat["languages"]

    if scope == "full":
        # --- Article index ---
        if articles:
            doc.add_page_break()
            doc.add_heading(f"Artikelverzeichnis ({len(articles)} Artikel)", level=1)
            table = _docx_add_table(doc, ["Headline", "Quelle", "Sprache", "Datum"], 8)
            for art in articles:
                row = table.add_row().cells
                row[0].text = art.get("headline_de") or art.get("headline") or "Ohne Titel"
                row[1].text = art.get("source") or ""
                row[2].text = (art.get("language") or "de").upper()
                row[3].text = _docx_article_date(art)
                # Smaller font for the dense article table.
                for cell in row:
                    for para in cell.paragraphs:
                        for run in para.runs:
                            run.font.size = Pt(8)

    # --- Footer ---
    doc.add_paragraph()
    _docx_centered_run(
        doc,
        f"Erstellt mit AegisSight Monitor — aegis-sight.de — {now.strftime('%d.%m.%Y')}",
        8,
        RGBColor(0x99, 0x99, 0x99),
    )

    buf = io.BytesIO()
    doc.save(buf)
    return buf.getvalue()