Error Handling, Retries, and Fallback Strategies in LangGraph
Your LangGraph agent works perfectly in development. You demo it to your team. Everyone’s impressed. Then you deploy it, and within an hour, an API times out, the LLM returns malformed JSON, and a tool throws an exception nobody anticipated. The whole graph crashes.
Sound familiar? Production agents fail. The question isn’t whether they’ll fail — it’s whether they’ll recover. This article shows you how to build LangGraph agents that handle errors gracefully, retry intelligently, and fall back to safer paths when things go wrong.
Why Error Handling Matters in Agent Systems
A traditional Python script fails at one point. You get a traceback, fix the bug, and move on. Agent systems are different — they chain multiple LLM calls, invoke external tools, and pass state between nodes.
Three things make agent error handling harder than regular error handling.
First, external dependencies are unpredictable. Your agent calls OpenAI’s API, a web search tool, and a database. Any of these can fail with rate limits, timeouts, or unexpected responses.
Second, errors compound across nodes. If node A produces bad output, node B consumes it and produces worse output. By the time you see the error, the root cause is three nodes back.
Third, some failures are recoverable. A rate limit error goes away if you wait 30 seconds. Crashing immediately wastes an opportunity to recover.
Let’s set up our environment and build error handling from the ground up.
import os
import time
import random
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.types import RetryPolicy
load_dotenv()
llm = ChatOpenAI(model="gpt-4o-mini")
print("Environment ready")
Environment ready
Try/Except in Node Functions
What’s the simplest way to keep your graph alive when a node fails? Wrap the risky operation in try/except and return a meaningful error message instead of crashing.
Here’s a node that calls an external API. We’ll simulate an unreliable service that sometimes times out or refuses connections. The safe_fetch_node function catches both error types and returns a useful message the LLM can reason about.
def call_external_api(query: str) -> str:
"""Simulates an unreliable API."""
roll = random.random()
if roll < 0.3:
raise TimeoutError("API request timed out after 30s")
if roll < 0.5:
raise ConnectionError("Could not reach api.example.com")
return f"Results for '{query}': 42 matching records found"
def safe_fetch_node(state: MessagesState):
"""Fetch node with structured error handling."""
query = state["messages"][-1].content
try:
result = call_external_api(query)
return {"messages": [AIMessage(content=result)]}
except (TimeoutError, ConnectionError) as e:
error_type = type(e).__name__
return {"messages": [AIMessage(
content=f"[{error_type}] {str(e)}. Continuing with available info."
)]}
random.seed(42)
test_state = {"messages": [HumanMessage(content="test query")]}
result = safe_fetch_node(test_state)
print(result["messages"][0].content)
Results for 'test query': 42 matching records found
Notice the pattern. Each except block returns a valid state update — not a crash. The graph continues to the next node with an error message.
Quick check: what would happen if call_external_api raised a ValueError instead? That exception isn’t caught, so the node would crash. Be specific about which errors you expect.
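If you also want a safety net for exceptions you didn't anticipate, keep the specific handlers first and add a deliberate catch-all last. This is a stdlib-only sketch, with a stub standing in for call_external_api:

```python
def fetch(query: str) -> str:
    # Stub for call_external_api; raises an error type we didn't anticipate
    raise ValueError(f"Malformed response for {query!r}")

def safe_fetch(query: str) -> str:
    try:
        return fetch(query)
    except (TimeoutError, ConnectionError) as e:
        # Expected, recoverable: report and continue
        return f"[{type(e).__name__}] {e}. Continuing with available info."
    except Exception as e:
        # Unexpected: still return a usable message, but flag it for review
        return f"[Unexpected {type(e).__name__}] {e}. Flagging for review."

print(safe_fetch("test query"))
```

The ordering matters: Python tries except clauses top to bottom, so the specific handlers keep their tailored messages and only truly surprising errors hit the catch-all.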
Retry Patterns with State Counters
Sometimes the right response to a failure isn’t to give up — it’s to try again. You can implement retries manually by tracking attempt counts in your graph state.
The approach: add retry_count and max_retries to your state. The unreliable node increments the counter on each failure. A conditional edge function checks the counter and routes back to the same node (retry) or forward to a fallback (give up).
class RetryState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
retry_count: int
max_retries: int
last_error: str
def unreliable_node(state: RetryState):
"""A node that fails sometimes and tracks retries."""
try:
if random.random() < 0.6:
raise ConnectionError("Service temporarily unavailable")
return {
"messages": [AIMessage(content="Operation succeeded!")],
"retry_count": 0,
"last_error": "",
}
except ConnectionError as e:
new_count = state.get("retry_count", 0) + 1
return {
"messages": [],
"retry_count": new_count,
"last_error": str(e),
}
The conditional edge decides what happens next. If retry_count exceeds max_retries, we route to a give_up node. If last_error is set but retries remain, we loop back. Otherwise, we continue normally.
def should_retry(state: RetryState) -> str:
"""Route to retry or give up based on attempt count."""
max_retries = state.get("max_retries", 3)
if state.get("retry_count", 0) >= max_retries:
return "give_up"
if state.get("last_error"):
return "retry"
return "continue"
def give_up_node(state: RetryState):
"""Fallback when retries are exhausted."""
count = state.get("retry_count", 0)
error = state.get("last_error", "Unknown error")
return {"messages": [AIMessage(
content=f"Failed after {count} attempts. Last error: {error}"
)]}
builder = StateGraph(RetryState)
builder.add_node("attempt", unreliable_node)
builder.add_node("give_up", give_up_node)
builder.add_edge(START, "attempt")
builder.add_conditional_edges("attempt", should_retry, {
"retry": "attempt",
"give_up": "give_up",
"continue": END,
})
builder.add_edge("give_up", END)
graph = builder.compile()
print("Retry graph compiled")
Retry graph compiled
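Before running the graph, you can sanity-check the routing on hand-built states, since should_retry is a pure function of the state dict. A stdlib-only sketch that re-declares the function so it runs standalone:

```python
def should_retry(state: dict) -> str:
    # Same routing logic as the conditional edge above
    max_retries = state.get("max_retries", 3)
    if state.get("retry_count", 0) >= max_retries:
        return "give_up"
    if state.get("last_error"):
        return "retry"
    return "continue"

# The three states the conditional edge can encounter
print(should_retry({"retry_count": 1, "max_retries": 3, "last_error": "unavailable"}))  # retry
print(should_retry({"retry_count": 3, "max_retries": 3, "last_error": "unavailable"}))  # give_up
print(should_retry({"retry_count": 0, "max_retries": 3, "last_error": ""}))             # continue
```

Checking the order of the conditions matters: the give-up test must come first, or a lingering last_error would loop forever.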
LangGraph’s Built-In RetryPolicy
Manual retry logic gives you control, but LangGraph provides RetryPolicy for common cases. You attach it to any node and LangGraph handles retries automatically with exponential backoff.
RetryPolicy lives in langgraph.types. Here’s what each parameter controls:
| Parameter | Default | Purpose |
|---|---|---|
| initial_interval | 0.5s | Wait before first retry |
| backoff_factor | 2.0 | Multiplier after each retry |
| max_attempts | 3 | Total attempts allowed |
| max_interval | 128s | Backoff ceiling |
| jitter | True | Randomize intervals |
| retry_on | default_retry_on | Which exceptions to retry |
Attaching a policy is one line. Pass retry=RetryPolicy(...) when calling add_node().
def flaky_api_node(state: MessagesState):
"""Node that calls an unreliable API."""
if random.random() < 0.5:
raise ConnectionError("Service unavailable")
return {"messages": [AIMessage(content="API call succeeded")]}
builder = StateGraph(MessagesState)
builder.add_node(
"api_call",
flaky_api_node,
retry=RetryPolicy(max_attempts=5, initial_interval=1.0)
)
builder.add_edge(START, "api_call")
builder.add_edge("api_call", END)
retry_graph = builder.compile()
print("Graph with RetryPolicy compiled")
Graph with RetryPolicy compiled
LangGraph retries flaky_api_node up to 5 times. It waits 1 second before the first retry, then 2 seconds, then 4, and so on. Your node function stays clean — it just raises exceptions when things go wrong.
Predict the output: if initial_interval=1.0 and backoff_factor=2.0, what’s the delay before the 4th retry (ignoring jitter)?
Answer: 1.0 * 2^3 = 8.0 seconds. Each retry doubles the previous delay.
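To see the whole schedule, here is a stdlib sketch of the same exponential formula (jitter omitted; the cap mirrors the max_interval default from the table above):

```python
def backoff(attempt: int, initial_interval: float = 1.0,
            backoff_factor: float = 2.0, max_interval: float = 128.0) -> float:
    # Delay before retry number `attempt` (1-based), capped at max_interval
    return min(initial_interval * backoff_factor ** (attempt - 1), max_interval)

for n in range(1, 6):
    print(f"retry {n}: wait {backoff(n):.1f}s")
# retry 1: wait 1.0s ... retry 4: wait 8.0s ... retry 5: wait 16.0s
```

The cap is why long retry chains don't spiral: past retry 8 or so with these parameters, every wait is pinned at max_interval.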
The retry_on parameter deserves special attention. By default, LangGraph retries most exceptions. For HTTP errors (from requests or httpx), it only retries 5xx status codes. You can customize this with a callable.
def should_retry_error(error: Exception) -> bool:
"""Only retry transient network errors."""
return isinstance(error, (ConnectionError, TimeoutError))
builder = StateGraph(MessagesState)
builder.add_node(
"selective_retry",
flaky_api_node,
retry=RetryPolicy(max_attempts=3, retry_on=should_retry_error)
)
builder.add_edge(START, "selective_retry")
builder.add_edge("selective_retry", END)
selective_graph = builder.compile()
print("Selective retry graph compiled")
Selective retry graph compiled
Fallback Nodes and Edges
Retrying doesn’t help when the service is completely down. That’s where fallback nodes come in — they provide an alternative path through your graph.
The pattern: a primary node tries the preferred approach. If it fails, a conditional edge routes to a fallback node that uses a simpler or cached approach. Both paths converge at the same downstream point.
class FallbackState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
primary_failed: bool
def primary_node(state: FallbackState):
"""Try the preferred approach first."""
try:
response = llm.invoke(state["messages"])
return {"messages": [response], "primary_failed": False}
except Exception as e:
return {
"messages": [AIMessage(content=f"Primary failed: {e}")],
"primary_failed": True,
}
def fallback_node(state: FallbackState):
"""Simpler approach when primary fails."""
query = state["messages"][0].content
return {"messages": [AIMessage(
content=f"I couldn't process your request fully, "
f"but here's a basic response to: '{query}'"
)]}
def route_after_primary(state: FallbackState) -> str:
if state.get("primary_failed", False):
return "fallback"
return "done"
builder = StateGraph(FallbackState)
builder.add_node("primary", primary_node)
builder.add_node("fallback", fallback_node)
builder.add_edge(START, "primary")
builder.add_conditional_edges("primary", route_after_primary, {
"fallback": "fallback",
"done": END,
})
builder.add_edge("fallback", END)
fallback_graph = builder.compile()
print("Fallback graph compiled")
Fallback graph compiled
This pattern is especially useful for LLM model fallbacks. Your primary node calls GPT-4o. If that fails, the fallback calls GPT-4o-mini. LangChain makes this even easier with .with_fallbacks().
def create_resilient_llm():
"""Create an LLM with automatic fallback chain."""
primary = ChatOpenAI(
model="gpt-4o",
timeout=30,
max_retries=3,
)
fallback = ChatOpenAI(
model="gpt-4o-mini",
timeout=30,
max_retries=3,
)
return primary.with_fallbacks([fallback])
resilient_llm = create_resilient_llm()
print("Resilient LLM with fallback chain ready")
Resilient LLM with fallback chain ready
I use this pattern in every production agent. GPT-4o-mini costs a fraction of GPT-4o, so when the primary model is down or rate limited, the fallback keeps responses flowing at lower cost instead of failing outright.
ToolNode Error Handling with handle_tool_errors
LangGraph’s ToolNode has built-in error handling through handle_tool_errors. When a tool throws an exception, ToolNode catches it and returns the error as a ToolMessage. The LLM sees the error and can self-correct.
Here are two tools that can fail. divide raises on division by zero. fetch_price raises on unknown tickers.
@tool
def divide(a: float, b: float) -> float:
"""Divide a by b."""
if b == 0:
raise ValueError("Cannot divide by zero")
return a / b
@tool
def fetch_price(ticker: str) -> str:
"""Fetch stock price for a ticker symbol."""
valid_tickers = {"AAPL": 185.50, "GOOGL": 142.30}
if ticker not in valid_tickers:
raise ValueError(f"Unknown ticker: {ticker}")
return f"${valid_tickers[ticker]}"
tools = [divide, fetch_price]
You can configure handle_tool_errors in four ways. Here’s each option side by side.
# Option 1: Catch all errors, return error text to LLM
tool_node_basic = ToolNode(tools, handle_tool_errors=True)
# Option 2: Custom error message for all errors
tool_node_custom = ToolNode(
tools,
handle_tool_errors="Tool failed. Try different parameters."
)
# Option 3: Custom handler function
def handle_tool_error(error: Exception) -> str:
if isinstance(error, ValueError):
return f"Invalid input: {error}. Check your parameters."
return f"Tool error: {error}. Try a different approach."
tool_node_handler = ToolNode(tools, handle_tool_errors=handle_tool_error)
# Option 4: Catch only specific exception types
tool_node_selective = ToolNode(
tools, handle_tool_errors=(ValueError,)
)
print("Four ToolNode error handling configurations ready")
Four ToolNode error handling configurations ready
When handle_tool_errors=True, the LLM receives the actual error text. If it tried to divide by zero, it gets “Cannot divide by zero” and can reformulate. This self-correction loop is one of the most powerful patterns in agent design.
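To preview exactly what text the LLM would receive from Option 3, you can call the custom handler directly with the errors our tools raise, no graph required:

```python
def handle_tool_error(error: Exception) -> str:
    # Same handler as Option 3 above
    if isinstance(error, ValueError):
        return f"Invalid input: {error}. Check your parameters."
    return f"Tool error: {error}. Try a different approach."

# What the LLM sees after a bad divide call vs. a transient failure
print(handle_tool_error(ValueError("Cannot divide by zero")))
print(handle_tool_error(TimeoutError("request timed out")))
```

Writing the handler as a plain function like this also makes it trivial to unit test, independent of any LLM or graph machinery.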
Graceful Degradation Strategies
What happens when retries fail and fallbacks fail too? Graceful degradation means your agent still provides something useful, even if it can’t deliver the full answer.
I think about degradation in three levels:
Level 1 — Retry and succeed. The operation fails, retries, and eventually works. The user never notices.
Level 2 — Use an alternative. The primary tool or model fails. The agent switches to a backup. The response is less detailed but still useful.
Level 3 — Acknowledge and guide. Nothing works. The agent explains what happened and suggests next steps.
class DegradationState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
degradation_level: int
def smart_response_node(state: DegradationState):
"""Generate response with graceful degradation."""
level = state.get("degradation_level", 0)
if level == 0:
return {"messages": [AIMessage(
content="Full response with all data sources."
)]}
elif level == 1:
return {"messages": [AIMessage(
content="Partial response. Some data sources were unavailable."
)]}
else:
return {"messages": [AIMessage(
content="I'm having trouble accessing my tools. "
"Here's what I know from training data. "
"Please verify this information independently."
)]}
for level in [0, 1, 2]:
test_state = {"messages": [], "degradation_level": level}
result = smart_response_node(test_state)
print(f"Level {level}: {result['messages'][0].content[:55]}...")
Level 0: Full response with all data sources....
Level 1: Partial response. Some data sources were unavailable...
Level 2: I'm having trouble accessing my tools. Here's what I...
The key: degradation should be transparent. Don’t pretend everything is fine when it isn’t. Tell the user what you couldn’t do and why.
Error State Propagation and Monitoring
When errors happen deep in your graph, downstream nodes need to know. Propagating error information through state lets every node make informed decisions.
Add an errors list to your state that accumulates error information. Each entry includes the node name, error type, message, and timestamp. Here’s a reusable wrapper that adds error tracking to any node function.
from datetime import datetime
class ErrorTrackingState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
errors: Annotated[list[dict], lambda a, b: a + b]
def node_with_error_tracking(node_name: str, operation):
"""Wrap any node function with error tracking."""
def wrapped(state: ErrorTrackingState):
try:
return operation(state)
except Exception as e:
error_info = {
"node": node_name,
"error_type": type(e).__name__,
"message": str(e),
"timestamp": datetime.now().isoformat(),
}
return {"messages": [], "errors": [error_info]}
return wrapped
def step_one(state):
return {"messages": [AIMessage(content="Step 1 done")], "errors": []}
def step_two(state):
raise ValueError("Database connection refused")
builder = StateGraph(ErrorTrackingState)
builder.add_node("step1", node_with_error_tracking("step1", step_one))
builder.add_node("step2", node_with_error_tracking("step2", step_two))
builder.add_edge(START, "step1")
builder.add_edge("step1", "step2")
builder.add_edge("step2", END)
error_graph = builder.compile()
result = error_graph.invoke({"messages": [], "errors": []})
print(f"Errors collected: {len(result['errors'])}")
for err in result["errors"]:
print(f" {err['node']}: [{err['error_type']}] {err['message']}")
Errors collected: 1
step2: [ValueError] Database connection refused
For production monitoring, pair error tracking with Python’s logging module. Log at INFO for successes, WARNING for retries, and ERROR for failures. LangSmith captures these traces automatically if you set LANGCHAIN_TRACING_V2=true.
Building a Resilient Agent End-to-End
Let’s combine everything into a production-ready agent. This research agent has three tools, three layers of error protection, and graceful fallback paths.
We’ll create tools with different failure modes. web_search simulates rate limiting. calculator fails on invalid expressions. summarize_text rejects short input. The graph handles all of them.
@tool
def web_search(query: str) -> str:
"""Search the web for information."""
if random.random() < 0.3:
raise ConnectionError("Search API rate limited")
return f"Search results for '{query}': Found 5 relevant articles."
@tool
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression safely."""
allowed = set("0123456789+-*/.() ")
if not all(c in allowed for c in expression):
raise ValueError(f"Invalid characters in: '{expression}'")
result = eval(expression)
return str(result)
@tool
def summarize_text(text: str) -> str:
"""Summarize a block of text."""
if len(text) < 10:
raise ValueError("Text too short to summarize")
return f"Summary: {text[:100]}..."
research_tools = [web_search, calculator, summarize_text]
research_llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(research_tools)
The agent node wraps the LLM call in try/except. The ToolNode uses handle_tool_errors=True. And we attach RetryPolicy to the tools node for transient failures.
def agent_node(state: MessagesState):
"""Agent with error handling on LLM calls."""
try:
response = research_llm.invoke(state["messages"])
return {"messages": [response]}
except Exception as e:
return {"messages": [AIMessage(
content="I'm having trouble processing your request. "
"Could you rephrase your question?"
)]}
tool_node = ToolNode(research_tools, handle_tool_errors=True)
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node(
"tools", tool_node,
retry=RetryPolicy(max_attempts=3, initial_interval=1.0)
)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")
resilient_agent = builder.compile()
print("Resilient research agent compiled")
Resilient research agent compiled
Three layers of protection work together here:
Layer 1 (tool-level): handle_tool_errors=True catches tool exceptions and sends error text back to the LLM. The LLM can self-correct by trying different parameters.
Layer 2 (node-level): RetryPolicy on the tools node retries the node on transient errors so rate limits and timeouts can resolve themselves. One caveat: exceptions that handle_tool_errors has already converted to ToolMessages never reach the retry policy, so it mainly covers failures raised outside the tools' own bodies.
Layer 3 (agent-level): The agent_node try/except catches LLM failures and returns a graceful message.
Common Mistakes and How to Fix Them
Mistake 1: Catching exceptions inside tools instead of using ToolNode
❌ Wrong:
@tool
def my_tool(query: str) -> str:
"""Do something."""
try:
return do_work(query)
except Exception:
return "Something went wrong"
Why it’s wrong: The LLM gets a useless message. It can’t tell an error from a real result. You’ve also hidden the error from your logs.
✅ Correct:
@tool
def my_tool(query: str) -> str:
"""Do something."""
return do_work(query) # Let ToolNode handle errors
Let ToolNode with handle_tool_errors=True catch the exception. It returns the actual error message as a ToolMessage, which the LLM uses to self-correct.
Mistake 2: Retrying errors that will never resolve
❌ Wrong:
builder.add_node("parse", parse_node, retry=RetryPolicy(max_attempts=5))
Why it’s wrong: If the input is malformed, retrying produces the same error five times. You waste time and API credits.
✅ Correct:
def should_retry(error: Exception) -> bool:
return isinstance(error, (ConnectionError, TimeoutError))
builder.add_node("parse", parse_node, retry=RetryPolicy(
max_attempts=5, retry_on=should_retry
))
Use retry_on to filter which exceptions deserve a retry. Network errors are worth retrying. Input validation errors are not.
Mistake 3: Silent failures that crash downstream nodes
❌ Wrong:
def node_a(state):
try:
return {"result": risky_operation()}
except Exception:
return {"result": None} # Silent failure
def node_b(state):
processed = state["result"].upper() # Crashes: NoneType has no .upper()
Why it’s wrong: Node B doesn’t know node A failed. It crashes with an AttributeError that hides the real problem.
✅ Correct:
def node_a(state):
try:
return {"result": risky_operation(), "errors": []}
except Exception as e:
return {"result": None, "errors": [{"node": "a", "error": str(e)}]}
def node_b(state):
if state.get("errors"):
return {"result": "Skipped: upstream error"}
return {"result": state["result"].upper(), "errors": []}
Propagate errors through state. Downstream nodes check for errors before processing.
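Tracing the corrected pair by hand shows the failure surfacing as data rather than a crash. A stdlib-only sketch with risky_operation stubbed to always fail:

```python
def risky_operation():
    # Stub: always fails, to exercise the error path
    raise RuntimeError("upstream boom")

def node_a(state: dict) -> dict:
    try:
        return {"result": risky_operation(), "errors": []}
    except Exception as e:
        return {"result": None, "errors": [{"node": "a", "error": str(e)}]}

def node_b(state: dict) -> dict:
    # Check for upstream errors before touching the result
    if state.get("errors"):
        return {"result": "Skipped: upstream error"}
    return {"result": state["result"].upper(), "errors": []}

state = node_a({})
print(node_b(state)["result"])  # Skipped: upstream error
```

Node B never dereferences the None result, and the original RuntimeError is preserved in the errors list for logging instead of being masked by an AttributeError.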
Error Troubleshooting Guide
GraphRecursionError: Recursion limit of 25 reached
This happens when your retry loop creates a cycle that exceeds LangGraph’s recursion limit. Fix: pass a higher recursion_limit in the config when invoking the graph, or reduce max_retries in your state-based retry logic.
graph = builder.compile()
result = graph.invoke(state, {"recursion_limit": 50})
ToolException: Tool 'my_tool' not found
Your LLM requested a tool that isn’t registered in the ToolNode. This typically happens when the LLM hallucinates a tool name. With handle_tool_errors=True, this error goes back to the LLM for correction.
openai.RateLimitError: Rate limit reached
You’ve exceeded your API quota. The ChatOpenAI class retries automatically via max_retries. If it persists, add .with_fallbacks() to switch to a cheaper model.
When NOT to Use These Patterns
Not every graph needs retry logic and fallback paths. Here’s when to keep things simple.
Prototyping and exploration. During development, let errors crash loudly. You want to see what fails and why. Adding error handling too early hides bugs.
Deterministic pipelines. If your graph doesn’t call external APIs or LLMs, failures are bugs in your code, not transient errors. Fix the bug instead of retrying around it.
Cost-sensitive applications. Every retry costs money (LLM calls, API usage). If your budget is tight, fail fast and surface the error to the user rather than burning credits on retries.
Exercises
{
type: 'exercise',
id: 'retry-node-exercise',
title: 'Exercise 1: Build a Retry-Aware Node',
difficulty: 'intermediate',
exerciseType: 'write',
instructions: 'Complete the retry_node function so it retries flaky_operation() up to 3 times with exponential backoff (base delay of 1 second). On success, return the result as an AIMessage. After all attempts fail, return a fallback message.',
starterCode: 'import time\nimport random\nfrom langchain_core.messages import AIMessage\n\ndef flaky_operation():\n    """Succeeds only 30% of the time."""\n    if random.random() > 0.3:\n        raise ConnectionError("Service unavailable")\n    return "Operation completed successfully"\n\ndef retry_node(state):\n    max_attempts = 3\n    # YOUR CODE HERE\n    # Try flaky_operation() with exponential backoff\n    # Return AIMessage with result on success\n    # Return AIMessage with fallback on failure\n    pass',
testCases: [
{ id: 'tc1', input: 'random.seed(42)\nresult = retry_node({"messages": []})\nprint(result["messages"][0].content)', expectedOutput: 'Operation completed successfully', description: 'Should succeed on the second attempt with seed 42' },
{ id: 'tc2', input: 'random.seed(10)\nresult = retry_node({"messages": []})\nprint("fallback" in result["messages"][0].content.lower())', expectedOutput: 'True', description: 'Should return fallback message when all retries fail', hidden: true },
],
hints: [
'Use a for loop with range(max_attempts). Inside the loop, wrap flaky_operation() in try/except. Calculate delay as 1.0 * (2 ** attempt) and call time.sleep(delay) before the next attempt.',
'After the loop completes without returning, all attempts failed. Return {"messages": [AIMessage(content="All retries exhausted. Using fallback response.")]}',
],
solution: 'def retry_node(state):\n    max_attempts = 3\n    for attempt in range(max_attempts):\n        try:\n            result = flaky_operation()\n            return {"messages": [AIMessage(content=result)]}\n        except ConnectionError:\n            if attempt < max_attempts - 1:\n                delay = 1.0 * (2 ** attempt)\n                time.sleep(delay)\n    return {"messages": [AIMessage(content="All retries exhausted. Using fallback response.")]}',
solutionExplanation: 'The loop tries the operation up to 3 times. On each failure, it waits 1s, then 2s before the next attempt. After the final failure, it returns a fallback message instead of crashing.',
xpReward: 15,
}
{
type: 'exercise',
id: 'error-tracking-exercise',
title: 'Exercise 2: Add Error Tracking to a Graph',
difficulty: 'intermediate',
exerciseType: 'write',
instructions: 'Create a safe_wrap(name, fn) function that wraps any node function with error tracking. When the wrapped function catches an exception, it should append an error dict with "node" and "error" keys to the errors list in state. Then use it to build a 3-node graph where step_b always fails.',
starterCode: 'from typing import TypedDict, Annotated\nfrom langchain_core.messages import AIMessage\nfrom langgraph.graph import StateGraph, START, END\n\nclass TrackedState(TypedDict):\n    messages: Annotated[list, lambda a, b: a + b]\n    errors: Annotated[list[dict], lambda a, b: a + b]\n\ndef step_a(state):\n    return {"messages": [AIMessage(content="A done")], "errors": []}\n\ndef step_b(state):\n    raise ValueError("Database connection failed")\n\ndef step_c(state):\n    count = len(state["errors"])\n    return {"messages": [AIMessage(content=f"Done with {count} error(s)")], "errors": []}\n\ndef safe_wrap(name, fn):\n    # YOUR CODE HERE\n    pass\n\n# YOUR CODE HERE: Build graph with wrapped nodes',
testCases: [
{ id: 'tc1', input: 'result = graph.invoke({"messages": [], "errors": []})\nprint(result["messages"][-1].content)', expectedOutput: 'Done with 1 error(s)', description: 'Should collect 1 error from step_b' },
{ id: 'tc2', input: 'result = graph.invoke({"messages": [], "errors": []})\nprint(result["errors"][0]["node"])', expectedOutput: 'b', description: 'Error should identify the failing node' },
],
hints: [
'safe_wrap should return a new function that calls fn(state) inside try/except. On exception, return {"messages": [], "errors": [{"node": name, "error": str(e)}]}.',
'Build the graph: add_node("a", safe_wrap("a", step_a)), add_node("b", safe_wrap("b", step_b)), add_node("c", step_c). Chain edges: START -> a -> b -> c -> END.',
],
solution: 'def safe_wrap(name, fn):\n    def wrapped(state):\n        try:\n            return fn(state)\n        except Exception as e:\n            return {"messages": [], "errors": [{"node": name, "error": str(e)}]}\n    return wrapped\n\nbuilder = StateGraph(TrackedState)\nbuilder.add_node("a", safe_wrap("a", step_a))\nbuilder.add_node("b", safe_wrap("b", step_b))\nbuilder.add_node("c", step_c)\nbuilder.add_edge(START, "a")\nbuilder.add_edge("a", "b")\nbuilder.add_edge("b", "c")\nbuilder.add_edge("c", END)\ngraph = builder.compile()',
solutionExplanation: 'safe_wrap catches exceptions from any node and stores error info in state. step_b fails with ValueError, but the wrapper catches it. step_c sees 1 error in state and reports it.',
xpReward: 20,
}
Complete Code
Click to expand the full script (copy-paste and run)
# Complete code from: Error Handling, Retries, and Fallback Strategies in LangGraph
# Requires: pip install langchain-openai langgraph python-dotenv
# Python 3.10+
# Set OPENAI_API_KEY in your .env file
import os
import time
import random
from typing import TypedDict, Annotated
from datetime import datetime
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.types import RetryPolicy
load_dotenv()
# --- Setup ---
llm = ChatOpenAI(model="gpt-4o-mini")
# --- Try/Except in Node Functions ---
def call_external_api(query: str) -> str:
roll = random.random()
if roll < 0.3:
raise TimeoutError("API request timed out after 30s")
if roll < 0.5:
raise ConnectionError("Could not reach api.example.com")
return f"Results for '{query}': 42 matching records found"
def safe_fetch_node(state: MessagesState):
query = state["messages"][-1].content
try:
result = call_external_api(query)
return {"messages": [AIMessage(content=result)]}
except (TimeoutError, ConnectionError) as e:
error_type = type(e).__name__
return {"messages": [AIMessage(
content=f"[{error_type}] {str(e)}. Continuing with available info."
)]}
# --- Retry with State Counters ---
class RetryState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
retry_count: int
max_retries: int
last_error: str
def unreliable_node(state: RetryState):
try:
if random.random() < 0.6:
raise ConnectionError("Service temporarily unavailable")
return {
"messages": [AIMessage(content="Operation succeeded!")],
"retry_count": 0, "last_error": "",
}
except ConnectionError as e:
new_count = state.get("retry_count", 0) + 1
return {"messages": [], "retry_count": new_count, "last_error": str(e)}
def should_retry(state: RetryState) -> str:
if state.get("retry_count", 0) >= state.get("max_retries", 3):
return "give_up"
if state.get("last_error"):
return "retry"
return "continue"
def give_up_node(state: RetryState):
return {"messages": [AIMessage(
content=f"Failed after {state.get('retry_count', 0)} attempts."
)]}
# --- Exponential Backoff ---
def backoff_delay(attempt: int, base: float = 1.0, jitter: bool = True) -> float:
delay = base * (2 ** attempt)
if jitter:
delay *= (0.5 + random.random())
return min(delay, 60.0)
# --- Fallback Pattern ---
class FallbackState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
primary_failed: bool
def primary_node(state: FallbackState):
try:
response = llm.invoke(state["messages"])
return {"messages": [response], "primary_failed": False}
except Exception as e:
return {"messages": [AIMessage(content=f"Primary failed: {e}")],
"primary_failed": True}
def fallback_node(state: FallbackState):
query = state["messages"][0].content
return {"messages": [AIMessage(
content=f"Basic response to: '{query}'"
)]}
# --- Error Tracking ---
class ErrorTrackingState(TypedDict):
messages: Annotated[list, lambda a, b: a + b]
errors: Annotated[list[dict], lambda a, b: a + b]
def node_with_error_tracking(node_name: str, operation):
def wrapped(state: ErrorTrackingState):
try:
return operation(state)
except Exception as e:
return {"messages": [], "errors": [{
"node": node_name,
"error_type": type(e).__name__,
"message": str(e),
"timestamp": datetime.now().isoformat(),
}]}
return wrapped
# --- Resilient Agent ---
@tool
def web_search(query: str) -> str:
"""Search the web for information."""
if random.random() < 0.3:
raise ConnectionError("Search API rate limited")
return f"Search results for '{query}': Found 5 relevant articles."
@tool
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression safely."""
allowed = set("0123456789+-*/.() ")
if not all(c in allowed for c in expression):
raise ValueError(f"Invalid characters in: '{expression}'")
return str(eval(expression))
@tool
def summarize_text(text: str) -> str:
"""Summarize a block of text."""
if len(text) < 10:
raise ValueError("Text too short to summarize")
return f"Summary: {text[:100]}..."
research_tools = [web_search, calculator, summarize_text]
research_llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(research_tools)
def agent_node(state: MessagesState):
try:
response = research_llm.invoke(state["messages"])
return {"messages": [response]}
except Exception as e:
return {"messages": [AIMessage(
content="I'm having trouble. Could you rephrase?"
)]}
tool_node = ToolNode(research_tools, handle_tool_errors=True)
builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node, retry=RetryPolicy(max_attempts=3))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)
builder.add_edge("tools", "agent")
resilient_agent = builder.compile()
# Run the agent
result = resilient_agent.invoke({
"messages": [HumanMessage(content="What is 25 * 4?")]
})
for msg in result["messages"]:
if isinstance(msg, HumanMessage):
print(f"User: {msg.content}")
elif isinstance(msg, ToolMessage):
print(f"Tool: {msg.content[:80]}")
else:
print(f"Agent: {msg.content[:120]}")
print("\nScript completed successfully.")
Summary
Error handling in LangGraph isn’t a nice-to-have — it’s essential for production agents. Here’s what you’ve learned.
Try/except in nodes keeps your graph alive when operations fail. Catch specific exceptions and return meaningful messages.
State-based retries give you full control. Track counts in state and use conditional edges to decide whether to retry.
RetryPolicy handles retries automatically. Attach it to any node with retry=RetryPolicy(...) for exponential backoff out of the box.
Fallback nodes provide alternative paths. Chain LLM models with .with_fallbacks() or build conditional fallback edges.
ToolNode’s handle_tool_errors catches tool exceptions and feeds them back to the LLM for self-correction.
Error propagation lets downstream nodes react to upstream failures. Add an errors list to your state and check it before processing.
Practice exercise: build an agent with two tools where one is unreliable. Add RetryPolicy to the tools node, handle_tool_errors=True to ToolNode, and a fallback agent node that responds without tools when all else fails.
Solution outline
# 1. Define reliable_tool and unreliable_tool
# 2. Create ToolNode with handle_tool_errors=True
# 3. Add tools node with RetryPolicy(max_attempts=3)
# 4. Add fallback_agent node that responds without tools
# 5. Conditional edge: route to fallback after repeated failures
# 6. Test with a query that triggers the unreliable tool
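For step 5 of the outline, the routing function can count how many tool results came back as errors. A sketch, assuming error results carry `status="error"` on the message (as `ToolMessage` supports); a plain stand-in class keeps the demo self-contained:

```python
class FakeToolMessage:
    """Stand-in for ToolMessage; real ones carry a status field."""
    def __init__(self, content, status="success"):
        self.content = content
        self.status = status

def route_after_tools(state, max_failures=3):
    """Conditional-edge function: divert to the fallback agent once
    too many tool calls have come back as errors."""
    failures = sum(1 for m in state["messages"]
                   if getattr(m, "status", None) == "error")
    return "fallback_agent" if failures >= max_failures else "agent"

msgs = [FakeToolMessage("ok"),
        FakeToolMessage("rate limited", status="error"),
        FakeToolMessage("rate limited", status="error"),
        FakeToolMessage("rate limited", status="error")]
print(route_after_tools({"messages": msgs}))  # fallback_agent
```

Wire this in with `builder.add_conditional_edges("tools", route_after_tools)` and add a `fallback_agent` node that answers without binding any tools.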
Frequently Asked Questions
Does RetryPolicy retry the entire graph or just the failed node?
Just the failed node. When a node raises an exception, RetryPolicy retries that specific node with the same input state. The rest of the graph waits.
# Each node gets its own policy
builder.add_node("a", node_a, retry=RetryPolicy(max_attempts=5))
builder.add_node("b", node_b, retry=RetryPolicy(max_attempts=2))
How does handle_tool_errors interact with RetryPolicy?
They work at different levels. handle_tool_errors catches tool exceptions and converts them to ToolMessage responses. The graph doesn’t see a failure, so RetryPolicy doesn’t trigger. If the error happens outside the tool (serialization, network), RetryPolicy kicks in.
Can I add a delay between state-based retries?
Yes. Add a time.sleep() call at the start of your node when retry_count > 0. Calculate the delay with exponential backoff: delay = base * (2 ** state["retry_count"]).
def retry_aware_node(state):
if state.get("retry_count", 0) > 0:
delay = 1.0 * (2 ** state["retry_count"])
time.sleep(min(delay, 30))
# ... rest of node logic
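A common refinement, not shown in the snippet above, is to add jitter so that many concurrent retries don't all wake up at the same instant and hammer the API together:

```python
import random

def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff capped at `cap` seconds, with full jitter:
    the actual delay is drawn uniformly from [0, capped_backoff]."""
    return random.uniform(0, min(cap, base * (2 ** retry_count)))
```

Call `time.sleep(backoff_delay(state["retry_count"]))` in place of the fixed formula.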
What happens when RetryPolicy exhausts all attempts?
LangGraph raises the original exception. If your graph has no error handling around that node, execution stops. Design your graph with conditional edges to a fallback node for graceful handling.
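If you would rather not crash even then, wrap the top-level invoke in a safety net. A minimal sketch; the callable here is a hypothetical stand-in for a compiled graph's `invoke`:

```python
def invoke_with_safety_net(invoke, inputs, fallback_text):
    """Call a compiled graph, degrading to a canned answer if every
    retry attempt is exhausted and the exception propagates out."""
    try:
        return invoke(inputs)
    except Exception as e:
        return {"messages": [f"{fallback_text} (cause: {type(e).__name__})"]}

def exhausted_graph(inputs):
    # Simulates RetryPolicy re-raising after max_attempts failures.
    raise ConnectionError("Search API rate limited")

result = invoke_with_safety_net(exhausted_graph, {}, "Please try again later")
print(result["messages"][0])
# Please try again later (cause: ConnectionError)
```

This is a last resort, not a substitute for in-graph fallback nodes: by the time the exception escapes, all intermediate state is gone.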
References
- LangGraph Documentation — How to Add Node Retry Policies: https://langchain-ai.github.io/langgraph/how-tos/node-retries/
- LangGraph Types Reference — RetryPolicy: https://langchain-ai.github.io/langgraph/reference/types/
- LangChain Documentation — ToolNode and Tool Execution: https://python.langchain.com/docs/langgraph/prebuilt/toolnode
- LangChain Documentation — Model Fallbacks: https://python.langchain.com/docs/how_to/fallbacks/
- LangGraph Changelog — Enhanced State Management and Retries: https://changelog.langchain.com/announcements/enhanced-state-management-retries-in-langgraph-python
- Python Documentation — Logging HOWTO: https://docs.python.org/3/howto/logging.html
- LangGraph Documentation — Thinking in LangGraph: https://docs.langchain.com/oss/python/langgraph/thinking-in-langgraph