
LangGraph Structured Output & Self-Correcting Agents

Build LangGraph agents that return validated, typed data using Pydantic models — with automatic retry loops that self-correct on validation failures.

Written by Selva Prabhakaran | 35 min read


Force your LangGraph agents to return validated, typed data — and automatically fix themselves when they don’t.

Your agent sends back a nice paragraph. But the next function wants a JSON dict with five exact keys — not prose. So you parse the string, cross your fingers, and hope. One missing key and it all blows up.

This gap — “LLM text” vs. “data your app can use” — is the structured output problem. Closing it well is the biggest step from demo to product.

Here is the plan. First, define a Pydantic model — a class that spells out the data shape you need. Then hook it to the LLM with .with_structured_output() so it returns typed data, not free text.

That works well — until the LLM slips. A wrong type. A number that breaks a rule. A missing key. When that happens, you want the agent to catch the error, show it to the LLM, and let it try again. LangGraph makes this loop simple: a node tries the parse, a router checks the result, and a conditional edge loops back on failure. The error goes right into the next prompt so the LLM knows what to fix.

Schema basics come first. Then the self-correcting loop, step by step.

What Is Structured Output?

In short, structured output means the LLM gives back data in a shape you picked. You list the fields, their types, and the rules. The response must match.

Why care? Because your code needs a known shape. A database wants exact types. An API wants fixed keys. A UI widget cannot render a blob of text.

Without it, you parse free text with regex — a path to late-night bugs. With it, you get a typed Pydantic object ready to use.

python
import os
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import TypedDict, Annotated, Optional, Literal
import operator

load_dotenv()

llm = ChatOpenAI(model="gpt-4o-mini")
print("Environment ready")
python
Environment ready

Prerequisites

  • Python version: 3.10+
  • Required libraries: langchain-openai (0.2+), langgraph (0.2+), pydantic (2.0+), python-dotenv
  • Install: pip install langchain-openai langgraph pydantic python-dotenv
  • API Key: Set OPENAI_API_KEY in a .env file
  • Time to complete: 25-30 minutes

How Do Pydantic Models Work as Output Schemas?

It all starts with Pydantic. You write a class where each field has a name, a type, and a short hint. That hint is not just for you — it goes to the LLM as part of the prompt and shapes how well it fills in each field.

Here is a schema for pulling company info from raw text. The description strings act as mini-instructions for the model.

python
class CompanyInfo(BaseModel):
    """Extract structured company information from text."""
    name: str = Field(description="Official company name")
    industry: str = Field(description="Primary industry sector")
    employee_count: Optional[int] = Field(
        default=None,
        description="Number of employees, if mentioned"
    )
    headquarters: str = Field(
        description="City and country of headquarters"
    )
    key_products: list[str] = Field(
        description="Main products or services, max 5"
    )

print(CompanyInfo.model_json_schema())
python
{'description': 'Extract structured company information from text.', 'properties': {'name': {'description': 'Official company name', 'title': 'Name', 'type': 'string'}, 'industry': {'description': 'Primary industry sector', 'title': 'Industry', 'type': 'string'}, 'employee_count': {'anyOf': [{'type': 'integer'}, {'type': 'null'}], 'default': None, 'description': 'Number of employees, if mentioned', 'title': 'Employee Count'}, 'headquarters': {'description': 'City and country of headquarters', 'title': 'Headquarters', 'type': 'string'}, 'key_products': {'description': 'Main products or services, max 5', 'items': {'type': 'string'}, 'title': 'Key Products', 'type': 'array'}}, 'required': ['name', 'industry', 'headquarters', 'key_products'], 'title': 'CompanyInfo', 'type': 'object'}

That JSON goes straight to the LLM’s API. The model reads each hint and tries to fill the fields to match. Use vague text like description="company info" and the output will be all over the place. Clear, sharp hints are a must.

KEY INSIGHT: Field hints are prompt engineering in disguise. The LLM reads them as instructions. Clear text gives clean, steady output. Vague text gives chaos.

How Do You Use with_structured_output()?

The fastest way to get typed data is .with_structured_output(). Pass in your Pydantic model and it handles format rules, parsing, and type fixes for you.

What goes on under the hood? LangChain turns your class into a function spec and sends it through the tool-calling API. The model replies with arguments that map to your fields. LangChain grabs them and builds a Pydantic object.

python
structured_llm = llm.with_structured_output(CompanyInfo)

result = structured_llm.invoke(
    "Tesla was founded by Elon Musk and is headquartered in "
    "Austin, Texas, USA. The electric vehicle company has about "
    "140,000 employees and makes the Model 3, Model Y, Model S, "
    "Model X, and Cybertruck."
)

print(f"Name: {result.name}")
print(f"Industry: {result.industry}")
print(f"Employees: {result.employee_count}")
print(f"HQ: {result.headquarters}")
print(f"Products: {result.key_products}")

The result looks like this:

python
Name: Tesla
Industry: Electric Vehicles
Employees: 140000
HQ: Austin, Texas, USA
Products: ['Model 3', 'Model Y', 'Model S', 'Model X', 'Cybertruck']

The return value is not a string. It is a real CompanyInfo object. You grab .name, .industry, or any field with dot access. No parsing. No regex. No json.loads().
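Because it is a standard Pydantic object, all the usual model methods work on it. Here is a quick local sketch with a hand-built CompanyInfo (values typed in by hand, no LLM call), restating the model so the snippet runs on its own:

```python
from typing import Optional
from pydantic import BaseModel, Field

class CompanyInfo(BaseModel):
    """Extract structured company information from text."""
    name: str = Field(description="Official company name")
    industry: str = Field(description="Primary industry sector")
    employee_count: Optional[int] = Field(default=None)
    headquarters: str = Field(description="City and country of headquarters")
    key_products: list[str] = Field(description="Main products or services")

# Hand-built for illustration; in the article this object
# comes back from structured_llm.invoke() instead.
info = CompanyInfo(
    name="Tesla",
    industry="Electric Vehicles",
    employee_count=140000,
    headquarters="Austin, Texas, USA",
    key_products=["Model 3", "Model Y"],
)

print(info.name)              # plain attribute access
print(info.model_dump())      # dict, ready for your app
print(info.model_dump_json()) # JSON string, ready for an API response
```

`model_dump()` gives you a dict for downstream functions; `model_dump_json()` gives a serialized string, no `json.dumps()` needed.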

TIP: Pass include_raw=True when you need to debug. You get the Pydantic object plus the raw LLM reply, so you can see what the model sent before parsing ran.

When Does Validation Go Beyond Type Checking?

Type checks alone are not enough. What if the model says “Q7” for a quarter? Or gives a negative revenue? Both have the right type but break your rules.

Pydantic’s @field_validator fixes this. You write short checks that run when a field is set. Bad value? The check raises an error. Here is a schema with two such checks.

python
class FinancialReport(BaseModel):
    """Extract quarterly financial data from earnings text."""
    company: str = Field(description="Company name")
    quarter: str = Field(
        description="Quarter, e.g. Q1, Q2, Q3, Q4"
    )
    year: int = Field(description="Fiscal year")
    revenue_millions: float = Field(
        description="Revenue in millions of dollars"
    )
    profit_millions: float = Field(
        description="Net profit in millions of dollars"
    )

    @field_validator("quarter")
    @classmethod
    def validate_quarter(cls, v):
        valid = {"Q1", "Q2", "Q3", "Q4"}
        if v not in valid:
            raise ValueError(
                f"Quarter must be one of {valid}, got '{v}'"
            )
        return v

    @field_validator("revenue_millions")
    @classmethod
    def validate_revenue(cls, v):
        if v < 0:
            raise ValueError(
                f"Revenue cannot be negative, got {v}"
            )
        return v

print("FinancialReport schema with validators defined")
python
FinancialReport schema with validators defined

What happens when bad data hits these validators? Let me test with an invalid quarter.

python
try:
    bad_report = FinancialReport(
        company="Test Corp",
        quarter="Q7",
        year=2025,
        revenue_millions=100.0,
        profit_millions=10.0,
    )
except Exception as e:
    print(f"Validation error: {e}")

The validator blocks it right away:

python
Validation error: 1 validation error for FinancialReport
quarter
  Value error, Quarter must be one of {'Q1', 'Q2', 'Q3', 'Q4'}, got 'Q7' [type=value_error, input_value='Q7', url=https://errors.pydantic.dev/2.11/v/value_error]

The check did its job — “Q7” got blocked. But in a LangGraph agent, you do not want the whole thing to stop. You want the agent to catch the error, show it to the LLM, and ask for a fix.

That loop — extract, check, feed back the error, retry — is the core pattern. Let me show you how to build it.

How Do You Build a Self-Correcting Extraction Node?

The idea is simple. A node calls the LLM for data. If Pydantic says “good,” we save it and move on. If not, we log the error, show it to the LLM, and try again. A conditional edge loops until the data is valid or the tries run out.

Three parts make the graph work.

Extract node: sends text to the LLM with your schema. Clean parse? Store the data. Bad data? Log the error and bump the retry count.

Router: checks the state and picks a path. Pass goes to output. Fail with tries left loops back. Fail with no tries left goes to the fallback.

Fallback node: the safety net. If the LLM still cannot get it right, this node hands back a partial result or a clear error so your app does not crash.

python
class ExtractionState(TypedDict):
    messages: Annotated[list, operator.add]
    extraction_result: Optional[dict]
    validation_error: Optional[str]
    retry_count: int
    max_retries: int

All the info the loop needs lives in this state: the chat so far, the parsed data (if it worked), the last error (if it did not), and two counters for retries.
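The Annotated[list, operator.add] part does quiet but important work: it tells LangGraph to merge each node's returned messages into the existing list instead of overwriting it. The reducer is plain list concatenation, which you can check locally:

```python
import operator

history = [{"role": "user", "content": "Extract the data."}]
update = [{"role": "assistant", "content": "Extraction failed: bad quarter"}]

# What LangGraph does with an Annotated[list, operator.add] channel:
# combine the old channel value and the node's return with the reducer.
merged = operator.add(history, update)
print(len(merged))  # 2, appended rather than replaced
```

This is why each node returns only its new messages; the error feedback from a failed attempt stays in the history for the next try.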

Now look at the extract node. The key trick on retries: it slips the error into the message list. The LLM reads it, sees what broke, and can aim its fix. Skip this step and a retry is just a coin flip.

python
def extraction_node(state: ExtractionState):
    """Try to extract structured data, with error feedback on retries."""
    messages = state["messages"]
    retry_count = state.get("retry_count", 0)

    if state.get("validation_error"):
        error_msg = state["validation_error"]
        feedback = HumanMessage(
            content=(
                f"Your previous response failed validation: "
                f"{error_msg}\n\n"
                f"Please fix the errors and try again. "
                f"Return valid structured data."
            )
        )
        messages = messages + [feedback]

    try:
        structured_llm = llm.with_structured_output(
            FinancialReport
        )
        result = structured_llm.invoke(messages)
        return {
            "messages": [
                AIMessage(
                    content=f"Extracted: {result.model_dump_json()}"
                )
            ],
            "extraction_result": result.model_dump(),
            "validation_error": None,
            "retry_count": retry_count,
        }
    except Exception as e:
        return {
            "messages": [
                AIMessage(
                    content=f"Extraction failed: {str(e)}"
                )
            ],
            "extraction_result": None,
            "validation_error": str(e),
            "retry_count": retry_count + 1,
        }

print("Extraction node defined")
python
Extraction node defined

Two jobs in one node. It asks the LLM for data. And when a past try failed, it feeds the error back so the model can fix the right thing. That back-and-forth is what drives self-correction.

WARNING: Always cap retries. With no limit, the loop spins until LangGraph hits its own cap and throws a cryptic error. Three tries is a safe default. If three is not enough, the schema or prompt needs a rethink.

How Do You Wire the Self-Correcting Graph?

The router asks three things: did we get good data? Do we have tries left? Or are we out? Then it picks one of three paths.

python
def route_extraction(
    state: ExtractionState,
) -> Literal["extract", "output", "fallback"]:
    """Route based on extraction success or failure."""
    if state.get("extraction_result") is not None:
        return "output"
    if state.get("retry_count", 0) >= state.get("max_retries", 3):
        return "fallback"
    return "extract"


def output_node(state: ExtractionState):
    """Process successful extraction."""
    result = state["extraction_result"]
    summary = (
        f"Successfully extracted: {result['company']} "
        f"{result['quarter']} {result['year']} — "
        f"Revenue: ${result['revenue_millions']}M, "
        f"Profit: ${result['profit_millions']}M"
    )
    return {"messages": [AIMessage(content=summary)]}


def fallback_node(state: ExtractionState):
    """Handle extraction failure after max retries."""
    retries = state.get("retry_count", 0)
    last_error = state.get("validation_error", "Unknown")
    return {
        "messages": [
            AIMessage(
                content=(
                    f"Extraction failed after {retries} attempts. "
                    f"Last error: {last_error}. "
                    f"Returning empty result."
                )
            )
        ],
        "extraction_result": {},
    }

print("Routing and handler nodes defined")
python
Routing and handler nodes defined
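Because the router is a pure function over the state dict, you can sanity-check all three paths without a single LLM call. Restating route_extraction here so the snippet runs on its own:

```python
from typing import Literal

def route_extraction(state) -> Literal["extract", "output", "fallback"]:
    """Route based on extraction success or failure."""
    if state.get("extraction_result") is not None:
        return "output"
    if state.get("retry_count", 0) >= state.get("max_retries", 3):
        return "fallback"
    return "extract"

# Success: parsed data present, go to output
print(route_extraction({"extraction_result": {"company": "Apple"}}))
# Failure with tries left: loop back to extract
print(route_extraction({"extraction_result": None, "retry_count": 1, "max_retries": 3}))
# Failure with no tries left: go to fallback
print(route_extraction({"extraction_result": None, "retry_count": 3, "max_retries": 3}))
```

Cheap unit tests like these catch routing bugs long before you pay for tokens.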

Now put it all together. The conditional edge after the extract node is the key part — it turns a one-shot call into a loop that heals itself.

python
graph_builder = StateGraph(ExtractionState)

graph_builder.add_node("extract", extraction_node)
graph_builder.add_node("output", output_node)
graph_builder.add_node("fallback", fallback_node)

graph_builder.add_edge(START, "extract")
graph_builder.add_conditional_edges(
    "extract",
    route_extraction,
    {
        "extract": "extract",
        "output": "output",
        "fallback": "fallback",
    },
)
graph_builder.add_edge("output", END)
graph_builder.add_edge("fallback", END)

extraction_graph = graph_builder.compile()
print("Self-correcting extraction graph compiled")
python
Self-correcting extraction graph compiled

Let me feed it a real earnings blurb. The graph will try to parse the numbers, run them through the Pydantic validators, and circle back if anything is wrong.

python
test_input = {
    "messages": [
        SystemMessage(
            content="Extract financial data from the user's text."
        ),
        HumanMessage(
            content=(
                "Apple reported Q2 2025 earnings yesterday. "
                "Revenue came in at $94.8 billion for the quarter, "
                "with net profit of $24.2 billion."
            )
        ),
    ],
    "extraction_result": None,
    "validation_error": None,
    "retry_count": 0,
    "max_retries": 3,
}

result = extraction_graph.invoke(test_input)
print(f"\nFinal result: {result['extraction_result']}")
print(f"Retries used: {result['retry_count']}")

Here is what comes back:

python
Final result: {'company': 'Apple', 'quarter': 'Q2', 'year': 2025, 'revenue_millions': 94800.0, 'profit_millions': 24200.0}
Retries used: 0

Zero retries — it got it right the first time. But the safety net is there for when it does not.

KEY INSIGHT: Self-correction needs three things: data extraction, error feedback, and a retry loop. Drop any one and it breaks. No extraction means raw text. No feedback means the LLM repeats the same mistake. No loop means you crash on the first failure.

Exercise 1: Add a Validator and Test the Retry Loop

You have seen @field_validator catch bad data and how the extraction node feeds errors back. Now try it yourself.

Add a @field_validator to FinancialReport that rejects years before 2000 or after 2030. Then invoke the extraction graph with text that mentions a plausible but invalid year (like “fiscal year 1998”) to see the retry loop in action.

Hint 1

Add a new `@field_validator("year")` method that raises `ValueError` if the year is outside the 2000-2030 range.

Hint 2 (nearly the answer)
python
@field_validator("year")
@classmethod
def validate_year(cls, v):
    if v < 2000 or v > 2030:
        raise ValueError(f"Year must be 2000-2030, got {v}")
    return v

Then test with input text like: “Acme Corp reported Q3 1998 revenue of $50 million with $5 million profit.”

Solution
python
class FinancialReportStrict(BaseModel):
    """Extract quarterly financial data with strict year validation."""
    company: str = Field(description="Company name")
    quarter: str = Field(description="Quarter: Q1, Q2, Q3, Q4")
    year: int = Field(description="Fiscal year (2000-2030)")
    revenue_millions: float = Field(description="Revenue in millions USD")
    profit_millions: float = Field(description="Net profit in millions USD")

    @field_validator("quarter")
    @classmethod
    def validate_quarter(cls, v):
        valid = {"Q1", "Q2", "Q3", "Q4"}
        if v not in valid:
            raise ValueError(f"Quarter must be one of {valid}, got '{v}'")
        return v

    @field_validator("year")
    @classmethod
    def validate_year(cls, v):
        if v < 2000 or v > 2030:
            raise ValueError(f"Year must be 2000-2030, got {v}")
        return v

    @field_validator("revenue_millions")
    @classmethod
    def validate_revenue(cls, v):
        if v < 0:
            raise ValueError(f"Revenue cannot be negative, got {v}")
        return v

The retry loop catches the `ValueError`, sends the error text to the LLM, and the model adjusts. The message tells the model exactly what range is allowed.
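You can watch the new validator fire without involving the LLM at all. A cut-down model (hypothetical name YearOnly) keeps the check isolated:

```python
from pydantic import BaseModel, Field, ValidationError, field_validator

class YearOnly(BaseModel):
    # Hypothetical minimal model, just to exercise the year rule
    year: int = Field(description="Fiscal year (2000-2030)")

    @field_validator("year")
    @classmethod
    def validate_year(cls, v):
        if v < 2000 or v > 2030:
            raise ValueError(f"Year must be 2000-2030, got {v}")
        return v

try:
    YearOnly(year=1998)
except ValidationError as e:
    error_text = str(e)
    # This exact text is what the extraction node would feed back to the LLM
    print(error_text)
```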

How Do You Handle Multiple Schemas with Dynamic Routing?

What if your agent sees many kinds of input? A ticket system might get bug reports, feature asks, and billing questions — each needs its own schema.

The fix: classify first, then route to the right schema. One cheap label call saves you from costly wrong-schema failures.

python
class BugReport(BaseModel):
    """Extract bug report details."""
    title: str = Field(description="Short bug title")
    severity: str = Field(
        description="Bug severity: critical, high, medium, low"
    )
    steps_to_reproduce: list[str] = Field(
        description="Steps that trigger the bug"
    )
    expected_behavior: str = Field(
        description="What should happen instead"
    )

    @field_validator("severity")
    @classmethod
    def validate_severity(cls, v):
        valid = {"critical", "high", "medium", "low"}
        if v.lower() not in valid:
            raise ValueError(
                f"Severity must be one of {valid}, got '{v}'"
            )
        return v.lower()


class FeatureRequest(BaseModel):
    """Extract feature request details."""
    title: str = Field(description="Short feature title")
    priority: str = Field(
        description="Priority: must-have, nice-to-have, future"
    )
    use_case: str = Field(
        description="Why the user needs this feature"
    )
    suggested_solution: Optional[str] = Field(
        default=None,
        description="User's suggested implementation, if any"
    )

print("Support ticket schemas defined")
python
Support ticket schemas defined

The classifier reads the ticket and picks the right schema. A plain .invoke() call is all it needs — no structured output for a simple label.

python
class TicketState(TypedDict):
    messages: Annotated[list, operator.add]
    ticket_type: Optional[str]
    extraction_result: Optional[dict]
    validation_error: Optional[str]
    retry_count: int
    max_retries: int


def classify_ticket(state: TicketState):
    """Classify the ticket type before extraction."""
    messages = state["messages"]
    classifier_prompt = SystemMessage(
        content=(
            "Classify this support ticket as exactly one of: "
            "'bug_report' or 'feature_request'. "
            "Reply with just the classification, nothing else."
        )
    )
    response = llm.invoke([classifier_prompt] + messages)
    ticket_type = response.content.strip().lower()

    if "bug" in ticket_type:
        ticket_type = "bug_report"
    else:
        ticket_type = "feature_request"

    return {
        "messages": [
            AIMessage(content=f"Classified as: {ticket_type}")
        ],
        "ticket_type": ticket_type,
    }

print("Classifier node defined")
python
Classifier node defined

TIP: Always classify before you extract. The wrong schema wastes tokens and retries. One cheap label call saves costly failures later.

Each extract node uses its own schema and its own retry logic. The shape mirrors the single-schema pattern from before.

python
def extract_bug_report(state: TicketState):
    """Extract structured bug report data."""
    messages = state["messages"]

    if state.get("validation_error"):
        messages = messages + [
            HumanMessage(
                content=f"Fix this error: {state['validation_error']}"
            )
        ]

    try:
        structured = llm.with_structured_output(BugReport)
        result = structured.invoke(messages)
        return {
            "messages": [
                AIMessage(content="Bug report extracted")
            ],
            "extraction_result": result.model_dump(),
            "validation_error": None,
            "retry_count": state.get("retry_count", 0),
        }
    except Exception as e:
        return {
            "messages": [
                AIMessage(content=f"Bug extraction failed: {e}")
            ],
            "extraction_result": None,
            "validation_error": str(e),
            "retry_count": state.get("retry_count", 0) + 1,
        }


def extract_feature_request(state: TicketState):
    """Extract structured feature request data."""
    messages = state["messages"]

    if state.get("validation_error"):
        messages = messages + [
            HumanMessage(
                content=f"Fix this error: {state['validation_error']}"
            )
        ]

    try:
        structured = llm.with_structured_output(FeatureRequest)
        result = structured.invoke(messages)
        return {
            "messages": [
                AIMessage(content="Feature request extracted")
            ],
            "extraction_result": result.model_dump(),
            "validation_error": None,
            "retry_count": state.get("retry_count", 0),
        }
    except Exception as e:
        return {
            "messages": [
                AIMessage(
                    content=f"Feature extraction failed: {e}"
                )
            ],
            "extraction_result": None,
            "validation_error": str(e),
            "retry_count": state.get("retry_count", 0) + 1,
        }

print("Schema-specific extraction nodes defined")
python
Schema-specific extraction nodes defined

How Do You Build the Multi-Schema Graph?

Now wire it all into one graph. The conditional edges do double duty: first they pick the schema, then they handle retry logic.

python
def route_to_extractor(
    state: TicketState,
) -> Literal["extract_bug", "extract_feature"]:
    """Route to the correct extraction node."""
    if state.get("ticket_type") == "bug_report":
        return "extract_bug"
    return "extract_feature"


def route_after_extraction(
    state: TicketState,
) -> Literal["extract_bug", "extract_feature", "done"]:
    """Check extraction result and decide next step."""
    if state.get("extraction_result") is not None:
        return "done"
    if state.get("retry_count", 0) >= state.get("max_retries", 3):
        return "done"
    if state.get("ticket_type") == "bug_report":
        return "extract_bug"
    return "extract_feature"


def done_node(state: TicketState):
    """Final output node."""
    result = state.get("extraction_result", {})
    if result:
        return {
            "messages": [AIMessage(content=f"Final: {result}")],
        }
    return {
        "messages": [
            AIMessage(
                content="Extraction failed after all retries."
            )
        ],
    }

print("Routing functions and done node defined")
python
Routing functions and done node defined
python
ticket_graph_builder = StateGraph(TicketState)

ticket_graph_builder.add_node("classify", classify_ticket)
ticket_graph_builder.add_node("extract_bug", extract_bug_report)
ticket_graph_builder.add_node(
    "extract_feature", extract_feature_request
)
ticket_graph_builder.add_node("done", done_node)

ticket_graph_builder.add_edge(START, "classify")
ticket_graph_builder.add_conditional_edges(
    "classify",
    route_to_extractor,
    {
        "extract_bug": "extract_bug",
        "extract_feature": "extract_feature",
    },
)
ticket_graph_builder.add_conditional_edges(
    "extract_bug",
    route_after_extraction,
    {"extract_bug": "extract_bug", "done": "done"},
)
ticket_graph_builder.add_conditional_edges(
    "extract_feature",
    route_after_extraction,
    {"extract_feature": "extract_feature", "done": "done"},
)
ticket_graph_builder.add_edge("done", END)

ticket_graph = ticket_graph_builder.compile()
print("Multi-schema ticket graph compiled")
python
Multi-schema ticket graph compiled

Let me test it with a bug report.

python
bug_input = {
    "messages": [
        HumanMessage(
            content=(
                "The export button is broken. When I click it on "
                "the dashboard page, nothing happens. No file "
                "downloads. It used to work last week. I need "
                "this fixed ASAP because I can't generate "
                "monthly reports."
            )
        ),
    ],
    "ticket_type": None,
    "extraction_result": None,
    "validation_error": None,
    "retry_count": 0,
    "max_retries": 3,
}

result = ticket_graph.invoke(bug_input)
print(f"Type: {result['ticket_type']}")
print(f"Result: {result['extraction_result']}")
print(f"Retries: {result['retry_count']}")

You will see output like this:

python
Type: bug_report
Result: {'title': 'Export button not working on dashboard', 'severity': 'critical', 'steps_to_reproduce': ['Go to the dashboard page', 'Click the export button', 'Observe that nothing happens and no file downloads'], 'expected_behavior': 'A file should download containing the monthly report data'}
Retries: 0

It tagged the ticket as a bug, sent it to the right node, and got a clean BugReport back. The severity check made sure “critical” is in the allowed set.

How Do You Handle Nested Pydantic Models for Complex Data?

Real data is rarely flat. An invoice has line items. A paper has authors with their own fields. For layered data, nest your Pydantic models.

The LLM reads the full nested schema and fills every level in one shot. No need to extract each layer on its own.

python
class LineItem(BaseModel):
    """A single line item in an invoice."""
    description: str = Field(description="Item description")
    quantity: int = Field(description="Number of units")
    unit_price: float = Field(
        description="Price per unit in dollars"
    )
    total: float = Field(
        description="Line total (quantity * unit_price)"
    )

    @field_validator("quantity")
    @classmethod
    def validate_quantity(cls, v):
        if v <= 0:
            raise ValueError(
                f"Quantity must be positive, got {v}"
            )
        return v


class Invoice(BaseModel):
    """Extract structured invoice data."""
    vendor: str = Field(description="Vendor/seller name")
    invoice_number: str = Field(
        description="Invoice ID or number"
    )
    date: str = Field(
        description="Invoice date in YYYY-MM-DD format"
    )
    line_items: list[LineItem] = Field(
        description="Individual line items on the invoice"
    )
    subtotal: float = Field(description="Sum before tax")
    tax_rate: float = Field(
        description="Tax rate as a percentage"
    )
    total: float = Field(
        description="Final total including tax"
    )

    @field_validator("tax_rate")
    @classmethod
    def validate_tax_rate(cls, v):
        if v < 0 or v > 100:
            raise ValueError(
                f"Tax rate must be 0-100%, got {v}"
            )
        return v

print("Invoice schema with nested LineItem model defined")
python
Invoice schema with nested LineItem model defined

Let me extract from a realistic invoice.

python
invoice_text = """
Invoice #INV-2025-0847
From: CloudStack Solutions
Date: March 5, 2025

Items:
- 3x GPU Instance (A100) at $4.50/hr for 720 hours = $9,720.00
- 1x Storage (5TB) at $0.023/GB/mo = $115.00
- 2x Load Balancer at $25.00/mo = $50.00

Subtotal: $9,885.00
Tax (8.5%): $840.23
Total: $10,725.23
"""

structured_llm = llm.with_structured_output(Invoice)
invoice = structured_llm.invoke(
    f"Extract invoice data:\n\n{invoice_text}"
)

print(f"Vendor: {invoice.vendor}")
print(f"Invoice #: {invoice.invoice_number}")
print(f"Items: {len(invoice.line_items)}")
for item in invoice.line_items:
    print(
        f"  - {item.description}: "
        f"{item.quantity} x ${item.unit_price} = ${item.total}"
    )
print(f"Total: ${invoice.total}")

Here is what comes out:

python
Vendor: CloudStack Solutions
Invoice #: INV-2025-0847
Items: 3
  - GPU Instance (A100): 3 x $4.5 = $9720.0
  - Storage (5TB): 1 x $0.023 = $115.0
  - Load Balancer: 2 x $25.0 = $50.0
Total: $10725.23

Three line items, each with its own checked fields, all from one call. The checks on both LineItem and Invoice guard data at every level.

KEY INSIGHT: Nested models let you pull layered data in one LLM call. Spell out the full schema and the model fills all levels at once — no need for many rounds.
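Validators on the inner model fire too, and Pydantic reports the exact path to the bad field. A standalone sketch with cut-down versions of the two models:

```python
from pydantic import BaseModel, ValidationError, field_validator

class LineItem(BaseModel):
    # Cut down to two fields for illustration
    description: str
    quantity: int

    @field_validator("quantity")
    @classmethod
    def validate_quantity(cls, v):
        if v <= 0:
            raise ValueError(f"Quantity must be positive, got {v}")
        return v

class Invoice(BaseModel):
    vendor: str
    line_items: list[LineItem]

try:
    Invoice(
        vendor="CloudStack Solutions",
        line_items=[{"description": "GPU Instance", "quantity": 0}],
    )
except ValidationError as e:
    error_text = str(e)
    # The error names the nested location: line_items.0.quantity
    print(error_text)
```

That dotted path (list index included) makes nested failures easy for the LLM to fix on retry, because the feedback says exactly which item broke.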

Exercise 2: Build a Multi-Schema Extraction Pipeline

You have seen the classify-then-extract pattern. Now build one from scratch.

Create two Pydantic schemas: MeetingNotes (with fields: title, date, attendees list, action_items list, decisions list) and StatusUpdate (with fields: project_name, status as "on_track"/"at_risk"/"blocked", completed_tasks list, blockers list). Add a validator that rejects invalid status values.

Then build a LangGraph pipeline that classifies incoming text as either “meeting_notes” or “status_update” and routes to the right extractor.

Hint 1

Follow the same layout as the ticket system: build a `classify_node` that uses plain LLM invoke, then a routing function that sends to either `extract_meeting` or `extract_status` based on the label.

Hint 2 (nearly the answer)
python
class MeetingNotes(BaseModel):
    title: str = Field(description="Meeting title or topic")
    date: str = Field(description="Meeting date in YYYY-MM-DD format")
    attendees: list[str] = Field(description="List of attendee names")
    action_items: list[str] = Field(description="Action items assigned")
    decisions: list[str] = Field(description="Key decisions made")

class StatusUpdate(BaseModel):
    project_name: str = Field(description="Project name")
    status: str = Field(description="Status: on_track, at_risk, or blocked")
    completed_tasks: list[str] = Field(description="Tasks completed this period")
    blockers: list[str] = Field(description="Current blockers, if any")

    @field_validator("status")
    @classmethod
    def validate_status(cls, v):
        valid = {"on_track", "at_risk", "blocked"}
        if v.lower() not in valid:
            raise ValueError(f"Status must be one of {valid}")
        return v.lower()

Wire the graph just like the ticket system — classify, route, extract, done.

Solution
python
# Full solution follows the TicketState pattern:
# 1. Define MeetingNotes and StatusUpdate schemas (above)
# 2. Create DocState TypedDict with doc_type, extraction_result, etc.
# 3. Build classify_doc, extract_meeting, extract_status nodes
# 4. Wire with conditional edges: classify -> route -> extract -> done
# The key insight: the classify step prevents wrong-schema extraction
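One piece of that wiring you can test right away is the router. The names DocState and doc_type follow the sketch above; treat them as one reasonable layout, not the only one:

```python
import operator
from typing import Annotated, Literal, Optional, TypedDict

class DocState(TypedDict):
    messages: Annotated[list, operator.add]
    doc_type: Optional[str]
    extraction_result: Optional[dict]
    validation_error: Optional[str]
    retry_count: int
    max_retries: int

def route_doc(state) -> Literal["extract_meeting", "extract_status"]:
    """Send meeting notes and status updates to their own extractors."""
    if state.get("doc_type") == "meeting_notes":
        return "extract_meeting"
    return "extract_status"

# Pure-function check, no graph or LLM needed:
print(route_doc({"doc_type": "meeting_notes"}))  # extract_meeting
print(route_doc({"doc_type": "status_update"}))  # extract_status
```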

What Are the Most Common Mistakes?

Mistake 1: Leaving out field descriptions

Wrong:

python
class UserProfile(BaseModel):
    name: str
    age: int
    city: str

Why it fails: With no Field(description=...), the LLM guesses. “city” could mean birth city, home city, or dream city. You get random results.

Right:

python
class UserProfile(BaseModel):
    name: str = Field(description="User's full legal name")
    age: int = Field(description="User's current age in years")
    city: str = Field(
        description="User's current city of residence"
    )

Mistake 2: No cap on self-correcting loops

Wrong:

python
def route(state):
    if state["extraction_result"]:
        return "done"
    return "extract"  # Loops forever on persistent errors!

Why it fails: When the LLM keeps failing, this loops until LangGraph hits its own recursion limit (25 steps by default) and raises a GraphRecursionError — a confusing failure far from the real cause.

Right:

python
def route(state):
    if state["extraction_result"]:
        return "done"
    if state["retry_count"] >= state["max_retries"]:
        return "fallback"
    return "extract"
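
To see why the cap matters, you can trace the corrected router by hand. This is a pure-Python sketch (no LangGraph needed) that walks a permanently failing extraction through the loop:

```python
def route(state):
    """Same routing logic as above: succeed, give up, or retry."""
    if state["extraction_result"]:
        return "done"
    if state["retry_count"] >= state["max_retries"]:
        return "fallback"
    return "extract"

# Simulate an extraction that never succeeds.
state = {"extraction_result": None, "retry_count": 0, "max_retries": 3}
path = []
while True:
    step = route(state)
    path.append(step)
    if step != "extract":
        break
    state["retry_count"] += 1  # each failed attempt bumps the counter

print(path)  # ['extract', 'extract', 'extract', 'fallback']
```

Three attempts, then a clean handoff to the fallback node instead of an infinite loop.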

Mistake 3: Swallowing errors instead of feeding them back

Wrong:

python
except Exception as e:
    return {"retry_count": state["retry_count"] + 1}

Why it fails: The LLM has no clue what went wrong. It will likely make the same mistake. The retry is just a coin flip.

Right:

python
except Exception as e:
    return {
        "validation_error": str(e),
        "retry_count": state["retry_count"] + 1,
    }

Then put that error into the next prompt so the LLM can fix the exact issue.
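
One way to build that follow-up message — a sketch only; the wording of the template is a reasonable choice, not from any library:

```python
def build_feedback(error: str) -> dict:
    """Turn a validation error into a corrective follow-up message."""
    return {
        "role": "user",
        "content": (
            f"Your previous response failed validation: {error}\n"
            "Fix exactly these errors and respond again."
        ),
    }

# Append this to the message list before the next structured invoke.
msg = build_feedback("Status must be one of {'on_track', 'at_risk', 'blocked'}")
print(msg["content"])
```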

How Do You Add Confidence Scoring and Conditional Review?

Pass or fail is sometimes too blunt. You may want the LLM to say how sure it is. Then you send low scores to a human and let high scores go through.

Add a confidence field to your schema. Route high scores to output and low scores to a review queue.

python
class ExtractedEntity(BaseModel):
    """Extract a named entity with confidence scoring."""
    entity_name: str = Field(
        description="The extracted entity"
    )
    entity_type: str = Field(
        description="Type: person, organization, location, event"
    )
    confidence: float = Field(
        description=(
            "Your confidence in this extraction, 0.0 to 1.0. "
            "Use 0.9+ when text is explicit. "
            "Use 0.5-0.8 when inferring. "
            "Use below 0.5 when guessing."
        )
    )
    evidence: str = Field(
        description="Quote from text supporting this extraction"
    )

    @field_validator("confidence")
    @classmethod
    def validate_confidence(cls, v):
        if v < 0.0 or v > 1.0:
            raise ValueError(
                f"Confidence must be 0.0-1.0, got {v}"
            )
        return round(v, 2)

    @field_validator("entity_type")
    @classmethod
    def validate_entity_type(cls, v):
        valid = {"person", "organization", "location", "event"}
        if v.lower() not in valid:
            raise ValueError(
                f"entity_type must be one of {valid}"
            )
        return v.lower()


structured_llm = llm.with_structured_output(ExtractedEntity)

clear_result = structured_llm.invoke(
    "Extract the main entity: "
    "Microsoft CEO Satya Nadella announced the partnership."
)
print(f"Entity: {clear_result.entity_name}")
print(f"Type: {clear_result.entity_type}")
print(f"Confidence: {clear_result.confidence}")
print(f"Evidence: {clear_result.evidence}")

The LLM reports high confidence for clear mentions:

python
Entity: Satya Nadella
Type: person
Confidence: 0.95
Evidence: Microsoft CEO Satya Nadella announced the partnership.

Send anything below 0.7 to a human. Let high scores pass on their own.

TIP: Test your thresholds on real data first. LLMs tend to be too sure of themselves. Run 50+ samples to find the score where self-rated confidence lines up with real accuracy.
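
The threshold routing itself is tiny. A sketch — the 0.7 cutoff is the suggestion above, and you should tune it on your own data:

```python
def route_by_confidence(entity: dict, threshold: float = 0.7) -> str:
    """Auto-accept confident extractions; queue the rest for a human."""
    if entity["confidence"] >= threshold:
        return "auto_accept"
    return "human_review"

print(route_by_confidence({"confidence": 0.95}))  # auto_accept
print(route_by_confidence({"confidence": 0.55}))  # human_review
```

In a LangGraph pipeline this becomes the routing function on a conditional edge, with `auto_accept` wired to the output node and `human_review` to a review queue.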

When Should You NOT Use Structured Output?

It is strong, but not always the right tool. Some tasks do better with plain text.

Creative work. Poems, brainstorm lists, draft emails — forcing structure kills flow. Let the LLM write freely.

Simple yes/no tasks. A Pydantic model with one bool field is overkill. A plain prompt does the job.

Fuzzy input. When the source text is too vague to fill fields well, you burn retries for nothing. Just ask for a plain text summary.

Huge schemas (20+ fields). Big schemas push what LLMs can fill in one go. Split them into smaller models and extract in steps.

WARNING: Do not lean on structured output when your needs are vague. If you can not spell out the schema, the LLM can not fill it well either. Nail down your needs before you write the model.

How Do You Put It All Together in a Production Pipeline?

Time to put it all in one place. This pipeline takes job postings, pulls out data with nested models, checks it, and retries on failure.

JobPosting uses a nested Salary model. The @field_validator on required_skills caps the list at 10. This is the kind of rule that type checks alone can not catch.

python
class Salary(BaseModel):
    """Salary information."""
    min_amount: Optional[int] = Field(
        default=None,
        description="Minimum salary in USD per year"
    )
    max_amount: Optional[int] = Field(
        default=None,
        description="Maximum salary in USD per year"
    )
    currency: str = Field(
        default="USD", description="Currency code"
    )


class JobPosting(BaseModel):
    """Extract structured data from a job posting."""
    title: str = Field(description="Job title")
    company: str = Field(description="Hiring company name")
    location: str = Field(
        description="Job location or 'Remote'"
    )
    experience_years: Optional[int] = Field(
        default=None,
        description="Required years of experience"
    )
    salary: Optional[Salary] = Field(
        default=None,
        description="Salary range if mentioned"
    )
    required_skills: list[str] = Field(
        description="Required technical skills, max 10"
    )
    is_remote: bool = Field(
        description="True if position allows remote work"
    )

    @field_validator("required_skills")
    @classmethod
    def validate_skills(cls, v):
        if len(v) > 10:
            raise ValueError(
                f"Max 10 required skills, got {len(v)}"
            )
        return v

print("JobPosting schema with nested Salary defined")
python
JobPosting schema with nested Salary defined
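
To confirm the skill cap actually fires, you can trip the validator directly. This check redefines a minimal stand-in model (`SkillsOnly` is a made-up name) carrying only the `required_skills` rule, so the snippet runs on its own:

```python
from pydantic import BaseModel, Field, ValidationError, field_validator

class SkillsOnly(BaseModel):
    """Minimal stand-in for JobPosting's required_skills rule."""
    required_skills: list[str] = Field(description="Required technical skills, max 10")

    @field_validator("required_skills")
    @classmethod
    def validate_skills(cls, v):
        if len(v) > 10:
            raise ValueError(f"Max 10 required skills, got {len(v)}")
        return v

# Ten or fewer skills pass; eleven trips the validator.
try:
    SkillsOnly(required_skills=[f"skill_{i}" for i in range(11)])
except ValidationError as e:
    print("rejected:", e.errors()[0]["msg"])  # mentions "Max 10 required skills"
```

This is the same error string the retry loop feeds back to the LLM when the extraction overshoots the cap.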

The graph follows the same shape: extract, route, retry or finish.

python
class JobExtractionState(TypedDict):
    messages: Annotated[list, operator.add]
    extraction_result: Optional[dict]
    validation_error: Optional[str]
    retry_count: int
    max_retries: int


def extract_job(state: JobExtractionState):
    """Extract job posting data with self-correction."""
    messages = state["messages"]
    retry_count = state.get("retry_count", 0)

    if state.get("validation_error"):
        messages = messages + [
            HumanMessage(
                content=(
                    f"Previous attempt failed: "
                    f"{state['validation_error']}\n"
                    f"Fix the errors and try again."
                )
            )
        ]

    try:
        structured = llm.with_structured_output(JobPosting)
        result = structured.invoke(messages)
        return {
            "messages": [
                AIMessage(
                    content="Job posting extracted successfully"
                )
            ],
            "extraction_result": result.model_dump(),
            "validation_error": None,
            "retry_count": retry_count,
        }
    except Exception as e:
        return {
            "messages": [
                AIMessage(content=f"Extraction error: {e}")
            ],
            "extraction_result": None,
            "validation_error": str(e),
            "retry_count": retry_count + 1,
        }


def route_job_extraction(
    state: JobExtractionState,
) -> Literal["extract", "success", "failure"]:
    if state.get("extraction_result") is not None:
        return "success"
    if state.get("retry_count", 0) >= state.get("max_retries", 3):
        return "failure"
    return "extract"


def success_node(state: JobExtractionState):
    result = state["extraction_result"]
    return {
        "messages": [
            AIMessage(
                content=(
                    f"Extracted: {result['title']} "
                    f"at {result['company']}"
                )
            )
        ],
    }


def failure_node(state: JobExtractionState):
    return {
        "messages": [
            AIMessage(
                content="Could not extract job posting data."
            )
        ],
    }


job_graph_builder = StateGraph(JobExtractionState)
job_graph_builder.add_node("extract", extract_job)
job_graph_builder.add_node("success", success_node)
job_graph_builder.add_node("failure", failure_node)

job_graph_builder.add_edge(START, "extract")
job_graph_builder.add_conditional_edges(
    "extract",
    route_job_extraction,
    {
        "extract": "extract",
        "success": "success",
        "failure": "failure",
    },
)
job_graph_builder.add_edge("success", END)
job_graph_builder.add_edge("failure", END)

job_graph = job_graph_builder.compile()
print("Job extraction pipeline compiled")
python
Job extraction pipeline compiled
python
job_text = """
Senior ML Engineer — DataFlow Inc.
Location: San Francisco, CA (Hybrid — 3 days in office)
Salary: $180,000 - $250,000/year

We're looking for an ML engineer with 5+ years of experience
to join our platform team. You'll build and deploy machine
learning pipelines at scale.

Requirements:
- Python, PyTorch, and TensorFlow
- Experience with Kubernetes and Docker
- Strong understanding of MLOps (MLflow, Kubeflow)
- SQL and data pipeline experience
- Familiarity with cloud platforms (AWS/GCP)
"""

job_result = job_graph.invoke({
    "messages": [
        SystemMessage(
            content="Extract structured data from this job posting."
        ),
        HumanMessage(content=job_text),
    ],
    "extraction_result": None,
    "validation_error": None,
    "retry_count": 0,
    "max_retries": 3,
})

extracted = job_result["extraction_result"]
print(f"Title: {extracted['title']}")
print(f"Company: {extracted['company']}")
print(f"Location: {extracted['location']}")
print(f"Remote: {extracted['is_remote']}")
print(f"Experience: {extracted['experience_years']} years")
if extracted.get("salary"):
    sal = extracted["salary"]
    print(f"Salary: ${sal['min_amount']:,} - ${sal['max_amount']:,}")
print(f"Skills: {', '.join(extracted['required_skills'])}")
print(f"Retries: {job_result['retry_count']}")

Here is what the pipeline extracts:

python
Title: Senior ML Engineer
Company: DataFlow Inc.
Location: San Francisco, CA
Remote: False
Experience: 5 years
Salary: $180,000 - $250,000
Skills: Python, PyTorch, TensorFlow, Kubernetes, Docker, MLOps, MLflow, Kubeflow, SQL, AWS/GCP
Retries: 0

It handled the nested salary, checked the skill count, flagged hybrid (not remote), and filled every field. The retry loop was ready but not needed — clear schemas with good field hints tend to pass on the first try.

Complete Code

Click to expand the full script (copy-paste and run)
python
# Complete code from: Structured Output and Self-Correcting Agents in LangGraph
# Requires: pip install langchain-openai langgraph pydantic python-dotenv
# Python 3.10+
# Set OPENAI_API_KEY in your .env file

import os
import operator
from typing import TypedDict, Annotated, Optional, Literal
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, MessagesState, START, END

load_dotenv()

llm = ChatOpenAI(model="gpt-4o-mini")

# --- Schema: Financial Report ---

class FinancialReport(BaseModel):
    """Extract quarterly financial data from earnings text."""
    company: str = Field(description="Company name")
    quarter: str = Field(description="Quarter, e.g. Q1, Q2, Q3, Q4")
    year: int = Field(description="Fiscal year")
    revenue_millions: float = Field(description="Revenue in millions of dollars")
    profit_millions: float = Field(description="Net profit in millions of dollars")

    @field_validator("quarter")
    @classmethod
    def validate_quarter(cls, v):
        valid = {"Q1", "Q2", "Q3", "Q4"}
        if v not in valid:
            raise ValueError(f"Quarter must be one of {valid}, got '{v}'")
        return v

    @field_validator("revenue_millions")
    @classmethod
    def validate_revenue(cls, v):
        if v < 0:
            raise ValueError(f"Revenue cannot be negative, got {v}")
        return v


# --- Self-Correcting Extraction Graph ---

class ExtractionState(TypedDict):
    messages: Annotated[list, operator.add]
    extraction_result: Optional[dict]
    validation_error: Optional[str]
    retry_count: int
    max_retries: int


def extraction_node(state: ExtractionState):
    messages = state["messages"]
    retry_count = state.get("retry_count", 0)

    if state.get("validation_error"):
        feedback = HumanMessage(
            content=(
                f"Your previous response failed validation: "
                f"{state['validation_error']}\n\n"
                f"Please fix the errors and try again."
            )
        )
        messages = messages + [feedback]

    try:
        structured_llm = llm.with_structured_output(FinancialReport)
        result = structured_llm.invoke(messages)
        return {
            "messages": [AIMessage(content=f"Extracted: {result.model_dump_json()}")],
            "extraction_result": result.model_dump(),
            "validation_error": None,
            "retry_count": retry_count,
        }
    except Exception as e:
        return {
            "messages": [AIMessage(content=f"Extraction failed: {str(e)}")],
            "extraction_result": None,
            "validation_error": str(e),
            "retry_count": retry_count + 1,
        }


def route_extraction(state: ExtractionState) -> Literal["extract", "output", "fallback"]:
    if state.get("extraction_result") is not None:
        return "output"
    if state.get("retry_count", 0) >= state.get("max_retries", 3):
        return "fallback"
    return "extract"


def output_node(state: ExtractionState):
    result = state["extraction_result"]
    summary = (
        f"Successfully extracted: {result['company']} "
        f"{result['quarter']} {result['year']} — "
        f"Revenue: ${result['revenue_millions']}M, "
        f"Profit: ${result['profit_millions']}M"
    )
    return {"messages": [AIMessage(content=summary)]}


def fallback_node(state: ExtractionState):
    retries = state.get("retry_count", 0)
    last_error = state.get("validation_error", "Unknown")
    return {
        "messages": [
            AIMessage(content=f"Extraction failed after {retries} attempts. Last error: {last_error}.")
        ],
        "extraction_result": {},
    }


graph_builder = StateGraph(ExtractionState)
graph_builder.add_node("extract", extraction_node)
graph_builder.add_node("output", output_node)
graph_builder.add_node("fallback", fallback_node)
graph_builder.add_edge(START, "extract")
graph_builder.add_conditional_edges(
    "extract", route_extraction,
    {"extract": "extract", "output": "output", "fallback": "fallback"},
)
graph_builder.add_edge("output", END)
graph_builder.add_edge("fallback", END)

extraction_graph = graph_builder.compile()

# --- Test ---
test_input = {
    "messages": [
        SystemMessage(content="Extract financial data from the user's text."),
        HumanMessage(
            content="Apple reported Q2 2025 earnings yesterday. Revenue came in at $94.8 billion for the quarter, with net profit of $24.2 billion."
        ),
    ],
    "extraction_result": None,
    "validation_error": None,
    "retry_count": 0,
    "max_retries": 3,
}

result = extraction_graph.invoke(test_input)
print(f"Final result: {result['extraction_result']}")
print(f"Retries used: {result['retry_count']}")

print("\nScript completed successfully.")

Frequently Asked Questions

Can I use structured output without LangGraph?

Yes. .with_structured_output() works with any chat model that has function calling — no graph needed. LangGraph only adds value when you want the retry loop. If you do not need auto-retries, a plain llm.with_structured_output(MyModel).invoke(prompt) is enough.

Does structured output work with open-source models?

It depends on tool-calling support. Larger open models such as Llama 3 and Mistral handle it, so .with_structured_output() works fine. Smaller models without reliable tool calling need PydanticOutputParser instead.

python
from langchain_core.output_parsers import PydanticOutputParser

parser = PydanticOutputParser(pydantic_object=CompanyInfo)
format_instructions = parser.get_format_instructions()
# Add format_instructions to your prompt text

This puts format rules in the prompt and parses the text reply. Less reliable, but works with any model.
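
The parsing half of that flow is ordinary Pydantic. A sketch of the same idea without langchain, validating the raw text reply with `model_validate_json` — `CompanyInfo` here is a made-up example model, and `raw_reply` stands in for whatever the model returned:

```python
from pydantic import BaseModel, Field, ValidationError

class CompanyInfo(BaseModel):
    """Hypothetical schema for the parser example."""
    name: str = Field(description="Company name")
    founded: int = Field(description="Year founded")

raw_reply = '{"name": "DataFlow Inc.", "founded": 2016}'  # stand-in for the LLM's text reply

try:
    info = CompanyInfo.model_validate_json(raw_reply)
    print(info.name, info.founded)  # DataFlow Inc. 2016
except ValidationError as e:
    # Feed str(e) back into the next prompt, same as the retry loop.
    print("parse failed:", e)
```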

How do I handle optional fields the LLM skips?

Use Optional[type] = Field(default=None, ...). This tells both Pydantic and the LLM that the field can be blank. Without Optional, a missing field throws a ValidationError and wastes a retry — even when the data is not in the text at all.
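
A minimal sketch of the difference, using a hypothetical `Profile` model:

```python
from typing import Optional
from pydantic import BaseModel, Field, ValidationError

class Profile(BaseModel):
    name: str = Field(description="Full name")
    nickname: Optional[str] = Field(default=None, description="Nickname, if mentioned")

# Missing optional field: fine, defaults to None.
p = Profile(name="Ada Lovelace")
print(p.nickname)  # None

# Missing required field: raises ValidationError and would burn a retry.
try:
    Profile(nickname="Ada")
except ValidationError as e:
    print("missing 'name':", e.error_count(), "error")
```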

What is the performance cost of self-correcting loops?

Each retry is a full LLM call — roughly another second of latency and, with GPT-4o-mini, a fraction of a cent. Start with max_retries=3. Good schemas rarely need more than one retry. If you keep hitting the cap, fix the schema, not the retry count.

Can I apply structured output at the graph level instead of per-node?

Not directly. StateGraph has no built-in structured output mode. You attach .with_structured_output() to the LLM inside each node. If the graph’s final output must match a schema, run a check in the last node before you return.

Summary

Structured output turns messy LLM text into clean, typed data your app can use right away. The core parts: Pydantic models set the schema, .with_structured_output() does the plumbing, and LangGraph conditional edges wire the retry loop.

Start with .with_structured_output() for basic use. Add @field_validator when you need rules beyond types. Add a LangGraph retry loop when you want the agent to fix its own mistakes.

The pattern scales from simple flat schemas all the way to nested models with several extractors. Classify first, route to the right schema, extract with validation, and retry with error feedback. That is the full toolkit.

Practice exercise: Build a self-correcting agent that takes restaurant reviews and produces structured data: restaurant name, cuisine type, rating (1-5, validated), price range, and dishes mentioned. Include a @field_validator for the rating and a retry loop capped at three tries.

Click to see the solution
python
class RestaurantReview(BaseModel):
    """Extract structured restaurant review data."""
    restaurant_name: str = Field(
        description="Name of the restaurant"
    )
    cuisine: str = Field(description="Type of cuisine")
    rating: int = Field(description="Rating from 1 to 5")
    price_range: str = Field(
        description="Price range: budget, moderate, upscale, fine-dining"
    )
    dishes_mentioned: list[str] = Field(
        description="Specific dishes mentioned in the review"
    )

    @field_validator("rating")
    @classmethod
    def validate_rating(cls, v):
        if v < 1 or v > 5:
            raise ValueError(f"Rating must be 1-5, got {v}")
        return v

    @field_validator("price_range")
    @classmethod
    def validate_price_range(cls, v):
        valid = {"budget", "moderate", "upscale", "fine-dining"}
        if v.lower() not in valid:
            raise ValueError(f"Must be one of {valid}, got '{v}'")
        return v.lower()

# Build the graph using ExtractionState and the same
# extract -> route -> output/fallback pattern from the article.
# Replace FinancialReport with RestaurantReview in the extraction node.
