OpenAI Chat Completions API: Complete Python Guide

Master the OpenAI API in Python with raw HTTP requests. Learn chat completions, streaming, parameters, error handling, retries, and cost tracking with runnable examples.

Written by Selva Prabhakaran | 29 min read

Build a robust OpenAI client with streaming, retry logic, error handling, and cost tracking — using raw HTTP requests.

You’ve seen ChatGPT write smart answers. But when you try to add that power to your own Python app, real questions come up. How do you shape the response? How do you stream tokens live? What if the API throws a 429 error at 2 AM and your script dies?

This article covers all of it. You’ll build a full OpenAI API client with raw HTTP — no SDK needed. By the end, you’ll make chat calls, tune every output setting, stream tokens one by one, handle errors, and track what each call costs.

What Is the OpenAI Chat Completions API?

The Chat Completions API is one HTTP endpoint. You send it a list of messages (a chat) and it sends back the model’s reply. Chat calls to GPT-4o, GPT-4o-mini, and OpenAI’s other chat models all go through this URL.

The URL is https://api.openai.com/v1/chat/completions. You send a POST request with your API key in the Authorization header and a JSON body that holds the model name and messages. You get back JSON with the text, token counts, and more.

That’s the whole thing. One endpoint, one format in, one format out. Streaming, params, error handling — all of it builds on this base.

KEY INSIGHT: The OpenAI API is a plain REST API. You don’t need a special SDK. Any tool that sends HTTP POST requests — Python’s requests, curl, even a browser plugin — can call it. Knowing the raw HTTP layer gives you full control and makes bugs easier to find.
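To make the “plain REST” point concrete, here’s a minimal sketch that builds the request with only Python’s standard library and stops short of sending it. The key sk-placeholder is a made-up stand-in, not a real credential, and the message text is just an example.

```python
import json
import urllib.request

# Placeholder key for illustration only; a real key would come from OPENAI_API_KEY.
FAKE_KEY = "sk-placeholder"

body = {
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Hello"}],
}

# Build the request object without sending it.
req = urllib.request.Request(
    "https://api.openai.com/v1/chat/completions",
    data=json.dumps(body).encode("utf-8"),
    headers={
        "Authorization": f"Bearer {FAKE_KEY}",
        "Content-Type": "application/json",
    },
    method="POST",
)

print(req.get_method())                 # HTTP verb
print(req.full_url)                     # endpoint
print(req.get_header("Authorization"))  # auth header
print(req.data.decode("utf-8"))         # JSON body as it would go on the wire
```

Nothing leaves your machine here. Passing req to urllib.request.urlopen() would actually send it, which is the same job requests.post() does in the rest of this guide.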

Setting Up: Your First Raw API Call

Prerequisites

  • Python version: 3.9+
  • Required library: requests
  • Install: pip install requests
  • API key: You need an OpenAI API key. Make one at platform.openai.com/api-keys. Save it in an env var called OPENAI_API_KEY.
  • Time to complete: 25-30 minutes

NOTE: This tutorial uses Python’s requests library for HTTP calls. The code runs in any normal Python setup. For browser use with Pyodide, swap requests.post() for pyodide.http.pyfetch() — the JSON body and headers stay the same.

Here’s the simplest API call you can make. We send one user message to gpt-4o-mini and print the reply. The requests.post() call sends our JSON to the endpoint. We then grab the text from choices[0].message.content in the response.

# Browser (Pyodide) sessions only: install requests first with
#   import micropip
#   await micropip.install('requests')

import requests
import os
import json

API_KEY = os.environ.get("OPENAI_API_KEY")
API_URL = "https://api.openai.com/v1/chat/completions"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

payload = {
    "model": "gpt-4o-mini",
    "messages": [
        {"role": "user", "content": "What is gradient descent in one sentence?"}
    ]
}

response = requests.post(API_URL, headers=headers, json=payload)
data = response.json()
print(data["choices"][0]["message"]["content"])

Three things to note. The choices array holds one or more replies. Each choice has a message with role and content. And the usage field tells you how many tokens the call used.

Let’s look at the full response. This prints the whole JSON so you can see every field — token counts and all.

print(json.dumps(data, indent=2))

The usage field is your billing meter. prompt_tokens counts what you sent. completion_tokens counts what the model wrote. total_tokens is the sum. You’ll use these for cost tracking later.
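If you can’t run the call right now, here’s a hand-written sample in the same shape so you can see where each field lives. Every value below is made up for illustration, real responses carry more fields, and summarize() is a hypothetical helper name.

```python
# Hand-written sample in the shape of a chat completion response.
# All values are invented for illustration; this is not real API output.
sample = {
    "id": "chatcmpl-example",
    "object": "chat.completion",
    "model": "gpt-4o-mini-2024-07-18",
    "choices": [
        {
            "index": 0,
            "message": {"role": "assistant", "content": "An example reply."},
            "finish_reason": "stop",
        }
    ],
    "usage": {"prompt_tokens": 17, "completion_tokens": 4, "total_tokens": 21},
}

def summarize(data):
    """Return (reply text, finish reason, usage dict) from a response dict."""
    choice = data["choices"][0]
    return choice["message"]["content"], choice["finish_reason"], data["usage"]

text, reason, usage = summarize(sample)
print(text)
print(reason)
# total_tokens is the sum of the two counts
print(usage["prompt_tokens"] + usage["completion_tokens"] == usage["total_tokens"])
```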

Quick Check: If a response shows prompt_tokens: 50 and completion_tokens: 120, how many total tokens were billed? (Answer: 170. Input and output tokens are summed, and you pay for both.)

Multi-Turn Conversations: The Messages Array

One question and one answer is fine. But real apps need back-and-forth chat. The API is stateless — it doesn’t recall past calls. You keep the chat history by sending all messages each time.

Each message has a role. The three you’ll use most: system sets the model’s tone, user is your input, and assistant is the model’s past replies. By adding old exchanges, you give it context.

Here’s a two-turn chat. We ask the model to explain gradient descent, then ask for code. The key: include the model’s first reply as an assistant message so it knows what you’re talking about.

messages = [
    {"role": "system", "content": "You are a concise data science tutor."},
    {"role": "user", "content": "Explain gradient descent briefly."},
    {"role": "assistant", "content": "Gradient descent iteratively adjusts model parameters by moving in the direction of steepest decrease of the loss function."},
    {"role": "user", "content": "Show me a 5-line Python example."}
]

payload = {"model": "gpt-4o-mini", "messages": messages}
response = requests.post(API_URL, headers=headers, json=payload)
print(response.json()["choices"][0]["message"]["content"])

The model reads the whole chat and replies in context. Without that prior assistant message, it won’t know what “a 5-line Python example” means.

WARNING: Every message counts toward your token bill. A 20-message chat sends ALL 20 messages each time. That adds up fast. Trim old messages when the chat grows past what you need.
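One simple way to trim is a sliding window. This is a minimal sketch, assuming you want to keep every system message plus the most recent few turns. trim_history and max_messages are names invented for this example.

```python
def trim_history(messages, max_messages=6):
    """Keep system messages plus the most recent non-system messages."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    # Guard against max_messages=0, because rest[-0:] would return everything
    recent = rest[-max_messages:] if max_messages > 0 else []
    return system + recent

# Build a fake 10-turn chat to trim
chat = [{"role": "system", "content": "You are a tutor."}]
for i in range(10):
    chat.append({"role": "user", "content": f"question {i}"})
    chat.append({"role": "assistant", "content": f"answer {i}"})

trimmed = trim_history(chat, max_messages=6)
print(len(chat), "->", len(trimmed))   # 21 -> 7
print(trimmed[1]["content"])           # question 7
```

The tradeoff: the model forgets anything outside the window. If early context matters, summarizing old turns may serve you better than dropping them.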

Generation Parameters: Controlling the Output

The basic API call works fine. But for real use, you need tighter control over the model’s output. These settings go in your request body next to model and messages.

temperature

Controls randomness. Range: 0.0 to 2.0. Default: 1.0.

At temperature=0, the model almost always picks the most likely token. The output is nearly the same each time. At temperature=1.5, it picks more freely — creative, but sometimes messy.
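To see why a low temperature sharpens the choice, here’s a toy sketch of textbook temperature scaling: divide the raw scores by the temperature, then apply softmax. The scores are made up, and this is not OpenAI’s internal code. It also only handles temperatures above zero, since temperature=0 is usually treated as “always pick the top token.”

```python
import math

def softmax_with_temperature(logits, temperature):
    """Turn raw scores into probabilities, scaled by temperature (> 0)."""
    scaled = [x / temperature for x in logits]
    peak = max(scaled)                      # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 1.0, 0.5]                    # made-up scores for three candidate tokens
for t in (0.2, 1.0, 1.5):
    probs = softmax_with_temperature(logits, t)
    print(t, [round(p, 3) for p in probs])
```

At 0.2 nearly all the probability lands on the first token. At 1.5 the three tokens sit much closer together, so the sampler picks the weaker ones more often.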

top_p

Another way to control randomness, called nucleus sampling. Range: 0.0 to 1.0. Default: 1.0.

With top_p=0.1, the model only samples from the most likely tokens that together make up the top 10% of probability mass. This shrinks the pool in a way that’s different from temperature. OpenAI says to change one or the other, not both.
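Nucleus sampling is usually described as keeping the smallest set of top tokens whose probabilities add up to at least top_p. Here’s a toy sketch of that selection step. The probabilities are invented, and nucleus() is a hypothetical helper, not part of any API.

```python
def nucleus(probs, top_p):
    """Return the smallest set of top tokens whose probabilities sum to >= top_p."""
    kept, cumulative = [], 0.0
    # Walk the tokens from most to least likely
    for token, p in sorted(probs.items(), key=lambda kv: kv[1], reverse=True):
        kept.append(token)
        cumulative += p
        if cumulative >= top_p:
            break
    return kept

probs = {"the": 0.5, "a": 0.3, "an": 0.15, "zebra": 0.05}
print(nucleus(probs, 0.1))   # ['the']
print(nucleus(probs, 0.9))   # ['the', 'a', 'an']
```

Note that the cutoff is by probability mass, not by token count. With top_p=0.1 a single confident token can fill the whole pool.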

max_completion_tokens

Caps how long the reply can be. Default: varies by model.

Setting max_completion_tokens=100 stops the model after 100 output tokens, even if it’s mid-sentence. I find this key for keeping costs down in batch jobs.

NOTE: The older max_tokens param still works on most chat models but is deprecated. Use max_completion_tokens for new code. The o-series models (like o1, o3) reject max_tokens and need the newer param, whose limit covers both their hidden thinking tokens and the visible reply.

stop

A string or list of up to 4 strings. Default: null.

When the model generates any of these sequences, it stops. Handy for structured output: "stop": ["\n\n", "END"] halts generation at a double newline or the word “END”.

frequency_penalty and presence_penalty

Both range from -2.0 to 2.0. Default: 0 for each.

frequency_penalty makes repeat tokens less likely, scaled by how often they’ve already shown up. presence_penalty adds a flat, one-time cost to any token that’s appeared at all. Use frequency_penalty to cut repeats. Use presence_penalty to push the model toward fresh topics.
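Here’s a minimal sketch of how the two penalties differ, assuming the adjustment works the way OpenAI’s docs have described it: subtract count times frequency_penalty, plus a one-time presence_penalty, from each token’s score. The scores and counts are made up, and apply_penalties() is a hypothetical name.

```python
def apply_penalties(logits, counts, frequency_penalty=0.0, presence_penalty=0.0):
    """Lower each token's score based on how often it has already appeared."""
    adjusted = {}
    for token, score in logits.items():
        seen = counts.get(token, 0)
        adjusted[token] = (
            score
            - seen * frequency_penalty                   # grows with every repeat
            - (1 if seen > 0 else 0) * presence_penalty  # flat, applied once
        )
    return adjusted

logits = {"bug": 2.0, "code": 1.5, "moon": 1.0}   # made-up scores
counts = {"bug": 3, "code": 1}                     # how often each token appeared so far
result = apply_penalties(logits, counts, frequency_penalty=0.8, presence_penalty=0.3)
for token, score in result.items():
    print(token, round(score, 2))
```

“bug” drops the most because it appeared three times. “moon” is untouched because it hasn’t appeared yet, so it becomes the most attractive pick.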

response_format (JSON mode)

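Want clean JSON back instead of free-form text? Set "response_format": {"type": "json_object"} and say “JSON” in your prompt. The model will then return syntactically valid JSON, with one caveat: if the reply gets cut off by the token limit (finish_reason is "length"), the JSON can be incomplete. JSON mode also doesn’t guarantee any particular keys or schema, so still validate what you get.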

This helps a lot when you extract data and need a stable shape. Without it, the model might wrap JSON in code fences or add chat text around it.
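When you parse a JSON-mode reply, it’s worth checking finish_reason first, since a reply cut off by the token limit may not parse. A minimal sketch, using hand-written stand-ins for API responses (not real output) and a hypothetical parse_json_reply() helper:

```python
import json

def parse_json_reply(data):
    """Parse a JSON-mode reply, refusing truncated output."""
    choice = data["choices"][0]
    if choice["finish_reason"] == "length":
        raise ValueError("Reply hit the token limit; JSON may be incomplete")
    return json.loads(choice["message"]["content"])

# Hand-written stand-ins for API responses (invented values).
ok = {"choices": [{"finish_reason": "stop",
                   "message": {"content": '{"name": "Flask", "stars": 5}'}}]}
cut = {"choices": [{"finish_reason": "length",
                    "message": {"content": '{"name": "Fla'}}]}

print(parse_json_reply(ok)["name"])
try:
    parse_json_reply(cut)
except ValueError as exc:
    print("Rejected:", exc)
```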

Here’s a call that uses several params at once. We set temperature=1.3 for a creative reply, max_completion_tokens=80 to cap length, and frequency_penalty=0.8 to cut down on repeat words.

payload = {
    "model": "gpt-4o-mini",
    "messages": [
        {"role": "user", "content": "Write a haiku about debugging code."}
    ],
    "temperature": 1.3,
    "max_completion_tokens": 80,
    "frequency_penalty": 0.8,
    "presence_penalty": 0.3
}

response = requests.post(API_URL, headers=headers, json=payload)
print(response.json()["choices"][0]["message"]["content"])

Now change temperature to 0.0 and run it again. You’ll get a duller, more stable haiku. That’s the tradeoff — steady output vs. wild variety.

TIP: For data work and code tasks, use temperature=0 or 0.1. For creative writing, try 0.7 to 1.2. Above 1.5 usually gives you nonsense. I’ve learned this the hard way on a few batch runs.

Here’s a quick reference table.

| Parameter | Range | Default | Use Case |
| --- | --- | --- | --- |
| temperature | 0.0 to 2.0 | 1.0 | Control randomness |
| top_p | 0.0 to 1.0 | 1.0 | Nucleus sampling (alt. to temperature) |
| max_completion_tokens | 1 to model max | Model-dependent | Cap response length |
| stop | string or list | null | Stop at specific sequences |
| frequency_penalty | -2.0 to 2.0 | 0.0 | Reduce word repetition |
| presence_penalty | -2.0 to 2.0 | 0.0 | Encourage new topics |
| response_format | object | null | Force JSON output |

{
  type: 'exercise',
  id: 'params-exercise',
  title: 'Exercise 1: Craft a Precise API Call',
  difficulty: 'beginner',
  exerciseType: 'write',
  instructions: 'Build a payload dictionary for a chat completions request that: (1) uses the gpt-4o-mini model, (2) asks "List 3 Python web frameworks", (3) limits the response to 60 tokens, and (4) sets temperature to 0 for a deterministic answer. Print the payload as formatted JSON.',
  starterCode: 'import json\n\n# Build the payload dictionary\npayload = {\n    "model": "gpt-4o-mini",\n    "messages": [\n        {"role": "user", "content": "List 3 Python web frameworks"}\n    ],\n    # Add max_completion_tokens and temperature here\n}\n\nprint(json.dumps(payload, indent=2))',
  testCases: [
    { id: 'tc1', input: 'print(payload.get("max_completion_tokens"))', expectedOutput: '60', description: 'max_completion_tokens should be 60' },
    { id: 'tc2', input: 'print(payload.get("temperature"))', expectedOutput: '0', description: 'temperature should be 0' },
    { id: 'tc3', input: 'print(payload["messages"][0]["content"])', expectedOutput: 'List 3 Python web frameworks', description: 'Message content should match', hidden: true }
  ],
  hints: [
    'Add "max_completion_tokens": 60 and "temperature": 0 as keys in the payload dictionary.',
    'Full answer: payload = {"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "List 3 Python web frameworks"}], "max_completion_tokens": 60, "temperature": 0}'
  ],
  solution: 'import json\n\npayload = {\n    "model": "gpt-4o-mini",\n    "messages": [\n        {"role": "user", "content": "List 3 Python web frameworks"}\n    ],\n    "max_completion_tokens": 60,\n    "temperature": 0\n}\n\nprint(json.dumps(payload, indent=2))',
  solutionExplanation: 'The payload is a standard Python dictionary. "max_completion_tokens": 60 caps the response at 60 tokens. "temperature": 0 makes the output nearly deterministic -- the model picks the highest-probability token every time.',
  xpReward: 10,
}

Now that you can shape the model’s output, let’s tackle how to receive it in real time.

Streaming Responses Token by Token

With a normal API call, you wait for the whole reply before you see any text. For a long answer, that’s a few seconds of dead air. Streaming fixes this.

Set "stream": true, and the API sends tokens as the model writes them. Each token arrives as a Server-Sent Event (SSE) — a text line that starts with data:. Your app can show text live, just like ChatGPT does.

Here’s how streaming works. We pass stream=True to requests.post() and loop over lines as they come in. Each SSE line holds a JSON chunk with a delta — the new text bit. The stream ends with data: [DONE].

payload = {
    "model": "gpt-4o-mini",
    "messages": [
        {"role": "user", "content": "Explain what an API is in 3 sentences."}
    ],
    "stream": True
}

response = requests.post(
    API_URL, headers=headers, json=payload, stream=True
)

full_response = ""
for line in response.iter_lines():
    line = line.decode("utf-8")
    if line.startswith("data: ") and line != "data: [DONE]":
        chunk = json.loads(line[6:])
        delta = chunk["choices"][0]["delta"]
        if "content" in delta:
            token = delta["content"]
            print(token, end="", flush=True)
            full_response += token

print()  # newline after streaming completes
print(f"\nFull response length: {len(full_response)} characters")

See the shift: normal replies have a message field, but streamed chunks have delta instead. The first chunk’s delta is {"role": "assistant"}. Later chunks hold {"content": "..."} with a token or two. The last chunk’s delta is empty — {}.

KEY INSIGHT: Streaming doesn’t change what the model writes. It changes when you get it. Token count, cost, and quality are the same whether you stream or not. Streaming just kills the wait.

Quick Check: In a streaming response, what does the delta field contain in the very first chunk? (Answer: {"role": "assistant"}. The content tokens start in the second chunk onward.)

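One gotcha: streamed replies don’t include a usage field by default. If you need token counts (and you do for cost tracking), add "stream_options": {"include_usage": true}. The API then sends one extra chunk just before data: [DONE] that carries the usage data. That chunk’s choices list is empty, so check for it before you index into choices.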

payload = {
    "model": "gpt-4o-mini",
    "messages": [
        {"role": "user", "content": "What is Python?"}
    ],
    "stream": True,
    "stream_options": {"include_usage": True}
}

response = requests.post(
    API_URL, headers=headers, json=payload, stream=True
)

usage_data = None
for line in response.iter_lines():
    line = line.decode("utf-8")
    if line.startswith("data: ") and line != "data: [DONE]":
        chunk = json.loads(line[6:])
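        if chunk.get("usage"):
            usage_data = chunk["usage"]
        # The usage-only chunk has an empty choices list, so guard the index
        if not chunk.get("choices"):
            continue
        delta = chunk["choices"][0]["delta"]
        if delta.get("content"):
            print(delta["content"], end="", flush=True)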

print(f"\n\nTokens used: {usage_data}")
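You can test the parsing logic offline, without spending tokens. The sketch below runs over hand-written SSE lines shaped like a stream with include_usage turned on, including the final usage chunk with its empty choices list. The token values are made up, and collect_stream() is a hypothetical helper.

```python
import json

def collect_stream(sse_lines):
    """Rebuild the reply text and pick up usage from a list of SSE lines."""
    text, usage = "", None
    for line in sse_lines:
        if not line.startswith("data: ") or line == "data: [DONE]":
            continue
        chunk = json.loads(line[len("data: "):])
        if chunk.get("usage"):
            usage = chunk["usage"]
        choices = chunk.get("choices") or []
        if not choices:                 # usage-only chunk: no delta to read
            continue
        text += choices[0]["delta"].get("content") or ""
    return text, usage

# Hand-written lines in the shape of a stream (values are invented).
lines = [
    'data: {"choices":[{"delta":{"role":"assistant"}}],"usage":null}',
    'data: {"choices":[{"delta":{"content":"Hi"}}],"usage":null}',
    'data: {"choices":[{"delta":{"content":" there"}}],"usage":null}',
    'data: {"choices":[{"delta":{}}],"usage":null}',
    'data: {"choices":[],"usage":{"prompt_tokens":9,"completion_tokens":2,"total_tokens":11}}',
    'data: [DONE]',
]

text, usage = collect_stream(lines)
print(text)    # Hi there
print(usage)
```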


{
  type: 'exercise',
  id: 'streaming-exercise',
  title: 'Exercise 2: Build a Streaming Token Counter',
  difficulty: 'intermediate',
  exerciseType: 'write',
  instructions: 'Write a function called `count_streamed_tokens` that takes a list of SSE lines (strings) and returns the total number of content tokens received. Each line is either a data line like \'data: {"choices":[{"delta":{"content":"hello"}}]}\' or the terminator \'data: [DONE]\'. Count each chunk that has a "content" key in the delta as one token.',
  starterCode: 'import json\n\ndef count_streamed_tokens(sse_lines):\n    """Count content tokens in a list of SSE lines."""\n    count = 0\n    for line in sse_lines:\n        # Skip non-data lines and the [DONE] marker\n        if not line.startswith("data: ") or line == "data: [DONE]":\n            continue\n        # Parse the JSON and check for content in delta\n        # YOUR CODE HERE\n    return count\n\n# Test\ntest_lines = [\n    \'data: {"choices":[{"delta":{"role":"assistant"}}]}\',\n    \'data: {"choices":[{"delta":{"content":"Hello"}}]}\',\n    \'data: {"choices":[{"delta":{"content":" world"}}]}\',\n    \'data: {"choices":[{"delta":{"content":"!"}}]}\',\n    \'data: {"choices":[{"delta":{}}]}\',\n    \'data: [DONE]\'\n]\nprint(count_streamed_tokens(test_lines))',
  testCases: [
    { id: 'tc1', input: 'print(count_streamed_tokens(test_lines))', expectedOutput: '3', description: 'Should count 3 content tokens (Hello, world, !)' },
    { id: 'tc2', input: 'print(count_streamed_tokens(["data: [DONE]"]))', expectedOutput: '0', description: 'Empty stream returns 0' }
  ],
  hints: [
    'Parse the JSON with json.loads(line[6:]) to skip the "data: " prefix. Then check if "content" exists in chunk["choices"][0]["delta"].',
    'Full inner logic: chunk = json.loads(line[6:]); delta = chunk["choices"][0]["delta"]; if "content" in delta: count += 1'
  ],
  solution: 'import json\n\ndef count_streamed_tokens(sse_lines):\n    count = 0\n    for line in sse_lines:\n        if not line.startswith("data: ") or line == "data: [DONE]":\n            continue\n        chunk = json.loads(line[6:])\n        delta = chunk["choices"][0]["delta"]\n        if "content" in delta:\n            count += 1\n    return count\n\ntest_lines = [\n    \'data: {"choices":[{"delta":{"role":"assistant"}}]}\',\n    \'data: {"choices":[{"delta":{"content":"Hello"}}]}\',\n    \'data: {"choices":[{"delta":{"content":" world"}}]}\',\n    \'data: {"choices":[{"delta":{"content":"!"}}]}\',\n    \'data: {"choices":[{"delta":{}}]}\',\n    \'data: [DONE]\'\n]\nprint(count_streamed_tokens(test_lines))',
  solutionExplanation: 'We skip lines that aren\'t data events and the [DONE] terminator. For each remaining line, we strip the "data: " prefix (first 6 characters), parse the JSON, and check whether the delta object contains a "content" key. Each content delta represents one streamed token fragment.',
  xpReward: 15,
}

With streaming under your belt, let’s handle the situations where things go wrong.

Error Handling: What Goes Wrong and How to Fix It

API calls fail. Networks drop. Rate limits kick in. Servers crash. I once had a batch job silently fail on 200 calls because I skipped the status check. Don’t make that mistake.

Here are the HTTP status codes you’ll see most often.

| Status Code | Meaning | What To Do |
| --- | --- | --- |
| 401 | Invalid API key | Check your key. Regenerate if needed. |
| 429 | Rate limit exceeded | Wait and retry with backoff. |
| 400 | Bad request | Fix the request body. Check param names. |
| 500 | Server error | Retry after a short delay. Not your fault. |
| 503 | Service unavailable | API is overloaded. Retry. |

Let’s build a function that checks for errors and gives clear messages. handle_response() looks at the status code and either returns the parsed JSON or raises a clear error.

def handle_response(response):
    """Check API response for errors and return parsed JSON."""
    if response.status_code == 200:
        return response.json()

    error_msg = response.json().get("error", {}).get("message", "Unknown")

    if response.status_code == 401:
        raise PermissionError(f"Invalid API key: {error_msg}")
    elif response.status_code == 429:
        raise RuntimeError(f"Rate limited: {error_msg}")
    elif response.status_code == 400:
        raise ValueError(f"Bad request: {error_msg}")
    elif response.status_code >= 500:
        raise ConnectionError(f"Server error ({response.status_code}): {error_msg}")
    else:
        raise RuntimeError(f"API error {response.status_code}: {error_msg}")

Why use different error types? So your code can react the right way. Catch RuntimeError for rate limits and retry. Let ValueError for bad requests crash right away — retrying a bad request won’t fix it.

Retry with Exponential Backoff

Rate limits and server errors are short-lived. Wait a bit, try again, and they clear up. But retrying in a tight loop makes it worse — you’ll just pound the rate limit harder.

The fix is called exponential backoff. Wait 1 second, then 2, then 4, then 8 — doubling each time. Add some random noise so many clients don’t all retry at once.

The call_openai() function below tries up to max_retries times. On 429 or 5xx errors, it backs off with jitter. On 400 or 401, it fails right away — those won’t fix on their own.

import time
import random

def call_openai(payload, max_retries=5):
    """Call OpenAI API with exponential backoff retry."""
    for attempt in range(max_retries):
        response = requests.post(
            API_URL, headers=headers, json=payload
        )

        if response.status_code == 200:
            return response.json()

        # Retry only short-lived failures: rate limits and any 5xx server error
        retryable = response.status_code == 429 or response.status_code >= 500
        if not retryable:
            error_msg = response.json().get("error", {}).get("message", "")
            raise ValueError(f"Non-retryable error {response.status_code}: {error_msg}")

        if attempt < max_retries - 1:  # no point sleeping after the final try
            delay = min(2 ** attempt + random.uniform(0, 1), 60)
            print(f"Attempt {attempt + 1} failed ({response.status_code}). Retrying in {delay:.1f}s...")
            time.sleep(delay)

    raise RuntimeError(f"Failed after {max_retries} attempts. Last status: {response.status_code}")

Here’s a test with a normal call. The function returns on the first try when things work. Retry only kicks in when something breaks.

result = call_openai({
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "Say hello in French."}]
})
print(result["choices"][0]["message"]["content"])

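The min(..., 60) caps the longest wait at 60 seconds. Without it, the eighth try (attempt=7) would wait 128 seconds, which is way too long. With the default max_retries=5 the delay stays under 17 seconds, so the cap only matters if you raise the retry count. I usually cap at 30-60 seconds based on how fast the app needs to be.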
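To see the doubling and the cap side by side, here’s a small sketch that prints the wait before each retry. Jitter is left out so the numbers repeat, and backoff_schedule() is a name invented for this example.

```python
def backoff_schedule(tries, base=1.0, cap=None):
    """Delay before each retry (jitter left out so the numbers are repeatable)."""
    delays = []
    for attempt in range(tries):
        delay = base * 2 ** attempt      # 1, 2, 4, 8, ...
        if cap is not None:
            delay = min(delay, cap)
        delays.append(delay)
    return delays

print(backoff_schedule(8))             # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0]
print(backoff_schedule(8, cap=60.0))   # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The attempt counter starts at 0, so attempt index 7 is the eighth try. That’s where the uncapped wait reaches 128 seconds.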

TIP: The Retry-After header in 429 responses sometimes tells you exactly how many seconds to wait. In production, check for that header first and use its value instead of your calculated delay.
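Here’s a minimal sketch of that header check. It assumes Retry-After arrives as a plain number of seconds. The header can also be an HTTP date, and this sketch simply falls back to backoff in that case. retry_delay() is a hypothetical helper that takes any dict-like set of headers, such as response.headers.

```python
import random

def retry_delay(headers, attempt, cap=60.0):
    """Prefer the server's Retry-After hint; otherwise use backoff with jitter."""
    # Header names are case-insensitive, so normalize a plain dict first.
    lowered = {k.lower(): v for k, v in headers.items()}
    hint = lowered.get("retry-after")
    if hint is not None:
        try:
            return min(float(hint), cap)
        except ValueError:
            pass  # an HTTP-date value; this sketch ignores that form
    return min(2 ** attempt + random.uniform(0, 1), cap)

print(retry_delay({"Retry-After": "7"}, attempt=0))   # 7.0
print(retry_delay({}, attempt=2))                     # somewhere between 4 and 5
```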

Cost Tracking: Know What You’re Spending

Every API call costs money. If you make hundreds of calls, you need to know what you’re spending. Good news: the API tells you how many tokens each call used. Just multiply by the price per token.

Current prices for the two most popular models (as of March 2026):

| Model | Input (per 1M tokens) | Output (per 1M tokens) |
| --- | --- | --- |
| gpt-4o | $2.50 | $10.00 |
| gpt-4o-mini | $0.15 | $0.60 |

The CostTracker class stores pricing per model, works out each call’s cost from the usage field, and keeps a running total. The calculate() method takes the JSON response and returns the cost in dollars.

class CostTracker:
    """Track cumulative OpenAI API costs."""

    PRICING = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    }

    def __init__(self):
        self.total_cost = 0.0
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.call_count = 0

    def calculate(self, response_data):
        """Calculate cost from API response and update totals."""
        model = response_data["model"]
        usage = response_data["usage"]

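        # The API returns dated names such as "gpt-4o-mini-2024-07-18".
        # Check longer keys first so "gpt-4o" does not also claim gpt-4o-mini.
        pricing_key = None
        for key in sorted(self.PRICING, key=len, reverse=True):
            if model.startswith(key):
                pricing_key = key
                break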

        if not pricing_key:
            print(f"Warning: no pricing for '{model}'")
            return 0.0

        rates = self.PRICING[pricing_key]
        input_cost = usage["prompt_tokens"] * rates["input"]
        output_cost = usage["completion_tokens"] * rates["output"]
        call_cost = input_cost + output_cost

        self.total_cost += call_cost
        self.total_input_tokens += usage["prompt_tokens"]
        self.total_output_tokens += usage["completion_tokens"]
        self.call_count += 1
        return call_cost

The summary() method prints a clean report:

    # This method goes inside the CostTracker class
    def summary(self):
        """Print a cost summary."""
        print(f"API Calls: {self.call_count}")
        print(f"Input tokens: {self.total_input_tokens:,}")
        print(f"Output tokens: {self.total_output_tokens:,}")
        print(f"Total cost: ${self.total_cost:.6f}")

Let’s test it. We make a request, feed the response to the tracker, and print both the answer and cost.

tracker = CostTracker()

result = call_openai({
    "model": "gpt-4o-mini",
    "messages": [{"role": "user", "content": "What is the capital of Japan?"}]
})

cost = tracker.calculate(result)
print(f"Answer: {result['choices'][0]['message']['content']}")
print(f"This call cost: ${cost:.6f}")
tracker.summary()

With gpt-4o-mini at $0.15 per million input tokens, a 20-token prompt costs $0.000003. You’d need 300,000+ short calls to spend a dollar. But switch to gpt-4o with long chats, and costs jump fast.

Putting It All Together: A Robust Chat Function

Let’s tie everything into one function you can use in real apps. robust_chat() takes messages, output settings, and a cost tracker. It handles both streaming and normal modes. Errors get retried. Costs get logged.

Here’s the main function. It builds the payload, loops through retry tries, and sends the result to either the stream handler or the normal parser.

def robust_chat(
    messages,
    model="gpt-4o-mini",
    stream=False,
    cost_tracker=None,
    max_retries=3,
    **kwargs
):
    """Production-ready chat completion with retry and cost tracking."""
    payload = {
        "model": model,
        "messages": messages,
        "stream": stream,
        **kwargs
    }
    if stream:
        payload["stream_options"] = {"include_usage": True}

    for attempt in range(max_retries):
        try:
            resp = requests.post(
                API_URL, headers=headers, json=payload,
                stream=stream, timeout=60
            )
            if resp.status_code == 429 or resp.status_code >= 500:
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                print(f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s...")
                time.sleep(delay)
                continue
            if resp.status_code != 200:
                # Any other client error (400, 401, 403, ...) won't fix itself
                err = resp.json().get("error", {}).get("message", "")
                raise ValueError(f"Error {resp.status_code}: {err}")
            break  # success
        except requests.exceptions.Timeout:
            print(f"Timeout on attempt {attempt + 1}. Retrying...")
            continue
        except requests.exceptions.ConnectionError:
            time.sleep(2 ** attempt)
            continue
    else:
        raise RuntimeError(f"Failed after {max_retries} attempts")

    if stream:
        return _handle_stream(resp, cost_tracker, model)

    data = resp.json()
    if cost_tracker:
        cost_tracker.calculate(data)
    return data["choices"][0]["message"]["content"]

The stream handler is its own function. It grabs tokens as they come in, prints them live, and pulls usage data from the last chunk.

def _handle_stream(response, cost_tracker, model):
    """Process streaming response: print tokens, track cost."""
    full_text = ""
    usage_data = None

    for line in response.iter_lines():
        line = line.decode("utf-8")
        if not line.startswith("data: ") or line == "data: [DONE]":
            continue
        chunk = json.loads(line[6:])
        if chunk.get("usage"):
            usage_data = chunk["usage"]
        # The final usage chunk arrives with an empty choices list
        if not chunk.get("choices"):
            continue
        delta = chunk["choices"][0]["delta"]
        if delta.get("content"):
            print(delta["content"], end="", flush=True)
            full_text += delta["content"]

    print()
    if cost_tracker and usage_data:
        cost_tracker.calculate({"model": model, "usage": usage_data})
    return full_text

Here’s the whole system in action. We create a tracker, make a streaming call with custom parameters, and check the running total.

tracker = CostTracker()

answer = robust_chat(
    messages=[
        {"role": "system", "content": "You are a helpful Python tutor."},
        {"role": "user", "content": "Explain list comprehensions in 3 sentences."}
    ],
    model="gpt-4o-mini",
    stream=True,
    cost_tracker=tracker,
    temperature=0.3,
    max_completion_tokens=150
)

print(f"\n--- Cost Report ---")
tracker.summary()

That’s a solid pattern for real apps. Retry handles short-lived failures. The tracker logs every call. Streaming gives users live feedback. And **kwargs passes any setting through without changing the function.


{
  type: 'exercise',
  id: 'cost-exercise',
  title: 'Exercise 3: Calculate API Cost',
  difficulty: 'beginner',
  exerciseType: 'write',
  instructions: 'Write a function called `estimate_cost` that takes a model name (string), prompt_tokens (int), and completion_tokens (int), and returns the cost in dollars. Use these rates: gpt-4o costs $2.50/1M input, $10.00/1M output. gpt-4o-mini costs $0.15/1M input, $0.60/1M output. Return 0.0 for unknown models.',
  starterCode: 'def estimate_cost(model, prompt_tokens, completion_tokens):\n    pricing = {\n        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},\n        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},\n    }\n    # YOUR CODE HERE: look up rates, calculate, return cost\n    pass\n\n# Test\nprint(estimate_cost("gpt-4o-mini", 100, 200))',
  testCases: [
    { id: 'tc1', input: 'print(f"{estimate_cost(\"gpt-4o-mini\", 100, 200):.7f}")', expectedOutput: '0.0001350', description: 'gpt-4o-mini: 100 input + 200 output tokens' },
    { id: 'tc2', input: 'print(f"{estimate_cost(\"gpt-4o\", 1000, 500):.6f}")', expectedOutput: '0.007500', description: 'gpt-4o: 1000 input + 500 output tokens' },
    { id: 'tc3', input: 'print(estimate_cost("unknown-model", 100, 100))', expectedOutput: '0.0', description: 'Unknown model returns 0.0', hidden: true }
  ],
  hints: [
    'Look up the model in the pricing dict. If not found, return 0.0. Otherwise, multiply prompt_tokens by the input rate and completion_tokens by the output rate, then sum them.',
    'rates = pricing.get(model); if not rates: return 0.0; return prompt_tokens * rates["input"] + completion_tokens * rates["output"]'
  ],
  solution: 'def estimate_cost(model, prompt_tokens, completion_tokens):\n    pricing = {\n        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},\n        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},\n    }\n    rates = pricing.get(model)\n    if not rates:\n        return 0.0\n    return prompt_tokens * rates["input"] + completion_tokens * rates["output"]\n\nprint(estimate_cost("gpt-4o-mini", 100, 200))',
  solutionExplanation: 'We use dict.get() to safely look up the model. For gpt-4o-mini with 100 input tokens: 100 * (0.15/1M) = $0.000015. With 200 output tokens: 200 * (0.60/1M) = $0.00012. Total: $0.000135.',
  xpReward: 15,
}

Common Mistakes and How to Fix Them

Mistake 1: Parsing JSON without checking the status code

Wrong:

response = requests.post(API_URL, headers=headers, json=payload)
content = response.json()["choices"][0]["message"]["content"]

Why it breaks: A 429 or 500 response has a different JSON shape — an error object, not choices. You’ll get a KeyError crash.

Fix:

response = requests.post(API_URL, headers=headers, json=payload)
if response.status_code != 200:
    print(f"Error {response.status_code}: {response.json()['error']['message']}")
else:
    content = response.json()["choices"][0]["message"]["content"]

Mistake 2: Retrying 401 errors

A 401 means your API key is bad or missing. Retrying won’t help. Your retry code should only retry short-lived errors (429, 500, 503) and fail fast on client errors (400, 401, 403).
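That rule fits in one small function. A minimal sketch, treating 429 and every 5xx status as retryable and everything else as fail-fast. is_retryable() is a name made up for this example.

```python
def is_retryable(status_code):
    """True for short-lived failures worth retrying, False for client errors."""
    return status_code == 429 or 500 <= status_code <= 599

for code in (400, 401, 403, 429, 500, 503):
    print(code, "retry" if is_retryable(code) else "fail fast")
```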

Mistake 3: Using max_tokens with o-series models

The older max_tokens param breaks with models like o1 and o3-mini. These models use hidden thinking tokens that max_tokens doesn’t count. Use max_completion_tokens instead — it works with all models.

Mistake 4: Never trimming conversation history

Every call sends the full messages list. A 50-message chat can top 10,000 prompt tokens — that’s $0.025 per call with gpt-4o.

Fix: Keep a sliding window of recent messages, or summarize older ones into a single system message.
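To see how quickly an untrimmed chat adds up, here’s a rough sketch. It assumes every message is about 200 tokens (a made-up round number) and ignores the system prompt. The gpt-4o input rate is the one from the pricing table earlier in this guide.

```python
def prompt_tokens_per_turn(turns, tokens_per_message):
    """Prompt size for each turn when the full history is resent every time."""
    # Turn t sends t user messages plus t - 1 earlier assistant replies.
    return [(2 * t - 1) * tokens_per_message for t in range(1, turns + 1)]

GPT_4O_INPUT_RATE = 2.50 / 1_000_000   # dollars per input token

sizes = prompt_tokens_per_turn(turns=25, tokens_per_message=200)
print(sizes[-1])                        # 9800 prompt tokens on the final call
print(sum(sizes))                       # 125000 prompt tokens across the whole chat
print(f"${sum(sizes) * GPT_4O_INPUT_RATE:.4f}")
```

The last call alone is close to 10,000 prompt tokens, and the total across all 25 turns grows with the square of the turn count. A sliding window keeps each call’s prompt roughly flat instead.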

When NOT to Use Raw HTTP Requests

Raw HTTP gives you maximum control. But it’s not always the right call.

Use the SDK when you need function calling, structured output parsing, or the Assistants API. The SDK handles tool-call flows and multi-step tasks that’d take lots of code with raw HTTP.

Use LangChain or LiteLLM when you switch between providers (OpenAI, Anthropic, Google) a lot. These tools make the interface the same for all of them. One import swap changes the model.

Stick with raw HTTP when you want zero extra packages, full clarity, or you’re working with a tool the SDK doesn’t support. For learning, raw HTTP can’t be beat — you see every byte in and out.

TIP: Start with raw HTTP to understand the API. Move to the SDK once you’re comfortable. You’ll know what it’s doing under the hood, which makes debugging far easier.

Practice Exercise

Build a ChatSession class that tracks chat history, works with both streaming and normal modes, logs costs, and keeps only the last 10 messages plus the system prompt.

Solution:
class ChatSession:
    """Multi-turn conversation with cost tracking and history trimming."""

    def __init__(self, system_prompt, model="gpt-4o-mini", max_history=10):
        self.system_message = {"role": "system", "content": system_prompt}
        self.history = []
        self.model = model
        self.max_history = max_history
        self.tracker = CostTracker()

    def send(self, user_message, stream=False, **kwargs):
        self.history.append({"role": "user", "content": user_message})
        if len(self.history) > self.max_history:
            self.history = self.history[-self.max_history:]

        messages = [self.system_message] + self.history
        reply = robust_chat(
            messages=messages, model=self.model,
            stream=stream, cost_tracker=self.tracker, **kwargs
        )
        self.history.append({"role": "assistant", "content": reply})
        return reply

    def cost_report(self):
        self.tracker.summary()

This ties together multi-turn chat, cost tracking, and history management — the three skills that matter most for real apps.

Complete Code

Full script (copy-paste and run):
# Complete code from: OpenAI API Crash Course
# Requires: pip install requests
# Python 3.9+
# Set OPENAI_API_KEY environment variable before running

import requests
import os
import json
import time
import random

# --- Setup ---
API_KEY = os.environ.get("OPENAI_API_KEY")
API_URL = "https://api.openai.com/v1/chat/completions"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# --- Cost Tracker ---
class CostTracker:
    PRICING = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    }

    def __init__(self):
        self.total_cost = 0.0
        self.total_input_tokens = 0
        self.total_output_tokens = 0
        self.call_count = 0

    def calculate(self, response_data):
        model = response_data["model"]
        usage = response_data["usage"]
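        # Check the longest key first so "gpt-4o-mini-..." is not priced as "gpt-4o".
        pricing_key = None
        for key in sorted(self.PRICING, key=len, reverse=True):
            if key in model:
                pricing_key = key
                break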
        if not pricing_key:
            return 0.0
        rates = self.PRICING[pricing_key]
        input_cost = usage["prompt_tokens"] * rates["input"]
        output_cost = usage["completion_tokens"] * rates["output"]
        call_cost = input_cost + output_cost
        self.total_cost += call_cost
        self.total_input_tokens += usage["prompt_tokens"]
        self.total_output_tokens += usage["completion_tokens"]
        self.call_count += 1
        return call_cost

    def summary(self):
        print(f"API Calls: {self.call_count}")
        print(f"Input tokens: {self.total_input_tokens:,}")
        print(f"Output tokens: {self.total_output_tokens:,}")
        print(f"Total cost: ${self.total_cost:.6f}")

# --- Retry Logic ---
def call_openai(payload, max_retries=5):
    for attempt in range(max_retries):
        response = requests.post(API_URL, headers=headers, json=payload)
        if response.status_code == 200:
            return response.json()
        if response.status_code in (429, 500, 503):
            # Short-lived errors: wait with exponential backoff plus jitter.
            delay = min(2 ** attempt + random.uniform(0, 1), 60)
            print(f"Attempt {attempt + 1} failed ({response.status_code}). Retrying in {delay:.1f}s...")
            time.sleep(delay)
            continue
        # Anything else (400, 401, 403, ...) will not improve on retry: fail fast.
        error_msg = response.json().get("error", {}).get("message", "")
        raise ValueError(f"Non-retryable: {response.status_code}: {error_msg}")
    raise RuntimeError(f"Failed after {max_retries} attempts")

# --- Stream Handler ---
def _handle_stream(response, cost_tracker, model):
    full_text = ""
    usage_data = None
    for line in response.iter_lines():
        line = line.decode("utf-8")
        if not line.startswith("data: ") or line == "data: [DONE]":
            continue
        chunk = json.loads(line[6:])
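        if chunk.get("usage"):
            usage_data = chunk["usage"]
        # With include_usage, the final chunk carries usage and an empty choices list.
        if not chunk.get("choices"):
            continue
        delta = chunk["choices"][0]["delta"]
        # Skip chunks whose content is missing, None, or empty.
        if delta.get("content"):
            print(delta["content"], end="", flush=True)
            full_text += delta["content"]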
    print()
    if cost_tracker and usage_data:
        cost_tracker.calculate({"model": model, "usage": usage_data})
    return full_text

# --- Robust Chat Function ---
def robust_chat(messages, model="gpt-4o-mini", stream=False,
                cost_tracker=None, max_retries=3, **kwargs):
    payload = {"model": model, "messages": messages, "stream": stream, **kwargs}
    if stream:
        payload["stream_options"] = {"include_usage": True}
    for attempt in range(max_retries):
        try:
            resp = requests.post(
                API_URL, headers=headers, json=payload,
                stream=stream, timeout=60
            )
            if resp.status_code in (429, 500, 503):
                delay = min(2 ** attempt + random.uniform(0, 1), 30)
                print(f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s...")
                time.sleep(delay)
                continue
            if resp.status_code != 200:
                # Any other failure (400, 401, 403, ...) will not improve on retry.
                err = resp.json().get("error", {}).get("message", "")
                raise ValueError(f"Error {resp.status_code}: {err}")
            break
        except requests.exceptions.Timeout:
            continue
        except requests.exceptions.ConnectionError:
            time.sleep(2 ** attempt)
            continue
    else:
        raise RuntimeError(f"Failed after {max_retries} attempts")
    if stream:
        return _handle_stream(resp, cost_tracker, model)
    data = resp.json()
    if cost_tracker:
        cost_tracker.calculate(data)
    return data["choices"][0]["message"]["content"]

# --- Demo ---
if __name__ == "__main__":
    tracker = CostTracker()

    print("=== Basic Call ===")
    result = call_openai({
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "What is gradient descent in one sentence?"}]
    })
    tracker.calculate(result)
    print(result["choices"][0]["message"]["content"])

    print("\n=== Streaming Call ===")
    answer = robust_chat(
        messages=[{"role": "user", "content": "Explain APIs in 3 sentences."}],
        stream=True, cost_tracker=tracker, temperature=0.3
    )

    print("\n=== Cost Summary ===")
    tracker.summary()

Frequently Asked Questions

How do I use the OpenAI API without the official Python SDK?

Send HTTP POST requests to https://api.openai.com/v1/chat/completions using Python’s requests library. Set your API key in the Authorization: Bearer <key> header and put model, messages, and params as JSON in the body.

response = requests.post(url, headers={"Authorization": f"Bearer {key}"}, json=payload)
print(response.json()["choices"][0]["message"]["content"])

What’s the difference between max_tokens and max_completion_tokens?

max_tokens caps output length but is deprecated. max_completion_tokens is its replacement. It works with all models, including the o-series, whose hidden thinking tokens count toward the cap. Always use max_completion_tokens in new code.

Does streaming cost more than non-streaming?

No. The cost is identical. The same model generates the same tokens. Streaming just delivers them one at a time instead of all at once. You won’t pay a penny more.

How do I avoid hitting rate limits?

Three ways: (1) add backoff retry logic so your code waits and retries, (2) space out batch calls with short pauses, and (3) use gpt-4o-mini for simple tasks. It typically has higher rate limits and, at the prices used in this article, costs roughly 16x less per token than gpt-4o.
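The second tip, spacing out batch calls, can be sketched like this. `paced_batch` and `fake_send` are hypothetical names; in real use you'd pass a function that makes the API call, and pick a pause that fits your own rate limit.

```python
import time


def paced_batch(prompts, send, pause_seconds=0.5, sleep=time.sleep):
    """Call send(prompt) for each prompt, pausing between calls."""
    results = []
    for index, prompt in enumerate(prompts):
        if index > 0:
            sleep(pause_seconds)  # no pause before the first call
        results.append(send(prompt))
    return results


# Stand-in for a real API call so the sketch runs offline.
def fake_send(prompt):
    return prompt.upper()


print(paced_batch(["a", "b", "c"], fake_send, pause_seconds=0.01))
```

The `sleep` argument is injectable so you can test the pacing without actually waiting.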

Can I use this code with Azure OpenAI?

Almost. The JSON body and reply format are the same. You change the URL to your Azure resource path. Use an api-key header instead of Authorization: Bearer. The rest stays the same.
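A rough sketch of the two changes, assuming the classic Azure OpenAI URL layout with a deployment name in the path and an api-version query parameter. The resource name, deployment name, API version, and key below are placeholders; check your own Azure resource for the real values.

```python
def azure_request_parts(resource, deployment, api_version, api_key):
    """Return (url, headers) for an Azure OpenAI chat completions call."""
    url = (
        f"https://{resource}.openai.azure.com/openai/deployments/"
        f"{deployment}/chat/completions?api-version={api_version}"
    )
    # Azure expects the key in an api-key header, not Authorization: Bearer.
    headers = {"api-key": api_key, "Content-Type": "application/json"}
    return url, headers


# Placeholder values for illustration only.
url, headers = azure_request_parts(
    resource="my-resource",
    deployment="my-gpt4o-mini",
    api_version="2024-06-01",
    api_key="YOUR_AZURE_KEY",
)
print(url)
```

On Azure the deployment name in the URL selects the model, so check how your resource treats the model field in the JSON body.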

NOTE: OpenAI is rolling out a newer Responses API next to Chat Completions. It has built-in tool handling and cleaner streaming events. Chat Completions isn’t going away — it’s still the base. But if you start a new project, check the Responses API too. All you learned here about params, errors, and retry still holds.

Summary

You’ve built a full toolkit for the OpenAI API at the HTTP level:

  • Raw API calls with requests.post() — no SDK needed
  • Multi-turn chats via the messages list
  • Output control with temperature, top_p, max_completion_tokens, stop, and penalties
  • Live streaming using Server-Sent Events
  • Error handling that tells retryable from fatal errors apart
  • Exponential backoff with jitter for auto retry
  • Cost tracking that logs tokens and dollar cost per call
  • A ready-to-use robust_chat() function that ties it all up

The SDK wraps all of this in a cleaner package. But now you know what’s going on inside. When the SDK acts up, you can drop to raw HTTP and check every byte on the wire.
