AI Agent State Management: Conversation State, Memory and Checkpointing
State management is the difference between an AI toy and a production AI agent. A stateless agent answers questions. A stateful agent maintains context across a long conversation, remembers that you told it your preferences three exchanges ago, can pause mid-task and resume hours later, and accumulates knowledge about your specific situation over time.
In this tutorial, we cover LangGraph's state management system in depth — from basic TypedDict state definitions to persistent checkpointing across sessions, and advanced patterns for managing long-running agents.
Prerequisites: Python 3.11+, basic familiarity with LangGraph concepts, OpenAI API key.
Related: Agent State Glossary | AI Agent Memory Systems | Build a Multi-Agent Pipeline
The Anatomy of Agent State#
In LangGraph, state is a typed dictionary (a TypedDict) that flows through every node in your agent graph. Each node receives the current state and returns updates to it.
from typing import TypedDict, Annotated, List, Optional
import operator
class AgentState(TypedDict):
    """Example agent state schema, grouped into the three common field kinds:
    accumulating (reducer-annotated), replaced (last write wins), and
    control-flow bookkeeping."""
    # Core state fields
    messages: Annotated[list, operator.add]  # add: new messages are appended
    current_task: str  # What the agent is currently working on
    task_complete: bool  # Whether the current task is done
    # Accumulated knowledge
    context: dict  # Persistent facts about the user/session
    tool_results: Annotated[list, operator.add]  # Results from tool calls
    # Control flow
    iteration: int  # How many loops the agent has run
    errors: List[str]  # Error log for debugging
    next_action: Optional[str]  # Next step the agent plans to take
The Reducer Pattern#
The Annotated[list, operator.add] pattern is crucial — it tells LangGraph how to combine a node's update with the existing value of a field, both when parallel nodes write the same field in one step and when successive steps each contribute updates.
import operator
from typing import Annotated
# Without annotation: last write wins (for scalar values)
class SimpleState(TypedDict):
    """Un-annotated fields are simply overwritten by the most recent writer."""
    current_task: str  # Last node to write this wins
# With operator.add: values are concatenated (for lists)
class AccumulatingState(TypedDict):
    """operator.add as the reducer: each node's update is concatenated onto
    the existing list instead of replacing it."""
    messages: Annotated[list, operator.add]  # Each node's messages are added to the list
# Custom reducer: apply business logic to state merging
def merge_contexts(existing: dict, new: dict) -> dict:
    """Merge context dicts, with new values overwriting old ones."""
    combined = dict(existing)
    combined.update(new)
    return combined
class CustomState(TypedDict):
    """Uses a user-defined reducer function instead of a stdlib operator."""
    context: Annotated[dict, merge_contexts]  # Custom merge logic
Building a Stateful Agent Step by Step#
Step 1: Define Your State#
Design your state schema carefully — it should contain everything any node in your graph might need to read or write.
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from typing import TypedDict, Annotated, List, Optional
import operator
class AssistantState(TypedDict):
    """State for a customer service assistant agent.

    Grouped into conversation history, accumulated user context, the
    in-flight task, and quality-control flags read by the escalation logic.
    """
    # Conversation (reducer-annotated: updates are appended, never overwritten)
    messages: Annotated[list, operator.add]
    # User context (accumulates over time)
    user_name: Optional[str]  # Filled in once discovered by gather_context_node
    user_preferences: dict
    user_history: List[str]  # Summaries of past interactions
    # Current task
    current_intent: str  # What the user is trying to do
    collected_info: dict  # Information gathered so far for current task
    task_complete: bool
    # Quality control
    turn_count: int  # Incremented each time understand_intent_node runs
    needs_escalation: bool
    escalation_reason: Optional[str]
