What You'll Build

A production rate limiting system for AI agents that includes:

- Token budget tracking per session and per day
- Exponential backoff retry with jitter using tenacity
- Asyncio semaphores for controlling concurrent agent instances
- A multi-model fallback chain (GPT-4o → GPT-4o-mini → cached response)
- Request queuing with priority for concurrent agent workloads
- Cost enforcement that stops or downgrades agents before they overspend
Prerequisites

```bash
pip install openai anthropic tenacity redis python-dotenv
```

- Python 3.11+
- OpenAI and/or Anthropic API keys
- Redis (optional, for distributed rate limiting)
- Familiarity with agentic workflows
Overview
LLM APIs enforce two types of rate limits: RPM (requests per minute) and TPM (tokens per minute). Both can throttle your agents. A well-designed rate limiting layer handles these transparently so your agent logic stays clean.
The key components:
- Retry logic: Automatically retry on 429 errors with backoff
- Concurrency control: Limit simultaneous agent runs with semaphores
- Budget enforcement: Track spend and stop/downgrade before hitting limits
- Model fallback: Use cheaper models when primary is rate limited
Step 1: Exponential Backoff with Tenacity

The tenacity library provides production-grade retry logic. Build a reusable wrapper:

```python
# rate_limiting/retry.py
import asyncio
import logging
import random
from typing import Any, Callable

import httpx
import openai
import anthropic
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception,
    RetryCallState,
)

logger = logging.getLogger(__name__)


def is_rate_limit_error(exception: Exception) -> bool:
    """Check if exception is a rate limit or quota error."""
    if isinstance(exception, (openai.RateLimitError, anthropic.RateLimitError)):
        return True
    if isinstance(exception, httpx.HTTPStatusError):
        return exception.response.status_code == 429
    return False


def is_retryable_error(exception: Exception) -> bool:
    """Check if exception should trigger a retry."""
    if is_rate_limit_error(exception):
        return True
    # Also retry on connection errors and server errors (500, 502, 503, 529)
    if isinstance(exception, (openai.APIConnectionError, anthropic.APIConnectionError)):
        return True
    if isinstance(exception, httpx.HTTPStatusError):
        return exception.response.status_code in {500, 502, 503, 529}
    return False


def get_retry_after(exception: Exception) -> float | None:
    """Extract the Retry-After header value if present."""
    response = None
    if isinstance(exception, openai.RateLimitError):
        response = exception.response
    elif isinstance(exception, httpx.HTTPStatusError):
        response = exception.response
    if response is not None:
        retry_after = response.headers.get("retry-after")
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                pass
    return None


def log_retry_attempt(retry_state: RetryCallState) -> None:
    """Log retry attempts with timing info."""
    exception = retry_state.outcome.exception()
    wait_time = retry_state.next_action.sleep if retry_state.next_action else 0
    if exception is not None and is_rate_limit_error(exception):
        logger.warning(
            f"Rate limit hit (attempt {retry_state.attempt_number}). "
            f"Waiting {wait_time:.1f}s before retry. "
            f"Error: {type(exception).__name__}"
        )
    else:
        logger.error(
            f"Retryable error (attempt {retry_state.attempt_number}). "
            f"Waiting {wait_time:.1f}s. Error: {exception}"
        )


# Production retry decorator
def with_retry(max_attempts: int = 5, max_wait: float = 60.0):
    """Decorator factory for LLM API calls with exponential backoff."""
    return retry(
        retry=retry_if_exception(is_retryable_error),  # only retry retryable errors
        stop=stop_after_attempt(max_attempts),
        wait=wait_exponential_jitter(
            initial=1,     # start at 1 second
            max=max_wait,  # cap at max_wait seconds
            jitter=2,      # add up to 2 seconds of jitter
        ),
        before_sleep=log_retry_attempt,
        reraise=True,
    )


# Async wrapper that respects Retry-After headers
async def call_with_retry(
    func: Callable,
    *args,
    max_attempts: int = 5,
    **kwargs,
) -> Any:
    """Call an async function with exponential backoff retry."""
    last_exception = None
    for attempt in range(max_attempts):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e
            if not is_retryable_error(e):
                raise  # don't retry non-retryable errors
            if attempt == max_attempts - 1:
                break  # last attempt, re-raise below
            # Honor the Retry-After header if the API sent one
            retry_after = get_retry_after(e)
            if retry_after:
                wait_time = min(retry_after, 120.0)  # cap at 2 minutes
                logger.warning(f"Rate limit: Retry-After={retry_after}s. Waiting {wait_time}s.")
            else:
                # Exponential backoff with +-50% jitter
                base_wait = min(2 ** attempt, 60)
                jitter = base_wait * 0.5 * (2 * random.random() - 1)
                wait_time = max(1.0, base_wait + jitter)
                logger.warning(f"Retryable error (attempt {attempt + 1}). Waiting {wait_time:.1f}s.")
            await asyncio.sleep(wait_time)
    raise last_exception
```
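To see what wait schedule `call_with_retry` produces in the no-header case, here is a standalone sketch of the wait-time calculation (the function name is illustrative):

```python
import random

def backoff_schedule(max_attempts: int = 5, cap: float = 60.0) -> list[float]:
    """Wait times between attempts: exponential growth, capped, with +-50% jitter."""
    waits = []
    for attempt in range(max_attempts - 1):  # no wait after the final attempt
        base_wait = min(2 ** attempt, cap)
        jitter = base_wait * 0.5 * (2 * random.random() - 1)  # uniform in +-50%
        waits.append(max(1.0, base_wait + jitter))
    return waits
```

The jitter matters: when many concurrent agents hit a 429 at the same instant, randomized waits spread their retries out instead of letting them all retry in lockstep and trip the limit again.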
Step 2: Token Budget Tracking

Track token usage per session and enforce cost budgets:

```python
# rate_limiting/budget.py
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)

# Model pricing (USD per 1M tokens as of March 2026 — verify current pricing)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
}


@dataclass
class TokenUsage:
    input_tokens: int = 0
    output_tokens: int = 0
    model: str = ""

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

    @property
    def cost_usd(self) -> float:
        pricing = MODEL_PRICING.get(self.model, {"input": 5.0, "output": 15.0})
        input_cost = (self.input_tokens / 1_000_000) * pricing["input"]
        output_cost = (self.output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost


@dataclass
class BudgetTracker:
    """Tracks token usage and enforces cost budgets."""
    max_cost_per_run: float = 1.00    # max $1.00 per agent run
    max_cost_per_day: float = 50.00   # max $50.00 per day
    warn_at_pct: float = 0.80         # warn at 80% of budget
    _total_cost: float = field(default=0.0, init=False)
    _daily_cost: float = field(default=0.0, init=False)
    _day_start: float = field(default_factory=time.time, init=False)
    _usage_history: list[TokenUsage] = field(default_factory=list, init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    async def record_usage(self, usage: TokenUsage) -> None:
        """Record token usage and check budget limits."""
        async with self._lock:
            self._usage_history.append(usage)
            cost = usage.cost_usd
            self._total_cost += cost
            # Reset daily counter if a new day has started
            if time.time() - self._day_start > 86400:
                self._daily_cost = 0.0
                self._day_start = time.time()
            self._daily_cost += cost
            # Log usage
            logger.info(
                f"Token usage: {usage.input_tokens} in + {usage.output_tokens} out = "
                f"{usage.total_tokens} total (${cost:.4f}). "
                f"Run total: ${self._total_cost:.4f}"
            )
            # Warn at threshold
            if self._total_cost >= self.max_cost_per_run * self.warn_at_pct:
                logger.warning(
                    f"Approaching run budget: ${self._total_cost:.4f} / "
                    f"${self.max_cost_per_run:.2f} ({self._total_cost / self.max_cost_per_run:.0%})"
                )

    async def check_budget(self, estimated_tokens: int, model: str) -> Optional[str]:
        """
        Check if a call can proceed within budget.
        Returns None if OK, or an error message if budget exceeded.
        """
        pricing = MODEL_PRICING.get(model, {"input": 5.0, "output": 15.0})
        # Conservative estimate: price every token at the model's higher rate
        estimated_cost = (estimated_tokens / 1_000_000) * max(pricing["input"], pricing["output"])
        async with self._lock:
            if self._total_cost + estimated_cost > self.max_cost_per_run:
                return (
                    f"Run budget exceeded: ${self._total_cost:.4f} spent of "
                    f"${self.max_cost_per_run:.2f} limit. "
                    f"Estimated next call: ${estimated_cost:.4f}"
                )
            if self._daily_cost + estimated_cost > self.max_cost_per_day:
                return (
                    f"Daily budget exceeded: ${self._daily_cost:.4f} spent of "
                    f"${self.max_cost_per_day:.2f} daily limit."
                )
        return None

    @property
    def summary(self) -> dict:
        return {
            "run_cost": self._total_cost,
            "daily_cost": self._daily_cost,
            "calls": len(self._usage_history),
            "total_tokens": sum(u.total_tokens for u in self._usage_history),
        }
```
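A quick sanity check on the arithmetic. This is a standalone version of the `TokenUsage.cost_usd` calculation, using the prices from the table above (verify current pricing):

```python
# Standalone version of the cost arithmetic in TokenUsage.cost_usd.
# Prices are USD per 1M tokens from the table above; verify current pricing.
PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = PRICING.get(model, {"input": 5.0, "output": 15.0})  # pessimistic default
    return (input_tokens / 1_000_000) * pricing["input"] + \
           (output_tokens / 1_000_000) * pricing["output"]

# A typical agent step of 10k prompt tokens + 2k completion tokens on gpt-4o:
# 10_000/1e6 * 2.50 + 2_000/1e6 * 10.00 = $0.025 + $0.020 = $0.045
```

At $0.045 per step, a 20-step agent run already costs roughly $0.90, which is why the default `max_cost_per_run` of $1.00 above bites sooner than you might expect.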
Step 3: Asyncio Semaphore for Concurrent Agent Control

Control how many agents run simultaneously:

```python
# rate_limiting/concurrency.py
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator

from rate_limiting.retry import call_with_retry

logger = logging.getLogger(__name__)


class RateLimitedSemaphore:
    """
    Semaphore that also enforces requests-per-minute limits.
    Combines concurrency control with RPM rate limiting.
    """

    def __init__(self, max_concurrent: int, max_rpm: int):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._max_rpm = max_rpm
        self._request_times: list[float] = []
        self._lock = asyncio.Lock()

    async def _wait_for_rpm_capacity(self) -> None:
        """Wait until we have RPM capacity."""
        while True:
            async with self._lock:
                now = time.time()
                # Remove requests older than 60 seconds
                self._request_times = [t for t in self._request_times if now - t < 60]
                if len(self._request_times) < self._max_rpm:
                    self._request_times.append(now)
                    return
                # Calculate wait time: the oldest request will expire in...
                oldest = self._request_times[0]
                wait_time = 60 - (now - oldest) + 0.1  # +0.1s buffer
            # Sleep outside the lock so other tasks aren't blocked
            logger.debug(f"RPM limit reached. Waiting {wait_time:.1f}s.")
            await asyncio.sleep(wait_time)

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[None]:
        """Acquire both the semaphore and an RPM slot."""
        await self._wait_for_rpm_capacity()
        async with self._semaphore:
            yield


# Global rate limiter instances (one per LLM provider)
openai_limiter = RateLimitedSemaphore(
    max_concurrent=10,  # max 10 simultaneous OpenAI calls
    max_rpm=60,         # max 60 requests/minute
)
anthropic_limiter = RateLimitedSemaphore(
    max_concurrent=5,   # Anthropic has lower concurrent limits
    max_rpm=50,
)


async def rate_limited_openai_call(client, **kwargs):
    """Make an OpenAI API call with rate limiting."""
    async with openai_limiter.acquire():
        return await call_with_retry(client.chat.completions.create, **kwargs)


async def rate_limited_anthropic_call(client, **kwargs):
    """Make an Anthropic API call with rate limiting."""
    async with anthropic_limiter.acquire():
        return await call_with_retry(client.messages.create, **kwargs)
```
Step 4: Multi-Model Fallback Chain

Automatically fall back to cheaper models when the primary is rate limited:

```python
# rate_limiting/model_fallback.py
import logging
import time
from dataclasses import dataclass
from typing import Optional

import openai

from rate_limiting.budget import BudgetTracker, TokenUsage
from rate_limiting.concurrency import rate_limited_openai_call

logger = logging.getLogger(__name__)


@dataclass
class ModelConfig:
    model: str
    max_tokens: int
    context_window: int
    priority: int  # lower = higher priority


FALLBACK_CHAIN = [
    ModelConfig("gpt-4o", max_tokens=4096, context_window=128000, priority=1),
    ModelConfig("gpt-4o-mini", max_tokens=4096, context_window=128000, priority=2),
    ModelConfig("gpt-3.5-turbo", max_tokens=4096, context_window=16384, priority=3),
]


class ModelFallbackManager:
    """Manages fallback between models when rate limits are hit."""

    def __init__(self, client: openai.AsyncOpenAI, chain: Optional[list[ModelConfig]] = None):
        self.client = client
        self.chain = chain or FALLBACK_CHAIN
        self._model_failures: dict[str, float] = {}  # model -> last failure time
        self._cooldown_seconds = 60  # how long to avoid a failed model

    def _is_in_cooldown(self, model: str) -> bool:
        if model not in self._model_failures:
            return False
        elapsed = time.monotonic() - self._model_failures[model]
        return elapsed < self._cooldown_seconds

    def _get_available_models(self) -> list[ModelConfig]:
        """Return models not currently in cooldown."""
        return [m for m in self.chain if not self._is_in_cooldown(m.model)]

    async def create_completion(
        self,
        messages: list[dict],
        budget_tracker: Optional[BudgetTracker] = None,
        **kwargs,
    ) -> tuple[str, str]:
        """
        Create a completion with automatic model fallback.
        Returns (response_content, model_used).
        """
        available = self._get_available_models()
        if not available:
            raise RuntimeError("All models in cooldown. Try again in a minute.")
        for model_config in available:
            # Check budget before calling
            if budget_tracker:
                # Estimate tokens (rough heuristic: 4 chars per token)
                estimated_tokens = sum(len(m.get("content", "")) for m in messages) // 4
                budget_error = await budget_tracker.check_budget(
                    estimated_tokens, model_config.model
                )
                if budget_error:
                    logger.warning(f"Budget check failed for {model_config.model}: {budget_error}")
                    continue
            try:
                logger.debug(f"Attempting completion with {model_config.model}")
                response = await rate_limited_openai_call(
                    self.client,
                    model=model_config.model,
                    messages=messages,
                    max_tokens=model_config.max_tokens,
                    **kwargs,
                )
                content = response.choices[0].message.content
                # Record usage
                if budget_tracker and getattr(response, "usage", None):
                    await budget_tracker.record_usage(TokenUsage(
                        input_tokens=response.usage.prompt_tokens,
                        output_tokens=response.usage.completion_tokens,
                        model=model_config.model,
                    ))
                return content, model_config.model
            except openai.RateLimitError:
                logger.warning(f"Rate limit on {model_config.model}. Adding to cooldown.")
                self._model_failures[model_config.model] = time.monotonic()
                continue  # try the next model
            except openai.BadRequestError as e:
                if "context_length" in str(e).lower():
                    logger.warning(f"Context too long for {model_config.model}. Trying next.")
                    continue
                raise
        raise RuntimeError("All models exhausted. Rate limited or budget exceeded.")
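The cooldown bookkeeping is easy to test in isolation. Here is a standalone sketch of the same logic with an injectable clock so tests don't need to sleep (class and method names are illustrative):

```python
import time

class ModelCooldowns:
    """Tracks recently failed models so a fallback chain can skip them."""

    def __init__(self, cooldown_seconds: float = 60.0, clock=time.monotonic):
        self._cooldown = cooldown_seconds
        self._clock = clock
        self._failures: dict[str, float] = {}  # model -> last failure time

    def record_failure(self, model: str) -> None:
        self._failures[model] = self._clock()

    def available(self, models: list[str]) -> list[str]:
        """Return models that never failed or whose cooldown has expired."""
        now = self._clock()
        return [
            m for m in models
            if m not in self._failures or now - self._failures[m] >= self._cooldown
        ]
```

Injecting the clock is the design choice worth copying: it makes "does the model come back after 60 seconds?" a fast, deterministic unit test instead of a one-minute sleep.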
Step 5: Request Queue for High-Volume Workloads

For batch processing many agent runs with priority:

```python
# rate_limiting/queue.py
import asyncio
import heapq
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

import openai

from rate_limiting.concurrency import RateLimitedSemaphore, openai_limiter

logger = logging.getLogger(__name__)


@dataclass(order=True)
class QueuedRequest:
    priority: int     # lower = higher priority
    timestamp: float  # for FIFO ordering within the same priority
    request_id: str = field(compare=False)
    func: Callable = field(compare=False)
    args: tuple = field(compare=False)
    kwargs: dict = field(compare=False)
    future: asyncio.Future = field(compare=False)


class PriorityRequestQueue:
    """Priority queue for LLM requests with rate limiting."""

    def __init__(self, rate_limiter: RateLimitedSemaphore):
        self._queue: list[QueuedRequest] = []
        self._rate_limiter = rate_limiter
        self._processing = False
        self._lock = asyncio.Lock()

    async def submit(
        self,
        func: Callable,
        *args,
        priority: int = 5,  # 1=urgent, 5=normal, 10=low
        request_id: Optional[str] = None,
        **kwargs,
    ) -> Any:
        """Submit a request to the queue. Returns when complete."""
        future = asyncio.get_running_loop().create_future()
        request = QueuedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id or str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs,
            future=future,
        )
        async with self._lock:
            heapq.heappush(self._queue, request)
            logger.debug(
                f"Queued request {request.request_id} "
                f"(priority={priority}, queue_size={len(self._queue)})"
            )
            # Start the worker if it isn't already running
            if not self._processing:
                self._processing = True
                asyncio.create_task(self._process_queue())
        return await future

    async def _process_queue(self) -> None:
        """Process queued requests with rate limiting, highest priority first."""
        while True:
            async with self._lock:
                if not self._queue:
                    self._processing = False
                    return
                request = heapq.heappop(self._queue)
            try:
                async with self._rate_limiter.acquire():
                    result = await request.func(*request.args, **request.kwargs)
                request.future.set_result(result)
            except Exception as e:
                request.future.set_exception(e)


# Usage example
async def process_batch_of_agents(prompts: list[str]) -> list[str]:
    """Process many agent prompts with rate-limited queuing."""
    queue = PriorityRequestQueue(openai_limiter)
    client = openai.AsyncOpenAI()

    async def single_completion(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

    # Submit all requests — they'll be processed with rate limiting
    tasks = [
        queue.submit(single_completion, prompt, priority=5)
        for prompt in prompts
    ]
    results = await asyncio.gather(*tasks)
    return list(results)
```
Common Issues and Solutions
Issue: Semaphore deadlock when agents call each other
Never acquire the same semaphore from within a semaphore-protected call. Use separate semaphores for different resource types (LLM calls vs. database calls). Set a timeout on semaphore acquisition: `asyncio.wait_for(semaphore.acquire(), timeout=30)`.
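The timeout pattern looks like this (a sketch; the timeout value is an example):

```python
import asyncio

async def acquire_with_timeout(sem: asyncio.Semaphore, timeout: float = 30.0) -> bool:
    """Try to acquire a semaphore; give up after `timeout` seconds instead of deadlocking."""
    try:
        await asyncio.wait_for(sem.acquire(), timeout=timeout)
        return True  # caller owns a slot and must call sem.release() when done
    except asyncio.TimeoutError:
        return False  # caller should fail the request or fall back, not block forever
```

A timed-out acquisition turns a silent hang into an explicit failure you can log, retry, or route to a fallback model.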
Issue: Budget counter incorrect under concurrent access
Always use `asyncio.Lock` when reading and writing budget counters. The `async with self._lock:` pattern in the `BudgetTracker` above prevents race conditions. In distributed deployments (multiple processes), use Redis atomic operations: `INCRBY` with expiry for counters.
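A hypothetical sketch of that Redis counter pattern, assuming a redis-py-style client (the key name and TTL are illustrative):

```python
# Hypothetical sketch: atomic daily spend counter in Redis, assuming a
# redis-py style client. Key layout and TTL are illustrative.
def add_daily_spend(r, agent_id: str, cost_usd: float, ttl_seconds: int = 86400) -> float:
    """Atomically add to today's spend counter and return the new total."""
    key = f"budget:daily:{agent_id}"
    new_total = float(r.incrbyfloat(key, cost_usd))  # atomic across processes
    r.expire(key, ttl_seconds)  # counter expires after a day
    return new_total
```

Because `INCRBYFLOAT` is atomic on the server, two worker processes recording spend at the same moment cannot lose an update the way two unlocked in-memory increments can.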
Issue: Fallback to cheaper model produces worse results
Log which model was actually used for each agent run and track quality metrics by model. If the fallback model is unacceptable for your use case, configure the fallback to return a cached or static response instead of using a cheaper model.
Production Considerations
Distributed rate limiting: For multi-process or multi-server deployments, replace the in-memory `_request_times` list in `RateLimitedSemaphore` with Redis: use `ZADD` with timestamps as scores and `ZREMRANGEBYSCORE` to expire old entries. This gives you consistent rate limiting across all processes.
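A hypothetical sketch of that sliding-window check with a redis-py-style client (the key layout and one-second TTL slack are illustrative):

```python
import time
import uuid

# Hypothetical sketch of a Redis-backed sliding window, assuming a redis-py
# style client. Key layout and TTL slack are illustrative.
def try_acquire_slot(r, key: str, max_rpm: int, window: float = 60.0) -> bool:
    """Sliding-window RPM check backed by a Redis sorted set (timestamps as scores)."""
    now = time.time()
    r.zremrangebyscore(key, 0, now - window)  # drop entries older than the window
    if r.zcard(key) >= max_rpm:
        return False
    r.zadd(key, {uuid.uuid4().hex: now})      # record this request with a unique member
    r.expire(key, int(window) + 1)            # let idle keys expire on their own
    return True
```

Note this check-then-add sequence is not atomic across processes; for strict limits, wrap the commands in a `MULTI`/`EXEC` pipeline or a Lua script so the count and the insert happen as one operation.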
Rate limit headroom: Don't consume 100% of your API limit. Set your software limits at 80% of the API limit to leave headroom for bursts. For example, if your OpenAI tier allows 60 RPM, set `max_rpm=48`.
Observability: Track rate limit hits, retry counts, and model fallback events as metrics. Alert when fallback usage exceeds 10% of requests — it indicates your primary model is consistently overloaded.
Next Steps
- Implement error handling for non-rate-limit failures
- Set up production monitoring to track rate limit metrics
- Add caching strategies to reduce API calls
- Review agent state for managing agent context efficiently
- Learn how the OpenAI Agents SDK handles retries natively