LangGraph Streaming — 5 Modes for Real-Time Agent Output (with Code)

Written by Selva Prabhakaran | 24 min read

So you built a LangGraph agent and it works. Great. But when a user types a question, nothing shows up for 5–10 seconds — then the whole reply lands in one block. They wonder if something broke.

Compare that to ChatGPT: words roll in as the model forms them. The user sees proof of progress instantly. Even when total latency is identical, it feels twice as fast.

That gap — blank screen vs. visible progress — is what streaming solves. LangGraph gives you five streaming modes and a granular event API. This guide walks through each one so you can pick the right mode and hook it into your app.

What Does Streaming Mean in LangGraph?

Instead of waiting for the full graph run, streaming pushes bits of output to your code as work happens. Data arrives the instant a node wraps up — or the instant a token leaves the LLM.

Two levels exist. Graph-level streaming sends state snapshots (or just the diffs) after each node. Token-level streaming sends single words from the LLM in real time — the “typing” effect people expect from chat apps.

Both share two methods: .stream() (sync) and .astream() (async). A stream_mode flag tells LangGraph what shape each emission takes.

Before You Start

  • Python: 3.9+ (3.11+ for async get_stream_writer)
  • Packages: langchain-openai 0.2+, langgraph 0.2+, python-dotenv
  • Install: pip install langchain-openai langgraph python-dotenv
  • API key: Set OPENAI_API_KEY in your .env file
  • Time: ~25 minutes
  • Background: You should know LangGraph tool calling. Comfort with nodes, edges, and MessagesState is assumed.

Below is the minimal graph we’ll use as a test bed throughout the article. A single node wrapping a single LLM call — just enough to show every mode.

python
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, MessagesState, START, END

load_dotenv()

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

We’ll feed this graph a question and stream the results five different ways.

How Do You Pick the Right Streaming Mode?

Before we walk through each mode, here’s a quick decision tree. Bookmark it — you’ll refer back often.

text
Do you need token-by-token output (ChatGPT effect)?
  YES → stream_mode="messages"
  NO  → Do you need the full state at every step?
          YES → stream_mode="values" (debugging)
          NO  → Do you need only what changed?
                  YES → stream_mode="updates" (production)
                  NO  → Do you need custom progress data?
                          YES → stream_mode="custom"
                          NO  → stream_mode="debug" (development)

And here’s a side-by-side cheat sheet:

| Mode     | What It Sends                       | Best For                    |
| -------- | ----------------------------------- | --------------------------- |
| values   | Full state after each node          | Debugging, simple UIs       |
| updates  | Only the changes from each node     | Lean production frontends   |
| messages | LLM tokens + metadata               | Chat apps                   |
| custom   | Your own data via get_stream_writer | Progress bars, status lines |
| debug    | Full execution trace                | Dev-time troubleshooting    |

Key Insight: Let the frontend decide. If it needs the complete picture every step, go with values. If it only needs what changed, go with updates. If the user should see words roll in, go with messages.

Most of you will land on messages. But we’ll start with values because it’s the simplest to reason about.

How Does values Mode Work?

Why ask for the entire state after every node? One word: debugging. When a 10-node graph misbehaves, a full state dump at each step pinpoints exactly where things went off the rails.

In values mode, each emission is the full state object — every key, every value — grabbed right after the node ends.

python
inputs = {"messages": [HumanMessage(content="What is LangGraph?")]}

for chunk in app.stream(inputs, stream_mode="values"):
    if "messages" in chunk:
        last_msg = chunk["messages"][-1]
        print(f"[{last_msg.type}] {last_msg.content[:80]}...")
Output:
[human] What is LangGraph?...
[ai] LangGraph is a framework built on top of LangChain for creating stateful,...

Two chunks come through. The first holds just your input (the state before chatbot runs). The second holds both messages (the state after it ends).

Quick Check: A graph with three nodes — planner, researcher, writer. How many chunks does values mode send? Answer: four. One for the start state, plus one after each node.

Tip: values is great for debugging but costly at scale. If your state holds 50 keys and your graph has 10 nodes, that’s 500 key-value pairs sent over the wire. Save it for dev.

How Does updates Mode Work?

What if the full state is overkill and you just want the diff? That’s updates mode. Each chunk is a dict keyed by node name, holding only that node’s return value. No extras.

python
for chunk in app.stream(inputs, stream_mode="updates"):
    for node_name, node_output in chunk.items():
        print(f"Node '{node_name}' produced:")
        if "messages" in node_output:
            print(f"  {node_output['messages'][-1].content[:80]}...")
Output:
Node 'chatbot' produced:
  LangGraph is a framework built on top of LangChain for creating stateful,...

Only one emission this time. START doesn’t produce updates, so the mode skips it entirely.

Why does this matter at scale? Picture 10 nodes with 50 keys in state. values mode ships 500 key-value pairs total. updates mode sends only the 2–3 keys each node actually changed. That’s an order-of-magnitude cut in data over the wire.
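The back-of-the-envelope math above can be sketched directly. The numbers are illustrative, not measured — they just make the ratio concrete:

```python
# Rough payload comparison from the scenario above: 10 nodes, 50 state keys,
# and ~3 keys actually changed by each node.
nodes = 10
state_keys = 50
changed_keys_per_node = 3

values_payload = nodes * state_keys              # full state on every emission
updates_payload = nodes * changed_keys_per_node  # only the diffs

print(values_payload, updates_payload)  # 500 30
```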

How Does messages Mode Give You the ChatGPT Effect?

This is the one most people reach for. messages mode sends LLM tokens one at a time as the model creates them. The user watches text appear word by word.

Each chunk is a tuple: (message_chunk, metadata). The chunk holds a small piece of text — often a single token. The metadata says which node made it.

python
for msg, metadata in app.stream(inputs, stream_mode="messages"):
    if msg.content and metadata["langgraph_node"] == "chatbot":
        print(msg.content, end="", flush=True)
Output:
LangGraph is a framework built on top of LangChain for creating stateful, multi-step AI workflows...

Tokens land one at a time. The flush=True flag is critical — without it, Python holds output in a buffer and you lose the live-typing feel entirely.

Warning: messages mode only captures output from chat models. If a node does pure computation with no LLM call, it won’t produce message chunks. Pair messages with updates when you also need to track non-LLM nodes.

Need token streaming and node-completion signals in a single loop? Hand a list of modes to stream_mode. Each emission wraps a (mode_name, data) tuple, so you can branch on the mode inside one loop.

python
for event in app.stream(inputs, stream_mode=["messages", "updates"]):
    mode = event[0]
    data = event[1]
    if mode == "messages":
        msg, meta = data
        if msg.content:
            print(msg.content, end="", flush=True)
    elif mode == "updates":
        print(f"\n--- Node completed: {list(data.keys())} ---")

With a list of modes, every emission pairs the mode label with the payload. This pattern is ideal for UIs that display streaming text in a main panel and progress steps in a sidebar.

How Does custom Mode Work?

The first four modes emit whatever LangGraph decides to send. custom mode flips that: a node calls get_stream_writer() and pushes its own data into the stream while it runs. Here's a node that reports progress:

python
from langgraph.config import get_stream_writer

def research_node(state: MessagesState):
    writer = get_stream_writer()
    writer({"status": "Starting research..."})
    writer({"status": "Searching documents...", "progress": 0.5})
    response = llm.invoke(state["messages"])
    writer({"status": "Complete", "progress": 1.0})
    return {"messages": [response]}

The key line is get_stream_writer(). It returns a callable that lets your node push any data into the stream. Status lines, progress numbers, partial results — things the user needs to see but your state shouldn’t store.

I use custom mode a lot in real pipelines. When a graph runs for 30+ seconds, people need proof that work is going on. A progress bar fed by get_stream_writer() keeps them from mashing refresh.

Wire this node into a graph and stream it with both custom and updates. Custom events arrive while the node runs. The update event arrives once it finishes.

python
research_graph = StateGraph(MessagesState)
research_graph.add_node("research", research_node)
research_graph.add_edge(START, "research")
research_graph.add_edge("research", END)
research_app = research_graph.compile()

for event in research_app.stream(
    inputs, stream_mode=["custom", "updates"]
):
    mode, data = event
    if mode == "custom":
        print(f"  Status: {data}")
    elif mode == "updates":
        print(f"  Node done: {list(data.keys())}")
Output:
  Status: {'status': 'Starting research...'}
  Status: {'status': 'Searching documents...', 'progress': 0.5}
  Status: {'status': 'Complete', 'progress': 1.0}
  Node done: ['research']

Think about it: What if you passed stream_mode="custom" without "updates"? You’d see only the three status dicts. The “node done” event would vanish because that belongs to the updates channel.

Note: get_stream_writer() requires Python 3.11+ when used in async nodes. On older versions, add a writer parameter to your node function and LangGraph will inject it automatically.
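The injected-writer pattern is easy to see without a running graph. In the sketch below, a plain list's append method stands in for the writer LangGraph would inject (in real code you'd annotate the parameter with langgraph.types.StreamWriter and let the framework supply it):

```python
# Sketch of the writer-parameter fallback. LangGraph injects the writer in a
# real graph; here a list collects the events so the pattern is visible.

def research_node(state: dict, writer) -> dict:
    writer({"status": "Starting research..."})
    # ... the LLM call would happen here ...
    writer({"status": "Complete", "progress": 1.0})
    return {"messages": state.get("messages", [])}

events = []
research_node({"messages": []}, events.append)
print([e["status"] for e in events])  # ['Starting research...', 'Complete']
```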

How Does debug Mode Work?

Had a conditional edge route to the wrong node and couldn’t figure out why? debug mode was made for that moment.

It spits out rich events for every internal step — node starts, node ends, full state snapshots, and error details. You’d never show this to a user, but in development it gives you X-ray vision into every graph decision.

python
for event in app.stream(inputs, stream_mode="debug"):
    event_type = event["type"]
    if event_type == "task":
        print(f"Task: node='{event['payload']['name']}'")
    elif event_type == "task_result":
        name = event['payload']['name']
        keys = list(event['payload']['result'].keys())
        print(f"Result: node='{name}' -> {keys}")
Output:
Task: node='chatbot'
Result: node='chatbot' -> ['messages']

The output lays bare every routing decision — which branch fired, which node ran next. I flip this mode on whenever conditional edges do something I don’t expect.

How Do You Stream Individual Tokens with astream_events?

The messages mode handles most chat use cases. But sometimes you need deeper control — filtering by model name, tracking tool-call lifecycle events, or tapping into nested subgraphs.

astream_events() is the heavy-duty option. It fires a rich event stream that covers every phase of a graph run. LLM calls, tool runs, node hops — all emit events you can filter and act on.

Here’s the core recipe. Loop through events, look for on_chat_model_stream, and pull out each token. Setting version="v2" pins the stable event schema.

python
import asyncio

async def stream_tokens():
    async for event in app.astream_events(
        inputs, version="v2"
    ):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
    print()

asyncio.run(stream_tokens())
Output:
LangGraph is a framework built on top of LangChain for creating stateful, multi-step AI workflows...

Why deal with the extra wiring? You get lifecycle events — on_chat_model_start, on_chat_model_stream, on_chat_model_end — that form a triplet for every model call. The same pattern applies to chains, tools, and custom runnables.

python
async def show_event_details():
    async for event in app.astream_events(
        inputs, version="v2"
    ):
        kind = event["event"]
        if kind == "on_chat_model_start":
            print(f"Model started: {event['name']}")
        elif kind == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
        elif kind == "on_chat_model_end":
            print(f"\nModel finished: {event['name']}")

asyncio.run(show_event_details())
Output:
Model started: ChatOpenAI
LangGraph is a framework built on top of LangChain...
Model finished: ChatOpenAI

How Do You Filter Stream Events by Node?

In a real app, your graph likely has several nodes calling different models. You want tokens from the final answer node only — not from every LLM call in the pipeline.

The metadata dict makes this simple. Every event carries a langgraph_node field that names the graph node that created it.

python
async def stream_filtered():
    async for event in app.astream_events(
        inputs, version="v2"
    ):
        if event["event"] != "on_chat_model_stream":
            continue
        node = event.get("metadata", {}).get(
            "langgraph_node", ""
        )
        if node == "chatbot":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
    print()

asyncio.run(stream_filtered())

If your graph has a planner and a responder, this technique streams only the responder’s output. The planner runs silently in the background.

Tip: Name your models for cleaner filtering. Pass ChatOpenAI(model="gpt-4o-mini", name="response_model") when you create the LLM. Then filter with event["name"] == "response_model" — much simpler than checking node metadata, especially when multiple nodes use the same model class.
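The name-based filter reduces to a one-line predicate. Here it is run against mock event dicts — their shape is an assumption that mirrors the v2 schema, and no live model is called:

```python
def is_response_token(event: dict) -> bool:
    """Keep only token chunks emitted by the model named 'response_model'."""
    return (
        event.get("event") == "on_chat_model_stream"
        and event.get("name") == "response_model"
    )

# Hypothetical v2-style events; only the shape matters for the filter.
events = [
    {"event": "on_chat_model_stream", "name": "planner_model"},
    {"event": "on_chat_model_stream", "name": "response_model"},
    {"event": "on_chat_model_end", "name": "response_model"},
]
print(sum(is_response_token(e) for e in events))  # 1
```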

How Do You Stream When the Agent Calls Tools?

Here’s where things get interesting. When your agent calls tools, the stream mixes two kinds of data: regular text tokens and structured tool-call objects. Your code has to handle both.

Let’s build an agent with a weather tool and stream the complete interaction. The agent picks a tool, LangGraph executes it, and the agent crafts a streamed reply from the result.

python
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    return f"Sunny, 22C in {city}"

tools = [get_weather]
llm_with_tools = ChatOpenAI(
    model="gpt-4o-mini", temperature=0
).bind_tools(tools)

def agent(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

Now connect the agent and tool node with conditional routing. The tools_condition helper inspects the agent’s response for tool calls. If it finds any, it routes to tools. Otherwise, it routes to END.

python
tool_graph = StateGraph(MessagesState)
tool_graph.add_node("agent", agent)
tool_graph.add_node("tools", ToolNode(tools))
tool_graph.add_edge(START, "agent")
tool_graph.add_conditional_edges("agent", tools_condition)
tool_graph.add_edge("tools", "agent")
tool_app = tool_graph.compile()

With messages mode, tool calls show up as AIMessageChunk objects whose tool_calls field is populated. During that phase, content is usually blank — the model emits structured JSON rather than readable text.

python
tool_inputs = {
    "messages": [HumanMessage(content="What's the weather in Paris?")]
}

for msg, metadata in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if hasattr(msg, "tool_calls") and msg.tool_calls:
        for tc in msg.tool_calls:
            print(f"[Tool Call] {tc['name']}({tc['args']})")
    elif msg.content:
        node = metadata.get("langgraph_node", "")
        print(f"[{node}] {msg.content}", end="", flush=True)
Output:
[Tool Call] get_weather({'city': 'Paris'})
[agent] The current weather in Paris is sunny with a temperature of 22°C.

The agent visits the agent node twice. On the first visit, it decides to call get_weather and emits a tool-call message. LangGraph routes to tools, executes the function, then bounces back to agent. On the second visit, the agent reads the tool output and streams a natural-language answer token by token.

Try to guess: What if the user asks “What’s 2 + 2?” with this agent? There’s no calculator — only get_weather. So the agent skips the tool call entirely, answers from its own knowledge, and you see only [agent] tokens in the stream.
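The two-visit loop hinges on one check: does the latest message carry tool calls? A stub version of that routing decision (StubMsg and route are illustrative stand-ins; the real check lives inside tools_condition):

```python
class StubMsg:
    """Bare stand-in for an AI message: content plus a tool_calls list."""
    def __init__(self, content="", tool_calls=None):
        self.content = content
        self.tool_calls = tool_calls or []

def route(msg):
    # Mirrors the decision made after the agent node: any tool calls -> 'tools'.
    return "tools" if msg.tool_calls else "__end__"

first_visit = StubMsg(tool_calls=[{"name": "get_weather", "args": {"city": "Paris"}}])
second_visit = StubMsg(content="Sunny, 22C in Paris.")
print(route(first_visit), route(second_visit))  # tools __end__
```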

How Do You Handle Stream Errors?

What if the connection drops mid-reply? Or the LLM times out after streaming half an answer? You need a recovery plan.

The fix is a try/except around your streaming loop. The two most common culprits are httpx.ReadTimeout from the model provider and ConnectionError from network blips.

python
import httpx

def safe_stream(graph, inputs, max_retries=2):
    """Stream with automatic retry on failure."""
    for attempt in range(max_retries + 1):
        try:
            collected = []
            for msg, meta in graph.stream(
                inputs, stream_mode="messages"
            ):
                if msg.content and meta["langgraph_node"] == "chatbot":
                    print(msg.content, end="", flush=True)
                    collected.append(msg.content)
            print()
            return "".join(collected)
        except (httpx.ReadTimeout, ConnectionError) as e:
            if attempt == max_retries:
                raise
            print(f"\n[stream interrupted: {e}; retrying...]")

This retries the entire graph run on failure. In a real system, combine it with LangGraph’s checkpointer — save state after each node so you can resume from the last good step instead of replaying everything.
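The resume-from-checkpoint idea reduces to plain Python. This is the concept only — LangGraph's real checkpointer works through compile(checkpointer=...) plus a thread_id, not a function like this:

```python
def run_with_resume(nodes, checkpoint, fail_on=None):
    """Run (name, fn) pairs, skipping any node already recorded in checkpoint."""
    for name, fn in nodes:
        if name in checkpoint:
            continue  # finished in a previous attempt -- don't replay it
        if name == fail_on:
            raise ConnectionError(f"simulated failure in {name}")
        checkpoint[name] = fn()

checkpoint = {}
nodes = [("planner", lambda: "plan"), ("writer", lambda: "draft")]
try:
    run_with_resume(nodes, checkpoint, fail_on="writer")  # first attempt dies
except ConnectionError:
    pass
run_with_resume(nodes, checkpoint)  # retry resumes after 'planner'
print(sorted(checkpoint))  # ['planner', 'writer']
```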

Warning: Never catch a bare Exception in streaming loops. You’ll swallow KeyboardInterrupt and mask real bugs. Stick to specific types: httpx.ReadTimeout, httpx.ConnectError, openai.APIConnectionError.

How Does Streaming Work Inside Subgraphs?

If you nest one graph inside another, do tokens from the inner graph reach the outer stream? Yes — automatically, with zero extra config.

Both messages mode and astream_events propagate through subgraph boundaries. The metadata tells you where each token came from. In messages mode, metadata["langgraph_node"] names the outer node. In astream_events, the tags field traces the full subgraph path.

Here’s a quick proof. We’ll create an inner graph and embed it as a node in an outer graph.

python
# Inner graph -- a simple summarizer
def summarize(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

inner = StateGraph(MessagesState)
inner.add_node("summarizer", summarize)
inner.add_edge(START, "summarizer")
inner.add_edge("summarizer", END)
inner_app = inner.compile()

# Outer graph -- uses inner as a node
outer = StateGraph(MessagesState)
outer.add_node("inner_graph", inner_app)
outer.add_edge(START, "inner_graph")
outer.add_edge("inner_graph", END)
outer_app = outer.compile()

Now stream from the outer graph. Tokens from the inner LLM call pass straight through:

python
for msg, meta in outer_app.stream(
    inputs, stream_mode="messages"
):
    if msg.content:
        node = meta.get("langgraph_node", "unknown")
        print(f"[{node}] {msg.content}", end="", flush=True)
Output:
[inner_graph] LangGraph is a framework built on top of LangChain...

Metadata reports inner_graph — that’s what the outer graph calls the node. If you need the actual inner node name (e.g., summarizer), switch to astream_events and inspect the tags field.

How Do You Build a Reusable Streaming Chat Interface?

Time to combine everything into a single reusable async function. It wraps the streaming plumbing into a clean interface that yields typed event dicts. Swap the consumer for Streamlit, FastAPI, or a plain CLI — the function itself stays identical.

We lean on astream_events with version="v2" for maximum control. Each yielded dict carries a type field so the frontend knows exactly how to render it.

python
from typing import AsyncGenerator

async def stream_response(
    graph, user_message: str
) -> AsyncGenerator[dict, None]:
    """Stream agent response with tool call tracking."""
    inputs = {
        "messages": [HumanMessage(content=user_message)]
    }
    async for event in graph.astream_events(
        inputs, version="v2"
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                yield {"type": "token", "content": token}
            chunk = event["data"]["chunk"]
            if chunk.tool_call_chunks:
                for tc in chunk.tool_call_chunks:
                    yield {
                        "type": "tool_call",
                        "name": tc.get("name", ""),
                        "args": tc.get("args", ""),
                    }
        elif kind == "on_tool_end":
            yield {
                "type": "tool_result",
                "name": event["name"],
                "output": str(event["data"]["output"]),
            }

Here’s a consumer that prints the events to the terminal:

python
async def chat_demo():
    async for event in stream_response(
        tool_app, "What's the weather in London?"
    ):
        if event["type"] == "token":
            print(event["content"], end="", flush=True)
        elif event["type"] == "tool_call" and event["name"]:
            print(f"\n  Calling: {event['name']}")
        elif event["type"] == "tool_result":
            print(f"  Result: {event['output']}")

asyncio.run(chat_demo())
Output:
  Calling: get_weather
  Result: Sunny, 22C in London
The current weather in London is sunny with a temperature of 22°C.

The separation is clean: streaming plumbing lives in stream_response(), display logic lives in the consumer. Replace the consumer with a FastAPI SSE endpoint or a Streamlit st.write_stream() call and the streaming function stays untouched.

.stream() vs .astream() — Which One Should You Use?

Both methods accept the same stream_mode values. The only real difference is your runtime environment.

| Feature         | .stream()                      | .astream()                               |
| --------------- | ------------------------------ | ---------------------------------------- |
| Syntax          | for chunk in graph.stream(...) | async for chunk in graph.astream(...)    |
| Event API       | Not available                  | graph.astream_events(...)                |
| Best for        | Scripts, notebooks, CLI tools  | Web servers, FastAPI, production         |
| Token streaming | stream_mode="messages"         | stream_mode="messages" or astream_events |

For quick scripts and Jupyter notebooks, .stream() with stream_mode="messages" is the fastest path. No async ceremony needed.

For web servers, .astream() is the better call. FastAPI and similar frameworks are async-native. Using .astream() lets you serve many concurrent users without blocking the event loop.

Key Insight: You don’t need astream_events just to get the typing effect. The messages mode delivers token-by-token output with both .stream() and .astream(). Reserve astream_events for cases that demand lifecycle hooks, per-model filtering, or subgraph awareness.

What Are the Most Common Streaming Mistakes?

Mistake 1 — Forgetting flush=True when printing tokens

Wrong:

python
for msg, meta in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="")

Why it breaks: Python buffers stdout. Without flush=True, tokens pile up in the buffer and drop as big clumps. The smooth typing effect vanishes.

Fix:

python
for msg, meta in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="", flush=True)

Mistake 2 — Using values mode for chat UIs

Wrong:

python
for chunk in app.stream(inputs, stream_mode="values"):
    print(chunk["messages"][-1].content)

Why it breaks: values mode waits for each node to fully complete before emitting. The user gets the whole answer in a single dump — no word-by-word animation at all. From their perspective, it’s identical to .invoke().

Fix: Use messages mode for anything chat-like:

python
for msg, meta in app.stream(inputs, stream_mode="messages"):
    if msg.content:
        print(msg.content, end="", flush=True)

Mistake 3 — Not filtering by node in multi-node graphs

Wrong:

python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    print(msg.content, end="", flush=True)

Why it breaks: A tool-calling agent invokes the LLM more than once. This code prints tokens from every invocation — including the tool-call step where msg.content is empty. You end up dumping blank strings and internal reasoning alongside the real answer.

Fix:

python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if msg.content and meta["langgraph_node"] == "agent":
        print(msg.content, end="", flush=True)

Mistake 4 — Sending empty chunks through SSE

Wrong:

python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    yield f"data: {msg.content}\n\n"  # sends "data: \n\n" for blank chunks

Why it breaks: As the model assembles a tool call, it emits AIMessageChunk objects where content is an empty string. Sending those through SSE wastes bandwidth and can confuse frontend parsers that expect actual text.

Fix:

python
for msg, meta in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if msg.content:
        yield f"data: {msg.content}\n\n"

Warning: Always gate on msg.content before emitting. Tool-call phases produce chunks with empty strings. Forwarding them over SSE or WebSocket connections wastes bandwidth and risks breaking client-side parsing logic.
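The gate can live in one tiny helper so every transport path shares it. Nothing below comes from LangGraph — sse_frame is a name of my own making:

```python
def sse_frame(token: str):
    """Format a token as an SSE data frame; return None for empty chunks."""
    return f"data: {token}\n\n" if token else None

# Tool-call phases produce empty-string chunks; the helper drops them.
tokens = ["Hello", "", ",", "", " world"]
frames = [f for f in map(sse_frame, tokens) if f is not None]
print(len(frames))  # 3 -- the two empty chunks were dropped
```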

Complete Code

Full script (copy-paste and run):

python
# Complete code from: LangGraph Streaming -- Real-Time Output
# Requires: pip install langchain-openai langgraph python-dotenv
# Python 3.9+
# Set OPENAI_API_KEY in your .env file

import os
import asyncio
from typing import AsyncGenerator
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.config import get_stream_writer

load_dotenv()

# --- Base chatbot graph ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def chatbot(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

graph = StateGraph(MessagesState)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

inputs = {"messages": [HumanMessage(content="What is LangGraph?")]}

# --- 1. Values mode ---
print("=== VALUES MODE ===")
for chunk in app.stream(inputs, stream_mode="values"):
    if "messages" in chunk:
        last_msg = chunk["messages"][-1]
        print(f"[{last_msg.type}] {last_msg.content[:80]}...")

# --- 2. Updates mode ---
print("\n=== UPDATES MODE ===")
for chunk in app.stream(inputs, stream_mode="updates"):
    for node_name, node_output in chunk.items():
        print(f"Node '{node_name}' produced:")
        if "messages" in node_output:
            print(f"  {node_output['messages'][-1].content[:80]}...")

# --- 3. Messages mode ---
print("\n=== MESSAGES MODE ===")
for msg, metadata in app.stream(inputs, stream_mode="messages"):
    if msg.content and metadata["langgraph_node"] == "chatbot":
        print(msg.content, end="", flush=True)
print()

# --- 4. Tool-calling agent ---
print("\n=== TOOL CALLING + STREAMING ===")

@tool
def get_weather(city: str) -> str:
    """Get the current weather for a city."""
    return f"Sunny, 22C in {city}"

tools = [get_weather]
llm_with_tools = ChatOpenAI(
    model="gpt-4o-mini", temperature=0
).bind_tools(tools)

def agent(state: MessagesState):
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

tool_graph = StateGraph(MessagesState)
tool_graph.add_node("agent", agent)
tool_graph.add_node("tools", ToolNode(tools))
tool_graph.add_edge(START, "agent")
tool_graph.add_conditional_edges("agent", tools_condition)
tool_graph.add_edge("tools", "agent")
tool_app = tool_graph.compile()

tool_inputs = {
    "messages": [HumanMessage(content="What's the weather in Paris?")]
}

for msg, metadata in tool_app.stream(
    tool_inputs, stream_mode="messages"
):
    if hasattr(msg, "tool_calls") and msg.tool_calls:
        for tc in msg.tool_calls:
            print(f"[Tool Call] {tc['name']}({tc['args']})")
    elif msg.content:
        node = metadata.get("langgraph_node", "")
        print(f"[{node}] {msg.content}", end="", flush=True)
print()

# --- 5. Async token streaming ---
print("\n=== ASTREAM_EVENTS ===")

async def stream_tokens():
    async for event in app.astream_events(inputs, version="v2"):
        if event["event"] == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if chunk.content:
                print(chunk.content, end="", flush=True)
    print()

asyncio.run(stream_tokens())

print("\nScript completed successfully.")

Summary

Streaming transforms a sluggish batch agent into an app that feels alive. Here’s the cheat sheet:

  • values sends the full state after each node. Use it for debugging.

  • updates sends only what changed. Use it for lean production frontends.

  • messages streams LLM tokens with metadata. Use it for chat apps.

  • custom lets nodes push any data they want. Use it for progress bars.

  • debug shows the full trace. Keep it in dev only.

  • astream_events gives the finest control. Reach for it when messages isn’t enough.

For most chat apps, begin with stream_mode="messages". Reach for astream_events only when you need per-model filtering, lifecycle hooks, or deep subgraph access.

Practice Exercise:

Build an agent with two tools — a calculator that evaluates math and a dictionary that looks up word meanings. Stream the reply to "What is 15 * 23? Also define 'streaming'" using messages mode. Show tool calls as [Tool: name] and stream the final answer token by token.

Solution

python
@tool
def calculator(expression: str) -> str:
    """Evaluate a math expression."""
    return str(eval(expression))  # fine for a demo; never eval untrusted input

@tool
def dictionary(word: str) -> str:
    """Look up the definition of a word."""
    definitions = {
        "streaming": "Transmitting data continuously",
        "latency": "Time delay in a system",
    }
    return definitions.get(
        word.lower(), f"No definition for '{word}'"
    )

practice_tools = [calculator, dictionary]
practice_llm = ChatOpenAI(
    model="gpt-4o-mini", temperature=0
).bind_tools(practice_tools)

def practice_agent(state: MessagesState):
    return {"messages": [practice_llm.invoke(state["messages"])]}

pg = StateGraph(MessagesState)
pg.add_node("agent", practice_agent)
pg.add_node("tools", ToolNode(practice_tools))
pg.add_edge(START, "agent")
pg.add_conditional_edges("agent", tools_condition)
pg.add_edge("tools", "agent")
practice_app = pg.compile()

practice_inputs = {"messages": [HumanMessage(
    content="What is 15 * 23? Also define 'streaming'."
)]}

for msg, meta in practice_app.stream(
    practice_inputs, stream_mode="messages"
):
    if hasattr(msg, "tool_calls") and msg.tool_calls:
        for tc in msg.tool_calls:
            print(f"\n[Tool: {tc['name']}]", end="")
    elif msg.content and meta["langgraph_node"] == "agent":
        print(msg.content, end="", flush=True)
print()

Frequently Asked Questions

Can I stream from a subgraph inside a parent graph?

Yes. Both messages mode and astream_events pass through subgraphs on their own. With messages mode, metadata["langgraph_node"] names the outer node that produced each token. With astream_events, check the tags field for the full subgraph path. See the subgraph section above for a working demo.

Does streaming work with LangGraph’s checkpointer?

Streaming and persistence are fully separate. You can use .stream() with any checkpointer — MemorySaver, SqliteSaver, or PostgresSaver. Just pass a config dict with a thread_id:

python
config = {"configurable": {"thread_id": "user-123"}}
for msg, meta in app.stream(
    inputs, stream_mode="messages", config=config
):
    if msg.content:
        print(msg.content, end="", flush=True)

How do I send streamed output through a FastAPI endpoint?

Use FastAPI’s StreamingResponse with an async generator. This sends Server-Sent Events (SSE) to the browser:

python
# Requires FastAPI app setup -- not standalone
from fastapi.responses import StreamingResponse

async def event_generator(message: str):
    inputs = {"messages": [HumanMessage(content=message)]}
    async for msg, meta in app.astream(
        inputs, stream_mode="messages"
    ):
        if msg.content:
            yield f"data: {msg.content}\n\n"

For WebSocket streaming, FastAPI’s WebSocket class works the same way. SSE is simpler and covers most chat UIs. Use WebSockets when you need two-way communication — like letting users cancel a stream mid-reply.

What's the difference between astream_events and stream_mode="messages"?

messages mode is simpler. You get (message_chunk, metadata) tuples — great for basic chat UIs. astream_events gives richer event dicts with lifecycle hooks (on_chat_model_start, on_chat_model_stream, on_chat_model_end, on_tool_start, on_tool_end). Pick astream_events when you need to filter by model name, track tool timing, or handle subgraph events.

How do I stream with LangGraph Platform or Cloud?

LangGraph Platform has built-in streaming endpoints. When you deploy your graph to LangGraph Cloud, streaming works through the REST API. The client SDK’s .stream() method behaves the same as the local version. Check the LangGraph Platform docs for deployment-specific setup.

