diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index 452eb32..b4d909d 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -32,6 +32,10 @@ CATEGORY_REPUTATION = { "sonstige": 0.4, } +# Research-Modus: Automatisch 3 Durchläufe für optimale Ergebnisse +RESEARCH_MULTI_PASS_COUNT = 3 +RESEARCH_PASS_LABELS = {1: "Breite Erfassung", 2: "Vertiefung", 3: "Konsolidierung"} + def _normalize_url(url: str) -> str: """URL normalisieren für Duplikat-Erkennung.""" @@ -476,9 +480,15 @@ class AgentOrchestrator: last_error = None try: + # Research-Lagen: Automatisch 3 Durchläufe + incident_type = await self._get_incident_type(incident_id) + for attempt in range(3): try: - await self._run_refresh(incident_id, trigger_type=trigger_type, retry_count=attempt, user_id=user_id) + if incident_type == "research": + await self._run_research_multi_pass(incident_id, trigger_type=trigger_type, user_id=user_id) + else: + await self._run_refresh(incident_id, trigger_type=trigger_type, retry_count=attempt, user_id=user_id) last_error = None break # Erfolg except asyncio.CancelledError: @@ -589,7 +599,7 @@ class AgentOrchestrator: await db.close() return visibility, created_by, tenant_id - async def _run_refresh(self, incident_id: int, trigger_type: str = "manual", retry_count: int = 0, user_id: int = None): + async def _run_refresh(self, incident_id: int, trigger_type: str = "manual", retry_count: int = 0, user_id: int = None, _suppress_complete: bool = False, _pass_info: dict = None): """Führt einen kompletten Refresh-Zyklus durch.""" import aiosqlite from database import get_db @@ -640,11 +650,17 @@ class AgentOrchestrator: research_status = "deep_researching" if incident_type == "research" else "researching" research_detail = "Hintergrundrecherche im Web läuft..." if incident_type == "research" else "RSS-Feeds und Web werden durchsucht..." + # Multi-Pass: Detail-Text mit Durchlauf-Info versehen + _ws_extra = {} + if _pass_info: + _pnr, _ptotal, _plabel = _pass_info["nr"], _pass_info["total"], _pass_info["label"] + research_detail = f"Recherche {_pnr}/{_ptotal}: {_plabel}..." + _ws_extra = {"research_pass": _pnr, "research_total_passes": _ptotal} if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, - "data": {"status": research_status, "detail": research_detail, "started_at": now_utc}, + "data": {"status": research_status, "detail": research_detail, "started_at": now_utc, **_ws_extra}, }, visibility, created_by, tenant_id) # Bestehende Artikel vorladen (für Dedup UND Kontext) @@ -802,14 +818,18 @@ class AgentOrchestrator: unique_results.sort(key=lambda a: a.get("relevance_score", 0), reverse=True) source_count = len(set(a.get("source", "") for a in unique_results)) + _analyze_detail = f"Analysiert {len(unique_results)} Meldungen aus {source_count} Quellen..." + if _pass_info: + _analyze_detail = f"[{_pass_info['nr']}/{_pass_info['total']}] {_analyze_detail}" if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, "data": { "status": "analyzing", - "detail": f"Analysiert {len(unique_results)} Meldungen aus {source_count} Quellen...", + "detail": _analyze_detail, "started_at": now, + **_ws_extra, }, }, visibility, created_by, tenant_id) @@ -957,11 +977,14 @@ class AgentOrchestrator: ) all_articles_preloaded = [dict(row) for row in await cursor.fetchall()] + _parallel_detail = "Analyse und Faktencheck laufen parallel..." + if _pass_info: + _parallel_detail = f"[{_pass_info['nr']}/{_pass_info['total']}] {_parallel_detail}" if self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "status_update", "incident_id": incident_id, - "data": {"status": "analyzing", "detail": "Analyse und Faktencheck laufen parallel...", "started_at": now_utc}, + "data": {"status": "analyzing", "detail": _parallel_detail, "started_at": now_utc, **_ws_extra}, }, visibility, created_by, tenant_id) # Quelleneinordnung (Bias) an Artikel anhaengen @@ -1355,7 +1378,7 @@ class AgentOrchestrator: if unique_results: asyncio.create_task(_background_discover_sources(unique_results)) - if self._ws_manager: + if not _suppress_complete and self._ws_manager: await self._ws_manager.broadcast_for_incident({ "type": "refresh_complete", "incident_id": incident_id, @@ -1375,5 +1398,73 @@ class AgentOrchestrator: await db.close() + async def _get_incident_type(self, incident_id: int) -> str: + """Incident-Typ laden (adhoc/research).""" + from database import get_db + db = await get_db() + try: + cursor = await db.execute( + "SELECT type FROM incidents WHERE id = ?", (incident_id,) + ) + row = await cursor.fetchone() + return row["type"] if row else "adhoc" + finally: + await db.close() + + async def _run_research_multi_pass(self, incident_id: int, trigger_type: str, user_id: int = None): + """Führt automatisch 3 Recherche-Durchläufe für Research-Lagen durch. + + Durchlauf 1: Breite Erfassung (initiale 4-Phasen-Recherche) + Durchlauf 2: Vertiefung (andere Quellen, inkrementelle Analyse) + Durchlauf 3: Konsolidierung (letzte Lücken, Fakten-Upgrade) + """ + total = RESEARCH_MULTI_PASS_COUNT + + for pass_nr in range(1, total + 1): + # Cancel zwischen Durchläufen prüfen + self._check_cancelled(incident_id) + + is_last = (pass_nr == total) + pass_info = { + "nr": pass_nr, + "total": total, + "label": RESEARCH_PASS_LABELS.get(pass_nr, f"Durchlauf {pass_nr}"), + } + + logger.info( + f"Research Multi-Pass {pass_nr}/{total} für Lage {incident_id}: " + f"{pass_info['label']}" + ) + + try: + await self._run_refresh( + incident_id, + trigger_type=trigger_type, + retry_count=0, + user_id=user_id, + _suppress_complete=not is_last, + _pass_info=pass_info, + ) + except asyncio.CancelledError: + logger.info( + f"Research Multi-Pass abgebrochen in Durchlauf {pass_nr}/{total} " + f"für Lage {incident_id}" + ) + raise + except Exception as e: + logger.error( + f"Research Multi-Pass {pass_nr}/{total} fehlgeschlagen " + f"für Lage {incident_id}: {e}" + ) + if is_last: + raise + # Nicht-letzter Durchlauf: weiter mit nächstem, bisherige Ergebnisse bleiben + + logger.info( + f"Research Multi-Pass abgeschlossen für Lage {incident_id}: " + f"{total} Durchläufe" + ) + + # Singleton-Instanz orchestrator = AgentOrchestrator() diff --git a/src/static/css/style.css b/src/static/css/style.css index 891abcd..86d1c30 100644 --- a/src/static/css/style.css +++ b/src/static/css/style.css @@ -1995,6 +1995,14 @@ a:hover { font-size: 12px; } +.progress-pass-info { + font-size: 11px; + color: var(--accent-primary); + margin-left: 8px; + font-weight: 600; + letter-spacing: 0.3px; +} + .progress-cancel-btn { position: absolute; right: var(--sp-xl); diff --git a/src/static/dashboard.html b/src/static/dashboard.html index 442441e..741a033 100644 --- a/src/static/dashboard.html +++ b/src/static/dashboard.html @@ -198,6 +198,7 @@