What You'll Build#
A production multi-agent system using the OpenAI Agents SDK that includes:
- Function tools with typed Python signatures and docstrings
- Hosted tools: WebSearchTool and FileSearchTool
- A human-in-the-loop approval workflow for sensitive tool calls
- Streaming tool call responses
- A triage agent that hands off to specialist agents
The system implements a research assistant that can search the web, query documents, and hand off to a writer agent for report generation.
Prerequisites#
pip install openai-agents pydantic python-dotenv
- Python 3.11+
- OpenAI API key with Responses API access
- Familiarity with function calling and tool use
Overview#
The OpenAI Agents SDK (released early 2025) provides a higher-level abstraction over the Responses API. Instead of managing the tool call loop manually, the SDK handles the reasoning loop — you define agents, tools, and handoffs, then call Runner.run().
Key SDK concepts:
- Agent: An LLM with instructions, tools, and optional handoffs
- @function_tool: Decorator that converts a Python function into an agent tool
- Runner: Executes agents, manages the tool call loop
- Handoff: Transfers control between agents
- Hosted tools: WebSearchTool, FileSearchTool, ComputerTool — run on OpenAI infra
Step 1: Function Tools with Type Hints#
The @function_tool decorator uses Python type hints and docstrings to generate the tool schema. Write docstrings for the LLM, not for human developers.
from agents import function_tool
from pydantic import BaseModel, Field
from typing import Optional
import json
@function_tool
def search_database(
query: str,
table: str,
limit: int = 10,
) -> str:
"""Search the internal database for records.
Use this tool to find customer records, product data, or historical reports.
Do NOT use for real-time web information — use web_search for that.
Args:
query: SQL WHERE clause conditions (e.g., "email = 'user@example.com'")
table: Table name to query. Options: customers, products, orders, reports
limit: Maximum records to return (1-50)
Returns:
JSON array of matching records, or empty array if none found.
"""
# Validate table name (prevent SQL injection)
allowed_tables = {"customers", "products", "orders", "reports"}
if table not in allowed_tables:
return json.dumps({"error": f"Table '{table}' not allowed. Use: {allowed_tables}"})
# Simulated database — replace with real DB connection
mock_data = {
"customers": [
{"id": 1, "email": "alice@example.com", "plan": "Pro", "mrr": 99},
{"id": 2, "email": "bob@example.com", "plan": "Enterprise", "mrr": 499},
],
"products": [
{"id": 1, "name": "Starter", "price": 29, "features": ["5 users", "10GB"]},
{"id": 2, "name": "Pro", "price": 99, "features": ["25 users", "100GB"]},
],
}
results = mock_data.get(table, [])
return json.dumps(results[:limit])
@function_tool
async def send_email(
to: str,
subject: str,
body: str,
priority: str = "normal",
) -> str:
"""Send an email to a recipient.
WARNING: This action is irreversible. Only use when the user has explicitly
requested sending an email. Confirm recipient and content before calling.
Args:
to: Recipient email address
subject: Email subject line (max 100 characters)
body: Email body in plain text or markdown
priority: Email priority. Options: low, normal, high
Returns:
Confirmation string with message ID, or error description.
"""
if len(subject) > 100:
return "Error: Subject exceeds 100 character limit."
# Simulate email send
import random
msg_id = f"msg_{random.randint(100000, 999999)}"
print(f"[EMAIL SENT] To: {to} | Subject: {subject} | Priority: {priority} | ID: {msg_id}")
return f"Email sent successfully. Message ID: {msg_id}"
# Tool with complex Pydantic input schema
class AnalysisRequest(BaseModel):
data_source: str = Field(description="Where to get data: 'database', 'file', or 'provided'")
metrics: list[str] = Field(
description="Metrics to calculate. Options: revenue, churn, ltv, cac, nps"
)
time_period: str = Field(description="Time period: 'last_7d', 'last_30d', 'last_90d', 'ytd'")
segment: Optional[str] = Field(
default=None,
description="Customer segment to filter by: 'pro', 'enterprise', 'starter', or null for all"
)
@function_tool
def analyze_metrics(request: AnalysisRequest) -> str:
"""Run a business metrics analysis.
Use this when asked to analyze performance, calculate KPIs, or compare segments.
Supports revenue analysis, churn calculation, LTV/CAC ratios, and NPS scores.
Args:
request: Analysis configuration including data source, metrics, and filters
Returns:
JSON object with calculated metrics and trend indicators.
"""
# Simulated analysis results
results = {
"period": request.time_period,
"segment": request.segment or "all",
"metrics": {}
}
for metric in request.metrics:
if metric == "revenue":
results["metrics"]["revenue"] = {"value": 125000, "change_pct": 12.3, "trend": "up"}
elif metric == "churn":
results["metrics"]["churn"] = {"value": 0.023, "change_pct": -0.5, "trend": "down"}
elif metric == "ltv":
results["metrics"]["ltv"] = {"value": 2840, "change_pct": 8.1, "trend": "up"}
return json.dumps(results, indent=2)
Step 2: Hosted Tools#
Hosted tools run on OpenAI's infrastructure — no implementation needed:
from agents import Agent, WebSearchTool, FileSearchTool
# WebSearchTool: real-time web search hosted on OpenAI's infrastructure
web_search = WebSearchTool(
    # Optional: localize results or tune how much search context is retrieved
    # user_location={"type": "approximate", "city": "London"},
    # search_context_size="medium",
)
# FileSearchTool: RAG over your OpenAI vector store
# First, create a vector store and upload documents via the OpenAI Files API
file_search = FileSearchTool(
vector_store_ids=["vs_abc123def456"], # Your vector store ID
max_num_results=5,
# Filters for metadata-based retrieval
# filters={"category": "product_docs"}
)
research_agent = Agent(
name="Research Agent",
instructions="""You are a research specialist. Your job is to gather accurate,
current information from web searches and internal documents.
Always:
- Use web_search for current events, news, and publicly available information
- Use file_search for internal documentation, product specs, and past reports
- Cite your sources in the format [Source: URL or Document Name]
- Distinguish between information from web search vs internal documents
""",
tools=[web_search, file_search, search_database],
model="gpt-4o",
)
Step 3: Multi-Agent Handoffs#
Build a triage system that routes to specialists:
from agents import Agent, Runner
import asyncio
# Specialist agents
billing_agent = Agent(
name="Billing Specialist",
instructions="""You are a billing specialist. Handle payment issues, invoices,
subscription changes, and refund requests. Always look up the customer account
before responding. Escalate disputes over $1000 to your supervisor.""",
tools=[search_database, send_email],
model="gpt-4o",
)
technical_agent = Agent(
name="Technical Support",
instructions="""You are a technical support engineer. Diagnose and resolve
product issues. Search internal docs for known solutions. Create bug reports
for confirmed product defects.""",
tools=[file_search, search_database],
model="gpt-4o",
)
research_agent = Agent(
name="Research Specialist",
instructions="""You are a research specialist. Search web and internal docs
to answer complex questions. Synthesize information from multiple sources.
Always cite sources.""",
tools=[web_search, file_search, analyze_metrics],
model="gpt-4o",
)
# Triage agent — routes to specialists
triage_agent = Agent(
name="Triage Agent",
instructions="""You are a triage agent. Classify incoming requests and route
to the appropriate specialist agent.
Routing rules:
- Billing, payment, invoice, refund → billing_specialist
- Bug, error, technical issue, not working → technical_support
- Research, analysis, market data, metrics → research_specialist
- Simple FAQ you can answer directly → answer yourself
When routing: briefly acknowledge the request, then hand off.
Do NOT attempt to resolve issues that belong to a specialist.
""",
handoffs=[billing_agent, technical_agent, research_agent],
model="gpt-4o-mini", # Cheaper model for triage
)
async def run_triage_system(user_message: str) -> str:
"""Run the triage system and return the final response."""
result = await Runner.run(
starting_agent=triage_agent,
input=user_message,
)
return result.final_output
# Example usage
async def main():
queries = [
"I was charged twice this month. Email: user@example.com",
"Getting a 500 error when I try to export data",
"What was our revenue growth last quarter?",
]
for query in queries:
print(f"\nQuery: {query}")
response = await run_triage_system(query)
print(f"Response: {response}")
if __name__ == "__main__":
asyncio.run(main())
Step 4: Tool Approval Workflow#
Implement human-in-the-loop approval for sensitive tool calls:
from agents import Agent, Runner
import asyncio
# Track pending approvals (use Redis in production)
pending_approvals: dict[str, dict] = {}
@function_tool
async def send_bulk_email(
segment: str,
subject: str,
body: str,
) -> str:
"""Send a bulk email campaign to a customer segment.
REQUIRES HUMAN APPROVAL before execution.
Use for: marketing campaigns, product announcements, billing notices.
Args:
segment: Customer segment to target: 'all', 'pro', 'enterprise', 'trial'
subject: Email subject line
body: Email body content
"""
# Check for approval in our tracking store
import hashlib
action_key = hashlib.md5(f"{segment}{subject}".encode()).hexdigest()[:8]
if action_key not in pending_approvals or not pending_approvals[action_key].get("approved"):
# Store pending action and raise interrupt
pending_approvals[action_key] = {
"tool": "send_bulk_email",
"inputs": {"segment": segment, "subject": subject, "body": body},
"approved": False,
}
return (
f"APPROVAL REQUIRED: This action will send email to all {segment} customers. "
f"Action ID: {action_key}. "
f"A human must approve this action before it executes. "
f"Please wait for approval and try again."
)
# Approved — execute
count = {"all": 5000, "pro": 800, "enterprise": 150, "trial": 2000}.get(segment, 0)
del pending_approvals[action_key] # Clear after use
return f"Bulk email sent to {count} {segment} customers. Subject: {subject}"
# Human approval endpoint (implement in your web framework)
async def approve_action(action_key: str) -> bool:
"""Called by human reviewer to approve an action."""
if action_key in pending_approvals:
pending_approvals[action_key]["approved"] = True
return True
return False
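The approve-then-retry round-trip is easier to reason about in isolation. Below is a minimal standalone sketch of the same pattern, with no SDK involved — `request_bulk_send` and `approve` are simplified stand-ins for the tool and the reviewer endpoint above:

```python
import hashlib

pending: dict[str, dict] = {}

def request_bulk_send(segment: str, subject: str) -> str:
    """First call records a pending action; an approved retry executes it."""
    key = hashlib.md5(f"{segment}{subject}".encode()).hexdigest()[:8]
    if not pending.get(key, {}).get("approved"):
        pending[key] = {"segment": segment, "subject": subject, "approved": False}
        return f"APPROVAL REQUIRED: {key}"
    del pending[key]  # approvals are single-use
    return f"SENT to {segment}"

def approve(key: str) -> bool:
    """Reviewer marks a pending action as approved."""
    if key in pending:
        pending[key]["approved"] = True
        return True
    return False

# First attempt is blocked; approval unlocks exactly one retry
first = request_bulk_send("pro", "March update")
assert first.startswith("APPROVAL REQUIRED")
key = first.split(": ")[1]
assert approve(key)
assert request_bulk_send("pro", "March update") == "SENT to pro"
```

The key property to preserve in production: the approval is keyed to the action's content (segment + subject here), so an agent cannot reuse one approval for a different email.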
# Agent with approval-gated tool
campaign_agent = Agent(
name="Campaign Manager",
instructions="""You help plan and execute email campaigns.
When sending bulk emails, the send_bulk_email tool requires human approval.
If approval is needed, inform the user and ask them to approve via the dashboard.
After approval, the action can be retried.""",
tools=[send_bulk_email, analyze_metrics],
model="gpt-4o",
)
Step 5: Streaming Tool Calls#
Stream agent responses in real-time for better UX:
from agents import Agent, Runner
from agents.stream_events import (
    AgentUpdatedStreamEvent,
    RunItemStreamEvent,
    RawResponsesStreamEvent,
)
from openai.types.responses import ResponseTextDeltaEvent

async def stream_agent_response(agent: Agent, user_input: str):
    """Stream agent output with tool call visibility."""
    result = Runner.run_streamed(
        starting_agent=agent,
        input=user_input,
    )
    async for event in result.stream_events():
        if isinstance(event, RawResponsesStreamEvent):
            # Raw Responses API event: print text deltas as they arrive
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)
        elif isinstance(event, RunItemStreamEvent):
            item = event.item
            # Tool call started
            if item.type == "tool_call_item":
                raw = item.raw_item
                print(f"\n[Tool: {getattr(raw, 'name', raw.type)}]")
            # Tool result received
            elif item.type == "tool_call_output_item":
                result_preview = str(item.output)[:100]
                print(f"[Tool Result]: {result_preview}...")
        elif isinstance(event, AgentUpdatedStreamEvent):
            # Agent handoff occurred
            print(f"\n[Handoff to: {event.new_agent.name}]")
    print()  # Final newline
    return result.final_output
# FastAPI endpoint for streaming to frontend
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.post("/chat/stream")
async def chat_stream(body: dict):
user_message = body.get("message", "")
    async def event_generator():
        result = Runner.run_streamed(
            starting_agent=triage_agent,
            input=user_message,
        )
        async for event in result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                if event.data.type == "response.output_text.delta":
                    data = json.dumps({"type": "text", "content": event.data.delta})
                    yield f"data: {data}\n\n"
            elif isinstance(event, AgentUpdatedStreamEvent):
                data = json.dumps({
                    "type": "handoff",
                    "agent": event.new_agent.name
                })
                yield f"data: {data}\n\n"
        yield "data: [DONE]\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
Common Issues and Solutions#
Issue: Tool docstring is ignored or misinterpreted
The SDK parses the function's docstring to build the tool description and per-argument descriptions (Google, NumPy, and Sphinx docstring styles are supported). Keep it under 300 words. Start with what the tool does, then when to use it, then parameter descriptions. Avoid markdown formatting inside docstrings — it sometimes confuses the model.
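A docstring following that shape looks like this. The function below (`lookup_invoice`) is a hypothetical example, not part of the system above; the asserts just confirm the structure the parser expects is present:

```python
def lookup_invoice(invoice_id: str, include_line_items: bool = False) -> str:
    """Look up a single invoice by its ID.

    Use this to answer billing questions about a specific invoice.
    Do NOT use for aggregate revenue questions; use analyze_metrics for that.

    Args:
        invoice_id: Invoice identifier, e.g. "inv_0042"
        include_line_items: If true, include per-item charges in the result

    Returns:
        JSON object describing the invoice, or an error string.
    """
    return "{}"

# The first line is the summary sentence the model reads first
summary = lookup_invoice.__doc__.strip().splitlines()[0]
assert summary == "Look up a single invoice by its ID."
```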
Issue: Handoff loops (agents hand off back and forth indefinitely)
Set max_turns in Runner.run(): Runner.run(agent, input, max_turns=10). Also add explicit routing instructions: "Once you hand off to a specialist, do NOT request a handoff back. The specialist handles the response."
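The guard itself is simple to see without the SDK: a run loop that counts turns and aborts once the cap trips. A standalone sketch (this `MaxTurnsExceeded` is a local stand-in, not the SDK's exception class):

```python
class MaxTurnsExceeded(Exception):
    pass

def run_with_turn_cap(steps, max_turns: int = 10):
    """Consume agent 'turns' until one yields a final answer or the cap trips."""
    for turn, step in enumerate(steps, start=1):
        if turn > max_turns:
            raise MaxTurnsExceeded(f"No final answer after {max_turns} turns")
        if step["type"] == "final":
            return step["output"]
        # Otherwise: a handoff or tool call; keep looping
    raise MaxTurnsExceeded("Run ended without a final answer")

# Two handoffs then an answer completes fine under the cap
steps = [{"type": "handoff"}, {"type": "handoff"}, {"type": "final", "output": "done"}]
assert run_with_turn_cap(steps, max_turns=10) == "done"
```

With a cap in place, a ping-pong handoff loop fails loudly after N turns instead of burning tokens indefinitely.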
Issue: FileSearchTool returns irrelevant documents
Improve your vector store's chunking strategy. Aim for 500-1000 token chunks with 50-token overlap. Add metadata filters to the FileSearchTool to narrow retrieval by document category or date.
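The chunk-size arithmetic is worth seeing concretely. The vector store chunks text server-side, so the helper below is purely illustrative: fixed-size windows over a token list, where consecutive windows share an overlap:

```python
def chunk_tokens(tokens: list[str], size: int = 800, overlap: int = 50) -> list[list[str]]:
    """Split tokens into windows of `size`, each sharing `overlap` tokens with the previous."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than chunk size")
    step = size - overlap  # how far each window advances
    return [tokens[i:i + size] for i in range(0, max(len(tokens) - overlap, 1), step)]

tokens = [f"t{i}" for i in range(2000)]
chunks = chunk_tokens(tokens, size=800, overlap=50)
# Consecutive chunks share exactly the 50-token overlap
assert chunks[0][-50:] == chunks[1][:50]
```

The overlap exists so that a sentence straddling a chunk boundary is retrievable from at least one chunk in full.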
Production Considerations#
Concurrency: Run multiple agent requests concurrently with asyncio.gather(). The SDK is async-first; prefer the async Runner.run() over the Runner.run_sync() wrapper in production.
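The fan-out pattern looks like this. `handle_query` below is a stub standing in for the real `await Runner.run(...)` call, so the sketch runs without the SDK:

```python
import asyncio

async def handle_query(query: str) -> str:
    # Stand-in for: result = await Runner.run(triage_agent, input=query)
    await asyncio.sleep(0.01)  # simulate model latency
    return f"answered: {query}"

async def handle_batch(queries: list[str]) -> list[str]:
    """Run all queries concurrently; results come back in input order."""
    return await asyncio.gather(*(handle_query(q) for q in queries))

results = asyncio.run(handle_batch(["billing", "bug report", "research"]))
assert results == ["answered: billing", "answered: bug report", "answered: research"]
```

Pass `return_exceptions=True` to `asyncio.gather` if one failed query should not abort the whole batch.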
Tracing: Tracing is built in and enabled by default; runs appear in the OpenAI dashboard. Set the OPENAI_AGENTS_DISABLE_TRACING=1 environment variable to turn it off, or register an external processor (for Langfuse or another collector) via add_trace_processor().
Cost: Track token usage via the usage field on each entry of result.raw_responses (one per model call in the run). Implement token budget enforcement by checking cumulative usage and stopping before hitting limits.
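Budget enforcement can be as simple as an accumulator checked after each model call. A standalone sketch — the usage dicts and field names (`input_tokens`, `output_tokens`) here mimic per-response usage objects and are illustrative:

```python
class TokenBudgetExceeded(Exception):
    pass

class TokenBudget:
    """Accumulate per-response token usage; fail fast once the cap is crossed."""

    def __init__(self, max_total_tokens: int):
        self.max_total_tokens = max_total_tokens
        self.used = 0

    def record(self, usage: dict) -> None:
        self.used += usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
        if self.used > self.max_total_tokens:
            raise TokenBudgetExceeded(
                f"{self.used} tokens used, cap is {self.max_total_tokens}"
            )

budget = TokenBudget(max_total_tokens=10_000)
budget.record({"input_tokens": 3_000, "output_tokens": 1_200})
assert budget.used == 4200
```

Call `record()` between turns (or from a streaming event handler) so a runaway loop is cut off mid-run rather than discovered on the invoice.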
Next Steps#
- Review the OpenAI Agents SDK directory entry for ecosystem tools
- Add rate limiting for production deployments
- Implement agent monitoring for production observability
- Learn agent state management for stateful conversation systems
- Build a complete customer support agent using these patterns