LangGraph Checkpointing: Save & Resume Graph State
Give your LangGraph agents a durable memory — one that holds up through crashes, restarts, and days of idle time — by wiring in a checkpointer.
Imagine this. You spend ten minutes chatting with a LangGraph bot about your data pipeline. Then the server reboots. When you come back, the bot has zero clue who you are or what you talked about. Every message, gone.
That’s life without persistence. And it’s the problem this post solves.
By the time you finish reading, you’ll know how to save every step of a graph’s run, load it back at any point, and even rewind to an earlier state. Crashes, deploys, week-long breaks — none of that will erase your agent’s memory.
Here’s the gist. After each node runs, the checkpointer snaps a picture of the full state and tags it with a thread ID — a label that ties together all the snapshots from one chat. When the user comes back, you hand in the same thread ID, and the graph loads the most recent snapshot. These snapshots pile up in order, like chapters in a book. You can flip back to any chapter, look at what the state was, or even fork a new path from that spot. LangGraph calls this trick “time travel.”
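Before any LangGraph code, the whole idea fits in a toy sketch: a dict that maps thread IDs to ordered lists of snapshots. This is not how LangGraph implements checkpointing internally, just the shape of the concept.

```python
# Toy model of checkpointing — NOT LangGraph's real implementation,
# just the core idea: thread-scoped, ordered snapshots.
store = {}  # thread_id -> list of state snapshots, oldest first

def save_snapshot(thread_id, state):
    """Append a copy of the state to the thread's snapshot list."""
    store.setdefault(thread_id, []).append(dict(state))

def load_latest(thread_id):
    """Return the most recent snapshot, or a blank state for a new thread."""
    snapshots = store.get(thread_id, [])
    return dict(snapshots[-1]) if snapshots else {"messages": []}

# Two steps on one thread pile up in order
save_snapshot("thread-1", {"messages": ["Hi, I'm Alice"]})
save_snapshot("thread-1", {"messages": ["Hi, I'm Alice", "Hello Alice!"]})

print(load_latest("thread-1")["messages"][-1])  # Hello Alice!
print(load_latest("thread-2"))                  # {'messages': []} — unknown thread starts fresh
```

A different thread ID finds nothing in the store and starts from a blank state, which is exactly the behavior the real checkpointer gives you.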
What Is a Checkpointer and Why Should You Care?
A checkpointer is the piece that saves your graph’s state after every step. Leave it out, and the graph is a blank slate on each call — nothing carries over. Plug one in, and the graph picks up where it left off.
Let me show you the gap in code. Without a checkpointer, calls are strangers to each other:
python
# Without persistence — each call starts fresh
result1 = graph.invoke({"messages": [HumanMessage("My name is Alice")]})
result2 = graph.invoke({"messages": [HumanMessage("What's my name?")]})
# The graph has NO idea the user said "Alice" in the previous call
With a checkpointer, every call builds on the last:
python
# With persistence — the graph remembers
config = {"configurable": {"thread_id": "user-123"}}
result1 = graph.invoke({"messages": [HumanMessage("My name is Alice")]}, config)
result2 = graph.invoke({"messages": [HumanMessage("What's my name?")]}, config)
# The graph knows the user is Alice — it loaded the previous state
The thread_id is what links these calls. It tells the checkpointer which chat to look up. Change the thread ID and you start a brand-new chat. Keep it the same and the old chat resumes.
Key Insight: A checkpointer doesn’t just keep the final output — it keeps every step along the way. Each time a node runs, a new snapshot appears. That means you can inspect, debug, or replay the graph at any point in its life, not just at the end.
Prerequisites
- Python version: 3.10+
- Required libraries: langgraph (0.4+), langchain-openai (0.3+), langchain-core (0.3+), langgraph-checkpoint-sqlite (2.0+)
- Install: `pip install langgraph langchain-openai langchain-core langgraph-checkpoint-sqlite`
- API key: An OpenAI API key set as `OPENAI_API_KEY`. See OpenAI’s docs to create one.
- Time to complete: ~30 minutes
- Prior knowledge: Basic LangGraph concepts (nodes, edges, state) from earlier posts in this series.
The next block pulls in every import we need for the rest of the article — the LLM wrapper, message types, graph helpers, and the in-memory checkpointer.
python
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.checkpoint.memory import MemorySaver
How Does MemorySaver Work?
MemorySaver is the quickest way to add state saving. It stashes snapshots in RAM — ideal while you learn, but not fit for real apps since everything vanishes when the process ends.
Let me build a tiny chatbot and attach a MemorySaver. One node sends the full message log to the LLM and returns the reply.
python
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def chatbot(state: MessagesState):
    """Send the full message history to the LLM."""
    return {"messages": [llm.invoke(state["messages"])]}
# Build the graph
builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", END)
# Compile WITH a checkpointer
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
The magic is in builder.compile(checkpointer=memory). Drop that argument, and the graph works but forgets every call.
Time to test. I’ll fire off two messages on the same thread_id and see if the graph recalls the first one.
python
config = {"configurable": {"thread_id": "thread-1"}}
# First message
response1 = graph.invoke(
    {"messages": [HumanMessage(content="Hi, I'm building a RAG pipeline.")]},
    config
)
print(response1["messages"][-1].content)
State saved. Now for the real test — a follow-up that shares no context at all.
python
# Second message — same thread_id
response2 = graph.invoke(
    {"messages": [HumanMessage(content="What did I say I'm working on?")]},
    config
)
print(response2["messages"][-1].content)
It remembers. The checkpointer pulled the old state and tacked the new message on top. That’s persistence doing its job.
Think about it: What would happen if you swapped in a different thread_id for that second call? The graph opens a fresh, empty chat. It has no clue about the RAG pipeline, because that context belongs to "thread-1", not some other thread.
How Do Thread IDs and the Config Object Fit Together?
The checkpointer figures out which chat to load by looking at the thread_id in the config dict. Every call to invoke() needs one.
python
# Each thread_id is a separate conversation
config_alice = {"configurable": {"thread_id": "alice-session"}}
config_bob = {"configurable": {"thread_id": "bob-session"}}
# Alice's conversation
graph.invoke(
    {"messages": [HumanMessage(content="I prefer PyTorch.")]},
    config_alice
)
# Bob's conversation — completely independent
graph.invoke(
    {"messages": [HumanMessage(content="I prefer TensorFlow.")]},
    config_bob
)
Alice and Bob each get their own lane. Nothing leaks between them. In a real product, you’d use a user ID, session token, or UUID as the thread ID.
Tip: Give threads readable names while you develop. Something like `"test-thread-1"` beats a random UUID for debugging. Flip to UUIDs once you go live so names never collide.
The config also holds a checkpoint_id. You rarely set it yourself, but it pinpoints one exact snapshot inside a thread. We’ll lean on it when we reach the time-travel section.
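For reference, a config that pins one exact snapshot looks like the dict below. The `checkpoint_id` value here is a made-up placeholder; real IDs are generated by LangGraph and show up on `snapshot.config`.

```python
# A config can target one specific snapshot inside a thread.
# The checkpoint_id below is a made-up placeholder for illustration —
# real IDs come from LangGraph, via snapshot.config.
config = {
    "configurable": {
        "thread_id": "user-123",
        "checkpoint_id": "1ef4f797-8335-6428-8001-8a1503f9b875",  # hypothetical
    }
}
print(config["configurable"]["thread_id"])  # user-123
```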
How Does SqliteSaver Keep State Across Restarts?
MemorySaver vanishes the second your process dies. SqliteSaver writes every snapshot to a file on disk. Your graph state survives reboots, crashes, and even a move to a new machine — as long as you bring the file along.
You need the langgraph-checkpoint-sqlite package (already in our prereqs). Call from_conn_string with a file path, and it builds the database if the file isn’t there yet.
python
from langgraph.checkpoint.sqlite import SqliteSaver
# The database file is created if it doesn't exist
with SqliteSaver.from_conn_string("checkpoints.db") as sqlite_saver:
    graph = builder.compile(checkpointer=sqlite_saver)
    config = {"configurable": {"thread_id": "persistent-thread"}}
    response = graph.invoke(
        {"messages": [HumanMessage(content="Remember this: project deadline is March 15.")]},
        config
    )
    print(response["messages"][-1].content)
State is on disk now inside checkpoints.db. Close the script, fire up a fresh Python session, point at the same file, and the chat carries right on.
python
# After restarting Python — the conversation is still there
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as sqlite_saver:
    graph = builder.compile(checkpointer=sqlite_saver)
    config = {"configurable": {"thread_id": "persistent-thread"}}
    response = graph.invoke(
        {"messages": [HumanMessage(content="When is my project deadline?")]},
        config
    )
    print(response["messages"][-1].content)
SqliteSaver loaded the whole history from disk. That’s the saving you want while building locally and running quick demos.
Warning: SqliteSaver is wrapped in a context manager (a `with` block) because it holds a database connection. Skip the `with` and you own the cleanup. Forget to close it, and you risk a broken database file.
This saver works best when only one process writes at a time. SQLite locks the whole file on every write. Two requests at once? One has to wait for the other to finish. That rules it out for busy web apps.
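You can watch that locking behavior with nothing but the standard library's sqlite3 module. With a zero timeout, a second connection that tries to start a write transaction while the first holds the write lock fails immediately instead of waiting:

```python
import os
import sqlite3
import tempfile

# Two connections to the same database file, failing fast (timeout=0)
# and in autocommit mode so we control transactions explicitly.
path = os.path.join(tempfile.mkdtemp(), "lock-demo.db")
writer = sqlite3.connect(path, timeout=0, isolation_level=None)
other = sqlite3.connect(path, timeout=0, isolation_level=None)

writer.execute("CREATE TABLE kv (k TEXT, v TEXT)")

# First connection takes the write lock and holds it mid-transaction
writer.execute("BEGIN IMMEDIATE")
writer.execute("INSERT INTO kv VALUES ('thread-1', 'snapshot')")

# A concurrent writer can't get in while the lock is held
try:
    other.execute("BEGIN IMMEDIATE")
    blocked = False
except sqlite3.OperationalError:  # "database is locked"
    blocked = True

writer.execute("COMMIT")  # releases the lock
print(blocked)  # True
```

A production checkpointer sees many of these collisions per second under load, which is why the Postgres option exists.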
When Should You Reach for PostgresSaver?
For anything that real users touch, use PostgresSaver. It stores snapshots in PostgreSQL, giving you parallel connections, multi-machine scaling, and battle-tested durability.
Grab the package first:
bash
pip install langgraph-checkpoint-postgres
The code looks almost the same as SqliteSaver. Pass a connection string, call setup() once to create the tables, and you’re set.
python
# Pseudocode — requires a running PostgreSQL instance
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:password@localhost:5432/langgraph_checkpoints"
with PostgresSaver.from_conn_string(DB_URI) as pg_saver:
    pg_saver.setup()  # creates checkpoint tables — run once
    graph = builder.compile(checkpointer=pg_saver)
    config = {"configurable": {"thread_id": "prod-thread-001"}}
    response = graph.invoke(
        {"messages": [HumanMessage(content="Start a new analysis task.")]},
        config
    )
Tip: Go async in web apps. Both savers ship with async twins: `AsyncSqliteSaver` and `AsyncPostgresSaver`. They play nicely with `async`/`await` and won’t jam your event loop.
Which Checkpointer Should You Pick?
| Checkpointer | Storage | Survives Restart? | Concurrent Access | Best For |
|---|---|---|---|---|
| `MemorySaver` | RAM | No | Single process | Tutorials, unit tests |
| `SqliteSaver` | File on disk | Yes | Single process | Local dev, prototypes |
| `PostgresSaver` | PostgreSQL | Yes | Multi-process | Production, multi-server |
Grab MemorySaver to learn. Grab SqliteSaver for side projects. Grab PostgresSaver for anything with real traffic.
How Do You Peek Inside the State with get_state?
You’ve wired up saving. But what if you want to look inside the graph without running it again? get_state() hands you a StateSnapshot — a full picture of where things stand right now.
python
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "inspect-demo"}}
graph.invoke(
    {"messages": [HumanMessage(content="Explain gradient descent in one sentence.")]},
    config
)
# Inspect the current state
snapshot = graph.get_state(config)
print(f"Next node to run: {snapshot.next}")
print(f"Number of messages: {len(snapshot.values['messages'])}")
print(f"Last message: {snapshot.values['messages'][-1].content[:80]}...")
Inside the StateSnapshot you’ll find five things worth knowing:
- `snapshot.values` — the state dict (your `MessagesState` fields)
- `snapshot.next` — which node fires next (empty tuple if the graph reached the end)
- `snapshot.config` — the config, including the `checkpoint_id` for this particular snapshot
- `snapshot.metadata` — timing info and the node that made this snapshot
- `snapshot.parent_config` — a link back to the snapshot that came before
If snapshot.next is (), the graph hit END. If it shows ('chatbot',), the graph is paused right before the chatbot node — typical when you use human-in-the-loop stops.
How Do You Patch the State with update_state?
Between runs, you might need to tweak the state. Maybe you want to slip in a system prompt, undo a bad tool call, or shift the tone of the chat. update_state() lets you write new values straight into the snapshot.
Here’s a handy example. The model is fine, but you want to steer its style by injecting a rule.
python
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "update-demo"}}
graph.invoke(
    {"messages": [HumanMessage(content="What's the capital of Australia?")]},
    config
)
# Check the current state
snapshot = graph.get_state(config)
print(f"Model said: {snapshot.values['messages'][-1].content}")
Now I’ll inject a system note that changes the reply style. The as_node flag tells LangGraph which node “owns” this update.
python
# Inject a system-level instruction
graph.update_state(
    config,
    {"messages": [SystemMessage(content="Always respond in exactly two sentences.")]},
    as_node="chatbot"
)
# The next invocation sees the injected message
response = graph.invoke(
    {"messages": [HumanMessage(content="Tell me about kangaroos.")]},
    config
)
print(response["messages"][-1].content)
Why does as_node matter? Setting it to "chatbot" makes the graph think the chatbot already ran, so it jumps to whatever comes next. Point it at a node that feeds into the chatbot, and the chatbot runs again with the fresh state.
Warning: `update_state` writes to the snapshot but doesn’t run any node. You still need to call `invoke()` or `stream()` afterward to push the graph forward.
yaml
ExerciseBlock:
  id: "inspect-modify-state"
  title: "Inspect and Modify Graph State"
  difficulty: "intermediate"
  exerciseType: "write"
  instructions: |
    After running the chatbot once with the message "My favorite food is sushi":
    1. Use get_state() to check how many messages are in the state
    2. Use update_state() to inject a SystemMessage saying "Respond only in haiku format"
    3. Invoke the graph again with "Write a poem about my favorite food"
  starterCode: |
    from langchain_core.messages import SystemMessage, HumanMessage
    from langgraph.checkpoint.memory import MemorySaver
    from langgraph.graph import StateGraph, MessagesState, START, END
    from langchain_openai import ChatOpenAI

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    def chatbot(state: MessagesState):
        return {"messages": [llm.invoke(state["messages"])]}

    builder = StateGraph(MessagesState)
    builder.add_node("chatbot", chatbot)
    builder.add_edge(START, "chatbot")
    builder.add_edge("chatbot", END)

    memory = MemorySaver()
    graph = builder.compile(checkpointer=memory)
    config = {"configurable": {"thread_id": "exercise-update"}}
    graph.invoke({"messages": [HumanMessage(content="My favorite food is sushi")]}, config)

    # TODO: Step 1 — get_state and print message count
    # TODO: Step 2 — update_state with SystemMessage
    # TODO: Step 3 — invoke with new HumanMessage
  testCases:
    - id: "test-1"
      input: "snapshot = graph.get_state(config); print(type(snapshot).__name__)"
      expectedOutput: "StateSnapshot"
      description: "get_state should return a StateSnapshot"
    - id: "test-2"
      input: "snapshot = graph.get_state(config); print(len(snapshot.values['messages']) >= 2)"
      expectedOutput: "True"
      description: "State should have at least 2 messages"
  hints:
    - "Call snapshot = graph.get_state(config) and check len(snapshot.values['messages'])."
    - "Use graph.update_state(config, {'messages': [SystemMessage(content='Respond only in haiku format')]}, as_node='chatbot')."
  solution: |
    snapshot = graph.get_state(config)
    print(f"Messages: {len(snapshot.values['messages'])}")
    graph.update_state(
        config,
        {"messages": [SystemMessage(content="Respond only in haiku format")]},
        as_node="chatbot"
    )
    response = graph.invoke(
        {"messages": [HumanMessage(content="Write a poem about my favorite food")]},
        config
    )
    print(response["messages"][-1].content)
  solutionExplanation: |
    get_state() returns the current snapshot with 2 messages (user + AI response).
    update_state() injects the system message as if the chatbot produced it.
    The next invoke() loads the full state — including the system instruction — so
    the LLM responds in haiku format about sushi.
  xpReward: 20
How Do You Scroll Through Past Snapshots?
Each node run adds a snapshot. get_state_history() lets you walk through every one of them — a full log of everything the graph did.
python
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "history-demo"}}
# Have a short conversation
graph.invoke(
    {"messages": [HumanMessage(content="What is LangGraph?")]},
    config
)
graph.invoke(
    {"messages": [HumanMessage(content="How does it handle state?")]},
    config
)
# Browse all checkpoints for this thread
for snapshot in graph.get_state_history(config):
    msg_count = len(snapshot.values["messages"])
    source = snapshot.metadata.get("source", "unknown")
    step = snapshot.metadata.get("step", "?")
    print(f"Step {step} | Messages: {msg_count} | Source: {source}")
Results come back newest-first. You’ll spot entries from both calls, plus the in-between steps.
Every snapshot’s snapshot.config holds a one-of-a-kind checkpoint_id. Feed it back to load that exact moment:
python
history = list(graph.get_state_history(config))
# The last item is the very first checkpoint
oldest = history[-1]
print(f"Oldest checkpoint has {len(oldest.values['messages'])} messages")
Guess the count: Two rounds with this chatbot graph create at least five snapshots. Each round adds one when the input lands and one after the chatbot node wraps up. There’s also the blank starting state. The exact total may shift by LangGraph version, but it’s always more than two.
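That bookkeeping can be sketched in a few lines, assuming one checkpoint when the input lands, one after the chatbot node finishes, plus the blank start (the real per-version behavior may differ):

```python
# Toy count of checkpoints for two chat rounds on a one-node graph.
# Assumptions: one blank starting checkpoint, then per round one
# "input" checkpoint and one "loop" checkpoint after the chatbot runs.
checkpoints = [{"source": "empty", "round": 0}]
for round_number in (1, 2):
    checkpoints.append({"source": "input", "round": round_number})
    checkpoints.append({"source": "loop", "round": round_number})

print(len(checkpoints))  # 5 under these assumptions
```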
How Does Time Travel Work?
Here is where persistence gets genuinely exciting. Time travel lets you jump back to any old snapshot and carry on from there. Chasing down a weird reply? Rewind to the moment before it showed up and feed in a new prompt. Building an “undo” feature? Branch off from an older snapshot.
Pick a snapshot from the history, pass its config into invoke(), and the graph resumes from that exact spot.
python
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "time-travel-demo"}}
# Build up some conversation
graph.invoke(
    {"messages": [HumanMessage(content="I want to learn about transformers.")]},
    config
)
graph.invoke(
    {"messages": [HumanMessage(content="Actually, let's talk about CNNs instead.")]},
    config
)
# Check current state
current = graph.get_state(config)
print(f"Current messages: {len(current.values['messages'])}")
I’ll hunt for the snapshot right after the first exchange and branch from there — as if the second message never happened.
python
# Find the checkpoint after the first exchange (2 messages: user + AI)
history = list(graph.get_state_history(config))
for snap in history:
    if len(snap.values["messages"]) == 2:
        rewind_config = snap.config
        print(f"Found checkpoint with {len(snap.values['messages'])} messages")
        break
# Resume from that earlier point with a DIFFERENT follow-up
response = graph.invoke(
    {"messages": [HumanMessage(content="How do attention heads work?")]},
    rewind_config
)
print(response["messages"][-1].content)
The graph picked up from “I want to learn about transformers” — the CNN message never reached it. The main thread still holds both messages, but this new path split off on its own.
Key Insight: Time travel creates a fork, not a rewrite. The original chat stays put. When you resume from a past snapshot, LangGraph opens a fresh branch. You can always go back to the main line.
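The fork-not-rewrite behavior follows from parent pointers. Here is a toy version (again, not LangGraph's internals) where each checkpoint records its parent's ID, so resuming from an old checkpoint grows a new branch while the original chain stays put:

```python
# Toy checkpoint tree: each checkpoint records its parent's id,
# so resuming from an old checkpoint grows a new branch while the
# original chain stays intact.
checkpoints = {}

def save(ckpt_id, state, parent_id=None):
    """Record a checkpoint with a pointer back to its parent."""
    checkpoints[ckpt_id] = {"state": state, "parent": parent_id}

save("c1", ["transformers?"])
save("c2", ["transformers?", "AI answer"], parent_id="c1")
save("c3", ["transformers?", "AI answer", "CNNs instead"], parent_id="c2")

# Fork from c2: the new checkpoint's parent is c2, not c3
save("c4", ["transformers?", "AI answer", "attention heads?"], parent_id="c2")

print(checkpoints["c3"]["state"][-1])  # CNNs instead — original branch untouched
print(checkpoints["c4"]["parent"])     # c2 — new branch split off earlier
```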
yaml
ExerciseBlock:
  id: "time-travel-fork"
  title: "Time Travel — Fork a Conversation"
  difficulty: "intermediate"
  exerciseType: "write"
  instructions: |
    1. Run two exchanges with the chatbot: first about "neural networks", then about "backpropagation"
    2. Use get_state_history() to list all checkpoints
    3. Find the checkpoint after the FIRST exchange (2 messages)
    4. Fork from that checkpoint by asking "How do CNNs differ from RNNs?"
  starterCode: |
    from langchain_core.messages import HumanMessage
    from langgraph.checkpoint.memory import MemorySaver

    memory = MemorySaver()
    graph = builder.compile(checkpointer=memory)
    config = {"configurable": {"thread_id": "fork-exercise"}}

    # Exchange 1
    graph.invoke({"messages": [HumanMessage(content="Let's discuss neural networks")]}, config)
    # Exchange 2
    graph.invoke({"messages": [HumanMessage(content="Now explain backpropagation")]}, config)

    # TODO: find the checkpoint with 2 messages and fork from it
    rewind_config = None
  testCases:
    - id: "test-1"
      input: "print(len(list(graph.get_state_history(config))) > 2)"
      expectedOutput: "True"
      description: "Should have more than 2 checkpoints"
    - id: "test-2"
      input: "print(rewind_config is not None)"
      expectedOutput: "True"
      description: "Should find a checkpoint to rewind to"
  hints:
    - "Loop through graph.get_state_history(config) and check len(snap.values['messages']) == 2."
    - "Pass the found snapshot's snap.config as the config to graph.invoke()."
  solution: |
    rewind_config = None
    for snap in graph.get_state_history(config):
        if len(snap.values["messages"]) == 2:
            rewind_config = snap.config
            break

    response = graph.invoke(
        {"messages": [HumanMessage(content="How do CNNs differ from RNNs?")]},
        rewind_config
    )
    print(response["messages"][-1].content)
  solutionExplanation: |
    We scan the checkpoint history for the snapshot with exactly 2 messages (the first
    user message plus the AI response). Invoking with that snapshot's config creates a
    fork — the graph only knows about neural networks, not backpropagation.
  xpReward: 20
What Can You Learn from Snapshot Metadata?
Every snapshot carries a tag bag of metadata — when it was made, which node made it, and what step it landed on. This is a lifesaver when you’re debugging a graph with a dozen nodes.
python
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "metadata-demo"}}
graph.invoke(
    {"messages": [HumanMessage(content="Hello!")]},
    config
)
for snapshot in graph.get_state_history(config):
    meta = snapshot.metadata
    print(f"Step: {meta.get('step')} | "
          f"Source: {meta.get('source')} | "
          f"Writes: {meta.get('writes', {}).keys() if meta.get('writes') else 'none'}")
What’s in the metadata dict?
- `source` — `"input"` means a user message arrived, `"loop"` means a node ran
- `step` — the step number inside the run
- `writes` — which state keys got changed at this step
- `thread_id` — which thread owns this snapshot
You can filter by metadata too. Want only the snapshots where the chatbot node did its thing?
python
chatbot_checkpoints = [
    snap for snap in graph.get_state_history(config)
    if snap.metadata.get("source") == "loop"
]
print(f"The chatbot node ran {len(chatbot_checkpoints)} time(s)")
How Do You Juggle Many Users at Once?
In a live product, many people chat at the same time. Each one gets a unique thread_id, and the checkpointer keeps every chat walled off from the others.
Here’s a quick sketch — three users hitting the same bot in parallel.
python
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
threads = {
    "user-alice": "I need help with pandas groupby.",
    "user-bob": "How do I deploy a Flask app?",
    "user-carol": "Explain gradient boosting.",
}
for thread_id, message in threads.items():
    config = {"configurable": {"thread_id": thread_id}}
    graph.invoke(
        {"messages": [HumanMessage(content=message)]},
        config
    )
# Later — Alice sends a follow-up
alice_config = {"configurable": {"thread_id": "user-alice"}}
response = graph.invoke(
    {"messages": [HumanMessage(content="Show me multi-column groupby.")]},
    alice_config
)
print(response["messages"][-1].content)
Alice’s follow-up lands because her thread holds the full history. Bob and Carol are untouched.
Tip: LangGraph won’t tidy up stale threads on its own. In a live system, schedule a cleanup task that removes threads past your keep-alive window. With `PostgresSaver`, that’s a simple SQL `DELETE` on the snapshots table.
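Such a cleanup job could look like the sketch below. The `checkpoints` table and its columns here are toy stand-ins; the real schema depends on the saver package and version, so inspect your database before running anything like this against production data.

```python
import sqlite3
import time

# Toy checkpoint table — real savers use their own schemas, so treat
# the table and column names here as placeholders.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, created_at REAL)")

now = time.time()
week = 7 * 24 * 3600
conn.executemany(
    "INSERT INTO checkpoints VALUES (?, ?)",
    [
        ("stale-thread", now - 30 * 24 * 3600),  # a month old
        ("fresh-thread", now - 3600),            # an hour old
    ],
)

# Delete every thread older than the keep-alive window
conn.execute("DELETE FROM checkpoints WHERE created_at < ?", (now - week,))
remaining = [row[0] for row in conn.execute("SELECT thread_id FROM checkpoints")]
print(remaining)  # ['fresh-thread']
```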
What Does a Real-World Resumable Pipeline Look Like?
Persistence isn’t just a chatbot feature. Long-running workflows that can fail mid-way gain just as much from it. If a three-step job crashes on step two, you don’t want to re-do step one from scratch.
Each node below handles one clear task. The checkpointer saves state after every one. A custom WorkflowState holds fields for each step’s output.
python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class WorkflowState(TypedDict):
    task: str
    step1_result: str
    step2_result: str
    step3_result: str

def step_one(state: WorkflowState) -> dict:
    """Simulate an expensive API call."""
    print("Running step 1 — fetching data...")
    return {"step1_result": f"Data for '{state['task']}' fetched"}

def step_two(state: WorkflowState) -> dict:
    """Simulate processing."""
    print("Running step 2 — processing data...")
    return {"step2_result": f"Processed: {state['step1_result']}"}

def step_three(state: WorkflowState) -> dict:
    """Simulate saving results."""
    print("Running step 3 — saving results...")
    return {"step3_result": f"Saved: {state['step2_result']}"}
Wire them into a straight-line graph and attach a checkpointer.
python
workflow = StateGraph(WorkflowState)
workflow.add_node("step1", step_one)
workflow.add_node("step2", step_two)
workflow.add_node("step3", step_three)
workflow.add_edge(START, "step1")
workflow.add_edge("step1", "step2")
workflow.add_edge("step2", "step3")
workflow.add_edge("step3", END)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "workflow-run-1"}}
result = app.invoke({"task": "quarterly-report"}, config)
print(result["step3_result"])
text
Running step 1 — fetching data...
Running step 2 — processing data...
Running step 3 — saving results...
Saved: Processed: Data for 'quarterly-report' fetched
A snapshot landed after each step. Had step two blown up, you’d look at the state, fix the root cause, and resume from that snapshot — no need to run step one again. That’s the real win of persistence for production pipelines.
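The resume logic itself can be sketched without LangGraph: load the saved state, skip any step whose result is already filled in, and run the rest. LangGraph handles this for you when you resume a thread; this toy loop only shows the principle.

```python
# Toy resume: skip steps whose results were already checkpointed.
# Pretend the process crashed after step one finished.
saved_state = {
    "task": "quarterly-report",
    "step1_result": "Data for 'quarterly-report' fetched",  # done before the crash
    "step2_result": None,
    "step3_result": None,
}

steps = {
    "step1_result": lambda s: f"Data for '{s['task']}' fetched",
    "step2_result": lambda s: f"Processed: {s['step1_result']}",
    "step3_result": lambda s: f"Saved: {s['step2_result']}",
}

for key, run in steps.items():
    if saved_state[key] is not None:
        print(f"Skipping {key} (already checkpointed)")
        continue
    saved_state[key] = run(saved_state)

print(saved_state["step3_result"])  # Saved: Processed: Data for 'quarterly-report' fetched
```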
What Mistakes Trip People Up Most?
Mistake 1: Forgetting the config
This is the number-one slip-up. Without a thread_id, the checkpointer can’t save or load a thing.
python
# WRONG — no config passed
result = graph.invoke({"messages": [HumanMessage(content="Hello")]})
# Raises ValueError or runs without persistence
# RIGHT — always pass config with thread_id
config = {"configurable": {"thread_id": "my-thread"}}
result = graph.invoke({"messages": [HumanMessage(content="Hello")]}, config)
Mistake 2: Sharing a thread_id between unrelated graphs
Two graphs with the same checkpointer and the same thread_id end up loading each other’s state. The shapes clash, and you get baffling errors.
python
# WRONG — same thread_id for different graphs
config = {"configurable": {"thread_id": "shared-id"}}
chatbot_graph.invoke(input, config) # saves chatbot state
workflow_graph.invoke(input, config) # loads chatbot state — breaks!
# RIGHT — prefix thread_ids by graph type
chatbot_config = {"configurable": {"thread_id": "chat-shared-id"}}
workflow_config = {"configurable": {"thread_id": "wf-shared-id"}}
Mistake 3: Running MemorySaver in a live product
A server reboot — deploy, crash, scaling event — wipes out all history. Users lose their context without a trace. Stick with `SqliteSaver` or `PostgresSaver` for anything that must survive a restart.
Mistake 4: Skipping setup() with PostgresSaver
PostgresSaver needs tables before it can write. Forget pg_saver.setup() and the first write throws a database error.
python
# WRONG — no setup call
with PostgresSaver.from_conn_string(DB_URI) as pg_saver:
    graph = builder.compile(checkpointer=pg_saver)
    graph.invoke(input, config)  # ERROR: relation does not exist

# RIGHT — call setup() first
with PostgresSaver.from_conn_string(DB_URI) as pg_saver:
    pg_saver.setup()
    graph = builder.compile(checkpointer=pg_saver)
    graph.invoke(input, config)  # works
Under the Hood: How the Data Is Stored
Each snapshot holds the full state dict, serialized as JSON for file- and database-backed savers. The saver also stores a parent pointer — the `checkpoint_id` of the previous snapshot. This forms a linked list that `get_state_history()` walks backward. Bigger states mean bigger snapshots, so keep your state lean. A `MessagesState` with 200 messages and 50 snapshots means those 200 messages are stored 50 times over.
Exercises
Exercise: Build a Chat That Remembers Across Calls
Bring it all together. Build a chatbot, have a multi-turn chat, peek at the state, and use time travel to branch off.
yaml
ExerciseBlock:
  id: "persistence-chatbot"
  title: "Build a Persistent Chatbot with State Inspection"
  difficulty: "intermediate"
  exerciseType: "write"
  instructions: |
    1. Build a chatbot graph with MemorySaver
    2. Send "My favorite language is Python" and then "What's my favorite language?"
    3. Use get_state() to print the total number of messages
    4. Use get_state_history() to print the number of checkpoints
    Use thread_id "exercise-thread".
  starterCode: |
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage
    from langgraph.graph import StateGraph, MessagesState, START, END
    from langgraph.checkpoint.memory import MemorySaver

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    def chatbot(state: MessagesState):
        # TODO: call the LLM with state["messages"]
        pass

    # TODO: build graph, compile with MemorySaver
    # TODO: invoke twice, then inspect state and history
  testCases:
    - id: "test-1"
      input: "print(type(graph).__name__)"
      expectedOutput: "CompiledStateGraph"
      description: "Graph should be compiled"
    - id: "test-2"
      input: "snapshot = graph.get_state(config); print(len(snapshot.values['messages']) == 4)"
      expectedOutput: "True"
      description: "Should have 4 messages after 2 exchanges"
    - id: "test-3"
      input: "print(len(list(graph.get_state_history(config))) > 2)"
      expectedOutput: "True"
      description: "Should have multiple checkpoints"
  hints:
    - "The chatbot node returns {'messages': [llm.invoke(state['messages'])]}."
    - "After two invoke() calls with the same thread_id, get_state() will show 4 messages (2 user + 2 AI)."
  solution: |
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage
    from langgraph.graph import StateGraph, MessagesState, START, END
    from langgraph.checkpoint.memory import MemorySaver

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    def chatbot(state: MessagesState):
        return {"messages": [llm.invoke(state["messages"])]}

    builder = StateGraph(MessagesState)
    builder.add_node("chatbot", chatbot)
    builder.add_edge(START, "chatbot")
    builder.add_edge("chatbot", END)

    memory = MemorySaver()
    graph = builder.compile(checkpointer=memory)
    config = {"configurable": {"thread_id": "exercise-thread"}}

    graph.invoke({"messages": [HumanMessage(content="My favorite language is Python")]}, config)
    graph.invoke({"messages": [HumanMessage(content="What's my favorite language?")]}, config)

    snapshot = graph.get_state(config)
    print(f"Total messages: {len(snapshot.values['messages'])}")
    print(f"Checkpoints: {len(list(graph.get_state_history(config)))}")
  solutionExplanation: |
    After two exchanges, the state has 4 messages: two from the user and two AI responses.
    The checkpoint history contains more entries because LangGraph creates checkpoints at
    the start of each invocation and after each node runs.
  xpReward: 15
Summary
Persistence turns LangGraph from a fire-and-forget engine into a system with lasting memory. Here’s the quick recap:
- MemorySaver — RAM-based, fast, gone the moment the process ends
- SqliteSaver — file-based, survives restarts, great for local builds
- PostgresSaver — database-backed, handles many users, built for traffic
- thread_id — the tag that ties snapshots to a chat; new ID = new chat
- get_state() — look at the current state; update_state() — patch it on the fly
- get_state_history() — browse every past snapshot for debugging and rewinding
- Time travel — branch off from any old snapshot without losing the main thread
Practice Exercise
Build a multi-turn helper that writes to disk with SqliteSaver. Give it three tools (web search, math, note-taker). Chat with it, kill the script, restart Python, and make sure the helper still has full context. Then dig through get_state_history() to find the first snapshot where the helper called a tool.
FAQ
Q: Can I wire up more than one checkpointer in the same app?
Each graph accepts one checkpointer at compile time. But different graphs can each use their own saver.
python
# In recent versions from_conn_string returns a context manager,
# so open each saver in a `with` block before compiling:
with SqliteSaver.from_conn_string("a.db") as saver_a, \
     PostgresSaver.from_conn_string(DB_URI) as saver_b:
    graph_a = builder_a.compile(checkpointer=saver_a)
    graph_b = builder_b.compile(checkpointer=saver_b)
Q: How much disk space do snapshots eat?
Each snapshot is a full copy, not a diff. A 50-message chat with 10 snapshots stores those 50 messages 10 times over. Watch your database size and set a retention window for live apps.
Q: What happens when two requests write to the same thread_id at the same instant?
MemorySaver gives you a race condition. PostgresSaver handles it through database-level locks. That’s a core reason to pick Postgres for anything public-facing.
Q: Can I delete old snapshots?
LangGraph ships no built-in cleanup API. For database-backed savers, write SQL against the snapshot tables. For MemorySaver, wipe the internal dict or let Python’s garbage collector handle it.