- src/services/source_classifier.py: classify_source(db, id) ruft Haiku mit
strukturiertem Prompt (4 Achsen + state_affiliated + country + Konfidenz)
und schreibt Vorschlaege in proposed_*-Spalten. bulk_classify(db, limit)
iteriert sequenziell ueber unklassifizierte Quellen.
- API-Endpoints (alle hinter Auth, globale Quellen nur fuer org_admin):
- GET /api/sources/classification/stats
- GET /api/sources/classification/queue
- POST /api/sources/{id}/classification/approve (proposed_* -> echte Felder)
- POST /api/sources/{id}/classification/reject (proposed_* loeschen)
- POST /api/sources/{id}/classification/reclassify (sofort, ~3-5s)
- POST /api/sources/classification/bulk-classify (BackgroundTask)
- scripts/migrate_sources_classification.py: CLI-Wrapper fuer Bulk-Migration
zur einmaligen Erstbestueckung aller Bestandsquellen.
Sample-Test auf Staging steht aus.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
65 Zeilen · 2.0 KiB · Python
"""Einmalige LLM-Klassifikation aller noch unklassifizierten Quellen.
|
|
|
|
Verwendung:
|
|
python3 scripts/migrate_sources_classification.py --limit 50
|
|
python3 scripts/migrate_sources_classification.py --limit 500 # Alle
|
|
python3 scripts/migrate_sources_classification.py --recheck-pending # bereits Pending neu
|
|
|
|
Schreibt Vorschlaege in proposed_*-Spalten. Approval erfolgt anschliessend
|
|
ueber das Verwaltungs-UI / API (POST /api/sources/{id}/classification/approve).
|
|
"""
|
|
import argparse
|
|
import asyncio
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Make src/ importable when this script is executed directly (not via -m).
HERE = Path(__file__).resolve().parent
SRC = HERE.parent / "src"
_src_entry = str(SRC)
if _src_entry not in sys.path:
    sys.path.insert(0, _src_entry)
|
|
|
|
from database import get_db # noqa: E402
|
|
from services.source_classifier import bulk_classify # noqa: E402
|
|
|
|
# Timestamped console logging so a migration run can be correlated with API logs.
_LOG_FORMAT = "%(asctime)s [%(name)s] %(levelname)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger("migrate_sources")
|
|
|
|
|
|
async def main():
|
|
parser = argparse.ArgumentParser(description="LLM-Klassifikation aller Quellen.")
|
|
parser.add_argument("--limit", type=int, default=50, help="Max. Quellen pro Lauf")
|
|
parser.add_argument(
|
|
"--recheck-pending",
|
|
action="store_true",
|
|
help="Auch Quellen mit classification_source='llm_pending' neu klassifizieren",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
db = await get_db()
|
|
try:
|
|
result = await bulk_classify(
|
|
db,
|
|
limit=args.limit,
|
|
only_unclassified=not args.recheck_pending,
|
|
)
|
|
finally:
|
|
await db.close()
|
|
|
|
print(f"Verarbeitet: {result['processed']}")
|
|
print(f"Erfolgreich: {result['success']}")
|
|
print(f"Fehler: {len(result['errors'])}")
|
|
print(f"Kosten: ${result['total_cost_usd']:.4f}")
|
|
if result["errors"]:
|
|
print("\nFehler-Details:")
|
|
for e in result["errors"][:10]:
|
|
print(f" source_id={e['source_id']}: {e['error']}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|