Dateien
AegisSight-Monitor/src/routers/sources.py
Claude Code 9c50439785 feat(x): X (Twitter) als Bezugsquelle pro Lage
X-Accounts werden analog zu Telegram als Quelle (source_type=x_account)
konfiguriert und pro Lage ueber include_x zugeschaltet. Der Scraper
(feeds/x_parser.py, twscrape) liest Account-Timelines, optional ueber
einen HTTP-Proxy mit Fallback auf direkten Abruf ueber die Server-IP.

- DB-Migration include_x, Pydantic-Modelle, incidents-Router
- Orchestrator-X-Pipeline plus Haiku-Account-Vorselektion
- sources-Router /x/validate, x_account-Typ in Stats und Frontend
- Lage-Einstellungen: X-Toggle neben international und Telegram
- twscrape als Abhaengigkeit

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 06:52:19 +00:00

781 Zeilen
28 KiB
Python

"""Sources-Router: Quellenverwaltung (Multi-Tenant). Klassifikation: Read-Only — Pflege in der Verwaltung."""
import json
import logging
import uuid
import re
import os
import hashlib
from collections import defaultdict
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile, status
from models import SourceCreate, SourceUpdate, SourceResponse, DiscoverRequest, DiscoverResponse, DiscoverMultiResponse, DomainActionRequest
from auth import get_current_user
from database import db_dependency, refresh_source_counts
from source_rules import discover_source, discover_all_feeds, evaluate_feeds_with_claude, _extract_domain, _detect_category, domain_to_display_name, _DOMAIN_ALIASES
import aiosqlite
from config import DB_PATH
from typing import Optional
# FastAPI router on which every endpoint of this module is registered, plus
# the module logger.
router = APIRouter(tags=["sources"], prefix="/api/sources")
logger = logging.getLogger("osint.sources")
# Allow-list of columns that update_source may write. These names are
# interpolated into the UPDATE statement's SET clause, so the set is immutable
# to prevent the allow-list from being widened at runtime by accident.
SOURCE_UPDATE_COLUMNS = frozenset({
    "name", "url", "domain", "source_type", "category", "status", "notes",
    "language", "bias",
})
async def _load_alignments_for(
    db: aiosqlite.Connection,
    source_ids: list[int],
    *,
    chunk_size: int = 500,
) -> dict[int, list[str]]:
    """Load the alignment tags of several sources (read-only, for display).

    Alignments are maintained in the admin backend; this router only reads them.

    Args:
        db: Open connection whose rows support access by column name.
        source_ids: IDs of the sources to look up. Duplicates are ignored.
        chunk_size: Maximum number of IDs bound into a single ``IN (...)``
            query. SQLite limits the number of bound parameters per statement
            (``SQLITE_MAX_VARIABLE_NUMBER``, 999 before SQLite 3.32.0), so
            large source lists are queried in chunks below that limit.

    Returns:
        Mapping of every requested source ID to its alignments in
        ``ORDER BY alignment`` order; sources without alignments map to ``[]``.

    Raises:
        ValueError: if ``chunk_size`` is not positive.
    """
    if not source_ids:
        return {}
    if chunk_size < 1:
        raise ValueError("chunk_size must be positive")
    # Deduplicate (order-preserving) so an ID can never land in two chunks and
    # have its alignments appended twice.
    unique_ids = list(dict.fromkeys(source_ids))
    out: dict[int, list[str]] = {sid: [] for sid in unique_ids}
    for start in range(0, len(unique_ids), chunk_size):
        chunk = unique_ids[start:start + chunk_size]
        placeholders = ",".join("?" for _ in chunk)
        cursor = await db.execute(
            f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment",
            chunk,
        )
        # All rows of one source come from the same chunk, so each list stays sorted.
        for row in await cursor.fetchall():
            out.setdefault(row["source_id"], []).append(row["alignment"])
    return out
