feat(sources): strukturierte Klassifikation (Politik/Medientyp/Reliability/Alignments)

- Neue sources-Spalten: political_orientation (7+2 Stufen), media_type (20),
  reliability (5+1), state_affiliated, country_code, classification_source,
  classified_at sowie proposed_*-Spalten fuer LLM-Vorschlaege.
- Neue source_alignments-Tabelle fuer Mehrfach-Tagging geopolitischer Naehe
  (prorussisch, proiranisch, prowestlich, ...).
- API-Filter: ?political_orientation, ?media_type, ?reliability,
  ?state_affiliated, ?alignment.
- create/update_source nehmen alignments[] entgegen und setzen
  classification_source automatisch auf 'manual' bei Klassifikations-Edits.

Backwards-kompatibel: bestehendes bias/language/category bleibt unveraendert,
Default fuer Bestandsquellen ist classification_source = 'legacy'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Dieser Commit ist enthalten in:
Claude Code
2026-05-07 18:21:45 +00:00
Ursprung 7f220a9b65
Commit f8e2f73bc0
3 geänderte Dateien mit 279 neuen und 41 gelöschten Zeilen

Datei anzeigen

@@ -158,7 +158,31 @@ CREATE TABLE IF NOT EXISTS sources (
article_count INTEGER DEFAULT 0,
last_seen_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
tenant_id INTEGER REFERENCES organizations(id)
tenant_id INTEGER REFERENCES organizations(id),
language TEXT,
bias TEXT,
political_orientation TEXT DEFAULT 'na',
media_type TEXT DEFAULT 'sonstige',
reliability TEXT DEFAULT 'na',
state_affiliated INTEGER DEFAULT 0,
country_code TEXT,
classification_source TEXT DEFAULT 'legacy',
classified_at TIMESTAMP,
proposed_political_orientation TEXT,
proposed_media_type TEXT,
proposed_reliability TEXT,
proposed_state_affiliated INTEGER,
proposed_country_code TEXT,
proposed_alignments_json TEXT,
proposed_confidence REAL,
proposed_reasoning TEXT,
proposed_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS source_alignments (
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
alignment TEXT NOT NULL,
PRIMARY KEY (source_id, alignment)
);
CREATE TABLE IF NOT EXISTS notifications (
@@ -611,6 +635,57 @@ async def init_db():
await db.execute("ALTER TABLE sources ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)")
await db.commit()
# Migration: language + bias (Freitext, schon laenger im Einsatz, Schema-Lueck schliessen)
if "language" not in src_columns:
await db.execute("ALTER TABLE sources ADD COLUMN language TEXT")
await db.commit()
if "bias" not in src_columns:
await db.execute("ALTER TABLE sources ADD COLUMN bias TEXT")
await db.commit()
# Migration: strukturierte Klassifikations-Spalten fuer sources
for col, ddl in [
("political_orientation", "ALTER TABLE sources ADD COLUMN political_orientation TEXT DEFAULT 'na'"),
("media_type", "ALTER TABLE sources ADD COLUMN media_type TEXT DEFAULT 'sonstige'"),
("reliability", "ALTER TABLE sources ADD COLUMN reliability TEXT DEFAULT 'na'"),
("state_affiliated", "ALTER TABLE sources ADD COLUMN state_affiliated INTEGER DEFAULT 0"),
("country_code", "ALTER TABLE sources ADD COLUMN country_code TEXT"),
("classification_source", "ALTER TABLE sources ADD COLUMN classification_source TEXT DEFAULT 'legacy'"),
("classified_at", "ALTER TABLE sources ADD COLUMN classified_at TIMESTAMP"),
("proposed_political_orientation", "ALTER TABLE sources ADD COLUMN proposed_political_orientation TEXT"),
("proposed_media_type", "ALTER TABLE sources ADD COLUMN proposed_media_type TEXT"),
("proposed_reliability", "ALTER TABLE sources ADD COLUMN proposed_reliability TEXT"),
("proposed_state_affiliated", "ALTER TABLE sources ADD COLUMN proposed_state_affiliated INTEGER"),
("proposed_country_code", "ALTER TABLE sources ADD COLUMN proposed_country_code TEXT"),
("proposed_alignments_json", "ALTER TABLE sources ADD COLUMN proposed_alignments_json TEXT"),
("proposed_confidence", "ALTER TABLE sources ADD COLUMN proposed_confidence REAL"),
("proposed_reasoning", "ALTER TABLE sources ADD COLUMN proposed_reasoning TEXT"),
("proposed_at", "ALTER TABLE sources ADD COLUMN proposed_at TIMESTAMP"),
]:
if col not in src_columns:
await db.execute(ddl)
await db.commit()
if any(c not in src_columns for c in ("political_orientation", "media_type", "reliability")):
logger.info("Migration: Klassifikations-Spalten zu sources hinzugefuegt")
# Migration: source_alignments-Tabelle (Mehrfach-Tags fuer geopolitische Naehe)
cursor = await db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='source_alignments'"
)
if not await cursor.fetchone():
await db.executescript(
"""
CREATE TABLE source_alignments (
source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE,
alignment TEXT NOT NULL,
PRIMARY KEY (source_id, alignment)
);
CREATE INDEX IF NOT EXISTS idx_source_alignments_alignment ON source_alignments(alignment);
"""
)
await db.commit()
logger.info("Migration: source_alignments-Tabelle erstellt")
# Migration: tenant_id fuer notifications
cursor = await db.execute("PRAGMA table_info(notifications)")
notif_columns = [row[1] for row in await cursor.fetchall()]

Datei anzeigen

@@ -139,24 +139,51 @@ class IncidentListItem(BaseModel):
# Sources (Quellenverwaltung)
SOURCE_TYPE_PATTERN = "^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$"
SOURCE_CATEGORY_PATTERN = "^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$"
SOURCE_STATUS_PATTERN = "^(active|inactive)$"
POLITICAL_ORIENTATION_PATTERN = "^(links_extrem|links|mitte_links|liberal|mitte|konservativ|mitte_rechts|rechts|rechts_extrem|na)$"
MEDIA_TYPE_PATTERN = "^(tageszeitung|wochenzeitung|magazin|tv_sender|radio|oeffentlich_rechtlich|nachrichtenagentur|online_only|blog|telegram_kanal|telegram_bot|podcast|social_media|imageboard|think_tank|ngo|behoerde|staatsmedium|fachmedium|sonstige)$"
RELIABILITY_PATTERN = "^(sehr_hoch|hoch|gemischt|niedrig|sehr_niedrig|na)$"
ALIGNMENT_PATTERN = "^(prorussisch|proiranisch|prowestlich|proukrainisch|prochinesisch|projapanisch|proisraelisch|propalaestinensisch|protuerkisch|panarabisch|neutral|sonstige)$"
COUNTRY_CODE_PATTERN = "^[A-Z]{2}$"
CLASSIFICATION_SOURCE_PATTERN = "^(manual|llm_approved|llm_pending|legacy)$"
class SourceCreate(BaseModel):
name: str = Field(min_length=1, max_length=200)
url: Optional[str] = None
domain: Optional[str] = None
source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
category: str = Field(default="sonstige", pattern="^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$")
status: str = Field(default="active", pattern="^(active|inactive)$")
source_type: str = Field(default="rss_feed", pattern=SOURCE_TYPE_PATTERN)
category: str = Field(default="sonstige", pattern=SOURCE_CATEGORY_PATTERN)
status: str = Field(default="active", pattern=SOURCE_STATUS_PATTERN)
notes: Optional[str] = None
language: Optional[str] = None
bias: Optional[str] = None
political_orientation: Optional[str] = Field(default=None, pattern=POLITICAL_ORIENTATION_PATTERN)
media_type: Optional[str] = Field(default=None, pattern=MEDIA_TYPE_PATTERN)
reliability: Optional[str] = Field(default=None, pattern=RELIABILITY_PATTERN)
state_affiliated: Optional[bool] = None
country_code: Optional[str] = Field(default=None, pattern=COUNTRY_CODE_PATTERN)
alignments: Optional[list[str]] = None
class SourceUpdate(BaseModel):
name: Optional[str] = Field(default=None, max_length=200)
url: Optional[str] = None
domain: Optional[str] = None
source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$")
category: Optional[str] = Field(default=None, pattern="^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$")
status: Optional[str] = Field(default=None, pattern="^(active|inactive)$")
source_type: Optional[str] = Field(default=None, pattern=SOURCE_TYPE_PATTERN)
category: Optional[str] = Field(default=None, pattern=SOURCE_CATEGORY_PATTERN)
status: Optional[str] = Field(default=None, pattern=SOURCE_STATUS_PATTERN)
notes: Optional[str] = None
language: Optional[str] = None
bias: Optional[str] = None
political_orientation: Optional[str] = Field(default=None, pattern=POLITICAL_ORIENTATION_PATTERN)
media_type: Optional[str] = Field(default=None, pattern=MEDIA_TYPE_PATTERN)
reliability: Optional[str] = Field(default=None, pattern=RELIABILITY_PATTERN)
state_affiliated: Optional[bool] = None
country_code: Optional[str] = Field(default=None, pattern=COUNTRY_CODE_PATTERN)
alignments: Optional[list[str]] = None
class SourceResponse(BaseModel):
@@ -174,6 +201,14 @@ class SourceResponse(BaseModel):
created_at: str
language: Optional[str] = None
bias: Optional[str] = None
political_orientation: Optional[str] = None
media_type: Optional[str] = None
reliability: Optional[str] = None
state_affiliated: bool = False
country_code: Optional[str] = None
classification_source: Optional[str] = None
classified_at: Optional[str] = None
alignments: list[str] = []
is_global: bool = False

Datei anzeigen

@@ -12,7 +12,56 @@ logger = logging.getLogger("osint.sources")
router = APIRouter(prefix="/api/sources", tags=["sources"])
SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes"}
SOURCE_UPDATE_COLUMNS = {
"name", "url", "domain", "source_type", "category", "status", "notes",
"language", "bias",
"political_orientation", "media_type", "reliability",
"state_affiliated", "country_code",
}
SOURCE_CLASSIFICATION_FIELDS = {
"political_orientation", "media_type", "reliability",
"state_affiliated", "country_code",
}
ALLOWED_ALIGNMENTS = {
"prorussisch", "proiranisch", "prowestlich", "proukrainisch",
"prochinesisch", "projapanisch", "proisraelisch", "propalaestinensisch",
"protuerkisch", "panarabisch", "neutral", "sonstige",
}
async def _load_alignments_for(db: aiosqlite.Connection, source_ids: list[int]) -> dict[int, list[str]]:
"""Lädt alignments fuer mehrere Quellen in einer Query und gibt {source_id: [alignment, ...]} zurück."""
if not source_ids:
return {}
placeholders = ",".join("?" for _ in source_ids)
cursor = await db.execute(
f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment",
source_ids,
)
out: dict[int, list[str]] = {sid: [] for sid in source_ids}
for row in await cursor.fetchall():
out.setdefault(row["source_id"], []).append(row["alignment"])
return out
async def _replace_alignments(db: aiosqlite.Connection, source_id: int, alignments: list[str]):
"""Ersetzt die alignments-Liste einer Quelle (DELETE + INSERT) — Aufrufer muss commit() machen."""
await db.execute("DELETE FROM source_alignments WHERE source_id = ?", (source_id,))
seen: set[str] = set()
for raw in alignments:
a = (raw or "").strip().lower()
if not a or a in seen:
continue
if a not in ALLOWED_ALIGNMENTS:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Ungueltiger alignment-Wert: '{a}'",
)
seen.add(a)
await db.execute(
"INSERT INTO source_alignments (source_id, alignment) VALUES (?, ?)",
(source_id, a),
)
def _check_source_ownership(source: dict, username: str):
@@ -34,6 +83,11 @@ async def list_sources(
source_type: str = None,
category: str = None,
source_status: str = None,
political_orientation: str = None,
media_type: str = None,
reliability: str = None,
state_affiliated: bool = None,
alignment: str = None,
current_user: dict = Depends(get_current_user),
db: aiosqlite.Connection = Depends(db_dependency),
):
@@ -41,27 +95,43 @@ async def list_sources(
tenant_id = current_user.get("tenant_id")
# Global (tenant_id=NULL) + eigene Org
query = "SELECT * FROM sources WHERE (tenant_id IS NULL OR tenant_id = ?)"
params = [tenant_id]
query = "SELECT s.* FROM sources s WHERE (s.tenant_id IS NULL OR s.tenant_id = ?)"
params: list = [tenant_id]
if source_type:
query += " AND source_type = ?"
query += " AND s.source_type = ?"
params.append(source_type)
if category:
query += " AND category = ?"
query += " AND s.category = ?"
params.append(category)
if source_status:
query += " AND status = ?"
query += " AND s.status = ?"
params.append(source_status)
if political_orientation:
query += " AND s.political_orientation = ?"
params.append(political_orientation)
if media_type:
query += " AND s.media_type = ?"
params.append(media_type)
if reliability:
query += " AND s.reliability = ?"
params.append(reliability)
if state_affiliated is not None:
query += " AND s.state_affiliated = ?"
params.append(1 if state_affiliated else 0)
if alignment:
query += " AND EXISTS (SELECT 1 FROM source_alignments sa WHERE sa.source_id = s.id AND sa.alignment = ?)"
params.append(alignment.lower())
query += " ORDER BY source_type, category, name"
query += " ORDER BY s.source_type, s.category, s.name"
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
results = []
for row in rows:
d = dict(row)
results = [dict(row) for row in rows]
alignments_map = await _load_alignments_for(db, [r["id"] for r in results])
for d in results:
d["is_global"] = d.get("tenant_id") is None
results.append(d)
d["state_affiliated"] = bool(d.get("state_affiliated"))
d["alignments"] = alignments_map.get(d["id"], [])
return results
@@ -454,26 +524,60 @@ async def create_source(
detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.",
)
payload = data.model_dump(exclude_unset=True)
alignments = payload.pop("alignments", None)
classification_touched = bool(SOURCE_CLASSIFICATION_FIELDS & payload.keys()) or alignments is not None
cols = ["name", "url", "domain", "source_type", "category", "status", "notes",
"language", "bias",
"political_orientation", "media_type", "reliability",
"state_affiliated", "country_code",
"added_by", "tenant_id"]
vals = [
data.name,
data.url,
domain,
data.source_type,
data.category,
data.status,
data.notes,
payload.get("language"),
payload.get("bias"),
payload.get("political_orientation"),
payload.get("media_type"),
payload.get("reliability"),
1 if payload.get("state_affiliated") else 0,
payload.get("country_code"),
current_user["username"],
tenant_id,
]
if classification_touched:
cols += ["classification_source", "classified_at"]
vals += ["manual"]
ts_marker = True
else:
ts_marker = False
placeholders = ", ".join(["?"] * len(vals) + (["CURRENT_TIMESTAMP"] if ts_marker else []))
cursor = await db.execute(
"""INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
data.name,
data.url,
domain,
data.source_type,
data.category,
data.status,
data.notes,
current_user["username"],
tenant_id,
),
f"INSERT INTO sources ({', '.join(cols)}) VALUES ({placeholders})",
vals,
)
new_id = cursor.lastrowid
if alignments:
await _replace_alignments(db, new_id, alignments)
await db.commit()
cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,))
cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,))
row = await cursor.fetchone()
return dict(row)
result = dict(row)
result["is_global"] = result.get("tenant_id") is None
result["state_affiliated"] = bool(result.get("state_affiliated"))
alignments_map = await _load_alignments_for(db, [new_id])
result["alignments"] = alignments_map.get(new_id, [])
return result
@router.put("/{source_id}", response_model=SourceResponse)
@@ -494,27 +598,51 @@ async def update_source(
_check_source_ownership(dict(row), current_user["username"])
payload = data.model_dump(exclude_unset=True)
alignments = payload.pop("alignments", None)
updates = {}
for field, value in data.model_dump(exclude_none=True).items():
for field, value in payload.items():
if field not in SOURCE_UPDATE_COLUMNS:
continue
# Domain normalisieren
if field == "domain" and value:
value = _DOMAIN_ALIASES.get(value.lower(), value.lower())
if field == "state_affiliated":
value = 1 if value else 0
updates[field] = value
if not updates:
return dict(row)
classification_touched = bool(SOURCE_CLASSIFICATION_FIELDS & updates.keys()) or alignments is not None
if classification_touched:
updates["classification_source"] = "manual"
updates["classified_at"] = "CURRENT_TIMESTAMP_MARKER"
set_clause = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [source_id]
if updates:
set_parts = []
values = []
for k, v in updates.items():
if v == "CURRENT_TIMESTAMP_MARKER":
set_parts.append(f"{k} = CURRENT_TIMESTAMP")
else:
set_parts.append(f"{k} = ?")
values.append(v)
values.append(source_id)
await db.execute(f"UPDATE sources SET {', '.join(set_parts)} WHERE id = ?", values)
await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values)
await db.commit()
if alignments is not None:
await _replace_alignments(db, source_id, alignments)
if updates or alignments is not None:
await db.commit()
cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,))
row = await cursor.fetchone()
return dict(row)
result = dict(row)
result["is_global"] = result.get("tenant_id") is None
result["state_affiliated"] = bool(result.get("state_affiliated"))
alignments_map = await _load_alignments_for(db, [source_id])
result["alignments"] = alignments_map.get(source_id, [])
return result
@router.delete("/{source_id}", status_code=status.HTTP_204_NO_CONTENT)