## What You'll Build
By the end of this tutorial you will have a production-ready CrewAI workflow that includes:
- Custom tools with error handling and Pydantic schemas
- YAML-based agent and task configuration
- Sequential and hierarchical process comparison
- Structured output using Pydantic models
- Async execution for concurrent crew runs
- A CrewAI Flow that orchestrates a multi-stage research-to-report pipeline
The final system is a market research pipeline that searches the web, analyzes competitor data, and generates a structured report — all with typed outputs and async execution.
## Prerequisites

- Python 3.11+
- `crewai>=0.70.0` and `crewai-tools>=0.15.0`
- An OpenAI or Anthropic API key
- Basic familiarity with agentic workflows

```shell
pip install crewai crewai-tools pydantic python-dotenv
```
## Overview
CrewAI organizes work around three primitives: Agents (role-playing LLM instances), Tasks (work units with expected outputs), and Crews (coordinated groups of agents). Starting with v0.60, CrewAI also provides Flows for event-driven multi-stage pipelines.
Understanding tool calling is essential here — agents use tools to interact with external systems, and the quality of your tool definitions directly impacts agent performance.
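Before moving to the YAML-driven project, the three primitives can be wired together in a few lines. This is a minimal sketch (the role, goal, and task strings here are illustrative, not part of the tutorial project), and running `kickoff` requires a configured API key:

```python
from crewai import Agent, Task, Crew, Process

# Agent: a role-playing LLM instance
writer = Agent(
    role="Writer",
    goal="Summarize findings clearly",
    backstory="A concise technical writer.",
    llm="openai/gpt-4o-mini",
)

# Task: a unit of work with an expected output
summarize = Task(
    description="Summarize the provided notes: {notes}",
    expected_output="A three-bullet summary",
    agent=writer,
)

# Crew: a coordinated group of agents executing tasks
crew = Crew(agents=[writer], tasks=[summarize], process=Process.sequential)
# result = crew.kickoff(inputs={"notes": "..."})  # requires an API key
```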
## Step 1: Project Structure and YAML Configuration

Production CrewAI projects use YAML to separate configuration from code. Create the following structure:

```
market_research/
├── src/
│   └── market_research/
│       ├── config/
│       │   ├── agents.yaml
│       │   └── tasks.yaml
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── search_tool.py
│       │   └── scraper_tool.py
│       ├── crew.py
│       ├── flow.py
│       └── main.py
├── .env
└── pyproject.toml
```
`config/agents.yaml`:

```yaml
researcher:
  role: >
    Senior Market Research Analyst
  goal: >
    Gather comprehensive, accurate market data about {company} in the {industry} sector.
    Focus on recent developments from the last 12 months.
  backstory: >
    You are a veteran analyst with 15 years of experience at top consulting firms.
    You know how to identify signal from noise and always cite your sources.
  llm: openai/gpt-4o
  max_iter: 5
  verbose: false

analyst:
  role: >
    Competitive Intelligence Specialist
  goal: >
    Analyze market data and extract strategic insights about positioning,
    strengths, weaknesses, and opportunities for {company}.
  backstory: >
    You spent a decade at McKinsey building competitive intelligence frameworks.
    You communicate findings with clarity and back every claim with evidence.
  llm: openai/gpt-4o
  max_iter: 3
  verbose: false
```
`config/tasks.yaml`:

```yaml
research_task:
  description: >
    Research {company} in the {industry} sector. Find:
    1. Recent product launches and announcements (last 12 months)
    2. Pricing and positioning strategy
    3. Key customer segments and use cases
    4. Funding history and financial signals if available
    Compile raw findings in a structured format.
  expected_output: >
    A detailed research report with sections: Recent Developments,
    Pricing Strategy, Customer Segments, Financial Signals.
    Each section must have at least 3 data points with sources.
  agent: researcher

analysis_task:
  description: >
    Using the research findings, produce a competitive analysis for {company}.
    Identify: top 3 strategic advantages, top 3 vulnerabilities,
    market positioning score (1-10), and 3 strategic recommendations.
  expected_output: >
    A structured JSON-compatible analysis with keys: advantages (list),
    vulnerabilities (list), positioning_score (int), recommendations (list).
  agent: analyst
  context:
    - research_task
```
## Step 2: Custom Tools with Pydantic Schemas

Custom tools extend what agents can do. Subclassing `BaseTool` with a Pydantic `args_schema` gives you input validation and better agent reasoning.
`tools/search_tool.py`:

```python
import os
from typing import Optional

import httpx
from crewai.tools import BaseTool
from pydantic import BaseModel, Field


class SearchInput(BaseModel):
    """Input schema for web search."""

    query: str = Field(description="The search query to execute")
    num_results: int = Field(default=5, ge=1, le=10, description="Number of results to return")
    date_range: Optional[str] = Field(
        default="past_year",
        description="Time filter: past_day, past_week, past_month, past_year",
    )


class WebSearchTool(BaseTool):
    name: str = "web_search"
    description: str = (
        "Search the web for current information. Use this for recent news, "
        "company data, product announcements, and market intelligence."
    )
    args_schema: type[BaseModel] = SearchInput

    def _run(self, query: str, num_results: int = 5, date_range: str = "past_year") -> str:
        """Execute search and return formatted results."""
        try:
            # Using the Tavily API as an example; replace with your preferred search API
            api_key = os.environ["TAVILY_API_KEY"]
            response = httpx.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": query,
                    "max_results": num_results,
                    "search_depth": "advanced",
                    "time_range": date_range,
                },
                timeout=30.0,
            )
            response.raise_for_status()
            data = response.json()
            results = []
            for r in data.get("results", []):
                results.append(
                    f"**{r['title']}**\n"
                    f"URL: {r['url']}\n"
                    f"Summary: {r['content'][:500]}\n"
                )
            return "\n---\n".join(results) if results else "No results found."
        except httpx.TimeoutException:
            return "Search timed out. Try a more specific query."
        except httpx.HTTPStatusError as e:
            return f"Search API error: {e.response.status_code}. Try again."
        except KeyError:
            return "TAVILY_API_KEY not set in environment."
```
`tools/scraper_tool.py`:

```python
import httpx
from bs4 import BeautifulSoup
from crewai.tools import BaseTool
from pydantic import BaseModel, Field


class ScraperInput(BaseModel):
    url: str = Field(description="The URL to scrape for content")
    max_chars: int = Field(default=3000, description="Maximum characters to return")


class WebScraperTool(BaseTool):
    name: str = "web_scraper"
    description: str = (
        "Extract text content from a specific URL. Use this when you have "
        "a specific page to read in detail."
    )
    args_schema: type[BaseModel] = ScraperInput

    def _run(self, url: str, max_chars: int = 3000) -> str:
        try:
            headers = {"User-Agent": "Mozilla/5.0 (research bot)"}
            response = httpx.get(url, headers=headers, timeout=15.0, follow_redirects=True)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            # Remove script/style and navigation boilerplate before extracting text
            for tag in soup(["script", "style", "nav", "footer"]):
                tag.decompose()
            text = soup.get_text(separator="\n", strip=True)
            # Collapse whitespace and truncate
            lines = [line.strip() for line in text.splitlines() if line.strip()]
            content = "\n".join(lines)[:max_chars]
            return content or "No readable content found."
        except httpx.TimeoutException:
            return f"Timeout fetching {url}"
        except Exception as e:
            return f"Failed to scrape {url}: {e}"
```
## Step 3: Crew Assembly with Pydantic Output Models

Setting a Pydantic model as `output_pydantic` on a task gives you structured, validated data instead of raw strings.
`crew.py`:

```python
from typing import List

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from pydantic import BaseModel, Field

from .tools.scraper_tool import WebScraperTool
from .tools.search_tool import WebSearchTool


# --- Output models ---

class ResearchReport(BaseModel):
    recent_developments: List[str] = Field(description="List of recent developments with sources")
    pricing_strategy: str = Field(description="Pricing and positioning strategy summary")
    customer_segments: List[str] = Field(description="Key customer segments identified")
    financial_signals: List[str] = Field(description="Financial signals or funding info")


class CompetitiveAnalysis(BaseModel):
    company: str
    advantages: List[str] = Field(description="Top strategic advantages", min_length=3, max_length=5)
    vulnerabilities: List[str] = Field(description="Key vulnerabilities", min_length=3, max_length=5)
    positioning_score: int = Field(ge=1, le=10, description="Market positioning score")
    recommendations: List[str] = Field(description="Strategic recommendations", min_length=3)


# --- Crew definition ---

@CrewBase
class MarketResearchCrew:
    """Market research crew using YAML config."""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config["researcher"],
            tools=[WebSearchTool(), WebScraperTool()],
            allow_delegation=False,  # disable for sequential predictability
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(
            config=self.agents_config["analyst"],
            tools=[],  # the analyst only reasons; no external tools
            allow_delegation=False,
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config["research_task"],
            output_pydantic=ResearchReport,  # structured output
        )

    @task
    def analysis_task(self) -> Task:
        return Task(
            config=self.tasks_config["analysis_task"],
            output_pydantic=CompetitiveAnalysis,
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,  # deterministic order
            verbose=False,
            max_rpm=20,  # rate limit: 20 requests/minute
            memory=False,  # disable for stateless production runs
        )
```
## Step 4: CrewAI Flows for Event-Driven Pipelines

CrewAI Flows let you build multi-stage pipelines with branching and state. A `@listen` decorator fires a step when the step it references completes, and a `@router` returns a label that selects which listener runs next. Here is a research-to-report flow:

`flow.py`:

```python
from typing import Optional

from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

from .crew import CompetitiveAnalysis, MarketResearchCrew


class ResearchFlowState(BaseModel):
    """Shared state across all flow steps."""

    company: str = ""
    industry: str = ""
    raw_research: str = ""
    analysis: Optional[CompetitiveAnalysis] = None
    report_markdown: str = ""
    needs_human_review: bool = False


class MarketResearchFlow(Flow[ResearchFlowState]):
    @start()
    def initialize(self):
        """Entry point: state is populated from kickoff inputs."""
        print(f"Starting research for: {self.state.company}")

    @listen(initialize)
    def run_research_crew(self):
        """Execute the research crew and store its structured output."""
        crew = MarketResearchCrew()
        result = crew.crew().kickoff(inputs={
            "company": self.state.company,
            "industry": self.state.industry,
        })
        # Access the Pydantic output from the last task
        self.state.analysis = result.tasks_output[-1].pydantic
        self.state.raw_research = str(result.raw)

    @router(run_research_crew)
    def route_by_score(self):
        """Branch on the positioning score; the returned label selects the next listener."""
        if self.state.analysis and self.state.analysis.positioning_score >= 7:
            return "high_performer"
        return "needs_improvement"

    @listen("high_performer")
    def generate_growth_report(self):
        """Generate a report for high-performing companies."""
        self.state.report_markdown = self._format_report(self.state.analysis, focus="growth")
        self.state.needs_human_review = False

    @listen("needs_improvement")
    def generate_improvement_report(self):
        """Generate a report with an improvement focus."""
        self.state.report_markdown = self._format_report(self.state.analysis, focus="improvement")
        self.state.needs_human_review = True  # flag for human review

    def _format_report(self, analysis: CompetitiveAnalysis, focus: str) -> str:
        advantages = "\n".join(f"- {a}" for a in analysis.advantages)
        vulnerabilities = "\n".join(f"- {v}" for v in analysis.vulnerabilities)
        recommendations = "\n".join(
            f"{i + 1}. {r}" for i, r in enumerate(analysis.recommendations)
        )
        return f"""# Competitive Analysis: {analysis.company}

**Positioning Score:** {analysis.positioning_score}/10
**Focus:** {focus.title()}

## Strategic Advantages
{advantages}

## Vulnerabilities
{vulnerabilities}

## Recommendations
{recommendations}
"""
```

Note that routing lives only in the `@router` step: `run_research_crew` stores state, and `route_by_score` inspects it and returns the label. Returning routing labels from a plain `@listen` step has no effect.
## Step 5: Async Execution for Concurrent Crews

For processing multiple companies in parallel:

`main.py`:

```python
import asyncio

from .flow import MarketResearchFlow


async def analyze_company(company: str, industry: str) -> dict:
    """Run the analysis flow for a single company asynchronously."""
    flow = MarketResearchFlow()
    await flow.kickoff_async(inputs={
        "company": company,
        "industry": industry,
    })
    return {
        "company": company,
        "score": flow.state.analysis.positioning_score if flow.state.analysis else 0,
        "report": flow.state.report_markdown,
        "needs_review": flow.state.needs_human_review,
    }


async def analyze_batch(companies: list[dict]) -> list[dict]:
    """Analyze multiple companies with bounded concurrency."""
    semaphore = asyncio.Semaphore(3)  # at most 3 concurrent crews

    async def bounded_analyze(company_data: dict) -> dict:
        async with semaphore:
            return await analyze_company(
                company_data["name"],
                company_data["industry"],
            )

    tasks = [bounded_analyze(c) for c in companies]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Surface exceptions from individual runs instead of failing the batch
    output = []
    for company, result in zip(companies, results):
        if isinstance(result, Exception):
            print(f"Failed for {company['name']}: {result}")
            output.append({"company": company["name"], "error": str(result)})
        else:
            output.append(result)
    return output


if __name__ == "__main__":
    companies = [
        {"name": "Salesforce", "industry": "CRM Software"},
        {"name": "HubSpot", "industry": "Marketing Automation"},
        {"name": "Pipedrive", "industry": "Sales Software"},
    ]
    results = asyncio.run(analyze_batch(companies))
    for r in results:
        print(f"\n{'=' * 50}")
        if "error" in r:
            print(f"ERROR - {r['company']}: {r['error']}")
        else:
            print(r["report"])
            if r["needs_review"]:
                print("** FLAGGED FOR HUMAN REVIEW **")
```
## Common Issues and Solutions

**Issue: Agent loops or exceeds max_iter**

Set an explicit `max_iter` on each agent (the default is 25, which is too high for production). For most tasks, 5 iterations is sufficient:

```python
Agent(config=..., max_iter=5)
```
Also set a `max_execution_time` on tasks to prevent runaway execution:

```python
Task(config=..., max_execution_time=120)  # 2-minute limit
```
**Issue: Pydantic output fails validation**

The LLM sometimes produces output that doesn't match your Pydantic schema. Add a `converter_llm` to the crew for automatic retry and conversion:

```python
from crewai import LLM

Crew(
    ...,
    full_output=True,
    converter_llm=LLM(model="openai/gpt-4o-mini"),  # cheap model for conversion
)
```
**Issue: Tool rate limits from concurrent crews**

Implement tool-level rate limiting using a shared semaphore or a token bucket:

```python
import asyncio

_tool_semaphore = asyncio.Semaphore(5)  # max 5 concurrent tool calls

class WebSearchTool(BaseTool):
    async def _arun(self, query: str, **kwargs) -> str:
        async with _tool_semaphore:
            # your async implementation
            ...
```
## Production Considerations
Cost control: Set max_rpm on your crew and use gpt-4o-mini for the researcher role where possible — it handles information gathering well at 10x lower cost.
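To make the effect of `max_rpm` concrete, here is a stand-alone sliding-window rate limiter. This is a sketch of the idea, not CrewAI's internal implementation; the class and method names are illustrative:

```python
import time
from collections import deque


class RpmLimiter:
    """Blocks until a request slot is free within a 60-second sliding window."""

    def __init__(self, max_rpm: int):
        self.max_rpm = max_rpm
        self.calls: deque = deque()  # timestamps of recent calls

    def acquire(self) -> None:
        now = time.monotonic()
        # Drop timestamps older than the 60-second window
        while self.calls and now - self.calls[0] > 60:
            self.calls.popleft()
        # If at the limit, wait until the oldest call expires
        if len(self.calls) >= self.max_rpm:
            wait = 60 - (now - self.calls[0])
            if wait > 0:
                time.sleep(wait)
            self.calls.popleft()
        self.calls.append(time.monotonic())
```

Each LLM or tool call would then be preceded by `limiter.acquire()`, which is roughly what a crew-level `max_rpm` does for you automatically.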
Observability: Integrate with LangFuse or CrewAI's built-in telemetry. Set OTEL_EXPORTER_OTLP_ENDPOINT to send traces to your collector.
Idempotency: Store crew run outputs with a job ID. If a run fails mid-flow, you can replay from the last successful step using flow state persistence.
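One minimal sketch of that job-ID pattern, assuming a simple JSON-file store (the `runs/` directory and helper names are illustrative, not part of CrewAI):

```python
import hashlib
import json
from pathlib import Path

STORE = Path("runs")  # one JSON file per job


def job_id(company: str, industry: str) -> str:
    """Deterministic ID: re-submitting the same inputs maps to the same run."""
    return hashlib.sha256(f"{company}|{industry}".encode()).hexdigest()[:16]


def save_step(jid: str, step: str, output: dict) -> None:
    """Persist one step's output under the job's file."""
    STORE.mkdir(exist_ok=True)
    path = STORE / f"{jid}.json"
    state = json.loads(path.read_text()) if path.exists() else {}
    state[step] = output
    path.write_text(json.dumps(state))


def completed_steps(jid: str) -> set:
    """Return the names of steps that already succeeded for this job."""
    path = STORE / f"{jid}.json"
    return set(json.loads(path.read_text())) if path.exists() else set()
```

Before re-running a flow, check `completed_steps(jid)` and skip any stage whose output is already stored.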
Testing: Run crews with gpt-4o-mini and temperature=0 in your test suite for deterministic outputs. See the agent testing guide for a full testing pipeline.
Config management: Keep agents.yaml and tasks.yaml in version control. Changing agent behavior is a code change — treat it accordingly with code review and staging environments.
## Next Steps

- Add human-in-the-loop checkpoints using `human_input=True` on sensitive tasks
- Explore agent state management for long-running flows
- Connect your crew to the LangFuse observability platform
- Build a customer support agent using these patterns
- Review the CrewAI directory entry for ecosystem tools
## Frequently Asked Questions

Sequential vs. hierarchical processes, CrewAI Flows, and async execution are covered in the sections above. Additional technical questions are answered below.
**When should I use `allow_delegation=True` in production?**

Only enable delegation when you need agents to dynamically reassign work. In the hierarchical process, the manager agent uses delegation to route tasks. In the sequential process, delegation creates unpredictable behavior and extra LLM calls. For most production workflows, keep delegation off and rely on explicit task context chains.
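For contrast, a hierarchical crew looks like the sketch below, reusing the agents and tasks from Step 3. Note that `Process.hierarchical` requires a `manager_llm` (or a custom `manager_agent`) to do the delegating:

```python
from crewai import Crew, Process

crew = Crew(
    agents=[researcher, analyst],          # worker agents; the manager delegates to them
    tasks=[research_task, analysis_task],
    process=Process.hierarchical,
    manager_llm="openai/gpt-4o",           # required for the hierarchical process
)
```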
**How do I pass data between tasks in a sequential crew?**

Use the `context` field on tasks to specify which previous tasks an agent should read. The crew automatically injects the output of the listed tasks as context. For structured data, set `output_pydantic` on the preceding task and access `task.output.pydantic` in your flow logic.
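Expressed in Python rather than YAML, the same chaining looks like this sketch (the description strings are abbreviated placeholders):

```python
analysis_task = Task(
    description="Analyze the research findings for {company}",
    expected_output="A structured competitive analysis",
    agent=analyst,
    context=[research_task],  # research_task's output is injected into this task's prompt
    output_pydantic=CompetitiveAnalysis,
)
```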