Organizations accumulate thousands of PDFs, Word documents, contracts, invoices, and reports, but searching and extracting information from them remains a manual task. A document processing AI agent changes that entirely: it reads PDF, Word, and plain-text files, chunks and embeds them into a local vector store, classifies them by type, extracts structured fields like dates, totals, and parties, and answers questions about their content using retrieval-augmented generation.
In this tutorial you will build a complete document processing agent using LangChain, pdfplumber for PDF parsing, python-docx for Word files, and ChromaDB for local vector storage. The agent exposes five tools that can be called autonomously or triggered in a fixed pipeline, and works entirely locally without sending document content to external storage services.
Prerequisites#
Install dependencies before starting:
pip install langchain langchain-openai langchain-community \
chromadb pdfplumber python-docx sentence-transformers \
pydantic python-dotenv tiktoken
You will need:
- Python 3.10 or later
- An OpenAI API key (OPENAI_API_KEY), or substitute a local embedding model
- A directory of PDF or DOCX files to process
- Approximately 500 MB disk space for the Chroma vector store
# .env
OPENAI_API_KEY=sk-...
CHROMA_PERSIST_DIR=./chroma_db
DOCS_DIR=./documents
Architecture Overview#
The agent uses a five-tool pipeline. Each tool can be called independently or chained automatically by the agent based on the user's request.
Input: PDF / DOCX file path
          │
          ▼
┌─────────────────────┐
│    extract_text     │ ←  pdfplumber (PDF) or python-docx (DOCX)
│    (parser tool)    │ →  raw text string
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│  classify_document  │ ←  LLM classification: invoice / contract /
│  (classifier tool)  │    report / resume / other + confidence score
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│   chunk_document    │ ←  RecursiveCharacterTextSplitter
│   (chunker tool)    │ →  List[Document] with metadata
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│   embed_and_store   │ ←  OpenAIEmbeddings → ChromaDB
│   (storage tool)    │    persisted to disk, deduped by file hash
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│   query_document    │ ←  Similarity search + LLM answer generation
│  (RAG query tool)   │    supports single-doc or cross-doc queries
└─────────────────────┘
The Chroma vector store persists to disk so documents are indexed once and queried repeatedly without re-processing.
Step 1: Text Extraction Tool#
# tools/extract_text.py
import hashlib
import os
from pathlib import Path
from langchain.tools import StructuredTool
from pydantic import BaseModel
try:
import pdfplumber
PDF_AVAILABLE = True
except ImportError:
PDF_AVAILABLE = False
try:
from docx import Document as DocxDocument
DOCX_AVAILABLE = True
except ImportError:
DOCX_AVAILABLE = False
class ExtractTextInput(BaseModel):
file_path: str
def extract_text(file_path: str) -> dict:
"""
Extract raw text from a PDF or DOCX file.
Returns a dict with 'text', 'page_count', 'file_hash', and 'file_type'.
"""
path = Path(file_path)
if not path.exists():
return {"error": f"File not found: {file_path}"}
suffix = path.suffix.lower()
# Compute file hash for deduplication
with open(file_path, "rb") as f:
file_hash = hashlib.sha256(f.read()).hexdigest()[:16]
if suffix == ".pdf":
if not PDF_AVAILABLE:
return {"error": "pdfplumber not installed. Run: pip install pdfplumber"}
        with pdfplumber.open(file_path) as pdf:
            # Capture the page count while the PDF is still open
            page_count = len(pdf.pages)
            pages = []
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    pages.append(text)
            full_text = "\n\n".join(pages)
        return {
            "text": full_text,
            "page_count": page_count,
"file_hash": file_hash,
"file_type": "pdf",
"file_name": path.name,
}
elif suffix in (".docx", ".doc"):
if not DOCX_AVAILABLE:
return {"error": "python-docx not installed. Run: pip install python-docx"}
doc = DocxDocument(file_path)
paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
# Also extract text from tables
for table in doc.tables:
for row in table.rows:
row_text = " | ".join(
cell.text.strip() for cell in row.cells if cell.text.strip()
)
if row_text:
paragraphs.append(row_text)
full_text = "\n\n".join(paragraphs)
return {
"text": full_text,
"page_count": None,
"file_hash": file_hash,
"file_type": "docx",
"file_name": path.name,
}
elif suffix == ".txt":
with open(file_path, "r", encoding="utf-8", errors="replace") as f:
full_text = f.read()
return {
"text": full_text,
"page_count": 1,
"file_hash": file_hash,
"file_type": "txt",
"file_name": path.name,
}
else:
return {"error": f"Unsupported file type: {suffix}. Supported: .pdf, .docx, .txt"}
extract_text_tool = StructuredTool.from_function(
func=extract_text,
name="extract_text",
description=(
"Extract raw text from a PDF, DOCX, or TXT file. "
"Returns the full text content along with page count and a file hash "
"for deduplication. Always call this first before other document tools."
),
args_schema=ExtractTextInput,
)
Step 2: Document Classification Tool#
# tools/classify_document.py
from langchain.tools import StructuredTool
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from pydantic import BaseModel
from typing import Optional
import json
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
CLASSIFY_PROMPT = ChatPromptTemplate.from_messages([
("system", (
"You are a document classification system. Analyze the provided text "
"and classify the document. Output only valid JSON with these exact keys:\n"
"- type: one of [invoice, contract, report, resume, policy, email, other]\n"
"- confidence: float 0.0-1.0\n"
"- key_fields: list of important fields you can see "
"(e.g. ['invoice_number', 'total_amount', 'due_date'] for invoices)\n"
"- summary: one sentence description of the document"
)),
("human", "Classify this document:\n\n{text_sample}"),
])
class ClassifyDocumentInput(BaseModel):
text: str
file_name: Optional[str] = None
def classify_document(text: str, file_name: Optional[str] = None) -> dict:
"""
Classify a document by type and identify its key fields.
Uses the first 2000 characters for classification to minimize token usage.
"""
text_sample = text[:2000]
if file_name:
text_sample = f"File name: {file_name}\n\n{text_sample}"
chain = CLASSIFY_PROMPT | llm
result = chain.invoke({"text_sample": text_sample})
try:
content = result.content.strip()
if content.startswith("```"):
content = "\n".join(content.split("\n")[1:-1])
classification = json.loads(content)
except json.JSONDecodeError:
classification = {
"type": "other",
"confidence": 0.5,
"key_fields": [],
"summary": "Classification failed ā document stored as unclassified",
}
# Determine routing action based on type
routing_map = {
"invoice": "accounts_payable_workflow",
"contract": "legal_review_queue",
"report": "analytics_dashboard",
"resume": "hr_applicant_tracker",
"policy": "compliance_repository",
"email": "correspondence_archive",
"other": "general_storage",
}
classification["routing_destination"] = routing_map.get(
classification.get("type", "other"), "general_storage"
)
return classification
classify_document_tool = StructuredTool.from_function(
func=classify_document,
name="classify_document",
description=(
"Classify a document as invoice, contract, report, resume, policy, email, "
"or other. Returns document type, confidence score, key fields present, "
"a one-sentence summary, and a routing destination. "
"Call this after extract_text."
),
args_schema=ClassifyDocumentInput,
)
Step 3: Document Chunking Tool#
# tools/chunk_document.py
from langchain.tools import StructuredTool
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import BaseModel
from typing import Optional
class ChunkDocumentInput(BaseModel):
text: str
file_name: str
file_hash: str
document_type: Optional[str] = "other"
chunk_size: int = 1000
chunk_overlap: int = 200
def chunk_document(
text: str,
file_name: str,
file_hash: str,
document_type: str = "other",
chunk_size: int = 1000,
chunk_overlap: int = 200,
) -> dict:
"""
Split document text into overlapping chunks suitable for embedding.
    Returns a dict with 'chunks' (text plus metadata), 'total_chunks', and 'avg_chunk_length'.
"""
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""],
)
chunks = splitter.split_text(text)
chunk_dicts = []
for i, chunk_text in enumerate(chunks):
chunk_dicts.append({
"text": chunk_text,
"metadata": {
"file_name": file_name,
"file_hash": file_hash,
"document_type": document_type,
"chunk_index": i,
"total_chunks": len(chunks),
}
})
return {
"chunks": chunk_dicts,
"total_chunks": len(chunks),
"avg_chunk_length": sum(len(c) for c in chunks) // max(len(chunks), 1),
}
chunk_document_tool = StructuredTool.from_function(
func=chunk_document,
name="chunk_document",
description=(
"Split document text into overlapping chunks for embedding. "
"Returns a list of text chunks with metadata. "
"Call this after classify_document and before embed_and_store."
),
args_schema=ChunkDocumentInput,
)
Step 4: Embedding and Storage Tool#
# tools/embed_and_store.py
import os
from langchain.tools import StructuredTool
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema import Document
from pydantic import BaseModel
from typing import List
from dotenv import load_dotenv
load_dotenv()
CHROMA_PERSIST_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
COLLECTION_NAME = "documents"
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
class EmbedAndStoreInput(BaseModel):
chunks: List[dict] # List of {"text": str, "metadata": dict}
file_hash: str
def embed_and_store(chunks: List[dict], file_hash: str) -> dict:
"""
Embed document chunks and store them in the local Chroma vector database.
Skips embedding if the file_hash already exists in the store (deduplication).
Returns the number of chunks stored and the collection name.
"""
# Check for existing documents with this file hash to avoid duplicates
try:
vectorstore = Chroma(
collection_name=COLLECTION_NAME,
embedding_function=embeddings,
persist_directory=CHROMA_PERSIST_DIR,
)
# Query for existing entries with this file hash
existing = vectorstore.get(where={"file_hash": file_hash})
if existing and existing.get("ids"):
return {
"status": "skipped",
"reason": "file_hash already in store",
"existing_chunk_count": len(existing["ids"]),
"file_hash": file_hash,
}
except Exception:
        pass  # Collection doesn't exist yet; proceed with creation
documents = [
Document(page_content=chunk["text"], metadata=chunk["metadata"])
for chunk in chunks
]
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
collection_name=COLLECTION_NAME,
persist_directory=CHROMA_PERSIST_DIR,
)
return {
"status": "stored",
"chunks_embedded": len(documents),
"file_hash": file_hash,
"collection": COLLECTION_NAME,
"persist_dir": CHROMA_PERSIST_DIR,
}
embed_and_store_tool = StructuredTool.from_function(
func=embed_and_store,
name="embed_and_store",
description=(
"Embed document chunks using OpenAI embeddings and persist them to the "
"local Chroma vector database. Automatically deduplicates by file hash. "
"Call this after chunk_document."
),
args_schema=EmbedAndStoreInput,
)
Step 5: RAG Query Tool#
# tools/query_document.py
import os
from langchain.tools import StructuredTool
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from pydantic import BaseModel
from typing import Optional
from dotenv import load_dotenv
load_dotenv()
CHROMA_PERSIST_DIR = os.getenv("CHROMA_PERSIST_DIR", "./chroma_db")
COLLECTION_NAME = "documents"
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
QA_PROMPT = PromptTemplate(
input_variables=["context", "question"],
template=(
"Use the following document excerpts to answer the question accurately. "
"If the answer is not in the excerpts, say 'Not found in documents.'\n\n"
"Document excerpts:\n{context}\n\n"
"Question: {question}\n\n"
"Answer:"
),
)
class QueryDocumentInput(BaseModel):
question: str
file_name_filter: Optional[str] = None
top_k: int = 5
def query_document(
question: str,
file_name_filter: Optional[str] = None,
top_k: int = 5,
) -> dict:
"""
Answer a question about stored documents using RAG.
Optionally filter to a specific document by file name.
Returns the answer and the source document chunks used.
"""
vectorstore = Chroma(
collection_name=COLLECTION_NAME,
embedding_function=embeddings,
persist_directory=CHROMA_PERSIST_DIR,
)
# Apply metadata filter if a specific document is requested
search_kwargs = {"k": top_k}
if file_name_filter:
search_kwargs["filter"] = {"file_name": file_name_filter}
retriever = vectorstore.as_retriever(search_kwargs=search_kwargs)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
retriever=retriever,
return_source_documents=True,
chain_type_kwargs={"prompt": QA_PROMPT},
)
result = qa_chain.invoke({"query": question})
sources = []
for doc in result.get("source_documents", []):
sources.append({
"file_name": doc.metadata.get("file_name"),
"document_type": doc.metadata.get("document_type"),
"chunk_index": doc.metadata.get("chunk_index"),
"excerpt": doc.page_content[:200] + "...",
})
return {
"answer": result["result"],
"sources": sources,
"source_count": len(sources),
}
query_document_tool = StructuredTool.from_function(
func=query_document,
name="query_document",
description=(
"Answer questions about documents stored in the vector database using RAG. "
"Can query across all documents or filter to a specific file. "
"Use this to extract specific data like invoice totals, contract dates, "
"or report findings from previously indexed documents."
),
args_schema=QueryDocumentInput,
)
Step 6: Assemble the Document Agent#
# agent.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from tools.extract_text import extract_text_tool
from tools.classify_document import classify_document_tool
from tools.chunk_document import chunk_document_tool
from tools.embed_and_store import embed_and_store_tool
from tools.query_document import query_document_tool
load_dotenv()
TOOLS = [
extract_text_tool,
classify_document_tool,
chunk_document_tool,
embed_and_store_tool,
query_document_tool,
]
SYSTEM_PROMPT = """You are a document processing AI agent. You can index
documents into a vector database and answer questions about their content.
For INDEXING a document, follow this exact sequence:
1. extract_text → get raw text and file hash
2. classify_document → identify type, key fields, routing destination
3. chunk_document → split into overlapping chunks with metadata
4. embed_and_store → persist chunks to Chroma (auto-skips duplicates)
5. Report: document type, chunk count, routing destination
For QUERYING documents:
1. query_document → RAG search across stored documents
2. If user asks about a specific file, pass the file_name_filter parameter
For BATCH INDEXING multiple files, process each file through steps 1-4 before
moving to the next file. Report a summary at the end.
Always report routing_destination from classification so the user knows
where each document should go in their workflow.
"""
prompt = ChatPromptTemplate.from_messages([
("system", SYSTEM_PROMPT),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
llm = ChatOpenAI(model="gpt-4o", temperature=0)
agent = create_tool_calling_agent(llm, TOOLS, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=TOOLS,
verbose=True,
max_iterations=40,
handle_parsing_errors=True,
)
def index_document(file_path: str) -> str:
return agent_executor.invoke({
"input": f"Index this document and tell me its type, key fields, and routing destination: {file_path}"
})["output"]
def query_documents(question: str, file_name: str = None) -> str:
query = question
if file_name:
query += f" (search only in file: {file_name})"
return agent_executor.invoke({"input": query})["output"]
Step 7: Batch Processing Multiple Files#
# batch_index.py
import os
from pathlib import Path
from agent import index_document
def index_directory(directory: str, extensions: list = None) -> dict:
"""
Index all documents in a directory. Returns a summary report.
"""
if extensions is None:
extensions = [".pdf", ".docx", ".txt"]
doc_dir = Path(directory)
files = [
f for f in doc_dir.iterdir()
if f.is_file() and f.suffix.lower() in extensions
]
print(f"Found {len(files)} documents to index in {directory}")
results = {"indexed": [], "failed": [], "skipped": []}
for file_path in files:
print(f"\nIndexing: {file_path.name}")
try:
result = index_document(str(file_path))
results["indexed"].append({"file": file_path.name, "result": result})
print(f" Done: {file_path.name}")
except Exception as e:
results["failed"].append({"file": file_path.name, "error": str(e)})
print(f" Failed: {file_path.name} ā {e}")
print(f"\nBatch complete: {len(results['indexed'])} indexed, "
f"{len(results['failed'])} failed")
return results
# Usage
if __name__ == "__main__":
results = index_directory("./documents")
# Query across all indexed documents
from agent import query_documents
print("\nQuerying indexed documents...")
answer = query_documents("What invoices have payment due dates in March 2026?")
print(answer)
Testing the Agent#
# tests/test_tools.py
import pytest
import tempfile
import os
def test_extract_text_txt():
from tools.extract_text import extract_text
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
f.write("This is a test document.\nSecond line of content.")
tmp_path = f.name
try:
result = extract_text(tmp_path)
assert "text" in result
assert "test document" in result["text"]
assert result["file_type"] == "txt"
assert len(result["file_hash"]) == 16
finally:
os.unlink(tmp_path)
def test_extract_text_missing_file():
from tools.extract_text import extract_text
result = extract_text("/nonexistent/path/file.pdf")
assert "error" in result
def test_chunk_document_produces_overlap():
from tools.chunk_document import chunk_document
long_text = "sentence. " * 300 # ~3000 chars
result = chunk_document(
text=long_text,
file_name="test.txt",
file_hash="abc123",
chunk_size=500,
chunk_overlap=100,
)
assert result["total_chunks"] > 1
chunks = result["chunks"]
assert all("text" in c and "metadata" in c for c in chunks)
# Verify overlap: end of chunk N should appear in start of chunk N+1
if len(chunks) >= 2:
end_of_first = chunks[0]["text"][-100:]
start_of_second = chunks[1]["text"][:200]
# Some content should overlap
words_in_first = set(end_of_first.split())
words_in_second = set(start_of_second.split())
assert len(words_in_first & words_in_second) > 0
def test_chunk_document_metadata():
from tools.chunk_document import chunk_document
result = chunk_document(
text="Short document text for testing metadata.",
file_name="test_doc.pdf",
file_hash="xyz789",
document_type="report",
)
for chunk in result["chunks"]:
assert chunk["metadata"]["file_name"] == "test_doc.pdf"
assert chunk["metadata"]["file_hash"] == "xyz789"
assert chunk["metadata"]["document_type"] == "report"
def test_embed_and_store_deduplication(tmp_path):
# This test requires OPENAI_API_KEY to be set
import os
if not os.getenv("OPENAI_API_KEY"):
pytest.skip("OPENAI_API_KEY not set")
os.environ["CHROMA_PERSIST_DIR"] = str(tmp_path)
from tools.embed_and_store import embed_and_store
chunks = [{"text": "Test chunk content.", "metadata": {"file_hash": "dup123"}}]
result1 = embed_and_store(chunks=chunks, file_hash="dup123")
assert result1["status"] == "stored"
result2 = embed_and_store(chunks=chunks, file_hash="dup123")
assert result2["status"] == "skipped"
Run tests:
pytest tests/test_tools.py -v
Production Considerations#
Handling Large PDFs: PDFs with hundreds of pages can consume significant memory when loaded entirely. Use pdfplumber's page-level iteration (already implemented above) and consider processing pages in batches of 50. For very large documents, increase chunk_size to 1500 and chunk_overlap to 300.
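As a rough sketch of that page-batching idea (the extract_text_batched helper and the batch size of 50 are illustrative, not part of the tutorial's tools), you could stream page batches instead of joining the whole document in memory at once:
# Hypothetical sketch: yield text for successive page batches of a large PDF
import pdfplumber

def extract_text_batched(file_path: str, batch_size: int = 50):
    """Yield the joined text of each page batch instead of one huge string."""
    with pdfplumber.open(file_path) as pdf:
        batch = []
        for i, page in enumerate(pdf.pages, start=1):
            text = page.extract_text()
            if text:
                batch.append(text)
            if i % batch_size == 0:
                yield "\n\n".join(batch)
                batch = []
        if batch:
            yield "\n\n".join(batch)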
OCR for Scanned Documents: pdfplumber cannot extract text from scanned PDFs; it only reads embedded text layers. For scanned documents, add pytesseract as a fallback: if extract_text returns fewer than 100 characters for a multi-page PDF, trigger OCR using pdf2image + pytesseract.
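A minimal sketch of that fallback, assuming pdf2image and pytesseract are installed along with the poppler and tesseract system binaries (the ocr_pdf helper and the trigger condition are illustrative):
# Hypothetical OCR fallback for scanned PDFs
from pdf2image import convert_from_path  # requires the poppler binaries
import pytesseract                       # requires the tesseract binary

def ocr_pdf(file_path: str, dpi: int = 300) -> str:
    """Render each page to an image and run Tesseract OCR on it."""
    images = convert_from_path(file_path, dpi=dpi)
    return "\n\n".join(pytesseract.image_to_string(image) for image in images)

# Possible trigger after the normal extraction attempt (sketch):
# if file_type == "pdf" and page_count > 1 and len(full_text) < 100:
#     full_text = ocr_pdf(file_path)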
Vector Store Scaling: ChromaDB works well for collections under 1 million chunks. For larger document collections, migrate to Qdrant (self-hosted) or Pinecone (managed). Swapping backends only requires substituting a different LangChain vector store class in the embed_and_store tool.
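As a hedged sketch of that swap, assuming a Qdrant server is running locally on the default port; documents and embeddings are the same objects embed_and_store already builds:
# Sketch: same documents and embeddings, different vector store backend
from langchain_community.vectorstores import Qdrant

vectorstore = Qdrant.from_documents(
    documents=documents,
    embedding=embeddings,
    url="http://localhost:6333",   # assumes a local Qdrant instance
    collection_name="documents",
)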
Concurrent Indexing: Wrap index_document in asyncio using loop.run_in_executor (same pattern as the lead generation tutorial) to index multiple documents in parallel. Set concurrency to 3ā5 to avoid OpenAI embedding rate limits.
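A sketch of that pattern, wrapping the synchronous index_document from agent.py (the index_many helper and the concurrency limit are illustrative):
# Hypothetical concurrent indexing sketch
import asyncio
from agent import index_document

async def index_many(file_paths: list[str], concurrency: int = 3) -> list[str]:
    """Index several documents in parallel threads, capped by a semaphore."""
    loop = asyncio.get_running_loop()
    semaphore = asyncio.Semaphore(concurrency)

    async def index_one(path: str) -> str:
        async with semaphore:
            return await loop.run_in_executor(None, index_document, path)

    return list(await asyncio.gather(*(index_one(p) for p in file_paths)))

# asyncio.run(index_many(["./documents/a.pdf", "./documents/b.pdf"]))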
Cost Management: text-embedding-3-small costs $0.02 per million tokens. A 100-page PDF typically generates 80,000–120,000 tokens of text. Indexing 1,000 documents costs approximately $2–8. Use gpt-4o-mini for classification and QA to keep inference costs minimal.
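A quick back-of-the-envelope check on those figures (the per-document token count is an assumption at the low end of the range above):
# Rough embedding cost estimate (rates and token counts are assumptions)
COST_PER_MILLION_TOKENS = 0.02   # text-embedding-3-small, USD
tokens_per_document = 100_000    # roughly a 100-page PDF
document_count = 1_000

cost = document_count * tokens_per_document / 1_000_000 * COST_PER_MILLION_TOKENS
print(f"Estimated embedding cost: ${cost:.2f}")  # ~$2.00; classification/QA calls add more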
Document Security: Document content is sent to OpenAI for embedding and QA generation. For sensitive documents (legal contracts, HR files, financial records), use a local embedding model like sentence-transformers/all-MiniLM-L6-v2 via HuggingFaceEmbeddings and a local LLM via Ollama.
# Local embedding alternative (no API calls, no data leaves your machine)
from langchain_community.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
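The chat model can be swapped the same way; a minimal sketch assuming the Ollama daemon is running and a model such as llama3 has already been pulled:
# Local LLM alternative for classification and QA (model name is an assumption)
from langchain_community.chat_models import ChatOllama

llm = ChatOllama(model="llama3", temperature=0)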
Running the Full Pipeline#
# main.py
from agent import index_document, query_documents
# Index a single document
print("Indexing invoice...")
result = index_document("./documents/invoice_march_2026.pdf")
print(result)
# Index all documents in a folder
from batch_index import index_directory
index_directory("./documents")
# Ask questions across all documents
print("\nQuerying...")
answer = query_documents("What is the total amount due on all invoices?")
print(answer)
# Query a specific document
answer = query_documents(
"Who are the parties named in this contract?",
file_name="service_agreement_2026.pdf"
)
print(answer)
Frequently Asked Questions#
How accurate is the document classification?
With GPT-4o-mini and the first 2,000 characters of text, classification accuracy is above 90% for standard business documents (invoices, contracts, reports). Edge cases like hybrid documents (a report containing an embedded invoice table) may be misclassified; add a low_confidence_threshold of 0.7 and route uncertain documents to a human review queue.
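A minimal sketch of that guard, applied to the dict classify_document returns (the threshold value and the human_review_queue destination are assumptions):
# Hypothetical low-confidence routing guard
LOW_CONFIDENCE_THRESHOLD = 0.7

def route_with_review(classification: dict) -> str:
    """Send uncertain classifications to human review instead of auto-routing."""
    if classification.get("confidence", 0.0) < LOW_CONFIDENCE_THRESHOLD:
        return "human_review_queue"
    return classification.get("routing_destination", "general_storage")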
Can this agent process images or spreadsheets?
Not with the current implementation. Add openpyxl for XLSX files (read cell values row by row) and Pillow + pytesseract for images. Structured data from spreadsheets benefits from table-aware chunking: keep rows together rather than splitting mid-table.
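A sketch of what an XLSX extractor could look like, assuming openpyxl is installed (the extract_text_xlsx helper is illustrative and would slot into extract_text as another branch):
# Hypothetical XLSX branch: one pipe-delimited line per row keeps table rows intact
from openpyxl import load_workbook

def extract_text_xlsx(file_path: str) -> str:
    workbook = load_workbook(file_path, read_only=True, data_only=True)
    lines = []
    for sheet in workbook.worksheets:
        lines.append(f"Sheet: {sheet.title}")
        for row in sheet.iter_rows(values_only=True):
            cells = [str(cell) for cell in row if cell is not None]
            if cells:
                lines.append(" | ".join(cells))
    return "\n".join(lines)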
How do I extract specific fields like invoice totals or contract dates?
After indexing, use the query_document tool with targeted questions: "What is the invoice total?", "What is the contract start date?". For high-volume extraction (processing 1,000 invoices to a database), replace the QA step with a structured extraction prompt using Pydantic output parsing.
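A hedged sketch of that structured-extraction variant; the InvoiceFields schema and the with_structured_output call are illustrative, not part of the tutorial's tools:
# Hypothetical structured extraction sketch using Pydantic
from typing import Optional
from pydantic import BaseModel
from langchain_openai import ChatOpenAI

class InvoiceFields(BaseModel):
    invoice_number: Optional[str] = None
    total_amount: Optional[float] = None
    due_date: Optional[str] = None

extractor = ChatOpenAI(model="gpt-4o-mini", temperature=0).with_structured_output(InvoiceFields)
document_text = "..."  # text returned by extract_text for one invoice
fields = extractor.invoke(f"Extract the invoice fields from this document:\n\n{document_text}")
print(fields.model_dump())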
What happens when two documents have the same content?
The file_hash deduplication in embed_and_store skips re-indexing identical files. If the same contract is renamed and submitted again, it will have a different file name but the same hash and will be skipped cleanly.
How do I update a document that has changed?
Delete the old chunks by file hash from Chroma, then re-index the updated document. Add a delete_document tool that calls vectorstore.delete(where={"file_hash": old_hash}).
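A sketch of that helper, reusing the constants from tools/embed_and_store.py; it looks up the chunk IDs for the old hash and deletes by ID, which works even if your Chroma wrapper version does not forward a where filter to delete:
# Hypothetical delete-by-hash helper (sketch)
from langchain_community.vectorstores import Chroma
from tools.embed_and_store import embeddings, COLLECTION_NAME, CHROMA_PERSIST_DIR

def delete_document(old_hash: str) -> int:
    """Remove all stored chunks for a file hash; returns how many were deleted."""
    vectorstore = Chroma(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        persist_directory=CHROMA_PERSIST_DIR,
    )
    existing = vectorstore.get(where={"file_hash": old_hash})
    ids = existing.get("ids", [])
    if ids:
        vectorstore.delete(ids=ids)
    return len(ids)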
Next Steps#
- Learn How to Build a Research AI Agent to combine document knowledge with live web search
- Explore Introduction to RAG for AI Agents for deeper coverage of retrieval strategies
- Read How to Train an AI Agent on Your Own Data to move beyond retrieval to fine-tuned models
- See How to Automate Invoicing with AI Agents for a workflow that connects document extraction to accounting systems
- Browse the AI Agents Glossary: Vector Database to understand the embedding storage layer in depth