1. Faktencheck immer vollständig
PDF-Export hatte im scope=report einen [:20]-Cap, der vollständige
Faktencheck wurde nur bei scope=full gerendert. Jetzt ungekürzt
überall, sortiert chronologisch absteigend (DB-Sortierung).
2. Status-Labels aus Frontend übernommen
FC_STATUS_LABELS hatte nur 4 Werte; in der DB existieren aber 7+
(confirmed/unconfirmed/contradicted/developing/established/
unverified/disputed). Folge: "contradicted" und drei weitere
wurden auf englisch ausgegeben. Jetzt 1:1 vom Monitor-UI:
contradicted → "Widerlegt"
developing → "Unklar"
established → "Gesichert"
unverified → "Ungeprüft"
3. Adhoc-Export: Neueste Entwicklungen statt Executive Summary
Bei Live-Monitoring-Lagen ist die generische Executive Summary
weniger aussagekräftig als die kompakten "Neueste Entwicklungen"-
Bullets. Endpoint nutzt jetzt:
- adhoc + latest_developments vorhanden → latest_developments
(Markdown -> HTML konvertiert)
- adhoc + leer → cached/generierte Executive Summary (Fallback)
- research → unverändert Executive Summary
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
966 Zeilen
37 KiB
Python
966 Zeilen
37 KiB
Python
"""Report-Generator: PDF und Word Berichte aus Lage-Daten."""
|
|
import base64
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import pikepdf
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from weasyprint import HTML
|
|
from docx import Document
|
|
from docx.shared import Inches, Pt, Cm, RGBColor
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
from docx.enum.table import WD_TABLE_ALIGNMENT
|
|
|
|
from config import TIMEZONE, CLAUDE_MODEL_FAST
|
|
|
|
# Module-level logger for the report subsystem.
logger = logging.getLogger("osint.report")

# Jinja2 templates for the HTML/PDF report live next to this module.
TEMPLATE_DIR = Path(__file__).parent / "report_templates"
# SVG logo embedded (base64) into the report header.
LOGO_PATH = Path(__file__).parent / "static" / "favicon.svg"
|
|
|
|
|
|
# German display labels for fact-check statuses.
# Mirrors the monitor frontend (components.js) 1:1 so exports stay
# consistent with the UI.
FC_STATUS_LABELS = {
    "confirmed": "Bestätigt",
    "unconfirmed": "Unbestätigt",
    "contradicted": "Widerlegt",
    "developing": "Unklar",
    "established": "Gesichert",
    "disputed": "Umstritten",
    "unverified": "Ungeprüft",
    "false": "Falsch",  # legacy fallback
}
|
|
|
|
|
|
def _get_logo_base64() -> str:
    """Return the logo file base64-encoded for inline HTML embedding.

    Best-effort: returns an empty string when the logo cannot be read,
    so report generation never fails on a cosmetic asset.
    """
    try:
        raw = LOGO_PATH.read_bytes()
        return base64.b64encode(raw).decode()
    except Exception:
        return ""
|
|
|
|
|
|
def _prepare_sources(incident: dict) -> list:
|
|
"""Quellenverzeichnis aus sources_json parsen."""
|
|
raw = incident.get("sources_json")
|
|
if not raw:
|
|
return []
|
|
try:
|
|
return json.loads(raw) if isinstance(raw, str) else raw
|
|
except (json.JSONDecodeError, TypeError):
|
|
return []
|
|
|
|
|
|
def _prepare_source_stats(articles: list) -> list:
|
|
"""Quellenstatistik: Artikel pro Quelle + Sprachen."""
|
|
source_map = defaultdict(lambda: {"count": 0, "langs": set()})
|
|
for art in articles:
|
|
name = art.get("source") or "Unbekannt"
|
|
source_map[name]["count"] += 1
|
|
source_map[name]["langs"].add((art.get("language") or "de").upper())
|
|
stats = []
|
|
for name, data in sorted(source_map.items(), key=lambda x: -x[1]["count"]):
|
|
stats.append({"name": name, "count": data["count"], "languages": ", ".join(sorted(data["langs"]))})
|
|
return stats
|
|
|
|
|
|
def _prepare_fact_checks(fact_checks: list) -> list:
    """Attach a German ``status_label`` to a copy of each fact-check entry.

    Unknown statuses fall back to the raw status value; a missing status
    key yields "Unbekannt".
    """
    labelled = []
    for entry in fact_checks:
        item = dict(entry)
        status = entry.get("status", "")
        item["status_label"] = FC_STATUS_LABELS.get(status, entry.get("status", "Unbekannt"))
        labelled.append(item)
    return labelled
