What You'll Build#
A production monitoring system for AI agents that includes:
- Key metric definitions with implementation (latency, error rate, token usage, cost)
- LangFuse integration for distributed tracing across all LLM calls and tool invocations
- Custom Prometheus metrics with proper labels for Grafana dashboards
- Alerting rules for agent failures, cost spikes, and latency degradation
- A reusable monitoring decorator that wraps any agent function
Prerequisites#
pip install langfuse prometheus-client openai python-dotenv
docker pull prom/prometheus
docker pull grafana/grafana
- Python 3.11+
- LangFuse Cloud account (free tier) or self-hosted LangFuse
- Prometheus + Grafana for infrastructure metrics
- Basic understanding of agent tracing
Overview#
Agent monitoring operates at two levels: infrastructure metrics (latency, error rate, throughput) measured with Prometheus, and behavioral traces (which tools were called, what the agent reasoned about, quality scores) captured with LangFuse.
Both are necessary. Infrastructure metrics tell you something is wrong. LangFuse traces tell you why and help you fix it.
Step 1: Defining Key Metrics#
# monitoring/metrics.py
from prometheus_client import (
Counter,
Histogram,
Gauge,
start_http_server,
)
import time
from typing import Optional
# --- Counter Metrics (cumulative, never decreases) ---
# Counters only increase; query them with rate()/increase() in Prometheus.

# One increment per top-level agent request, labelled by outcome.
agent_requests_total = Counter(
    "agent_requests_total",
    "Total number of agent requests",
    labelnames=["agent_type", "status"],  # status: success, error, timeout
)

# Errors broken down by cause so alerts can separate tool vs. LLM failures.
agent_errors_total = Counter(
    "agent_errors_total",
    "Total agent errors by category",
    labelnames=["agent_type", "error_category"],  # category: tool_failure, llm_error, timeout
)

# Raw API-call volume per model (a single agent run may make many calls).
llm_api_calls_total = Counter(
    "llm_api_calls_total",
    "Total LLM API calls made",
    labelnames=["model", "call_type"],  # call_type: completion, tool_call
)

# Token consumption split by direction; drives cost dashboards.
tokens_used_total = Counter(
    "llm_tokens_used_total",
    "Total tokens consumed",
    labelnames=["model", "token_type"],  # token_type: input, output
)

# Running USD spend; rate() over this gives $/sec for spike alerting.
llm_cost_usd_total = Counter(
    "llm_cost_usd_total",
    "Total LLM API cost in USD",
    labelnames=["model", "agent_type"],
)

# Per-tool invocation counts, labelled by outcome for success-rate panels.
tool_calls_total = Counter(
    "agent_tool_calls_total",
    "Total tool calls made by agents",
    labelnames=["tool_name", "status"],  # status: success, failure, timeout
)
# --- Histogram Metrics (track distributions) ---
# Bucket upper bounds are chosen per metric; histogram_quantile() needs the
# _bucket series these emit, so keep bucket lists stable once deployed.

# End-to-end agent run latency; buckets span sub-second to 2 minutes.
agent_request_duration_seconds = Histogram(
    "agent_request_duration_seconds",
    "Agent request duration in seconds",
    labelnames=["agent_type"],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0],
)

