Dateien
AegisSight-Monitor/src/agents/claude_client.py
claude-dev c73541cdbe Block C: Prompt-Umlaute korrigiert + Timeout parametrisiert
- ENHANCE_PROMPT_ADHOC und ENHANCE_PROMPT_RESEARCH: Umschreibungen durch
  echte Umlaute ersetzt (fuer -> fuer, praezises -> praezises, ...). Behebt
  den Widerspruch, dass der Prompt "echte Umlaute verwenden" forderte,
  die Anweisung selbst aber ae/oe/ue/ss nutzte.
- call_claude() bekommt neuen timeout-Parameter. None = Fallback auf
  CLAUDE_TIMEOUT (1800s), sonst Override in Sekunden. asyncio.wait_for
  und die cancel-aware Variante nutzen durchgaengig den effective_timeout.
- Enhance-Endpoint ruft call_claude mit timeout=60 auf (Haiku-Single-Shot,
  vorher global 1800s).
- chat.py _call_claude_chat: Timeout von 60s auf 120s erhoeht (Chat-Antworten
  koennen etwas laenger dauern, haben aber keinen Anspruch auf 30 Min).
2026-04-23 17:56:28 +00:00

210 Zeilen
8.6 KiB
Python

"""Shared Claude CLI Client mit Usage-Tracking."""
import asyncio
import contextvars
import json
import logging
from dataclasses import dataclass
from config import CLAUDE_PATH, CLAUDE_TIMEOUT, CLAUDE_MODEL_FAST, CLAUDE_MODEL_STANDARD
# ContextVar fuer Cancel-Event: Wird vom Orchestrator gesetzt,
# call_claude prueft automatisch darauf -- kein Durchreichen noetig.
_cancel_event_var: contextvars.ContextVar[asyncio.Event | None] = contextvars.ContextVar("_cancel_event_var", default=None)
logger = logging.getLogger("osint.claude_client")
class ClaudeCliError(RuntimeError):
"""Strukturierter Fehler aus dem Claude CLI mit Kategorie.
error_type:
- "rate_limit": Anthropic Rate-Limit oder Overload (transient, retry-tauglich)
- "auth_error": Account-Problem (Organisation hat keinen Claude-Zugang,
Token abgelaufen/ungueltig) - kein Retry sinnvoll, Admin-Aktion noetig
- "timeout": Claude CLI Timeout (transient)
- "cli_error": Sonstiger CLI-Fehler (unspezifisch, Default)
"""
def __init__(self, error_type: str, message: str):
self.error_type = error_type
self.message = message
super().__init__(f"Claude CLI [{error_type}]: {message}")
def _classify_cli_error(combined_output: str) -> str:
"""Ordnet einer Fehler-Ausgabe eine error_type-Kategorie zu."""
txt = combined_output.lower()
rate_limit_keywords = ["hit your limit", "rate limit", "resets", "rate_limit", "overloaded"]
auth_error_keywords = ["does not have access", "login again", "contact your administrator"]
if any(kw in txt for kw in rate_limit_keywords):
return "rate_limit"
if any(kw in txt for kw in auth_error_keywords):
return "auth_error"
return "cli_error"
@dataclass
class ClaudeUsage:
"""Token-Verbrauch eines einzelnen Claude CLI Aufrufs."""
input_tokens: int = 0
output_tokens: int = 0
cache_creation_tokens: int = 0
cache_read_tokens: int = 0
cost_usd: float = 0.0
duration_ms: int = 0
@dataclass
class UsageAccumulator:
"""Akkumuliert Usage über mehrere Claude-Aufrufe eines Refreshs."""
input_tokens: int = 0
output_tokens: int = 0
cache_creation_tokens: int = 0
cache_read_tokens: int = 0
total_cost_usd: float = 0.0
call_count: int = 0
def add(self, usage: ClaudeUsage):
self.input_tokens += usage.input_tokens
self.output_tokens += usage.output_tokens
self.cache_creation_tokens += usage.cache_creation_tokens
self.cache_read_tokens += usage.cache_read_tokens
self.total_cost_usd += usage.cost_usd
self.call_count += 1
def _sanitize_mdash(text: str) -> str:
"""Ersetzt Gedankenstriche durch Bindestriche (KI-Indikator reduzieren)."""
return text.replace("\u2014", " - ").replace("\u2013", " - ")
async def call_claude(prompt: str, tools: str | None = "WebSearch,WebFetch", model: str | None = None, raw_text: bool = False, timeout: float | None = None) -> tuple[str, ClaudeUsage]:
"""Ruft Claude CLI auf. Gibt (result_text, usage) zurück.
Prompt wird via stdin uebergeben um OS ARG_MAX Limits zu vermeiden.
Args:
prompt: Der Prompt fuer Claude
tools: Kommagetrennte erlaubte Tools (None = keine Tools, --max-turns 1)
model: Optionales Modell (z.B. CLAUDE_MODEL_FAST fuer Haiku). None = CLAUDE_MODEL_STANDARD (Opus 4.7).
timeout: Override in Sekunden. None = Fallback auf globalen CLAUDE_TIMEOUT (1800s).
"""
effective_model = model or CLAUDE_MODEL_STANDARD
effective_timeout = timeout if timeout is not None else CLAUDE_TIMEOUT
cmd = [CLAUDE_PATH, "-p", "-", "--output-format", "json", "--model", effective_model]
if tools:
cmd.extend(["--allowedTools", tools])
else:
cmd.extend(["--max-turns", "1", "--allowedTools", ""])
if not raw_text:
cmd.extend(["--append-system-prompt",
"CRITICAL: You are a JSON-only output agent. "
"Output EXCLUSIVELY a single valid JSON object. "
"No explanatory text, no markdown fences, no continuation of previous responses. "
"Start your response with { and end with }."])
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
env={
"PATH": "/usr/local/bin:/usr/bin:/bin",
"HOME": "/home/claude-dev",
"LANG": "C.UTF-8",
"LC_ALL": "C.UTF-8",
},
)
try:
cancel_event = _cancel_event_var.get(None)
if cancel_event:
# Cancel-aware: Monitor cancel_event while process runs
communicate_task = asyncio.create_task(
process.communicate(input=prompt.encode("utf-8"))
)
cancel_wait_task = asyncio.create_task(cancel_event.wait())
timeout_task = asyncio.create_task(asyncio.sleep(effective_timeout))
done, pending = await asyncio.wait(
[communicate_task, cancel_wait_task, timeout_task],
return_when=asyncio.FIRST_COMPLETED,
)
for p in pending:
p.cancel()
if communicate_task in done:
stdout, stderr = communicate_task.result()
elif cancel_wait_task in done:
process.kill()
await process.wait()
raise asyncio.CancelledError("Cancel angefordert")
else:
process.kill()
await process.wait()
raise TimeoutError(f"Claude CLI Timeout nach {effective_timeout}s")
else:
stdout, stderr = await asyncio.wait_for(
process.communicate(input=prompt.encode("utf-8")), timeout=effective_timeout
)
except asyncio.TimeoutError:
process.kill()
raise TimeoutError(f"Claude CLI Timeout nach {effective_timeout}s")
if process.returncode != 0:
error_msg = stderr.decode("utf-8", errors="replace").strip()
stdout_msg = stdout.decode("utf-8", errors="replace").strip()
# Rate-Limit/Auth-Fehler kommen teils als JSON auf stdout, nicht auf stderr
combined_output = f"{error_msg} {stdout_msg}"
error_type = _classify_cli_error(combined_output)
if error_type == "rate_limit":
logger.warning(f"Claude CLI Rate-Limit (Exit {process.returncode}): {stdout_msg or error_msg}")
elif error_type == "auth_error":
logger.error(f"Claude CLI Auth-Fehler (Exit {process.returncode}): {stdout_msg or error_msg}")
else:
logger.error(f"Claude CLI Fehler (Exit {process.returncode}): {error_msg}")
if stdout_msg:
logger.error(f"Claude CLI stdout bei Fehler: {stdout_msg[:500]}")
raise ClaudeCliError(error_type, stdout_msg or error_msg)
raw = stdout.decode("utf-8", errors="replace").strip()
usage = ClaudeUsage()
result_text = raw
try:
data = json.loads(raw)
# CLI kann returncode=0 liefern und trotzdem is_error=true setzen
# (z.B. "Your organization does not have access to Claude")
if data.get("is_error"):
error_text = str(data.get("result", ""))
error_type = _classify_cli_error(error_text)
if error_type == "rate_limit":
logger.warning(f"Claude CLI Rate-Limit (is_error): {error_text}")
elif error_type == "auth_error":
logger.error(f"Claude CLI Auth-Fehler (is_error): {error_text}")
else:
logger.error(f"Claude CLI Fehler (is_error): {error_text}")
raise ClaudeCliError(error_type, error_text)
result_text = data.get("result", raw)
u = data.get("usage", {})
usage = ClaudeUsage(
input_tokens=u.get("input_tokens", 0),
output_tokens=u.get("output_tokens", 0),
cache_creation_tokens=u.get("cache_creation_input_tokens", 0),
cache_read_tokens=u.get("cache_read_input_tokens", 0),
cost_usd=data.get("total_cost_usd", 0.0),
duration_ms=data.get("duration_ms", 0),
)
model_info = f" [{model}]" if model else ""
logger.info(
f"Claude{model_info}: {usage.input_tokens} in / {usage.output_tokens} out / "
f"cache {usage.cache_creation_tokens}+{usage.cache_read_tokens} / "
f"${usage.cost_usd:.4f} / {usage.duration_ms}ms"
)
except json.JSONDecodeError:
logger.warning("Claude CLI Antwort kein gültiges JSON, nutze raw output")
result_text = _sanitize_mdash(result_text)
return result_text, usage