Der eigentliche Bottleneck war das DOM-Rendering von 519 Tabellenzeilen,
nicht das Backend (45 ms). Backend-Slim und Cache aus dem letzten Commit
haben die Bandbreite reduziert und wiederholte Klicks beschleunigt, aber der
erste Klick blieb langsam, weil weiterhin alle 519 Items in einem einzigen
innerHTML-Schub gerendert wurden.
Lösung: Server-Side-Pagination.
Backend (/api/sources/health):
- Neue Query-Parameter: limit (Default 100, max. 5000), offset (Default 0)
- Die Counter errors/warnings/ok/total_checks stammen aus einer separaten
GROUP-BY-Aggregat-Query über den GESAMTEN Bestand, nicht nur über die Page.
- Neues Feld all_orgs in der Antwort: alle Tenants mit Health-Checks,
damit das Filter-Dropdown auch im Pagination-Modus die volle
Org-Liste hat.
- Neue Felder limit, offset, has_more.
Frontend (source-health.js):
- healthLoadLimit (default 100), wird durch loadMoreHealth() um 200
hochgesetzt oder durch loadAllHealth() auf alles gesetzt.
- Der Cache-Key enthält jetzt auch das aktuelle Limit, damit beim
Nachladen nicht aus dem alten Cache bedient wird.
- Org-Liste kommt aus healthData.all_orgs statt aus den geladenen
Page-Items, sonst wäre sie nach Pagination unvollständig.
- Footer mit zwei Buttons ("+200 laden", "Alle N weiteren laden")
unter der Tabelle, nur sichtbar bei has_more=true.
- Counter-Anzeige: "X / Y angezeigt (von Z insgesamt)".
Cache-Buster für source-health.js auf 20260509f gebumpt.
1099 Zeilen
40 KiB
Python
1099 Zeilen
40 KiB
Python
"""Grundquellen-Verwaltung und Kundenquellen-Übersicht."""
|
|
import logging
|
|
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, Field
|
|
from typing import Optional
|
|
import aiosqlite
|
|
|
|
from auth import get_current_admin
|
|
from database import db_dependency
|
|
from audit import log_action, get_client_ip
|
|
from source_meta import get_meta
|
|
from config import HEALTH_CHECK_USER_AGENT, HEALTH_CHECK_TIMEOUT_S
|
|
from shared.source_rules import (
|
|
discover_source,
|
|
discover_all_feeds,
|
|
evaluate_feeds_with_claude,
|
|
domain_to_display_name,
|
|
)
|
|
|
|
# Logger for this module.
logger = logging.getLogger("verwaltung.sources")

# Every endpoint below is mounted under /api/sources.
router = APIRouter(prefix="/api/sources", tags=["sources"])

# Updatable columns of the `sources` table (presumably a whitelist for
# dynamically built UPDATE statements — confirm usage).
SOURCE_UPDATE_COLUMNS = {
    "name",
    "url",
    "domain",
    "source_type",
    "category",
    "status",
    "notes",
    "language",
    "bias",
    "fetch_strategy",
}
|
|
|
|
|
|
@router.get("/meta")
async def get_sources_meta(admin: dict = Depends(get_current_admin)):
    """Return the category and type metadata (single source of truth).

    The frontend loads this once on init and uses it to fill filter dropdowns
    and label lookups, so no category/type lists are hard-coded in JS/HTML.
    """
    meta = get_meta()
    return meta
|
|
|
|
|
|
|
|
class GlobalSourceCreate(BaseModel):
    """Request body for creating a global base source (stored with tenant_id NULL)."""

    # Display name: required, 1..200 characters.
    name: str = Field(max_length=200, min_length=1)
    url: Optional[str] = None
    domain: Optional[str] = None
    # Restricted to the known source types.
    source_type: str = Field(
        pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$",
        default="rss_feed",
    )
    # Free text here; no pattern validation is applied.
    category: str = Field(default="sonstige")
    status: str = Field(pattern="^(active|inactive)$", default="active")
    notes: Optional[str] = None
    language: Optional[str] = Field(max_length=100, default=None)
    bias: Optional[str] = Field(max_length=500, default=None)
    # One of default/googlebot/paywall/skip.
    fetch_strategy: Optional[str] = Field(
        pattern="^(default|googlebot|paywall|skip)$",
        default="default",
    )
|
|
|
|
|
|
class GlobalSourceUpdate(BaseModel):
    """Partial update payload for a global base source.

    Every field is optional. The update endpoint only writes fields that were
    sent with a non-None value (``model_dump(exclude_none=True)``). The
    validation rules mirror ``GlobalSourceCreate`` so that an update cannot
    store a value that creation would reject.
    """

    # min_length=1: only None is skipped by the update, so an explicit empty
    # string would otherwise pass validation and blank out the display name.
    name: Optional[str] = Field(default=None, min_length=1, max_length=200)
    url: Optional[str] = None
    domain: Optional[str] = None
    source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
    category: Optional[str] = None
    status: Optional[str] = Field(default=None, pattern="^(active|inactive)$")
    notes: Optional[str] = None
    language: Optional[str] = Field(default=None, max_length=100)
    bias: Optional[str] = Field(default=None, max_length=500)
    # Settable on create and listed in SOURCE_UPDATE_COLUMNS, but it was
    # missing here, so the fetch strategy could never be changed afterwards.
    fetch_strategy: Optional[str] = Field(default=None, pattern="^(default|googlebot|paywall|skip)$")
