
LLM Streaming Tutorial: SSE in Python Step-by-Step

Stream LLM tokens from OpenAI, Claude, and Gemini in Python using SSE and async generators. Includes FastAPI server, backpressure handling, and runnable code.

Written by Selva Prabhakaran | 28 min read

You send a prompt. Then you wait. And wait. Five seconds later, a wall of text dumps on screen. That’s the non-streaming experience. Streaming fixes it — your user sees the first word in under a second.

⚡ This post has interactive code — click ▶ Run or press Ctrl+Enter on any code block to execute it directly in your browser. The first run may take a few seconds to initialize.

What Is LLM Streaming and Why Does It Matter?

When you call an LLM API without streaming, the server builds the entire response before sending a single byte. The user stares at a blank screen for 3-10 seconds. Streaming flips this.

With streaming on, the server ships each token the moment it’s ready. Your app shows text as it arrives. Perceived latency drops from seconds to milliseconds.

Why should you care? Every production LLM app streams. ChatGPT, Claude, Gemini — all of them. If yours doesn’t, it feels broken by comparison.

Mode | Time to First Token | Time to Full Response | User Experience
Non-streaming | 3-8 seconds | 3-8 seconds | Blank screen, then wall of text
Streaming | 0.2-0.5 seconds | 3-8 seconds (same) | Text appears word by word

Total generation time stays the same. Streaming doesn’t speed up the model. It makes the experience feel instant because you see progress right away.

Prerequisites

  • Python version: 3.10+
  • Required libraries: requests, aiohttp, fastapi, uvicorn
  • Install: pip install requests aiohttp fastapi uvicorn
  • API keys: At least one from OpenAI, Anthropic, or Google (setup in our LLM API tutorial)
  • Time to complete: ~25 minutes
  • Cost: Under $0.05 total

The SSE Protocol — How Streaming Works Under the Hood

Before writing streaming code, you need to know what travels across the wire. Every major LLM provider uses the same protocol: Server-Sent Events (SSE).

SSE sits on top of plain HTTP. The server keeps the connection open and pushes data chunks as they become ready. Each chunk follows a strict text format.

Here’s a raw SSE stream from an LLM call:

text
data: {"choices":[{"delta":{"content":"Hello"}}]}

data: {"choices":[{"delta":{"content":" there"}}]}

data: {"choices":[{"delta":{"content":"!"}}]}

data: [DONE]

Three rules govern every SSE message:

  1. Each message starts with data: followed by a JSON payload
  2. Messages are separated by two newlines (\n\n)
  3. The stream ends with data: [DONE]

That’s it. No binary framing, no handshake, no protocol upgrade. Plain text over a normal HTTP response.

Key Insight: SSE is HTTP with the door held open. A regular API call opens a connection, sends one response, and closes. SSE opens the connection and keeps pushing data until the server says “done.” Every LLM provider chose it because it works through firewalls, load balancers, and CDNs with zero special config.

The Content-Type header for SSE is text/event-stream. When you spot that header, you know you’re dealing with a stream. Your client reads the response body line by line instead of waiting for the full payload.

Quick check: If an SSE message looks like data: {"token":"Hi"}\n\ndata: {"token":"!"}\n\n, how many separate messages does it contain? Answer: two. Each data: line followed by \n\n is one message.
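The three rules translate directly into code. A toy parser over a raw SSE string (no HTTP involved) applies them:

```python
import json

def parse_sse(raw):
    """Toy SSE parser: split on blank lines, strip the 'data: ' prefix."""
    tokens = []
    for message in raw.split("\n\n"):
        message = message.strip()
        if not message.startswith("data: "):
            continue
        payload = message[6:]
        if payload == "[DONE]":
            break
        tokens.append(json.loads(payload)["choices"][0]["delta"].get("content", ""))
    return tokens

raw = (
    'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n'
    'data: {"choices":[{"delta":{"content":" there"}}]}\n\n'
    'data: [DONE]\n\n'
)
print(parse_sse(raw))  # ['Hello', ' there']
```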

Stream OpenAI Responses with Raw HTTP

OpenAI’s streaming format is the most widely copied, so we start here. We’ll use raw requests — no SDK — so you see exactly what happens at the HTTP level.

Two changes from a normal API call: set "stream": true in the request body, and iterate over lines instead of calling .json(). Each line holds a delta object with a content field. That’s your token.

import micropip
await micropip.install(['requests'])

import os
from js import prompt
OPENAI_API_KEY = prompt("Enter your OpenAI API key:")
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
ANTHROPIC_API_KEY = prompt("Enter your Anthropic API key:")
os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY
GOOGLE_API_KEY = prompt("Enter your Google API key:")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

import requests
import json
import os

api_key = os.environ["OPENAI_API_KEY"]

response = requests.post(
    "https://api.openai.com/v1/chat/completions",
    headers={
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    },
    json={
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Explain SSE in 2 sentences."}],
        "stream": True,
    },
    stream=True,
)

for line in response.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text == "data: [DONE]":
        break
    if text.startswith("data: "):
        chunk = json.loads(text[6:])
        token = chunk["choices"][0]["delta"].get("content", "")
        print(token, end="", flush=True)

print()

Notice two things. The stream flag appears twice: "stream": True in the JSON body tells OpenAI to stream, while stream=True on requests.post() tells the requests library not to buffer. Skip either one and streaming breaks.

The delta object sometimes has an empty content field. That happens on the first chunk (which carries the role field) and between some tokens. The .get("content", "") handles it cleanly.

Warning: Don’t forget `stream=True` on `requests.post()` itself. Setting it only in the JSON body tells OpenAI to stream, but `requests` still buffers the full response in memory before your loop starts. You need both flags.
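To see why the .get("content", "") guard matters, here are the two chunk shapes in miniature (sample payloads modeled on OpenAI's format, not captured from a live stream):

```python
# First chunk carries the role with no content; later chunks carry tokens.
chunks = [
    {"choices": [{"delta": {"role": "assistant"}}]},  # no "content" key
    {"choices": [{"delta": {"content": "Hello"}}]},
    {"choices": [{"delta": {}}]},                     # empty delta between tokens
    {"choices": [{"delta": {"content": "!"}}]},
]

out = ""
for chunk in chunks:
    # .get() returns "" instead of raising KeyError on content-less deltas
    out += chunk["choices"][0]["delta"].get("content", "")
print(out)  # Hello!
```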

Stream Anthropic (Claude) Responses

Anthropic’s format differs in one key way. Instead of a single delta.content field, Claude sends typed events. The token text arrives inside events of type content_block_delta.

The request body uses "stream": true like OpenAI. But each SSE event includes an event: field before the data: line. We filter for the event type we care about.

api_key = os.environ["ANTHROPIC_API_KEY"]

response = requests.post(
    "https://api.anthropic.com/v1/messages",
    headers={
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    },
    json={
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 200,
        "messages": [{"role": "user", "content": "Explain SSE in 2 sentences."}],
        "stream": True,
    },
    stream=True,
)

for line in response.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text.startswith("data: "):
        payload = json.loads(text[6:])
        if payload.get("type") == "content_block_delta":
            token = payload["delta"].get("text", "")
            print(token, end="", flush=True)

print()

Anthropic sends several event types: message_start, content_block_start, content_block_delta, and message_stop. We only care about content_block_delta for actual text. The rest carry metadata — token counts, stop reasons, and usage stats.
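You can check that filter offline with sample events shaped like Anthropic's (abbreviated payloads for illustration):

```python
events = [
    {"type": "message_start", "message": {"role": "assistant"}},
    {"type": "content_block_start", "index": 0},
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hi"}},
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": " there"}},
    {"type": "message_stop"},
]

# Only content_block_delta events carry token text; everything else is metadata
text = "".join(
    e["delta"].get("text", "")
    for e in events
    if e.get("type") == "content_block_delta"
)
print(text)  # Hi there
```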

Stream Google Gemini Responses

Gemini takes a third path. The endpoint URL changes — you append :streamGenerateContent with alt=sse as a query parameter. Without alt=sse, Gemini returns one big JSON array instead of a stream.

I find Gemini’s token extraction path the deepest of the three. You dig through candidates[0].content.parts[0].text to reach the actual token. Verbose, but consistent.

api_key = os.environ["GOOGLE_API_KEY"]
model = "gemini-2.0-flash"

response = requests.post(
    f"https://generativelanguage.googleapis.com/v1beta/models/"
    f"{model}:streamGenerateContent?alt=sse&key={api_key}",
    headers={"Content-Type": "application/json"},
    json={
        "contents": [{"parts": [{"text": "Explain SSE in 2 sentences."}]}],
    },
    stream=True,
)

for line in response.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text.startswith("data: "):
        chunk = json.loads(text[6:])
        candidate = chunk.get("candidates", [{}])[0]
        part = candidate.get("content", {}).get("parts", [{}])[0]
        token = part.get("text", "")
        print(token, end="", flush=True)

print()

Tip: Gemini’s `alt=sse` is easy to miss. Without it, you get the full response as one JSON array — no streaming at all. Always include it when you want token-by-token delivery.

Here’s how the three providers compare at the protocol level:

Feature | OpenAI | Anthropic | Google Gemini
Stream flag | "stream": true in body | "stream": true in body | alt=sse in URL
Token field | delta.content | delta.text | candidates[0].content.parts[0].text
End signal | data: [DONE] | type: message_stop | Last chunk has finishReason
Event types | Single format | Multiple typed events | Single format
Auth | Bearer token | x-api-key header | API key in URL
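The "Token field" row is the one you code against most often. A small dispatch function over sample payloads (shapes follow the formats shown above) puts the three extraction paths side by side:

```python
def extract_token(payload, provider):
    """Pull the token text out of one parsed SSE payload."""
    if provider == "openai":
        return payload.get("choices", [{}])[0].get("delta", {}).get("content", "")
    if provider == "anthropic":
        if payload.get("type") != "content_block_delta":
            return ""
        return payload["delta"].get("text", "")
    # gemini: dig through candidates[0].content.parts[0].text
    candidate = payload.get("candidates", [{}])[0]
    return candidate.get("content", {}).get("parts", [{}])[0].get("text", "")

samples = {
    "openai": {"choices": [{"delta": {"content": "Hi"}}]},
    "anthropic": {"type": "content_block_delta", "delta": {"text": "Hi"}},
    "gemini": {"candidates": [{"content": {"parts": [{"text": "Hi"}]}}]},
}
for provider, payload in samples.items():
    print(provider, "->", extract_token(payload, provider))
```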

Async Generators — The Clean Way to Stream

Raw HTTP parsing works, but it’s repetitive and hard to reuse. Every time you want to stream, you’d copy-paste the same SSE parsing loop. Async generators solve this.

An async generator uses async def and yield. It produces values one at a time, and the caller handles each value as it arrives. This maps perfectly to token streaming — yield each token, let the caller decide what to do with it.
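Here is the pattern in isolation, before any HTTP is involved: a fake token stream using only async def, yield, and asyncio.sleep:

```python
import asyncio

async def fake_stream(text, delay=0.01):
    """Yield one word at a time, simulating network pauses."""
    for word in text.split():
        await asyncio.sleep(delay)  # stand-in for waiting on the socket
        yield word + " "

async def main():
    out = []
    async for token in fake_stream("tokens arrive one at a time"):
        out.append(token)  # caller handles each value as it arrives
    return "".join(out).strip()

result = asyncio.run(main())
print(result)  # tokens arrive one at a time
```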

Here’s an async generator for OpenAI using aiohttp:

import aiohttp
import asyncio
import json
import os

async def stream_openai(prompt, model="gpt-4o-mini"):
    """Async generator that yields tokens from OpenAI."""
    api_key = os.environ["OPENAI_API_KEY"]
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "stream": True,
            },
        ) as resp:
            async for line in resp.content:
                text = line.decode("utf-8").strip()
                if text == "data: [DONE]":
                    return
                if text.startswith("data: "):
                    chunk = json.loads(text[6:])
                    token = chunk["choices"][0]["delta"].get("content", "")
                    if token:
                        yield token

The generator encapsulates all SSE parsing. The caller doesn’t touch data: prefixes, JSON parsing, or [DONE] signals. It just receives clean tokens.

Consuming it is simple:

async def main():
    async for token in stream_openai("What is backpressure?"):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

Key Insight: Async generators turn streaming from a parsing loop into a clean data pipeline. The generator owns the protocol. The caller owns the business logic. Swap providers, add logging, or build middleware — just wrap or chain generators.

Here’s a before/after comparison to see why generators matter:

# BEFORE: Raw parsing (repeated every time you stream)
for line in response.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text == "data: [DONE]":
        break
    if text.startswith("data: "):
        chunk = json.loads(text[6:])
        token = chunk["choices"][0]["delta"].get("content", "")
        print(token, end="", flush=True)
# AFTER: Generator (one line to consume the stream)
async for token in stream_openai("Your prompt here"):
    print(token, end="", flush=True)

The “before” code is 10 lines of protocol plumbing you’d repeat everywhere. The “after” code is 2 lines. That’s the value of wrapping streams in generators.

Exercise 1: Build an Async Streaming Generator for Anthropic

typescript
{
  type: 'exercise',
  id: 'anthropic-stream-generator',
  title: 'Exercise 1: Build an Async Streaming Generator for Anthropic',
  difficulty: 'intermediate',
  exerciseType: 'write',
  instructions: 'Complete the async generator function that yields tokens from the Anthropic (Claude) API. The function should handle SSE parsing and only yield actual text tokens from `content_block_delta` events.',
  starterCode: 'import aiohttp\nimport json\nimport os\n\nasync def stream_anthropic(prompt, model=\"claude-sonnet-4-20250514\"):\n    \"\"\"Async generator that yields tokens from Anthropic.\"\"\"\n    api_key = os.environ[\"ANTHROPIC_API_KEY\"]\n    async with aiohttp.ClientSession() as session:\n        async with session.post(\n            \"https://api.anthropic.com/v1/messages\",\n            headers={\n                \"x-api-key\": api_key,\n                \"anthropic-version\": \"2023-06-01\",\n                \"Content-Type\": \"application/json\",\n            },\n            json={\n                \"model\": model,\n                \"max_tokens\": 1024,\n                \"messages\": [{\"role\": \"user\", \"content\": prompt}],\n                \"stream\": True,\n            },\n        ) as resp:\n            async for line in resp.content:\n                text = line.decode(\"utf-8\").strip()\n                # TODO: Parse SSE lines and yield tokens\n                # Only yield text from content_block_delta events\n                pass',
  testCases: [
    { id: 'tc1', input: 'import inspect; print(inspect.isasyncgenfunction(stream_anthropic))', expectedOutput: 'True', description: 'Function must be an async generator' },
    { id: 'tc2', input: 'import ast; source = inspect.getsource(stream_anthropic); print("content_block_delta" in source)', expectedOutput: 'True', description: 'Must filter for content_block_delta events' },
    { id: 'tc3', input: 'print("yield" in inspect.getsource(stream_anthropic))', expectedOutput: 'True', description: 'Must yield tokens', hidden: true },
  ],
  hints: [
    'Check if the line starts with "data: " and parse the JSON. Then check if payload["type"] == "content_block_delta" before yielding.',
    'Full pattern: if text.startswith("data: "): payload = json.loads(text[6:]); if payload.get("type") == "content_block_delta": yield payload["delta"]["text"]',
  ],
  solution: 'import aiohttp\nimport json\nimport os\n\nasync def stream_anthropic(prompt, model=\"claude-sonnet-4-20250514\"):\n    \"\"\"Async generator that yields tokens from Anthropic.\"\"\"\n    api_key = os.environ[\"ANTHROPIC_API_KEY\"]\n    async with aiohttp.ClientSession() as session:\n        async with session.post(\n            \"https://api.anthropic.com/v1/messages\",\n            headers={\n                \"x-api-key\": api_key,\n                \"anthropic-version\": \"2023-06-01\",\n                \"Content-Type\": \"application/json\",\n            },\n            json={\n                \"model\": model,\n                \"max_tokens\": 1024,\n                \"messages\": [{\"role\": \"user\", \"content\": prompt}],\n                \"stream\": True,\n            },\n        ) as resp:\n            async for line in resp.content:\n                text = line.decode(\"utf-8\").strip()\n                if not text or not text.startswith(\"data: \"):\n                    continue\n                payload = json.loads(text[6:])\n                if payload.get(\"type\") == \"content_block_delta\":\n                    token = payload[\"delta\"].get(\"text\", \"\")\n                    if token:\n                        yield token',
  solutionExplanation: 'The key is filtering for `content_block_delta` events. Anthropic sends multiple event types (message_start, content_block_start, etc.), but only `content_block_delta` carries the actual token text. We skip empty lines and non-data lines, parse the JSON payload, check the type, and yield the text from the delta object.',
  xpReward: 15,
}

Build a Unified Streaming Client

All three providers follow the same pattern: open a connection, read lines, extract tokens, yield them. The differences are just auth headers, JSON paths, and end signals. Let’s combine them into one generator.

First, we set up the provider configs. Each config holds the URL, default model, and headers:

async def stream_llm(prompt, provider="openai", model=None):
    """Unified async generator for OpenAI, Anthropic, Gemini."""
    configs = {
        "openai": {
            "url": "https://api.openai.com/v1/chat/completions",
            "model": model or "gpt-4o-mini",
            "headers": {
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
        },
        "anthropic": {
            "url": "https://api.anthropic.com/v1/messages",
            "model": model or "claude-sonnet-4-20250514",
            "headers": {
                "x-api-key": os.environ["ANTHROPIC_API_KEY"],
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            },
        },
        "gemini": {
            "model": model or "gemini-2.0-flash",
            "headers": {"Content-Type": "application/json"},
        },
    }
    cfg = configs[provider]

Next, we build the request body. Each provider expects a slightly different shape:

    # Build request body per provider
    if provider == "openai":
        url = cfg["url"]
        body = {
            "model": cfg["model"],
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,
        }
    elif provider == "anthropic":
        url = cfg["url"]
        body = {
            "model": cfg["model"],
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,
        }
    else:
        url = (
            f"https://generativelanguage.googleapis.com/v1beta/models/"
            f"{cfg['model']}:streamGenerateContent"
            f"?alt=sse&key={os.environ['GOOGLE_API_KEY']}"
        )
        body = {"contents": [{"parts": [{"text": prompt}]}]}

Finally, the streaming loop. We open one aiohttp session and route token extraction based on the provider:

    async with aiohttp.ClientSession() as session:
        async with session.post(
            url, headers=cfg["headers"], json=body
        ) as response:
            async for line in response.content:
                text = line.decode("utf-8").strip()
                if not text or not text.startswith("data: "):
                    continue
                data_str = text[6:]
                if data_str == "[DONE]":
                    return
                payload = json.loads(data_str)

                if provider == "openai":
                    token = (payload.get("choices", [{}])[0]
                             .get("delta", {}).get("content", ""))
                elif provider == "anthropic":
                    if payload.get("type") != "content_block_delta":
                        continue
                    token = payload["delta"].get("text", "")
                else:
                    candidate = payload.get("candidates", [{}])[0]
                    token = (candidate.get("content", {})
                             .get("parts", [{}])[0].get("text", ""))

                if token:
                    yield token

All three code blocks above form one function. You can stream from any provider with a single call:

async def compare_providers():
    prompt = "Name 3 uses of async generators."
    for provider in ["openai", "anthropic", "gemini"]:
        print(f"\n--- {provider.upper()} ---")
        async for token in stream_llm(prompt, provider=provider):
            print(token, end="", flush=True)
        print()

asyncio.run(compare_providers())

Exercise 2: Add Token Counting to the Unified Client

typescript
{
  type: 'exercise',
  id: 'token-counting-wrapper',
  title: 'Exercise 2: Add Token Counting to the Unified Client',
  difficulty: 'intermediate',
  exerciseType: 'write',
  instructions: 'Write a wrapper async generator called `stream_with_count` that wraps `stream_llm`, yields each token, and prints a summary line at the end showing the total token count and characters received.',
  starterCode: 'async def stream_with_count(prompt, provider=\"openai\"):\n    \"\"\"Wrapper that yields tokens and prints count at the end.\"\"\"\n    token_count = 0\n    char_count = 0\n    # TODO: iterate over stream_llm, count tokens, yield each one\n    # After the loop, print: f\"[{token_count} tokens, {char_count} chars]\"\n    pass',
  testCases: [
    { id: 'tc1', input: 'import inspect; print(inspect.isasyncgenfunction(stream_with_count))', expectedOutput: 'True', description: 'Must be an async generator function' },
    { id: 'tc2', input: 'source = inspect.getsource(stream_with_count); print("stream_llm" in source and "yield" in source)', expectedOutput: 'True', description: 'Must wrap stream_llm and yield tokens' },
  ],
  hints: [
    'Use `async for token in stream_llm(prompt, provider=provider):` inside your function. Increment counters and yield each token.',
    'After the async for loop ends, print the summary: `print(f\"[{token_count} tokens, {char_count} chars]\")`',
  ],
  solution: 'async def stream_with_count(prompt, provider=\"openai\"):\n    \"\"\"Wrapper that yields tokens and prints count at the end.\"\"\"\n    token_count = 0\n    char_count = 0\n    async for token in stream_llm(prompt, provider=provider):\n        token_count += 1\n        char_count += len(token)\n        yield token\n    print(f\"[{token_count} tokens, {char_count} chars]\")',
  solutionExplanation: 'This wrapper demonstrates the composability of async generators. It wraps `stream_llm`, passes each token through to the caller via `yield`, and tracks counts along the way. The summary prints after the stream ends. This pattern works for any middleware -- logging, filtering, rate limiting -- without modifying the original generator.',
  xpReward: 15,
}

Build a FastAPI Streaming Server

So far, you’ve consumed streams from LLM providers. Let’s flip it. You’ll build a server that accepts prompts and streams responses to your own users.

FastAPI’s StreamingResponse takes an async generator and sends each yielded chunk to the client. The sse_generator function below wraps our stream_openai generator and formats each token as a proper SSE message with data: prefix and \n\n separator.

python
# server.py -- NOT runnable in Pyodide (requires local Python)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def sse_generator(prompt):
    """Wraps the LLM stream into SSE format."""
    async for token in stream_openai(prompt):
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/stream")
async def stream_endpoint(request: dict):
    prompt = request.get("prompt", "")
    return StreamingResponse(
        sse_generator(prompt),
        media_type="text/event-stream",
    )

Start the server:

bash
uvicorn server:app --host 0.0.0.0 --port 8000

Note: This server code needs local Python. FastAPI with uvicorn can’t run in Pyodide. The client code below works anywhere you can make HTTP requests — including Pyodide if your server is deployed publicly.

Here’s the async client that connects to your server and displays tokens:

# client.py -- works anywhere with HTTP access
async def consume_stream(server_url, prompt):
    """Connect to our FastAPI server and display tokens."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{server_url}/stream",
            json={"prompt": prompt},
        ) as response:
            async for line in response.content:
                text = line.decode("utf-8").strip()
                if not text:
                    continue
                if text == "data: [DONE]":
                    print("\n[Stream complete]")
                    return
                if text.startswith("data: "):
                    payload = json.loads(text[6:])
                    print(payload["token"], end="", flush=True)

asyncio.run(consume_stream("http://localhost:8000", "What is backpressure?"))

Predict the output: What happens if the client connects but the server’s stream_openai generator raises an exception mid-stream? The client receives a truncated response — some tokens arrive, then the connection closes without [DONE]. Your client should always handle this case with a timeout or a check for the [DONE] signal.
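One way to make that check concrete is to track whether [DONE] ever arrived. A sketch over lists of already-received lines (standing in for response.content):

```python
import json

def consume_lines(lines):
    """Return (text, completed). completed is False if [DONE] never arrived."""
    text, completed = "", False
    for line in lines:
        line = line.strip()
        if line == "data: [DONE]":
            completed = True
            break
        if line.startswith("data: "):
            text += json.loads(line[6:])["token"]
    return text, completed

ok = ['data: {"token": "Hi"}', 'data: [DONE]']
cut = ['data: {"token": "Hi"}']  # connection dropped before [DONE]
print(consume_lines(ok))   # ('Hi', True)
print(consume_lines(cut))  # ('Hi', False)
```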

Handle Backpressure in LLM Streaming

What happens when tokens arrive faster than your code can process them? Maybe you’re writing each token to a database. Maybe the network is slow. This is backpressure — and ignoring it causes memory to balloon.

With SSE over TCP, basic backpressure works automatically. If the client stops reading, the TCP buffer fills, and the server blocks. But this is blunt. It can stall the entire stream for everyone if you share a connection pool.

A cleaner approach: put an asyncio.Queue between the producer (LLM stream) and the consumer (your processing logic). The queue decouples their speeds with a bounded buffer.

async def buffered_stream(prompt, buffer_size=50):
    """Buffer tokens in a queue to handle speed mismatch."""
    queue = asyncio.Queue(maxsize=buffer_size)

    async def producer():
        async for token in stream_openai(prompt):
            await queue.put(token)
        await queue.put(None)  # sentinel: done

    async def consumer():
        while True:
            token = await queue.get()
            if token is None:
                break
            await asyncio.sleep(0.05)  # simulate slow work
            print(token, end="", flush=True)

    await asyncio.gather(producer(), consumer())
    print()

asyncio.run(buffered_stream("Explain backpressure in 3 sentences."))

The maxsize=50 is the lever. When the queue fills, queue.put() pauses the producer until the consumer catches up. No token is dropped. No memory explodes.

Tip: Size the buffer to your consumer’s speed. Real-time display? 10-20 tokens is fine. Database writes? Bump it to 50-100 to absorb bursts. Setting `maxsize=0` means unlimited — you lose all backpressure protection.
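A toy run (no API call) shows the bounded queue in action: the producer outruns the consumer, yet nothing is dropped and at most maxsize tokens sit in memory:

```python
import asyncio

async def demo(maxsize=3, n=10):
    queue = asyncio.Queue(maxsize=maxsize)
    received = []

    async def producer():
        for i in range(n):
            await queue.put(i)   # blocks whenever the queue holds maxsize items
        await queue.put(None)    # sentinel: done

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                return
            await asyncio.sleep(0.01)  # simulate slow work
            received.append(item)

    await asyncio.gather(producer(), consumer())
    return received

result = asyncio.run(demo())
print(result)  # [0, 1, 2, ..., 9] -- all items, in order, none dropped
```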

Measure Streaming Performance

Two metrics matter: Time to First Token (TTFT) and tokens per second. TTFT tells you how fast the user sees something. Throughput tells you how fast the full response arrives.

The function below measures both. It uses time.perf_counter() for accurate timing and counts tokens as they flow through the generator.

import time

async def measure_stream(prompt, provider="openai"):
    """Measure TTFT and throughput for a streaming call."""
    start = time.perf_counter()
    first_token_time = None
    token_count = 0

    async for token in stream_llm(prompt, provider=provider):
        if first_token_time is None:
            first_token_time = time.perf_counter() - start
        token_count += 1

    total_time = time.perf_counter() - start
    tps = token_count / total_time if total_time > 0 else 0
    return {
        "provider": provider,
        "ttft_ms": round(first_token_time * 1000, 1) if first_token_time else 0,
        "total_s": round(total_time, 2),
        "tokens": token_count,
        "tok_per_sec": round(tps, 1),
    }

Run it against all three providers:

async def benchmark():
    prompt = "Write a 100-word summary of how neural networks learn."
    for provider in ["openai", "anthropic", "gemini"]:
        r = await measure_stream(prompt, provider=provider)
        print(f"{r['provider']:>10}: TTFT={r['ttft_ms']}ms | "
              f"{r['tok_per_sec']} tok/s | Total={r['total_s']}s")

asyncio.run(benchmark())

Your numbers will vary based on location, network speed, and server load. I’ve consistently seen Gemini deliver the fastest TTFT, with Anthropic competitive on throughput and OpenAI solid across both metrics. But rankings shift week to week. What matters: all three stream fast enough for smooth user experience.

Warning: Don’t benchmark on a single call. Network jitter can swing TTFT by 200ms. Run at least 5 iterations and take the median for meaningful numbers.
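The stdlib handles the median for you. A sketch with made-up TTFT samples (swap in real measure_stream results):

```python
from statistics import median

# Hypothetical TTFT samples (ms) from 5 runs -- note the jitter outlier
ttft_samples = [210.5, 198.2, 430.0, 205.1, 201.9]

print(f"median TTFT: {median(ttft_samples)} ms")  # 205.1 -- outlier ignored
print(f"mean would be: {sum(ttft_samples) / len(ttft_samples):.1f} ms")  # 249.1
```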

Common Mistakes and How to Fix Them

Mistake 1: Forgetting stream=True on the HTTP client

Wrong:

response = requests.post(url, json=body, stream=False)
for line in response.iter_lines():  # blocks until full response
    print(line)

Why it’s wrong: Without stream=True, requests downloads the entire response before iter_lines() runs. You lose all streaming.

Correct:

response = requests.post(url, json=body, stream=True)
for line in response.iter_lines():  # yields lines live
    print(line)

Mistake 2: Parsing every SSE line as JSON

Wrong:

for line in response.iter_lines():
    chunk = json.loads(line)  # crashes on empty lines and [DONE]

Why it’s wrong: SSE streams include empty separator lines and the data: [DONE] sentinel. Blind JSON parsing crashes immediately.

Correct:

for line in response.iter_lines():
    if not line:
        continue
    text = line.decode("utf-8")
    if text.startswith("data: ") and text != "data: [DONE]":
        chunk = json.loads(text[6:])

Mistake 3: Blocking the event loop with time.sleep()

Wrong:

async def process_stream():
    async for token in stream_openai("Hello"):
        time.sleep(0.1)  # freezes everything!
        print(token)

Why it’s wrong: time.sleep() blocks the entire event loop. Other coroutines — including the one reading the stream — can’t run.

Correct:

async def process_stream():
    async for token in stream_openai("Hello"):
        await asyncio.sleep(0.1)  # gives control back
        print(token)

Error Troubleshooting

aiohttp.ClientConnectionError: Connection closed
The SSE connection dropped mid-stream. Wrap your async for loop in try/except and implement retry logic. You can't resume a dropped stream; retry by re-sending the request, optionally including the partial response in the new prompt.

json.JSONDecodeError: Expecting value
You’re parsing a non-JSON line (empty line, [DONE] sentinel, or event: prefix). Always check text.startswith("data: ") and skip [DONE] before calling json.loads().

RuntimeError: asyncio.run() cannot be called from a running event loop
You called asyncio.run() inside code that’s already running an event loop (common in Jupyter). Use await main() instead, or install nest_asyncio and call nest_asyncio.apply() first.

When NOT to Use Streaming

Streaming isn’t always the right call. Here are three situations where non-streaming wins:

Batch processing. Processing 1,000 documents overnight? Streaming adds complexity with no benefit. You don’t need real-time display. Non-streaming calls are simpler to retry and easier to parallelize.

JSON mode / structured output. When the LLM returns valid JSON, streaming gives you partial JSON you can’t parse mid-stream. You’d have to buffer the complete response anyway.
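The problem is easy to demonstrate: a buffer cut mid-stream is not valid JSON until the final chunk lands:

```python
import json

partial = '{"name": "Ada", "skills": ["async", "st'   # stream cut mid-token
complete = '{"name": "Ada", "skills": ["async", "streaming"]}'

try:
    json.loads(partial)
except json.JSONDecodeError:
    print("partial JSON: unparseable mid-stream")

print(json.loads(complete)["skills"])  # ['async', 'streaming']
```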

Short responses under 50 tokens. If the full response takes under 500ms, streaming adds overhead (connection management, SSE parsing) for no visible UX gain.

I’ve found roughly 80% of user-facing LLM calls benefit from streaming, and 80% of backend calls don’t. Match the tool to the task.

Summary

You’ve built LLM streaming from the ground up. You parsed raw SSE from all three major providers, wrapped the protocol in reusable async generators, built a FastAPI server that streams to your own clients, and learned to measure performance and handle backpressure.

The core pattern is simple: set stream=True, read lines, parse JSON, yield tokens. Everything else — unified clients, servers, buffering — is just wrapping that pattern for different use cases.

Practice Exercise

Build a streaming retry wrapper. The function should attempt to stream, and if the connection drops mid-stream, retry up to 3 times, picking up from where it left off (by including the partial response in the retry prompt).

Click to reveal solution
async def stream_with_retry(prompt, provider="openai", max_retries=3):
    """Stream with automatic retry on connection failure."""
    accumulated = ""
    for attempt in range(max_retries + 1):
        try:
            retry_prompt = prompt
            if accumulated:
                retry_prompt = (
                    f"{prompt}\n\nContinue from where you left off. "
                    f"So far you said: {accumulated}"
                )
            async for token in stream_llm(retry_prompt, provider):
                accumulated += token
                yield token
            return  # success, no retry needed
        except Exception as e:
            if attempt == max_retries:
                print(f"\n[Failed after {max_retries} retries: {e}]")
                return
            print(f"\n[Retry {attempt + 1}/{max_retries}...]")

# Usage:
# async for token in stream_with_retry("Explain SSE"):
#     print(token, end="", flush=True)

Complete Code

Click to expand the full script (copy-paste and run)
# Complete code from: LLM Streaming in Python
# Requires: pip install requests aiohttp
# Python 3.10+
# Set environment variables: OPENAI_API_KEY, ANTHROPIC_API_KEY, GOOGLE_API_KEY

import requests
import aiohttp
import asyncio
import json
import os
import time


# --- Section 1: Raw HTTP streaming (OpenAI) ---

def stream_openai_sync(prompt):
    """Synchronous streaming from OpenAI using requests."""
    api_key = os.environ["OPENAI_API_KEY"]
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,
        },
        stream=True,
    )
    response.raise_for_status()  # fail fast on auth or quota errors
    for line in response.iter_lines():
        if not line:
            continue
        text = line.decode("utf-8")
        if text == "data: [DONE]":
            break
        if text.startswith("data: "):
            chunk = json.loads(text[6:])
            token = chunk["choices"][0]["delta"].get("content", "")
            print(token, end="", flush=True)
    print()


# --- Section 2: Async generator (OpenAI) ---

async def stream_openai(prompt, model="gpt-4o-mini"):
    """Async generator that yields tokens from OpenAI."""
    api_key = os.environ["OPENAI_API_KEY"]
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "stream": True,
            },
        ) as resp:
            resp.raise_for_status()  # fail fast on auth or quota errors
            async for line in resp.content:
                text = line.decode("utf-8").strip()
                if text == "data: [DONE]":
                    return
                if text.startswith("data: "):
                    chunk = json.loads(text[6:])
                    token = chunk["choices"][0]["delta"].get("content", "")
                    if token:
                        yield token


# --- Section 3: Unified streaming client ---

async def stream_llm(prompt, provider="openai", model=None):
    """Unified async generator for OpenAI, Anthropic, and Gemini."""
    configs = {
        "openai": {
            "url": "https://api.openai.com/v1/chat/completions",
            "model": model or "gpt-4o-mini",
            "headers": {
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
        },
        "anthropic": {
            "url": "https://api.anthropic.com/v1/messages",
            "model": model or "claude-sonnet-4-20250514",
            "headers": {
                "x-api-key": os.environ["ANTHROPIC_API_KEY"],
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            },
        },
        "gemini": {
            "model": model or "gemini-2.0-flash",
            "headers": {"Content-Type": "application/json"},
        },
    }
    cfg = configs[provider]

    if provider == "openai":
        url = cfg["url"]
        body = {
            "model": cfg["model"],
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,
        }
    elif provider == "anthropic":
        url = cfg["url"]
        body = {
            "model": cfg["model"],
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
            "stream": True,
        }
    else:
        url = (
            f"https://generativelanguage.googleapis.com/v1beta/models/"
            f"{cfg['model']}:streamGenerateContent"
            f"?alt=sse&key={os.environ['GOOGLE_API_KEY']}"
        )
        body = {"contents": [{"parts": [{"text": prompt}]}]}

    async with aiohttp.ClientSession() as session:
        async with session.post(
            url, headers=cfg["headers"], json=body
        ) as response:
            response.raise_for_status()  # fail fast on auth or quota errors
            async for line in response.content:
                text = line.decode("utf-8").strip()
                if not text or not text.startswith("data: "):
                    continue
                data_str = text[6:]
                if data_str == "[DONE]":
                    return
                payload = json.loads(data_str)

                if provider == "openai":
                    token = (payload.get("choices", [{}])[0]
                             .get("delta", {}).get("content", ""))
                elif provider == "anthropic":
                    if payload.get("type") != "content_block_delta":
                        continue
                    token = payload["delta"].get("text", "")
                else:
                    parts = payload.get("candidates", [{}])[0]
                    token = (parts.get("content", {})
                             .get("parts", [{}])[0].get("text", ""))

                if token:
                    yield token


# --- Section 4: Backpressure handling ---

async def buffered_stream(prompt, buffer_size=50):
    """Buffer tokens in a queue to handle speed mismatch."""
    queue = asyncio.Queue(maxsize=buffer_size)

    async def producer():
        async for token in stream_openai(prompt):
            await queue.put(token)
        await queue.put(None)

    async def consumer():
        while True:
            token = await queue.get()
            if token is None:
                break
            await asyncio.sleep(0.05)
            print(token, end="", flush=True)

    await asyncio.gather(producer(), consumer())
    print()


# --- Section 5: Performance measurement ---

async def measure_stream(prompt, provider="openai"):
    """Measure TTFT and throughput for a streaming call."""
    start = time.perf_counter()
    first_token_time = None
    token_count = 0

    async for token in stream_llm(prompt, provider=provider):
        if first_token_time is None:
            first_token_time = time.perf_counter() - start
        token_count += 1

    total_time = time.perf_counter() - start
    tps = token_count / total_time if total_time > 0 else 0
    return {
        "provider": provider,
        "ttft_ms": round(first_token_time * 1000, 1) if first_token_time else 0,
        "total_s": round(total_time, 2),
        "tokens": token_count,
        "tok_per_sec": round(tps, 1),
    }


# --- Main ---

async def main():
    prompt = "Explain async generators in Python in 3 sentences."

    print("=== Streaming from all providers ===\n")
    for provider in ["openai", "anthropic", "gemini"]:
        print(f"--- {provider.upper()} ---")
        async for token in stream_llm(prompt, provider=provider):
            print(token, end="", flush=True)
        print("\n")

    print("=== Performance Benchmark ===\n")
    for provider in ["openai", "anthropic", "gemini"]:
        r = await measure_stream(prompt, provider=provider)
        print(f"{r['provider']:>10}: TTFT={r['ttft_ms']}ms | "
              f"{r['tok_per_sec']} tok/s | Total={r['total_s']}s")

if __name__ == "__main__":
    asyncio.run(main())
    print("\nScript completed successfully.")

Frequently Asked Questions

Can I use streaming with function calling / tool use?

Yes, but the stream includes tool call arguments as they generate. You get partial JSON in delta.tool_calls that you must accumulate until the tool call completes. Only then can you parse it. Both OpenAI and Anthropic support this — check their streaming docs for the specific event types.
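The accumulation step can be sketched in a few lines. This is a hypothetical helper (accumulate_tool_calls) fed synthetic fragments shaped like OpenAI's streamed delta.tool_calls entries — each carries an index plus a slice of the arguments string:

```python
import json

def accumulate_tool_calls(deltas):
    """Merge partial tool-call fragments from streamed deltas.

    Each fragment has an `index` and a slice of the arguments string.
    The JSON is only parseable after the stream ends.
    """
    calls = {}
    for d in deltas:
        call = calls.setdefault(d["index"], {"name": "", "arguments": ""})
        fn = d.get("function", {})
        if fn.get("name"):
            call["name"] = fn["name"]
        call["arguments"] += fn.get("arguments", "")
    # Parse each call's arguments only once the stream is complete
    return {
        i: {"name": c["name"], "arguments": json.loads(c["arguments"])}
        for i, c in calls.items()
    }

# Synthetic fragments, as they might arrive across several chunks
deltas = [
    {"index": 0, "function": {"name": "get_weather", "arguments": ""}},
    {"index": 0, "function": {"arguments": '{"city": '}},
    {"index": 0, "function": {"arguments": '"Paris"}'}},
]
print(accumulate_tool_calls(deltas))
```

The key point: concatenate first, json.loads last. Calling json.loads on any individual fragment raises a JSONDecodeError.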

Is SSE the same as WebSockets?

No. SSE is one-way (server to client) over standard HTTP. WebSockets are bidirectional and require a protocol upgrade handshake. For LLM streaming, SSE wins because you only need server-to-client flow, and it works through proxies and CDNs without special config.

How do I handle errors mid-stream?

Wrap the streaming loop in try/except and track received tokens. If the connection drops, retry with the partial response included in the prompt. Most providers also send error events in the SSE stream before closing.

Does streaming work with Ollama and local models?

Yes. Ollama’s OpenAI-compatible endpoint at localhost:11434/v1/chat/completions streams the same data:-prefixed SSE, so the stream_openai pattern works unchanged if you swap the URL (any non-empty API key is accepted). Ollama’s native /api/chat endpoint streams newline-delimited JSON instead: set "stream": true and parse each line as a bare JSON object, reading the token from message.content. Either way, the stream_llm function from this tutorial works with Ollama if you add a local provider config.
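Ollama’s native /api/chat streams one bare JSON object per line (no data: prefix). A minimal parsing sketch — parse_ollama_line is a hypothetical helper, and the llama3.2 model name is an assumption:

```python
import json

def parse_ollama_line(raw: bytes) -> str:
    """Extract the token from one line of Ollama's native NDJSON stream.

    Unlike OpenAI's SSE, /api/chat lines are bare JSON objects with
    no "data: " prefix; the token lives at message.content.
    """
    chunk = json.loads(raw.decode("utf-8"))
    if chunk.get("done"):
        return ""  # final bookkeeping chunk carries no text
    return chunk.get("message", {}).get("content", "")

# Streaming request sketch (requires a running local Ollama server):
# import requests
# resp = requests.post(
#     "http://localhost:11434/api/chat",
#     json={"model": "llama3.2", "stream": True,
#           "messages": [{"role": "user", "content": "Hi"}]},
#     stream=True,
# )
# for line in resp.iter_lines():
#     if line:
#         print(parse_ollama_line(line), end="", flush=True)

# Demo with a synthetic line:
line = b'{"message": {"role": "assistant", "content": "Hel"}, "done": false}'
print(parse_ollama_line(line))  # → Hel
```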

Why does my stream hang in Jupyter notebooks?

Jupyter already runs an event loop. Calling asyncio.run() inside it raises an error or hangs. Fix: install nest_asyncio, call nest_asyncio.apply() at the top, then use await main() directly in a cell.
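Whether asyncio.run() is safe depends on whether a loop is already running, and you can detect that at runtime. A small sketch of the decision — the main coroutine is a stand-in for this tutorial's main():

```python
import asyncio

async def main():
    return "done"

try:
    asyncio.get_running_loop()
    # Already inside a loop (e.g. a Jupyter cell): use `await main()`,
    # or: import nest_asyncio; nest_asyncio.apply(); asyncio.run(main())
    print("running loop detected — use `await main()` directly")
except RuntimeError:
    # No loop running (plain script): asyncio.run() is safe
    print(asyncio.run(main()))  # → done
```

In a plain script this takes the except branch; in Jupyter it takes the try branch, which is exactly why the bare asyncio.run() call hangs or raises there.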
