diff --git a/src/database.py b/src/database.py index 19f06bf..54d6b7e 100644 --- a/src/database.py +++ b/src/database.py @@ -158,7 +158,31 @@ CREATE TABLE IF NOT EXISTS sources ( article_count INTEGER DEFAULT 0, last_seen_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - tenant_id INTEGER REFERENCES organizations(id) + tenant_id INTEGER REFERENCES organizations(id), + language TEXT, + bias TEXT, + political_orientation TEXT DEFAULT 'na', + media_type TEXT DEFAULT 'sonstige', + reliability TEXT DEFAULT 'na', + state_affiliated INTEGER DEFAULT 0, + country_code TEXT, + classification_source TEXT DEFAULT 'legacy', + classified_at TIMESTAMP, + proposed_political_orientation TEXT, + proposed_media_type TEXT, + proposed_reliability TEXT, + proposed_state_affiliated INTEGER, + proposed_country_code TEXT, + proposed_alignments_json TEXT, + proposed_confidence REAL, + proposed_reasoning TEXT, + proposed_at TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS source_alignments ( + source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, + alignment TEXT NOT NULL, + PRIMARY KEY (source_id, alignment) ); CREATE TABLE IF NOT EXISTS notifications ( @@ -611,6 +635,57 @@ async def init_db(): await db.execute("ALTER TABLE sources ADD COLUMN tenant_id INTEGER REFERENCES organizations(id)") await db.commit() + # Migration: language + bias (Freitext, schon laenger im Einsatz, Schema-Lueck schliessen) + if "language" not in src_columns: + await db.execute("ALTER TABLE sources ADD COLUMN language TEXT") + await db.commit() + if "bias" not in src_columns: + await db.execute("ALTER TABLE sources ADD COLUMN bias TEXT") + await db.commit() + + # Migration: strukturierte Klassifikations-Spalten fuer sources + for col, ddl in [ + ("political_orientation", "ALTER TABLE sources ADD COLUMN political_orientation TEXT DEFAULT 'na'"), + ("media_type", "ALTER TABLE sources ADD COLUMN media_type TEXT DEFAULT 'sonstige'"), + ("reliability", "ALTER TABLE sources ADD COLUMN reliability TEXT DEFAULT 'na'"), + ("state_affiliated", "ALTER TABLE sources ADD COLUMN state_affiliated INTEGER DEFAULT 0"), + ("country_code", "ALTER TABLE sources ADD COLUMN country_code TEXT"), + ("classification_source", "ALTER TABLE sources ADD COLUMN classification_source TEXT DEFAULT 'legacy'"), + ("classified_at", "ALTER TABLE sources ADD COLUMN classified_at TIMESTAMP"), + ("proposed_political_orientation", "ALTER TABLE sources ADD COLUMN proposed_political_orientation TEXT"), + ("proposed_media_type", "ALTER TABLE sources ADD COLUMN proposed_media_type TEXT"), + ("proposed_reliability", "ALTER TABLE sources ADD COLUMN proposed_reliability TEXT"), + ("proposed_state_affiliated", "ALTER TABLE sources ADD COLUMN proposed_state_affiliated INTEGER"), + ("proposed_country_code", "ALTER TABLE sources ADD COLUMN proposed_country_code TEXT"), + ("proposed_alignments_json", "ALTER TABLE sources ADD COLUMN proposed_alignments_json TEXT"), + ("proposed_confidence", "ALTER TABLE sources ADD COLUMN proposed_confidence REAL"), + ("proposed_reasoning", "ALTER TABLE sources ADD COLUMN proposed_reasoning TEXT"), + ("proposed_at", "ALTER TABLE sources ADD COLUMN proposed_at TIMESTAMP"), + ]: + if col not in src_columns: + await db.execute(ddl) + await db.commit() + if any(c not in src_columns for c in ("political_orientation", "media_type", "reliability")): + logger.info("Migration: Klassifikations-Spalten zu sources hinzugefuegt") + + # Migration: source_alignments-Tabelle (Mehrfach-Tags fuer geopolitische Naehe) + cursor = await db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='source_alignments'" + ) + if not await cursor.fetchone(): + await db.executescript( + """ + CREATE TABLE source_alignments ( + source_id INTEGER NOT NULL REFERENCES sources(id) ON DELETE CASCADE, + alignment TEXT NOT NULL, + PRIMARY KEY (source_id, alignment) + ); + CREATE INDEX IF NOT EXISTS idx_source_alignments_alignment ON source_alignments(alignment); + """ + ) + await db.commit() + logger.info("Migration: source_alignments-Tabelle erstellt") + # Migration: tenant_id fuer notifications cursor = await db.execute("PRAGMA table_info(notifications)") notif_columns = [row[1] for row in await cursor.fetchall()] diff --git a/src/models.py b/src/models.py index 6c1e547..32d3bb7 100644 --- a/src/models.py +++ b/src/models.py @@ -139,24 +139,51 @@ class IncidentListItem(BaseModel): # Sources (Quellenverwaltung) +SOURCE_TYPE_PATTERN = "^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$" +SOURCE_CATEGORY_PATTERN = "^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$" +SOURCE_STATUS_PATTERN = "^(active|inactive)$" +POLITICAL_ORIENTATION_PATTERN = "^(links_extrem|links|mitte_links|liberal|mitte|konservativ|mitte_rechts|rechts|rechts_extrem|na)$" +MEDIA_TYPE_PATTERN = "^(tageszeitung|wochenzeitung|magazin|tv_sender|radio|oeffentlich_rechtlich|nachrichtenagentur|online_only|blog|telegram_kanal|telegram_bot|podcast|social_media|imageboard|think_tank|ngo|behoerde|staatsmedium|fachmedium|sonstige)$" +RELIABILITY_PATTERN = "^(sehr_hoch|hoch|gemischt|niedrig|sehr_niedrig|na)$" +ALIGNMENT_PATTERN = "^(prorussisch|proiranisch|prowestlich|proukrainisch|prochinesisch|projapanisch|proisraelisch|propalaestinensisch|protuerkisch|panarabisch|neutral|sonstige)$" +COUNTRY_CODE_PATTERN = "^[A-Z]{2}$" +CLASSIFICATION_SOURCE_PATTERN = "^(manual|llm_approved|llm_pending|legacy)$" + + class SourceCreate(BaseModel): name: str = Field(min_length=1, max_length=200) url: Optional[str] = None domain: Optional[str] = None - source_type: str = Field(default="rss_feed", pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$") - category: str = Field(default="sonstige", pattern="^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$") - status: str = Field(default="active", pattern="^(active|inactive)$") + source_type: str = Field(default="rss_feed", pattern=SOURCE_TYPE_PATTERN) + category: str = Field(default="sonstige", pattern=SOURCE_CATEGORY_PATTERN) + status: str = Field(default="active", pattern=SOURCE_STATUS_PATTERN) notes: Optional[str] = None + language: Optional[str] = None + bias: Optional[str] = None + political_orientation: Optional[str] = Field(default=None, pattern=POLITICAL_ORIENTATION_PATTERN) + media_type: Optional[str] = Field(default=None, pattern=MEDIA_TYPE_PATTERN) + reliability: Optional[str] = Field(default=None, pattern=RELIABILITY_PATTERN) + state_affiliated: Optional[bool] = None + country_code: Optional[str] = Field(default=None, pattern=COUNTRY_CODE_PATTERN) + alignments: Optional[list[str]] = None class SourceUpdate(BaseModel): name: Optional[str] = Field(default=None, max_length=200) url: Optional[str] = None domain: Optional[str] = None - source_type: Optional[str] = Field(default=None, pattern="^(rss_feed|web_source|excluded|telegram_channel|podcast_feed)$") - category: Optional[str] = Field(default=None, pattern="^(nachrichtenagentur|oeffentlich-rechtlich|qualitaetszeitung|behoerde|fachmedien|think-tank|international|regional|boulevard|sonstige)$") - status: Optional[str] = Field(default=None, pattern="^(active|inactive)$") + source_type: Optional[str] = Field(default=None, pattern=SOURCE_TYPE_PATTERN) + category: Optional[str] = Field(default=None, pattern=SOURCE_CATEGORY_PATTERN) + status: Optional[str] = Field(default=None, pattern=SOURCE_STATUS_PATTERN) notes: Optional[str] = None + language: Optional[str] = None + bias: Optional[str] = None + political_orientation: Optional[str] = Field(default=None, pattern=POLITICAL_ORIENTATION_PATTERN) + media_type: Optional[str] = Field(default=None, pattern=MEDIA_TYPE_PATTERN) + reliability: Optional[str] = Field(default=None, pattern=RELIABILITY_PATTERN) + state_affiliated: Optional[bool] = None + country_code: Optional[str] = Field(default=None, pattern=COUNTRY_CODE_PATTERN) + alignments: Optional[list[str]] = None class SourceResponse(BaseModel): @@ -174,6 +201,14 @@ class SourceResponse(BaseModel): created_at: str language: Optional[str] = None bias: Optional[str] = None + political_orientation: Optional[str] = None + media_type: Optional[str] = None + reliability: Optional[str] = None + state_affiliated: bool = False + country_code: Optional[str] = None + classification_source: Optional[str] = None + classified_at: Optional[str] = None + alignments: list[str] = [] is_global: bool = False diff --git a/src/routers/sources.py b/src/routers/sources.py index f6318d1..9adade2 100644 --- a/src/routers/sources.py +++ b/src/routers/sources.py @@ -12,7 +12,56 @@ logger = logging.getLogger("osint.sources") router = APIRouter(prefix="/api/sources", tags=["sources"]) -SOURCE_UPDATE_COLUMNS = {"name", "url", "domain", "source_type", "category", "status", "notes"} +SOURCE_UPDATE_COLUMNS = { + "name", "url", "domain", "source_type", "category", "status", "notes", + "language", "bias", + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", +} +SOURCE_CLASSIFICATION_FIELDS = { + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", +} +ALLOWED_ALIGNMENTS = { + "prorussisch", "proiranisch", "prowestlich", "proukrainisch", + "prochinesisch", "projapanisch", "proisraelisch", "propalaestinensisch", + "protuerkisch", "panarabisch", "neutral", "sonstige", +} + + +async def _load_alignments_for(db: aiosqlite.Connection, source_ids: list[int]) -> dict[int, list[str]]: + """Lädt alignments fuer mehrere Quellen in einer Query und gibt {source_id: [alignment, ...]} zurück.""" + if not source_ids: + return {} + placeholders = ",".join("?" for _ in source_ids) + cursor = await db.execute( + f"SELECT source_id, alignment FROM source_alignments WHERE source_id IN ({placeholders}) ORDER BY alignment", + source_ids, + ) + out: dict[int, list[str]] = {sid: [] for sid in source_ids} + for row in await cursor.fetchall(): + out.setdefault(row["source_id"], []).append(row["alignment"]) + return out + + +async def _replace_alignments(db: aiosqlite.Connection, source_id: int, alignments: list[str]): + """Ersetzt die alignments-Liste einer Quelle (DELETE + INSERT) — Aufrufer muss commit() machen.""" + await db.execute("DELETE FROM source_alignments WHERE source_id = ?", (source_id,)) + seen: set[str] = set() + for raw in alignments: + a = (raw or "").strip().lower() + if not a or a in seen: + continue + if a not in ALLOWED_ALIGNMENTS: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"Ungueltiger alignment-Wert: '{a}'", + ) + seen.add(a) + await db.execute( + "INSERT INTO source_alignments (source_id, alignment) VALUES (?, ?)", + (source_id, a), + ) def _check_source_ownership(source: dict, username: str): @@ -34,6 +83,11 @@ async def list_sources( source_type: str = None, category: str = None, source_status: str = None, + political_orientation: str = None, + media_type: str = None, + reliability: str = None, + state_affiliated: bool = None, + alignment: str = None, current_user: dict = Depends(get_current_user), db: aiosqlite.Connection = Depends(db_dependency), ): @@ -41,27 +95,43 @@ async def list_sources( tenant_id = current_user.get("tenant_id") # Global (tenant_id=NULL) + eigene Org - query = "SELECT * FROM sources WHERE (tenant_id IS NULL OR tenant_id = ?)" - params = [tenant_id] + query = "SELECT s.* FROM sources s WHERE (s.tenant_id IS NULL OR s.tenant_id = ?)" + params: list = [tenant_id] if source_type: - query += " AND source_type = ?" + query += " AND s.source_type = ?" params.append(source_type) if category: - query += " AND category = ?" + query += " AND s.category = ?" params.append(category) if source_status: - query += " AND status = ?" + query += " AND s.status = ?" params.append(source_status) + if political_orientation: + query += " AND s.political_orientation = ?" + params.append(political_orientation) + if media_type: + query += " AND s.media_type = ?" + params.append(media_type) + if reliability: + query += " AND s.reliability = ?" + params.append(reliability) + if state_affiliated is not None: + query += " AND s.state_affiliated = ?" + params.append(1 if state_affiliated else 0) + if alignment: + query += " AND EXISTS (SELECT 1 FROM source_alignments sa WHERE sa.source_id = s.id AND sa.alignment = ?)" + params.append(alignment.lower()) - query += " ORDER BY source_type, category, name" + query += " ORDER BY s.source_type, s.category, s.name" cursor = await db.execute(query, params) rows = await cursor.fetchall() - results = [] - for row in rows: - d = dict(row) + results = [dict(row) for row in rows] + alignments_map = await _load_alignments_for(db, [r["id"] for r in results]) + for d in results: d["is_global"] = d.get("tenant_id") is None - results.append(d) + d["state_affiliated"] = bool(d.get("state_affiliated")) + d["alignments"] = alignments_map.get(d["id"], []) return results @@ -454,26 +524,60 @@ async def create_source( detail=f"Domain '{domain}' bereits als Quelle vorhanden: {domain_existing['name']}. Für einen neuen RSS-Feed bitte die Feed-URL angeben.", ) + payload = data.model_dump(exclude_unset=True) + alignments = payload.pop("alignments", None) + classification_touched = bool(SOURCE_CLASSIFICATION_FIELDS & payload.keys()) or alignments is not None + + cols = ["name", "url", "domain", "source_type", "category", "status", "notes", + "language", "bias", + "political_orientation", "media_type", "reliability", + "state_affiliated", "country_code", + "added_by", "tenant_id"] + vals = [ + data.name, + data.url, + domain, + data.source_type, + data.category, + data.status, + data.notes, + payload.get("language"), + payload.get("bias"), + payload.get("political_orientation"), + payload.get("media_type"), + payload.get("reliability"), + 1 if payload.get("state_affiliated") else 0, + payload.get("country_code"), + current_user["username"], + tenant_id, + ] + if classification_touched: + cols += ["classification_source", "classified_at"] + vals += ["manual"] + ts_marker = True + else: + ts_marker = False + + placeholders = ", ".join(["?"] * len(vals) + (["CURRENT_TIMESTAMP"] if ts_marker else [])) cursor = await db.execute( - """INSERT INTO sources (name, url, domain, source_type, category, status, notes, added_by, tenant_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", - ( - data.name, - data.url, - domain, - data.source_type, - data.category, - data.status, - data.notes, - current_user["username"], - tenant_id, - ), + f"INSERT INTO sources ({', '.join(cols)}) VALUES ({placeholders})", + vals, ) + new_id = cursor.lastrowid + + if alignments: + await _replace_alignments(db, new_id, alignments) + await db.commit() - cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (cursor.lastrowid,)) + cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (new_id,)) row = await cursor.fetchone() - return dict(row) + result = dict(row) + result["is_global"] = result.get("tenant_id") is None + result["state_affiliated"] = bool(result.get("state_affiliated")) + alignments_map = await _load_alignments_for(db, [new_id]) + result["alignments"] = alignments_map.get(new_id, []) + return result @router.put("/{source_id}", response_model=SourceResponse) @@ -494,27 +598,51 @@ async def update_source( _check_source_ownership(dict(row), current_user["username"]) + payload = data.model_dump(exclude_unset=True) + alignments = payload.pop("alignments", None) + updates = {} - for field, value in data.model_dump(exclude_none=True).items(): + for field, value in payload.items(): if field not in SOURCE_UPDATE_COLUMNS: continue # Domain normalisieren if field == "domain" and value: value = _DOMAIN_ALIASES.get(value.lower(), value.lower()) + if field == "state_affiliated": + value = 1 if value else 0 updates[field] = value - if not updates: - return dict(row) + classification_touched = bool(SOURCE_CLASSIFICATION_FIELDS & updates.keys()) or alignments is not None + if classification_touched: + updates["classification_source"] = "manual" + updates["classified_at"] = "CURRENT_TIMESTAMP_MARKER" - set_clause = ", ".join(f"{k} = ?" for k in updates) - values = list(updates.values()) + [source_id] + if updates: + set_parts = [] + values = [] + for k, v in updates.items(): + if v == "CURRENT_TIMESTAMP_MARKER": + set_parts.append(f"{k} = CURRENT_TIMESTAMP") + else: + set_parts.append(f"{k} = ?") + values.append(v) + values.append(source_id) + await db.execute(f"UPDATE sources SET {', '.join(set_parts)} WHERE id = ?", values) - await db.execute(f"UPDATE sources SET {set_clause} WHERE id = ?", values) - await db.commit() + if alignments is not None: + await _replace_alignments(db, source_id, alignments) + + if updates or alignments is not None: + await db.commit() cursor = await db.execute("SELECT * FROM sources WHERE id = ?", (source_id,)) row = await cursor.fetchone() - return dict(row) + result = dict(row) + result["is_global"] = result.get("tenant_id") is None + result["state_affiliated"] = bool(result.get("state_affiliated")) + alignments_map = await _load_alignments_for(db, [source_id]) + result["alignments"] = alignments_map.get(source_id, []) + return result @router.delete("/{source_id}", status_code=status.HTTP_204_NO_CONTENT)