|
|
|
|
|
|
def _prepare_timeline(articles: list) -> list:
|
|
"""Timeline aus Artikeln: sortiert nach Datum."""
|
|
timeline = []
|
|
for art in articles:
|
|
pub = art.get("published_at") or art.get("collected_at") or ""
|
|
pub = str(pub) if pub else ""
|
|
headline = art.get("headline_de") or art.get("headline") or "Ohne Titel"
|
|
source = art.get("source") or ""
|
|
if pub:
|
|
try:
|
|
dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
|
|
date_str = dt.strftime("%d.%m.%Y %H:%M")
|
|
except Exception:
|
|
date_str = pub[:16]
|
|
else:
|
|
date_str = ""
|
|
timeline.append({"date": date_str, "headline": headline, "source": source, "sort_key": pub})
|
|
timeline.sort(key=lambda x: x["sort_key"], reverse=True)
|
|
return timeline[:100] # Max 100 Einträge
|
|
|
|
|
|
def _markdown_to_html(text: str) -> str:
|
|
"""Einfache Markdown -> HTML Konvertierung für Lagebild."""
|
|
if not text:
|
|
return "<p><em>Keine Zusammenfassung verfügbar.</em></p>"
|
|
# Basic Markdown -> HTML
|
|
html = text
|
|
# Headlines
|
|
html = re.sub(r'^### (.+)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)
|
|
html = re.sub(r'^## (.+)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)
|
|
# Bold
|
|
html = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html)
|
|
# Links [text](url)
|
|
html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', html)
|
|
# Bullet lists
|
|
html = re.sub(r'^- (.+)$', r'<li>\1</li>', html, flags=re.MULTILINE)
|
|
html = re.sub(r'(<li>.*</li>\n?)+', lambda m: '<ul>' + m.group(0) + '</ul>', html)
|
|
# Paragraphs
|
|
paragraphs = html.split('\n\n')
|
|
result = []
|
|
for p in paragraphs:
|
|
p = p.strip()
|
|
if not p:
|
|
continue
|
|
if p.startswith('<h') or p.startswith('<ul') or p.startswith('<ol'):
|
|
result.append(p)
|
|
else:
|
|
result.append(f'<p>{p}</p>')
|
|
return '\n'.join(result)
|
|
|
|
|
|
def _truncate_lagebild(summary_text: str, max_chars: int = 4000) -> str:
|
|
"""Lagebild für den Lagebericht auf die Zusammenfassung kürzen.
|
|
|
|
Nimmt nur den ersten Abschnitt (bis zur zweiten H2/H3-Überschrift)
|
|
oder kürzt auf max_chars Zeichen mit sauberem Abbruch am Absatzende.
|
|
"""
|
|
if not summary_text or len(summary_text) <= max_chars:
|
|
return summary_text
|
|
|
|
lines = summary_text.split("\n")
|
|
result_lines = []
|
|
heading_count = 0
|
|
char_count = 0
|
|
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
# Zähle Überschriften (## oder ###)
|
|
if stripped.startswith("## ") or stripped.startswith("### "):
|
|
heading_count += 1
|
|
# Nach der 3. Überschrift abbrechen (= 2 Abschnitte)
|
|
if heading_count > 3:
|
|
break
|
|
|
|
result_lines.append(line)
|
|
char_count += len(line) + 1
|
|
|
|
# Hard-Limit bei max_chars, aber am Absatzende abbrechen
|
|
if char_count > max_chars and stripped == "":
|
|
break
|
|
|
|
text = "\n".join(result_lines).rstrip()
|
|
if len(text) < len(summary_text) - 100:
|
|
text += "\n\n*[Vollständige Zusammenfassung im Vollständigen Bericht]*"
|
|
return text
|
|
|
|
|
|
def _strip_citation_numbers(text: str) -> str:
|
|
"""Entfernt [1234]-Quellenreferenzen aus dem Text."""
|
|
# Einzelne Referenzen: [1302]
|
|
text = re.sub(r"\s*\[\d{1,5}\]", "", text)
|
|
# Mehrfach-Referenzen: [725][765][768]
|
|
text = re.sub(r"(\[\d{1,5}\]){2,}", "", text)
|
|
# Aufräumen: Doppelte Leerzeichen
|
|
text = re.sub(r" +", " ", text)
|
|
return text
|
|
|
|
|
|
def _find_source_for_citation(num: str, sources: list) -> dict | None:
|
|
"""Sucht eine Quelle anhand der Zitat-Nummer (inkl. Suffix-Fallback wie 1383a -> 1383)."""
|
|
if not sources:
|
|
return None
|
|
for s in sources:
|
|
try:
|
|
if str(s.get("nr")) == num:
|
|
return s
|
|
except Exception:
|
|
continue
|
|
# Suffix-Fallback: 1383a -> 1383
|
|
if re.search(r"[a-z]$", num):
|
|
base = re.sub(r"[a-z]$", "", num)
|
|
for s in sources:
|
|
if str(s.get("nr")) == base:
|
|
return s
|
|
return None
|
|
|
|
|
|
def _linkify_citations_html(text: str, sources: list) -> str:
    """Replace [1234]-style citations with HTML links to their sources.

    Citation numbers without a matching source (or whose source has no
    URL) are left untouched so the reader still sees the reference.
    """
    if not text:
        return text
    if not sources:
        return text

    def repl(match: re.Match) -> str:
        num = match.group(1)
        src = _find_source_for_citation(num, sources)
        if src and src.get("url"):
            # Escape double quotes as &quot; so the URL/name cannot break
            # out of the HTML attribute value. (The previous replacement
            # string was corrupted in the source and not syntactically
            # valid; this restores the intended attribute escaping.)
            url = src["url"].replace('"', "&quot;")
            name = (src.get("name") or "").replace('"', "&quot;")
            return f'<a href="{url}" class="citation" title="{name}">[{num}]</a>'
        return match.group(0)

    return re.sub(r"\[(\d{1,5}[a-z]?)\]", repl, text)
|
|
|
|
|
|
def _add_docx_hyperlink(paragraph, url: str, text: str):
    """Insert a clickable external hyperlink into a python-docx paragraph.

    python-docx has no public hyperlink API, so this builds the raw
    OOXML: an external relationship on the document part plus a
    <w:hyperlink> element wrapping a styled run.

    Args:
        paragraph: python-docx Paragraph the link is appended to.
        url: External target URL.
        text: Visible link text.

    Returns:
        The created <w:hyperlink> OxmlElement.
    """
    from docx.oxml.shared import OxmlElement, qn

    # Register the URL as an external relationship; the hyperlink element
    # references it by relationship id.
    part = paragraph.part
    r_id = part.relate_to(
        url,
        "http://schemas.openxmlformats.org/officeDocument/2006/relationships/hyperlink",
        is_external=True,
    )
    hyperlink = OxmlElement("w:hyperlink")
    hyperlink.set(qn("r:id"), r_id)

    # Run properties: blue (#0066CC), underlined; w:sz is in half-points,
    # so "20" renders as 10pt.
    new_run = OxmlElement("w:r")
    rPr = OxmlElement("w:rPr")
    color = OxmlElement("w:color")
    color.set(qn("w:val"), "0066CC")
    rPr.append(color)
    u = OxmlElement("w:u")
    u.set(qn("w:val"), "single")
    rPr.append(u)
    sz = OxmlElement("w:sz")
    sz.set(qn("w:val"), "20")
    rPr.append(sz)
    new_run.append(rPr)

    # Text node; xml:space="preserve" keeps leading/trailing spaces.
    t = OxmlElement("w:t")
    t.text = text
    t.set(qn("xml:space"), "preserve")
    new_run.append(t)
    hyperlink.append(new_run)

    # Append after the paragraph's existing runs (private _p access —
    # python-docx offers no public API for this).
    paragraph._p.append(hyperlink)
    return hyperlink
|
|
|
|
|
|
def _add_docx_paragraph_with_citations(doc_or_para, text: str, sources: list, style: str | None = None):
    """Write *text* as a paragraph, rendering [1234] citations as hyperlink runs.

    ``doc_or_para`` may be a Document (a new paragraph is created, using
    ``style`` when given) or an existing Paragraph (runs are appended in
    place). Returns the paragraph that was written to.
    """
    if hasattr(doc_or_para, "add_paragraph"):
        if style:
            para = doc_or_para.add_paragraph(style=style)
        else:
            para = doc_or_para.add_paragraph()
    else:
        para = doc_or_para

    citation_re = re.compile(r"\[(\d{1,5}[a-z]?)\]")
    cursor = 0
    for hit in citation_re.finditer(text):
        # Plain text between the previous citation and this one.
        if hit.start() > cursor:
            para.add_run(text[cursor:hit.start()])
        number = hit.group(1)
        source = _find_source_for_citation(number, sources)
        if source and source.get("url"):
            _add_docx_hyperlink(para, source["url"], f"[{number}]")
        else:
            # Unresolved citations stay visible as plain text.
            para.add_run(hit.group(0))
        cursor = hit.end()
    if cursor < len(text):
        para.add_run(text[cursor:])
    return para
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_zusammenfassung_lines(summary_text: str) -> tuple[list[str], str]:
|
|
"""Extrahiert die ZUSAMMENFASSUNG-Sektion als Liste von Rohzeilen (ohne Zitatbearbeitung).
|
|
|
|
Returns:
|
|
(lines, remaining_summary)
|
|
"""
|
|
if not summary_text:
|
|
return [], summary_text
|
|
|
|
pattern = r"(## (?:ZUSAMMENFASSUNG|ÜBERBLICK)\s*\n)(.*?)(?=\n## |\Z)"
|
|
match = re.search(pattern, summary_text, re.DOTALL)
|
|
if not match:
|
|
return [], summary_text
|
|
|
|
zusammenfassung_raw = match.group(2).strip()
|
|
remaining = summary_text[:match.start()] + summary_text[match.end():]
|
|
remaining = remaining.strip()
|
|
|
|
lines: list[str] = []
|
|
for line in zusammenfassung_raw.split("\n"):
|
|
stripped = line.strip()
|
|
if stripped.startswith("- ") or stripped.startswith("* "):
|
|
content = stripped[2:].strip()
|
|
if content:
|
|
lines.append(content)
|
|
elif stripped and not stripped.startswith("#"):
|
|
lines.append(stripped)
|
|
return lines, remaining
|
|
|
|
|
|
def _extract_zusammenfassung(summary_text: str, sources: list | None = None) -> tuple[str, str]:
    """Extract the ZUSAMMENFASSUNG section as an HTML list with linked citations.

    Returns ("", summary_text) when no section exists; otherwise the
    section rendered as <ul> plus the remaining report text.
    """
    bullet_lines, remaining = _extract_zusammenfassung_lines(summary_text)
    if not bullet_lines:
        return "", summary_text

    source_list = sources or []
    items = "\n".join(
        f"<li>{_linkify_citations_html(line, source_list)}</li>" for line in bullet_lines
    )
    return "<ul>\n" + items + "\n</ul>", remaining
|
|
|
|
|
|
async def generate_executive_summary(summary_text: str) -> str:
    """Generate an AI-condensed executive summary (HTML <ul>) from the report.

    Calls the fast Claude model and robustly parses its answer, which may
    arrive as JSON, a Markdown bullet list, a numbered list or free text.
    Returns ready-to-embed HTML; degrades gracefully on errors.
    """
    if not summary_text or len(summary_text.strip()) < 50:
        return "<ul><li>Kein Lagebild verfügbar. Zusammenfassung kann nicht erstellt werden.</li></ul>"

    from agents.claude_client import call_claude

    prompt = f"""Du bist ein Intelligence-Analyst für ein OSINT-Lagemonitoring-System.
Verdichte das folgende Lagebild auf genau 3-5 Kernpunkte.

REGELN:
- Jeder Punkt: 1-2 Sätze, faktenbasiert
- Fokus: Was ist passiert? Was bedeutet es? Was ist die aktuelle Dynamik?
- Sprache: Deutsch, sachlich, prägnant
- Format: Gib NUR die Bullet Points aus, einen pro Zeile, mit "- " am Anfang
- KEINE Einleitung, KEINE Überschrift, NUR die Punkte

LAGEBILD:
{summary_text}"""

    try:
        result, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
        # Robust parser: accepts JSON, Markdown lists or free text.
        lines = []
        text = result.strip()
        # Strip code fences (```json ... ```).
        if text.startswith("```"):
            text = re.sub(r"^```\w*\n?", "", text)
            text = re.sub(r"\n?```$", "", text)
            text = text.strip()

        # Case 1: JSON answer (the fast model sometimes returns JSON).
        if text.startswith("{"):
            try:
                data = json.loads(text)
                for key in data:
                    if isinstance(data[key], list):
                        for item in data[key]:
                            clean = str(item).strip()
                            # BUGFIX: previously lstrip("- ").lstrip("* ")
                            # stripped character SETS, mangling leading
                            # Markdown (e.g. "**Bold**" lost "**"). Only
                            # remove an actual list-marker prefix.
                            if clean.startswith(("- ", "* ")):
                                clean = clean[2:].strip()
                            if clean:
                                lines.append(clean)
                        break
            except json.JSONDecodeError:
                pass

        # Case 2: Markdown bullet points.
        if not lines:
            for line in text.split("\n"):
                stripped = line.strip()
                if stripped.startswith(("- ", "* ")):
                    # Same prefix-safe removal as above.
                    clean = stripped[2:].strip()
                    if clean:
                        lines.append(clean)

        # Case 3: numbered list (1. 2. 3.).
        if not lines:
            for line in text.split("\n"):
                m = re.match(r"^\d+\.\s+(.+)", line.strip())
                if m:
                    lines.append(m.group(1).strip())

        # Fallback: whole text as a single point (capped).
        if not lines:
            lines = [text[:500]]

        html = "<ul>\n" + "\n".join(f"<li>{line}</li>" for line in lines if line) + "\n</ul>"
        return html
    except Exception as e:
        logger.error(f"Executive Summary Generierung fehlgeschlagen: {e}")
        return "<ul><li>Zusammenfassung konnte nicht generiert werden.</li></ul>"
|
|
|
|
|
|
def _parse_db_timestamp(value) -> datetime | None:
|
|
"""SQLite-Timestamp robust als datetime parsen (ISO oder 'YYYY-MM-DD HH:MM:SS')."""
|
|
if not value:
|
|
return None
|
|
if isinstance(value, datetime):
|
|
return value
|
|
try:
|
|
text = str(value).replace("T", " ").replace("Z", "")
|
|
# Sekundenbruchteile und Timezone-Offset abschneiden (python-docx mag nur naive dt)
|
|
text = text.split(".")[0].split("+")[0].strip()
|
|
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
|
|
except (ValueError, TypeError):
|
|
try:
|
|
return datetime.strptime(str(value)[:10], "%Y-%m-%d")
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def _slug_scope_label(scope: str, sections: set[str] | None) -> str:
|
|
"""Scope-Label fuer Metadaten und Dateinamen."""
|
|
if sections:
|
|
if sections == {"zusammenfassung"}:
|
|
return "Zusammenfassung"
|
|
if "timeline" in sections:
|
|
return "Vollständiger Bericht"
|
|
return "Lagebericht"
|
|
return {"summary": "Zusammenfassung", "report": "Lagebericht", "full": "Vollständiger Bericht"}.get(
|
|
scope, "Lagebericht"
|
|
)
|
|
|
|
|
|
def _build_export_metadata(
    incident: dict,
    articles: list,
    fact_checks: list,
    sources: list,
    creator: str,
    scope: str,
    sections: set[str] | None,
    organization_name: str | None,
    top_locations: list[str] | None,
    snapshot_count: int = 0,
) -> dict:
    """Build the unified metadata dict for PDF (HTML meta tags) and DOCX (core_properties).

    Collects title/subject/keywords/timestamps/rights from the incident
    row plus caller-supplied context. Keywords are sanitized, deduplicated
    case-insensitively (order preserved) and capped at 15.
    """
    is_research = incident.get("type") == "research"
    type_label = "Hintergrundrecherche" if is_research else "Live-Monitoring"
    category = "OSINT-Hintergrundrecherche" if is_research else "OSINT-Lagebericht"
    scope_label = _slug_scope_label(scope, sections)

    title_raw = (incident.get("title") or "Unbenannte Lage").strip()
    title = f"{title_raw} — {type_label}"

    subject = (incident.get("description") or "").strip()
    if not subject:
        subject = f"{type_label} zu: {title_raw}"

    # Collect keywords (order matters for display; dedup below keeps the
    # first occurrence).
    keywords: list[str] = ["OSINT", type_label]
    if organization_name:
        keywords.append(organization_name)

    # category_labels may be a JSON dict (primary/secondary/... map), a
    # JSON list, or a comma-separated string. Extract only label values.
    cat_labels_raw = (incident.get("category_labels") or "").strip()
    if cat_labels_raw:
        cat_values: list[str] = []
        try:
            parsed = json.loads(cat_labels_raw)
            if isinstance(parsed, dict):
                cat_values = [str(v).strip() for v in parsed.values() if isinstance(v, str) and v.strip()]
            elif isinstance(parsed, list):
                cat_values = [str(v).strip() for v in parsed if isinstance(v, str) and v.strip()]
        except (json.JSONDecodeError, TypeError):
            cat_values = [lbl.strip() for lbl in cat_labels_raw.split(",") if lbl.strip()]
        # Never allow JSON fragments (curly/square brackets) as a keyword.
        for lbl in cat_values:
            if lbl and not any(c in lbl for c in "{}[]"):
                keywords.append(lbl)

    if top_locations:
        keywords.extend([loc for loc in top_locations if loc])

    # Sanitize: strip newlines/tabs, filter characters with special
    # meaning inside PDF data structures.
    def _sanitize_keyword(kw: str) -> str:
        if not kw:
            return ""
        # Normalize whitespace.
        cleaned = re.sub(r"\s+", " ", kw).strip()
        # Remove PDF dict/array brackets and backslash (WeasyPrint escapes
        # parentheses in strings, but { and [ can truncate the keywords
        # stream).
        cleaned = re.sub(r"[{}\[\]\\]", "", cleaned)
        return cleaned.strip(" ,;:")

    # Dedup (case-insensitive) preserving order, max 15 keywords.
    seen = set()
    unique_keywords: list[str] = []
    for kw in keywords:
        clean_kw = _sanitize_keyword(kw)
        if not clean_kw:
            continue
        key = clean_kw.lower()
        if key not in seen:
            seen.add(key)
            unique_keywords.append(clean_kw)
        if len(unique_keywords) >= 15:
            break

    now = datetime.now(TIMEZONE)
    # Fall back to "now" (naive) when the DB timestamps are missing.
    created = _parse_db_timestamp(incident.get("created_at")) or now.replace(tzinfo=None)
    modified = _parse_db_timestamp(incident.get("updated_at")) or created

    # Structured comments block (displayed in DOCX properties, compact).
    stand = now.strftime("%d.%m.%Y")
    comments_lines = [
        f"Incident-ID: {incident.get('id', '?')} | Typ: {incident.get('type', 'adhoc')} | Scope: {scope_label}",
        f"Stand: {stand}",
    ]
    if organization_name:
        comments_lines.append(f"Organisation: {organization_name}")
    comments_lines.append(
        f"Umfang: {len(articles)} Artikel, {len(fact_checks)} Faktenchecks, {len(sources)} Quellen"
    )
    if top_locations:
        comments_lines.append("Orte: " + ", ".join(top_locations[:5]))
    comments = "\n".join(comments_lines)

    publisher = organization_name or "AegisSight"
    identifier = f"urn:aegissight:incident:{incident.get('id', '0')}:{now.strftime('%Y%m%dT%H%M%S')}"
    rights = (
        "Vertrauliche Lageanalyse — AegisSight Monitor. "
        "Weitergabe nur an autorisierte Empfänger."
    )

    return {
        "title": title,
        "author": creator or "AegisSight Monitor",
        "subject": subject,
        "keywords": unique_keywords,
        "keywords_comma": ", ".join(unique_keywords),
        "keywords_semicolon": "; ".join(unique_keywords),
        "category": category,
        "comments": comments,
        "creator_app": "AegisSight Monitor",
        "language": "de-DE",
        "created": created,
        "modified": modified,
        "created_iso": created.strftime("%Y-%m-%dT%H:%M:%S"),
        "modified_iso": modified.strftime("%Y-%m-%dT%H:%M:%S"),
        "type_label": type_label,
        "scope_label": scope_label,
        "publisher": publisher,
        "identifier": identifier,
        "rights": rights,
        "doc_type": "Report",
        # Version for DMS systems; at least "1" even with no snapshots.
        "version_id": str(max(1, snapshot_count)),
    }
|
|
|
|
|
|
def _format_pdf_date(dt: datetime) -> str:
|
|
"""PDF-Datumsformat: D:YYYYMMDDHHmmSS+HH'mm' (mit Zeitzone) oder Z (UTC)."""
|
|
if dt.tzinfo is None:
|
|
# Naive dt — als lokale TIMEZONE interpretieren
|
|
dt = dt.replace(tzinfo=TIMEZONE)
|
|
base = dt.strftime("D:%Y%m%d%H%M%S")
|
|
offset = dt.utcoffset()
|
|
if offset is None:
|
|
return base + "Z"
|
|
total_minutes = int(offset.total_seconds() // 60)
|
|
sign = "+" if total_minutes >= 0 else "-"
|
|
total_minutes = abs(total_minutes)
|
|
return f"{base}{sign}{total_minutes // 60:02d}'{total_minutes % 60:02d}'"
|
|
|
|
|
|
def _enrich_pdf_metadata(pdf_bytes: bytes, meta: dict) -> bytes:
    """Post-process the PDF with XMP metadata and CreationDate/ModDate via pikepdf.

    Best-effort: on any failure the original, unmodified PDF bytes are
    returned and a warning is logged.

    Args:
        pdf_bytes: Raw PDF output (e.g. from WeasyPrint).
        meta: Metadata dict produced by _build_export_metadata().

    Returns:
        PDF bytes with enriched metadata, or the input unchanged on error.
    """
    try:
        buf_in = io.BytesIO(pdf_bytes)
        with pikepdf.Pdf.open(buf_in) as pdf:
            created: datetime = meta.get("created")
            modified: datetime = meta.get("modified")
            # Interpret naive datetimes as local TIMEZONE.
            if created and created.tzinfo is None:
                created = created.replace(tzinfo=TIMEZONE)
            if modified and modified.tzinfo is None:
                modified = modified.replace(tzinfo=TIMEZONE)

            # Classic info dict: set CreationDate + ModDate.
            if created:
                pdf.docinfo["/CreationDate"] = pikepdf.String(_format_pdf_date(created))
            if modified:
                pdf.docinfo["/ModDate"] = pikepdf.String(_format_pdf_date(modified))

            # Document/instance IDs for DMS versioning (fresh per export).
            doc_uuid = f"uuid:{uuid.uuid4()}"
            instance_uuid = f"uuid:{uuid.uuid4()}"

            # Write the XMP metadata block (Dublin Core + XMP + PDF +
            # xmpRights + xmpMM).
            with pdf.open_metadata(set_pikepdf_as_editor=False) as xmp:
                # Dublin Core
                xmp["dc:title"] = meta.get("title", "")
                xmp["dc:creator"] = [meta.get("author", "")]
                xmp["dc:description"] = meta.get("subject", "")
                if meta.get("keywords"):
                    xmp["dc:subject"] = list(meta["keywords"])
                xmp["dc:language"] = [meta.get("language", "de-DE")]
                xmp["dc:publisher"] = [meta.get("publisher", "AegisSight")]
                xmp["dc:identifier"] = meta.get("identifier", "")
                xmp["dc:format"] = "application/pdf"
                xmp["dc:type"] = [meta.get("doc_type", "Report")]
                xmp["dc:rights"] = meta.get("rights", "")
                if created:
                    xmp["dc:date"] = [created.strftime("%Y-%m-%dT%H:%M:%S%z")]

                # PDF namespace
                xmp["pdf:Keywords"] = meta.get("keywords_comma", "")
                xmp["pdf:Producer"] = "WeasyPrint + AegisSight Monitor"

                # XMP namespace
                xmp["xmp:CreatorTool"] = meta.get("creator_app", "AegisSight Monitor")
                if created:
                    xmp["xmp:CreateDate"] = created.strftime("%Y-%m-%dT%H:%M:%S%z")
                if modified:
                    xmp["xmp:ModifyDate"] = modified.strftime("%Y-%m-%dT%H:%M:%S%z")
                    xmp["xmp:MetadataDate"] = modified.strftime("%Y-%m-%dT%H:%M:%S%z")

                # xmpRights: rights/confidentiality notice (XMP expects the
                # string "True", not a boolean).
                xmp["xmpRights:Marked"] = "True"
                if meta.get("rights"):
                    # Plain string: pikepdf wraps it as LangAlt (x-default)
                    # automatically.
                    xmp["xmpRights:UsageTerms"] = meta["rights"]

                # xmpMM: document and instance IDs for DMS versioning.
                xmp["xmpMM:DocumentID"] = doc_uuid
                xmp["xmpMM:InstanceID"] = instance_uuid
                xmp["xmpMM:VersionID"] = meta.get("version_id", "1")

                # xmpMM:History — audit event for this export (one compact
                # line per Seq item).
                history_when = (modified or datetime.now(TIMEZONE)).strftime("%Y-%m-%dT%H:%M:%S%z")
                history_entry = (
                    f"action=published; when={history_when}; "
                    f"softwareAgent={meta.get('creator_app', 'AegisSight Monitor')}; "
                    f"instanceID={instance_uuid}; "
                    f"scope={meta.get('scope_label', '')}; "
                    f"version={meta.get('version_id', '1')}"
                )
                xmp["xmpMM:History"] = [history_entry]

            buf_out = io.BytesIO()
            pdf.save(buf_out)
            return buf_out.getvalue()
    except Exception as e:
        logger.warning(f"PDF-Metadaten-Anreicherung (XMP/Dates) fehlgeschlagen: {e}")
        return pdf_bytes
|
|
|
|
|
|
async def generate_pdf(
    incident: dict, articles: list, fact_checks: list, snapshots: list,
    scope: str, creator: str, executive_summary_html: str,
    sections: set[str] | None = None,
    organization_name: str | None = None,
    top_locations: list[str] | None = None,
    snapshot_count: int = 0,
) -> bytes:
    """Generate the PDF report via WeasyPrint.

    Args:
        incident: Incident row including title/description/summary.
        articles: Collected articles (statistics, timeline, article list).
        fact_checks: Fact-check rows (rendered in full, unsorted here —
            assumes DB-side ordering).
        snapshots: Not used in this function body; kept for signature
            parity with generate_docx.
        scope: "summary" | "report" | "full" — selects sections when
            ``sections`` is not given; "report" additionally caps the
            source list (30) and source statistics (20).
        creator: Display name on the cover sheet.
        executive_summary_html: Pre-generated summary HTML.
        sections / organization_name / top_locations / snapshot_count:
            explicit section set and metadata inputs.

    Returns:
        The finished PDF as bytes (with enriched XMP metadata).
    """
    # Derive sections from scope when not explicitly given.
    if sections is None:
        if scope == "summary":
            sections = {"zusammenfassung"}
        elif scope == "report":
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen"}
        else:  # full
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline"}

    # For research incidents: extract the summary section from the report body.
    is_research = incident.get("type") == "research"
    all_sources = _prepare_sources(incident)
    zusammenfassung_html = executive_summary_html
    bericht_summary = incident.get("summary", "")
    zusammenfassung_title = "Zusammenfassung"

    if is_research and bericht_summary:
        extracted_html, remaining = _extract_zusammenfassung(bericht_summary, all_sources)
        if extracted_html:
            zusammenfassung_html = extracted_html
            zusammenfassung_title = "Zusammenfassung"
            bericht_summary = remaining

    # Linkify the (non-research) executive summary too — it may contain citations.
    if not is_research and zusammenfassung_html:
        zusammenfassung_html = _linkify_citations_html(zusammenfassung_html, all_sources)

    meta = _build_export_metadata(
        incident, articles, fact_checks, all_sources, creator, scope, sections,
        organization_name, top_locations, snapshot_count=snapshot_count,
    )

    # BUGFIX: prepare article pub_date BEFORE rendering. Previously this
    # loop ran after template.render(), so the template never saw the
    # pub_date values it was meant to display.
    for art in articles:
        pub = str(art.get("published_at") or art.get("collected_at") or "")
        try:
            dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
            art["pub_date"] = dt.strftime("%d.%m.%Y")
        except Exception:
            art["pub_date"] = pub[:10] if pub else ""

    env = Environment(loader=FileSystemLoader(str(TEMPLATE_DIR)))
    template = env.get_template("report.html")

    now = datetime.now(TIMEZONE)
    incident_type_label = "Hintergrundrecherche" if incident.get("type") == "research" else "Live-Monitoring"

    # Reuse all_sources / source stats instead of re-parsing twice.
    source_stats = _prepare_source_stats(articles)

    html_content = template.render(
        incident=incident,
        incident_type_label=incident_type_label,
        report_date=now.strftime("%d.%m.%Y, %H:%M Uhr"),
        creator=creator,
        logo_base64=_get_logo_base64(),
        executive_summary=zusammenfassung_html,
        zusammenfassung_title=zusammenfassung_title,
        sections=sections,
        scope=scope,
        lagebild_html=_linkify_citations_html(
            _markdown_to_html(bericht_summary), all_sources
        ),
        lagebild_timestamp=(incident.get("updated_at") or "")[:16].replace("T", " "),
        sources=all_sources[:30] if scope == "report" else all_sources,
        fact_checks=_prepare_fact_checks(fact_checks),
        source_stats=source_stats[:20] if scope == "report" else source_stats,
        timeline=_prepare_timeline(articles) if scope == "full" else [],
        articles=articles if scope == "full" else [],
        meta=meta,
    )

    pdf_bytes = HTML(string=html_content).write_pdf()
    pdf_bytes = _enrich_pdf_metadata(pdf_bytes, meta)
    return pdf_bytes
|
|
|
|
|
|
async def generate_docx(
    incident: dict, articles: list, fact_checks: list, snapshots: list,
    scope: str, creator: str, executive_summary_text: str,
    sections: set[str] | None = None,
    organization_name: str | None = None,
    top_locations: list[str] | None = None,
    snapshot_count: int = 0,
) -> bytes:
    """Generate the Word report via python-docx.

    Builds a cover sheet, then the selected sections (summary, report
    body, fact-check table, source statistics, article index) and a
    footer, and returns the serialized .docx bytes.

    Args:
        incident: Incident row including title/description/summary.
        articles: Collected articles (statistics and article index).
        fact_checks: Fact-check rows rendered as a table.
        snapshots: Not used in this function body; kept for signature
            parity with generate_pdf.
        scope: "summary" | "report" | "full" — selects sections when
            ``sections`` is not given.
        creator: Display name on the cover sheet.
        executive_summary_text: Pre-generated summary (may contain HTML
            tags, which are stripped for the fallback rendering).
        sections / organization_name / top_locations / snapshot_count:
            explicit section set and metadata inputs.

    Returns:
        The finished .docx file as bytes.
    """
    doc = Document()

    # Derive sections from scope when not explicitly given.
    if sections is None:
        if scope == "summary":
            sections = {"zusammenfassung"}
        elif scope == "report":
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen"}
        else:  # full
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline"}

    # For research incidents: extract the summary section from the report body.
    is_research = incident.get("type") == "research"
    all_sources = _prepare_sources(incident)
    zusammenfassung_text = executive_summary_text
    bericht_summary = incident.get("summary") or "Keine Zusammenfassung verfügbar."
    zusammenfassung_title = "Zusammenfassung"
    zusammenfassung_lines: list[str] = []

    if is_research and bericht_summary:
        extracted_lines, remaining = _extract_zusammenfassung_lines(bericht_summary)
        if extracted_lines:
            zusammenfassung_lines = extracted_lines
            zusammenfassung_title = "Zusammenfassung"
            bericht_summary = remaining

    meta = _build_export_metadata(
        incident, articles, fact_checks, all_sources, creator, scope, sections,
        organization_name, top_locations, snapshot_count=snapshot_count,
    )

    # Set file metadata (visible in Explorer/Finder and DMS systems).
    cp = doc.core_properties
    cp.title = meta["title"]
    cp.author = meta["author"]
    cp.subject = meta["subject"]
    cp.keywords = meta["keywords_semicolon"]
    cp.comments = meta["comments"]
    cp.category = meta["category"]
    cp.last_modified_by = meta["author"]
    cp.language = meta["language"]
    cp.content_status = "Final"
    try:
        cp.created = meta["created"]
        cp.modified = meta["modified"]
    except (ValueError, TypeError) as e:
        # Best-effort: bad timestamps must not break the export.
        logger.warning(f"DOCX created/modified konnte nicht gesetzt werden: {e}")

    # Base style for the whole document.
    style = doc.styles['Normal']
    style.font.size = Pt(10)
    style.font.name = 'Calibri'

    # --- Cover sheet ---
    # Vertical spacing before the title block.
    for _ in range(6):
        doc.add_paragraph()

    title_para = doc.add_paragraph()
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = title_para.add_run("AegisSight Monitor")
    run.font.size = Pt(12)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    doc.add_paragraph()

    type_label = "Hintergrundrecherche" if incident.get("type") == "research" else "Live-Monitoring"
    type_para = doc.add_paragraph()
    type_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = type_para.add_run(type_label)
    run.font.size = Pt(10)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    title_para2 = doc.add_paragraph()
    title_para2.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = title_para2.add_run(incident.get("title", ""))
    run.font.size = Pt(24)
    run.font.bold = True
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    if incident.get("description"):
        desc_para = doc.add_paragraph()
        desc_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        run = desc_para.add_run(incident["description"])
        run.font.size = Pt(11)
        run.font.color.rgb = RGBColor(0x66, 0x66, 0x66)

    doc.add_paragraph()
    for _ in range(3):
        doc.add_paragraph()

    now = datetime.now(TIMEZONE)
    meta_para = doc.add_paragraph()
    meta_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = meta_para.add_run(f"Stand: {now.strftime('%d.%m.%Y, %H:%M Uhr')}\nErstellt von: {creator}")
    run.font.size = Pt(9)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    doc.add_page_break()

    # --- Summary / executive summary ---
    if "zusammenfassung" in sections:
        doc.add_heading(zusammenfassung_title, level=1)

        if zusammenfassung_lines:
            # Research: lines extracted from the report body, with citations.
            for line in zusammenfassung_lines:
                _add_docx_paragraph_with_citations(doc, line, all_sources, style='List Bullet')
        else:
            # Fallback: strip HTML tags from executive_summary_text, then
            # render each remaining line as a bullet.
            clean_text = re.sub(r'<[^>]+>', '', zusammenfassung_text or '')
            lines = [line.strip().lstrip("- ").lstrip("* ") for line in clean_text.strip().split("\n") if line.strip()]
            for line in lines:
                if line:
                    _add_docx_paragraph_with_citations(doc, line, all_sources, style='List Bullet')

    if "bericht" in sections:
        # --- Situation report / research report ---
        doc.add_heading("Recherchebericht" if is_research else "Lagebild", level=1)
        # Strip Markdown formatting but keep [NNN] citations, which are
        # rendered as hyperlinks.
        clean_summary = re.sub(r'\*\*(.+?)\*\*', r'\1', bericht_summary)
        clean_summary = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', clean_summary)
        clean_summary = re.sub(r'^#{1,3}\s+', '', clean_summary, flags=re.MULTILINE)
        for para_text in clean_summary.split("\n\n"):
            para_text = para_text.strip()
            if not para_text:
                continue
            if para_text.startswith("- "):
                # Bullet-list chunk: one list paragraph per line.
                for bullet in para_text.split("\n"):
                    bullet = bullet.lstrip("- ").strip()
                    if bullet:
                        _add_docx_paragraph_with_citations(doc, bullet, all_sources, style='List Bullet')
            else:
                _add_docx_paragraph_with_citations(doc, para_text, all_sources)

    if "faktencheck" in sections:
        # --- Fact check table ---
        report_fcs = fact_checks
        if report_fcs:
            doc.add_heading("Faktencheck", level=1)
            table = doc.add_table(rows=1, cols=3)
            table.style = 'Table Grid'
            table.alignment = WD_TABLE_ALIGNMENT.CENTER
            hdr = table.rows[0].cells
            hdr[0].text = "Behauptung"
            hdr[1].text = "Status"
            hdr[2].text = "Quellen"
            # Bold, smaller header row.
            for cell in hdr:
                for p in cell.paragraphs:
                    p.runs[0].font.bold = True
                    p.runs[0].font.size = Pt(9)
            for fc in report_fcs:
                row = table.add_row().cells
                row[0].text = fc.get("claim", "")
                row[1].text = FC_STATUS_LABELS.get(fc.get("status", ""), fc.get("status", ""))
                row[2].text = str(fc.get("sources_count", 0))

    if "quellen" in sections:
        # --- Source statistics ---
        source_stats = _prepare_source_stats(articles)
        if source_stats:
            doc.add_heading("Quellenstatistik", level=1)
            table = doc.add_table(rows=1, cols=3)
            table.style = 'Table Grid'
            table.alignment = WD_TABLE_ALIGNMENT.CENTER
            hdr = table.rows[0].cells
            hdr[0].text = "Quelle"
            hdr[1].text = "Artikel"
            hdr[2].text = "Sprache"
            # Bold, smaller header row.
            for cell in hdr:
                for p in cell.paragraphs:
                    p.runs[0].font.bold = True
                    p.runs[0].font.size = Pt(9)
            for stat in source_stats:
                row = table.add_row().cells
                row[0].text = stat["name"]
                row[1].text = str(stat["count"])
                row[2].text = stat["languages"]

    if "timeline" in sections:
        # --- Article index ---
        if articles:
            doc.add_page_break()
            doc.add_heading(f"Artikelverzeichnis ({len(articles)} Artikel)", level=1)
            table = doc.add_table(rows=1, cols=4)
            table.style = 'Table Grid'
            table.alignment = WD_TABLE_ALIGNMENT.CENTER
            hdr = table.rows[0].cells
            for i, txt in enumerate(["Headline", "Quelle", "Sprache", "Datum"]):
                hdr[i].text = txt
                for p in hdr[i].paragraphs:
                    p.runs[0].font.bold = True
                    p.runs[0].font.size = Pt(8)
            for art in articles:
                row = table.add_row().cells
                row[0].text = art.get("headline_de") or art.get("headline") or "Ohne Titel"
                row[1].text = art.get("source") or ""
                row[2].text = (art.get("language") or "de").upper()
                pub = str(art.get("published_at") or art.get("collected_at") or "")
                try:
                    dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
                    row[3].text = dt.strftime("%d.%m.%Y")
                except Exception:
                    # Unparseable timestamp: show the date prefix verbatim.
                    row[3].text = pub[:10] if pub else ""
                # Reduce font size for the dense article table.
                for cell in row:
                    for p in cell.paragraphs:
                        for run in p.runs:
                            run.font.size = Pt(8)

    # --- Footer ---
    doc.add_paragraph()
    footer = doc.add_paragraph()
    footer.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = footer.add_run(f"Erstellt mit AegisSight Monitor — aegis-sight.de — {now.strftime('%d.%m.%Y')}")
    run.font.size = Pt(8)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    # Serialize to bytes via an in-memory buffer.
    buf = io.BytesIO()
    doc.save(buf)
    return buf.getvalue()
|