How to Build an A2A-Compliant Agent in Python
The A2A Protocol (Agent-to-Agent) is Google's open standard for enabling AI agents to communicate and collaborate across systems and vendors. Building A2A-compliant agents lets your agents participate in multi-agent pipelines, be discovered by orchestrators, and interoperate with agents built on any framework or platform.
This tutorial walks through building a fully compliant A2A agent from scratch — including implementing an Agent Card, the A2A server endpoints, task lifecycle management, OAuth 2.1 authentication, and Server-Sent Events (SSE) streaming.
Prerequisites#
- Python 3.11+
- Basic familiarity with FastAPI or async Python
- An OpenAI, Anthropic, or Google API key for LLM inference
Setup and Installation#
pip install a2a-sdk fastapi uvicorn python-jose[cryptography] openai
Create the project structure:
my-a2a-agent/
├── agent.py # Agent business logic
├── server.py # A2A HTTP server
├── auth.py # OAuth 2.1 / token validation
├── agent_card.py # Agent Card definition
└── requirements.txt
Step 1: Define Your Agent Card#
The Agent Card is the discovery document served at /.well-known/agent.json. It describes what your agent can do and how to authenticate.
# agent_card.py
from a2a.types import AgentCard, AgentCapabilities, AgentSkill, Authentication
def build_agent_card(base_url: str) -> AgentCard:
    """Assemble the Agent Card served at /.well-known/agent.json.

    The card advertises the agent's skills, supported MIME types, streaming
    capability, and OAuth metadata so orchestrators can discover and call it.
    """
    summarize_skill = AgentSkill(
        id="summarize",
        name="Document Summarization",
        description=(
            "Generates a concise summary of a document, article, or text passage. "
            "Returns a structured summary with key points, main themes, and conclusion."
        ),
        inputModes=["text/plain", "application/pdf", "text/markdown"],
        outputModes=["text/plain", "application/json"],
        examples=[
            "Summarize this research paper",
            "Give me the key points from this document",
            "What are the main findings in this report?",
        ],
    )

    entity_skill = AgentSkill(
        id="extract-entities",
        name="Entity Extraction",
        description=(
            "Extracts named entities (people, organizations, locations, dates, "
            "monetary values) from text and returns them as structured JSON."
        ),
        inputModes=["text/plain"],
        outputModes=["application/json"],
        examples=[
            "Extract all company names from this document",
            "Find all dates and monetary amounts mentioned",
            "List all people mentioned in this contract",
        ],
    )

    qa_skill = AgentSkill(
        id="answer-questions",
        name="Document Q&A",
        description=(
            "Answers specific questions about provided document content using "
            "the document as the primary knowledge source."
        ),
        inputModes=["text/plain"],
        outputModes=["text/plain"],
        examples=[
            "What is the payment schedule in this contract?",
            "Who are the parties to this agreement?",
        ],
    )

    return AgentCard(
        name="Document Analysis Agent",
        description=(
            "Analyzes documents and text to extract structured information, "
            "generate summaries, identify key entities, and answer questions "
            "about document content."
        ),
        url=f"{base_url}/a2a",
        version="1.0.0",
        capabilities=AgentCapabilities(
            streaming=True,
            pushNotifications=False,
            stateTransitionHistory=True,
        ),
        defaultInputModes=["text/plain", "application/pdf", "text/markdown"],
        defaultOutputModes=["text/plain", "application/json"],
        authentication=Authentication(
            schemes=["oauth2"],
            # Points clients at the OAuth authorization-server metadata document.
            credentials=f"{base_url}/.well-known/oauth-authorization-server",
        ),
        skills=[summarize_skill, entity_skill, qa_skill],
    )
Step 2: Implement the Agent Logic#
# agent.py
import json
from openai import AsyncOpenAI
from a2a.types import Message, Part, Artifact
# Shared async OpenAI client; reads OPENAI_API_KEY from the environment.
client = AsyncOpenAI()

