"""Report-Generator: PDF und Word Berichte aus Lage-Daten.""" import base64 import io import json import logging import re import uuid from collections import defaultdict from datetime import datetime from pathlib import Path import pikepdf from jinja2 import Environment, FileSystemLoader from weasyprint import HTML from docx import Document from docx.shared import Inches, Pt, Cm, RGBColor from docx.enum.text import WD_ALIGN_PARAGRAPH from docx.enum.table import WD_TABLE_ALIGNMENT from config import TIMEZONE, CLAUDE_MODEL_FAST logger = logging.getLogger("osint.report") TEMPLATE_DIR = Path(__file__).parent / "report_templates" LOGO_PATH = Path(__file__).parent / "static" / "favicon.svg" FC_STATUS_LABELS = { "confirmed": "Bestätigt", "unconfirmed": "Unbestätigt", "disputed": "Umstritten", "false": "Falsch", } def _get_logo_base64() -> str: """Logo als Base64 für HTML-Embedding.""" try: return base64.b64encode(LOGO_PATH.read_bytes()).decode() except Exception: return "" def _prepare_sources(incident: dict) -> list: """Quellenverzeichnis aus sources_json parsen.""" raw = incident.get("sources_json") if not raw: return [] try: return json.loads(raw) if isinstance(raw, str) else raw except (json.JSONDecodeError, TypeError): return [] def _prepare_source_stats(articles: list) -> list: """Quellenstatistik: Artikel pro Quelle + Sprachen.""" source_map = defaultdict(lambda: {"count": 0, "langs": set()}) for art in articles: name = art.get("source") or "Unbekannt" source_map[name]["count"] += 1 source_map[name]["langs"].add((art.get("language") or "de").upper()) stats = [] for name, data in sorted(source_map.items(), key=lambda x: -x[1]["count"]): stats.append({"name": name, "count": data["count"], "languages": ", ".join(sorted(data["langs"]))}) return stats def _prepare_fact_checks(fact_checks: list) -> list: """Faktenchecks mit Label aufbereiten.""" result = [] for fc in fact_checks: fc_copy = dict(fc) fc_copy["status_label"] = FC_STATUS_LABELS.get(fc.get("status", ""), fc.get("status", "Unbekannt")) result.append(fc_copy) return result def _prepare_timeline(articles: list) -> list: """Timeline aus Artikeln: sortiert nach Datum.""" timeline = [] for art in articles: pub = art.get("published_at") or art.get("collected_at") or "" pub = str(pub) if pub else "" headline = art.get("headline_de") or art.get("headline") or "Ohne Titel" source = art.get("source") or "" if pub: try: dt = datetime.fromisoformat(pub.replace("Z", "+00:00")) date_str = dt.strftime("%d.%m.%Y %H:%M") except Exception: date_str = pub[:16] else: date_str = "" timeline.append({"date": date_str, "headline": headline, "source": source, "sort_key": pub}) timeline.sort(key=lambda x: x["sort_key"], reverse=True) return timeline[:100] # Max 100 Einträge def _markdown_to_html(text: str) -> str: """Einfache Markdown -> HTML Konvertierung für Lagebild.""" if not text: return "

Keine Zusammenfassung verfügbar.

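
# Illustrative sketch (fabricated article, assumed field names) of the timeline
# entry produced above:
#
#   _prepare_timeline([{
#       "headline_de": "Brand im Hafen",
#       "source": "dpa",
#       "published_at": "2025-01-05T14:30:00Z",
#   }])
#   # -> [{"date": "05.01.2025 14:30", "headline": "Brand im Hafen",
#   #      "source": "dpa", "sort_key": "2025-01-05T14:30:00Z"}]
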
" # Basic Markdown -> HTML html = text # Headlines html = re.sub(r'^### (.+)$', r'

\1

', html, flags=re.MULTILINE) html = re.sub(r'^## (.+)$', r'

\1

', html, flags=re.MULTILINE) # Bold html = re.sub(r'\*\*(.+?)\*\*', r'\1', html) # Links [text](url) html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', html) # Bullet lists html = re.sub(r'^- (.+)$', r'
  • \1
  • ', html, flags=re.MULTILINE) html = re.sub(r'(
  • .*
  • \n?)+', lambda m: '', html) # Paragraphs paragraphs = html.split('\n\n') result = [] for p in paragraphs: p = p.strip() if not p: continue if p.startswith('{p}

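
# Sketch of the conversion above (traced against the reconstructed code):
#
#   _markdown_to_html("## Lage\n\nAlles **ruhig**.")
#   # -> '<h2>Lage</h2>\n<p>Alles <strong>ruhig</strong>.</p>'
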
def _truncate_lagebild(summary_text: str, max_chars: int = 4000) -> str:
    """Shorten the Lagebild for the report scope.

    Keeps only the leading sections (stops once a fourth H2/H3 heading appears)
    or truncates at max_chars with a clean break at a paragraph boundary.
    """
    if not summary_text or len(summary_text) <= max_chars:
        return summary_text

    lines = summary_text.split("\n")
    result_lines = []
    heading_count = 0
    char_count = 0
    for line in lines:
        stripped = line.strip()
        # Count headings (## or ###)
        if stripped.startswith("## ") or stripped.startswith("### "):
            heading_count += 1
            # Stop once a fourth heading begins (keeps the first three sections)
            if heading_count > 3:
                break
        result_lines.append(line)
        char_count += len(line) + 1
        # Hard limit at max_chars, but break at the end of a paragraph
        if char_count > max_chars and stripped == "":
            break

    text = "\n".join(result_lines).rstrip()
    if len(text) < len(summary_text) - 100:
        text += "\n\n*[Vollständige Zusammenfassung im vollständigen Bericht]*"
    return text


def _strip_citation_numbers(text: str) -> str:
    """Remove [1234]-style source references from the text."""
    # Single references: [1302]
    text = re.sub(r"\s*\[\d{1,5}\]", "", text)
    # Chained references: [725][765][768]
    text = re.sub(r"(\[\d{1,5}\]){2,}", "", text)
    # Clean-up: collapse double spaces
    text = re.sub(r" +", " ", text)
    return text


def _find_source_for_citation(num: str, sources: list) -> dict | None:
    """Look up a source by citation number (incl. suffix fallback such as 1383a -> 1383)."""
    if not sources:
        return None
    for s in sources:
        try:
            if str(s.get("nr")) == num:
                return s
        except Exception:
            continue
    # Suffix fallback: 1383a -> 1383
    if re.search(r"[a-z]$", num):
        base = re.sub(r"[a-z]$", "", num)
        for s in sources:
            if str(s.get("nr")) == base:
                return s
    return None


def _linkify_citations_html(text: str, sources: list) -> str:
    """Replace [1234] citations with HTML links to the matching source.

    Numbers without a matching source are kept as visible plain text.
    """
    if not text:
        return text
    if not sources:
        return text

    def repl(match: re.Match) -> str:
        num = match.group(1)
        src = _find_source_for_citation(num, sources)
        if src and src.get("url"):
            url = src["url"].replace('"', "&quot;")
            name = (src.get("name") or "").replace('"', "&quot;")
            return f'<a href="{url}" title="{name}">[{num}]</a>'
        return match.group(0)

    return re.sub(r"\[(\d{1,5}[a-z]?)\]", repl, text)


def _add_docx_hyperlink(paragraph, url: str, text: str):
    """Insert a clickable hyperlink into a python-docx paragraph object."""
    from docx.oxml.shared import OxmlElement, qn

    part = paragraph.part
    r_id = part.relate_to(
        url,
        "http://schemas.openxmlformats.org/officeDocument/2006/relationships/hyperlink",
        is_external=True,
    )
    hyperlink = OxmlElement("w:hyperlink")
    hyperlink.set(qn("r:id"), r_id)
    new_run = OxmlElement("w:r")
    rPr = OxmlElement("w:rPr")
    color = OxmlElement("w:color")
    color.set(qn("w:val"), "0066CC")
    rPr.append(color)
    u = OxmlElement("w:u")
    u.set(qn("w:val"), "single")
    rPr.append(u)
    sz = OxmlElement("w:sz")
    sz.set(qn("w:val"), "20")  # half-points: 20 = 10pt
    rPr.append(sz)
    new_run.append(rPr)
    t = OxmlElement("w:t")
    t.text = text
    t.set(qn("xml:space"), "preserve")
    new_run.append(t)
    hyperlink.append(new_run)
    paragraph._p.append(hyperlink)
    return hyperlink
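
# Illustrative sketch (fabricated source entry) of the linkification above:
#
#   _linkify_citations_html(
#       "Evakuierung bestätigt [42].",
#       [{"nr": 42, "url": "https://example.org/a", "name": "Beispiel"}],
#   )
#   # -> 'Evakuierung bestätigt <a href="https://example.org/a" title="Beispiel">[42]</a>.'
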
""" if hasattr(doc_or_para, "add_paragraph"): para = doc_or_para.add_paragraph(style=style) if style else doc_or_para.add_paragraph() else: para = doc_or_para pattern = re.compile(r"\[(\d{1,5}[a-z]?)\]") pos = 0 for m in pattern.finditer(text): if m.start() > pos: para.add_run(text[pos:m.start()]) num = m.group(1) src = _find_source_for_citation(num, sources) if src and src.get("url"): _add_docx_hyperlink(para, src["url"], f"[{num}]") else: para.add_run(m.group(0)) pos = m.end() if pos < len(text): para.add_run(text[pos:]) return para def _extract_zusammenfassung_lines(summary_text: str) -> tuple[list[str], str]: """Extrahiert die ZUSAMMENFASSUNG-Sektion als Liste von Rohzeilen (ohne Zitatbearbeitung). Returns: (lines, remaining_summary) """ if not summary_text: return [], summary_text pattern = r"(## (?:ZUSAMMENFASSUNG|ÜBERBLICK)\s*\n)(.*?)(?=\n## |\Z)" match = re.search(pattern, summary_text, re.DOTALL) if not match: return [], summary_text zusammenfassung_raw = match.group(2).strip() remaining = summary_text[:match.start()] + summary_text[match.end():] remaining = remaining.strip() lines: list[str] = [] for line in zusammenfassung_raw.split("\n"): stripped = line.strip() if stripped.startswith("- ") or stripped.startswith("* "): content = stripped[2:].strip() if content: lines.append(content) elif stripped and not stripped.startswith("#"): lines.append(stripped) return lines, remaining def _extract_zusammenfassung(summary_text: str, sources: list | None = None) -> tuple[str, str]: """Extrahiert die ZUSAMMENFASSUNG-Sektion und liefert sie als HTML mit verlinkten Zitaten.""" lines, remaining = _extract_zusammenfassung_lines(summary_text) if not lines: return "", summary_text src_list = sources or [] html_lines = [f"
def _extract_zusammenfassung(summary_text: str, sources: list | None = None) -> tuple[str, str]:
    """Extract the ZUSAMMENFASSUNG section and return it as HTML with linked citations."""
    lines, remaining = _extract_zusammenfassung_lines(summary_text)
    if not lines:
        return "", summary_text
    src_list = sources or []
    html_lines = [f"<li>{_linkify_citations_html(line, src_list)}</li>" for line in lines]
    html = "<ul>" + "\n".join(html_lines) + "</ul>"
    return html, remaining


async def generate_executive_summary(summary_text: str) -> str:
    """AI-condensed executive summary derived from the Lagebild."""
    if not summary_text or len(summary_text.strip()) < 50:
        return ""

    from agents.claude_client import call_claude

    prompt = f"""Du bist ein Intelligence-Analyst für ein OSINT-Lagemonitoring-System. Verdichte das folgende Lagebild auf genau 3-5 Kernpunkte.

REGELN:
- Jeder Punkt: 1-2 Sätze, faktenbasiert
- Fokus: Was ist passiert? Was bedeutet es? Was ist die aktuelle Dynamik?
- Sprache: Deutsch, sachlich, prägnant
- Format: Gib NUR die Bullet Points aus, einen pro Zeile, mit "- " am Anfang
- KEINE Einleitung, KEINE Überschrift, NUR die Punkte

LAGEBILD:
{summary_text}"""

    try:
        result, usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
        # Robust parser: accepts JSON, Markdown lists, or free text
        lines = []
        text = result.strip()

        # Strip code fences (```json ... ```)
        if text.startswith("```"):
            text = re.sub(r"^```\w*\n?", "", text)
            text = re.sub(r"\n?```$", "", text)
            text = text.strip()

        # Case 1: JSON answer (Haiku sometimes returns JSON)
        if text.startswith("{"):
            try:
                data = json.loads(text)
                for key in data:
                    if isinstance(data[key], list):
                        for item in data[key]:
                            clean = str(item).strip().lstrip("- ").lstrip("* ")
                            if clean:
                                lines.append(clean)
                        break
            except json.JSONDecodeError:
                pass

        # Case 2: Markdown bullet points
        if not lines:
            for line in text.split("\n"):
                stripped = line.strip()
                if stripped.startswith(("- ", "* ")):
                    clean = stripped.lstrip("- ").lstrip("* ").strip()
                    if clean:
                        lines.append(clean)

        # Case 3: numbered list (1. 2. 3.)
        if not lines:
            for line in text.split("\n"):
                m = re.match(r"^\d+\.\s+(.+)", line.strip())
                if m:
                    lines.append(m.group(1).strip())

        # Fallback: whole text as a single point
        if not lines:
            lines = [text[:500]]

        html = "<ul>" + "".join(f"<li>{line}</li>" for line in lines) + "</ul>"
        return html
    except Exception as e:
        logger.error(f"Executive Summary Generierung fehlgeschlagen: {e}")
        return ""


def _parse_db_timestamp(value) -> datetime | None:
    """Parse a SQLite timestamp robustly (ISO or 'YYYY-MM-DD HH:MM:SS')."""
    if not value:
        return None
    if isinstance(value, datetime):
        return value
    try:
        text = str(value).replace("T", " ").replace("Z", "")
        # Drop fractional seconds and timezone offset (python-docx only accepts naive dt)
        text = text.split(".")[0].split("+")[0].strip()
        return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        try:
            return datetime.strptime(str(value)[:10], "%Y-%m-%d")
        except (ValueError, TypeError):
            return None


def _slug_scope_label(scope: str, sections: set[str] | None) -> str:
    """Scope label for metadata and file names."""
    if sections:
        if sections == {"zusammenfassung"}:
            return "Zusammenfassung"
        if "timeline" in sections:
            return "Vollständiger Bericht"
        return "Lagebericht"
    return {"summary": "Zusammenfassung", "report": "Lagebericht", "full": "Vollständiger Bericht"}.get(
        scope, "Lagebericht"
    )
def _build_export_metadata(
    incident: dict,
    articles: list,
    fact_checks: list,
    sources: list,
    creator: str,
    scope: str,
    sections: set[str] | None,
    organization_name: str | None,
    top_locations: list[str] | None,
    snapshot_count: int = 0,
) -> dict:
    """Unified metadata dict for PDF (HTML meta tags) and DOCX (core_properties)."""
    is_research = incident.get("type") == "research"
    type_label = "Hintergrundrecherche" if is_research else "Live-Monitoring"
    category = "OSINT-Hintergrundrecherche" if is_research else "OSINT-Lagebericht"
    scope_label = _slug_scope_label(scope, sections)

    title_raw = (incident.get("title") or "Unbenannte Lage").strip()
    title = f"{title_raw} — {type_label}"
    subject = (incident.get("description") or "").strip()
    if not subject:
        subject = f"{type_label} zu: {title_raw}"

    # Collect keywords (order matters for display; case-insensitive dedup below)
    keywords: list[str] = ["OSINT", type_label]
    if organization_name:
        keywords.append(organization_name)

    # category_labels: may be a JSON dict (map primary/secondary/...), a JSON list,
    # or a comma-separated string. Extract only the label values.
    cat_labels_raw = (incident.get("category_labels") or "").strip()
    if cat_labels_raw:
        cat_values: list[str] = []
        try:
            parsed = json.loads(cat_labels_raw)
            if isinstance(parsed, dict):
                cat_values = [str(v).strip() for v in parsed.values() if isinstance(v, str) and v.strip()]
            elif isinstance(parsed, list):
                cat_values = [str(v).strip() for v in parsed if isinstance(v, str) and v.strip()]
        except (json.JSONDecodeError, TypeError):
            cat_values = [lbl.strip() for lbl in cat_labels_raw.split(",") if lbl.strip()]
        # Never allow JSON fragments (curly/square brackets) as a keyword
        for lbl in cat_values:
            if lbl and not any(c in lbl for c in "{}[]"):
                keywords.append(lbl)

    if top_locations:
        keywords.extend([loc for loc in top_locations if loc])

    # Sanitize: strip line breaks/tabs, filter characters with special PDF meaning
    def _sanitize_keyword(kw: str) -> str:
        if not kw:
            return ""
        # Normalize whitespace
        cleaned = re.sub(r"\s+", " ", kw).strip()
        # Drop PDF dict/array brackets and backslash (WeasyPrint escapes () in strings,
        # but { and [ can truncate the keywords stream)
        cleaned = re.sub(r"[{}\[\]\\]", "", cleaned)
        return cleaned.strip(" ,;:")

    # Dedup (case-insensitive) preserving order, max 15
    seen = set()
    unique_keywords: list[str] = []
    for kw in keywords:
        clean_kw = _sanitize_keyword(kw)
        if not clean_kw:
            continue
        key = clean_kw.lower()
        if key not in seen:
            seen.add(key)
            unique_keywords.append(clean_kw)
        if len(unique_keywords) >= 15:
            break

    now = datetime.now(TIMEZONE)
    created = _parse_db_timestamp(incident.get("created_at")) or now.replace(tzinfo=None)
    modified = _parse_db_timestamp(incident.get("updated_at")) or created

    # Structured comments block (shown in DOCX, kept compact)
    stand = now.strftime("%d.%m.%Y")
    comments_lines = [
        f"Incident-ID: {incident.get('id', '?')} | Typ: {incident.get('type', 'adhoc')} | Scope: {scope_label}",
        f"Stand: {stand}",
    ]
    if organization_name:
        comments_lines.append(f"Organisation: {organization_name}")
    comments_lines.append(
        f"Umfang: {len(articles)} Artikel, {len(fact_checks)} Faktenchecks, {len(sources)} Quellen"
    )
    if top_locations:
        comments_lines.append("Orte: " + ", ".join(top_locations[:5]))
    comments = "\n".join(comments_lines)

    publisher = organization_name or "AegisSight"
    identifier = f"urn:aegissight:incident:{incident.get('id', '0')}:{now.strftime('%Y%m%dT%H%M%S')}"
    rights = (
        "Vertrauliche Lageanalyse — AegisSight Monitor. "
        "Weitergabe nur an autorisierte Empfänger."
    )
    return {
        "title": title,
        "author": creator or "AegisSight Monitor",
        "subject": subject,
        "keywords": unique_keywords,
        "keywords_comma": ", ".join(unique_keywords),
        "keywords_semicolon": "; ".join(unique_keywords),
        "category": category,
        "comments": comments,
        "creator_app": "AegisSight Monitor",
        "language": "de-DE",
        "created": created,
        "modified": modified,
        "created_iso": created.strftime("%Y-%m-%dT%H:%M:%S"),
        "modified_iso": modified.strftime("%Y-%m-%dT%H:%M:%S"),
        "type_label": type_label,
        "scope_label": scope_label,
        "publisher": publisher,
        "identifier": identifier,
        "rights": rights,
        "doc_type": "Report",
        "version_id": str(max(1, snapshot_count)),
    }


def _format_pdf_date(dt: datetime) -> str:
    """PDF date format: D:YYYYMMDDHHmmSS+HH'mm' (with timezone) or Z (UTC)."""
    if dt.tzinfo is None:
        # Naive dt: interpret as local TIMEZONE
        dt = dt.replace(tzinfo=TIMEZONE)
    base = dt.strftime("D:%Y%m%d%H%M%S")
    offset = dt.utcoffset()
    if offset is None:
        return base + "Z"
    total_minutes = int(offset.total_seconds() // 60)
    sign = "+" if total_minutes >= 0 else "-"
    total_minutes = abs(total_minutes)
    return f"{base}{sign}{total_minutes // 60:02d}'{total_minutes % 60:02d}'"
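
# Format sketch for the function above (assuming TIMEZONE is Europe/Berlin):
#
#   _format_pdf_date(datetime(2025, 1, 5, 14, 30, 0))  # naive -> local TIMEZONE
#   # -> "D:20250105143000+01'00'"
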
def _enrich_pdf_metadata(pdf_bytes: bytes, meta: dict) -> bytes:
    """Extend the PDF output with XMP metadata and CreationDate/ModDate (post-process via pikepdf)."""
    try:
        buf_in = io.BytesIO(pdf_bytes)
        with pikepdf.Pdf.open(buf_in) as pdf:
            created: datetime = meta.get("created")
            modified: datetime = meta.get("modified")
            if created and created.tzinfo is None:
                created = created.replace(tzinfo=TIMEZONE)
            if modified and modified.tzinfo is None:
                modified = modified.replace(tzinfo=TIMEZONE)

            # Classic info dict: keep CreationDate + ModDate in sync
            if created:
                pdf.docinfo["/CreationDate"] = pikepdf.String(_format_pdf_date(created))
            if modified:
                pdf.docinfo["/ModDate"] = pikepdf.String(_format_pdf_date(modified))

            # Document/instance ID for DMS versioning (fresh per export)
            doc_uuid = f"uuid:{uuid.uuid4()}"
            instance_uuid = f"uuid:{uuid.uuid4()}"

            # Write the XMP metadata block (Dublin Core + XMP + PDF + xmpRights + xmpMM)
            with pdf.open_metadata(set_pikepdf_as_editor=False) as xmp:
                # Dublin Core
                xmp["dc:title"] = meta.get("title", "")
                xmp["dc:creator"] = [meta.get("author", "")]
                xmp["dc:description"] = meta.get("subject", "")
                if meta.get("keywords"):
                    xmp["dc:subject"] = list(meta["keywords"])
                xmp["dc:language"] = [meta.get("language", "de-DE")]
                xmp["dc:publisher"] = [meta.get("publisher", "AegisSight")]
                xmp["dc:identifier"] = meta.get("identifier", "")
                xmp["dc:format"] = "application/pdf"
                xmp["dc:type"] = [meta.get("doc_type", "Report")]
                xmp["dc:rights"] = meta.get("rights", "")
                if created:
                    xmp["dc:date"] = [created.strftime("%Y-%m-%dT%H:%M:%S%z")]

                # PDF namespace
                xmp["pdf:Keywords"] = meta.get("keywords_comma", "")
                xmp["pdf:Producer"] = "WeasyPrint + AegisSight Monitor"

                # XMP namespace
                xmp["xmp:CreatorTool"] = meta.get("creator_app", "AegisSight Monitor")
                if created:
                    xmp["xmp:CreateDate"] = created.strftime("%Y-%m-%dT%H:%M:%S%z")
                if modified:
                    xmp["xmp:ModifyDate"] = modified.strftime("%Y-%m-%dT%H:%M:%S%z")
                    xmp["xmp:MetadataDate"] = modified.strftime("%Y-%m-%dT%H:%M:%S%z")

                # xmpRights: rights and confidentiality notice (XMP expects the string "True")
                xmp["xmpRights:Marked"] = "True"
                if meta.get("rights"):
                    # String: pikepdf wraps this automatically as LangAlt with x-default
                    xmp["xmpRights:UsageTerms"] = meta["rights"]

                # xmpMM: document and instance ID for DMS versioning
                xmp["xmpMM:DocumentID"] = doc_uuid
                xmp["xmpMM:InstanceID"] = instance_uuid
                xmp["xmpMM:VersionID"] = meta.get("version_id", "1")

                # xmpMM:History: audit event for this export (one line per Seq item)
                history_when = (modified or datetime.now(TIMEZONE)).strftime("%Y-%m-%dT%H:%M:%S%z")
                history_entry = (
                    f"action=published; when={history_when}; "
                    f"softwareAgent={meta.get('creator_app', 'AegisSight Monitor')}; "
                    f"instanceID={instance_uuid}; "
                    f"scope={meta.get('scope_label', '')}; "
                    f"version={meta.get('version_id', '1')}"
                )
                xmp["xmpMM:History"] = [history_entry]

            buf_out = io.BytesIO()
            pdf.save(buf_out)
            return buf_out.getvalue()
    except Exception as e:
        logger.warning(f"PDF-Metadaten-Anreicherung (XMP/Dates) fehlgeschlagen: {e}")
        return pdf_bytes
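
# Verification sketch: reading back the enriched metadata with pikepdf
# (file name is illustrative):
#
#   with pikepdf.Pdf.open("report.pdf") as p:
#       with p.open_metadata() as m:
#           print(m.get("dc:title"), m.get("xmpMM:VersionID"))
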
async def generate_pdf(
    incident: dict,
    articles: list,
    fact_checks: list,
    snapshots: list,
    scope: str,
    creator: str,
    executive_summary_html: str,
    sections: set[str] | None = None,
    organization_name: str | None = None,
    top_locations: list[str] | None = None,
    snapshot_count: int = 0,
) -> bytes:
    """Generate the PDF report via WeasyPrint."""
    # Derive sections from scope when not given explicitly
    if sections is None:
        if scope == "summary":
            sections = {"zusammenfassung"}
        elif scope == "report":
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen"}
        else:  # full
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline"}

    # For research incidents: extract the summary out of the report body
    is_research = incident.get("type") == "research"
    all_sources = _prepare_sources(incident)
    zusammenfassung_html = executive_summary_html
    bericht_summary = incident.get("summary", "")
    zusammenfassung_title = "Zusammenfassung"
    if is_research and bericht_summary:
        extracted_html, remaining = _extract_zusammenfassung(bericht_summary, all_sources)
        if extracted_html:
            zusammenfassung_html = extracted_html
            zusammenfassung_title = "Zusammenfassung"
            bericht_summary = remaining

    # Linkify the (non-research) executive summary too, since it may contain citations
    if not is_research and zusammenfassung_html:
        zusammenfassung_html = _linkify_citations_html(zusammenfassung_html, all_sources)

    meta = _build_export_metadata(
        incident, articles, fact_checks, all_sources, creator, scope, sections,
        organization_name, top_locations, snapshot_count=snapshot_count,
    )

    # Prepare article pub_date before rendering so the template can use it
    for art in articles:
        pub = str(art.get("published_at") or art.get("collected_at") or "")
        try:
            dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
            art["pub_date"] = dt.strftime("%d.%m.%Y")
        except Exception:
            art["pub_date"] = pub[:10] if pub else ""

    env = Environment(loader=FileSystemLoader(str(TEMPLATE_DIR)))
    template = env.get_template("report.html")
    now = datetime.now(TIMEZONE)
    incident_type_label = "Hintergrundrecherche" if incident.get("type") == "research" else "Live-Monitoring"

    html_content = template.render(
        incident=incident,
        incident_type_label=incident_type_label,
        report_date=now.strftime("%d.%m.%Y, %H:%M Uhr"),
        creator=creator,
        logo_base64=_get_logo_base64(),
        executive_summary=zusammenfassung_html,
        zusammenfassung_title=zusammenfassung_title,
        sections=sections,
        scope=scope,
        lagebild_html=_linkify_citations_html(
            _markdown_to_html(bericht_summary), all_sources
        ),
        lagebild_timestamp=(incident.get("updated_at") or "")[:16].replace("T", " "),
        sources=all_sources[:30] if scope == "report" else all_sources,
        fact_checks=_prepare_fact_checks(fact_checks[:20] if scope == "report" else fact_checks),
        source_stats=_prepare_source_stats(articles)[:20] if scope == "report" else _prepare_source_stats(articles),
        timeline=_prepare_timeline(articles) if scope == "full" else [],
        articles=articles if scope == "full" else [],
        meta=meta,
    )

    pdf_bytes = HTML(string=html_content).write_pdf()
    pdf_bytes = _enrich_pdf_metadata(pdf_bytes, meta)
    return pdf_bytes
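
# Call sketch (fabricated incident, assumed to run inside an asyncio context):
#
#   pdf = await generate_pdf(
#       {"id": 1, "title": "Testlage", "summary": "## LAGE\nRuhig."},
#       articles=[], fact_checks=[], snapshots=[],
#       scope="summary", creator="Analyst",
#       executive_summary_html="<ul><li>Ruhig.</li></ul>",
#   )
#   Path("testlage.pdf").write_bytes(pdf)
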
async def generate_docx(
    incident: dict,
    articles: list,
    fact_checks: list,
    snapshots: list,
    scope: str,
    creator: str,
    executive_summary_text: str,
    sections: set[str] | None = None,
    organization_name: str | None = None,
    top_locations: list[str] | None = None,
    snapshot_count: int = 0,
) -> bytes:
    """Generate the Word report via python-docx."""
    doc = Document()

    # Derive sections from scope when not given explicitly
    if sections is None:
        if scope == "summary":
            sections = {"zusammenfassung"}
        elif scope == "report":
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen"}
        else:  # full
            sections = {"zusammenfassung", "bericht", "faktencheck", "quellen", "timeline"}

    # For research incidents: extract the summary out of the report body
    is_research = incident.get("type") == "research"
    all_sources = _prepare_sources(incident)
    zusammenfassung_text = executive_summary_text
    bericht_summary = incident.get("summary") or "Keine Zusammenfassung verfügbar."
    zusammenfassung_title = "Zusammenfassung"
    zusammenfassung_lines: list[str] = []
    if is_research and bericht_summary:
        extracted_lines, remaining = _extract_zusammenfassung_lines(bericht_summary)
        if extracted_lines:
            zusammenfassung_lines = extracted_lines
            zusammenfassung_title = "Zusammenfassung"
            bericht_summary = remaining

    meta = _build_export_metadata(
        incident, articles, fact_checks, all_sources, creator, scope, sections,
        organization_name, top_locations, snapshot_count=snapshot_count,
    )

    # Set file metadata (visible in Explorer/Finder and DMS systems)
    cp = doc.core_properties
    cp.title = meta["title"]
    cp.author = meta["author"]
    cp.subject = meta["subject"]
    cp.keywords = meta["keywords_semicolon"]
    cp.comments = meta["comments"]
    cp.category = meta["category"]
    cp.last_modified_by = meta["author"]
    cp.language = meta["language"]
    cp.content_status = "Final"
    try:
        cp.created = meta["created"]
        cp.modified = meta["modified"]
    except (ValueError, TypeError) as e:
        logger.warning(f"DOCX created/modified konnte nicht gesetzt werden: {e}")

    # Styles
    style = doc.styles['Normal']
    style.font.size = Pt(10)
    style.font.name = 'Calibri'

    # --- Cover page ---
    for _ in range(6):
        doc.add_paragraph()

    title_para = doc.add_paragraph()
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = title_para.add_run("AegisSight Monitor")
    run.font.size = Pt(12)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    doc.add_paragraph()
    type_label = "Hintergrundrecherche" if incident.get("type") == "research" else "Live-Monitoring"
    type_para = doc.add_paragraph()
    type_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = type_para.add_run(type_label)
    run.font.size = Pt(10)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    title_para2 = doc.add_paragraph()
    title_para2.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = title_para2.add_run(incident.get("title", ""))
    run.font.size = Pt(24)
    run.font.bold = True
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    if incident.get("description"):
        desc_para = doc.add_paragraph()
        desc_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        run = desc_para.add_run(incident["description"])
        run.font.size = Pt(11)
        run.font.color.rgb = RGBColor(0x66, 0x66, 0x66)

    doc.add_paragraph()
    for _ in range(3):
        doc.add_paragraph()

    now = datetime.now(TIMEZONE)
    meta_para = doc.add_paragraph()
    meta_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = meta_para.add_run(f"Stand: {now.strftime('%d.%m.%Y, %H:%M Uhr')}\nErstellt von: {creator}")
    run.font.size = Pt(9)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    doc.add_page_break()

    # --- Zusammenfassung / Executive Summary ---
    if "zusammenfassung" in sections:
        doc.add_heading(zusammenfassung_title, level=1)
        if zusammenfassung_lines:
            for line in zusammenfassung_lines:
                _add_docx_paragraph_with_citations(doc, line, all_sources, style='List Bullet')
        else:
            # Fallback: strip HTML tags from executive_summary_text, then build bullets
            clean_text = re.sub(r'<[^>]+>', '', zusammenfassung_text or '')
            lines = [line.strip().lstrip("- ").lstrip("* ") for line in clean_text.strip().split("\n") if line.strip()]
            for line in lines:
                if line:
                    _add_docx_paragraph_with_citations(doc, line, all_sources, style='List Bullet')
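
    # Note on the report section below: DOCX has no Markdown renderer, so bold
    # markers, [text](url) links and #-headings are stripped via regex first,
    # while [NNN] citations survive and become hyperlink runs. Sketch:
    #
    #   "**Lage** ruhig [42]"  ->  paragraph runs: "Lage ruhig " + hyperlink "[42]"
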
    if "bericht" in sections:
        # --- Lagebild / Recherchebericht ---
        doc.add_heading("Recherchebericht" if is_research else "Lagebild", level=1)
        # Strip Markdown formatting, but keep citations as [NNN] and render them as hyperlinks
        clean_summary = re.sub(r'\*\*(.+?)\*\*', r'\1', bericht_summary)
        clean_summary = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', clean_summary)
        clean_summary = re.sub(r'^#{1,3}\s+', '', clean_summary, flags=re.MULTILINE)

        for para_text in clean_summary.split("\n\n"):
            para_text = para_text.strip()
            if not para_text:
                continue
            if para_text.startswith("- "):
                for bullet in para_text.split("\n"):
                    bullet = bullet.lstrip("- ").strip()
                    if bullet:
                        _add_docx_paragraph_with_citations(doc, bullet, all_sources, style='List Bullet')
            else:
                _add_docx_paragraph_with_citations(doc, para_text, all_sources)

    if "faktencheck" in sections:
        # --- Faktencheck ---
        report_fcs = fact_checks
        if report_fcs:
            doc.add_heading("Faktencheck", level=1)
            table = doc.add_table(rows=1, cols=3)
            table.style = 'Table Grid'
            table.alignment = WD_TABLE_ALIGNMENT.CENTER
            hdr = table.rows[0].cells
            hdr[0].text = "Behauptung"
            hdr[1].text = "Status"
            hdr[2].text = "Quellen"
            for cell in hdr:
                for p in cell.paragraphs:
                    p.runs[0].font.bold = True
                    p.runs[0].font.size = Pt(9)
            for fc in report_fcs:
                row = table.add_row().cells
                row[0].text = fc.get("claim", "")
                row[1].text = FC_STATUS_LABELS.get(fc.get("status", ""), fc.get("status", ""))
                row[2].text = str(fc.get("sources_count", 0))

    if "quellen" in sections:
        # --- Quellenstatistik ---
        source_stats = _prepare_source_stats(articles)
        if source_stats:
            doc.add_heading("Quellenstatistik", level=1)
            table = doc.add_table(rows=1, cols=3)
            table.style = 'Table Grid'
            table.alignment = WD_TABLE_ALIGNMENT.CENTER
            hdr = table.rows[0].cells
            hdr[0].text = "Quelle"
            hdr[1].text = "Artikel"
            hdr[2].text = "Sprache"
            for cell in hdr:
                for p in cell.paragraphs:
                    p.runs[0].font.bold = True
                    p.runs[0].font.size = Pt(9)
            for stat in source_stats:
                row = table.add_row().cells
                row[0].text = stat["name"]
                row[1].text = str(stat["count"])
                row[2].text = stat["languages"]

    if "timeline" in sections:
        # --- Artikelverzeichnis ---
        if articles:
            doc.add_page_break()
            doc.add_heading(f"Artikelverzeichnis ({len(articles)} Artikel)", level=1)
            table = doc.add_table(rows=1, cols=4)
            table.style = 'Table Grid'
            table.alignment = WD_TABLE_ALIGNMENT.CENTER
            hdr = table.rows[0].cells
            for i, txt in enumerate(["Headline", "Quelle", "Sprache", "Datum"]):
                hdr[i].text = txt
                for p in hdr[i].paragraphs:
                    p.runs[0].font.bold = True
                    p.runs[0].font.size = Pt(8)
            for art in articles:
                row = table.add_row().cells
                row[0].text = art.get("headline_de") or art.get("headline") or "Ohne Titel"
                row[1].text = art.get("source") or ""
                row[2].text = (art.get("language") or "de").upper()
                pub = str(art.get("published_at") or art.get("collected_at") or "")
                try:
                    dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
                    row[3].text = dt.strftime("%d.%m.%Y")
                except Exception:
                    row[3].text = pub[:10] if pub else ""
                # Reduce font size
                for cell in row:
                    for p in cell.paragraphs:
                        for run in p.runs:
                            run.font.size = Pt(8)

    # --- Footer ---
    doc.add_paragraph()
    footer = doc.add_paragraph()
    footer.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = footer.add_run(f"Erstellt mit AegisSight Monitor — aegis-sight.de — {now.strftime('%d.%m.%Y')}")
    run.font.size = Pt(8)
    run.font.color.rgb = RGBColor(0x0a, 0x18, 0x32)

    buf = io.BytesIO()
    doc.save(buf)
    return buf.getvalue()
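

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke test (sketch): all incident fields below are fabricated.
    # Requires the same environment as the module itself (config, weasyprint, docx).
    import asyncio

    _demo = {"id": 0, "title": "Demo-Lage", "summary": "Ruhige Lage, keine Vorkommnisse."}
    _docx = asyncio.run(generate_docx(
        _demo, articles=[], fact_checks=[], snapshots=[],
        scope="summary", creator="Demo", executive_summary_text="- Ruhige Lage",
    ))
    Path("demo_report.docx").write_bytes(_docx)
    logger.info("demo_report.docx geschrieben (%d Bytes)", len(_docx))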