# Single LLM round-trip latency (finer-grained than the run-level histogram).
llm_call_duration_seconds = Histogram(
    "llm_call_duration_seconds",
    "Individual LLM API call duration",
    labelnames=["model"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
)

# How many tools a run used; Fibonacci-spaced buckets give resolution at
# the low end where most runs land.
agent_tool_call_count = Histogram(
    "agent_tool_call_count",
    "Number of tool calls per agent run",
    labelnames=["agent_type"],
    buckets=[0, 1, 2, 3, 5, 8, 13, 21],
)

# Dollar cost of one run; roughly log-spaced from a tenth of a cent to $5.
agent_cost_per_request_usd = Histogram(
    "agent_cost_per_request_usd",
    "Cost per agent request in USD",
    labelnames=["agent_type", "model"],
    buckets=[0.001, 0.005, 0.01, 0.05, 0.10, 0.50, 1.00, 5.00],
)
# --- Gauge Metrics (current value, can go up or down) ---

# In-flight run count: inc() on entry, dec() on exit (keep the dec() in a
# finally block so exceptions don't leak the gauge upward).
active_agent_runs = Gauge(
    "active_agent_runs",
    "Currently executing agent runs",
    labelnames=["agent_type"],
)

# Numeric mirror of a circuit breaker's state machine so dashboards and
# alerts can key off transitions (e.g. == 1 means traffic is being shed).
circuit_breaker_state = Gauge(
    "agent_circuit_breaker_state",
    "Circuit breaker state: 0=closed, 1=open, 2=half_open",
    labelnames=["service_name"],
)
def start_metrics_server(port: int = 8000) -> None:
    """Expose the Prometheus scrape endpoint over HTTP.

    Starts the prometheus_client HTTP server on *port* (default 8000) and
    prints the resulting /metrics URL for convenience.
    """
    start_http_server(port)
    endpoint = f"http://localhost:{port}/metrics"
    print(f"Prometheus metrics available at {endpoint}")
Step 2: LangFuse Integration#
LangFuse captures the full agent trace — every LLM call, tool invocation, and intermediate result:
# monitoring/langfuse_tracer.py
import os
from langfuse import Langfuse
from langfuse.decorators import langfuse_context, observe
from typing import Optional, Any
from functools import wraps
import time
# Initialize LangFuse client at import time.
# LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are required (a missing key
# raises KeyError immediately, which is the desired fail-fast behavior);
# LANGFUSE_HOST is optional and defaults to LangFuse Cloud.
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)
def trace_agent_run(
    agent_type: str,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    metadata: Optional[dict] = None,
):
    """Decorator that creates a LangFuse trace for an agent run.

    Each invocation of the wrapped function produces a trace named
    ``agent-run-<agent_type>`` carrying duration, status, and (on failure)
    error type/message. Works with both ``async def`` and plain functions;
    the original version only supported coroutines and, when applied to a
    sync function, returned an un-awaited coroutine instead of running it.

    Args:
        agent_type: Logical agent name; embedded in the trace name/metadata.
        user_id: Optional end-user identifier attached to the trace.
        session_id: Optional session identifier for grouping traces.
        metadata: Extra key/value pairs merged into the trace metadata.
    """
    def decorator(func):
        import asyncio  # local import: only needed to detect coroutine functions

        def _open_trace():
            """Create the trace and bind it to the LangFuse context."""
            trace = langfuse.trace(
                name=f"agent-run-{agent_type}",
                user_id=user_id,
                session_id=session_id,
                metadata={
                    "agent_type": agent_type,
                    **(metadata or {}),
                },
            )
            # Make trace available within the function via context.
            # NOTE(review): update_current_trace is documented for use inside
            # @observe-decorated functions — confirm against the SDK version.
            langfuse_context.update_current_trace(
                trace_id=trace.id,
                user_id=user_id,
                session_id=session_id,
            )
            return trace

        def _close_success(trace, result, start_time):
            """Record duration/output for a successful run."""
            duration = time.perf_counter() - start_time
            trace.update(
                output=str(result)[:1000],  # Truncate for storage
                metadata={"duration_seconds": duration, "status": "success"},
            )

        def _close_error(trace, exc, start_time):
            """Record duration and error details for a failed run."""
            duration = time.perf_counter() - start_time
            trace.update(
                metadata={
                    "duration_seconds": duration,
                    "status": "error",
                    "error_type": type(exc).__name__,
                    "error_message": str(exc)[:500],
                },
                level="ERROR",
            )

        if asyncio.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                trace = _open_trace()
                start_time = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    _close_error(trace, e, start_time)
                    raise
                _close_success(trace, result, start_time)
                return result
            return async_wrapper

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            trace = _open_trace()
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                _close_error(trace, e, start_time)
                raise
            _close_success(trace, result, start_time)
            return result
        return sync_wrapper

    return decorator