def _check_source_ownership(source: dict, username: str):
"""Prueft ob der Nutzer die Quelle bearbeiten/loeschen darf.
System-Quellen (auto-entdeckt) duerfen von jedem bearbeitet werden.
Nutzer-Quellen nur vom Ersteller.
"""
added_by = source.get("added_by", "")
if added_by and added_by != "system" and added_by != username:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Nur der Ersteller kann diese Quelle bearbeiten",
)
@router.get("", response_model=list[SourceResponse])
async def list_sources(
    source_type: Optional[str] = None,
    category: Optional[str] = None,
    source_status: Optional[str] = None,
    political_orientation: Optional[str] = None,
    media_type: Optional[str] = None,
    reliability: Optional[str] = None,
    state_affiliated: Optional[bool] = None,
    alignment: Optional[str] = None,
    ifcn_signatory: Optional[bool] = None,
    eu_disinfo_listed: Optional[bool] = None,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all sources visible to the caller: global ones plus the caller's tenant.

    Every query parameter is an optional filter. Text filters are ignored when
    omitted or empty; boolean filters are ignored only when omitted. The
    ``alignment`` value is lowercased before it is matched against
    ``source_alignments``.

    Returns:
        Source rows ordered by type, category and name. Each row also carries
        ``is_global``, booleanized flag columns and its ``alignments`` list.
    """
    tenant_id = current_user.get("tenant_id")
    # Global sources (tenant_id NULL) plus the caller's own organisation.
    clauses = ["(s.tenant_id IS NULL OR s.tenant_id = ?)"]
    params: list = [tenant_id]

    # Equality filters on text columns; column names are fixed literals here,
    # only the values are bound as parameters.
    text_filters = (
        ("s.source_type", source_type),
        ("s.category", category),
        ("s.status", source_status),
        ("s.political_orientation", political_orientation),
        ("s.media_type", media_type),
        ("s.reliability", reliability),
    )
    for column, value in text_filters:
        if value:
            clauses.append(f"{column} = ?")
            params.append(value)

    # Flag columns are stored as 0/1 integers.
    flag_filters = (
        ("s.state_affiliated", state_affiliated),
        ("s.ifcn_signatory", ifcn_signatory),
        ("s.eu_disinfo_listed", eu_disinfo_listed),
    )
    for column, value in flag_filters:
        if value is not None:
            clauses.append(f"{column} = ?")
            params.append(1 if value else 0)

    if alignment:
        clauses.append(
            "EXISTS (SELECT 1 FROM source_alignments sa WHERE sa.source_id = s.id AND sa.alignment = ?)"
        )
        params.append(alignment.lower())

    query = (
        "SELECT s.* FROM sources s WHERE "
        + " AND ".join(clauses)
        + " ORDER BY s.source_type, s.category, s.name"
    )
    cursor = await db.execute(query, params)
    results = [dict(row) for row in await cursor.fetchall()]

    alignments_map = await _load_alignments_for(db, [r["id"] for r in results])
    for item in results:
        item["is_global"] = item.get("tenant_id") is None
        for flag in ("state_affiliated", "ifcn_signatory", "eu_disinfo_listed"):
            item[flag] = bool(item.get(flag))
        item["alignments"] = alignments_map.get(item["id"], [])
    return results
@router.get("/stats")
async def get_source_stats(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return aggregated source statistics (global sources plus the caller's tenant).

    Only active sources are counted, grouped by the source types listed below.
    ``total_articles`` counts rows of ``articles`` whose ``tenant_id`` equals
    the caller's.
    """
    tenant_id = current_user.get("tenant_id")
    # NOTE(review): types not listed here (e.g. 'pdf_document', which
    # upload_pdf_source creates) are neither reported nor included in
    # total_sources; confirm that this is intended.
    known_types = ("rss_feed", "web_source", "telegram_channel", "x_account", "excluded")
    by_type = {type_name: {"count": 0, "articles": 0} for type_name in known_types}

    cursor = await db.execute("""
        SELECT
            source_type,
            COUNT(*) as count,
            SUM(article_count) as total_articles
        FROM sources
        WHERE status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)
        GROUP BY source_type
    """, (tenant_id,))
    for row in await cursor.fetchall():
        bucket = by_type.get(row["source_type"])
        if bucket is None:
            continue
        bucket["count"] = row["count"]
        bucket["articles"] = row["total_articles"] or 0

    cursor = await db.execute(
        "SELECT COUNT(*) as cnt FROM articles WHERE tenant_id = ?",
        (tenant_id,),
    )
    article_row = await cursor.fetchone()
    total_sources = sum(entry["count"] for entry in by_type.values())
    return {
        "by_type": by_type,
        "total_sources": total_sources,
        "total_articles": article_row["cnt"],
    }
@router.post("/discover", response_model=DiscoverResponse)
async def discover_source_endpoint(
    data: DiscoverRequest,
    current_user: dict = Depends(get_current_user),
):
    """Auto-detect the RSS feed, name, category and domain of a URL.

    Any failure is logged with its traceback and reported to the client as a
    generic HTTP 500.
    """
    try:
        discovered = await discover_source(data.url)
    except Exception as exc:
        logger.error(f"Discovery fehlgeschlagen: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen")
    return discovered
@router.post("/discover-multi", response_model=DiscoverMultiResponse)
async def discover_multi_endpoint(
    data: DiscoverRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Find ALL RSS feeds of a domain, rate them with Claude and add the relevant ones as sources.

    Flow:
    1. ``discover_all_feeds`` collects candidate feeds for the URL's domain.
    2. If it finds none, fall back to single-URL discovery (``discover_source``).
       That one feed is added if its URL is not present yet, and the response
       has ``fallback_single=True``.
    3. Otherwise ``evaluate_feeds_with_claude`` filters the candidates. Every
       relevant feed whose URL is not present yet is added as ``rss_feed``.
    4. A ``web_source`` for the domain is added if none exists yet in any tenant.

    New sources belong to the caller's tenant and record the caller as
    ``added_by``. Any exception is logged and reported as a generic HTTP 500.
    """
    tenant_id = current_user.get("tenant_id")
    try:
        multi = await discover_all_feeds(data.url)
        domain = multi["domain"]
        category = multi["category"]
        if not multi["feeds"]:
            # Fallback: no feed list for the domain -> single-URL discovery.
            single = await discover_source(data.url)
            sources = []
            if single.get("rss_url"):
                # Duplicate check by exact URL, across all tenants and statuses.
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ?", (single["rss_url"],)
                )
                existing = await cursor.fetchone()
                if not existing:
                    cursor = await db.execute(
                        """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                        VALUES (?, ?, ?, ?, ?, 'active', ?, ?)""",
                        (single["name"], single["rss_url"], single["domain"],
                         single["source_type"], single["category"], current_user["username"], tenant_id),
                    )
                    await db.commit()
                    # Re-read the inserted row so the response shows its stored values.
                    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,))
                    row = await cursor.fetchone()
                    sources.append(dict(row))
            return DiscoverMultiResponse(
                domain=single.get("domain", domain),
                category=single.get("category", category),
                added_count=len(sources),
                # 1 when a feed was found but already existed, else 0.
                skipped_count=1 if not sources and single.get("rss_url") else 0,
                total_found=1 if single.get("rss_url") else 0,
                sources=sources,
                fallback_single=True,
            )
        relevant_feeds = await evaluate_feeds_with_claude(domain, multi["feeds"])
        # URLs already known in any tenant; also updated below so a feed listed
        # twice by the evaluator is inserted only once.
        cursor = await db.execute("SELECT url FROM sources WHERE url IS NOT NULL")
        existing_urls = {row["url"] for row in await cursor.fetchall()}
        new_ids = []
        skipped = 0
        for feed in relevant_feeds:
            if feed["url"] in existing_urls:
                skipped += 1
                continue
            cursor = await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                VALUES (?, ?, ?, 'rss_feed', ?, 'active', ?, ?)""",
                (feed["name"], feed["url"], domain, category, current_user["username"], tenant_id),
            )
            new_ids.append(cursor.lastrowid)
            existing_urls.add(feed["url"])
        # Ensure the domain itself exists as a web source (checked across all tenants).
        cursor = await db.execute(
            "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source'",
            (domain.lower(),),
        )
        if not await cursor.fetchone():
            cursor = await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                VALUES (?, ?, ?, 'web_source', ?, 'active', ?, ?)""",
                (domain_to_display_name(domain), f"https://{domain}", domain, category, current_user["username"], tenant_id),
            )
            new_ids.append(cursor.lastrowid)
        # Single commit for all inserts of this request.
        # NOTE(review): there is no explicit rollback if an exception occurs after
        # some INSERTs. Whether pending inserts are discarded depends on how
        # db_dependency manages the connection; confirm.
        await db.commit()
        added_sources = []
        if new_ids:
            placeholders = ",".join("?" for _ in new_ids)
            cursor = await db.execute(
                f"SELECT * FROM sources WHERE id IN ({placeholders}) ORDER BY id",
                new_ids,
            )
            added_sources = [dict(row) for row in await cursor.fetchall()]
        return DiscoverMultiResponse(
            domain=domain,
            category=category,
            added_count=len(added_sources),
            skipped_count=skipped,
            # Number of candidate feeds found, before the Claude relevance filter.
            total_found=len(multi["feeds"]),
            sources=added_sources,
            fallback_single=False,
        )
    except Exception as e:
        logger.error(f"Multi-Discovery fehlgeschlagen: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Multi-Discovery fehlgeschlagen")
@router.post("/rediscover-existing")
async def rediscover_existing_endpoint(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """One-off migration: search the domains of existing RSS sources for more feeds.

    The domains come from the active RSS sources visible to the caller (global
    plus own tenant). For each distinct domain, all feeds are discovered and
    filtered by ``evaluate_feeds_with_claude``. Every feed whose URL is not yet
    in ``sources`` (any tenant) is added as an RSS source of the caller's
    tenant. A failure for one domain is logged and that domain is skipped; any
    other failure becomes a generic HTTP 500.

    Returns:
        Counts of processed domains, added feeds and skipped (known) feeds.
    """
    tenant_id = current_user.get("tenant_id")
    try:
        cursor = await db.execute(
            "SELECT * FROM sources WHERE source_type = 'rss_feed' AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?)",
            (tenant_id,),
        )
        rss_rows = [dict(row) for row in await cursor.fetchall()]
        # Distinct domains in first-seen order; rows without a domain are ignored.
        unique_domains = list(dict.fromkeys(r["domain"] for r in rss_rows if r["domain"]))

        cursor = await db.execute("SELECT url FROM sources WHERE url IS NOT NULL")
        known_urls = {row["url"] for row in await cursor.fetchall()}

        processed = added = skipped = 0
        for domain in unique_domains:
            processed += 1
            try:
                multi = await discover_all_feeds(f"https://{domain}")
                if not multi["feeds"]:
                    continue
                candidates = await evaluate_feeds_with_claude(domain, multi["feeds"])
                category = _detect_category(domain)
                for feed in candidates:
                    if feed["url"] in known_urls:
                        skipped += 1
                        continue
                    await db.execute(
                        """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                        VALUES (?, ?, ?, 'rss_feed', ?, 'active', ?, ?)""",
                        (feed["name"], feed["url"], domain, category, current_user["username"], tenant_id),
                    )
                    known_urls.add(feed["url"])
                    added += 1
                # Commit once per domain.
                await db.commit()
            except Exception as exc:
                logger.warning(f"Rediscovery fuer {domain} fehlgeschlagen: {exc}")

        return {
            "domains_processed": processed,
            "feeds_added": added,
            "feeds_skipped": skipped,
        }
    except Exception as exc:
        logger.error(f"Rediscovery fehlgeschlagen: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail="Rediscovery fehlgeschlagen")
@router.get("/my-exclusions")
async def get_my_exclusions(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the current user's excluded domains (domain, notes, created_at), ordered by domain."""
    cursor = await db.execute(
        "SELECT domain, notes, created_at FROM user_excluded_domains WHERE user_id = ? ORDER BY domain",
        (current_user["id"],),
    )
    return [dict(entry) for entry in await cursor.fetchall()]
@router.post("/block-domain")
async def block_domain(
    data: DomainActionRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Exclude a domain for the current user only (per user, not organisation-wide).

    The domain is lowercased and stripped before it is stored. If the user has
    already excluded the domain, only its notes are updated, and only when new
    notes are given.

    Returns:
        ``{"domain": ..., "status": "excluded"}`` for a new exclusion, or
        ``{"domain": ..., "status": "already_excluded"}``.

    Raises:
        HTTPException 400: the domain is empty after normalisation.
    """
    user_id = current_user["id"]
    domain = data.domain.lower().strip()
    if not domain:
        # A blank domain would otherwise be stored as a meaningless exclusion row.
        raise HTTPException(status_code=400, detail="domain ist erforderlich")
    # Check whether the domain is already excluded.
    cursor = await db.execute(
        "SELECT id FROM user_excluded_domains WHERE user_id = ? AND domain = ?",
        (user_id, domain),
    )
    existing = await cursor.fetchone()
    if existing:
        if data.notes:
            await db.execute(
                "UPDATE user_excluded_domains SET notes = ? WHERE id = ?",
                (data.notes, existing["id"]),
            )
            await db.commit()
        return {"domain": domain, "status": "already_excluded"}
    await db.execute(
        "INSERT INTO user_excluded_domains (user_id, domain, notes) VALUES (?, ?, ?)",
        (user_id, domain, data.notes),
    )
    await db.commit()
    return {"domain": domain, "status": "excluded"}
@router.post("/unblock-domain")
async def unblock_domain(
    data: DomainActionRequest,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Lift the current user's exclusion of a domain.

    Returns the normalised (lowercased, stripped) domain and whether a row was
    actually removed.
    """
    uid = current_user["id"]
    normalized = data.domain.lower().strip()
    cursor = await db.execute(
        "DELETE FROM user_excluded_domains WHERE user_id = ? AND domain = ?",
        (uid, normalized),
    )
    was_removed = cursor.rowcount > 0
    await db.commit()
    return {"domain": normalized, "removed": was_removed}
@router.delete("/domain/{domain}")
async def delete_domain(
    domain: str,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete all sources of a domain that belong to the caller's tenant.

    Global sources are never touched. The request fails with 404 if the tenant
    has no source for the domain. It fails with 403 if any matching source was
    added by ``"system"`` or by another user; the first such row, in query
    order, determines the error message.

    Returns:
        The normalised domain and the number of deleted sources.
    """
    tenant_id = current_user.get("tenant_id")
    domain_lower = domain.lower().strip()
    cursor = await db.execute(
        "SELECT * FROM sources WHERE LOWER(domain) = ? AND tenant_id = ?",
        (domain_lower, tenant_id),
    )
    matches = await cursor.fetchall()
    if not matches:
        raise HTTPException(status_code=404, detail="Keine Quellen fuer diese Domain gefunden")

    username = current_user["username"]
    for creator in (dict(row)["added_by"] for row in matches):
        if creator == "system":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Domain enthaelt System-Quellen, die nicht geloescht werden koennen",
            )
        if creator and creator != username:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Domain enthaelt Quellen anderer Nutzer",
            )

    await db.execute(
        "DELETE FROM sources WHERE LOWER(domain) = ? AND tenant_id = ?",
        (domain_lower, tenant_id),
    )
    await db.commit()
    return {
        "domain": domain_lower,
        "deleted_count": len(matches),
    }
@router.post("", response_model=SourceResponse, status_code=status.HTTP_201_CREATED)
async def create_source(
    data: SourceCreate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Add a new source owned by the caller's tenant.

    The domain is taken from the request or extracted from the URL, then
    lowercased and mapped through ``_DOMAIN_ALIASES``.

    Raises:
        HTTPException 409: an active source with the same URL exists in any
            tenant; or the domain already exists (globally or in the caller's
            tenant) and the request is a web source or carries no URL.
    """
    tenant_id = current_user.get("tenant_id")
    # Normalise the domain (fall back to the URL's domain, resolve subdomain aliases).
    domain = data.domain
    if not domain and data.url:
        domain = _extract_domain(data.url)
    if domain:
        domain = _DOMAIN_ALIASES.get(domain.lower(), domain.lower())

    # Duplicate check 1: same active URL already present? (across ALL tenants)
    if data.url:
        cursor = await db.execute(
            "SELECT id, name, tenant_id FROM sources WHERE url = ? AND status = 'active'",
            (data.url,),
        )
        existing = await cursor.fetchone()
        if existing:
            if existing["tenant_id"] is None or existing["tenant_id"] == tenant_id:
                detail = f"Feed-URL bereits vorhanden: {existing['name']} (ID {existing['id']})"
            else:
                # The duplicate belongs to another organisation: do not disclose
                # its name or ID to this tenant.
                detail = "Feed-URL bereits vorhanden"
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=detail)

    # Duplicate check 2: domain already present among global sources or the
    # caller's own tenant (this check is tenant-scoped, not cross-tenant).
    if domain:
        cursor = await db.execute(
            "SELECT id, name, source_type FROM sources WHERE LOWER(domain) = ? AND status = 'active' AND (tenant_id IS NULL OR tenant_id = ?) LIMIT 1",
            (domain.lower(), tenant_id),
        )
        domain_existing = await cursor.fetchone()
        if domain_existing:
            if data.source_type == "web_source":
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Web-Quelle für '{domain}' bereits vorhanden: {domain_existing['name']}",
                )
            if not data.url:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.",
                )

    # language/bias are only written when the client actually sent them.
    payload = data.model_dump(exclude_unset=True)
    cols = ["name", "url", "domain", "source_type", "category", "status", "notes",
            "language", "bias", "added_by", "tenant_id"]
    vals = [
        data.name,
        data.url,
        domain,
        data.source_type,
        data.category,
        data.status,
        data.notes,
        payload.get("language"),
        payload.get("bias"),
        current_user["username"],
        tenant_id,
    ]
    placeholders = ", ".join(["?"] * len(vals))
    cursor = await db.execute(
        f"INSERT INTO sources ({', '.join(cols)}) VALUES ({placeholders})",
        vals,
    )
    new_id = cursor.lastrowid
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,))
    row = await cursor.fetchone()
    result = dict(row)
    result["is_global"] = result.get("tenant_id") is None
    # Same boolean normalisation as list_sources (flags are stored as 0/1).
    for flag in ("state_affiliated", "ifcn_signatory", "eu_disinfo_listed"):
        result[flag] = bool(result.get(flag))
    alignments_map = await _load_alignments_for(db, [new_id])
    result["alignments"] = alignments_map.get(new_id, [])
    return result
