How to Build a Multi-Agent Pipeline
Multi-agent pipelines are one of the most powerful patterns in AI development. Instead of a single agent trying to do everything, you compose multiple specialized agents — each excellent at one thing — into a coordinated pipeline that handles complex, multi-step tasks reliably.
In this tutorial, we build a complete research-writing-editing pipeline: a ResearchAgent that gathers information, a WriterAgent that produces structured content, and an EditorAgent that refines and fact-checks the output. We implement this using both CrewAI (simpler API) and LangGraph (more control).
Prerequisites: Python 3.11+, familiarity with basic AI agent concepts, API keys for OpenAI or Anthropic.
What you will build: A pipeline that takes a research topic → produces a polished, fact-checked article.
Related: Multi-Agent Systems Guide | Multi-Agent Systems Glossary | LangGraph Multi-Agent Tutorial
Understanding the Orchestrator-Worker Pattern
The orchestrator-worker pattern divides multi-agent systems into two roles:
Orchestrator: Manages the workflow. Breaks the goal into tasks, delegates to workers, synthesizes results. Does not do domain-specific work itself.
Workers: Specialists. Each worker is excellent at one thing — research, writing, coding, verification. They receive a specific task and return a specific output.
Goal → Orchestrator → [Research Worker] → [Writer Worker] → [Editor Worker] → Output
         ↑________________________ error handling ________________________|
This separation has important benefits:
- Each worker agent can be optimized independently for its task
- You can swap workers without changing the orchestration logic
- Workers can run in parallel when tasks are independent
- Failures are isolated — one worker failing does not crash the entire system
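The pattern is framework-agnostic. As a minimal sketch in plain Python (stub workers stand in for LLM calls; all names here are illustrative), the orchestrator simply walks an ordered list of workers, threading each output into the next and isolating failures:

```python
from typing import Callable

# Stub workers: each specialist maps one input string to one output string.
# In a real pipeline these would wrap LLM calls.
def research_worker(goal: str) -> str:
    return f"notes on {goal}"

def writer_worker(notes: str) -> str:
    return f"draft based on [{notes}]"

def editor_worker(draft: str) -> str:
    return f"polished {draft}"

def orchestrate(goal: str, workers: list[Callable[[str], str]]) -> str:
    """Delegate to each worker in order, passing the previous output forward."""
    result = goal
    for worker in workers:
        try:
            result = worker(result)
        except Exception as exc:
            # Failure isolation: report which worker failed instead of crashing
            return f"pipeline failed at {worker.__name__}: {exc}"
    return result

article = orchestrate("AI agents", [research_worker, writer_worker, editor_worker])
```

Swapping a worker means replacing one entry in the list; the orchestration loop never changes.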
Implementation 1: Multi-Agent Pipeline with CrewAI
CrewAI's role-based design maps naturally to the orchestrator-worker pattern.
Setup
pip install crewai crewai-tools langchain-openai python-dotenv
# .env
OPENAI_API_KEY=your_key_here
SERPER_API_KEY=your_serper_key # For web search
Define the Tools
from crewai_tools import SerperDevTool, WebsiteSearchTool
search_tool = SerperDevTool()
web_tool = WebsiteSearchTool()
Define the Worker Agents
from crewai import Agent
# Research specialist: gathers and synthesizes information
researcher = Agent(
role='Senior Research Analyst',
goal='Research topics thoroughly and provide accurate, comprehensive information with citations',
backstory="""You are a senior research analyst with expertise in finding,
evaluating, and synthesizing information from multiple sources.
You prioritize accuracy and always cite your sources.""",
llm='gpt-4o',
tools=[search_tool, web_tool],
verbose=True,
max_iter=5,
memory=True
)
# Writing specialist: transforms research into structured content
writer = Agent(
role='Content Writer',
goal='Transform research findings into well-structured, engaging, accurate articles',
backstory="""You are an expert content writer who excels at turning
raw research into clear, engaging long-form content. You structure
information logically and write in a style that is both authoritative and accessible.""",
llm='gpt-4o',
verbose=True,
memory=True
)
# Editor specialist: refines and fact-checks content
editor = Agent(
role='Senior Editor',
goal='Review and improve content for accuracy, clarity, structure, and professional quality',
backstory="""You are a meticulous senior editor with high standards.
You improve clarity, fix factual inconsistencies, ensure logical flow,
and polish language. You catch errors others miss.""",
llm='gpt-4o',
tools=[web_tool], # Can verify facts
verbose=True,
memory=True
)
Define the Tasks
Tasks define exactly what each agent must produce. Crucially, you can pass the output of previous tasks as context.
from crewai import Task
def create_research_task(topic: str) -> Task:
return Task(
description=f"""Research the following topic comprehensively: {topic}
Your research must include:
1. A factual overview of the topic (3-5 key points)
2. Current developments and recent news (last 6 months)
3. Expert perspectives or quotes where available
4. Key statistics or data points with sources
5. Related concepts and context
Provide your research as a structured document with clear sections and source citations.""",
expected_output="""A comprehensive research document with:
- Executive summary (2-3 sentences)
- Key facts and findings (5-10 bullet points with citations)
- Current developments section
- Data and statistics section
- Source list""",
agent=researcher
)
def create_writing_task(research_task: Task) -> Task:
return Task(
description="""Using the research provided, write a comprehensive article.
Article requirements:
- Length: 800-1200 words
- Structure: Introduction, 3-4 main sections with H2 headings, conclusion
- Tone: Authoritative but accessible to a technical audience
- Include: A strong hook in the introduction, concrete examples, actionable insights
- Format: Ready for publication with proper markdown formatting""",
expected_output="""A complete, publication-ready article in markdown format with:
- Compelling title (H1)
- Introduction with hook
- 3-4 main sections (H2s) with substantive content
- Conclusion with key takeaways
- Word count: 800-1200 words""",
agent=writer,
context=[research_task] # Writer receives research as context
)
def create_editing_task(writing_task: Task) -> Task:
return Task(
description="""Review and improve the article provided.
Editing checklist:
1. Factual accuracy: Flag any claims that need verification
2. Structure: Ensure logical flow and clear section transitions
3. Clarity: Simplify complex sentences, eliminate jargon where possible
4. Completeness: Identify any important gaps in coverage
5. Quality: Polish language for professional publication
Return the improved article with a brief editor's note explaining key changes made.""",
expected_output="""The improved article in full, plus an editor's note listing:
- Key changes made
- Any facts that were verified or corrected
- Quality assessment (1-10 score with rationale)""",
agent=editor,
context=[writing_task] # Editor receives the written article
)
Assemble the Crew and Run
from crewai import Crew, Process
def run_research_writing_pipeline(topic: str) -> str:
"""Run the complete research → writing → editing pipeline."""
# Create tasks
research_task = create_research_task(topic)
writing_task = create_writing_task(research_task)
editing_task = create_editing_task(writing_task)
# Create the crew with sequential process
crew = Crew(
agents=[researcher, writer, editor],
tasks=[research_task, writing_task, editing_task],
process=Process.sequential, # Tasks run in order
verbose=True,
memory=True, # Enable crew-level memory
max_rpm=10 # Rate limiting for API calls
)
# Run the pipeline
    result = crew.kickoff()
    return str(result)  # kickoff() returns a CrewOutput; str() yields the final text
# Run it
topic = "The impact of AI agents on software development workflows in 2026"
output = run_research_writing_pipeline(topic)
print(output)
Adding Parallel Execution
When tasks are independent of each other, run them in parallel:
# Example: Research multiple topics simultaneously
from crewai import Crew, Process
multi_topic_crew = Crew(
    agents=[researcher, researcher_2, writer],  # Two distinct researcher instances (create a second Agent rather than listing the same object twice)
tasks=[research_task_1, research_task_2, combined_writing_task],
    process=Process.hierarchical,  # A manager agent plans, delegates, and coordinates the work
    manager_llm='gpt-4o',  # LLM used by the orchestrating manager
verbose=True
)
Independent tasks can also be given async_execution=True, which tells CrewAI to run them concurrently rather than waiting for each to finish.
Implementation 2: Multi-Agent Pipeline with LangGraph
LangGraph gives you more control over the execution flow through explicit state management and conditional routing.
Setup
pip install langgraph langchain-openai langchain-community
Define State Schema
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict, Annotated, List
from langchain_openai import ChatOpenAI
import operator
class PipelineState(TypedDict):
"""State shared across all agents in the pipeline."""
topic: str # Input research topic
research_notes: str # Output from research agent
draft_article: str # Output from writer agent
final_article: str # Output from editor agent
editor_feedback: str # Editor's assessment
iteration_count: int # Track revision cycles
messages: Annotated[list, operator.add] # Full conversation history
errors: List[str] # Error tracking
Define Agent Nodes
Each node is a function that takes state and returns updated state.
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.tools import DuckDuckGoSearchRun
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
search = DuckDuckGoSearchRun()
def research_node(state: PipelineState) -> dict:
"""Research agent: gathers comprehensive information on the topic."""
topic = state["topic"]
# Gather search results
search_results = search.run(f"{topic} latest developments 2026")
search_results_2 = search.run(f"{topic} statistics data analysis")
# Synthesize research with LLM
research_prompt = f"""You are a senior research analyst.
Topic: {topic}
Search Results:
{search_results}
{search_results_2}
Synthesize these search results into comprehensive research notes that include:
1. Key facts and findings (with sources where available)
2. Current developments
3. Relevant statistics and data points
4. Important context
Format as structured research notes."""
response = llm.invoke([HumanMessage(content=research_prompt)])
return {
"research_notes": response.content,
"messages": [HumanMessage(content=f"Research complete for: {topic}")]
}
def writer_node(state: PipelineState) -> dict:
"""Writer agent: transforms research into a structured article."""
research = state["research_notes"]
topic = state["topic"]
writing_prompt = f"""You are an expert content writer.
Topic: {topic}
Research Notes:
{research}
Write a comprehensive, well-structured article based on this research.
Requirements:
- Length: 800-1200 words
- Format: Markdown with H1 title, H2 section headers
- Tone: Authoritative but accessible
- Include concrete examples and actionable insights
- Strong introduction and clear conclusion
Write the complete article now:"""
response = llm.invoke([HumanMessage(content=writing_prompt)])
return {
"draft_article": response.content,
"messages": [HumanMessage(content="Article draft complete")]
}
def editor_node(state: PipelineState) -> dict:
"""Editor agent: reviews, improves, and fact-checks the article."""
draft = state["draft_article"]
research = state["research_notes"]
editing_prompt = f"""You are a meticulous senior editor.
Original Research:
{research}
Article Draft:
{draft}
Review this article and:
1. Check factual accuracy against the research notes
2. Improve clarity and flow
3. Fix any structural issues
4. Polish language for publication quality
5. Provide an editor's assessment
Return:
FINAL_ARTICLE:
[The improved article in full]
EDITOR_NOTES:
[Brief assessment: changes made, quality score 1-10, publication readiness]"""
response = llm.invoke([HumanMessage(content=editing_prompt)])
content = response.content
# Parse the editor's response
if "FINAL_ARTICLE:" in content and "EDITOR_NOTES:" in content:
parts = content.split("EDITOR_NOTES:")
final_article = parts[0].replace("FINAL_ARTICLE:", "").strip()
editor_notes = parts[1].strip()
else:
final_article = content
editor_notes = "Review complete"
return {
"final_article": final_article,
"editor_feedback": editor_notes,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": [HumanMessage(content=f"Editing complete. Feedback: {editor_notes[:100]}...")]
}
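The marker-based split above is the fragile part of this node: models often wrap markers in markdown emphasis (e.g. `**EDITOR_NOTES:**`), which breaks a plain substring check. A slightly more tolerant standalone parser, sketched here with a hypothetical helper name (`parse_editor_output` is not part of LangGraph), could look like this:

```python
import re

def parse_editor_output(content: str) -> tuple[str, str]:
    """Split editor output on the FINAL_ARTICLE / EDITOR_NOTES markers.

    Tolerates optional markdown emphasis (e.g. **EDITOR_NOTES:**) around the
    markers and falls back to treating the whole response as the article.
    """
    pattern = re.compile(
        r"\**FINAL_ARTICLE:?\**\s*(?P<article>.*?)\**EDITOR_NOTES:?\**\s*(?P<notes>.*)",
        re.DOTALL,
    )
    match = pattern.search(content)
    if match:
        return match.group("article").strip(), match.group("notes").strip()
    return content.strip(), "Review complete"
```

A structured-output feature of your LLM client (e.g. JSON mode) is usually a more robust choice than marker parsing when the model supports it.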
def quality_check(state: PipelineState) -> str:
    """Routing function: decide if the article needs another revision."""
    import re  # local import keeps this snippet self-contained
    feedback = state.get("editor_feedback", "")
    iteration_count = state.get("iteration_count", 0)
    # If the quality score is below 7 and we haven't hit the revision cap, revise
    if iteration_count < 2:
        # Grab the first number after "score:" (handles "Score: 6" and "Score: 6/10")
        match = re.search(r"score:\s*(\d+(?:\.\d+)?)", feedback.lower())
        if match and float(match.group(1)) < 7.0:
            return "writer"  # Send back to writer for revision
    return END  # Accept the article
Build the Graph
def build_pipeline() -> StateGraph:
"""Assemble the multi-agent pipeline graph."""
workflow = StateGraph(PipelineState)
# Add nodes
workflow.add_node("research", research_node)
workflow.add_node("writer", writer_node)
workflow.add_node("editor", editor_node)
# Define the flow
workflow.set_entry_point("research")
workflow.add_edge("research", "writer")
workflow.add_edge("writer", "editor")
# Conditional edge: editor can send back to writer or accept
workflow.add_conditional_edges(
"editor",
quality_check,
{
"writer": "writer", # Revision needed
END: END # Accept article
}
)
return workflow
# Compile with checkpointing for persistence
checkpointer = MemorySaver()
pipeline = build_pipeline().compile(checkpointer=checkpointer)
Run the Pipeline
def run_pipeline(topic: str, thread_id: str = "default") -> dict:
"""Run the pipeline with state persistence."""
initial_state = {
"topic": topic,
"research_notes": "",
"draft_article": "",
"final_article": "",
"editor_feedback": "",
"iteration_count": 0,
"messages": [],
"errors": []
}
config = {"configurable": {"thread_id": thread_id}}
# Stream output to see progress
for event in pipeline.stream(initial_state, config):
node_name = list(event.keys())[0]
print(f"\n{'='*50}")
print(f"Completed: {node_name.upper()}")
print(f"{'='*50}")
# Get final state
final_state = pipeline.get_state(config)
return final_state.values
# Run
result = run_pipeline(
topic="How AI agents are transforming software development in 2026",
thread_id="article-001"
)
print("\nFINAL ARTICLE:")
print(result["final_article"])
print("\nEDITOR FEEDBACK:")
print(result["editor_feedback"])
Advanced Patterns: Parallel Research Agents
For comprehensive coverage, run multiple research agents in parallel:
import asyncio
from langgraph.graph import StateGraph
class ParallelResearchState(TypedDict):
topic: str
research_subtopics: List[str]
research_results: Annotated[list, operator.add] # Collected from parallel agents
final_synthesis: str
async def parallel_research_node(state: ParallelResearchState) -> dict:
"""Run multiple research agents concurrently."""
subtopics = state["research_subtopics"]
    async def research_subtopic(subtopic: str) -> str:
        # search.run is synchronous; run it in a thread so it doesn't block the event loop
        search_results = await asyncio.to_thread(search.run, subtopic)
response = await llm.ainvoke([
HumanMessage(content=f"Research notes for '{subtopic}': {search_results}")
])
return response.content
# Run all subtopic research in parallel
results = await asyncio.gather(*[
research_subtopic(subtopic) for subtopic in subtopics
])
return {"research_results": list(results)}
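One detail worth knowing here: asyncio.gather returns results in the same order as the awaitables you pass it, regardless of which finishes first, so each result can be paired back to its subtopic by position. A self-contained demonstration with stub coroutines (no LLM calls, names are illustrative):

```python
import asyncio

async def fake_research(subtopic: str, delay: float) -> str:
    # Later-finishing tasks still land in their original slot
    await asyncio.sleep(delay)
    return f"notes: {subtopic}"

async def main() -> list[str]:
    subtopics = ["agents", "pipelines", "orchestration"]
    delays = [0.03, 0.01, 0.02]  # the first task finishes last
    return await asyncio.gather(*[
        fake_research(s, d) for s, d in zip(subtopics, delays)
    ])

results = asyncio.run(main())
```

This ordering guarantee is why the parallel node can safely zip results back against research_subtopics when synthesizing.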
Production Considerations
Before deploying a multi-agent pipeline to production:
1. Rate limiting: Multiple agents calling LLM APIs simultaneously can hit rate limits. Add delays or use exponential backoff.
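A minimal retry wrapper with exponential backoff could look like the sketch below (`with_backoff` is my own helper, not a framework API; in production a library such as tenacity is a common choice):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_backoff(fn: Callable[[], T], max_attempts: int = 4, base_delay: float = 1.0) -> T:
    """Call fn, retrying with exponentially growing delays on failure."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of retries: surface the error to the caller
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    raise RuntimeError("unreachable")
```

Wrap the LLM call inside each node, e.g. `with_backoff(lambda: llm.invoke(messages))`, so a transient rate-limit error retries instead of failing the whole pipeline run.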
2. Cost tracking: Multi-agent systems can be expensive. Instrument each node to log token usage.
from langchain_community.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = pipeline.invoke(initial_state, config)
print(f"Total tokens: {cb.total_tokens}")
print(f"Total cost: ${cb.total_cost:.4f}")
3. Error isolation: Wrap each node in try/except and route failures to an error handler rather than crashing the pipeline.
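One way to sketch that wrapping, assuming nodes follow the state-in/partial-update-out convention used above (`with_error_isolation` is my own helper, not a LangGraph API):

```python
from typing import Callable

def with_error_isolation(node_name: str, node_fn: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """Wrap a node so failures append to state['errors'] instead of crashing the graph."""
    def wrapped(state: dict) -> dict:
        try:
            return node_fn(state)
        except Exception as exc:
            # Record the failure as a partial state update; downstream routing
            # can inspect state["errors"] and divert to an error-handler node
            return {"errors": state.get("errors", []) + [f"{node_name}: {exc}"]}
    return wrapped

# Usage sketch:
# workflow.add_node("research", with_error_isolation("research", research_node))
```

Pair this with a conditional edge that routes to a recovery node whenever `state["errors"]` is non-empty.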
4. Observability: Add tracing to every node for debugging. LangSmith integrates with LangGraph automatically once its environment variables are set; Langfuse integrates via its callback handler.
5. Human review gates: For high-stakes pipelines, add a human-in-the-loop node before final publication:
from langgraph.types import interrupt
def human_review_node(state: PipelineState) -> dict:
"""Pause for human review before publishing."""
user_decision = interrupt({
"article": state["final_article"],
"feedback": state["editor_feedback"],
"question": "Approve this article for publication? (approve/reject/revise)"
})
    return {"human_decision": user_decision}  # add a human_decision field to PipelineState to hold this
Key Takeaways
- The orchestrator-worker pattern separates coordination (orchestrator) from domain expertise (workers)
- CrewAI is faster to start with; LangGraph gives you more control over complex flows
- Context passing between tasks is critical — each agent should receive the relevant output of previous agents
- Quality gates (like the editor routing back to the writer) prevent low-quality outputs from propagating
- Production multi-agent pipelines need rate limiting, cost tracking, error handling, and observability
For the next step, see our guides on AI Agent State Management and AI Agent Memory Systems to make your pipelines more sophisticated.