What You'll Build
A production research agent that:
- Searches the web using Tavily API with multi-query strategy
- Extracts and retrieves documents from source URLs
- Tracks citations with URL, title, date, and snippet
- Synthesizes findings from multiple sources into a coherent narrative
- Runs a fact-checking loop to verify key claims
- Generates a structured markdown report with inline citations
In under two minutes, the finished agent produces research reports comparable to what a human researcher might compile over several hours.
Prerequisites
pip install langgraph langchain langchain-openai tavily-python \
httpx beautifulsoup4 pydantic python-dotenv
- Python 3.11+
- OpenAI API key
- Tavily API key (free tier: 1,000 searches/month)
- Familiarity with agentic workflows and tool calling
Overview
The research agent follows a multi-phase workflow:
- Query planning — decompose the research question into targeted search queries
- Search — execute queries in parallel using Tavily
- Extraction — fetch full content from top source URLs
- Synthesis — combine findings with citation tracking
- Fact-checking — verify key claims against secondary sources
- Report generation — produce structured markdown with citations
Step 1: Citation Tracking System
Citations are the core data structure of the research agent. Track them from the first search:
# research_agent/citations.py
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import hashlib
class Citation(BaseModel):
"""A single source citation."""
id: str = Field(description="Unique citation identifier (cite-001, cite-002, etc.)")
url: str
title: str
snippet: str = Field(description="Relevant excerpt from this source")
retrieved_at: str = Field(default_factory=lambda: datetime.now().isoformat())
publication_date: Optional[str] = None
author: Optional[str] = None
relevance_score: float = Field(ge=0.0, le=1.0, default=0.5)
fact_checked: bool = False
fact_check_result: Optional[str] = None # "confirmed", "disputed", "unverifiable"
class CitationRegistry:
"""Tracks all citations across the research session."""
def __init__(self):
self._citations: dict[str, Citation] = {}
self._url_to_id: dict[str, str] = {}
self._counter = 0
def add(self, url: str, title: str, snippet: str, **kwargs) -> Citation:
"""Add a citation. Returns existing citation if URL already registered."""
if url in self._url_to_id:
return self._citations[self._url_to_id[url]]
self._counter += 1
citation_id = f"cite-{self._counter:03d}"
citation = Citation(
id=citation_id,
url=url,
title=title,
snippet=snippet[:500],
**kwargs,
)
self._citations[citation_id] = citation
self._url_to_id[url] = citation_id
return citation
def get(self, citation_id: str) -> Optional[Citation]:
return self._citations.get(citation_id)
def all(self) -> list[Citation]:
return list(self._citations.values())
def format_bibliography(self) -> str:
"""Format all citations as a bibliography section."""
lines = ["## Sources\n"]
for c in sorted(self._citations.values(), key=lambda x: x.id):
date_str = f" ({c.publication_date})" if c.publication_date else ""
fact_str = f" [{c.fact_check_result.upper()}]" if c.fact_check_result else ""
lines.append(f"[{c.id}] {c.title}{date_str} — {c.url}{fact_str}")
return "\n".join(lines)
def format_for_agent(self) -> str:
"""Format citations compactly for the synthesis prompt."""
lines = []
for c in self._citations.values():
lines.append(f"[{c.id}] {c.title}\n URL: {c.url}\n Excerpt: {c.snippet[:300]}")
return "\n\n".join(lines)
Step 2: Search and Extraction Tools
# research_agent/tools.py
import asyncio
import httpx
from bs4 import BeautifulSoup
from tavily import AsyncTavilyClient
from .citations import CitationRegistry, Citation
import os
tavily_client = AsyncTavilyClient(api_key=os.environ["TAVILY_API_KEY"])
async def search_web(
queries: list[str],
registry: CitationRegistry,
max_results_per_query: int = 5,
days_back: int = 365,
) -> list[Citation]:
"""
Execute multiple search queries in parallel and register citations.
Args:
queries: List of search queries to execute
registry: Citation registry to populate
max_results_per_query: Results per query (1-10)
days_back: Filter results to last N days
Returns:
List of Citation objects added to the registry
"""
async def single_search(query: str) -> list[dict]:
try:
result = await tavily_client.search(
query=query,
max_results=max_results_per_query,
search_depth="advanced",
days=days_back,
include_raw_content=False,
)
return result.get("results", [])
except Exception as e:
print(f"Search failed for '{query}': {e}")
return []
# Execute all queries in parallel
all_results = await asyncio.gather(*[single_search(q) for q in queries])
new_citations = []
for results in all_results:
for r in results:
citation = registry.add(
url=r.get("url", ""),
title=r.get("title", "Untitled"),
snippet=r.get("content", ""),
relevance_score=r.get("score", 0.5),
publication_date=r.get("published_date"),
)
new_citations.append(citation)
return new_citations
async def extract_page_content(
url: str,
max_chars: int = 5000,
) -> str:
"""
Extract readable text content from a URL.
Returns extracted text or an error message.
"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (research-agent/1.0; +https://ai-agents-guide.com)"
}
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
# Parse HTML
soup = BeautifulSoup(response.text, "html.parser")
            # Remove non-content elements ("ads" is not an HTML tag, so it is omitted)
            for tag in soup(["script", "style", "noscript", "nav", "footer", "header", "aside"]):
                tag.decompose()
# Extract main content
# Try to find article/main content area
main_content = soup.find("article") or soup.find("main") or soup.find("body")
if not main_content:
return "Could not extract content from this page."
text = main_content.get_text(separator="\n", strip=True)
lines = [line.strip() for line in text.splitlines() if len(line.strip()) > 30]
content = "\n".join(lines)[:max_chars]
return content or "No readable content found."
except httpx.TimeoutException:
return f"Timeout fetching {url}"
except httpx.HTTPStatusError as e:
return f"HTTP {e.response.status_code} error fetching {url}"
except Exception as e:
return f"Failed to extract {url}: {str(e)[:100]}"
async def extract_top_sources(
citations: list[Citation],
max_sources: int = 5,
) -> dict[str, str]:
"""Extract full content from the top-ranked citations."""
# Sort by relevance, take top N
top_citations = sorted(citations, key=lambda c: c.relevance_score, reverse=True)[:max_sources]
# Extract in parallel
extraction_tasks = [
extract_page_content(c.url) for c in top_citations
]
contents = await asyncio.gather(*extraction_tasks)
return {
c.id: content
for c, content in zip(top_citations, contents)
}
Step 3: LangGraph Research Workflow
# research_agent/state.py
from typing import Annotated, Optional
import operator
from pydantic import BaseModel
class ResearchState(BaseModel):
"""State for the research agent graph."""
# Input
research_question: str
depth: str = "standard" # "quick", "standard", "deep"
max_sources: int = 10
# Planning
search_queries: list[str] = []
# Search results
search_citation_ids: Annotated[list[str], operator.add] = []
extracted_content: dict[str, str] = {} # citation_id -> content
# Synthesis
raw_synthesis: str = ""
key_claims: list[str] = [] # Claims to fact-check
# Fact checking
fact_check_results: dict[str, str] = {} # claim -> "confirmed|disputed|unverifiable"
fact_check_iterations: int = 0
# Output
final_report: str = ""
bibliography: str = ""
# Tracking
nodes_visited: Annotated[list[str], operator.add] = []
    # Pydantic v2 config (the v1-style `class Config` is deprecated)
    model_config = {"arbitrary_types_allowed": True}
# research_agent/graph.py
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from .state import ResearchState
from .citations import CitationRegistry
from .tools import search_web, extract_top_sources
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Global registry per research session (pass through state in production)
_registry = CitationRegistry()
async def plan_queries_node(state: ResearchState) -> dict:
"""Generate targeted search queries from the research question."""
prompt = ChatPromptTemplate.from_messages([
("system", """You are a research strategist. Generate 3-5 targeted search queries
to thoroughly answer the research question.
Guidelines:
- Each query should target a different aspect of the question
- Use specific, search-engine-friendly phrasing
- Include queries for recent data (add "2025" or "2026" where relevant)
- Include one query for expert opinions or academic perspectives
- Keep queries under 10 words each
Return ONLY a JSON array of query strings.
"""),
("human", "Research question: {question}\nDepth: {depth}"),
])
import json
response = await llm.ainvoke(
prompt.format_messages(
question=state.research_question,
depth=state.depth,
)
)
try:
# Parse query list from response
content = response.content.strip()
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
queries = json.loads(content)
except (json.JSONDecodeError, IndexError):
# Fallback: use the research question directly
queries = [state.research_question]
return {
"search_queries": queries[:5],
"nodes_visited": ["plan_queries"],
}
async def search_node(state: ResearchState) -> dict:
"""Execute search queries and populate citation registry."""
citations = await search_web(
queries=state.search_queries,
registry=_registry,
max_results_per_query=5,
)
return {
"search_citation_ids": [c.id for c in citations],
"nodes_visited": ["search"],
}
async def extract_node(state: ResearchState) -> dict:
"""Extract full content from top source URLs."""
# Get citation objects from registry
all_citations = [_registry.get(cid) for cid in state.search_citation_ids if _registry.get(cid)]
content_map = await extract_top_sources(all_citations, max_sources=state.max_sources // 2)
return {
"extracted_content": content_map,
"nodes_visited": ["extract"],
}
async def synthesize_node(state: ResearchState) -> dict:
"""Synthesize findings from all sources into a coherent narrative."""
citations_text = _registry.format_for_agent()
# Combine extracted content
source_content = "\n\n".join([
f"[{cid}] Full Content:\n{content[:2000]}"
for cid, content in state.extracted_content.items()
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a research analyst synthesizing information from multiple sources.
Create a comprehensive, accurate synthesis that:
- Covers all key aspects of the research question
- Cites sources using [cite-XXX] format inline
- Acknowledges conflicting information honestly
- Distinguishes facts from analysis/opinion
- Identifies 3-5 key claims that should be fact-checked
Format your response as:
SYNTHESIS:
[Your synthesis here with inline citations]
KEY_CLAIMS:
- [Claim 1 that needs verification]
- [Claim 2]
- ...
"""),
("human", """Research Question: {question}
Sources Available:
{citations}
Extracted Full Content:
{source_content}
Synthesize these findings comprehensively.
"""),
])
response = await llm.ainvoke(
prompt.format_messages(
question=state.research_question,
citations=citations_text[:5000],
source_content=source_content[:8000],
)
)
content = response.content
# Parse synthesis and claims
synthesis = content
claims = []
if "KEY_CLAIMS:" in content:
parts = content.split("KEY_CLAIMS:")
synthesis = parts[0].replace("SYNTHESIS:", "").strip()
claims_text = parts[1].strip()
claims = [
line.lstrip("- ").strip()
for line in claims_text.splitlines()
if line.strip() and line.strip().startswith("-")
]
return {
"raw_synthesis": synthesis,
"key_claims": claims[:5],
"nodes_visited": ["synthesize"],
}
async def fact_check_node(state: ResearchState) -> dict:
"""Verify key claims against additional sources."""
if not state.key_claims or state.fact_check_iterations >= 2:
return {"nodes_visited": ["fact_check_skip"]}
fact_check_results = {}
for claim in state.key_claims:
# Search specifically to verify this claim
verification_query = f"verify: {claim[:100]}"
verifying_citations = await search_web(
queries=[verification_query],
registry=_registry,
max_results_per_query=3,
)
if not verifying_citations:
fact_check_results[claim] = "unverifiable"
continue
# Ask LLM to assess if sources support the claim
snippets = "\n\n".join([
f"Source: {c.title}\n{c.snippet}"
for c in verifying_citations[:3]
])
assessment_prompt = f"""Does the following evidence support, dispute, or not address this claim?
Claim: {claim}
Evidence:
{snippets}
Respond with exactly one word: confirmed, disputed, or unverifiable"""
response = await llm.ainvoke([{"role": "user", "content": assessment_prompt}])
result = response.content.strip().lower()
if result not in {"confirmed", "disputed", "unverifiable"}:
result = "unverifiable"
fact_check_results[claim] = result
        # Update citations with fact-check results (via the registry's public API)
        for c in verifying_citations:
            cited = _registry.get(c.id)
            if cited:
                cited.fact_checked = True
                cited.fact_check_result = result
return {
"fact_check_results": fact_check_results,
"fact_check_iterations": state.fact_check_iterations + 1,
"nodes_visited": ["fact_check"],
}
async def generate_report_node(state: ResearchState) -> dict:
"""Generate the final structured markdown research report."""
# Build fact-checking summary
fc_summary = ""
if state.fact_check_results:
confirmed = [c for c, r in state.fact_check_results.items() if r == "confirmed"]
disputed = [c for c, r in state.fact_check_results.items() if r == "disputed"]
unverifiable = [c for c, r in state.fact_check_results.items() if r == "unverifiable"]
fc_summary = "\n\n## Fact-Check Summary\n"
if confirmed:
fc_summary += f"\n**Confirmed claims:** {len(confirmed)}\n"
if disputed:
fc_summary += f"\n**Disputed claims ({len(disputed)}):**\n"
for c in disputed:
fc_summary += f"- {c}\n"
if unverifiable:
fc_summary += f"\n**Could not verify:** {len(unverifiable)}\n"
    from datetime import date

    report = f"""# Research Report: {state.research_question}
*Generated by AI Research Agent | Sources: {len(_registry.all())} | Date: {date.today()}*
---
## Executive Summary
{state.raw_synthesis[:500]}...
---
## Detailed Findings
{state.raw_synthesis}
{fc_summary}
---
{_registry.format_bibliography()}
---
*Note: This report was generated by an AI research agent. All claims are sourced from the citations above.
Disputed claims are marked for human review. Verify time-sensitive information against primary sources.*
"""
return {
"final_report": report,
"bibliography": _registry.format_bibliography(),
"nodes_visited": ["generate_report"],
}
def should_fact_check(state: ResearchState) -> str:
"""Decide whether to fact-check or go straight to report."""
if state.key_claims and state.fact_check_iterations < 2:
return "fact_check"
return "generate_report"
def build_research_graph() -> StateGraph:
"""Build the research agent graph."""
workflow = StateGraph(ResearchState)
workflow.add_node("plan_queries", plan_queries_node)
workflow.add_node("search", search_node)
workflow.add_node("extract", extract_node)
workflow.add_node("synthesize", synthesize_node)
workflow.add_node("fact_check", fact_check_node)
workflow.add_node("generate_report", generate_report_node)
workflow.add_edge(START, "plan_queries")
workflow.add_edge("plan_queries", "search")
workflow.add_edge("search", "extract")
workflow.add_edge("extract", "synthesize")
workflow.add_conditional_edges(
"synthesize",
should_fact_check,
{"fact_check": "fact_check", "generate_report": "generate_report"},
)
workflow.add_edge("fact_check", "generate_report")
workflow.add_edge("generate_report", END)
return workflow
research_graph = build_research_graph().compile()
Step 4: Running the Research Agent
# main.py
import asyncio
from research_agent.graph import research_graph, _registry
from research_agent.state import ResearchState
async def run_research(question: str, depth: str = "standard") -> str:
"""Run the research agent and return the final report."""
# Reset registry for each research session
_registry.__init__()
initial_state = ResearchState(
research_question=question,
depth=depth,
max_sources=10 if depth == "deep" else 6,
)
result = await research_graph.ainvoke(initial_state)
return result["final_report"]
async def main():
questions = [
"What are the key trends in AI agent frameworks in 2026?",
"How does retrieval-augmented generation compare to fine-tuning for domain-specific AI?",
]
for question in questions:
print(f"\nResearching: {question}")
print("=" * 60)
report = await run_research(question, depth="standard")
print(report[:2000])
print("... [truncated]")
        # Save full report to a filesystem-safe filename derived from the question
        slug = "".join(ch if ch.isalnum() else "_" for ch in question[:50]).strip("_")
        filename = f"{slug}.md"
        with open(f"reports/{filename}", "w") as f:
            f.write(report)
        print(f"\nFull report saved to reports/{filename}")
if __name__ == "__main__":
import os
os.makedirs("reports", exist_ok=True)
asyncio.run(main())
Common Issues and Solutions
Issue: Agent hallucinates citations that don't exist
The agent should only reference citation IDs that are in the registry. Add a post-processing step that validates every [cite-XXX] reference in the synthesis against the registry and removes any that don't exist. Also add an explicit instruction to the synthesis prompt: "Only use citation IDs from the list provided. Do not invent new citation IDs."
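As a minimal sketch of that post-processing step: scan the synthesis for [cite-XXX] tokens and drop any whose ID is missing from the registry. The function name `strip_unknown_citations` and the exact regex are illustrative, not part of the code above.

```python
import re

CITE_PATTERN = re.compile(r"\[cite-\d{3}\]")

def strip_unknown_citations(text: str, valid_ids: set[str]) -> str:
    """Remove [cite-XXX] references whose ID is not in the registry."""
    def replace(match: re.Match) -> str:
        cite_id = match.group(0)[1:-1]  # strip the surrounding brackets
        return match.group(0) if cite_id in valid_ids else ""
    cleaned = CITE_PATTERN.sub(replace, text)
    return re.sub(r" {2,}", " ", cleaned)  # collapse gaps left by removals
```

Call it on `raw_synthesis` with `{c.id for c in _registry.all()}` as the valid set before the report is generated.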
Issue: Extracted content is too long for context window
Limit extracted content to 2,000-3,000 characters per source. Use the article/main tag extraction approach shown in extract_page_content() to get the most relevant portion. For very long documents, extract only the first 3,000 characters plus any paragraph that mentions your key search terms.
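The "first 3,000 characters plus keyword paragraphs" approach could look like the following sketch; `trim_with_keywords` is an illustrative helper, not part of the agent code above.

```python
def trim_with_keywords(
    text: str,
    keywords: list[str],
    head_chars: int = 3000,
    extra_chars: int = 2000,
) -> str:
    """Keep the first head_chars, plus later paragraphs that mention a keyword."""
    head = text[:head_chars]
    lowered = [k.lower() for k in keywords]
    extras: list[str] = []
    budget = extra_chars
    for para in text[head_chars:].split("\n"):
        if any(k in para.lower() for k in lowered) and len(para) <= budget:
            extras.append(para)
            budget -= len(para)
    return head + ("\n" + "\n".join(extras) if extras else "")
```

You would call this on the output of `extract_page_content()` with the original search queries as keywords.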
Issue: Fact-checking loop runs too many iterations
Set max_fact_check_iterations=2 and enforce it in the conditional edge. After 2 rounds, mark remaining unverified claims as "unverifiable" rather than continuing to search. The marginal accuracy gain from additional fact-checking rounds does not justify the cost.
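A small helper for that cutoff, as a sketch (`finalize_fact_checks` is a hypothetical name): once the iteration cap is reached, any claim without a verdict is marked unverifiable instead of triggering another search round.

```python
def finalize_fact_checks(
    results: dict[str, str],
    claims: list[str],
    iterations: int,
    max_iterations: int = 2,
) -> dict[str, str]:
    """After the iteration cap, mark any still-unchecked claim as unverifiable."""
    if iterations >= max_iterations:
        for claim in claims:
            results.setdefault(claim, "unverifiable")
    return results
```

Run it just before report generation so every key claim carries an explicit verdict.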
Production Considerations
Parallelism: The current implementation already parallelizes the expensive I/O — search queries run concurrently via asyncio.gather() in search_web(), and extract_top_sources() fetches pages the same way. The remaining sequential bottleneck is the fact-checking loop, which verifies one claim at a time; running the per-claim verification searches concurrently would cut fact-check latency roughly in proportion to the number of claims.
Cost management: A typical deep research run costs $0.05-0.20 in LLM API calls and $0.01-0.05 in Tavily API costs. Set a hard token budget per research session. For high-volume research applications, add caching for repeated queries.
Quality scoring: After generating the report, run an automated quality check: minimum source count (5+), minimum word count (800+), citation density (at least 1 citation per 100 words), and fact-check coverage (all key claims verified). Reject reports that fail quality checks and re-run with expanded search.
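Three of those four thresholds (source count, word count, citation density) can be checked directly on the report text, as in this sketch; `passes_quality_check` is a hypothetical helper, and fact-check coverage would be checked against `state.fact_check_results` separately.

```python
import re

def passes_quality_check(
    report: str,
    min_sources: int = 5,
    min_words: int = 800,
    words_per_citation: int = 100,
) -> bool:
    """Apply minimum-source, minimum-length, and citation-density thresholds."""
    words = len(report.split())
    distinct_citations = len(set(re.findall(r"\[cite-\d{3}\]", report)))
    return (
        distinct_citations >= min_sources
        and words >= min_words
        and distinct_citations * words_per_citation >= words
    )
```

On failure, re-run the agent with more search queries or a higher `max_sources` rather than shipping a thin report.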
Next Steps
- Connect this agent to agentic RAG for document-grounded research
- Add monitoring to track research quality
- Implement caching for repeated research queries
- Review agent tracing for debugging the fact-check loop
- Explore CrewAI workflow patterns as an alternative implementation