Step 2: Define Agent Nodes#
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
def understand_intent_node(state: AssistantState) -> dict:
    """Classify the latest user message into a support intent.

    Reads the last message plus accumulated user context, asks the LLM for
    an intent label, and returns updates for `current_intent` and an
    incremented `turn_count`.
    """
    last_message = state["messages"][-1].content
    # Bug fix: AssistantState has no "user_context" key — build the context
    # summary from the fields the schema actually defines.
    context = {
        "user_name": state.get("user_name"),
        "preferences": state.get("user_preferences", {}),
    }
    intent_prompt = f"""Classify this user message into one of these intents:
- account_inquiry: User asking about their account
- support_request: User has a technical problem
- billing_question: User asking about payments or subscriptions
- general_question: General information request
- complaint: User expressing dissatisfaction
- other: Doesn't fit above categories
User context: {context}
User message: {last_message}
Respond with just the intent label and a brief explanation.
Format: INTENT: [label] | REASON: [brief explanation]"""
    response = llm.invoke([HumanMessage(content=intent_prompt)])
    content = response.content
    # Parse the intent label; only accept labels from the known set so a
    # malformed LLM reply cannot leak an arbitrary string into routing.
    valid_intents = {
        "account_inquiry", "support_request", "billing_question",
        "general_question", "complaint", "other",
    }
    intent = "general_question"
    if "INTENT:" in content:
        candidate = content.split("INTENT:")[1].split("|")[0].strip()
        candidate = candidate.lower().replace(" ", "_")
        if candidate in valid_intents:
            intent = candidate
    return {
        "current_intent": intent,
        "turn_count": state.get("turn_count", 0) + 1,
    }
def gather_context_node(state: AssistantState) -> dict:
    """Extract durable facts (name, preferences, history) from recent messages.

    Returns a (possibly empty) dict of state updates; never overwrites an
    already-known user_name, and merges preferences over existing ones.
    """
    messages = state["messages"]
    # Ask LLM to extract facts from the conversation
    extract_prompt = f"""Extract key facts from this conversation that should be remembered.
Look for: user name, preferences, account details, previous issues mentioned.
Conversation:
{chr(10).join([f"{m.type.upper()}: {m.content}" for m in messages[-10:]])}
Return as JSON with keys: user_name, preferences (dict), important_facts (list).
If no facts found, return empty values."""
    response = llm.invoke([HumanMessage(content=extract_prompt)])
    # Parse context (simplified - production code should use structured output)
    import json
    try:
        content = response.content
        # Strip a Markdown code fence whether the model labeled it ```json
        # or used a bare ``` fence (the original only handled the former).
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        elif "```" in content:
            content = content.split("```")[1].split("```")[0]
        extracted = json.loads(content)
    except (json.JSONDecodeError, IndexError):
        extracted = {"user_name": None, "preferences": {}, "important_facts": []}
    updates = {}
    if extracted.get("user_name") and not state.get("user_name"):
        updates["user_name"] = extracted["user_name"]
    if extracted.get("preferences"):
        existing_prefs = state.get("user_preferences", {})
        updates["user_preferences"] = {**existing_prefs, **extracted["preferences"]}
    # Bug fix: important_facts were requested from the model but then dropped.
    # Append them to the user_history log so they survive the session.
    if extracted.get("important_facts"):
        existing_history = state.get("user_history", [])
        updates["user_history"] = existing_history + [str(fact) for fact in extracted["important_facts"]]
    return updates
def respond_node(state: AssistantState) -> dict:
    """Produce the assistant's next reply from the accumulated state.

    Builds a personalized system prompt from known user context, feeds it
    plus a bounded window of recent history to the LLM, and returns the
    reply as a new AIMessage.
    """
    detected_intent = state.get("current_intent", "general_question")
    name = state.get("user_name", "")
    prefs = state.get("user_preferences", {})
    # Personalize the system prompt with whatever context we have so far.
    system_content = f"""You are a helpful customer service assistant.
{f"The user's name is {name}." if name else ""}
{f"User preferences: {prefs}" if prefs else ""}
Current intent: {detected_intent}
Keep responses concise and helpful. If you need more information, ask one question at a time."""
    # Cap history at the 20 most recent messages to bound context size.
    history_window = state["messages"][-20:]
    reply = llm.invoke([SystemMessage(content=system_content), *history_window])
    return {
        "messages": [AIMessage(content=reply.content)],
        "task_complete": True,  # Simplified - production logic would be more nuanced
    }
