What You'll Build
A multi-layer caching system for AI agents that includes:
- Anthropic prompt caching for large system prompts and document context
- OpenAI prompt caching configuration
- Semantic caching using text embeddings and cosine similarity
- Tool result caching with per-tool TTLs
- Redis-backed distributed cache for multi-process deployments
- A cache key strategy and invalidation system
Expected results: 60-85% reduction in LLM costs for repeated query patterns, 70-90% latency reduction on cache hits.
Prerequisites
pip install anthropic openai redis numpy python-dotenv
- Python 3.11+
- Redis 7+ (local or managed)
- Anthropic and/or OpenAI API keys
- Familiarity with agentic workflows
Overview
Caching for AI agents works at four distinct layers:
| Layer | What Is Cached | Savings | Complexity |
|---|---|---|---|
| Prompt cache (provider) | Token processing for repeated prefixes | 85-90% cost on cached tokens | Low |
| Semantic cache | Full LLM responses for similar queries | 100% cost on hits | Medium |
| Tool result cache | Results from external tool calls | Reduces tool latency | Low |
| Embedding cache | Embedding vectors for semantic lookup | Reduces embedding API cost | Low |
Step 1: Anthropic Prompt Caching
Prompt caching is the highest-ROI optimization for agents with large system prompts or RAG context:
# caching/prompt_cache.py
import anthropic
import os
from typing import Optional

client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

def create_cached_system_prompt(system_text: str) -> list[dict]:
    """
    Create a system prompt with caching enabled.
    The entire system_text is marked as cacheable.
    Anthropic caches prefixes of 1,024+ tokens (the minimum varies by model).
    """
    return [
        {
            "type": "text",
            "text": system_text,
            "cache_control": {"type": "ephemeral"},  # 5-minute cache TTL
        }
    ]

def create_cached_document_context(documents: list[str], user_query: str) -> list[dict]:
    """
    Create a message with cached document context for RAG agents.
    The documents (large, static) are cached.
    The user query (small, dynamic) is not cached.
    """
    # Combine all documents into one cacheable block
    doc_text = "\n\n---\n\n".join([
        f"Document {i+1}:\n{doc}" for i, doc in enumerate(documents)
    ])
    return [
        {
            "type": "text",
            "text": f"Here are the reference documents:\n\n{doc_text}\n\nPlease answer questions based only on these documents.",
            "cache_control": {"type": "ephemeral"},  # Cache the documents
        },
        {
            "type": "text",
            "text": user_query,  # No cache_control — this changes each request
        },
    ]

async def query_with_prompt_cache(
    system_prompt: str,
    user_message: str,
    cached_documents: Optional[list[str]] = None,
    model: str = "claude-3-5-sonnet-20241022",
) -> tuple[str, dict]:
    """
    Query Claude with prompt caching enabled.
    Returns (response_text, cache_stats).
    """
    # Build system with caching
    system = create_cached_system_prompt(system_prompt)
    # Build user message
    if cached_documents:
        content = create_cached_document_context(cached_documents, user_message)
    else:
        content = [{"type": "text", "text": user_message}]
    # Note: the sync client blocks the event loop; use anthropic.AsyncAnthropic
    # (and await the call) in production async code
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        system=system,
        messages=[{"role": "user", "content": content}],
    )
    # Extract cache statistics from the response usage block
    cache_stats = {
        "cache_creation_tokens": getattr(response.usage, "cache_creation_input_tokens", 0),
        "cache_read_tokens": getattr(response.usage, "cache_read_input_tokens", 0),
        "uncached_input_tokens": response.usage.input_tokens,  # excludes cached tokens
        "output_tokens": response.usage.output_tokens,
    }
    # Calculate cost savings
    cache_read = cache_stats["cache_read_tokens"]
    if cache_read > 0:
        # Cache reads cost $0.30/MTok vs $3.00/MTok for fresh input (90% savings)
        saved_cost = (cache_read / 1_000_000) * (3.00 - 0.30)
        cache_stats["estimated_savings_usd"] = saved_cost
    return response.content[0].text, cache_stats
# Example: Agent with large system prompt that benefits from caching
LARGE_SYSTEM_PROMPT = """
You are an expert customer support agent for Acme SaaS...
[Imagine this is 5,000+ tokens of instructions, policies, and examples]
""" * 50  # Simulate large prompt

async def demo_prompt_cache_savings():
    """Demonstrate prompt cache cost savings over multiple calls."""
    queries = [
        "What is your refund policy?",
        "How do I reset my password?",
        "What plan do I need for 50 users?",
        "Can I export my data?",
    ]
    total_saved = 0.0
    for query in queries:
        response, stats = await query_with_prompt_cache(
            system_prompt=LARGE_SYSTEM_PROMPT,
            user_message=query,
        )
        saved = stats.get("estimated_savings_usd", 0)
        total_saved += saved
        cache_hit = "HIT" if stats["cache_read_tokens"] > 0 else "MISS"
        print(f"[{cache_hit}] Query: '{query[:40]}...' | Saved: ${saved:.4f}")
    print(f"\nTotal saved: ${total_saved:.4f}")
OpenAI prompt caching is applied automatically to prompts longer than 1,024 tokens on supported models, with no code changes needed. Cached prefixes typically persist for 5-10 minutes of inactivity, and cached input tokens are billed at 50% of the normal rate.
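To quantify that discount, a small helper can estimate input cost from the `cached_tokens` count OpenAI reports in the response usage. The rate below is an assumption (a gpt-4o-style input price of $2.50/MTok); substitute your model's actual pricing:

```python
def openai_prompt_cost(
    prompt_tokens: int,
    cached_tokens: int,
    rate_per_mtok: float = 2.50,  # assumed input rate in USD per million tokens
) -> float:
    """Estimate input cost in USD, billing cached tokens at 50% of the rate."""
    uncached = prompt_tokens - cached_tokens
    return (uncached * rate_per_mtok + cached_tokens * rate_per_mtok * 0.5) / 1_000_000

# A fully cached prompt costs half as much as a fully uncached one
full = openai_prompt_cost(prompt_tokens=1_000_000, cached_tokens=0)
cached = openai_prompt_cost(prompt_tokens=1_000_000, cached_tokens=1_000_000)
```

Wire this into your request logging by reading the cached token count from the usage details the API returns with each response.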
Step 2: Semantic Cache with Embeddings
Cache LLM responses and reuse them for semantically similar queries:
# caching/semantic_cache.py
import hashlib
import json
import time
import numpy as np
from typing import Optional
import redis
from openai import OpenAI

openai_client = OpenAI()
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

def get_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
    """Get text embedding for semantic similarity."""
    response = openai_client.embeddings.create(
        model=model,
        input=text.strip(),
    )
    return response.data[0].embedding

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Calculate cosine similarity between two embedding vectors."""
    a_arr = np.array(a)
    b_arr = np.array(b)
    return float(np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr)))
class SemanticCache:
    """
    Cache LLM responses with semantic similarity lookup.
    Stores embeddings + responses in Redis.
    On cache lookup, finds the most similar stored query.
    If similarity > threshold, returns cached response.
    """
    def __init__(
        self,
        redis_client: redis.Redis,
        similarity_threshold: float = 0.92,
        ttl_seconds: int = 86400,  # 24 hours
        namespace: str = "agent_semantic_cache",
        max_entries: int = 10000,
    ):
        self.redis = redis_client
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        self.namespace = namespace
        self.max_entries = max_entries
        self._index_key = f"{namespace}:index"

    def _entry_key(self, entry_id: str) -> str:
        return f"{self.namespace}:entry:{entry_id}"

    def get(self, query: str) -> Optional[str]:
        """
        Find a cached response for a semantically similar query.
        Returns the cached response string, or None if no match.
        """
        # Get embedding for the query
        query_embedding = get_embedding(query)
        # Linear scan over all stored entries: fine for modest cache sizes;
        # use a vector index (e.g. RediSearch) for large deployments
        entry_ids = self.redis.smembers(self._index_key)
        if not entry_ids:
            return None
        best_similarity = 0.0
        best_response = None
        for entry_id in entry_ids:
            entry_data = self.redis.get(self._entry_key(entry_id))
            if not entry_data:
                continue
            entry = json.loads(entry_data)
            stored_embedding = entry["embedding"]
            similarity = cosine_similarity(query_embedding, stored_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                if similarity >= self.threshold:
                    best_response = entry["response"]
        if best_response:
            print(f"Semantic cache HIT (similarity={best_similarity:.3f})")
            return best_response
        return None
    def set(self, query: str, response: str) -> str:
        """Store a query-response pair in the semantic cache."""
        query_embedding = get_embedding(query)
        entry_id = hashlib.md5(query.encode()).hexdigest()[:16]
        entry = {
            "query": query[:500],  # Store truncated query for debugging
            "response": response,
            "embedding": query_embedding,
            "created_at": int(time.time()),
        }
        # Store with TTL
        self.redis.setex(
            self._entry_key(entry_id),
            self.ttl,
            json.dumps(entry),
        )
        self.redis.sadd(self._index_key, entry_id)
        self.redis.expire(self._index_key, self.ttl)
        # Evict the oldest 10% of entries if at max capacity
        current_count = self.redis.scard(self._index_key)
        if current_count > self.max_entries:
            dated = []
            for eid in self.redis.smembers(self._index_key):
                data = self.redis.get(self._entry_key(eid))
                if not data:
                    # Entry already expired; drop the dangling index reference
                    self.redis.srem(self._index_key, eid)
                    continue
                dated.append((json.loads(data)["created_at"], eid))
            dated.sort()  # oldest first
            for _, old_id in dated[: int(self.max_entries * 0.1)]:
                self.redis.delete(self._entry_key(old_id))
                self.redis.srem(self._index_key, old_id)
        return entry_id
    def invalidate_pattern(self, pattern: str) -> int:
        """Invalidate cache entries matching a query pattern."""
        pattern_embedding = get_embedding(pattern)
        entry_ids = self.redis.smembers(self._index_key)
        removed = 0
        for entry_id in entry_ids:
            entry_data = self.redis.get(self._entry_key(entry_id))
            if not entry_data:
                continue
            entry = json.loads(entry_data)
            similarity = cosine_similarity(pattern_embedding, entry["embedding"])
            if similarity >= 0.85:  # Lower threshold for invalidation
                self.redis.delete(self._entry_key(entry_id))
                self.redis.srem(self._index_key, entry_id)
                removed += 1
        return removed

# Integration with agent
semantic_cache = SemanticCache(redis_client, similarity_threshold=0.92)

async def query_with_semantic_cache(
    agent_func,
    user_query: str,
    agent_type: str,
) -> tuple[str, bool]:
    """
    Query an agent with semantic caching.
    Returns (response, cache_hit).
    """
    # Build cache key including agent type for isolation
    cache_query = f"{agent_type}:{user_query}"
    # Check cache first
    cached = semantic_cache.get(cache_query)
    if cached:
        return cached, True
    # Cache miss — run the agent
    response = await agent_func(user_query)
    # Store in cache
    semantic_cache.set(cache_query, response)
    return response, False
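The hit/miss decision can be exercised without Redis or an embedding API. The sketch below swaps in toy 3-dimensional vectors and an in-memory list to show the same best-match-above-threshold logic; the vectors and threshold are illustrative only:

```python
import numpy as np

def cosine_similarity(a: list[float], b: list[float]) -> float:
    a_arr, b_arr = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a_arr, b_arr) / (np.linalg.norm(a_arr) * np.linalg.norm(b_arr)))

class InMemorySemanticCache:
    """Dict-free stand-in for the Redis-backed cache above (toy embeddings)."""
    def __init__(self, threshold: float = 0.92):
        self.threshold = threshold
        self.entries: list[tuple[list[float], str]] = []

    def get(self, embedding: list[float]):
        # Return the response of the most similar entry above the threshold
        best_sim, best_resp = 0.0, None
        for stored_emb, response in self.entries:
            sim = cosine_similarity(embedding, stored_emb)
            if sim > best_sim:
                best_sim = sim
                if sim >= self.threshold:
                    best_resp = response
        return best_resp

    def set(self, embedding: list[float], response: str) -> None:
        self.entries.append((embedding, response))

cache = InMemorySemanticCache(threshold=0.92)
cache.set([1.0, 0.0, 0.0], "refund policy answer")
hit = cache.get([0.99, 0.05, 0.0])   # nearly the same direction: above threshold
miss = cache.get([0.0, 1.0, 0.0])    # orthogonal: below threshold
```

The real cache replaces the toy vectors with `get_embedding()` output and the list with Redis, but the decision rule is identical.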
Step 3: Tool Result Caching
Cache expensive tool calls to avoid redundant external API requests:
# caching/tool_cache.py
import asyncio
import functools
import hashlib
import json
import time
from typing import Any, Callable
import redis

redis_client = redis.Redis(host="localhost", port=6379, db=1, decode_responses=True)
def cached_tool(
    ttl_seconds: int = 3600,
    namespace: str = "tool_cache",
):
    """
    Decorator that caches tool results in Redis.
    Uses the function name + serialized arguments as the cache key.
    Failed calls are never cached; exceptions propagate to the caller.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> Any:
            # Build cache key from function name + arguments
            cache_input = {"args": list(args), "kwargs": kwargs}
            cache_key = f"{namespace}:{func.__name__}:{_hash_args(cache_input)}"
            # Check cache
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                data = json.loads(cached_result)
                age = int(time.time()) - data["cached_at"]
                print(f"[Cache HIT] {func.__name__} (age={age}s)")
                return data["result"]
            # Cache miss — execute the tool (an exception here propagates uncached)
            print(f"[Cache MISS] {func.__name__}")
            result = await func(*args, **kwargs)
            # Store the successful result
            redis_client.setex(
                cache_key,
                ttl_seconds,
                json.dumps({"result": result, "cached_at": int(time.time())}),
            )
            return result
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs) -> Any:
            cache_input = {"args": list(args), "kwargs": kwargs}
            cache_key = f"{namespace}:{func.__name__}:{_hash_args(cache_input)}"
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)["result"]
            result = func(*args, **kwargs)
            redis_client.setex(
                cache_key,
                ttl_seconds,
                json.dumps({"result": result, "cached_at": int(time.time())}),
            )
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator

def _hash_args(args: dict) -> str:
    """Create a stable hash from function arguments."""
    serialized = json.dumps(args, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]
# Apply to tools
@cached_tool(ttl_seconds=3600, namespace="search_cache")
async def search_web(query: str) -> str:
    """Cached web search — results valid for 1 hour."""
    # ... actual search implementation
    return f"Search results for: {query}"

@cached_tool(ttl_seconds=300, namespace="db_cache")  # 5 min TTL for DB queries
async def query_database(sql: str) -> list[dict]:
    """Cached database query — short TTL for fresher data."""
    # ... actual database implementation
    return []

@cached_tool(ttl_seconds=86400, namespace="kb_cache")  # 24 hours for static KB
async def search_knowledge_base(query: str) -> str:
    """Cached knowledge base search — long TTL for stable content."""
    # ... actual KB search
    return "Knowledge base result"
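The key derivation is worth verifying in isolation: because `json.dumps(..., sort_keys=True)` canonicalizes keyword order, logically identical calls map to the same Redis key. A standalone sketch of the same scheme:

```python
import hashlib
import json

def make_cache_key(namespace: str, func_name: str, args: tuple, kwargs: dict) -> str:
    """Derive a stable Redis key from a tool call's name and arguments."""
    payload = {"args": list(args), "kwargs": kwargs}
    digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True, default=str).encode()
    ).hexdigest()[:16]
    return f"{namespace}:{func_name}:{digest}"

# Keyword order does not change the key...
k1 = make_cache_key("db_cache", "query_database", (), {"sql": "SELECT 1", "limit": 10})
k2 = make_cache_key("db_cache", "query_database", (), {"limit": 10, "sql": "SELECT 1"})
# ...but different argument values do
k3 = make_cache_key("db_cache", "query_database", (), {"sql": "SELECT 2", "limit": 10})
```

Note the `default=str` fallback: non-JSON-serializable arguments are stringified, so two objects with the same `str()` form will collide on the same key. Keep tool arguments to plain JSON types where possible.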
Step 4: Embedding Cache
Cache embeddings to avoid re-computing them:
# caching/embedding_cache.py
import hashlib
import json
from typing import Optional
import redis
from openai import OpenAI

redis_client = redis.Redis(host="localhost", port=6379, db=2, decode_responses=True)
openai_client = OpenAI()

class EmbeddingCache:
    """Cache embeddings indefinitely — they don't change for the same text."""
    def __init__(self, model: str = "text-embedding-3-small"):
        self.model = model
        self._namespace = f"embeddings:{model}"

    def _key(self, text: str) -> str:
        text_hash = hashlib.sha256(text.encode()).hexdigest()
        return f"{self._namespace}:{text_hash}"

    def get(self, text: str) -> Optional[list[float]]:
        cached = redis_client.get(self._key(text))
        if cached:
            return json.loads(cached)
        return None

    def set(self, text: str, embedding: list[float]) -> None:
        redis_client.set(self._key(text), json.dumps(embedding))
        # No TTL — embeddings are permanent for a given model

    def get_or_compute(self, text: str) -> list[float]:
        """Get cached embedding or compute and cache it."""
        cached = self.get(text)
        if cached:
            return cached
        response = openai_client.embeddings.create(
            model=self.model,
            input=text.strip(),
        )
        embedding = response.data[0].embedding
        self.set(text, embedding)
        return embedding

    def get_batch(self, texts: list[str]) -> list[list[float]]:
        """Get embeddings for multiple texts, using cache where possible."""
        results = []
        uncached_texts = []
        uncached_indices = []
        # Check cache for each text
        for i, text in enumerate(texts):
            cached = self.get(text)
            if cached:
                results.append(cached)
            else:
                results.append(None)  # Placeholder, filled in below
                uncached_texts.append(text)
                uncached_indices.append(i)
        # Batch compute uncached embeddings
        if uncached_texts:
            response = openai_client.embeddings.create(
                model=self.model,
                input=uncached_texts,
            )
            for idx, (text, embedding_obj) in enumerate(zip(uncached_texts, response.data)):
                embedding = embedding_obj.embedding
                original_idx = uncached_indices[idx]
                results[original_idx] = embedding
                self.set(text, embedding)
        return results
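The get-or-compute pattern works with any backing store. A dict-backed sketch with a stub `compute` function (standing in for the OpenAI call; its character-count "embedding" is purely illustrative) shows how repeated texts skip recomputation:

```python
import hashlib

class DictEmbeddingCache:
    """In-memory stand-in for the Redis-backed cache; `compute` replaces the API call."""
    def __init__(self, compute):
        self.compute = compute  # function: text -> embedding
        self.store: dict[str, list[float]] = {}
        self.api_calls = 0      # counts how often we had to "call the API"

    def _key(self, text: str) -> str:
        return hashlib.sha256(text.encode()).hexdigest()

    def get_or_compute(self, text: str) -> list[float]:
        key = self._key(text)
        if key not in self.store:
            self.api_calls += 1
            self.store[key] = self.compute(text)
        return self.store[key]

# Stub embedding: [length, space count] instead of a real model
cache = DictEmbeddingCache(lambda t: [float(len(t)), float(t.count(" "))])
for text in ["hello world", "hello world", "goodbye"]:
    cache.get_or_compute(text)
# Two unique texts, so only two "API calls" despite three lookups
```

Swapping the dict for Redis and the stub for `openai_client.embeddings.create` recovers the class above; the cache-hit accounting is the same.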
Common Issues and Solutions
Issue: Semantic cache returns wrong answers due to low similarity threshold
Start at 0.95 and lower gradually while monitoring cache quality. Add a "cache hit review" queue in LangFuse where human reviewers validate a sample of semantic cache hits weekly. Any incorrect cached response that slips through should trigger a threshold increase.
Issue: Redis memory fills up with embedding data
Embeddings are large: 1,536 floats for text-embedding-3-small is ~6 KB as raw float32, and several times that when JSON-serialized as in this guide. For 100K unique queries, that is 600 MB+ of embedding data. Use Redis' maxmemory setting with the allkeys-lru eviction policy, which evicts least-recently-used keys first and keeps hot embeddings in cache.
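One way to apply that bound, assuming a locally reachable Redis, is via redis-cli (the same settings can go in redis.conf to survive restarts):

```shell
# Cap Redis memory and evict least-recently-used keys when full
redis-cli CONFIG SET maxmemory 2gb
redis-cli CONFIG SET maxmemory-policy allkeys-lru
```

Note that allkeys-lru can also evict semantic-cache index entries, which is why the set() path above tolerates dangling index references.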
Issue: Cache invalidation is too aggressive and causes cache misses
Invalidation should be surgical. Only invalidate entries related to the data that changed. Use the invalidate_pattern() method with a high similarity threshold (0.90+) to target only closely related entries. Avoid time-based invalidation for stable content — use event-based invalidation instead (trigger invalidation when the underlying data changes).
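Event-based invalidation can be as simple as a registry mapping each data source to the cached entries it backs; when a source changes, only those entries are dropped. A minimal in-memory sketch (the source names and API here are illustrative, not from the code above):

```python
from collections import defaultdict

class InvalidationRegistry:
    """Maps data sources to the cached query keys they back."""
    def __init__(self):
        self.source_to_keys: dict[str, set[str]] = defaultdict(set)
        self.cache: dict[str, str] = {}

    def put(self, key: str, response: str, sources: list[str]) -> None:
        """Cache a response and record which data sources it depends on."""
        self.cache[key] = response
        for source in sources:
            self.source_to_keys[source].add(key)

    def on_source_changed(self, source: str) -> int:
        """Invalidate only the entries backed by the changed source."""
        keys = self.source_to_keys.pop(source, set())
        for key in keys:
            self.cache.pop(key, None)
        return len(keys)

reg = InvalidationRegistry()
reg.put("refund_policy_q", "30-day refunds", sources=["policy_doc"])
reg.put("pricing_q", "Starts at $10/user", sources=["pricing_page"])
removed = reg.on_source_changed("policy_doc")  # only the policy-backed entry goes
```

In production the dict would be the Redis-backed SemanticCache, and on_source_changed would call invalidate_pattern() or delete the recorded keys directly.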
Production Considerations
Cache warming: Pre-populate the semantic cache with common queries before going live. Run your top 100 FAQ queries through the agent and cache their responses. This prevents cold-start performance degradation.
Cost tracking: Track cache hit rate and calculate savings: saved_cost = hit_count * avg_cost_per_llm_call. Report this to stakeholders as a metric. A well-configured semantic cache for FAQ-heavy agents should achieve 40-60% cache hit rate.
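That metric is straightforward to compute from hit/miss counters you already have; a sketch, with the average cost per LLM call as an assumed input:

```python
def cache_savings_report(hits: int, misses: int, avg_cost_per_llm_call: float) -> dict:
    """Summarize cache effectiveness for stakeholder reporting."""
    total = hits + misses
    return {
        "hit_rate": hits / total if total else 0.0,
        "saved_usd": hits * avg_cost_per_llm_call,
    }

report = cache_savings_report(hits=450, misses=550, avg_cost_per_llm_call=0.02)
```

A 45% hit rate at $0.02 per avoided call here works out to $9 saved over 1,000 queries; scale the counters to your traffic.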
Multi-tenant isolation: For SaaS applications, namespace cache keys by tenant ID to prevent cross-tenant data leakage. Use SemanticCache(namespace=f"tenant_{tenant_id}").
Next Steps
- Add rate limiting to complement caching
- Set up monitoring to track cache hit rates
- Review agentic RAG for document-level caching
- Connect to the LangFuse observability platform
- Build a research agent that caches search results