From e68386f6bb28098fe5df3d761e80ed9d1e4eebea Mon Sep 17 00:00:00 2001 From: Claude Code Date: Sat, 16 May 2026 23:21:50 +0000 Subject: [PATCH 1/2] feat(sources): PDF-Dokumente als neuer Quellentyp pdf_document - SOURCE_TYPE_PATTERN um pdf_document erweitert - src/services/pdf_ingest.py: pdfplumber + Tesseract-OCR-Fallback, Uebersetzung nach DE+EN, ein Pool-Artikel pro PDF - Scheduler-Job pdf_ingest laeuft im Minuten-Takt und verarbeitet pdf_document-Quellen mit processed_at IS NULL - scripts/migrate_pdf_source.py: idempotente DB-Migration (sources.pdf_path/pdf_sha256/processed_at, articles.headline_en/content_en) - requirements.txt: pdfplumber, pytesseract, pdf2image, Pillow --- requirements.txt | 5 + scripts/migrate_pdf_source.py | 34 +++++ src/main.py | 2 + src/models.py | 2 +- src/services/pdf_ingest.py | 237 ++++++++++++++++++++++++++++++++++ 5 files changed, 279 insertions(+), 1 deletion(-) create mode 100644 scripts/migrate_pdf_source.py create mode 100644 src/services/pdf_ingest.py diff --git a/requirements.txt b/requirements.txt index 6880663..2c90cc1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,8 @@ Jinja2>=3.1 weasyprint>=68.0 python-docx>=1.2 pikepdf>=9.0 +# PDF-Quellen (Ingestion) +pdfplumber>=0.11 +pytesseract>=0.3 +pdf2image>=1.17 +Pillow>=10.0 diff --git a/scripts/migrate_pdf_source.py b/scripts/migrate_pdf_source.py new file mode 100644 index 0000000..6ba180c --- /dev/null +++ b/scripts/migrate_pdf_source.py @@ -0,0 +1,34 @@ +"""Idempotente Migration: Quellen-Typ pdf_document + EN-Spalten in articles. + +Beim Live-Promote anwenden: + python3 scripts/migrate_pdf_source.py /home/claude-dev/osint-data/osint.db +""" +import sqlite3 +import sys + + +def add_col(db, table, col_def): + name = col_def.split()[0] + cols = {r[1] for r in db.execute(f"PRAGMA table_info({table})").fetchall()} + if name in cols: + return False + db.execute(f"ALTER TABLE {table} ADD COLUMN {col_def}") + return True + + +def main(path): + with sqlite3.connect(path) as db: + for col in ("pdf_path TEXT", "pdf_sha256 TEXT", "processed_at TIMESTAMP"): + print(f"sources.{col.split()[0]}:", "added" if add_col(db, "sources", col) else "exists") + for col in ("headline_en TEXT", "content_en TEXT"): + print(f"articles.{col.split()[0]}:", "added" if add_col(db, "articles", col) else "exists") + db.execute("CREATE INDEX IF NOT EXISTS idx_sources_pdf_sha256 ON sources(pdf_sha256)") + db.commit() + print("DONE") + + +if __name__ == "__main__": + if len(sys.argv) != 2: + print("Usage: migrate_pdf_source.py /path/to/osint.db") + sys.exit(1) + main(sys.argv[1]) diff --git a/src/main.py b/src/main.py index ff98a9e..e744133 100644 --- a/src/main.py +++ b/src/main.py @@ -298,6 +298,8 @@ async def lifespan(app: FastAPI): orchestrator.set_ws_manager(ws_manager) await orchestrator.start() + from services import pdf_ingest as _pdf_ingest + scheduler.add_job(_pdf_ingest.run_once, "interval", minutes=1, id="pdf_ingest", max_instances=1, coalesce=True) scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh") scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup") scheduler.add_job(daily_source_health_check, "cron", hour=4, minute=0, id="source_health") diff --git a/src/models.py b/src/models.py index 6f9dc71..1ac356d 100644 --- a/src/models.py +++ b/src/models.py @@ -140,7 +140,7 @@ class IncidentListItem(BaseModel): # Sources (Quellenverwaltung) -SOURCE_TYPE_PATTERN = "^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$" +SOURCE_TYPE_PATTERN = "^(rss_feed|web_source|excluded|telegram_channel|podcast_feed|pdf_document)$" SOURCE_CATEGORY_PATTERN = "^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$" SOURCE_STATUS_PATTERN = "^(active|inactive)$" class SourceCreate(BaseModel): diff --git a/src/services/pdf_ingest.py b/src/services/pdf_ingest.py new file mode 100644 index 0000000..2486fa7 --- /dev/null +++ b/src/services/pdf_ingest.py @@ -0,0 +1,237 @@ +"""PDF-Ingest: liest hochgeladene PDFs ein und legt sie als Pool-Artikel ab. + +Quellen vom Typ `pdf_document` werden in der Verwaltung angelegt +(`processed_at IS NULL`). Dieser Service pollt sie, extrahiert den Text, +uebersetzt nach DE+EN und schreibt EINEN Artikel (incident_id=NULL) in +`articles`. Idempotent ueber `processed_at`. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import os +import re +from typing import Optional + +import aiosqlite + +from config import DB_PATH, CLAUDE_MODEL_FAST +from agents.claude_client import call_claude + +logger = logging.getLogger("osint.pdf_ingest") + +MAX_CHARS_PER_PDF = 200_000 # harte Obergrenze, schuetzt vor riesigen Dumps +TRANSLATE_INPUT_MAX = 12_000 # was wir dem LLM zum Uebersetzen geben (Cost-Control) + + +def _extract_text_pdfplumber(path: str) -> str: + import pdfplumber + parts: list[str] = [] + with pdfplumber.open(path) as pdf: + for page in pdf.pages: + t = page.extract_text() or "" + if t: + parts.append(t) + return "\n\n".join(parts).strip() + + +def _extract_text_ocr(path: str) -> str: + """Tesseract-Fallback ueber pdf2image -> Pillow -> pytesseract.""" + from pdf2image import convert_from_path + import pytesseract + images = convert_from_path(path, dpi=200) + parts = [] + for img in images: + # deu+eng zusammen, damit mehrsprachige PDFs gehen + t = pytesseract.image_to_string(img, lang="deu+eng") + if t and t.strip(): + parts.append(t.strip()) + return "\n\n".join(parts).strip() + + +def _extract_text(path: str) -> tuple[str, str]: + """Gibt (text, method) zurueck. method: 'pdfplumber' oder 'ocr'.""" + try: + text = _extract_text_pdfplumber(path) + except Exception as e: + logger.warning("pdfplumber-Extraktion fehlgeschlagen fuer %s: %s", path, e) + text = "" + if len(text) >= 50: + return text[:MAX_CHARS_PER_PDF], "pdfplumber" + logger.info("PDF hat keinen Text-Layer (oder <50 Zeichen), versuche OCR: %s", path) + text = _extract_text_ocr(path) + return text[:MAX_CHARS_PER_PDF], "ocr" + + +def _derive_headline(text: str, fallback: str) -> str: + """Erste sinnvolle Zeile als Headline; sonst Fallback (Dateiname).""" + for raw in text.splitlines(): + line = raw.strip() + if 5 <= len(line) <= 200: + return line + return fallback.strip() or "Untitled PDF" + + +async def _translate(text: str, headline: str, target_lang: str) -> tuple[str, str]: + """Uebersetzt Headline + Content nach target_lang ('de' oder 'en'). + + Eigene mini-Funktion (statt agents.translator), weil wir je PDF nur EIN + Item haben und Headline+Content getrennt brauchen. Returnt (headline_t, content_t). + Bei Fehler oder leerem Text: ('', ''). + """ + if not text and not headline: + return "", "" + lang_label = {"de": "Deutsch", "en": "Englisch"}.get(target_lang, target_lang) + content_in = (text or "")[:TRANSLATE_INPUT_MAX] + prompt = f"""Du bist ein praeziser Uebersetzer fuer Sachtexte. +Uebersetze Headline und Inhalt nach {lang_label}. + +WICHTIG: +- Verwende IMMER echte UTF-8-Umlaute (ae->ä, oe->ö, ue->ü, ss->ß) bei Deutsch. +- Behalte Eigennamen im Original. +- Wenn der Text schon auf {lang_label} ist, gib ihn (nahezu) unveraendert zurueck. +- Behalte die wichtigsten Inhalte; kuerze stark auf MAX 3000 Zeichen Content. + +Antworte AUSSCHLIESSLICH mit einem JSON-Objekt im Format: +{{"headline": "...", "content": "..."}} + +Keine Markdown-Codefence, keine Einleitung. + +HEADLINE: {headline} +INHALT: +{content_in} +""" + try: + result_text, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST) + except Exception as e: + logger.warning("PDF-Translator (%s) Claude-Call fehlgeschlagen: %s", target_lang, e) + return "", "" + + raw = result_text.strip() + if raw.startswith("```"): + raw = re.sub(r"^```(?:json)?\s*", "", raw) + raw = re.sub(r"\s*```\s*$", "", raw).strip() + try: + data = json.loads(raw) + except json.JSONDecodeError: + m = re.search(r"\{.*\}", raw, re.DOTALL) + if not m: + logger.warning("PDF-Translator (%s) JSON nicht parsbar: %r", target_lang, raw[:200]) + return "", "" + try: + data = json.loads(m.group(0)) + except json.JSONDecodeError: + return "", "" + if not isinstance(data, dict): + return "", "" + return (data.get("headline") or "").strip(), (data.get("content") or "").strip() + + +async def _process_one(db: aiosqlite.Connection, src: dict) -> None: + sid = src["id"] + name = src["name"] or "PDF" + rel_path = src["pdf_path"] + if not rel_path: + logger.warning("PDF-Source #%d ohne pdf_path, ueberspringe", sid) + return + + abs_path = rel_path if os.path.isabs(rel_path) else os.path.join( + os.path.dirname(DB_PATH), rel_path + ) + if not os.path.exists(abs_path): + logger.error("PDF-Datei fehlt fuer Source #%d: %s", sid, abs_path) + # auf processed_at setzen aber Notiz hinterlegen, damit kein Endlos-Retry + await db.execute( + "UPDATE sources SET processed_at = CURRENT_TIMESTAMP, " + "notes = COALESCE(notes,'') || ' [PDF-Datei nicht gefunden]' WHERE id = ?", + (sid,), + ) + await db.commit() + return + + logger.info("PDF-Ingest start: source #%d (%s)", sid, abs_path) + + try: + text, method = await asyncio.to_thread(_extract_text, abs_path) + except Exception as e: + logger.exception("PDF-Extraktion fehlgeschlagen fuer #%d: %s", sid, e) + await db.execute( + "UPDATE sources SET processed_at = CURRENT_TIMESTAMP, " + "notes = COALESCE(notes,'') || ' [PDF-Extraktion fehlgeschlagen]' WHERE id = ?", + (sid,), + ) + await db.commit() + return + + if not text: + logger.warning("PDF #%d ergab keinen Text (auch OCR leer)", sid) + await db.execute( + "UPDATE sources SET processed_at = CURRENT_TIMESTAMP, " + "notes = COALESCE(notes,'') || ' [PDF leer/nicht lesbar]' WHERE id = ?", + (sid,), + ) + await db.commit() + return + + fallback_name = re.sub(r"\.pdf$", "", os.path.basename(abs_path), flags=re.I) + headline = _derive_headline(text, fallback_name) + # Hochgeladene PDFs sind meist deutsch oder englisch; LLM kann das im Prompt erkennen + src_lang = (src.get("language") or "").lower() or "auto" + + # Wir senden parallel DE + EN + (de_h, de_c), (en_h, en_c) = await asyncio.gather( + _translate(text, headline, "de"), + _translate(text, headline, "en"), + ) + + # Originaltext kappen, damit articles-Tabelle handhabbar bleibt + content_original = text[:5000] + + await db.execute( + """INSERT INTO articles (incident_id, headline, headline_de, headline_en, + source, source_url, content_original, content_de, content_en, language, + published_at, tenant_id, verification_status) + VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, ?, 'unverified')""", + ( + headline, + de_h or None, + en_h or None, + name, + f"pdf://{src.get('pdf_sha256') or sid}", + content_original, + de_c or None, + en_c or None, + src_lang if src_lang != "auto" else None, + src.get("tenant_id"), + ), + ) + await db.execute( + "UPDATE sources SET processed_at = CURRENT_TIMESTAMP, article_count = article_count + 1, " + "last_seen_at = CURRENT_TIMESTAMP WHERE id = ?", + (sid,), + ) + await db.commit() + logger.info("PDF-Ingest fertig: source #%d (%s, %d Zeichen)", sid, method, len(text)) + + +async def run_once() -> int: + """Verarbeitet alle pdf_document-Sources ohne processed_at. Returnt Anzahl. + + Wird vom APScheduler als interval-Job aufgerufen. Pro Tick max 5 PDFs, + damit ein hochgeladener Stapel nicht einen einzelnen Lauf monopolisiert. + """ + async with aiosqlite.connect(DB_PATH) as db: + db.row_factory = aiosqlite.Row + cursor = await db.execute( + "SELECT id, name, pdf_path, pdf_sha256, language, tenant_id " + "FROM sources WHERE source_type = 'pdf_document' AND processed_at IS NULL " + "ORDER BY created_at ASC LIMIT 5" + ) + rows = [dict(r) for r in await cursor.fetchall()] + for src in rows: + try: + await _process_one(db, src) + except Exception: + logger.exception("PDF-Ingest unerwarteter Fehler bei source #%d", src["id"]) + return len(rows) -- 2.49.1 From 168fbc3987cdaa959e39a1561d50bb7211ac6e7e Mon Sep 17 00:00:00 2001 From: Claude Code Date: Sat, 16 May 2026 23:57:32 +0000 Subject: [PATCH 2/2] feat(sources): PDF-Upload auch in der Endkunden-App (Kundenquelle) - POST /api/sources/upload-pdf: tenant-scoped Upload, gleiche Speicher- Konvention wie der Verwaltungs-Endpoint (/pdfs/{sha}.pdf). Duplikat-Check beruecksichtigt globale Quellen. - dashboard.html: +PDF-Button in der Quellenverwaltungs-Toolbar + eigenes Modal modal-pdf-upload (closeModal-Quotes via '). - app.js: App.openPdfUpload + _bindPdfUploadFormOnce (Submit nur einmal binden). - api.js: API.upload(path, formData) Helper analog Verwaltung. --- src/routers/sources.py | 115 +++++++++++++++++++++++++++++++++++++- src/static/dashboard.html | 52 +++++++++++++++++ src/static/js/api.js | 25 +++++++++ src/static/js/app.js | 64 +++++++++++++++++++++ 4 files changed, 255 insertions(+), 1 deletion(-) diff --git a/src/routers/sources.py b/src/routers/sources.py index f1e35bd..b61d0a7 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -1,13 +1,19 @@ """Sources-Router: Quellenverwaltung (Multi-Tenant). Klassifikation: Read-Only — Pflege in der Verwaltung.""" import json import logging +import uuid +import re +import os +import hashlib from collections import defaultdict -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status from models import SourceCreate, SourceUpdate, SourceResponse, DiscoverRequest, DiscoverResponse, DiscoverMultiResponse, DomainActionRequest from auth import get_current_user from database import db_dependency, refresh_source_counts from source_rules import discover_source, discover_all_feeds, evaluate_feeds_with_claude, _extract_domain, _detect_category, domain_to_display_name, _DOMAIN_ALIASES import aiosqlite +from config import DB_PATH +from typing import Optional logger = logging.getLogger("osint.sources") @@ -640,3 +646,110 @@ async def trigger_refresh_counts( await refresh_source_counts(db) return {"status": "ok"} + +# --- PDF-Upload (Kundenquelle vom Typ pdf_document) --- +# Analog zum Verwaltungs-Upload, aber tenant-spezifisch. +# Datei landet unter /pdfs/{sha256}.pdf. +# Der Worker (services.pdf_ingest) verarbeitet sie asynchron im Minutentakt. + +MAX_PDF_SIZE_BYTES = 50 * 1024 * 1024 # 50 MB +PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(DB_PATH)), "pdfs") + + +def _pdf_dir() -> str: + os.makedirs(PDF_DIR, exist_ok=True) + return PDF_DIR + + +@router.post("/upload-pdf", status_code=status.HTTP_201_CREATED) +async def upload_pdf_source( + current_user: dict = Depends(get_current_user), + db: aiosqlite.Connection = Depends(db_dependency), + file: UploadFile = File(...), + name: Optional[str] = Form(None), + category: str = Form("sonstige"), + language: Optional[str] = Form(None), + notes: Optional[str] = Form(None), +): + """PDF hochladen + als Kundenquelle (source_type=pdf_document) registrieren. + + Idempotent ueber SHA256 innerhalb des Tenants: doppelter Upload erzeugt 409. + """ + head = await file.read(8) + if not head.startswith(b"%PDF-"): + raise HTTPException(status_code=415, detail="Datei ist kein gueltiges PDF") + + tenant_id = current_user.get("tenant_id") + sha = hashlib.sha256() + sha.update(head) + total = len(head) + tmp_path = os.path.join(_pdf_dir(), f".upload-{uuid.uuid4().hex}.tmp") + try: + with open(tmp_path, "wb") as out: + out.write(head) + while True: + chunk = await file.read(1024 * 1024) + if not chunk: + break + total += len(chunk) + if total > MAX_PDF_SIZE_BYTES: + raise HTTPException(status_code=413, detail=f"PDF ueberschreitet {MAX_PDF_SIZE_BYTES // 1024 // 1024} MB") + sha.update(chunk) + out.write(chunk) + sha_hex = sha.hexdigest() + final_path = os.path.join(_pdf_dir(), f"{sha_hex}.pdf") + rel_path = os.path.join("pdfs", f"{sha_hex}.pdf") + + # Duplikat-Pruefung innerhalb des Tenants (oder global, falls eine + # gleiche PDF bereits als Grundquelle existiert -> dann sichtbar fuer alle). + cursor = await db.execute( + "SELECT id, name, tenant_id FROM sources WHERE pdf_sha256 = ? " + "AND (tenant_id IS NULL OR tenant_id = ?)", + (sha_hex, tenant_id), + ) + existing = await cursor.fetchone() + if existing: + os.unlink(tmp_path) + scope = "global" if existing["tenant_id"] is None else "Ihrer Organisation" + raise HTTPException( + status_code=409, + detail=f"PDF bereits in {scope} vorhanden als Quelle '{existing['name']}' (id={existing['id']})", + ) + + if not os.path.exists(final_path): + os.replace(tmp_path, final_path) + else: + os.unlink(tmp_path) + except HTTPException: + if os.path.exists(tmp_path): + try: os.unlink(tmp_path) + except OSError: pass + raise + except Exception as e: + if os.path.exists(tmp_path): + try: os.unlink(tmp_path) + except OSError: pass + logger.exception("PDF-Upload (tenant) fehlgeschlagen") + raise HTTPException(status_code=500, detail=f"PDF-Upload fehlgeschlagen: {e}") + + display_name = (name or "").strip() or re.sub(r"\.pdf$", "", file.filename or "PDF", flags=re.I) + display_name = display_name[:200] + + cursor = await db.execute( + """INSERT INTO sources + (name, url, domain, source_type, category, status, notes, language, + pdf_path, pdf_sha256, added_by, tenant_id) + VALUES (?, NULL, NULL, 'pdf_document', ?, 'active', ?, ?, ?, ?, ?, ?)""", + (display_name, category, notes, language, rel_path, sha_hex, + current_user["username"], tenant_id), + ) + src_id = cursor.lastrowid + await db.commit() + + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (src_id,)) + row = await cursor.fetchone() + result = dict(row) + result["is_global"] = result.get("tenant_id") is None + result["state_affiliated"] = bool(result.get("state_affiliated")) + result["alignments"] = [] + return result diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 427aa6c..b5ee2fc 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -555,6 +555,7 @@
+
@@ -633,6 +634,57 @@ + + +