def check_escalation_node(state: "AssistantState") -> dict:
    """Decide whether the conversation should be handed to a human.

    Escalates on (a) a complaint that persists past 3 turns, or (b) any
    conversation running past 10 turns. The high-turn-count reason takes
    precedence because it is evaluated last and overwrites the first.

    Returns updates for `needs_escalation` and `escalation_reason`.
    """
    # Bug fix: the original also read state["messages"] into an unused local.
    turn_count = state.get("turn_count", 0)
    intent = state.get("current_intent", "")
    escalate = False
    reason = None
    # Escalate on complaints after multiple turns
    if intent == "complaint" and turn_count > 3:
        escalate = True
        reason = "Persistent complaint requiring human attention"
    # Escalate on high turn count (agent struggling)
    if turn_count > 10:
        escalate = True
        reason = "Long conversation indicating unresolved issue"
    return {
        "needs_escalation": escalate,
        "escalation_reason": reason,
    }
Step 3: Build the Routing Logic#
def route_after_understanding(state: AssistantState) -> str:
    """Pick the next node from the escalation flag and classified intent."""
    if state.get("needs_escalation"):
        return "escalate"
    # Account-specific intents need extra context before we can answer;
    # everything else can be answered directly.
    intent = state.get("current_intent", "general_question")
    needs_context = intent in ("account_inquiry", "billing_question")
    return "gather_context" if needs_context else "respond"
def route_after_response(state: AssistantState) -> str:
    """After replying, either hand off to a human or finish this turn.

    END only ends the current graph run; the conversation itself continues
    when the next external message arrives.
    """
    return "escalate" if state.get("needs_escalation") else END
Step 4: Wire Up the Graph#
from langgraph.graph import StateGraph, END
def build_assistant_graph():
    """Assemble the customer-service assistant StateGraph (uncompiled).

    Flow: understand_intent -> check_escalation -> {gather_context | respond
    | escalate}; gather_context feeds respond; respond either escalates or
    ends the turn.
    """
    workflow = StateGraph(AssistantState)
    # Register every node up front; dict order matches the original
    # registration order.
    node_registry = {
        "understand_intent": understand_intent_node,
        "gather_context": gather_context_node,
        "respond": respond_node,
        "check_escalation": check_escalation_node,
        "escalate": lambda state: {
            "messages": [AIMessage(content="I'm connecting you with a human agent who can better assist you.")]
        },
    }
    for node_name, node_fn in node_registry.items():
        workflow.add_node(node_name, node_fn)
    # Entry point
    workflow.set_entry_point("understand_intent")
    # Conditional routing out of check_escalation and respond.
    workflow.add_edge("understand_intent", "check_escalation")
    workflow.add_conditional_edges(
        "check_escalation",
        route_after_understanding,
        {"gather_context": "gather_context", "respond": "respond", "escalate": "escalate"},
    )
    workflow.add_edge("gather_context", "respond")
    workflow.add_conditional_edges(
        "respond",
        route_after_response,
        {"escalate": "escalate", END: END},
    )
    workflow.add_edge("escalate", END)
    return workflow
