Intermediate • 18 min read

Build an A2A-Compliant AI Agent (Python)

A complete guide to building A2A Protocol-compliant agents in Python. Covers implementing Agent Cards, A2A server endpoints, task lifecycle management, OAuth 2.1 authentication, streaming with SSE, and using the official A2A Python SDK.

By AI Agents Guide Team • March 1, 2026

Table of Contents

  1. Prerequisites
  2. Setup and Installation
  3. Step 1: Define Your Agent Card
  4. Step 2: Implement the Agent Logic
  5. Step 3: Build the A2A Server
  6. Step 4: Implement OAuth 2.1 Authentication
  7. Step 5: Start the Server
  8. Step 6: Call Your Agent from Another Agent
  9. Deployment Checklist
  10. Next Steps

How to Build an A2A-Compliant Agent in Python

The A2A (Agent2Agent) Protocol is an open standard, originally introduced by Google, for enabling AI agents to communicate and collaborate across systems and vendors. Building A2A-compliant agents lets your agents participate in multi-agent pipelines, be discovered by orchestrators, and interoperate with agents built on any framework or platform.

This tutorial walks through building a fully compliant A2A agent from scratch — including implementing an Agent Card, the A2A server endpoints, task lifecycle management, OAuth 2.1 authentication, and Server-Sent Events (SSE) streaming.

Prerequisites

  • Python 3.11+
  • Basic familiarity with FastAPI or async Python
  • An OpenAI, Anthropic, or Google API key for LLM inference

Setup and Installation

pip install a2a-sdk fastapi uvicorn "python-jose[cryptography]" openai

Create the project structure:

my-a2a-agent/
├── agent.py          # Agent business logic
├── server.py         # A2A HTTP server
├── auth.py           # OAuth 2.1 / token validation
├── agent_card.py     # Agent Card definition
└── requirements.txt

Step 1: Define Your Agent Card

The Agent Card is the discovery document served at /.well-known/agent.json. It describes what your agent can do and how to authenticate.

# agent_card.py
from a2a.types import AgentCard, AgentCapabilities, AgentSkill, Authentication

def build_agent_card(base_url: str) -> AgentCard:
    return AgentCard(
        name="Document Analysis Agent",
        description=(
            "Analyzes documents and text to extract structured information, "
            "generate summaries, identify key entities, and answer questions "
            "about document content."
        ),
        url=f"{base_url}/a2a",
        version="1.0.0",
        capabilities=AgentCapabilities(
            streaming=True,
            pushNotifications=False,
            stateTransitionHistory=True,
        ),
        defaultInputModes=["text/plain", "application/pdf", "text/markdown"],
        defaultOutputModes=["text/plain", "application/json"],
        authentication=Authentication(
            schemes=["oauth2"],
            credentials=f"{base_url}/.well-known/oauth-authorization-server",
        ),
        skills=[
            AgentSkill(
                id="summarize",
                name="Document Summarization",
                description=(
                    "Generates a concise summary of a document, article, or text passage. "
                    "Returns a structured summary with key points, main themes, and conclusion."
                ),
                inputModes=["text/plain", "application/pdf", "text/markdown"],
                outputModes=["text/plain", "application/json"],
                examples=[
                    "Summarize this research paper",
                    "Give me the key points from this document",
                    "What are the main findings in this report?",
                ],
            ),
            AgentSkill(
                id="extract-entities",
                name="Entity Extraction",
                description=(
                    "Extracts named entities (people, organizations, locations, dates, "
                    "monetary values) from text and returns them as structured JSON."
                ),
                inputModes=["text/plain"],
                outputModes=["application/json"],
                examples=[
                    "Extract all company names from this document",
                    "Find all dates and monetary amounts mentioned",
                    "List all people mentioned in this contract",
                ],
            ),
            AgentSkill(
                id="answer-questions",
                name="Document Q&A",
                description=(
                    "Answers specific questions about provided document content using "
                    "the document as the primary knowledge source."
                ),
                inputModes=["text/plain"],
                outputModes=["text/plain"],
                examples=[
                    "What is the payment schedule in this contract?",
                    "Who are the parties to this agreement?",
                ],
            ),
        ],
    )
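On the wire, the discovery document served at /.well-known/agent.json is plain JSON. The sketch below is an illustrative trim of what a client sees for the card above (field names follow the A2A spec, but this is not the SDK's exact serialization):

```python
import json

# Hypothetical, trimmed on-the-wire shape of the Agent Card defined above.
agent_json = {
    "name": "Document Analysis Agent",
    "url": "https://doc-analysis-agent.example.com/a2a",
    "version": "1.0.0",
    "capabilities": {"streaming": True, "pushNotifications": False},
    "authentication": {"schemes": ["oauth2"]},
    "skills": [
        {"id": "summarize", "name": "Document Summarization"},
        {"id": "extract-entities", "name": "Entity Extraction"},
    ],
}

# A client typically inspects capabilities before choosing an endpoint:
card = json.loads(json.dumps(agent_json))  # simulate fetch + parse
use_sse = card["capabilities"].get("streaming", False)
print(use_sse)  # True -> prefer tasks/sendSubscribe over tasks/send
```

A caller that sees `"streaming": false` should fall back to polling `GET /tasks/{taskId}`.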

Step 2: Implement the Agent Logic

# agent.py
from openai import AsyncOpenAI
from a2a.types import Message

client = AsyncOpenAI()

SYSTEM_PROMPT = """You are a document analysis expert. You extract structured information,
generate summaries, and answer questions about document content with precision and clarity.

For summarization requests: Return a JSON object with keys:
- summary: 2-3 sentence executive summary
- key_points: array of 3-7 bullet points
- themes: array of main themes

For entity extraction: Return a JSON object with arrays for:
- people, organizations, locations, dates, monetary_values

For Q&A: Provide a direct, evidence-based answer citing specific sections."""


async def process_task(skill_id: str, message: Message) -> tuple[str, str]:
    """Process a task and return (response_text, output_mime_type)."""

    # Extract text content from message parts
    text_parts = [
        part.text for part in message.parts
        if part.type == "text" and part.text
    ]
    user_content = "\n\n".join(text_parts)

    # Build skill-specific prompt
    if skill_id == "summarize":
        user_prompt = f"Summarize the following document and return JSON:\n\n{user_content}"
        output_type = "application/json"
    elif skill_id == "extract-entities":
        user_prompt = f"Extract all named entities from this text and return JSON:\n\n{user_content}"
        output_type = "application/json"
    else:  # answer-questions
        user_prompt = user_content
        output_type = "text/plain"

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.1,
    )

    result = response.choices[0].message.content
    return result, output_type


async def process_task_streaming(skill_id: str, message: Message):
    """Generator that yields text chunks for SSE streaming."""
    text_parts = [
        part.text for part in message.parts
        if part.type == "text" and part.text
    ]
    user_content = "\n\n".join(text_parts)

    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

Step 3: Build the A2A Server

The A2A server exposes the standard endpoints: POST /tasks/send, GET /tasks/{taskId}, POST /tasks/{taskId}/cancel, and optionally POST /tasks/sendSubscribe for SSE streaming.
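The task lifecycle behind those endpoints is a small state machine. Here is a sketch of the transitions this tutorial uses (the full A2A spec also defines an input-required state, omitted here for brevity):

```python
# Allowed task-state transitions used in this tutorial (a subset of the A2A spec).
TRANSITIONS: dict[str, set[str]] = {
    "submitted": {"working", "canceled"},
    "working": {"completed", "failed", "canceled"},
    # Terminal states have no outgoing transitions.
    "completed": set(),
    "failed": set(),
    "canceled": set(),
}

def can_transition(current: str, new: str) -> bool:
    """Return True if moving a task from `current` to `new` is legal."""
    return new in TRANSITIONS.get(current, set())

print(can_transition("submitted", "working"))   # True
print(can_transition("completed", "working"))   # False
```

Guarding state updates with a check like this prevents bugs such as a slow worker "completing" a task that a client already canceled.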

# server.py
import uuid
import asyncio
from datetime import datetime
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse, StreamingResponse
from a2a.types import (
    Task, TaskState, Message, Artifact, Part,
    TaskSendRequest, TaskGetResponse, TaskStatusUpdateEvent,
)
from a2a.server import A2AServer

from agent_card import build_agent_card
from agent import process_task, process_task_streaming
from auth import verify_token

BASE_URL = "https://doc-analysis-agent.example.com"

app = FastAPI(title="Document Analysis A2A Agent")
a2a = A2AServer(app, agent_card=build_agent_card(BASE_URL))

# In-memory task store (use Redis or a database in production)
task_store: dict[str, Task] = {}


@a2a.task_handler
async def handle_task(request: TaskSendRequest, token=Depends(verify_token)) -> Task:
    """Handle incoming task submissions."""
    task_id = request.id or str(uuid.uuid4())

    # Create task in submitted state
    task = Task(
        id=task_id,
        status=TaskState.submitted,
        createdAt=datetime.utcnow().isoformat(),
        updatedAt=datetime.utcnow().isoformat(),
    )
    task_store[task_id] = task

    # Determine which skill to invoke based on message content
    skill_id = infer_skill(request.message)

    # Process asynchronously. In production, keep a reference to this task
    # (e.g. in a set) so it isn't garbage-collected before it finishes.
    asyncio.create_task(execute_task(task_id, skill_id, request.message))

    return task


async def execute_task(task_id: str, skill_id: str, message: Message):
    """Execute a task and update its state."""
    task = task_store[task_id]

    try:
        # Transition to working
        task.status = TaskState.working
        task.updatedAt = datetime.utcnow().isoformat()

        # Process with LLM
        result_text, output_mime_type = await process_task(skill_id, message)

        # Create artifact from result
        artifact = Artifact(
            id=str(uuid.uuid4()),
            parts=[Part(type="text", text=result_text, mimeType=output_mime_type)],
        )

        # Transition to completed
        task.status = TaskState.completed
        task.artifacts = [artifact]
        task.updatedAt = datetime.utcnow().isoformat()

    except Exception as e:
        task.status = TaskState.failed
        task.error = {"message": str(e)}
        task.updatedAt = datetime.utcnow().isoformat()


@app.get("/a2a/tasks/{task_id}")
async def get_task(task_id: str, token=Depends(verify_token)) -> TaskGetResponse:
    """Return current task state."""
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
    return TaskGetResponse(task=task)


@app.post("/a2a/tasks/{task_id}/cancel")
async def cancel_task(task_id: str, token=Depends(verify_token)):
    """Cancel a running task."""
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")

    if task.status in (TaskState.completed, TaskState.failed, TaskState.canceled):
        raise HTTPException(status_code=400, detail="Task is already in a terminal state")

    task.status = TaskState.canceled
    task.updatedAt = datetime.utcnow().isoformat()
    return {"status": "canceled"}


@app.post("/a2a/tasks/sendSubscribe")
async def send_task_subscribe(
    request: TaskSendRequest,
    token=Depends(verify_token)
) -> StreamingResponse:
    """Submit a task and stream results via Server-Sent Events."""
    task_id = request.id or str(uuid.uuid4())
    skill_id = infer_skill(request.message)

    async def event_generator() -> AsyncGenerator[str, None]:
        # Emit submitted event
        yield format_sse_event(TaskStatusUpdateEvent(
            taskId=task_id,
            status=TaskState.submitted,
        ))

        # Emit working event
        yield format_sse_event(TaskStatusUpdateEvent(
            taskId=task_id,
            status=TaskState.working,
        ))

        # Stream LLM response
        full_response = []
        async for chunk in process_task_streaming(skill_id, request.message):
            full_response.append(chunk)
            yield format_sse_event({"type": "chunk", "taskId": task_id, "delta": chunk})

        # Emit completed event with artifact
        artifact = Artifact(
            id=str(uuid.uuid4()),
            parts=[Part(type="text", text="".join(full_response))],
        )
        yield format_sse_event(TaskStatusUpdateEvent(
            taskId=task_id,
            status=TaskState.completed,
            artifacts=[artifact],
        ))

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


def format_sse_event(data) -> str:
    """Serialize an event payload as a single SSE `data:` frame."""
    import json
    payload = data.model_dump() if hasattr(data, "model_dump") else data
    return f"data: {json.dumps(payload)}\n\n"


def infer_skill(message: Message) -> str:
    """Infer which skill to invoke from the message content."""
    text = " ".join(
        p.text.lower() for p in message.parts if p.type == "text" and p.text
    )
    if any(w in text for w in ["summar", "key point", "overview", "brief"]):
        return "summarize"
    elif any(w in text for w in ["extract", "entity", "entities", "name", "mention"]):
        return "extract-entities"
    else:
        return "answer-questions"
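On the consuming side, the sendSubscribe stream is standard SSE framing: events are separated by a blank line, and each payload line is prefixed with `data: `. A minimal parser sketch for the frames this server emits (a real client should also handle `event:`/`id:` fields and multi-line data):

```python
import json

def parse_sse(raw: str) -> list[dict]:
    """Parse a decoded SSE stream into a list of JSON event payloads."""
    events = []
    for block in raw.split("\n\n"):          # events are blank-line separated
        for line in block.splitlines():
            if line.startswith("data: "):    # payload lines carry the JSON
                events.append(json.loads(line[len("data: "):]))
    return events

# Sample frames matching what the sendSubscribe endpoint yields:
sample = (
    'data: {"taskId": "t1", "status": "working"}\n\n'
    'data: {"type": "chunk", "taskId": "t1", "delta": "Hello"}\n\n'
)
events = parse_sse(sample)
print(events[1]["delta"])  # Hello
```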

Step 4: Implement OAuth 2.1 Authentication

# auth.py
import os
from fastapi import HTTPException, Header
from jose import jwt, JWTError

JWKS_URI = os.getenv("OAUTH_JWKS_URI")
EXPECTED_ISSUER = os.getenv("OAUTH_ISSUER")
EXPECTED_AUDIENCE = os.getenv("OAUTH_AUDIENCE", "doc-analysis-agent")


async def verify_token(authorization: str = Header(...)) -> dict:
    """Validate Bearer token on every request."""
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header format")

    token = authorization[7:]  # Strip "Bearer "

    try:
        # Fetch JWKS and validate token
        payload = jwt.decode(
            token,
            key=get_jwks(),  # Implement JWKS fetching with caching
            algorithms=["RS256"],
            audience=EXPECTED_AUDIENCE,
            issuer=EXPECTED_ISSUER,
        )
        return payload
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")


def get_jwks():
    """Fetch and cache JWKS for token validation."""
    import requests
    # In production, cache this with a TTL of 1 hour
    response = requests.get(JWKS_URI, timeout=5)
    response.raise_for_status()
    return response.json()
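Under the hood, `jwt.decode` verifies a signature and then checks the registered claims. The stdlib-only sketch below performs the same two checks for an HS256 token, purely for illustration (the secret and claims are made up; real code should use python-jose or PyJWT, and production should use RS256 against the issuer's JWKS as above):

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build a compact JWT: header.payload.signature (HS256)."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    sig = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url(sig)}"

def verify_hs256(token: str, secret: bytes, audience: str, issuer: str) -> dict:
    """Verify the signature and the iss/aud claims; raise ValueError on failure."""
    header, payload, sig = token.split(".")
    expected = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), sig):
        raise ValueError("bad signature")
    claims = json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    if claims.get("iss") != issuer or claims.get("aud") != audience:
        raise ValueError("bad iss/aud claim")
    return claims

secret = b"local-test-secret"  # hypothetical; never hardcode real secrets
token = sign_hs256(
    {"iss": "https://auth.example.com", "aud": "doc-analysis-agent", "sub": "orchestrator"},
    secret,
)
claims = verify_hs256(token, secret, "doc-analysis-agent", "https://auth.example.com")
print(claims["sub"])  # orchestrator
```

Note what this sketch deliberately omits: `exp`/`nbf` time checks and algorithm pinning against the token header, both of which a real library handles for you.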

Step 5: Start the Server

# Run with: uvicorn server:app --host 0.0.0.0 --port 8080

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Your agent card will be automatically served at http://localhost:8080/.well-known/agent.json.

Step 6: Call Your Agent from Another Agent

# Calling your A2A agent from an orchestrator
from a2a.client import A2AClient, TaskSendRequest, Message, Part
import uuid

async def call_document_agent(document_text: str):
    async with A2AClient() as client:
        # Discover the agent
        card = await client.get_agent_card("https://doc-analysis-agent.example.com")
        print(f"Discovered agent: {card.name}")

        # Submit a task
        task_id = str(uuid.uuid4())
        response = await client.send_task(
            agent_url=card.url,
            request=TaskSendRequest(
                id=task_id,
                message=Message(
                    role="user",
                    parts=[
                        Part(
                            type="text",
                            text=f"Summarize this document:\n\n{document_text}"
                        )
                    ]
                )
            ),
            access_token="your-oauth-token",
        )

        # Poll for completion
        import asyncio
        while response.task.status not in ("completed", "failed", "canceled"):
            await asyncio.sleep(1)
            task_response = await client.get_task(
                agent_url=card.url,
                task_id=task_id,
                access_token="your-oauth-token",
            )
            response = task_response

        if response.task.status == "completed":
            artifact = response.task.artifacts[0]
            return artifact.parts[0].text
        else:
            raise Exception(f"Task failed: {response.task.error}")
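The fixed one-second poll above works, but long-running tasks are cheaper to poll with exponential backoff and a cap. A sketch of the delay schedule (the parameters are illustrative, not part of the A2A spec):

```python
def backoff_delays(
    base: float = 0.5, factor: float = 2.0, cap: float = 8.0, attempts: int = 6
) -> list[float]:
    """Delay in seconds before each poll attempt: base * factor**n, capped at `cap`."""
    return [min(base * factor**n, cap) for n in range(attempts)]

print(backoff_delays())  # [0.5, 1.0, 2.0, 4.0, 8.0, 8.0]
```

In the polling loop, replace `asyncio.sleep(1)` with `asyncio.sleep(delay)` for each delay in this schedule; production clients usually add random jitter so many callers don't poll in lockstep.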

Deployment Checklist

Before deploying to production:

  • HTTPS only — the /.well-known/agent.json endpoint and all A2A endpoints must be served over TLS
  • OAuth 2.1 enforced on all task endpoints
  • Task store backed by a persistent database (not in-memory dict)
  • Rate limiting on the task submission endpoint
  • Request/response logging for all task operations (see agent audit trail)
  • Graceful error handling — tasks should always reach a terminal state
  • Health check endpoint for load balancer integration
  • Agent Card version reflects current capabilities
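The "always reach a terminal state" item can be enforced with a wrapper that catches everything and records a failure, so a crash in agent logic never strands a task in working. A minimal sketch using a plain dict store, mirroring the shape of execute_task above:

```python
import asyncio

task_store: dict[str, dict] = {}

async def run_to_terminal(task_id: str, work):
    """Run `work()`; guarantee the task ends in completed or failed."""
    task_store[task_id] = {"status": "working"}
    try:
        result = await work()
        task_store[task_id] = {"status": "completed", "result": result}
    except Exception as e:  # deliberately broad: never strand a task mid-flight
        task_store[task_id] = {"status": "failed", "error": str(e)}

async def flaky():
    raise RuntimeError("LLM timeout")

asyncio.run(run_to_terminal("t1", flaky))
print(task_store["t1"]["status"])  # failed
```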

Next Steps

  • Learn how to integrate A2A with Google ADK for multi-agent orchestration
  • Read the A2A vs Function Calling comparison to understand when to use each
  • Explore agent security best practices for production A2A deployments
  • Build with MCP servers alongside A2A for tool integration
