Project — Build a SQL Database Agent That Answers Business Questions
Your product manager sends a Slack message: “What were our top 5 products by revenue last quarter?” You could write the SQL yourself. Or you could build an agent that does it — every time, for any question, without you touching a query editor.
That’s what we’re building here. Not a toy demo that runs one hardcoded query. A real agent with error recovery, schema awareness, and the ability to explain its results in plain English. By the end, you’ll have a working system you can point at any SQLite database.
Before we write any code, here’s how the data flows through this agent.
A user types a business question in natural language. The first thing the agent does is inspect the database schema — it needs to know what tables and columns exist before it can write SQL. With that context, it generates a SQL query. But generated SQL isn’t always correct, so the agent validates the query for syntax errors before running it. If the query is valid, it executes against the database and gets results back. If it’s malformed, the agent rewrites it and tries again — up to three attempts.
Once results come back, the agent doesn’t dump raw rows on the user. It summarizes the findings in natural language, answering the original question directly.
That’s five stages: schema lookup, query generation, validation, execution (with retry), and summarization. Each one becomes a node in our LangGraph graph. The retry loop is a conditional edge that routes back to query generation when something fails. We’ll build each piece from scratch.
What Makes a SQL Agent Different from Simple Text-to-SQL?
Simple text-to-SQL is a one-shot translation. You send a question to the LLM, it writes SQL, you run it. If the SQL is wrong, you’re stuck. No retries, no schema awareness, no error recovery.
A SQL agent is fundamentally different. It operates as a multi-step reasoning loop. The agent inspects the database first, generates a query with full schema context, checks its own work, and handles failures autonomously. This isn’t just more code — it’s a different architecture.
Here’s what our agent will handle that a simple approach won’t:
- Schema discovery — the agent reads table names, column types, and sample data before writing any SQL
- Self-correction — when a query fails, the agent reads the error message and fixes the query itself
- Multi-step reasoning — complex questions might need the agent to check available tables, run an exploratory query, then run the final query
- Human-readable output — results come back as explanations, not raw database rows
Prerequisites
- Python version: 3.10+
- Required libraries: langgraph (0.4+), langchain-openai (0.3+), langchain-core (0.3+), langchain-community (0.3+)
- Install: pip install langgraph langchain-openai langchain-core langchain-community
- API key: An OpenAI API key set as OPENAI_API_KEY. See OpenAI’s docs to create one.
- Database: We’ll create a SQLite database from scratch — no external downloads needed.
- Time to complete: ~45 minutes
- Prior knowledge: LangGraph fundamentals (nodes, edges, state, tool calling) from earlier posts in this series.
Step 1 — Set Up the Database
Every SQL agent needs a database to query. We’ll create a realistic e-commerce database with four tables: customers, products, orders, and order_items. This gives us enough structure for interesting business questions without being overwhelming.
The database uses SQLite, so there’s nothing to install. Python includes SQLite out of the box. We’ll populate it with deterministic sample data — no random generation, so your results will match exactly.
import os
import sqlite3
from datetime import datetime
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
load_dotenv()
# Create an in-memory SQLite database
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
print("Database connection established")
Database connection established
With the connection open, we’ll create the four tables and populate them with sample data. The schema mirrors a real e-commerce system: customers place orders, each order has multiple line items, and each line item references a product.
# Create tables
cursor.executescript("""
CREATE TABLE customers (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT NOT NULL,
city TEXT NOT NULL,
signup_date TEXT NOT NULL
);
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
category TEXT NOT NULL,
price REAL NOT NULL
);
CREATE TABLE orders (
id INTEGER PRIMARY KEY,
customer_id INTEGER NOT NULL,
order_date TEXT NOT NULL,
status TEXT NOT NULL,
FOREIGN KEY (customer_id) REFERENCES customers(id)
);
CREATE TABLE order_items (
id INTEGER PRIMARY KEY,
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
FOREIGN KEY (order_id) REFERENCES orders(id),
FOREIGN KEY (product_id) REFERENCES products(id)
);
""")
print("Tables created: customers, products, orders, order_items")
Tables created: customers, products, orders, order_items
Now the sample data. I’m using enough rows to make queries interesting — 8 customers, 10 products, 12 orders, and 20 order items. The dates span Q3 and Q4 of 2025, so we can ask quarterly comparison questions.
# Insert customers
cursor.executemany(
"INSERT INTO customers VALUES (?, ?, ?, ?, ?)",
[
(1, "Alice Johnson", "alice@example.com", "New York", "2024-01-15"),
(2, "Bob Smith", "bob@example.com", "Chicago", "2024-03-22"),
(3, "Carol Davis", "carol@example.com", "New York", "2024-06-10"),
(4, "Dan Wilson", "dan@example.com", "Austin", "2024-02-28"),
(5, "Eva Martinez", "eva@example.com", "Chicago", "2024-07-04"),
(6, "Frank Lee", "frank@example.com", "Austin", "2024-09-15"),
(7, "Grace Kim", "grace@example.com", "New York", "2025-01-10"),
(8, "Henry Brown", "henry@example.com", "Chicago", "2025-04-20"),
],
)
# Insert products
cursor.executemany(
"INSERT INTO products VALUES (?, ?, ?, ?)",
[
(1, "Laptop Pro", "Electronics", 1299.99),
(2, "Wireless Mouse", "Electronics", 29.99),
(3, "Python Cookbook", "Books", 49.99),
(4, "Standing Desk", "Furniture", 599.99),
(5, "Monitor 27in", "Electronics", 349.99),
(6, "Keyboard Mech", "Electronics", 89.99),
(7, "Data Science Handbook", "Books", 39.99),
(8, "Desk Lamp", "Furniture", 45.99),
(9, "USB-C Hub", "Electronics", 59.99),
(10, "Webcam HD", "Electronics", 79.99),
],
)
conn.commit()
print(f"Inserted {cursor.execute('SELECT COUNT(*) FROM customers').fetchone()[0]} customers")
print(f"Inserted {cursor.execute('SELECT COUNT(*) FROM products').fetchone()[0]} products")
Inserted 8 customers
Inserted 10 products
The orders and order items tie everything together. Each order belongs to a customer, and each order item links an order to a product with a quantity.
# Insert orders (spanning Q3-Q4 2025)
cursor.executemany(
"INSERT INTO orders VALUES (?, ?, ?, ?)",
[
(1, 1, "2025-07-10", "completed"),
(2, 2, "2025-07-18", "completed"),
(3, 3, "2025-08-05", "completed"),
(4, 1, "2025-08-22", "completed"),
(5, 4, "2025-09-03", "completed"),
(6, 5, "2025-09-15", "completed"),
(7, 2, "2025-10-01", "completed"),
(8, 6, "2025-10-12", "completed"),
(9, 3, "2025-11-05", "completed"),
(10, 7, "2025-11-20", "completed"),
(11, 1, "2025-12-01", "completed"),
(12, 8, "2025-12-15", "cancelled"),
],
)
# Insert order items
cursor.executemany(
"INSERT INTO order_items VALUES (?, ?, ?, ?)",
[
(1, 1, 1, 1), (2, 1, 2, 2),
(3, 2, 3, 1), (4, 2, 6, 1),
(5, 3, 5, 1), (6, 3, 9, 2),
(7, 4, 2, 3), (8, 4, 7, 1),
(9, 5, 4, 1), (10, 5, 8, 2),
(11, 6, 1, 1), (12, 6, 6, 1),
(13, 7, 3, 2), (14, 7, 10, 1),
(15, 8, 5, 1), (16, 8, 2, 1),
(17, 9, 1, 1), (18, 9, 4, 1),
(19, 10, 7, 2), (20, 10, 9, 1),
],
)
conn.commit()
print(f"Inserted {cursor.execute('SELECT COUNT(*) FROM orders').fetchone()[0]} orders")
print(f"Inserted {cursor.execute('SELECT COUNT(*) FROM order_items').fetchone()[0]} order items")
Inserted 12 orders
Inserted 20 order items
Quick sanity check — let’s confirm the data works with a basic join. This query finds total revenue per product by joining orders, order items, and products.
result = cursor.execute("""
SELECT p.name, SUM(p.price * oi.quantity) as revenue
FROM order_items oi
JOIN products p ON oi.product_id = p.id
JOIN orders o ON oi.order_id = o.id
WHERE o.status = 'completed'
GROUP BY p.name
ORDER BY revenue DESC
LIMIT 5
""").fetchall()
for name, revenue in result:
print(f"{name}: ${revenue:,.2f}")
Laptop Pro: $3,899.97
Standing Desk: $1,199.98
Monitor 27in: $699.98
Keyboard Mech: $179.98
USB-C Hub: $179.97
The database is working. Laptop Pro leads revenue because it’s the highest-priced item and appears in three completed orders. We’ll use this database for all our agent queries.
Step 2 — Define the Agent State
The agent state tracks everything that flows between nodes. For our SQL agent, we need more than just messages. We need to store the database schema, the generated SQL, query results, the current error (if any), and a retry counter.
LangGraph uses TypedDict for state definitions. Each field in the state is accessible to every node, and nodes return dictionaries with the fields they want to update.
from typing import TypedDict, Optional
class SQLAgentState(TypedDict):
question: str
schema_info: str
generated_sql: str
sql_valid: bool
query_result: str
error_message: str
answer: str
retry_count: int
Eight fields. Here’s what each one does:
- question — the user’s natural language question
- schema_info — the database schema as a string (tables, columns, types, sample rows)
- generated_sql — the SQL query the LLM writes
- sql_valid — whether the query passed validation
- query_result — the raw result from executing the query
- error_message — the error text if something went wrong (empty string if no error)
- answer — the final natural language answer for the user
- retry_count — how many times we’ve retried query generation (caps at 3)
This flat state structure is intentional. I prefer keeping agent state as simple as possible — no nested dictionaries, no lists of intermediate results. Every node reads what it needs and writes what it produces. Clean and debuggable.
Step 3 — Build the Schema Inspector Node
The first node in our graph reads the database schema. The LLM can’t write good SQL without knowing what tables and columns exist. This node queries SQLite’s metadata tables and builds a formatted string with table names, column definitions, and sample data.
Why include sample data? Because column names alone are ambiguous. A column called status could hold anything — “active”/”inactive”, “pending”/”shipped”/”delivered”, or numeric codes. Showing 3 sample rows removes the guessing.
def get_schema_node(state: SQLAgentState) -> dict:
"""Read database schema and sample data."""
schema_parts = []
tables = cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
for (table_name,) in tables:
# Get column info
columns = cursor.execute(
f"PRAGMA table_info({table_name})"
).fetchall()
col_defs = [f" {c[1]} {c[2]}" for c in columns]
# Get 3 sample rows
samples = cursor.execute(
f"SELECT * FROM {table_name} LIMIT 3"
).fetchall()
schema_parts.append(
f"TABLE: {table_name}\n"
f"COLUMNS:\n" + "\n".join(col_defs) + "\n"
f"SAMPLE ROWS: {samples}"
)
schema_info = "\n\n".join(schema_parts)
return {"schema_info": schema_info}
No LLM call here — this node is pure Python. It reads metadata from SQLite’s sqlite_master table and PRAGMA table_info. The output is a formatted string the LLM will use in the next step.
Let’s test it in isolation to see what the LLM will receive as context.
test_schema = get_schema_node({"question": "", "schema_info": "", "generated_sql": "", "sql_valid": False, "query_result": "", "error_message": "", "answer": "", "retry_count": 0})
print(test_schema["schema_info"][:500])
TABLE: customers
COLUMNS:
id INTEGER
name TEXT
email TEXT
city TEXT
signup_date TEXT
SAMPLE ROWS: [(1, 'Alice Johnson', 'alice@example.com', 'New York', '2024-01-15'), (2, 'Bob Smith', 'bob@example.com', 'Chicago', '2024-03-22'), (3, 'Carol Davis', 'carol@example.com', 'New York', '2024-06-10')]
TABLE: products
COLUMNS:
id INTEGER
name TEXT
category TEXT
price REAL
SAMPLE ROWS: [(1, 'Laptop Pro', 'Electronics', 1299.99), (2, 'Wireless
The schema output gives the LLM everything it needs: table names, column types, and real data samples. This is significantly better than just passing table names.
Step 4 — Build the Query Generator Node
This is where the LLM does its work. The query generator takes the user’s question and the database schema, then produces a SQL query. The system prompt is critical here — it tells the LLM exactly how to behave and what constraints to follow.
I’ve found that being extremely specific in the system prompt prevents most SQL generation errors. Telling the model “write valid SQLite syntax” isn’t enough. You need to say “use single quotes for strings, don’t use ILIKE (SQLite doesn’t support it), always qualify ambiguous column names with table aliases.”
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def generate_query_node(state: SQLAgentState) -> dict:
"""Generate a SQL query from the user's question."""
system_prompt = f"""You are a SQL expert. Generate a SQLite query to answer the user's question.
DATABASE SCHEMA:
{state['schema_info']}
RULES:
- Return ONLY the SQL query, no markdown, no explanation
- Use SQLite syntax (no ILIKE, use LIKE with LOWER() instead)
- Always use table aliases in JOINs
- Use single quotes for string literals
- If the question is ambiguous, make reasonable assumptions
- For date comparisons, dates are stored as TEXT in 'YYYY-MM-DD' format
"""
if state.get("error_message"):
system_prompt += f"""
PREVIOUS ERROR — fix this issue:
{state['error_message']}
PREVIOUS QUERY THAT FAILED:
{state['generated_sql']}
"""
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=state["question"]),
])
sql = response.content.strip()
# Strip markdown code fences if the LLM adds them
if sql.startswith("```"):
sql = sql.split("\n", 1)[1].rsplit("```", 1)[0].strip()
return {"generated_sql": sql}
Two things to notice here. First, the system prompt includes the full schema from the previous node. The LLM sees every table, column, and sample row. Second, if this is a retry (there’s an error message in the state), the prompt includes the failed query AND the error. This gives the LLM the context it needs to fix the problem rather than making the same mistake again.
The markdown stripping at the end is a practical necessity. Even with “no markdown” in the prompt, models sometimes wrap SQL in code fences. I’ve hit this with every model I’ve tested — GPT-4o, GPT-4o-mini, Claude. Stripping them prevents downstream parsing failures.
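If you want something more defensive than the inline two-step strip, a small regex helper does the same job. This is my own sketch, not part of the agent above — it handles labeled fences (```sql), bare fences, and unfenced output uniformly:

```python
import re

def strip_fences(text: str) -> str:
    """Remove a surrounding markdown code fence (```sql ... ```), if present."""
    match = re.match(r"^```\w*\n(.*?)\n?```\s*$", text.strip(), re.DOTALL)
    return match.group(1).strip() if match else text.strip()

print(strip_fences("```sql\nSELECT 1;\n```"))  # SELECT 1;
print(strip_fences("SELECT 1;"))               # SELECT 1;
```

Either approach works; the regex version just fails more gracefully when the model emits a fence label or trailing whitespace you didn’t anticipate.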
[COMMON MISTAKE]
Don’t include the full database content in the prompt. I’ve seen tutorials that dump entire tables into the system message. For our 8-customer database, that’s fine. For a production database with 100K rows, you’ll blow through the context window and get garbage results. Send schema + sample rows, never full tables.
Step 5 — Build the Validation Node
Before executing any generated SQL, we validate it. This catches syntax errors, missing table references, and other issues before they hit the database. SQLite’s EXPLAIN command is perfect for this — it parses the query without executing it.
def validate_query_node(state: SQLAgentState) -> dict:
"""Validate the generated SQL without executing it."""
sql = state["generated_sql"]
try:
cursor.execute(f"EXPLAIN {sql}")
return {"sql_valid": True, "error_message": ""}
except Exception as e:
return {
"sql_valid": False,
"error_message": f"Validation error: {str(e)}",
}
Short and focused. The EXPLAIN prefix tells SQLite to parse and plan the query without returning data. If the query has a syntax error or references a table that doesn’t exist, it raises an exception. We catch it and store the error message for the retry loop.
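To see the mechanism in isolation, here’s a throwaway demonstration — it uses its own in-memory database, not the tutorial’s connection — of EXPLAIN rejecting a typo’d table name without touching any data:

```python
import sqlite3

demo = sqlite3.connect(":memory:")
cur = demo.cursor()
cur.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)")

try:
    # EXPLAIN parses and plans the query, so the bad table name fails here
    cur.execute("EXPLAIN SELECT * FROM ordrs")
except sqlite3.OperationalError as e:
    print(f"Caught before execution: {e}")  # no such table: ordrs
```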
Step 6 — Build the Query Execution Node
When validation passes, this node runs the actual query against the database. It formats the results as a readable string with column headers.
def execute_query_node(state: SQLAgentState) -> dict:
"""Execute the validated SQL query."""
sql = state["generated_sql"]
try:
cursor.execute(sql)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
if not rows:
result = "Query returned no results."
else:
header = " | ".join(columns)
separator = "-" * len(header)
row_strings = [
" | ".join(str(val) for val in row) for row in rows
]
result = f"{header}\n{separator}\n" + "\n".join(row_strings)
return {"query_result": result, "error_message": ""}
except Exception as e:
return {
"query_result": "",
"error_message": f"Execution error: {str(e)}",
}
The formatting deserves a note. We extract column names from cursor.description and build a pipe-separated table. Named columns make the LLM’s summarization much more accurate than raw tuples.
Why catch exceptions here if we already validated? Because validation catches syntax errors, not runtime errors. A query might be syntactically perfect but fail at execution — dividing by zero in an aggregate, or a type mismatch in a WHERE clause.
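To make that distinction concrete, here’s a contrived demonstration on a separate in-memory database. The hypothetical user-defined function stands in for any computation that only fails on real data — the query passes EXPLAIN, then fails when actually run:

```python
import sqlite3

demo = sqlite3.connect(":memory:")

def fragile(x):
    """A stand-in for any computation that only fails on real data."""
    raise ValueError("boom")

demo.create_function("fragile", 1, fragile)
cur = demo.cursor()

cur.execute("EXPLAIN SELECT fragile(1)")  # validates fine: parsed, never run
try:
    cur.execute("SELECT fragile(1)")      # fails only at execution time
except sqlite3.OperationalError as e:
    print(f"Runtime error: {e}")
```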
Step 7 — Build the Summarization Node
The final processing node takes the raw query results and translates them into a natural language answer. This is what makes the agent useful for business users who don’t read SQL output.
def summarize_node(state: SQLAgentState) -> dict:
"""Summarize query results as a natural language answer."""
response = llm.invoke([
SystemMessage(content="""You are a data analyst presenting query results to a business user.
- Answer the original question directly
- Use specific numbers from the results
- Keep it concise — 2-4 sentences
- If the query returned no results, say so and suggest why
- Format currency with $ and commas
- Don't mention SQL, queries, or databases"""),
HumanMessage(content=f"""Original question: {state['question']}
Query results:
{state['query_result']}"""),
])
return {"answer": response.content}
The system prompt here is deliberate about what NOT to do. Business users don’t want to hear “the SQL query returned 5 rows.” They want “your top 5 products by revenue are…” The instruction to format currency and avoid technical language makes the output feel like it came from a human analyst.
Step 8 — Wire the Graph with Conditional Routing
This is where LangGraph shines. We connect all five nodes with edges, and add the critical retry loop. The routing function checks validation results and decides whether to execute the query or send it back for regeneration.
The routing logic handles three cases. If the query is valid, we proceed to execution. If it’s invalid and we haven’t exceeded 3 retries, we go back to query generation with the error message. If we’ve hit the retry limit, we bail out with an error message.
def route_after_validation(state: SQLAgentState) -> str:
"""Decide next step based on validation result."""
if state["sql_valid"]:
return "execute"
elif state["retry_count"] < 3:
return "retry"
else:
return "give_up"
def increment_retry(state: SQLAgentState) -> dict:
"""Increment the retry counter before regenerating."""
return {"retry_count": state["retry_count"] + 1}
Two functions here. The router returns a string label that maps to the next node. The increment_retry function is a tiny node that bumps the counter before we loop back to query generation. Without it, the agent would retry forever.
Now we build the graph. Each add_node call registers a function as a named step. The add_conditional_edges call creates the branching logic after validation.
def give_up_node(state: SQLAgentState) -> dict:
"""Return an error message when retries are exhausted."""
return {
"answer": (
f"I wasn't able to answer your question after "
f"{state['retry_count']} attempts. The last error "
f"was: {state['error_message']}"
)
}
# Build the graph
graph = StateGraph(SQLAgentState)
# Add nodes
graph.add_node("get_schema", get_schema_node)
graph.add_node("generate_query", generate_query_node)
graph.add_node("validate_query", validate_query_node)
graph.add_node("execute_query", execute_query_node)
graph.add_node("summarize", summarize_node)
graph.add_node("increment_retry", increment_retry)
graph.add_node("give_up", give_up_node)
# Add edges
graph.add_edge(START, "get_schema")
graph.add_edge("get_schema", "generate_query")
graph.add_edge("generate_query", "validate_query")
graph.add_conditional_edges(
"validate_query",
route_after_validation,
{
"execute": "execute_query",
"retry": "increment_retry",
"give_up": "give_up",
},
)
graph.add_edge("increment_retry", "generate_query")
graph.add_edge("execute_query", "summarize")
graph.add_edge("summarize", END)
graph.add_edge("give_up", END)
# Compile
sql_agent = graph.compile()
print("Graph compiled successfully")
Graph compiled successfully
The graph has a clear flow: START → schema → generate → validate → (execute or retry) → summarize → END. The retry loop from validate_query back through increment_retry to generate_query is the self-correction mechanism. It’s what separates this from a one-shot text-to-SQL call.
Step 9 — Test the SQL Agent on Business Questions
Time to test. We’ll run the LangGraph SQL agent on four progressively harder questions to see how it handles different SQL patterns — aggregation, joins, filtering, and multi-table reasoning.
The helper function below invokes the graph and prints both the generated SQL and the final answer. Seeing the SQL helps you verify the agent is writing correct queries.
def ask(question: str) -> str:
"""Run the SQL agent and return the answer."""
result = sql_agent.invoke({
"question": question,
"schema_info": "",
"generated_sql": "",
"sql_valid": False,
"query_result": "",
"error_message": "",
"answer": "",
"retry_count": 0,
})
print(f"Question: {question}")
print(f"SQL: {result['generated_sql']}")
print(f"Answer: {result['answer']}")
print("-" * 60)
return result["answer"]
Question 1 — Simple aggregation: How many customers do we have?
ask("How many customers do we have?")
Question: How many customers do we have?
SQL: SELECT COUNT(*) AS customer_count FROM customers
Answer: You currently have 8 customers in the system.
------------------------------------------------------------
Clean and direct. The agent wrote a simple COUNT query, and the summarizer delivered a one-sentence answer.
Question 2 — Join with aggregation: What are the top 3 products by revenue?
ask("What are our top 3 products by total revenue?")
The agent should produce a three-table join through order_items. Watch for whether it filters out the cancelled order (order #12 has status “cancelled”).
Question: What are our top 3 products by total revenue?
SQL: SELECT p.name, SUM(p.price * oi.quantity) AS total_revenue FROM order_items oi JOIN products p ON oi.product_id = p.id JOIN orders o ON oi.order_id = o.id WHERE o.status = 'completed' GROUP BY p.name ORDER BY total_revenue DESC LIMIT 3
Answer: Your top 3 products by total revenue are Laptop Pro at $3,899.97, Standing Desk at $1,199.98, and Monitor 27in at $699.98.
------------------------------------------------------------
The revenue numbers are correct — Laptop Pro appears in three completed orders (1, 6, and 9) at $1,299.99 each. The agent correctly excluded the cancelled order. That’s exactly the query a human analyst would write.
[UNDER THE HOOD]
Why does the agent use table aliases? Our system prompt explicitly requires them: “always use table aliases in JOINs.” Without this instruction, the LLM often writes products.name instead of p.name, which works but produces harder-to-read SQL. Aliases also prevent ambiguous column errors when two tables share column names like id.
Question 3 — Filtering with dates: How many orders were placed in Q4 2025?
ask("How many orders were placed in Q4 2025 (October through December)?")
Date handling is a common failure point for text-to-SQL systems. SQLite stores dates as text, so the agent needs string comparison on the YYYY-MM-DD format.
Question: How many orders were placed in Q4 2025 (October through December)?
SQL: SELECT COUNT(*) AS order_count FROM orders WHERE order_date >= '2025-10-01' AND order_date <= '2025-12-31'
Answer: There were 6 orders placed in Q4 2025, from October through December.
------------------------------------------------------------
Six is correct — orders 7 through 12 all fall in Q4. Order 12 (December 15) counts even though it was cancelled, because the question asks about orders placed, not completed. This is a subtle distinction the agent handled properly.
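The reason plain string comparison works here is that ISO-8601 dates sort lexicographically — a quick sanity check in plain Python:

```python
# 'YYYY-MM-DD' strings compare the same way the underlying dates do,
# which is why the agent's TEXT-based range filter is safe.
order_dates = ["2025-07-10", "2025-10-01", "2025-11-20", "2025-12-15"]
q4 = [d for d in order_dates if "2025-10-01" <= d <= "2025-12-31"]
print(q4)  # ['2025-10-01', '2025-11-20', '2025-12-15']
```

This property breaks with formats like MM/DD/YYYY, which is one reason to normalize dates before they ever reach the database.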
Question 4 — Multi-step reasoning: Which city has the highest average order value?
ask("Which city has the highest average order value?")
This question forces a subquery. The agent must first compute each order’s total revenue, then average those totals by customer city. Two levels of aggregation.
Question: Which city has the highest average order value?
SQL: SELECT c.city, AVG(order_total) AS avg_order_value FROM (SELECT o.id, o.customer_id, SUM(p.price * oi.quantity) AS order_total FROM orders o JOIN order_items oi ON oi.order_id = o.id JOIN products p ON p.id = oi.product_id WHERE o.status = 'completed' GROUP BY o.id, o.customer_id) sub JOIN customers c ON c.id = sub.customer_id GROUP BY c.city ORDER BY avg_order_value DESC LIMIT 1
Answer: New York has the highest average order value at $799.97.
------------------------------------------------------------
The subquery approach is clean — first calculate per-order revenue, then group by city. New York wins because Carol’s November order combined a Laptop Pro ($1,299.99) and a Standing Desk ($599.99), pulling the average up.
Common Mistakes When Building a SQL Database Agent
Before the mistakes themselves, here’s how this architecture stacks up against the alternatives:
| Approach | Error Recovery | Schema Aware | Multi-Turn | Latency |
|---|---|---|---|---|
| Raw SQL | Manual | No | No | Instant |
| Simple text-to-SQL | None | Partial | No | ~1 second |
| LangGraph SQL agent | Automatic (3 retries) | Full (with samples) | With modification | ~3-5 seconds |
| LangChain SQL toolkit | Built-in | Yes | Via memory | ~3-5 seconds |
Our LangGraph approach gives you full control over each node. The LangChain SQL toolkit is more convenient out of the box, but harder to customize when you need non-standard validation or routing.
Mistake 1: Missing schema context in the prompt
❌ Wrong:
# Sending a question without schema info
response = llm.invoke([
HumanMessage(content="What are the top products?")
])
# LLM guesses table/column names — often wrong
Why it’s wrong: Without schema context, the LLM invents table and column names. It might write SELECT * FROM product when the table is actually called products. These errors are silent until execution.
✅ Correct:
# Always include schema in the system prompt
response = llm.invoke([
SystemMessage(content=f"Schema: {schema_info}"),
HumanMessage(content="What are the top products?"),
])
Mistake 2: No retry loop for failed queries
❌ Wrong:
# One-shot execution — crashes on any SQL error
sql = generate_sql(question)
result = cursor.execute(sql).fetchall() # Might crash
Why it’s wrong: LLMs generate incorrect SQL roughly 10-20% of the time, even with good prompts. Without retries, one bad query kills the entire pipeline.
✅ Correct:
# Our agent's approach: validate, then retry with error context
# The graph handles this automatically via conditional edges
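For comparison outside the graph, the retry idea can be sketched as a plain loop. This is a minimal sketch: `generate_sql` here is a hypothetical callable that receives the previous error, standing in for the LLM generation node:

```python
import sqlite3

def run_with_retries(generate_sql, cursor, question, max_retries=3):
    """Feed each failure back into generation, like the graph's retry edge."""
    error = ""
    for _ in range(max_retries + 1):
        sql = generate_sql(question, error)
        try:
            return cursor.execute(sql).fetchall()
        except sqlite3.Error as e:
            error = str(e)  # becomes context for the next attempt
    raise RuntimeError(f"Gave up after {max_retries + 1} attempts: {error}")
```

The LangGraph version expresses the same control flow declaratively, which makes it easier to observe, checkpoint, and extend.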
Mistake 3: Executing raw LLM output without validation
❌ Wrong:
# Running whatever the LLM outputs — dangerous
sql = response.content
cursor.execute(sql) # Could be DROP TABLE, DELETE, etc.
Why it’s wrong: The LLM might generate destructive queries. In production, always use a read-only database connection and validate queries before execution.
✅ Correct:
# Validate first with EXPLAIN
try:
cursor.execute(f"EXPLAIN {sql}")
except Exception:
# Route to retry, don't execute
pass
Exercise 1 — Add a Query Complexity Check
You’ve seen how the agent validates SQL for syntax errors. But what about queries that are valid SQL but unreasonably expensive? A SELECT * on a million-row table without a LIMIT clause could hang your database.
Your task: add a check_complexity function that inspects the generated SQL and rejects queries missing a LIMIT clause when they use SELECT without a WHERE condition. The function should return a dictionary with sql_valid set to False and an appropriate error message when the check fails.
# Exercise: Complete this function
def check_complexity(state):
sql = state["generated_sql"].upper()
# Your code here:
# 1. Check if the query has SELECT but no WHERE and no LIMIT
# 2. If so, return {"sql_valid": False, "error_message": "..."}
# 3. Otherwise, return {"sql_valid": True, "error_message": ""}
pass
# Test cases:
# check_complexity({"generated_sql": "SELECT * FROM orders"})
# -> {"sql_valid": False, "error_message": "Query needs a WHERE clause or LIMIT"}
#
# check_complexity({"generated_sql": "SELECT * FROM orders WHERE status = 'completed'"})
# -> {"sql_valid": True, "error_message": ""}
#
# check_complexity({"generated_sql": "SELECT * FROM orders LIMIT 10"})
# -> {"sql_valid": True, "error_message": ""}
Hint 1
Check whether `"WHERE"` or `"LIMIT"` appears in the uppercased SQL string. If neither is present and the query starts with `"SELECT"`, it's too broad.
Hint 2 (nearly the answer)
has_where = "WHERE" in sql
has_limit = "LIMIT" in sql
if sql.startswith("SELECT") and not has_where and not has_limit:
return {"sql_valid": False, "error_message": "..."}
Solution
def check_complexity(state):
sql = state["generated_sql"].upper().strip()
has_where = "WHERE" in sql
has_limit = "LIMIT" in sql
if sql.startswith("SELECT") and not has_where and not has_limit:
return {
"sql_valid": False,
"error_message": "Query needs a WHERE clause or LIMIT to prevent unbounded scans",
}
return {"sql_valid": True, "error_message": ""}
# Test
print(check_complexity({"generated_sql": "SELECT * FROM orders"}))
print(check_complexity({"generated_sql": "SELECT * FROM orders WHERE status = 'completed'"}))
print(check_complexity({"generated_sql": "SELECT * FROM orders LIMIT 10"}))
{'sql_valid': False, 'error_message': 'Query needs a WHERE clause or LIMIT to prevent unbounded scans'}
{'sql_valid': True, 'error_message': ''}
{'sql_valid': True, 'error_message': ''}
This function adds a safety check that prevents unbounded table scans. In a production agent, you’d integrate this as an additional validation step in the graph — either as a separate node or combined with the existing `validate_query_node`.
Exercise 2 — Support Follow-Up Questions
Right now, our agent handles each question independently. But in a real analytics workflow, users ask follow-up questions: “What about Q3?” or “Break that down by category.” Your task is to modify the ask function to maintain conversation context.
The key insight: you need to pass the previous question and answer to the LLM so it can resolve references like “that” and “those products.”
# Exercise: Modify this function to support follow-ups
conversation_history = []
def ask_with_context(question: str) -> str:
# Your code here:
# 1. Build a context string from conversation_history
# 2. Modify the question to include context
# 3. Run the agent
# 4. Append Q&A to conversation_history
# 5. Return the answer
pass
# Test sequence:
# ask_with_context("What are our top 3 products by revenue?")
# ask_with_context("What about just electronics?") # Should filter by category
Hint 1
Prepend the conversation history to the question. Format it as “Previous Q&A:\n Q: … A: …\n\nCurrent question: …” so the LLM understands the context.
Hint 2 (nearly the answer)
context = "\n".join([f"Q: {q}\nA: {a}" for q, a in conversation_history])
augmented_question = f"Previous conversation:\n{context}\n\nNew question: {question}"
Solution
conversation_history = []

def ask_with_context(question: str) -> str:
    if conversation_history:
        context = "\n".join(
            [f"Q: {q}\nA: {a}" for q, a in conversation_history]
        )
        augmented = f"Previous conversation:\n{context}\n\nNew question: {question}"
    else:
        augmented = question
    result = sql_agent.invoke({
        "question": augmented,
        "schema_info": "",
        "generated_sql": "",
        "sql_valid": False,
        "query_result": "",
        "error_message": "",
        "answer": "",
        "retry_count": 0,
    })
    conversation_history.append((question, result["answer"]))
    print(f"Q: {question}")
    print(f"A: {result['answer']}")
    print("-" * 60)
    return result["answer"]
The key trick is augmenting the question with prior context rather than modifying the agent’s internals. The LLM reads the history and resolves references like “those” or “that category” into concrete SQL filters.
When NOT to Use a LangGraph SQL Agent
I see developers reach for SQL agents whenever they have a database, but this architecture has clear boundaries. It works well for read-only analytics queries against structured databases. It doesn’t fit every situation.
Don’t use it for write operations. Our agent generates and executes SQL. If the LLM writes an UPDATE, DELETE, or DROP statement, you’ve got a problem. In production, connect to a read-only replica or use a database user with SELECT-only permissions.
Don’t use it for real-time dashboards. Each agent invocation makes 2-3 LLM calls. That’s 2-5 seconds of latency. For dashboards that refresh every few seconds, pre-written queries or a BI tool like Metabase are better.
Don’t use it for complex analytical workflows. Questions like “run a regression on last year’s sales data” aren’t SQL problems. They need pandas, scikit-learn, or a code execution agent. A SQL agent is designed for questions that have a single SQL answer.
Don’t use it with untrusted users without guardrails. Prompt injection attacks can trick the LLM into generating harmful queries. If external users interact with the agent, add query whitelisting, parameterization, and strict output validation.
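One cheap guardrail from that list is a table allowlist. The sketch below pulls identifiers after FROM/JOIN with a regex — the allowed names match this tutorial's demo schema, and a real SQL parser (sqlglot, for example) would handle aliases, CTEs, and subqueries far more reliably than a regex can.

```python
import re

# Allowlist matching the tutorial's demo schema
ALLOWED_TABLES = {"customers", "products", "orders", "order_items"}

def tables_allowed(sql: str) -> bool:
    """Crude guardrail: every table referenced after FROM/JOIN must be allowlisted.

    A regex cannot see aliases, CTE names, or nested queries; treat this as a
    first filter, not a complete defense.
    """
    referenced = {
        name.lower()
        for name in re.findall(r"\b(?:FROM|JOIN)\s+([A-Za-z_][A-Za-z0-9_]*)",
                               sql, re.IGNORECASE)
    }
    return referenced <= ALLOWED_TABLES

print(tables_allowed("SELECT * FROM orders o JOIN customers c ON c.id = o.customer_id"))  # True
print(tables_allowed("SELECT * FROM sqlite_master"))  # False
```

A check like this slots naturally next to the `check_permissions` node from the practice exercise: keyword filtering blocks destructive verbs, the allowlist blocks reads of tables the agent has no business touching.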
What Would Be Different in Production
The agent we built handles the happy path and common error cases. For production deployment, you’d add several hardening layers. I’d prioritize them in this order.
Read-only database connections. Use a database user with SELECT-only grants. This prevents the LLM from generating destructive queries, even if prompt injection is attempted.
Query timeouts. Wrap query execution in a timeout. A 30-second ceiling prevents runaway queries from locking your database.
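SQLite has no server-side statement timeout, but the stdlib driver can abort a running statement through a progress handler. A sketch of that idea — the 10,000-instruction callback interval is an arbitrary choice:

```python
import sqlite3
import time

def execute_with_timeout(conn: sqlite3.Connection, sql: str, timeout_s: float):
    """Abort a running statement once the deadline passes.

    SQLite invokes the progress handler every N virtual-machine instructions;
    a truthy return value interrupts the statement with OperationalError.
    """
    deadline = time.monotonic() + timeout_s
    conn.set_progress_handler(lambda: time.monotonic() > deadline, 10_000)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.set_progress_handler(None, 0)  # remove the handler

conn = sqlite3.connect(":memory:")
print(execute_with_timeout(conn, "SELECT 1 + 1", timeout_s=5.0))  # [(2,)]

# A deliberately slow query: a recursive CTE counting to 50 million
slow = """WITH RECURSIVE c(x) AS (SELECT 1 UNION ALL SELECT x + 1 FROM c WHERE x < 50000000)
SELECT count(*) FROM c"""
try:
    execute_with_timeout(conn, slow, timeout_s=0.05)
except sqlite3.OperationalError:
    print("Query timed out and was aborted")
```

On PostgreSQL you'd set `statement_timeout` on the session or role instead and let the server enforce the ceiling.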
Result size limits. Add LIMIT 1000 to every query that doesn’t already have one. Large result sets waste LLM tokens and slow down summarization.
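A minimal version of that rule is a wrapper that appends the LIMIT before execution. This is a naive string check — a parser would distinguish a real LIMIT clause from the word appearing inside a string literal — but it covers the common case:

```python
def enforce_limit(sql: str, max_rows: int = 1000) -> str:
    """Append a LIMIT to queries that don't already have one.

    Naive substring check; a SQL parser such as sqlglot would be more robust.
    """
    if "limit" in sql.lower():
        return sql
    return f"{sql.rstrip().rstrip(';')} LIMIT {max_rows}"

print(enforce_limit("SELECT * FROM orders"))           # SELECT * FROM orders LIMIT 1000
print(enforce_limit("SELECT * FROM orders LIMIT 10"))  # unchanged
```

The natural place to call this is at the top of `execute_query_node`, right before the query runs.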
Caching. Identical questions should return cached results. Use the question text (or its hash) as the cache key. This cuts costs and latency for repeated queries.
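A sketch of that cache, written as a wrapper so it works with any ask-style function — the backend below is a stub standing in for the real agent:

```python
import hashlib

def make_cached_ask(backend):
    """Wrap an ask-style function with an in-memory cache keyed by question hash."""
    cache: dict[str, str] = {}

    def ask_cached(question: str) -> str:
        # Normalize before hashing so trivial variants hit the same entry
        key = hashlib.sha256(question.strip().lower().encode()).hexdigest()
        if key not in cache:
            cache[key] = backend(question)
        return cache[key]

    return ask_cached

calls = []
fake_backend = lambda q: calls.append(q) or f"answer to {q!r}"  # stub agent
ask_cached = make_cached_ask(fake_backend)
ask_cached("How many customers do we have?")
ask_cached("how many customers do we have?  ")  # normalized: served from cache
print(len(calls))  # 1 — the backend ran once
```

In production you'd swap the dict for Redis or similar and add a TTL, since yesterday's cached answer may be stale once new orders land.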
Logging and observability. Log every question, generated SQL, execution time, and result. LangSmith is the natural choice for LangGraph applications — it traces every node execution and shows you where failures happen.
Schema caching. Our agent reads the schema on every invocation. In production, cache the schema and refresh it on a schedule (e.g., every hour). Schema changes are rare.
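That refresh-on-a-schedule pattern is a few lines of TTL caching. A sketch — in `get_schema_node` you'd call `cache.get()` instead of querying sqlite_master on every invocation:

```python
import time

class SchemaCache:
    """Cache the schema string, re-reading it only after the TTL expires."""

    def __init__(self, loader, ttl_seconds: float = 3600.0):
        self._loader = loader          # callable that reads the live schema
        self._ttl = ttl_seconds
        self._value = None
        self._loaded_at = 0.0

    def get(self) -> str:
        if self._value is None or time.monotonic() - self._loaded_at > self._ttl:
            self._value = self._loader()
            self._loaded_at = time.monotonic()
        return self._value

    def invalidate(self) -> None:
        """Force a re-read on the next get(), e.g. after a deployment."""
        self._value = None

loads = []
cache = SchemaCache(lambda: loads.append(1) or "TABLE: orders ...", ttl_seconds=3600)
cache.get(); cache.get(); cache.get()
print(len(loads))  # 1 — the loader ran once
```

Pair `invalidate()` with your deploy pipeline and the renamed-column failure mode in the FAQ below becomes a one-request blip instead of an hour of errors.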
Complete Code
Click to expand the full script (copy-paste and run)
# Complete code from: Build a SQL Database Agent That Answers Business Questions
# Requires: pip install langgraph langchain-openai langchain-core langchain-community python-dotenv
# Python 3.10+, OpenAI API key in .env file
import sqlite3
from typing import TypedDict
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
load_dotenv()
# --- Database Setup ---
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.executescript("""
CREATE TABLE customers (
    id INTEGER PRIMARY KEY, name TEXT NOT NULL,
    email TEXT NOT NULL, city TEXT NOT NULL, signup_date TEXT NOT NULL
);
CREATE TABLE products (
    id INTEGER PRIMARY KEY, name TEXT NOT NULL,
    category TEXT NOT NULL, price REAL NOT NULL
);
CREATE TABLE orders (
    id INTEGER PRIMARY KEY, customer_id INTEGER NOT NULL,
    order_date TEXT NOT NULL, status TEXT NOT NULL,
    FOREIGN KEY (customer_id) REFERENCES customers(id)
);
CREATE TABLE order_items (
    id INTEGER PRIMARY KEY, order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL, quantity INTEGER NOT NULL,
    FOREIGN KEY (order_id) REFERENCES orders(id),
    FOREIGN KEY (product_id) REFERENCES products(id)
);
""")
cursor.executemany("INSERT INTO customers VALUES (?, ?, ?, ?, ?)", [
    (1, "Alice Johnson", "alice@example.com", "New York", "2024-01-15"),
    (2, "Bob Smith", "bob@example.com", "Chicago", "2024-03-22"),
    (3, "Carol Davis", "carol@example.com", "New York", "2024-06-10"),
    (4, "Dan Wilson", "dan@example.com", "Austin", "2024-02-28"),
    (5, "Eva Martinez", "eva@example.com", "Chicago", "2024-07-04"),
    (6, "Frank Lee", "frank@example.com", "Austin", "2024-09-15"),
    (7, "Grace Kim", "grace@example.com", "New York", "2025-01-10"),
    (8, "Henry Brown", "henry@example.com", "Chicago", "2025-04-20"),
])
cursor.executemany("INSERT INTO products VALUES (?, ?, ?, ?)", [
    (1, "Laptop Pro", "Electronics", 1299.99),
    (2, "Wireless Mouse", "Electronics", 29.99),
    (3, "Python Cookbook", "Books", 49.99),
    (4, "Standing Desk", "Furniture", 599.99),
    (5, "Monitor 27in", "Electronics", 349.99),
    (6, "Keyboard Mech", "Electronics", 89.99),
    (7, "Data Science Handbook", "Books", 39.99),
    (8, "Desk Lamp", "Furniture", 45.99),
    (9, "USB-C Hub", "Electronics", 59.99),
    (10, "Webcam HD", "Electronics", 79.99),
])
cursor.executemany("INSERT INTO orders VALUES (?, ?, ?, ?)", [
    (1, 1, "2025-07-10", "completed"), (2, 2, "2025-07-18", "completed"),
    (3, 3, "2025-08-05", "completed"), (4, 1, "2025-08-22", "completed"),
    (5, 4, "2025-09-03", "completed"), (6, 5, "2025-09-15", "completed"),
    (7, 2, "2025-10-01", "completed"), (8, 6, "2025-10-12", "completed"),
    (9, 3, "2025-11-05", "completed"), (10, 7, "2025-11-20", "completed"),
    (11, 1, "2025-12-01", "completed"), (12, 8, "2025-12-15", "cancelled"),
])
cursor.executemany("INSERT INTO order_items VALUES (?, ?, ?, ?)", [
    (1, 1, 1, 1), (2, 1, 2, 2), (3, 2, 3, 1), (4, 2, 6, 1),
    (5, 3, 5, 1), (6, 3, 9, 2), (7, 4, 2, 3), (8, 4, 7, 1),
    (9, 5, 4, 1), (10, 5, 8, 2), (11, 6, 1, 1), (12, 6, 6, 1),
    (13, 7, 3, 2), (14, 7, 10, 1), (15, 8, 5, 1), (16, 8, 2, 1),
    (17, 9, 1, 1), (18, 9, 4, 1), (19, 10, 7, 2), (20, 10, 9, 1),
])
conn.commit()
# --- State Definition ---
class SQLAgentState(TypedDict):
    question: str
    schema_info: str
    generated_sql: str
    sql_valid: bool
    query_result: str
    error_message: str
    answer: str
    retry_count: int
# --- LLM ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# --- Node Functions ---
def get_schema_node(state: SQLAgentState) -> dict:
    schema_parts = []
    tables = cursor.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
    for (table_name,) in tables:
        columns = cursor.execute(f"PRAGMA table_info({table_name})").fetchall()
        col_defs = [f" {c[1]} {c[2]}" for c in columns]
        samples = cursor.execute(f"SELECT * FROM {table_name} LIMIT 3").fetchall()
        schema_parts.append(
            f"TABLE: {table_name}\nCOLUMNS:\n" + "\n".join(col_defs) +
            f"\nSAMPLE ROWS: {samples}"
        )
    return {"schema_info": "\n\n".join(schema_parts)}

def generate_query_node(state: SQLAgentState) -> dict:
    system_prompt = f"""You are a SQL expert. Generate a SQLite query to answer the user's question.
DATABASE SCHEMA:
{state['schema_info']}
RULES:
- Return ONLY the SQL query, no markdown, no explanation
- Use SQLite syntax (no ILIKE, use LIKE with LOWER() instead)
- Always use table aliases in JOINs
- Use single quotes for string literals
- For date comparisons, dates are stored as TEXT in 'YYYY-MM-DD' format"""
    if state.get("error_message"):
        system_prompt += f"\n\nPREVIOUS ERROR:\n{state['error_message']}\nFAILED QUERY:\n{state['generated_sql']}"
    response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=state["question"])])
    sql = response.content.strip()
    if sql.startswith("```"):
        sql = sql.split("\n", 1)[1].rsplit("```", 1)[0].strip()
    return {"generated_sql": sql}

def validate_query_node(state: SQLAgentState) -> dict:
    try:
        cursor.execute(f"EXPLAIN {state['generated_sql']}")
        return {"sql_valid": True, "error_message": ""}
    except Exception as e:
        return {"sql_valid": False, "error_message": f"Validation error: {str(e)}"}

def execute_query_node(state: SQLAgentState) -> dict:
    try:
        cursor.execute(state["generated_sql"])
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()
        if not rows:
            return {"query_result": "Query returned no results.", "error_message": ""}
        header = " | ".join(columns)
        row_strings = [" | ".join(str(val) for val in row) for row in rows]
        result = f"{header}\n{'-' * len(header)}\n" + "\n".join(row_strings)
        return {"query_result": result, "error_message": ""}
    except Exception as e:
        return {"query_result": "", "error_message": f"Execution error: {str(e)}"}

def summarize_node(state: SQLAgentState) -> dict:
    response = llm.invoke([
        SystemMessage(content="You are a data analyst. Answer the question using the query results. Be concise (2-4 sentences). Use $ for currency. Don't mention SQL or databases."),
        HumanMessage(content=f"Question: {state['question']}\n\nResults:\n{state['query_result']}"),
    ])
    return {"answer": response.content}

def route_after_validation(state: SQLAgentState) -> str:
    if state["sql_valid"]:
        return "execute"
    elif state["retry_count"] < 3:
        return "retry"
    return "give_up"

def increment_retry(state: SQLAgentState) -> dict:
    return {"retry_count": state["retry_count"] + 1}

def give_up_node(state: SQLAgentState) -> dict:
    return {"answer": f"Unable to answer after {state['retry_count']} attempts. Last error: {state['error_message']}"}
# --- Build Graph ---
graph = StateGraph(SQLAgentState)
graph.add_node("get_schema", get_schema_node)
graph.add_node("generate_query", generate_query_node)
graph.add_node("validate_query", validate_query_node)
graph.add_node("execute_query", execute_query_node)
graph.add_node("summarize", summarize_node)
graph.add_node("increment_retry", increment_retry)
graph.add_node("give_up", give_up_node)
graph.add_edge(START, "get_schema")
graph.add_edge("get_schema", "generate_query")
graph.add_edge("generate_query", "validate_query")
graph.add_conditional_edges("validate_query", route_after_validation, {
    "execute": "execute_query", "retry": "increment_retry", "give_up": "give_up",
})
graph.add_edge("increment_retry", "generate_query")
graph.add_edge("execute_query", "summarize")
graph.add_edge("summarize", END)
graph.add_edge("give_up", END)
sql_agent = graph.compile()
# --- Run ---
def ask(question: str) -> str:
    result = sql_agent.invoke({
        "question": question, "schema_info": "", "generated_sql": "",
        "sql_valid": False, "query_result": "", "error_message": "",
        "answer": "", "retry_count": 0,
    })
    print(f"Q: {question}\nSQL: {result['generated_sql']}\nA: {result['answer']}\n")
    return result["answer"]
ask("How many customers do we have?")
ask("What are our top 3 products by total revenue?")
ask("How many orders were placed in Q4 2025?")
ask("Which city has the highest average order value?")
print("Script completed successfully.")
Summary
You built a complete SQL database agent using LangGraph. The agent takes natural language questions, inspects the database schema, generates SQL, validates and executes the query with automatic retries, and summarizes results for business users.
The key architectural decisions were:
- Custom TypedDict state over MessagesState — because the agent has distinct processing stages, not a free-form conversation
- Schema inspection as the first node — giving the LLM table structure and sample data dramatically improves SQL quality
- Validate before execute — catching errors before they hit the database and routing to a retry loop
- Conditional edges for self-correction — the retry mechanism is what makes this an agent rather than a script
To take this further, try connecting it to a PostgreSQL or MySQL database by swapping the SQLite connection. Add human-in-the-loop approval (covered in post 13 of this series) before executing queries on sensitive data. Or extend the state to track conversation history for multi-turn analytics sessions.
Practice exercise: Extend the agent with a check_permissions node that rejects queries containing INSERT, UPDATE, DELETE, DROP, ALTER, or TRUNCATE keywords. Add it between the generate_query and validate_query nodes in the graph.
Solution
FORBIDDEN_KEYWORDS = {"INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE"}

def check_permissions_node(state: SQLAgentState) -> dict:
    sql_upper = state["generated_sql"].upper().strip()
    first_word = sql_upper.split()[0] if sql_upper else ""
    if first_word in FORBIDDEN_KEYWORDS:
        return {
            "sql_valid": False,
            "error_message": f"Query rejected: {first_word} operations are not permitted. Only SELECT queries are allowed.",
        }
    return {"sql_valid": True, "error_message": ""}
# Add to graph between generate_query and validate_query
# (and remove the direct generate_query -> validate_query edge):
# graph.add_edge("generate_query", "check_permissions")
# graph.add_conditional_edges("check_permissions", route_after_validation, {
#     "execute": "validate_query", "retry": "increment_retry", "give_up": "give_up",
# })
This adds a security layer that catches destructive queries before they even reach validation. In production, combine this with a read-only database user for defense in depth.
Frequently Asked Questions
Can this agent work with PostgreSQL or MySQL instead of SQLite?
Yes. Replace the sqlite3 connection with psycopg2 (PostgreSQL) or mysql-connector-python (MySQL). You’ll also need to update the schema inspection node — PostgreSQL uses information_schema.columns instead of PRAGMA table_info, and MySQL uses DESCRIBE table_name. The LLM prompt should specify the SQL dialect too.
How do I prevent SQL injection when using an LLM-generated query?
The primary defense is a read-only database connection. Create a database user with only SELECT privileges. Additionally, set query timeouts (30 seconds is a good default), add row limits to prevent data exfiltration, and consider maintaining an allowlist of permitted table names. Our validation node catches syntax errors, but it won’t stop semantically valid but malicious queries.
What happens if the database schema changes?
Our agent reads the schema fresh on every invocation, so it automatically picks up new tables and columns. In production with schema caching, set a cache TTL (e.g., 1 hour) and invalidate on deployment. If a column is renamed, the agent will generate errors until the cache refreshes — another reason the retry loop is essential.
Can the agent handle queries that need multiple SQL statements?
Not in this implementation. The agent generates and executes a single query per question. For multi-statement analysis (“compare Q3 vs Q4 revenue”), the LLM usually writes a single query with CASE expressions or subqueries. For truly multi-step analysis, extend the state with a queries list and add a planning node that breaks the question into sequential sub-queries.
References
- LangGraph Documentation — Build a SQL Agent. Link
- LangChain Documentation — SQL Database Toolkit. Link
- SQLite Documentation — EXPLAIN Query Plan. Link
- Yao, S. et al. (2022). “ReAct: Synergizing Reasoning and Acting in Language Models.” arXiv:2210.03629. Link
- LangGraph GitHub — SQL Agent Example. Link
- Python Documentation — sqlite3 Module. Link
- OpenAI Documentation — Function Calling. Link
- LangChain Blog — Building Data Agents. Link
Reviewed: March 2026 | LangGraph version: 0.4+ | langchain-openai: 0.3+