# System prompt shared by all skills; the per-skill output format is selected
# by the user prompt built in process_task(). Do not edit casually — the JSON
# key names below are part of the agent's output contract.
SYSTEM_PROMPT = """You are a document analysis expert. You extract structured information,
generate summaries, and answer questions about document content with precision and clarity.
For summarization requests: Return a JSON object with keys:
- summary: 2-3 sentence executive summary
- key_points: array of 3-7 bullet points
- themes: array of main themes
For entity extraction: Return a JSON object with arrays for:
- people, organizations, locations, dates, monetary_values
For Q&A: Provide a direct, evidence-based answer citing specific sections."""
async def process_task(skill_id: str, message: Message) -> tuple[str, str]:
    """Process a task and return (response_text, output_mime_type)."""
    # Collect every non-empty text part of the incoming message.
    collected = []
    for part in message.parts:
        if part.type == "text" and part.text:
            collected.append(part.text)
    user_content = "\n\n".join(collected)

    # Pick the skill-specific prompt and the MIME type of the result.
    if skill_id == "summarize":
        output_type = "application/json"
        user_prompt = f"Summarize the following document and return JSON:\n\n{user_content}"
    elif skill_id == "extract-entities":
        output_type = "application/json"
        user_prompt = f"Extract all named entities from this text and return JSON:\n\n{user_content}"
    else:
        # answer-questions: pass the user's text straight through.
        output_type = "text/plain"
        user_prompt = user_content

    completion = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        # Low temperature for deterministic, extraction-style output.
        temperature=0.1,
    )
    return completion.choices[0].message.content, output_type
async def process_task_streaming(skill_id: str, message: Message):
    """Generator that yields text chunks for SSE streaming.

    NOTE(review): ``skill_id`` is accepted for signature parity with
    process_task but is not used here — the raw message text is sent as-is.
    """
    user_content = "\n\n".join(
        part.text
        for part in message.parts
        if part.type == "text" and part.text
    )

    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
        stream=True,
    )

    # Forward only non-empty content deltas to the caller.
    async for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            yield piece
Step 3: Build the A2A Server#
The A2A server exposes the standard endpoints: POST /tasks/send, GET /tasks/{taskId}, POST /tasks/{taskId}/cancel, and optionally POST /tasks/sendSubscribe for SSE streaming.
# server.py
import uuid
import asyncio
from datetime import datetime
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse, StreamingResponse
from a2a.types import (
Task, TaskState, Message, Artifact, Part,
TaskSendRequest, TaskGetResponse, TaskStatusUpdateEvent,
)
from a2a.server import A2AServer
from agent_card import build_agent_card
from agent import process_task, process_task_streaming
from auth import verify_token
# Public base URL advertised in the Agent Card; must match the deployed host.
BASE_URL = "https://doc-analysis-agent.example.com"
app = FastAPI(title="Document Analysis A2A Agent")
# A2AServer mounts the A2A endpoints and serves /.well-known/agent.json.
a2a = A2AServer(app, agent_card=build_agent_card(BASE_URL))
# In-memory task store (use Redis or a database in production)
task_store: dict[str, Task] = {}
# Strong references to in-flight background tasks. The event loop keeps only
# weak references to Tasks, so a fire-and-forget create_task() result can be
# garbage-collected mid-execution unless we hold it here (see the warning in
# the asyncio.create_task documentation).
_background_tasks: set = set()


@a2a.task_handler
async def handle_task(request: TaskSendRequest, token=Depends(verify_token)) -> Task:
    """Handle incoming task submissions.

    Records the task in the ``submitted`` state, schedules background
    processing, and returns immediately so the caller can poll for status.
    """
    task_id = request.id or str(uuid.uuid4())

    # Create task in submitted state; use one timestamp so createdAt and
    # updatedAt are identical at creation time.
    now = datetime.utcnow().isoformat()
    task = Task(
        id=task_id,
        status=TaskState.submitted,
        createdAt=now,
        updatedAt=now,
    )
    # Store before scheduling work so GET /tasks/{id} can find it right away.
    task_store[task_id] = task

    # Determine which skill to invoke based on message content
    skill_id = infer_skill(request.message)

    # Process asynchronously; keep a strong reference until the task finishes
    # (bug fix: the original dropped the Task object, risking mid-flight GC).
    bg = asyncio.create_task(execute_task(task_id, skill_id, request.message))
    _background_tasks.add(bg)
    bg.add_done_callback(_background_tasks.discard)

    return task
async def execute_task(task_id: str, skill_id: str, message: Message):
    """Execute a task and update its state.

    Lifecycle: submitted -> working -> completed (with artifacts) or failed.
    A task canceled via the cancel endpoint while the LLM call was in flight
    stays ``canceled`` — we never overwrite a terminal state.
    """
    task = task_store[task_id]
    try:
        # Transition to working
        task.status = TaskState.working
        task.updatedAt = datetime.utcnow().isoformat()

        # Process with LLM (may take a while; cancellation can race with this)
        result_text, output_mime_type = await process_task(skill_id, message)

        # Bug fix: don't resurrect a task the client canceled mid-flight.
        if task.status == TaskState.canceled:
            return

        # Create artifact from result
        artifact = Artifact(
            id=str(uuid.uuid4()),
            parts=[Part(type="text", text=result_text, mimeType=output_mime_type)],
        )

        # Transition to completed
        task.status = TaskState.completed
        task.artifacts = [artifact]
        task.updatedAt = datetime.utcnow().isoformat()
    except Exception as e:
        # Same guard on the error path: a canceled task stays canceled.
        if task.status == TaskState.canceled:
            return
        # Ensure the task always reaches a terminal state on error.
        task.status = TaskState.failed
        task.error = {"message": str(e)}
        task.updatedAt = datetime.utcnow().isoformat()