|
|
|
|
|
|
@router.get("/global")
async def list_global_sources(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List all global base sources (rows with tenant_id IS NULL).

    Besides all source columns, each row carries:
    - health_status: the worst status over the source's health checks
      (error > warning > ok). It is NULL when there are no checks or only
      other statuses, which lets the frontend render an inline badge without
      a separate health-tab request.
    - articles_7d / articles_30d: articles collected within the last 7 / 30
      days, matched case-insensitively on the source *name*.
    - tenant_excluded_count: number of distinct organizations whose users
      excluded the source's domain.
    """
    query = """
        WITH article_stats AS (
            -- Match per source-Name (case-insensitive). source_url im articles ist die
            -- Artikel-URL, nicht die Feed-URL - daher matcht das nicht mit sources.url.
            SELECT LOWER(source) AS s_lower,
                   SUM(CASE WHEN collected_at > datetime('now', '-7 days') THEN 1 ELSE 0 END) AS a7d,
                   SUM(CASE WHEN collected_at > datetime('now', '-30 days') THEN 1 ELSE 0 END) AS a30d
            FROM articles
            WHERE collected_at > datetime('now', '-30 days')
              AND source IS NOT NULL
            GROUP BY LOWER(source)
        ),
        excluded_counts AS (
            SELECT LOWER(ued.domain) AS dom,
                   COUNT(DISTINCT u.organization_id) AS cnt
            FROM user_excluded_domains ued
            JOIN users u ON u.id = ued.user_id
            WHERE ued.domain IS NOT NULL
            GROUP BY LOWER(ued.domain)
        ),
        health_agg AS (
            SELECT source_id,
                   MAX(CASE WHEN status = 'error' THEN 3
                            WHEN status = 'warning' THEN 2
                            WHEN status = 'ok' THEN 1
                            ELSE 0 END) AS rank
            FROM source_health_checks
            GROUP BY source_id
        )
        SELECT s.*,
               CASE ha.rank
                   WHEN 3 THEN 'error'
                   WHEN 2 THEN 'warning'
                   WHEN 1 THEN 'ok'
                   ELSE NULL
               END AS health_status,
               COALESCE(ast.a7d, 0) AS articles_7d,
               COALESCE(ast.a30d, 0) AS articles_30d,
               COALESCE(ec.cnt, 0) AS tenant_excluded_count
        FROM sources s
        LEFT JOIN article_stats ast ON ast.s_lower = LOWER(s.name)
        LEFT JOIN excluded_counts ec ON ec.dom = LOWER(s.domain)
        LEFT JOIN health_agg ha ON ha.source_id = s.id
        WHERE s.tenant_id IS NULL
        ORDER BY s.category, s.source_type, s.name
    """
    cursor = await db.execute(query)
    rows = await cursor.fetchall()
    return list(map(dict, rows))
|
|
|
|
|
|
@router.post("/global", status_code=201)
async def create_global_source(
    data: GlobalSourceCreate,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create a new global base source and record it in the audit log.

    Raises:
        HTTPException(409): a global source with the same URL already exists.

    Returns:
        The freshly inserted row as a dict.
    """
    if data.url:
        dup_cursor = await db.execute(
            "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL",
            (data.url,),
        )
        duplicate = await dup_cursor.fetchone()
        if duplicate:
            raise HTTPException(
                status_code=409,
                detail=f"URL bereits vorhanden: {duplicate['name']}",
            )

    params = (
        data.name, data.url, data.domain, data.source_type, data.category,
        data.status, data.notes, data.language, data.bias,
        data.fetch_strategy or "default",
    )
    insert_cursor = await db.execute(
        """INSERT INTO sources (name, url, domain, source_type, category, status, notes, language, bias, fetch_strategy, added_by, tenant_id)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'system', NULL)""",
        params,
    )
    new_id = insert_cursor.lastrowid
    await db.commit()

    read_cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,))
    created = dict(await read_cursor.fetchone())
    await log_action(
        db, admin, get_client_ip(request),
        action="create", resource_type="source", resource_id=new_id,
        after=created,
    )
    return created
