LangGraph Streaming: Real-Time Agent Output Guide
Add real-time streaming to your LangGraph agents — pick the right stream mode, show tokens as they arrive, and build chat UIs that feel instant.
Learn how to stream LLM tokens in real time so your LangGraph agent feels fast — even when it takes seconds to think.
Your LangGraph agent runs great — until a user asks it something. The screen goes blank. Five seconds pass. Ten seconds. Then the full answer lands all at once. To a user, that silence feels like a crash.
ChatGPT handles this differently. Words appear the moment they are born. The screen stays alive. People feel like the app is fast, even though the total wait is the same.
That live feel comes from streaming. LangGraph ships five streaming modes and a fine-grained event API to help you get it. By the end of this guide, you will know which mode fits your use case and how to wire it into your app.
What Does Streaming Mean in LangGraph?
When a graph streams, it pushes bits of output before the full run is done. You do not wait for every node to finish. Data arrives as each node wraps up — or even as each token leaves the LLM.
Two layers make this work. Graph-level streaming fires state snapshots or diffs every time a node wraps up. Token-level streaming drips single tokens out of the LLM the instant they form. That is the typing feel ChatGPT made famous.
Both layers share the same two methods: .stream() for sync code and .astream() for async. A single stream_mode flag controls what comes back.
What You Need Before Starting
- Python: 3.9 or newer (3.11+ if you plan to use get_stream_writer in async code)
- Packages: langchain-openai 0.2+, langgraph 0.2+, python-dotenv
- One-line install: pip install langchain-openai langgraph python-dotenv
- API Key: Drop your OPENAI_API_KEY into a .env file
- Clock: Roughly 25 minutes end to end
- Prior knowledge: You should already be comfortable with LangGraph nodes, edges, and MessagesState from the Tool Calling tutorial.
Here is the tiny graph every example in this guide will share. One node, one LLM call — nothing else, on purpose.
python
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, MessagesState, START, END
load_dotenv()
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}
graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()
We will ask this graph a question and read the answer back through five different lenses.
Which Streaming Mode Should You Pick?
Before we dive in, here is a decision tree worth bookmarking. It answers the “which mode?” question in seconds.
text
Do you need token-by-token output (ChatGPT effect)?
├─ YES → stream_mode="messages"
└─ NO → Do you need the full state at every step?
   ├─ YES → stream_mode="values" (debugging)
   └─ NO → Do you need only what changed?
      ├─ YES → stream_mode="updates" (production)
      └─ NO → Do you need custom progress data?
         ├─ YES → stream_mode="custom"
         └─ NO → stream_mode="debug" (development)
The same info as a lookup table:
| Mode | What It Sends | Best For |
|---|---|---|
| values | Full state after each node | Debugging, simple UIs |
| updates | Only the changes from each node | Lean live frontends |
| messages | LLM tokens + metadata | Chat apps |
| custom | Your own data via get_stream_writer | Progress bars, status notes |
| debug | Full run trace | Dev work, fixing bugs |
Key Insight: Let your frontend decide the mode. Full snapshot at every step? `values`. Only deltas? `updates`. Live typing feel? `messages`.
If you are building a chatbot, messages is probably where you will land. Still, we start with values — it is the simplest to reason about and makes a great mental model for the rest.
How Does values Mode Work?
Imagine your 10-node graph returns garbage. Where did the data go off the rails? With values mode, you find out fast.
Each chunk is a full copy of the state at that moment — every key, every value, frozen right after a node wraps up.
python
inputs = {"messages": [HumanMessage(content="What is LangGraph?")]}
for chunk in app.stream(inputs, stream_mode="values"):
    if "messages" in chunk:
        last_msg = chunk["messages"][-1]
        print(f"[{last_msg.type}] {last_msg.content[:80]}...")
text
[human] What is LangGraph?...
[ai] LangGraph is a framework built on top of LangChain for creating stateful,...
Two chunks come back. Chunk one holds just your input — the state snapshot before the chatbot fires. Chunk two holds both messages — the snapshot after the chatbot wraps up.
Quick Check: Picture a three-node graph: planner, researcher, writer. How many chunks does values produce? Four total. The initial state plus one per node.
Tip: `values` shines for debugging but wastes bandwidth in prod. A state with 50 keys and 10 nodes means 500 pairs cross the wire — most of them the same data you already sent.
How Does updates Mode Work?
Sometimes you only care about the delta — what a node added or changed. updates gives you just that. Each chunk is a dict keyed by node name, holding only what that node put out.
python
for chunk in app.stream(inputs, stream_mode="updates"):
    for node_name, node_output in chunk.items():
        print(f"Node '{node_name}' produced:")
        if "messages" in node_output:
            print(f"  {node_output['messages'][-1].content[:80]}...")
text
Node 'chatbot' produced:
LangGraph is a framework built on top of LangChain for creating stateful,...
Only one chunk lands this time. The START node produces nothing, so updates ignores it entirely.
Why care? Do the math. A 10-node graph with 50 state keys sends 500 pairs in values mode. With updates, each node sends only the 2-3 keys it touched. That cuts payload size by 10x — a big deal under real traffic.
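The arithmetic above is easy to sanity-check in plain Python. The node and key counts below are illustrative assumptions from the example, not measurements of any real graph:

```python
# Hypothetical graph: 10 nodes, 50 state keys, each node touching ~3 keys.
NODES = 10
STATE_KEYS = 50
KEYS_TOUCHED = 3  # assumed average number of keys a node writes

values_payload = NODES * STATE_KEYS      # full snapshot after every node
updates_payload = NODES * KEYS_TOUCHED   # only the changed keys per node

print(values_payload)   # 500 key-value pairs cross the wire
print(updates_payload)  # 30 key-value pairs cross the wire
```

With these assumptions the reduction is well past 10x; the exact factor depends on how many keys your nodes actually touch.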
How Does messages Mode Create the ChatGPT Effect?
Here is the crowd favorite. messages mode fires every LLM token the instant the model produces it. Your users see words land on screen one at a time, just like ChatGPT.
Each chunk is a tuple: (message_chunk, metadata). The first item holds a sliver of text — often one token. The second tells you which graph node made it.
python
for msg, metadata in app.stream(inputs, stream_mode="messages"):
    if msg.content and metadata["langgraph_node"] == "chatbot":
        print(msg.content, end="", flush=True)
text
LangGraph is a framework built on top of LangChain for creating stateful, multi-step AI workflows...
Tokens drip onto the screen one by one. That flush=True flag is vital — leave it off and Python holds data in a buffer, killing the live-typing feel.
Warning: `messages` only carries chat model output. Nodes that do math or logic without an LLM stay silent. Pair `messages` with `updates` if you need to see those steps too.
Want live tokens and a “node done” ping in one loop? Pass a list of modes. Each chunk becomes a (mode_name, data) tuple so you can branch on the label.
python
for event in app.stream(inputs, stream_mode=["messages", "updates"]):
    mode, data = event
    if mode == "messages":
        msg, meta = data
        if msg.content:
            print(msg.content, end="", flush=True)
    elif mode == "updates":
        print(f"\n--- Node completed: {list(data.keys())} ---")
The mode label lets your UI route data to the right widget — live text in the main pane, a progress sidebar for node-done signals.
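If the branching grows beyond two modes, the routing logic can be lifted into a tiny dispatcher. This is a pure-Python sketch — the handler names and fake events are made up for illustration, standing in for what app.stream(..., stream_mode=[...]) would yield:

```python
def route_event(event: tuple, handlers: dict) -> None:
    """Send a (mode, data) tuple to the handler registered for that mode."""
    mode, data = event
    handler = handlers.get(mode)
    if handler:
        handler(data)

# Fake events standing in for stream output; real ones come from app.stream()
log = []
handlers = {
    "messages": lambda data: log.append(("token", data)),
    "updates": lambda data: log.append(("node_done", list(data.keys()))),
}
route_event(("messages", "Lang"), handlers)
route_event(("updates", {"chatbot": {"messages": []}}), handlers)
print(log)  # [('token', 'Lang'), ('node_done', ['chatbot'])]
```

The same dict-of-handlers shape maps cleanly onto UI widgets: one handler appends to the chat pane, another updates a progress sidebar.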
Exercise 1: Stream and Count Tokens

Using the base app graph from above, stream the response to the question "Explain Python decorators in 2 sentences" using messages mode. Count the total number of tokens streamed and print the count at the end. Filter to only count tokens from the "chatbot" node.

python
inputs = {"messages": [HumanMessage(content="Explain Python decorators in 2 sentences")]}

token_count = 0
for msg, metadata in app.stream(inputs, stream_mode="messages"):
    if msg.content and metadata["langgraph_node"] == "chatbot":
        print(msg.content, end="", flush=True)
        token_count += 1

print(f"\nTotal tokens streamed: {token_count}")

Each emission in messages mode contains one token chunk. By filtering on langgraph_node and checking that msg.content is non-empty, we count only actual content tokens from the target node. The final count tells you how many individual chunks the LLM streamed.
How Does custom Mode Let You Push Your Own Data?
python
from langgraph.config import get_stream_writer
def research_node(state: MessagesState):
    writer = get_stream_writer()
    writer({"status": "Starting research..."})
    writer({"status": "Searching documents...", "progress": 0.5})
    response = llm.invoke(state["messages"])
    writer({"status": "Complete", "progress": 1.0})
    return {"messages": [response]}
See get_stream_writer()? It gives your node a callback that injects any dict into the live stream. Progress bars, status labels, partial scores — stuff your UI wants but your state should not hold.
I use custom mode whenever a pipeline runs past 30 seconds. Without visible progress, users think the app froze. A small status bar from get_stream_writer() keeps them calm.
Wire this node into a graph and ask for both custom and updates. Custom events arrive while the node runs; the updates event lands once it finishes.
python
research_graph = StateGraph(MessagesState)
research_graph.add_node("research", research_node)
research_graph.add_edge(START, "research")
research_graph.add_edge("research", END)
research_app = research_graph.compile()
for event in research_app.stream(
    inputs, stream_mode=["custom", "updates"]
):
    mode, data = event
    if mode == "custom":
        print(f"  Status: {data}")
    elif mode == "updates":
        print(f"  Node done: {list(data.keys())}")
text
Status: {'status': 'Starting research...'}
Status: {'status': 'Searching documents...', 'progress': 0.5}
Status: {'status': 'Complete', 'progress': 1.0}
Node done: ['research']
Try this: Drop "updates" from the list and pass stream_mode="custom" alone. You see only the three status dicts. The “node done” line vanishes — it lives in the updates layer, which you did not ask for.
Note: `get_stream_writer()` needs Python 3.11+ in async code. On older Python, add a `writer` param to your node and LangGraph injects it for you.
What Does debug Mode Show You?
Picture this: a conditional edge sends control to the wrong node and you have no idea why. debug mode is your answer.
It blasts rich events at every stage — node starts, node ends, state snapshots, and error payloads. Never show this to end users, but in dev it works like an X-ray for your graph.
python
for event in app.stream(inputs, stream_mode="debug"):
    event_type = event["type"]
    if event_type == "task":
        print(f"Task: node='{event['payload']['name']}'")
    elif event_type == "task_result":
        name = event["payload"]["name"]
        keys = list(event["payload"]["result"].keys())
        print(f"Result: node='{name}' -> {keys}")
text
Task: node='chatbot'
Result: node='chatbot' -> ['messages']
Every routing decision is laid bare — which condition matched, which node ran next. I flip this mode on the instant a conditional edge misbehaves.
How Do You Stream Tokens with astream_events?
For most chat apps, messages mode is enough. But what if you need to filter by model name, track a tool call from start to finish, or grab tokens from a nested subgraph?
astream_events() opens that door. It fires a typed event for every LLM call, tool run, and node switch in a graph run. You pick the events you care about and ignore the rest.
The basic pattern: loop through events, match on on_chat_model_stream, and grab the token. The version="v2" flag pins the stable event format.
python
import asyncio
async def stream_tokens():
    async for event in app.astream_events(
        inputs, version="v2"
    ):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
    print()

asyncio.run(stream_tokens())
text
LangGraph is a framework built on top of LangChain for creating stateful, multi-step AI workflows...
Why bother? You get lifecycle triplets — on_chat_model_start, on_chat_model_stream, on_chat_model_end — for every model call. Chains, tools, and custom runnables follow the same pattern.
python
async def show_event_details():
    async for event in app.astream_events(
        inputs, version="v2"
    ):
        kind = event["event"]
        if kind == "on_chat_model_start":
            print(f"Model started: {event['name']}")
        elif kind == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
        elif kind == "on_chat_model_end":
            print(f"\nModel finished: {event['name']}")

asyncio.run(show_event_details())
text
Model started: ChatOpenAI
LangGraph is a framework built on top of LangChain...
Model finished: ChatOpenAI
How Do You Filter Events by Node?
Real graphs have many nodes. A planner, a retriever, and a responder might each call a different model. If you stream every token from every call, the user drowns in noise.
Fix: filter on metadata. Each event has a langgraph_node key that names the node that made it.
python
async def stream_filtered():
    async for event in app.astream_events(
        inputs, version="v2"
    ):
        if event["event"] != "on_chat_model_stream":
            continue
        node = event.get("metadata", {}).get(
            "langgraph_node", ""
        )
        if node == "chatbot":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
    print()

asyncio.run(stream_filtered())
In a planner-plus-responder graph, this filter lets you stream only the responder’s tokens. The planning step churns away in silence.
Tip: Name your models. Pass `ChatOpenAI(model="gpt-4o-mini", name="response_model")`. Then filter with `event["name"] == "response_model"` — cleaner than digging through node metadata when many nodes share one model class.
How Does Streaming Work with Tool Calls?
Tools add a twist. The stream now holds two kinds of data: raw LLM text and structured tool-call objects. Your code has to sort them apart.
We will build an agent with a weather tool and stream the full round trip. The agent picks a tool, LangGraph runs it, then the agent reads the result and writes a streamed reply.
python
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition
@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    return f"Sunny, 22C in {city}"

tools = [get_weather]
llm_with_tools = ChatOpenAI(
    model="gpt-4o-mini", temperature=0
).bind_tools(tools)

def agent(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}
Connect the agent and tool node with a conditional edge. tools_condition checks the agent’s output. If it holds tool calls, control goes to tools. If not, it goes to END.
python
tool_graph = StateGraph(MessagesState)
tool_graph.add_node("agent", agent)
tool_graph.add_node("tools", ToolNode(tools))
tool_graph.add_edge(START, "agent")
tool_graph.add_conditional_edges("agent", tools_condition)
tool_graph.add_edge("tools", "agent")
tool_app = tool_graph.compile()
In messages mode, tool calls show up as AIMessageChunk objects with a filled tool_calls list. While the model is emitting a tool call, the content field is empty — it is producing structured JSON arguments, not text.
python
tool_inputs = {
    "messages": [HumanMessage(content="What's the weather in Paris?")]
}

for msg, metadata in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if hasattr(msg, "tool_calls") and msg.tool_calls:
        for tc in msg.tool_calls:
            print(f"[Tool Call] {tc['name']}({tc['args']})")
    elif msg.content:
        node = metadata.get("langgraph_node", "")
        print(f"[{node}] {msg.content}", end="", flush=True)
text
[Tool Call] get_weather({'city': 'Paris'})
[agent] The current weather in Paris is sunny with a temperature of 22°C.
The agent node fires twice. First pass: the model asks for weather data and emits a tool call. LangGraph routes to tools, runs get_weather, and loops back. Second pass: the model reads the tool result and streams a final answer token by token.
Try this: Ask “What’s 2 + 2?” The agent has no math tool — only get_weather. So it skips tools, answers on its own, and you see only [agent] tokens. No [Tool Call] line shows up.
Exercise 2: Stream with Multiple Tools

Add a second tool called get_time that takes a timezone string and returns a fake time string. Build the agent graph with both tools. Stream a response to "What is the weather and time in Tokyo?" using messages mode. Print tool calls in [Tool: name] format and stream the final answer.

python
@tool
def get_time(timezone: str) -> str:
    """Get the current time in a timezone."""
    return f"14:30 in {timezone}"

multi_tools = [get_weather, get_time]
llm_multi = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(multi_tools)

def multi_agent(state: MessagesState):
    return {"messages": [llm_multi.invoke(state["messages"])]}

mg = StateGraph(MessagesState)
mg.add_node("agent", multi_agent)
mg.add_node("tools", ToolNode(multi_tools))
mg.add_edge(START, "agent")
mg.add_conditional_edges("agent", tools_condition)
mg.add_edge("tools", "agent")
multi_app = mg.compile()

multi_inputs = {"messages": [HumanMessage(content="What is the weather and time in Tokyo?")]}

for msg, meta in multi_app.stream(multi_inputs, stream_mode="messages"):
    if hasattr(msg, "tool_calls") and msg.tool_calls:
        for tc in msg.tool_calls:
            print(f"[Tool: {tc['name']}]")
    elif msg.content and meta["langgraph_node"] == "agent":
        print(msg.content, end="", flush=True)
print()

The agent may call both tools in a single turn (parallel tool calls) or sequentially. Either way, messages mode emits tool-call chunks and content chunks separately. Filtering on langgraph_node == "agent" ensures you stream only the final text response.
How Do You Handle Streaming Errors?
Streams can break mid-sentence. The LLM might time out, or the network might drop after half an answer landed on screen.
A try/except around the loop catches the two most common culprits: httpx.ReadTimeout from the model host and ConnectionError from network blips.
python
import httpx
def safe_stream(graph, inputs, max_retries=2):
    """Stream with automatic retry on failure."""
    for attempt in range(max_retries + 1):
        try:
            collected = []
            for msg, meta in graph.stream(
                inputs, stream_mode="messages"
            ):
                if msg.content and meta["langgraph_node"] == "chatbot":
                    print(msg.content, end="", flush=True)
                    collected.append(msg.content)
            print()
            return "".join(collected)
        except (httpx.ReadTimeout, ConnectionError) as e:
            if attempt < max_retries:
                print(f"\n[Retry {attempt + 1}] Stream interrupted: {e}")
            else:
                print(f"\n[Failed] Could not complete stream after {max_retries + 1} attempts")
                raise

result = safe_stream(app, inputs)
This helper reruns the full graph on each retry. In production, pair it with a checkpointer. Save state after every node. On failure, resume from the last done step instead of starting from scratch.
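The retry shape is easier to see with the network taken out of the picture. Everything below is a stand-in, not LangGraph API: a fake stream that drops the connection on its first attempt, and the same retry-from-scratch loop safe_stream uses:

```python
class FlakyStream:
    """Simulates a token stream that drops the connection on the first attempt."""
    def __init__(self):
        self.attempts = 0

    def stream(self):
        self.attempts += 1
        yield "Hel"
        if self.attempts == 1:
            raise ConnectionError("simulated network blip")
        yield "lo"

def collect_with_retry(source, max_retries: int = 2) -> str:
    """Retry the whole stream from scratch on failure, like safe_stream does."""
    for attempt in range(max_retries + 1):
        try:
            # join() consumes the generator; a mid-stream error lands here
            return "".join(source.stream())
        except ConnectionError:
            if attempt == max_retries:
                raise
    return ""  # unreachable; keeps the return type total

print(collect_with_retry(FlakyStream()))  # Hello
```

Note the partial "Hel" from the failed attempt is discarded — exactly why a checkpointer is worth adding in production, so a resume does not repeat finished work.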
Warning: Never catch bare `Exception` around a streaming loop. Doing so swallows `KeyboardInterrupt` and masks real bugs. Stick to specific types: `httpx.ReadTimeout`, `httpx.ConnectError`, `openai.APIConnectionError`.
Do Tokens Flow Through Subgraphs?
Bigger apps often nest one graph inside another. Good news: tokens from the inner graph bubble up to the outer stream on their own. No extra config.
Both messages mode and astream_events flow through nested graphs. The key difference is metadata. messages uses the outer node name in metadata["langgraph_node"]. astream_events goes deeper — its tags list traces the full nesting path.
Quick proof. We build an inner graph and plug it into an outer graph as a node.
python
# Inner graph -- a simple summarizer
def summarize(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}
inner = StateGraph(MessagesState)
inner.add_node("summarizer", summarize)
inner.add_edge(START, "summarizer")
inner.add_edge("summarizer", END)
inner_app = inner.compile()
# Outer graph -- uses inner as a node
outer = StateGraph(MessagesState)
outer.add_node("inner_graph", inner_app)
outer.add_edge(START, "inner_graph")
outer.add_edge("inner_graph", END)
outer_app = outer.compile()
Now stream from the outer graph. Tokens born inside the inner graph's LLM call surface automatically:
python
for msg, meta in outer_app.stream(
    inputs, stream_mode="messages"
):
    if msg.content:
        node = meta.get("langgraph_node", "unknown")
        print(f"[{node}] {msg.content}", end="", flush=True)
text
[inner_graph] LangGraph is a framework built on top of LangChain...
The metadata reports inner_graph — the outer graph's label for that node. If you need the precise inner node name (summarizer), switch to astream_events and inspect its tags list.
How Do You Build a Reusable Streaming Chat Function?
Let's wrap all the plumbing into one async generator you can drop into any project. It yields typed event dicts. Swap the consumer — Streamlit, FastAPI, terminal — and the generator stays the same.
We use astream_events with version="v2" for full control. Each yielded dict has a type key so the consumer knows how to render it.
python
from typing import AsyncGenerator
async def stream_response(
    graph, user_message: str
) -> AsyncGenerator[dict, None]:
    """Stream agent response with tool call tracking."""
    inputs = {
        "messages": [HumanMessage(content=user_message)]
    }
    async for event in graph.astream_events(
        inputs, version="v2"
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                yield {"type": "token", "content": chunk.content}
            if chunk.tool_call_chunks:
                for tc in chunk.tool_call_chunks:
                    yield {
                        "type": "tool_call",
                        "name": tc.get("name", ""),
                        "args": tc.get("args", ""),
                    }
        elif kind == "on_tool_end":
            yield {
                "type": "tool_result",
                "name": event["name"],
                "output": str(event["data"]["output"]),
            }
A minimal consumer that dumps events to the terminal:
python
async def chat_demo():
    async for event in stream_response(
        tool_app, "What's the weather in London?"
    ):
        if event["type"] == "token":
            print(event["content"], end="", flush=True)
        elif event["type"] == "tool_call" and event["name"]:
            print(f"\n  Calling: {event['name']}")
        elif event["type"] == "tool_result":
            print(f"  Result: {event['output']}")

asyncio.run(chat_demo())
text
Calling: get_weather
Result: Sunny, 22C in London
The current weather in London is sunny with a temperature of 22°C.
Clean split: all streaming lives in stream_response(). The consumer owns display only. Swap the print loop for a FastAPI SSE route or st.write_stream() and the generator stays as-is.
.stream() vs .astream() — Which One Should You Use?
Every stream_mode works with both methods. The deciding factor is your runtime.
| Feature | .stream() | .astream() |
|---|---|---|
| Syntax | for chunk in graph.stream(...) | async for chunk in graph.astream(...) |
| Event API | Not on offer | graph.astream_events(...) |
| Best for | Scripts, notebooks, CLI tools | Web servers, FastAPI, live apps |
| Token streaming | stream_mode="messages" | stream_mode="messages" or astream_events |
For scripts and Jupyter, .stream() with stream_mode="messages" is the fastest path. No async needed.
For web servers, go with .astream(). It plugs into async frameworks like FastAPI and lets you handle many users at once without blocking the event loop.
Key Insight: Skip `astream_events` unless you need it. Plain `messages` mode gives you the ChatGPT typing effect via both `.stream()` and `.astream()`. Save the events API for lifecycle hooks, per-model filtering, or subgraph depth.
Watch Out for These Common Mistakes
Mistake 1: Leaving out flush=True when printing tokens
Wrong:
python
for msg, meta in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="")
Why it breaks: Python batches stdout writes. Tokens pile up behind the buffer and then dump in one burst. The word-by-word illusion vanishes.
Fix:
python
for msg, meta in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="", flush=True)
Mistake 2: Picking values mode for a chat UI
Wrong:
python
for chunk in app.stream(inputs, stream_mode="values"):
    print(chunk["messages"][-1].content)
Why it breaks: values holds back until an entire node completes. The reply lands in one lump — no progressive reveal. To the user it feels identical to a plain .invoke() call.
Fix: Use messages mode for chat apps:
python
for msg, meta in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="", flush=True)
Mistake 3: Not filtering by node in a multi-node graph
Wrong:
python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    print(msg.content, end="", flush=True)
Why it breaks: A tool-calling agent calls the LLM more than once per turn. Without a node filter, you print tokens from the internal tool-decision pass (where content is often empty) next to the real answer. The result is a mess of blanks and leaked internals.
Fix:
python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if msg.content and meta["langgraph_node"] == "agent":
        print(msg.content, end="", flush=True)
Mistake 4: Sending empty content during tool calls
Wrong:
python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    yield f"data: {msg.content}\n\n"  # sends "data: \n\n" for empty chunks
Why it breaks: While tools run, the model emits chunks with empty content. Sending them as SSE events creates blank payloads that trip up frontend parsers.
Fix:
python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if msg.content:
        yield f"data: {msg.content}\n\n"
Warning: Guard every send with an `if msg.content` check. Tool-call phases produce empty-content chunks. Forwarding them through SSE or WebSocket wastes bytes and risks breaking your client's parser.
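The guard boils down to a one-line helper you can unit-test in isolation. This is a hypothetical helper, not part of any framework — it just encodes the "skip empty chunks" rule for the SSE wire format:

```python
from typing import Optional

def sse_frame(token: str) -> Optional[str]:
    """Format one token as a Server-Sent Events data frame; skip empty chunks."""
    if not token:
        return None  # empty tool-call-phase chunks should send nothing
    return f"data: {token}\n\n"

# "" mimics the empty-content chunks emitted while the model builds a tool call
tokens = ["Hello", "", " world", ""]
frames = [f for t in tokens if (f := sse_frame(t)) is not None]
print(len(frames))  # 2 -- the empty tokens produced no frames
```

Keeping the formatting rule in a pure function like this means the same check protects every transport — SSE route, WebSocket handler, or terminal printer.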
Summary
Streaming is what turns a batch tool into a live, fluid assistant. Quick recap:
- values — full state snapshot per node. Ideal for debugging complex graphs.
- updates — only the keys a node touched. Minimal payload, production-friendly.
- messages — live LLM tokens with node metadata. The default pick for chat UIs.
- custom — inject arbitrary data mid-node. Perfect for progress indicators.
- debug — exhaustive execution trace. Development eyes only.
- astream_events — lifecycle hooks for every model, tool, and chain call. The power tool when messages falls short.
Most of the time, stream_mode="messages" is all you need. Use astream_events only when you must filter by model name, track tool timing, or dig into nested subgraphs.
Practice Exercise:
Build an agent with two tools — calculator for math and dictionary for word lookups. Stream the answer to "What is 15 * 23? Also define 'streaming'" via messages mode. Print each tool call as [Tool: name] and drip the final answer token by token.
Frequently Asked Questions
Can I stream from a subgraph inside a parent graph?
Yes. Tokens from inner graphs bubble up on their own. In messages mode, metadata["langgraph_node"] names the outer node. In astream_events, the tags list traces the nesting path. See Streaming from Subgraphs for a full demo.
Does streaming work with LangGraph's checkpointer?
Yes. The two features are separate. Attach any checkpointer (MemorySaver, SqliteSaver, PostgresSaver) and stream as normal. Just add a thread_id to the config:
python
config = {"configurable": {"thread_id": "user-123"}}
for msg, meta in app.stream(
inputs, stream_mode="messages", config=config
):
if msg.content:
print(msg.content, end="", flush=True)
How do I push streamed output through a FastAPI route?
Wrap an async generator in StreamingResponse. The result is SSE that the browser reads in real time:
python
# Requires FastAPI app setup -- not standalone
from fastapi.responses import StreamingResponse
async def event_generator(message: str):
    inputs = {"messages": [HumanMessage(content=message)]}
    async for msg, meta in app.astream(
        inputs, stream_mode="messages"
    ):
        if msg.content:
            yield f"data: {msg.content}\n\n"
Need two-way traffic — like letting a user cancel mid-stream? Swap SSE for FastAPI's WebSocket class. For most chat UIs, SSE is simpler and fast enough.
How is astream_events different from stream_mode="messages"?
messages is the simple path — you get (chunk, metadata) tuples and that is it. astream_events returns richer dicts with lifecycle hooks (on_chat_model_start, on_tool_end, etc.). Use it when you need per-model filtering, tool timing, or subgraph depth.
How do I stream with LangGraph Platform or Cloud?
LangGraph Platform ships with built-in streaming routes. Deploy your graph to LangGraph Cloud and the REST API streams for you. The Python client SDK's .stream() works the same as the local one. See the LangGraph Platform docs for setup.
References
- LangGraph Documentation -- Streaming Concepts
- LangGraph Documentation -- How to Stream LLM Tokens
- LangChain Documentation -- Streaming API Reference
- LangGraph GitHub Repository -- Source Code
- LangChain Documentation -- ChatOpenAI Integration
- LangGraph Documentation -- Custom Streaming with get_stream_writer
- Harrison Chase -- "LangGraph: Multi-Actor Applications with LLMs." LangChain Blog (2024)
- LangGraph Academy -- Streaming Events and Modes Module