Implementing State Persistence with Checkpointing#
Checkpointing is the mechanism that allows agents to resume across sessions, server restarts, and long time gaps.
In-Memory Checkpointing (Development)#
from langgraph.checkpoint.memory import MemorySaver
# In-memory checkpointer: state survives across invoke() calls but is lost
# when the process exits — development use only.
checkpointer = MemorySaver()
graph = build_assistant_graph().compile(checkpointer=checkpointer)
# Thread ID identifies a specific conversation/session; the checkpointer
# keys all saved state by it.
config = {"configurable": {"thread_id": "user_12345"}}
# First interaction
result = graph.invoke(
    {"messages": [HumanMessage(content="Hi, my name is Sarah and I need help with my account")]},
    config=config
)
print(result["messages"][-1].content)
# Second interaction — same thread_id, so the agent resumes from the
# checkpointed state and remembers Sarah and the account context
result = graph.invoke(
    {"messages": [HumanMessage(content="I was charged twice last month")]},
    config=config
)
print(result["messages"][-1].content)
# Agent knows this is Sarah and has account inquiry context from previous turn
# Agent knows this is Sarah and has account inquiry context from previous turn
Persistent Checkpointing with SQLite (Production-Light)#
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
# Creates a persistent SQLite database file next to the process.
# check_same_thread=False allows the checkpointer to use this connection
# from worker threads; close the connection on shutdown.
conn = sqlite3.connect("agent_checkpoints.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
graph = build_assistant_graph().compile(checkpointer=checkpointer)
# State persists across process restarts
config = {"configurable": {"thread_id": "user_12345"}}
Persistent Checkpointing with PostgreSQL (Production)#
from langgraph.checkpoint.postgres import PostgresSaver
import psycopg
connection_string = "postgresql://user:password@localhost/agentdb"
with psycopg.connect(connection_string) as conn:
    checkpointer = PostgresSaver(conn)
    checkpointer.setup()  # Creates required tables (idempotent first-run step)
    graph = build_assistant_graph().compile(checkpointer=checkpointer)
    # NOTE(review): the connection closes when this `with` block exits, so
    # the graph must be invoked inside it — a long-lived service should hold
    # the connection (or a pool) open for the graph's lifetime instead.
Resuming from a Checkpoint#
# Get the latest state snapshot for a thread
current_state = graph.get_state(config)
print("Current state snapshot:")
print(f" User name: {current_state.values.get('user_name')}")
print(f" Turn count: {current_state.values.get('turn_count')}")
print(f" Intent: {current_state.values.get('current_intent')}")
# Get the history of all checkpoints for this thread (newest first)
state_history = list(graph.get_state_history(config))
print(f"\nThis conversation has {len(state_history)} checkpoints")
# Resume from an older checkpoint (for debugging or rollback)
specific_checkpoint = state_history[2]  # 3rd checkpoint back
# Bug fix: checkpoint_id must live inside "configurable" alongside
# thread_id — a top-level "checkpoint_id" key in the config is ignored.
result = graph.invoke(
    {"messages": [HumanMessage(content="Let me try again")]},
    config={"configurable": {
        **config["configurable"],
        "checkpoint_id": specific_checkpoint.config["configurable"]["checkpoint_id"],
    }}
)
Advanced: State Summarization for Long-Running Agents#
As conversations grow, managing context window size becomes critical.
from langchain_core.messages import SystemMessage
def summarize_messages_node(state: AssistantState) -> dict:
    """Periodically summarize conversation history to manage context length.

    Runs only on every 20th turn and only once at least 10 messages exist;
    otherwise returns no updates.
    """
    messages = state["messages"]
    turn_count = state.get("turn_count", 0)
    # Summarize every 20 turns
    if turn_count % 20 != 0 or len(messages) < 10:
        return {}  # No changes needed
    # Summarize everything EXCEPT the 5 most recent messages
    messages_to_summarize = messages[:-5]  # Keep last 5 messages intact
    summary_prompt = f"""Summarize this conversation concisely, preserving:
1. Key facts about the user (name, preferences, account info)
2. Issues raised and how they were resolved
3. Any unresolved questions or commitments made
4. Important context for future interactions
Conversation:
{chr(10).join([f"{m.type}: {m.content}" for m in messages_to_summarize])}"""
    summary_response = llm.invoke([HumanMessage(content=summary_prompt)])
    # Replace old messages with summary + recent messages
    summary_message = SystemMessage(
        content=f"[Conversation Summary]: {summary_response.content}"
    )
    recent_messages = messages[-5:]
    new_messages = [summary_message] + recent_messages
    # NOTE(review): AssistantState declares messages as
    # Annotated[list, operator.add], so this return value is APPENDED to the
    # existing history, not a replacement — the history would grow rather
    # than shrink. True replacement needs a reducer that supports overwrite
    # or removal (e.g. LangGraph's add_messages with RemoveMessage). Confirm
    # before relying on this node.
    return {"messages": new_messages}
# Add summarization to graph.
# NOTE(review): `workflow` must be the un-compiled StateGraph instance; in
# this tutorial it is local to build_assistant_graph(), so in practice this
# call belongs inside that function before compile().
workflow.add_node("summarize", summarize_messages_node)
# Add edge to trigger summarization periodically
State Management Patterns Reference#
Pattern 1: Accumulating State (append-only)#
class AppendOnlyState(TypedDict):
    """Accumulating pattern: every write extends the list via operator.add."""
    events: Annotated[list, operator.add]  # New items added, old items kept
Pattern 2: Replacement State (last writer wins)#
class ReplacementState(TypedDict):
    """Replacement pattern: un-annotated fields use last-writer-wins."""
    current_status: str  # Replaced each time a node writes to it
    user_name: Optional[str]  # None until some node fills it in; then replaced
Pattern 3: Custom Merge Logic#
def smart_merge(existing: dict, new: dict) -> dict:
    """Merge dicts, but never overwrite with None values."""
    # Drop None entries from the update first, then overlay the remainder.
    non_null_updates = {key: val for key, val in new.items() if val is not None}
    return {**existing, **non_null_updates}
class SmartMergeState(TypedDict):
    """Custom-merge pattern: None values in an update never erase known data."""
    user_profile: Annotated[dict, smart_merge]
Pattern 4: Time-Bounded State#
from datetime import datetime, timedelta
def expire_old_facts(existing: list, new: list) -> list:
    """Keep only facts from the last 24 hours, then append the new batch.

    Facts are dicts; one without a "timestamp" key defaults to datetime.min
    and is therefore dropped as stale. New facts are kept unconditionally.
    """
    cutoff = datetime.now() - timedelta(hours=24)
    kept = []
    for fact in existing:
        if fact.get("timestamp", datetime.min) > cutoff:
            kept.append(fact)
    return kept + new
class TimeBoundedState(TypedDict):
    """Time-bounded pattern: facts older than 24h are dropped on each merge."""
    recent_facts: Annotated[list, expire_old_facts]
Debugging Agent State#
# Inspect state at any point
def debug_state(graph, config):
    """Pretty-print the checkpointed state for the thread in `config`.

    Shows every state field (messages abbreviated to the last three),
    the next node to run, and the current checkpoint id.
    """
    snapshot = graph.get_state(config)
    print("=== Current Agent State ===")
    for field, val in snapshot.values.items():
        if field != "messages":
            print(f" {field}: {val}")
            continue
        # Summarize the message list instead of dumping it whole.
        print(f" messages: [{len(val)} messages]")
        for msg in val[-3:]:  # Show last 3
            print(f" [{msg.type}]: {msg.content[:100]}...")
    print(f"\n Next node: {snapshot.next}")
    print(f" Checkpoint ID: {snapshot.config['configurable'].get('checkpoint_id', 'none')}")
# Use during development to inspect exactly what state the agent sees.
debug_state(graph, config)
Key Takeaways#
State management is foundational to building useful AI agents:
- TypedDict schemas define what information flows through your agent — design them carefully upfront
- Reducers (like operator.add) control how state is updated when multiple nodes write to the same field
- Thread IDs separate different conversations/sessions — never mix thread IDs across users
- MemorySaver for development, SqliteSaver for lightweight deployments, PostgresSaver for production
- Summarization prevents context window overflow in long-running agents
- State inspection is the primary debugging tool — always check what state your agent is seeing
Next steps: Explore AI Agent Memory Systems for implementing long-term memory beyond session state, and Multi-Agent Pipelines for coordinating multiple stateful agents.