|
|
|
|
|
|
@router.put("/global/{source_id}")
async def update_global_source(
    source_id: int,
    data: GlobalSourceUpdate,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Partially update a global base source and record it in the audit log.

    Only fields sent with a non-None value are written. Column names are
    interpolated into the UPDATE statement, so they are filtered against
    SOURCE_UPDATE_COLUMNS instead of trusting the model keys blindly.

    Raises:
        HTTPException(404): the id does not refer to a global source.
        HTTPException(409): the new URL already belongs to another global
            source (same rule as create and promote).

    Returns:
        The row after the update, or the unchanged row if nothing was sent.
    """
    cursor = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
    )
    row = await cursor.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")
    before = dict(row)

    updates = {
        column: value
        for column, value in data.model_dump(exclude_none=True).items()
        if column in SOURCE_UPDATE_COLUMNS
    }
    if not updates:
        return before

    # Keep URLs unique among global sources, as create/promote already do.
    new_url = updates.get("url")
    if new_url and new_url != before.get("url"):
        cursor = await db.execute(
            "SELECT id, name FROM sources WHERE url = ? AND tenant_id IS NULL AND id != ?",
            (new_url, source_id),
        )
        existing = await cursor.fetchone()
        if existing:
            raise HTTPException(
                status_code=409,
                detail=f"URL bereits vorhanden: {existing['name']}",
            )

    set_clause = ", ".join(f"{column} = ?" for column in updates)
    values = [*updates.values(), source_id]
    await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values)
    await db.commit()

    cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    after = dict(await cursor.fetchone())
    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=source_id,
        before=before, after=after,
    )
    return after
|
|
|
|
|
|
@router.delete("/global/{source_id}", status_code=204)
async def delete_global_source(
    source_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Delete a global base source; the removed row is audit-logged as "before".

    Raises:
        HTTPException(404): the id does not refer to a global source.
    """
    lookup = await db.execute(
        "SELECT * FROM sources WHERE id = ? AND tenant_id IS NULL", (source_id,)
    )
    existing = await lookup.fetchone()
    if existing is None:
        raise HTTPException(status_code=404, detail="Grundquelle nicht gefunden")
    snapshot = dict(existing)

    await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
    await db.commit()
    await log_action(
        db, admin, get_client_ip(request),
        action="delete", resource_type="source", resource_id=source_id,
        before=snapshot,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/global/languages")
async def get_global_languages(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the distinct, non-empty language values of global sources.

    The list is sorted and used to fill the frontend's language filter dropdown.
    """
    sql = """
        SELECT DISTINCT language
        FROM sources
        WHERE tenant_id IS NULL AND language IS NOT NULL AND language != ''
        ORDER BY language
    """
    cur = await db.execute(sql)
    rows = await cur.fetchall()
    languages = []
    for entry in rows:
        languages.append(entry["language"])
    return languages
|
|
|
|
|
|
@router.get("/global/stats")
async def get_global_stats(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Aggregated stats for the stats bar at the top of the base-sources tab.

    Only active global sources (tenant_id IS NULL and status 'active') count.

    Returns:
        by_type: {source_type: {"count": n, "articles": sum of article_count}}
        total: number of active global sources
        total_articles: sum of article_count over those sources
        health: {"errors", "warnings", "ok"} - number of sources per WORST
            health status (error > warning > ok). Each source is counted
            once, consistent with the per-row badge of list_global_sources.
    """
    cur = await db.execute("""
        SELECT source_type, COUNT(*) AS count, COALESCE(SUM(article_count), 0) AS articles
        FROM sources
        WHERE tenant_id IS NULL AND status = 'active'
        GROUP BY source_type
    """)
    by_type = {}
    total = 0
    total_articles = 0
    for r in await cur.fetchall():
        d = dict(r)
        by_type[d["source_type"]] = {"count": d["count"], "articles": d["articles"]}
        total += d["count"]
        total_articles += d["articles"]

    health = {"errors": 0, "warnings": 0, "ok": 0}
    cur = await db.execute("""
        SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'
    """)
    if await cur.fetchone():
        # Reduce each source to its worst check first. Grouping directly by
        # check status counted a source with e.g. an ok reachability check
        # and an error feed check under both "errors" and "ok".
        cur = await db.execute("""
            WITH worst AS (
                SELECT h.source_id,
                       MAX(CASE WHEN h.status = 'error' THEN 3
                                WHEN h.status = 'warning' THEN 2
                                WHEN h.status = 'ok' THEN 1
                                ELSE 0 END) AS worst_rank
                FROM source_health_checks h
                JOIN sources s ON s.id = h.source_id
                WHERE s.tenant_id IS NULL AND s.status = 'active'
                GROUP BY h.source_id
            )
            SELECT worst_rank, COUNT(*) AS cnt
            FROM worst
            GROUP BY worst_rank
        """)
        rank_to_key = {3: "errors", 2: "warnings", 1: "ok"}
        for r in await cur.fetchall():
            key = rank_to_key.get(r["worst_rank"])
            if key is not None:
                health[key] = r["cnt"]

    return {
        "by_type": by_type,
        "total": total,
        "total_articles": total_articles,
        "health": health,
    }
|
|
|
|
|
|
@router.get("/tenant")
async def list_tenant_sources(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """List every tenant-specific source together with its organization name.

    Rows are sorted by organization name, then category, then source name.
    org_name is NULL if the organization row does not exist.
    """
    sql = """
        SELECT s.*, o.name as org_name
        FROM sources s
        LEFT JOIN organizations o ON o.id = s.tenant_id
        WHERE s.tenant_id IS NOT NULL
        ORDER BY o.name, s.category, s.name
    """
    result = await db.execute(sql)
    return list(map(dict, await result.fetchall()))
|
|
|
|
|
|
@router.post("/tenant/{source_id}/promote")
async def promote_to_global(
    source_id: int,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Promote a tenant source to a global base source.

    The source's tenant_id is cleared and added_by is set to 'system'.

    Raises:
        HTTPException(404): unknown source id.
        HTTPException(400): the source is already global.
        HTTPException(409): its URL already exists as a global source.
    """
    lookup = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    source_row = await lookup.fetchone()
    if not source_row:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")
    if source_row["tenant_id"] is None:
        raise HTTPException(status_code=400, detail="Bereits eine Grundquelle")
    snapshot_before = dict(source_row)

    source_url = source_row["url"]
    if source_url:
        dup = await db.execute(
            "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
            (source_url,),
        )
        if await dup.fetchone():
            raise HTTPException(status_code=409, detail="URL bereits als Grundquelle vorhanden")

    await db.execute(
        "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?",
        (source_id,),
    )
    await db.commit()

    reread = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
    snapshot_after = dict(await reread.fetchone())
    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source", resource_id=source_id,
        before=snapshot_before, after=snapshot_after,
    )
    return snapshot_after
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BulkPromoteRequest(BaseModel):
    """Request body for bulk promotion of tenant sources."""

    # IDs of the tenant sources to promote, processed in the given order.
    source_ids: list[int]
|
|
|
|
|
|
@router.post("/tenant/bulk-promote")
async def bulk_promote_tenant_sources(
    data: BulkPromoteRequest,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Promote several tenant sources to global base sources in one request.

    Each id is handled independently, using the same rules as the single
    promote endpoint:
    - unknown ids are reported as failed;
    - sources that are already global are skipped;
    - sources whose URL already exists as a global source are skipped.
    One commit is issued after the loop.

    NOTE(review): if an item raises after its UPDATE has run (e.g. inside
    log_action), it is reported under ``failed``, but the UPDATE is still
    part of the final commit. Confirm whether that is acceptable.

    Returns:
        promoted: int - number of successfully promoted sources
        skipped: list - {id, name, reason} for skipped sources
        failed: list - {id, error} for errors
    """
    # The client IP belongs to the request, not to the individual item.
    client_ip = get_client_ip(request)
    promoted = 0
    skipped: list[dict] = []
    failed: list[dict] = []

    for sid in data.source_ids:
        try:
            cur = await db.execute("SELECT * FROM sources WHERE id = ?", (sid,))
            row = await cur.fetchone()
            if not row:
                failed.append({"id": sid, "error": "nicht gefunden"})
                continue
            if row["tenant_id"] is None:
                skipped.append({"id": sid, "name": row["name"], "reason": "bereits Grundquelle"})
                continue
            before = dict(row)

            if row["url"]:
                # Runs on the same connection, so sources promoted earlier in
                # this batch are visible and in-batch duplicates are skipped.
                cur = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL",
                    (row["url"],),
                )
                if await cur.fetchone():
                    skipped.append({"id": sid, "name": row["name"],
                                    "reason": "URL bereits als Grundquelle vorhanden"})
                    continue

            await db.execute(
                "UPDATE sources SET tenant_id = NULL, added_by = 'system' WHERE id = ?",
                (sid,),
            )
            cur = await db.execute("SELECT * FROM sources WHERE id = ?", (sid,))
            after = dict(await cur.fetchone())
            await log_action(
                db, admin, client_ip,
                action="update", resource_type="source", resource_id=sid,
                before=before, after=after,
            )
            promoted += 1
        except Exception as e:
            # Best effort per item: report the failure to the client and keep
            # going, but keep the traceback in the server log.
            logger.warning("Bulk-Promote für Quelle %s fehlgeschlagen", sid, exc_info=True)
            failed.append({"id": sid, "error": str(e)})

    await db.commit()
    return {"promoted": promoted, "skipped": skipped, "failed": failed}
|
|
|
|
|
|
@router.post("/discover")
async def discover_source_endpoint(
    url: str,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Analyze a URL to detect its domain, category and RSS feeds.

    Steps:
    1. Discover all feeds of the domain via ``discover_all_feeds``.
    2. If none are found, fall back to single-source discovery.
    3. Rate the feeds with Claude. If that fails, the first three feeds are used.
    4. Split the result into new feeds and feeds whose URL already exists as a
       global source.

    Raises:
        HTTPException(500): the primary discovery call failed.

    Returns:
        {"domain", "category", "feeds", "existing"} (+ "message" if no feeds).
    """
    try:
        multi = await discover_all_feeds(url)
    except Exception as e:
        logger.error("Discovery fehlgeschlagen: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Discovery fehlgeschlagen") from e

    domain = multi["domain"]
    category = multi["category"]
    feeds = multi.get("feeds", [])

    # Fallback to single-source discovery when no feeds were found.
    if not feeds:
        try:
            single = await discover_source(url)
            if single.get("rss_url"):
                feeds = [{"name": single["name"], "url": single["rss_url"]}]
                domain = single.get("domain", domain)
                category = single.get("category", category)
        except Exception:
            # Best effort: the fallback is optional, but log why it failed.
            logger.warning("Einzel-Discovery fehlgeschlagen für %s", url, exc_info=True)

    if not feeds:
        return {
            "domain": domain,
            "category": category,
            "feeds": [],
            "existing": [],
            "message": "Keine RSS-Feeds gefunden",
        }

    try:
        relevant_feeds = await evaluate_feeds_with_claude(domain, feeds)
    except Exception:
        logger.warning(
            "Feed-Bewertung für %s fehlgeschlagen, nutze die ersten 3 Feeds",
            domain, exc_info=True,
        )
        relevant_feeds = feeds[:3]

    # Global sources that already exist are reported separately.
    cursor = await db.execute(
        "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL"
    )
    existing_urls = {row["url"] for row in await cursor.fetchall()}

    result_feeds = []
    existing = []
    for feed in relevant_feeds:
        info = {
            # Fall back for a missing *or* empty/None name. The display name
            # is only computed when actually needed.
            "name": feed.get("name") or domain_to_display_name(domain),
            "url": feed["url"],
            "domain": domain,
            "category": category,
        }
        if feed["url"] in existing_urls:
            existing.append(info)
        else:
            result_feeds.append(info)

    return {
        "domain": domain,
        "category": category,
        "feeds": result_feeds,
        "existing": existing,
    }
|
|
|
|
|
|
@router.post("/discover/add")
async def add_discovered_sources(
    feeds: list[dict],
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Create discovered feeds as global RSS sources.

    Expects a list of {name, url, domain, category}. Entries without a URL
    are ignored; URLs that already exist as global sources are skipped.
    A ``web_source`` for the first feed's domain is also created if there is
    none yet. All created ids, including that web source, are audit-logged.

    Returns:
        {"added": number of created sources (incl. web source),
         "skipped": number of already-known URLs}
    """
    cursor = await db.execute(
        "SELECT url FROM sources WHERE tenant_id IS NULL AND url IS NOT NULL"
    )
    existing_urls = {row["url"] for row in await cursor.fetchall()}

    added = 0
    skipped = 0
    added_ids: list[int] = []
    for feed in feeds:
        feed_url = feed.get("url")
        if not feed_url:
            continue
        if feed_url in existing_urls:
            skipped += 1
            continue

        domain = feed.get("domain", "")
        # Derive a name instead of failing with KeyError (HTTP 500) when the
        # client omits "name".
        name = feed.get("name") or (domain_to_display_name(domain) if domain else feed_url)
        cur = await db.execute(
            """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
               VALUES (?, ?, ?, 'rss_feed', ?, 'active', 'system', NULL)""",
            (name, feed_url, domain, feed.get("category", "sonstige")),
        )
        added_ids.append(cur.lastrowid)
        # Also guards against duplicates within this request.
        existing_urls.add(feed_url)
        added += 1

    # Create the domain's web source if it does not exist yet.
    first_domain = feeds[0].get("domain") if feeds else None
    if first_domain:
        cursor = await db.execute(
            "SELECT id FROM sources WHERE LOWER(domain) = ? AND source_type = 'web_source' AND tenant_id IS NULL",
            (first_domain.lower(),),
        )
        if not await cursor.fetchone():
            cur = await db.execute(
                """INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id)
                   VALUES (?, ?, ?, 'web_source', ?, 'active', 'system', NULL)""",
                (domain_to_display_name(first_domain), f"https://{first_domain}", first_domain,
                 feeds[0].get("category", "sonstige")),
            )
            # Track it like the feeds; otherwise the web source was neither
            # listed in the audit entry nor audited at all when it was the
            # only thing created.
            added_ids.append(cur.lastrowid)
            added += 1

    await db.commit()
    if added_ids:
        await log_action(
            db, admin, get_client_ip(request),
            action="create", resource_type="source",
            after={"discovered_add": {"count": added, "ids": added_ids,
                                      "domain": first_domain}},
        )
    return {"added": added, "skipped": skipped}
|
|
|
|
|
|
|
|
# --- Health-Check & Vorschläge ---
|
|
|
|
@router.get("/health")
async def get_health(
    limit: int = 100,
    offset: int = 0,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return health-check results, paginated.

    Args:
        limit: page size, clamped to 1..5000 (0 falls back to 100).
        offset: number of rows to skip, clamped to >= 0.

    Rows are sorted by status (errors first, then warnings, then the rest),
    then by source name. Source id and check type break ties, so that
    LIMIT/OFFSET pages are stable between requests.

    The counters (errors/warnings/ok/total_checks) cover the ENTIRE data set,
    not only the returned page. This lets the frontend show the full status
    without rendering every row. They use the same health-check/source join
    as the page query, so checks whose source no longer exists are neither
    listed nor counted. total_checks is the number of listable rows.

    has_more is derived by fetching one row past the page, so it is exact
    even if counters and listable rows ever diverge.
    all_orgs lists every tenant with health checks (for the filter dropdown).
    """
    limit = max(1, min(int(limit or 100), 5000))
    offset = max(0, int(offset or 0))

    cursor = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_checks'"
    )
    if not await cursor.fetchone():
        return {
            "last_check": None, "total_checks": 0,
            "errors": 0, "warnings": 0, "ok": 0,
            "checks": [], "all_orgs": [],
            "limit": limit, "offset": offset, "has_more": False,
        }

    # Aggregate over the whole (listable) data set: one cheap GROUP BY.
    # The JOIN matters: health rows of deleted sources would otherwise be
    # counted but never listed.
    cursor = await db.execute(
        "SELECT h.status AS status, COUNT(*) AS n "
        "FROM source_health_checks h "
        "JOIN sources s ON s.id = h.source_id "
        "GROUP BY h.status"
    )
    counts = {row["status"]: row["n"] for row in await cursor.fetchall()}
    error_count = counts.get("error", 0)
    warning_count = counts.get("warning", 0)
    ok_count = counts.get("ok", 0)
    total_checks = sum(counts.values())

    # Fetch one extra row to know whether another page exists.
    cursor = await db.execute("""
        SELECT
            h.source_id, s.name, s.domain, s.tenant_id, s.language,
            o.name AS org_name,
            h.check_type, h.status, h.message
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        LEFT JOIN organizations o ON o.id = s.tenant_id
        ORDER BY
            CASE h.status WHEN 'error' THEN 0 WHEN 'warning' THEN 1 ELSE 2 END,
            s.name,
            h.source_id,
            h.check_type
        LIMIT ? OFFSET ?
    """, (limit + 1, offset))
    rows = await cursor.fetchall()
    has_more = len(rows) > limit
    checks = [dict(row) for row in rows[:limit]]

    # Org list (all tenants with health checks) for the frontend filter dropdown.
    cursor = await db.execute("""
        SELECT DISTINCT s.tenant_id AS id, o.name AS name
        FROM source_health_checks h
        JOIN sources s ON s.id = h.source_id
        LEFT JOIN organizations o ON o.id = s.tenant_id
        WHERE s.tenant_id IS NOT NULL
        ORDER BY o.name
    """)
    all_orgs = [dict(row) for row in await cursor.fetchall()]

    cursor = await db.execute("SELECT MAX(checked_at) as last_check FROM source_health_checks")
    row = await cursor.fetchone()
    last_check = row["last_check"] if row else None

    return {
        "last_check": last_check,
        "total_checks": total_checks,
        "errors": error_count,
        "warnings": warning_count,
        "ok": ok_count,
        "checks": checks,
        "all_orgs": all_orgs,
        "limit": limit,
        "offset": offset,
        "has_more": has_more,
    }
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/health/history")
async def get_health_history(
    limit: int = 20,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return the most recent health-check runs from source_health_history.

    One entry per run_id with the run time (earliest archived_at), the total
    row count and counts per status. limit is clamped to 1..100. The result
    is an empty list when the history table does not exist.
    """
    table_probe = await db.execute("""
        SELECT name FROM sqlite_master WHERE type='table' AND name='source_health_history'
    """)
    if await table_probe.fetchone() is None:
        return []

    capped_limit = max(1, min(limit, 100))
    runs_cursor = await db.execute("""
        SELECT run_id,
               MIN(archived_at) AS archived_at,
               COUNT(*) AS total,
               SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS errors,
               SUM(CASE WHEN status = 'warning' THEN 1 ELSE 0 END) AS warnings,
               SUM(CASE WHEN status = 'ok' THEN 1 ELSE 0 END) AS ok
        FROM source_health_history
        GROUP BY run_id
        ORDER BY archived_at DESC
        LIMIT ?
    """, (capped_limit,))
    runs = await runs_cursor.fetchall()
    return [dict(run) for run in runs]
|
|
|
|
|
|
@router.get("/suggestions")
async def get_suggestions(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Return up to 50 source suggestions.

    Pending suggestions come first, then all others. Within each group the
    newest come first. The cap of 50 applies to the combined list. Returns an
    empty list when the suggestions table does not exist.
    """
    probe = await db.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='source_suggestions'"
    )
    if await probe.fetchone() is None:
        return []

    listing = await db.execute("""
        SELECT * FROM source_suggestions
        ORDER BY
            CASE status WHEN 'pending' THEN 0 ELSE 1 END,
            created_at DESC
        LIMIT 50
    """)
    suggestions = await listing.fetchall()
    return list(map(dict, suggestions))
|
|
|
|
|
|
class SuggestionAction(BaseModel):
    """Decision on a pending source suggestion."""

    # True = accept (and apply) the suggestion, False = reject it.
    accept: bool
|
|
|
|
|
|
@router.put("/suggestions/{suggestion_id}")
async def update_suggestion(
    suggestion_id: int,
    action: SuggestionAction,
    request: Request,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Accept or reject a pending source suggestion.

    On accept, the suggestion is applied according to its ``suggestion_type``:
    - ``add_source``: insert a new global source from ``suggested_data``
      (name/url/domain/category). The type is ``rss_feed`` when the URL
      contains "rss", "feed", "xml" or "atom", otherwise ``web_source``.
      If the URL is missing or already exists as a global source, nothing is
      inserted and the suggestion is stored as "rejected" instead.
    - ``deactivate_source``: set the referenced source to 'inactive'.
    - ``remove_source``: delete the referenced source.
    - ``fix_url``: overwrite the referenced source's URL with
      ``suggested_data["url"]``.
    Any other type is marked accepted without further action. For accepted
    ``fix_url``/``add_source`` suggestions that reference a source, other
    pending ``deactivate_source`` suggestions for that source are rejected.

    Raises:
        HTTPException(404): suggestion not found.
        HTTPException(400): suggestion is no longer pending.

    Returns:
        {"status": final suggestion status, "action": result text or None}
    """
    import json as _json

    cursor = await db.execute(
        "SELECT * FROM source_suggestions WHERE id = ?", (suggestion_id,)
    )
    suggestion = await cursor.fetchone()
    if not suggestion:
        raise HTTPException(status_code=404, detail="Vorschlag nicht gefunden")

    suggestion = dict(suggestion)
    # Only pending suggestions can be decided.
    if suggestion["status"] != "pending":
        raise HTTPException(status_code=400, detail=f"Vorschlag bereits {suggestion['status']}")

    new_status = "accepted" if action.accept else "rejected"
    # Human-readable outcome; returned to the client and written to the audit log.
    result_action = None

    if action.accept:
        stype = suggestion["suggestion_type"]
        # suggested_data is a JSON string; empty/NULL means no payload.
        data = _json.loads(suggestion["suggested_data"]) if suggestion["suggested_data"] else {}

        if stype == "add_source":
            name = data.get("name", "Unbenannt")
            url = data.get("url")
            domain = data.get("domain", "")
            category = data.get("category", "sonstige")
            # Heuristic: feed-looking URLs become rss_feed, everything else web_source.
            source_type = "rss_feed" if url and any(
                x in (url or "").lower() for x in ("rss", "feed", "xml", "atom")
            ) else "web_source"

            if url:
                cursor = await db.execute(
                    "SELECT id FROM sources WHERE url = ? AND tenant_id IS NULL", (url,)
                )
                if await cursor.fetchone():
                    # Already a global source: nothing to add, store as rejected.
                    result_action = "übersprungen (URL bereits vorhanden)"
                    new_status = "rejected"
                else:
                    await db.execute(
                        "INSERT INTO sources (name, url, domain, source_type, category, status, added_by, tenant_id) "
                        "VALUES (?, ?, ?, ?, ?, 'active', 'haiku-vorschlag', NULL)",
                        (name, url, domain, source_type, category),
                    )
                    result_action = f"Quelle '{name}' angelegt"
            else:
                # Without a URL the source cannot be created; store as rejected.
                result_action = "übersprungen (keine URL)"
                new_status = "rejected"

        elif stype == "deactivate_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute("UPDATE sources SET status = 'inactive' WHERE id = ?", (source_id,))
                result_action = "Quelle deaktiviert"

        elif stype == "remove_source":
            source_id = suggestion["source_id"]
            if source_id:
                await db.execute("DELETE FROM sources WHERE id = ?", (source_id,))
                result_action = "Quelle gelöscht"

        elif stype == "fix_url":
            source_id = suggestion["source_id"]
            new_url = data.get("url")
            if source_id and new_url:
                await db.execute("UPDATE sources SET url = ? WHERE id = ?", (new_url, source_id))
                result_action = "URL aktualisiert"

        # Auto-reject: when fix_url or add_source is accepted, automatically
        # reject the related pending deactivate_source suggestions.
        if stype in ("fix_url", "add_source") and suggestion.get("source_id"):
            await db.execute(
                "UPDATE source_suggestions SET status = 'rejected', reviewed_at = CURRENT_TIMESTAMP "
                "WHERE source_id = ? AND suggestion_type = 'deactivate_source' AND status = 'pending' AND id != ?",
                (suggestion["source_id"], suggestion_id),
            )

    await db.execute(
        "UPDATE source_suggestions SET status = ?, reviewed_at = CURRENT_TIMESTAMP WHERE id = ?",
        (new_status, suggestion_id),
    )
    await db.commit()
    await log_action(
        db, admin, get_client_ip(request),
        action="update", resource_type="source",
        resource_id=suggestion.get("source_id"),
        before={"suggestion_id": suggestion_id, "status": "pending"},
        after={"suggestion_id": suggestion_id, "status": new_status,
               "result_action": result_action},
    )
    return {"status": new_status, "action": result_action}
|
|
|
|
|
|
@router.post("/health/run")
async def run_health_check_now(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Trigger a health-check run manually and then regenerate suggestions.

    Returns the number of checked sources, the number of issues and the
    number of suggestions generated.
    """
    # Imported lazily, at call time.
    from shared.services.source_health import run_health_checks
    from shared.services.source_suggester import generate_suggestions

    health_result = await run_health_checks(db)
    new_suggestions = await generate_suggestions(db)

    summary = {
        "checked": health_result["checked"],
        "issues": health_result["issues"],
        "suggestions": new_suggestions,
    }
    return summary
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/health/run-stream")
async def run_health_check_stream(
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Run the health check and stream progress as Server-Sent Events.

    Before the run, the current source_health_checks rows are copied into
    source_health_history under a fresh run_id and then deleted.

    Event phases:
    - "check": every active source (tenant and global) with a URL is fetched.
      Feed types are also parsed. One event is emitted per source, carrying
      its worst status. Afterwards, without progress events, "stale" checks
      run (no articles yet, or last article older than 30 days; web_source
      and excluded are skipped), followed by "duplicate" checks (same URL
      after lower-casing and stripping a trailing slash).
    - "suggestions": generate_suggestions() runs.
    - "done": final checked/total/issues/suggestions counters.

    NOTE(review): the generator keeps using the request-scoped ``db`` after
    this function has returned the StreamingResponse. Whether the connection
    is still open at that point depends on db_dependency and on how the
    installed FastAPI version tears down yield-dependencies — confirm.
    """
    import json as _json
    import asyncio
    from datetime import datetime

    def _sse(payload: dict) -> str:
        """Encode one payload as an SSE data frame."""
        return f"data: {_json.dumps(payload)}\n\n"

    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, article_count, last_seen_at "
        "FROM sources WHERE status = 'active'"  # tenant + global
    )
    sources = [dict(row) for row in await cursor.fetchall()]
    sources_with_url = [s for s in sources if s["url"]]
    total = len(sources_with_url)

    async def generate():
        import httpx
        import feedparser

        # Phase 1: reachability / feed validity
        yield _sse({'phase': 'check', 'checked': 0, 'total': total, 'current': ''})

        # Archive the previous results, then start from an empty table.
        run_id = uuid.uuid4().hex[:12]
        await db.execute(
            "INSERT INTO source_health_history "
            "(run_id, source_id, check_type, status, message, details, checked_at) "
            "SELECT ?, source_id, check_type, status, message, details, checked_at "
            "FROM source_health_checks",
            (run_id,),
        )
        await db.execute("DELETE FROM source_health_checks")
        await db.commit()

        issues_found = 0
        checked = 0

        async with httpx.AsyncClient(
            timeout=HEALTH_CHECK_TIMEOUT_S, follow_redirects=True,
            headers={"User-Agent": HEALTH_CHECK_USER_AGENT},
        ) as client:
            for source in sources_with_url:
                checks = []
                try:
                    try:
                        resp = await client.get(source["url"])
                        if resp.status_code >= 400:
                            checks.append({"type": "reachability", "status": "error",
                                           "message": f"HTTP {resp.status_code} - nicht erreichbar"})
                        else:
                            checks.append({"type": "reachability", "status": "ok", "message": "Erreichbar"})
                            if source["source_type"] in ("rss_feed", "podcast_feed"):
                                text = resp.text[:20000]
                                if "<rss" not in text and "<feed" not in text and "<channel" not in text:
                                    checks.append({"type": "feed_validity", "status": "error",
                                                   "message": "Kein RSS/Atom-Feed"})
                                else:
                                    # feedparser is synchronous; keep it off the event loop.
                                    feed = await asyncio.to_thread(feedparser.parse, text)
                                    if feed.get("bozo") and not feed.entries:
                                        checks.append({"type": "feed_validity", "status": "error",
                                                       "message": "Feed fehlerhaft"})
                                    elif not feed.entries:
                                        checks.append({"type": "feed_validity", "status": "warning",
                                                       "message": "Feed leer"})
                                    else:
                                        checks.append({"type": "feed_validity", "status": "ok",
                                                       "message": f"Feed OK ({len(feed.entries)} Eintr.)"})
                    except httpx.TimeoutException:
                        # Report the configured timeout instead of a hard-coded value.
                        checks.append({"type": "reachability", "status": "error",
                                       "message": f"Timeout ({HEALTH_CHECK_TIMEOUT_S}s)"})
                    except httpx.ConnectError:
                        checks.append({"type": "reachability", "status": "error", "message": "Verbindung fehlgeschlagen"})
                    except Exception as e:
                        checks.append({"type": "reachability", "status": "error", "message": f"{type(e).__name__}"})

                    for c in checks:
                        await db.execute(
                            "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, ?, ?, ?)",
                            (source["id"], c["type"], c["status"], c["message"]))
                        if c["status"] != "ok":
                            issues_found += 1
                except Exception:
                    # Keep the stream alive, but do not lose the error silently.
                    logger.warning(
                        "Health-Check-Ergebnis für Quelle %s konnte nicht gespeichert werden",
                        source["id"], exc_info=True,
                    )

                checked += 1
                status_icon = "ok"
                if any(c["status"] == "error" for c in checks):
                    status_icon = "error"
                elif any(c["status"] == "warning" for c in checks):
                    status_icon = "warning"

                yield _sse({'phase': 'check', 'checked': checked, 'total': total,
                            'current': source['name'], 'status': status_icon})

        # Stale sources (fast, no progress events needed). One reference time
        # for the whole pass. NOTE(review): naive local time; timezone-aware
        # last_seen_at values raise TypeError and are skipped — confirm format.
        now = datetime.now()
        for source in sources:
            if source["source_type"] in ("excluded", "web_source"):
                continue
            article_count = source.get("article_count") or 0
            if article_count == 0:
                await db.execute(
                    "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', 'Noch nie Artikel geliefert')",
                    (source["id"],))
                issues_found += 1
            elif source.get("last_seen_at"):
                try:
                    last_dt = datetime.fromisoformat(source["last_seen_at"])
                    age = (now - last_dt).days
                    if age > 30:
                        await db.execute(
                            "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'stale', 'warning', ?)",
                            (source["id"], f"Letzter Artikel vor {age} Tagen"))
                        issues_found += 1
                except (ValueError, TypeError):
                    pass

        # Duplicates: the first source with a normalized URL wins, later ones are flagged.
        url_map = {}
        for s in sources:
            if not s["url"]:
                continue
            url_norm = s["url"].lower().rstrip("/")
            if url_norm in url_map:
                existing = url_map[url_norm]
                await db.execute(
                    "INSERT INTO source_health_checks (source_id, check_type, status, message) VALUES (?, 'duplicate', 'warning', ?)",
                    (s["id"], f"Doppelte URL wie '{existing['name']}' (ID {existing['id']})"))
                issues_found += 1
            else:
                url_map[url_norm] = s

        await db.commit()

        # Phase 2: suggestions
        yield _sse({'phase': 'suggestions', 'checked': checked, 'total': total})

        from shared.services.source_suggester import generate_suggestions
        suggestion_count = await generate_suggestions(db)

        # Done
        yield _sse({'phase': 'done', 'checked': checked, 'total': total,
                    'issues': issues_found, 'suggestions': suggestion_count})

    return StreamingResponse(generate(), media_type="text/event-stream")
|
|
|
|
|
|
# Maps the solution "type" returned by the model to the suggestion_type stored
# in source_suggestions; unknown or missing types fall back to "add_source".
_SEARCH_FIX_SUGGESTION_TYPES = {
    "replace_url": "fix_url",
    "add_feed": "add_source",
    "deactivate": "deactivate_source",
}


def _parse_search_fix_response(response: str) -> dict:
    """Extract the JSON result object from a raw model response.

    The span from the first ``{`` to the last ``}`` is parsed as JSON. If no
    such span exists, it is not valid JSON, or it is not an object, an
    "unfixable" result carrying the first 500 characters of the raw response
    as ``summary`` is returned instead, so a chatty or malformed answer does
    not surface as an HTTP 500.

    The returned dict always has a ``solutions`` key holding a list of dicts
    (non-list values and non-dict entries are dropped), so callers can iterate
    it without further checks.
    """
    import json as _json
    import re

    fallback = {"fixable": False, "solutions": [], "summary": response[:500]}

    json_match = re.search(r'\{.*\}', response, re.DOTALL)
    if not json_match:
        return fallback
    try:
        result = _json.loads(json_match.group(0))
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return fallback
    if not isinstance(result, dict):
        return fallback

    solutions = result.get("solutions")
    if isinstance(solutions, list):
        result["solutions"] = [sol for sol in solutions if isinstance(sol, dict)]
    else:
        result["solutions"] = []
    return result


@router.post("/health/search-fix/{source_id}")
async def search_fix_for_source(
    source_id: int,
    admin: dict = Depends(get_current_admin),
    db: aiosqlite.Connection = Depends(db_dependency),
):
    """Let Sonnet (with WebSearch/WebFetch) look for a fix for a broken source.

    Loads the source and its stored health-check findings, asks the model for
    up to three alternative solutions, and stores each one as a pending
    ``source_suggestions`` row with priority 'high'. A row is skipped when a
    pending suggestion with the same suggestion_type already exists for this
    source.

    Args:
        source_id: ID of the row in ``sources`` to investigate.
        admin: Authenticated admin (dependency; enforces admin access).
        db: Database connection.

    Returns:
        The parsed model result (``fixable``, ``solutions``, ``summary``),
        extended with ``cost_usd`` and ``tokens`` usage information.

    Raises:
        HTTPException: 404 if the source does not exist; 500 if the model
            call or storing the suggestions fails.
    """
    import json as _json

    cursor = await db.execute(
        "SELECT id, name, url, domain, source_type, category FROM sources WHERE id = ?",
        (source_id,),
    )
    source = await cursor.fetchone()
    if not source:
        raise HTTPException(status_code=404, detail="Quelle nicht gefunden")

    source = dict(source)

    # Load the stored health-check findings so the model knows what is broken.
    cursor = await db.execute(
        "SELECT check_type, status, message FROM source_health_checks WHERE source_id = ?",
        (source_id,),
    )
    issues = [dict(row) for row in await cursor.fetchall()]
    issues_text = "\n".join(f"- {i['check_type']}: {i['status']} - {i['message']}" for i in issues)

    prompt = f"""Du bist ein OSINT-Analyst. Folgende Quelle ist nicht mehr erreichbar:

Name: {source['name']}
URL: {source['url'] or 'keine'}
Domain: {source['domain'] or 'unbekannt'}
Typ: {source['source_type']}
Kategorie: {source['category']}

Probleme:
{issues_text}

Aufgabe: Suche im Internet nach funktionierenden Alternativen für diese Quelle.
- Finde konkrete RSS-Feed-URLs die tatsächlich funktionieren
- Prüfe ob es alternative Zugangswege gibt (andere Subdomains, Feed-Aggregatoren, alternative URLs)
- Gibt es eine Lösung oder ist die Quelle nur noch per WebSearch erreichbar?

Regeln:
- Maximal 3 Lösungen vorschlagen (die besten)
- Verwende echte deutsche Umlaute (ü, ä, ö, ß), keine Umschreibungen (ue, ae, oe, ss)

Antworte NUR mit einem JSON-Objekt:
{{
"fixable": true/false,
"solutions": [
{{
"type": "replace_url|add_feed|deactivate",
"name": "Anzeigename",
"url": "https://...",
"description": "Kurze Begründung"
}}
],
"summary": "Zusammenfassung in 1-2 Sätzen"
}}

Nur das JSON, kein anderer Text."""

    from shared.agents.claude_client import call_claude

    try:
        response, usage = await call_claude(prompt, tools="WebSearch,WebFetch")
        result = _parse_search_fix_response(response)

        # Store every proposed solution as a pending suggestion.
        for sol in result["solutions"]:
            sol_type = sol.get("type", "add_feed")
            suggestion_type = _SEARCH_FIX_SUGGESTION_TYPES.get(sol_type, "add_source")

            # The model may send null/non-string descriptions; normalise them.
            raw_description = sol.get("description")
            description = str(raw_description) if raw_description else ""
            title = f"{source['name']}: {(description or str(sol_type))[:120]}"

            # Duplicate check: same type for the same source already pending?
            # (Also catches duplicates within this batch, since the rows
            # inserted above are visible on the same connection.)
            cursor = await db.execute(
                "SELECT id FROM source_suggestions WHERE suggestion_type = ? AND source_id = ? AND status = 'pending'",
                (suggestion_type, source_id),
            )
            if await cursor.fetchone():
                continue

            data = _json.dumps({
                "name": sol.get("name", source["name"]),
                "url": sol.get("url", ""),
                "domain": source["domain"] or "",
                "category": source["category"],
            }, ensure_ascii=False)

            await db.execute(
                "INSERT INTO source_suggestions "
                "(suggestion_type, title, description, source_id, suggested_data, priority, status) "
                "VALUES (?, ?, ?, ?, ?, 'high', 'pending')",
                (suggestion_type, title, description, source_id, data),
            )

        await db.commit()

        result["cost_usd"] = usage.cost_usd
        result["tokens"] = {"input": usage.input_tokens, "output": usage.output_tokens}
        return result

    except Exception as e:
        # Log with traceback before converting to an HTTP error; the original
        # exception is chained so it is not lost.
        logging.getLogger(__name__).exception(
            "search-fix for source %s failed", source_id
        )
        raise HTTPException(status_code=500, detail=f"Recherche fehlgeschlagen: {e}") from e
|