feat(sources): PDF-Dokumente als neuer Quellentyp pdf_document
- SOURCE_TYPE_PATTERN um pdf_document erweitert - src/services/pdf_ingest.py: pdfplumber + Tesseract-OCR-Fallback, Uebersetzung nach DE+EN, ein Pool-Artikel pro PDF - Scheduler-Job pdf_ingest laeuft im Minuten-Takt und verarbeitet pdf_document-Quellen mit processed_at IS NULL - scripts/migrate_pdf_source.py: idempotente DB-Migration (sources.pdf_path/pdf_sha256/processed_at, articles.headline_en/content_en) - requirements.txt: pdfplumber, pytesseract, pdf2image, Pillow
Dieser Commit ist enthalten in:
@@ -16,3 +16,8 @@ Jinja2>=3.1
|
|||||||
weasyprint>=68.0
|
weasyprint>=68.0
|
||||||
python-docx>=1.2
|
python-docx>=1.2
|
||||||
pikepdf>=9.0
|
pikepdf>=9.0
|
||||||
|
# PDF-Quellen (Ingestion)
|
||||||
|
pdfplumber>=0.11
|
||||||
|
pytesseract>=0.3
|
||||||
|
pdf2image>=1.17
|
||||||
|
Pillow>=10.0
|
||||||
|
|||||||
34
scripts/migrate_pdf_source.py
Normale Datei
34
scripts/migrate_pdf_source.py
Normale Datei
@@ -0,0 +1,34 @@
|
|||||||
|
"""Idempotente Migration: Quellen-Typ pdf_document + EN-Spalten in articles.
|
||||||
|
|
||||||
|
Beim Live-Promote anwenden:
|
||||||
|
python3 scripts/migrate_pdf_source.py /home/claude-dev/osint-data/osint.db
|
||||||
|
"""
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
def add_col(db, table, col_def):
|
||||||
|
name = col_def.split()[0]
|
||||||
|
cols = {r[1] for r in db.execute(f"PRAGMA table_info({table})").fetchall()}
|
||||||
|
if name in cols:
|
||||||
|
return False
|
||||||
|
db.execute(f"ALTER TABLE {table} ADD COLUMN {col_def}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def main(path):
|
||||||
|
with sqlite3.connect(path) as db:
|
||||||
|
for col in ("pdf_path TEXT", "pdf_sha256 TEXT", "processed_at TIMESTAMP"):
|
||||||
|
print(f"sources.{col.split()[0]}:", "added" if add_col(db, "sources", col) else "exists")
|
||||||
|
for col in ("headline_en TEXT", "content_en TEXT"):
|
||||||
|
print(f"articles.{col.split()[0]}:", "added" if add_col(db, "articles", col) else "exists")
|
||||||
|
db.execute("CREATE INDEX IF NOT EXISTS idx_sources_pdf_sha256 ON sources(pdf_sha256)")
|
||||||
|
db.commit()
|
||||||
|
print("DONE")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) != 2:
|
||||||
|
print("Usage: migrate_pdf_source.py /path/to/osint.db")
|
||||||
|
sys.exit(1)
|
||||||
|
main(sys.argv[1])
|
||||||
@@ -298,6 +298,8 @@ async def lifespan(app: FastAPI):
|
|||||||
orchestrator.set_ws_manager(ws_manager)
|
orchestrator.set_ws_manager(ws_manager)
|
||||||
await orchestrator.start()
|
await orchestrator.start()
|
||||||
|
|
||||||
|
from services import pdf_ingest as _pdf_ingest
|
||||||
|
scheduler.add_job(_pdf_ingest.run_once, "interval", minutes=1, id="pdf_ingest", max_instances=1, coalesce=True)
|
||||||
scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh")
|
scheduler.add_job(check_auto_refresh, "interval", minutes=1, id="auto_refresh")
|
||||||
scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup")
|
scheduler.add_job(cleanup_expired, "interval", hours=1, id="cleanup")
|
||||||
scheduler.add_job(daily_source_health_check, "cron", hour=4, minute=0, id="source_health")
|
scheduler.add_job(daily_source_health_check, "cron", hour=4, minute=0, id="source_health")
|
||||||
|
|||||||
@@ -140,7 +140,7 @@ class IncidentListItem(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
# Sources (Quellenverwaltung)
|
# Sources (Quellenverwaltung)
|
||||||
SOURCE_TYPE_PATTERN = "^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$"
|
SOURCE_TYPE_PATTERN = "^(rss_feed|web_source|excluded|telegram_channel|podcast_feed|pdf_document)$"
|
||||||
SOURCE_CATEGORY_PATTERN = "^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$"
|
SOURCE_CATEGORY_PATTERN = "^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$"
|
||||||
SOURCE_STATUS_PATTERN = "^(active|inactive)$"
|
SOURCE_STATUS_PATTERN = "^(active|inactive)$"
|
||||||
class SourceCreate(BaseModel):
|
class SourceCreate(BaseModel):
|
||||||
|
|||||||
237
src/services/pdf_ingest.py
Normale Datei
237
src/services/pdf_ingest.py
Normale Datei
@@ -0,0 +1,237 @@
|
|||||||
|
"""PDF-Ingest: liest hochgeladene PDFs ein und legt sie als Pool-Artikel ab.
|
||||||
|
|
||||||
|
Quellen vom Typ `pdf_document` werden in der Verwaltung angelegt
|
||||||
|
(`processed_at IS NULL`). Dieser Service pollt sie, extrahiert den Text,
|
||||||
|
uebersetzt nach DE+EN und schreibt EINEN Artikel (incident_id=NULL) in
|
||||||
|
`articles`. Idempotent ueber `processed_at`.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import aiosqlite
|
||||||
|
|
||||||
|
from config import DB_PATH, CLAUDE_MODEL_FAST
|
||||||
|
from agents.claude_client import call_claude
|
||||||
|
|
||||||
|
logger = logging.getLogger("osint.pdf_ingest")
|
||||||
|
|
||||||
|
MAX_CHARS_PER_PDF = 200_000 # harte Obergrenze, schuetzt vor riesigen Dumps
|
||||||
|
TRANSLATE_INPUT_MAX = 12_000 # was wir dem LLM zum Uebersetzen geben (Cost-Control)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_text_pdfplumber(path: str) -> str:
|
||||||
|
import pdfplumber
|
||||||
|
parts: list[str] = []
|
||||||
|
with pdfplumber.open(path) as pdf:
|
||||||
|
for page in pdf.pages:
|
||||||
|
t = page.extract_text() or ""
|
||||||
|
if t:
|
||||||
|
parts.append(t)
|
||||||
|
return "\n\n".join(parts).strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_text_ocr(path: str) -> str:
|
||||||
|
"""Tesseract-Fallback ueber pdf2image -> Pillow -> pytesseract."""
|
||||||
|
from pdf2image import convert_from_path
|
||||||
|
import pytesseract
|
||||||
|
images = convert_from_path(path, dpi=200)
|
||||||
|
parts = []
|
||||||
|
for img in images:
|
||||||
|
# deu+eng zusammen, damit mehrsprachige PDFs gehen
|
||||||
|
t = pytesseract.image_to_string(img, lang="deu+eng")
|
||||||
|
if t and t.strip():
|
||||||
|
parts.append(t.strip())
|
||||||
|
return "\n\n".join(parts).strip()
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_text(path: str) -> tuple[str, str]:
|
||||||
|
"""Gibt (text, method) zurueck. method: 'pdfplumber' oder 'ocr'."""
|
||||||
|
try:
|
||||||
|
text = _extract_text_pdfplumber(path)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("pdfplumber-Extraktion fehlgeschlagen fuer %s: %s", path, e)
|
||||||
|
text = ""
|
||||||
|
if len(text) >= 50:
|
||||||
|
return text[:MAX_CHARS_PER_PDF], "pdfplumber"
|
||||||
|
logger.info("PDF hat keinen Text-Layer (oder <50 Zeichen), versuche OCR: %s", path)
|
||||||
|
text = _extract_text_ocr(path)
|
||||||
|
return text[:MAX_CHARS_PER_PDF], "ocr"
|
||||||
|
|
||||||
|
|
||||||
|
def _derive_headline(text: str, fallback: str) -> str:
|
||||||
|
"""Erste sinnvolle Zeile als Headline; sonst Fallback (Dateiname)."""
|
||||||
|
for raw in text.splitlines():
|
||||||
|
line = raw.strip()
|
||||||
|
if 5 <= len(line) <= 200:
|
||||||
|
return line
|
||||||
|
return fallback.strip() or "Untitled PDF"
|
||||||
|
|
||||||
|
|
||||||
|
async def _translate(text: str, headline: str, target_lang: str) -> tuple[str, str]:
|
||||||
|
"""Uebersetzt Headline + Content nach target_lang ('de' oder 'en').
|
||||||
|
|
||||||
|
Eigene mini-Funktion (statt agents.translator), weil wir je PDF nur EIN
|
||||||
|
Item haben und Headline+Content getrennt brauchen. Returnt (headline_t, content_t).
|
||||||
|
Bei Fehler oder leerem Text: ('', '').
|
||||||
|
"""
|
||||||
|
if not text and not headline:
|
||||||
|
return "", ""
|
||||||
|
lang_label = {"de": "Deutsch", "en": "Englisch"}.get(target_lang, target_lang)
|
||||||
|
content_in = (text or "")[:TRANSLATE_INPUT_MAX]
|
||||||
|
prompt = f"""Du bist ein praeziser Uebersetzer fuer Sachtexte.
|
||||||
|
Uebersetze Headline und Inhalt nach {lang_label}.
|
||||||
|
|
||||||
|
WICHTIG:
|
||||||
|
- Verwende IMMER echte UTF-8-Umlaute (ae->ä, oe->ö, ue->ü, ss->ß) bei Deutsch.
|
||||||
|
- Behalte Eigennamen im Original.
|
||||||
|
- Wenn der Text schon auf {lang_label} ist, gib ihn (nahezu) unveraendert zurueck.
|
||||||
|
- Behalte die wichtigsten Inhalte; kuerze stark auf MAX 3000 Zeichen Content.
|
||||||
|
|
||||||
|
Antworte AUSSCHLIESSLICH mit einem JSON-Objekt im Format:
|
||||||
|
{{"headline": "...", "content": "..."}}
|
||||||
|
|
||||||
|
Keine Markdown-Codefence, keine Einleitung.
|
||||||
|
|
||||||
|
HEADLINE: {headline}
|
||||||
|
INHALT:
|
||||||
|
{content_in}
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result_text, _usage = await call_claude(prompt, tools=None, model=CLAUDE_MODEL_FAST)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("PDF-Translator (%s) Claude-Call fehlgeschlagen: %s", target_lang, e)
|
||||||
|
return "", ""
|
||||||
|
|
||||||
|
raw = result_text.strip()
|
||||||
|
if raw.startswith("```"):
|
||||||
|
raw = re.sub(r"^```(?:json)?\s*", "", raw)
|
||||||
|
raw = re.sub(r"\s*```\s*$", "", raw).strip()
|
||||||
|
try:
|
||||||
|
data = json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
m = re.search(r"\{.*\}", raw, re.DOTALL)
|
||||||
|
if not m:
|
||||||
|
logger.warning("PDF-Translator (%s) JSON nicht parsbar: %r", target_lang, raw[:200])
|
||||||
|
return "", ""
|
||||||
|
try:
|
||||||
|
data = json.loads(m.group(0))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return "", ""
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
return "", ""
|
||||||
|
return (data.get("headline") or "").strip(), (data.get("content") or "").strip()
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_one(db: aiosqlite.Connection, src: dict) -> None:
|
||||||
|
sid = src["id"]
|
||||||
|
name = src["name"] or "PDF"
|
||||||
|
rel_path = src["pdf_path"]
|
||||||
|
if not rel_path:
|
||||||
|
logger.warning("PDF-Source #%d ohne pdf_path, ueberspringe", sid)
|
||||||
|
return
|
||||||
|
|
||||||
|
abs_path = rel_path if os.path.isabs(rel_path) else os.path.join(
|
||||||
|
os.path.dirname(DB_PATH), rel_path
|
||||||
|
)
|
||||||
|
if not os.path.exists(abs_path):
|
||||||
|
logger.error("PDF-Datei fehlt fuer Source #%d: %s", sid, abs_path)
|
||||||
|
# auf processed_at setzen aber Notiz hinterlegen, damit kein Endlos-Retry
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE sources SET processed_at = CURRENT_TIMESTAMP, "
|
||||||
|
"notes = COALESCE(notes,'') || ' [PDF-Datei nicht gefunden]' WHERE id = ?",
|
||||||
|
(sid,),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("PDF-Ingest start: source #%d (%s)", sid, abs_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
text, method = await asyncio.to_thread(_extract_text, abs_path)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception("PDF-Extraktion fehlgeschlagen fuer #%d: %s", sid, e)
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE sources SET processed_at = CURRENT_TIMESTAMP, "
|
||||||
|
"notes = COALESCE(notes,'') || ' [PDF-Extraktion fehlgeschlagen]' WHERE id = ?",
|
||||||
|
(sid,),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
return
|
||||||
|
|
||||||
|
if not text:
|
||||||
|
logger.warning("PDF #%d ergab keinen Text (auch OCR leer)", sid)
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE sources SET processed_at = CURRENT_TIMESTAMP, "
|
||||||
|
"notes = COALESCE(notes,'') || ' [PDF leer/nicht lesbar]' WHERE id = ?",
|
||||||
|
(sid,),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
return
|
||||||
|
|
||||||
|
fallback_name = re.sub(r"\.pdf$", "", os.path.basename(abs_path), flags=re.I)
|
||||||
|
headline = _derive_headline(text, fallback_name)
|
||||||
|
# Hochgeladene PDFs sind meist deutsch oder englisch; LLM kann das im Prompt erkennen
|
||||||
|
src_lang = (src.get("language") or "").lower() or "auto"
|
||||||
|
|
||||||
|
# Wir senden parallel DE + EN
|
||||||
|
(de_h, de_c), (en_h, en_c) = await asyncio.gather(
|
||||||
|
_translate(text, headline, "de"),
|
||||||
|
_translate(text, headline, "en"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Originaltext kappen, damit articles-Tabelle handhabbar bleibt
|
||||||
|
content_original = text[:5000]
|
||||||
|
|
||||||
|
await db.execute(
|
||||||
|
"""INSERT INTO articles (incident_id, headline, headline_de, headline_en,
|
||||||
|
source, source_url, content_original, content_de, content_en, language,
|
||||||
|
published_at, tenant_id, verification_status)
|
||||||
|
VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, ?, 'unverified')""",
|
||||||
|
(
|
||||||
|
headline,
|
||||||
|
de_h or None,
|
||||||
|
en_h or None,
|
||||||
|
name,
|
||||||
|
f"pdf://{src.get('pdf_sha256') or sid}",
|
||||||
|
content_original,
|
||||||
|
de_c or None,
|
||||||
|
en_c or None,
|
||||||
|
src_lang if src_lang != "auto" else None,
|
||||||
|
src.get("tenant_id"),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
await db.execute(
|
||||||
|
"UPDATE sources SET processed_at = CURRENT_TIMESTAMP, article_count = article_count + 1, "
|
||||||
|
"last_seen_at = CURRENT_TIMESTAMP WHERE id = ?",
|
||||||
|
(sid,),
|
||||||
|
)
|
||||||
|
await db.commit()
|
||||||
|
logger.info("PDF-Ingest fertig: source #%d (%s, %d Zeichen)", sid, method, len(text))
|
||||||
|
|
||||||
|
|
||||||
|
async def run_once() -> int:
|
||||||
|
"""Verarbeitet alle pdf_document-Sources ohne processed_at. Returnt Anzahl.
|
||||||
|
|
||||||
|
Wird vom APScheduler als interval-Job aufgerufen. Pro Tick max 5 PDFs,
|
||||||
|
damit ein hochgeladener Stapel nicht einen einzelnen Lauf monopolisiert.
|
||||||
|
"""
|
||||||
|
async with aiosqlite.connect(DB_PATH) as db:
|
||||||
|
db.row_factory = aiosqlite.Row
|
||||||
|
cursor = await db.execute(
|
||||||
|
"SELECT id, name, pdf_path, pdf_sha256, language, tenant_id "
|
||||||
|
"FROM sources WHERE source_type = 'pdf_document' AND processed_at IS NULL "
|
||||||
|
"ORDER BY created_at ASC LIMIT 5"
|
||||||
|
)
|
||||||
|
rows = [dict(r) for r in await cursor.fetchall()]
|
||||||
|
for src in rows:
|
||||||
|
try:
|
||||||
|
await _process_one(db, src)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("PDF-Ingest unerwarteter Fehler bei source #%d", src["id"])
|
||||||
|
return len(rows)
|
||||||
In neuem Issue referenzieren
Einen Benutzer sperren