Subgraphs in LangGraph — Composing Complex Workflows from Reusable Parts

Written by Selva Prabhakaran | 28 min read

Your LangGraph workflow started small — three nodes, two edges, done. But six months later it’s a 20-node monster where changing the summarization logic breaks the classification path. Sound familiar? The fix isn’t “be more careful.” The fix is subgraphs — self-contained graphs you embed inside a parent graph, the same way you’d break a 500-line function into smaller ones.

What Is a Subgraph in LangGraph?

A subgraph is a compiled StateGraph that you add as a single node in another graph. The parent graph sees it as one node. Internally, the subgraph runs its own multi-step workflow with its own nodes, edges, and state logic.

Prerequisites

  • Python version: 3.10+
  • Required libraries: langgraph (0.4+), langchain-core (0.3+)
  • Install: pip install langgraph langchain-core
  • Prior knowledge: LangGraph basics — nodes, edges, state, conditional routing (Posts 5-9 of this series)
  • Time to complete: 25-30 minutes

Think of it like calling a function from another function. The parent doesn’t care what happens inside. It sends state in and gets updated state back.
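The analogy is literal. Stripped of LangGraph entirely, the contract looks like this (plain Python, my own sketch, no library involved):

```python
# Plain-Python analogy: the "subgraph" is a black box that maps a
# state dict to an updated state dict. The "parent" just calls it.
def subgraph(state: dict) -> dict:
    cleaned = state["raw_text"].strip()
    return {**state, "cleaned_text": cleaned.upper()}

def parent(state: dict) -> dict:
    # State goes in, updated state comes back. The parent never
    # looks at what happens inside.
    return subgraph(state)

print(parent({"raw_text": "  hello  ", "cleaned_text": ""}))
```

A compiled LangGraph subgraph behaves the same way, with the parent graph handling the call for you.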

Here’s the simplest possible subgraph — a two-node workflow that validates input and then formats it. We define two node functions (validate and format_text), wire them into a StateGraph, and compile it into a runnable object.

python
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class SharedState(TypedDict):
    raw_text: str
    cleaned_text: str
    is_valid: bool

# --- Build the subgraph ---
def validate(state: SharedState) -> dict:
    text = state["raw_text"].strip()
    return {"is_valid": len(text) > 0, "cleaned_text": text}

def format_text(state: SharedState) -> dict:
    return {"cleaned_text": state["cleaned_text"].upper()}

sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)

sub_graph = sub_builder.compile()

Nothing fancy yet. We built a StateGraph, added two nodes, connected them, and compiled. The result — sub_graph — is a runnable graph object. You can invoke it directly:

python
result = sub_graph.invoke({
    "raw_text": "  hello world  ",
    "cleaned_text": "",
    "is_valid": False
})
print(result)
python
{'raw_text': '  hello world  ', 'cleaned_text': 'HELLO WORLD', 'is_valid': True}

The subgraph validated the input, stripped whitespace, and uppercased it. On its own, it’s just a regular graph. The interesting part is embedding it inside a parent.

Adding a Compiled Subgraph as a Node

Why would you embed instead of just calling the subgraph yourself? Because the parent graph handles execution order, error recovery, and checkpointing for you. You get modular code without losing orchestration benefits.

Here’s the core pattern. You call add_node() on the parent builder and pass the compiled subgraph directly — no wrapper function needed. The parent below has two of its own nodes (intake and store) with the subgraph slotted between them.

python
def intake(state: SharedState) -> dict:
    return {"raw_text": state["raw_text"]}

def store(state: SharedState) -> dict:
    print(f"Storing: {state['cleaned_text']}")
    return state

parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph)  # compiled subgraph as a node
parent_builder.add_node("store", store)

parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)

parent_app = parent_builder.compile()
result = parent_app.invoke({
    "raw_text": "  data science rocks  ",
    "cleaned_text": "",
    "is_valid": False
})
print(result["cleaned_text"])
python
Storing: DATA SCIENCE ROCKS
DATA SCIENCE ROCKS

One line — add_node("processor", sub_graph) — plugs the entire subgraph into the parent. The parent treats it like any other node. Data flows from intake into the subgraph’s internal nodes, then back out to store.

Key Insight: **A compiled subgraph is just another node.** You add it with `add_node()`, connect it with `add_edge()`, and the parent graph doesn’t know or care that it runs multiple internal steps.

Shared State — When Parent and Child Use the Same Schema

The example above works seamlessly because both the parent and the subgraph use the same SharedState class. They share every key — raw_text, cleaned_text, is_valid.

When schemas match, LangGraph passes the full state into the subgraph automatically. Whatever the subgraph modifies flows back to the parent. No transformation needed.

python
# Both graphs use SharedState — keys flow seamlessly
print(f"raw_text: {result['raw_text']}")
print(f"cleaned_text: {result['cleaned_text']}")
print(f"is_valid: {result['is_valid']}")

This prints:

python
raw_text:   data science rocks
cleaned_text: DATA SCIENCE ROCKS
is_valid: True

Every key the subgraph touched (cleaned_text, is_valid) is visible in the parent’s final state. Keys the subgraph didn’t touch (raw_text) stay unchanged.

Tip: **Start with shared state.** If your subgraph needs only a subset of the parent’s keys, shared state still works — the subgraph ignores keys it doesn’t need. Only move to isolated state when you need private keys that the parent shouldn’t see.

I’d recommend starting with shared state in every project and only introducing state isolation when you hit a concrete reason to do so.

Isolated State — When Schemas Differ

But what happens when your subgraph tracks internal counters or intermediate results that the parent doesn’t care about? Maybe the subgraph has a word_count field that’s only useful inside it.

You can’t add that subgraph directly with add_node() because the schemas don’t overlap. Instead, you wrap the subgraph call inside a regular node function that translates between the two schemas.

Here’s a subgraph with its own SummaryState. The key difference: it has input_text, summary, and word_count — none of which exist in the parent’s schema.

python
# Subgraph with a DIFFERENT state schema
class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

def extract_key_points(state: SummaryState) -> dict:
    words = state["input_text"].split()
    short = " ".join(words[:5]) + "..."
    return {"summary": short, "word_count": len(words)}

summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)

summary_graph = summary_builder.compile()

The parent has raw_text, cleaned_text, and final_summary. No overlap with the subgraph. Here’s the bridge — a wrapper function called run_summarizer that maps parent state into subgraph state, invokes the subgraph, and maps the result back.

python
class ParentState(TypedDict):
    raw_text: str
    cleaned_text: str
    final_summary: str

def run_summarizer(state: ParentState) -> dict:
    # Map parent state -> subgraph state
    sub_input = {
        "input_text": state["cleaned_text"],
        "summary": "",
        "word_count": 0
    }
    # Invoke subgraph
    sub_result = summary_graph.invoke(sub_input)
    # Map subgraph result -> parent state
    return {"final_summary": sub_result["summary"]}

parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)

app2 = parent2.compile()
result2 = app2.invoke({
    "raw_text": "the quick brown fox jumps over the lazy dog today",
    "cleaned_text": "the quick brown fox jumps over the lazy dog today",
    "final_summary": ""
})
print(result2["final_summary"])
python
the quick brown fox jumps...

The wrapper does three things: maps cleaned_text to input_text, calls the subgraph, and maps summary back to final_summary. The parent never sees word_count — it stays private inside the subgraph.

Key Insight: **When schemas differ, the wrapper function is your translator.** It converts parent state to subgraph state before invocation and converts the result back afterward. The subgraph’s private keys never leak into the parent.

Nested Subgraphs — Subgraphs Inside Subgraphs

Can you put a subgraph inside another subgraph? Yes — and it’s more useful than you’d think. A parent graph calls a child subgraph, which itself contains a grandchild subgraph. Each level compiles independently and stays testable.

Here’s a three-level example. The grandchild tokenizes text. The child cleans text and delegates tokenization to the grandchild. The parent orchestrates everything.

python
# Level 3: Grandchild — tokenizes text
class TokenState(TypedDict):
    text: str
    tokens: list[str]

def tokenize(state: TokenState) -> dict:
    return {"tokens": state["text"].lower().split()}

grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()

The child subgraph uses re.sub() to strip non-letter characters, then hands the cleaned text to the grandchild via a wrapper. Because CleanState and TokenState have different keys, the wrapper translates between them.

python
# Level 2: Child — cleans text, then delegates to grandchild
import re

class CleanState(TypedDict):
    raw: str
    cleaned: str
    token_list: list[str]

def clean(state: CleanState) -> dict:
    cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
    return {"cleaned": cleaned.strip()}

def run_tokenizer(state: CleanState) -> dict:
    sub_input = {"text": state["cleaned"], "tokens": []}
    sub_result = grandchild_graph.invoke(sub_input)
    return {"token_list": sub_result["tokens"]}

child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()

Finally, the parent invokes the child subgraph through its own wrapper. Notice the same translation pattern — map in, invoke, map out.

python
# Level 1: Parent — sends raw input to child pipeline
class PipelineState(TypedDict):
    user_input: str
    processed_tokens: list[str]

def run_child(state: PipelineState) -> dict:
    sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
    sub_result = child_graph.invoke(sub_input)
    return {"processed_tokens": sub_result["token_list"]}

parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)

app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print(result3["processed_tokens"])
python
['hello', 'world']

Three levels deep — parent, child, grandchild. The punctuation and numbers from "Hello, World! 123" got stripped by the child’s clean node, then tokenized by the grandchild.

Warning: **Don’t nest deeper than 3 levels.** Each nesting level adds debugging complexity. If you’re at 4+ levels, flatten by combining two adjacent levels into one graph. Deep nesting is a code smell, not a feature.

Testing Subgraphs Independently

This is — hands down — the biggest practical benefit of subgraphs. Each one compiles into a standalone runnable. You test it in isolation with known inputs and verify outputs. No parent graph, no complex setup.

python
# Test grandchild in isolation
test_result = grandchild_graph.invoke({
    "text": "LangGraph makes AI workflows modular",
    "tokens": []
})
print(test_result["tokens"])

# Test child in isolation
test_child = child_graph.invoke({
    "raw": "Hello!! World??",
    "cleaned": "",
    "token_list": []
})
print(test_child["token_list"])

The grandchild tokenizer produces lowercase tokens, and the child strips punctuation first:

python
['langgraph', 'makes', 'ai', 'workflows', 'modular']
['hello', 'world']

You can write standard assertions or use pytest. Each subgraph is a function that takes a dict and returns a dict — dead simple to test.

python
# Simple assertion-based tests
assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")
python
All tests passed!

Tip: **Write tests for each subgraph before composing them.** When the parent graph produces wrong output, you can immediately rule out subgraphs whose unit tests pass. That narrows the bug to the wrapper functions — the integration points.

Exercise 1: Build and Test a Scoring Subgraph

Build a subgraph that takes a text field and returns a score — the word count divided by 10, capped at 1.0. Test it with at least two different inputs.

python
# Starter code — fill in the TODOs
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class ScorerState(TypedDict):
    text: str
    score: float

def score_text(state: ScorerState) -> dict:
    # TODO: count words in state["text"]
    # TODO: return score = min(word_count / 10, 1.0)
    pass

scorer_builder = StateGraph(ScorerState)
# TODO: add node, edges, compile
# scorer_graph = ...

# Test 1: 5-word input should score 0.5
# Test 2: 15-word input should score 1.0 (capped)

Hints

– **Hint 1:** Use `len(state["text"].split())` to count words.
– **Hint 2:** The complete score function is `return {"score": min(len(state["text"].split()) / 10, 1.0)}`. The graph needs `add_node("score", score_text)`, edges from START to "score" and "score" to END, then `.compile()`.

Solution
python
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class ScorerState(TypedDict):
    text: str
    score: float

def score_text(state: ScorerState) -> dict:
    word_count = len(state["text"].split())
    return {"score": min(word_count / 10, 1.0)}

scorer_builder = StateGraph(ScorerState)
scorer_builder.add_node("score", score_text)
scorer_builder.add_edge(START, "score")
scorer_builder.add_edge("score", END)
scorer_graph = scorer_builder.compile()

# Test 1
r1 = scorer_graph.invoke({"text": "one two three four five", "score": 0.0})
assert r1["score"] == 0.5, f"Expected 0.5, got {r1['score']}"

# Test 2
r2 = scorer_graph.invoke({"text": "a b c d e f g h i j k l m n o", "score": 0.0})
assert r2["score"] == 1.0, f"Expected 1.0, got {r2['score']}"

print(f"Test 1 score: {r1['score']}")
print(f"Test 2 score: {r2['score']}")
print("All tests passed!")
python
Test 1 score: 0.5
Test 2 score: 1.0
All tests passed!

The function counts words with `split()`, divides by 10, and caps at 1.0 using `min()`. Five words give 0.5, fifteen words hit the cap at 1.0.

Real-World Example — Multi-Stage Document Pipeline

Enough toy examples. Here’s a pipeline I’d actually build in a production setting — a three-stage document processor with classification, extraction, and quality checking. Each stage is its own subgraph.

The classification subgraph reads document text and assigns a category. We use keyword matching here. In production, you’d swap this with an LLM call — and the rest of the pipeline wouldn’t need to change at all. That’s the whole point.

python
# Stage 1: Classification subgraph
class ClassifyState(TypedDict):
    doc_text: str
    category: str
    confidence: float

def classify_doc(state: ClassifyState) -> dict:
    text = state["doc_text"].lower()
    if "invoice" in text or "payment" in text:
        return {"category": "financial", "confidence": 0.9}
    elif "contract" in text or "agreement" in text:
        return {"category": "legal", "confidence": 0.85}
    else:
        return {"category": "general", "confidence": 0.5}

classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()

The extraction subgraph pulls key data points based on the document’s category. Notice that category comes from the classification stage — the parent passes it through.

python
# Stage 2: Extraction subgraph
class ExtractState(TypedDict):
    doc_text: str
    category: str
    extracted_data: dict

def extract_fields(state: ExtractState) -> dict:
    text = state["doc_text"]
    data = {"source_length": len(text)}
    if state["category"] == "financial":
        data["doc_type"] = "invoice"
        data["has_amount"] = "$" in text or "amount" in text.lower()
    elif state["category"] == "legal":
        data["doc_type"] = "contract"
        data["has_dates"] = any(w.isdigit() for w in text.split())
    else:
        data["doc_type"] = "general"
    return {"extracted_data": data}

extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()
Note: **Production vs. tutorial code:** In a real pipeline, the extraction subgraph would likely use an LLM with a structured output schema or a document parsing library. The keyword-based approach here is simplified to focus on the subgraph composition pattern.

The quality check subgraph validates extraction results. It flags documents that didn’t extract enough data.

python
# Stage 3: Quality check subgraph
class QualityState(TypedDict):
    extracted_data: dict
    quality_score: float
    passed_qa: bool

def check_quality(state: QualityState) -> dict:
    data = state["extracted_data"]
    field_count = len(data)
    score = min(field_count / 3, 1.0)
    return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}

qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()

The parent graph ties all three stages together. Each wrapper function translates between the parent’s schema and each subgraph’s schema. Watch how data flows: document feeds into classification, the resulting doc_category feeds into extraction, and doc_data feeds into quality checking.

python
class DocPipelineState(TypedDict):
    document: str
    doc_category: str
    doc_confidence: float
    doc_data: dict
    qa_score: float
    qa_passed: bool

def run_classifier(state: DocPipelineState) -> dict:
    result = classify_graph.invoke({
        "doc_text": state["document"],
        "category": "",
        "confidence": 0.0
    })
    return {
        "doc_category": result["category"],
        "doc_confidence": result["confidence"]
    }

def run_extractor(state: DocPipelineState) -> dict:
    result = extract_graph.invoke({
        "doc_text": state["document"],
        "category": state["doc_category"],
        "extracted_data": {}
    })
    return {"doc_data": result["extracted_data"]}

def run_qa(state: DocPipelineState) -> dict:
    result = qa_graph.invoke({
        "extracted_data": state["doc_data"],
        "quality_score": 0.0,
        "passed_qa": False
    })
    return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}

Here’s where we wire the three wrappers into the parent graph. The pipeline runs classify -> extract -> qa_check sequentially.

python
pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)

pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)

doc_app = pipeline.compile()

Let’s send a financial document through and inspect every field:

python
doc_result = doc_app.invoke({
    "document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
    "doc_category": "",
    "doc_confidence": 0.0,
    "doc_data": {},
    "qa_score": 0.0,
    "qa_passed": False
})

print(f"Category: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")
python
Category: financial
Confidence: 0.9
Extracted: {'source_length': 49, 'doc_type': 'invoice', 'has_amount': True}
QA Score: 1.0
QA Passed: True

Classification tagged it as “financial” with 0.9 confidence. Extraction pulled three fields (source_length, doc_type, has_amount). QA scored 1.0 because three fields hit the 3-field cap, well above the 0.6 pass threshold.

Each subgraph handles one concern. Want to swap classification with an LLM-powered version? The extraction and QA subgraphs stay untouched. Need a fourth stage for archiving? Add another subgraph node. The existing ones don’t change.

Exercise 2: Add a State-Mapping Wrapper

You already have scorer_graph from Exercise 1. Write a parent graph with a ReviewState schema (keys: document and review_score) that uses a wrapper function to invoke scorer_graph. The wrapper should map document to text and score back to review_score.

python
# Starter code — fill in the wrapper function
class ReviewState(TypedDict):
    document: str
    review_score: float

def run_scorer(state: ReviewState) -> dict:
    # TODO: build sub_input dict mapping document -> text
    # TODO: invoke scorer_graph
    # TODO: return dict mapping score -> review_score
    pass

# TODO: build parent graph, compile, and test

Hints

– **Hint 1:** The subgraph expects `{"text": ..., "score": 0.0}`. Map `state["document"]` to the `text` key.
– **Hint 2:** After invoking, grab `result["score"]` and return it as `{"review_score": result["score"]}`.

Solution
python
class ReviewState(TypedDict):
    document: str
    review_score: float

def run_scorer(state: ReviewState) -> dict:
    result = scorer_graph.invoke({
        "text": state["document"],
        "score": 0.0
    })
    return {"review_score": result["score"]}

review_parent = StateGraph(ReviewState)
review_parent.add_node("scorer", run_scorer)
review_parent.add_edge(START, "scorer")
review_parent.add_edge("scorer", END)

review_app = review_parent.compile()
review_result = review_app.invoke({
    "document": "one two three four five six seven eight nine ten eleven",
    "review_score": 0.0
})
print(f"Review score: {review_result['review_score']}")
python
Review score: 1.0

The wrapper maps `document` to `text`, invokes the scorer, and maps `score` back to `review_score`. Eleven words divided by 10 is 1.1, capped at 1.0.

Checkpointing and Streaming with Subgraphs

Two things you’ll need in production: checkpointing and streaming. How they behave depends on how you mount the subgraph.

Checkpointing behavior differs between direct mounting and wrapper-based invocation. When you mount a compiled subgraph directly with add_node("name", compiled_subgraph), it shares the parent’s checkpointer automatically. LangGraph assigns a separate namespace so checkpoints don’t collide. When you invoke via a wrapper function, the subgraph runs independently — no shared checkpoints unless you pass a checkpointer explicitly.

python
from langgraph.checkpoint.memory import MemorySaver

# Direct mount: subgraph shares parent's checkpointer
checkpointer = MemorySaver()
parent_app = parent_builder.compile(checkpointer=checkpointer)

Streaming from subgraphs requires one extra parameter. By default, you only see events from the parent graph’s nodes. Pass subgraphs=True to see internal subgraph events too.

python
# Stream with subgraph visibility
for event in parent_app.stream(
    {"raw_text": "  test input  ", "cleaned_text": "", "is_valid": False},
    subgraphs=True
):
    print(event)

Tip: **Use `subgraphs=True` during development and debugging.** In production, you’ll usually want only the parent-level events to keep logs clean.

When to Use Subgraphs vs. a Single Graph

Not every workflow needs subgraphs. Here’s how I decide.

Use subgraphs when:

  • Your graph has 8+ nodes with distinct functional groups (classification, extraction, validation)
  • Multiple workflows reuse the same logic — the same summarizer in three pipelines
  • Different team members own different pipeline stages
  • You need to unit-test stages independently

Stick with a single graph when:

  • Your workflow has fewer than 6 nodes
  • Every node depends tightly on every other node’s state
  • There’s no reuse across projects
  • State mapping overhead outweighs the modularity benefit

| Factor | Single Graph | Subgraphs |
| --- | --- | --- |
| Node count | Under 6 | 8+ with distinct groups |
| State overlap | Everything shares state | Some stages need private state |
| Reuse potential | One-off workflow | Shared across pipelines |
| Team structure | Solo developer | Multiple teams or owners |
| Testing | Integration tests suffice | Unit tests per stage needed |

Key Insight: **Subgraphs solve the same problem as functions in regular code — they manage complexity through decomposition.** If you wouldn’t write a 200-line function, don’t build a 20-node flat graph.

Common Mistakes and How to Fix Them

Mistake 1: Passing a builder instead of a compiled graph

You must call .compile() before adding a subgraph to a parent. Passing the builder object directly raises an error.

Wrong approach:

python
sub_builder = StateGraph(SharedState)
sub_builder.add_node("step", some_func)
# ...
parent.add_node("sub", sub_builder)  # Error! Not compiled

Correct approach:

python
sub_graph = sub_builder.compile()  # compile first
parent.add_node("sub", sub_graph)  # pass the compiled graph

Mistake 2: Directly mounting a subgraph with mismatched keys

When you use add_node("name", compiled_subgraph) directly, parent and subgraph must share state keys. Different key names cause missing key errors or silent data loss.

Wrong approach:

python
class ParentS(TypedDict):
    user_input: str

class ChildS(TypedDict):
    query: str  # different key name!

child_builder = StateGraph(ChildS)
# ... add nodes and edges, then compile ...
child_graph = child_builder.compile()
parent.add_node("child", child_graph)  # keys don't match!

Correct approach — use a wrapper when keys differ:

python
def bridge(state: ParentS) -> dict:
    result = child_graph.invoke({"query": state["user_input"]})
    return {"user_input": result["query"]}

parent.add_node("child", bridge)

Mistake 3: Forgetting to initialize all subgraph state keys

When invoking a subgraph via a wrapper, you must provide every key it expects — even ones the subgraph will overwrite.

Wrong approach:

python
def wrapper(state):
    return sub_graph.invoke({"input_text": state["text"]})
    # Missing "summary" and "word_count" keys!

Correct approach:

python
def wrapper(state):
    return sub_graph.invoke({
        "input_text": state["text"],
        "summary": "",       # provide all keys
        "word_count": 0      # even with default values
    })

Warning: **TypedDict doesn’t enforce required keys at runtime.** Python won’t warn you if you forget a key. The error shows up later when the subgraph’s node function tries to access the missing key. Always initialize every key in your `invoke()` call.

Summary

Subgraphs turn large, tangled LangGraph workflows into manageable pieces. Here’s what you’ve learned:

  • A subgraph is a compiled StateGraph added as a node with add_node()
  • Shared state: when parent and child use the same schema, mounting is direct — one line
  • Isolated state: when schemas differ, a wrapper function translates between them
  • Nested subgraphs work but shouldn’t go deeper than 3 levels
  • Independent testing is the biggest practical win — each subgraph is standalone
  • Checkpointing works automatically with direct mounting; wrappers need explicit setup
  • Use subgraphs when your graph has 8+ nodes with distinct functional groups

Practice exercise: Build a two-stage pipeline where a “preprocessor” subgraph lowercases and strips whitespace from raw_text, and a “counter” subgraph counts words and characters. Use different state schemas for each and connect them through a parent graph with wrapper functions.

Solution sketch

Define `PreprocessState` with `raw_text` and `processed_text`. Define `CounterState` with `text`, `word_count`, and `char_count`. The parent `PipeState` has `input_text`, `clean_text`, `words`, and `chars`. Two wrapper functions translate between schemas. Wire them as `START -> preprocess -> count -> END`.

The next post covers human-in-the-loop patterns — how to pause a graph mid-execution, collect human input, and resume.

Complete Code

python
# Complete code from: Subgraphs in LangGraph
# Requires: pip install langgraph langchain-core
# Python 3.10+

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
import re

# ============================================================
# Section 1: Basic Subgraph with Shared State
# ============================================================

class SharedState(TypedDict):
    raw_text: str
    cleaned_text: str
    is_valid: bool

def validate(state: SharedState) -> dict:
    text = state["raw_text"].strip()
    return {"is_valid": len(text) > 0, "cleaned_text": text}

def format_text(state: SharedState) -> dict:
    return {"cleaned_text": state["cleaned_text"].upper()}

sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)
sub_graph = sub_builder.compile()

result = sub_graph.invoke({
    "raw_text": "  hello world  ",
    "cleaned_text": "",
    "is_valid": False
})
print("Subgraph result:", result)

# ============================================================
# Section 2: Parent Graph with Subgraph Node
# ============================================================

def intake(state: SharedState) -> dict:
    return {"raw_text": state["raw_text"]}

def store(state: SharedState) -> dict:
    print(f"Storing: {state['cleaned_text']}")
    return state

parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph)
parent_builder.add_node("store", store)

parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)

parent_app = parent_builder.compile()
result = parent_app.invoke({
    "raw_text": "  data science rocks  ",
    "cleaned_text": "",
    "is_valid": False
})
print("Parent result:", result["cleaned_text"])

# ============================================================
# Section 3: Isolated State (Different Schemas)
# ============================================================

class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

def extract_key_points(state: SummaryState) -> dict:
    words = state["input_text"].split()
    short = " ".join(words[:5]) + "..."
    return {"summary": short, "word_count": len(words)}

summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)
summary_graph = summary_builder.compile()

class ParentState(TypedDict):
    raw_text: str
    cleaned_text: str
    final_summary: str

def run_summarizer(state: ParentState) -> dict:
    sub_input = {
        "input_text": state["cleaned_text"],
        "summary": "",
        "word_count": 0
    }
    sub_result = summary_graph.invoke(sub_input)
    return {"final_summary": sub_result["summary"]}

parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)

app2 = parent2.compile()
result2 = app2.invoke({
    "raw_text": "the quick brown fox jumps over the lazy dog today",
    "cleaned_text": "the quick brown fox jumps over the lazy dog today",
    "final_summary": ""
})
print("Summary:", result2["final_summary"])
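The translation boilerplate in `run_summarizer` repeats for every wrapper node you write. A hypothetical helper can factor it out (`make_bridge` is not a LangGraph API, just a sketch); anything with an `invoke` method works, shown here with a stub standing in for a compiled graph:

```python
def make_bridge(subgraph, in_map, out_map, defaults=None):
    """Build a wrapper node that translates state keys in both directions.
    in_map:  {subgraph_key: parent_key} for input translation
    out_map: {parent_key: subgraph_key} for output translation
    """
    def node(state):
        sub_input = dict(defaults or {})
        sub_input.update({sk: state[pk] for sk, pk in in_map.items()})
        result = subgraph.invoke(sub_input)
        return {pk: result[sk] for pk, sk in out_map.items()}
    return node

# Stub standing in for a compiled graph; anything with .invoke works
class StubSummarizer:
    def invoke(self, state):
        words = state["input_text"].split()
        return {**state, "summary": " ".join(words[:2]) + "..."}

bridge = make_bridge(
    StubSummarizer(),
    in_map={"input_text": "cleaned_text"},
    out_map={"final_summary": "summary"},
    defaults={"summary": "", "word_count": 0},
)
print(bridge({"cleaned_text": "the quick brown fox"}))
# {'final_summary': 'the quick...'}
```

The parent then registers `bridge` as a node exactly like `run_summarizer`, and adding a second wrapped subgraph costs one `make_bridge` call instead of another hand-written function.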

# ============================================================
# Section 4: Nested Subgraphs (3 levels)
# ============================================================

import re

class TokenState(TypedDict):
    text: str
    tokens: list[str]

def tokenize(state: TokenState) -> dict:
    return {"tokens": state["text"].lower().split()}

grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()

class CleanState(TypedDict):
    raw: str
    cleaned: str
    token_list: list[str]

def clean(state: CleanState) -> dict:
    cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
    return {"cleaned": cleaned.strip()}

def run_tokenizer(state: CleanState) -> dict:
    sub_input = {"text": state["cleaned"], "tokens": []}
    sub_result = grandchild_graph.invoke(sub_input)
    return {"token_list": sub_result["tokens"]}

child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()

class PipelineState(TypedDict):
    user_input: str
    processed_tokens: list[str]

def run_child(state: PipelineState) -> dict:
    sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
    sub_result = child_graph.invoke(sub_input)
    return {"processed_tokens": sub_result["token_list"]}

parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)

app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print("Nested result:", result3["processed_tokens"])

# ============================================================
# Section 5: Testing Subgraphs Independently
# ============================================================

assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")

# ============================================================
# Section 6: Multi-Stage Document Pipeline
# ============================================================

class ClassifyState(TypedDict):
    doc_text: str
    category: str
    confidence: float

def classify_doc(state: ClassifyState) -> dict:
    text = state["doc_text"].lower()
    if "invoice" in text or "payment" in text:
        return {"category": "financial", "confidence": 0.9}
    elif "contract" in text or "agreement" in text:
        return {"category": "legal", "confidence": 0.85}
    else:
        return {"category": "general", "confidence": 0.5}

classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()

class ExtractState(TypedDict):
    doc_text: str
    category: str
    extracted_data: dict

def extract_fields(state: ExtractState) -> dict:
    text = state["doc_text"]
    data = {"source_length": len(text)}
    if state["category"] == "financial":
        data["doc_type"] = "invoice"
        data["has_amount"] = "$" in text or "amount" in text.lower()
    elif state["category"] == "legal":
        data["doc_type"] = "contract"
        data["has_dates"] = any(w.isdigit() for w in text.split())
    else:
        data["doc_type"] = "general"
    return {"extracted_data": data}

extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()

class QualityState(TypedDict):
    extracted_data: dict
    quality_score: float
    passed_qa: bool

def check_quality(state: QualityState) -> dict:
    data = state["extracted_data"]
    field_count = len(data)
    score = min(field_count / 3, 1.0)
    return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}

qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()

class DocPipelineState(TypedDict):
    document: str
    doc_category: str
    doc_confidence: float
    doc_data: dict
    qa_score: float
    qa_passed: bool

def run_classifier(state: DocPipelineState) -> dict:
    result = classify_graph.invoke({
        "doc_text": state["document"],
        "category": "",
        "confidence": 0.0
    })
    return {
        "doc_category": result["category"],
        "doc_confidence": result["confidence"]
    }

def run_extractor(state: DocPipelineState) -> dict:
    result = extract_graph.invoke({
        "doc_text": state["document"],
        "category": state["doc_category"],
        "extracted_data": {}
    })
    return {"doc_data": result["extracted_data"]}

def run_qa(state: DocPipelineState) -> dict:
    result = qa_graph.invoke({
        "extracted_data": state["doc_data"],
        "quality_score": 0.0,
        "passed_qa": False
    })
    return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}

pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)

pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)

doc_app = pipeline.compile()

doc_result = doc_app.invoke({
    "document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
    "doc_category": "",
    "doc_confidence": 0.0,
    "doc_data": {},
    "qa_score": 0.0,
    "qa_passed": False
})

print(f"\nCategory: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")

print("\nScript completed successfully.")

Frequently Asked Questions

Can a subgraph have conditional edges?

Yes. A subgraph is a full StateGraph — it supports conditional edges, cycles, and every other LangGraph feature. The parent doesn’t see or control the internal routing.

python
# Inside a subgraph builder
sub_builder.add_conditional_edges(
    "classify",
    route_fn,
    {"urgent": "escalate", "normal": "auto_reply"}
)

Do subgraphs share checkpointing with the parent?

It depends on how you mount them. When you use add_node("name", compiled_subgraph) directly, the subgraph shares the parent’s checkpointer with its own namespace — no checkpoint collisions. When you invoke via a wrapper function, the subgraph runs independently and doesn’t share checkpoints unless you pass a checkpointer explicitly.

Can I reuse the same subgraph in multiple parent graphs?

Absolutely — that’s the whole point. Compile once, add to many parents.

python
parent_a.add_node("validator", validation_subgraph)
parent_b.add_node("validator", validation_subgraph)  # same object, different parent

Each parent invokes its own instance. There’s no shared state between parents.

What’s the performance overhead of subgraphs?

Negligible. The overhead is one function call per subgraph invocation — microseconds compared to the milliseconds your nodes spend on LLM calls or data processing. I’ve never seen subgraph overhead be the bottleneck in any pipeline.

How do I handle namespace isolation with multiple subgraphs?

When you have several stateful subgraphs, each needs its own storage space so checkpoints don’t collide. LangGraph handles this automatically for directly mounted subgraphs by assigning unique namespaces based on the node name you provide in add_node(). For wrapper-invoked subgraphs, you’d need to pass separate checkpointer instances or configure namespaces manually.
