code-compass / src /bedrock_claude.py
technophyle's picture
Sync from GitHub via hub-sync
f1089a9 verified
import os
import random
import time
from typing import Optional, Tuple
class BedrockTransientError(RuntimeError):
pass
def create_bedrock_runtime_client():
try:
import boto3
except ImportError as exc:
raise RuntimeError("Bedrock Claude support requires the `boto3` package.") from exc
max_attempts = int(os.getenv("BEDROCK_CLIENT_MAX_ATTEMPTS", "10"))
try:
from botocore.config import Config
config = Config(
retries={
"max_attempts": max_attempts,
"mode": os.getenv("BEDROCK_RETRY_MODE", "adaptive"),
}
)
except Exception:
config = None
return boto3.client(
"bedrock-runtime",
region_name=os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "us-east-1")),
config=config,
)
def is_bedrock_retryable_error(exc: Exception) -> bool:
code = ""
response = getattr(exc, "response", None)
if isinstance(response, dict):
code = str((response.get("Error") or {}).get("Code") or "")
message = str(exc)
normalized = f"{code} {message}".lower()
return any(
marker in normalized
for marker in {
"throttling",
"too many requests",
"rate exceeded",
"serviceunavailable",
"service unavailable",
"timeout",
"timed out",
"temporarily unavailable",
}
)
def generate_bedrock_claude_text(
client,
model: str,
system_prompt: str,
user_prompt: str,
*,
max_tokens: int,
temperature: float,
top_p: Optional[float] = None,
) -> Tuple[str, str]:
inference_config = {
"maxTokens": max_tokens,
"temperature": temperature,
}
if top_p is not None:
inference_config["topP"] = top_p
max_retries = int(os.getenv("BEDROCK_LLM_MAX_RETRIES", "6"))
base_seconds = float(os.getenv("BEDROCK_LLM_RETRY_BASE_SECONDS", "1.5"))
max_sleep_seconds = float(os.getenv("BEDROCK_LLM_RETRY_MAX_SECONDS", "30"))
last_exc = None
for attempt in range(1, max_retries + 1):
try:
response = client.converse(
modelId=model,
system=[{"text": system_prompt.strip()}],
messages=[
{
"role": "user",
"content": [{"text": user_prompt.strip()}],
}
],
inferenceConfig=inference_config,
)
break
except Exception as exc:
if not is_bedrock_retryable_error(exc) or attempt >= max_retries:
if is_bedrock_retryable_error(exc):
raise BedrockTransientError(
f"Bedrock throttled or was temporarily unavailable after "
f"{attempt} attempts: {exc}"
) from exc
raise
last_exc = exc
sleep_seconds = min(max_sleep_seconds, base_seconds * (2 ** (attempt - 1)))
sleep_seconds += random.uniform(0.0, min(1.0, sleep_seconds * 0.2))
print(
f"[bedrock] Transient Bedrock error; retrying "
f"attempt={attempt}/{max_retries} wait={sleep_seconds:.1f}s error={exc}",
flush=True,
)
time.sleep(sleep_seconds)
else:
raise BedrockTransientError(f"Bedrock request failed after retries: {last_exc}")
content_blocks = (((response or {}).get("output") or {}).get("message") or {}).get("content") or []
text = "".join(block.get("text", "") for block in content_blocks if block.get("text")).strip()
if not text:
raise RuntimeError("Bedrock Claude returned an empty response.")
return text, str((response or {}).get("stopReason", "") or "")