LangGraph Subgraphs — How to Build Reusable, Modular Workflows
Your LangGraph project started simple — three nodes, two edges, done. But six months on, it’s a 20-node beast. Tweaking the summary logic breaks the classifier path. Sound familiar? The answer isn’t “just be careful.” The answer is subgraphs — standalone graphs you plug into a parent graph, the same way you’d split a 500-line function into smaller helpers.
What Is a Subgraph in LangGraph?
A subgraph is a compiled StateGraph that you slot in as a single node inside another graph. The parent sees it as one node. Under the hood, the subgraph runs its own multi-step flow with its own nodes, edges, and state rules.
Before You Start
- Python: 3.10+
- Packages: langgraph (0.4+), langchain-core (0.3+)
- Install: pip install langgraph langchain-core
- Background: LangGraph basics — nodes, edges, state, and routing (Posts 5–9 in this series)
- Time: 25–30 minutes
Think of it like calling a function from inside another function. The parent doesn’t care what goes on inside. It passes state in and gets back the updated result.
Here’s the simplest subgraph you can build — two nodes that check input and then format it. We write two node functions (validate and format_text), wire them into a StateGraph, and compile.
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class SharedState(TypedDict):
    raw_text: str
    cleaned_text: str
    is_valid: bool

# --- Build the subgraph ---
def validate(state: SharedState) -> dict:
    text = state["raw_text"].strip()
    return {"is_valid": len(text) > 0, "cleaned_text": text}

def format_text(state: SharedState) -> dict:
    return {"cleaned_text": state["cleaned_text"].upper()}
sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)
sub_graph = sub_builder.compile()
Nothing fancy so far. We created a StateGraph, dropped in two nodes, chained them, and compiled. What we get back — sub_graph — is a runnable object. Try calling it directly:
result = sub_graph.invoke({
    "raw_text": " hello world ",
    "cleaned_text": "",
    "is_valid": False
})
print(result)
{'raw_text': ' hello world ', 'cleaned_text': 'HELLO WORLD', 'is_valid': True}
It checked the input, stripped extra spaces, and turned the text to uppercase. On its own it’s just a regular graph. The cool part is nesting it inside a bigger one.
How Do You Add a Subgraph as a Node?
Why embed it instead of calling it by hand? Because the parent graph takes care of run order, error recovery, and checkpointing for you. You get clean, modular code without giving up any of those perks.
Here’s the core trick. Call add_node() on the parent builder and pass the compiled subgraph straight in — no wrapper needed. The parent below has two of its own nodes (intake and store) with the subgraph dropped in between them.
def intake(state: SharedState) -> dict:
    return {"raw_text": state["raw_text"]}

def store(state: SharedState) -> dict:
    print(f"Storing: {state['cleaned_text']}")
    return state
parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph) # compiled subgraph as a node
parent_builder.add_node("store", store)
parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)
parent_app = parent_builder.compile()
result = parent_app.invoke({
    "raw_text": " data science rocks ",
    "cleaned_text": "",
    "is_valid": False
})
print(result["cleaned_text"])
Storing: DATA SCIENCE ROCKS
DATA SCIENCE ROCKS
One line — add_node("processor", sub_graph) — slots the whole subgraph into the parent. The parent handles it like any other node. Data flows from intake through the subgraph’s inner nodes, then out to store.
Key Insight: Once compiled, a subgraph looks like any other node to the parent. Attach it with add_node(), link it with add_edge(), and the parent has no idea there are multiple steps running inside.
How Does Shared State Work Between Parent and Child?
The demo above works so smoothly because parent and child both use SharedState. Every key — raw_text, cleaned_text, is_valid — is the same on both sides.
When schemas line up like this, LangGraph passes the full state into the subgraph for you. Any changes the subgraph makes flow right back to the parent. No manual mapping at all.
# Both graphs use SharedState — keys flow freely
print(f"raw_text: {result['raw_text']}")
print(f"cleaned_text: {result['cleaned_text']}")
print(f"is_valid: {result['is_valid']}")
This prints:
raw_text: data science rocks
cleaned_text: DATA SCIENCE ROCKS
is_valid: True
Keys the subgraph wrote to (cleaned_text, is_valid) appear in the parent’s output. Keys it never touched (raw_text) stay as they were.
Tip: Default to shared state. Even when your subgraph only reads a few of the parent’s keys, sharing works fine — extra keys are simply ignored. Switch to isolated state only when you have fields the parent should never see.
In practice, I start every project with shared schemas and introduce isolation only when a concrete need comes up.
How Do You Handle Different Schemas?
Sometimes your subgraph needs private fields — maybe an internal word_count that the parent has no use for. You can’t plug that subgraph in directly because the schemas clash. The fix: wrap the subgraph call in a plain node function that translates fields back and forth.
Below is a subgraph with its own SummaryState. Notice the keys — input_text, summary, word_count — none of them exist in the parent schema.
# Subgraph with a DIFFERENT state schema
class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

def extract_key_points(state: SummaryState) -> dict:
    words = state["input_text"].split()
    short = " ".join(words[:5]) + "..."
    return {"summary": short, "word_count": len(words)}
summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)
summary_graph = summary_builder.compile()
The parent schema has raw_text, cleaned_text, and final_summary — no shared keys at all. So we create a bridge: a wrapper called run_summarizer that reshapes the parent state, calls the subgraph, and reshapes the output.
class ParentState(TypedDict):
    raw_text: str
    cleaned_text: str
    final_summary: str

def run_summarizer(state: ParentState) -> dict:
    # Map parent state -> subgraph state
    sub_input = {
        "input_text": state["cleaned_text"],
        "summary": "",
        "word_count": 0
    }
    # Invoke subgraph
    sub_result = summary_graph.invoke(sub_input)
    # Map subgraph result -> parent state
    return {"final_summary": sub_result["summary"]}
parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)
app2 = parent2.compile()
result2 = app2.invoke({
    "raw_text": "the quick brown fox jumps over the lazy dog today",
    "cleaned_text": "the quick brown fox jumps over the lazy dog today",
    "final_summary": ""
})
print(result2["final_summary"])
the quick brown fox jumps...
Three steps happen inside the wrapper: it maps cleaned_text → input_text, fires the subgraph, then maps summary → final_summary. The parent never touches word_count — that stays hidden in the subgraph.
Key Insight: Mismatched schemas need a translator, and the wrapper function is it. Going in, it reshapes parent state to fit the subgraph. Coming out, it reshapes the result to fit the parent. Private keys stay locked inside.
Can You Nest Subgraphs Inside Other Subgraphs?
Yes — and real projects hit this pattern fast. Picture a parent that calls a child, and that child has its own grandchild buried inside. Every layer compiles on its own, so you can still test each piece in isolation.
Below we build all three layers. The deepest graph (grandchild) handles tokenizing. The middle graph (child) scrubs out junk characters and then asks the grandchild to tokenize. The top graph (parent) kicks off the whole chain.
# Level 3: Grandchild — tokenizes text
class TokenState(TypedDict):
    text: str
    tokens: list[str]

def tokenize(state: TokenState) -> dict:
    return {"tokens": state["text"].lower().split()}
grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()
Next up, the child. It runs re.sub() to scrub away anything that isn’t a letter or space, then feeds the clean result into the grandchild. The two graphs have different schemas (CleanState vs TokenState), so a small wrapper bridges the gap.
# Level 2: Child — cleans text, then delegates to grandchild
import re
class CleanState(TypedDict):
    raw: str
    cleaned: str
    token_list: list[str]

def clean(state: CleanState) -> dict:
    cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
    return {"cleaned": cleaned.strip()}

def run_tokenizer(state: CleanState) -> dict:
    sub_input = {"text": state["cleaned"], "tokens": []}
    sub_result = grandchild_graph.invoke(sub_input)
    return {"token_list": sub_result["tokens"]}
child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()
Finally, the parent sits at the top. It only knows about the child — it has no idea a grandchild even exists. Once again, the wrapper reshapes state on the way in and on the way out.
# Level 1: Parent — sends raw input to child pipeline
class PipelineState(TypedDict):
    user_input: str
    processed_tokens: list[str]

def run_child(state: PipelineState) -> dict:
    sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
    sub_result = child_graph.invoke(sub_input)
    return {"processed_tokens": sub_result["token_list"]}
parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)
app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print(result3["processed_tokens"])
['hello', 'world']
The input "Hello, World! 123" went through all three layers. The child scrubbed punctuation and digits, the grandchild split the clean string into lowercase tokens, and the parent collected the final list.
Warning: Stop at 3 layers. Every extra level makes bugs harder to trace. If you catch yourself adding a fourth, merge two neighboring graphs into one. Excessive nesting signals a design problem, not good architecture.
How Do You Test Subgraphs on Their Own?
If subgraphs had a marketing tagline, this would be it. Because every subgraph compiles on its own, you can feed it test data and inspect the output without spinning up the parent at all. Zero setup overhead.
# Test grandchild in isolation
test_result = grandchild_graph.invoke({
    "text": "LangGraph makes AI workflows modular",
    "tokens": []
})
print(test_result["tokens"])

# Test child in isolation
test_child = child_graph.invoke({
    "raw": "Hello!! World??",
    "cleaned": "",
    "token_list": []
})
print(test_child["token_list"])
Run these and you’ll see the grandchild lowercases everything, while the child scrubs away punctuation before handing off:
['langgraph', 'makes', 'ai', 'workflows', 'modular']
['hello', 'world']
For quick smoke tests, plain assert statements work great. For a real project, plug them into pytest. Either way, the interface is just dict-in, dict-out.
# Simple assertion-based tests
assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")
All tests passed!
Tip: Test each subgraph before you plug it into the parent. If the parent later gives bad output, every subgraph with green tests is off the suspect list. That leaves only the wrapper functions — the glue between stages.
Exercise 1: Build and Test a Scoring Subgraph
Build a subgraph that takes a text field and returns a score — the word count divided by 10, capped at 1.0. Test it with at least two inputs.
# Starter code — fill in the TODOs
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class ScorerState(TypedDict):
    text: str
    score: float

def score_text(state: ScorerState) -> dict:
    # TODO: count words in state["text"]
    # TODO: return score = min(word_count / 10, 1.0)
    pass
scorer_builder = StateGraph(ScorerState)
# TODO: add node, edges, compile
# scorer_graph = ...
# Test 1: 5-word input should score 0.5
# Test 2: 15-word input should score 1.0 (capped)
Hints
– Hint 1: Use len(state["text"].split()) to count words.
– Hint 2: The full score function is return {"score": min(len(state["text"].split()) / 10, 1.0)}. The graph needs add_node("score", score_text), edges from START to “score” and “score” to END, then .compile().
Solution
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class ScorerState(TypedDict):
    text: str
    score: float

def score_text(state: ScorerState) -> dict:
    word_count = len(state["text"].split())
    return {"score": min(word_count / 10, 1.0)}
scorer_builder = StateGraph(ScorerState)
scorer_builder.add_node("score", score_text)
scorer_builder.add_edge(START, "score")
scorer_builder.add_edge("score", END)
scorer_graph = scorer_builder.compile()
# Test 1
r1 = scorer_graph.invoke({"text": "one two three four five", "score": 0.0})
assert r1["score"] == 0.5, f"Expected 0.5, got {r1['score']}"
# Test 2
r2 = scorer_graph.invoke({"text": "a b c d e f g h i j k l m n o", "score": 0.0})
assert r2["score"] == 1.0, f"Expected 1.0, got {r2['score']}"
print(f"Test 1 score: {r1['score']}")
print(f"Test 2 score: {r2['score']}")
print("All tests passed!")
Test 1 score: 0.5
Test 2 score: 1.0
All tests passed!
The function counts words with split(), divides by 10, and caps at 1.0 via min(). Five words give 0.5; fifteen words hit the cap.
Real-World Example — How Do You Build a Multi-Stage Document Pipeline?
Time for something closer to a real job. We’ll wire up a three-stage doc processor: one subgraph classifies, one extracts fields, and one runs a quality gate. Each stage lives in its own graph.
Stage one — the classifier — scans the doc for keywords and picks a label. Right now it’s basic string matching. Later you could drop in an LLM call, and the rest of the pipeline stays untouched. That swap-ability is why subgraphs exist.
# Stage 1: Classification subgraph
class ClassifyState(TypedDict):
    doc_text: str
    category: str
    confidence: float

def classify_doc(state: ClassifyState) -> dict:
    text = state["doc_text"].lower()
    if "invoice" in text or "payment" in text:
        return {"category": "financial", "confidence": 0.9}
    elif "contract" in text or "agreement" in text:
        return {"category": "legal", "confidence": 0.85}
    else:
        return {"category": "general", "confidence": 0.5}
classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()
Stage two — the extractor — grabs useful fields from the document. It uses the category that the classifier just set, which the parent feeds through as part of the shared state.
# Stage 2: Extraction subgraph
class ExtractState(TypedDict):
    doc_text: str
    category: str
    extracted_data: dict

def extract_fields(state: ExtractState) -> dict:
    text = state["doc_text"]
    data = {"source_length": len(text)}
    if state["category"] == "financial":
        data["doc_type"] = "invoice"
        data["has_amount"] = "$" in text or "amount" in text.lower()
    elif state["category"] == "legal":
        data["doc_type"] = "contract"
        data["has_dates"] = any(w.isdigit() for w in text.split())
    else:
        data["doc_type"] = "general"
    return {"extracted_data": data}
extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()
Note: In a real pipeline, the extract subgraph would likely call an LLM with a structured output schema or a document parser. The keyword approach here keeps the focus on the composition pattern.
Stage three — the quality gate — counts how many fields the extractor found and decides if the document passes or fails.
# Stage 3: Quality check subgraph
class QualityState(TypedDict):
    extracted_data: dict
    quality_score: float
    passed_qa: bool

def check_quality(state: QualityState) -> dict:
    data = state["extracted_data"]
    field_count = len(data)
    score = min(field_count / 3, 1.0)
    return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}
qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()
Now the parent wires all three stages into one pipeline. Each wrapper reshapes data for its stage and funnels the result back. Follow the flow: document goes to the classifier, doc_category goes to the extractor, doc_data goes to the quality gate.
class DocPipelineState(TypedDict):
    document: str
    doc_category: str
    doc_confidence: float
    doc_data: dict
    qa_score: float
    qa_passed: bool

def run_classifier(state: DocPipelineState) -> dict:
    result = classify_graph.invoke({
        "doc_text": state["document"],
        "category": "",
        "confidence": 0.0
    })
    return {
        "doc_category": result["category"],
        "doc_confidence": result["confidence"]
    }

def run_extractor(state: DocPipelineState) -> dict:
    result = extract_graph.invoke({
        "doc_text": state["document"],
        "category": state["doc_category"],
        "extracted_data": {}
    })
    return {"doc_data": result["extracted_data"]}

def run_qa(state: DocPipelineState) -> dict:
    result = qa_graph.invoke({
        "extracted_data": state["doc_data"],
        "quality_score": 0.0,
        "passed_qa": False
    })
    return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}
Hook the wrappers into the parent graph and chain them: classify first, then extract, then qa_check.
pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)
pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)
doc_app = pipeline.compile()
Feed an invoice string into the pipeline and check what comes out:
doc_result = doc_app.invoke({
    "document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
    "doc_category": "",
    "doc_confidence": 0.0,
    "doc_data": {},
    "qa_score": 0.0,
    "qa_passed": False
})
print(f"Category: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")
Category: financial
Confidence: 0.9
Extracted: {'source_length': 49, 'doc_type': 'invoice', 'has_amount': True}
QA Score: 1.0
QA Passed: True
The classifier spotted “invoice” and “payment,” so it returned “financial” at 0.9 confidence. The extractor pulled three fields from the text. Three fields meet the quality bar, so QA gave it a perfect 1.0.
Here’s the payoff: each stage is its own box. Swap the classifier for a GPT-powered one? The extractor and QA code never change. Add an archiving step? Drop in one more subgraph node and leave the rest alone.
Exercise 2: Add a State-Mapping Wrapper
You already have scorer_graph from Exercise 1. Write a parent graph with a ReviewState schema (keys: document and review_score) that uses a wrapper to call scorer_graph. The wrapper should map document to text and score back to review_score.
# Starter code — fill in the wrapper function
class ReviewState(TypedDict):
    document: str
    review_score: float

def run_scorer(state: ReviewState) -> dict:
    # TODO: build sub_input dict mapping document -> text
    # TODO: invoke scorer_graph
    # TODO: return dict mapping score -> review_score
    pass
# TODO: build parent graph, compile, and test
Hints
– Hint 1: The subgraph expects {"text": …, "score": 0.0}. Map state["document"] to the text key.
– Hint 2: After the call, grab result["score"] and return it as {"review_score": result["score"]}.
Solution
class ReviewState(TypedDict):
    document: str
    review_score: float

def run_scorer(state: ReviewState) -> dict:
    result = scorer_graph.invoke({
        "text": state["document"],
        "score": 0.0
    })
    return {"review_score": result["score"]}
review_parent = StateGraph(ReviewState)
review_parent.add_node("scorer", run_scorer)
review_parent.add_edge(START, "scorer")
review_parent.add_edge("scorer", END)
review_app = review_parent.compile()
review_result = review_app.invoke({
    "document": "one two three four five six seven eight nine ten eleven",
    "review_score": 0.0
})
print(f"Review score: {review_result['review_score']}")
Review score: 1.0
The wrapper maps document to text, calls the scorer, and maps score back to review_score. Eleven words divided by 10 gives 1.1, capped at 1.0.
How Do Checkpointing and Streaming Work with Subgraphs?
Once you move to production, you’ll care about two things: saving state between runs (checkpointing) and watching progress in real time (streaming). Both behave differently based on how you attach the subgraph.
Checkpointing comes free with direct mounts. If you call add_node("name", compiled_subgraph), the subgraph picks up the parent’s checkpointer and gets its own namespace — no collisions. But if you call the subgraph inside a wrapper function, it’s on its own. You’d need to pass a checkpointer in yourself.
from langgraph.checkpoint.memory import MemorySaver
# Direct mount: subgraph shares parent's checkpointer
checkpointer = MemorySaver()
parent_app = parent_builder.compile(checkpointer=checkpointer)
Streaming hides subgraph details by default — you only see parent-level events. To peek inside, flip the subgraphs=True flag on the .stream() call.
# Stream with subgraph visibility
for event in parent_app.stream(
    {"raw_text": " test input ", "cleaned_text": "", "is_valid": False},
    subgraphs=True
):
    print(event)
Tip: Keep subgraphs=True on while you build and debug. Once you ship, turn it off so your logs stay lean.
When Should You Use Subgraphs vs. a Flat Graph?
Subgraphs aren’t always the right call. Use this quick checklist.
Go modular when:
- The graph has grown past 8 nodes and you can spot clear groups (classify, extract, validate)
- You reuse the same logic across pipelines — say, one summarizer in three different apps
- Multiple people work on different stages and need to own their piece
- You need fast, focused unit tests for each stage
Keep it flat when:
- The whole graph fits in 5–6 nodes
- Every node reads and writes the same keys — splitting would just add overhead
- The workflow is a one-off with no reuse
- State mapping would cost more effort than it saves
| Factor | Flat Graph | Subgraphs |
|---|---|---|
| Node count | Under 6 | 8+ with clear groups |
| State overlap | Everything shared | Some stages need private state |
| Reuse | One-off workflow | Shared across pipelines |
| Team structure | Solo dev | Many teams or owners |
| Testing | Integration tests are enough | Unit tests per stage needed |
Key Insight: Subgraphs are to graphs what functions are to scripts — they break a scary monolith into bite-sized pieces. If a 200-line function makes you cringe, a 20-node flat graph should too.
What Are the Most Common Subgraph Mistakes?
Mistake 1: Passing a builder instead of a compiled graph
Always call .compile() first. If you hand the raw builder to a parent, LangGraph throws an error.
Wrong:
sub_builder = StateGraph(SharedState)
sub_builder.add_node("step", some_func)
# ...
parent.add_node("sub", sub_builder) # Error! Not compiled
Right:
sub_graph = sub_builder.compile() # compile first
parent.add_node("sub", sub_graph) # pass the compiled graph
Mistake 2: Direct-mounting a subgraph when the keys don’t match
Direct mounting with add_node() only works when parent and child share the same key names. Mismatched keys lead to silent data loss or missing-key crashes.
Wrong:
class ParentS(TypedDict):
    user_input: str

class ChildS(TypedDict):
    query: str  # different key name!
child_graph = StateGraph(ChildS) # ...compile()
parent.add_node("child", child_graph) # keys don't match!
Right — use a wrapper when keys differ:
def bridge(state: ParentS) -> dict:
    result = child_graph.invoke({"query": state["user_input"]})
    return {"user_input": result["query"]}
parent.add_node("child", bridge)
Mistake 3: Not providing all keys when calling a subgraph
Wrapper calls need the full dict — every key the subgraph schema defines, even the ones it plans to overwrite.
Wrong:
def wrapper(state):
    return sub_graph.invoke({"input_text": state["text"]})
    # Missing "summary" and "word_count" keys!
Right:
def wrapper(state):
    return sub_graph.invoke({
        "input_text": state["text"],
        "summary": "",    # provide all keys
        "word_count": 0   # even with default values
    })
Warning: Python’s TypedDict is a type hint, not a runtime guard. Skip a key and Python won’t complain — but the subgraph’s node will crash the moment it reads the missing field. Fill in every key in your invoke() dict, even with dummy values.
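A quick demonstration of that runtime gap, using plain Python with no graph involved:

```python
from typing_extensions import TypedDict

class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

# A type checker flags the missing keys; the running interpreter does not.
partial: SummaryState = {"input_text": "hello"}  # type: ignore[typeddict-item]

try:
    partial["summary"]  # crashes only here, on first read of the absent key
except KeyError as err:
    print(f"Runtime failure: missing key {err}")
```

This is exactly what happens inside a subgraph node: the incomplete dict is accepted silently at invoke() time, and the crash surfaces later, inside whichever node first reads the missing field.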
Summary
Subgraphs let you slice a sprawling LangGraph app into focused, self-contained pieces. Quick recap:
- Compile a StateGraph and pass it to add_node() — the parent treats it like any other node
- Same schema? Mount the subgraph straight in. One line of code, zero mapping.
- Different schema? Write a thin wrapper that reshapes data on the way in and out
- Nesting is fine up to 3 layers — beyond that, flatten
- Testing in isolation is the killer feature — feed a subgraph test data without touching the parent
- Checkpoints ride along for free with direct mounts; wrapper calls need a manual checkpointer
- Rule of thumb: once your graph hits 8+ nodes with clear clusters, it's time to split
Practice exercise: Build a two-stage pipeline: a “preprocessor” subgraph that lowercases and strips whitespace from raw_text, and a “counter” subgraph that counts words and characters. Use different state schemas for each and link them through a parent graph with wrappers.
Solution sketch
Define PreprocessState with raw_text and processed_text. Define CounterState with text, word_count, and char_count. The parent PipeState has input_text, clean_text, words, and chars. Two wrappers translate between schemas. Wire as START -> preprocess -> count -> END.
The next post covers human-in-the-loop patterns — how to pause a graph mid-run, gather human input, and resume.
Complete Code
Click to expand the full script (copy-paste and run)
# Complete code from: Subgraphs in LangGraph
# Requires: pip install langgraph langchain-core
# Python 3.10+
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
import re
# ============================================================
# Section 1: Basic Subgraph with Shared State
# ============================================================
class SharedState(TypedDict):
    raw_text: str
    cleaned_text: str
    is_valid: bool

def validate(state: SharedState) -> dict:
    text = state["raw_text"].strip()
    return {"is_valid": len(text) > 0, "cleaned_text": text}

def format_text(state: SharedState) -> dict:
    return {"cleaned_text": state["cleaned_text"].upper()}
sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)
sub_graph = sub_builder.compile()
result = sub_graph.invoke({
    "raw_text": " hello world ",
    "cleaned_text": "",
    "is_valid": False
})
print("Subgraph result:", result)
# ============================================================
# Section 2: Parent Graph with Subgraph Node
# ============================================================
def intake(state: SharedState) -> dict:
    return {"raw_text": state["raw_text"]}

def store(state: SharedState) -> dict:
    print(f"Storing: {state['cleaned_text']}")
    return state
parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph)
parent_builder.add_node("store", store)
parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)
parent_app = parent_builder.compile()
result = parent_app.invoke({
    "raw_text": " data science rocks ",
    "cleaned_text": "",
    "is_valid": False
})
print("Parent result:", result["cleaned_text"])
# ============================================================
# Section 3: Isolated State (Different Schemas)
# ============================================================
class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

def extract_key_points(state: SummaryState) -> dict:
    words = state["input_text"].split()
    short = " ".join(words[:5]) + "..."
    return {"summary": short, "word_count": len(words)}
summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)
summary_graph = summary_builder.compile()
class ParentState(TypedDict):
    raw_text: str
    cleaned_text: str
    final_summary: str

def run_summarizer(state: ParentState) -> dict:
    sub_input = {
        "input_text": state["cleaned_text"],
        "summary": "",
        "word_count": 0
    }
    sub_result = summary_graph.invoke(sub_input)
    return {"final_summary": sub_result["summary"]}
parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)
app2 = parent2.compile()
result2 = app2.invoke({
    "raw_text": "the quick brown fox jumps over the lazy dog today",
    "cleaned_text": "the quick brown fox jumps over the lazy dog today",
    "final_summary": ""
})
print("Summary:", result2["final_summary"])
# ============================================================
# Section 4: Nested Subgraphs (3 levels)
# ============================================================
class TokenState(TypedDict):
    text: str
    tokens: list[str]

def tokenize(state: TokenState) -> dict:
    return {"tokens": state["text"].lower().split()}
grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()
class CleanState(TypedDict):
    raw: str
    cleaned: str
    token_list: list[str]

def clean(state: CleanState) -> dict:
    cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
    return {"cleaned": cleaned.strip()}

def run_tokenizer(state: CleanState) -> dict:
    sub_input = {"text": state["cleaned"], "tokens": []}
    sub_result = grandchild_graph.invoke(sub_input)
    return {"token_list": sub_result["tokens"]}
child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()
class PipelineState(TypedDict):
    user_input: str
    processed_tokens: list[str]

def run_child(state: PipelineState) -> dict:
    sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
    sub_result = child_graph.invoke(sub_input)
    return {"processed_tokens": sub_result["token_list"]}
parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)
app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print("Nested result:", result3["processed_tokens"])
# ============================================================
# Section 5: Testing Subgraphs Independently
# ============================================================
assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")
# ============================================================
# Section 6: Multi-Stage Document Pipeline
# ============================================================
class ClassifyState(TypedDict):
doc_text: str
category: str
confidence: float
def classify_doc(state: ClassifyState) -> dict:
text = state["doc_text"].lower()
if "invoice" in text or "payment" in text:
return {"category": "financial", "confidence": 0.9}
elif "contract" in text or "agreement" in text:
return {"category": "legal", "confidence": 0.85}
else:
return {"category": "general", "confidence": 0.5}
classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()
class ExtractState(TypedDict):
doc_text: str
category: str
extracted_data: dict
def extract_fields(state: ExtractState) -> dict:
text = state["doc_text"]
data = {"source_length": len(text)}
if state["category"] == "financial":
data["doc_type"] = "invoice"
data["has_amount"] = "$" in text or "amount" in text.lower()
elif state["category"] == "legal":
data["doc_type"] = "contract"
data["has_dates"] = any(w.isdigit() for w in text.split())
else:
data["doc_type"] = "general"
return {"extracted_data": data}
extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()
class QualityState(TypedDict):
extracted_data: dict
quality_score: float
passed_qa: bool
def check_quality(state: QualityState) -> dict:
data = state["extracted_data"]
field_count = len(data)
score = min(field_count / 3, 1.0)
return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}
qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()
class DocPipelineState(TypedDict):
document: str
doc_category: str
doc_confidence: float
doc_data: dict
qa_score: float
qa_passed: bool
def run_classifier(state: DocPipelineState) -> dict:
result = classify_graph.invoke({
"doc_text": state["document"],
"category": "",
"confidence": 0.0
})
return {
"doc_category": result["category"],
"doc_confidence": result["confidence"]
}
def run_extractor(state: DocPipelineState) -> dict:
result = extract_graph.invoke({
"doc_text": state["document"],
"category": state["doc_category"],
"extracted_data": {}
})
return {"doc_data": result["extracted_data"]}
def run_qa(state: DocPipelineState) -> dict:
result = qa_graph.invoke({
"extracted_data": state["doc_data"],
"quality_score": 0.0,
"passed_qa": False
})
return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}
pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)
pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)
doc_app = pipeline.compile()
doc_result = doc_app.invoke({
"document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
"doc_category": "",
"doc_confidence": 0.0,
"doc_data": {},
"qa_score": 0.0,
"qa_passed": False
})
print(f"\nCategory: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")
print("\nScript completed successfully.")
FAQ
Can a subgraph use conditional edges?
Yes. Inside, a subgraph is a full StateGraph. It can branch, loop, or do anything a top-level graph can. The parent has no view into the internal routing — it just sees one node.
# Inside a subgraph builder
sub_builder.add_conditional_edges(
"classify",
route_fn,
{"urgent": "escalate", "normal": "auto_reply"}
)
Do subgraphs share checkpoints with the parent?
Only with direct mounts. When you pass a compiled subgraph to add_node(), it piggybacks on the parent’s checkpointer and gets a unique namespace so nothing collides. If you call the subgraph from a wrapper function, it runs in its own bubble — you’d have to hand it a checkpointer yourself.
Can I reuse one subgraph in many parent graphs?
That’s the whole point of the pattern. Build and compile the subgraph once, then drop it into as many parents as you need.
parent_a.add_node("validator", validation_subgraph)
parent_b.add_node("validator", validation_subgraph) # same object, different parent
Each parent gets its own execution. State from one parent never leaks into another.
Is there a performance cost to subgraphs?
Small. Each subgraph call adds a sliver of Python overhead on top of the work it wraps, typically well under a millisecond. Compare that to the milliseconds (or seconds) your LLM calls and data work take. In practice, subgraph overhead rarely shows up as a bottleneck.
How does namespace isolation work with many subgraphs?
Every stateful subgraph needs its own slot so saved states don’t step on each other. For direct mounts, LangGraph creates a unique namespace from the node name you give add_node() — you don’t have to do anything. For wrapper-based subgraphs, you’ll need to either pass a fresh checkpointer to each one or carve out namespaces manually.