LangGraph Persistence and Checkpointing — Save, Resume, and Time Travel
Picture this: you spend an hour talking to a LangGraph chatbot. The server reboots. Poof — your whole conversation is gone, and you’re back to square one.
That’s the problem persistence fixes. After this guide, your graph will save its full state at every step. Server crash, new deploy, user gone for a week — the chat picks up right where it left off.
Here’s how the parts fit. A checkpointer watches your graph run. After each node finishes, it grabs a snapshot and tags it with a thread ID — think of that ID as a folder name that groups all the snapshots from one chat.
Next time you pass that thread ID, the graph loads its latest snapshot and keeps going. These snapshots stack up like save files in a video game. You can browse them, jump back to any one, or branch off in a new path. LangGraph calls this “time travel.”
What Is a Checkpointer and Why Should You Care?
A checkpointer is a small object you plug into your graph when you compile it. Its one job: after each node runs, write the state to storage. Skip it, and the graph starts fresh every time. Add it, and the graph gains memory.
The code change is tiny, but the effect is huge. Without a checkpointer, each call stands alone:
# Without persistence — each call starts fresh
result1 = graph.invoke({"messages": [HumanMessage("My name is Alice")]})
result2 = graph.invoke({"messages": [HumanMessage("What's my name?")]})
# The graph has NO idea the user said "Alice" in the previous call
With persistence, the graph tracks the whole conversation:
# With persistence — the graph remembers
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke({"messages": [HumanMessage("My name is Alice")]}, config)
result2 = graph.invoke({"messages": [HumanMessage("What's my name?")]}, config)
# The graph knows the user is Alice — it loaded the previous state
See the thread_id? That one string tells the checkpointer which chat to open. Two different IDs give you two separate chats. Same ID picks up where you left off.
Key Insight: The checkpointer doesn’t just save the end result. It takes a snapshot after every node. So you can jump into the middle of a run to check values, replay a step, or debug a wrong turn.
Before You Start
- Python: 3.10+
- Packages: langgraph 0.4+, langchain-openai 0.3+, langchain-core 0.3+, langgraph-checkpoint-sqlite 2.0+
- Install: pip install langgraph langchain-openai langchain-core langgraph-checkpoint-sqlite
- API key: OPENAI_API_KEY in your environment
- Time: ~30 minutes
- Background: Basic LangGraph concepts (nodes, edges, state) from earlier posts
Start by pulling in the pieces we need — an LLM wrapper, message types, the graph builder, and the built-in in-memory checkpointer.
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
How Does MemorySaver Work?
MemorySaver is the fastest way in. It keeps all checkpoints in RAM, so setup takes one line. The catch: when Python stops, all checkpoints vanish. Great for learning and tests — not for production.
Let’s build a small chatbot to see it work. Our graph has one node that sends the message list to the LLM and hands back the reply.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def chatbot(state: MessagesState):
    """Send the full message history to the LLM."""
    return {"messages": [llm.invoke(state["messages"])]}
# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
# Compile WITH a checkpointer
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
The key line is builder.compile(checkpointer=memory). Drop that argument and the graph still runs — but it forgets everything between calls.
Time to prove it works. We’ll fire two messages under the same thread_id and see whether the bot remembers the first one.
config = {"configurable": {"thread_id": "thread-1"}}
# First message
response1 = graph.invoke(
    {"messages": [HumanMessage(content="Hi, I'm building a RAG pipeline.")]},
    config
)
print(response1["messages"][-1].content)
The checkpointer stored that exchange. Now ask a follow-up — without repeating any context.
# Second message — same thread_id
response2 = graph.invoke(
    {"messages": [HumanMessage(content="What did I say I'm working on?")]},
    config
)
print(response2["messages"][-1].content)
It knows about the RAG pipeline — even though we never said it again. Behind the scenes, the checkpointer pulled the saved state and added the new message before calling the LLM.
Quick check: Swap in a different thread_id for the second call and watch what happens. The bot draws a blank — it can’t find the RAG pipeline because that history lives under "thread-1", not the new ID.
How Do Thread IDs and the Config Object Work?
The checkpointer finds the right chat by reading the thread_id from a config dict. Every invoke() or stream() call needs one.
# Each thread_id is a separate conversation
config_alice = {"configurable": {"thread_id": "alice-session"}}
config_bob = {"configurable": {"thread_id": "bob-session"}}
# Alice's conversation
graph.invoke(
    {"messages": [HumanMessage(content="I prefer PyTorch.")]},
    config_alice
)

# Bob's conversation — completely independent
graph.invoke(
    {"messages": [HumanMessage(content="I prefer TensorFlow.")]},
    config_bob
)
Alice and Bob each run in their own lane. Nothing from one thread leaks into the other. In a live app you’d use a user ID, session token, or UUID as the thread ID.
Tip: While you’re developing, pick human-readable IDs like "test-alice". They make print-debugging painless. Once you deploy, switch to UUIDs so threads never collide.
The config also has a checkpoint_id field. You won’t set it often, but it lets you target one exact snapshot inside a thread. We’ll use it in the time-travel section.
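For illustration, a config that pins one exact snapshot looks like this. Both ID values are hypothetical placeholders, not real checkpoints:

```python
# Hypothetical IDs, shown only to illustrate the config shape
config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",
    }
}
```

With only thread_id set, the graph loads the newest snapshot; adding checkpoint_id targets that specific one.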
How Does SqliteSaver Survive Restarts?
RAM storage is fine for playing around, but at some point you need data that outlives the process. SqliteSaver writes checkpoints to a SQLite file on disk. Restart Python, reboot the box, copy the file to a new laptop — the chat picks up right away.
We installed langgraph-checkpoint-sqlite in the prerequisites. Call from_conn_string with a file path, and the library creates the database on the fly if it doesn't exist yet.
from langgraph.checkpoint.sqlite import SqliteSaver
# The database file is created if it doesn't exist
with SqliteSaver.from_conn_string("checkpoints.db") as sqlite_saver:
    graph = builder.compile(checkpointer=sqlite_saver)
    config = {"configurable": {"thread_id": "persistent-thread"}}
    response = graph.invoke(
        {"messages": [HumanMessage(content="Remember this: project deadline is March 15.")]},
        config
    )
    print(response["messages"][-1].content)
Your state now lives in checkpoints.db. Close the script, restart Python, point at that same file — the chat picks up like nothing changed.
# After restarting Python — the conversation is still there
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as sqlite_saver:
    graph = builder.compile(checkpointer=sqlite_saver)
    config = {"configurable": {"thread_id": "persistent-thread"}}
    response = graph.invoke(
        {"messages": [HumanMessage(content="When is my project deadline?")]},
        config
    )
    print(response["messages"][-1].content)
The saver pulled all past messages from disk and fed them to the LLM. That’s the kind of memory you want while building locally.
Warning: See the with block? SqliteSaver opens a database connection that must close cleanly. Skip the context manager and you must call .close() by hand. Forget, and the file can get corrupted.
One catch: SQLite locks the whole file on writes. Two requests at the same time? One waits for the other. Fine for scripts, notebooks, and demos — not for multi-user servers.
When Should You Use PostgresSaver?
When your app serves real users on many servers, you need a real database. PostgresSaver stores checkpoints in PostgreSQL — true concurrency, easy scaling, and the rock-solid uptime Postgres is known for.
Install the extra package first:
pip install langgraph-checkpoint-postgres
The API is nearly the same — swap the import and pass a Postgres connection string. One extra step: call setup() once to build the checkpoint tables.
# Pseudocode — requires a running PostgreSQL instance
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:password@localhost:5432/langgraph_checkpoints"
with PostgresSaver.from_conn_string(DB_URI) as pg_saver:
    pg_saver.setup()  # creates checkpoint tables — run once
    graph = builder.compile(checkpointer=pg_saver)
    config = {"configurable": {"thread_id": "prod-thread-001"}}
    response = graph.invoke(
        {"messages": [HumanMessage(content="Start a new analysis task.")]},
        config
    )
Tip: On FastAPI or an async server? Both savers have async twins — AsyncSqliteSaver and AsyncPostgresSaver. They slot right into async/await code and keep your event loop clear.
Which Checkpointer Should You Pick?
| Checkpointer | Storage | Survives Restart? | Concurrent Access | Best For |
|---|---|---|---|---|
| MemorySaver | RAM | No | Single process | Tutorials, unit tests |
| SqliteSaver | File on disk | Yes | Single process | Local dev, prototypes |
| PostgresSaver | PostgreSQL | Yes | Multi-process | Production, multi-server |
Rule of thumb: MemorySaver while learning, SqliteSaver for solo hacking, PostgresSaver the moment real users touch your app.
How Do You Peek at the Current State?
Your graph now has memory across calls. But what if you want to peek at what’s stored without running it again? Call get_state(). It gives back a StateSnapshot — a read-only view of where the graph stands right now.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "inspect-demo"}}
graph.invoke(
    {"messages": [HumanMessage(content="Explain gradient descent in one sentence.")]},
    config
)
# Inspect the current state
snapshot = graph.get_state(config)
print(f"Next node to run: {snapshot.next}")
print(f"Number of messages: {len(snapshot.values['messages'])}")
print(f"Last message: {snapshot.values['messages'][-1].content[:80]}...")
Five fields live inside the snapshot:
- values — the state dict itself (your MessagesState fields are in here)
- next — which nodes fire next (empty tuple when the graph is done)
- config — holds the checkpoint_id so you can reload this snapshot later
- metadata — step count, timing, and how this checkpoint was made
- parent_config — links to the snapshot that came right before
If next is (), the graph hit END. If it reads ('chatbot',), the graph paused before that node — the kind of thing you see with human-in-the-loop setups.
How Do You Change State Between Runs?
What if you need to change what the graph “knows” between calls? Maybe you want to add a system prompt, fix a bad tool call, or trim old messages. update_state() lets you edit the checkpoint in place.
Here’s a real case: the model works fine, but you want to add a format rule on the fly.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "update-demo"}}
graph.invoke(
    {"messages": [HumanMessage(content="What's the capital of Australia?")]},
    config
)
# Check the current state
snapshot = graph.get_state(config)
print(f"Model said: {snapshot.values['messages'][-1].content}")
Now we’ll sneak a system instruction into the state. The as_node argument tells LangGraph which node should “own” this change.
# Inject a system-level instruction
graph.update_state(
    config,
    {"messages": [SystemMessage(content="Always respond in exactly two sentences.")]},
    as_node="chatbot"
)

# The next invocation sees the injected message
response = graph.invoke(
    {"messages": [HumanMessage(content="Tell me about kangaroos.")]},
    config
)
print(response["messages"][-1].content)
Why does as_node matter? Set it to "chatbot" and the graph thinks the chatbot already ran — it moves on to the next step. Set it to a node before the chatbot, and the chatbot runs again with the new state.
Warning: update_state only edits the stored checkpoint. No node code runs. You still need to call invoke() or stream() after to keep the graph going.
How Do You Browse Past Checkpoints?
Each time a node finishes, LangGraph writes a new checkpoint. get_state_history() gives you back all of them — a full trail of every step the graph took.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "history-demo"}}
# Have a short conversation
graph.invoke(
    {"messages": [HumanMessage(content="What is LangGraph?")]},
    config
)
graph.invoke(
    {"messages": [HumanMessage(content="How does it handle state?")]},
    config
)

# Browse all checkpoints for this thread
for snapshot in graph.get_state_history(config):
    msg_count = len(snapshot.values["messages"])
    source = snapshot.metadata.get("source", "unknown")
    step = snapshot.metadata.get("step", "?")
    print(f"Step {step} | Messages: {msg_count} | Source: {source}")
The output starts with the newest checkpoint and works backward. You’ll spot entries for both calls, plus the steps in between.
Each snapshot has a unique checkpoint_id in snapshot.config. Use that ID to pull up any past state:
history = list(graph.get_state_history(config))
# The last item is the very first checkpoint
oldest = history[-1]
print(f"Oldest checkpoint has {len(oldest.values['messages'])} messages")
Try to predict: How many checkpoints do two calls produce? At least five. Each call generates one when it receives input and another after the chatbot node finishes. On top of that, there’s the blank starting state. The exact number can vary between LangGraph versions, but it’s always more than two.
How Does Time Travel Work?
This is the feature that makes persistence truly fun. Time travel lets you jump back to any past checkpoint and keep going from there. Got a weird reply? Rewind one step and try a new prompt. Need an undo button? Fork from an older snapshot and take a fresh path.
The steps are simple: grab a snapshot from the history, pass its config to invoke(), and the graph resumes on the spot.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "time-travel-demo"}}
# Build up some conversation
graph.invoke(
    {"messages": [HumanMessage(content="I want to learn about transformers.")]},
    config
)
graph.invoke(
    {"messages": [HumanMessage(content="Actually, let's talk about CNNs instead.")]},
    config
)
# Check current state
current = graph.get_state(config)
print(f"Current messages: {len(current.values['messages'])}")
Now let’s locate the snapshot right after the first exchange and branch off from it — pretending the CNN message never existed.
# Find the checkpoint after the first exchange (2 messages: user + AI)
history = list(graph.get_state_history(config))
for snap in history:
    if len(snap.values["messages"]) == 2:
        rewind_config = snap.config
        print(f"Found checkpoint with {len(snap.values['messages'])} messages")
        break
# Resume from that earlier point with a DIFFERENT follow-up
response = graph.invoke(
    {"messages": [HumanMessage(content="How do attention heads work?")]},
    rewind_config
)
print(response["messages"][-1].content)
The model talked about attention heads in the context of transformers — it has no clue about the CNN turn. Meanwhile, the full thread (with the CNN message) still sits there untouched.
Key Insight: Rewinding never deletes data. It forks. The old chat path stays as-is, and a new branch grows from the snapshot you picked. You can always go back to the first path.
What’s Inside Checkpoint Metadata?
Each checkpoint comes with a metadata dict. It logs the time, the node that made the snapshot, and the step number. Super handy when you’re debugging a graph with many nodes and need to trace what happened.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "metadata-demo"}}
graph.invoke(
    {"messages": [HumanMessage(content="Hello!")]},
    config
)

for snapshot in graph.get_state_history(config):
    meta = snapshot.metadata
    print(f"Step: {meta.get('step')} | "
          f"Source: {meta.get('source')} | "
          f"Writes: {meta.get('writes', {}).keys() if meta.get('writes') else 'none'}")
Here’s what’s in there:
- source — "input" means a user message came in; "loop" means a node ran
- step — a number that counts each step in the run
- writes — shows which state keys this step changed
- thread_id — which thread this checkpoint sits in
Since metadata is just a dict, filtering is easy. Want only the steps where the chatbot node ran?
chatbot_checkpoints = [
    snap for snap in graph.get_state_history(config)
    if snap.metadata.get("source") == "loop"
]
print(f"The chatbot node ran {len(chatbot_checkpoints)} time(s)")
How Do You Handle Many Users at Once?
A live app may serve dozens — or thousands — of users at once. The checkpointer walls off each chat: every user gets their own thread_id, and nothing bleeds across.
Here’s a fast demo with three users hitting the same bot.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
threads = {
    "user-alice": "I need help with pandas groupby.",
    "user-bob": "How do I deploy a Flask app?",
    "user-carol": "Explain gradient boosting.",
}

for thread_id, message in threads.items():
    config = {"configurable": {"thread_id": thread_id}}
    graph.invoke(
        {"messages": [HumanMessage(content=message)]},
        config
    )

# Later — Alice sends a follow-up
alice_config = {"configurable": {"thread_id": "user-alice"}}
response = graph.invoke(
    {"messages": [HumanMessage(content="Show me multi-column groupby.")]},
    alice_config
)
print(response["messages"][-1].content)
Alice’s follow-up lands in the right thread because it still holds the pandas-groupby context. Bob and Carol stay in their own bubbles.
Tip: Threads pile up and LangGraph won’t clean them for you. In production, run a scheduled job that prunes old threads. With PostgresSaver, one SQL DELETE on the checkpoint table does it.
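As a sketch of such a pruning job, the SQL below deletes every checkpoint for one finished thread. The table names assume the default PostgresSaver schema; verify them against your installed langgraph-checkpoint-postgres version before running anything destructive.

```sql
-- Hypothetical cleanup for one thread; table names assume the
-- default PostgresSaver schema — confirm against your version first.
DELETE FROM checkpoint_writes WHERE thread_id = 'user-alice';
DELETE FROM checkpoint_blobs  WHERE thread_id = 'user-alice';
DELETE FROM checkpoints       WHERE thread_id = 'user-alice';
```

In practice you would wrap this in a transaction and drive it from a list of threads older than your retention cutoff.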
Real-World Pattern — How Do You Build Resumable Workflows?
Chatbots are the easy win, but checkpointing really shines in long pipelines. Say step 2 of a three-step job hits an API timeout. Without a checkpointer you’d redo step 1 from scratch. With one, you just fix the issue and pick up where it broke.
Below, each node handles one stage. We define a WorkflowState with a field for each stage’s output.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class WorkflowState(TypedDict):
    task: str
    step1_result: str
    step2_result: str
    step3_result: str

def step_one(state: WorkflowState) -> dict:
    """Simulate an expensive API call."""
    print("Running step 1 — fetching data...")
    return {"step1_result": f"Data for '{state['task']}' fetched"}

def step_two(state: WorkflowState) -> dict:
    """Simulate processing."""
    print("Running step 2 — processing data...")
    return {"step2_result": f"Processed: {state['step1_result']}"}

def step_three(state: WorkflowState) -> dict:
    """Simulate saving results."""
    print("Running step 3 — saving results...")
    return {"step3_result": f"Saved: {state['step2_result']}"}
Hook these nodes up in a straight line and add a checkpointer.
workflow = StateGraph(WorkflowState)
workflow.add_node("step1", step_one)
workflow.add_node("step2", step_two)
workflow.add_node("step3", step_three)
workflow.add_edge(START, "step1")
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", "step3")
workflow.add_edge("step3", END)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "workflow-run-1"}}
result = app.invoke({"task": "quarterly-report"}, config)
print(result["step3_result"])
Running step 1 — fetching data...
Running step 2 — processing data...
Running step 3 — saving results...
Saved: Processed: Data for 'quarterly-report' fetched
Every stage saved a checkpoint. If step 2 had crashed, you’d check the state, fix the bug, and restart from that checkpoint. Step 1 wouldn’t run again. For costly data jobs, that payoff alone is worth the setup.
What Are the Most Common Checkpointing Mistakes?
Mistake 1: Calling invoke() without a config
This trips up almost everyone at first. If you don’t pass a thread_id, the checkpointer has nowhere to save — and nowhere to load from.
# WRONG — no config passed
result = graph.invoke({"messages": [HumanMessage(content="Hello")]})
# Raises ValueError or runs without persistence
# RIGHT — always pass config with thread_id
config = {"configurable": {"thread_id": "my-thread"}}
result = graph.invoke({"messages": [HumanMessage(content="Hello")]}, config)
Mistake 2: Reusing thread IDs across different graphs
When two separate graphs share both a checkpointer and a thread ID, the second graph loads state that was meant for the first. The schemas won’t match, and you’ll get confusing errors.
# WRONG — same thread_id for different graphs
config = {"configurable": {"thread_id": "shared-id"}}
chatbot_graph.invoke(input, config) # saves chatbot state
workflow_graph.invoke(input, config) # loads chatbot state — breaks!
# RIGHT — prefix thread_ids by graph type
chatbot_config = {"configurable": {"thread_id": "chat-shared-id"}}
workflow_config = {"configurable": {"thread_id": "wf-shared-id"}}
Mistake 3: Using MemorySaver in production
One server restart — deploy, crash, scale event — wipes every chat. Users come back to a blank slate. For anything beyond local tests, use SqliteSaver or PostgresSaver.
Mistake 4: Skipping setup() on PostgresSaver
Postgres won’t make the checkpoint tables on its own. Skip pg_saver.setup() and the first write throws a “relation does not exist” error.
# WRONG — no setup call
with PostgresSaver.from_conn_string(DB_URI) as pg_saver:
    graph = builder.compile(checkpointer=pg_saver)
    graph.invoke(input, config)  # ERROR: relation does not exist

# RIGHT — call setup() first
with PostgresSaver.from_conn_string(DB_URI) as pg_saver:
    pg_saver.setup()
    graph = builder.compile(checkpointer=pg_saver)
    graph.invoke(input, config)  # works
Under the Hood: What Ends Up on Disk?
The savers turn the full state dict into JSON and write one row per checkpoint. Each row also holds a parent pointer — the checkpoint_id of the one before it. These pointers form a linked list that get_state_history() walks through. The key thing to know: checkpoints are full copies, not diffs. A chat with 200 messages and 50 checkpoints stores those 200 messages 50 times. Keep your state lean.
Summary
With checkpointers, LangGraph stops being a fire-and-forget runner and becomes a system that genuinely remembers. Here’s what we covered:
- MemorySaver — RAM-only storage. Lightning fast, but gone the instant the process stops.
- SqliteSaver — file-based storage. Survives restarts and is ideal for solo development.
- PostgresSaver — production-grade storage. Handles concurrent users and scales across servers.
- Thread IDs — the key that maps checkpoints to a conversation. Different IDs, different chats.
- get_state() / update_state() — read or patch the stored state without running the graph.
- get_state_history() — browse every snapshot for debugging or time travel.
- Time travel — fork from any past snapshot and explore a new path without losing the original.
Practice Exercise
Build a multi-turn research assistant backed by SqliteSaver. Give it three tools — web search, calculator, and note-taker. Have a conversation, stop the script, restart Python, and confirm the assistant still has full context. Then walk through get_state_history() to pinpoint the exact checkpoint where the assistant first invoked a tool.
Solution outline
- Define three tools with the @tool decorator
- Build a ReAct-style graph with a tool node and conditional routing
- Compile with SqliteSaver.from_conn_string("research.db")
- Run a multi-turn conversation with a fixed thread_id
- Kill the script, restart Python, reconnect to "research.db"
- Call invoke() with the same thread_id — the assistant should have full context
- Use get_state_history() and filter for checkpoints whose messages contain a ToolMessage
FAQ
Can I use more than one checkpointer in the same app?
A graph accepts a single checkpointer when you compile it. However, nothing stops you from giving separate graphs their own savers.
# Each graph gets its own saver (the from_conn_string helpers are
# context managers, so keep them open while the graphs run)
with SqliteSaver.from_conn_string("a.db") as saver_a, \
     PostgresSaver.from_conn_string(DB_URI) as saver_b:
    graph_a = builder_a.compile(checkpointer=saver_a)
    graph_b = builder_b.compile(checkpointer=saver_b)
How much disk space do checkpoints use?
Checkpoints are full snapshots — not diffs. So a 50-message thread with 10 checkpoints stores those messages 10 times over. Keep an eye on your database size and set up a retention policy early.
What happens when two requests write to the same thread_id at the same time?
MemorySaver has no locking, so you’ll hit race conditions. PostgresSaver relies on database transactions to serialize writes safely — one of the strongest reasons to go with Postgres in production.
Can I delete old checkpoints?
There’s no built-in cleanup method yet. For database savers, write a SQL query that deletes rows older than your cutoff date. For MemorySaver, just reset its internal dict or let Python’s garbage collector reclaim the memory.
References
- LangGraph documentation — Persistence concepts. Link
- LangGraph documentation — How to add persistence ("Add memory"). Link
- LangGraph documentation — Time travel. Link
- langgraph-checkpoint — PyPI. Link
- langgraph-checkpoint-sqlite — PyPI. Link
- langgraph-checkpoint-postgres — PyPI. Link
- LangGraph v0.2 release blog — New checkpointer libraries. Link
- LangGraph API Reference — StateSnapshot. Link