Subgraphs in LangGraph — Composing Complex Workflows from Reusable Parts
Your LangGraph workflow started small — three nodes, two edges, done. But six months later it’s a 20-node monster where changing the summarization logic breaks the classification path. Sound familiar? The fix isn’t “be more careful.” The fix is subgraphs — self-contained graphs you embed inside a parent graph, the same way you’d break a 500-line function into smaller ones.
What Is a Subgraph in LangGraph?
A subgraph is a compiled StateGraph that you add as a single node in another graph. The parent graph sees it as one node. Internally, the subgraph runs its own multi-step workflow with its own nodes, edges, and state logic.
Prerequisites
- Python version: 3.10+
- Required libraries: langgraph (0.4+), langchain-core (0.3+)
- Install: pip install langgraph langchain-core
- Prior knowledge: LangGraph basics — nodes, edges, state, conditional routing (Posts 5-9 of this series)
- Time to complete: 25-30 minutes
Think of it like calling a function from another function. The parent doesn’t care what happens inside. It sends state in and gets updated state back.
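To make the function analogy concrete before any LangGraph code, here's the same idea in plain Python, with a dict standing in for graph state. This is an analogy only, not how LangGraph executes internally:

```python
# Plain-Python analogy (no LangGraph): a "subgraph" is just a function
# that takes a state dict and returns an updated copy.
def subgraph(state: dict) -> dict:
    cleaned = state["raw_text"].strip()
    return {**state, "cleaned_text": cleaned.upper(), "is_valid": bool(cleaned)}

def parent(state: dict) -> dict:
    # The parent doesn't know (or care) what happens inside.
    return subgraph(state)

result = parent({"raw_text": " hi ", "cleaned_text": "", "is_valid": False})
print(result["cleaned_text"])  # HI
```

State goes in, updated state comes back, and the parent stays oblivious to the internals. That's the contract subgraphs formalize.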
Here’s the simplest possible subgraph — a two-node workflow that validates input and then formats it. We define two node functions (validate and format_text), wire them into a StateGraph, and compile it into a runnable object.
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class SharedState(TypedDict):
raw_text: str
cleaned_text: str
is_valid: bool
# --- Build the subgraph ---
def validate(state: SharedState) -> dict:
text = state["raw_text"].strip()
return {"is_valid": len(text) > 0, "cleaned_text": text}
def format_text(state: SharedState) -> dict:
return {"cleaned_text": state["cleaned_text"].upper()}
sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)
sub_graph = sub_builder.compile()
Nothing fancy yet. We built a StateGraph, added two nodes, connected them, and compiled. The result — sub_graph — is a runnable graph object. You can invoke it directly:
result = sub_graph.invoke({
"raw_text": " hello world ",
"cleaned_text": "",
"is_valid": False
})
print(result)
{'raw_text': ' hello world ', 'cleaned_text': 'HELLO WORLD', 'is_valid': True}
The subgraph validated the input, stripped whitespace, and uppercased it. On its own, it’s just a regular graph. The interesting part is embedding it inside a parent.
Adding a Compiled Subgraph as a Node
Why would you embed instead of just calling the subgraph yourself? Because the parent graph handles execution order, error recovery, and checkpointing for you. You get modular code without losing orchestration benefits.
Here’s the core pattern. You call add_node() on the parent builder and pass the compiled subgraph directly — no wrapper function needed. The parent below has two of its own nodes (intake and store) with the subgraph slotted between them.
def intake(state: SharedState) -> dict:
return {"raw_text": state["raw_text"]}
def store(state: SharedState) -> dict:
print(f"Storing: {state['cleaned_text']}")
return state
parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph) # compiled subgraph as a node
parent_builder.add_node("store", store)
parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)
parent_app = parent_builder.compile()
result = parent_app.invoke({
"raw_text": " data science rocks ",
"cleaned_text": "",
"is_valid": False
})
print(result["cleaned_text"])
Storing: DATA SCIENCE ROCKS
DATA SCIENCE ROCKS
One line — add_node("processor", sub_graph) — plugs the entire subgraph into the parent. The parent treats it like any other node. Data flows from intake into the subgraph’s internal nodes, then back out to store.
Shared State — When Parent and Child Use the Same Schema
The example above works seamlessly because both the parent and the subgraph use the same SharedState class. They share every key — raw_text, cleaned_text, is_valid.
When schemas match, LangGraph passes the full state into the subgraph automatically. Whatever the subgraph modifies flows back to the parent. No transformation needed.
# Both graphs use SharedState — keys flow seamlessly
print(f"raw_text: {result['raw_text']}")
print(f"cleaned_text: {result['cleaned_text']}")
print(f"is_valid: {result['is_valid']}")
This prints:
raw_text: data science rocks
cleaned_text: DATA SCIENCE ROCKS
is_valid: True
Every key the subgraph touched (cleaned_text, is_valid) is visible in the parent’s final state. Keys the subgraph didn’t touch (raw_text) stay unchanged.
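That carry-forward behavior mirrors a plain dict merge. Here's a sketch of the default update rule (assuming no custom reducers on any key), not LangGraph's actual implementation:

```python
def apply_update(state: dict, update: dict) -> dict:
    # Default channel behavior (no reducers): keys in the node's return
    # value overwrite; everything else carries forward unchanged.
    return {**state, **update}

state = {"raw_text": " data science rocks ", "cleaned_text": "", "is_valid": False}
update = {"cleaned_text": "DATA SCIENCE ROCKS", "is_valid": True}
print(apply_update(state, update))
```

This is also why node functions can return partial dicts: you only declare the keys you changed.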
I’d recommend starting with shared state in every project and only introducing state isolation when you hit a concrete reason to do so.
Isolated State — When Schemas Differ
But what happens when your subgraph tracks internal counters or intermediate results that the parent doesn’t care about? Maybe the subgraph has a word_count field that’s only useful inside it.
You can’t add that subgraph directly with add_node() because the schemas don’t overlap. Instead, you wrap the subgraph call inside a regular node function that translates between the two schemas.
Here’s a subgraph with its own SummaryState. The key difference: it has input_text, summary, and word_count — none of which exist in the parent’s schema.
# Subgraph with a DIFFERENT state schema
class SummaryState(TypedDict):
input_text: str
summary: str
word_count: int
def extract_key_points(state: SummaryState) -> dict:
words = state["input_text"].split()
short = " ".join(words[:5]) + "..."
return {"summary": short, "word_count": len(words)}
summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)
summary_graph = summary_builder.compile()
The parent has raw_text, cleaned_text, and final_summary. No overlap with the subgraph. Here’s the bridge — a wrapper function called run_summarizer that maps parent state into subgraph state, invokes the subgraph, and maps the result back.
class ParentState(TypedDict):
raw_text: str
cleaned_text: str
final_summary: str
def run_summarizer(state: ParentState) -> dict:
# Map parent state -> subgraph state
sub_input = {
"input_text": state["cleaned_text"],
"summary": "",
"word_count": 0
}
# Invoke subgraph
sub_result = summary_graph.invoke(sub_input)
# Map subgraph result -> parent state
return {"final_summary": sub_result["summary"]}
parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)
app2 = parent2.compile()
result2 = app2.invoke({
"raw_text": "the quick brown fox jumps over the lazy dog today",
"cleaned_text": "the quick brown fox jumps over the lazy dog today",
"final_summary": ""
})
print(result2["final_summary"])
the quick brown fox jumps...
The wrapper does three things: maps cleaned_text to input_text, calls the subgraph, and maps summary back to final_summary. The parent never sees word_count — it stays private inside the subgraph.
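If you end up writing many of these wrappers, the map-in/invoke/map-out shape can be factored into a small helper. This is a generic pattern sketch, not a LangGraph API; `make_bridge` and `fake_invoke` are names I'm inventing here, with a stub standing in for `summary_graph.invoke` so the sketch runs standalone:

```python
from typing import Callable

def make_bridge(invoke: Callable[[dict], dict],
                to_sub: Callable[[dict], dict],
                from_sub: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """Build a parent-node function from the three translation steps."""
    def bridge(state: dict) -> dict:
        return from_sub(invoke(to_sub(state)))
    return bridge

# Stub standing in for summary_graph.invoke, replicating its behavior
def fake_invoke(sub_state: dict) -> dict:
    words = sub_state["input_text"].split()
    return {**sub_state, "summary": " ".join(words[:5]) + "..."}

run_summarizer = make_bridge(
    fake_invoke,
    to_sub=lambda s: {"input_text": s["cleaned_text"], "summary": "", "word_count": 0},
    from_sub=lambda r: {"final_summary": r["summary"]},
)
print(run_summarizer({"cleaned_text": "a b c d e f"}))  # {'final_summary': 'a b c d e...'}
```

Whether the factory earns its keep depends on how many bridges you have; for one or two, the explicit wrapper is clearer.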
Nested Subgraphs — Subgraphs Inside Subgraphs
Can you put a subgraph inside another subgraph? Yes — and it’s more useful than you’d think. A parent graph calls a child subgraph, which itself contains a grandchild subgraph. Each level compiles independently and stays testable.
Here’s a three-level example. The grandchild tokenizes text. The child cleans text and delegates tokenization to the grandchild. The parent orchestrates everything.
# Level 3: Grandchild — tokenizes text
class TokenState(TypedDict):
text: str
tokens: list[str]
def tokenize(state: TokenState) -> dict:
return {"tokens": state["text"].lower().split()}
grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()
The child subgraph uses re.sub() to strip non-letter characters, then hands the cleaned text to the grandchild via a wrapper. Because CleanState and TokenState have different keys, the wrapper translates between them.
# Level 2: Child — cleans text, then delegates to grandchild
import re
class CleanState(TypedDict):
raw: str
cleaned: str
token_list: list[str]
def clean(state: CleanState) -> dict:
cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
return {"cleaned": cleaned.strip()}
def run_tokenizer(state: CleanState) -> dict:
sub_input = {"text": state["cleaned"], "tokens": []}
sub_result = grandchild_graph.invoke(sub_input)
return {"token_list": sub_result["tokens"]}
child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()
Finally, the parent invokes the child subgraph through its own wrapper. Notice the same translation pattern — map in, invoke, map out.
# Level 1: Parent — sends raw input to child pipeline
class PipelineState(TypedDict):
user_input: str
processed_tokens: list[str]
def run_child(state: PipelineState) -> dict:
sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
sub_result = child_graph.invoke(sub_input)
return {"processed_tokens": sub_result["token_list"]}
parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)
app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print(result3["processed_tokens"])
['hello', 'world']
Three levels deep — parent, child, grandchild. The punctuation and numbers from "Hello, World! 123" got stripped by the child’s clean node, then tokenized by the grandchild.
Testing Subgraphs Independently
This is — hands down — the biggest practical benefit of subgraphs. Each one compiles into a standalone runnable. You test it in isolation with known inputs and verify outputs. No parent graph, no complex setup.
# Test grandchild in isolation
test_result = grandchild_graph.invoke({
"text": "LangGraph makes AI workflows modular",
"tokens": []
})
print(test_result["tokens"])
# Test child in isolation
test_child = child_graph.invoke({
"raw": "Hello!! World??",
"cleaned": "",
"token_list": []
})
print(test_child["token_list"])
The grandchild tokenizer produces lowercase tokens, and the child strips punctuation first:
['langgraph', 'makes', 'ai', 'workflows', 'modular']
['hello', 'world']
You can write standard assertions or use pytest. Each subgraph is a function that takes a dict and returns a dict — dead simple to test.
# Simple assertion-based tests
assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")
All tests passed!
Exercise 1: Build and Test a Scoring Subgraph
Build a subgraph that takes a text field and returns a score — the word count divided by 10, capped at 1.0. Test it with at least two different inputs.
# Starter code — fill in the TODOs
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class ScorerState(TypedDict):
text: str
score: float
def score_text(state: ScorerState) -> dict:
# TODO: count words in state["text"]
# TODO: return score = min(word_count / 10, 1.0)
pass
scorer_builder = StateGraph(ScorerState)
# TODO: add node, edges, compile
# scorer_graph = ...
# Test 1: 5-word input should score 0.5
# Test 2: 15-word input should score 1.0 (capped)
Hints
- **Hint 1:** Use `len(state["text"].split())` to count words.
- **Hint 2:** The complete score function is `return {"score": min(len(state["text"].split()) / 10, 1.0)}`. The graph needs `add_node("score", score_text)`, edges from START to "score" and "score" to END, then `.compile()`.
Solution
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
class ScorerState(TypedDict):
text: str
score: float
def score_text(state: ScorerState) -> dict:
word_count = len(state["text"].split())
return {"score": min(word_count / 10, 1.0)}
scorer_builder = StateGraph(ScorerState)
scorer_builder.add_node("score", score_text)
scorer_builder.add_edge(START, "score")
scorer_builder.add_edge("score", END)
scorer_graph = scorer_builder.compile()
# Test 1
r1 = scorer_graph.invoke({"text": "one two three four five", "score": 0.0})
assert r1["score"] == 0.5, f"Expected 0.5, got {r1['score']}"
# Test 2
r2 = scorer_graph.invoke({"text": "a b c d e f g h i j k l m n o", "score": 0.0})
assert r2["score"] == 1.0, f"Expected 1.0, got {r2['score']}"
print(f"Test 1 score: {r1['score']}")
print(f"Test 2 score: {r2['score']}")
print("All tests passed!")
Test 1 score: 0.5
Test 2 score: 1.0
All tests passed!
The function counts words with `split()`, divides by 10, and caps at 1.0 using `min()`. Five words give 0.5, fifteen words hit the cap at 1.0.
Real-World Example — Multi-Stage Document Pipeline
Enough toy examples. Here’s a pipeline I’d actually build in a production setting — a three-stage document processor with classification, extraction, and quality checking. Each stage is its own subgraph.
The classification subgraph reads document text and assigns a category. We use keyword matching here. In production, you’d swap this with an LLM call — and the rest of the pipeline wouldn’t need to change at all. That’s the whole point.
# Stage 1: Classification subgraph
class ClassifyState(TypedDict):
doc_text: str
category: str
confidence: float
def classify_doc(state: ClassifyState) -> dict:
text = state["doc_text"].lower()
if "invoice" in text or "payment" in text:
return {"category": "financial", "confidence": 0.9}
elif "contract" in text or "agreement" in text:
return {"category": "legal", "confidence": 0.85}
else:
return {"category": "general", "confidence": 0.5}
classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()
The extraction subgraph pulls key data points based on the document’s category. Notice that category comes from the classification stage — the parent passes it through.
# Stage 2: Extraction subgraph
class ExtractState(TypedDict):
doc_text: str
category: str
extracted_data: dict
def extract_fields(state: ExtractState) -> dict:
text = state["doc_text"]
data = {"source_length": len(text)}
if state["category"] == "financial":
data["doc_type"] = "invoice"
data["has_amount"] = "$" in text or "amount" in text.lower()
elif state["category"] == "legal":
data["doc_type"] = "contract"
data["has_dates"] = any(w.isdigit() for w in text.split())
else:
data["doc_type"] = "general"
return {"extracted_data": data}
extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()
The quality check subgraph validates extraction results. It flags documents that didn’t extract enough data.
# Stage 3: Quality check subgraph
class QualityState(TypedDict):
extracted_data: dict
quality_score: float
passed_qa: bool
def check_quality(state: QualityState) -> dict:
data = state["extracted_data"]
field_count = len(data)
score = min(field_count / 3, 1.0)
return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}
qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()
The parent graph ties all three stages together. Each wrapper function translates between the parent’s schema and each subgraph’s schema. Watch how data flows: document feeds into classification, the resulting doc_category feeds into extraction, and doc_data feeds into quality checking.
class DocPipelineState(TypedDict):
document: str
doc_category: str
doc_confidence: float
doc_data: dict
qa_score: float
qa_passed: bool
def run_classifier(state: DocPipelineState) -> dict:
result = classify_graph.invoke({
"doc_text": state["document"],
"category": "",
"confidence": 0.0
})
return {
"doc_category": result["category"],
"doc_confidence": result["confidence"]
}
def run_extractor(state: DocPipelineState) -> dict:
result = extract_graph.invoke({
"doc_text": state["document"],
"category": state["doc_category"],
"extracted_data": {}
})
return {"doc_data": result["extracted_data"]}
def run_qa(state: DocPipelineState) -> dict:
result = qa_graph.invoke({
"extracted_data": state["doc_data"],
"quality_score": 0.0,
"passed_qa": False
})
return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}
Here’s where we wire the three wrappers into the parent graph. The pipeline runs classify -> extract -> qa_check sequentially.
pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)
pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)
doc_app = pipeline.compile()
Let’s send a financial document through and inspect every field:
doc_result = doc_app.invoke({
"document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
"doc_category": "",
"doc_confidence": 0.0,
"doc_data": {},
"qa_score": 0.0,
"qa_passed": False
})
print(f"Category: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")
Category: financial
Confidence: 0.9
Extracted: {'source_length': 49, 'doc_type': 'invoice', 'has_amount': True}
QA Score: 1.0
QA Passed: True
Classification tagged it as “financial” with 0.9 confidence. Extraction pulled three fields (source_length, doc_type, has_amount). QA scored 1.0 because three fields exceed the threshold.
Each subgraph handles one concern. Want to swap classification with an LLM-powered version? The extraction and QA subgraphs stay untouched. Need a fourth stage for archiving? Add another subgraph node. The existing ones don’t change.
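The swap works because each stage is defined by its state contract, not its internals. A plain-Python sketch of that idea; the `llm_classify` body is a hypothetical stand-in, not a real model call:

```python
def keyword_classify(doc_text: str) -> dict:
    # The keyword version from the pipeline above, boiled down
    text = doc_text.lower()
    if "invoice" in text or "payment" in text:
        return {"category": "financial", "confidence": 0.9}
    return {"category": "general", "confidence": 0.5}

def llm_classify(doc_text: str) -> dict:
    # Hypothetical LLM-backed replacement; in reality this would call a model.
    # What matters: it honors the same output contract.
    return {"category": "financial", "confidence": 0.97}

# Downstream stages only depend on the keys, so either function slots in
for classify in (keyword_classify, llm_classify):
    result = classify("Invoice #1234")
    assert set(result) == {"category", "confidence"}
```

As long as a replacement stage reads and writes the same keys, nothing downstream notices the change.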
Exercise 2: Add a State-Mapping Wrapper
You already have scorer_graph from Exercise 1. Write a parent graph with a ReviewState schema (keys: document and review_score) that uses a wrapper function to invoke scorer_graph. The wrapper should map document to text and score back to review_score.
# Starter code — fill in the wrapper function
class ReviewState(TypedDict):
document: str
review_score: float
def run_scorer(state: ReviewState) -> dict:
# TODO: build sub_input dict mapping document -> text
# TODO: invoke scorer_graph
# TODO: return dict mapping score -> review_score
pass
# TODO: build parent graph, compile, and test
Hints
- **Hint 1:** The subgraph expects `{"text": ..., "score": 0.0}`. Map `state["document"]` to the `text` key.
- **Hint 2:** After invoking, grab `result["score"]` and return it as `{"review_score": result["score"]}`.
Solution
class ReviewState(TypedDict):
document: str
review_score: float
def run_scorer(state: ReviewState) -> dict:
result = scorer_graph.invoke({
"text": state["document"],
"score": 0.0
})
return {"review_score": result["score"]}
review_parent = StateGraph(ReviewState)
review_parent.add_node("scorer", run_scorer)
review_parent.add_edge(START, "scorer")
review_parent.add_edge("scorer", END)
review_app = review_parent.compile()
review_result = review_app.invoke({
"document": "one two three four five six seven eight nine ten eleven",
"review_score": 0.0
})
print(f"Review score: {review_result['review_score']}")
Review score: 1.0
The wrapper maps `document` to `text`, invokes the scorer, and maps `score` back to `review_score`. Eleven words divided by 10 is 1.1, capped at 1.0.
Checkpointing and Streaming with Subgraphs
Two things you’ll need in production: checkpointing and streaming. How they behave depends on how you mount the subgraph.
Checkpointing behavior differs between direct mounting and wrapper-based invocation. When you mount a compiled subgraph directly with add_node("name", compiled_subgraph), it shares the parent’s checkpointer automatically. LangGraph assigns a separate namespace so checkpoints don’t collide. When you invoke via a wrapper function, the subgraph runs independently — no shared checkpoints unless you pass a checkpointer explicitly.
from langgraph.checkpoint.memory import MemorySaver
# Direct mount: subgraph shares parent's checkpointer
checkpointer = MemorySaver()
parent_app = parent_builder.compile(checkpointer=checkpointer)
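The namespace idea can be pictured with a plain dict. This is a conceptual sketch only, not LangGraph's internal storage format: checkpoints are keyed by (namespace, step), so parent and subgraph entries at the same step never collide.

```python
# Conceptual sketch of checkpoint namespacing (not LangGraph internals)
checkpoints: dict = {}

def save(namespace: tuple, step: int, state: dict) -> None:
    checkpoints[(namespace, step)] = state

save((), 1, {"raw_text": " test input "})        # parent-level checkpoint
save(("processor",), 1, {"is_valid": True})      # subgraph checkpoint, same step
print(len(checkpoints))  # 2 -- distinct keys, no collision
```
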
Streaming from subgraphs requires one extra parameter. By default, you only see events from the parent graph’s nodes. Pass subgraphs=True to see internal subgraph events too.
# Stream with subgraph visibility
for event in parent_app.stream(
{"raw_text": " test input ", "cleaned_text": "", "is_valid": False},
subgraphs=True
):
print(event)
TIP: Use subgraphs=True during development and debugging. In production, you'll usually want only the parent-level events to keep logs clean.
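When you consume that stream, note the event shape: in my experience with subgraphs=True, each item is a (namespace, update) pair, and parent-level events carry an empty namespace tuple. Treat the exact payloads below as made up; only the filtering pattern is the point.

```python
# Simulated stream output -- shapes only; the payload values are invented
events = [
    ((), {"intake": {"raw_text": " test input "}}),
    (("processor:abc123",), {"validate": {"is_valid": True}}),
    ((), {"store": {"cleaned_text": "TEST INPUT"}}),
]

# Keep only parent-level events (empty namespace)
parent_events = [update for ns, update in events if ns == ()]
print(parent_events)
```
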
When to Use Subgraphs vs. a Single Graph
Not every workflow needs subgraphs. Here’s how I decide.
Use subgraphs when:
- Your graph has 8+ nodes with distinct functional groups (classification, extraction, validation)
- Multiple workflows reuse the same logic — the same summarizer in three pipelines
- Different team members own different pipeline stages
- You need to unit-test stages independently
Stick with a single graph when:
- Your workflow has fewer than 6 nodes
- Every node depends tightly on every other node’s state
- There’s no reuse across projects
- State mapping overhead outweighs the modularity benefit
| Factor | Single Graph | Subgraphs |
|---|---|---|
| Node count | Under 6 | 8+ with distinct groups |
| State overlap | Everything shares state | Some stages need private state |
| Reuse potential | One-off workflow | Shared across pipelines |
| Team structure | Solo developer | Multiple teams or owners |
| Testing | Integration tests suffice | Unit tests per stage needed |
Common Mistakes and How to Fix Them
Mistake 1: Passing a builder instead of a compiled graph
You must call .compile() before adding a subgraph to a parent. Passing the builder object directly raises an error.
Wrong approach:
sub_builder = StateGraph(SharedState)
sub_builder.add_node("step", some_func)
# ...
parent.add_node("sub", sub_builder) # Error! Not compiled
Correct approach:
sub_graph = sub_builder.compile() # compile first
parent.add_node("sub", sub_graph) # pass the compiled graph
Mistake 2: Directly mounting a subgraph with mismatched keys
When you use add_node("name", compiled_subgraph) directly, parent and subgraph must share state keys. Different key names cause missing key errors or silent data loss.
Wrong approach:
class ParentS(TypedDict):
user_input: str
class ChildS(TypedDict):
query: str # different key name!
child_graph = StateGraph(ChildS) # ...compile()
parent.add_node("child", child_graph) # keys don't match!
Correct approach — use a wrapper when keys differ:
def bridge(state: ParentS) -> dict:
result = child_graph.invoke({"query": state["user_input"]})
return {"user_input": result["query"]}
parent.add_node("child", bridge)
Mistake 3: Forgetting to initialize all subgraph state keys
When invoking a subgraph via a wrapper, you must provide every key it expects — even ones the subgraph will overwrite.
Wrong approach:
def wrapper(state):
return sub_graph.invoke({"input_text": state["text"]})
# Missing "summary" and "word_count" keys!
Correct approach:
def wrapper(state):
return sub_graph.invoke({
"input_text": state["text"],
"summary": "", # provide all keys
"word_count": 0 # even with default values
})
Summary
Subgraphs turn large, tangled LangGraph workflows into manageable pieces. Here’s what you’ve learned:
- A subgraph is a compiled StateGraph added as a node with add_node()
- Shared state: when parent and child use the same schema, mounting is direct — one line
- Isolated state: when schemas differ, a wrapper function translates between them
- Nested subgraphs work but shouldn’t go deeper than 3 levels
- Independent testing is the biggest practical win — each subgraph is standalone
- Checkpointing works automatically with direct mounting; wrappers need explicit setup
- Use subgraphs when your graph has 8+ nodes with distinct functional groups
Practice exercise: Build a two-stage pipeline where a “preprocessor” subgraph lowercases and strips whitespace from raw_text, and a “counter” subgraph counts words and characters. Use different state schemas for each and connect them through a parent graph with wrapper functions.
Solution sketch
Define `PreprocessState` with `raw_text` and `processed_text`. Define `CounterState` with `text`, `word_count`, and `char_count`. The parent `PipeState` has `input_text`, `clean_text`, `words`, and `chars`. Two wrapper functions translate between schemas. Wire them as `START -> preprocess -> count -> END`.
The next post covers human-in-the-loop patterns — how to pause a graph mid-execution, collect human input, and resume.
Complete Code
Click to expand the full script (copy-paste and run)
# Complete code from: Subgraphs in LangGraph
# Requires: pip install langgraph langchain-core
# Python 3.10+
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
import re
# ============================================================
# Section 1: Basic Subgraph with Shared State
# ============================================================
class SharedState(TypedDict):
raw_text: str
cleaned_text: str
is_valid: bool
def validate(state: SharedState) -> dict:
text = state["raw_text"].strip()
return {"is_valid": len(text) > 0, "cleaned_text": text}
def format_text(state: SharedState) -> dict:
return {"cleaned_text": state["cleaned_text"].upper()}
sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)
sub_graph = sub_builder.compile()
result = sub_graph.invoke({
"raw_text": " hello world ",
"cleaned_text": "",
"is_valid": False
})
print("Subgraph result:", result)
# ============================================================
# Section 2: Parent Graph with Subgraph Node
# ============================================================
def intake(state: SharedState) -> dict:
return {"raw_text": state["raw_text"]}
def store(state: SharedState) -> dict:
print(f"Storing: {state['cleaned_text']}")
return state
parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph)
parent_builder.add_node("store", store)
parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)
parent_app = parent_builder.compile()
result = parent_app.invoke({
"raw_text": " data science rocks ",
"cleaned_text": "",
"is_valid": False
})
print("Parent result:", result["cleaned_text"])
# ============================================================
# Section 3: Isolated State (Different Schemas)
# ============================================================
class SummaryState(TypedDict):
input_text: str
summary: str
word_count: int
def extract_key_points(state: SummaryState) -> dict:
words = state["input_text"].split()
short = " ".join(words[:5]) + "..."
return {"summary": short, "word_count": len(words)}
summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)
summary_graph = summary_builder.compile()
class ParentState(TypedDict):
raw_text: str
cleaned_text: str
final_summary: str
def run_summarizer(state: ParentState) -> dict:
sub_input = {
"input_text": state["cleaned_text"],
"summary": "",
"word_count": 0
}
sub_result = summary_graph.invoke(sub_input)
return {"final_summary": sub_result["summary"]}
parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)
app2 = parent2.compile()
result2 = app2.invoke({
"raw_text": "the quick brown fox jumps over the lazy dog today",
"cleaned_text": "the quick brown fox jumps over the lazy dog today",
"final_summary": ""
})
print("Summary:", result2["final_summary"])
# ============================================================
# Section 4: Nested Subgraphs (3 levels)
# ============================================================
class TokenState(TypedDict):
text: str
tokens: list[str]
def tokenize(state: TokenState) -> dict:
return {"tokens": state["text"].lower().split()}
grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()
class CleanState(TypedDict):
raw: str
cleaned: str
token_list: list[str]
def clean(state: CleanState) -> dict:
cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
return {"cleaned": cleaned.strip()}
def run_tokenizer(state: CleanState) -> dict:
sub_input = {"text": state["cleaned"], "tokens": []}
sub_result = grandchild_graph.invoke(sub_input)
return {"token_list": sub_result["tokens"]}
child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()
class PipelineState(TypedDict):
user_input: str
processed_tokens: list[str]
def run_child(state: PipelineState) -> dict:
sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
sub_result = child_graph.invoke(sub_input)
return {"processed_tokens": sub_result["token_list"]}
parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)
app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print("Nested result:", result3["processed_tokens"])
# ============================================================
# Section 5: Testing Subgraphs Independently
# ============================================================
assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")
# ============================================================
# Section 6: Multi-Stage Document Pipeline
# ============================================================
class ClassifyState(TypedDict):
doc_text: str
category: str
confidence: float
def classify_doc(state: ClassifyState) -> dict:
text = state["doc_text"].lower()
if "invoice" in text or "payment" in text:
return {"category": "financial", "confidence": 0.9}
elif "contract" in text or "agreement" in text:
return {"category": "legal", "confidence": 0.85}
else:
return {"category": "general", "confidence": 0.5}
classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()
class ExtractState(TypedDict):
doc_text: str
category: str
extracted_data: dict
def extract_fields(state: ExtractState) -> dict:
text = state["doc_text"]
data = {"source_length": len(text)}
if state["category"] == "financial":
data["doc_type"] = "invoice"
data["has_amount"] = "$" in text or "amount" in text.lower()
elif state["category"] == "legal":
data["doc_type"] = "contract"
data["has_dates"] = any(w.isdigit() for w in text.split())
else:
data["doc_type"] = "general"
return {"extracted_data": data}
extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()
class QualityState(TypedDict):
extracted_data: dict
quality_score: float
passed_qa: bool
def check_quality(state: QualityState) -> dict:
data = state["extracted_data"]
field_count = len(data)
score = min(field_count / 3, 1.0)
return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}
qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()
class DocPipelineState(TypedDict):
document: str
doc_category: str
doc_confidence: float
doc_data: dict
qa_score: float
qa_passed: bool
# Wrapper nodes translate parent state keys into each subgraph's schema and back
def run_classifier(state: DocPipelineState) -> dict:
result = classify_graph.invoke({
"doc_text": state["document"],
"category": "",
"confidence": 0.0
})
return {
"doc_category": result["category"],
"doc_confidence": result["confidence"]
}
def run_extractor(state: DocPipelineState) -> dict:
result = extract_graph.invoke({
"doc_text": state["document"],
"category": state["doc_category"],
"extracted_data": {}
})
return {"doc_data": result["extracted_data"]}
def run_qa(state: DocPipelineState) -> dict:
result = qa_graph.invoke({
"extracted_data": state["doc_data"],
"quality_score": 0.0,
"passed_qa": False
})
return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}
pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)
pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)
doc_app = pipeline.compile()
doc_result = doc_app.invoke({
"document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
"doc_category": "",
"doc_confidence": 0.0,
"doc_data": {},
"qa_score": 0.0,
"qa_passed": False
})
print(f"\nCategory: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")
print("\nScript completed successfully.")
Frequently Asked Questions
Can a subgraph have conditional edges?
Yes. A subgraph is a full StateGraph — it supports conditional edges, cycles, and every other LangGraph feature. The parent doesn’t see or control the internal routing.
# Inside a subgraph builder
sub_builder.add_conditional_edges(
"classify",
route_fn,
{"urgent": "escalate", "normal": "auto_reply"}
)
Do subgraphs share checkpointing with the parent?
It depends on how you mount them. When you use add_node("name", compiled_subgraph) directly, the subgraph shares the parent’s checkpointer with its own namespace — no checkpoint collisions. When you invoke via a wrapper function, the subgraph runs independently and doesn’t share checkpoints unless you pass a checkpointer explicitly.
Can I reuse the same subgraph in multiple parent graphs?
Absolutely — that’s the whole point. Compile once, add to many parents.
parent_a.add_node("validator", validation_subgraph)
parent_b.add_node("validator", validation_subgraph) # same object, different parent
Each parent invokes its own instance. There’s no shared state between parents.
What’s the performance overhead of subgraphs?
Negligible. The overhead is one function call per subgraph invocation — microseconds compared to the milliseconds your nodes spend on LLM calls or data processing. I’ve never seen subgraph overhead be the bottleneck in any pipeline.
How do I handle namespace isolation with multiple subgraphs?
When you have several stateful subgraphs, each needs its own storage space so checkpoints don’t collide. LangGraph handles this automatically for directly mounted subgraphs by assigning unique namespaces based on the node name you provide in add_node(). For wrapper-invoked subgraphs, you’d need to pass separate checkpointer instances or configure namespaces manually.
References
- LangGraph Documentation — Subgraphs. Link
- LangGraph Documentation — Graph API Overview. Link
- LangChain OpenTutorial — How to Transform Subgraph Input and Output. Link
- LangGraph GitHub Repository. Link
- LangGraph Documentation — Workflows and Agents. Link
- James Li — Building Complex AI Workflows with LangGraph: Subgraph Architecture. DEV Community. Link
- LangGraph Documentation — Subgraphs Overview. Link