class LangFuseAgentTracer:
    """Manual tracing for non-decorator use cases.

    Creates one LangFuse trace per instance and exposes helpers to record
    LLM generations, tool-call spans, and evaluation scores against it.
    Call finish() when the run completes.
    """

    def __init__(self, agent_type: str, user_id: Optional[str] = None):
        self.agent_type = agent_type
        self.trace = langfuse.trace(
            name=f"agent-{agent_type}",
            user_id=user_id,
        )
        # Open generations keyed by span id. The original tracked only a
        # single "current" span, so with interleaved/nested LLM calls,
        # end_llm_call() silently dropped every non-current span id and
        # those generations were never closed.
        self._open_spans: dict = {}

    def start_llm_call(
        self,
        model: str,
        messages: list[dict],
        tools: Optional[list[dict]] = None,
    ) -> str:
        """Record start of an LLM call. Returns the span ID to pass to end_llm_call()."""
        span = self.trace.generation(
            name="llm-call",
            model=model,
            input=messages,
            metadata={"has_tools": bool(tools), "tool_count": len(tools or [])},
        )
        self._open_spans[span.id] = span
        return span.id

    def end_llm_call(
        self,
        span_id: str,
        output: str,
        usage: dict,
        model: str,
    ) -> None:
        """Record end of an LLM call with token usage.

        ``usage`` is expected in OpenAI response format (prompt_tokens /
        completion_tokens / total_tokens); missing keys default to 0.
        Unknown or already-ended span ids are ignored (best-effort).
        """
        span = self._open_spans.pop(span_id, None)
        if span is None:
            return
        span.end(
            output=output,
            usage={
                "input": usage.get("prompt_tokens", 0),
                "output": usage.get("completion_tokens", 0),
                "total": usage.get("total_tokens", 0),
            },
            model=model,
        )

    def record_tool_call(self, tool_name: str, input_data: dict, output: str, duration_ms: float) -> None:
        """Record a tool call as a span within the trace."""
        self.trace.span(
            name=f"tool-{tool_name}",
            input=input_data,
            output=output[:500],  # Truncate long outputs
            metadata={"duration_ms": duration_ms},
        )

    def add_score(self, name: str, value: float, comment: Optional[str] = None) -> None:
        """Add an evaluation score to the trace (0.0 to 1.0)."""
        langfuse.score(
            trace_id=self.trace.id,
            name=name,
            value=value,
            comment=comment,
        )

    def finish(self, output: str, status: str = "success") -> None:
        """Complete the trace with the final (truncated) output and status."""
        self.trace.update(
            output=output[:1000],
            metadata={"status": status},
            level="DEFAULT" if status == "success" else "ERROR",
        )
