LLM Streaming Tutorial: SSE in Python Step-by-Step
Stream LLM tokens from OpenAI, Claude, and Gemini in Python using SSE and async generators. Includes FastAPI server, backpressure handling, and runnable code.
You send a prompt. Then you wait. And wait. Five seconds later, a wall of text dumps on screen. That’s the non-streaming experience. Streaming fixes it — your user sees the first word in under a second.
What Is LLM Streaming and Why Does It Matter?
When you call an LLM API without streaming, the server builds the entire response before sending a single byte. The user stares at a blank screen for 3-10 seconds. Streaming flips this.
With streaming on, the server ships each token the moment it’s ready. Your app shows text as it arrives. Perceived latency drops from seconds to milliseconds.
Why should you care? Every production LLM app streams. ChatGPT, Claude, Gemini — all of them. If yours doesn’t, it feels broken by comparison.
| Mode | Time to First Token | Time to Full Response | User Experience |
|---|---|---|---|
| Non-streaming | 3-8 seconds | 3-8 seconds | Blank screen, then wall of text |
| Streaming | 0.2-0.5 seconds | 3-8 seconds (same) | Text appears word by word |
Total generation time stays the same. Streaming doesn’t speed up the model. It makes the experience feel instant because you see progress right away.
Prerequisites
- Python version: 3.10+
- Required libraries: requests, aiohttp, fastapi, uvicorn
- Install: pip install requests aiohttp fastapi uvicorn
- API keys: At least one from OpenAI, Anthropic, or Google (setup in our LLM API tutorial)
- Time to complete: ~25 minutes
- Cost: Under $0.05 total
The SSE Protocol — How Streaming Works Under the Hood
Before writing streaming code, you need to know what travels across the wire. Every major LLM provider uses the same protocol: Server-Sent Events (SSE).
SSE sits on top of plain HTTP. The server keeps the connection open and pushes data chunks as they become ready. Each chunk follows a strict text format.
Here’s a raw SSE stream from an LLM call:
data: {"choices":[{"delta":{"content":"Hello"}}]}
data: {"choices":[{"delta":{"content":" there"}}]}
data: {"choices":[{"delta":{"content":"!"}}]}
data: [DONE]
Three rules govern every SSE message:
- Each message starts with
data:followed by a JSON payload - Messages are separated by two newlines (
\n\n) - The stream ends with
data: [DONE]
That’s it. No binary framing, no handshake, no protocol upgrade. Plain text over a normal HTTP response.
The Content-Type header for SSE is text/event-stream. When you spot that header, you know you’re dealing with a stream. Your client reads the response body line by line instead of waiting for the full payload.
Quick check: If an SSE message looks like data: {"token":"Hi"}\n\ndata: {"token":"!"}\n\n, how many separate messages does it contain? Answer: two. Each data: line followed by \n\n is one message.
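Those three rules are enough to write a tiny parser. Here is a minimal sketch that splits a raw SSE body into JSON payloads; the sample string is made up for illustration:

```python
import json

def parse_sse(raw):
    """Split a raw SSE body into JSON payloads using the three rules above."""
    payloads = []
    for message in raw.split("\n\n"):          # rule 2: \n\n separates messages
        message = message.strip()
        if not message.startswith("data: "):   # rule 1: every message is data: + payload
            continue
        body = message[len("data: "):]
        if body == "[DONE]":                   # rule 3: end-of-stream sentinel
            break
        payloads.append(json.loads(body))
    return payloads

raw = 'data: {"token":"Hi"}\n\ndata: {"token":"!"}\n\ndata: [DONE]\n\n'
print(parse_sse(raw))  # [{'token': 'Hi'}, {'token': '!'}]
```

Real client code reads the response line by line instead of splitting a complete string, but the per-message logic is exactly this.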
Stream OpenAI Responses with Raw HTTP
OpenAI’s streaming format is the most widely copied, so we start here. We’ll use raw requests — no SDK — so you see exactly what happens at the HTTP level.
Two changes from a normal API call: set "stream": true in the request body, and iterate over lines instead of calling .json(). Each line holds a delta object with a content field. That’s your token.
# Setup (Pyodide): install requests, then collect API keys for the three providers
import micropip
await micropip.install(['requests'])
import os
from js import prompt
OPENAI_API_KEY = prompt("Enter your OpenAI API key:")
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
ANTHROPIC_API_KEY = prompt("Enter your Anthropic API key:")
os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY
GOOGLE_API_KEY = prompt("Enter your Google API key:")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
import requests
import json
import os
api_key = os.environ["OPENAI_API_KEY"]
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Explain SSE in 2 sentences."}],
"stream": True,
},
stream=True,
)
for line in response.iter_lines():
if not line:
continue
text = line.decode("utf-8")
if text == "data: [DONE]":
break
if text.startswith("data: "):
chunk = json.loads(text[6:])
token = chunk["choices"][0]["delta"].get("content", "")
print(token, end="", flush=True)
print()
Notice two things. stream=True appears twice — once in the JSON body (tells OpenAI to stream) and once in requests.post() (tells the requests library not to buffer). Skip either one and streaming breaks.
The delta object sometimes has an empty content field. That happens on the first chunk (which carries the role field) and between some tokens. The .get("content", "") handles it cleanly.
Stream Anthropic (Claude) Responses
Anthropic’s format differs in one key way. Instead of a single delta.content field, Claude sends typed events. The token text arrives inside events of type content_block_delta.
The request body uses "stream": true like OpenAI. But each SSE event includes an event: field before the data: line. We filter for the event type we care about.
api_key = os.environ["ANTHROPIC_API_KEY"]
response = requests.post(
"https://api.anthropic.com/v1/messages",
headers={
"x-api-key": api_key,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
},
json={
"model": "claude-sonnet-4-20250514",
"max_tokens": 200,
"messages": [{"role": "user", "content": "Explain SSE in 2 sentences."}],
"stream": True,
},
stream=True,
)
for line in response.iter_lines():
if not line:
continue
text = line.decode("utf-8")
if text.startswith("data: "):
payload = json.loads(text[6:])
if payload.get("type") == "content_block_delta":
token = payload["delta"].get("text", "")
print(token, end="", flush=True)
print()
Anthropic sends several event types: message_start, content_block_start, content_block_delta, and message_stop. We only care about content_block_delta for actual text. The rest carry metadata — token counts, stop reasons, and usage stats.
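You can watch the filter work on a canned event sequence without an API call; the data lines below are invented, but follow the shapes of the event types just listed:

```python
import json

# Made-up data lines mimicking Anthropic's typed SSE events
lines = [
    'data: {"type": "message_start", "message": {"id": "msg_1"}}',
    'data: {"type": "content_block_start", "index": 0}',
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hel"}}',
    'data: {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "lo"}}',
    'data: {"type": "message_stop"}',
]

text = ""
for line in lines:
    payload = json.loads(line[len("data: "):])
    if payload.get("type") == "content_block_delta":  # only deltas carry text
        text += payload["delta"].get("text", "")

print(text)  # Hello
```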
Stream Google Gemini Responses
Gemini takes a third path. The endpoint URL changes — you append :streamGenerateContent with alt=sse as a query parameter. Without alt=sse, Gemini returns one big JSON array instead of a stream.
I find Gemini’s token extraction path the deepest of the three. You dig through candidates[0].content.parts[0].text to reach the actual token. Verbose, but consistent.
api_key = os.environ["GOOGLE_API_KEY"]
model = "gemini-2.0-flash"
response = requests.post(
f"https://generativelanguage.googleapis.com/v1beta/models/"
f"{model}:streamGenerateContent?alt=sse&key={api_key}",
headers={"Content-Type": "application/json"},
json={
"contents": [{"parts": [{"text": "Explain SSE in 2 sentences."}]}],
},
stream=True,
)
for line in response.iter_lines():
if not line:
continue
text = line.decode("utf-8")
if text.startswith("data: "):
chunk = json.loads(text[6:])
parts = chunk.get("candidates", [{}])[0]
content = parts.get("content", {}).get("parts", [{}])[0]
token = content.get("text", "")
print(token, end="", flush=True)
print()
Here’s how the three providers compare at the protocol level:
| Feature | OpenAI | Anthropic | Google Gemini |
|---|---|---|---|
| Stream flag | "stream": true in body | "stream": true in body | alt=sse in URL |
| Token field | delta.content | delta.text | candidates[0].content.parts[0].text |
| End signal | data: [DONE] | type: message_stop | Last chunk has finishReason |
| Event types | Single format | Multiple typed events | Single format |
| Auth | Bearer token | x-api-key header | API key in URL |
Async Generators — The Clean Way to Stream
Raw HTTP parsing works, but it’s repetitive and hard to reuse. Every time you want to stream, you’d copy-paste the same SSE parsing loop. Async generators solve this.
An async generator uses async def and yield. It produces values one at a time, and the caller handles each value as it arrives. This maps perfectly to token streaming — yield each token, let the caller decide what to do with it.
Here’s an async generator for OpenAI using aiohttp:
import aiohttp
import asyncio
import json
import os
async def stream_openai(prompt, model="gpt-4o-mini"):
"""Async generator that yields tokens from OpenAI."""
api_key = os.environ["OPENAI_API_KEY"]
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"stream": True,
},
) as resp:
async for line in resp.content:
text = line.decode("utf-8").strip()
if text == "data: [DONE]":
return
if text.startswith("data: "):
chunk = json.loads(text[6:])
token = chunk["choices"][0]["delta"].get("content", "")
if token:
yield token
The generator encapsulates all SSE parsing. The caller doesn’t touch data: prefixes, JSON parsing, or [DONE] signals. It just receives clean tokens.
Consuming it is simple:
async def main():
async for token in stream_openai("What is backpressure?"):
print(token, end="", flush=True)
print()
asyncio.run(main())
Here’s a before/after comparison to see why generators matter:
# BEFORE: Raw parsing (repeated every time you stream)
for line in response.iter_lines():
if not line:
continue
text = line.decode("utf-8")
if text == "data: [DONE]":
break
if text.startswith("data: "):
chunk = json.loads(text[6:])
token = chunk["choices"][0]["delta"].get("content", "")
print(token, end="", flush=True)
# AFTER: Generator (one line to consume the stream)
async for token in stream_openai("Your prompt here"):
print(token, end="", flush=True)
The “before” code is 10 lines of protocol plumbing you’d repeat everywhere. The “after” code is 2 lines. That’s the value of wrapping streams in generators.
Exercise 1: Build an Async Streaming Generator for Anthropic
{
type: 'exercise',
id: 'anthropic-stream-generator',
title: 'Exercise 1: Build an Async Streaming Generator for Anthropic',
difficulty: 'intermediate',
exerciseType: 'write',
instructions: 'Complete the async generator function that yields tokens from the Anthropic (Claude) API. The function should handle SSE parsing and only yield actual text tokens from `content_block_delta` events.',
starterCode: 'import aiohttp\nimport json\nimport os\n\nasync def stream_anthropic(prompt, model=\"claude-sonnet-4-20250514\"):\n \"\"\"Async generator that yields tokens from Anthropic.\"\"\"\n api_key = os.environ[\"ANTHROPIC_API_KEY\"]\n async with aiohttp.ClientSession() as session:\n async with session.post(\n \"https://api.anthropic.com/v1/messages\",\n headers={\n \"x-api-key\": api_key,\n \"anthropic-version\": \"2023-06-01\",\n \"Content-Type\": \"application/json\",\n },\n json={\n \"model\": model,\n \"max_tokens\": 1024,\n \"messages\": [{\"role\": \"user\", \"content\": prompt}],\n \"stream\": True,\n },\n ) as resp:\n async for line in resp.content:\n text = line.decode(\"utf-8\").strip()\n # TODO: Parse SSE lines and yield tokens\n # Only yield text from content_block_delta events\n pass',
testCases: [
{ id: 'tc1', input: 'import inspect; print(inspect.isasyncgenfunction(stream_anthropic))', expectedOutput: 'True', description: 'Function must be an async generator' },
{ id: 'tc2', input: 'import ast; source = inspect.getsource(stream_anthropic); print("content_block_delta" in source)', expectedOutput: 'True', description: 'Must filter for content_block_delta events' },
{ id: 'tc3', input: 'print("yield" in inspect.getsource(stream_anthropic))', expectedOutput: 'True', description: 'Must yield tokens', hidden: true },
],
hints: [
'Check if the line starts with "data: " and parse the JSON. Then check if payload["type"] == "content_block_delta" before yielding.',
'Full pattern: if text.startswith("data: "): payload = json.loads(text[6:]); if payload.get("type") == "content_block_delta": yield payload["delta"]["text"]',
],
solution: 'import aiohttp\nimport json\nimport os\n\nasync def stream_anthropic(prompt, model=\"claude-sonnet-4-20250514\"):\n \"\"\"Async generator that yields tokens from Anthropic.\"\"\"\n api_key = os.environ[\"ANTHROPIC_API_KEY\"]\n async with aiohttp.ClientSession() as session:\n async with session.post(\n \"https://api.anthropic.com/v1/messages\",\n headers={\n \"x-api-key\": api_key,\n \"anthropic-version\": \"2023-06-01\",\n \"Content-Type\": \"application/json\",\n },\n json={\n \"model\": model,\n \"max_tokens\": 1024,\n \"messages\": [{\"role\": \"user\", \"content\": prompt}],\n \"stream\": True,\n },\n ) as resp:\n async for line in resp.content:\n text = line.decode(\"utf-8\").strip()\n if not text or not text.startswith(\"data: \"):\n continue\n payload = json.loads(text[6:])\n if payload.get(\"type\") == \"content_block_delta\":\n token = payload[\"delta\"].get(\"text\", \"\")\n if token:\n yield token',
solutionExplanation: 'The key is filtering for `content_block_delta` events. Anthropic sends multiple event types (message_start, content_block_start, etc.), but only `content_block_delta` carries the actual token text. We skip empty lines and non-data lines, parse the JSON payload, check the type, and yield the text from the delta object.',
xpReward: 15,
}
Build a Unified Streaming Client
All three providers follow the same pattern: open a connection, read lines, extract tokens, yield them. The differences are just auth headers, JSON paths, and end signals. Let’s combine them into one generator.
First, we set up the provider configs. Each config holds the URL, default model, and headers:
async def stream_llm(prompt, provider="openai", model=None):
"""Unified async generator for OpenAI, Anthropic, Gemini."""
configs = {
"openai": {
"url": "https://api.openai.com/v1/chat/completions",
"model": model or "gpt-4o-mini",
"headers": {
"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
"Content-Type": "application/json",
},
},
"anthropic": {
"url": "https://api.anthropic.com/v1/messages",
"model": model or "claude-sonnet-4-20250514",
"headers": {
"x-api-key": os.environ["ANTHROPIC_API_KEY"],
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
},
},
"gemini": {
"model": model or "gemini-2.0-flash",
"headers": {"Content-Type": "application/json"},
},
}
cfg = configs[provider]
Next, we build the request body. Each provider expects a slightly different shape:
# Build request body per provider
if provider == "openai":
url = cfg["url"]
body = {
"model": cfg["model"],
"messages": [{"role": "user", "content": prompt}],
"stream": True,
}
elif provider == "anthropic":
url = cfg["url"]
body = {
"model": cfg["model"],
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}],
"stream": True,
}
else:
url = (
f"https://generativelanguage.googleapis.com/v1beta/models/"
f"{cfg['model']}:streamGenerateContent"
f"?alt=sse&key={os.environ['GOOGLE_API_KEY']}"
)
body = {"contents": [{"parts": [{"text": prompt}]}]}
Finally, the streaming loop. We open one aiohttp session and route token extraction based on the provider:
async with aiohttp.ClientSession() as session:
async with session.post(
url, headers=cfg["headers"], json=body
) as response:
async for line in response.content:
text = line.decode("utf-8").strip()
if not text or not text.startswith("data: "):
continue
data_str = text[6:]
if data_str == "[DONE]":
return
payload = json.loads(data_str)
if provider == "openai":
token = (payload.get("choices", [{}])[0]
.get("delta", {}).get("content", ""))
elif provider == "anthropic":
if payload.get("type") != "content_block_delta":
continue
token = payload["delta"].get("text", "")
else:
parts = payload.get("candidates", [{}])[0]
token = (parts.get("content", {})
.get("parts", [{}])[0].get("text", ""))
if token:
yield token
All three code blocks above form one function. You can stream from any provider with a single call:
async def compare_providers():
prompt = "Name 3 uses of async generators."
for provider in ["openai", "anthropic", "gemini"]:
print(f"\n--- {provider.upper()} ---")
async for token in stream_llm(prompt, provider=provider):
print(token, end="", flush=True)
print()
asyncio.run(compare_providers())
Exercise 2: Add Token Counting to the Unified Client
{
type: 'exercise',
id: 'token-counting-wrapper',
title: 'Exercise 2: Add Token Counting to the Unified Client',
difficulty: 'intermediate',
exerciseType: 'write',
instructions: 'Write a wrapper async generator called `stream_with_count` that wraps `stream_llm`, yields each token, and prints a summary line at the end showing the total token count and characters received.',
starterCode: 'async def stream_with_count(prompt, provider=\"openai\"):\n \"\"\"Wrapper that yields tokens and prints count at the end.\"\"\"\n token_count = 0\n char_count = 0\n # TODO: iterate over stream_llm, count tokens, yield each one\n # After the loop, print: f\"[{token_count} tokens, {char_count} chars]\"\n pass',
testCases: [
{ id: 'tc1', input: 'import inspect; print(inspect.isasyncgenfunction(stream_with_count))', expectedOutput: 'True', description: 'Must be an async generator function' },
{ id: 'tc2', input: 'source = inspect.getsource(stream_with_count); print("stream_llm" in source and "yield" in source)', expectedOutput: 'True', description: 'Must wrap stream_llm and yield tokens' },
],
hints: [
'Use `async for token in stream_llm(prompt, provider=provider):` inside your function. Increment counters and yield each token.',
'After the async for loop ends, print the summary: `print(f\"[{token_count} tokens, {char_count} chars]\")`',
],
solution: 'async def stream_with_count(prompt, provider=\"openai\"):\n \"\"\"Wrapper that yields tokens and prints count at the end.\"\"\"\n token_count = 0\n char_count = 0\n async for token in stream_llm(prompt, provider=provider):\n token_count += 1\n char_count += len(token)\n yield token\n print(f\"[{token_count} tokens, {char_count} chars]\")',
solutionExplanation: 'This wrapper demonstrates the composability of async generators. It wraps `stream_llm`, passes each token through to the caller via `yield`, and tracks counts along the way. The summary prints after the stream ends. This pattern works for any middleware -- logging, filtering, rate limiting -- without modifying the original generator.',
xpReward: 15,
}
Build a FastAPI Streaming Server
So far, you’ve consumed streams from LLM providers. Let’s flip it. You’ll build a server that accepts prompts and streams responses to your own users.
FastAPI’s StreamingResponse takes an async generator and sends each yielded chunk to the client. The sse_generator function below wraps our stream_openai generator and formats each token as a proper SSE message with data: prefix and \n\n separator.
# server.py -- NOT runnable in Pyodide (requires local Python)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
async def sse_generator(prompt):
"""Wraps the LLM stream into SSE format."""
async for token in stream_openai(prompt):
yield f"data: {json.dumps({'token': token})}\n\n"
yield "data: [DONE]\n\n"
@app.post("/stream")
async def stream_endpoint(request: dict):
prompt = request.get("prompt", "")
return StreamingResponse(
sse_generator(prompt),
media_type="text/event-stream",
)
Start the server:
uvicorn server:app --host 0.0.0.0 --port 8000
Here’s the async client that connects to your server and displays tokens:
# client.py -- works anywhere with HTTP access
async def consume_stream(server_url, prompt):
"""Connect to our FastAPI server and display tokens."""
async with aiohttp.ClientSession() as session:
async with session.post(
f"{server_url}/stream",
json={"prompt": prompt},
) as response:
async for line in response.content:
text = line.decode("utf-8").strip()
if not text:
continue
if text == "data: [DONE]":
print("\n[Stream complete]")
return
if text.startswith("data: "):
payload = json.loads(text[6:])
print(payload["token"], end="", flush=True)
asyncio.run(consume_stream("http://localhost:8000", "What is backpressure?"))
Predict the output: What happens if the client connects but the server’s stream_openai generator raises an exception mid-stream? The client receives a truncated response — some tokens arrive, then the connection closes without [DONE]. Your client should always handle this case with a timeout or a check for the [DONE] signal.
Handle Backpressure in LLM Streaming
What happens when tokens arrive faster than your code can process them? Maybe you’re writing each token to a database. Maybe the network is slow. This is backpressure — and ignoring it causes memory to balloon.
With SSE over TCP, basic backpressure works automatically. If the client stops reading, the TCP buffer fills, and the server blocks. But this is blunt. It can stall the entire stream for everyone if you share a connection pool.
A cleaner approach: put an asyncio.Queue between the producer (LLM stream) and the consumer (your processing logic). The queue decouples their speeds with a bounded buffer.
async def buffered_stream(prompt, buffer_size=50):
"""Buffer tokens in a queue to handle speed mismatch."""
queue = asyncio.Queue(maxsize=buffer_size)
async def producer():
async for token in stream_openai(prompt):
await queue.put(token)
await queue.put(None) # sentinel: done
async def consumer():
while True:
token = await queue.get()
if token is None:
break
await asyncio.sleep(0.05) # simulate slow work
print(token, end="", flush=True)
await asyncio.gather(producer(), consumer())
print()
asyncio.run(buffered_stream("Explain backpressure in 3 sentences."))
The maxsize=50 is the lever. When the queue fills, queue.put() pauses the producer until the consumer catches up. No token is dropped. No memory explodes.
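You can see the bound without any API call. put_nowait raises once the queue is at capacity; the awaited queue.put() used above suspends the producer at the same point instead of raising:

```python
import asyncio

async def demo():
    queue = asyncio.Queue(maxsize=2)   # same lever as buffer_size above
    queue.put_nowait("tok1")
    queue.put_nowait("tok2")           # queue is now at capacity
    try:
        queue.put_nowait("tok3")       # a third token exceeds maxsize
        return "accepted"
    except asyncio.QueueFull:
        return "full"                  # `await queue.put()` would pause here instead

result = asyncio.run(demo())
print(result)  # full
```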
Measure Streaming Performance
Two metrics matter: Time to First Token (TTFT) and tokens per second. TTFT tells you how fast the user sees something. Throughput tells you how fast the full response arrives.
The function below measures both. It uses time.perf_counter() for accurate timing and counts tokens as they flow through the generator.
import time
async def measure_stream(prompt, provider="openai"):
"""Measure TTFT and throughput for a streaming call."""
start = time.perf_counter()
first_token_time = None
token_count = 0
async for token in stream_llm(prompt, provider=provider):
if first_token_time is None:
first_token_time = time.perf_counter() - start
token_count += 1
total_time = time.perf_counter() - start
tps = token_count / total_time if total_time > 0 else 0
return {
"provider": provider,
"ttft_ms": round(first_token_time * 1000, 1) if first_token_time else 0,
"total_s": round(total_time, 2),
"tokens": token_count,
"tok_per_sec": round(tps, 1),
}
Run it against all three providers:
async def benchmark():
prompt = "Write a 100-word summary of how neural networks learn."
for provider in ["openai", "anthropic", "gemini"]:
r = await measure_stream(prompt, provider=provider)
print(f"{r['provider']:>10}: TTFT={r['ttft_ms']}ms | "
f"{r['tok_per_sec']} tok/s | Total={r['total_s']}s")
asyncio.run(benchmark())
Your numbers will vary based on location, network speed, and server load. I’ve consistently seen Gemini deliver the fastest TTFT, with Anthropic competitive on throughput and OpenAI solid across both metrics. But rankings shift week to week. What matters: all three stream fast enough for smooth user experience.
Common Mistakes and How to Fix Them
Mistake 1: Forgetting stream=True on the HTTP client
❌ Wrong:
response = requests.post(url, json=body, stream=False)
for line in response.iter_lines(): # blocks until full response
print(line)
Why it’s wrong: Without stream=True, requests downloads the entire response before iter_lines() runs. You lose all streaming.
✅ Correct:
response = requests.post(url, json=body, stream=True)
for line in response.iter_lines(): # yields lines live
print(line)
Mistake 2: Parsing every SSE line as JSON
❌ Wrong:
for line in response.iter_lines():
chunk = json.loads(line) # crashes on empty lines and [DONE]
Why it’s wrong: SSE streams include empty separator lines and the data: [DONE] sentinel. Blind JSON parsing crashes immediately.
✅ Correct:
for line in response.iter_lines():
if not line:
continue
text = line.decode("utf-8")
if text.startswith("data: ") and text != "data: [DONE]":
chunk = json.loads(text[6:])
Mistake 3: Blocking the event loop with time.sleep()
❌ Wrong:
async def process_stream():
async for token in stream_openai("Hello"):
time.sleep(0.1) # freezes everything!
print(token)
Why it’s wrong: time.sleep() blocks the entire event loop. Other coroutines — including the one reading the stream — can’t run.
✅ Correct:
async def process_stream():
async for token in stream_openai("Hello"):
await asyncio.sleep(0.1) # gives control back
print(token)
Error Troubleshooting
aiohttp.ClientConnectionError: Connection closed
The SSE connection dropped mid-stream. Wrap your async for loop in try/except and add retry logic. Providers don't resume a dropped stream server-side, so re-send the request, optionally including the partial response in the prompt so the model continues where it stopped.
json.JSONDecodeError: Expecting value
You’re parsing a non-JSON line (empty line, [DONE] sentinel, or event: prefix). Always check text.startswith("data: ") and skip [DONE] before calling json.loads().
RuntimeError: asyncio.run() cannot be called from a running event loop
You called asyncio.run() inside code that's already running an event loop (common in Jupyter). Use await main() instead, or install nest_asyncio and call nest_asyncio.apply() first.
When NOT to Use Streaming
Streaming isn’t always the right call. Here are three situations where non-streaming wins:
Batch processing. Processing 1,000 documents overnight? Streaming adds complexity with no benefit. You don’t need real-time display. Non-streaming calls are simpler to retry and easier to parallelize.
JSON mode / structured output. When the LLM returns valid JSON, streaming gives you partial JSON you can’t parse mid-stream. You’d have to buffer the complete response anyway.
Short responses under 50 tokens. If the full response takes under 500ms, streaming adds overhead (connection management, SSE parsing) for no visible UX gain.
I’ve found roughly 80% of user-facing LLM calls benefit from streaming, and 80% of backend calls don’t. Match the tool to the task.
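The structured-output point is easy to demonstrate. Partial JSON fails to parse until the final chunk arrives, so streaming buys you nothing; the chunk strings below are made up:

```python
import json

# Hypothetical token chunks from a structured-output call
chunks = ['{"name": "S', 'SE", "transp', 'ort": "HTTP"}']

obj = None
buffer = ""
for chunk in chunks:
    buffer += chunk
    try:
        obj = json.loads(buffer)   # fails on every partial prefix
    except json.JSONDecodeError:
        pass                       # nothing usable to show mid-stream

print(obj)  # only the fully buffered response parses
```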
Summary
You’ve built LLM streaming from the ground up. You parsed raw SSE from all three major providers, wrapped the protocol in reusable async generators, built a FastAPI server that streams to your own clients, and learned to measure performance and handle backpressure.
The core pattern is simple: set stream=True, read lines, parse JSON, yield tokens. Everything else — unified clients, servers, buffering — is just wrapping that pattern for different use cases.
Practice Exercise
Build a streaming retry wrapper. The function should attempt to stream, and if the connection drops mid-stream, retry up to 3 times, picking up from where it left off (by including the partial response in the retry prompt).
Frequently Asked Questions
Can I use streaming with function calling / tool use?
Yes, but the stream includes tool call arguments as they generate. You get partial JSON in delta.tool_calls that you must accumulate until the tool call completes. Only then can you parse it. Both OpenAI and Anthropic support this — check their streaming docs for the specific event types.
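A minimal accumulation sketch for OpenAI-style tool-call deltas; the chunk dicts are invented, but follow the delta.tool_calls shape described above:

```python
import json

# Invented streamed deltas: the function name arrives once, arguments in fragments
chunks = [
    {"tool_calls": [{"index": 0, "function": {"name": "get_weather", "arguments": ""}}]},
    {"tool_calls": [{"index": 0, "function": {"arguments": '{"city": "Par'}}]},
    {"tool_calls": [{"index": 0, "function": {"arguments": 'is"}'}}]},
]

name, args = "", ""
for delta in chunks:
    call = delta["tool_calls"][0]["function"]
    name = call.get("name") or name    # name appears only on the first chunk
    args += call.get("arguments", "")  # argument JSON accumulates as fragments

print(name, json.loads(args))  # parse only after the stream completes
```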
Is SSE the same as WebSockets?
No. SSE is one-way (server to client) over standard HTTP. WebSockets are bidirectional and require a protocol upgrade handshake. For LLM streaming, SSE wins because you only need server-to-client flow, and it works through proxies and CDNs without special config.
How do I handle errors mid-stream?
Wrap the streaming loop in try/except and track received tokens. If the connection drops, retry with the partial response included in the prompt. Most providers also send error events in the SSE stream before closing.
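That retry pattern can be sketched with a hypothetical flaky generator standing in for stream_llm; everything here is illustrative, including the canned token list:

```python
import asyncio

TOKENS = ["Back", "pressure", " is", " flow", " control."]
calls = {"n": 0}

async def flaky_stream(prompt):
    """Hypothetical stand-in for stream_llm: the first call drops mid-stream."""
    calls["n"] += 1
    if calls["n"] == 1:
        for tok in TOKENS[:3]:
            yield tok
        raise ConnectionError("connection dropped mid-stream")
    # On retry the prompt carries the partial text, so only the rest is generated
    for tok in TOKENS[3:]:
        yield tok

async def stream_with_retry(prompt, max_retries=3):
    received = ""
    for attempt in range(max_retries + 1):
        # Feed the partial response back so the model picks up where it left off
        p = prompt if not received else f"{prompt}\nContinue after: {received}"
        try:
            async for token in flaky_stream(p):
                received += token
                yield token
            return  # stream finished cleanly
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries, surface the error

async def main():
    out = ""
    async for tok in stream_with_retry("Define backpressure."):
        out += tok
    return out

result = asyncio.run(main())
print(result)  # Backpressure is flow control.
```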
Does streaming work with Ollama and local models?
Yes. Ollama serves the same SSE-style streaming at localhost:11434. Set "stream": true in the body and parse lines the same way. The stream_llm function from this tutorial works with Ollama if you add a local provider config.
Why does my stream hang in Jupyter notebooks?
Jupyter already runs an event loop. Calling asyncio.run() inside it raises an error or hangs. Fix: install nest_asyncio, call nest_asyncio.apply() at the top, then use await main() directly in a cell.
References
- OpenAI API documentation — Streaming responses. Link
- Anthropic API documentation — Streaming messages. Link
- Google Gemini API documentation — Stream generate content. Link
- MDN Web Docs — Server-Sent Events. Link
- FastAPI documentation — StreamingResponse. Link
- Python aiohttp documentation — Client quickstart. Link
- Python asyncio documentation — Async generators (PEP 525). Link
- WHATWG — Server-Sent Events specification. Link