LangGraph Subgraphs — How to Build Reusable, Modular Workflows

Written by Selva Prabhakaran | 29 min read

Your LangGraph project started simple — three nodes, two edges, done. But six months on, it’s a 20-node beast. Tweaking the summary logic breaks the classifier path. Sound familiar? The answer isn’t “just be careful.” The answer is subgraphs — standalone graphs you plug into a parent graph, the same way you’d split a 500-line function into smaller helpers.

What Is a Subgraph in LangGraph?

A subgraph is a compiled StateGraph that you slot in as a single node inside another graph. The parent sees it as one node. Under the hood, the subgraph runs its own multi-step flow with its own nodes, edges, and state rules.

Before You Start

  • Python: 3.10+

  • Packages: langgraph (0.4+), langchain-core (0.3+)

  • Install: pip install langgraph langchain-core

  • Background: LangGraph basics — nodes, edges, state, and routing (Posts 5–9 in this series)

  • Time: 25–30 minutes

Think of it like calling a function from inside another function. The parent doesn’t care what goes on inside. It passes state in and gets back the updated result.

Here’s the simplest subgraph you can build — two nodes that check input and then format it. We write two node functions (validate and format_text), wire them into a StateGraph, and compile.

python
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class SharedState(TypedDict):
    raw_text: str
    cleaned_text: str
    is_valid: bool

# --- Build the subgraph ---
def validate(state: SharedState) -> dict:
    text = state["raw_text"].strip()
    return {"is_valid": len(text) > 0, "cleaned_text": text}

def format_text(state: SharedState) -> dict:
    return {"cleaned_text": state["cleaned_text"].upper()}

sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)

sub_graph = sub_builder.compile()

Nothing fancy so far. We created a StateGraph, dropped in two nodes, chained them, and compiled. What we get back — sub_graph — is a runnable object. Try calling it directly:

python
result = sub_graph.invoke({
    "raw_text": "  hello world  ",
    "cleaned_text": "",
    "is_valid": False
})
print(result)
output
{'raw_text': '  hello world  ', 'cleaned_text': 'HELLO WORLD', 'is_valid': True}

It checked the input, stripped extra spaces, and turned the text to uppercase. On its own it’s just a regular graph. The cool part is nesting it inside a bigger one.

How Do You Add a Subgraph as a Node?

Why embed it instead of calling it by hand? Because the parent graph takes care of run order, error recovery, and checkpointing for you. You get clean, modular code without giving up any of those perks.

Here’s the core trick. Call add_node() on the parent builder and pass the compiled subgraph straight in — no wrapper needed. The parent below has two of its own nodes (intake and store) with the subgraph dropped in between them.

python
def intake(state: SharedState) -> dict:
    return {"raw_text": state["raw_text"]}

def store(state: SharedState) -> dict:
    print(f"Storing: {state['cleaned_text']}")
    return state

parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph)  # compiled subgraph as a node
parent_builder.add_node("store", store)

parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)

parent_app = parent_builder.compile()
result = parent_app.invoke({
    "raw_text": "  data science rocks  ",
    "cleaned_text": "",
    "is_valid": False
})
print(result["cleaned_text"])
output
Storing: DATA SCIENCE ROCKS
DATA SCIENCE ROCKS

One line — add_node("processor", sub_graph) — slots the whole subgraph into the parent. The parent handles it like any other node. Data flows from intake through the subgraph’s inner nodes, then out to store.

Key Insight: Once compiled, a subgraph looks like any other node to the parent. Attach it with add_node(), link it with add_edge(), and the parent has no idea there are multiple steps running inside.

How Does Shared State Work Between Parent and Child?

The demo above works so smoothly because parent and child both use SharedState. Every key — raw_text, cleaned_text, is_valid — is the same on both sides.

When schemas line up like this, LangGraph passes the full state into the subgraph for you. Any changes the subgraph makes flow right back to the parent. No manual mapping at all.

python
# Both graphs use SharedState — keys flow freely
print(f"raw_text: {result['raw_text']}")
print(f"cleaned_text: {result['cleaned_text']}")
print(f"is_valid: {result['is_valid']}")

This prints:

output
raw_text:   data science rocks
cleaned_text: DATA SCIENCE ROCKS
is_valid: True

Keys the subgraph wrote to (cleaned_text, is_valid) appear in the parent’s output. Keys it never touched (raw_text) stay as they were.
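Under the hood, this behaves roughly like a dict merge: each node (and the subgraph as a whole) returns a partial update whose keys overwrite the matching keys in the running state. Here is a plain-Python sketch of that rule, as an illustration of the concept rather than LangGraph's actual internals:

```python
# Rough sketch of the default state-update rule (illustration only, not
# LangGraph internals): a node returns a partial dict, each returned key
# replaces the matching key in the current state, untouched keys carry over.

def apply_update(state: dict, update: dict) -> dict:
    return {**state, **update}

state = {"raw_text": "  hi  ", "cleaned_text": "", "is_valid": False}

# The updates our subgraph's two nodes would return, in order:
for update in [{"is_valid": True, "cleaned_text": "hi"},   # validate
               {"cleaned_text": "HI"}]:                    # format_text
    state = apply_update(state, update)

print(state)  # {'raw_text': '  hi  ', 'cleaned_text': 'HI', 'is_valid': True}
```

This is why `raw_text` survives untouched: no node ever returned that key, so it carries over from the input.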

Tip: Default to shared state. Even when your subgraph only reads a few of the parent’s keys, sharing works fine — extra keys are simply ignored. Switch to isolated state only when you have fields the parent should never see.

In practice, I start every project with shared schemas and introduce isolation only when a concrete need comes up.

How Do You Handle Different Schemas?

Sometimes your subgraph needs private fields — maybe an internal word_count that the parent has no use for. You can’t plug that subgraph in directly because the schemas clash. The fix: wrap the subgraph call in a plain node function that translates fields back and forth.

Below is a subgraph with its own SummaryState. Notice the keys — input_text, summary, word_count — none of them exist in the parent schema.

python
# Subgraph with a DIFFERENT state schema
class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

def extract_key_points(state: SummaryState) -> dict:
    words = state["input_text"].split()
    short = " ".join(words[:5]) + "..."
    return {"summary": short, "word_count": len(words)}

summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)

summary_graph = summary_builder.compile()

The parent schema has raw_text, cleaned_text, and final_summary — no shared keys at all. So we create a bridge: a wrapper called run_summarizer that reshapes the parent state, calls the subgraph, and reshapes the output.

python
class ParentState(TypedDict):
    raw_text: str
    cleaned_text: str
    final_summary: str

def run_summarizer(state: ParentState) -> dict:
    # Map parent state -> subgraph state
    sub_input = {
        "input_text": state["cleaned_text"],
        "summary": "",
        "word_count": 0
    }
    # Invoke subgraph
    sub_result = summary_graph.invoke(sub_input)
    # Map subgraph result -> parent state
    return {"final_summary": sub_result["summary"]}

parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)

app2 = parent2.compile()
result2 = app2.invoke({
    "raw_text": "the quick brown fox jumps over the lazy dog today",
    "cleaned_text": "the quick brown fox jumps over the lazy dog today",
    "final_summary": ""
})
print(result2["final_summary"])
output
the quick brown fox jumps...

Three steps happen inside the wrapper: it maps cleaned_text → input_text, fires the subgraph, then maps summary → final_summary. The parent never touches word_count — that stays hidden in the subgraph.

Key Insight: Mismatched schemas need a translator, and the wrapper function is it. Going in, it reshapes parent state to fit the subgraph. Coming out, it reshapes the result to fit the parent. Private keys stay locked inside.
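If you end up writing many such wrappers, the pattern factors out cleanly. Below is a hedged plain-Python sketch; `make_bridge` and `fake_summarizer_invoke` are names invented for this illustration, not LangGraph APIs:

```python
# Generic state-translator factory (hypothetical helper, not LangGraph API).
# map_in reshapes parent state for the subgraph; map_out reshapes the result.

def make_bridge(subgraph_invoke, map_in, map_out):
    def bridge(state: dict) -> dict:
        return map_out(subgraph_invoke(map_in(state)))
    return bridge

# Stand-in for a compiled subgraph's .invoke (an assumption for this demo):
def fake_summarizer_invoke(sub_state: dict) -> dict:
    words = sub_state["input_text"].split()
    return {**sub_state, "summary": " ".join(words[:5]) + "...",
            "word_count": len(words)}

run_summarizer = make_bridge(
    fake_summarizer_invoke,
    map_in=lambda s: {"input_text": s["cleaned_text"],
                      "summary": "", "word_count": 0},
    map_out=lambda r: {"final_summary": r["summary"]},
)

out = run_summarizer({"cleaned_text": "the quick brown fox jumps over"})
print(out)  # {'final_summary': 'the quick brown fox jumps...'}
```

The factory keeps each wrapper down to two lambdas, which makes the in/out mapping for every stage easy to read at a glance.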

Can You Nest Subgraphs Inside Other Subgraphs?

Yes — and real projects hit this pattern fast. Picture a parent that calls a child, and that child has its own grandchild buried inside. Every layer compiles on its own, so you can still test each piece in isolation.

Below we build all three layers. The deepest graph (grandchild) handles tokenizing. The middle graph (child) scrubs out junk characters and then asks the grandchild to tokenize. The top graph (parent) kicks off the whole chain.

python
# Level 3: Grandchild — tokenizes text
class TokenState(TypedDict):
    text: str
    tokens: list[str]

def tokenize(state: TokenState) -> dict:
    return {"tokens": state["text"].lower().split()}

grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()

Next up, the child. It runs re.sub() to scrub away anything that isn’t a letter or space, then feeds the clean result into the grandchild. The two graphs have different schemas (CleanState vs TokenState), so a small wrapper bridges the gap.

python
# Level 2: Child — cleans text, then delegates to grandchild
import re

class CleanState(TypedDict):
    raw: str
    cleaned: str
    token_list: list[str]

def clean(state: CleanState) -> dict:
    cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
    return {"cleaned": cleaned.strip()}

def run_tokenizer(state: CleanState) -> dict:
    sub_input = {"text": state["cleaned"], "tokens": []}
    sub_result = grandchild_graph.invoke(sub_input)
    return {"token_list": sub_result["tokens"]}

child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()

Finally, the parent sits at the top. It only knows about the child — it has no idea a grandchild even exists. Once again, the wrapper reshapes state on the way in and on the way out.

python
# Level 1: Parent — sends raw input to child pipeline
class PipelineState(TypedDict):
    user_input: str
    processed_tokens: list[str]

def run_child(state: PipelineState) -> dict:
    sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
    sub_result = child_graph.invoke(sub_input)
    return {"processed_tokens": sub_result["token_list"]}

parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)

app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print(result3["processed_tokens"])
output
['hello', 'world']

The input "Hello, World! 123" went through all three layers. The child scrubbed punctuation and digits, the grandchild split the clean string into lowercase tokens, and the parent collected the final list.

Warning: Stop at 3 layers. Every extra level makes bugs harder to trace. If you catch yourself adding a fourth, merge two neighboring graphs into one. Excessive nesting signals a design problem, not good architecture.

How Do You Test Subgraphs on Their Own?

If subgraphs had a marketing tagline, this would be it. Because every subgraph compiles on its own, you can feed it test data and inspect the output without spinning up the parent at all. Zero setup overhead.

python
# Test grandchild in isolation
test_result = grandchild_graph.invoke({
    "text": "LangGraph makes AI workflows modular",
    "tokens": []
})
print(test_result["tokens"])

# Test child in isolation
test_child = child_graph.invoke({
    "raw": "Hello!! World??",
    "cleaned": "",
    "token_list": []
})
print(test_child["token_list"])

Run these and you’ll see the grandchild lowercases everything, while the child scrubs away punctuation before handing off:

output
['langgraph', 'makes', 'ai', 'workflows', 'modular']
['hello', 'world']

For quick smoke tests, plain assert statements work great. For a real project, plug them into pytest. Either way, the interface is just dict-in, dict-out.

python
# Simple assertion-based tests
assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")
output
All tests passed!
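Here is what those checks look like as pytest test functions. To keep the sketch self-contained, a small stub stands in for a compiled graph; in a real suite you would import `grandchild_graph` instead (the stub class and file name are assumptions for illustration):

```python
# test_subgraphs.py: pytest-style tests for a subgraph.
# FakeTokenizerGraph mimics a compiled graph's dict-in, dict-out .invoke();
# in a real project, import your compiled graphs instead of stubbing.

class FakeTokenizerGraph:
    def invoke(self, state: dict) -> dict:
        return {**state, "tokens": state["text"].lower().split()}

tokenizer_graph = FakeTokenizerGraph()  # swap in grandchild_graph for real

def test_lowercases_and_splits():
    result = tokenizer_graph.invoke({"text": "A B C", "tokens": []})
    assert result["tokens"] == ["a", "b", "c"]

def test_empty_text_gives_no_tokens():
    result = tokenizer_graph.invoke({"text": "", "tokens": []})
    assert result["tokens"] == []
```

Run with `pytest test_subgraphs.py`; pytest discovers any function whose name starts with `test_`, so no extra wiring is needed.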

Tip: Test each subgraph before you plug it into the parent. If the parent later gives bad output, every subgraph with green tests is off the suspect list. That leaves only the wrapper functions — the glue between stages.

Exercise 1: Build and Test a Scoring Subgraph

Build a subgraph that takes a text field and returns a score — the word count divided by 10, capped at 1.0. Test it with at least two inputs.

python
# Starter code — fill in the TODOs
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class ScorerState(TypedDict):
    text: str
    score: float

def score_text(state: ScorerState) -> dict:
    # TODO: count words in state["text"]
    # TODO: return score = min(word_count / 10, 1.0)
    pass

scorer_builder = StateGraph(ScorerState)
# TODO: add node, edges, compile
# scorer_graph = ...

# Test 1: 5-word input should score 0.5
# Test 2: 15-word input should score 1.0 (capped)

Hints

Hint 1: Use len(state["text"].split()) to count words.
Hint 2: The full score function is return {"score": min(len(state["text"].split()) / 10, 1.0)}. The graph needs add_node("score", score_text), edges from START to “score” and “score” to END, then .compile().

Solution

python
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class ScorerState(TypedDict):
    text: str
    score: float

def score_text(state: ScorerState) -> dict:
    word_count = len(state["text"].split())
    return {"score": min(word_count / 10, 1.0)}

scorer_builder = StateGraph(ScorerState)
scorer_builder.add_node("score", score_text)
scorer_builder.add_edge(START, "score")
scorer_builder.add_edge("score", END)
scorer_graph = scorer_builder.compile()

# Test 1
r1 = scorer_graph.invoke({"text": "one two three four five", "score": 0.0})
assert r1["score"] == 0.5, f"Expected 0.5, got {r1['score']}"

# Test 2
r2 = scorer_graph.invoke({"text": "a b c d e f g h i j k l m n o", "score": 0.0})
assert r2["score"] == 1.0, f"Expected 1.0, got {r2['score']}"

print(f"Test 1 score: {r1['score']}")
print(f"Test 2 score: {r2['score']}")
print("All tests passed!")
output
Test 1 score: 0.5
Test 2 score: 1.0
All tests passed!

The function counts words with split(), divides by 10, and caps at 1.0 via min(). Five words give 0.5; fifteen words hit the cap.

Real-World Example — How Do You Build a Multi-Stage Document Pipeline?

Time for something closer to a real job. We’ll wire up a three-stage doc processor: one subgraph classifies, one extracts fields, and one runs a quality gate. Each stage lives in its own graph.

Stage one — the classifier — scans the doc for keywords and picks a label. Right now it’s basic string matching. Later you could drop in an LLM call, and the rest of the pipeline stays untouched. That swap-ability is why subgraphs exist.

python
# Stage 1: Classification subgraph
class ClassifyState(TypedDict):
    doc_text: str
    category: str
    confidence: float

def classify_doc(state: ClassifyState) -> dict:
    text = state["doc_text"].lower()
    if "invoice" in text or "payment" in text:
        return {"category": "financial", "confidence": 0.9}
    elif "contract" in text or "agreement" in text:
        return {"category": "legal", "confidence": 0.85}
    else:
        return {"category": "general", "confidence": 0.5}

classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()

Stage two — the extractor — grabs useful fields from the document. It uses the category that the classifier just set, which the parent feeds through as part of the shared state.

python
# Stage 2: Extraction subgraph
class ExtractState(TypedDict):
    doc_text: str
    category: str
    extracted_data: dict

def extract_fields(state: ExtractState) -> dict:
    text = state["doc_text"]
    data = {"source_length": len(text)}
    if state["category"] == "financial":
        data["doc_type"] = "invoice"
        data["has_amount"] = "$" in text or "amount" in text.lower()
    elif state["category"] == "legal":
        data["doc_type"] = "contract"
        data["has_dates"] = any(w.isdigit() for w in text.split())
    else:
        data["doc_type"] = "general"
    return {"extracted_data": data}

extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()

Note: In a real pipeline, the extract subgraph would likely call an LLM with a structured output schema or a document parser. The keyword approach here keeps the focus on the composition pattern.

Stage three — the quality gate — counts how many fields the extractor found and decides if the document passes or fails.

python
# Stage 3: Quality check subgraph
class QualityState(TypedDict):
    extracted_data: dict
    quality_score: float
    passed_qa: bool

def check_quality(state: QualityState) -> dict:
    data = state["extracted_data"]
    field_count = len(data)
    score = min(field_count / 3, 1.0)
    return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}

qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()

Now the parent wires all three stages into one pipeline. Each wrapper reshapes data for its stage and funnels the result back. Follow the flow: document goes to the classifier, doc_category goes to the extractor, doc_data goes to the quality gate.

python
class DocPipelineState(TypedDict):
    document: str
    doc_category: str
    doc_confidence: float
    doc_data: dict
    qa_score: float
    qa_passed: bool

def run_classifier(state: DocPipelineState) -> dict:
    result = classify_graph.invoke({
        "doc_text": state["document"],
        "category": "",
        "confidence": 0.0
    })
    return {
        "doc_category": result["category"],
        "doc_confidence": result["confidence"]
    }

def run_extractor(state: DocPipelineState) -> dict:
    result = extract_graph.invoke({
        "doc_text": state["document"],
        "category": state["doc_category"],
        "extracted_data": {}
    })
    return {"doc_data": result["extracted_data"]}

def run_qa(state: DocPipelineState) -> dict:
    result = qa_graph.invoke({
        "extracted_data": state["doc_data"],
        "quality_score": 0.0,
        "passed_qa": False
    })
    return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}

Hook the wrappers into the parent graph and chain them: classify first, then extract, then qa_check.

python
pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)

pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)

doc_app = pipeline.compile()

Feed an invoice string into the pipeline and check what comes out:

python
doc_result = doc_app.invoke({
    "document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
    "doc_category": "",
    "doc_confidence": 0.0,
    "doc_data": {},
    "qa_score": 0.0,
    "qa_passed": False
})

print(f"Category: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")
output
Category: financial
Confidence: 0.9
Extracted: {'source_length': 49, 'doc_type': 'invoice', 'has_amount': True}
QA Score: 1.0
QA Passed: True

The classifier spotted “invoice” and “payment,” so it returned “financial” at 0.9 confidence. The extractor pulled three fields from the text. Three fields meet the quality bar, so QA gave it a perfect 1.0.

Here’s the payoff: each stage is its own box. Swap the classifier for a GPT-powered one? The extractor and QA code never change. Add an archiving step? Drop in one more subgraph node and leave the rest alone.

Exercise 2: Add a State-Mapping Wrapper

You already have scorer_graph from Exercise 1. Write a parent graph with a ReviewState schema (keys: document and review_score) that uses a wrapper to call scorer_graph. The wrapper should map document to text and score back to review_score.

python
# Starter code — fill in the wrapper function
class ReviewState(TypedDict):
    document: str
    review_score: float

def run_scorer(state: ReviewState) -> dict:
    # TODO: build sub_input dict mapping document -> text
    # TODO: invoke scorer_graph
    # TODO: return dict mapping score -> review_score
    pass

# TODO: build parent graph, compile, and test

Hints

Hint 1: The subgraph expects {"text": …, "score": 0.0}. Map state["document"] to the text key.
Hint 2: After the call, grab result["score"] and return it as {"review_score": result["score"]}.

Solution

python
class ReviewState(TypedDict):
    document: str
    review_score: float

def run_scorer(state: ReviewState) -> dict:
    result = scorer_graph.invoke({
        "text": state["document"],
        "score": 0.0
    })
    return {"review_score": result["score"]}

review_parent = StateGraph(ReviewState)
review_parent.add_node("scorer", run_scorer)
review_parent.add_edge(START, "scorer")
review_parent.add_edge("scorer", END)

review_app = review_parent.compile()
review_result = review_app.invoke({
    "document": "one two three four five six seven eight nine ten eleven",
    "review_score": 0.0
})
print(f"Review score: {review_result['review_score']}")
output
Review score: 1.0

The wrapper maps document to text, calls the scorer, and maps score back to review_score. Eleven words divided by 10 gives 1.1, capped at 1.0.

How Do Checkpointing and Streaming Work with Subgraphs?

Once you move to production, you’ll care about two things: saving state between runs (checkpointing) and watching progress in real time (streaming). Both behave differently based on how you attach the subgraph.

Checkpointing comes free with direct mounts. If you call add_node("name", compiled_subgraph), the subgraph picks up the parent’s checkpointer and gets its own namespace — no collisions. But if you call the subgraph inside a wrapper function, it’s on its own. You’d need to pass a checkpointer in yourself.

python
from langgraph.checkpoint.memory import MemorySaver

# Direct mount: subgraph shares parent's checkpointer
checkpointer = MemorySaver()
parent_app = parent_builder.compile(checkpointer=checkpointer)

Streaming hides subgraph details by default — you only see parent-level events. To peek inside, flip the subgraphs=True flag on the .stream() call.

python
# Stream with subgraph visibility
for event in parent_app.stream(
    {"raw_text": "  test input  ", "cleaned_text": "", "is_valid": False},
    subgraphs=True
):
    print(event)

Tip: Keep subgraphs=True on while you build and debug. Once you ship, turn it off so your logs stay lean.

When Should You Use Subgraphs vs. a Flat Graph?

Subgraphs aren’t always the right call. Use this quick checklist.

Go modular when:

  • The graph has grown past 8 nodes and you can spot clear groups (classify, extract, validate)

  • You reuse the same logic across pipelines — say, one summarizer in three different apps

  • Multiple people work on different stages and need to own their piece

  • You need fast, focused unit tests for each stage

Keep it flat when:

  • The whole graph fits in 5–6 nodes

  • Every node reads and writes the same keys — splitting would just add overhead

  • The workflow is a one-off with no reuse

  • State mapping would cost more effort than it saves

Factor | Flat Graph | Subgraphs
Node count | Under 6 | 8+ with clear groups
State overlap | Everything shared | Some stages need private state
Reuse | One-off workflow | Shared across pipelines
Team structure | Solo dev | Many teams or owners
Testing | Integration tests are enough | Unit tests per stage needed

Key Insight: Subgraphs are to graphs what functions are to scripts — they break a scary monolith into bite-sized pieces. If a 200-line function makes you cringe, a 20-node flat graph should too.

What Are the Most Common Subgraph Mistakes?

Mistake 1: Passing a builder instead of a compiled graph

Always call .compile() first. If you hand the raw builder to a parent, LangGraph throws an error.

Wrong:

python
sub_builder = StateGraph(SharedState)
sub_builder.add_node("step", some_func)
# ...
parent.add_node("sub", sub_builder)  # Error! Not compiled

Right:

python
sub_graph = sub_builder.compile()  # compile first
parent.add_node("sub", sub_graph)  # pass the compiled graph

Mistake 2: Direct-mounting a subgraph when the keys don’t match

Direct mounting with add_node() only works when parent and child share the same key names. Mismatched keys lead to silent data loss or missing-key crashes.

Wrong:

python
class ParentS(TypedDict):
    user_input: str

class ChildS(TypedDict):
    query: str  # different key name!

child_graph = StateGraph(ChildS)  # ...compile()
parent.add_node("child", child_graph)  # keys don't match!

Right — use a wrapper when keys differ:

python
def bridge(state: ParentS) -> dict:
    result = child_graph.invoke({"query": state["user_input"]})
    return {"user_input": result["query"]}

parent.add_node("child", bridge)

Mistake 3: Not providing all keys when calling a subgraph

Wrapper calls need the full dict — every key the subgraph schema defines, even the ones it plans to overwrite.

Wrong:

python
def wrapper(state):
    return sub_graph.invoke({"input_text": state["text"]})
    # Missing "summary" and "word_count" keys!

Right:

python
def wrapper(state):
    return sub_graph.invoke({
        "input_text": state["text"],
        "summary": "",      # provide all keys
        "word_count": 0      # even with default values
    })

Warning: Python’s TypedDict is a type hint, not a runtime guard. Skip a key and Python won’t complain — but the subgraph’s node will crash the moment it reads the missing field. Fill in every key in your invoke() dict, even with dummy values.
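You can verify this in plain Python. Constructing a TypedDict value with missing keys raises nothing; only reading the absent field fails:

```python
# TypedDict is erased at runtime: no validation happens on construction.
from typing import TypedDict  # typing_extensions.TypedDict behaves the same

class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

state = SummaryState(input_text="hello")  # two keys missing, yet no error
print(type(state))  # <class 'dict'>, the annotation is gone at runtime

try:
    state["word_count"]  # the crash only happens when a node reads the key
except KeyError as exc:
    print(f"KeyError: {exc}")
```

This is exactly the failure mode inside a subgraph: the invoke() call succeeds, and the node that reads the missing field is the one that blows up.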

Summary

Subgraphs let you slice a sprawling LangGraph app into focused, self-contained pieces. Quick recap:

  • Compile a StateGraph and pass it to add_node() — the parent treats it like any other node

  • Same schema? Mount the subgraph straight in. One line of code, zero mapping.

  • Different schema? Write a thin wrapper that reshapes data on the way in and out

  • Nesting is fine up to 3 layers — beyond that, flatten

  • Testing in isolation is the killer feature — feed a subgraph test data without touching the parent

  • Checkpoints ride along for free with direct mounts; wrapper calls need a manual checkpointer

  • Rule of thumb: once your graph hits 8+ nodes with clear clusters, it’s time to split

Practice exercise: Build a two-stage pipeline: a “preprocessor” subgraph that lowercases and strips whitespace from raw_text, and a “counter” subgraph that counts words and characters. Use different state schemas for each and link them through a parent graph with wrappers.

Solution sketch

Define PreprocessState with raw_text and processed_text. Define CounterState with text, word_count, and char_count. The parent PipeState has input_text, clean_text, words, and chars. Two wrappers translate between schemas. Wire as START -> preprocess -> count -> END.

The next post covers human-in-the-loop patterns — how to pause a graph mid-run, gather human input, and resume.

Complete Code

Click to expand the full script (copy-paste and run)

python
# Complete code from: Subgraphs in LangGraph
# Requires: pip install langgraph langchain-core
# Python 3.10+

from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
import re

# ============================================================
# Section 1: Basic Subgraph with Shared State
# ============================================================

class SharedState(TypedDict):
    raw_text: str
    cleaned_text: str
    is_valid: bool

def validate(state: SharedState) -> dict:
    text = state["raw_text"].strip()
    return {"is_valid": len(text) > 0, "cleaned_text": text}

def format_text(state: SharedState) -> dict:
    return {"cleaned_text": state["cleaned_text"].upper()}

sub_builder = StateGraph(SharedState)
sub_builder.add_node("validate", validate)
sub_builder.add_node("format_text", format_text)
sub_builder.add_edge(START, "validate")
sub_builder.add_edge("validate", "format_text")
sub_builder.add_edge("format_text", END)
sub_graph = sub_builder.compile()

result = sub_graph.invoke({
    "raw_text": "  hello world  ",
    "cleaned_text": "",
    "is_valid": False
})
print("Subgraph result:", result)

# ============================================================
# Section 2: Parent Graph with Subgraph Node
# ============================================================

def intake(state: SharedState) -> dict:
    return {"raw_text": state["raw_text"]}

def store(state: SharedState) -> dict:
    print(f"Storing: {state['cleaned_text']}")
    return state

parent_builder = StateGraph(SharedState)
parent_builder.add_node("intake", intake)
parent_builder.add_node("processor", sub_graph)
parent_builder.add_node("store", store)

parent_builder.add_edge(START, "intake")
parent_builder.add_edge("intake", "processor")
parent_builder.add_edge("processor", "store")
parent_builder.add_edge("store", END)

parent_app = parent_builder.compile()
result = parent_app.invoke({
    "raw_text": "  data science rocks  ",
    "cleaned_text": "",
    "is_valid": False
})
print("Parent result:", result["cleaned_text"])

# ============================================================
# Section 3: Isolated State (Different Schemas)
# ============================================================

class SummaryState(TypedDict):
    input_text: str
    summary: str
    word_count: int

def extract_key_points(state: SummaryState) -> dict:
    words = state["input_text"].split()
    short = " ".join(words[:5]) + "..."
    return {"summary": short, "word_count": len(words)}

summary_builder = StateGraph(SummaryState)
summary_builder.add_node("extract", extract_key_points)
summary_builder.add_edge(START, "extract")
summary_builder.add_edge("extract", END)
summary_graph = summary_builder.compile()

class ParentState(TypedDict):
    raw_text: str
    cleaned_text: str
    final_summary: str

def run_summarizer(state: ParentState) -> dict:
    sub_input = {
        "input_text": state["cleaned_text"],
        "summary": "",
        "word_count": 0
    }
    sub_result = summary_graph.invoke(sub_input)
    return {"final_summary": sub_result["summary"]}

parent2 = StateGraph(ParentState)
parent2.add_node("summarizer", run_summarizer)
parent2.add_edge(START, "summarizer")
parent2.add_edge("summarizer", END)

app2 = parent2.compile()
result2 = app2.invoke({
    "raw_text": "the quick brown fox jumps over the lazy dog today",
    "cleaned_text": "the quick brown fox jumps over the lazy dog today",
    "final_summary": ""
})
print("Summary:", result2["final_summary"])

# ============================================================
# Section 4: Nested Subgraphs (3 levels)
# ============================================================

class TokenState(TypedDict):
    text: str
    tokens: list[str]

def tokenize(state: TokenState) -> dict:
    return {"tokens": state["text"].lower().split()}

grandchild = StateGraph(TokenState)
grandchild.add_node("tokenize", tokenize)
grandchild.add_edge(START, "tokenize")
grandchild.add_edge("tokenize", END)
grandchild_graph = grandchild.compile()

class CleanState(TypedDict):
    raw: str
    cleaned: str
    token_list: list[str]

import re  # stdlib regex module, used by clean() below

def clean(state: CleanState) -> dict:
    # strip everything except letters and whitespace
    cleaned = re.sub(r'[^a-zA-Z\s]', '', state["raw"])
    return {"cleaned": cleaned.strip()}

def run_tokenizer(state: CleanState) -> dict:
    sub_input = {"text": state["cleaned"], "tokens": []}
    sub_result = grandchild_graph.invoke(sub_input)
    return {"token_list": sub_result["tokens"]}

child = StateGraph(CleanState)
child.add_node("clean", clean)
child.add_node("tokenize", run_tokenizer)
child.add_edge(START, "clean")
child.add_edge("clean", "tokenize")
child.add_edge("tokenize", END)
child_graph = child.compile()

class PipelineState(TypedDict):
    user_input: str
    processed_tokens: list[str]

def run_child(state: PipelineState) -> dict:
    sub_input = {"raw": state["user_input"], "cleaned": "", "token_list": []}
    sub_result = child_graph.invoke(sub_input)
    return {"processed_tokens": sub_result["token_list"]}

parent3 = StateGraph(PipelineState)
parent3.add_node("process", run_child)
parent3.add_edge(START, "process")
parent3.add_edge("process", END)

app3 = parent3.compile()
result3 = app3.invoke({"user_input": "Hello, World! 123", "processed_tokens": []})
print("Nested result:", result3["processed_tokens"])

# ============================================================
# Section 5: Testing Subgraphs Independently
# ============================================================

assert grandchild_graph.invoke({"text": "A B C", "tokens": []})["tokens"] == ["a", "b", "c"]
assert child_graph.invoke({"raw": "!!!", "cleaned": "", "token_list": []})["token_list"] == []
print("All tests passed!")

# ============================================================
# Section 6: Multi-Stage Document Pipeline
# ============================================================

class ClassifyState(TypedDict):
    doc_text: str
    category: str
    confidence: float

def classify_doc(state: ClassifyState) -> dict:
    text = state["doc_text"].lower()
    if "invoice" in text or "payment" in text:
        return {"category": "financial", "confidence": 0.9}
    elif "contract" in text or "agreement" in text:
        return {"category": "legal", "confidence": 0.85}
    else:
        return {"category": "general", "confidence": 0.5}

classify_builder = StateGraph(ClassifyState)
classify_builder.add_node("classify", classify_doc)
classify_builder.add_edge(START, "classify")
classify_builder.add_edge("classify", END)
classify_graph = classify_builder.compile()

class ExtractState(TypedDict):
    doc_text: str
    category: str
    extracted_data: dict

def extract_fields(state: ExtractState) -> dict:
    text = state["doc_text"]
    data = {"source_length": len(text)}
    if state["category"] == "financial":
        data["doc_type"] = "invoice"
        data["has_amount"] = "$" in text or "amount" in text.lower()
    elif state["category"] == "legal":
        data["doc_type"] = "contract"
        # crude check: any digit in the text hints at a date
        # (token-level isdigit() would miss hyphenated dates like "2026-03-15")
        data["has_dates"] = any(ch.isdigit() for ch in text)
    else:
        data["doc_type"] = "general"
    return {"extracted_data": data}

extract_builder = StateGraph(ExtractState)
extract_builder.add_node("extract", extract_fields)
extract_builder.add_edge(START, "extract")
extract_builder.add_edge("extract", END)
extract_graph = extract_builder.compile()

class QualityState(TypedDict):
    extracted_data: dict
    quality_score: float
    passed_qa: bool

def check_quality(state: QualityState) -> dict:
    data = state["extracted_data"]
    field_count = len(data)
    score = min(field_count / 3, 1.0)
    return {"quality_score": round(score, 2), "passed_qa": score >= 0.6}

qa_builder = StateGraph(QualityState)
qa_builder.add_node("check", check_quality)
qa_builder.add_edge(START, "check")
qa_builder.add_edge("check", END)
qa_graph = qa_builder.compile()

class DocPipelineState(TypedDict):
    document: str
    doc_category: str
    doc_confidence: float
    doc_data: dict
    qa_score: float
    qa_passed: bool

def run_classifier(state: DocPipelineState) -> dict:
    result = classify_graph.invoke({
        "doc_text": state["document"],
        "category": "",
        "confidence": 0.0
    })
    return {
        "doc_category": result["category"],
        "doc_confidence": result["confidence"]
    }

def run_extractor(state: DocPipelineState) -> dict:
    result = extract_graph.invoke({
        "doc_text": state["document"],
        "category": state["doc_category"],
        "extracted_data": {}
    })
    return {"doc_data": result["extracted_data"]}

def run_qa(state: DocPipelineState) -> dict:
    result = qa_graph.invoke({
        "extracted_data": state["doc_data"],
        "quality_score": 0.0,
        "passed_qa": False
    })
    return {"qa_score": result["quality_score"], "qa_passed": result["passed_qa"]}

pipeline = StateGraph(DocPipelineState)
pipeline.add_node("classify", run_classifier)
pipeline.add_node("extract", run_extractor)
pipeline.add_node("qa_check", run_qa)

pipeline.add_edge(START, "classify")
pipeline.add_edge("classify", "extract")
pipeline.add_edge("extract", "qa_check")
pipeline.add_edge("qa_check", END)

doc_app = pipeline.compile()

doc_result = doc_app.invoke({
    "document": "Invoice #1234: Payment of $5000 due by 2026-03-15",
    "doc_category": "",
    "doc_confidence": 0.0,
    "doc_data": {},
    "qa_score": 0.0,
    "qa_passed": False
})

print(f"\nCategory: {doc_result['doc_category']}")
print(f"Confidence: {doc_result['doc_confidence']}")
print(f"Extracted: {doc_result['doc_data']}")
print(f"QA Score: {doc_result['qa_score']}")
print(f"QA Passed: {doc_result['qa_passed']}")

print("\nScript completed successfully.")

FAQ

Can a subgraph use conditional edges?

Yes. Inside, a subgraph is a full StateGraph. It can branch, loop, or do anything a top-level graph can. The parent has no view into the internal routing — it just sees one node.

python
# Inside a subgraph builder
sub_builder.add_conditional_edges(
    "classify",
    route_fn,
    {"urgent": "escalate", "normal": "auto_reply"}
)

Do subgraphs share checkpoints with the parent?

Only with direct mounts. When you pass a compiled subgraph to add_node(), it piggybacks on the parent’s checkpointer and gets a unique namespace so nothing collides. If you call the subgraph from a wrapper function, it runs in its own bubble — you’d have to hand it a checkpointer yourself.

Can I reuse one subgraph in many parent graphs?

That’s the whole point of the pattern. Build and compile the subgraph once, then drop it into as many parents as you need.

python
parent_a.add_node("validator", validation_subgraph)
parent_b.add_node("validator", validation_subgraph)  # same object, different parent

Each parent gets its own execution. State from one parent never leaks into another.

Is there a performance cost to subgraphs?

Tiny. You pay one extra function call per subgraph — a few microseconds. Compare that to the milliseconds (or seconds) your LLM calls and data work take. In practice, subgraph overhead almost never shows up as a bottleneck.

How does namespace isolation work with many subgraphs?

Every stateful subgraph needs its own slot so saved states don’t step on each other. For direct mounts, LangGraph creates a unique namespace from the node name you give add_node() — you don’t have to do anything. For wrapper-based subgraphs, you’ll need to either pass a fresh checkpointer to each one or carve out namespaces manually.

