Advanced · 20 min read

Build Production CrewAI Workflows (2026)

Master production CrewAI workflows with custom tools, sequential vs hierarchical processes, YAML-based agent configs, Pydantic output models, async execution, and CrewAI Flows for event-driven pipelines. Complete Python examples included.

By AI Agents Guide Team • March 1, 2026

Table of Contents

  1. What You'll Build
  2. Prerequisites
  3. Overview
  4. Step 1: Project Structure and YAML Configuration
  5. Step 2: Custom Tools with Pydantic Schemas
  6. Step 3: Crew Assembly with Pydantic Output Models
  7. Step 4: CrewAI Flows for Event-Driven Pipelines
  8. Step 5: Async Execution for Concurrent Crews
  9. Common Issues and Solutions
  10. Production Considerations
  11. Next Steps
  12. Frequently Asked Questions

What You'll Build

By the end of this tutorial you will have a production-ready CrewAI workflow that includes:

  • Custom tools with error handling and Pydantic schemas
  • YAML-based agent and task configuration
  • Sequential and hierarchical process comparison
  • Structured output using Pydantic models
  • Async execution for concurrent crew runs
  • A CrewAI Flow that orchestrates a multi-stage research-to-report pipeline

The final system is a market research pipeline that searches the web, analyzes competitor data, and generates a structured report — all with typed outputs and async execution.

Prerequisites

  • Python 3.11+
  • crewai>=0.70.0 and crewai-tools>=0.15.0
  • OpenAI or Anthropic API key
  • Basic familiarity with agentic workflows
pip install crewai crewai-tools pydantic python-dotenv httpx beautifulsoup4

Overview

CrewAI organizes work around three primitives: Agents (role-playing LLM instances), Tasks (work units with expected outputs), and Crews (coordinated groups of agents). Starting with v0.60, CrewAI also provides Flows for event-driven multi-stage pipelines.

Understanding tool calling is essential here — agents use tools to interact with external systems, and the quality of your tool definitions directly impacts agent performance.

Step 1: Project Structure and YAML Configuration

Production CrewAI projects use YAML to separate configuration from code. Create the following structure:

market_research/
├── src/
│   └── market_research/
│       ├── config/
│       │   ├── agents.yaml
│       │   └── tasks.yaml
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── search_tool.py
│       │   └── scraper_tool.py
│       ├── crew.py
│       ├── flow.py
│       └── main.py
├── .env
└── pyproject.toml

config/agents.yaml:

researcher:
  role: >
    Senior Market Research Analyst
  goal: >
    Gather comprehensive, accurate market data about {company} in the {industry} sector.
    Focus on recent developments from the last 12 months.
  backstory: >
    You are a veteran analyst with 15 years of experience at top consulting firms.
    You know how to identify signal from noise and always cite your sources.
  llm: openai/gpt-4o
  max_iter: 5
  verbose: false

analyst:
  role: >
    Competitive Intelligence Specialist
  goal: >
    Analyze market data and extract strategic insights about positioning,
    strengths, weaknesses, and opportunities for {company}.
  backstory: >
    You spent a decade at McKinsey building competitive intelligence frameworks.
    You communicate findings with clarity and back every claim with evidence.
  llm: openai/gpt-4o
  max_iter: 3
  verbose: false

config/tasks.yaml:

research_task:
  description: >
    Research {company} in the {industry} sector. Find:
    1. Recent product launches and announcements (last 12 months)
    2. Pricing and positioning strategy
    3. Key customer segments and use cases
    4. Funding history and financial signals if available
    Compile raw findings in a structured format.
  expected_output: >
    A detailed research report with sections: Recent Developments,
    Pricing Strategy, Customer Segments, Financial Signals.
    Each section must have at least 3 data points with sources.
  agent: researcher

analysis_task:
  description: >
    Using the research findings, produce a competitive analysis for {company}.
    Identify: top 3 strategic advantages, top 3 vulnerabilities,
    market positioning score (1-10), and 3 strategic recommendations.
  expected_output: >
    A structured JSON-compatible analysis with keys: company (str), advantages (list),
    vulnerabilities (list), positioning_score (int), recommendations (list).
  agent: analyst
  context:
    - research_task
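
The `{company}` and `{industry}` placeholders in both files are filled from the `inputs` dictionary passed to `kickoff`. A misspelled or missing key is cheaper to catch before any LLM call is made. The following is a minimal sketch of such a pre-flight check using only the standard library. `find_placeholders` and `missing_inputs` are hypothetical helper names, and the sample strings are copied from the YAML above rather than loaded from disk.

```python
from string import Formatter


def find_placeholders(template: str) -> set[str]:
    """Return the names of all {placeholder} fields in a template string."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


def missing_inputs(templates: list[str], inputs: dict[str, str]) -> set[str]:
    """Return placeholder names that appear in templates but not in inputs."""
    required: set[str] = set()
    for template in templates:
        required |= find_placeholders(template)
    return required - set(inputs)


# Strings copied from agents.yaml / tasks.yaml for illustration
templates = [
    "Gather comprehensive, accurate market data about {company} in the {industry} sector.",
    "Using the research findings, produce a competitive analysis for {company}.",
]

print(missing_inputs(templates, {"company": "HubSpot"}))  # {'industry'}
```

In a real project you would likely collect the templates by reading the `goal`, `backstory`, `description`, and `expected_output` fields from both YAML files.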

Step 2: Custom Tools with Pydantic Schemas

Custom tools extend what agents can do. Subclassing BaseTool and declaring a Pydantic args_schema gives you input validation and gives the agent a typed, described argument list to reason about.

tools/search_tool.py:

import os
from typing import Optional
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import httpx


class SearchInput(BaseModel):
    """Input schema for web search."""
    query: str = Field(description="The search query to execute")
    num_results: int = Field(default=5, ge=1, le=10, description="Number of results to return")
    date_range: Optional[str] = Field(
        default="past_year",
        description="Time filter: past_day, past_week, past_month, past_year"
    )


class WebSearchTool(BaseTool):
    name: str = "web_search"
    description: str = (
        "Search the web for current information. Use this for recent news, "
        "company data, product announcements, and market intelligence."
    )
    args_schema: type[BaseModel] = SearchInput

    def _run(self, query: str, num_results: int = 5, date_range: str = "past_year") -> str:
        """Execute search and return formatted results."""
        try:
            # Using Tavily API as example; replace with your preferred search API
            api_key = os.environ["TAVILY_API_KEY"]
            response = httpx.post(
                "https://api.tavily.com/search",
                json={
                    "api_key": api_key,
                    "query": query,
                    "max_results": num_results,
                    "search_depth": "advanced",
                    "time_range": date_range,
                },
                timeout=30.0,
            )
            response.raise_for_status()
            data = response.json()

            results = []
            for r in data.get("results", []):
                results.append(
                    f"**{r['title']}**\n"
                    f"URL: {r['url']}\n"
                    f"Summary: {r['content'][:500]}\n"
                )
            return "\n---\n".join(results) if results else "No results found."

        except httpx.TimeoutException:
            return "Search timed out. Try a more specific query."
        except httpx.HTTPStatusError as e:
            return f"Search API error: {e.response.status_code}. Try again."
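        except KeyError as e:
            # Raised for a missing TAVILY_API_KEY or a result without an expected field
            return f"Search failed: missing key {e}. Check TAVILY_API_KEY and the API response format."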
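
Search APIs do not always return every field for every result, and indexing with `r['title']` raises on the first gap. One option is to move the formatting into a small function that tolerates missing fields. This is a sketch, assuming the response shape used in the tool above (`results` containing `title`, `url`, and `content`); `format_results` is a hypothetical helper, not part of CrewAI or the search API.

```python
def format_results(data: dict, max_summary_chars: int = 500) -> str:
    """Format a search response into text, tolerating missing fields."""
    blocks = []
    for item in data.get("results", []):
        title = item.get("title") or "(untitled)"
        url = item.get("url") or "(no URL)"
        summary = (item.get("content") or "")[:max_summary_chars]
        blocks.append(f"**{title}**\nURL: {url}\nSummary: {summary}\n")
    return "\n---\n".join(blocks) if blocks else "No results found."


print(format_results({"results": [{"title": "Pricing update"}]}))
```

Because the function is pure, it can be unit-tested without network access or an API key.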

tools/scraper_tool.py:

from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import httpx
from bs4 import BeautifulSoup


class ScraperInput(BaseModel):
    url: str = Field(description="The URL to scrape for content")
    max_chars: int = Field(default=3000, description="Maximum characters to return")


class WebScraperTool(BaseTool):
    name: str = "web_scraper"
    description: str = (
        "Extract text content from a specific URL. Use this when you have "
        "a specific page to read in detail."
    )
    args_schema: type[BaseModel] = ScraperInput

    def _run(self, url: str, max_chars: int = 3000) -> str:
        try:
            headers = {"User-Agent": "Mozilla/5.0 (research bot)"}
            response = httpx.get(url, headers=headers, timeout=15.0, follow_redirects=True)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, "html.parser")
            # Remove script/style tags
            for tag in soup(["script", "style", "nav", "footer"]):
                tag.decompose()

            text = soup.get_text(separator="\n", strip=True)
            # Collapse whitespace
            lines = [line.strip() for line in text.splitlines() if line.strip()]
            content = "\n".join(lines)[:max_chars]
            return content or "No readable content found."

        except httpx.TimeoutException:
            return f"Timeout fetching {url}"
        except Exception as e:
            return f"Failed to scrape {url}: {str(e)}"
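
The agent chooses which URLs to pass to the scraper, so the tool may receive relative paths, `file://` URLs, or plain text. A guard before the HTTP request keeps those cases out of `httpx.get`. The check below is a minimal sketch using the standard library; `is_fetchable_url` is a hypothetical helper name, and it does not cover stricter policies such as blocking private IP ranges.

```python
from urllib.parse import urlparse


def is_fetchable_url(url: str) -> bool:
    """Accept only absolute http(s) URLs that include a host name."""
    parsed = urlparse(url)
    return parsed.scheme in {"http", "https"} and bool(parsed.hostname)


print(is_fetchable_url("https://example.com/pricing"))  # True
print(is_fetchable_url("file:///etc/passwd"))           # False
```

Inside `_run`, returning a short message such as "Invalid URL" when the check fails follows the same return-a-string pattern the tool already uses for timeouts.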

Step 3: Crew Assembly with Pydantic Output Models

Using Pydantic models as output_pydantic on tasks gives you structured, validated data instead of raw strings.

crew.py:

from crewai import Agent, Task, Crew, Process
from crewai.project import CrewBase, agent, crew, task
from pydantic import BaseModel, Field
from typing import List
from .tools.search_tool import WebSearchTool
from .tools.scraper_tool import WebScraperTool


# --- Output Models ---

class ResearchReport(BaseModel):
    recent_developments: List[str] = Field(description="List of recent developments with sources")
    pricing_strategy: str = Field(description="Pricing and positioning strategy summary")
    customer_segments: List[str] = Field(description="Key customer segments identified")
    financial_signals: List[str] = Field(description="Financial signals or funding info")


class CompetitiveAnalysis(BaseModel):
    company: str
    advantages: List[str] = Field(description="Top strategic advantages", min_length=3, max_length=5)
    vulnerabilities: List[str] = Field(description="Key vulnerabilities", min_length=3, max_length=5)
    positioning_score: int = Field(ge=1, le=10, description="Market positioning score")
    recommendations: List[str] = Field(description="Strategic recommendations", min_length=3)


# --- Crew Definition ---

@CrewBase
class MarketResearchCrew:
    """Market research crew using YAML config."""
    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config["researcher"],
            tools=[WebSearchTool(), WebScraperTool()],
            allow_delegation=False,  # Disable for sequential predictability
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(
            config=self.agents_config["analyst"],
            tools=[],  # Analyst only reasons, no external tools
            allow_delegation=False,
        )

    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config["research_task"],
            output_pydantic=ResearchReport,  # Structured output
        )

    @task
    def analysis_task(self) -> Task:
        return Task(
            config=self.tasks_config["analysis_task"],
            output_pydantic=CompetitiveAnalysis,
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,  # Deterministic order
            verbose=False,
            max_rpm=20,  # Rate limit: 20 requests/minute
            memory=False,  # Disable for stateless production runs
        )
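
The crew above runs sequentially. For comparison, a hierarchical crew takes the same agents and tasks but hands coordination to a manager. The sketch below shows one way to assemble it. `build_hierarchical_crew` is a hypothetical helper, and the manager model string is an assumption that mirrors the `llm` value in agents.yaml; check the Crew reference for your installed version before relying on it.

```python
def build_hierarchical_crew(agents: list, tasks: list):
    """Assemble the given agents and tasks under a manager LLM."""
    # Imported lazily so this module can be imported without crewai installed
    from crewai import Crew, Process

    return Crew(
        agents=agents,
        tasks=tasks,
        process=Process.hierarchical,  # A manager decides which agent handles what
        manager_llm="openai/gpt-4o",   # Assumed model; needed when no manager_agent is given
        verbose=False,
    )
```

Inside `MarketResearchCrew.crew()` this would be called as `build_hierarchical_crew(self.agents, self.tasks)`. The manager adds LLM calls and makes execution order less predictable, so the sequential version is usually the safer default for fixed pipelines like this one.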

Step 4: CrewAI Flows for Event-Driven Pipelines

CrewAI Flows let you build multi-stage pipelines with branching and state. Here is a research-to-report flow:

flow.py:

from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel
from typing import Optional
from .crew import MarketResearchCrew, CompetitiveAnalysis


class ResearchFlowState(BaseModel):
    """Shared state across all flow steps."""
    company: str = ""
    industry: str = ""
    raw_research: str = ""
    analysis: Optional[CompetitiveAnalysis] = None
    report_markdown: str = ""
    needs_human_review: bool = False


class MarketResearchFlow(Flow[ResearchFlowState]):

    @start()
    def initialize(self):
        """Entry point: state is already populated from the kickoff inputs."""
        print(f"Starting research for: {self.state.company}")

    @listen(initialize)
    def run_research_crew(self):
        """Execute the research crew and store its output in state."""
        crew = MarketResearchCrew()
        result = crew.crew().kickoff(inputs={
            "company": self.state.company,
            "industry": self.state.industry,
        })
        # Access Pydantic output from the last task
        analysis: CompetitiveAnalysis = result.tasks_output[-1].pydantic
        self.state.analysis = analysis
        self.state.raw_research = str(result.raw)

    @router(run_research_crew)
    def route_by_score(self):
        """Route on the positioning score; the returned label triggers the matching listener."""
        if self.state.analysis and self.state.analysis.positioning_score >= 7:
            return "high_performer"
        return "needs_improvement"

    @listen("high_performer")
    def generate_growth_report(self):
        """Generate report for high-performing companies."""
        a = self.state.analysis
        self.state.report_markdown = self._format_report(a, focus="growth")
        self.state.needs_human_review = False

    @listen("needs_improvement")
    def generate_improvement_report(self):
        """Generate report with improvement focus."""
        a = self.state.analysis
        self.state.report_markdown = self._format_report(a, focus="improvement")
        self.state.needs_human_review = True  # Flag for human review

    def _format_report(self, analysis: CompetitiveAnalysis, focus: str) -> str:
        return f"""# Competitive Analysis: {analysis.company}

**Positioning Score:** {analysis.positioning_score}/10
**Focus:** {focus.title()}

## Strategic Advantages
{chr(10).join(f'- {a}' for a in analysis.advantages)}

## Vulnerabilities
{chr(10).join(f'- {v}' for v in analysis.vulnerabilities)}

## Recommendations
{chr(10).join(f'{i+1}. {r}' for i, r in enumerate(analysis.recommendations))}
"""

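The flow chains its steps by trigger: when a step finishes, the steps listening for it run next, and a router's return value becomes the trigger for the branch that follows. The toy dispatcher below illustrates that idea in plain Python. It is a conceptual sketch, not CrewAI's implementation, and every name in it (`listeners`, `fire`, and the decorators) is hypothetical.

```python
from collections import defaultdict
from typing import Callable

# Maps a trigger (a step name or a router label) to the steps waiting on it
listeners: dict[str, list[Callable[[dict], object]]] = defaultdict(list)
routers: set[str] = set()


def listen(trigger: str):
    """Register a step to run after `trigger` fires."""
    def register(fn):
        listeners[trigger].append(fn)
        return fn
    return register


def router(trigger: str):
    """Like listen, but the step's return value becomes the next trigger."""
    def register(fn):
        listeners[trigger].append(fn)
        routers.add(fn.__name__)
        return fn
    return register


def fire(trigger: str, state: dict) -> None:
    """Run every step waiting on `trigger`, then fire the follow-up trigger."""
    for step in listeners[trigger]:
        result = step(state)
        # A plain step announces its own name; a router announces its label
        next_trigger = result if step.__name__ in routers else step.__name__
        fire(next_trigger, state)


@listen("start")
def run_research(state):
    state["score"] = 8  # Stand-in for the crew's positioning_score


@router("run_research")
def route_by_score(state):
    return "high_performer" if state["score"] >= 7 else "needs_improvement"


@listen("high_performer")
def growth_report(state):
    state["report"] = "growth"


@listen("needs_improvement")
def improvement_report(state):
    state["report"] = "improvement"


state: dict = {}
fire("start", state)
print(state["report"])  # growth
```

Note that a plain step's return value is ignored for routing in this model; only a router's label selects the next branch.
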
Step 5: Async Execution for Concurrent Crews

For processing multiple companies in parallel:

main.py:

import asyncio
# Run as a module (python -m market_research.main) so the relative import resolves
from .flow import MarketResearchFlow


async def analyze_company(company: str, industry: str) -> dict:
    """Run analysis for a single company asynchronously."""
    flow = MarketResearchFlow()
    await flow.kickoff_async(inputs={
        "company": company,
        "industry": industry,
    })
    return {
        "company": company,
        "score": flow.state.analysis.positioning_score if flow.state.analysis else 0,
        "report": flow.state.report_markdown,
        "needs_review": flow.state.needs_human_review,
    }


async def analyze_batch(companies: list[dict]) -> list[dict]:
    """Analyze multiple companies with concurrency control."""
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent crews

    async def bounded_analyze(company_data: dict):
        async with semaphore:
            return await analyze_company(
                company_data["name"],
                company_data["industry"]
            )

    tasks = [bounded_analyze(c) for c in companies]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle exceptions from individual runs
    output = []
    for company, result in zip(companies, results):
        if isinstance(result, Exception):
            print(f"Failed for {company['name']}: {result}")
            output.append({"company": company["name"], "error": str(result)})
        else:
            output.append(result)
    return output


if __name__ == "__main__":
    companies = [
        {"name": "Salesforce", "industry": "CRM Software"},
        {"name": "HubSpot", "industry": "Marketing Automation"},
        {"name": "Pipedrive", "industry": "Sales Software"},
    ]

    results = asyncio.run(analyze_batch(companies))
    for r in results:
        print(f"\n{'='*50}")
        if "error" in r:
            print(f"ERROR - {r['company']}: {r['error']}")
        else:
            print(r["report"])
            if r["needs_review"]:
                print("** FLAGGED FOR HUMAN REVIEW **")
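
The semaphore-plus-gather pattern can be checked without calling any LLM. The sketch below swaps the flow for a fake coroutine and records the peak number of jobs in flight, which should never exceed the limit, while `return_exceptions=True` keeps one failure from cancelling the rest. `run_bounded` and `fake_analysis` are hypothetical names used only for this illustration.

```python
import asyncio


async def run_bounded(jobs: list[str], limit: int, fail_on: str = "") -> tuple[list, int]:
    """Run fake jobs with at most `limit` in flight; return results and peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_analysis(name: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # Stand-in for a crew run
            in_flight -= 1
            if name == fail_on:
                raise RuntimeError(f"simulated failure for {name}")
            return f"report for {name}"

    results = await asyncio.gather(
        *(fake_analysis(j) for j in jobs), return_exceptions=True
    )
    return results, peak


results, peak = asyncio.run(
    run_bounded(["A", "B", "C", "D", "E"], limit=2, fail_on="C")
)
print(peak)                                   # 2
print([type(r).__name__ for r in results])    # ['str', 'str', 'RuntimeError', 'str', 'str']
```

Results come back in the same order as the input list, which is why `analyze_batch` can pair them with `companies` using `zip`.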

Common Issues and Solutions

Issue: Agent loops or exceeds max_iter

Set explicit max_iter on each agent (default is 25, which is too high for production). For most tasks, 5 iterations is sufficient:

Agent(config=..., max_iter=5)

Also set max_execution_time (in seconds) on the agent to prevent runaway execution:

Agent(config=..., max_iter=5, max_execution_time=120)  # 2 minute limit
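
When runs are launched from async code, a wall-clock deadline around each run adds a second line of defense against runaway execution. The following is a minimal sketch using `asyncio.wait_for`; `run_with_deadline` and `slow_job` are hypothetical names, and the fake job stands in for a flow kickoff.

```python
import asyncio


async def run_with_deadline(coro, seconds: float, label: str) -> dict:
    """Await `coro`, giving up after `seconds` of wall-clock time."""
    try:
        return {"label": label, "result": await asyncio.wait_for(coro, timeout=seconds)}
    except asyncio.TimeoutError:
        return {"label": label, "error": f"timed out after {seconds}s"}


async def slow_job() -> str:
    await asyncio.sleep(1.0)  # Stand-in for a run that does not converge
    return "done"


outcome = asyncio.run(run_with_deadline(slow_job(), seconds=0.05, label="Salesforce"))
print(outcome)  # {'label': 'Salesforce', 'error': 'timed out after 0.05s'}
```

The deadline only takes effect at `await` points, so it cannot interrupt synchronous code that blocks the event loop.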

Issue: Pydantic output fails validation

The LLM sometimes produces output that doesn't match your Pydantic schema. Add a converter_llm to the crew for automatic retry and conversion:

from crewai import LLM
Crew(
    ...,
    full_output=True,
    converter_llm=LLM(model="openai/gpt-4o-mini"),  # Cheap model for conversion
)

Issue: Tool rate limits from concurrent crews

Implement tool-level rate limiting using a shared semaphore or a token bucket:

import asyncio
_tool_semaphore = asyncio.Semaphore(5)  # Max 5 tool calls at once

class WebSearchTool(BaseTool):
    async def _arun(self, query: str, **kwargs) -> str:
        async with _tool_semaphore:
            # your async implementation
            ...
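
A semaphore caps how many calls run at once, while a token bucket caps how many calls start per unit of time, which is usually what an API's rate limit measures. The class below is a minimal, single-threaded sketch of a token bucket. `TokenBucket` and `try_acquire` are hypothetical names, and the injectable `clock` argument exists only to make the behavior testable.

```python
import time


class TokenBucket:
    """Allow up to `rate` calls per second, with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.clock = clock  # Injectable so tests can control time
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False when the caller should wait."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(rate=2, capacity=2)
print([bucket.try_acquire() for _ in range(3)])
```

With a capacity of two, the first two calls succeed immediately and the third is refused until enough time has passed to refill a token. If tools run in multiple threads, the refill-and-take step would need a lock around it.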

Production Considerations

Cost control: Set max_rpm on your crew and use gpt-4o-mini for the researcher role where possible. It handles information gathering well at roughly an order of magnitude lower cost than gpt-4o.

Observability: Integrate with LangFuse or CrewAI's built-in telemetry. Set OTEL_EXPORTER_OTLP_ENDPOINT to send traces to your collector.

Idempotency: Store crew run outputs with a job ID. If a run fails mid-flow, you can replay from the last successful step using flow state persistence.
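
The "store outputs with a job ID" half of this can be sketched with the standard library: derive the ID from the run inputs so that the same request always maps to the same stored result. `job_id` and `load_or_run` are hypothetical helpers, and the JSON-file store is an assumption standing in for whatever database or object store you use. This covers whole-run reuse only, not mid-flow replay.

```python
import hashlib
import json
from pathlib import Path


def job_id(inputs: dict) -> str:
    """Derive a stable ID from the run inputs (same inputs, same ID)."""
    canonical = json.dumps(inputs, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


def load_or_run(inputs: dict, run, store_dir: Path) -> dict:
    """Return the stored result for these inputs, or run once and store it."""
    store_dir.mkdir(parents=True, exist_ok=True)
    path = store_dir / f"{job_id(inputs)}.json"
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    result = run(inputs)
    # Write to a temp file first so a crash never leaves a half-written result
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(result), encoding="utf-8")
    tmp.replace(path)
    return result
```

Here `run` is any callable that takes the inputs and returns a JSON-serializable dictionary, for example a thin wrapper around the flow kickoff that returns the report fields.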

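Testing: Run crews with gpt-4o-mini and temperature=0 in your test suite to reduce run-to-run variation; outputs are still not guaranteed to be identical, so assert on structure rather than exact text. See the agent testing guide for a full testing pipeline.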

Config management: Keep agents.yaml and tasks.yaml in version control. Changing agent behavior is a code change — treat it accordingly with code review and staging environments.

Next Steps

  • Add human-in-the-loop checkpoints using human_input=True on sensitive tasks
  • Explore agent state management for long-running flows
  • Connect your crew to the LangFuse observability platform
  • Build a customer support agent using these patterns
  • Review the CrewAI directory entry for ecosystem tools

Frequently Asked Questions

CrewAI sequential vs hierarchical process, CrewAI Flows, and async execution are covered in the FAQ above the article. Additional technical questions are answered below.

When should I use allow_delegation=True in production?

Only enable delegation when you need agents to dynamically reassign work. In hierarchical process, the manager agent uses delegation to route tasks. In sequential process, delegation creates unpredictable behavior and extra LLM calls. For most production workflows, keep delegation off and rely on explicit task context chains.

How do I pass data between tasks in a sequential crew?

Use the context field on tasks to specify which previous tasks an agent should read. The crew automatically injects the output of listed tasks as context. For structured data, use output_pydantic on the preceding task and access task.output.pydantic in your flow logic.
