Sales development reps spend the majority of their working day on tasks a well-designed AI agent can handle: finding companies that fit your ideal customer profile, pulling contact information, scoring leads, and writing the first outreach email. What takes a human 20–30 minutes per prospect can be compressed to under 30 seconds with an agent that runs all four steps as a single autonomous pipeline.
In this tutorial you will build a lead generation AI agent using LangChain with four custom tools: a company discovery tool powered by Tavily search, a lead enrichment tool that pulls funding, headcount, and technology stack data, an ICP qualification scorer using Pydantic, and an LLM-powered personalized email drafter. You will also add async batching so the agent can process 50+ leads per run without hitting API rate limits.
Prerequisites
Install the required packages before you begin:
pip install langchain langchain-openai langchain-community \
  tavily-python pydantic python-dotenv aiohttp
You will also need:
- Python 3.10 or later
- An OpenAI API key (OPENAI_API_KEY)
- A Tavily API key (TAVILY_API_KEY) - free tier available at tavily.com
- Basic familiarity with Python async/await patterns
Set your keys in a .env file:
OPENAI_API_KEY=sk-...
TAVILY_API_KEY=tvly-...
Architecture Overview
The agent runs a sequential four-stage pipeline for each lead. Multiple leads are processed in parallel batches to stay within rate limits.
Input: [target industry, company size, location filters]
          │
          ▼
┌─────────────────────┐
│  search_companies   │ ← Tavily search: "B2B SaaS startups Series A NYC"
│  (discovery tool)   │
└─────────┬───────────┘
          │ [company name, domain, description]
          ▼
┌─────────────────────┐
│  enrich_lead        │ ← Apollo/Clearbit-style mock: funding, headcount,
│  (enrichment tool)  │   tech stack, decision-maker name
└─────────┬───────────┘
          │ [enriched lead dict]
          ▼
┌─────────────────────┐
│  qualify_lead       │ ← ICP scoring: industry match, size match,
│  (ICP scorer tool)  │   tech stack fit, growth signals → score 0–100
└─────────┬───────────┘
          │ [qualified lead with score]
          ▼
┌─────────────────────┐
│  draft_email        │ ← GPT-4o-mini: personalized subject + body
│  (email tool)       │   referencing specific company signals
└─────────┬───────────┘
          │
          ▼
Output: [LeadResult list, CSV export]
Leads scoring below a configurable threshold (default: 60) are filtered out before email drafting, saving API credits on unqualified prospects.
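That gate can be sketched as a plain filter run between the qualification and drafting stages (`filter_qualified` is an illustrative helper, not part of the tutorial's agent code):

```python
DEFAULT_THRESHOLD = 60  # same default the qualify_lead tool uses

def filter_qualified(leads: list[dict], threshold: int = DEFAULT_THRESHOLD) -> list[dict]:
    """Keep only leads at or above the ICP threshold so draft_email is
    never called (and billed) for unqualified prospects."""
    return [lead for lead in leads if (lead.get("icp_score") or 0) >= threshold]

batch = [
    {"company_name": "FitSaaS", "icp_score": 72.5},
    {"company_name": "SmallCo", "icp_score": 41.0},
    {"company_name": "NoScore"},  # a missing score counts as 0
]
qualified = filter_qualified(batch)
```

Lowering the threshold trades API spend for reach: `filter_qualified(batch, threshold=40)` would let SmallCo through as well.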
Step 1: Define the Lead Data Models
Start with Pydantic models that enforce structure throughout the pipeline:
# models.py
from pydantic import BaseModel, Field
from typing import Optional, List
from enum import Enum
class CompanySize(str, Enum):
SEED = "1-10"
EARLY = "11-50"
GROWTH = "51-200"
MID = "201-500"
ENTERPRISE = "500+"
class ICPCriteria(BaseModel):
target_industries: List[str] = Field(
default=["SaaS", "Fintech", "Healthcare Tech"],
description="Industries that match your ICP"
)
target_sizes: List[CompanySize] = Field(
default=[CompanySize.EARLY, CompanySize.GROWTH],
description="Company headcount ranges that fit"
)
required_tech_stack: List[str] = Field(
default=["Salesforce", "HubSpot", "Slack"],
description="Technologies that signal a good fit"
)
min_funding_usd: int = Field(
default=1_000_000,
description="Minimum funding raised to qualify"
)
geography: List[str] = Field(
default=["US", "Canada", "UK"],
description="Target geographies"
)
class Lead(BaseModel):
company_name: str
domain: str
description: str
industry: Optional[str] = None
headcount: Optional[int] = None
funding_usd: Optional[int] = None
tech_stack: List[str] = Field(default_factory=list)
decision_maker_name: Optional[str] = None
decision_maker_title: Optional[str] = None
decision_maker_email: Optional[str] = None
icp_score: Optional[float] = None
outreach_subject: Optional[str] = None
outreach_body: Optional[str] = None
class LeadBatch(BaseModel):
leads: List[Lead] = Field(default_factory=list)
qualified_count: int = 0
disqualified_count: int = 0
emails_drafted: int = 0
Step 2: Build the Four Agent Tools
Each tool is a LangChain StructuredTool with typed input schemas.
Tool 1: Company Discovery
# tools/discovery.py
from langchain.tools import StructuredTool
from tavily import TavilyClient
from pydantic import BaseModel
from typing import List
import os
tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
class SearchCompaniesInput(BaseModel):
query: str
max_results: int = 10
def search_companies(query: str, max_results: int = 10) -> List[dict]:
"""
Search for companies matching a description. Returns a list of company
dicts with name, domain, and a short description snippet.
"""
results = tavily.search(
query=query,
search_depth="basic",
max_results=max_results,
include_domains=["crunchbase.com", "linkedin.com", "techcrunch.com"]
)
companies = []
for r in results.get("results", []):
# Extract company name from title, domain from URL
title = r.get("title", "")
url = r.get("url", "")
domain = url.split("/")[2].replace("www.", "") if url else ""
companies.append({
"company_name": title.split(" - ")[0].split(" | ")[0].strip(),
"domain": domain,
"description": r.get("content", "")[:500],
})
return companies[:max_results]
search_companies_tool = StructuredTool.from_function(
func=search_companies,
name="search_companies",
description=(
"Search for companies that match a description, industry, or funding stage. "
"Returns company name, domain, and description for each result. "
"Use queries like 'B2B SaaS Series A HR tech startup 2024'."
),
args_schema=SearchCompaniesInput,
)
Tool 2: Lead Enrichment
In production you would call Apollo.io, Clearbit, or Hunter.io APIs here. This implementation uses a realistic mock that demonstrates the data structure:
# tools/enrichment.py
from langchain.tools import StructuredTool
from pydantic import BaseModel
from typing import Optional
import random
class EnrichLeadInput(BaseModel):
company_name: str
domain: str
# Simulated enrichment data - replace with an Apollo/Clearbit API call in production
MOCK_TECH_STACKS = [
["Salesforce", "Slack", "AWS", "React"],
["HubSpot", "Intercom", "GCP", "Vue.js"],
["Pipedrive", "Zendesk", "Azure", "Node.js"],
["Salesforce", "Marketo", "AWS", "Python"],
]
def enrich_lead(company_name: str, domain: str) -> dict:
"""
Enrich a company lead with funding data, headcount, tech stack,
and a decision-maker contact. Returns enriched lead data dict.
"""
# In production, replace this block with:
# response = requests.get(
# f"https://api.apollo.io/v1/organizations/enrich",
# params={"domain": domain},
# headers={"X-Api-Key": os.environ["APOLLO_API_KEY"]}
# )
# return response.json()
headcounts = [15, 28, 45, 92, 150, 320]
fundings = [500_000, 2_000_000, 5_000_000, 12_000_000, 25_000_000]
titles = ["VP of Sales", "Head of Growth", "CTO", "CEO", "Director of Operations"]
first_names = ["Sarah", "Michael", "Emma", "James", "Priya", "David"]
last_names = ["Chen", "Park", "Johnson", "Williams", "Patel", "Garcia"]
# Deterministic seed from domain so repeated runs return same data
seed = sum(ord(c) for c in domain)
rng = random.Random(seed)
first = rng.choice(first_names)
last = rng.choice(last_names)
username = f"{first.lower()}.{last.lower()}"
return {
"company_name": company_name,
"domain": domain,
"industry": rng.choice(["SaaS", "Fintech", "Healthcare Tech", "EdTech", "MarTech"]),
"headcount": rng.choice(headcounts),
"funding_usd": rng.choice(fundings),
"tech_stack": rng.choice(MOCK_TECH_STACKS),
"decision_maker_name": f"{first} {last}",
"decision_maker_title": rng.choice(titles),
"decision_maker_email": f"{username}@{domain}",
}
enrich_lead_tool = StructuredTool.from_function(
func=enrich_lead,
name="enrich_lead",
description=(
"Enrich a company with funding data, headcount, technology stack, "
"and decision-maker contact information. Call this after search_companies "
"for each company you want to qualify."
),
args_schema=EnrichLeadInput,
)
Tool 3: ICP Lead Qualification
# tools/qualification.py
from langchain.tools import StructuredTool
from pydantic import BaseModel
from typing import List, Optional
from models import ICPCriteria, CompanySize
# Default ICP - override by passing different criteria at runtime
DEFAULT_ICP = ICPCriteria()
SIZE_TO_RANGE = {
"1-10": (1, 10),
"11-50": (11, 50),
"51-200": (51, 200),
"201-500": (201, 500),
"500+": (501, 99999),
}
class QualifyLeadInput(BaseModel):
company_name: str
industry: Optional[str]
headcount: Optional[int]
funding_usd: Optional[int]
tech_stack: List[str]
def qualify_lead(
company_name: str,
industry: Optional[str],
headcount: Optional[int],
funding_usd: Optional[int],
tech_stack: List[str],
) -> dict:
"""
Score a lead against ICP criteria. Returns a score 0-100 and a
breakdown of which criteria passed or failed.
"""
icp = DEFAULT_ICP
score = 0.0
breakdown = {}
# Industry match: 30 points
industry_match = industry and any(
t.lower() in industry.lower() for t in icp.target_industries
)
if industry_match:
score += 30
breakdown["industry"] = f"PASS ({industry})"
else:
breakdown["industry"] = f"FAIL ({industry})"
# Company size match: 25 points
size_match = False
if headcount:
for target_size in icp.target_sizes:
low, high = SIZE_TO_RANGE[target_size.value]
if low <= headcount <= high:
size_match = True
break
if size_match:
score += 25
breakdown["headcount"] = f"PASS ({headcount} employees)"
else:
breakdown["headcount"] = f"FAIL ({headcount} employees)"
# Funding threshold: 20 points
funding_match = funding_usd is not None and funding_usd >= icp.min_funding_usd
if funding_match:
score += 20
breakdown["funding"] = f"PASS (${funding_usd:,})"
else:
funding_str = f"${funding_usd:,}" if funding_usd is not None else "unknown"
breakdown["funding"] = f"FAIL ({funding_str} < ${icp.min_funding_usd:,})"
# Tech stack overlap: 25 points (partial credit)
overlap = [t for t in tech_stack if t in icp.required_tech_stack]
tech_score = min(25, len(overlap) * 8)
score += tech_score
breakdown["tech_stack"] = (
f"{'PASS' if tech_score > 0 else 'FAIL'} "
f"({', '.join(overlap) or 'no match'})"
)
return {
"company_name": company_name,
"icp_score": round(score, 1),
"qualified": score >= 60,
"breakdown": breakdown,
}
qualify_lead_tool = StructuredTool.from_function(
func=qualify_lead,
name="qualify_lead",
description=(
"Score a lead against the ideal customer profile (ICP). Returns a score "
"0-100 and a breakdown showing which criteria passed. Leads with score "
"below 60 should not receive outreach emails."
),
args_schema=QualifyLeadInput,
)
Tool 4: Personalized Email Drafter
# tools/email_drafter.py
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from pydantic import BaseModel
from typing import List, Optional
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
EMAIL_PROMPT = ChatPromptTemplate.from_messages([
("system", (
"You are a senior sales development representative writing concise, "
"personalized cold outreach emails. Never use generic openers like "
"'Hope this finds you well'. Reference specific company details. "
"Keep emails under 120 words. Output JSON with keys 'subject' and 'body'."
)),
("human", (
"Write a cold outreach email for:\n"
"- Contact: {name}, {title} at {company}\n"
"- Industry: {industry}\n"
"- Tech stack: {tech_stack}\n"
"- Funding: ${funding:,} raised\n"
"- Our product: {product_description}\n\n"
"Output only valid JSON."
)),
])
class DraftEmailInput(BaseModel):
company_name: str
decision_maker_name: str
decision_maker_title: str
industry: str
tech_stack: List[str]
funding_usd: int
product_description: str = (
"an AI-powered sales analytics platform that helps "
"revenue teams close 30% more deals"
)
def draft_email(
company_name: str,
decision_maker_name: str,
decision_maker_title: str,
industry: str,
tech_stack: List[str],
funding_usd: int,
product_description: str = (
"an AI-powered sales analytics platform that helps "
"revenue teams close 30% more deals"
),
) -> dict:
"""
Draft a personalized outreach email using company and contact details.
Returns a dict with 'subject' and 'body' keys.
"""
chain = EMAIL_PROMPT | llm
result = chain.invoke({
"name": decision_maker_name,
"title": decision_maker_title,
"company": company_name,
"industry": industry,
"tech_stack": ", ".join(tech_stack),
"funding": funding_usd,
"product_description": product_description,
})
import json
try:
content = result.content.strip()
# Strip markdown code fences if present
if content.startswith("```"):
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
return json.loads(content)
except json.JSONDecodeError:
return {
"subject": f"Quick question about {company_name}'s sales process",
"body": result.content,
}
draft_email_tool = StructuredTool.from_function(
func=draft_email,
name="draft_email",
description=(
"Draft a personalized outreach email for a qualified lead. "
"Requires company details and decision-maker information. "
"Only call this for leads with icp_score >= 60."
),
args_schema=DraftEmailInput,
)
Step 3: Assemble the LangChain Agent
# agent.py
from dotenv import load_dotenv

load_dotenv()  # must run BEFORE importing the tools, which read API keys at import time

from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from tools.discovery import search_companies_tool
from tools.enrichment import enrich_lead_tool
from tools.qualification import qualify_lead_tool
from tools.email_drafter import draft_email_tool
TOOLS = [
search_companies_tool,
enrich_lead_tool,
qualify_lead_tool,
draft_email_tool,
]
SYSTEM_PROMPT = """You are a sales development AI agent. Your job is to build
a list of qualified leads and draft personalized outreach emails for each one.
Follow this exact process for every run:
1. Call search_companies with the provided search query to find 5-10 companies.
2. For each company found, call enrich_lead to get contact and funding data.
3. For each enriched company, call qualify_lead to score it against ICP.
4. For every company with icp_score >= 60, call draft_email to write outreach.
5. Return a structured summary of all leads processed, scores, and emails drafted.
Important rules:
- Do NOT skip the enrichment step. Qualify only enriched leads.
- Do NOT draft emails for leads with icp_score < 60.
- Process all companies before returning the final summary.
"""
prompt = ChatPromptTemplate.from_messages([
("system", SYSTEM_PROMPT),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_tool_calling_agent(llm, TOOLS, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=TOOLS,
verbose=True,
max_iterations=50,
handle_parsing_errors=True,
)
def run_lead_generation(search_query: str) -> str:
result = agent_executor.invoke({"input": search_query})
return result["output"]
Step 4: Add Async Batching for Scale
Processing one lead at a time is slow. Use asyncio to run multiple lead searches in parallel while respecting rate limits:
# batch_runner.py
import asyncio
from typing import List
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from agent import TOOLS, prompt
async def run_lead_batch_async(queries: List[str], concurrency: int = 5) -> List[str]:
"""
Run multiple lead generation queries concurrently.
concurrency controls max parallel agent runs to avoid rate limiting.
"""
semaphore = asyncio.Semaphore(concurrency)
async def run_single(query: str) -> str:
async with semaphore:
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_tool_calling_agent(llm, TOOLS, prompt)
executor = AgentExecutor(
agent=agent,
tools=TOOLS,
verbose=False,
max_iterations=50,
handle_parsing_errors=True,
)
# Run the synchronous agent in a worker thread so it does not block the event loop
result = await asyncio.to_thread(executor.invoke, {"input": query})
return result["output"]
tasks = [run_single(q) for q in queries]
return await asyncio.gather(*tasks, return_exceptions=True)
# Usage
if __name__ == "__main__":
search_queries = [
"B2B SaaS HR tech startups Series A 2024 using Salesforce",
"Fintech startup 50-200 employees raised Series B USA",
"Healthcare technology company 2023 funding HubSpot users",
"EdTech B2B platform UK Series A growth stage",
"MarTech startup New York using Slack and AWS",
]
results = asyncio.run(run_lead_batch_async(search_queries, concurrency=3))
for i, (query, result) in enumerate(zip(search_queries, results)):
print(f"\n{'='*60}")
print(f"Query {i+1}: {query}")
print(f"Result:\n{result}")
Step 5: Export Results to CSV
# export.py
import csv
from datetime import datetime
from typing import List, Optional
from models import Lead
def export_leads_to_csv(leads: List[Lead], output_path: Optional[str] = None) -> str:
if not output_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"leads_{timestamp}.csv"
fieldnames = [
"company_name", "domain", "industry", "headcount",
"funding_usd", "tech_stack", "decision_maker_name",
"decision_maker_title", "decision_maker_email",
"icp_score", "outreach_subject", "outreach_body"
]
with open(output_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for lead in leads:
row = lead.model_dump()
row["tech_stack"] = ", ".join(row.get("tech_stack", []))
writer.writerow({k: row.get(k, "") for k in fieldnames})
return output_path
Testing the Agent
Use pytest to validate each tool independently before running the full agent:
# tests/test_tools.py
import pytest
from tools.discovery import search_companies
from tools.enrichment import enrich_lead
from tools.qualification import qualify_lead
def test_enrich_lead_returns_required_fields():
result = enrich_lead("Acme Corp", "acmecorp.com")
assert "headcount" in result
assert "funding_usd" in result
assert "tech_stack" in result
assert isinstance(result["tech_stack"], list)
def test_qualify_lead_high_score():
result = qualify_lead(
company_name="FitSaaS",
industry="SaaS",
headcount=75, # matches 51-200 range
funding_usd=5_000_000, # above 1M threshold
tech_stack=["Salesforce", "Slack"], # 2 ICP tech matches
)
assert result["icp_score"] >= 60
assert result["qualified"] is True
def test_qualify_lead_low_score():
result = qualify_lead(
company_name="SmallCo",
industry="Retail", # not in ICP industries
headcount=5, # too small
funding_usd=50_000, # below threshold
tech_stack=["Shopify"], # no ICP tech match
)
assert result["icp_score"] < 60
assert result["qualified"] is False
def test_qualify_lead_partial_score():
result = qualify_lead(
company_name="PartialFit",
industry="Fintech", # matches
headcount=600, # too large for target range
funding_usd=3_000_000, # passes
tech_stack=["HubSpot"], # 1 match
)
assert 0 < result["icp_score"] < 100
Run the test suite:
pytest tests/ -v
Production Considerations
Before deploying this agent to handle real sales workflows, address these areas:
API Rate Limiting: The Tavily free tier allows 1,000 searches/month. OpenAI rate limits vary by tier. Implement exponential backoff with the tenacity library and cache search results with Redis to avoid redundant API calls on the same target companies.
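A minimal stdlib sketch of that policy is below; tenacity's `@retry(wait=wait_exponential(...), stop=stop_after_attempt(...))` decorator gives you the same behavior declaratively, and the module-level dict stands in for Redis:

```python
import random
import time

_search_cache: dict[str, list] = {}  # in-memory stand-in for a Redis cache

def search_with_backoff(query: str, search_fn, max_attempts: int = 5,
                        base_delay: float = 1.0) -> list:
    """Retry transient failures (e.g. HTTP 429) with exponential backoff plus
    jitter, and serve repeated queries from cache to avoid redundant API calls."""
    if query in _search_cache:
        return _search_cache[query]
    for attempt in range(max_attempts):
        try:
            results = search_fn(query)  # e.g. lambda q: tavily.search(query=q)["results"]
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(min(base_delay * 2 ** attempt + random.uniform(0, base_delay), 30.0))
        else:
            _search_cache[query] = results
            return results
```

Keying the cache on the normalized query string is usually enough for discovery searches; key on domain for enrichment calls.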
Real Enrichment APIs: Replace the mock enrichment with Apollo.io (best for B2B contact data), Clearbit (best for firmographics), or Hunter.io (best for email verification). Each has a Python SDK.
CRM Integration: Write qualified leads directly to Salesforce, HubSpot, or Pipedrive using their REST APIs. Add a push_to_crm tool so the agent can complete the full workflow without manual data export.
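As an illustrative sketch of the HubSpot side, this mapper builds a CRM v3 contacts payload from a lead dict. The property names follow HubSpot's default contact schema, but verify them against your portal before relying on them:

```python
def lead_to_hubspot_contact(lead: dict) -> dict:
    """Map a qualified lead onto a HubSpot CRM v3 contacts payload.
    POST the result to /crm/v3/objects/contacts with your private-app token."""
    full_name = (lead.get("decision_maker_name") or "").split(" ")
    return {
        "properties": {
            "email": lead.get("decision_maker_email", ""),
            "firstname": full_name[0] if full_name else "",
            "lastname": " ".join(full_name[1:]),
            "jobtitle": lead.get("decision_maker_title", ""),
            "company": lead.get("company_name", ""),
            "website": lead.get("domain", ""),
        }
    }
```

A `push_to_crm` StructuredTool would wrap this mapper plus the HTTP call, so the agent can invoke it as step 5 of the pipeline.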
Deduplication: Before enriching a company, check your CRM or a local SQLite database for existing records on the same domain. Avoid paying enrichment API costs for companies already in your pipeline.
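A minimal SQLite-backed version of that check might look like this (the table and function names are my own):

```python
import sqlite3

def make_dedup_store(path: str = "leads.db") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS seen_domains (domain TEXT PRIMARY KEY)")
    return conn

def is_new_domain(conn: sqlite3.Connection, domain: str) -> bool:
    """Return True (and record the domain) only the first time it is seen,
    so enrichment is never paid for twice on the same company."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO seen_domains (domain) VALUES (?)", (domain,)
    )
    conn.commit()
    return cur.rowcount == 1  # 1 = newly inserted, 0 = already present
```

Call `is_new_domain(conn, company["domain"])` before `enrich_lead` and skip any company that returns False.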
Email Compliance: Any automated outreach must comply with CAN-SPAM (US), GDPR (EU), and CASL (Canada). Add an unsubscribe mechanism and never send to personal email addresses discovered without consent.
Monitoring: Log every tool call result to a structured log (JSON lines format works well) and track ICP score distributions over time. If average scores drop, your ICP criteria likely need recalibration.
Running the Agent
# main.py
from agent import run_lead_generation
result = run_lead_generation(
"Find B2B SaaS companies in fintech or HR tech, "
"Series A or B, 20-200 employees, USA or UK, "
"using Salesforce or HubSpot"
)
print(result)
A typical run discovers 6–8 companies, enriches all of them, qualifies 3–5 that meet the ICP threshold, and produces personalized email drafts within 45–90 seconds, depending on OpenAI response latency.
Frequently Asked Questions
Can I use a different LLM instead of GPT-4o?
Yes. The agent works with any LangChain-compatible model that supports tool calling. Substitute ChatAnthropic (Claude 3.5 Sonnet) or ChatGroq for faster, cheaper alternatives. GPT-4o-mini works well for the email drafting step where creative quality matters less than cost.
How do I customize the ICP criteria?
Modify the DEFAULT_ICP in tools/qualification.py or pass a custom ICPCriteria instance at runtime. You can also make ICP criteria configurable via environment variables or a YAML config file.
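For the environment-variable route, a small sketch (the `ICP_*` variable names are hypothetical; pass the result as `ICPCriteria(**icp_overrides_from_env())`):

```python
import os

def icp_overrides_from_env(env=os.environ) -> dict:
    """Collect ICP overrides from environment variables, returning only
    the keys that are actually set so ICPCriteria defaults fill the rest."""
    overrides = {}
    if "ICP_TARGET_INDUSTRIES" in env:
        overrides["target_industries"] = [
            s.strip() for s in env["ICP_TARGET_INDUSTRIES"].split(",")
        ]
    if "ICP_MIN_FUNDING_USD" in env:
        overrides["min_funding_usd"] = int(env["ICP_MIN_FUNDING_USD"])
    if "ICP_GEOGRAPHY" in env:
        overrides["geography"] = [s.strip() for s in env["ICP_GEOGRAPHY"].split(",")]
    return overrides
```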
What is the typical cost per lead batch?
With GPT-4o for agent reasoning and GPT-4o-mini for email drafting, processing 10 leads costs approximately $0.10–$0.40 depending on search result lengths. Using GPT-4o-mini throughout reduces costs by roughly 80% with minor quality trade-offs.
How do I scale this to 500+ leads per day?
Use a task queue (Celery + Redis or AWS SQS) to distribute lead batches across multiple workers. Each worker runs the agent independently. Set concurrency=3 or lower per worker to avoid triggering OpenAI rate limits across the fleet.
Can the agent handle LinkedIn data?
LinkedIn strictly prohibits automated scraping in its terms of service. Use official LinkedIn Sales Navigator API integrations, or Apollo.io, which aggregates LinkedIn-sourced data through compliant channels.
Next Steps
- Explore AI Agent for Sales Automation to connect this pipeline to a full SDR workflow
- Learn How to Build a Research AI Agent for deeper company intelligence gathering
- Read the LangChain Multi-Agent Tutorial to understand tool-calling agents in depth
- See LangGraph Multi-Agent Tutorial for orchestrating multiple specialized agents with state management
- Browse the AI Agents Glossary: Tool Use to understand how agents interact with external APIs