🤖AI Agents Guide
Advanced · 17 min read

Build Validated AI Agents with PydanticAI

Build type-safe AI agents with PydanticAI — structured output validation with Pydantic models, dependency injection using RunContext, typed tool definitions, testing PydanticAI agents, and production patterns for reliable agent systems.

By AI Agents Guide Team • March 1, 2026

Table of Contents

  1. What You'll Build
  2. Prerequisites
  3. Overview
  4. Step 1: Defining Output Models and Dependencies
  5. Step 2: Building the Agent with Dependency-Injected Tools
  6. Step 3: Running the Agent
  7. Step 4: Testing PydanticAI Agents
  8. Step 5: Production Patterns
  9. Common Issues and Solutions
  10. Production Considerations
  11. Next Steps

What You'll Build

A type-safe customer analytics agent using PydanticAI that:

  • Returns fully validated, typed Pydantic output models
  • Uses dependency injection for database and HTTP client access
  • Defines tools with RunContext[Deps] for clean dependency management
  • Handles validation failures with automatic retry
  • Has a complete test suite with mock dependencies

Prerequisites

pip install pydantic-ai pydantic httpx pytest pytest-asyncio python-dotenv
  • Python 3.11+ (PydanticAI requires 3.9+ but 3.11+ is strongly recommended)
  • OpenAI or Anthropic API key
  • Understanding of agentic workflows and tool calling

Overview

PydanticAI wraps LLM APIs with strong typing at every layer. The core concept is that your agent's output is a Pydantic model, not a string: PydanticAI sends the model's JSON schema to the LLM along with your prompt and validates whatever comes back. If validation fails, it returns the validation errors to the LLM and retries, up to the retry limit you configure.
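The validate-and-retry idea can be sketched in plain Python. This is a conceptual illustration only, not PydanticAI's internals: `validate_report`, `run_with_retries`, and the canned "model" are hypothetical names invented for the example.

```python
import json
from typing import Callable


def validate_report(raw: str) -> dict:
    """Parse JSON and enforce one rule; raise ValueError describing any problem."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc.msg}") from exc
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    count = data.get("total_customers")
    if not isinstance(count, int) or count < 0:
        raise ValueError("total_customers must be a non-negative integer")
    return data


def run_with_retries(model: Callable[[str], str], prompt: str, retries: int = 2) -> dict:
    """Call the model, validate its answer, and feed errors back on failure."""
    feedback = ""
    for _ in range(retries + 1):
        raw = model(prompt + feedback)
        try:
            return validate_report(raw)
        except ValueError as exc:
            # The rejection reason becomes part of the next prompt
            feedback = f"\nPrevious answer was rejected: {exc}. Please fix it."
    raise RuntimeError(f"no valid output after {retries + 1} attempts")


# Hypothetical stand-in for an LLM: wrong on the first call, right on the second
answers = iter(['{"total_customers": "many"}', '{"total_customers": 1585}'])
report = run_with_retries(lambda prompt: next(answers), "Summarize Q1")
print(report)
```

The retry limit bounds cost: with `retries=2` the model is called at most three times before the caller sees an error.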

The dependency injection system is the standout feature: instead of global clients or parameter threading, you define what your agent needs as a typed Deps class and inject it at runtime.
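As a rough illustration of the pattern, here is a standard-library sketch. `Context` is a deliberately tiny stand-in for `RunContext`, and `ReportDeps`, `format_revenue`, and the exchange rates are made up for the example.

```python
from dataclasses import dataclass
from typing import Dict, Generic, TypeVar

DepsT = TypeVar("DepsT")


@dataclass
class Context(Generic[DepsT]):
    """Minimal stand-in for RunContext: it only carries the dependencies."""
    deps: DepsT


@dataclass
class ReportDeps:
    currency: str
    exchange_rates: Dict[str, float]


def format_revenue(ctx: Context[ReportDeps], amount_usd: float) -> str:
    """Reads everything it needs from ctx.deps, never from module-level globals."""
    rate = ctx.deps.exchange_rates[ctx.deps.currency]
    return f"{amount_usd * rate:,.2f} {ctx.deps.currency}"


# The same function runs against different dependencies without modification
prod = Context(ReportDeps(currency="USD", exchange_rates={"USD": 1.0}))
test = Context(ReportDeps(currency="EUR", exchange_rates={"EUR": 0.5}))
usd_text = format_revenue(prod, 90960)
eur_text = format_revenue(test, 90960)
print(usd_text)  # 90,960.00 USD
print(eur_text)  # 45,480.00 EUR
```

Because the function's only input besides its arguments is `ctx.deps`, a test can control its behavior completely by constructing a different deps object.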

Step 1: Defining Output Models and Dependencies

Start by defining what your agent produces and what it needs to do its job:

# agents/analytics/models.py
from pydantic import BaseModel, Field
from typing import Optional, Literal
from datetime import date


class CustomerSegment(BaseModel):
    """Represents a customer segment with analytics."""
    segment_name: str = Field(description="Human-readable segment name")
    customer_count: int = Field(ge=0, description="Number of customers in segment")
    avg_revenue: float = Field(ge=0, description="Average monthly revenue per customer in USD")
    churn_risk: Literal["low", "medium", "high"] = Field(
        description="Churn risk level based on engagement and payment patterns"
    )
    top_features_used: list[str] = Field(
        description="Top 3-5 features used by this segment",
        min_length=1,
        max_length=5
    )


class AnalyticsReport(BaseModel):
    """Complete analytics report returned by the agent."""
    report_date: date = Field(description="Date this report was generated")
    period: str = Field(description="Time period analyzed, e.g. 'Q1 2026' or 'Last 30 days'")
    total_customers: int = Field(ge=0)
    total_mrr: float = Field(ge=0, description="Total monthly recurring revenue in USD")
    segments: list[CustomerSegment] = Field(
        description="Customer segments with individual analytics",
        min_length=1
    )
    key_insights: list[str] = Field(
        description="3-5 actionable insights from the data",
        min_length=3,
        max_length=5
    )
    recommended_actions: list[str] = Field(
        description="Prioritized list of recommended actions",
        min_length=1,
        max_length=5
    )
    data_quality_notes: Optional[str] = Field(
        default=None,
        description="Any data quality issues or limitations to flag"
    )
# agents/analytics/deps.py
from dataclasses import dataclass
from typing import Protocol
import httpx


class DatabaseProtocol(Protocol):
    """Protocol defining what database operations the agent needs."""
    async def query(self, sql: str, params: dict | None = None) -> list[dict]: ...
    async def get_customer_count(self, segment: str | None = None) -> int: ...
    async def get_mrr(self, segment: str | None = None) -> float: ...
    async def get_feature_usage(self, segment: str | None = None) -> dict[str, int]: ...


@dataclass
class AnalyticsDeps:
    """Dependencies for the analytics agent."""
    db: DatabaseProtocol           # Database connection
    http_client: httpx.AsyncClient  # For external API calls
    user_timezone: str = "UTC"     # User's timezone for date calculations
    currency: str = "USD"          # Currency for financial figures
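Because `DatabaseProtocol` is a `typing.Protocol`, any class with matching methods satisfies it without inheriting from it. The sketch below shows that with a one-method protocol; `CountsCustomers`, `InMemoryDb`, and `NotADatabase` are hypothetical names, and `@runtime_checkable` is added only so the match can be demonstrated with `isinstance`.

```python
import asyncio
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class CountsCustomers(Protocol):
    async def get_customer_count(self, segment: Optional[str] = None) -> int: ...


class InMemoryDb:
    """No inheritance from the protocol: matching method names is enough."""

    async def get_customer_count(self, segment: Optional[str] = None) -> int:
        return {"starter": 100, None: 160}.get(segment, 0)


class NotADatabase:
    pass


matches = isinstance(InMemoryDb(), CountsCustomers)
rejects = isinstance(NotADatabase(), CountsCustomers)
starter_count = asyncio.run(InMemoryDb().get_customer_count("starter"))
print(matches, rejects, starter_count)  # True False 100
```

Note that the runtime `isinstance` check only looks for the method name. Signature and return-type mismatches are caught by a static type checker, not at runtime.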

Step 2: Building the Agent with Dependency-Injected Tools

# agents/analytics/agent.py
from pydantic_ai import Agent, RunContext
from datetime import date
import json
import httpx  # exception types caught in fetch_industry_benchmark
from .models import AnalyticsReport, CustomerSegment
from .deps import AnalyticsDeps


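# Create the agent with typed dependencies, typed output, and retry config
analytics_agent = Agent(
    model="openai:gpt-4o",         # or "anthropic:claude-3-5-sonnet-latest"
    deps_type=AnalyticsDeps,       # Lets type checkers verify ctx.deps inside tools
    output_type=AnalyticsReport,   # Typed output that PydanticAI validates (older releases: result_type)
    output_retries=2,              # Retry up to 2 times on validation failure (older releases: result_retries)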
    system_prompt="""You are a business analytics specialist. Your job is to analyze
    customer data and produce comprehensive analytics reports.

    When generating reports:
    - Always use the database tools to fetch real data — never fabricate numbers
    - Group customers into 2-4 meaningful segments based on plan, usage, and value
    - Provide specific, actionable insights with supporting data
    - Flag any data quality issues in the data_quality_notes field
    - Use today's date for the report_date field
    """,
)


@analytics_agent.system_prompt
async def add_context(ctx: RunContext[AnalyticsDeps]) -> str:
    """Dynamic system prompt that adds runtime context."""
    return (
        # The model cannot know the current date unless the prompt states it
        f"Today's date: {date.today().isoformat()}\n"
        f"Timezone for date calculations: {ctx.deps.user_timezone}\n"
        f"Currency for financial figures: {ctx.deps.currency}"
    )


@analytics_agent.tool
async def get_customer_metrics(
    ctx: RunContext[AnalyticsDeps],
    segment: str | None = None,
) -> str:
    """Get customer count and MRR metrics from the database.

    Args:
        segment: Optional segment filter. Options: 'starter', 'pro', 'enterprise'.
                 If None, returns metrics for all customers.

    Returns:
        JSON with customer_count and monthly_recurring_revenue.
    """
    count = await ctx.deps.db.get_customer_count(segment)
    mrr = await ctx.deps.db.get_mrr(segment)

    return json.dumps({
        "segment": segment or "all",
        "customer_count": count,
        "monthly_recurring_revenue": mrr,
        "avg_revenue_per_customer": mrr / count if count > 0 else 0,
    })


@analytics_agent.tool
async def get_feature_usage(
    ctx: RunContext[AnalyticsDeps],
    segment: str | None = None,
    top_n: int = 5,
) -> str:
    """Get feature usage statistics to understand what customers use most.

    Args:
        segment: Optional segment filter (same options as get_customer_metrics).
        top_n: Number of top features to return (1-10).

    Returns:
        JSON array of features sorted by usage count, descending.
    """
    if top_n < 1 or top_n > 10:
        return "Error: top_n must be between 1 and 10"

    usage = await ctx.deps.db.get_feature_usage(segment)
    sorted_features = sorted(usage.items(), key=lambda x: x[1], reverse=True)
    top_features = sorted_features[:top_n]

    return json.dumps([
        {"feature": name, "usage_count": count}
        for name, count in top_features
    ])


@analytics_agent.tool
async def fetch_industry_benchmark(
    ctx: RunContext[AnalyticsDeps],
    metric: str,
    industry: str = "SaaS",
) -> str:
    """Fetch industry benchmark data for comparison.

    Args:
        metric: The metric to benchmark. Options: churn_rate, nps, cac, ltv_cac_ratio
        industry: Industry segment for benchmarks. Default is 'SaaS'.

    Returns:
        JSON with benchmark value and percentile information.
    """
    # Use the injected HTTP client
    try:
        response = await ctx.deps.http_client.get(
            f"https://api.benchmarks.example.com/v1/{industry}/{metric}",
            timeout=10.0,
        )
        response.raise_for_status()
        return response.text
    except httpx.TimeoutException:
        return json.dumps({"error": "Benchmark API timed out", "metric": metric})
    except httpx.HTTPStatusError as e:
        return json.dumps({"error": f"Benchmark API error: {e.response.status_code}"})
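Since the LLM chooses the `metric` and `industry` arguments, it can help to validate and escape them before they reach the URL. The following is one possible hardening sketch, not part of the original tool: `build_benchmark_url` is a hypothetical helper, and the allow-list simply mirrors the options named in the tool's docstring.

```python
from urllib.parse import quote

ALLOWED_METRICS = {"churn_rate", "nps", "cac", "ltv_cac_ratio"}
BASE_URL = "https://api.benchmarks.example.com/v1"


def build_benchmark_url(metric: str, industry: str = "SaaS") -> str:
    """Reject unknown metrics and percent-encode the industry path segment."""
    if metric not in ALLOWED_METRICS:
        raise ValueError(
            f"unknown metric {metric!r}; expected one of {sorted(ALLOWED_METRICS)}"
        )
    # safe="" also escapes "/", so a value like "a/b" cannot add path segments
    return f"{BASE_URL}/{quote(industry, safe='')}/{metric}"


print(build_benchmark_url("nps"))
print(build_benchmark_url("cac", industry="Health/Fitness"))
```

Inside the tool, a `ValueError` from this helper could be turned into an error string for the LLM, in the same way the tutorial's tools already report out-of-range arguments.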

Step 3: Running the Agent

# main.py
import asyncio
import httpx
from datetime import date
from agents.analytics.agent import analytics_agent
from agents.analytics.deps import AnalyticsDeps
from agents.analytics.models import AnalyticsReport


class MockDatabase:
    """Simple mock database for demonstration."""

    async def query(self, sql: str, params=None) -> list[dict]:
        return []

    async def get_customer_count(self, segment=None) -> int:
        counts = {"starter": 1200, "pro": 340, "enterprise": 45, None: 1585}
        return counts.get(segment, 0)

    async def get_mrr(self, segment=None) -> float:
        mrr = {"starter": 34800, "pro": 33660, "enterprise": 22500, None: 90960}
        return mrr.get(segment, 0)

    async def get_feature_usage(self, segment=None) -> dict[str, int]:
        return {
            "api_calls": 45000,
            "dashboard_views": 12000,
            "report_exports": 3400,
            "webhook_events": 8900,
            "team_invites": 1200,
        }


async def generate_analytics_report(period: str) -> AnalyticsReport:
    """Generate an analytics report for the given period."""
    async with httpx.AsyncClient() as http_client:
        deps = AnalyticsDeps(
            db=MockDatabase(),
            http_client=http_client,
            user_timezone="America/New_York",
            currency="USD",
        )

        result = await analytics_agent.run(
            f"Generate a comprehensive analytics report for {period}. "
            f"Segment customers by plan tier (starter, pro, enterprise) and "
            f"analyze their usage patterns, revenue contribution, and churn risk.",
            deps=deps,
        )

        # result.output is a fully validated AnalyticsReport instance
        # (older releases exposed it as result.data)
        return result.output


async def main():
    report = await generate_analytics_report("Q1 2026")

    print(f"Analytics Report: {report.period}")
    print(f"Total Customers: {report.total_customers:,}")
    print(f"Total MRR: ${report.total_mrr:,.2f}")
    print(f"\nSegments ({len(report.segments)}):")
    for seg in report.segments:
        print(f"  {seg.segment_name}: {seg.customer_count} customers, "
              f"${seg.avg_revenue:.0f}/mo avg, {seg.churn_risk} churn risk")
    print(f"\nKey Insights:")
    for insight in report.key_insights:
        print(f"  - {insight}")


if __name__ == "__main__":
    asyncio.run(main())
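To sanity-check a generated report against the mock data, the expected figures can be computed by hand. This short script uses only the numbers from `MockDatabase` above; the variable names are illustrative.

```python
customers = {"starter": 1200, "pro": 340, "enterprise": 45}
mrr = {"starter": 34800, "pro": 33660, "enterprise": 22500}

total_customers = sum(customers.values())
total_mrr = sum(mrr.values())

# Average monthly revenue per customer in each plan tier
averages = {plan: mrr[plan] / customers[plan] for plan in customers}

for plan, avg in averages.items():
    share = mrr[plan] / total_mrr
    print(f"{plan:<10} avg ${avg:.0f}/mo, {share:.1%} of MRR")

print(total_customers, total_mrr)  # 1585 90960
```

The segment totals match the values the mock returns for `segment=None`, so a report whose `total_customers` or `total_mrr` differs from these was not built from the tool results.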

Step 4: Testing PydanticAI Agents

PydanticAI's dependency injection makes testing clean — inject mocks at the deps level:

# tests/test_analytics_agent.py
import pytest
from datetime import date
from unittest.mock import AsyncMock, MagicMock
import httpx
from pydantic_ai.models.test import TestModel

from agents.analytics.agent import analytics_agent
from agents.analytics.deps import AnalyticsDeps
from agents.analytics.models import AnalyticsReport, CustomerSegment


class MockDatabase:
    """Controlled mock database for testing."""

    def __init__(self, customers: dict | None = None, mrr: dict | None = None):
        self.customers = customers or {
            "starter": 100, "pro": 50, "enterprise": 10, None: 160
        }
        self.mrr_data = mrr or {
            "starter": 2900, "pro": 4950, "enterprise": 5000, None: 12850
        }
        self.features = {
            "api_calls": 5000,
            "dashboard": 2000,
            "exports": 500,
        }

    async def get_customer_count(self, segment=None) -> int:
        return self.customers.get(segment, 0)

    async def get_mrr(self, segment=None) -> float:
        return self.mrr_data.get(segment, 0)

    async def get_feature_usage(self, segment=None) -> dict[str, int]:
        return self.features

    async def query(self, sql: str, params=None) -> list[dict]:
        return []


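@pytest.fixture
def mock_deps():
    """Provide mock dependencies for testing."""
    http_client = AsyncMock(spec=httpx.AsyncClient)
    # TestModel calls every registered tool, so give the benchmark tool
    # a response whose .text is a real string rather than a bare mock
    http_client.get.return_value = MagicMock(text='{"benchmark": null}')
    return AnalyticsDeps(
        db=MockDatabase(),
        http_client=http_client,
        user_timezone="UTC",
        currency="USD",
    )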


class TestAnalyticsAgent:

    @pytest.mark.asyncio
    async def test_returns_valid_analytics_report(self, mock_deps):
        """Test that the agent returns a properly structured report."""
        # Use TestModel to avoid real LLM calls
        with analytics_agent.override(model=TestModel()):
            result = await analytics_agent.run(
                "Generate analytics report for Q1 2026",
                deps=mock_deps,
            )

        assert isinstance(result.output, AnalyticsReport)
        assert result.output.total_customers >= 0
        assert result.output.total_mrr >= 0
        assert len(result.output.segments) >= 1
        assert len(result.output.key_insights) >= 3

    @pytest.mark.asyncio
    async def test_report_date_is_a_date(self, mock_deps):
        """report_date must validate as a date.

        TestModel fills fields with schema-valid placeholder values rather than
        following the prompt, so checking for today's date belongs in the
        integration test against a real model.
        """
        with analytics_agent.override(model=TestModel()):
            result = await analytics_agent.run(
                "Generate analytics report for Q1 2026",
                deps=mock_deps,
            )

        assert isinstance(result.output.report_date, date)

    @pytest.mark.asyncio
    async def test_tool_uses_injected_database(self, mock_deps):
        """Verify tools use injected deps, not global state."""
        custom_db = MockDatabase(
            customers={"starter": 9999, None: 9999},
            mrr={"starter": 999900.0, None: 999900.0},
        )
        # Wrap one method in a spy so the test can assert it was awaited
        custom_db.get_customer_count = AsyncMock(return_value=9999)
        custom_deps = AnalyticsDeps(
            db=custom_db,
            http_client=mock_deps.http_client,
        )

        with analytics_agent.override(model=TestModel()):
            await analytics_agent.run(
                "Generate a quick analytics report",
                deps=custom_deps,
            )

        # TestModel calls every tool but fills the report with placeholder
        # values, so assert on the database call instead of on total_mrr
        custom_db.get_customer_count.assert_awaited()

    @pytest.mark.asyncio
    async def test_database_failure_propagates(self):
        """An exception raised inside a tool is not swallowed by the agent.

        To let the model recover instead, catch the error inside the tool and
        return an error payload, or raise pydantic_ai.ModelRetry.
        """
        class FailingDatabase:
            async def get_customer_count(self, segment=None) -> int:
                raise ConnectionError("Database connection lost")
            async def get_mrr(self, segment=None) -> float:
                raise ConnectionError("Database connection lost")
            async def get_feature_usage(self, segment=None) -> dict:
                raise ConnectionError("Database connection lost")
            async def query(self, *args, **kwargs):
                raise ConnectionError("Database connection lost")

        failing_deps = AnalyticsDeps(
            db=FailingDatabase(),
            http_client=AsyncMock(spec=httpx.AsyncClient),
        )

        with analytics_agent.override(model=TestModel()):
            with pytest.raises(ConnectionError):
                await analytics_agent.run(
                    "Generate analytics report",
                    deps=failing_deps,
                )

    @pytest.mark.asyncio
    async def test_segment_churn_risk_is_valid(self, mock_deps):
        """All segment churn risk values must be valid enum values."""
        with analytics_agent.override(model=TestModel()):
            result = await analytics_agent.run(
                "Analyze customer segments with churn risk",
                deps=mock_deps,
            )

        valid_risks = {"low", "medium", "high"}
        for segment in result.output.segments:
            assert segment.churn_risk in valid_risks, (
                f"Invalid churn_risk '{segment.churn_risk}' for segment '{segment.segment_name}'"
            )

    @pytest.mark.asyncio
    async def test_integration_with_real_model(self, mock_deps):
        """Integration test with a real LLM (needs an API key; skipped otherwise)."""
        import os  # local import keeps the skip check self-contained

        if not os.getenv("OPENAI_API_KEY"):
            pytest.skip("OPENAI_API_KEY is not set")

        result = await analytics_agent.run(
            "Generate a brief analytics report for Q1 2026 with 2 segments",
            deps=mock_deps,
        )

        report = result.output
        assert isinstance(report, AnalyticsReport)
        assert 1 <= len(report.segments) <= 4
        assert all(isinstance(s, CustomerSegment) for s in report.segments)
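Tool logic can also be exercised with no agent or model at all, by awaiting the function with a stub context. The sketch below copies the body of `get_customer_metrics` without its decorator; `StubDb` and the `SimpleNamespace` context are test doubles invented here. Whether the decorated function in your PydanticAI version can be awaited directly in the same way is an assumption to verify.

```python
import asyncio
import json
from types import SimpleNamespace


async def get_customer_metrics(ctx, segment=None) -> str:
    """Same body as the tutorial's tool, minus the decorator."""
    count = await ctx.deps.db.get_customer_count(segment)
    mrr = await ctx.deps.db.get_mrr(segment)
    return json.dumps({
        "segment": segment or "all",
        "customer_count": count,
        "monthly_recurring_revenue": mrr,
        "avg_revenue_per_customer": mrr / count if count > 0 else 0,
    })


class StubDb:
    """Returns fixed numbers, with an empty segment to hit the zero-count branch."""

    async def get_customer_count(self, segment=None) -> int:
        return 0 if segment == "empty" else 50

    async def get_mrr(self, segment=None) -> float:
        return 0.0 if segment == "empty" else 4950.0


# Only ctx.deps.db is touched by the tool, so a namespace is enough
ctx = SimpleNamespace(deps=SimpleNamespace(db=StubDb()))
pro = json.loads(asyncio.run(get_customer_metrics(ctx, "pro")))
empty = json.loads(asyncio.run(get_customer_metrics(ctx, "empty")))
print(pro["avg_revenue_per_customer"])    # 99.0
print(empty["avg_revenue_per_customer"])  # 0
```

Tests like this cover arithmetic and edge cases such as division by zero, which a placeholder-generating test model never checks.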

Step 5: Production Patterns

Multi-model agent with fallback:

from pydantic_ai import Agent
from pydantic_ai.exceptions import ModelHTTPError
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.models.openai import OpenAIModel

from agents.analytics.deps import AnalyticsDeps
from agents.analytics.models import AnalyticsReport


def build_model(model_name: str):
    """Pick the provider class from the model name."""
    return OpenAIModel(model_name) if "gpt" in model_name else AnthropicModel(model_name)


def create_agent(primary_model: str) -> Agent[AnalyticsDeps, AnalyticsReport]:
    """Create the agent on its primary model; the fallback is chosen per request."""
    return Agent(
        model=build_model(primary_model),
        deps_type=AnalyticsDeps,
        output_type=AnalyticsReport,
        output_retries=2,
        system_prompt="You are a business analytics specialist.",
    )


# Usage with per-request model override
async def run_with_fallback(
    deps: AnalyticsDeps,
    prompt: str,
    primary_model: str = "gpt-4o",
    fallback_model: str = "gpt-4o-mini",
) -> AnalyticsReport:
    agent = create_agent(primary_model)

    try:
        result = await agent.run(prompt, deps=deps)
    except ModelHTTPError as e:
        # Providers typically signal rate limits and exhausted quota with HTTP 429
        if e.status_code != 429:
            raise
        # Fall back to the cheaper model for this request only
        result = await agent.run(prompt, deps=deps, model=build_model(fallback_model))
    return result.output
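The same idea extends to an ordered chain of models. The sketch below is library-independent: `ProviderError`, `first_successful`, and the two fake calls are hypothetical, and the set of retryable status codes is an assumption you would tune for your provider.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


class ProviderError(Exception):
    """Hypothetical error type carrying the provider's HTTP status code."""

    def __init__(self, status_code: int, message: str):
        super().__init__(message)
        self.status_code = status_code


RETRYABLE = {429, 503}  # assumed: rate limited / temporarily unavailable


async def first_successful(calls: Sequence[Callable[[], Awaitable[str]]]) -> str:
    """Await each call in order and return the first result that succeeds."""
    if not calls:
        raise ValueError("at least one call is required")
    last_error = None
    for call in calls:
        try:
            return await call()
        except ProviderError as exc:
            if exc.status_code not in RETRYABLE:
                raise  # e.g. a 401 will not be fixed by switching models
            last_error = exc
    raise last_error


async def primary() -> str:
    raise ProviderError(429, "rate limit exceeded")


async def fallback() -> str:
    return "report from fallback model"


result = asyncio.run(first_successful([primary, fallback]))
print(result)  # report from fallback model
```

Matching on a status code rather than on the error message avoids falling back on unrelated failures whose text happens to contain a word like "quota".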

Streaming structured output:

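async def stream_analytics_report(deps: AnalyticsDeps, prompt: str):
    """Stream partial reports as the model generates them."""
    async with analytics_agent.run_stream(prompt, deps=deps) as stream:
        # stream_text() only works for plain-text output. Structured output
        # arrives as progressively more complete objects instead.
        # (Earlier releases name this method stream().)
        async for partial_report in stream.stream_output():
            # The last item yielded is the complete, validated report.
            # An async generator cannot return a value, so callers keep
            # the final item they receive.
            yield partial_report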
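A caller consumes a streaming function like this with `async for`. The sketch below replaces the agent with a fake generator so it runs on its own; `fake_partial_reports` and `consume` are illustrative names, and it assumes each item is a progressively more complete snapshot of the report.

```python
import asyncio


async def fake_partial_reports():
    """Stand-in for the streaming function: yields growing snapshots."""
    yield {"period": "Q1 2026"}
    yield {"period": "Q1 2026", "total_customers": 1585}
    yield {"period": "Q1 2026", "total_customers": 1585, "total_mrr": 90960.0}


async def consume(stream) -> dict:
    """Report which fields are new in each snapshot and keep the last one."""
    latest = {}
    async for snapshot in stream:
        new_fields = sorted(set(snapshot) - set(latest))
        print("received:", new_fields)
        latest = snapshot
    return latest


final = asyncio.run(consume(fake_partial_reports()))
print(final["total_mrr"])  # 90960.0
```

In a web handler, each snapshot would typically be serialized and pushed to the client, with the last one treated as the finished report.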

Common Issues and Solutions

Issue: Validation fails repeatedly even after retries

Add more detailed field descriptions to your Pydantic models. The LLM sees each Field(description=...) as part of the output schema and relies on it to understand what the field expects. For numeric fields, add example values: Field(description="MRR in USD, e.g. 12500.00").

Issue: RunContext not available in async tools

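Register the tool with @agent.tool and take ctx: RunContext[Deps] as its first parameter; @agent.tool_plain is for tools that need no context. PydanticAI supports both sync and async tools, but when your deps methods are async the tool must be async def and must await every deps call.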

Issue: Tool results are being ignored by the agent

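The return type is rarely the cause: tools are not limited to returning str, and PydanticAI serializes dicts, lists, and Pydantic models to JSON before passing them to the LLM. Check instead that each tool's docstring states clearly what it returns and when to call it, since that description is what the LLM sees. The tools in this tutorial call json.dumps() only to keep the payload format explicit.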
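If you do serialize tool results by hand, note that `json.dumps()` rejects values such as dates and decimals unless you tell it how to encode them. A minimal sketch, with `to_json` as a hypothetical helper:

```python
import json
from datetime import date
from decimal import Decimal


def to_json(value) -> str:
    """json.dumps with explicit handling for dates and decimals."""

    def encode(obj):
        if isinstance(obj, date):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        raise TypeError(f"cannot serialize {type(obj).__name__}")

    return json.dumps(value, default=encode)


payload = {"as_of": date(2026, 3, 1), "mrr": Decimal("90960.00")}
encoded = to_json(payload)
print(encoded)  # {"as_of": "2026-03-01", "mrr": 90960.0}
```

Raising `TypeError` for unknown types keeps unexpected objects from being silently stringified into something the LLM cannot interpret.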

Production Considerations

Model cost: Use gpt-4o-mini or claude-3-haiku for agents with simple structured output requirements. Reserve gpt-4o and claude-3-5-sonnet for complex analysis tasks. PydanticAI's model abstraction makes switching trivial.

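Observability: Enable Logfire, Pydantic's observability platform, with import logfire; logfire.configure(). Recent releases also expect a call to logfire.instrument_pydantic_ai() after configure(), so check the documentation for your installed version. Once instrumented, it captures tool calls, model interactions, and validation attempts.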

Next Steps

  • Add rate limiting to your PydanticAI agents
  • Set up monitoring in production
  • Build a complete research agent using these patterns
  • Learn agent error handling for robust systems
  • Review tool calling concepts for tool design best practices
