Project — Build a Multi-Agent Research Assistant in LangGraph
You need a research report on a new topic. You paste the query into ChatGPT, get a decent summary, but it misses nuance. It doesn’t cross-reference sources. It doesn’t structure findings the way you’d want. So you do it yourself — search, read, take notes, organize, write. That takes hours. What if you could build a system where four specialized agents handle the entire pipeline automatically?
That’s exactly what we’ll build here. A planner agent breaks your question into research sub-tasks. A researcher agent searches the web for each sub-task and extracts key findings. A writer agent synthesizes everything into a structured report. A reviewer agent checks quality and can loop back for revisions. The whole system runs on LangGraph, with conditional routing controlling the handoffs.
Before we write code, here’s how data flows through this system.
You give it a research question — something like “What are the latest advances in protein folding prediction?” The planner receives this and breaks it into 3-5 focused sub-questions. Each sub-question targets a specific angle: recent breakthroughs, key methods, leading teams, practical applications.
Those sub-questions flow to the researcher. For each one, the researcher searches the web, collects relevant information, and produces a structured finding with source attribution. By the end, you have sourced findings covering every angle of the topic.
The writer takes all those findings and builds a coherent report. It organizes them by theme, writes section summaries, adds an executive summary, and lists all sources. The output is a polished document you could share with your team.
The reviewer reads the report and compares it against the original sub-questions. If coverage is sufficient, it approves. If something’s missing, it routes back to the researcher for another pass. A recursion limit prevents infinite loops.
We’ll build each piece, wire them together, and run the full pipeline on a real research question.
What Makes This a Multi-Agent Problem?
Could a single agent handle this? Technically, yes. But it’d struggle badly.
A single agent with a prompt that says “plan research, search the web, and write a report” faces three problems. Its context window fills with tool descriptions it doesn’t need at each stage. Its prompt tries to juggle three conflicting roles. And when something goes wrong, you can’t tell which stage failed.
Multi-agent systems fix this with separation of concerns. Each agent gets:
- A focused system prompt. The planner only thinks about decomposing questions. The researcher only thinks about finding information. The writer only thinks about presenting.
- Only the tools it needs. The planner needs no tools — it just reasons. The researcher needs a search tool. The writer works with the collected findings.
- Its own slice of the conversation. No cross-contamination between stages.
| Approach | Prompt Complexity | Tool Clarity | Debuggability |
|---|---|---|---|
| Single agent | High — one prompt covers all roles | Low — all tools visible always | Hard — can’t isolate failures |
| Multi-agent | Low — each prompt is focused | High — each agent sees only its tools | Easy — trace shows which agent failed |
Prerequisites
- Python version: 3.10+
- Required libraries: langgraph (0.4+), langchain-openai (0.3+), langchain-core (0.3+), langchain-community (0.3+), tavily-python (0.5+)
- Install: `pip install langgraph langchain-openai langchain-core langchain-community tavily-python`
- API keys: an OpenAI API key (`OPENAI_API_KEY`) and a Tavily API key (`TAVILY_API_KEY`). See OpenAI’s docs and Tavily’s docs to create them.
- Time to complete: ~45 minutes
- Prior knowledge: Basic LangGraph concepts (nodes, edges, state) from earlier posts in this series.
Setting Up the Project
The first code block imports everything we need. We use ChatOpenAI for the LLM, TavilySearchResults as our web search tool, and LangGraph’s StateGraph for building the agent workflow. The TypedDict defines our shared state — the central data structure all agents read from and write to.
import os
import json
from typing import Annotated, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
os.environ["OPENAI_API_KEY"] = "your-openai-key-here"
os.environ["TAVILY_API_KEY"] = "your-tavily-key-here"
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
Designing the Shared State
Every LangGraph graph needs a state object. Think of it as a shared whiteboard that all agents can read from and write to. For our research assistant, the state tracks the original query, sub-questions from the planner, findings from the researcher, and the final report.
The research_findings field uses a list that grows as the researcher works. Each finding is a dictionary with a question, answer, and source URLs. The current_step field acts as a routing signal — it tells the supervisor which agent should run next.
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
research_query: str
sub_questions: list[str]
research_findings: list[dict]
final_report: str
current_step: str
review_count: int
Seven fields. The messages field uses LangGraph’s add_messages reducer, which appends new messages instead of replacing them. The review_count field prevents infinite review loops — more on that later.
Why not just use MessagesState? Because a flat message list doesn’t give you structure. With typed fields, the planner writes to sub_questions, the researcher writes to research_findings, and the writer reads both. Each agent knows exactly where its data lives.
Building the Planner Agent
The planner’s job is straightforward but critical: take a broad research question and break it into 3-5 focused sub-questions. Good sub-questions make the difference between a thorough report and a shallow one.
The planner doesn’t need any tools. It uses the LLM’s reasoning ability to decompose the question. We define it as a regular function that returns state updates — LangGraph merges those updates into the existing state automatically.
def planner_agent(state: ResearchState) -> dict:
"""Break the research query into focused sub-questions."""
query = state["research_query"]
planner_prompt = f"""You are a research planner. Break down
a research question into 3-5 focused sub-questions that together
provide comprehensive coverage of the topic.
Research question: {query}
Return ONLY a JSON list of strings. Each string is one sub-question.
Example: ["What is X?", "How does X compare to Y?"]
"""
response = model.invoke([HumanMessage(content=planner_prompt)])
sub_questions = json.loads(response.content)
return {
"sub_questions": sub_questions,
"current_step": "research",
"messages": [AIMessage(
content=f"Created {len(sub_questions)} sub-questions: {sub_questions}",
name="planner"
)]
}
Notice the return format. The function returns a dictionary with only the fields it wants to update. The planner doesn’t touch research_findings or final_report — those aren’t its responsibility.
[UNDER THE HOOD]
Why return a dict instead of a full state? LangGraph uses a merge strategy. When an agent returns {"sub_questions": [...], "current_step": "research"}, LangGraph updates only those two fields. Everything else stays unchanged. This means agents can’t accidentally overwrite each other’s data — a critical safety property in multi-agent systems.
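The merge behavior can be sketched with plain dicts. This is an illustration only, not LangGraph internals: the real framework also applies per-field reducers (like add_messages) rather than a flat dict merge.

```python
# A minimal sketch of partial state updates, using plain dicts.
# Illustration only: LangGraph additionally applies per-field reducers
# (e.g. add_messages appends rather than replaces).
state = {"sub_questions": [], "research_findings": [], "current_step": "planning"}

def apply_update(state, update):
    """Merge a node's partial return into the state, leaving other keys alone."""
    return {**state, **update}

# The planner returns only the fields it owns:
state = apply_update(state, {"sub_questions": ["What is X?"], "current_step": "research"})
print(state["current_step"])       # research
print(state["research_findings"])  # [] -- untouched by the planner's update
```

Because each agent returns only its own fields, the researcher's findings can never be clobbered by the planner, no matter what order updates arrive in.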
Predict the output: If your research query is “What are the benefits of meditation?”, what kinds of sub-questions would you expect? Think about it before running — you’d want questions covering physical health benefits, mental health benefits, scientific evidence, and practical getting-started advice.
Building the Researcher Agent
This is where the real work happens. The researcher takes each sub-question from the planner, searches the web using Tavily, and produces a structured finding with a question, a summary answer, and source URLs.
We set up Tavily with max_results=3 to keep things focused. More results means more noise without much benefit for summary-style research. The researcher iterates through sub-questions, calls the search API, and asks the LLM to summarize what it finds.
search_tool = TavilySearchResults(max_results=3)
def researcher_agent(state: ResearchState) -> dict:
"""Research each sub-question using web search."""
sub_questions = state["sub_questions"]
findings = []
errors = []
for question in sub_questions:
try:
search_results = search_tool.invoke({"query": question})
if not search_results:
errors.append(f"No results for: {question}")
continue
context = "\n".join([
f"Source: {r['url']}\nContent: {r['content']}"
for r in search_results
])
research_prompt = f"""Answer this question based on search results.
Return valid JSON: {{"question": "...", "answer": "...", "sources": [...]}}
Question: {question}
Search Results:
{context}"""
response = model.invoke([HumanMessage(content=research_prompt)])
finding = json.loads(response.content)
findings.append(finding)
except json.JSONDecodeError:
errors.append(f"JSON parse error for: {question}")
except Exception as e:
errors.append(f"Error researching '{question}': {str(e)}")
error_msg = f" Errors: {errors}" if errors else ""
return {
"research_findings": findings,
"current_step": "writing",
"messages": [AIMessage(
content=f"Researched {len(findings)}/{len(sub_questions)}.{error_msg}",
name="researcher"
)]
}
The error handling here is important. If one search fails, the others still complete. The try-except block catches both API errors and JSON parsing failures. Error messages get logged in the agent’s message so you can inspect them later.
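One common failure worth handling explicitly: models sometimes wrap JSON in markdown code fences even when told not to. A small pre-parse cleanup reduces those JSONDecodeErrors. This helper is a hypothetical addition, not part of the tutorial's pipeline:

```python
import json
import re

def parse_json_loosely(text):
    """Best-effort parse of LLM output: strip markdown code fences before parsing.
    Hypothetical helper, not part of the tutorial's pipeline."""
    cleaned = text.strip()
    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)  # leading ``` or ```json
    cleaned = re.sub(r"\s*```$", "", cleaned)           # trailing ```
    return json.loads(cleaned)

print(parse_json_loosely('```json\n{"question": "Q", "answer": "A", "sources": []}\n```')["answer"])  # A
```

You could swap this in for the bare json.loads calls in the planner and researcher to make parsing more tolerant without changing anything else.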
Exercise 1: Add a Relevance Filter to the Researcher
The researcher currently uses all search results regardless of quality. Add a relevance scoring step that filters out low-quality results before summarizing.
type: 'exercise'
id: 'relevance-filter'
title: 'Exercise 1: Add a Relevance Filter'
difficulty: 'advanced'
exerciseType: 'write'
instructions: |
Modify the researcher to score each search result's relevance (0-10)
before including it in the context. Only include results scoring 7+.
If no results pass the threshold, include the top result anyway.
starterCode: |
def filter_relevant_results(question, search_results, model):
"""Score and filter search results by relevance."""
filtered = []
for result in search_results:
# TODO: Ask the model to score relevance 0-10
# TODO: Include only results scoring 7+
pass
# If nothing passed, keep the best result
if not filtered and search_results:
filtered = [search_results[0]]
return filtered
testCases:
- id: 'tc1'
input: 'print(len(filter_relevant_results("test", [{"content": "relevant"}, {"content": "irrelevant"}], model)))'
expectedOutput: '# Output depends on model scoring'
description: 'Should return filtered results'
- id: 'tc2'
input: 'print(type(filter_relevant_results("test", [], model)))'
expectedOutput: "<class 'list'>"
description: 'Should return a list even with empty input'
hints:
- 'Ask the model: "Score the relevance of this content to the question on a scale of 0-10. Return only the number."'
- 'Parse the score with int(response.content.strip()), then check if score >= 7'
solution: |
def filter_relevant_results(question, search_results, model):
filtered = []
for result in search_results:
score_prompt = f"Score 0-10 how relevant this is to '{question}': {result['content'][:200]}. Return ONLY the number."
response = model.invoke([HumanMessage(content=score_prompt)])
try:
score = int(response.content.strip())
if score >= 7:
filtered.append(result)
except ValueError:
continue
if not filtered and search_results:
filtered = [search_results[0]]
return filtered
solutionExplanation: |
The function asks the LLM to score each result individually.
Results below 7/10 are dropped. If nothing passes, we keep
the best available result to avoid empty findings.
xpReward: 20
Building the Writer Agent
The writer receives all research findings and produces a structured report. It doesn’t search or plan — it only synthesizes and organizes. I prefer keeping the writer completely tool-free. Its entire job is turning structured data into readable prose.
The writer’s prompt is the longest because formatting matters here. A good research report needs an executive summary, organized sections, and a source list. The prompt instructs the LLM to organize by theme rather than by question — this produces a more coherent read.
def writer_agent(state: ResearchState) -> dict:
"""Synthesize research findings into a structured report."""
query = state["research_query"]
findings = state.get("research_findings", [])
if not findings:
return {
"final_report": "No research findings available.",
"current_step": "complete",
"messages": [AIMessage(content="No findings to write.", name="writer")]
}
findings_text = "\n\n".join([
f"Q: {f['question']}\nA: {f['answer']}\nSources: {', '.join(f['sources'])}"
for f in findings
])
writer_prompt = f"""Synthesize these findings into a research report.
Original question: {query}
Research findings:
{findings_text}
Structure the report as:
1. Executive Summary (2-3 sentences)
2. Key Findings (organized by theme, not by question)
3. Detailed Analysis (expand on each finding with source citations)
4. Sources (list all unique URLs)
Write clearly. Use short paragraphs. Cite sources inline."""
response = model.invoke([HumanMessage(content=writer_prompt)])
return {
"final_report": response.content,
"current_step": "review",
"messages": [AIMessage(content="Report draft completed.", name="writer")]
}
The writer reads research_findings directly from state — no message parsing needed. Notice that it sets current_step to “review”, not “complete”. The report goes to the reviewer before it’s finalized.
The validation at the top is important. If the researcher failed on all sub-questions, findings could be empty. Without that guard, the writer would crash or produce a meaningless report.
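The writer's prompt asks the LLM to list all unique URLs, but you could also deduplicate sources deterministically in Python before prompting. A hypothetical helper, sketched here as an alternative, not what the tutorial code does:

```python
def collect_unique_sources(findings):
    """Gather unique source URLs across findings, preserving first-seen order.
    Hypothetical helper; the tutorial's writer prompt asks the LLM to do this instead."""
    seen = set()
    ordered = []
    for f in findings:
        for url in f.get("sources", []):
            if url not in seen:
                seen.add(url)
                ordered.append(url)
    return ordered

findings = [
    {"sources": ["https://a.example", "https://b.example"]},
    {"sources": ["https://b.example", "https://c.example"]},
]
print(collect_unique_sources(findings))  # ['https://a.example', 'https://b.example', 'https://c.example']
```

Doing deduplication in code rather than in the prompt guarantees the source list is complete even if the LLM drops one.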
Adding the Quality Reviewer
Here’s where the architecture gets interesting. A linear pipeline — plan, research, write — works, but it has no quality check. What if the researcher missed an important angle? What if the report is poorly organized?
The reviewer reads the final report, compares it to the original sub-questions, and decides: approve or send back for revision. If it sends work back, the researcher runs again with the feedback in the message history. This creates a cycle in the graph.
def reviewer_agent(state: ResearchState) -> dict:
"""Review the report quality and decide next steps."""
if state.get("review_count", 0) >= 2:
return {
"current_step": "complete",
"messages": [AIMessage(
content="Max reviews reached. Approving report.",
name="reviewer"
)]
}
report = state["final_report"]
sub_questions = state["sub_questions"]
review_prompt = f"""You are a research quality reviewer.
Review this report against the original sub-questions.
Sub-questions: {json.dumps(sub_questions)}
Report:
{report}
Rate as APPROVED or NEEDS_REVISION.
If NEEDS_REVISION, say what's missing in 1-2 sentences.
Return JSON: {{"verdict": "APPROVED" or "NEEDS_REVISION", "feedback": "..."}}
"""
response = model.invoke([HumanMessage(content=review_prompt)])
review = json.loads(response.content)
if review["verdict"] == "APPROVED":
return {
"current_step": "complete",
"messages": [AIMessage(content="Report APPROVED.", name="reviewer")]
}
else:
return {
"current_step": "research",
"review_count": state.get("review_count", 0) + 1,
"messages": [AIMessage(
content=f"Revision needed: {review['feedback']}",
name="reviewer"
)]
}
The review_count check at the top is your safety net. Without it, a perfectionist reviewer could loop forever, racking up API costs. Two revision cycles is a reasonable limit — after that, publish what you have.
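To see why the cap matters for cost, here is rough worst-case arithmetic, assuming five sub-questions and a reviewer that requests revision every time it is consulted. The numbers are illustrative, not measured:

```python
# Worst-case LLM call count under the review cap (illustrative arithmetic).
SUB_QUESTIONS = 5
MAX_REVISIONS = 2  # the review_count cap used above

planner_calls = 1
researcher_calls = SUB_QUESTIONS * (1 + MAX_REVISIONS)  # initial pass + one full pass per revision
writer_calls = 1 + MAX_REVISIONS                        # initial draft + one redraft per revision
reviewer_calls = MAX_REVISIONS                          # the final review hits the cap without an LLM call
total = planner_calls + researcher_calls + writer_calls + reviewer_calls
print(total)  # 21
```

Without the cap, every extra revision cycle adds seven more calls (five research, one write, one review), which is why an over-critical reviewer gets expensive fast.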
Exercise 2: Build a Scoring Reviewer
The current reviewer gives a binary verdict (APPROVED/NEEDS_REVISION). Build a version that scores the report on three dimensions and only approves if all scores are above a threshold.
type: 'exercise'
id: 'scoring-reviewer'
title: 'Exercise 2: Multi-Dimension Review Scoring'
difficulty: 'advanced'
exerciseType: 'write'
instructions: |
Modify the reviewer to score the report on three dimensions:
coverage (0-10), depth (0-10), and coherence (0-10).
Approve only if ALL scores are 7+. Return the scores in the state.
starterCode: |
def scoring_reviewer(state: ResearchState) -> dict:
report = state["final_report"]
sub_questions = state["sub_questions"]
review_prompt = f"""Score this report on 3 dimensions (0-10 each):
- coverage: does it address all sub-questions?
- depth: are answers substantive or shallow?
- coherence: does it flow logically?
Sub-questions: {json.dumps(sub_questions)}
Report: {report}
Return JSON: {{"coverage": N, "depth": N, "coherence": N}}"""
response = model.invoke([HumanMessage(content=review_prompt)])
scores = json.loads(response.content)
# TODO: Check if all scores >= 7
# TODO: Return appropriate current_step
pass
testCases:
- id: 'tc1'
input: 'print("approved" if all(v >= 7 for v in {"coverage": 8, "depth": 9, "coherence": 7}.values()) else "revision")'
expectedOutput: 'approved'
description: 'All scores 7+ should approve'
- id: 'tc2'
input: 'print("approved" if all(v >= 7 for v in {"coverage": 8, "depth": 5, "coherence": 7}.values()) else "revision")'
expectedOutput: 'revision'
description: 'Any score below 7 should trigger revision'
hints:
- 'Use all(score >= 7 for score in scores.values()) to check the threshold'
- 'If approved, set current_step to "complete". If not, set it to "research" and increment review_count'
solution: |
def scoring_reviewer(state: ResearchState) -> dict:
if state.get("review_count", 0) >= 2:
return {"current_step": "complete",
"messages": [AIMessage(content="Max reviews.", name="reviewer")]}
report = state["final_report"]
sub_questions = state["sub_questions"]
review_prompt = f"""Score this report (0-10 each):
coverage, depth, coherence.
Sub-questions: {json.dumps(sub_questions)}
Report: {report}
Return JSON: {{"coverage": N, "depth": N, "coherence": N}}"""
response = model.invoke([HumanMessage(content=review_prompt)])
scores = json.loads(response.content)
approved = all(v >= 7 for v in scores.values())
if approved:
return {"current_step": "complete",
"messages": [AIMessage(content=f"APPROVED. Scores: {scores}", name="reviewer")]}
else:
return {"current_step": "research",
"review_count": state.get("review_count", 0) + 1,
"messages": [AIMessage(content=f"Scores: {scores}. Needs revision.", name="reviewer")]}
solutionExplanation: |
The scoring reviewer uses structured scores instead of a binary verdict.
The all() check ensures every dimension meets the 7/10 threshold.
The review_count safeguard still prevents infinite loops.
xpReward: 20
Wiring the Graph
Everything connects here. We add each agent as a node and use conditional edges to route between them. The route_step function reads current_step from the state and returns the name of the next node.
def route_step(state: ResearchState) -> str:
"""Route to the next agent based on current step."""
step = state.get("current_step", "planning")
routes = {
"planning": "planner",
"research": "researcher",
"writing": "writer",
"review": "reviewer",
"complete": END,
}
return routes.get(step, END)
workflow = StateGraph(ResearchState)
workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("reviewer", reviewer_agent)
workflow.add_conditional_edges(START, route_step)
workflow.add_conditional_edges("planner", route_step)
workflow.add_conditional_edges("researcher", route_step)
workflow.add_conditional_edges("writer", route_step)
workflow.add_conditional_edges("reviewer", route_step)
graph = workflow.compile()
The flow is: START -> planner -> researcher -> writer -> reviewer -> (END or back to researcher). Each agent sets current_step in its return value, and route_step reads it to decide what happens next.
Why use conditional edges everywhere instead of fixed edges for the linear parts? Because this pattern scales cleanly. When you add a new agent — say, a fact-checker between researcher and writer — you just add a node and update the routing dictionary. No rewiring of edge logic.
Here’s the graph structure visualized:
START --> planner --> researcher --> writer --> reviewer --+--> END
                      ^                                    |
                      +------------ (revision) ------------+
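The scaling claim above can be made concrete. Adding a hypothetical fact-checker stage (the name matches Exercise 3 below, but is otherwise assumed) touches only the routing table; the researcher would set current_step to "fact_check" instead of "writing". This standalone sketch uses a string stand-in for langgraph.graph.END:

```python
END = "__end__"  # stand-in for langgraph.graph.END in this standalone sketch

routes = {
    "planning": "planner",
    "research": "researcher",
    "fact_check": "fact_checker",  # the only routing change needed for the new agent
    "writing": "writer",
    "review": "reviewer",
    "complete": END,
}

def route_step(state):
    """Same routing logic as the main graph: read current_step, look up the node."""
    return routes.get(state.get("current_step", "planning"), END)

print(route_step({"current_step": "fact_check"}))  # fact_checker
print(route_step({"current_step": "bogus"}))       # __end__
```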
Running the Full Pipeline
Time to test the complete system. We create an initial state and invoke the graph. The current_step starts at “planning”, so the router sends it to the planner first.
result = graph.invoke({
"messages": [HumanMessage(content="Start research")],
"research_query": "What are the latest advances in protein folding prediction?",
"sub_questions": [],
"research_findings": [],
"final_report": "",
"current_step": "planning",
"review_count": 0,
})
After the graph finishes, inspect the outputs. Each piece of the pipeline produced something you can examine independently.
print("=== Sub-Questions ===")
for i, q in enumerate(result["sub_questions"], 1):
print(f"{i}. {q}")
print("\n=== Research Findings ===")
for f in result["research_findings"]:
print(f"\nQ: {f['question']}")
print(f"A: {f['answer'][:150]}...")
print("\n=== Final Report (first 500 chars) ===")
print(result["final_report"][:500])
print("\n=== Agent Trace ===")
for msg in result["messages"]:
if hasattr(msg, "name") and msg.name:
print(f"[{msg.name}] {msg.content[:120]}")
The trace shows the sequential flow: planner creates sub-questions, researcher fills in findings, writer produces a draft, reviewer either approves or requests revision. If revision happened, you’ll see the researcher appear again in the trace.
Quick check: What would happen if you set current_step to “research” in the initial state? The router would skip the planner and go straight to the researcher. But sub_questions would be empty, so the researcher would loop over nothing and produce zero findings. The writer would then receive an empty list and return “No research findings available.” The empty-findings guard in the writer is what turns this mistake into a graceful failure instead of a crash.
When to Use This Pattern (and When Not To)
This planner-researcher-writer pattern works well for:
- Open-ended research where the topic needs decomposition into sub-questions.
- Report generation where multiple sources need synthesis into a single document.
- Due diligence tasks where thoroughness matters more than speed.
It doesn’t fit well for:
- Simple factual questions (“What’s the capital of France?”). A single agent with search handles this faster and cheaper.
- Real-time applications where latency matters. Each agent adds an LLM round-trip, and the researcher makes one call per sub-question, so a single pass with four sub-questions already means seven sequential LLM calls.
- Tasks needing deep domain expertise. The agents are only as good as the search results. For specialized domains, you’d need custom knowledge bases and domain-specific tools.
Common Mistakes and How to Fix Them
Mistake 1: Overloading a single agent’s prompt
Wrong:
prompt = """You are a planner, researcher, and writer.
First break down the question, then search for each part,
then write a report. Use the search tool for research."""
Why it’s wrong: The agent can’t decide which role to play at each step. Tool selection breaks because the model sees all tools when it only needs one at a time.
Fix: Split into separate agents with focused prompts. Each agent gets only the tools it needs.
Mistake 2: Not validating state between agents
Wrong:
def writer_agent(state):
findings = state["research_findings"] # Crashes if empty
report = generate_report(findings)
Fix: Always validate inputs at the start of each agent function:
def writer_agent(state):
findings = state.get("research_findings", [])
if not findings:
return {"final_report": "No findings.", "current_step": "complete"}
Mistake 3: Missing recursion limits on review cycles
A reviewer that always finds flaws creates an infinite loop. API costs pile up before LangGraph’s default recursion limit (25 steps) kicks in.
Fix: Track review count in your state and hard-stop after 2-3 cycles:
if state.get("review_count", 0) >= 2:
return {"current_step": "complete"}
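As a second line of defense, you can also lower LangGraph's graph-wide recursion limit at invocation time. This is a config fragment, not a standalone script; it assumes the compiled graph and initial state from this post:

```python
# Lower the graph-wide step ceiling in addition to the review_count cap.
# recursion_limit is a standard key in the config dict accepted by invoke().
result = graph.invoke(initial_state, config={"recursion_limit": 15})
```

The state-level counter gives you a graceful "approve what we have" exit; the recursion limit is the hard backstop that raises an error if something else loops unexpectedly.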
Exercise 3: Add a Fact-Checker Agent
Build a fact-checker agent that sits between the researcher and writer. It should verify each finding by running an independent search.
type: 'exercise'
id: 'fact-checker'
title: 'Exercise 3: Build a Fact-Checker Agent'
difficulty: 'advanced'
exerciseType: 'write'
instructions: |
Create a fact_checker_agent that:
1. Takes each finding from research_findings
2. Searches the web to verify the finding's answer
3. Flags findings as verified or unverified
4. Passes only verified findings forward
Add a "verified_findings" field to the state.
starterCode: |
def fact_checker_agent(state: ResearchState) -> dict:
"""Verify each finding with an independent search."""
findings = state["research_findings"]
verified = []
for finding in findings:
# TODO: Search to verify the finding
# TODO: Ask model if evidence supports the claim
# TODO: Append to verified if supported
pass
return {
"research_findings": verified, # Replace with only verified
"current_step": "writing",
"messages": [AIMessage(
content=f"Verified {len(verified)}/{len(findings)}.",
name="fact_checker"
)]
}
testCases:
- id: 'tc1'
input: 'print(type(fact_checker_agent({"research_findings": [], "current_step": "verify"})))'
expectedOutput: "<class 'dict'>"
description: 'Should return a dict'
hints:
- 'Search with: search_tool.invoke({"query": f"verify: {finding[\"answer\"][:100]}"})'
- 'Ask the model: "Does this evidence support the claim? Return JSON: {\"supported\": true/false}"'
solution: |
def fact_checker_agent(state: ResearchState) -> dict:
findings = state["research_findings"]
verified = []
for finding in findings:
try:
results = search_tool.invoke({"query": f"verify: {finding['answer'][:100]}"})
if results:
check_prompt = f"Does this evidence support the claim?\nClaim: {finding['answer']}\nEvidence: {results[0]['content'][:300]}\nReturn JSON: {{\"supported\": true/false}}"
resp = model.invoke([HumanMessage(content=check_prompt)])
check = json.loads(resp.content)
if check.get("supported", False):
verified.append(finding)
except Exception:
continue
return {"research_findings": verified, "current_step": "writing",
"messages": [AIMessage(content=f"Verified {len(verified)}/{len(findings)}.", name="fact_checker")]}
solutionExplanation: |
The fact-checker searches for independent evidence of each claim,
then asks the LLM to compare the evidence against the original finding.
Only findings with supporting evidence pass through. Failed verifications
are silently dropped to keep the report trustworthy.
xpReward: 20
Complete Code
The full script is below, ready to copy, paste, and run.
# Complete code from: Build a Multi-Agent Research Assistant in LangGraph
# Requires: pip install langgraph langchain-openai langchain-core langchain-community tavily-python
# Python 3.10+
import os
import json
from typing import Annotated, TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# --- Configuration ---
os.environ["OPENAI_API_KEY"] = "your-openai-key-here"
os.environ["TAVILY_API_KEY"] = "your-tavily-key-here"
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
search_tool = TavilySearchResults(max_results=3)
# --- State ---
class ResearchState(TypedDict):
messages: Annotated[list, add_messages]
research_query: str
sub_questions: list[str]
research_findings: list[dict]
final_report: str
current_step: str
review_count: int
# --- Agents ---
def planner_agent(state: ResearchState) -> dict:
query = state["research_query"]
planner_prompt = f"""Break this research question into 3-5 focused sub-questions.
Return ONLY a JSON list of strings.
Research question: {query}"""
response = model.invoke([HumanMessage(content=planner_prompt)])
sub_questions = json.loads(response.content)
return {
"sub_questions": sub_questions,
"current_step": "research",
"messages": [AIMessage(
content=f"Created {len(sub_questions)} sub-questions.",
name="planner"
)]
}
def researcher_agent(state: ResearchState) -> dict:
sub_questions = state["sub_questions"]
findings = []
errors = []
for question in sub_questions:
try:
search_results = search_tool.invoke({"query": question})
if not search_results:
errors.append(f"No results for: {question}")
continue
context = "\n".join([
f"Source: {r['url']}\nContent: {r['content']}"
for r in search_results
])
research_prompt = f"""Answer this question based on the search results.
Return valid JSON: {{"question": "...", "answer": "...", "sources": [...]}}
Question: {question}
Search Results:
{context}"""
response = model.invoke([HumanMessage(content=research_prompt)])
finding = json.loads(response.content)
findings.append(finding)
except json.JSONDecodeError:
errors.append(f"JSON parse error for: {question}")
except Exception as e:
errors.append(f"Error: {str(e)}")
error_msg = f" Errors: {errors}" if errors else ""
return {
"research_findings": findings,
"current_step": "writing",
"messages": [AIMessage(
content=f"Researched {len(findings)}/{len(sub_questions)}.{error_msg}",
name="researcher"
)]
}
def writer_agent(state: ResearchState) -> dict:
query = state["research_query"]
findings = state.get("research_findings", [])
if not findings:
return {
"final_report": "No research findings available.",
"current_step": "complete",
"messages": [AIMessage(content="No findings to write.", name="writer")]
}
findings_text = "\n\n".join([
f"Q: {f['question']}\nA: {f['answer']}\nSources: {', '.join(f['sources'])}"
for f in findings
])
writer_prompt = f"""Synthesize these findings into a research report.
Original question: {query}
Findings:
{findings_text}
Structure: Executive Summary, Key Findings, Detailed Analysis, Sources.
Write clearly with short paragraphs and inline citations."""
response = model.invoke([HumanMessage(content=writer_prompt)])
return {
"final_report": response.content,
"current_step": "review",
"messages": [AIMessage(content="Report draft completed.", name="writer")]
}
def reviewer_agent(state: ResearchState) -> dict:
if state.get("review_count", 0) >= 2:
return {
"current_step": "complete",
"messages": [AIMessage(
content="Max reviews reached. Approving.",
name="reviewer"
)]
}
report = state["final_report"]
sub_questions = state["sub_questions"]
review_prompt = f"""Review this report against the sub-questions.
Rate as APPROVED or NEEDS_REVISION.
Return JSON: {{"verdict": "APPROVED" or "NEEDS_REVISION", "feedback": "..."}}
Sub-questions: {json.dumps(sub_questions)}
Report:
{report}"""
response = model.invoke([HumanMessage(content=review_prompt)])
review = json.loads(response.content)
if review["verdict"] == "APPROVED":
return {
"current_step": "complete",
"messages": [AIMessage(content="Report APPROVED.", name="reviewer")]
}
else:
return {
"current_step": "research",
"review_count": state.get("review_count", 0) + 1,
"messages": [AIMessage(
content=f"Revision needed: {review['feedback']}",
name="reviewer"
)]
}
# --- Routing ---
def route_step(state: ResearchState) -> str:
step = state.get("current_step", "planning")
routes = {
"planning": "planner",
"research": "researcher",
"writing": "writer",
"review": "reviewer",
"complete": END,
}
return routes.get(step, END)
# --- Graph ---
workflow = StateGraph(ResearchState)
workflow.add_node("planner", planner_agent)
workflow.add_node("researcher", researcher_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("reviewer", reviewer_agent)
workflow.add_conditional_edges(START, route_step)
workflow.add_conditional_edges("planner", route_step)
workflow.add_conditional_edges("researcher", route_step)
workflow.add_conditional_edges("writer", route_step)
workflow.add_conditional_edges("reviewer", route_step)
graph = workflow.compile()
# --- Run ---
result = graph.invoke({
    "messages": [HumanMessage(content="Start research")],
    "research_query": "What are the latest advances in protein folding prediction?",
    "sub_questions": [],
    "research_findings": [],
    "final_report": "",
    "current_step": "planning",
    "review_count": 0,
})
# --- Output ---
print("=== Sub-Questions ===")
for i, q in enumerate(result["sub_questions"], 1):
print(f"{i}. {q}")
print("\n=== Final Report ===")
print(result["final_report"])
print("\n=== Agent Trace ===")
for msg in result["messages"]:
if hasattr(msg, "name") and msg.name:
print(f"[{msg.name}] {msg.content[:120]}")
print("\nScript completed successfully.")
Summary
You’ve built a four-agent research system from scratch. Here’s what each piece does and why it matters:
- Typed state fields replace flat message passing — each agent reads and writes to specific, named fields.
- Conditional routing based on current_step — the state drives the workflow, not hardcoded edges.
- A review cycle with a recursion safeguard — quality control without infinite loops.
- Error handling at the agent level — one failed sub-question doesn’t crash the pipeline.
Where can you take this next? Swap Tavily for a custom knowledge base and you have an internal research tool. Add a citation formatter and you have an academic assistant. Replace the writer with a slide generator and you have a presentation builder.
The foundation — typed state, conditional routing, specialized agents, review cycles — applies to any multi-agent system you’ll design in LangGraph.
Frequently Asked Questions
Can I use a different LLM for each agent?
Yes, and you should consider it. Create separate ChatOpenAI instances with different models. Use gpt-4o for the planner (better reasoning) and writer (better prose), but gpt-4o-mini for the researcher since it runs multiple times and cost adds up. Each agent function references its own model instance.
How do I add memory across research sessions?
Use LangGraph’s checkpointing. Add a MemorySaver when compiling: graph = workflow.compile(checkpointer=MemorySaver()). Pass a thread_id when invoking. The entire state — including past findings — persists between runs. This lets you build on previous research without starting over.
How do I run sub-questions in parallel?
LangGraph’s Send API handles this. Instead of a for-loop in the researcher, create a map node that dispatches each sub-question to a separate researcher instance. LangGraph runs them concurrently and collects results. Total latency drops from N * search_time to roughly 1 * search_time.
from langgraph.types import Send

def dispatch_research(state):
    return [Send("research_one", {"question": q}) for q in state["sub_questions"]]
Is the Tavily API required?
No. Tavily is convenient because it returns clean text, but you can substitute any search API. DuckDuckGo (via langchain-community), SerpAPI, or even a custom RAG pipeline over your own documents would work. Just replace the search_tool instance.
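Whatever backend you choose, it only needs to accept a query string and return text. A hypothetical drop-in over an in-memory document set (all names here are illustrative):

```python
def search_local_docs(query: str, docs: dict[str, str]) -> str:
    """Naive keyword search over local documents -- a stand-in for a
    web search tool. Returns matching snippets tagged with their source."""
    terms = [t.lower() for t in query.split()]
    hits = [
        f"[{source}] {text[:200]}"
        for source, text in docs.items()
        if any(t in text.lower() for t in terms)
    ]
    return "\n".join(hits) if hits else "No results."

corpus = {"notes.md": "AlphaFold predicts protein structure from sequence."}
print(search_local_docs("protein folding", corpus))
```

Any real replacement (DuckDuckGo, SerpAPI, a RAG retriever) slots into the researcher the same way: one callable, query in, text out.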
What about rate limiting?
Add a sleep between API calls in the researcher loop: time.sleep(1). For production, use a proper rate limiter like tenacity with exponential backoff. Tavily’s free tier allows 1,000 searches per month. OpenAI rate limits depend on your plan tier.
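A stdlib-only sketch of the sleep approach, wrapped up so the interval lives in one place (the class name is illustrative):

```python
import time

class MinIntervalLimiter:
    """Enforce a minimum delay between consecutive calls.
    A naive sketch; for production, prefer tenacity with backoff."""

    def __init__(self, min_interval: float):
        self.min_interval = min_interval
        self._last = 0.0

    def wait(self) -> None:
        # Sleep only for however much of the interval remains.
        elapsed = time.monotonic() - self._last
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last = time.monotonic()

limiter = MinIntervalLimiter(1.0)
# In the researcher loop, call limiter.wait() before each search_tool.invoke().
```

Sleeping only for the remaining interval means fast iterations pay the full delay while slow ones (long LLM calls) pay little or none.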