Advanced • 16 min read

AI Agent Caching: Prompt, Semantic & Redis

Reduce AI agent latency and cost with production caching strategies — prompt caching (Anthropic, OpenAI), semantic caching with embedding similarity, tool result caching, response caching with TTL, and Redis for distributed caching. Real cost and latency reduction examples.

By AI Agents Guide Team • March 1, 2026

Table of Contents

  1. What You'll Build
  2. Prerequisites
  3. Overview
  4. Step 1: Anthropic Prompt Caching
  5. Step 2: Semantic Cache with Embeddings
  6. Step 3: Tool Result Caching
  7. Step 4: Embedding Cache
  8. Common Issues and Solutions
  9. Production Considerations
  10. Next Steps

What You'll Build

A multi-layer caching system for AI agents that includes:

  • Anthropic prompt caching for large system prompts and document context
  • OpenAI prompt caching configuration
  • Semantic caching using text embeddings and cosine similarity
  • Tool result caching with per-tool TTLs
  • Redis-backed distributed cache for multi-process deployments
  • A cache key strategy and invalidation system

Expected results: 60-85% reduction in LLM costs for repeated query patterns, 70-90% latency reduction on cache hits.

Prerequisites

pip install anthropic openai redis numpy python-dotenv
  • Python 3.11+
  • Redis 7+ (local or managed)
  • Anthropic and/or OpenAI API keys
  • Familiarity with agentic workflows

Overview

Caching for AI agents works at four distinct layers:

| Layer | What Is Cached | Savings | Complexity |
| --- | --- | --- | --- |
| Prompt cache (provider) | Token processing for repeated prefixes | 85-90% cost on cached tokens | Low |
| Semantic cache | Full LLM responses for similar queries | 100% cost on hits | Medium |
| Tool result cache | Results from external tool calls | Reduces tool latency | Low |
| Embedding cache | Embedding vectors for semantic lookup | Reduces embedding API cost | Low |
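
Before diving into each layer, here is a toy sketch (pure Python, no external services, all names illustrative) of how the layers compose: the semantic cache is checked first, since a hit there skips the LLM call entirely; the provider prompt cache applies transparently inside the LLM call; tool and embedding caches sit below that, inside agent execution.

```python
class InMemorySemanticCache:
    """Exact-match stand-in for the embedding-based cache built in Step 2."""

    def __init__(self):
        self._store = {}

    def get(self, query):
        return self._store.get(query)

    def set(self, query, response):
        self._store[query] = response


def run_agent_with_caches(query, cache, call_llm):
    """Returns (response, source), where source records which layer answered."""
    cached = cache.get(query)
    if cached is not None:
        return cached, "semantic_cache"   # Layer 2: full-response reuse
    response = call_llm(query)            # Layer 1 (prompt cache) lives in here
    cache.set(query, response)
    return response, "llm"
```
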

Step 1: Anthropic Prompt Caching

Prompt caching is the highest-ROI optimization for agents with large system prompts or RAG context:

# caching/prompt_cache.py
import anthropic
import os
from typing import Optional

client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


def create_cached_system_prompt(system_text: str) -> list[dict]:
    """
    Create a system prompt with caching enabled.

    The entire system_text is marked as cacheable.
    Anthropic caches prompt prefixes of 1,024+ tokens (2,048+ for Haiku models).
    """
    return [
        {
            "type": "text",
            "text": system_text,
            "cache_control": {"type": "ephemeral"},  # 5-minute cache TTL
        }
    ]


def create_cached_document_context(documents: list[str], user_query: str) -> list[dict]:
    """
    Create a message with cached document context for RAG agents.

    The documents (large, static) are cached.
    The user query (small, dynamic) is not cached.
    """
    # Combine all documents into one cacheable block
    doc_text = "\n\n---\n\n".join([
        f"Document {i+1}:\n{doc}" for i, doc in enumerate(documents)
    ])

    return [
        {
            "type": "text",
            "text": f"Here are the reference documents:\n\n{doc_text}\n\nPlease answer questions based only on these documents.",
            "cache_control": {"type": "ephemeral"},  # Cache the documents
        },
        {
            "type": "text",
            "text": user_query,  # No cache_control — this changes each request
        }
    ]


async def query_with_prompt_cache(
    system_prompt: str,
    user_message: str,
    cached_documents: Optional[list[str]] = None,
    model: str = "claude-3-5-sonnet-20241022",
) -> tuple[str, dict]:
    """
    Query Claude with prompt caching enabled.

    Returns (response_text, cache_stats).
    """
    # Build system with caching
    system = create_cached_system_prompt(system_prompt)

    # Build user message
    if cached_documents:
        content = create_cached_document_context(cached_documents, user_message)
    else:
        content = [{"type": "text", "text": user_message}]

    response = client.messages.create(
        model=model,
        max_tokens=1024,
        system=system,
        messages=[{"role": "user", "content": content}],
    )

    # Extract cache statistics from response
    cache_stats = {
        "cache_creation_tokens": getattr(response.usage, "cache_creation_input_tokens", 0),
        "cache_read_tokens": getattr(response.usage, "cache_read_input_tokens", 0),
        "uncached_input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
    }

    # Calculate cost savings
    cache_read = cache_stats["cache_read_tokens"]
    if cache_read > 0:
        # Cache read costs 0.30/1M vs 3.00/1M = 90% savings
        saved_cost = (cache_read / 1_000_000) * (3.00 - 0.30)
        cache_stats["estimated_savings_usd"] = saved_cost

    return response.content[0].text, cache_stats


# Example: Agent with large system prompt that benefits from caching
LARGE_SYSTEM_PROMPT = """
You are an expert customer support agent for Acme SaaS...
[Imagine this is 5,000+ tokens of instructions, policies, and examples]
""" * 50  # Simulate large prompt


async def demo_prompt_cache_savings():
    """Demonstrate prompt cache cost savings over multiple calls."""
    queries = [
        "What is your refund policy?",
        "How do I reset my password?",
        "What plan do I need for 50 users?",
        "Can I export my data?",
    ]

    total_saved = 0.0
    for query in queries:
        response, stats = await query_with_prompt_cache(
            system_prompt=LARGE_SYSTEM_PROMPT,
            user_message=query,
        )
        saved = stats.get("estimated_savings_usd", 0)
        total_saved += saved
        cache_hit = "HIT" if stats["cache_read_tokens"] > 0 else "MISS"
        print(f"[{cache_hit}] Query: '{query[:40]}...' | Saved: ${saved:.4f}")

    print(f"\nTotal saved: ${total_saved:.4f}")

OpenAI prompt caching is enabled automatically for prompts over 1,024 tokens — no code changes needed. The cache is maintained for 5-10 minutes. Cached tokens are charged at 50% of the normal rate.
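
Since the provider handles this automatically, the only code worth adding is a check of how much of each prompt was actually served from cache. The chat completions response reports this in `usage.prompt_tokens_details.cached_tokens`; below is a small helper (hypothetical name, written defensively for SDKs or models that lack the field) to compute the cached fraction:

```python
def cached_token_fraction(usage) -> float:
    """Fraction of the prompt served from OpenAI's prompt cache.

    Works on a chat completion `usage` object; returns 0.0 when the
    cached-token details are unavailable.
    """
    details = getattr(usage, "prompt_tokens_details", None)
    cached = getattr(details, "cached_tokens", 0) if details else 0
    total = getattr(usage, "prompt_tokens", 0)
    return (cached or 0) / total if total else 0.0
```

A value near 1.0 on repeat calls confirms the 1,024-token prefix is being reused.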

Step 2: Semantic Cache with Embeddings

Cache LLM responses and reuse them for semantically similar queries:

# caching/semantic_cache.py
import hashlib
import json
import time
import numpy as np
from typing import Optional
import redis
from openai import OpenAI

openai_client = OpenAI()
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)


def get_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    """Get text embedding for semantic similarity."""
    response = openai_client.embeddings.create(
        model=model,
        input=text.strip(),
    )
    return response.data[0].embedding


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Calculate cosine similarity between two embedding vectors."""
    a_arr = np.array(a)
    b_arr = np.array(b)
    return float(np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr)))


class SemanticCache:
    """
    Cache LLM responses with semantic similarity lookup.

    Stores embeddings + responses in Redis.
    On cache lookup, finds the most similar stored query.
    If similarity > threshold, returns cached response.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        similarity_threshold: float = 0.92,
        ttl_seconds: int = 86400,  # 24 hours
        namespace: str = "agent_semantic_cache",
        max_entries: int = 10000,
    ):
        self.redis = redis_client
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.namespace = namespace
        self.max_entries = max_entries
        self._index_key = f"{namespace}:index"

    def _entry_key(self, entry_id: str) -> str:
        return f"{self.namespace}:entry:{entry_id}"

    def get(self, query: str) -> Optional[str]:
        """
        Find a cached response for a semantically similar query.

        Returns the cached response string, or None if no match.
        """
        # Get embedding for the query
        query_embedding = get_embedding(query)

        # Get all stored entries
        entry_ids = self.redis.smembers(self._index_key)
        if not entry_ids:
            return None

        best_similarity = 0.0
        best_response = None

        for entry_id in entry_ids:
            entry_data = self.redis.get(self._entry_key(entry_id))
            if not entry_data:
                continue

            entry = json.loads(entry_data)
            stored_embedding = entry["embedding"]
            similarity = cosine_similarity(query_embedding, stored_embedding)

            if similarity > best_similarity:
                best_similarity = similarity
                if similarity >= self.threshold:
                    best_response = entry["response"]

        if best_response:
            print(f"Semantic cache HIT (similarity={best_similarity:.3f})")
            return best_response

        return None

    def set(self, query: str, response: str) -> str:
        """Store a query-response pair in the semantic cache."""
        query_embedding = get_embedding(query)

        entry_id = hashlib.md5(query.encode()).hexdigest()[:16]
        entry = {
            "query": query[:500],  # Store truncated query for debugging
            "response": response,
            "embedding": query_embedding,
            "created_at": int(time.time()),
        }

        # Store with TTL
        self.redis.setex(
            self._entry_key(entry_id),
            self.ttl,
            json.dumps(entry),
        )
        self.redis.sadd(self._index_key, entry_id)
        self.redis.expire(self._index_key, self.ttl)

        # Evict oldest entries if at max capacity
        current_count = self.redis.scard(self._index_key)
        if current_count > self.max_entries:
            # Remove the oldest ~10% of entries, sorted by created_at
            aged = []
            for old_id in self.redis.smembers(self._index_key):
                data = self.redis.get(self._entry_key(old_id))
                if not data:
                    # TTL already expired — drop the dangling index entry
                    self.redis.srem(self._index_key, old_id)
                    continue
                aged.append((json.loads(data)["created_at"], old_id))
            aged.sort()
            for _, old_id in aged[: int(self.max_entries * 0.1)]:
                self.redis.delete(self._entry_key(old_id))
                self.redis.srem(self._index_key, old_id)

        return entry_id

    def invalidate_pattern(self, pattern: str, threshold: float = 0.85) -> int:
        """Invalidate cache entries semantically similar to a query pattern."""
        pattern_embedding = get_embedding(pattern)
        entry_ids = self.redis.smembers(self._index_key)
        removed = 0

        for entry_id in entry_ids:
            entry_data = self.redis.get(self._entry_key(entry_id))
            if not entry_data:
                continue
            entry = json.loads(entry_data)
            similarity = cosine_similarity(pattern_embedding, entry["embedding"])
            if similarity >= threshold:  # Typically lower than the lookup threshold
                self.redis.delete(self._entry_key(entry_id))
                self.redis.srem(self._index_key, entry_id)
                removed += 1

        return removed


# Integration with agent
semantic_cache = SemanticCache(redis_client, similarity_threshold=0.92)


async def query_with_semantic_cache(
    agent_func,
    user_query: str,
    agent_type: str,
) -> tuple[str, bool]:
    """
    Query an agent with semantic caching.

    Returns (response, cache_hit).
    """
    # Build cache key including agent type for isolation
    cache_query = f"{agent_type}:{user_query}"

    # Check cache first
    cached = semantic_cache.get(cache_query)
    if cached:
        return cached, True

    # Cache miss — run the agent
    response = await agent_func(user_query)

    # Store in cache
    semantic_cache.set(cache_query, response)

    return response, False

Step 3: Tool Result Caching

Cache expensive tool calls to avoid redundant external API requests:

# caching/tool_cache.py
import asyncio
import hashlib
import json
import time
from typing import Any, Callable
import redis
import functools

redis_client = redis.Redis(host="localhost", port=6379, db=1, decode_responses=True)


def cached_tool(
    ttl_seconds: int = 3600,
    namespace: str = "tool_cache",
    skip_cache_on_error: bool = True,
):
    """
    Decorator that caches tool results in Redis.

    Uses the function name + serialized arguments as the cache key.
    """
    def decorator(func: Callable) -> Callable:

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            # Build cache key from function name + arguments
            cache_input = {"args": list(args), "kwargs": kwargs}
            cache_key = f"{namespace}:{func.__name__}:{_hash_args(cache_input)}"

            # Check cache
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                data = json.loads(cached_result)
                if "error" in data:
                    # Negative-cache hit: the tool failed recently
                    raise RuntimeError(
                        f"{func.__name__} recently failed (cached): {data['error']}"
                    )
                age = int(time.time()) - data["cached_at"]
                print(f"[Cache HIT] {func.__name__} (age={age}s)")
                return data["result"]

            # Cache miss — execute the tool
            print(f"[Cache MISS] {func.__name__}")
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                if not skip_cache_on_error:
                    # Negative caching: briefly remember the failure so a
                    # broken tool isn't re-invoked on every agent step
                    redis_client.setex(
                        cache_key,
                        60,
                        json.dumps({"error": str(exc), "cached_at": int(time.time())}),
                    )
                raise  # Never cache an error as a successful result

            # Store the successful result
            redis_client.setex(
                cache_key,
                ttl_seconds,
                json.dumps({"result": result, "cached_at": int(time.time())}),
            )
            return result

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs) -> Any:
            cache_input = {"args": list(args), "kwargs": kwargs}
            cache_key = f"{namespace}:{func.__name__}:{_hash_args(cache_input)}"

            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)["result"]

            result = func(*args, **kwargs)
            redis_client.setex(
                cache_key,
                ttl_seconds,
                json.dumps({"result": result, "cached_at": int(time.time())}),
            )
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator


def _hash_args(args: dict) -> str:
    """Create a stable hash from function arguments."""
    serialized = json.dumps(args, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]


# Apply to tools
@cached_tool(ttl_seconds=3600, namespace="search_cache")
async def search_web(query: str) -> str:
    """Cached web search — results valid for 1 hour."""
    # ... actual search implementation
    return f"Search results for: {query}"


@cached_tool(ttl_seconds=300, namespace="db_cache")  # 5 min TTL for DB queries
async def query_database(sql: str) -> list[dict]:
    """Cached database query — short TTL for fresher data."""
    # ... actual database implementation
    return []


@cached_tool(ttl_seconds=86400, namespace="kb_cache")  # 24 hours for static KB
async def search_knowledge_base(query: str) -> str:
    """Cached knowledge base search — long TTL for stable content."""
    # ... actual KB search
    return "Knowledge base result"
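
For local development or tests where Redis isn't running, the same decorator shape can be backed by an in-process dict. `local_cached_tool` below is a hypothetical stand-in, not part of any library; it uses the same key scheme (function name + hashed arguments) as `cached_tool` above:

```python
import functools
import hashlib
import json
import time
from typing import Any, Callable


def local_cached_tool(ttl_seconds: int = 3600):
    """In-process TTL cache with the same keying as the Redis version."""
    store: dict[str, tuple[float, Any]] = {}  # key -> (expires_at, result)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            raw = json.dumps({"args": list(args), "kwargs": kwargs},
                             sort_keys=True, default=str)
            key = f"{func.__name__}:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"

            hit = store.get(key)
            if hit is not None:
                expires_at, value = hit
                if time.time() < expires_at:
                    return value
                del store[key]  # Entry expired — recompute below

            result = func(*args, **kwargs)
            store[key] = (time.time() + ttl_seconds, result)
            return result
        return wrapper
    return decorator
```
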

Step 4: Embedding Cache

Cache embeddings to avoid re-computing them:

# caching/embedding_cache.py
import hashlib
import json
from typing import Optional
import redis
from openai import OpenAI

redis_client = redis.Redis(host="localhost", port=6379, db=2, decode_responses=True)
openai_client = OpenAI()


class EmbeddingCache:
    """Cache embeddings indefinitely — they don't change for the same text."""

    def __init__(self, model: str = "text-embedding-3-small"):
        self.model = model
        self._namespace = f"embeddings:{model}"

    def _key(self, text: str) -> str:
        text_hash = hashlib.sha256(text.encode()).hexdigest()
        return f"{self._namespace}:{text_hash}"

    def get(self, text: str) -> Optional[list[float]]:
        cached = redis_client.get(self._key(text))
        if cached:
            return json.loads(cached)
        return None

    def set(self, text: str, embedding: list[float]) -> None:
        redis_client.set(self._key(text), json.dumps(embedding))
        # No TTL — embeddings are permanent for a given model

    def get_or_compute(self, text: str) -> list[float]:
        """Get cached embedding or compute and cache it."""
        cached = self.get(text)
        if cached:
            return cached

        response = openai_client.embeddings.create(
            model=self.model,
            input=text.strip(),
        )
        embedding = response.data[0].embedding
        self.set(text, embedding)
        return embedding

    def get_batch(self, texts: list[str]) -> list[list[float]]:
        """Get embeddings for multiple texts, using cache where possible."""
        results = []
        uncached_texts = []
        uncached_indices = []

        # Check cache for each text
        for i, text in enumerate(texts):
            cached = self.get(text)
            if cached:
                results.append(cached)
            else:
                results.append(None)  # Placeholder
                uncached_texts.append(text)
                uncached_indices.append(i)

        # Batch compute uncached embeddings
        if uncached_texts:
            response = openai_client.embeddings.create(
                model=self.model,
                input=uncached_texts,
            )
            for idx, (text, embedding_obj) in enumerate(zip(uncached_texts, response.data)):
                embedding = embedding_obj.embedding
                original_idx = uncached_indices[idx]
                results[original_idx] = embedding
                self.set(text, embedding)

        return results

Common Issues and Solutions

Issue: Semantic cache returns wrong answers because the similarity threshold is too low

Start at 0.95 and lower it gradually while monitoring cache quality. Add a "cache hit review" queue in Langfuse where human reviewers validate a sample of semantic cache hits weekly. Any incorrect cached response that slips through should trigger a threshold increase.
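
One way to make "lower it gradually" concrete is an offline sweep over a small labeled set of query pairs: pick the lowest candidate threshold that produces zero false hits on the set, which maximizes hit rate subject to never serving a wrong cached answer there. A sketch, with toy 3-d vectors standing in for real embeddings:

```python
import numpy as np


def cosine(a, b) -> float:
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


def pick_threshold(labeled_pairs, candidates=(0.90, 0.92, 0.95, 0.97, 0.99)) -> float:
    """labeled_pairs: (embedding_a, embedding_b, should_match) triples."""
    for threshold in sorted(candidates):  # Try the most permissive first
        false_hits = sum(
            1 for a, b, should_match in labeled_pairs
            if not should_match and cosine(a, b) >= threshold
        )
        if false_hits == 0:
            return threshold  # Lowest threshold that serves no wrong answers
    return max(candidates)    # Fall back to the strictest candidate
```
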

Issue: Redis memory fills up with embedding data

Embeddings are large: 1,536 floats for text-embedding-3-small (or ada-002) is ~6 KB per vector in binary, and several times that when serialized as JSON text. For 100K unique queries, that is 600 MB or more. Use Redis' maxmemory setting with the allkeys-lru eviction policy: LRU evicts the least-recently-used embeddings first, keeping hot ones in cache.
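
To sanity-check those numbers, and as a sketch of one mitigation, each vector can be stored as packed float32 bytes instead of JSON text, capping it at ~6 KB. (Binary values require a Redis client created with decode_responses=False, unlike the caches above.)

```python
import array
import json

DIMS = 1536  # Output dimension of text-embedding-3-small (and ada-002)
vector = [0.0123] * DIMS

as_json = json.dumps(vector).encode()        # How the Step 4 cache stores it
packed = array.array("f", vector).tobytes()  # 4 bytes per float32

# float32 packing: 1536 * 4 = 6144 bytes ≈ 6 KB per embedding;
# the JSON text form is several times larger.

# Unpacking on read:
restored = array.array("f")
restored.frombytes(packed)
```
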

Issue: Cache invalidation is too aggressive and causes cache misses

Invalidation should be surgical. Only invalidate entries related to the data that changed. Use the invalidate_pattern() method with a high similarity threshold (0.90+) to target only closely related entries. Avoid time-based invalidation for stable content — use event-based invalidation instead (trigger invalidation when the underlying data changes).

Production Considerations

Cache warming: Pre-populate the semantic cache with common queries before going live. Run your top 100 FAQ queries through the agent and cache their responses. This prevents cold-start performance degradation.

Cost tracking: Track cache hit rate and calculate savings: saved_cost = hit_count * avg_cost_per_llm_call. Report this to stakeholders as a metric. A well-configured semantic cache for FAQ-heavy agents should achieve 40-60% cache hit rate.
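
The formula above can be wrapped in a small tracker; the class name and the per-call cost default here are illustrative, not from any library.

```python
from dataclasses import dataclass


@dataclass
class CacheMetrics:
    avg_cost_per_llm_call: float = 0.01  # USD — replace with your measured average
    hits: int = 0
    misses: int = 0

    def record(self, hit: bool) -> None:
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

    @property
    def saved_cost_usd(self) -> float:
        # saved_cost = hit_count * avg_cost_per_llm_call
        return self.hits * self.avg_cost_per_llm_call
```
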

Multi-tenant isolation: For SaaS applications, namespace cache keys by tenant ID to prevent cross-tenant data leakage. Use SemanticCache(namespace=f"tenant_{tenant_id}").

Next Steps

  • Add rate limiting to complement caching
  • Set up monitoring to track cache hit rates
  • Review agentic RAG for document-level caching
  • Connect to the Langfuse observability platform
  • Build a research agent that caches search results
