Advanced · 17 min read

AI Agent Rate Limiting: TPM & Cost Control

Complete guide to rate limiting AI agents in production — token rate limits (TPM/RPM), exponential backoff retry logic, multiple model fallback, request queuing, cost budget enforcement, and asyncio semaphores for concurrent agents. Full Python implementation.

By AI Agents Guide Team • March 1, 2026

Table of Contents

  1. What You'll Build
  2. Prerequisites
  3. Overview
  4. Step 1: Exponential Backoff with Tenacity
  5. Step 2: Token Budget Tracking
  6. Step 3: Asyncio Semaphore for Concurrent Agent Control
  7. Step 4: Multi-Model Fallback Chain
  8. Step 5: Request Queue for High-Volume Workloads
  9. Common Issues and Solutions
  10. Production Considerations
  11. Next Steps

What You'll Build

A production rate limiting system for AI agents that includes:

  • Token budget tracking per session and per day
  • Exponential backoff retry with jitter using tenacity
  • Asyncio semaphores for controlling concurrent agent instances
  • A multi-model fallback chain (GPT-4o → GPT-4o-mini → cached response)
  • Request queuing with priority for concurrent agent workloads
  • Cost enforcement that stops or downgrades agents before they overspend

Prerequisites

pip install openai anthropic tenacity redis python-dotenv
  • Python 3.11+
  • OpenAI and/or Anthropic API keys
  • Redis (optional, for distributed rate limiting)
  • Familiarity with agentic workflows

Overview

LLM APIs enforce two types of rate limits: RPM (requests per minute) and TPM (tokens per minute). Both can throttle your agents. A well-designed rate limiting layer handles these transparently so your agent logic stays clean.
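As a quick sanity check, you can work out which limit binds first for your workload. The numbers below are illustrative, not real tier limits — substitute your own:

```python
# Which limit throttles you first? Illustrative numbers, not real tier limits.
rpm_limit = 60            # requests per minute allowed by the API tier
tpm_limit = 450_000       # tokens per minute allowed by the API tier
tokens_per_call = 3_000   # average input + output tokens per agent call

calls_allowed_by_tpm = tpm_limit // tokens_per_call
binding = min(rpm_limit, calls_allowed_by_tpm)
print(calls_allowed_by_tpm, binding)  # 150 60 — RPM binds before TPM here
```

If your calls are long (large contexts, verbose outputs), TPM usually binds first; for many short calls, RPM does.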

The key components:

  1. Retry logic: Automatically retry on 429 errors with backoff
  2. Concurrency control: Limit simultaneous agent runs with semaphores
  3. Budget enforcement: Track spend and stop/downgrade before hitting limits
  4. Model fallback: Use cheaper models when primary is rate limited

Step 1: Exponential Backoff with Tenacity

The tenacity library provides production-grade retry logic. Build a reusable wrapper:

# rate_limiting/retry.py
import time
import logging
from typing import Callable, Any
import httpx
import openai
import anthropic
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception_type,
    before_sleep_log,
    RetryCallState,
)

logger = logging.getLogger(__name__)


def is_rate_limit_error(exception: Exception) -> bool:
    """Check if exception is a rate limit or quota error."""
    if isinstance(exception, openai.RateLimitError):
        return True
    if isinstance(exception, anthropic.RateLimitError):
        return True
    if isinstance(exception, httpx.HTTPStatusError):
        return exception.response.status_code == 429
    return False


def is_retryable_error(exception: Exception) -> bool:
    """Check if exception should trigger a retry."""
    if is_rate_limit_error(exception):
        return True
    # Also retry on server errors (500, 502, 503)
    if isinstance(exception, (openai.APIConnectionError, anthropic.APIConnectionError)):
        return True
    if isinstance(exception, httpx.HTTPStatusError):
        return exception.response.status_code in {500, 502, 503, 529}
    return False


def get_retry_after(exception: Exception) -> float | None:
    """Extract Retry-After header value if present."""
    response = None
    if isinstance(exception, openai.RateLimitError):
        response = exception.response
    elif isinstance(exception, httpx.HTTPStatusError):
        response = exception.response

    if response is not None:
        retry_after = response.headers.get("retry-after")
        if retry_after:
            try:
                return float(retry_after)
            except ValueError:
                pass
    return None


def log_retry_attempt(retry_state: RetryCallState) -> None:
    """Log retry attempts with timing info."""
    exception = retry_state.outcome.exception()
    wait_time = retry_state.next_action.sleep if retry_state.next_action else 0

    if is_rate_limit_error(exception):
        logger.warning(
            f"Rate limit hit (attempt {retry_state.attempt_number}). "
            f"Waiting {wait_time:.1f}s before retry. "
            f"Error: {type(exception).__name__}"
        )
    else:
        logger.error(
            f"Retryable error (attempt {retry_state.attempt_number}). "
            f"Waiting {wait_time:.1f}s. Error: {exception}"
        )


# Production retry decorator
def with_retry(max_attempts: int = 5, max_wait: float = 60.0):
    """Decorator factory for LLM API calls with exponential backoff."""
    return retry(
        # Retry only on rate-limit, connection, and HTTP-status errors.
        # (Coarser than is_retryable_error; the async helper below filters precisely.)
        retry=retry_if_exception_type((
            openai.RateLimitError,
            anthropic.RateLimitError,
            openai.APIConnectionError,
            anthropic.APIConnectionError,
            httpx.HTTPStatusError,
        )),
        stop=stop_after_attempt(max_attempts),
        wait=wait_exponential_jitter(
            initial=1,      # Start at 1 second
            max=max_wait,   # Cap at max_wait seconds
            jitter=2,       # Add up to 2 seconds of jitter
        ),
        before_sleep=log_retry_attempt,
        reraise=True,
    )


# Async wrapper that respects Retry-After headers
async def call_with_retry(
    func: Callable,
    *args,
    max_attempts: int = 5,
    **kwargs,
) -> Any:
    """Call an async function with exponential backoff retry."""
    last_exception = None

    for attempt in range(max_attempts):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e

            if not is_retryable_error(e):
                raise  # Don't retry non-retryable errors

            if attempt == max_attempts - 1:
                break  # Last attempt, will re-raise below

            # Check for Retry-After header
            retry_after = get_retry_after(e)
            if retry_after:
                wait_time = min(retry_after, 120.0)  # Cap at 2 minutes
                logger.warning(f"Rate limit: Retry-After={retry_after}s. Waiting {wait_time}s.")
            else:
                # Exponential backoff with jitter
                base_wait = min(2 ** attempt, 60)
                jitter = base_wait * 0.5 * (2 * (time.time() % 1) - 1)  # +-50% jitter
                wait_time = max(1.0, base_wait + jitter)
                logger.warning(f"Retryable error (attempt {attempt+1}). Waiting {wait_time:.1f}s.")

            import asyncio
            await asyncio.sleep(wait_time)

    raise last_exception
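
To see the retry behavior in isolation, here is a minimal self-contained sketch with a fake flaky service standing in for an LLM client. `FlakyService` and `RateLimited` are illustrative stand-ins, not part of any SDK:

```python
import asyncio

class RateLimited(Exception):
    """Stand-in for openai.RateLimitError in this demo."""

class FlakyService:
    """Fails with a 429-style error a fixed number of times, then succeeds."""
    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def complete(self, prompt: str) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise RateLimited("429 Too Many Requests")
        return f"ok: {prompt}"

async def call_with_backoff(func, *args, max_attempts: int = 5):
    for attempt in range(max_attempts):
        try:
            return await func(*args)
        except RateLimited:
            if attempt == max_attempts - 1:
                raise
            # Exponential backoff, shortened so the demo runs instantly
            await asyncio.sleep(0.01 * 2 ** attempt)

service = FlakyService(failures=2)
result = asyncio.run(call_with_backoff(service.complete, "hello"))
print(result, service.calls)  # ok: hello 3
```

The call succeeds on the third attempt after two backoff sleeps — the same shape of behavior `call_with_retry` above gives you against real 429s.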

Step 2: Token Budget Tracking

Track token usage per session and enforce cost budgets:

# rate_limiting/budget.py
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
import logging

logger = logging.getLogger(__name__)


# Model pricing (USD per 1M tokens as of March 2026 — verify current pricing)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
}


@dataclass
class TokenUsage:
    input_tokens: int = 0
    output_tokens: int = 0
    model: str = ""

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

    @property
    def cost_usd(self) -> float:
        pricing = MODEL_PRICING.get(self.model, {"input": 5.0, "output": 15.0})
        input_cost = (self.input_tokens / 1_000_000) * pricing["input"]
        output_cost = (self.output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost


@dataclass
class BudgetTracker:
    """Tracks token usage and enforces cost budgets."""
    max_cost_per_run: float = 1.00      # Max $1.00 per agent run
    max_cost_per_day: float = 50.00     # Max $50.00 per day
    warn_at_pct: float = 0.80           # Warn at 80% of budget

    _total_cost: float = field(default=0.0, init=False)
    _daily_cost: float = field(default=0.0, init=False)
    _day_start: float = field(default_factory=time.time, init=False)
    _usage_history: list[TokenUsage] = field(default_factory=list, init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)

    async def record_usage(self, usage: TokenUsage) -> None:
        """Record token usage and check budget limits."""
        async with self._lock:
            self._usage_history.append(usage)
            cost = usage.cost_usd
            self._total_cost += cost

            # Reset daily counter if a new day has started
            if time.time() - self._day_start > 86400:
                self._daily_cost = 0
                self._day_start = time.time()
            self._daily_cost += cost

            # Log usage
            logger.info(
                f"Token usage: {usage.input_tokens} in + {usage.output_tokens} out = "
                f"{usage.total_tokens} total (${cost:.4f}). "
                f"Run total: ${self._total_cost:.4f}"
            )

            # Warn at threshold
            if self._total_cost >= self.max_cost_per_run * self.warn_at_pct:
                logger.warning(
                    f"Approaching run budget: ${self._total_cost:.4f} / "
                    f"${self.max_cost_per_run:.2f} ({self._total_cost/self.max_cost_per_run:.0%})"
                )

    async def check_budget(self, estimated_tokens: int, model: str) -> Optional[str]:
        """
        Check if a call can proceed within budget.

        Returns None if OK, or an error message if budget exceeded.
        """
        pricing = MODEL_PRICING.get(model, {"input": 5.0, "output": 15.0})
        # Conservative estimate: price every estimated token at the model's
        # more expensive (usually output) rate
        estimated_cost = (estimated_tokens / 1_000_000) * max(pricing["input"], pricing["output"])

        async with self._lock:
            if self._total_cost + estimated_cost > self.max_cost_per_run:
                return (
                    f"Run budget exceeded: ${self._total_cost:.4f} spent of "
                    f"${self.max_cost_per_run:.2f} limit. "
                    f"Estimated next call: ${estimated_cost:.4f}"
                )
            if self._daily_cost + estimated_cost > self.max_cost_per_day:
                return (
                    f"Daily budget exceeded: ${self._daily_cost:.4f} spent of "
                    f"${self.max_cost_per_day:.2f} daily limit."
                )
        return None

    @property
    def summary(self) -> dict:
        return {
            "run_cost": self._total_cost,
            "daily_cost": self._daily_cost,
            "calls": len(self._usage_history),
            "total_tokens": sum(u.total_tokens for u in self._usage_history),
        }
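
To make the budget math concrete, here is the per-call calculation `TokenUsage.cost_usd` performs, using the gpt-4o-mini figures from the pricing table above (illustrative pricing — verify current rates):

```python
# Cost of one call at $0.15 / $0.60 per 1M input/output tokens (illustrative)
def call_cost(input_tokens: int, output_tokens: int) -> float:
    return (input_tokens / 1_000_000) * 0.15 + (output_tokens / 1_000_000) * 0.60

cost = call_cost(12_000, 3_000)
print(f"${cost:.4f}")  # $0.0036

# At that rate, a $1.00 per-run budget covers roughly 277 such calls
calls_within_budget = int(1.00 // cost)
print(calls_within_budget)
```

Running the same numbers through gpt-4o's pricing gives a cost roughly 17x higher — which is why the fallback chain in Step 4 is worth the complexity.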

Step 3: Asyncio Semaphore for Concurrent Agent Control

Control how many agents run simultaneously:

# rate_limiting/concurrency.py
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator
import logging

# Assumes these files live together in a rate_limiting/ package
from rate_limiting.retry import call_with_retry

logger = logging.getLogger(__name__)


class RateLimitedSemaphore:
    """
    Semaphore that also enforces requests-per-minute limits.
    Combines concurrency control with RPM rate limiting.
    """

    def __init__(self, max_concurrent: int, max_rpm: int):
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._max_rpm = max_rpm
        self._request_times: list[float] = []
        self._lock = asyncio.Lock()

    async def _wait_for_rpm_capacity(self) -> None:
        """Wait until we have RPM capacity."""
        while True:
            async with self._lock:
                now = time.time()
                # Remove requests older than 60 seconds
                self._request_times = [t for t in self._request_times if now - t < 60]

                if len(self._request_times) < self._max_rpm:
                    self._request_times.append(now)
                    return

                # Calculate wait time: oldest request will expire in...
                oldest = self._request_times[0]
                wait_time = 60 - (now - oldest) + 0.1  # +0.1 buffer

            logger.debug(f"RPM limit reached. Waiting {wait_time:.1f}s.")
            await asyncio.sleep(wait_time)

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[None]:
        """Acquire both the semaphore and RPM slot."""
        await self._wait_for_rpm_capacity()
        async with self._semaphore:
            yield


# Global rate limiter instances (one per LLM provider)
openai_limiter = RateLimitedSemaphore(
    max_concurrent=10,  # Max 10 simultaneous OpenAI calls
    max_rpm=60,         # Max 60 requests/minute
)

anthropic_limiter = RateLimitedSemaphore(
    max_concurrent=5,   # Anthropic has lower concurrent limits
    max_rpm=50,
)


async def rate_limited_openai_call(client, **kwargs) -> dict:
    """Make an OpenAI API call with rate limiting."""
    async with openai_limiter.acquire():
        return await call_with_retry(
            client.chat.completions.create,
            **kwargs,
        )


async def rate_limited_anthropic_call(client, **kwargs) -> dict:
    """Make an Anthropic API call with rate limiting."""
    async with anthropic_limiter.acquire():
        return await call_with_retry(
            client.messages.create,
            **kwargs,
        )
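
The concurrency cap is easy to verify in isolation. This self-contained sketch runs ten tasks through a three-slot semaphore and records the peak number running at once:

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(3)
    active = 0
    peak = 0

    async def worker(i: int) -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate an API call in flight
            active -= 1

    await asyncio.gather(*(worker(i) for i in range(10)))
    return peak

peak = asyncio.run(main())
print(peak)  # 3 — never more than the semaphore allows
```

`RateLimitedSemaphore.acquire()` adds the RPM check on top of this, so a burst of tasks is throttled by both the concurrency cap and the per-minute request budget.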

Step 4: Multi-Model Fallback Chain

Automatically fall back to cheaper models when primary is rate limited:

# rate_limiting/model_fallback.py
import asyncio
import openai
from dataclasses import dataclass
from typing import Optional
import logging

# Assumes these files live together in a rate_limiting/ package
from rate_limiting.budget import BudgetTracker, TokenUsage
from rate_limiting.concurrency import rate_limited_openai_call

logger = logging.getLogger(__name__)


@dataclass
class ModelConfig:
    model: str
    max_tokens: int
    context_window: int
    priority: int  # Lower = higher priority


FALLBACK_CHAIN = [
    ModelConfig("gpt-4o", max_tokens=4096, context_window=128000, priority=1),
    ModelConfig("gpt-4o-mini", max_tokens=4096, context_window=128000, priority=2),
    ModelConfig("gpt-3.5-turbo", max_tokens=4096, context_window=16384, priority=3),
]


class ModelFallbackManager:
    """Manages fallback between models when rate limits are hit."""

    def __init__(self, client: openai.AsyncOpenAI, chain: Optional[list[ModelConfig]] = None):
        self.client = client
        self.chain = chain or FALLBACK_CHAIN
        self._model_failures: dict[str, float] = {}  # model -> last failure time
        self._cooldown_seconds = 60  # How long to avoid a failed model

    def _is_in_cooldown(self, model: str) -> bool:
        if model not in self._model_failures:
            return False
        elapsed = asyncio.get_event_loop().time() - self._model_failures[model]
        return elapsed < self._cooldown_seconds

    def _get_available_models(self) -> list[ModelConfig]:
        """Return models not currently in cooldown."""
        return [m for m in self.chain if not self._is_in_cooldown(m.model)]

    async def create_completion(
        self,
        messages: list[dict],
        budget_tracker: Optional[BudgetTracker] = None,
        **kwargs,
    ) -> tuple[str, str]:
        """
        Create completion with automatic model fallback.

        Returns (response_content, model_used).
        """
        available = self._get_available_models()
        if not available:
            raise RuntimeError("All models in cooldown. Try again in a minute.")

        for model_config in available:
            # Check budget before calling
            if budget_tracker:
                # Estimate tokens (rough: 4 chars per token)
                estimated_tokens = sum(len(m.get("content", "")) for m in messages) // 4
                budget_error = await budget_tracker.check_budget(
                    estimated_tokens, model_config.model
                )
                if budget_error:
                    logger.warning(f"Budget check failed for {model_config.model}: {budget_error}")
                    continue

            try:
                logger.debug(f"Attempting completion with {model_config.model}")
                response = await rate_limited_openai_call(
                    self.client,
                    model=model_config.model,
                    messages=messages,
                    max_tokens=model_config.max_tokens,
                    **kwargs,
                )

                content = response.choices[0].message.content

                # Record usage
                if budget_tracker and hasattr(response, "usage"):
                    await budget_tracker.record_usage(TokenUsage(
                        input_tokens=response.usage.prompt_tokens,
                        output_tokens=response.usage.completion_tokens,
                        model=model_config.model,
                    ))

                return content, model_config.model

            except openai.RateLimitError as e:
                logger.warning(f"Rate limit on {model_config.model}. Adding to cooldown.")
                self._model_failures[model_config.model] = asyncio.get_event_loop().time()
                continue  # Try next model

            except openai.BadRequestError as e:
                if "context_length" in str(e).lower():
                    logger.warning(f"Context too long for {model_config.model}. Trying next.")
                    continue
                raise

        raise RuntimeError("All models exhausted. Rate limited or budget exceeded.")
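
The fallback logic reduces to "try each model in priority order, treating a rate limit as a signal to move on." A stripped-down, self-contained version, with a fake client in place of the OpenAI SDK:

```python
import asyncio

class RateLimited(Exception):
    """Stand-in for openai.RateLimitError in this demo."""

async def fake_completion(model: str, prompt: str) -> str:
    # Pretend the primary model is currently rate limited
    if model == "gpt-4o":
        raise RateLimited("429")
    return f"[{model}] {prompt}"

async def complete_with_fallback(prompt: str,
                                 chain=("gpt-4o", "gpt-4o-mini")) -> tuple[str, str]:
    last_error: Exception | None = None
    for model in chain:
        try:
            return await fake_completion(model, prompt), model
        except RateLimited as e:
            last_error = e
            continue  # try the next, cheaper model
    raise RuntimeError("All models exhausted") from last_error

content, model_used = asyncio.run(complete_with_fallback("hi"))
print(model_used)  # gpt-4o-mini
```

`ModelFallbackManager` adds the production concerns on top: cooldowns so a rate-limited model isn't hammered for 60 seconds, budget checks before each attempt, and usage recording after each success.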

Step 5: Request Queue for High-Volume Workloads

For batch processing many agent runs with priority:

# rate_limiting/queue.py
import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Callable, Any
import logging

import openai

# Assumes these files live together in a rate_limiting/ package
from rate_limiting.concurrency import RateLimitedSemaphore, openai_limiter

logger = logging.getLogger(__name__)


@dataclass(order=True)
class QueuedRequest:
    priority: int           # Lower = higher priority
    timestamp: float        # For FIFO within same priority
    request_id: str = field(compare=False)
    func: Callable = field(compare=False)
    args: tuple = field(compare=False)
    kwargs: dict = field(compare=False)
    future: asyncio.Future = field(compare=False)


class PriorityRequestQueue:
    """Priority queue for LLM requests with rate limiting."""

    def __init__(self, rate_limiter: RateLimitedSemaphore):
        self._queue: list[QueuedRequest] = []
        self._rate_limiter = rate_limiter
        self._processing = False
        self._lock = asyncio.Lock()

    async def submit(
        self,
        func: Callable,
        *args,
        priority: int = 5,  # 1=urgent, 5=normal, 10=low
        request_id: str | None = None,
        **kwargs,
    ) -> Any:
        """Submit a request to the queue. Returns when complete."""
        import time
        import uuid
        future = asyncio.get_event_loop().create_future()
        request = QueuedRequest(
            priority=priority,
            timestamp=time.time(),
            request_id=request_id or str(uuid.uuid4()),
            func=func,
            args=args,
            kwargs=kwargs,
            future=future,
        )

        async with self._lock:
            heapq.heappush(self._queue, request)
            logger.debug(f"Queued request {request.request_id} (priority={priority}, queue_size={len(self._queue)})")

        # Start processing if not already running
        if not self._processing:
            asyncio.create_task(self._process_queue())

        return await future

    async def _process_queue(self) -> None:
        """Process queued requests with rate limiting."""
        self._processing = True
        try:
            while self._queue:
                async with self._lock:
                    if not self._queue:
                        break
                    request = heapq.heappop(self._queue)

                try:
                    async with self._rate_limiter.acquire():
                        result = await request.func(*request.args, **request.kwargs)
                        request.future.set_result(result)
                except Exception as e:
                    request.future.set_exception(e)

        finally:
            self._processing = False


# Usage example
async def process_batch_of_agents(prompts: list[str]) -> list[str]:
    """Process many agent prompts with rate-limited queuing."""
    queue = PriorityRequestQueue(openai_limiter)
    client = openai.AsyncOpenAI()

    async def single_completion(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

    # Submit all requests — they'll be processed with rate limiting
    tasks = [
        queue.submit(single_completion, prompt, priority=5)
        for prompt in prompts
    ]
    results = await asyncio.gather(*tasks)
    return list(results)

Common Issues and Solutions

Issue: Semaphore deadlock when agents call each other

Never acquire the same semaphore from within a semaphore-protected call. Use separate semaphores for different resource types (LLM calls vs database calls). Set a timeout on semaphore acquisition: asyncio.wait_for(semaphore.acquire(), timeout=30).

Issue: Budget counter incorrect under concurrent access

Always use asyncio.Lock when reading and writing budget counters. The async with self._lock: pattern in the BudgetTracker above prevents race conditions. In distributed deployments (multiple processes), use Redis atomic operations: INCRBY with expiry for counters.
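
The race is easy to reproduce: any `await` between reading and writing a shared counter lets other tasks interleave. This self-contained sketch shows the lost updates, and the lock fixing them:

```python
import asyncio

async def unsafe_add(state: dict, amount: float) -> None:
    current = state["total"]
    await asyncio.sleep(0)      # yield point between read and write: the race window
    state["total"] = current + amount

async def safe_add(state: dict, amount: float, lock: asyncio.Lock) -> None:
    async with lock:            # read-modify-write is now atomic w.r.t. other tasks
        current = state["total"]
        await asyncio.sleep(0)
        state["total"] = current + amount

async def main() -> tuple[float, float]:
    unsafe = {"total": 0.0}
    await asyncio.gather(*(unsafe_add(unsafe, 1.0) for _ in range(100)))

    lock = asyncio.Lock()
    safe = {"total": 0.0}
    await asyncio.gather(*(safe_add(safe, 1.0, lock) for _ in range(100)))
    return unsafe["total"], safe["total"]

unsafe_total, safe_total = asyncio.run(main())
print(unsafe_total, safe_total)  # lost updates vs. the correct 100.0
```

Without the lock, every task reads the counter before any task writes it back, so almost all updates are lost. The same failure mode applies to `BudgetTracker`'s cost counters, which is why every read and write there happens under `self._lock`.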

Issue: Fallback to cheaper model produces worse results

Log which model was actually used for each agent run and track quality metrics by model. If the fallback model is unacceptable for your use case, configure the fallback to return a cached or static response instead of using a cheaper model.

Production Considerations

Distributed rate limiting: For multi-process or multi-server deployments, replace the in-memory _request_times list in RateLimitedSemaphore with Redis: use ZADD with timestamps as scores and ZREMRANGEBYSCORE to expire old entries. This gives you consistent rate limiting across all processes.
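
The semantics of that Redis pattern — `ZADD` a timestamp per request, `ZREMRANGEBYSCORE` to drop entries older than the window, then count — can be modeled in plain Python. The class below is an in-memory illustration of the sliding window, not a redis-py wrapper:

```python
class SlidingWindowCounter:
    """In-memory model of the Redis sorted-set rate limit window."""

    def __init__(self, max_per_window: int, window_s: float = 60.0):
        self.max = max_per_window
        self.window = window_s
        self.entries: list[float] = []  # timestamps play the role of ZSET scores

    def try_acquire(self, now: float) -> bool:
        # ZREMRANGEBYSCORE equivalent: drop entries older than the window
        self.entries = [t for t in self.entries if now - t < self.window]
        if len(self.entries) >= self.max:
            return False
        self.entries.append(now)  # ZADD equivalent
        return True

limiter = SlidingWindowCounter(max_per_window=3, window_s=60.0)
results = [limiter.try_acquire(now=100.0) for _ in range(4)]
print(results)  # [True, True, True, False]
rolled = limiter.try_acquire(now=161.0)
print(rolled)  # True — the first entries have aged out of the window
```

In the real Redis version, wrap the remove-count-add sequence in a `MULTI`/`EXEC` transaction or a Lua script so concurrent processes can't race between the count and the add.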

Rate limit headroom: Don't consume 100% of your API limit. Set your software limits at 80% of the API limit to leave headroom for bursts. For example, if your OpenAI tier allows 60 RPM, set max_rpm=48.

Observability: Track rate limit hits, retry counts, and model fallback events as metrics. Alert when fallback usage exceeds 10% of requests — it indicates your primary model is consistently overloaded.

Next Steps

  • Implement error handling for non-rate-limit failures
  • Set up production monitoring to track rate limit metrics
  • Add caching strategies to reduce API calls
  • Review agent state for managing agent context efficiently
  • Learn how the OpenAI Agents SDK handles retries natively

Related Tutorials

How to Create a Meeting Scheduling AI Agent

Build an autonomous AI agent to handle meeting scheduling, calendar checks, and bookings intelligently. This step-by-step tutorial covers Python implementation with LangChain, Google Calendar integration, and advanced features like conflict resolution for efficient automation.

How to Manage Multiple AI Agents

Master managing multiple AI agents with this in-depth tutorial. Learn orchestration, state sharing, parallel execution, and scaling using LangGraph and custom tools. From basics to production-ready swarms for complex tasks.

How to Train an AI Agent on Your Own Data

Master training AI agents on custom data with three methods: context stuffing, RAG using vector databases, and fine-tuning. This beginner-to-advanced guide includes step-by-step code examples, pitfalls, and best practices to build knowledgeable agents for your specific needs.
