Map-Reduce and Parallel Execution in LangGraph
You’ve got 50 documents that each need an LLM summary. Running them one-by-one takes 10 minutes. Running them in parallel? Twelve seconds. That’s the difference between a demo and a product.
LangGraph’s Send API makes this kind of parallel execution straightforward. This article shows you exactly how it works, with runnable code you can adapt to your own pipelines.
Here’s the mental model before we touch any code. Your graph starts at a single node. That node examines the current state — maybe a list of documents, topics, or search queries — and decides how many parallel branches to create.
It doesn’t know the count at compile time. It discovers it at runtime. Each branch runs the same node function with a different slice of the state. When all branches finish, their results merge back through a reducer function.
That’s map-reduce in LangGraph: fan out dynamically, execute in parallel, fan back in automatically. The “map” step distributes work. The “reduce” step collects results. The Send object bridges them.
What Is the LangGraph Map-Reduce Pattern?
Map-reduce splits a big job into smaller parallel jobs, then combines the results. The name comes from functional programming: map applies a function to every item, reduce aggregates the outputs.
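The same idea in plain Python, before any graphs are involved. The documents and the word-count function here are made-up stand-ins for real LLM work; only the map/reduce shape matters:

```python
import operator
from functools import reduce

documents = ["alpha beta", "gamma", "delta epsilon zeta"]

# Map: apply the same function to every item independently.
# Each result is wrapped in a single-item list, mirroring what
# LangGraph branch nodes will do later in this article.
word_counts = [[len(doc.split())] for doc in documents]

# Reduce: fold the per-item outputs into one aggregate.
combined = reduce(operator.add, word_counts, [])
print(combined)  # [2, 1, 3]
```

LangGraph's contribution is not this shape, which is ordinary functional programming, but running the map step concurrently when the item count is only known at runtime.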
Why does LangGraph need a special API for this? Because you often don’t know how many items you’ll process until the graph is already running. An earlier node might generate five topics or twelve. Static edges can’t handle that.
Static edges (compile time):
    Node A ──→ Node B
    Node A ──→ Node C
    (paths fixed at build time)

Dynamic Send (runtime):
    Node A ──→ Send() ──→ Node B
           ──→ Send() ──→ Node B
           ──→ Send() ──→ Node B
    (branch count decided at runtime)
The Send API lets a conditional edge function return a list of Send objects at runtime. Each one tells LangGraph: “run this node with this state.” LangGraph executes them all concurrently within the same superstep, then moves on once every branch completes.
KEY INSIGHT:
Send creates edges at runtime, not at compile time. This is what separates it from regular conditional edges, which choose between pre-defined paths. Send creates new paths — as many as you need — on the fly.
Prerequisites
- Python version: 3.10+
- Required libraries: langgraph (0.4+), langchain-openai (0.3+), langchain-core (0.3+)
- Install: pip install langgraph langchain-openai langchain-core
- API key: An OpenAI API key set as OPENAI_API_KEY. See OpenAI’s docs to create one.
- Time to complete: ~30 minutes
- Prior knowledge: LangGraph basics (nodes, edges, state, conditional edges) from earlier posts in this series.
The first code block imports everything we’ll use. We pull in Send from langgraph.types, the graph-building utilities, and operator.add — the reducer that makes parallel aggregation work.
import operator
from typing import Annotated, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
Why Parallel Execution Matters in LangGraph
Think about a document summarization pipeline. You’ve got 20 research papers. Each summary takes 3 seconds from the LLM.
Sequentially, that’s 60 seconds of wall-clock time. In parallel, all 20 summaries run simultaneously — total time drops to roughly 3 seconds plus overhead. That’s a 20x speedup with zero changes to the summarization logic.
The gains go beyond speed:
- Throughput: Process more items in the same time window.
- User experience: Your app feels responsive, not sluggish.
- Cost efficiency: API rate limits often allow concurrent calls. You’re using your quota instead of leaving it idle.
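That speedup is ordinary I/O concurrency, and you can see it without LangGraph at all. In this standalone sketch, fake_llm_call is a stand-in sleep rather than a real API request:

```python
import asyncio
import time

async def fake_llm_call(doc_id: int) -> str:
    # Stand-in for a ~0.1-second network-bound LLM request.
    await asyncio.sleep(0.1)
    return f"summary-{doc_id}"

async def summarize_all(n: int) -> list[str]:
    # All n "calls" run concurrently, so wall-clock time is ~0.1s, not n * 0.1s.
    return await asyncio.gather(*(fake_llm_call(i) for i in range(n)))

start = time.perf_counter()
results = asyncio.run(summarize_all(20))
elapsed = time.perf_counter() - start
print(f"{len(results)} summaries in {elapsed:.2f}s")  # roughly 0.1s total
```

gather also preserves input order, which is something LangGraph's parallel branches do not guarantee, as we'll see in the common-mistakes section.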
But you can’t just throw asyncio.gather() at this and walk away. Each parallel branch writes results that must merge cleanly into the parent state. Race conditions and lost results are real risks without proper state management.
LangGraph handles that through its reducer system — which we’ll dig into next.
How the Send API Works — The Core Mechanic
You use Send inside a conditional edge function. Instead of returning a node name (like regular conditional edges), you return a list of Send objects. Each Send takes two arguments: the target node name and the state to pass to that node.
# Send signature: Send(node: str, arg: Any) — the target node and the state to pass it
# Each Send fires one parallel branch:
Send("process_item", {"item": "document_1", "content": "..."})
Send("process_item", {"item": "document_2", "content": "..."})
When LangGraph encounters a list of Send objects, it creates a superstep — a group of node executions that all run concurrently. Every Send in the list fires at the same time. The graph waits until all complete before moving forward.
What makes this flexible: the state you send to each branch can differ from the main graph’s state. You can send a subset, a transformed version, or entirely new keys. The branch node receives whatever you put in the Send object.
TIP: The state you pass to Send() doesn’t need to match the main graph’s state schema. Define a separate TypedDict for branch nodes. This keeps each branch’s logic clean and focused on its one job.
Building Your First LangGraph Map-Reduce Graph
Enough theory — let’s build something you can run.
We’ll create a graph that takes a topic, generates three related subtopics, creates a joke for each subtopic in parallel, and picks the best one. Why jokes? The LLM calls are independent (perfect for parallelism), and you can see the fan-out/fan-in pattern clearly.
We need two state schemas. OverallState tracks the full pipeline. JokeState is what each parallel branch receives — just one subject.
class JokeState(TypedDict):
subject: str
class OverallState(TypedDict):
topic: str
subjects: list[str]
jokes: Annotated[list[str], operator.add]
best_joke: str
See the Annotated[list[str], operator.add] on jokes? That’s the reducer. When multiple branches each return {"jokes": ["some joke"]}, the reducer concatenates all those single-item lists into one combined list. Without this annotation, the last branch to finish would silently overwrite all the others.
Quick check — predict the output: Three parallel branches return {"jokes": ["joke_A"]}, {"jokes": ["joke_B"]}, and {"jokes": ["joke_C"]}. What does state["jokes"] look like after all three complete? Answer: ["joke_A", "joke_B", "joke_C"]. The operator.add reducer concatenated them.
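You can verify that mental model without running a graph. This is a simplified simulation of the reducer semantics, not LangGraph internals: conceptually, each branch's return value is folded into the channel with the reducer you annotated.

```python
import operator
from functools import reduce

# Each parallel branch returns its update wrapped in a single-item list.
branch_updates = [["joke_A"], ["joke_B"], ["joke_C"]]

# With a reducer: every update is folded into the existing channel value.
jokes = reduce(operator.add, branch_updates, [])
print(jokes)  # ['joke_A', 'joke_B', 'joke_C']

# Without a reducer, a write simply replaces the previous value,
# so only one branch's result would survive.
last_write_wins = None
for update in branch_updates:
    last_write_wins = update
print(last_write_wins)  # ['joke_C']
```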
Implementing the Map Phase — Fanning Out
The map phase has two parts: a node that generates the list of subjects, and a routing function that returns Send objects — one per subject.
The generate_subjects node calls the LLM, asks for subtopics, and splits the comma-separated response into a list.
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
def generate_subjects(state: OverallState) -> dict:
"""Generate subtopics related to the main topic."""
prompt = (
f"Generate exactly 3 short subtopics related to '{state['topic']}'. "
f"Return them as a comma-separated list, nothing else."
)
response = model.invoke([HumanMessage(content=prompt)])
subjects = [s.strip() for s in response.content.split(",")]
return {"subjects": subjects}
Here’s the routing function that creates the fan-out. It reads state["subjects"] and returns one Send per subject. Each Send targets the "generate_joke" node with a JokeState containing that single subject.
def map_to_jokes(state: OverallState) -> list[Send]:
"""Create one parallel branch per subject."""
return [
Send("generate_joke", {"subject": subject})
for subject in state["subjects"]
]
If state["subjects"] has three items, LangGraph launches three parallel instances of generate_joke. If it has ten, you get ten. The count is entirely dynamic.
KEY INSIGHT: The routing function that returns Send objects IS the map step. It’s where you decide what to parallelize, how to slice the data, and what state each branch receives.
Implementing the Reduce Phase — Aggregating Results
Each parallel branch runs the generate_joke node. It receives a JokeState (not the full OverallState) and returns a joke wrapped in a list.
def generate_joke(state: JokeState) -> dict:
"""Generate a joke about the given subject."""
prompt = f"Write a short, funny one-liner joke about {state['subject']}."
response = model.invoke([HumanMessage(content=prompt)])
return {"jokes": [response.content]}
Two things to notice here. The return wraps the joke in a list: [response.content]. This is required because operator.add concatenates lists. Return a bare string instead and the merge breaks: adding a string to a list channel raises a TypeError rather than appending an item. A nasty, easy-to-miss bug.
Also, the function accepts JokeState, not OverallState. Each branch only sees its assigned subject. It has no access to other branches’ work.
After all branches complete, the reduce node reads the aggregated jokes list and picks a winner.
def pick_best_joke(state: OverallState) -> dict:
"""Select the best joke from all generated jokes."""
jokes_text = "\n".join(
f"{i+1}. {joke}" for i, joke in enumerate(state["jokes"])
)
prompt = (
f"Here are some jokes:\n{jokes_text}\n\n"
f"Pick the funniest one. Return ONLY the joke text, nothing else."
)
response = model.invoke([HumanMessage(content=prompt)])
return {"best_joke": response.content}
This node sees the fully aggregated jokes list — all jokes from all parallel branches, merged by the reducer.
Wiring the Graph Together
Here’s how all the pieces connect. The add_conditional_edges call with the routing function is the line that creates the parallel fan-out. The add_edge from generate_joke to pick_best_joke creates the fan-in — LangGraph waits for ALL branches to complete before running pick_best_joke.
builder = StateGraph(OverallState)
builder.add_node("generate_subjects", generate_subjects)
builder.add_node("generate_joke", generate_joke)
builder.add_node("pick_best_joke", pick_best_joke)
builder.add_edge(START, "generate_subjects")
builder.add_conditional_edges("generate_subjects", map_to_jokes)
builder.add_edge("generate_joke", "pick_best_joke")
builder.add_edge("pick_best_joke", END)
graph = builder.compile()
Run it and see the results:
result = graph.invoke({"topic": "animals"})
print("Topic:", result["topic"])
print("Subjects:", result["subjects"])
print("\nAll jokes:")
for i, joke in enumerate(result["jokes"], 1):
print(f" {i}. {joke}")
print(f"\nBest joke: {result['best_joke']}")
You’ll see three subjects generated from “animals,” three jokes (one per subject, all generated in parallel), and the LLM’s pick for best joke. Your exact subjects and jokes will differ on each run.
Send() with Dynamic Destinations
So far, every Send has targeted the same node. But what if different items need different processing?
Imagine texts of varying lengths. Short ones need a quick summary. Long ones need chunking first, then summarization. Your routing function can inspect each item and send it to the right node. Both nodes still run in the same superstep — LangGraph doesn’t care that they’re different.
def route_by_length(state: OverallState) -> list[Send]:
"""Route items to different nodes based on their length."""
sends = []
for item in state["items"]:
if len(item) < 500:
sends.append(Send("quick_summary", {"text": item}))
else:
sends.append(Send("chunk_and_summarize", {"text": item}))
return sends
This pattern is especially handy for research agents. Some queries work best with a web search, others need a database lookup, and a few should go straight to a knowledge base. The routing function inspects each query and sends it to the right tool node.
WARNING: Every node targeted by Send in the same routing function must write to state fields with reducers. If two parallel branches both write to a plain str field, one overwrites the other. Always use Annotated[list, operator.add] for fields that receive parallel writes.
Real-World Example — Parallel Document Summarization
Let's build something you'd actually deploy. This graph takes a list of documents, summarizes each one in parallel, then generates a combined executive summary.
Two state schemas again. DocState holds a single document for each branch. SummaryState holds the full collection and the aggregated results.
class DocState(TypedDict):
doc_id: str
content: str
class SummaryState(TypedDict):
documents: list[dict]
summaries: Annotated[list[str], operator.add]
executive_summary: str
The summarize_doc node processes one document. It receives a DocState, calls the LLM, and returns the summary wrapped in a list. The doc ID prefix tracks which result came from which branch.
def summarize_doc(state: DocState) -> dict:
"""Summarize a single document."""
prompt = (
f"Summarize this document in 2-3 sentences:\n\n"
f"Document {state['doc_id']}:\n{state['content']}"
)
response = model.invoke([HumanMessage(content=prompt)])
return {"summaries": [f"[{state['doc_id']}] {response.content}"]}
The fan-out function creates one Send per document, pulling the ID and content from the main state.
def fan_out_docs(state: SummaryState) -> list[Send]:
"""Create one summarization branch per document."""
return [
Send("summarize_doc", {
"doc_id": doc["id"],
"content": doc["content"],
})
for doc in state["documents"]
]
The executive summary node reads all individual summaries (aggregated by the reducer) and produces a combined overview.
def write_executive_summary(state: SummaryState) -> dict:
"""Combine all summaries into an executive summary."""
all_summaries = "\n\n".join(state["summaries"])
prompt = (
f"Based on these document summaries, write a brief "
f"executive summary (3-4 sentences):\n\n{all_summaries}"
)
response = model.invoke([HumanMessage(content=prompt)])
return {"executive_summary": response.content}
Wire it up and run. The pattern matches the joke example — only the node logic changes.
summary_builder = StateGraph(SummaryState)
summary_builder.add_node("summarize_doc", summarize_doc)
summary_builder.add_node("write_executive_summary", write_executive_summary)
summary_builder.add_conditional_edges(START, fan_out_docs)
summary_builder.add_edge("summarize_doc", "write_executive_summary")
summary_builder.add_edge("write_executive_summary", END)
summary_graph = summary_builder.compile()
docs = [
{"id": "doc-1", "content": "LangGraph is a framework for building stateful AI agents using graph-based orchestration."},
{"id": "doc-2", "content": "The Send API enables parallel execution by creating runtime-determined branches in a graph."},
{"id": "doc-3", "content": "Reducers in LangGraph safely merge state updates from concurrent node executions."},
]
result = summary_graph.invoke({"documents": docs})
print("Individual summaries:")
for s in result["summaries"]:
print(f" {s}")
print(f"\nExecutive summary:\n{result['executive_summary']}")
All three documents get summarized simultaneously. The executive summary runs only after every individual summary completes. You could pass 50 documents and the structure stays the same — only the Send count changes.
Combining Map-Reduce with Conditional Edges
Map-reduce doesn't exist in isolation. You can combine it with regular conditional edges for more sophisticated workflows.
Here's a useful pattern: after the reduce step, check the quality of results. If they pass, continue. If not, loop back and regenerate.
def quality_check(state: OverallState) -> str:
"""Route based on result quality."""
if len(state["jokes"]) >= 3:
return "accept"
return "retry"
Wire it as a conditional edge after the reduce node:
builder.add_conditional_edges(
"pick_best_joke",
quality_check,
{"accept": END, "retry": "generate_subjects"},
)
This creates a loop: generate subjects, fan out jokes, pick the best, check quality, retry if needed. The Send API handles the parallel portion. Regular conditional edges handle the control flow. They compose naturally because Send is just another type of edge routing.
TIP: Always set a recursion limit when combining map-reduce with loops. Without one, a failing quality check keeps looping until LangGraph’s default limit of 25 supersteps raises a GraphRecursionError. Pass {"recursion_limit": 10} in the config: graph.invoke(input, config={"recursion_limit": 10}).
When NOT to Use Map-Reduce
Not every batch job needs Send. Here are situations where it's overkill or wrong:
- Fixed branch count known at compile time. If you always process exactly 3 items, use regular parallel edges instead. Send shines when the count is dynamic.
- Branches that need each other's results. Each Send branch runs in isolation. If branch 2 needs output from branch 1, you need sequential processing — not map-reduce.
- CPU-bound processing. LangGraph's concurrency (asyncio tasks, or a thread pool under sync invoke) helps with I/O-bound work like API calls and database queries. CPU-heavy tasks gain little under the GIL unless you offload them to a process pool.
NOTE: For fewer than 3 items, the overhead of Send routing may exceed the parallel gains. Use sequential processing for trivially small batches and reserve map-reduce for dynamic or large item counts.
Performance Considerations
Parallel execution has real limits. Here's what matters most in practice.
API rate limits are the biggest constraint. If you fan out 200 LLM calls and your rate limit is 60 RPM, you'll hit 429 errors. Batch your Send calls into groups, or add retry logic with exponential backoff inside your node functions.
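One way to write that retry logic inside a node function. This is a generic sketch: call_with_backoff and flaky_llm_call are my own names, and the RuntimeError stands in for whatever rate-limit exception your client library raises.

```python
import random
import time

def call_with_backoff(fn, max_retries: int = 5, base_delay: float = 1.0):
    """Retry fn() on rate-limit errors with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except RuntimeError:  # substitute your client's rate-limit (429) exception
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            time.sleep(delay)

# Hypothetical flaky call: fails twice with a fake 429, then succeeds.
attempts = {"count": 0}

def flaky_llm_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("429: rate limited")
    return "summary text"

result = call_with_backoff(flaky_llm_call, base_delay=0.01)
print(result, "after", attempts["count"], "attempts")  # summary text after 3 attempts
```

Wrapping the LLM call like this inside each branch node keeps a transient 429 in one branch from failing the whole superstep.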
Memory usage grows linearly with branch count. Each parallel branch maintains its own state copy during execution. Keep the Send payload minimal — send IDs and let each branch fetch its own data.
Superstep atomicity means all-or-nothing execution. If one branch in a superstep fails, ALL branches' state updates get discarded. With checkpointing enabled, LangGraph retries only the failures. Without checkpointing, everything re-runs.
Common Mistakes and How to Fix Them
Mistake 1: Missing reducer on parallel-write fields
This is the single most common map-reduce bug. You define a state field as list[str] instead of Annotated[list[str], operator.add]. Without a reducer, the field can accept only one write per superstep: recent LangGraph versions raise an InvalidUpdateError when parallel branches write to it, and older behavior silently kept a single branch's result. Either way, the fix is the annotation.
# WRONG: No reducer — last write wins
class BadState(TypedDict):
results: list[str]
# CORRECT: Concatenates results from all branches
class GoodState(TypedDict):
results: Annotated[list[str], operator.add]
Mistake 2: Returning a plain value instead of a list
Each branch must return a list for operator.add to work. Return a bare string and the merge breaks: adding a string to the list channel raises a TypeError, and even string-to-string concatenation would join characters, not items.
# WRONG: String, not list!
def bad_node(state):
return {"results": "some result"}
# CORRECT: Single-item list
def good_node(state):
return {"results": ["some result"]}
Mistake 3: Assuming branch execution order
Parallel branches complete in whatever order the I/O finishes. Don't assume state["results"][0] matches the first Send you created. Include an identifier in each branch's output if order matters.
# WRONG: Assumes first result = first Send
first_result = state["results"][0]
# CORRECT: Label each result with its source
def labeled_node(state: DocState) -> dict:
result = process(state["content"])
return {"results": [f"{state['doc_id']}|{result}"]}
Mistake 4: Bloated Send payloads
# WRONG: Ships entire document collection to every branch
Send("process", {"all_docs": all_documents, "index": i})
# CORRECT: Send only what this branch needs
Send("process", {"content": all_documents[i]["text"], "doc_id": i})
Each Send payload gets copied. Large payloads times many branches equals significant memory pressure.
Quick check: You define results: list[str] (no reducer) and fan out to 5 branches. Each returns {"results": ["item"]}. What happens? Answer: not 5 items. A field without a reducer accepts only one write per superstep, so recent LangGraph versions raise an InvalidUpdateError (older behavior kept just one branch's value). You need Annotated[list[str], operator.add] to collect all 5.
Practice Exercise
Build a map-reduce graph that takes a list of city names, looks up a mock weather forecast for each city in parallel, then produces a travel recommendation based on all forecasts.
Click to see the solution
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
class CityState(TypedDict):
city: str
class TravelState(TypedDict):
cities: list[str]
forecasts: Annotated[list[str], operator.add]
recommendation: str
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
MOCK_WEATHER = {
"tokyo": "Sunny, 75F",
"london": "Rainy, 55F",
"paris": "Cloudy, 62F",
"sydney": "Clear, 82F",
}
def get_forecast(state: CityState) -> dict:
city = state["city"]
weather = MOCK_WEATHER.get(city.lower(), "Unknown")
return {"forecasts": [f"{city}: {weather}"]}
def fan_out_cities(state: TravelState) -> list[Send]:
return [Send("get_forecast", {"city": c}) for c in state["cities"]]
def recommend(state: TravelState) -> dict:
forecasts = "\n".join(state["forecasts"])
prompt = (
f"Based on these weather forecasts:\n{forecasts}\n\n"
f"Which city is best for a vacation? Explain in 2 sentences."
)
response = model.invoke([HumanMessage(content=prompt)])
return {"recommendation": response.content}
builder = StateGraph(TravelState)
builder.add_node("get_forecast", get_forecast)
builder.add_node("recommend", recommend)
builder.add_conditional_edges(START, fan_out_cities)
builder.add_edge("get_forecast", "recommend")
builder.add_edge("recommend", END)
travel_graph = builder.compile()
result = travel_graph.invoke({
"cities": ["Tokyo", "London", "Paris", "Sydney"]
})
print("Forecasts:")
for f in result["forecasts"]:
print(f" {f}")
print(f"\nRecommendation: {result['recommendation']}")
Complete Code
Click to expand the full script (copy-paste and run)
# Complete code from: Map-Reduce and Parallel Execution in LangGraph
# Requires: pip install langgraph langchain-openai langchain-core
# Python 3.10+
import operator
from typing import Annotated, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
# --- Model setup ---
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
# --- State schemas ---
class JokeState(TypedDict):
subject: str
class OverallState(TypedDict):
topic: str
subjects: list[str]
jokes: Annotated[list[str], operator.add]
best_joke: str
# --- Node functions ---
def generate_subjects(state: OverallState) -> dict:
"""Generate subtopics related to the main topic."""
prompt = (
f"Generate exactly 3 short subtopics related to '{state['topic']}'. "
f"Return them as a comma-separated list, nothing else."
)
response = model.invoke([HumanMessage(content=prompt)])
subjects = [s.strip() for s in response.content.split(",")]
return {"subjects": subjects}
def generate_joke(state: JokeState) -> dict:
"""Generate a joke about the given subject."""
prompt = f"Write a short, funny one-liner joke about {state['subject']}."
response = model.invoke([HumanMessage(content=prompt)])
return {"jokes": [response.content]}
def pick_best_joke(state: OverallState) -> dict:
"""Select the best joke from all generated jokes."""
jokes_text = "\n".join(
f"{i+1}. {joke}" for i, joke in enumerate(state["jokes"])
)
prompt = (
f"Here are some jokes:\n{jokes_text}\n\n"
f"Pick the funniest one. Return ONLY the joke text, nothing else."
)
response = model.invoke([HumanMessage(content=prompt)])
return {"best_joke": response.content}
# --- Routing (map step) ---
def map_to_jokes(state: OverallState) -> list[Send]:
"""Create one parallel branch per subject."""
return [
Send("generate_joke", {"subject": subject})
for subject in state["subjects"]
]
# --- Build and run the graph ---
builder = StateGraph(OverallState)
builder.add_node("generate_subjects", generate_subjects)
builder.add_node("generate_joke", generate_joke)
builder.add_node("pick_best_joke", pick_best_joke)
builder.add_edge(START, "generate_subjects")
builder.add_conditional_edges("generate_subjects", map_to_jokes)
builder.add_edge("generate_joke", "pick_best_joke")
builder.add_edge("pick_best_joke", END)
graph = builder.compile()
result = graph.invoke({"topic": "animals"})
print("Topic:", result["topic"])
print("Subjects:", result["subjects"])
print("\nAll jokes:")
for i, joke in enumerate(result["jokes"], 1):
print(f" {i}. {joke}")
print(f"\nBest joke: {result['best_joke']}")
# --- Document Summarization Example ---
class DocState(TypedDict):
doc_id: str
content: str
class SummaryState(TypedDict):
documents: list[dict]
summaries: Annotated[list[str], operator.add]
executive_summary: str
def summarize_doc(state: DocState) -> dict:
"""Summarize a single document."""
prompt = (
f"Summarize this document in 2-3 sentences:\n\n"
f"Document {state['doc_id']}:\n{state['content']}"
)
response = model.invoke([HumanMessage(content=prompt)])
return {"summaries": [f"[{state['doc_id']}] {response.content}"]}
def fan_out_docs(state: SummaryState) -> list[Send]:
"""Create one summarization branch per document."""
return [
Send("summarize_doc", {
"doc_id": doc["id"],
"content": doc["content"],
})
for doc in state["documents"]
]
def write_executive_summary(state: SummaryState) -> dict:
"""Combine all summaries into an executive summary."""
all_summaries = "\n\n".join(state["summaries"])
prompt = (
f"Based on these document summaries, write a brief "
f"executive summary (3-4 sentences):\n\n{all_summaries}"
)
response = model.invoke([HumanMessage(content=prompt)])
return {"executive_summary": response.content}
summary_builder = StateGraph(SummaryState)
summary_builder.add_node("summarize_doc", summarize_doc)
summary_builder.add_node("write_executive_summary", write_executive_summary)
summary_builder.add_conditional_edges(START, fan_out_docs)
summary_builder.add_edge("summarize_doc", "write_executive_summary")
summary_builder.add_edge("write_executive_summary", END)
summary_graph = summary_builder.compile()
docs = [
{"id": "doc-1", "content": "LangGraph is a framework for building stateful AI agents using graph-based orchestration."},
{"id": "doc-2", "content": "The Send API enables parallel execution by creating runtime-determined branches in a graph."},
{"id": "doc-3", "content": "Reducers in LangGraph safely merge state updates from concurrent node executions."},
]
result = summary_graph.invoke({"documents": docs})
print("\n--- Document Summarization ---")
print("Individual summaries:")
for s in result["summaries"]:
print(f" {s}")
print(f"\nExecutive summary:\n{result['executive_summary']}")
print("\nScript completed successfully.")
Frequently Asked Questions
Does Send() actually use threads for parallelism?
It depends on how you run the graph. Synchronous invoke() executes a superstep's branches concurrently on a thread pool; async ainvoke() runs them as asyncio tasks. For I/O-bound work like LLM API calls, either mode provides near-perfect parallelism. For CPU-bound processing, offload to a process pool inside your node function.
How do I limit the number of concurrent branches?
LangGraph doesn't have a Send-specific concurrency cap, and every Send returned from one routing call executes in the same superstep. You can often cap parallelism with the standard max_concurrency field of the run config, e.g. graph.invoke(input, config={"max_concurrency": 10}). For finer control, batch in the routing function: return Send objects for the first batch, have the reduce step check whether items remain, then loop back for the next batch.
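A sketch of that batching approach. The state fields (items, cursor), the batch size, and the node names are my own; the Send class here is a tiny stand-in for langgraph.types.Send so the sketch runs standalone.

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Send:  # stand-in for langgraph.types.Send, same (node, arg) shape
    node: str
    arg: Any

BATCH_SIZE = 10  # tune to your rate limit

def fan_out_batch(state: dict) -> list[Send]:
    """Fan out only the next BATCH_SIZE items, starting at state['cursor']."""
    cursor = state.get("cursor", 0)
    batch = state["items"][cursor : cursor + BATCH_SIZE]
    return [Send("process_item", {"item": item}) for item in batch]

def more_batches_remain(state: dict) -> str:
    """Conditional edge after the reduce step: loop back or finish."""
    next_cursor = state.get("cursor", 0) + BATCH_SIZE
    return "next_batch" if next_cursor < len(state["items"]) else "done"

sends = fan_out_batch({"items": list(range(25)), "cursor": 0})
print(len(sends))                                               # 10
print(more_batches_remain({"items": list(range(25)), "cursor": 0}))   # next_batch
print(more_batches_remain({"items": list(range(25)), "cursor": 20}))  # done
```

In a real graph, the reduce node would also return the advanced cursor (e.g. {"cursor": cursor + BATCH_SIZE}) so the loop makes progress.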
What happens if one parallel branch fails?
The entire superstep fails atomically. None of the successful branches' state updates get applied. With checkpointing enabled, LangGraph retries only the failed branch. Without checkpointing, all branches re-run from scratch.
Can I use Send() with subgraphs?
Yes. You can Send to a node that is itself a compiled subgraph. Each branch gets its own subgraph instance. This works well when parallel processing involves multiple internal steps — like chunking, summarizing chunks, then merging results.
Can I nest map-reduce — fan out within a fan-out?
You can, using subgraphs. The inner fan-out lives in a subgraph, and each outer Send targets that subgraph. This creates tree-shaped execution. Keep nesting shallow — two levels deep is usually the practical limit before state management gets hard to reason about.
How do I handle errors in individual parallel branches?
Wrap the node function's core logic in a try/except block. On failure, return a sentinel value like {"results": ["ERROR: doc-5 failed"]} instead of raising. This lets the superstep complete and your reduce node can filter or retry failed items. For automatic retries, enable checkpointing with a MemorySaver or database-backed checkpointer.
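A sketch of that sentinel pattern. The process function here is a deliberately fallible stand-in for the real LLM call, and the state shape mirrors the DocState from the summarization example:

```python
def process(content: str) -> str:
    # Stand-in for the real LLM call; fails on empty input.
    if not content:
        raise ValueError("empty document")
    return content.upper()

def safe_summarize(state: dict) -> dict:
    """Branch node that converts failures into sentinel results."""
    try:
        result = process(state["content"])
        return {"summaries": [f"[{state['doc_id']}] {result}"]}
    except Exception as exc:
        # Returning (not raising) keeps the whole superstep alive.
        return {"summaries": [f"ERROR: {state['doc_id']} failed: {exc}"]}

ok = safe_summarize({"doc_id": "doc-1", "content": "hello"})
bad = safe_summarize({"doc_id": "doc-2", "content": ""})
print(ok)   # {'summaries': ['[doc-1] HELLO']}
print(bad)  # {'summaries': ['ERROR: doc-2 failed: empty document']}
```

The reduce node can then filter out entries that start with "ERROR:" before aggregating, or collect them into a retry list.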
Summary
Map-reduce in LangGraph lets you process dynamic collections in parallel. The Send API is the mechanism: return Send objects from a routing function, each targeting a node with its own state slice.
The key pieces:
- Send(node_name, state) creates a runtime edge with specific state.
- Annotated[list, operator.add] is the reducer that safely merges parallel writes.
- Routing functions return lists of Send objects to control fan-out.
- Supersteps group parallel executions — all must finish before the graph continues.
The pattern works for document summarization, batch classification, parallel tool calls, and multi-source research — any workload with a dynamic item count. The graph structure stays the same. Only the node logic changes.
Next up, we'll explore dynamic breakpoints and time travel — techniques for debugging and inspecting your LangGraph agents mid-execution.