Step 3: Monitoring Decorator for Production Agents#
Combine Prometheus metrics and LangFuse tracing in one decorator:
# monitoring/agent_monitor.py
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
import logging
logger = logging.getLogger(__name__)
class AgentMonitor:
    """Instruments an agent with full metrics and tracing.

    One instance per agent run. Wrap the run body in ``run_context()``
    (an async context manager), then call ``record_llm_call`` /
    ``record_tool_call`` as the run progresses. Completion metrics
    (duration, cost, tool count, error category) are emitted automatically
    when the context exits.
    """

    # USD per 1M tokens, by model. The original hard-coded GPT-4o pricing
    # for every model, mislabelling cost for anything else.
    # NOTE(review): keep in sync with current provider pricing.
    _MODEL_PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    }
    # Unknown models fall back to GPT-4o pricing (preserves prior behavior).
    _DEFAULT_PRICING = {"input": 2.50, "output": 10.00}

    def __init__(
        self,
        agent_type: str,
        model: str,
        user_id: Optional[str] = None,
    ):
        self.agent_type = agent_type
        self.model = model
        self.user_id = user_id
        self._run_start: float = 0.0  # perf_counter() at run start
        self._tracer: Optional[LangFuseAgentTracer] = None
        self._tool_calls: int = 0      # tool invocations this run
        self._input_tokens: int = 0    # accumulated across all LLM calls
        self._output_tokens: int = 0

    @asynccontextmanager
    async def run_context(self) -> AsyncIterator["AgentMonitor"]:
        """Context manager for a complete agent run.

        Increments the in-flight gauge on entry and always decrements it
        on exit (finally), records success/error completion metrics, and
        re-raises any exception from the run body.
        """
        self._run_start = time.perf_counter()
        self._tracer = LangFuseAgentTracer(self.agent_type, self.user_id)
        active_agent_runs.labels(agent_type=self.agent_type).inc()
        try:
            yield self
            # Success path
            duration = time.perf_counter() - self._run_start
            self._record_completion(duration, "success")
        except Exception as e:
            duration = time.perf_counter() - self._run_start
            error_category = self._classify_error_category(e)
            self._record_completion(duration, "error", error_category)
            raise
        finally:
            active_agent_runs.labels(agent_type=self.agent_type).dec()

    def _record_completion(
        self,
        duration: float,
        status: str,
        error_category: Optional[str] = None,
    ) -> None:
        """Record all metrics at the end of a run.

        Args:
            duration: Wall-clock run time in seconds.
            status: "success" or "error" (request counter label).
            error_category: Set only on the error path; increments the
                error counter when present.
        """
        # Prometheus metrics
        agent_requests_total.labels(
            agent_type=self.agent_type,
            status=status,
        ).inc()
        agent_request_duration_seconds.labels(
            agent_type=self.agent_type,
        ).observe(duration)
        agent_tool_call_count.labels(
            agent_type=self.agent_type,
        ).observe(self._tool_calls)
        if error_category:
            agent_errors_total.labels(
                agent_type=self.agent_type,
                error_category=error_category,
            ).inc()
        # Cost: price this run's accumulated tokens for the configured model.
        cost = self._compute_cost()
        agent_cost_per_request_usd.labels(
            agent_type=self.agent_type,
            model=self.model,
        ).observe(cost)
        llm_cost_usd_total.labels(
            model=self.model,
            agent_type=self.agent_type,
        ).inc(cost)
        # Lazy %-style args: formatting is skipped when INFO is disabled.
        logger.info(
            "Agent run complete: type=%s status=%s duration=%.2fs tools=%d "
            "tokens=%d cost=$%.4f",
            self.agent_type,
            status,
            duration,
            self._tool_calls,
            self._input_tokens + self._output_tokens,
            cost,
        )

    def _compute_cost(self) -> float:
        """Return the USD cost of this run's accumulated token usage."""
        pricing = self._MODEL_PRICING.get(self.model, self._DEFAULT_PRICING)
        return (
            (self._input_tokens / 1_000_000) * pricing["input"]
            + (self._output_tokens / 1_000_000) * pricing["output"]
        )

    def record_llm_call(self, input_tokens: int, output_tokens: int, duration: float) -> None:
        """Record a single LLM API call (token counts and latency in seconds)."""
        self._input_tokens += input_tokens
        self._output_tokens += output_tokens
        llm_call_duration_seconds.labels(model=self.model).observe(duration)
        llm_api_calls_total.labels(model=self.model, call_type="completion").inc()
        tokens_used_total.labels(model=self.model, token_type="input").inc(input_tokens)
        tokens_used_total.labels(model=self.model, token_type="output").inc(output_tokens)

    def record_tool_call(self, tool_name: str, duration: float, success: bool) -> None:
        """Record a single tool call in both Prometheus and the LangFuse trace."""
        self._tool_calls += 1
        status = "success" if success else "failure"
        tool_calls_total.labels(tool_name=tool_name, status=status).inc()
        if self._tracer:
            self._tracer.record_tool_call(
                tool_name=tool_name,
                input_data={},
                output="",
                duration_ms=duration * 1000,
            )

    @staticmethod
    def _classify_error_category(error: Exception) -> str:
        """Map an exception to a coarse error-category label.

        Falls back to the exception class name if the project classifier is
        missing or itself raises: this runs inside an except block right
        before re-raising, so a classifier failure must never mask the
        original agent error (the original version had no such guard).
        """
        try:
            from .error_handling.classifier import classify_error
            return classify_error(error).value
        except Exception:
            return type(error).__name__
# Usage
async def monitored_agent_run(user_query: str, user_id: str) -> str:
    """Example: run the support agent under full metrics + tracing."""
    monitor = AgentMonitor(agent_type="support_agent", model="gpt-4o", user_id=user_id)
    async with monitor.run_context():
        # Your agent code here
        llm_started = time.perf_counter()
        # ... actual LLM call ...
        result = "Agent response here"
        monitor.record_llm_call(
            input_tokens=500,
            output_tokens=200,
            duration=time.perf_counter() - llm_started,
        )
        monitor.record_tool_call("search_knowledge_base", 0.8, success=True)
        return result