@app.get("/a2a/tasks/{task_id}")
async def get_task(task_id: str, token=Depends(verify_token)) -> TaskGetResponse:
    """Return current task state (404 if the task id is unknown)."""
    try:
        task = task_store[task_id]
    except KeyError:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
    return TaskGetResponse(task=task)
@app.post("/a2a/tasks/{task_id}/cancel")
async def cancel_task(task_id: str, token=Depends(verify_token)):
    """Cancel a running task.

    404 if unknown; 400 if the task has already reached a terminal state.
    """
    task = task_store.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail=f"Task {task_id} not found")

    terminal_states = (TaskState.completed, TaskState.failed, TaskState.canceled)
    if task.status in terminal_states:
        raise HTTPException(status_code=400, detail="Task is already in a terminal state")

    task.status = TaskState.canceled
    task.updatedAt = datetime.utcnow().isoformat()
    return {"status": "canceled"}
@app.post("/a2a/tasks/sendSubscribe")
async def send_task_subscribe(
    request: TaskSendRequest,
    token=Depends(verify_token)
) -> StreamingResponse:
    """Submit a task and stream results via Server-Sent Events.

    Event order: submitted -> working -> zero or more chunk events -> one
    terminal event (completed with artifact, or failed with error details).
    """
    task_id = request.id or str(uuid.uuid4())
    skill_id = infer_skill(request.message)

    async def event_generator() -> AsyncGenerator[str, None]:
        # Emit submitted event
        yield format_sse_event(TaskStatusUpdateEvent(
            taskId=task_id,
            status=TaskState.submitted,
        ))
        # Emit working event
        yield format_sse_event(TaskStatusUpdateEvent(
            taskId=task_id,
            status=TaskState.working,
        ))

        full_response = []
        try:
            # Stream LLM response
            async for chunk in process_task_streaming(skill_id, request.message):
                full_response.append(chunk)
                yield format_sse_event({"type": "chunk", "taskId": task_id, "delta": chunk})
        except Exception as e:
            # Bug fix: without this, an LLM error silently truncated the
            # stream and the client never saw a terminal event. Tasks must
            # always reach a terminal state.
            yield format_sse_event({
                "type": "status",
                "taskId": task_id,
                "status": "failed",
                "error": {"message": str(e)},
            })
            return

        # Emit completed event with artifact
        artifact = Artifact(
            id=str(uuid.uuid4()),
            parts=[Part(type="text", text="".join(full_response))],
        )
        yield format_sse_event(TaskStatusUpdateEvent(
            taskId=task_id,
            status=TaskState.completed,
            artifacts=[artifact],
        ))

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            # Prevent intermediaries from caching or buffering the stream
            # (X-Accel-Buffering disables nginx proxy buffering).
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
def format_sse_event(data) -> str:
    """Serialize *data* as a single SSE ``data:`` frame (JSON + blank line)."""
    import json as _json
    # Pydantic models expose model_dump(); plain dicts pass through unchanged.
    payload = data.model_dump() if hasattr(data, "model_dump") else data
    return f"data: {_json.dumps(payload)}\n\n"
def infer_skill(message: Message) -> str:
    """Infer which skill to invoke from the message content."""
    # Flatten all text parts into one lowercase string for keyword matching.
    corpus = " ".join(
        part.text.lower()
        for part in message.parts
        if part.type == "text" and part.text
    )
    summarize_cues = ("summar", "key point", "overview", "brief")
    entity_cues = ("extract", "entity", "entities", "name", "mention")
    if any(cue in corpus for cue in summarize_cues):
        return "summarize"
    if any(cue in corpus for cue in entity_cues):
        return "extract-entities"
    # Fallback: treat everything else as a question about the document.
    return "answer-questions"
