Build an Autonomous Web Scraping Pipeline with LangGraph
You need data from a website. Not once — regularly. You write a scraper, it works for a week, then the site changes its layout and everything breaks. You patch CSS selectors, add error handling, bolt on pagination logic. Before long, you’ve got spaghetti code held together with try-except blocks. What if an AI agent could handle all of that — deciding what to scrape, recovering from failures, and summarizing what it found?
That’s what we’re building. A LangGraph pipeline where an LLM drives the entire scraping workflow autonomously.
Before we write any code, here’s how the pieces connect. The pipeline starts with a URL and a goal — something like “extract all product listings from this category page.” The first stage fetches the raw HTML. If the page fails to load, the agent retries or adjusts its approach.
Once HTML arrives, a parsing stage extracts structured data — product names, prices, ratings — based on what the LLM finds in the page structure. Then the agent checks: are there more pages? If pagination exists, it loops back to fetch the next page.
When all pages are scraped, the data flows into an analysis stage. The LLM summarizes trends, computes statistics, and produces a final report. Each stage feeds directly into the next through LangGraph’s state, and the whole thing runs as a single graph invocation.
The Pipeline Architecture — Five Nodes, One Loop
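The pipeline has five logical nodes connected by conditional edges. Each node handles one responsibility. In the code that follows, four of them are registered as graph nodes, while Accumulate is implemented by a state reducer rather than a separate node: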
| Node | Purpose | Input | Output |
|---|---|---|---|
| Fetch | Download HTML, handle HTTP errors | URL from state | Raw HTML or error status |
| Parse | Extract structured data via LLM | Raw HTML + goal | List of JSON records |
| Pagination Check | Find “next page” links | Raw HTML | Next URL or “done” signal |
| Accumulate | Merge new data with existing | New + existing records | Combined dataset |
| Analyze | Produce statistical summary | All collected records | Report string |
The conditional edge after the pagination check creates the loop. If more pages exist, flow returns to Fetch. Otherwise, it moves to Analyze. This is the core pattern — and it’s surprisingly simple to implement.
Prerequisites
- Python version: 3.10+
- Required libraries: langgraph (0.4+), langchain-openai (0.3+), langchain-core (0.3+), requests (2.31+), beautifulsoup4 (4.12+)
- Install: `pip install langgraph langchain-openai langchain-core requests beautifulsoup4`
- API Key: OpenAI API key (set as the `OPENAI_API_KEY` environment variable). Create one at platform.openai.com/api-keys.
- Prior knowledge: Basic LangGraph concepts (nodes, edges, state). Familiarity with Python’s `requests` library.
- Time to complete: 35-40 minutes
import os
import json
import time
import requests
from bs4 import BeautifulSoup
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
The imports split into three groups: standard library, third-party scraping tools, and LangGraph/LangChain components. We’re using gpt-4o-mini because it’s fast, cheap, and more than capable for HTML analysis.
Define the Pipeline State
Every LangGraph graph needs a state schema — a shared data structure that nodes read from and write to. Think of it as the pipeline’s memory.
Our state tracks the scraping URL, the extraction goal, raw HTML content, accumulated data records, pagination counters, and the final analysis report. Here’s the full schema with an important detail to watch for in the Annotated types.
def merge_lists(existing: list, new: list) -> list:
    """Reducer that appends new items to existing list."""
    return existing + new


class ScraperState(TypedDict):
    url: str
    goal: str
    raw_html: str
    extracted_data: Annotated[list[dict], merge_lists]
    current_page: int
    max_pages: int
    next_page_url: str
    analysis_report: str
    error_log: Annotated[list[str], merge_lists]
    retry_count: int
    status: str
See the Annotated type on extracted_data and error_log? The merge_lists reducer tells LangGraph how to combine state updates. Without it, returning {"extracted_data": new_records} would replace the old list entirely. With the reducer, it appends. That’s critical for pagination — without it, each page’s data would overwrite the previous page’s data.
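To make the reducer behaviour concrete, here is a minimal plain-Python sketch that mimics how a node's return value is combined with the existing state. The `apply_update` helper and the `REDUCERS` mapping are hypothetical stand-ins written for this illustration; LangGraph performs this step internally and its actual implementation differs.

```python
def merge_lists(existing: list, new: list) -> list:
    """Reducer that appends new items to existing list."""
    return existing + new


# Hypothetical stand-in for what the framework does with a node's return value:
# keys that have a reducer are combined, all other keys are overwritten.
REDUCERS = {"extracted_data": merge_lists}


def apply_update(state: dict, update: dict) -> dict:
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"extracted_data": [{"title": "Page 1 job"}], "status": "parsed"}
update = {"extracted_data": [{"title": "Page 2 job"}], "status": "parsed"}

state = apply_update(state, update)
print(len(state["extracted_data"]))  # both pages' records are kept
```

Removing `extracted_data` from `REDUCERS` in this sketch reproduces the overwrite behaviour: only the page 2 record would survive.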
Build the Fetch Node — Downloading Pages with Retries
What happens when you request a webpage and the server returns a 503? Or the connection times out? The fetch node handles all of that.
It reads the current URL from state, sends a GET request with a browser-like user-agent header, and stores the HTML. On failure, it logs the error and increments a retry counter. The status field tells downstream nodes whether the fetch worked.
def fetch_page(state: ScraperState) -> dict:
    """Fetch HTML content from the current URL."""
    current_page = state.get("current_page", 1)
    if current_page > 1:
        time.sleep(2)  # Polite delay between requests
    url = state.get("next_page_url") or state["url"]
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        return {
            "raw_html": response.text,
            "status": "fetched",
            "retry_count": 0,
        }
    except requests.RequestException as error:
        return {
            "status": "fetch_error",
            "error_log": [f"Fetch failed for {url}: {str(error)}"],
            "retry_count": state.get("retry_count", 0) + 1,
        }
Two design choices worth noting. The user-agent header mimics Chrome — without it, many sites block requests from scripts outright. The retry counter resets to zero on success because we only care about consecutive failures, not lifetime totals.
The 2-second delay before non-first-page fetches is a courtesy. Hammering a server with rapid requests is a fast way to get your IP banned.
Build the Parse Node — LLM-Driven Data Extraction
Here’s where this pipeline diverges from traditional scrapers. Instead of hardcoded CSS selectors that break when a site redesigns, the LLM reads the page content and extracts data based on your goal description.
The parse node strips noise from the HTML (scripts, styles, navigation), converts the rest to plain text, and sends it to the LLM with extraction instructions. The LLM returns a JSON array of structured records. We truncate the text to 12,000 characters to stay within token limits. That cutoff is a simplification: the main content often appears early in a page, but any records past the 12,000-character mark are silently dropped, so long listings may need a higher limit or chunking.
def parse_content(state: ScraperState) -> dict:
    """Use LLM to extract structured data from HTML."""
    html = state["raw_html"]
    goal = state["goal"]
    soup = BeautifulSoup(html, "html.parser")
    # Strip noise: scripts, styles, navigation, footer
    for tag in soup(["script", "style", "nav", "footer"]):
        tag.decompose()
    clean_text = soup.get_text(separator="\n", strip=True)
    truncated = clean_text[:12000]
    prompt = f"""Extract structured data from this webpage.
Goal: {goal}
Return a JSON array of objects with consistent keys.
If no relevant data exists, return an empty array [].
Return ONLY valid JSON — no markdown, no explanation.
Webpage content:
{truncated}"""
    response = llm.invoke([
        SystemMessage(content="You are a data extraction specialist."),
        HumanMessage(content=prompt),
    ])
    try:
        records = json.loads(response.content)
        if not isinstance(records, list):
            records = [records]
    except json.JSONDecodeError:
        return {
            "extracted_data": [],
            "error_log": ["LLM returned invalid JSON during parsing"],
            "status": "parse_error",
        }
    return {"extracted_data": records, "status": "parsed"}
Why strip <script>, <style>, <nav>, and <footer> tags? A typical webpage is 50-100KB of HTML, but actual content might be 5KB. JavaScript, CSS rules, and repeated navigation links eat tokens without contributing useful data. On pages like that, removing them can cut token costs by roughly 70-80% and tends to improve extraction accuracy.
| Approach | Selector Maintenance | Adapts to Layout Changes | Cost per Page |
|---|---|---|---|
| Hardcoded CSS selectors | Manual — breaks on redesign | No | $0 |
| LLM-driven extraction | Zero — LLM adapts | Yes | ~$0.002 |
| Visual scraping tools | GUI re-training needed | Partially | Varies |
The tradeoff is clear. You’re paying a fraction of a cent per page to eliminate the maintenance burden that makes traditional scrapers a headache.
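One practical gap in the parse node is worth a sketch. Despite the "no markdown" instruction, chat models sometimes wrap their JSON in a Markdown code fence, which makes `json.loads` fail and sends the page down the `parse_error` path. The helper below is a minimal, more tolerant parser; `parse_llm_json` is a hypothetical name (not part of LangChain), and it only handles the case of a single fence around the whole reply.

```python
import json
import re

FENCE = "`" * 3  # built this way to avoid a literal fence inside this example

# Optional language tag (e.g. "json") after the opening fence, then the payload
_FENCED = re.compile(
    rf"^{FENCE}[a-zA-Z]*\s*\n(.*?)\n?{FENCE}\s*$", re.DOTALL
)


def parse_llm_json(raw: str) -> list:
    """Parse model output into a list of records, tolerating one code fence."""
    text = raw.strip()
    match = _FENCED.match(text)
    if match:
        text = match.group(1).strip()
    records = json.loads(text)  # still raises json.JSONDecodeError on bad input
    return records if isinstance(records, list) else [records]


fenced = f'{FENCE}json\n[{{"title": "Energy engineer"}}]\n{FENCE}'
print(parse_llm_json(fenced))
print(parse_llm_json('{"title": "Energy engineer"}'))
```

In `parse_content`, this would replace the direct `json.loads(response.content)` call inside the existing `try` block, leaving the error handling unchanged.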
Build the Pagination Node — Following “Next Page” Links
Ever noticed how many scraping tutorials skip pagination entirely? In practice, most data you’d want lives across multiple pages. This node solves that.
Instead of hardcoding a pagination URL pattern (which breaks the moment the site changes its URL scheme), we pre-filter anchor tags for pagination keywords and let the LLM pick the right one. The max_pages guard prevents infinite loops.
def check_pagination(state: ScraperState) -> dict:
    """Determine if there are more pages to scrape."""
    current = state.get("current_page", 1)
    max_pages = state.get("max_pages", 5)
    if current >= max_pages:
        return {"status": "all_pages_scraped"}
    soup = BeautifulSoup(state["raw_html"], "html.parser")
    links = []
    for a_tag in soup.find_all("a", href=True):
        link_text = a_tag.get_text(strip=True).lower()
        href = a_tag["href"]
        if any(kw in link_text for kw in [
            "next", ">>", "\u203a", "older"
        ]):
            links.append(f"{link_text}: {href}")
    if not links:
        return {"status": "all_pages_scraped"}
    prompt = f"""Which link leads to the next page of results?
Links found:
{chr(10).join(links)}
Current URL: {state.get('next_page_url') or state['url']}
Return ONLY the full absolute URL. If no next page exists, return NONE.
If the href is relative, combine it with the base URL."""
    response = llm.invoke([HumanMessage(content=prompt)])
    answer = response.content.strip()
    if answer == "NONE" or not answer.startswith("http"):
        return {"status": "all_pages_scraped"}
    return {
        "next_page_url": answer,
        "current_page": current + 1,
        "status": "has_next_page",
    }
We pre-filter links before sending them to the LLM. A typical webpage has 50-200 anchor tags. Sending all of them wastes tokens and muddies the decision. Filtering for keywords like “next” and “>>” narrows the candidates to 1-3 links — a much easier choice for the model.
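The prompt asks the model to resolve relative hrefs itself. An alternative worth considering is to resolve them deterministically before the LLM call with `urllib.parse.urljoin` from the standard library, so the model only has to choose among absolute URLs. A minimal sketch, where `resolve_links` is a hypothetical helper and the example.com URLs are made up:

```python
from urllib.parse import urljoin


def resolve_links(base_url: str, hrefs: list[str]) -> list[str]:
    """Turn relative hrefs into absolute URLs relative to the current page."""
    return [urljoin(base_url, href) for href in hrefs]


# Illustrative URLs only; they do not describe a real site layout.
current_url = "https://example.com/jobs/index.html"
hrefs = ["page2.html", "/jobs/page3.html", "?page=4", "https://example.com/all"]

for absolute in resolve_links(current_url, hrefs):
    print(absolute)
```

Inside `check_pagination`, the same call could be applied to each `href` before it is appended to `links`, using the current page URL as the base.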
Build the Analysis Node — From Raw Data to Insights
Scraping without analysis is just data hoarding. What patterns hide in the data? What’s the distribution? Are there outliers? The analysis node answers these questions.
It sends the first 50 records to the LLM (staying within token limits) and asks for a structured report covering totals, statistics, patterns, and data quality. For a production system, I’d compute the statistics in Python and send only the summary — LLMs are great at interpretation but occasionally miscalculate arithmetic.
def analyze_data(state: ScraperState) -> dict:
    """Generate analytical summary from all scraped data."""
    data = state.get("extracted_data", [])
    if not data:
        return {
            "analysis_report": "No data was extracted.",
            "status": "complete",
        }
    data_str = json.dumps(data[:50], indent=2)
    prompt = f"""Analyze this scraped dataset and produce a report.
Total records collected: {len(data)}
Sample data (first 50 records):
{data_str}
Include these sections:
1. **Summary**: Total records, fields present, data completeness
2. **Key Statistics**: Counts, averages, ranges where applicable
3. **Notable Patterns**: Trends, outliers, interesting findings
4. **Data Quality Notes**: Missing fields, inconsistencies
Be specific. Use actual numbers from the data."""
    response = llm.invoke([
        SystemMessage(content="You are a data analyst. Be concise and specific."),
        HumanMessage(content=prompt),
    ])
    return {
        "analysis_report": response.content,
        "status": "complete",
    }
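The production advice mentioned for this node (compute statistics in Python, send only the summary) can be sketched as follows. `summarize_records` is a hypothetical helper, and the statistics chosen here (field completeness and most common values) are just one reasonable option; the sample rows are invented.

```python
from collections import Counter


def summarize_records(records: list[dict], top_n: int = 3) -> dict:
    """Compute simple, exact statistics to pass to the LLM instead of raw rows."""
    total = len(records)
    fields = sorted({key for record in records for key in record})
    summary = {"total_records": total, "fields": {}}
    for field in fields:
        # Treat missing keys, None, and empty strings as "not filled"
        values = [r[field] for r in records if r.get(field) not in (None, "")]
        summary["fields"][field] = {
            "filled": len(values),
            "completeness": round(len(values) / total, 2) if total else 0.0,
            "most_common": Counter(map(str, values)).most_common(top_n),
        }
    return summary


sample = [
    {"title": "Energy engineer", "location": "Christopherport, AA"},
    {"title": "Legal executive", "location": "Christopherport, AA"},
    {"title": "Energy engineer", "location": ""},
]
print(summarize_records(sample))
```

The resulting dictionary can be serialized with `json.dumps` and placed in the analysis prompt, so the model interprets numbers rather than computing them.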
Wire the Graph — Routing and Conditional Edges
This is the part that makes LangGraph click. We connect the nodes with edges, and two conditional edges create the branching logic: one for error recovery after fetch, one for the pagination loop.
The routing functions are deliberately simple. Each checks one state field and returns a string that maps to the next node.
def route_after_fetch(state: ScraperState) -> str:
    """Decide next step after fetching a page."""
    if state["status"] == "fetched":
        return "parse"
    if state.get("retry_count", 0) < 3:
        return "retry_fetch"
    return "analyze"


def route_after_pagination(state: ScraperState) -> str:
    """Continue scraping or move to analysis."""
    if state["status"] == "has_next_page":
        return "fetch_next"
    return "analyze"
Three scenarios for route_after_fetch: success goes to parsing, recoverable failure loops back to fetch, exhausted retries skip to analysis with whatever data we have. No nested conditions, no complex logic.
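Because the routers are plain functions over a dictionary, the three scenarios can be checked without building a graph or calling an LLM. The sketch below repeats `route_after_fetch` (typed with a plain `dict` so the example is self-contained) and exercises each branch with hand-written states.

```python
def route_after_fetch(state: dict) -> str:
    """Same logic as the router above, typed with a plain dict."""
    if state["status"] == "fetched":
        return "parse"
    if state.get("retry_count", 0) < 3:
        return "retry_fetch"
    return "analyze"


scenarios = [
    {"status": "fetched", "retry_count": 0},      # success
    {"status": "fetch_error", "retry_count": 1},  # recoverable failure
    {"status": "fetch_error", "retry_count": 3},  # retries exhausted
]
for state in scenarios:
    print(state["status"], state["retry_count"], "->", route_after_fetch(state))
```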
Here’s the full graph assembly. Watch how add_conditional_edges takes a routing function and an explicit mapping dictionary — every possible path is visible in the code.
graph = StateGraph(ScraperState)

# Add all nodes
graph.add_node("fetch", fetch_page)
graph.add_node("parse", parse_content)
graph.add_node("check_pagination", check_pagination)
graph.add_node("analyze", analyze_data)

# Entry point
graph.add_edge(START, "fetch")

# Conditional: parse on success, retry on failure, analyze on exhaustion
graph.add_conditional_edges(
    "fetch",
    route_after_fetch,
    {
        "parse": "parse",
        "retry_fetch": "fetch",
        "analyze": "analyze",
    },
)

# After parsing, always check for more pages
graph.add_edge("parse", "check_pagination")

# Conditional: fetch next page or finalize
graph.add_conditional_edges(
    "check_pagination",
    route_after_pagination,
    {
        "fetch_next": "fetch",
        "analyze": "analyze",
    },
)

# Analysis is the terminal node
graph.add_edge("analyze", END)

scraper_agent = graph.compile()
That routing map — the third argument to add_conditional_edges — is what makes the graph self-documenting. Anyone reading this code can trace every possible execution path without running it. I find this far more readable than deeply nested if-else chains.
Run the Pipeline — A Complete Example
Time to see it work. We’ll scrape job listings from Real Python’s fake jobs page — a static demo site designed for scraping practice. No rate limits, no terms-of-service concerns.
The initial state sets the target URL, an extraction goal describing what fields to capture, and a 3-page limit to keep the demo fast.
initial_state = {
    "url": "https://realpython.github.io/fake-jobs/",
    "goal": (
        "Extract all job listings. For each job, get: "
        "title, company, location, and posting date."
    ),
    "extracted_data": [],
    "current_page": 1,
    "max_pages": 3,
    "next_page_url": "",
    "analysis_report": "",
    "error_log": [],
    "retry_count": 0,
    "status": "ready",
}

result = scraper_agent.invoke(initial_state)
After the pipeline finishes, inspect the results. The extracted_data list contains structured dictionaries, and analysis_report holds the LLM’s summary.
print(f"Records extracted: {len(result['extracted_data'])}")
print(f"\nFirst 3 records:")
for record in result["extracted_data"][:3]:
    print(json.dumps(record, indent=2))
print(f"\n{'='*50}")
print("ANALYSIS REPORT")
print(f"{'='*50}")
print(result["analysis_report"])
if result["error_log"]:
    print(f"\nErrors: {result['error_log']}")
Your output will show structured job data and an analytical summary. The exact records depend on what the LLM extracts, but the structure looks like this:
Records extracted: 100

First 3 records:
{
  "title": "Energy engineer",
  "company": "Vasquez-Davidson",
  "location": "Christopherport, AA",
  "posting_date": "2021-04-08"
}
...

Note: the record count and specific fields vary based on LLM extraction. The demo site has 100 fake job listings on a single page.
Add Error Recovery — Making It Production-Ready
The basic pipeline handles happy paths. But what about a 429 rate-limit response? A timeout on a slow server? A page that returns garbled HTML?
A dedicated error handler node classifies failures and sets a status the routing function can act on. The handler diagnoses. The router decides. The fetch node acts. Clean separation.
def handle_error(state: ScraperState) -> dict:
    """Classify errors and set recovery strategy."""
    errors = state.get("error_log", [])
    last_error = errors[-1] if errors else "Unknown error"
    if "429" in last_error or "rate" in last_error.lower():
        return {
            "status": "rate_limited",
            "error_log": ["Rate limited — backing off before retry"],
        }
    if "timeout" in last_error.lower():
        return {
            "status": "timeout_retry",
            "error_log": ["Timeout — retrying with longer wait"],
        }
    return {
        "status": "unrecoverable",
        "error_log": [f"Giving up after error: {last_error}"],
    }
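In a production deployment, you’d also want exponential backoff for rate limits, proxy rotation for IP bans, and a dead-letter queue for persistent failures. Those are full topics on their own, and the node architecture makes adding them straightforward because each concern lives in its own node. Note that handle_error is not part of the graph assembled earlier: to use it, register it with add_node, route failed fetches to it, and extend the routing function to branch on the new status values.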
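As one example of those extensions, exponential backoff fits in a few lines. The function name, base delay, cap, and jitter range below are illustrative assumptions rather than values from the pipeline; the idea is that the fetch node would sleep for the returned number of seconds before a retry.

```python
import random


def backoff_delay(retry_count: int, base: float = 2.0, cap: float = 60.0,
                  jitter: float = 0.0) -> float:
    """Seconds to wait before retry number `retry_count` (1-based).

    Doubles the delay on every consecutive failure, up to `cap`, and adds
    up to `jitter` seconds of random noise so parallel scrapers do not
    retry in lockstep.
    """
    delay = min(cap, base * (2 ** max(retry_count - 1, 0)))
    return delay + random.uniform(0, jitter)


for attempt in range(1, 7):
    print(attempt, backoff_delay(attempt))
```

With the defaults, the delays grow as 2, 4, 8, 16, 32 seconds and then stay at the 60-second cap. Since `retry_count` already lives in state, `time.sleep(backoff_delay(state["retry_count"]))` at the top of the fetch node would be one way to apply it.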
Exercise 1: Add a Data Validation Node
You’ve seen how each node handles one responsibility. Now add a validate_data node that checks extracted records for quality before they reach analysis.
The node should filter out records missing required keys and log how many it removed. If 90% of records get filtered, that signals a problem with the extraction prompt — not the data.
# Complete this function
def validate_data(state: ScraperState) -> dict:
    """Validate and clean extracted data."""
    data = state.get("extracted_data", [])
    required_keys = {"title", "company", "location"}
    # TODO: Filter records that contain all required keys
    # TODO: Count how many records were removed
    # TODO: Return cleaned data with appropriate status
    valid_records = []  # Your filtering logic here
    removed_count = 0  # Your count here
    return {
        "extracted_data": valid_records,
        "error_log": [
            f"Validation: kept {len(valid_records)}, "
            f"removed {removed_count} incomplete records"
        ],
        "status": "validated" if valid_records else "no_valid_data",
    }
Hint 1
Use `all()` inside a list comprehension to check if every required key exists: `all(key in record for key in required_keys)`.
Hint 2
Full filtering: `valid_records = [r for r in data if all(k in r for k in required_keys)]`. Then `removed_count = len(data) - len(valid_records)`.
Solution
def validate_data(state: ScraperState) -> dict:
    """Validate and clean extracted data."""
    data = state.get("extracted_data", [])
    required_keys = {"title", "company", "location"}
    valid_records = [
        record for record in data
        if all(key in record for key in required_keys)
    ]
    removed_count = len(data) - len(valid_records)
    return {
        "extracted_data": valid_records,
        "error_log": [
            f"Validation: kept {len(valid_records)}, "
            f"removed {removed_count} incomplete records"
        ],
        "status": "validated" if valid_records else "no_valid_data",
    }
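**Why this works:** `all()` returns `True` only when every required key exists in the record. Records with missing fields get filtered out. The removed count helps you diagnose extraction issues: high removal rates mean the extraction prompt needs tuning. One caveat: `extracted_data` uses the `merge_lists` reducer, so returning the cleaned list under that key appends it to the existing records instead of replacing them. To actually drop invalid records, write the result to a separate key without a reducer (for example, a `validated_data` field added to `ScraperState`) and have the analysis node read from it.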
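The 90% rule of thumb from the exercise description can be turned into an explicit check. A minimal sketch follows; `removal_rate` and `needs_prompt_review` are hypothetical helper names, and the 0.9 threshold simply mirrors the figure quoted in the exercise.

```python
def removal_rate(total: int, kept: int) -> float:
    """Fraction of records dropped by validation (0.0 when nothing was scraped)."""
    return (total - kept) / total if total else 0.0


def needs_prompt_review(total: int, kept: int, threshold: float = 0.9) -> bool:
    """True when so many records were dropped that the prompt is the likely culprit."""
    return removal_rate(total, kept) >= threshold


print(needs_prompt_review(total=100, kept=5))   # 95% removed
print(needs_prompt_review(total=100, kept=80))  # 20% removed
```

A validation node could use this to set a distinct status, so a routing function can stop the run early instead of analyzing a nearly empty dataset.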
Customize for Different Scraping Goals
The same pipeline scrapes any type of data. You don’t change the code — you change the goal string. The LLM adapts its extraction strategy at runtime.
Want product listings instead of jobs?
product_state = {
    "url": "https://example-store.com/electronics",
    "goal": (
        "Extract product listings: name, price in USD, "
        "star rating as a float, and availability status"
    ),
    "extracted_data": [],
    "current_page": 1,
    "max_pages": 5,
    "next_page_url": "",
    "analysis_report": "",
    "error_log": [],
    "retry_count": 0,
    "status": "ready",
}
Research paper metadata? Same pipeline, different goal:
research_state = {
    "url": "https://arxiv.org/list/cs.AI/recent",
    "goal": (
        "Extract paper listings: title, authors, "
        "abstract summary, and submission date"
    ),
    "extracted_data": [],
    "current_page": 1,
    "max_pages": 2,
    "next_page_url": "",
    "analysis_report": "",
    "error_log": [],
    "retry_count": 0,
    "status": "ready",
}
Exercise 2: Add Configurable Rate Limiting
Web servers don’t appreciate rapid-fire requests. Modify the fetch logic to use a configurable delay from state instead of the hardcoded 2 seconds.
# Add a 'fetch_delay' field to the state and use it
def fetch_page_configurable(state: ScraperState) -> dict:
    """Fetch with configurable delay between pages."""
    current_page = state.get("current_page", 1)
    delay = state.get("fetch_delay", 2)  # Default 2 seconds
    # TODO: Apply delay for non-first pages
    # TODO: Fetch the URL with error handling
    url = state.get("next_page_url") or state["url"]
    pass  # Complete the implementation
Hint 1
Check `current_page > 1` before sleeping. The first page doesn’t need a delay.
Hint 2
Add `if current_page > 1: time.sleep(delay)` before the request. The rest follows the original `fetch_page` pattern.
Solution
def fetch_page_configurable(state: ScraperState) -> dict:
    """Fetch with configurable delay between pages."""
    current_page = state.get("current_page", 1)
    delay = state.get("fetch_delay", 2)
    if current_page > 1:
        time.sleep(delay)
    url = state.get("next_page_url") or state["url"]
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36"
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        return {
            "raw_html": response.text,
            "status": "fetched",
            "retry_count": 0,
        }
    except requests.RequestException as error:
        return {
            "status": "fetch_error",
            "error_log": [f"Fetch failed: {str(error)}"],
            "retry_count": state.get("retry_count", 0) + 1,
        }
**Why configurable delays matter:** Some sites tolerate 1-second gaps. Others need 5 seconds. Making it a state parameter lets you tune per-target without touching code.
Common Mistakes and How to Fix Them
Mistake 1: No Page Limit on Pagination
❌ Wrong:
initial_state = {
    "max_pages": 999,  # Or omitting it entirely
}
Why it’s dangerous: Some sites have thousands of pages. Your pipeline runs for hours, burns API credits, and might trigger an IP ban.
✅ Correct:
initial_state = {
    "max_pages": 10,  # Start small, increase if needed
}
Mistake 2: Sending Raw HTML to the LLM
❌ Wrong:
prompt = f"Extract data from: {state['raw_html']}"
Why it fails: Raw HTML is 80% noise. A product page has 150KB of HTML but 2KB of useful content. You waste tokens and confuse the model.
✅ Correct:
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style", "nav", "footer"]):
    tag.decompose()
clean_text = soup.get_text(separator="\n", strip=True)[:12000]
Mistake 3: No Status Checks in Routing
❌ Wrong:
def route_after_fetch(state):
    return "parse"  # Always parse, even on failure
Why it breaks: If the fetch failed, raw_html is either empty or stale from the previous page. The parse node crashes or produces duplicates.
✅ Correct:
def route_after_fetch(state):
    if state["status"] == "fetched":
        return "parse"
    if state.get("retry_count", 0) < 3:
        return "retry_fetch"
    return "analyze"  # Graceful fallback
When NOT to Use This Approach
This pipeline isn’t always the right tool. Here are the scenarios where you should reach for something else:
Use an API instead if the site offers one. APIs return clean JSON — no parsing, no LLM costs. Always check for developer docs or /api/ endpoints first.
Use hardcoded selectors for high-volume scraping. At 100K pages with 12K tokens each (1.2 billion tokens in total), LLM costs reach roughly $180, which corresponds to a rate of about $0.15 per million input tokens. Traditional CSS selectors cost zero for extraction. If the site structure is stable, selectors are the pragmatic choice.
Use a deterministic scraper for real-time monitoring. The LLM adds 1-3 seconds latency per page. Sub-second scraping for price tracking needs hardcoded logic.
This pipeline shines when site layouts change frequently, you’re scraping diverse sites with different structures, or you’re prototyping a scraper that needs to work across multiple domains without custom selectors for each one.
Complete Code
# Complete code from: Autonomous Web Scraping Pipeline with LangGraph
# Requires: pip install langgraph langchain-openai langchain-core requests beautifulsoup4
# Python 3.10+
# Set OPENAI_API_KEY environment variable before running
import os
import json
import time
import requests
from bs4 import BeautifulSoup
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

# --- Setup ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


# --- State ---
def merge_lists(existing: list, new: list) -> list:
    return existing + new


class ScraperState(TypedDict):
    url: str
    goal: str
    raw_html: str
    extracted_data: Annotated[list[dict], merge_lists]
    current_page: int
    max_pages: int
    next_page_url: str
    analysis_report: str
    error_log: Annotated[list[str], merge_lists]
    retry_count: int
    status: str


# --- Nodes ---
def fetch_page(state: ScraperState) -> dict:
    current_page = state.get("current_page", 1)
    if current_page > 1:
        time.sleep(2)
    url = state.get("next_page_url") or state["url"]
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        return {
            "raw_html": response.text,
            "status": "fetched",
            "retry_count": 0,
        }
    except requests.RequestException as error:
        return {
            "status": "fetch_error",
            "error_log": [f"Fetch failed for {url}: {str(error)}"],
            "retry_count": state.get("retry_count", 0) + 1,
        }


def parse_content(state: ScraperState) -> dict:
    html = state["raw_html"]
    goal = state["goal"]
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style", "nav", "footer"]):
        tag.decompose()
    clean_text = soup.get_text(separator="\n", strip=True)
    truncated = clean_text[:12000]
    prompt = f"""Extract structured data from this webpage.
Goal: {goal}
Return a JSON array of objects with consistent keys.
If no relevant data exists, return an empty array [].
Return ONLY valid JSON — no markdown, no explanation.
Webpage content:
{truncated}"""
    response = llm.invoke([
        SystemMessage(content="You are a data extraction specialist."),
        HumanMessage(content=prompt),
    ])
    try:
        records = json.loads(response.content)
        if not isinstance(records, list):
            records = [records]
    except json.JSONDecodeError:
        return {
            "extracted_data": [],
            "error_log": ["LLM returned invalid JSON during parsing"],
            "status": "parse_error",
        }
    return {"extracted_data": records, "status": "parsed"}


def check_pagination(state: ScraperState) -> dict:
    current = state.get("current_page", 1)
    max_pages = state.get("max_pages", 5)
    if current >= max_pages:
        return {"status": "all_pages_scraped"}
    soup = BeautifulSoup(state["raw_html"], "html.parser")
    links = []
    for a_tag in soup.find_all("a", href=True):
        link_text = a_tag.get_text(strip=True).lower()
        href = a_tag["href"]
        if any(kw in link_text for kw in ["next", ">>", "\u203a", "older"]):
            links.append(f"{link_text}: {href}")
    if not links:
        return {"status": "all_pages_scraped"}
    prompt = f"""Which link leads to the next page of results?
Links found:
{chr(10).join(links)}
Current URL: {state.get('next_page_url') or state['url']}
Return ONLY the full absolute URL. If no next page exists, return NONE.
If the href is relative, combine it with the base URL."""
    response = llm.invoke([HumanMessage(content=prompt)])
    answer = response.content.strip()
    if answer == "NONE" or not answer.startswith("http"):
        return {"status": "all_pages_scraped"}
    return {
        "next_page_url": answer,
        "current_page": current + 1,
        "status": "has_next_page",
    }


def analyze_data(state: ScraperState) -> dict:
    data = state.get("extracted_data", [])
    if not data:
        return {"analysis_report": "No data was extracted.", "status": "complete"}
    data_str = json.dumps(data[:50], indent=2)
    prompt = f"""Analyze this scraped dataset and produce a report.
Total records collected: {len(data)}
Sample data (first 50 records):
{data_str}
Include:
1. Summary: total records, fields present, data completeness
2. Key Statistics: counts, averages, ranges where applicable
3. Notable Patterns: trends, outliers, interesting findings
4. Data Quality Notes: missing fields, inconsistencies
Be specific. Use actual numbers from the data."""
    response = llm.invoke([
        SystemMessage(content="You are a data analyst. Be concise and specific."),
        HumanMessage(content=prompt),
    ])
    return {"analysis_report": response.content, "status": "complete"}


# --- Routing ---
def route_after_fetch(state: ScraperState) -> str:
    if state["status"] == "fetched":
        return "parse"
    if state.get("retry_count", 0) < 3:
        return "retry_fetch"
    return "analyze"


def route_after_pagination(state: ScraperState) -> str:
    if state["status"] == "has_next_page":
        return "fetch_next"
    return "analyze"


# --- Graph Assembly ---
graph = StateGraph(ScraperState)
graph.add_node("fetch", fetch_page)
graph.add_node("parse", parse_content)
graph.add_node("check_pagination", check_pagination)
graph.add_node("analyze", analyze_data)
graph.add_edge(START, "fetch")
graph.add_conditional_edges(
    "fetch",
    route_after_fetch,
    {"parse": "parse", "retry_fetch": "fetch", "analyze": "analyze"},
)
graph.add_edge("parse", "check_pagination")
graph.add_conditional_edges(
    "check_pagination",
    route_after_pagination,
    {"fetch_next": "fetch", "analyze": "analyze"},
)
graph.add_edge("analyze", END)
scraper_agent = graph.compile()

# --- Run ---
if __name__ == "__main__":
    result = scraper_agent.invoke({
        "url": "https://realpython.github.io/fake-jobs/",
        "goal": (
            "Extract all job listings. For each job, get: "
            "title, company, location, and posting date."
        ),
        "extracted_data": [],
        "current_page": 1,
        "max_pages": 3,
        "next_page_url": "",
        "analysis_report": "",
        "error_log": [],
        "retry_count": 0,
        "status": "ready",
    })
    print(f"Records extracted: {len(result['extracted_data'])}")
    for record in result["extracted_data"][:3]:
        print(json.dumps(record, indent=2))
    print(f"\n{'='*50}")
    print("ANALYSIS REPORT")
    print(f"{'='*50}")
    print(result["analysis_report"])
    if result["error_log"]:
        print(f"\nErrors encountered: {result['error_log']}")
Summary
You built an autonomous web scraping pipeline with LangGraph that fetches pages, extracts structured data using an LLM, follows pagination links, recovers from errors, and produces analytical reports.
The four design decisions that make it work:
- State reducers (merge_lists) accumulate data across pagination loops without overwriting
- Conditional edges create the pagination loop and error recovery branches
- LLM-driven extraction adapts to any page structure without hardcoded selectors
- Single-responsibility nodes — each does one thing, routing functions decide flow
The architecture extends naturally. Need validation? Add a node. Need CSV export? Add a node. Need deduplication? Add a node. Each plugs into the graph at the right spot without touching existing code.
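To illustrate how small such an add-on can be, here is a sketch of the core of a deduplication step. `dedupe_records` and the choice of key fields are assumptions made for this example, and the sample rows are invented. Keep in mind that a node returning the result under `extracted_data` would append rather than replace because of the `merge_lists` reducer, so a real node would write to a separate state key.

```python
def dedupe_records(records: list[dict], key_fields: tuple[str, ...]) -> list[dict]:
    """Keep the first occurrence of each record, compared on key_fields only."""
    seen = set()
    unique = []
    for record in records:
        # Normalise to stripped lowercase strings so "ACME" and "acme " match
        key = tuple(str(record.get(f, "")).strip().lower() for f in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


rows = [
    {"title": "Energy engineer", "company": "Vasquez-Davidson"},
    {"title": "energy engineer ", "company": "Vasquez-Davidson"},
    {"title": "Legal executive", "company": "Garcia PLC"},
]
print(dedupe_records(rows, ("title", "company")))
```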
Practice Exercise
Extend the pipeline with an export_node that runs after analysis and writes extracted_data to a CSV file using Python’s csv.DictWriter.
Solution
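import csv


def export_to_csv(state: ScraperState) -> dict:
    """Export extracted data to a CSV file."""
    data = state.get("extracted_data", [])
    if not data:
        return {"status": "no_data_to_export"}
    # Collect column names from every record, in first-seen order, because
    # LLM-extracted records do not always share identical keys and
    # DictWriter raises ValueError on keys missing from fieldnames.
    headers = list(dict.fromkeys(key for record in data for key in record))
    filename = "scraped_data.csv"
    with open(filename, "w", newline="", encoding="utf-8") as f:
        # restval fills fields that a record is missing with an empty string
        writer = csv.DictWriter(f, fieldnames=headers, restval="")
        writer.writeheader()
        writer.writerows(data)
    return {
        "status": "exported",
        "error_log": [f"Exported {len(data)} records to {filename}"],
    }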
Wire it into the graph by replacing the `analyze -> END` edge:
graph.add_node("export", export_to_csv)
graph.add_edge("analyze", "export")
graph.add_edge("export", END)
Frequently Asked Questions
Can this pipeline handle JavaScript-rendered pages?
No — requests fetches raw HTML only. For JS-heavy sites (React, Vue, Angular), swap requests.get() for Selenium or Playwright in the fetch node. The rest of the pipeline stays identical because each node is independent.
# Swap this into the fetch node for JS-rendered pages
from playwright.sync_api import sync_playwright


def fetch_with_playwright(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, wait_until="networkidle")
        html = page.content()
        browser.close()
        return html
How much does running this pipeline cost?
With gpt-4o-mini, each page costs roughly $0.001-$0.003 in API fees for parsing plus pagination check. Scraping 100 pages runs about $0.15-$0.30 total. The analysis step adds $0.005-$0.01. For cost-sensitive work, swap ChatOpenAI for ChatOllama and run a local model like Llama 3.
Is web scraping legal?
It depends on jurisdiction and the specific site. In the US, the Ninth Circuit’s hiQ v. LinkedIn ruling held that scraping publicly available data likely does not violate the CFAA, although site owners can still bring other claims such as breach of contract. Always check robots.txt and terms of service. Respect rate limits. Don’t scrape personal data without consent.
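The robots.txt check can be automated with the standard library's `urllib.robotparser`. The sketch below parses rules from a string so that it runs offline; the rules and the `my-scraper` user agent are made up for illustration. Against a live site you would call `set_url(...)` and `read()` instead of `parse(...)`.

```python
from urllib.robotparser import RobotFileParser

# Made-up rules for illustration; a real file is served at /robots.txt
ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
"""

parser = RobotFileParser()
parser.parse(ROBOTS_TXT.splitlines())

# can_fetch(useragent, url) applies the parsed rules to a candidate URL
print(parser.can_fetch("my-scraper", "https://example.com/jobs/"))
print(parser.can_fetch("my-scraper", "https://example.com/private/data"))
```

A fetch node could run this check before each request and skip disallowed URLs, logging the skip to `error_log`.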
How do I scrape sites that require login?
Add a login_node that authenticates first and stores session cookies in state. Subsequent fetch requests include those cookies. You can also pass authentication headers directly in the fetch node’s requests.get() call.
References
- LangGraph documentation — StateGraph, conditional edges, and state management.
- LangChain documentation — ChatOpenAI model integration.
- BeautifulSoup documentation — Parsing HTML and navigating the tree.
- Python requests library documentation.
- Cohorte Projects — How to Build a Smart Web-Scraping AI Agent with LangGraph and Selenium.
- Firecrawl — Building a Documentation Agent with LangGraph and Firecrawl.
- Real Python — LangGraph: Build Stateful AI Agents in Python.
- hiQ Labs, Inc. v. LinkedIn Corp., 938 F.3d 985 (9th Cir. 2019) — Legal precedent for public data scraping.
Reviewed: March 2026 | LangGraph version: 0.4+ | Python: 3.10+