What You'll Build#
A type-safe customer analytics agent using PydanticAI that:
- Returns fully validated, typed Pydantic output models
- Uses dependency injection for database and HTTP client access
- Defines tools with RunContext[Deps] for clean dependency management
- Handles validation failures with automatic retry
- Has a complete test suite with mock dependencies
Prerequisites#
pip install pydantic-ai pydantic httpx pytest pytest-asyncio python-dotenv
- Python 3.11+ (PydanticAI requires 3.9+ but 3.11+ is strongly recommended)
- OpenAI or Anthropic API key
- Understanding of agentic workflows and tool calling
Overview#
PydanticAI wraps LLM APIs with strong typing at every layer. The core concept is that your agent's output is a Pydantic model, not a string: PydanticAI sends the model's JSON schema to the LLM and validates the response against it. If validation fails, it sends the errors back to the LLM and retries, up to the configured retry limit.
The dependency injection system is the standout feature: instead of global clients or parameter threading, you define what your agent needs as a typed Deps class and inject it at runtime.
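To make the validate-and-retry idea concrete, here is a minimal standard-library sketch. It is not PydanticAI's implementation: `validate_report`, `run_with_retries`, and `fake_model` are hypothetical names, and the "model" is just a canned list of responses.

```python
import json


def validate_report(raw: str) -> dict:
    """Parse a response and raise ValueError describing what is wrong with it."""
    data = json.loads(raw)  # malformed JSON raises a ValueError subclass
    count = data.get("total_customers")
    if not isinstance(count, int) or count < 0:
        raise ValueError("total_customers must be a non-negative integer")
    return data


def run_with_retries(model, prompt: str, retries: int = 2) -> dict:
    """Call `model`, feeding each validation error back, for up to `retries` extra attempts."""
    feedback = None
    for _ in range(retries + 1):
        raw = model(prompt, feedback)
        try:
            return validate_report(raw)
        except ValueError as exc:
            feedback = str(exc)  # the next attempt sees why the last one failed
    raise RuntimeError(f"validation failed after {retries + 1} attempts: {feedback}")


# Hypothetical model: the first answer is invalid, the second is valid.
responses = iter(['{"total_customers": -5}', '{"total_customers": 1585}'])
seen_feedback = []


def fake_model(prompt: str, feedback):
    seen_feedback.append(feedback)
    return next(responses)


report = run_with_retries(fake_model, "Report for Q1 2026")
print(report)
```

The point of the sketch is the feedback loop: the second attempt receives the error message from the first, which is what gives a retry a real chance of succeeding.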
Step 1: Defining Output Models and Dependencies#
Start by defining what your agent produces and what it needs to do its job:
# agents/analytics/models.py
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import date


class CustomerSegment(BaseModel):
    """Represents a customer segment with analytics."""
    segment_name: str = Field(description="Human-readable segment name")
    customer_count: int = Field(ge=0, description="Number of customers in segment")
    avg_revenue: float = Field(ge=0, description="Average monthly revenue per customer in USD")
    churn_risk: Literal["low", "medium", "high"] = Field(
        description="Churn risk level based on engagement and payment patterns"
    )
    top_features_used: list[str] = Field(
        description="Top 3-5 features used by this segment",
        min_length=1,
        max_length=5
    )


class AnalyticsReport(BaseModel):
    """Complete analytics report returned by the agent."""
    report_date: date = Field(description="Date this report was generated")
    period: str = Field(description="Time period analyzed, e.g. 'Q1 2026' or 'Last 30 days'")
    total_customers: int = Field(ge=0)
    total_mrr: float = Field(ge=0, description="Total monthly recurring revenue in USD")
    segments: list[CustomerSegment] = Field(
        description="Customer segments with individual analytics",
        min_length=1
    )
    key_insights: list[str] = Field(
        description="3-5 actionable insights from the data",
        min_length=3,
        max_length=5
    )
    recommended_actions: list[str] = Field(
        description="Prioritized list of recommended actions",
        min_length=1,
        max_length=5
    )
    data_quality_notes: Optional[str] = Field(
        default=None,
        description="Any data quality issues or limitations to flag"
    )
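As a quick illustration of what the Field constraints buy you, the sketch below validates a trimmed-down stand-in for CustomerSegment (the `Segment` class is hypothetical and exists only for this example). It assumes Pydantic v2, where `min_length` and `max_length` apply to lists.

```python
from pydantic import BaseModel, Field, ValidationError


class Segment(BaseModel):
    """Trimmed-down stand-in for CustomerSegment, for illustration only."""
    segment_name: str
    customer_count: int = Field(ge=0)
    top_features_used: list[str] = Field(min_length=1, max_length=5)


try:
    # Negative count and empty feature list both violate the constraints.
    Segment(segment_name="pro", customer_count=-3, top_features_used=[])
except ValidationError as exc:
    # Each error records which field failed; collect the field names.
    failed_fields = sorted(err["loc"][0] for err in exc.errors())
    print(failed_fields)

# Valid data constructs normally and is fully typed.
ok = Segment(segment_name="pro", customer_count=340, top_features_used=["api_calls"])
```

Errors like these are what get reported back to the LLM on a retry, so tight constraints double as precise feedback.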
# agents/analytics/deps.py
from dataclasses import dataclass
from typing import Protocol
import httpx


class DatabaseProtocol(Protocol):
    """Protocol defining what database operations the agent needs."""
    async def query(self, sql: str, params: dict | None = None) -> list[dict]: ...
    async def get_customer_count(self, segment: str | None = None) -> int: ...
    async def get_mrr(self, segment: str | None = None) -> float: ...
    async def get_feature_usage(self, segment: str | None = None) -> dict[str, int]: ...


@dataclass
class AnalyticsDeps:
    """Dependencies for the analytics agent."""
    db: DatabaseProtocol             # Database connection
    http_client: httpx.AsyncClient   # For external API calls
    user_timezone: str = "UTC"       # User's timezone for date calculations
    currency: str = "USD"            # Currency for financial figures
Step 2: Building the Agent with Dependency-Injected Tools#
# agents/analytics/agent.py
from pydantic_ai import Agent, RunContext
from datetime import date
import json
import httpx  # needed for the exception types caught in fetch_industry_benchmark
from .models import AnalyticsReport, CustomerSegment
from .deps import AnalyticsDeps

# Create the agent with typed output and retry config.
# Note: newer PydanticAI releases rename result_type to output_type and
# result.data to result.output; this tutorial uses the older names throughout.
analytics_agent = Agent(
    model="openai:gpt-4o",           # or "anthropic:claude-3-5-sonnet-latest"
    deps_type=AnalyticsDeps,         # Lets type checkers verify RunContext[AnalyticsDeps]
    result_type=AnalyticsReport,     # Typed output, validated by PydanticAI
    result_retries=2,                # Retry up to 2 times on validation failure
    system_prompt="""You are a business analytics specialist. Your job is to analyze
customer data and produce comprehensive analytics reports.
When generating reports:
- Always use the database tools to fetch real data; never fabricate numbers
- Group customers into 2-4 meaningful segments based on plan, usage, and value
- Provide specific, actionable insights with supporting data
- Flag any data quality issues in the data_quality_notes field
- Use today's date for the report_date field
""",
)


@analytics_agent.system_prompt
async def add_context(ctx: RunContext[AnalyticsDeps]) -> str:
    """Dynamic system prompt that adds runtime context."""
    return (
        f"Timezone for date calculations: {ctx.deps.user_timezone}\n"
        f"Currency for financial figures: {ctx.deps.currency}"
    )
@analytics_agent.tool
async def get_customer_metrics(
    ctx: RunContext[AnalyticsDeps],
    segment: str | None = None,
) -> str:
    """Get customer count and MRR metrics from the database.

    Args:
        segment: Optional segment filter. Options: 'starter', 'pro', 'enterprise'.
            If None, returns metrics for all customers.

    Returns:
        JSON with customer_count and monthly_recurring_revenue.
    """
    count = await ctx.deps.db.get_customer_count(segment)
    mrr = await ctx.deps.db.get_mrr(segment)
    return json.dumps({
        "segment": segment or "all",
        "customer_count": count,
        "monthly_recurring_revenue": mrr,
        "avg_revenue_per_customer": mrr / count if count > 0 else 0,
    })


@analytics_agent.tool
async def get_feature_usage(
    ctx: RunContext[AnalyticsDeps],
    segment: str | None = None,
    top_n: int = 5,
) -> str:
    """Get feature usage statistics to understand what customers use most.

    Args:
        segment: Optional segment filter (same options as get_customer_metrics).
        top_n: Number of top features to return (1-10).

    Returns:
        JSON array of features sorted by usage count, descending.
    """
    if top_n < 1 or top_n > 10:
        return "Error: top_n must be between 1 and 10"
    usage = await ctx.deps.db.get_feature_usage(segment)
    sorted_features = sorted(usage.items(), key=lambda x: x[1], reverse=True)
    top_features = sorted_features[:top_n]
    return json.dumps([
        {"feature": name, "usage_count": count}
        for name, count in top_features
    ])


@analytics_agent.tool
async def fetch_industry_benchmark(
    ctx: RunContext[AnalyticsDeps],
    metric: str,
    industry: str = "SaaS",
) -> str:
    """Fetch industry benchmark data for comparison.

    Args:
        metric: The metric to benchmark. Options: churn_rate, nps, cac, ltv_cac_ratio
        industry: Industry segment for benchmarks. Default is 'SaaS'.

    Returns:
        JSON with benchmark value and percentile information.
    """
    # Use the injected HTTP client
    try:
        response = await ctx.deps.http_client.get(
            f"https://api.benchmarks.example.com/v1/{industry}/{metric}",
            timeout=10.0,
        )
        response.raise_for_status()
        return response.text
    except httpx.TimeoutException:
        return json.dumps({"error": "Benchmark API timed out", "metric": metric})
    except httpx.HTTPStatusError as e:
        return json.dumps({"error": f"Benchmark API error: {e.response.status_code}"})
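One way to keep tools like these easy to verify is to pull the pure logic into a plain function that needs no model or RunContext. The sketch below mirrors the ranking step of get_feature_usage; `top_features_payload` is a hypothetical helper, not part of the agent above, and it returns errors as JSON rather than as a bare string.

```python
import json


def top_features_payload(usage: dict[str, int], top_n: int = 5) -> str:
    """Rank features by usage count and return the top_n as a JSON array."""
    if not 1 <= top_n <= 10:
        # Returning the error as JSON keeps every response machine-readable.
        return json.dumps({"error": "top_n must be between 1 and 10"})
    ranked = sorted(usage.items(), key=lambda item: item[1], reverse=True)
    return json.dumps(
        [{"feature": name, "usage_count": count} for name, count in ranked[:top_n]]
    )


usage = {
    "api_calls": 45000,
    "dashboard_views": 12000,
    "report_exports": 3400,
    "webhook_events": 8900,
    "team_invites": 1200,
}
payload = top_features_payload(usage, top_n=2)
rejected = top_features_payload(usage, top_n=0)
print(payload)
```

A tool body can then shrink to fetching `usage` from `ctx.deps.db` and delegating to the helper.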
Step 3: Running the Agent#
# main.py
import asyncio
import httpx
from datetime import date
from agents.analytics.agent import analytics_agent
from agents.analytics.deps import AnalyticsDeps
from agents.analytics.models import AnalyticsReport


class MockDatabase:
    """Simple mock database for demonstration."""
    async def query(self, sql: str, params=None) -> list[dict]:
        return []

    async def get_customer_count(self, segment=None) -> int:
        counts = {"starter": 1200, "pro": 340, "enterprise": 45, None: 1585}
        return counts.get(segment, 0)

    async def get_mrr(self, segment=None) -> float:
        mrr = {"starter": 34800, "pro": 33660, "enterprise": 22500, None: 90960}
        return mrr.get(segment, 0)

    async def get_feature_usage(self, segment=None) -> dict[str, int]:
        return {
            "api_calls": 45000,
            "dashboard_views": 12000,
            "report_exports": 3400,
            "webhook_events": 8900,
            "team_invites": 1200,
        }


async def generate_analytics_report(period: str) -> AnalyticsReport:
    """Generate an analytics report for the given period."""
    async with httpx.AsyncClient() as http_client:
        deps = AnalyticsDeps(
            db=MockDatabase(),
            http_client=http_client,
            user_timezone="America/New_York",
            currency="USD",
        )
        result = await analytics_agent.run(
            f"Generate a comprehensive analytics report for {period}. "
            f"Segment customers by plan tier (starter, pro, enterprise) and "
            f"analyze their usage patterns, revenue contribution, and churn risk.",
            deps=deps,
        )
        # result.data is a fully validated AnalyticsReport instance
        return result.data


async def main():
    report = await generate_analytics_report("Q1 2026")
    print(f"Analytics Report: {report.period}")
    print(f"Total Customers: {report.total_customers:,}")
    print(f"Total MRR: ${report.total_mrr:,.2f}")
    print(f"\nSegments ({len(report.segments)}):")
    for seg in report.segments:
        print(f"  {seg.segment_name}: {seg.customer_count} customers, "
              f"${seg.avg_revenue:.0f}/mo avg, {seg.churn_risk} churn risk")
    print("\nKey Insights:")
    for insight in report.key_insights:
        print(f"  - {insight}")


if __name__ == "__main__":
    asyncio.run(main())
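Because the mock database returns fixed numbers, you can work out by hand what a faithful report should contain and compare it with the model's output. The sketch below recomputes the totals and per-tier averages from the same figures used in MockDatabase; the variable names are illustrative only.

```python
# Figures copied from MockDatabase in main.py.
counts = {"starter": 1200, "pro": 340, "enterprise": 45}
mrr = {"starter": 34800, "pro": 33660, "enterprise": 22500}

# Totals should match the values MockDatabase returns for segment=None.
total_customers = sum(counts.values())
total_mrr = sum(mrr.values())

# Average monthly revenue per customer for each tier.
avg_revenue = {tier: mrr[tier] / counts[tier] for tier in counts}
overall_avg = round(total_mrr / total_customers, 2)

print(total_customers, total_mrr, avg_revenue, overall_avg)
```

If a generated report shows, say, a pro-tier average far from 99 USD, the model either skipped a tool call or invented a number, which is exactly what the system prompt forbids.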
Step 4: Testing PydanticAI Agents#
PydanticAI's dependency injection makes testing clean — inject mocks at the deps level:
# tests/test_analytics_agent.py
import os
from datetime import date
from unittest.mock import AsyncMock, MagicMock

import httpx
import pytest
from pydantic_ai.models.test import TestModel  # TestModel lives in pydantic_ai.models.test

from agents.analytics.agent import analytics_agent
from agents.analytics.deps import AnalyticsDeps
from agents.analytics.models import AnalyticsReport, CustomerSegment


class MockDatabase:
    """Controlled mock database for testing."""
    def __init__(self, customers: dict | None = None, mrr: dict | None = None):
        self.customers = customers or {
            "starter": 100, "pro": 50, "enterprise": 10, None: 160
        }
        self.mrr_data = mrr or {
            "starter": 2900, "pro": 4950, "enterprise": 5000, None: 12850
        }
        self.features = {
            "api_calls": 5000,
            "dashboard": 2000,
            "exports": 500,
        }

    async def get_customer_count(self, segment=None) -> int:
        return self.customers.get(segment, 0)

    async def get_mrr(self, segment=None) -> float:
        return self.mrr_data.get(segment, 0)

    async def get_feature_usage(self, segment=None) -> dict[str, int]:
        return self.features

    async def query(self, sql: str, params=None) -> list[dict]:
        return []


def make_http_client() -> AsyncMock:
    """Mock AsyncClient whose get() resolves to a response with a text body."""
    response = MagicMock(spec=httpx.Response)
    response.text = '{"value": 0.05, "percentile": 50}'  # placeholder payload
    client = AsyncMock(spec=httpx.AsyncClient)
    client.get.return_value = response
    return client


@pytest.fixture
def mock_deps():
    """Provide mock dependencies for testing."""
    return AnalyticsDeps(
        db=MockDatabase(),
        http_client=make_http_client(),
        user_timezone="UTC",
        currency="USD",
    )


class TestAnalyticsAgent:
    @pytest.mark.asyncio
    async def test_returns_valid_analytics_report(self, mock_deps):
        """Test that the agent returns a properly structured report."""
        # TestModel avoids real LLM calls: it calls the registered tools, then
        # generates placeholder output that satisfies the output schema.
        with analytics_agent.override(model=TestModel()):
            result = await analytics_agent.run(
                "Generate analytics report for Q1 2026",
                deps=mock_deps,
            )
        assert isinstance(result.data, AnalyticsReport)
        assert result.data.total_customers >= 0
        assert result.data.total_mrr >= 0
        assert len(result.data.segments) >= 1
        assert len(result.data.key_insights) >= 3

    @pytest.mark.asyncio
    async def test_report_date_is_a_date(self, mock_deps):
        """report_date must parse to a date.

        TestModel ignores the prompt, so checking that the date is today only
        makes sense against a real model (see the integration test below).
        """
        with analytics_agent.override(model=TestModel()):
            result = await analytics_agent.run(
                "Generate analytics report for Q1 2026",
                deps=mock_deps,
            )
        assert isinstance(result.data.report_date, date)

    @pytest.mark.asyncio
    async def test_tool_uses_injected_database(self):
        """Verify tools use injected deps, not global state."""
        custom_db = MockDatabase()
        custom_db.get_customer_count = AsyncMock(return_value=9999)
        custom_db.get_mrr = AsyncMock(return_value=999900.0)
        custom_deps = AnalyticsDeps(db=custom_db, http_client=make_http_client())
        with analytics_agent.override(model=TestModel()):
            await analytics_agent.run(
                "Generate a quick analytics report",
                deps=custom_deps,
            )
        # TestModel's output is synthetic, so assert on the calls made to the
        # injected database rather than on the numbers in the report.
        custom_db.get_customer_count.assert_awaited()
        custom_db.get_mrr.assert_awaited()

    @pytest.mark.asyncio
    async def test_database_failure_propagates(self):
        """Unhandled tool exceptions propagate out of agent.run().

        To let the model adapt instead, catch the error inside the tool and
        return an error payload or raise pydantic_ai.ModelRetry.
        """
        class FailingDatabase:
            async def get_customer_count(self, segment=None) -> int:
                raise ConnectionError("Database connection lost")

            async def get_mrr(self, segment=None) -> float:
                raise ConnectionError("Database connection lost")

            async def get_feature_usage(self, segment=None) -> dict:
                raise ConnectionError("Database connection lost")

            async def query(self, *args, **kwargs):
                raise ConnectionError("Database connection lost")

        failing_deps = AnalyticsDeps(
            db=FailingDatabase(),
            http_client=make_http_client(),
        )
        with analytics_agent.override(model=TestModel()):
            with pytest.raises(ConnectionError):
                await analytics_agent.run(
                    "Generate analytics report",
                    deps=failing_deps,
                )

    @pytest.mark.asyncio
    async def test_segment_churn_risk_is_valid(self, mock_deps):
        """All segment churn risk values must be valid enum values."""
        with analytics_agent.override(model=TestModel()):
            result = await analytics_agent.run(
                "Analyze customer segments with churn risk",
                deps=mock_deps,
            )
        valid_risks = {"low", "medium", "high"}
        for segment in result.data.segments:
            assert segment.churn_risk in valid_risks, (
                f"Invalid churn_risk '{segment.churn_risk}' for segment '{segment.segment_name}'"
            )

    @pytest.mark.asyncio
    async def test_integration_with_real_model(self, mock_deps):
        """Integration test with real LLM (requires API key, skip in unit test CI)."""
        # importorskip only checks that a package is installed, not that a key is set.
        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("OPENAI_API_KEY not set")
        result = await analytics_agent.run(
            "Generate a brief analytics report for Q1 2026 with 2 segments",
            deps=mock_deps,
        )
        report = result.data
        assert isinstance(report, AnalyticsReport)
        assert 1 <= len(report.segments) <= 4
        assert all(isinstance(s, CustomerSegment) for s in report.segments)
        # Allow 1 day tolerance for timezone edge cases
        assert abs((report.report_date - date.today()).days) <= 1
Step 5: Production Patterns#
Multi-model agent with fallback:
from pydantic_ai import Agent
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.models.openai import OpenAIModel

from agents.analytics.deps import AnalyticsDeps
from agents.analytics.models import AnalyticsReport

FALLBACK_MODEL = "openai:gpt-4o-mini"  # cheaper model used when the primary is over quota


def create_agent(primary_model: str) -> Agent:
    """Create the analytics agent bound to its primary model."""
    primary = (
        OpenAIModel(primary_model) if "gpt" in primary_model
        else AnthropicModel(primary_model)
    )
    return Agent(
        model=primary,
        deps_type=AnalyticsDeps,
        result_type=AnalyticsReport,
        result_retries=2,
        system_prompt="You are a business analytics specialist.",
    )


# Usage with per-request model override
async def run_with_fallback(deps: AnalyticsDeps, prompt: str) -> AnalyticsReport:
    """Run on the primary model; retry once on the fallback model on quota errors."""
    agent = create_agent("gpt-4o")
    try:
        result = await agent.run(prompt, deps=deps)
        return result.data
    except Exception as e:
        # Matching on the message text is brittle; prefer the provider's typed
        # rate-limit exception where one is available.
        if "quota" in str(e).lower() or "rate_limit" in str(e).lower():
            # Fall back to cheaper model
            result = await agent.run(prompt, deps=deps, model=FALLBACK_MODEL)
            return result.data
        raise
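The fallback pattern generalizes to an ordered list of models. The standard-library sketch below shows the control flow in isolation; `QuotaError`, `first_successful`, and `fake_call` are hypothetical names, and the "provider call" is simulated rather than a real API request.

```python
import asyncio


class QuotaError(Exception):
    """Hypothetical stand-in for a provider's quota or rate-limit error."""


async def first_successful(call, models: list[str]):
    """Try each model in order; return (model_name, result) from the first that works."""
    last_error = None
    for name in models:
        try:
            return name, await call(name)
        except QuotaError as exc:
            last_error = exc  # only quota errors trigger a fallback
    raise RuntimeError("all models exhausted") from last_error


async def fake_call(model_name: str) -> str:
    """Simulated provider: the primary model is over quota."""
    if model_name == "gpt-4o":
        raise QuotaError("quota exceeded")
    return "report"


used_model, output = asyncio.run(first_successful(fake_call, ["gpt-4o", "gpt-4o-mini"]))
print(used_model, output)
```

Catching a dedicated exception type, rather than inspecting the message string, means unrelated failures such as validation errors are never silently retried on a weaker model.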
Streaming structured output:
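from pydantic import ValidationError


async def stream_analytics_report(deps: AnalyticsDeps, prompt: str):
    """Yield progressively more complete reports; the last one yielded is final."""
    async with analytics_agent.run_stream(prompt, deps=deps) as stream:
        # stream_text() only works for plain-text results, so stream the
        # structured response and validate each snapshot instead. Method names
        # are those of PydanticAI releases that use result_type.
        async for message, is_last in stream.stream_structured(debounce_by=0.01):
            try:
                report = await stream.validate_structured_result(
                    message, allow_partial=not is_last
                )
            except ValidationError:
                if is_last:
                    raise
                continue  # snapshot is still missing required fields
            # An async generator cannot return a value, so yield each valid report.
            yield report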
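On the consuming side, a streamed structured result is just an async iterator of snapshots, where each one supersedes the last. The sketch below shows that consumption loop with the standard library only; `partial_reports` is a hypothetical stand-in for a real stream, and its figures are the mock values from earlier in this tutorial.

```python
import asyncio
from typing import AsyncIterator


async def partial_reports() -> AsyncIterator[dict]:
    """Hypothetical stream: each item is a more complete snapshot of the report."""
    yield {"period": "Q1 2026"}
    yield {"period": "Q1 2026", "total_customers": 1585}
    yield {"period": "Q1 2026", "total_customers": 1585, "total_mrr": 90960.0}


async def consume(stream: AsyncIterator[dict]) -> tuple[int, dict]:
    """Count updates and keep the most recent snapshot as the final result."""
    updates, latest = 0, {}
    async for snapshot in stream:
        updates += 1
        latest = snapshot  # a UI would re-render with this snapshot here
    return updates, latest


updates, final_report = asyncio.run(consume(partial_reports()))
print(updates, final_report)
```

Note that an async generator cannot hand back a value with `return`, so the convention of treating the last yielded item as the final result is a common workaround.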
Common Issues and Solutions#
Issue: Validation fails repeatedly even after retries
Add more detailed field descriptions to your Pydantic models. The model uses Field(description=...) to understand what each field expects. For numeric fields, add example values: Field(description="MRR in USD, e.g. 12500.00").
Issue: RunContext not available in async tools
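Register the tool with @analytics_agent.tool (not @analytics_agent.tool_plain) and declare ctx: RunContext[AnalyticsDeps] as its first parameter. PydanticAI supports both sync and async tools, but the tool must be async def, with await on every deps call, when your deps methods are async.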
Issue: Tool results are being ignored by the agent
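Check the tool's docstring and return value first. The docstring is what the model sees as the tool description, so a vague one makes the tool easy to overlook. Return types are not limited to str: PydanticAI serializes any JSON-serializable return value, including dicts and Pydantic models. Returning json.dumps() output, as the tools in this tutorial do, also works and keeps the payload explicit.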
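If you serialize tool results yourself with json.dumps(), values such as dates and Decimals need an explicit encoder, since the standard encoder rejects them. The following is a minimal sketch; `to_tool_json` is a hypothetical helper, not a PydanticAI function.

```python
import json
from datetime import date
from decimal import Decimal


def to_tool_json(payload: dict) -> str:
    """Serialize a tool result, converting dates and Decimals to JSON-friendly values."""
    def encode(value):
        if isinstance(value, date):
            return value.isoformat()
        if isinstance(value, Decimal):
            return float(value)
        # Anything else unexpected should fail loudly rather than be dropped.
        raise TypeError(f"Cannot serialize {type(value).__name__}")

    return json.dumps(payload, default=encode)


text = to_tool_json({"as_of": date(2026, 3, 31), "mrr": Decimal("90960.00")})
print(text)
```

Converting Decimal to float is a simplification that suits display figures; for exact monetary values, serializing as a string may be the safer choice.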
Production Considerations#
Model cost: Use gpt-4o-mini or claude-3-haiku for agents with simple structured output requirements. Reserve gpt-4o and claude-3-5-sonnet for complex analysis tasks. PydanticAI's model abstraction makes switching trivial.
Observability: Enable Logfire integration (PydanticAI's native observability) with import logfire; logfire.configure(). It captures all tool calls, model interactions, and validation attempts automatically.
Next Steps#
- Add rate limiting to your PydanticAI agents
- Set up monitoring in production
- Build a complete research agent using these patterns
- Learn agent error handling for robust systems
- Review tool calling concepts for tool design best practices