Step 4: Implement OAuth 2.1 Authentication#
# auth.py
import os
from fastapi import HTTPException, Header
from jose import jwt, JWTError
JWKS_URI = os.getenv("OAUTH_JWKS_URI")
EXPECTED_ISSUER = os.getenv("OAUTH_ISSUER")
EXPECTED_AUDIENCE = os.getenv("OAUTH_AUDIENCE", "doc-analysis-agent")
async def verify_token(authorization: str = Header(...)) -> dict:
    """Validate Bearer token on every request.

    Returns the decoded JWT claims on success; raises HTTP 401 on any
    malformed header or invalid token.
    """
    # Bug fix: the auth scheme is case-insensitive (RFC 9110 §11.1), so
    # "bearer xyz" must be accepted; the original startswith("Bearer ")
    # rejected it. partition() also tolerates the missing-token case.
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(
            status_code=401,
            detail="Invalid authorization header format",
            # RFC 9110 requires a WWW-Authenticate challenge on 401 responses.
            headers={"WWW-Authenticate": "Bearer"},
        )
    try:
        # Fetch JWKS and validate signature, audience, and issuer
        payload = jwt.decode(
            token,
            key=get_jwks(),  # Implement JWKS fetching with caching
            algorithms=["RS256"],
            audience=EXPECTED_AUDIENCE,
            issuer=EXPECTED_ISSUER,
        )
    except JWTError as e:
        # Chain the cause so logs show the underlying JWT failure.
        raise HTTPException(
            status_code=401,
            detail=f"Invalid token: {e}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    return payload
# JWKS cache: the document plus the monotonic time it was fetched.
_jwks_cache: dict = {"doc": None, "fetched_at": 0.0}
_JWKS_TTL_SECONDS = 3600.0  # re-fetch at most once per hour


def get_jwks():
    """Fetch the JWKS for token validation, cached with a one-hour TTL.

    Bug fix: the original fetched the JWKS over the network on every single
    token validation despite its own comment saying to cache it.
    """
    import time

    import requests

    now = time.monotonic()
    stale = now - _jwks_cache["fetched_at"] > _JWKS_TTL_SECONDS
    if _jwks_cache["doc"] is None or stale:
        response = requests.get(JWKS_URI, timeout=5)
        response.raise_for_status()
        _jwks_cache["doc"] = response.json()
        _jwks_cache["fetched_at"] = now
    return _jwks_cache["doc"]
Step 5: Start the Server#
# Run with: uvicorn server:app --host 0.0.0.0 --port 8080
if __name__ == "__main__":
    # Allow running directly (python server.py) as an alternative to the
    # `uvicorn server:app` command shown above.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
Your agent card will be automatically served at http://localhost:8080/.well-known/agent.json.
Step 6: Call Your Agent from Another Agent#
# Calling your A2A agent from an orchestrator
from a2a.client import A2AClient, TaskSendRequest, Message, Part
import uuid
async def call_document_agent(document_text: str):
    """Discover the document-analysis agent, submit a summarization task,
    and poll until it reaches a terminal state.

    Returns the summary text on success; raises on failure/cancellation.
    """
    import asyncio  # hoisted: the original imported this inside the poll loop

    async with A2AClient() as client:
        # Discover the agent
        card = await client.get_agent_card("https://doc-analysis-agent.example.com")
        print(f"Discovered agent: {card.name}")

        # Submit a task. Bug fix: access_token must be an argument to
        # send_task — in the original it sat outside the call's closing
        # parenthesis, which is a SyntaxError.
        task_id = str(uuid.uuid4())
        response = await client.send_task(
            agent_url=card.url,
            request=TaskSendRequest(
                id=task_id,
                message=Message(
                    role="user",
                    parts=[
                        Part(
                            type="text",
                            text=f"Summarize this document:\n\n{document_text}"
                        )
                    ]
                )
            ),
            access_token="your-oauth-token",
        )

        # Poll for completion (1s interval; add a timeout in production)
        while response.task.status not in ("completed", "failed", "canceled"):
            await asyncio.sleep(1)
            response = await client.get_task(
                agent_url=card.url,
                task_id=task_id,
                access_token="your-oauth-token",
            )

        if response.task.status == "completed":
            artifact = response.task.artifacts[0]
            return artifact.parts[0].text
        raise Exception(f"Task failed: {response.task.error}")
Deployment Checklist#
Before deploying to production:
- HTTPS only — the /.well-known/agent.json endpoint and all A2A endpoints must be served over TLS
- OAuth 2.1 enforced on all task endpoints
- Task store backed by a persistent database (not in-memory dict)
- Rate limiting on the task submission endpoint
- Request/response logging for all task operations (see agent audit trail)
- Graceful error handling — tasks should always reach a terminal state
- Health check endpoint for load balancer integration
- Agent Card version reflects current capabilities
Next Steps#
- Learn how to integrate A2A with Google ADK for multi-agent orchestration
- Read the A2A vs Function Calling comparison to understand when to use each
- Explore agent security best practices for production A2A deployments
- Build with MCP servers alongside A2A for tool integration