Intermediate · 18 min read

How to Build a Multi-Agent Pipeline

Step-by-step tutorial for building a multi-agent pipeline using CrewAI and LangGraph. Learn the orchestrator-worker agent pattern with a complete research-writing-editing pipeline implementation in Python. Full code included.

[Image: Diagram of multi-agent pipeline showing orchestrator and worker agents]
By AI Agents Guide Team · March 1, 2026

Table of Contents

  1. Understanding the Orchestrator-Worker Pattern
  2. Implementation 1: Multi-Agent Pipeline with CrewAI
  3. Setup
  4. Define the Tools
  5. Define the Worker Agents
  6. Define the Tasks
  7. Assemble the Crew and Run
  8. Adding Parallel Execution
  9. Implementation 2: Multi-Agent Pipeline with LangGraph
  10. Setup
  11. Define State Schema
  12. Define Agent Nodes
  13. Build the Graph
  14. Run the Pipeline
  15. Advanced Patterns: Parallel Research Agents
  16. Production Considerations
  17. Key Takeaways

Multi-agent pipelines are one of the most powerful patterns in AI development. Instead of a single agent trying to do everything, you compose multiple specialized agents — each excellent at one thing — into a coordinated pipeline that handles complex, multi-step tasks reliably.

In this tutorial, we build a complete research-writing-editing pipeline: a ResearchAgent that gathers information, a WriterAgent that produces structured content, and an EditorAgent that refines and fact-checks the output. We implement this using both CrewAI (simpler API) and LangGraph (more control).

Prerequisites: Python 3.11+, familiarity with basic AI agent concepts, API keys for OpenAI or Anthropic.

What you will build: A pipeline that takes a research topic → produces a polished, fact-checked article.

Related: Multi-Agent Systems Guide | Multi-Agent Systems Glossary | LangGraph Multi-Agent Tutorial


Understanding the Orchestrator-Worker Pattern#

The orchestrator-worker pattern divides multi-agent systems into two roles:

Orchestrator: Manages the workflow. Breaks the goal into tasks, delegates to workers, synthesizes results. Does not do domain-specific work itself.

Workers: Specialists. Each worker is excellent at one thing — research, writing, coding, verification. They receive a specific task and return a specific output.

Goal → Orchestrator → [Research Worker] → [Writer Worker] → [Editor Worker] → Output
                    ↑_________________________ Error handling ___________________|

This separation has important benefits:

  • Each worker agent can be optimized independently for its task
  • You can swap workers without changing the orchestration logic
  • Workers can run in parallel when tasks are independent
  • Failures are isolated — one worker failing does not crash the entire system
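Stripped of any framework, the pattern reduces to a coordinator composing specialist functions. A minimal sketch in plain Python (the worker bodies here are placeholders standing in for LLM calls, and all the names are illustrative):

```python
from typing import Callable

# Placeholder workers -- in a real pipeline each wraps an LLM or tool call
def research_worker(topic: str) -> str:
    return f"notes on {topic}"

def writer_worker(notes: str) -> str:
    return f"article based on: {notes}"

def editor_worker(draft: str) -> str:
    return f"polished: {draft}"

def orchestrator(goal: str, workers: list[Callable[[str], str]]) -> str:
    """Delegate sequentially; each worker consumes the previous worker's output."""
    result = goal
    for worker in workers:
        result = worker(result)
    return result

final = orchestrator("AI agents", [research_worker, writer_worker, editor_worker])
```

Everything that follows in this tutorial is this loop with real agents: CrewAI hides the orchestrator behind `Crew`, LangGraph makes it an explicit graph.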

Implementation 1: Multi-Agent Pipeline with CrewAI#

CrewAI's role-based design maps naturally to the orchestrator-worker pattern.

Setup#

pip install crewai crewai-tools langchain-openai python-dotenv
# .env
OPENAI_API_KEY=your_key_here
SERPER_API_KEY=your_serper_key  # For web search

Define the Tools#

from crewai_tools import SerperDevTool, WebsiteSearchTool

search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()

Define the Worker Agents#

from crewai import Agent

# Research specialist: gathers and synthesizes information
researcher = Agent(
    role='Senior Research Analyst',
    goal='Research topics thoroughly and provide accurate, comprehensive information with citations',
    backstory="""You are a senior research analyst with expertise in finding,
    evaluating, and synthesizing information from multiple sources.
    You prioritize accuracy and always cite your sources.""",
    llm='gpt-4o',
    tools=[search_tool, web_tool],
    verbose=True,
    max_iter=5,
    memory=True
)

# Writing specialist: transforms research into structured content
writer = Agent(
    role='Content Writer',
    goal='Transform research findings into well-structured, engaging, accurate articles',
    backstory="""You are an expert content writer who excels at turning
    raw research into clear, engaging long-form content. You structure
    information logically and write in a style that is both authoritative and accessible.""",
    llm='gpt-4o',
    verbose=True,
    memory=True
)

# Editor specialist: refines and fact-checks content
editor = Agent(
    role='Senior Editor',
    goal='Review and improve content for accuracy, clarity, structure, and professional quality',
    backstory="""You are a meticulous senior editor with high standards.
    You improve clarity, fix factual inconsistencies, ensure logical flow,
    and polish language. You catch errors others miss.""",
    llm='gpt-4o',
    tools=[web_tool],  # Can verify facts
    verbose=True,
    memory=True
)

Define the Tasks#

Tasks define exactly what each agent must produce. Crucially, you can pass the output of previous tasks as context.

from crewai import Task

def create_research_task(topic: str) -> Task:
    return Task(
        description=f"""Research the following topic comprehensively: {topic}

        Your research must include:
        1. A factual overview of the topic (3-5 key points)
        2. Current developments and recent news (last 6 months)
        3. Expert perspectives or quotes where available
        4. Key statistics or data points with sources
        5. Related concepts and context

        Provide your research as a structured document with clear sections and source citations.""",
        expected_output="""A comprehensive research document with:
        - Executive summary (2-3 sentences)
        - Key facts and findings (5-10 bullet points with citations)
        - Current developments section
        - Data and statistics section
        - Source list""",
        agent=researcher
    )

def create_writing_task(research_task: Task) -> Task:
    return Task(
        description="""Using the research provided, write a comprehensive article.

        Article requirements:
        - Length: 800-1200 words
        - Structure: Introduction, 3-4 main sections with H2 headings, conclusion
        - Tone: Authoritative but accessible to a technical audience
        - Include: A strong hook in the introduction, concrete examples, actionable insights
        - Format: Ready for publication with proper markdown formatting""",
        expected_output="""A complete, publication-ready article in markdown format with:
        - Compelling title (H1)
        - Introduction with hook
        - 3-4 main sections (H2s) with substantive content
        - Conclusion with key takeaways
        - Word count: 800-1200 words""",
        agent=writer,
        context=[research_task]  # Writer receives research as context
    )

def create_editing_task(writing_task: Task) -> Task:
    return Task(
        description="""Review and improve the article provided.

        Editing checklist:
        1. Factual accuracy: Flag any claims that need verification
        2. Structure: Ensure logical flow and clear section transitions
        3. Clarity: Simplify complex sentences, eliminate jargon where possible
        4. Completeness: Identify any important gaps in coverage
        5. Quality: Polish language for professional publication

        Return the improved article with a brief editor's note explaining key changes made.""",
        expected_output="""The improved article in full, plus an editor's note listing:
        - Key changes made
        - Any facts that were verified or corrected
        - Quality assessment (1-10 score with rationale)""",
        agent=editor,
        context=[writing_task]  # Editor receives the written article
    )

Assemble the Crew and Run#

from crewai import Crew, Process

def run_research_writing_pipeline(topic: str) -> str:
    """Run the complete research → writing → editing pipeline."""

    # Create tasks
    research_task = create_research_task(topic)
    writing_task = create_writing_task(research_task)
    editing_task = create_editing_task(writing_task)

    # Create the crew with sequential process
    crew = Crew(
        agents=[researcher, writer, editor],
        tasks=[research_task, writing_task, editing_task],
        process=Process.sequential,  # Tasks run in order
        verbose=True,
        memory=True,  # Enable crew-level memory
        max_rpm=10    # Rate limiting for API calls
    )

    # Run the pipeline
    result = crew.kickoff()
    return result

# Run it
topic = "The impact of AI agents on software development workflows in 2026"
output = run_research_writing_pipeline(topic)
print(output)

Adding Parallel Execution#

When tasks are independent of each other, run them in parallel:

# Example: Research multiple topics simultaneously
from crewai import Crew, Process

multi_topic_crew = Crew(
    agents=[researcher, second_researcher, writer],  # second_researcher: a second Agent instance configured like researcher
    tasks=[research_task_1, research_task_2, combined_writing_task],
    process=Process.hierarchical,  # Manager coordinates parallel work
    manager_llm='gpt-4o',  # Orchestrator LLM
    verbose=True
)

Implementation 2: Multi-Agent Pipeline with LangGraph#

LangGraph gives you more control over the execution flow through explicit state management and conditional routing.

Setup#

pip install langgraph langchain-openai langchain-community

Define State Schema#

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, List
from langchain_openai import ChatOpenAI
import operator

class PipelineState(TypedDict):
    """State shared across all agents in the pipeline."""
    topic: str                          # Input research topic
    research_notes: str                 # Output from research agent
    draft_article: str                  # Output from writer agent
    final_article: str                  # Output from editor agent
    editor_feedback: str                # Editor's assessment
    iteration_count: int               # Track revision cycles
    messages: Annotated[list, operator.add]  # Full conversation history
    errors: List[str]                   # Error tracking

Define Agent Nodes#

Each node is a function that takes state and returns updated state.

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.tools import DuckDuckGoSearchRun

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
search = DuckDuckGoSearchRun()

def research_node(state: PipelineState) -> dict:
    """Research agent: gathers comprehensive information on the topic."""
    topic = state["topic"]

    # Gather search results
    search_results = search.run(f"{topic} latest developments 2026")
    search_results_2 = search.run(f"{topic} statistics data analysis")

    # Synthesize research with LLM
    research_prompt = f"""You are a senior research analyst.

Topic: {topic}

Search Results:
{search_results}

{search_results_2}

Synthesize these search results into comprehensive research notes that include:
1. Key facts and findings (with sources where available)
2. Current developments
3. Relevant statistics and data points
4. Important context

Format as structured research notes."""

    response = llm.invoke([HumanMessage(content=research_prompt)])

    return {
        "research_notes": response.content,
        "messages": [HumanMessage(content=f"Research complete for: {topic}")]
    }

def writer_node(state: PipelineState) -> dict:
    """Writer agent: transforms research into a structured article."""
    research = state["research_notes"]
    topic = state["topic"]

    writing_prompt = f"""You are an expert content writer.

Topic: {topic}

Research Notes:
{research}

Write a comprehensive, well-structured article based on this research.

Requirements:
- Length: 800-1200 words
- Format: Markdown with H1 title, H2 section headers
- Tone: Authoritative but accessible
- Include concrete examples and actionable insights
- Strong introduction and clear conclusion

Write the complete article now:"""

    response = llm.invoke([HumanMessage(content=writing_prompt)])

    return {
        "draft_article": response.content,
        "messages": [HumanMessage(content="Article draft complete")]
    }

def editor_node(state: PipelineState) -> dict:
    """Editor agent: reviews, improves, and fact-checks the article."""
    draft = state["draft_article"]
    research = state["research_notes"]

    editing_prompt = f"""You are a meticulous senior editor.

Original Research:
{research}

Article Draft:
{draft}

Review this article and:
1. Check factual accuracy against the research notes
2. Improve clarity and flow
3. Fix any structural issues
4. Polish language for publication quality
5. Provide an editor's assessment

Return:
FINAL_ARTICLE:
[The improved article in full]

EDITOR_NOTES:
[Brief assessment: changes made, quality score 1-10, publication readiness]"""

    response = llm.invoke([HumanMessage(content=editing_prompt)])
    content = response.content

    # Parse the editor's response
    if "FINAL_ARTICLE:" in content and "EDITOR_NOTES:" in content:
        parts = content.split("EDITOR_NOTES:")
        final_article = parts[0].replace("FINAL_ARTICLE:", "").strip()
        editor_notes = parts[1].strip()
    else:
        final_article = content
        editor_notes = "Review complete"

    return {
        "final_article": final_article,
        "editor_feedback": editor_notes,
        "iteration_count": state.get("iteration_count", 0) + 1,
        "messages": [HumanMessage(content=f"Editing complete. Feedback: {editor_notes[:100]}...")]
    }

import re

def quality_check(state: PipelineState) -> str:
    """Routing function: decide if the article needs another revision."""
    feedback = state.get("editor_feedback", "")
    iteration_count = state.get("iteration_count", 0)

    # If the quality score is below 7 and we haven't iterated too many times, revise.
    # A regex avoids misreading "score: 8/10" as 810.
    if iteration_count < 2:
        match = re.search(r"score:?\s*(\d+(?:\.\d+)?)", feedback.lower())
        if match and float(match.group(1)) < 7.0:
            return "writer"  # Send back to writer for revision

    return END  # Accept the article

Build the Graph#

def build_pipeline() -> StateGraph:
    """Assemble the multi-agent pipeline graph."""

    workflow = StateGraph(PipelineState)

    # Add nodes
    workflow.add_node("research", research_node)
    workflow.add_node("writer", writer_node)
    workflow.add_node("editor", editor_node)

    # Define the flow
    workflow.set_entry_point("research")
    workflow.add_edge("research", "writer")
    workflow.add_edge("writer", "editor")

    # Conditional edge: editor can send back to writer or accept
    workflow.add_conditional_edges(
        "editor",
        quality_check,
        {
            "writer": "writer",  # Revision needed
            END: END             # Accept article
        }
    )

    return workflow

# Compile with checkpointing for persistence
checkpointer = MemorySaver()
pipeline = build_pipeline().compile(checkpointer=checkpointer)

Run the Pipeline#

def run_pipeline(topic: str, thread_id: str = "default") -> dict:
    """Run the pipeline with state persistence."""

    initial_state = {
        "topic": topic,
        "research_notes": "",
        "draft_article": "",
        "final_article": "",
        "editor_feedback": "",
        "iteration_count": 0,
        "messages": [],
        "errors": []
    }

    config = {"configurable": {"thread_id": thread_id}}

    # Stream output to see progress
    for event in pipeline.stream(initial_state, config):
        node_name = list(event.keys())[0]
        print(f"\n{'='*50}")
        print(f"Completed: {node_name.upper()}")
        print(f"{'='*50}")

    # Get final state
    final_state = pipeline.get_state(config)
    return final_state.values

# Run
result = run_pipeline(
    topic="How AI agents are transforming software development in 2026",
    thread_id="article-001"
)

print("\nFINAL ARTICLE:")
print(result["final_article"])
print("\nEDITOR FEEDBACK:")
print(result["editor_feedback"])

Advanced Patterns: Parallel Research Agents#

For comprehensive coverage, run multiple research agents in parallel:

import asyncio
from langgraph.graph import StateGraph

class ParallelResearchState(TypedDict):
    topic: str
    research_subtopics: List[str]
    research_results: Annotated[list, operator.add]  # Collected from parallel agents
    final_synthesis: str

async def parallel_research_node(state: ParallelResearchState) -> dict:
    """Run multiple research agents concurrently."""
    subtopics = state["research_subtopics"]

    async def research_subtopic(subtopic: str) -> str:
        # search.run is synchronous; offload it to a thread so it does not
        # block the event loop
        search_results = await asyncio.to_thread(search.run, subtopic)
        response = await llm.ainvoke([
            HumanMessage(content=f"Synthesize structured research notes on "
                                 f"'{subtopic}' from these search results:\n{search_results}")
        ])
        return response.content

    # Run all subtopic research in parallel
    results = await asyncio.gather(*[
        research_subtopic(subtopic) for subtopic in subtopics
    ])

    return {"research_results": list(results)}

Production Considerations#

Before deploying a multi-agent pipeline to production:

1. Rate limiting: Multiple agents calling LLM APIs simultaneously can hit rate limits. Add delays or use exponential backoff.
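A minimal, framework-agnostic retry helper for the backoff approach (`with_backoff` and its parameters are our own naming, not part of any SDK):

```python
import random
import time

def with_backoff(fn, max_retries: int = 5, base_delay: float = 1.0):
    """Retry fn on exception with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries: surface the error
            # Delays of base, 2x, 4x, ... plus proportional jitter so that
            # parallel agents do not all retry at the same instant
            time.sleep(base_delay * (2 ** attempt) + random.random() * base_delay)
```

Wrap any LLM or search call, e.g. `with_backoff(lambda: llm.invoke(prompt))`.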

2. Cost tracking: Multi-agent systems can be expensive. Instrument each node to log token usage.

from langchain.callbacks import get_openai_callback

with get_openai_callback() as cb:
    result = pipeline.invoke(initial_state, config)
    print(f"Total tokens: {cb.total_tokens}")
    print(f"Total cost: ${cb.total_cost:.4f}")

3. Error isolation: Wrap each node in try/except and route failures to an error handler rather than crashing the pipeline.
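One way to sketch this for the LangGraph pipeline above is a wrapper that records failures in the state's `errors` field instead of raising (note: for `errors` to accumulate across nodes rather than be overwritten, the schema would need an accumulating reducer such as `Annotated[list, operator.add]`):

```python
def safe_node(node_fn, node_name: str):
    """Wrap a node so failures are recorded in state instead of crashing the graph."""
    def wrapped(state: dict) -> dict:
        try:
            return node_fn(state)
        except Exception as exc:
            # Record the failure; a conditional edge can inspect state["errors"]
            # and route to a recovery node
            return {"errors": [f"{node_name}: {exc}"]}
    return wrapped
```

Register wrapped nodes as usual, e.g. `workflow.add_node("research", safe_node(research_node, "research"))`.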

4. Observability: Add tracing to every node for debugging. LangSmith integrates with LangGraph by setting a few environment variables; Langfuse provides a callback-based integration.
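As a lightweight starting point before wiring up a full tracing backend, a decorator can log per-node latency and the state keys each node returns (a generic sketch of our own, not a LangSmith or Langfuse API):

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

def traced(node_fn):
    """Log duration and returned state keys for each node invocation."""
    @functools.wraps(node_fn)
    def wrapped(state: dict) -> dict:
        start = time.perf_counter()
        result = node_fn(state)
        elapsed = time.perf_counter() - start
        logger.info("%s finished in %.2fs, updated keys: %s",
                    node_fn.__name__, elapsed, sorted(result))
        return result
    return wrapped
```

Apply it when defining nodes, e.g. `@traced` above `def research_node(...)`.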

5. Human review gates: For high-stakes pipelines, add a human-in-the-loop node before final publication:

from langgraph.types import interrupt

def human_review_node(state: PipelineState) -> dict:
    """Pause for human review before publishing."""
    user_decision = interrupt({
        "article": state["final_article"],
        "feedback": state["editor_feedback"],
        "question": "Approve this article for publication? (approve/reject/revise)"
    })
    return {"human_decision": user_decision}

Key Takeaways#

  • The orchestrator-worker pattern separates coordination (orchestrator) from domain expertise (workers)
  • CrewAI is faster to start with; LangGraph gives you more control over complex flows
  • Context passing between tasks is critical — each agent should receive the relevant output of previous agents
  • Quality gates (like the editor routing back to the writer) prevent low-quality outputs from propagating
  • Production multi-agent pipelines need rate limiting, cost tracking, error handling, and observability

For the next step, see our guides on AI Agent State Management and AI Agent Memory Systems to make your pipelines more sophisticated.
