What You'll Build
A production error handling framework for AI agents covering:
- Error classification (retryable, correctable, degradable, fatal, user error)
- Tool-level error handling that returns safe strings instead of raising
- Agent-level recovery with different prompts on failure
- LLM timeout handling with asyncio
- Circuit breaker pattern for external tool dependencies
- Graceful degradation when capabilities are unavailable
Prerequisites
pip install openai anthropic httpx pydantic python-dotenv
- Python 3.11+
- Familiarity with agentic workflows and tool calling
- Basic asyncio knowledge
Overview
Agent errors fall into distinct categories that require different handling strategies. Conflating them leads to either excessive retrying (wasting cost) or premature failure (degraded user experience).
Error taxonomy for agents:
| Category | Examples | Strategy |
|---|---|---|
| Retryable | Rate limit, timeout, temporary API down | Backoff and retry |
| Correctable | LLM produced wrong format, wrong tool called | Retry with corrected prompt |
| Degradable | Tool unavailable, data missing | Continue with reduced capability |
| Fatal | Invalid API key, quota exhausted | Fail immediately with clear message |
| User error | Missing required info, out-of-scope | Request clarification |
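The "Backoff and retry" strategy for the Retryable row can be sketched as follows. This is a minimal illustration rather than a reference implementation: `retry_with_backoff`, `backoff_delay`, and their parameters are hypothetical names, and the full-jitter formula is one common choice among several.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 4,
    base: float = 0.5,
    cap: float = 30.0,
) -> T:
    """Retry `operation` only while `is_retryable` approves the raised error."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            # Non-retryable errors and the final attempt propagate unchanged
            if last_attempt or not is_retryable(exc):
                raise
            await asyncio.sleep(backoff_delay(attempt, base, cap))
    raise RuntimeError("max_attempts must be at least 1")
```

Passing the retry decision in as a predicate keeps the backoff loop independent of any particular SDK; in this guide the predicate would typically check that the classified category is retryable.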
Step 1: Error Classification System
```python
# error_handling/classifier.py
from enum import Enum
from typing import Optional

import anthropic
import httpx
import openai


class ErrorCategory(Enum):
    RETRYABLE = "retryable"      # Temporary: retry with backoff
    CORRECTABLE = "correctable"  # Agent behavior: retry with guidance
    DEGRADABLE = "degradable"    # Partial: continue with less capability
    FATAL = "fatal"              # Stop immediately
    USER_ERROR = "user_error"    # Request user action


def classify_error(exception: Exception) -> ErrorCategory:
    """Classify an exception to determine the handling strategy."""
    # OpenAI errors
    if isinstance(exception, openai.RateLimitError):
        # An exhausted quota is also reported as a 429 (insufficient_quota),
        # and retrying will not help, so treat it as fatal
        if "insufficient_quota" in str(exception).lower():
            return ErrorCategory.FATAL
        return ErrorCategory.RETRYABLE
    if isinstance(exception, openai.APITimeoutError):
        return ErrorCategory.RETRYABLE
    if isinstance(exception, openai.APIConnectionError):
        return ErrorCategory.RETRYABLE
    if isinstance(exception, openai.InternalServerError):
        return ErrorCategory.RETRYABLE
    if isinstance(exception, openai.AuthenticationError):
        return ErrorCategory.FATAL
    if isinstance(exception, openai.PermissionDeniedError):
        return ErrorCategory.FATAL
    if isinstance(exception, openai.BadRequestError):
        msg = str(exception).lower()
        if "context_length" in msg or "max_tokens" in msg:
            return ErrorCategory.CORRECTABLE  # Reduce context and retry
        return ErrorCategory.FATAL

    # Anthropic errors
    if isinstance(exception, anthropic.RateLimitError):
        return ErrorCategory.RETRYABLE
    if isinstance(exception, anthropic.APITimeoutError):
        return ErrorCategory.RETRYABLE
    if isinstance(exception, anthropic.AuthenticationError):
        return ErrorCategory.FATAL

    # HTTP errors from tool calls
    if isinstance(exception, httpx.TimeoutException):
        return ErrorCategory.DEGRADABLE  # Tool unavailable, continue without it
    if isinstance(exception, httpx.ConnectError):
        return ErrorCategory.DEGRADABLE
    if isinstance(exception, httpx.HTTPStatusError):
        if exception.response.status_code in {401, 403}:
            return ErrorCategory.FATAL
        if exception.response.status_code == 404:
            return ErrorCategory.USER_ERROR
        if exception.response.status_code >= 500:
            return ErrorCategory.RETRYABLE
        return ErrorCategory.DEGRADABLE

    # Validation errors (LLM produced invalid output)
    if isinstance(exception, (ValueError, KeyError, AttributeError)):
        return ErrorCategory.CORRECTABLE

    # Default: treat as degradable (continue if possible)
    return ErrorCategory.DEGRADABLE


class AgentError(Exception):
    """Enriched agent error with classification and context."""

    def __init__(
        self,
        message: str,
        category: ErrorCategory,
        original_exception: Optional[Exception] = None,
        context: Optional[dict] = None,
    ):
        super().__init__(message)
        self.category = category
        self.original_exception = original_exception
        self.context = context or {}

    def is_fatal(self) -> bool:
        return self.category == ErrorCategory.FATAL

    def is_retryable(self) -> bool:
        return self.category == ErrorCategory.RETRYABLE

    def __repr__(self) -> str:
        return f"AgentError({self.category.value}: {self})"
```
Step 2: Tool-Level Error Handling
Tools should not raise recoverable exceptions to the agent. Convert those errors to strings the model can act on, and let only fatal errors propagate:
```python
# error_handling/safe_tools.py
import asyncio
import functools
import inspect
import logging
from typing import Callable

import httpx

# Assumes the Step 1 module is importable as error_handling.classifier
from error_handling.classifier import AgentError, ErrorCategory, classify_error

logger = logging.getLogger(__name__)


def safe_tool(
    fallback_message: str = "Tool temporarily unavailable.",
    log_errors: bool = True,
) -> Callable:
    """Decorator that wraps tool functions to catch all non-fatal exceptions."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> str:
            try:
                result = await func(*args, **kwargs)
                return str(result) if result is not None else "Tool returned no data."
            except asyncio.TimeoutError:
                msg = f"Tool timed out after waiting. {fallback_message}"
                if log_errors:
                    logger.warning(f"Timeout in tool {func.__name__}: {msg}")
                return msg
            except httpx.TimeoutException:
                msg = f"External service timed out. {fallback_message}"
                if log_errors:
                    logger.warning(f"HTTP timeout in tool {func.__name__}")
                return msg
            except Exception as e:
                category = classify_error(e)
                if category == ErrorCategory.FATAL:
                    # Fatal errors should still propagate
                    raise AgentError(
                        f"Fatal error in {func.__name__}: {e}",
                        category=ErrorCategory.FATAL,
                        original_exception=e,
                    ) from e
                msg = _format_tool_error(func.__name__, e, category)
                if log_errors:
                    logger.error(f"Error in tool {func.__name__}: {e}", exc_info=True)
                return msg

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs) -> str:
            try:
                result = func(*args, **kwargs)
                return str(result) if result is not None else "Tool returned no data."
            except Exception as e:
                category = classify_error(e)
                if category == ErrorCategory.FATAL:
                    raise AgentError(str(e), category, e) from e
                if log_errors:
                    logger.error(f"Error in tool {func.__name__}: {e}", exc_info=True)
                return _format_tool_error(func.__name__, e, category)

        # Pick the wrapper that matches how the tool is defined
        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

    return decorator


def _format_tool_error(tool_name: str, error: Exception, category: ErrorCategory) -> str:
    """Convert an error into a helpful string for the agent."""
    if category == ErrorCategory.RETRYABLE:
        return (
            f"Tool '{tool_name}' is temporarily unavailable due to a service issue. "
            f"You may retry in a moment, or proceed with other information you have."
        )
    elif category == ErrorCategory.DEGRADABLE:
        return (
            f"Tool '{tool_name}' could not complete: {str(error)[:200]}. "
            f"Consider using an alternative approach or informing the user that "
            f"this specific data is currently unavailable."
        )
    elif category == ErrorCategory.USER_ERROR:
        return (
            f"Tool '{tool_name}' could not find the requested resource. "
            f"Please verify the input data is correct and try again."
        )
    else:
        return f"Tool '{tool_name}' encountered an error: {str(error)[:200]}."


# Apply to your tools
@safe_tool(fallback_message="Try searching a different topic.", log_errors=True)
async def search_web(query: str) -> str:
    """Search the web. Errors are caught and returned as strings."""
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.get(
            "https://api.search.example.com",
            params={"q": query},
        )
        response.raise_for_status()
        return response.json()["results"][0]["snippet"]
```
Step 3: Agent-Level Recovery with Error-Guided Retry
When the agent itself fails (bad output, wrong tools), retry with corrective prompts:
```python
# error_handling/agent_recovery.py
import asyncio
import json
import logging
from dataclasses import dataclass

import openai
from openai import AsyncOpenAI

# Assumes the modules from Steps 1 and 2 are importable from error_handling
from error_handling.classifier import AgentError, ErrorCategory, classify_error
from error_handling.safe_tools import search_web

logger = logging.getLogger(__name__)


@dataclass
class RecoveryAttempt:
    attempt_number: int
    failure_reason: str
    guidance: str


class AgentWithRecovery:
    """Agent wrapper that retries with corrective prompts on failure."""

    def __init__(
        self,
        client: AsyncOpenAI,
        system_prompt: str,
        tools: list[dict],
        max_retries: int = 2,
        timeout_seconds: float = 60.0,
    ):
        self.client = client
        self.system_prompt = system_prompt
        self.tools = tools
        self.max_retries = max_retries
        self.timeout_seconds = timeout_seconds

    async def run(self, user_message: str) -> tuple[str, list[RecoveryAttempt]]:
        """
        Run the agent with automatic recovery on failure.
        Returns (final_response, recovery_attempts_made).
        """
        messages = [{"role": "user", "content": user_message}]
        recovery_attempts = []

        for attempt in range(self.max_retries + 1):
            try:
                response = await asyncio.wait_for(
                    self._run_agent_loop(messages),
                    timeout=self.timeout_seconds,
                )
                return response, recovery_attempts
            except asyncio.TimeoutError:
                failure_reason = f"Agent timed out after {self.timeout_seconds}s"
                guidance = (
                    "You ran out of time. Give a direct, concise response. "
                    "Use fewer tool calls and stop when you have enough information."
                )
            except AgentError as e:
                if e.is_fatal():
                    raise  # Fatal errors don't recover
                failure_reason = str(e)
                guidance = self._generate_recovery_guidance(e)
            except Exception as e:
                # SDK errors arrive unwrapped, so classify them before retrying;
                # a bad API key should not be retried with a new prompt
                if classify_error(e) == ErrorCategory.FATAL:
                    raise
                failure_reason = f"Unexpected error: {type(e).__name__}: {e}"
                guidance = "An unexpected error occurred. Try a simpler approach."

            if attempt < self.max_retries:
                recovery = RecoveryAttempt(
                    attempt_number=attempt + 1,
                    failure_reason=failure_reason,
                    guidance=guidance,
                )
                recovery_attempts.append(recovery)
                logger.warning(
                    f"Agent attempt {attempt + 1} failed: {failure_reason}. "
                    f"Retrying with guidance."
                )
                # Add correction to message history
                messages.append({
                    "role": "system",
                    "content": (
                        f"Your previous attempt failed: {failure_reason}. "
                        f"Recovery guidance: {guidance} "
                        f"This is retry {attempt + 1} of {self.max_retries}."
                    ),
                })
            else:
                logger.error(f"Agent failed after {self.max_retries} retries: {failure_reason}")
                return (
                    f"I was unable to complete this request after {self.max_retries + 1} attempts. "
                    f"Last error: {failure_reason}. "
                    f"Please try rephrasing your request or contact support.",
                    recovery_attempts,
                )

        # Should not reach here
        return "Agent failed unexpectedly.", recovery_attempts

    def _generate_recovery_guidance(self, error: AgentError) -> str:
        """Generate specific guidance based on error type."""
        if error.category == ErrorCategory.CORRECTABLE:
            original = error.original_exception
            if original and "context_length" in str(original).lower():
                return (
                    "Your response was too long. Provide a shorter, more focused answer. "
                    "Skip unnecessary background information."
                )
            return (
                "Your previous output was malformed. Ensure your final answer is "
                "clear prose, not JSON or code (unless specifically requested)."
            )
        if error.category == ErrorCategory.DEGRADABLE:
            return (
                "Some tools failed. Proceed with the information you successfully retrieved. "
                "Acknowledge any gaps in your response."
            )
        return "An error occurred. Try a different approach to answer the question."

    async def _run_agent_loop(self, messages: list[dict]) -> str:
        """Run the tool-calling agent loop."""
        loop_messages = [
            {"role": "system", "content": self.system_prompt},
            *messages,
        ]
        # The loop has no step cap of its own; the wait_for in run() bounds it
        while True:
            response = await self.client.chat.completions.create(
                model="gpt-4o",
                messages=loop_messages,
                tools=self.tools if self.tools else openai.NOT_GIVEN,
                temperature=0,
            )
            msg = response.choices[0].message
            loop_messages.append(msg.model_dump(exclude_unset=True))

            if not msg.tool_calls:
                return msg.content or ""

            # Process tool calls
            for tool_call in msg.tool_calls:
                result = await self._dispatch_tool(tool_call)
                loop_messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result,
                })

    async def _dispatch_tool(self, tool_call) -> str:
        """Dispatch a tool call with timeout protection."""
        tool_name = tool_call.function.name
        try:
            args = json.loads(tool_call.function.arguments)
        except json.JSONDecodeError:
            return f"Tool {tool_name} called with invalid JSON arguments. Cannot execute."
        if not isinstance(args, dict):
            return f"Tool {tool_name} arguments must be a JSON object. Cannot execute."

        # Map tool names to functions
        tool_map = {
            "search_web": search_web,
            # add your tools here
        }
        func = tool_map.get(tool_name)
        if not func:
            return f"Unknown tool: {tool_name}"

        try:
            return await asyncio.wait_for(func(**args), timeout=30.0)
        except asyncio.TimeoutError:
            return f"Tool {tool_name} timed out. Results are unavailable."
```
Step 4: Circuit Breaker Pattern
Prevent cascading failures from external tool dependencies:
```python
# error_handling/circuit_breaker.py
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Optional

# Assumes the modules from Steps 1 and 2 are importable from error_handling
from error_handling.classifier import AgentError, ErrorCategory
from error_handling.safe_tools import safe_tool, search_web

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Blocking calls after failures
    HALF_OPEN = "half_open"  # Testing recovery


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5   # Open after N consecutive failures
    reset_timeout: float = 30.0  # Try again after N seconds
    success_threshold: int = 2   # Close after N successes in half-open


class CircuitBreaker:
    """Circuit breaker for external tool dependencies."""

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        return self._state

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute a function through the circuit breaker."""
        await self._check_state()
        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except Exception as e:
            await self._on_failure(e)
            raise

    async def _check_state(self) -> None:
        async with self._lock:
            if self._state == CircuitState.OPEN:
                # Monotonic clock: elapsed time is unaffected by wall-clock changes
                elapsed = time.monotonic() - (self._last_failure_time or 0)
                if elapsed >= self.config.reset_timeout:
                    logger.info(f"Circuit {self.name}: OPEN → HALF_OPEN (testing recovery)")
                    self._state = CircuitState.HALF_OPEN
                    self._success_count = 0
                else:
                    wait_remaining = self.config.reset_timeout - elapsed
                    raise AgentError(
                        f"Circuit breaker '{self.name}' is OPEN. "
                        f"Service unavailable. Retry in {wait_remaining:.0f}s.",
                        category=ErrorCategory.DEGRADABLE,
                    )

    async def _on_success(self) -> None:
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    logger.info(f"Circuit {self.name}: HALF_OPEN → CLOSED (recovered)")
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            elif self._state == CircuitState.CLOSED:
                self._failure_count = 0  # Reset on success

    async def _on_failure(self, error: Exception) -> None:
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            if self._state == CircuitState.HALF_OPEN:
                logger.warning(f"Circuit {self.name}: HALF_OPEN → OPEN (recovery failed)")
                self._state = CircuitState.OPEN
            elif self._failure_count >= self.config.failure_threshold:
                logger.error(
                    f"Circuit {self.name}: CLOSED → OPEN "
                    f"({self._failure_count} consecutive failures). Error: {error}"
                )
                self._state = CircuitState.OPEN


# Register circuit breakers for each external dependency
_circuit_breakers: dict[str, CircuitBreaker] = {}


def get_circuit_breaker(service_name: str) -> CircuitBreaker:
    if service_name not in _circuit_breakers:
        _circuit_breakers[service_name] = CircuitBreaker(
            name=service_name,
            config=CircuitBreakerConfig(
                failure_threshold=5,
                reset_timeout=30.0,
            ),
        )
    return _circuit_breakers[service_name]


# Apply circuit breaker to a tool
@safe_tool(fallback_message="Web search is temporarily unavailable.")
async def protected_search_web(query: str) -> str:
    """Web search protected by circuit breaker."""
    cb = get_circuit_breaker("web_search_api")
    try:
        # search_web is wrapped by @safe_tool, which turns failures into
        # strings. Call the undecorated function (functools.wraps exposes it
        # as __wrapped__) so the breaker actually sees the exceptions.
        return await cb.call(search_web.__wrapped__, query)
    except AgentError as e:
        if e.category == ErrorCategory.DEGRADABLE:
            return "Web search is temporarily unavailable. Using cached information only."
        raise
```
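Graceful degradation, listed among the goals at the top, can be sketched as an ordered list of providers where each failure falls through to the next, cheaper capability. This is an illustrative sketch only: `first_available`, the `Provider` alias, and the provider names in the usage note are hypothetical and not part of the modules above.

```python
import asyncio
from typing import Awaitable, Callable, Sequence

# A provider takes a query and returns text; any exception means "unavailable"
Provider = Callable[[str], Awaitable[str]]


async def first_available(
    query: str,
    providers: Sequence[tuple[str, Provider]],
    fallback_message: str,
) -> tuple[str, list[str]]:
    """Try providers in preference order and report which ones were skipped."""
    skipped: list[str] = []
    for name, provider in providers:
        try:
            return await provider(query), skipped
        except Exception as exc:
            # Record the gap so the agent can acknowledge it in its answer
            skipped.append(f"{name}: {type(exc).__name__}")
    return fallback_message, skipped
```

A caller might pass `[("live_search", live_search), ("cache", cached_search)]` and include the returned `skipped` list in the tool result, so the model knows the answer came from a degraded source.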
Common Issues and Solutions
Issue: Agent retries forever on correctable errors
Always set max_retries and enforce it with a counter. Log every retry with the failure reason and guidance applied. Add a max_execution_time outer timeout to the entire agent run to catch infinite loops.
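One way to combine a step counter with an outer timeout is sketched below. The helper `run_with_deadline` and its `step` callback are hypothetical; `max_execution_time` mirrors the name used above, and the defaults are arbitrary.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_with_deadline(
    step: Callable[[int], Awaitable[Optional[str]]],
    max_steps: int = 10,
    max_execution_time: float = 120.0,
) -> str:
    """Call step(i) until it returns an answer, the step cap, or the deadline."""

    async def bounded_loop() -> str:
        # The counter stops a loop that keeps making progress-free tool calls
        for i in range(max_steps):
            answer = await step(i)
            if answer is not None:
                return answer
        return f"Stopped after {max_steps} steps without a final answer."

    try:
        # The outer timeout catches a single step that hangs
        return await asyncio.wait_for(bounded_loop(), timeout=max_execution_time)
    except asyncio.TimeoutError:
        return f"Stopped: exceeded the {max_execution_time}s execution limit."
```

The two limits guard against different failures: the counter bounds how many iterations run, while the timeout bounds how long any of them can take.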
Issue: Circuit breaker opens too aggressively
Tune failure_threshold based on the normal failure rate of the service. A service with 1% error rate needs a threshold of at least 5-10 to avoid false positives. Use a time window (failures within the last 60 seconds) rather than consecutive failures for services with sporadic errors.
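A time-window failure counter could look like the sketch below. `SlidingWindowFailures` is a hypothetical helper, not part of the `CircuitBreaker` class in Step 4; the injectable `clock` parameter is an assumption added to make the behavior easy to test.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowFailures:
    """Counts failures inside a rolling time window instead of consecutively."""

    def __init__(
        self,
        threshold: int = 5,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._clock = clock
        self._failures: Deque[float] = deque()

    def _evict_expired(self, now: float) -> None:
        # Drop timestamps that have aged out of the window
        while self._failures and now - self._failures[0] > self.window_seconds:
            self._failures.popleft()

    def record_failure(self) -> None:
        now = self._clock()
        self._failures.append(now)
        self._evict_expired(now)

    def should_open(self) -> bool:
        self._evict_expired(self._clock())
        return len(self._failures) >= self.threshold
```

With this approach an occasional success no longer resets the count, so a service failing half of its calls still trips the breaker, while isolated errors spread over several minutes do not.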
Issue: Error messages leak sensitive data to users
Sanitize error messages before returning them to users. Keep detailed error information in logs (with PII redacted), and return only the category and a safe message to the user.
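A minimal sanitization sketch follows. The patterns, the `redact` and `user_facing_message` helpers, and the message table are illustrative assumptions; a real deployment would need patterns matched to the secrets and PII its own services can emit.

```python
import re

# Illustrative patterns only: emails, common key-like tokens, and URLs
_REDACTIONS = [
    (re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b"), "[email]"),
    (re.compile(r"\b(?:sk|pk|key|token)[-_][A-Za-z0-9_-]{8,}\b"), "[secret]"),
    (re.compile(r"https?://\S+"), "[url]"),
]

# Safe, category-level messages shown to users instead of raw exception text
SAFE_MESSAGES = {
    "retryable": "The service is temporarily busy. Please try again shortly.",
    "degradable": "Some information is currently unavailable.",
    "user_error": "Please check your request and try again.",
    "fatal": "This request could not be completed. Please contact support.",
}


def redact(text: str) -> str:
    """Scrub sensitive-looking substrings before a message is written to logs."""
    for pattern, replacement in _REDACTIONS:
        text = pattern.sub(replacement, text)
    return text


def user_facing_message(category: str) -> str:
    """Return only a generic message for the category, never exception details."""
    return SAFE_MESSAGES.get(category, "Something went wrong. Please try again.")
```

Redaction applies to what goes into logs, while users only ever see the category-level message, so a missed pattern cannot leak through the response.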
Production Considerations
Structured error logging: Log all errors with a correlation ID that links to the agent run. Include: error category, tool name, attempt number, and anonymized input. Use structured JSON logging for easy querying.
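A sketch of structured JSON error logging with the standard library is shown below. `JsonFormatter`, `log_agent_error`, the `agent_fields` key, and the field names are hypothetical choices, not an established schema.

```python
import hashlib
import json
import logging
import uuid


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            # Fields passed via extra={"agent_fields": {...}}
            **getattr(record, "agent_fields", {}),
        }
        return json.dumps(payload, sort_keys=True)


def new_correlation_id() -> str:
    """One ID per agent run, attached to every log line of that run."""
    return uuid.uuid4().hex


def anonymize(text: str) -> str:
    # A truncated hash keeps raw input out of logs; it is not strong
    # anonymization, since short inputs can be guessed by brute force
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]


def log_agent_error(
    logger: logging.Logger,
    *,
    correlation_id: str,
    category: str,
    tool_name: str,
    attempt: int,
    user_input: str,
) -> None:
    logger.error(
        "agent_error",
        extra={"agent_fields": {
            "correlation_id": correlation_id,
            "category": category,
            "tool_name": tool_name,
            "attempt": attempt,
            "input_hash": anonymize(user_input),
        }},
    )
```

Attaching `JsonFormatter` to a handler makes every error a single queryable JSON line, and filtering on `correlation_id` reconstructs the sequence of failures within one run.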
Error budget: Track your error rate as a percentage of total agent runs. Set an SLO (e.g., error rate below 2%). Alert when the error budget is being consumed faster than expected. See agent monitoring for metric setup.
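The error budget arithmetic can be sketched as follows. `ErrorBudget` is a hypothetical in-memory tracker; a production setup would more likely compute the same ratios from metrics over a rolling window.

```python
from dataclasses import dataclass


@dataclass
class ErrorBudget:
    slo_error_rate: float = 0.02  # e.g. at most 2% of runs may fail
    total_runs: int = 0
    failed_runs: int = 0

    def record(self, failed: bool) -> None:
        self.total_runs += 1
        if failed:
            self.failed_runs += 1

    @property
    def error_rate(self) -> float:
        return self.failed_runs / self.total_runs if self.total_runs else 0.0

    @property
    def budget_consumed(self) -> float:
        """Fraction of allowed failures used so far (1.0 means fully spent)."""
        allowed = self.slo_error_rate * self.total_runs
        return self.failed_runs / allowed if allowed else 0.0

    def should_alert(self, threshold: float = 1.0) -> bool:
        return self.budget_consumed >= threshold
```

For example, 15 failures in 1,000 runs against a 2% SLO is an error rate of 1.5% with 75% of the budget consumed, which would trigger an alert configured at `threshold=0.75` but not one at the default.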
Human escalation: When an agent fails after all retries, create a support ticket automatically with the full error context. The human-in-the-loop pattern ensures failures don't go unresolved.
Next Steps
- Set up agent monitoring to track error rates
- Add rate limiting alongside error handling
- Review agent tracing for end-to-end failure visibility
- Implement human-in-the-loop escalation for fatal errors
- Build caching strategies to reduce error impact