@router.put("/{source_id}", response_model=SourceResponse)
async def update_source(
    source_id: int,
    data: SourceUpdate,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Edit a source that belongs to the caller's own tenant.

    Only fields listed in ``SOURCE_UPDATE_COLUMNS`` that were explicitly set in
    the request are written. A non-empty ``domain`` is lowercased and mapped
    through ``_DOMAIN_ALIASES``.

    Raises:
        HTTPException 404: the source does not exist or belongs to another tenant.
        HTTPException 403: the source is global (maintained in the admin
            backend) or was created by another user.
    """
    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if row["tenant_id"] is None:
        raise HTTPException(status_code=403, detail="Grundquellen koennen nur ueber die Verwaltung bearbeitet werden")
    if row["tenant_id"] != current_user.get("tenant_id"):
        # Other tenants' sources are not listed for this user (list_sources only
        # returns global + own-tenant rows), so do not reveal that the ID exists.
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    _check_source_ownership(dict(row), current_user["username"])

    updates = {}
    for field, value in data.model_dump(exclude_unset=True).items():
        if field not in SOURCE_UPDATE_COLUMNS:
            continue
        if field == "domain" and value:
            value = _DOMAIN_ALIASES.get(value.lower(), value.lower())
        updates[field] = value
    if updates:
        # Column names come only from the SOURCE_UPDATE_COLUMNS allow-list, so
        # interpolating them into the SQL is safe; values are bound parameters.
        set_clause = ", ".join(f"{column} = ?" for column in updates)
        await db.execute(
            f"UPDATE sources SET {set_clause} WHERE id = ?",
            [*updates.values(), source_id],
        )
        await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    row = await cursor.fetchone()
    result = dict(row)
    result["is_global"] = result.get("tenant_id") is None
    # Same boolean normalisation as list_sources (flags are stored as 0/1).
    for flag in ("state_affiliated", "ifcn_signatory", "eu_disinfo_listed"):
        result[flag] = bool(result.get(flag))
    alignments_map = await _load_alignments_for(db, [source_id])
    result["alignments"] = alignments_map.get(source_id, [])
    return result
@router.delete("/{source_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_source(
    source_id: int,
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a source that belongs to the caller's own tenant.

    Raises:
        HTTPException 404: the source does not exist or belongs to another tenant.
        HTTPException 403: the source is global or was created by another user.
    """
    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if row["tenant_id"] is None:
        raise HTTPException(status_code=403, detail="Grundquellen koennen nicht geloescht werden")
    if row["tenant_id"] != current_user.get("tenant_id"):
        # Other tenants' sources are invisible to this user (see list_sources);
        # answer as if the source did not exist.
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    _check_source_ownership(dict(row), current_user["username"])
    await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
    await db.commit()
@router.post("/telegram/validate")
async def validate_telegram_channel(
    data: dict,
    current_user: dict = Depends(get_current_user),
):
    """Check that a Telegram channel is reachable and return its channel info.

    Expects a JSON body with a string ``channel_id``.

    Raises:
        HTTPException 400: ``channel_id`` is missing, not a string, or blank.
        HTTPException 404: the parser returned no result for the channel.
        HTTPException 500: validation failed unexpectedly (details are only logged).
    """
    raw_channel_id = data.get("channel_id")
    # Anything but a non-blank string (missing key, null, number, object) is a
    # client error; calling .strip() on e.g. JSON null would otherwise raise
    # AttributeError outside the try below and surface as an unhandled 500.
    channel_id = raw_channel_id.strip() if isinstance(raw_channel_id, str) else ""
    if not channel_id:
        raise HTTPException(status_code=400, detail="channel_id ist erforderlich")
    try:
        # Imported on first use of this endpoint.
        from feeds.telegram_parser import TelegramParser
        parser = TelegramParser()
        result = await parser.validate_channel(channel_id)
        if result:
            return result
        raise HTTPException(status_code=404, detail="Kanal nicht erreichbar oder nicht gefunden")
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Telegram-Validierung fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Telegram-Validierung fehlgeschlagen")
@router.post("/x/validate")
async def validate_x_account(
    data: dict,
    current_user: dict = Depends(get_current_user),
):
    """Check that an X (Twitter) account is reachable and return its account info.

    Expects a JSON body with a string ``handle``.

    Raises:
        HTTPException 400: ``handle`` is missing, not a string, or blank.
        HTTPException 404: the parser returned no result for the account.
        HTTPException 500: validation failed unexpectedly (details are only logged).
    """
    raw_handle = data.get("handle")
    # Anything but a non-blank string (missing key, null, number, object) is a
    # client error; calling .strip() on e.g. JSON null would otherwise raise
    # AttributeError outside the try below and surface as an unhandled 500.
    handle = raw_handle.strip() if isinstance(raw_handle, str) else ""
    if not handle:
        raise HTTPException(status_code=400, detail="handle ist erforderlich")
    try:
        # Imported on first use of this endpoint.
        from feeds.x_parser import XParser
        parser = XParser()
        result = await parser.validate_account(handle)
        if result:
            return result
        raise HTTPException(status_code=404, detail="X-Account nicht erreichbar oder nicht gefunden")
    except HTTPException:
        raise
    except Exception as e:
        logger.error("X-Validierung fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="X-Validierung fehlgeschlagen")
@router.post("/refresh-counts")
async def trigger_refresh_counts(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Recompute the article counters of all sources.

    Delegates to ``refresh_source_counts``. There is no permission check beyond
    the ``get_current_user`` dependency.
    """
    await refresh_source_counts(db)
    ok_response = {"status": "ok"}
    return ok_response
# --- PDF upload (customer source of type pdf_document) ---
# Mirrors the upload in the admin backend, but scoped to the caller's tenant.
# Files are stored as <dirname(DB_PATH)>/pdfs/{sha256}.pdf.
# The worker (services.pdf_ingest) processes them asynchronously, once a minute.
MAX_PDF_SIZE_BYTES = 50 * 1024 ** 2  # 50 MB
PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(DB_PATH)), "pdfs")


def _pdf_dir() -> str:
    """Return PDF_DIR, creating the directory first if it does not exist yet."""
    os.makedirs(PDF_DIR, exist_ok=True)
    return PDF_DIR
def _remove_if_exists(path: str) -> None:
    """Best-effort removal of a temporary upload file; filesystem errors are ignored."""
    if os.path.exists(path):
        try:
            os.unlink(path)
        except OSError:
            pass


@router.post("/upload-pdf", status_code=status.HTTP_201_CREATED)
async def upload_pdf_source(
    current_user: dict = Depends(get_current_user),
    db: aiosqlite.Connection = Depends(db_dependency),
    file: UploadFile = File(...),
    name: Optional[str] = Form(None),
    category: str = Form("sonstige"),
    language: Optional[str] = Form(None),
    notes: Optional[str] = Form(None),
):
    """Upload a PDF and register it as a tenant source (source_type=pdf_document).

    The upload is streamed in 1 MiB chunks to a temporary file while its
    SHA-256 is computed. It is then stored content-addressed as
    ``pdfs/{sha256}.pdf`` next to the database. Uploading the same PDF twice
    within the tenant, or a PDF that already exists as a global source, is
    rejected with 409.

    Raises:
        HTTPException 415: the file does not start with the ``%PDF-`` magic bytes.
        HTTPException 413: the file exceeds ``MAX_PDF_SIZE_BYTES``.
        HTTPException 409: a visible source with the same SHA-256 already exists.
        HTTPException 500: storing the file failed (details are only logged).
    """
    head = await file.read(8)
    if not head.startswith(b"%PDF-"):
        raise HTTPException(status_code=415, detail="Datei ist kein gueltiges PDF")
    tenant_id = current_user.get("tenant_id")
    sha = hashlib.sha256(head)
    total = len(head)
    tmp_path = os.path.join(_pdf_dir(), f".upload-{uuid.uuid4().hex}.tmp")
    try:
        # NOTE(review): these file writes are synchronous inside an async
        # handler and block the event loop while each chunk is written.
        with open(tmp_path, "wb") as out:
            out.write(head)
            while chunk := await file.read(1024 * 1024):
                total += len(chunk)
                if total > MAX_PDF_SIZE_BYTES:
                    raise HTTPException(
                        status_code=413,
                        detail=f"PDF ueberschreitet {MAX_PDF_SIZE_BYTES // 1024 // 1024} MB",
                    )
                sha.update(chunk)
                out.write(chunk)
        sha_hex = sha.hexdigest()
        final_path = os.path.join(_pdf_dir(), f"{sha_hex}.pdf")
        rel_path = os.path.join("pdfs", f"{sha_hex}.pdf")
        # Duplicate check within the tenant, and against global sources (a PDF
        # that already exists as a global source is visible to everyone).
        cursor = await db.execute(
            "SELECT id, name, tenant_id FROM sources WHERE pdf_sha256 = ? "
            "AND (tenant_id IS NULL OR tenant_id = ?)",
            (sha_hex, tenant_id),
        )
        existing = await cursor.fetchone()
        if existing:
            scope = "global" if existing["tenant_id"] is None else "Ihrer Organisation"
            # The temporary file is removed by the HTTPException handler below.
            raise HTTPException(
                status_code=409,
                detail=f"PDF bereits in {scope} vorhanden als Quelle '{existing['name']}' (id={existing['id']})",
            )
        # Content-addressed storage: identical PDFs share one file on disk.
        if not os.path.exists(final_path):
            os.replace(tmp_path, final_path)
        else:
            os.unlink(tmp_path)
    except HTTPException:
        _remove_if_exists(tmp_path)
        raise
    except Exception:
        _remove_if_exists(tmp_path)
        logger.exception("PDF-Upload (tenant) fehlgeschlagen")
        # Details (paths, OS errors) go to the log only; like the other
        # endpoints of this router, the client gets a generic message.
        raise HTTPException(status_code=500, detail="PDF-Upload fehlgeschlagen")

    display_name = (name or "").strip() or re.sub(r"\.pdf$", "", file.filename or "PDF", flags=re.I)
    display_name = display_name[:200]
    cursor = await db.execute(
        """INSERT INTO sources
        (name, url, domain, source_type, category, status, notes, language,
        pdf_path, pdf_sha256, added_by, tenant_id)
        VALUES (?, NULL, NULL, 'pdf_document', ?, 'active', ?, ?, ?, ?, ?, ?)""",
        (display_name, category, notes, language, rel_path, sha_hex,
         current_user["username"], tenant_id),
    )
    src_id = cursor.lastrowid
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (src_id,))
    row = await cursor.fetchone()
    result = dict(row)
    result["is_global"] = result.get("tenant_id") is None
    # Same boolean normalisation as list_sources (flags are stored as 0/1).
    for flag in ("state_affiliated", "ifcn_signatory", "eu_disinfo_listed"):
        result[flag] = bool(result.get(flag))
    # A freshly created source has no alignments yet.
    result["alignments"] = []
    return result