Step 4: Prometheus Alerting Rules#
# prometheus/alert_rules.yml
groups:
  - name: ai_agent_alerts
    interval: 60s
    rules:
      # High error rate
      - alert: AgentHighErrorRate
        expr: |
          rate(agent_requests_total{status="error"}[5m])
          / rate(agent_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent error rate above 5%"
          description: >-
            Agent type {{ $labels.agent_type }} error rate is
            {{ $value | humanizePercentage }} over the last 5 minutes.

      # Critical error rate
      - alert: AgentCriticalErrorRate
        expr: |
          rate(agent_requests_total{status="error"}[5m])
          / rate(agent_requests_total[5m]) > 0.15
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Agent error rate above 15% — immediate action required"

      # P95 latency too high
      - alert: AgentHighLatency
        expr: |
          histogram_quantile(0.95,
            rate(agent_request_duration_seconds_bucket[10m])
          ) > 30
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent P95 latency above 30 seconds"
          description: >-
            Agent {{ $labels.agent_type }} P95 latency is
            {{ $value | humanizeDuration }}.

      # Cost spike
      - alert: AgentCostSpike
        expr: |
          rate(llm_cost_usd_total[1h]) * 3600 > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM API cost exceeding $10/hour"
          description: >-
            Current spend rate: ${{ $value | printf "%.2f" }}/hour.
            Check for runaway agents or prompt injection.

      # Expensive requests — rolling 10-minute average cost per request.
      # Fixed: the original divided the raw _sum/_count series, a
      # since-process-start lifetime average that never recovers after a
      # single spike and keeps firing forever; rate()/rate() gives a
      # windowed average. A `for:` clause was also added so one noisy
      # scrape doesn't page.
      - alert: AgentExpensiveRequest
        expr: |
          rate(agent_cost_per_request_usd_sum[10m])
          / rate(agent_cost_per_request_usd_count[10m]) > 1.0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Average cost per request above $1.00"

      # Circuit breaker open
      - alert: AgentCircuitBreakerOpen
        expr: agent_circuit_breaker_state == 1
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Circuit breaker OPEN for {{ $labels.service_name }}"
Step 5: Grafana Dashboard Configuration#
Key panels for your agent monitoring dashboard:
{
"dashboard": {
"title": "AI Agent Production Dashboard",
"panels": [
{
"title": "Request Rate",
"type": "timeseries",
"targets": [{
"expr": "rate(agent_requests_total[5m])",
"legendFormat": "{{agent_type}} - {{status}}"
}]
},
{
"title": "P50 / P95 Latency",
"type": "timeseries",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(agent_request_duration_seconds_bucket[10m]))",
"legendFormat": "P50 {{agent_type}}"
},
{
"expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[10m]))",
"legendFormat": "P95 {{agent_type}}"
}
]
},
{
"title": "Error Rate %",
"type": "stat",
"targets": [{
"expr": "rate(agent_requests_total{status='error'}[5m]) / rate(agent_requests_total[5m]) * 100",
"legendFormat": "Error %"
}],
"thresholds": {"steps": [
{"value": 0, "color": "green"},
{"value": 5, "color": "yellow"},
{"value": 15, "color": "red"}
]}
},
{
"title": "LLM Cost (Hourly Rate)",
"type": "stat",
"targets": [{
"expr": "rate(llm_cost_usd_total[1h]) * 3600",
"legendFormat": "$/hour"
}]
},
{
"title": "Token Usage by Model",
"type": "timeseries",
"targets": [{
"expr": "rate(llm_tokens_used_total[5m])",
"legendFormat": "{{model}} {{token_type}}"
}]
},
{
"title": "Tool Call Success Rate",
"type": "timeseries",
"targets": [{
"expr": "rate(agent_tool_calls_total{status='success'}[5m]) / rate(agent_tool_calls_total[5m])",
"legendFormat": "{{tool_name}}"
}]
}
]
}
}
Common Issues and Solutions#
Issue: Metrics show high error rates but LangFuse shows no failures
This usually indicates errors happening outside the traced agent loop — in initialization, authentication, or network setup. Add metrics counters outside your monitoring decorators at the infrastructure layer.
Issue: LangFuse traces are missing tool call details
Ensure you call monitor.record_tool_call() inside every tool execution path, including error paths. Missing tool call records in traces usually indicate an uncaught exception in the tool that bypassed the recording code.
Issue: Prometheus metrics not scraped correctly
Verify your metrics server port is accessible from Prometheus. Check prometheus.yml scrape config includes your agent pods/containers. Add a health check endpoint that returns 200 only when the metrics server is ready.
Production Considerations#
Sampling: For high-volume agents (100K+ requests/day), sample LangFuse traces at 10-20% to manage cost and storage. Always trace 100% of errors regardless of sample rate.
PII scrubbing: Before storing traces in LangFuse, redact PII from user inputs and tool outputs. Apply regex or NER-based scrubbing before calling trace.update().
SLO tracking: Define SLOs for your agents (e.g., P95 latency < 15s, error rate < 2%). Track SLO burn rate — how quickly you're consuming your error budget. Alert at 5x burn rate for page-worthy incidents.
Next Steps#
- Add error handling that feeds structured errors to your monitoring
- Implement rate limiting and monitor limit hit frequency
- Set up caching to reduce redundant LLM calls — lowering the cost and latency metrics you have to monitor
- Review agent tracing for deeper trace design patterns
- Connect to LangFuse for full observability