LangGraph RAG Agent — Build a Self-Correcting Retrieval Pipeline
You ask your chatbot about the company’s HR policy. It says employees get 30 vacation days. The real number is 20. That’s a classic hallucination — and it’s exactly the kind of mistake a well-built RAG agent catches before it ever reaches the user.
In this post, I’ll show you how to build that agent step by step. We’ll go from a basic retrieval setup all the way to a self-fixing pipeline that grades documents, spots hallucinations, and rewrites its own queries when the first try falls short.
What Is RAG and Why Do Agents Make It Better?
RAG — short for Retrieval-Augmented Generation — is a simple but powerful idea. You hook an LLM up to your own documents so it pulls answers from real data, not just what it picked up during training.
The basic version works like this: take the user’s question, search a vector database, stuff the top results into the prompt, and generate an answer. That’s “naive RAG.” It does fine for clean, simple lookups.
But it falls apart quickly. What if the search returns junk? The LLM writes an answer anyway — often a confident wrong one. What if the question is vague? Naive RAG doesn’t try again with better words.
KEY INSIGHT: Agentic RAG swaps out that rigid pipe for a decision-making graph. The agent picks whether to search, judges what it found, and circles back if the results are bad — much like a human researcher would.
Here’s a side-by-side look:
| Feature | Naive RAG | Agentic RAG |
|---|---|---|
| Retrieval | Always fetches, one shot | Picks IF and WHEN to fetch |
| Relevance check | None — uses whatever comes back | Grades each doc, drops the junk |
| Query refinement | None | Rewrites the query if results miss |
| Fallback sources | None | Web search, other indexes |
| Hallucination check | None | Checks the answer against sources |
| Answer quality | No check | Tests if the answer fits the question |
| Error recovery | Fails quietly | Loops back and tries again |
In short: naive RAG fires once and hopes for the best. Agentic RAG thinks, checks, and retries.
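That retry loop can be distilled into a few lines of plain Python before we build the real thing. Every helper below is a hypothetical stub that only exists to show the control flow; the actual LangGraph versions come later in this post.

```python
# A minimal sketch of the agentic retry loop. All helpers are stubs.

def search(query):
    # Stub retriever: only a query mentioning "vacation" finds a doc.
    return ["vacation-policy-doc"] if "vacation" in query else []

def grade(docs):
    # Stub grader: keep only docs that look like policy documents.
    return [d for d in docs if "policy" in d]

def rewrite(query):
    # Stub rewriter: swap casual wording for the domain term.
    return query.replace("days off", "vacation days")

def answer(query, docs):
    return f"answer from {docs[0]}" if docs else "I don't know."

def agentic_rag(query, max_rewrites=2):
    for _ in range(max_rewrites + 1):
        docs = grade(search(query))
        if docs:                 # relevant docs found: generate the answer
            return answer(query, docs)
        query = rewrite(query)   # otherwise refine the query and retry
    return answer(query, [])     # retries exhausted

print(agentic_rag("How many days off do I get?"))
# → answer from vacation-policy-doc
```

Naive RAG is just the first iteration of that loop with no grading and no retry. Everything we build below replaces these stubs with LLM-backed nodes.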
Here’s the plan for what we’ll build: a six-stage pipeline that routes the question, pulls docs, grades them, writes an answer, checks for made-up facts, and tests answer quality — all linked up in a LangGraph StateGraph.
What Do You Need Before Starting?
- Python: 3.10+
- Packages: langgraph (0.4+), langchain (0.3+), langchain-openai, langchain-community, chromadb, tiktoken
- Install: pip install langgraph langchain langchain-openai langchain-community chromadb tiktoken
- API key: An OpenAI API key set as OPENAI_API_KEY. See the OpenAI platform to create one.
- Prior knowledge: LangGraph basics — nodes, edges, state. Check our earlier posts on graph concepts and state if you need a refresh.
- Time: 35–40 minutes
How Do You Set Up the Retrieval Pipeline?
Before we build the agent, we need documents to search through. We’ll set up a small knowledge base, turn those documents into vectors, and store them in ChromaDB — a lightweight vector database that runs on your machine.
This first block loads everything we’ll use. We pull in LangGraph’s StateGraph for the agent, LangChain’s document and embedding classes, and ChromaDB for vector storage.
import os
from typing import List, TypedDict
from langgraph.graph import StateGraph, END, START
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from pydantic import BaseModel, Field
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
Next, we make sample documents — think of them as a mini HR knowledge base. In a real project, you’d load these from PDFs, databases, or web pages. Each Document holds some text and metadata like the file name.
documents = [
Document(
page_content="Employees receive 20 vacation days per year. "
"After 5 years of service, this increases to 25 days.",
metadata={"source": "hr-policy.pdf", "section": "leave"},
),
Document(
page_content="The company matches 401(k) contributions up to 6% "
"of the employee's salary. Vesting is immediate.",
metadata={"source": "benefits-guide.pdf",
"section": "retirement"},
),
Document(
page_content="Remote work is permitted 3 days per week. Employees "
"must be in-office on Tuesdays and Thursdays.",
metadata={"source": "hr-policy.pdf",
"section": "remote-work"},
),
Document(
page_content="Performance reviews occur twice per year, in June "
"and December. Managers use a 5-point rating scale.",
metadata={"source": "hr-policy.pdf", "section": "reviews"},
),
Document(
page_content="Health insurance covers medical, dental, and vision. "
"The company pays 80% of premiums for employees "
"and 60% for dependents.",
metadata={"source": "benefits-guide.pdf",
"section": "health"},
),
Document(
page_content="New employees complete a 90-day probation period. "
"During probation, either party may terminate with "
"one week's notice.",
metadata={"source": "hr-policy.pdf",
"section": "onboarding"},
),
]
Now we embed those docs and stash them in ChromaDB. OpenAIEmbeddings turns text into vectors. ChromaDB indexes those vectors for fast lookups. The from_documents call does both in one shot.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
collection_name="hr_knowledge_base",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
The retriever hands back the top 3 closest documents for any query. We’ll plug it into our agent graph shortly.
TIP: Choose k based on your token budget. Every doc you fetch chews up context space. GPT-4 gives you plenty of room, but with smaller models, keep k at 2–3 so the system prompt still fits.
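To put rough numbers on that budget, here's a quick heuristic. It assumes the common rule of thumb of roughly 4 characters per token (use tiktoken for exact counts), and the budget figure is just an example:

```python
# Rough sketch: how many retrieved docs fit in a given token budget.
# The ~4 chars/token ratio is an approximation, not an exact count.

def estimate_tokens(text: str) -> int:
    return len(text) // 4  # crude approximation; use tiktoken for precision

def max_docs_for_budget(doc_texts, budget_tokens):
    """Count how many docs (in retrieval order) fit under the budget."""
    used, count = 0, 0
    for text in doc_texts:
        cost = estimate_tokens(text)
        if used + cost > budget_tokens:
            break
        used += cost
        count += 1
    return count

docs = ["x" * 2000, "y" * 2000, "z" * 2000]  # ~500 tokens each
print(max_docs_for_budget(docs, budget_tokens=1200))  # → 2
```

If your top-k docs regularly blow past the budget, lower k or chunk the documents smaller at indexing time.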
How Do You Define the Agent State?
Every LangGraph app starts with a state schema — a TypedDict that spells out what data moves through the graph. For our RAG agent, that means the question, the fetched docs, the generated answer, and some flags that steer routing.
class RAGState(TypedDict):
question: str
documents: List[Document]
generation: str
query_rewrite_count: int
relevance_decision: str # "relevant" or "not_relevant"
hallucination_check: str # "grounded" or "not_grounded"
answer_quality: str # "useful" or "not_useful"
Seven fields. The first three carry data (question, documents, answer). The last four are control signals — the agent reads them to decide where to go next.
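Since every invoke call has to supply all seven fields, a small helper keeps test code tidy. This is my own convenience function, not something LangGraph requires:

```python
# Convenience helper (not required by LangGraph): build a fresh initial
# state so each invoke call doesn't repeat the same boilerplate.

def initial_state(question: str) -> dict:
    return {
        "question": question,
        "documents": [],
        "generation": "",
        "query_rewrite_count": 0,
        "relevance_decision": "",
        "hallucination_check": "",
        "answer_quality": "",
    }

state = initial_state("How many vacation days do employees get?")
print(state["query_rewrite_count"])  # → 0
```

Later, `rag_agent.invoke(initial_state("your question"))` does the same thing as the longhand dicts you'll see in the test section.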
How Do You Build Each Node of the RAG Agent?
This is where the real work happens. We’ll build the pipeline stage by stage (router, retrieval, grading, generation, and two output checks), plus support nodes for query rewriting and direct answers, then wire everything together with routing edges. Let me walk through each piece.
Node 1: Query Router
First up: does this question need a doc search, or can the LLM just wing it? “How many vacation days do I get?” needs the vector store. “Hi, how are you?” does not.
We use structured output to get a clean routing choice. The RouteDecision Pydantic model forces the LLM to pick either “retrieve” or “direct_answer.”
class RouteDecision(BaseModel):
"""Route the question to retrieval or direct answer."""
route: str = Field(
description="Route to 'retrieve' for domain questions "
"or 'direct_answer' for greetings and general chat"
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
route_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a router. Given a user question, decide if it needs "
"document retrieval or can be answered directly.\n"
"- Use 'retrieve' for questions about company policies, "
"benefits, HR topics, or anything domain-specific.\n"
"- Use 'direct_answer' for greetings, general chat, or "
"questions that don't need company documents.\n"
"Respond with only the route."),
("human", "{question}"),
])
structured_llm_router = llm.with_structured_output(RouteDecision)
I use gpt-4o-mini for routing and grading calls. It’s fast, costs almost nothing, and handles binary choices well. Save the heavy models for the actual answer.
Node 2: Retrieve Documents
The retrieval node queries the vector store and returns matching docs. It’s the shortest node in the whole graph.
def retrieve(state: RAGState) -> dict:
"""Retrieve documents from the vector store."""
question = state["question"]
docs = retriever.invoke(question)
return {"documents": docs}
Clean and simple. The retriever embeds the question, searches ChromaDB, and returns the top-k hits. We just drop them into the state.
Node 3: Grade Document Relevance
Here’s the node that makes all the difference. Rather than blindly trusting what the search hands back, we grade every single doc. The LLM reads each one side by side with the question and gives a flat “yes” or “no.”
class RelevanceGrade(BaseModel):
"""Binary relevance grade for a retrieved document."""
score: str = Field(
description="'yes' if the document is relevant to the "
"question, 'no' otherwise"
)
grade_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a document relevance grader. Given a user question "
"and a retrieved document, decide if the document contains "
"information relevant to answering the question.\n"
"Give a binary 'yes' or 'no' score."),
("human",
"Question: {question}\n\nDocument: {document}"),
])
relevance_grader = grade_prompt | llm.with_structured_output(
RelevanceGrade
)
The grade_documents function loops over every fetched doc, grades it, and keeps only the ones that pass. If none survive, the agent will rewrite the query and try again.
def grade_documents(state: RAGState) -> dict:
"""Grade retrieved documents for relevance."""
question = state["question"]
docs = state["documents"]
relevant_docs = []
for doc in docs:
result = relevance_grader.invoke({
"question": question,
"document": doc.page_content,
})
if result.score == "yes":
relevant_docs.append(doc)
decision = "relevant" if relevant_docs else "not_relevant"
return {
"documents": relevant_docs,
"relevance_decision": decision,
}
KEY INSIGHT: Grading is the cheapest safety net in a RAG pipeline. One GPT-4o-mini grading call costs a sliver of a cent. A bad doc that slips into your prompt costs the user’s trust — and that’s far more costly.
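You can sanity-check that claim with back-of-the-envelope arithmetic. The per-token prices below are placeholder assumptions for illustration only; check your provider's current price sheet:

```python
# Rough cost estimate for grading calls. Prices are ASSUMED examples,
# expressed in dollars per 1M tokens; substitute your provider's rates.

PRICE_PER_1M_INPUT = 0.15   # assumed input price ($/1M tokens)
PRICE_PER_1M_OUTPUT = 0.60  # assumed output price ($/1M tokens)

def grading_cost(n_docs, tokens_per_doc=200, output_tokens=5):
    """Estimate the cost of grading n_docs retrieved documents."""
    input_cost = n_docs * tokens_per_doc * PRICE_PER_1M_INPUT / 1_000_000
    output_cost = n_docs * output_tokens * PRICE_PER_1M_OUTPUT / 1_000_000
    return input_cost + output_cost

# Grading 3 retrieved docs at ~200 tokens each:
print(f"${grading_cost(3):.6f}")  # → $0.000099
```

Under these assumed rates, grading a whole retrieval batch costs around a hundredth of a cent, which is hard to beat as a quality gate.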
Node 4: Generate the Answer
With good docs in hand, it’s time to write the answer. The prompt locks the LLM to the given context — no making stuff up. This “grounded generation” approach slashes hallucinations.
generate_prompt = ChatPromptTemplate.from_messages([
("system",
"You are an assistant answering questions using the provided "
"context. Use ONLY the information in the context to answer. "
"If the context doesn't contain enough information, say so. "
"Keep answers concise and direct."),
("human",
"Question: {question}\n\nContext:\n{context}"),
])
generate_chain = generate_prompt | llm | StrOutputParser()
def generate(state: RAGState) -> dict:
"""Generate an answer from relevant documents."""
question = state["question"]
docs = state["documents"]
context = "\n\n".join(doc.page_content for doc in docs)
answer = generate_chain.invoke({
"question": question,
"context": context,
})
return {"generation": answer}
Node 5: Hallucination Check
An answer can sound perfect yet state things the docs never mentioned. This node puts the answer next to the source text and asks: “Is every claim actually in the docs?”
class HallucinationGrade(BaseModel):
"""Check if generation is grounded in the documents."""
score: str = Field(
description="'yes' if the answer is grounded in the "
"documents, 'no' if it contains unsupported claims"
)
hallucination_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a hallucination grader. Given a set of source "
"documents and an LLM generation, determine if the "
"generation is supported by the documents.\n"
"Score 'yes' if all claims are grounded in the documents. "
"Score 'no' if any claim is not supported."),
("human",
"Documents:\n{documents}\n\nGeneration: {generation}"),
])
hallucination_grader = hallucination_prompt | llm.with_structured_output(
HallucinationGrade
)
def check_hallucination(state: RAGState) -> dict:
"""Check if the generation is grounded in documents."""
docs = state["documents"]
generation = state["generation"]
doc_text = "\n\n".join(doc.page_content for doc in docs)
result = hallucination_grader.invoke({
"documents": doc_text,
"generation": generation,
})
return {
"hallucination_check": (
"grounded" if result.score == "yes"
else "not_grounded"
)
}
Node 6: Answer Quality Check
An answer can be fully grounded yet still miss the point. This last gate asks whether the response actually helps with what the user asked.
class AnswerGrade(BaseModel):
"""Check if the answer addresses the question."""
score: str = Field(
description="'yes' if the answer addresses the question, "
"'no' if it misses the point"
)
answer_prompt = ChatPromptTemplate.from_messages([
("system",
"You are an answer quality grader. Given a user question "
"and an LLM generation, determine if the answer addresses "
"the question.\n"
"Score 'yes' if it is useful and relevant. "
"Score 'no' if it doesn't answer what was asked."),
("human",
"Question: {question}\n\nAnswer: {generation}"),
])
answer_grader = answer_prompt | llm.with_structured_output(
AnswerGrade
)
def check_answer_quality(state: RAGState) -> dict:
"""Check if the generation answers the question."""
question = state["question"]
generation = state["generation"]
result = answer_grader.invoke({
"question": question,
"generation": generation,
})
return {
"answer_quality": (
"useful" if result.score == "yes"
else "not_useful"
)
}
The Query Rewrite Node
When grading or quality checks fail, the agent rephrases the question to get better search hits. It also bumps a counter so we don’t loop forever.
rewrite_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a query rewriter. Given a user question that "
"didn't produce relevant search results, rewrite it to "
"improve retrieval. Make it more specific or use "
"different keywords. Return only the rewritten question."),
("human", "Original question: {question}"),
])
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
def rewrite_query(state: RAGState) -> dict:
"""Rewrite the question for better retrieval."""
question = state["question"]
rewritten = rewrite_chain.invoke({"question": question})
count = state.get("query_rewrite_count", 0)
return {
"question": rewritten,
"query_rewrite_count": count + 1,
}
WARNING: Always cap the rewrite count. Without a limit, a question that truly can’t be answered from your docs will spin forever. Two tries is a good default — after that, return whatever you have or say “I don’t know.”
The Direct Answer Node
For questions that don’t need retrieval — hellos, small talk, off-topic stuff — we just let the LLM reply on its own.
def direct_answer(state: RAGState) -> dict:
"""Answer without retrieval for non-domain questions."""
question = state["question"]
response = llm.invoke(
f"Answer this briefly: {question}"
)
return {"generation": response.content}
How Do You Wire the Graph Together?
All the nodes are in place. Now we connect them with routing edges. Three functions control the flow: route_question picks the entry path, decide_after_grading checks if the docs are good enough, and decide_after_checks handles what happens after the hallucination and quality gates.
def route_question(state: RAGState) -> str:
"""Route based on question type."""
question = state["question"]
result = structured_llm_router.invoke(
route_prompt.invoke({"question": question})
)
return result.route
def decide_after_grading(state: RAGState) -> str:
"""Decide next step based on document relevance."""
if state["relevance_decision"] == "relevant":
return "generate"
count = state.get("query_rewrite_count", 0)
if count >= 2:
return "generate" # proceed with what we have
return "rewrite"
def decide_after_checks(state: RAGState) -> str:
"""Decide based on hallucination and quality checks."""
if state["hallucination_check"] == "not_grounded":
return "generate" # regenerate
if state["answer_quality"] == "not_useful":
count = state.get("query_rewrite_count", 0)
if count >= 2:
return "finish"
return "rewrite"
return "finish"
See how the fix matches the failure? If the LLM made stuff up, the docs were fine — we just run generate again with the same context. If the answer misses the point, the docs were likely wrong — so we rewrite the query and search from scratch. (In production, you'd also want to cap regenerations the same way we cap rewrites, so an answer that keeps failing the grounding check can't loop forever.)
Here’s the assembly. Each add_node hooks up a function. Each add_conditional_edges tells LangGraph how to route between them.
workflow = StateGraph(RAGState)
# Add all nodes
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("check_hallucination", check_hallucination)
workflow.add_node("check_answer_quality", check_answer_quality)
workflow.add_node("rewrite_query", rewrite_query)
workflow.add_node("direct_answer", direct_answer)
# Entry point: route the question
workflow.add_conditional_edges(
START,
route_question,
{
"retrieve": "retrieve",
"direct_answer": "direct_answer",
},
)
# After retrieval, grade documents
workflow.add_edge("retrieve", "grade_documents")
# After grading, decide: generate or rewrite
workflow.add_conditional_edges(
"grade_documents",
decide_after_grading,
{
"generate": "generate",
"rewrite": "rewrite_query",
},
)
# After rewriting, retrieve again
workflow.add_edge("rewrite_query", "retrieve")
# After generation, check hallucination
workflow.add_edge("generate", "check_hallucination")
# After hallucination check, check answer quality
workflow.add_edge("check_hallucination", "check_answer_quality")
# After quality check, decide: finish or retry
workflow.add_conditional_edges(
"check_answer_quality",
decide_after_checks,
{
"finish": END,
"generate": "generate",
"rewrite": "rewrite_query",
},
)
# Direct answers go straight to END
workflow.add_edge("direct_answer", END)
# Compile the graph
rag_agent = workflow.compile()
And that’s it — the whole graph in one place. Seven processing nodes, three routing functions, and conditional edges that create a self-fixing loop.
How Do You Run and Test the RAG Agent?
Let’s try a domain question that should trigger retrieval. The invoke method runs the whole graph and hands back the final state.
result = rag_agent.invoke({
"question": "How many vacation days do employees get?",
"documents": [],
"generation": "",
"query_rewrite_count": 0,
"relevance_decision": "",
"hallucination_check": "",
"answer_quality": "",
})
print(result["generation"])
Employees receive 20 vacation days per year. After 5 years of service, this increases to 25 days.
The agent found the leave policy doc, graded it as relevant, wrote a grounded answer, and passed both the hallucination and quality gates.
Now a greeting — one that should skip retrieval:
result = rag_agent.invoke({
"question": "Hello, how are you today?",
"documents": [],
"generation": "",
"query_rewrite_count": 0,
"relevance_decision": "",
"hallucination_check": "",
"answer_quality": "",
})
print(result["generation"])
Hello! I'm doing well, thanks for asking. How can I help you today?
The router sent this straight to direct_answer, skipping the whole retrieval pipeline.
TIP: Run rag_agent.get_graph().draw_mermaid() to draw your graph. It produces a Mermaid diagram with all nodes and edges — super handy for spotting flow bugs.
How Does the Self-Correcting Loop Work?
The self-correcting pattern is the most powerful piece of this design. When something goes wrong — bad docs, made-up facts, or an answer that misses the point — the agent adjusts and tries again.
The fix it picks depends on where the failure happened:
| Failure Point | What the Agent Does |
|---|---|
| No relevant docs | Rewrites the query, searches again |
| Hallucinated answer | Re-generates from the same docs |
| Answer misses the question | Rewrites query, re-retrieves, re-generates |
| Max retries hit | Returns best-effort answer with a disclaimer |
Why different fixes? A hallucinated answer means the docs were fine but the LLM went off script. Running the generate step again usually fixes it. A poor-quality answer means the search pulled wrong docs. You need to go further back — rewrite the query and search from scratch.
KEY INSIGHT: This loop mirrors how an expert actually researches a topic. They don’t stop at the first Google result. They judge what they find, tweak their search terms, and double-check their conclusions. Baking that loop into your agent makes it far more reliable.
How Do You Add Adaptive Routing for Multiple Sources?
The agent above uses one source — the vector store. Real systems often need several sources based on the question type. Adaptive RAG adds that layer.
Instead of a “retrieve or don’t” choice, you route to different paths:
- Vector search — for questions that need your private docs (“What’s our leave policy?”)
- Web search — for current events your docs don’t cover
- SQL query — for structured data (“How many people joined last quarter?”)
- Direct LLM — for general knowledge and chitchat
Here’s how to plug in web search as a fallback using Tavily, a search API built for LLMs.
# pip install tavily-python
# os.environ["TAVILY_API_KEY"] = "your-tavily-key"
from langchain_community.tools.tavily_search import (
TavilySearchResults,
)
web_search_tool = TavilySearchResults(max_results=3)
class AdaptiveRouteDecision(BaseModel):
"""Route to the appropriate retrieval strategy."""
route: str = Field(
description="One of: 'vectorstore', 'web_search', "
"'direct_answer'"
)
adaptive_route_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a query router for a company knowledge system.\n"
"Route to 'vectorstore' for company policy, benefits, "
"and HR questions.\n"
"Route to 'web_search' for current events, industry "
"trends, or information not in company docs.\n"
"Route to 'direct_answer' for greetings and general chat."
),
("human", "{question}"),
])
The web search node wraps its hits as Document objects. That way they pass through the same grading and generation steps as vector store results. The rest of the graph doesn’t care where the docs came from.
def web_search(state: RAGState) -> dict:
"""Search the web as a fallback retrieval source."""
question = state["question"]
results = web_search_tool.invoke({"query": question})
web_docs = [
Document(
page_content=r["content"],
metadata={"source": r["url"]},
)
for r in results
]
return {"documents": web_docs}
How Do You Add Source Citations?
Let’s make this production-ready by tacking on citations. Users need to check answers, and citing the source gives them a clear path back to the original document.
The prompt tells the LLM to note which docs it used. The function tags each document with its source metadata so the LLM can cite them.
citation_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a company knowledge assistant. Answer the "
"question using ONLY the provided context.\n"
"Rules:\n"
"1. If the context answers the question, provide a "
"clear, concise response.\n"
"2. After your answer, list the sources you used.\n"
"3. If the context doesn't contain enough information, "
"say 'I don't have enough information to answer this "
"question' and suggest who to contact.\n"
"Format sources as: [Source: filename, section]"),
("human",
"Question: {question}\n\nContext:\n{context}"),
])
def generate_with_citations(state: RAGState) -> dict:
"""Generate an answer with source citations."""
question = state["question"]
docs = state["documents"]
context_parts = []
for doc in docs:
source = doc.metadata.get("source", "unknown")
section = doc.metadata.get("section", "")
context_parts.append(
f"[From {source}, section: {section}]\n"
f"{doc.page_content}"
)
context = "\n\n".join(context_parts)
chain = citation_prompt | llm | StrOutputParser()
answer = chain.invoke({
"question": question,
"context": context,
})
return {"generation": answer}
This gives answers like: “Employees receive 20 vacation days per year, increasing to 25 after 5 years. [Source: hr-policy.pdf, section: leave]”. The user can verify the claim against the real document.
WARNING: Don’t trust the LLM to nail citations every time. It sometimes points to the wrong source. For high-stakes apps, verify citations in code by checking which doc actually contains the claimed text.
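Here's one way to sketch that verification: extract the bracketed citations and compare them against the metadata of the docs actually retrieved. This assumes the "[Source: filename, section]" format from the citation prompt above, and uses plain (metadata, text) tuples in place of Document objects to keep the sketch dependency-free:

```python
import re

def verify_citations(answer: str, docs) -> list:
    """Return cited source names that don't match any retrieved doc.

    docs is a list of (metadata_dict, page_text) pairs standing in for
    the real Document objects.
    """
    # Capture the filename part of each "[Source: filename, ...]" citation.
    cited = re.findall(r"\[Source:\s*([^,\]]+)", answer)
    known = {meta.get("source") for meta, _text in docs}
    return [src.strip() for src in cited if src.strip() not in known]

docs = [({"source": "hr-policy.pdf"}, "Employees receive 20 vacation days...")]
answer = "You get 20 vacation days. [Source: hr-policy.pdf, section: leave]"
print(verify_citations(answer, docs))  # → []
```

An empty list means every citation points at a real retrieved document. For stricter checking, you could also verify that the claimed text actually appears in the cited doc's content, not just that the filename matches.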
When Should You Use RAG vs. Fine-Tuning?
They tackle different jobs. Here’s a quick breakdown.
| Factor | RAG | Fine-Tuning |
|---|---|---|
| Best for | Fact-based Q&A over specific docs | Changing the model’s tone or domain vocab |
| Data freshness | Always current — just update the store | Fixed — retrain to refresh |
| Cost | API calls per query + storage | One-time training, cheaper per query |
| Hallucination risk | Lower — answers tied to docs | Higher — model generates from patterns |
| Setup effort | Medium — needs a vector store | High — needs a training pipeline |
Pick RAG when users ask about docs that change often. Pick fine-tuning when you want the model to talk in a certain style. Lots of real systems use both: fine-tune for domain lingo, then RAG for factual answers.
What Are the Most Common RAG Agent Mistakes?
Mistake 1: Skipping document grading
# WRONG: Directly use all retrieved docs — no filtering
def naive_generate(state):
docs = state["documents"]
context = "\n".join(d.page_content for d in docs)
return generate_chain.invoke({
"question": state["question"],
"context": context,
})
The problem: The retriever ranks by vector similarity, not true relevance. A doc about “company day celebrations” might score high for “how many vacation days” just because of the word “days.” Without grading, that noise bleeds into your context.
The fix: Run each doc through grade_documents before you generate.
Mistake 2: No cap on the rewrite loop
# WRONG: loops forever if docs don't exist
def decide_after_grading(state):
if state["relevance_decision"] == "not_relevant":
return "rewrite" # no exit condition!
return "generate"
The problem: If the user asks about a topic not in your docs, the agent rewrites and searches in circles. Each loop burns API calls and time.
The fix: Track query_rewrite_count in state. After 2 tries, go ahead with what you have or say “I don’t know.”
Mistake 3: Different embedding models for indexing and querying
# WRONG: indexing and querying with different models
index_embeddings = OpenAIEmbeddings(
model="text-embedding-3-large"
)
vectorstore = Chroma.from_documents(docs, index_embeddings)
query_embeddings = OpenAIEmbeddings(
model="text-embedding-3-small"
)
retriever = vectorstore.as_retriever()
# Similarity scores will be meaningless!
The problem: Each model maps text to a different vector space. Comparing a query vector from one model to doc vectors from another gives useless similarity scores.
The fix: Always use the same model for both indexing and querying.
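One way to enforce that rule in code is to record the model name in your own index metadata at build time and assert it before querying. This is a sketch of the pattern, not a built-in Chroma feature; plain dicts stand in for real collection metadata:

```python
# Guard against index/query embedding-model drift (pattern sketch).

EMBEDDING_MODEL = "text-embedding-3-small"  # single source of truth

def build_index_metadata(model_name: str) -> dict:
    """Record which embedding model built the index."""
    return {"embedding_model": model_name}

def assert_query_model(index_meta: dict, query_model: str) -> None:
    """Raise if the query-time model differs from the indexing model."""
    indexed = index_meta.get("embedding_model")
    if indexed != query_model:
        raise ValueError(
            f"Index built with {indexed!r} but querying with "
            f"{query_model!r}; similarity scores would be meaningless."
        )

meta = build_index_metadata(EMBEDDING_MODEL)
assert_query_model(meta, EMBEDDING_MODEL)  # passes silently when they match
```

Defining the model name once as a constant, and checking it at query time, turns a silent relevance bug into a loud error.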
Practice Exercise
Build a version of the RAG agent that adds a “confidence score” to each answer. Rate it as “high” (two or more relevant sources agree), “medium” (one source found), or “low” (answer is a best guess).
Click to see the solution
class ConfidenceState(TypedDict):
question: str
documents: List[Document]
generation: str
query_rewrite_count: int
relevance_decision: str
hallucination_check: str
answer_quality: str
confidence: str
def assess_confidence(state: ConfidenceState) -> dict:
"""Assess answer confidence based on source coverage."""
docs = state["documents"]
hallucination = state["hallucination_check"]
if hallucination == "not_grounded":
return {"confidence": "low"}
relevant_count = len(docs)
if relevant_count >= 2:
return {"confidence": "high"}
elif relevant_count == 1:
return {"confidence": "medium"}
else:
return {"confidence": "low"}
Drop this node in after check_answer_quality and before END. The confidence tag tells users how much to trust the answer.
Bonus Exercise: Build a Three-Way Router
Write a route_question function that returns "vectorstore" for company/HR questions, "web_search" for current events or news, and "direct_answer" for greetings. Use simple keyword matching (no LLM needed).
Click to see the solution
def route_question(question: str) -> str:
    """Route question to the right retrieval strategy."""
    question_lower = question.lower()
    hr_keywords = ["policy", "benefits", "vacation", "salary",
                   "hr", "leave", "insurance"]
    web_keywords = ["latest", "news", "current", "today",
                    "recent", "trending"]
    if any(kw in question_lower for kw in hr_keywords):
        return "vectorstore"
    if any(kw in question_lower for kw in web_keywords):
        return "web_search"
    return "direct_answer"
This keyword-based router checks domain terms first, then news terms, and defaults to a direct answer. In production, you would use an LLM for more nuanced routing.
Complete Code
Click to expand the full script (copy-paste and run)
# Complete code: Building a RAG Agent with LangGraph
# Requires: pip install langgraph langchain langchain-openai
# langchain-community chromadb tiktoken
# Python 3.10+
import os
from typing import List, TypedDict
from langgraph.graph import StateGraph, END, START
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from pydantic import BaseModel, Field
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
# --- Documents and Vector Store ---
documents = [
Document(
page_content="Employees receive 20 vacation days per year. "
"After 5 years of service, this increases to 25 days.",
metadata={"source": "hr-policy.pdf", "section": "leave"},
),
Document(
page_content="The company matches 401(k) contributions up to "
"6% of the employee's salary. Vesting is immediate.",
metadata={"source": "benefits-guide.pdf",
"section": "retirement"},
),
Document(
page_content="Remote work is permitted 3 days per week. "
"Employees must be in-office Tuesdays and Thursdays.",
metadata={"source": "hr-policy.pdf",
"section": "remote-work"},
),
Document(
page_content="Performance reviews occur twice per year, in "
"June and December. Managers use a 5-point rating scale.",
metadata={"source": "hr-policy.pdf",
"section": "reviews"},
),
Document(
page_content="Health insurance covers medical, dental, and "
"vision. The company pays 80% of premiums for employees "
"and 60% for dependents.",
metadata={"source": "benefits-guide.pdf",
"section": "health"},
),
Document(
page_content="New employees complete a 90-day probation "
"period. During probation, either party may terminate "
"with one week's notice.",
metadata={"source": "hr-policy.pdf",
"section": "onboarding"},
),
]
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
collection_name="hr_knowledge_base",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# --- State Definition ---
class RAGState(TypedDict):
question: str
documents: List[Document]
generation: str
query_rewrite_count: int
relevance_decision: str
hallucination_check: str
answer_quality: str
# --- LLM and Graders ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
class RouteDecision(BaseModel):
route: str = Field(
description="'retrieve' or 'direct_answer'"
)
route_prompt = ChatPromptTemplate.from_messages([
("system",
"You are a router. Route to 'retrieve' for company "
"policy/HR questions, 'direct_answer' for greetings."),
("human", "{question}"),
])
structured_llm_router = llm.with_structured_output(
RouteDecision
)
class RelevanceGrade(BaseModel):
score: str = Field(description="'yes' or 'no'")
grade_prompt = ChatPromptTemplate.from_messages([
("system",
"Grade if this document is relevant to the question. "
"Binary 'yes' or 'no'."),
("human", "Question: {question}\nDocument: {document}"),
])
relevance_grader = grade_prompt | llm.with_structured_output(
RelevanceGrade
)
generate_prompt = ChatPromptTemplate.from_messages([
("system",
"Answer using ONLY the provided context. Be concise."),
("human", "Question: {question}\nContext:\n{context}"),
])
generate_chain = generate_prompt | llm | StrOutputParser()
class HallucinationGrade(BaseModel):
score: str = Field(description="'yes' or 'no'")
hallucination_prompt = ChatPromptTemplate.from_messages([
("system",
"Is this generation grounded in the documents? "
"'yes' or 'no'."),
("human",
"Documents:\n{documents}\nGeneration: {generation}"),
])
hallucination_grader = (
hallucination_prompt
| llm.with_structured_output(HallucinationGrade)
)
class AnswerGrade(BaseModel):
score: str = Field(description="'yes' or 'no'")
answer_prompt = ChatPromptTemplate.from_messages([
("system",
"Does this answer address the question? 'yes' or 'no'."),
("human",
"Question: {question}\nAnswer: {generation}"),
])
answer_grader = (
answer_prompt
| llm.with_structured_output(AnswerGrade)
)
rewrite_prompt = ChatPromptTemplate.from_messages([
("system",
"Rewrite this question for better search results. "
"Return only the rewritten question."),
("human", "Original question: {question}"),
])
rewrite_chain = rewrite_prompt | llm | StrOutputParser()
# --- Node Functions ---
def retrieve(state: RAGState) -> dict:
docs = retriever.invoke(state["question"])
return {"documents": docs}
def grade_documents(state: RAGState) -> dict:
question = state["question"]
relevant = []
for doc in state["documents"]:
result = relevance_grader.invoke({
"question": question,
"document": doc.page_content,
})
if result.score == "yes":
relevant.append(doc)
decision = "relevant" if relevant else "not_relevant"
return {"documents": relevant,
"relevance_decision": decision}
def generate(state: RAGState) -> dict:
context = "\n\n".join(
d.page_content for d in state["documents"]
)
answer = generate_chain.invoke({
"question": state["question"],
"context": context,
})
return {"generation": answer}
def check_hallucination(state: RAGState) -> dict:
doc_text = "\n\n".join(
d.page_content for d in state["documents"]
)
result = hallucination_grader.invoke({
"documents": doc_text,
"generation": state["generation"],
})
check = (
"grounded" if result.score == "yes"
else "not_grounded"
)
return {"hallucination_check": check}
def check_answer_quality(state: RAGState) -> dict:
result = answer_grader.invoke({
"question": state["question"],
"generation": state["generation"],
})
quality = (
"useful" if result.score == "yes"
else "not_useful"
)
return {"answer_quality": quality}
def rewrite_query(state: RAGState) -> dict:
rewritten = rewrite_chain.invoke({
"question": state["question"]
})
count = state.get("query_rewrite_count", 0)
return {"question": rewritten,
"query_rewrite_count": count + 1}
def direct_answer(state: RAGState) -> dict:
response = llm.invoke(
f"Answer briefly: {state['question']}"
)
return {"generation": response.content}
# --- Routing Functions ---
def route_question(state: RAGState) -> str:
    chain = route_prompt | structured_llm_router
    result = chain.invoke({"question": state["question"]})
    return result.route
def decide_after_grading(state: RAGState) -> str:
    if state["relevance_decision"] == "relevant":
        return "generate"
    count = state.get("query_rewrite_count", 0)
    # After two rewrites, stop retrying and generate anyway; with an
    # empty context, the prompt should lead the LLM to admit it
    # lacks the information rather than invent an answer.
    if count >= 2:
        return "generate"
    return "rewrite"
def decide_after_checks(state: RAGState) -> str:
    # Note: an ungrounded answer loops straight back to generate with
    # no retry cap. With temperature=0 the model may produce the same
    # output again; production code should count regeneration retries.
    if state["hallucination_check"] == "not_grounded":
        return "generate"
    if state["answer_quality"] == "not_useful":
        count = state.get("query_rewrite_count", 0)
        if count >= 2:
            return "finish"
        return "rewrite"
    return "finish"
# --- Build Graph ---
workflow = StateGraph(RAGState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("generate", generate)
workflow.add_node("check_hallucination", check_hallucination)
workflow.add_node("check_answer_quality", check_answer_quality)
workflow.add_node("rewrite_query", rewrite_query)
workflow.add_node("direct_answer", direct_answer)
workflow.add_conditional_edges(
START, route_question,
{"retrieve": "retrieve",
"direct_answer": "direct_answer"},
)
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents", decide_after_grading,
{"generate": "generate", "rewrite": "rewrite_query"},
)
workflow.add_edge("rewrite_query", "retrieve")
workflow.add_edge("generate", "check_hallucination")
workflow.add_edge(
"check_hallucination", "check_answer_quality"
)
workflow.add_conditional_edges(
"check_answer_quality", decide_after_checks,
{"finish": END, "generate": "generate",
"rewrite": "rewrite_query"},
)
workflow.add_edge("direct_answer", END)
rag_agent = workflow.compile()
# --- Run ---
result = rag_agent.invoke({
"question": "How many vacation days do employees get?",
"documents": [],
"generation": "",
"query_rewrite_count": 0,
"relevance_decision": "",
"hallucination_check": "",
"answer_quality": "",
})
print(result["generation"])
Summary
You’ve built a full RAG agent with LangGraph that goes well beyond naive retrieve-and-generate. It routes questions, grades documents, catches hallucinations, and fixes its own mistakes.
The key patterns to keep:
- Route first — don’t search when you don’t need to
- Grade every doc — never feed raw search results straight to the LLM
- Check your answers — hallucination and quality gates cost pennies but protect user trust
- Cap your loops — always set a max retry count
- Cite your sources — traceability builds confidence
These pieces snap together nicely. Adding web search, multi-source routing, or a confidence scorer is just another node with the right edges. The graph grows without breaking what’s already there.
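As a sketch of that extensibility, here is what a web-search fallback node could look like. The `search_web` helper is a hypothetical stand-in for a real search client (Tavily, SerpAPI, or similar LangChain integrations), and plain dicts stand in for LangChain `Document` objects so the sketch runs on its own:

```python
# Hypothetical sketch: a web-search fallback node for the graph.
# `search_web` is a stub for whatever search client you actually use.

def search_web(query: str) -> list[str]:
    # Stub: replace with a real search API call.
    return [f"Web result for: {query}"]

def web_search(state: dict) -> dict:
    """Node function: fetch web results and store them as documents."""
    snippets = search_web(state["question"])
    # Wrap raw snippets in the same shape the other nodes expect.
    docs = [{"page_content": s, "metadata": {"source": "web"}}
            for s in snippets]
    return {"documents": docs}

state = {"question": "latest LangGraph release notes"}
print(web_search(state)["documents"][0]["page_content"])
```

Wiring it in is then one `add_node` call plus an edge from the routing function, exactly like the existing nodes.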
FAQ
How much does a RAG agent cost per query vs. a plain LLM call?
Each run fires multiple LLM calls: routing, grading each doc, writing the answer, plus the two checks. With GPT-4o-mini, that adds up to about 5–10x the cost of a single call — still under $0.01 in most cases. The boost in quality is usually worth the extra pennies.
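To see how that estimate comes together, here is a back-of-the-envelope calculation. The per-token prices and per-call token counts below are illustrative assumptions, not measured values; check current pricing before relying on them:

```python
# Rough cost estimate for one agent run (illustrative numbers only).
INPUT_PRICE = 0.15 / 1_000_000   # assumed gpt-4o-mini input $/token
OUTPUT_PRICE = 0.60 / 1_000_000  # assumed gpt-4o-mini output $/token

# Assumed token counts per call: (input_tokens, output_tokens)
calls = {
    "route": (100, 10),
    "grade_doc_1": (300, 5),
    "grade_doc_2": (300, 5),
    "grade_doc_3": (300, 5),
    "generate": (800, 150),
    "hallucination_check": (900, 5),
    "answer_check": (250, 5),
}
total = sum(i * INPUT_PRICE + o * OUTPUT_PRICE
            for i, o in calls.values())
print(f"~${total:.4f} per query")  # well under a cent
```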
Can I swap in open-source models instead of OpenAI?
Yes. Swap ChatOpenAI for any LangChain chat model. Ollama works great for local runs — try ChatOllama(model="llama3"). Just test the grading nodes first — they need clean instruction-following. Smaller models sometimes struggle with structured output.
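A minimal sketch of the swap, assuming `langchain-ollama` is installed and a local Ollama server is running with the model already pulled; it reuses `grade_prompt` and `RelevanceGrade` from the script above:

```python
# Assumed drop-in swap; requires `pip install langchain-ollama`
# and a running Ollama server with the model pulled.
from langchain_ollama import ChatOllama

llm = ChatOllama(model="llama3", temperature=0)
# with_structured_output works only if the model follows
# instructions reliably; verify each grader before trusting it.
relevance_grader = grade_prompt | llm.with_structured_output(RelevanceGrade)
```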
How do I handle documents that are too long for the context window?
Split them into chunks before you index. LangChain’s RecursiveCharacterTextSplitter is the go-to tool. Set chunk size to 500–1000 characters with 100–200 character overlap.
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
)
splitter = RecursiveCharacterTextSplitter(
chunk_size=800,
chunk_overlap=150,
)
chunks = splitter.split_documents(documents)
What’s the difference between Corrective RAG and Adaptive RAG?
Corrective RAG focuses on fixing bad retrieval — it grades docs and rewrites queries when results miss. Adaptive RAG adds smart routing — it picks the best search path based on what the user asked. Our agent blends both. The LangGraph docs call them “CRAG” and “Adaptive RAG.”
How do I measure whether my RAG agent is doing a good job?
Track three things: retrieval precision (share of fetched docs that are relevant), answer faithfulness (is the answer grounded in sources), and answer relevance (does it address the question). Tools like RAGAS and LangSmith can score all three for you. Start by logging every run and reviewing the edge cases by hand.
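As a sketch of the first metric, retrieval precision can be computed by hand from logged runs. The document IDs and relevance labels below are hypothetical, the kind you would produce by manually reviewing logged queries:

```python
# Minimal sketch: retrieval precision over a hand-labeled sample.
# IDs and labels below are made up for illustration.

def retrieval_precision(retrieved: list[str],
                        relevant: set[str]) -> float:
    """Share of fetched docs that are actually relevant."""
    if not retrieved:
        return 0.0
    hits = sum(1 for doc_id in retrieved if doc_id in relevant)
    return hits / len(retrieved)

logged_runs = [
    (["hr-policy:leave", "hr-policy:reviews", "benefits:health"],
     {"hr-policy:leave"}),
    (["benefits:retirement", "benefits:health"],
     {"benefits:retirement", "benefits:health"}),
]
scores = [retrieval_precision(r, rel) for r, rel in logged_runs]
print(f"mean precision: {sum(scores) / len(scores):.2f}")  # → 0.67
```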
References
- LangGraph documentation — Agentic RAG tutorial.
- LangGraph documentation — Adaptive RAG tutorial.
- LangChain blog — Self-Reflective RAG with LangGraph.
- Yan, S. et al. — Corrective Retrieval Augmented Generation (CRAG). arXiv:2401.15884 (2024).
- Jeong, S. et al. — Adaptive-RAG: Learning to Adapt Retrieval-Augmented Large Language Models through Question Complexity. arXiv:2403.14403 (2024).
- Lewis, P. et al. — Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks. NeurIPS (2020).
- ChromaDB documentation.
- OpenAI Embeddings documentation.
- LangChain documentation — Text Splitters.
- Es, S. et al. — RAGAS: Automated Evaluation of Retrieval Augmented Generation. arXiv:2309.15217 (2023).