Build a Multi-Provider LLM Toolkit (Python Project)

Build a zero-dep LLM toolkit in Python — unified client for OpenAI, Claude, Gemini, Ollama with cost tracking, retry, streaming, and structured output.

Written by Selva Prabhakaran | 51 min read

One client class, four providers, built-in cost tracking, streaming, retries, and structured output — from scratch.

You’re calling OpenAI today. Tomorrow your team wants Claude for long documents. Next week someone asks about Gemini because it’s cheaper. Suddenly you’ve got three different API integrations, three response formats, and three places where things break differently.

What if one Python class handled all of them? Same .chat() call, same response object, same error handling — regardless of which provider runs the request.

That’s what we’re building.

Before we write any code, here’s how the pieces connect. We start with a base interface — an abstract class that defines what every provider must do. Then we build four provider adapters: OpenAI, Anthropic, Google, and Ollama.

Each adapter translates the universal .chat() call into that provider’s HTTP format. It returns a unified response object. Your app code never cares which provider answered.

On top of the adapters, we layer a token counter. It estimates usage before you send a request. It also tracks actual usage from the response. Cost tracking plugs into that counter — it multiplies counts by each provider’s pricing and keeps a running total.

The retry/fallback system wraps individual calls with exponential backoff. If a provider stays down, it routes to the next one in your priority list.

Streaming lets you get tokens back word-by-word through a generator. Structured output parsing extracts typed Python objects from raw LLM text. And conversation management stores message history so multi-turn chats just work.

Each layer builds on the one before it. By the end, you’ll have a toolkit you can drop into any Python project.

LLM Toolkit Setup and Configuration

Every provider needs an API key. We’ll store them in environment variables and load them through a central config.

The toolkit also needs a pricing table. Each provider charges different rates per token.

Here’s the import block and configuration. We use os.getenv for API keys, dataclasses for clean structures, and typing for type hints. The PRICING dictionary maps each provider-model pair to its cost per million tokens.

import os
import json
import time
import random
import hashlib
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generator, Optional
from datetime import datetime
import urllib.request
import urllib.error

# API keys from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")

# Pricing per million tokens (USD)
PRICING = {
    "openai/gpt-4o": {"input": 2.50, "output": 10.00},
    "openai/gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "anthropic/claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
    "anthropic/claude-haiku-3.5": {"input": 0.80, "output": 4.00},
    "google/gemini-2.0-flash": {"input": 0.10, "output": 0.40},
    "google/gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    "ollama/llama3": {"input": 0.00, "output": 0.00},
}

print("Config loaded.")
print(f"Providers configured: {len(PRICING)} models")
print(f"OpenAI key present: {bool(OPENAI_API_KEY)}")

Running this prints:

Config loaded.
Providers configured: 7 models
OpenAI key present: False

TIP Keep API keys out of your source code. Use .env files with python-dotenv or export them in your shell. The toolkit works fine with empty keys — it just fails gracefully when you try to call that provider.
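If you want to keep the zero-dependency spirit, a .env file can also be loaded with a few lines of stdlib code. This is a minimal sketch; the parsing rules (KEY=VALUE lines, # comments, no quoting or export support) are simplifying assumptions, not part of the toolkit:

```python
import os

def load_env_file(path: str = ".env") -> None:
    """Minimal .env loader: KEY=VALUE lines, '#' comments, no quoting."""
    if not os.path.exists(path):
        return  # no .env file, nothing to do
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            # setdefault: values already exported in the shell win
            os.environ.setdefault(key.strip(), value.strip())
```

Call load_env_file() once at startup, before the os.getenv calls above run.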

The Unified Response Object

Every provider returns data in a different shape. OpenAI nests text inside choices[0].message.content. Anthropic puts it in content[0].text. Gemini uses candidates[0].content.parts[0].text.

Your application shouldn’t care about any of that.

We define one LLMResponse dataclass that every adapter returns. It holds the generated text, token counts, model name, latency, and raw provider data for debugging.

The cost property calculates the dollar cost from our pricing table. The summary() method gives a one-line overview.

@dataclass
class LLMResponse:
    """Unified response from any LLM provider."""
    text: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    raw: dict = field(default_factory=dict)
    finish_reason: str = "stop"

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

    @property
    def cost(self) -> float:
        key = f"{self.provider}/{self.model}"
        if key not in PRICING:
            return 0.0
        p = PRICING[key]
        input_cost = (self.input_tokens / 1_000_000) * p["input"]
        output_cost = (self.output_tokens / 1_000_000) * p["output"]
        return input_cost + output_cost

    def summary(self) -> str:
        return (
            f"[{self.provider}/{self.model}] "
            f"{self.total_tokens} tokens, "
            f"{self.latency_ms:.0f}ms, "
            f"${self.cost:.6f}"
        )

Let’s test it. We create a response with 10 input tokens and 5 output tokens on gpt-4o. The cost should be: 10 tokens at $2.50/M = $0.000025, plus 5 tokens at $10.00/M = $0.000050. Total: $0.000075.

resp = LLMResponse(
    text="Hello!", model="gpt-4o", provider="openai",
    input_tokens=10, output_tokens=5, latency_ms=230.0
)
print(resp.summary())
print(f"Cost breakdown: ${resp.cost:.6f}")

Output:

[openai/gpt-4o] 15 tokens, 230ms, $0.000075
Cost breakdown: $0.000075

The math checks out. Every response carries its own cost. No external tracking needed.

Quick check: What would the cost be with gpt-4o-mini instead? Same token counts. Check the pricing table and calculate it before moving on.

(Answer: 10 * $0.15/M + 5 * $0.60/M = $0.0000045 — about 17x cheaper.)
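The same check in code, using the raw rates from the pricing table (plain arithmetic here so it runs standalone):

```python
# gpt-4o-mini: $0.15/M input, $0.60/M output (from PRICING above)
mini_cost = (10 / 1_000_000) * 0.15 + (5 / 1_000_000) * 0.60
# gpt-4o: $2.50/M input, $10.00/M output
gpt4o_cost = (10 / 1_000_000) * 2.50 + (5 / 1_000_000) * 10.00

print(f"gpt-4o-mini: ${mini_cost:.7f}")          # $0.0000045
print(f"gpt-4o:      ${gpt4o_cost:.6f}")         # $0.000075
print(f"Ratio: {gpt4o_cost / mini_cost:.1f}x")   # 16.7x
```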


Exercise 1: Create and Compare LLM Responses

Difficulty: beginner. Create two LLMResponse objects — one for anthropic/claude-sonnet-4-20250514 with 100 input tokens and 50 output tokens, and one for google/gemini-2.0-flash with the same counts. Print each summary and then print which one is cheaper. The Claude summary should read [anthropic/claude-sonnet-4-20250514] 150 tokens, 450ms, $0.001050, and Gemini should come out cheaper.

Starter code:

# Create a Claude Sonnet response
claude_resp = LLMResponse(
    text="Response from Claude",
    model="claude-sonnet-4-20250514",
    provider="anthropic",
    input_tokens=100,
    output_tokens=50,
    latency_ms=450.0,
)

# Create a Gemini Flash response
gemini_resp = None  # YOUR CODE HERE

print(claude_resp.summary())
print(gemini_resp.summary())

# Print which is cheaper
cheaper = "gemini" if gemini_resp.cost < claude_resp.cost else "claude"
print(f"Cheaper: {cheaper}")

Hint: use the same LLMResponse structure, but set model to "gemini-2.0-flash", provider to "google", and a latency such as 200.0.

Solution:

claude_resp = LLMResponse(
    text="Response from Claude", model="claude-sonnet-4-20250514",
    provider="anthropic", input_tokens=100, output_tokens=50, latency_ms=450.0,
)
gemini_resp = LLMResponse(
    text="Response from Gemini", model="gemini-2.0-flash",
    provider="google", input_tokens=100, output_tokens=50, latency_ms=200.0,
)
print(claude_resp.summary())
print(gemini_resp.summary())
cheaper = "gemini" if gemini_resp.cost < claude_resp.cost else "claude"
print(f"Cheaper: {cheaper}")

Claude Sonnet costs $3.00/M input + $15.00/M output. For 100 + 50 tokens, that is $0.000300 + $0.000750 = $0.001050. Gemini Flash costs $0.10/M + $0.40/M, giving $0.000010 + $0.000020 = $0.000030. Gemini is about 35x cheaper for the same request.


The Base Provider Interface

This is where the “unified” part comes from. We define an abstract base class called BaseLLMProvider. Every adapter inherits from it.

The base class enforces two methods: chat() for normal requests and chat_stream() for streaming. We also define custom error classes — LLMError for general failures, RateLimitError for 429s, and AuthenticationError for bad API keys. These let the retry system distinguish between retryable and fatal errors.

class LLMError(Exception):
    """Base error for LLM operations."""
    def __init__(self, message: str, provider: str = "", status_code: int = 0):
        self.provider = provider
        self.status_code = status_code
        super().__init__(message)

class RateLimitError(LLMError):
    """Raised when provider returns 429."""
    pass

class AuthenticationError(LLMError):
    """Raised when API key is invalid."""
    pass

The base class itself is short. It stores the API key, default model, and provider name. The abstract methods force each adapter to implement its own chat() and chat_stream().

class BaseLLMProvider(ABC):
    """Abstract base for all LLM provider adapters."""

    def __init__(self, api_key: str, default_model: str):
        self.api_key = api_key
        self.default_model = default_model
        self.provider_name = self.__class__.__name__.lower().replace("provider", "")

    @abstractmethod
    def chat(self, messages: list[dict], model: str = "",
             temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
        """Send a chat request and return a unified response."""
        ...

    @abstractmethod
    def chat_stream(self, messages: list[dict], model: str = "",
                    temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
        """Stream response tokens as a generator."""
        ...

    def _get_model(self, model: str) -> str:
        return model or self.default_model

print("Base classes defined.")
print(f"LLMError subclasses: {[c.__name__ for c in LLMError.__subclasses__()]}")

This gives us:

Base classes defined.
LLMError subclasses: ['RateLimitError', 'AuthenticationError']

KEY INSIGHT The base class is the contract. When a new provider appears next month, you write one adapter class. Every feature — retry, fallback, cost tracking, streaming — works automatically because they all operate on this shared interface.

Building Multi-Provider LLM Adapters

Each adapter translates the universal messages format into the provider’s specific HTTP request. Before diving into code, here’s how the four providers differ:

Provider  | Endpoint                               | Auth Header           | System Message          | Response Text Path
OpenAI    | /v1/chat/completions                   | Authorization: Bearer | In messages array       | choices[0].message.content
Anthropic | /v1/messages                           | x-api-key             | Separate system field   | content[0].text
Google    | /v1beta/models/{model}:generateContent | Query param ?key=     | systemInstruction field | candidates[0].content.parts[0].text
Ollama    | /api/chat                              | None (local)          | In messages array       | message.content

Four APIs, four auth methods, four response shapes. The adapters hide all of this behind one chat() call.
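To make the response-path column concrete, here are mock bodies in each provider's shape (the payload contents are made up for illustration) and the lookup each adapter performs:

```python
# Mock response bodies, one per provider's documented shape
openai_body = {"choices": [{"message": {"content": "hi from openai"}}]}
anthropic_body = {"content": [{"text": "hi from claude"}]}
google_body = {"candidates": [{"content": {"parts": [{"text": "hi from gemini"}]}}]}
ollama_body = {"message": {"content": "hi from llama3"}}

# Four different lookups, all hidden behind the same .chat() call
print(openai_body["choices"][0]["message"]["content"])
print(anthropic_body["content"][0]["text"])
print(google_body["candidates"][0]["content"]["parts"][0]["text"])
print(ollama_body["message"]["content"])
```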

OpenAI Adapter

The OpenAIProvider sends a POST request to https://api.openai.com/v1/chat/completions. It packs messages, model, temperature, and max_tokens into a JSON body. The response includes token counts in usage and generated text in choices[0].message.content.

First, the payload construction and HTTP call. Notice how error handling maps HTTP status codes to our custom error types — 429 becomes RateLimitError, 401 becomes AuthenticationError.

class OpenAIProvider(BaseLLMProvider):
    """Adapter for OpenAI's chat completions API."""

    BASE_URL = "https://api.openai.com/v1/chat/completions"

    def __init__(self, api_key: str = "", default_model: str = "gpt-4o-mini"):
        super().__init__(api_key or OPENAI_API_KEY, default_model)

    def chat(self, messages: list[dict], model: str = "",
             temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
        model = self._get_model(model)
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"Authorization": f"Bearer {self.api_key}",
                     "Content-Type": "application/json"},
            method="POST",
        )
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429:
                raise RateLimitError("Rate limited by OpenAI", "openai", 429)
            if e.code == 401:
                raise AuthenticationError("Invalid OpenAI key", "openai", 401)
            raise LLMError(f"OpenAI returned {e.code}", "openai", e.code)
        latency = (time.time() - start) * 1000

        choice = body["choices"][0]
        usage = body.get("usage", {})
        return LLMResponse(
            text=choice["message"]["content"],
            model=model, provider="openai",
            input_tokens=usage.get("prompt_tokens", 0),
            output_tokens=usage.get("completion_tokens", 0),
            latency_ms=latency, raw=body,
            finish_reason=choice.get("finish_reason", "stop"),
        )

The streaming method adds "stream": True to the payload and reads Server-Sent Events (SSE) line by line. Each line starting with data: carries a JSON chunk with a delta containing the next token.

    def chat_stream(self, messages: list[dict], model: str = "",
                    temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
        model = self._get_model(model)
        payload = {
            "model": model, "messages": messages,
            "temperature": temperature, "max_tokens": max_tokens,
            "stream": True,
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"Authorization": f"Bearer {self.api_key}",
                     "Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                chunk = line[6:]
                if chunk == "[DONE]":
                    break
                delta = json.loads(chunk)["choices"][0].get("delta", {})
                if "content" in delta:
                    yield delta["content"]

print("OpenAI adapter ready.")

Result:

OpenAI adapter ready.

Anthropic (Claude) Adapter

Claude’s API differs in three ways. First, the system message goes in a separate system field — not in the messages array. Second, auth uses x-api-key instead of Authorization: Bearer. Third, the response text lives in content[0].text instead of choices[0].message.content.

The _split_system helper extracts the system message before building the payload.

class AnthropicProvider(BaseLLMProvider):
    """Adapter for Anthropic's Claude messages API."""

    BASE_URL = "https://api.anthropic.com/v1/messages"

    def __init__(self, api_key: str = "", default_model: str = "claude-haiku-3.5"):
        super().__init__(api_key or ANTHROPIC_API_KEY, default_model)

    def _split_system(self, messages: list[dict]) -> tuple[str, list[dict]]:
        system = ""
        filtered = []
        for m in messages:
            if m["role"] == "system":
                system = m["content"]
            else:
                filtered.append(m)
        return system, filtered

    def chat(self, messages: list[dict], model: str = "",
             temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
        model = self._get_model(model)
        system, msgs = self._split_system(messages)
        payload = {"model": model, "messages": msgs,
                   "temperature": temperature, "max_tokens": max_tokens}
        if system:
            payload["system"] = system
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"x-api-key": self.api_key,
                     "anthropic-version": "2023-06-01",
                     "Content-Type": "application/json"},
            method="POST",
        )
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429:
                raise RateLimitError("Rate limited", "anthropic", 429)
            if e.code == 401:
                raise AuthenticationError("Invalid key", "anthropic", 401)
            raise LLMError(f"Anthropic returned {e.code}", "anthropic", e.code)
        latency = (time.time() - start) * 1000

        usage = body.get("usage", {})
        return LLMResponse(
            text=body["content"][0]["text"], model=model,
            provider="anthropic",
            input_tokens=usage.get("input_tokens", 0),
            output_tokens=usage.get("output_tokens", 0),
            latency_ms=latency, raw=body,
            finish_reason=body.get("stop_reason", "stop"),
        )

Streaming for Claude uses event types instead of [DONE]. We look for content_block_delta events and extract text from the delta.

    def chat_stream(self, messages: list[dict], model: str = "",
                    temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
        model = self._get_model(model)
        system, msgs = self._split_system(messages)
        payload = {"model": model, "messages": msgs,
                   "temperature": temperature, "max_tokens": max_tokens,
                   "stream": True}
        if system:
            payload["system"] = system
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"x-api-key": self.api_key,
                     "anthropic-version": "2023-06-01",
                     "Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                parsed = json.loads(line[6:])
                if parsed.get("type") == "content_block_delta":
                    yield parsed["delta"].get("text", "")

print("Anthropic adapter ready.")

Output:

Anthropic adapter ready.

Google (Gemini) Adapter

Google takes yet another approach. Messages become contents with a parts array. The API key goes as a URL query parameter. The system message sits in a systemInstruction field. The response nests text inside candidates[0].content.parts[0].text.

class GoogleProvider(BaseLLMProvider):
    """Adapter for Google's Gemini API."""

    BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models"

    def __init__(self, api_key: str = "", default_model: str = "gemini-2.0-flash"):
        super().__init__(api_key or GOOGLE_API_KEY, default_model)

    def _convert_messages(self, messages: list[dict]) -> tuple[str, list[dict]]:
        system = ""
        contents = []
        for m in messages:
            if m["role"] == "system":
                system = m["content"]
            else:
                role = "user" if m["role"] == "user" else "model"
                contents.append({"role": role, "parts": [{"text": m["content"]}]})
        return system, contents

    def chat(self, messages: list[dict], model: str = "",
             temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
        model = self._get_model(model)
        system, contents = self._convert_messages(messages)
        url = f"{self.BASE_URL}/{model}:generateContent?key={self.api_key}"
        payload: dict[str, Any] = {
            "contents": contents,
            "generationConfig": {"temperature": temperature,
                                 "maxOutputTokens": max_tokens},
        }
        if system:
            payload["systemInstruction"] = {"parts": [{"text": system}]}
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429:
                raise RateLimitError("Rate limited", "google", 429)
            if e.code in (401, 403):
                raise AuthenticationError("Invalid key", "google", e.code)
            raise LLMError(f"Google returned {e.code}", "google", e.code)
        latency = (time.time() - start) * 1000

        candidate = body["candidates"][0]
        usage = body.get("usageMetadata", {})
        return LLMResponse(
            text=candidate["content"]["parts"][0]["text"],
            model=model, provider="google",
            input_tokens=usage.get("promptTokenCount", 0),
            output_tokens=usage.get("candidatesTokenCount", 0),
            latency_ms=latency, raw=body,
            finish_reason=candidate.get("finishReason", "STOP").lower(),
        )

    def chat_stream(self, messages: list[dict], model: str = "",
                    temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
        model = self._get_model(model)
        system, contents = self._convert_messages(messages)
        url = (f"{self.BASE_URL}/{model}:streamGenerateContent"
               f"?key={self.api_key}&alt=sse")
        payload: dict[str, Any] = {
            "contents": contents,
            "generationConfig": {"temperature": temperature,
                                 "maxOutputTokens": max_tokens},
        }
        if system:
            payload["systemInstruction"] = {"parts": [{"text": system}]}
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue
                parsed = json.loads(line[6:])
                parts = (parsed.get("candidates", [{}])[0]
                         .get("content", {}).get("parts", []))
                for part in parts:
                    if "text" in part:
                        yield part["text"]

print("Google Gemini adapter ready.")

Output:

Google Gemini adapter ready.

Ollama (Local) Adapter

Ollama runs on your machine. No API key, no cost, no rate limits. Its native chat endpoint at localhost:11434 accepts the same role/content messages format. The main quirk: temperature and token limits go inside an options object.

class OllamaProvider(BaseLLMProvider):
    """Adapter for locally running Ollama models."""

    def __init__(self, base_url: str = "", default_model: str = "llama3"):
        self.base_url = base_url or OLLAMA_BASE_URL
        super().__init__("", default_model)

    def chat(self, messages: list[dict], model: str = "",
             temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
        model = self._get_model(model)
        url = f"{self.base_url}/api/chat"
        payload = {
            "model": model, "messages": messages,
            "options": {"temperature": temperature,
                        "num_predict": max_tokens},
            "stream": False,
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                body = json.loads(resp.read().decode("utf-8"))
        except urllib.error.URLError:
            raise LLMError(
                "Ollama not running — start with 'ollama serve'",
                "ollama", 0,
            )
        latency = (time.time() - start) * 1000
        return LLMResponse(
            text=body["message"]["content"],
            model=model, provider="ollama",
            input_tokens=body.get("prompt_eval_count", 0),
            output_tokens=body.get("eval_count", 0),
            latency_ms=latency, raw=body,
        )

    def chat_stream(self, messages: list[dict], model: str = "",
                    temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
        model = self._get_model(model)
        url = f"{self.base_url}/api/chat"
        payload = {
            "model": model, "messages": messages,
            "options": {"temperature": temperature,
                        "num_predict": max_tokens},
            "stream": True,
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                chunk = json.loads(line.decode("utf-8"))
                if not chunk.get("done", False):
                    yield chunk["message"]["content"]

print("Ollama adapter ready.")

Output:

Ollama adapter ready.

Provider Registry

We need a way to look up any provider by name. The PROVIDER_REGISTRY dictionary maps string keys to classes. The get_provider() function creates adapters on the fly.

PROVIDER_REGISTRY = {
    "openai": OpenAIProvider,
    "anthropic": AnthropicProvider,
    "google": GoogleProvider,
    "ollama": OllamaProvider,
}

def get_provider(name: str, **kwargs) -> BaseLLMProvider:
    """Create a provider adapter by name."""
    if name not in PROVIDER_REGISTRY:
        available = list(PROVIDER_REGISTRY.keys())
        raise ValueError(f"Unknown provider: {name}. Options: {available}")
    return PROVIDER_REGISTRY[name](**kwargs)

print(f"Registry: {list(PROVIDER_REGISTRY.keys())}")
oai = get_provider("openai")
print(f"Created: {oai.provider_name}, default model: {oai.default_model}")

Output:

Registry: ['openai', 'anthropic', 'google', 'ollama']
Created: openai, default model: gpt-4o-mini

KEY INSIGHT The registry is what makes this a toolkit, not a script. Adding a fifth provider means writing one class and one line in the dictionary. Nothing else changes.

Token Counting and Cost Tracking

Knowing how many tokens a message will use — before you send it — helps you pick the right model and avoid bill surprises.

We can’t run a provider’s actual tokenizer in the browser. So we use a heuristic: roughly 4 characters per token for English text. It’s not exact, but it’s close enough for budget estimation.

The TokenTracker class does three things. It estimates token counts before a request, records actual usage from each response, and gives you a spending summary broken down by provider.

class TokenTracker:
    """Estimates tokens, tracks usage, and reports costs."""

    def __init__(self):
        self.history: list[dict] = []
        self.total_cost: float = 0.0
        self.total_input_tokens: int = 0
        self.total_output_tokens: int = 0

    @staticmethod
    def estimate_tokens(text: str) -> int:
        """Rough estimate: ~4 chars per token for English."""
        return max(1, len(text) // 4)

    def estimate_messages(self, messages: list[dict]) -> int:
        """Estimate token count for a message list."""
        total = 0
        for m in messages:
            total += 4  # message overhead
            total += self.estimate_tokens(m.get("content", ""))
        return total

    def record(self, response: LLMResponse) -> None:
        """Record a completed request's usage."""
        self.total_input_tokens += response.input_tokens
        self.total_output_tokens += response.output_tokens
        self.total_cost += response.cost
        self.history.append({
            "provider": response.provider,
            "model": response.model,
            "input_tokens": response.input_tokens,
            "output_tokens": response.output_tokens,
            "cost": response.cost,
            "latency_ms": response.latency_ms,
            "timestamp": datetime.now().isoformat(),
        })

    def summary(self) -> str:
        """Return a spending summary."""
        lines = [f"Total requests: {len(self.history)}"]
        lines.append(f"Total tokens: {self.total_input_tokens + self.total_output_tokens:,}")
        lines.append(f"Total cost: ${self.total_cost:.4f}")
        by_provider: dict[str, float] = {}
        for h in self.history:
            key = h["provider"]
            by_provider[key] = by_provider.get(key, 0) + h["cost"]
        for prov, cost in sorted(by_provider.items()):
            lines.append(f"  {prov}: ${cost:.4f}")
        return "\n".join(lines)

Let’s test the estimation and recording flow. We’ll estimate tokens for a sample message, then simulate recording a response.

tracker = TokenTracker()
sample_messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain gradient descent in two sentences."},
]
estimated = tracker.estimate_messages(sample_messages)
print(f"Estimated input tokens: {estimated}")

fake_resp = LLMResponse(
    text="Gradient descent is an optimization algorithm.",
    model="gpt-4o-mini", provider="openai",
    input_tokens=22, output_tokens=8, latency_ms=340.0,
)
tracker.record(fake_resp)
print(tracker.summary())

This prints:

Estimated input tokens: 25
Total requests: 1
Total tokens: 30
Total cost: $0.0000
  openai: $0.0000

The cost shows $0.0000 because at gpt-4o-mini's rates ($0.15/M input, $0.60/M output), 30 tokens cost less than a thousandth of a cent. At scale, these fractions add up fast.
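A quick back-of-envelope projection shows where that goes. The request volume here is a made-up assumption, not a benchmark:

```python
# Assume gpt-4o-mini rates and the request above: 22 input + 8 output tokens
per_request = (22 / 1_000_000) * 0.15 + (8 / 1_000_000) * 0.60
daily = per_request * 1_000_000  # hypothetical 1M requests/day

print(f"Per request: ${per_request:.7f}")   # $0.0000081
print(f"Per day:     ${daily:.2f}")         # $8.10
print(f"Per 30 days: ${daily * 30:.2f}")    # $243.00
```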

Retry with Exponential Backoff and Fallback

API calls fail. Servers go down. Rate limits kick in. Your toolkit needs two layers of defense: retry the same provider, then fall back to a different one.

Exponential Backoff

The RetryConfig dataclass controls retry behavior. The retry_with_backoff function wraps any callable. On each failure, it doubles the wait time. Random jitter prevents the “thundering herd” — where many clients retry at the same moment and overload the server again.

One important detail: we never retry AuthenticationError. If your API key is wrong, waiting won’t fix it.

@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    retryable_errors: tuple = (RateLimitError, LLMError)
    jitter: bool = True

def retry_with_backoff(fn, config: RetryConfig = RetryConfig()):
    """Call fn() with exponential backoff on failure."""
    last_error = None
    for attempt in range(config.max_retries + 1):
        try:
            return fn()
        except config.retryable_errors as e:
            last_error = e
            if isinstance(e, AuthenticationError):
                raise  # Don't retry bad credentials
            if attempt < config.max_retries:
                delay = min(config.base_delay * (2 ** attempt), config.max_delay)
                if config.jitter:
                    delay = random.uniform(0, delay)
                print(f"  Retry {attempt + 1}/{config.max_retries} "
                      f"after {delay:.1f}s — {e}")
                time.sleep(delay)
    raise last_error

Let’s see it work. We’ll create a function that fails twice, then succeeds on the third attempt. With jitter=False, the delays double predictably: 0.1s, then 0.2s.

call_count = 0

def flaky_function():
    global call_count
    call_count += 1
    if call_count < 3:
        raise LLMError(f"Simulated failure #{call_count}", "test", 500)
    return f"Success on attempt {call_count}"

call_count = 0
config = RetryConfig(max_retries=3, base_delay=0.1, jitter=False)
result = retry_with_backoff(flaky_function, config)
print(f"Result: {result}")

Output:

  Retry 1/3 after 0.1s — Simulated failure #1
  Retry 2/3 after 0.2s — Simulated failure #2
Result: Success on attempt 3

Predict the output: What if call_count < 4 instead of < 3, with max_retries=2? Think about it.

The function would fail three times, but max_retries=2 allows only three total attempts (the initial call plus two retries). All of them fail, so the last LLMError is raised once the attempts are used up.

WARNING Never retry AuthenticationError. A bad API key won’t fix itself with time. You’ll burn your retry budget and delay the real error by several seconds.
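The reason the explicit re-raise is needed: AuthenticationError subclasses LLMError, so the except (RateLimitError, LLMError) clause catches it anyway. A self-contained check (the two classes are repeated from the error definitions so the snippet runs on its own):

```python
class LLMError(Exception):
    def __init__(self, message, provider="", status_code=0):
        self.provider = provider
        self.status_code = status_code
        super().__init__(message)

class AuthenticationError(LLMError):
    pass

# AuthenticationError IS an LLMError, so a plain `except LLMError` catches it.
# That's why retry_with_backoff re-raises it explicitly instead of sleeping and retrying.
print(issubclass(AuthenticationError, LLMError))   # True

try:
    raise AuthenticationError("Invalid key", "openai", 401)
except LLMError as e:
    print(type(e).__name__)                        # AuthenticationError
```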

Fallback Chain

The FallbackChain takes a list of providers in priority order. It tries the first one with retries. If all retries fail, it moves to the next provider. If every provider fails, it raises a clear error.

class FallbackChain:
    """Tries providers in order until one succeeds."""

    def __init__(self, providers: list[BaseLLMProvider],
                 retry_config: RetryConfig = RetryConfig()):
        self.providers = providers
        self.retry_config = retry_config

    def chat(self, messages: list[dict], **kwargs) -> LLMResponse:
        errors = []
        for provider in self.providers:
            try:
                return retry_with_backoff(
                    lambda p=provider: p.chat(messages, **kwargs),
                    self.retry_config,
                )
            except LLMError as e:
                errors.append(f"{provider.provider_name}: {e}")
                print(f"  Provider {provider.provider_name} failed, "
                      f"trying next...")
                continue
        tried = ", ".join(p.provider_name for p in self.providers)
        raise LLMError(f"All providers failed ({tried}): {errors}")

chain = FallbackChain(
    providers=[get_provider("openai"), get_provider("anthropic"),
               get_provider("google")],
    retry_config=RetryConfig(max_retries=2, base_delay=0.5),
)
print(f"Fallback chain: {[p.provider_name for p in chain.providers]}")
print("Route: openai -> anthropic -> google")

Result:

Fallback chain: ['openai', 'anthropic', 'google']
Route: openai -> anthropic -> google

Exercise 2: Test the Retry System

Create a function called sometimes_fails that uses a global counter. It should raise LLMError on the first call and return "OK" on the second. Then call retry_with_backoff with max_retries=1, base_delay=0.01 (fast for testing), and jitter=False, and print the result.

counter = 0

def sometimes_fails():
    global counter
    counter += 1
    if counter < 2:
        raise LLMError("fail", "test", 500)
    return "OK"

counter = 0
cfg = RetryConfig(max_retries=1, base_delay=0.01, jitter=False)
result = retry_with_backoff(sometimes_fails, cfg)
print(result)

On the first call, counter becomes 1, which is less than 2, so LLMError is raised. The retry system catches it and waits 0.01s. On the second call, counter becomes 2, the condition is false, and "OK" is returned. The counter ends at 2: one failure, one success.


Structured Output Parsing

LLMs return text. Your application needs data — dictionaries, lists, typed objects. Structured output parsing bridges that gap.

The approach: ask the LLM to return JSON, then extract it from the response. But LLMs sometimes wrap JSON in markdown fences or add text before and after it.

Our StructuredOutputParser handles these cases. It tries three extraction strategies in order — from strictest to most lenient. First, pure json.loads. Second, extract from triple-backtick fences. Third, find the first { and last } and try that substring.

class StructuredOutputParser:
    """Extracts structured data from LLM text responses."""

    @staticmethod
    def parse_json(text: str) -> dict:
        """Extract JSON from LLM response text."""
        text = text.strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        fence_match = re.search(
            r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL
        )
        if fence_match:
            try:
                return json.loads(fence_match.group(1))
            except json.JSONDecodeError:
                pass

        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end != -1 and end > start:
            try:
                return json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                pass

        raise ValueError(
            f"Could not parse JSON from: {text[:100]}..."
        )

    @staticmethod
    def parse_list(text: str) -> list:
        """Extract a JSON array from LLM response text."""
        text = text.strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        fence_match = re.search(
            r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL
        )
        if fence_match:
            try:
                return json.loads(fence_match.group(1))
            except json.JSONDecodeError:
                pass

        start = text.find('[')
        end = text.rfind(']')
        if start != -1 and end != -1 and end > start:
            try:
                return json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                pass

        raise ValueError(
            f"Could not parse list from: {text[:100]}..."
        )

Four test cases cover the real-world scenarios. Clean JSON, fenced JSON, embedded JSON with surrounding text, and a list inside a code block.

parser = StructuredOutputParser()

result1 = parser.parse_json('{"name": "Alice", "score": 95}')
print(f"Clean JSON: {result1}")

result2 = parser.parse_json(
    'Here is the result:\n```json\n{"status": "ok", "count": 3}\n```\nDone!'
)
print(f"Fenced JSON: {result2}")

result3 = parser.parse_json(
    'The analysis shows {"sentiment": "positive", "confidence": 0.92} based on the input.'
)
print(f"Embedded JSON: {result3}")

result4 = parser.parse_list('```json\n["python", "rust", "go"]\n```')
print(f"Parsed list: {result4}")

Output:

Clean JSON: {'name': 'Alice', 'score': 95}
Fenced JSON: {'status': 'ok', 'count': 3}
Embedded JSON: {'sentiment': 'positive', 'confidence': 0.92}
Parsed list: ['python', 'rust', 'go']

All four work. The parser tries strategies from strictest to most lenient. You get clean data every time — or a clear error if the LLM returned gibberish.

TIP When asking an LLM for JSON, include “Respond with valid JSON only. No explanation.” in your prompt. This reduces fallback parsing and makes responses faster.
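Even with that instruction, models occasionally fence their output, which is why the fallback exists. A quick self-contained check: raw json.loads rejects fenced text, while extracting the fenced body (with the same regex the parser uses) succeeds.

```python
import json
import re

raw = 'Here you go:\n```json\n{"ok": true}\n```'

try:
    json.loads(raw)                    # fails: there is chatter around the JSON
except json.JSONDecodeError:
    print("direct parse failed")

# Same fence-extraction regex as StructuredOutputParser
match = re.search(r"```(?:json)?\s*\n(.*?)\n```", raw, re.DOTALL)
print(json.loads(match.group(1)))      # {'ok': True}
```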


Exercise 3: Parse Tricky LLM Output

The LLM returned JSON wrapped in a markdown fence, with chatter before and after. Use StructuredOutputParser.parse_json() to extract the data, then print the language value and the number of features.

messy = (
    "Sure! Here's the analysis:\n"
    "```json\n"
    '{"language": "Python", "version": 3.11, "features": ["typing", "match"]}\n'
    "```\n"
    "Hope that helps!"
)

parser = StructuredOutputParser()
data = parser.parse_json(messy)
print(data["language"])       # Python
print(len(data["features"]))  # 2

The parser first tries json.loads on the full string, which fails because of the surrounding text. It then looks for a markdown code fence, finds one, and parses the JSON inside it. The result is a dictionary with "language", "version", and "features" keys.


Conversation Management

Multi-turn chat needs message history. You send the full conversation each time so the LLM has context. But conversations grow. Without management, you’ll hit the context window limit.

The Conversation class stores messages, tracks estimated tokens, and trims old messages when the history gets too long. It always keeps the system message (first position). When truncation kicks in, it drops the oldest messages after the system prompt, one at a time, until the history fits.

class Conversation:
    """Manages multi-turn chat with automatic truncation."""

    def __init__(self, system_prompt: str = "", max_tokens: int = 8000):
        self.messages: list[dict] = []
        self.max_tokens = max_tokens
        self.token_tracker = TokenTracker()
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})

    def add_user(self, content: str) -> None:
        """Add a user message."""
        self.messages.append({"role": "user", "content": content})
        self._truncate()

    def add_assistant(self, content: str) -> None:
        """Add an assistant response."""
        self.messages.append({"role": "assistant", "content": content})

    def _truncate(self) -> None:
        """Remove oldest messages when tokens exceed limit."""
        while (self.token_tracker.estimate_messages(self.messages)
               > self.max_tokens):
            if len(self.messages) <= 2:
                break
            self.messages.pop(1)

    def get_messages(self) -> list[dict]:
        return list(self.messages)

    def clear(self) -> None:
        """Clear history, keeping only system message."""
        system = [m for m in self.messages if m["role"] == "system"]
        self.messages = system

    def __len__(self) -> int:
        return len(self.messages)

Here’s a conversation with five messages added on top of the system prompt. Notice that the system message stays first, and every turn is preserved because we’re well under the 200-token limit.

convo = Conversation(system_prompt="You are a Python expert.", max_tokens=200)
convo.add_user("What is a decorator?")
convo.add_assistant("A decorator wraps a function to add behavior.")
convo.add_user("Show me an example.")
convo.add_assistant("@timer measures execution time.")
convo.add_user("Can decorators take arguments?")

print(f"Messages: {len(convo)}")
est = convo.token_tracker.estimate_messages(convo.get_messages())
print(f"Estimated tokens: {est}")

for m in convo.get_messages():
    print(f"  {m['role'].upper()}: {m['content'][:50]}")

This prints:

Messages: 6
Estimated tokens: 54
  SYSTEM: You are a Python expert.
  USER: What is a decorator?
  ASSISTANT: A decorator wraps a function to add behavior.
  USER: Show me an example.
  ASSISTANT: @timer measures execution time.
  USER: Can decorators take arguments?

Six messages, 54 estimated tokens — well under our 200-token cap. If we kept adding messages, the oldest pairs would get trimmed automatically.
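To watch truncation actually fire, here's a self-contained miniature of the same pop-from-index-1 rule. The token estimate here (chars // 4 plus a small per-message overhead) is an assumption standing in for TokenTracker:

```python
def estimate(messages):
    # Rough stand-in for TokenTracker.estimate_messages: ~4 chars per token,
    # plus a small per-message overhead (an assumption for this sketch)
    return sum(len(m["content"]) // 4 + 4 for m in messages)

messages = [{"role": "system", "content": "You are a Python expert."}]
for i in range(20):
    messages.append({"role": "user", "content": f"Question number {i} about decorators?"})
    # Same rule as Conversation._truncate: keep system at index 0, drop oldest at index 1
    while estimate(messages) > 100 and len(messages) > 2:
        messages.pop(1)

print(len(messages))
print(messages[0]["role"])
print(messages[-1]["content"])
```

With a 100-token budget, only the most recent questions survive; the system message is never dropped.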

The Unified LLM Client

This is the main class. It ties every component together. The LLMClient takes a provider name (or a list for fallback), initializes adapters, wires up token tracking, and exposes clean methods.

The chat() method runs through retry/fallback and records usage. The chat_json() method adds structured output parsing. The stream() method returns a generator. And estimate_cost() shows what a request will cost before you send it.

class LLMClient:
    """Unified client for multi-provider LLM access."""

    def __init__(self, providers: list[str] | str = "openai",
                 retry_config: RetryConfig = RetryConfig(),
                 **provider_kwargs):
        if isinstance(providers, str):
            providers = [providers]
        self.adapters = [get_provider(p, **provider_kwargs) for p in providers]
        self.fallback = FallbackChain(self.adapters, retry_config)
        self.tracker = TokenTracker()
        self.parser = StructuredOutputParser()
        self.default_provider = self.adapters[0]

    def chat(self, messages: list[dict], **kwargs) -> LLMResponse:
        """Send a chat request with retry and fallback."""
        response = self.fallback.chat(messages, **kwargs)
        self.tracker.record(response)
        return response

    def chat_json(self, messages: list[dict], **kwargs) -> tuple[dict, LLMResponse]:
        """Chat and parse response as JSON."""
        response = self.chat(messages, **kwargs)
        return self.parser.parse_json(response.text), response

    def chat_list(self, messages: list[dict], **kwargs) -> tuple[list, LLMResponse]:
        """Chat and parse response as a JSON list."""
        response = self.chat(messages, **kwargs)
        return self.parser.parse_list(response.text), response

    def stream(self, messages: list[dict], **kwargs) -> Generator[str, None, None]:
        """Stream tokens from the default provider."""
        yield from self.default_provider.chat_stream(messages, **kwargs)

    def estimate_cost(self, messages: list[dict],
                      model: str = "", max_output_tokens: int = 500) -> dict:
        """Estimate cost before sending a request."""
        input_est = self.tracker.estimate_messages(messages)
        provider = self.default_provider.provider_name
        model = model or self.default_provider.default_model
        key = f"{provider}/{model}"
        pricing = PRICING.get(key, {"input": 0, "output": 0})
        input_cost = (input_est / 1_000_000) * pricing["input"]
        output_cost = (max_output_tokens / 1_000_000) * pricing["output"]
        return {
            "estimated_input_tokens": input_est,
            "max_output_tokens": max_output_tokens,
            "estimated_cost": input_cost + output_cost,
            "model": key,
        }

    def spending_summary(self) -> str:
        return self.tracker.summary()

Let’s create a client with three providers and estimate a cost.

client = LLMClient(
    providers=["openai", "anthropic", "google"],
    retry_config=RetryConfig(max_retries=2, base_delay=0.5),
)
print(f"Client ready with {len(client.adapters)} providers")
print(f"Primary: {client.default_provider.provider_name}")

messages = [
    {"role": "system", "content": "You are a data science tutor."},
    {"role": "user", "content": "Explain overfitting in three sentences."},
]
estimate = client.estimate_cost(messages, max_output_tokens=100)
print(f"\nCost estimate: {estimate}")

Output:

Client ready with 3 providers
Primary: openai

Cost estimate: {'estimated_input_tokens': 24, 'max_output_tokens': 100, 'estimated_cost': 6.36e-05, 'model': 'openai/gpt-4o-mini'}

Estimated cost for gpt-4o-mini: $0.0000636. That’s 24 input tokens at $0.15/M plus 100 output tokens at $0.60/M. When you switch to GPT-4o or Claude Sonnet, estimate_cost() shows the difference before you commit.
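The same arithmetic makes comparisons easy. A small sketch using three entries from the PRICING table and the same 24-input, 100-output request:

```python
# (input $/M, output $/M) for three models from the PRICING table
pricing = {
    "openai/gpt-4o-mini": (0.15, 0.60),
    "openai/gpt-4o": (2.50, 10.00),
    "anthropic/claude-sonnet-4-20250514": (3.00, 15.00),
}
for model, (in_price, out_price) in pricing.items():
    cost = 24 / 1_000_000 * in_price + 100 / 1_000_000 * out_price
    print(f"{model}: ${cost:.6f}")
# gpt-4o comes out roughly 17x more expensive than gpt-4o-mini for this request
```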

Putting It All Together — Full Demo

Here’s the toolkit in action. We’ll use a mock provider so the demo runs without API keys. The mock returns predefined responses, but every other component — conversation management, JSON parsing, cost tracking — works exactly as it would with a real provider.

class MockProvider(BaseLLMProvider):
    """Mock provider for testing without API keys."""

    def __init__(self, responses: list[str] = None):
        super().__init__("mock-key", "mock-model")
        self.responses = responses or ["Mock response."]
        self._call_idx = 0

    def chat(self, messages: list[dict], model: str = "",
             temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
        text = self.responses[self._call_idx % len(self.responses)]
        self._call_idx += 1
        input_tokens = sum(len(m["content"]) // 4 for m in messages)
        output_tokens = len(text) // 4
        return LLMResponse(
            text=text, model="mock-model", provider="openai",
            input_tokens=input_tokens, output_tokens=output_tokens,
            latency_ms=random.uniform(50, 200),
        )

    def chat_stream(self, messages: list[dict], **kwargs) -> Generator[str, None, None]:
        text = self.responses[self._call_idx % len(self.responses)]
        self._call_idx += 1
        for word in text.split():
            yield word + " "

mock = MockProvider(responses=[
    "Overfitting happens when a model memorizes training data.",
    '{"sentiment": "positive", "confidence": 0.87, "topics": ["AI", "ML"]}',
    "A good model generalizes well to unseen data.",
])
client = LLMClient(providers=["openai"])
client.adapters = [mock]
client.fallback = FallbackChain([mock])
client.default_provider = mock

Now we run a three-turn conversation. Turn 1 is a plain chat. Turn 2 asks for structured JSON output. Turn 3 is another plain chat. The tracker accumulates everything.

convo = Conversation(system_prompt="You are a helpful ML tutor.")

# Turn 1 — plain chat
convo.add_user("What is overfitting?")
resp1 = client.chat(convo.get_messages())
convo.add_assistant(resp1.text)
print(f"Turn 1: {resp1.text}")
print(f"  {resp1.summary()}")

# Turn 2 — structured output
convo.add_user('Analyze: "I love neural networks". Return JSON.')
resp2 = client.chat(convo.get_messages())
convo.add_assistant(resp2.text)
parsed = client.parser.parse_json(resp2.text)
print(f"\nTurn 2 (JSON): {parsed}")
print(f"  Confidence: {parsed['confidence']}")

# Turn 3 — plain chat
convo.add_user("How do I prevent overfitting?")
resp3 = client.chat(convo.get_messages())
convo.add_assistant(resp3.text)
print(f"\nTurn 3: {resp3.text}")

Output:

Turn 1: Overfitting happens when a model memorizes training data.
  [openai/mock-model] 24 tokens, 132ms, $0.000000
Turn 2 (JSON): {'sentiment': 'positive', 'confidence': 0.87, 'topics': ['AI', 'ML']}
  Confidence: 0.87
Turn 3: A good model generalizes well to unseen data.
  [openai/mock-model] 59 tokens, 89ms, $0.000000

Three turns. The second turn parsed JSON into a Python dictionary automatically. Now let’s see streaming and the spending summary.

print("Streaming: ", end="")
for token in client.stream(
    [{"role": "user", "content": "Explain bias-variance tradeoff."}]
):
    print(token, end="", flush=True)
print()
print()
print(client.spending_summary())

Result:

Streaming: Overfitting happens when a model memorizes training data.

Total requests: 3
Total tokens: 112
Total cost: $0.0000
  openai: $0.0000

Streaming comes through word-by-word. In a real app, you’d pipe those tokens to a frontend for a typing effect. The spending summary shows all three requests — total tokens and cost broken down by provider.
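The consuming side looks the same whether the tokens come from a mock or a live API. A minimal sketch with a stand-in generator (fake_stream here is hypothetical, playing the role of client.stream()):

```python
import time

def fake_stream():
    """Stand-in for client.stream(): yields one token-ish chunk at a time."""
    for word in "Gradient descent minimizes loss one step at a time.".split():
        yield word + " "

# Flush each chunk as it arrives for a typing effect
for token in fake_stream():
    print(token, end="", flush=True)
    time.sleep(0.01)   # purely cosmetic pacing
print()
```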

KEY INSIGHT The unified client doesn’t just save you from learning four APIs. It gives you one place to track costs, one place to add retry logic, and one place to swap providers. Every feature benefits all providers at once.

Common Mistakes and How to Fix Them

Mistake 1: Hardcoding provider response paths

Wrong:

# Breaks when you switch providers
text = response["choices"][0]["message"]["content"]

This only works for OpenAI. Anthropic uses content[0].text. Gemini uses a completely different path. Hardcoding means rewriting every call site when you add a provider.

Correct:

resp = client.chat(messages)
text = resp.text  # Works for any provider

Mistake 2: Retrying authentication errors

Wrong:

# Wastes 15 seconds retrying a bad API key
retry_with_backoff(lambda: provider.chat(msgs), RetryConfig(max_retries=3))

A 401 means your key is wrong. Waiting won’t fix it. You burn your retry budget and delay the real error.

Correct:

# Our retry function already skips AuthenticationError
# It raises immediately on 401 — no wasted time

Mistake 3: Tracking costs per provider separately

Wrong:

openai_cost = calculate_openai_cost(response)
claude_cost = calculate_claude_cost(response)
# Fragmented data — can't see total spending

When fallback chains hit two providers for one logical request, separate tracking loses that connection.

Correct:

client.chat(messages)  # Tracked automatically
print(client.spending_summary())  # All providers in one place

When NOT to Build This Yourself

This toolkit teaches how multi-provider LLM clients work from the inside. But you don’t always need to build from scratch.

Use LiteLLM if you need 100+ models, detailed logging, and a proxy server. It handles edge cases we skipped — streaming error recovery, actual tokenizers, and rate limit headers.

Use official SDKs if you only need one provider. The openai Python package handles retries, streaming, and structured output natively. Adding our abstraction on top would be unnecessary complexity.

NOTE LiteLLM (pip install litellm) provides a litellm.completion() function that works like our LLMClient.chat() but supports 100+ models out of the box. It’s the go-to choice for production multi-provider setups.

Build your own if you need custom routing logic. Route PII-free requests to the cheapest provider. Route sensitive data to your private Ollama instance. A custom toolkit gives you that control.
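As a taste of that control, here's a toy router. The regexes are naive placeholders for a real PII detector, and the provider choices are assumptions based on the PRICING table:

```python
import re

def route(messages: list[dict]) -> str:
    """Toy router: local Ollama for anything that looks like PII,
    otherwise the cheapest hosted model."""
    text = " ".join(m["content"] for m in messages)
    # Naive email / SSN patterns, a stand-in for a real PII detector
    has_pii = re.search(r"[\w.]+@[\w.]+|\b\d{3}-\d{2}-\d{4}\b", text)
    return "ollama" if has_pii else "google"  # gemini-2.0-flash is the cheapest paid entry

print(route([{"role": "user", "content": "Summarize this quarterly report."}]))     # google
print(route([{"role": "user", "content": "Email jane@example.com the invoice."}]))  # ollama
```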

Complete Code

Click to expand the full script (copy-paste and run)
# Complete code from: Build a Multi-Provider LLM Toolkit in Python
# Requires: Python 3.10+ (stdlib only — no pip installs)

import os
import json
import time
import random
import hashlib
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generator, Optional
from datetime import datetime
import urllib.request
import urllib.error

# --- Configuration ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")

PRICING = {
    "openai/gpt-4o": {"input": 2.50, "output": 10.00},
    "openai/gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "anthropic/claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
    "anthropic/claude-haiku-3.5": {"input": 0.80, "output": 4.00},
    "google/gemini-2.0-flash": {"input": 0.10, "output": 0.40},
    "google/gemini-2.5-pro": {"input": 1.25, "output": 10.00},
    "ollama/llama3": {"input": 0.00, "output": 0.00},
}

# --- Unified Response ---
@dataclass
class LLMResponse:
    text: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    raw: dict = field(default_factory=dict)
    finish_reason: str = "stop"

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

    @property
    def cost(self) -> float:
        key = f"{self.provider}/{self.model}"
        if key not in PRICING:
            return 0.0
        p = PRICING[key]
        return ((self.input_tokens / 1_000_000) * p["input"]
                + (self.output_tokens / 1_000_000) * p["output"])

    def summary(self) -> str:
        return (f"[{self.provider}/{self.model}] "
                f"{self.total_tokens} tokens, "
                f"{self.latency_ms:.0f}ms, ${self.cost:.6f}")

# --- Errors ---
class LLMError(Exception):
    def __init__(self, message, provider="", status_code=0):
        self.provider = provider
        self.status_code = status_code
        super().__init__(message)

class RateLimitError(LLMError): pass
class AuthenticationError(LLMError): pass

# --- Base Provider ---
class BaseLLMProvider(ABC):
    def __init__(self, api_key, default_model):
        self.api_key = api_key
        self.default_model = default_model
        self.provider_name = (self.__class__.__name__
                              .lower().replace("provider", ""))

    @abstractmethod
    def chat(self, messages, model="",
             temperature=0.7, max_tokens=1024) -> LLMResponse: ...

    @abstractmethod
    def chat_stream(self, messages, model="",
                    temperature=0.7, max_tokens=1024) -> Generator: ...

    def _get_model(self, model):
        return model or self.default_model

# --- OpenAI ---
class OpenAIProvider(BaseLLMProvider):
    BASE_URL = "https://api.openai.com/v1/chat/completions"
    def __init__(self, api_key="", default_model="gpt-4o-mini"):
        super().__init__(api_key or OPENAI_API_KEY, default_model)
    def chat(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        payload = {"model": model, "messages": messages,
                   "temperature": temperature, "max_tokens": max_tokens}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"Authorization": f"Bearer {self.api_key}",
                     "Content-Type": "application/json"},
            method="POST")
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code == 429: raise RateLimitError("Rate limited", "openai", 429)
            if e.code == 401: raise AuthenticationError("Invalid key", "openai", 401)
            raise LLMError(f"OpenAI {e.code}", "openai", e.code)
        latency = (time.time() - start) * 1000
        c = body["choices"][0]; u = body.get("usage", {})
        return LLMResponse(
            text=c["message"]["content"], model=model, provider="openai",
            input_tokens=u.get("prompt_tokens", 0),
            output_tokens=u.get("completion_tokens", 0),
            latency_ms=latency, raw=body,
            finish_reason=c.get("finish_reason", "stop"))
    def chat_stream(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        payload = {"model": model, "messages": messages,
                   "temperature": temperature, "max_tokens": max_tokens,
                   "stream": True}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"Authorization": f"Bearer {self.api_key}",
                     "Content-Type": "application/json"},
            method="POST")
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode().strip()
                if not line.startswith("data: "): continue
                chunk = line[6:]
                if chunk == "[DONE]": break
                d = json.loads(chunk)["choices"][0].get("delta", {})
                if "content" in d: yield d["content"]

# --- Anthropic ---
class AnthropicProvider(BaseLLMProvider):
    BASE_URL = "https://api.anthropic.com/v1/messages"
    def __init__(self, api_key="", default_model="claude-haiku-3.5"):
        super().__init__(api_key or ANTHROPIC_API_KEY, default_model)
    def _split_system(self, messages):
        system, filtered = "", []
        for m in messages:
            if m["role"] == "system": system = m["content"]
            else: filtered.append(m)
        return system, filtered
    def chat(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        system, msgs = self._split_system(messages)
        payload = {"model": model, "messages": msgs,
                   "temperature": temperature, "max_tokens": max_tokens}
        if system: payload["system"] = system
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"x-api-key": self.api_key,
                     "anthropic-version": "2023-06-01",
                     "Content-Type": "application/json"},
            method="POST")
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code == 429: raise RateLimitError("Rate limited", "anthropic", 429)
            if e.code == 401: raise AuthenticationError("Invalid key", "anthropic", 401)
            raise LLMError(f"Anthropic {e.code}", "anthropic", e.code)
        latency = (time.time() - start) * 1000
        u = body.get("usage", {})
        return LLMResponse(
            text=body["content"][0]["text"], model=model, provider="anthropic",
            input_tokens=u.get("input_tokens", 0),
            output_tokens=u.get("output_tokens", 0),
            latency_ms=latency, raw=body,
            finish_reason=body.get("stop_reason", "stop"))
    def chat_stream(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        system, msgs = self._split_system(messages)
        payload = {"model": model, "messages": msgs,
                   "temperature": temperature, "max_tokens": max_tokens,
                   "stream": True}
        if system: payload["system"] = system
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            self.BASE_URL, data=data,
            headers={"x-api-key": self.api_key,
                     "anthropic-version": "2023-06-01",
                     "Content-Type": "application/json"},
            method="POST")
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode().strip()
                if not line.startswith("data: "): continue
                p = json.loads(line[6:])
                if p.get("type") == "content_block_delta":
                    yield p["delta"].get("text", "")

# --- Google ---
class GoogleProvider(BaseLLMProvider):
    BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models"
    def __init__(self, api_key="", default_model="gemini-2.0-flash"):
        super().__init__(api_key or GOOGLE_API_KEY, default_model)
    def _convert(self, messages):
        system, contents = "", []
        for m in messages:
            if m["role"] == "system": system = m["content"]
            else:
                role = "user" if m["role"] == "user" else "model"
                contents.append({"role": role, "parts": [{"text": m["content"]}]})
        return system, contents
    def chat(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        system, contents = self._convert(messages)
        url = f"{self.BASE_URL}/{model}:generateContent?key={self.api_key}"
        payload = {"contents": contents,
                   "generationConfig": {"temperature": temperature,
                                        "maxOutputTokens": max_tokens}}
        if system:
            payload["systemInstruction"] = {"parts": [{"text": system}]}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"}, method="POST")
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code == 429: raise RateLimitError("Rate limited", "google", 429)
            if e.code in (401, 403): raise AuthenticationError("Invalid key", "google", e.code)
            raise LLMError(f"Google {e.code}", "google", e.code)
        latency = (time.time() - start) * 1000
        c = body["candidates"][0]; u = body.get("usageMetadata", {})
        return LLMResponse(
            text=c["content"]["parts"][0]["text"], model=model, provider="google",
            input_tokens=u.get("promptTokenCount", 0),
            output_tokens=u.get("candidatesTokenCount", 0),
            latency_ms=latency, raw=body,
            finish_reason=c.get("finishReason", "STOP").lower())
    def chat_stream(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        system, contents = self._convert(messages)
        url = (f"{self.BASE_URL}/{model}:streamGenerateContent"
               f"?key={self.api_key}&alt=sse")
        payload = {"contents": contents,
                   "generationConfig": {"temperature": temperature,
                                        "maxOutputTokens": max_tokens}}
        if system:
            payload["systemInstruction"] = {"parts": [{"text": system}]}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"}, method="POST")
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode().strip()
                if not line.startswith("data: "): continue
                p = json.loads(line[6:])
                parts = (p.get("candidates", [{}])[0]
                         .get("content", {}).get("parts", []))
                for part in parts:
                    if "text" in part: yield part["text"]

# --- Ollama ---
class OllamaProvider(BaseLLMProvider):
    def __init__(self, base_url="", default_model="llama3"):
        self.base_url = base_url or OLLAMA_BASE_URL
        super().__init__("", default_model)
    def chat(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        url = f"{self.base_url}/api/chat"
        payload = {"model": model, "messages": messages,
                   "options": {"temperature": temperature,
                               "num_predict": max_tokens},
                   "stream": False}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"}, method="POST")
        start = time.time()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                body = json.loads(resp.read())
        except urllib.error.URLError:
            raise LLMError("Ollama not running", "ollama", 0)
        latency = (time.time() - start) * 1000
        return LLMResponse(
            text=body["message"]["content"], model=model, provider="ollama",
            input_tokens=body.get("prompt_eval_count", 0),
            output_tokens=body.get("eval_count", 0),
            latency_ms=latency, raw=body)
    def chat_stream(self, messages, model="", temperature=0.7, max_tokens=1024):
        model = self._get_model(model)
        url = f"{self.base_url}/api/chat"
        payload = {"model": model, "messages": messages,
                   "options": {"temperature": temperature,
                               "num_predict": max_tokens},
                   "stream": True}
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            url, data=data,
            headers={"Content-Type": "application/json"}, method="POST")
        with urllib.request.urlopen(req, timeout=120) as resp:
            for line in resp:
                line = line.decode().strip()
                if not line: continue  # skip blank lines defensively
                chunk = json.loads(line)
                if not chunk.get("done", False):
                    yield chunk["message"]["content"]

# --- Registry ---
PROVIDER_REGISTRY = {
    "openai": OpenAIProvider,
    "anthropic": AnthropicProvider,
    "google": GoogleProvider,
    "ollama": OllamaProvider,
}

def get_provider(name, **kwargs):
    if name not in PROVIDER_REGISTRY:
        raise ValueError(f"Unknown provider: {name}")
    return PROVIDER_REGISTRY[name](**kwargs)
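The registry is what makes new providers a one-line change: map a name to a class, and `get_provider` instantiates it on demand. Here is the same pattern in isolation, using a made-up `EchoProvider` as a stand-in (it is not part of the toolkit, just an illustration):

```python
# Minimal registry pattern: names map to classes, instantiated on demand.
class EchoProvider:
    """Stand-in provider that parrots the last user message back."""
    def __init__(self, default_model="echo-1"):
        self.default_model = default_model
    def chat(self, messages, **kwargs):
        return messages[-1]["content"]

REGISTRY = {"echo": EchoProvider}

def get_provider(name, **kwargs):
    if name not in REGISTRY:
        raise ValueError(f"Unknown provider: {name}")
    return REGISTRY[name](**kwargs)

p = get_provider("echo")
print(p.chat([{"role": "user", "content": "hello"}]))  # → hello
```

Registering a real provider works the same way: one class, one dictionary entry.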

# --- Token Tracker ---
class TokenTracker:
    def __init__(self):
        self.history, self.total_cost = [], 0.0
        self.total_input_tokens = self.total_output_tokens = 0
    @staticmethod
    def estimate_tokens(text):
        return max(1, len(text) // 4)
    def estimate_messages(self, messages):
        return sum(4 + self.estimate_tokens(m.get("content", ""))
                   for m in messages)
    def record(self, response):
        self.total_input_tokens += response.input_tokens
        self.total_output_tokens += response.output_tokens
        self.total_cost += response.cost
        self.history.append({
            "provider": response.provider, "model": response.model,
            "input_tokens": response.input_tokens,
            "output_tokens": response.output_tokens,
            "cost": response.cost, "latency_ms": response.latency_ms,
            "timestamp": datetime.now().isoformat()})
    def summary(self):
        lines = [
            f"Total requests: {len(self.history)}",
            f"Total tokens: {self.total_input_tokens + self.total_output_tokens:,}",
            f"Total cost: ${self.total_cost:.4f}"]
        by_provider = {}
        for h in self.history:
            by_provider[h["provider"]] = (
                by_provider.get(h["provider"], 0) + h["cost"])
        for prov, cost in sorted(by_provider.items()):
            lines.append(f"  {prov}: ${cost:.4f}")
        return "\n".join(lines)

# --- Retry & Fallback ---
@dataclass
class RetryConfig:
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    retryable_errors: tuple = (RateLimitError, LLMError)
    jitter: bool = True

def retry_with_backoff(fn, config=RetryConfig()):
    last_error = None
    for attempt in range(config.max_retries + 1):
        try:
            return fn()
        except config.retryable_errors as e:
            last_error = e
            if isinstance(e, AuthenticationError): raise
            if attempt < config.max_retries:
                delay = min(config.base_delay * (2 ** attempt),
                            config.max_delay)
                if config.jitter: delay = random.uniform(0, delay)
                time.sleep(delay)
    raise last_error
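The delay schedule is easier to reason about with jitter disabled. This standalone sketch reproduces the same min(base × 2^attempt, cap) formula used by retry_with_backoff:

```python
def backoff_delays(max_retries=3, base_delay=1.0, max_delay=30.0):
    # Deterministic delays (jitter disabled) before each retry attempt
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]

print(backoff_delays())             # → [1.0, 2.0, 4.0]
print(backoff_delays(6, 1.0, 30.0)) # doubles until capped at max_delay
```

With jitter enabled, each delay becomes `random.uniform(0, delay)` instead — "full jitter", which spreads out retries from many clients hitting the same rate limit at once.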

class FallbackChain:
    def __init__(self, providers, retry_config=RetryConfig()):
        self.providers = providers
        self.retry_config = retry_config
    def chat(self, messages, **kwargs):
        errors = []
        for provider in self.providers:
            try:
                return retry_with_backoff(
                    lambda p=provider: p.chat(messages, **kwargs),
                    self.retry_config)
            except LLMError as e:
                errors.append(f"{provider.provider_name}: {e}")
                continue
        raise LLMError(f"All providers failed: {errors}")
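To see the chain's behavior without real network calls, here is a stripped-down sketch with two made-up stand-in providers and the per-provider retry omitted. The routing logic mirrors FallbackChain.chat:

```python
class FlakyProvider:
    """Stand-in provider that always fails."""
    provider_name = "flaky"
    def chat(self, messages, **kwargs):
        raise RuntimeError("boom")

class StableProvider:
    """Stand-in provider that always succeeds."""
    provider_name = "stable"
    def chat(self, messages, **kwargs):
        return "ok"

def fallback_chat(providers, messages):
    # Try each provider in priority order; collect errors as we go
    errors = []
    for p in providers:
        try:
            return p.chat(messages)
        except Exception as e:
            errors.append(f"{p.provider_name}: {e}")
    raise RuntimeError(f"All providers failed: {errors}")

print(fallback_chat([FlakyProvider(), StableProvider()], []))  # → ok
```

The first provider fails, the chain silently moves on, and the caller only sees an exception if every provider in the list is down.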

# --- Structured Output ---
class StructuredOutputParser:
    @staticmethod
    def parse_json(text):
        text = text.strip()
        try: return json.loads(text)
        except json.JSONDecodeError: pass
        m = re.search(r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL)
        if m:
            try: return json.loads(m.group(1))
            except json.JSONDecodeError: pass
        s, e = text.find('{'), text.rfind('}')
        if s != -1 and e > s:
            try: return json.loads(text[s:e+1])
            except json.JSONDecodeError: pass
        raise ValueError(f"Could not parse JSON: {text[:100]}...")
    @staticmethod
    def parse_list(text):
        text = text.strip()
        try: return json.loads(text)
        except json.JSONDecodeError: pass
        m = re.search(r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL)
        if m:
            try: return json.loads(m.group(1))
            except json.JSONDecodeError: pass
        s, e = text.find('['), text.rfind(']')
        if s != -1 and e > s:
            try: return json.loads(text[s:e+1])
            except json.JSONDecodeError: pass
        raise ValueError(f"Could not parse list: {text[:100]}...")
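LLMs rarely return bare JSON — they wrap it in prose or markdown fences. Here is a condensed standalone version of parse_json's three fallback strategies, run against a typical chatty reply:

```python
import json, re

def parse_json_loose(text):
    text = text.strip()
    # Strategy 1: the whole reply is already valid JSON
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Strategy 2: JSON inside a ```json fenced block
    m = re.search(r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL)
    if m:
        try:
            return json.loads(m.group(1))
        except json.JSONDecodeError:
            pass
    # Strategy 3: the widest {...} slice in the text
    s, e = text.find('{'), text.rfind('}')
    if s != -1 and e > s:
        try:
            return json.loads(text[s:e+1])
        except json.JSONDecodeError:
            pass
    raise ValueError(f"Could not parse JSON: {text[:100]}...")

reply = ('Sure! Here is the data:\n'
         '```json\n{"name": "Ada", "score": 9}\n```\nHope that helps.')
print(parse_json_loose(reply))  # → {'name': 'Ada', 'score': 9}
```

Strategy 1 fails on the surrounding prose, strategy 2 finds the fenced block, and strategy 3 catches replies where the model drops the fences entirely.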

# --- Conversation ---
class Conversation:
    def __init__(self, system_prompt="", max_tokens=8000):
        self.messages, self.max_tokens = [], max_tokens
        self.token_tracker = TokenTracker()
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})
    def add_user(self, content):
        self.messages.append({"role": "user", "content": content})
        self._truncate()
    def add_assistant(self, content):
        self.messages.append({"role": "assistant", "content": content})
    def _truncate(self):
        while (self.token_tracker.estimate_messages(self.messages)
               > self.max_tokens):
            if len(self.messages) <= 2: break
            self.messages.pop(1)
    def get_messages(self): return list(self.messages)
    def clear(self):
        self.messages = [m for m in self.messages if m["role"] == "system"]
    def __len__(self): return len(self.messages)
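The truncation behavior is worth seeing concretely. This standalone sketch uses the same estimate-and-pop loop as Conversation._truncate: drop the oldest non-system turn (index 1) until the history fits the budget, always preserving the system prompt:

```python
def estimate(messages):
    # Same heuristic as TokenTracker: ~4 chars/token plus 4/message overhead
    return sum(4 + max(1, len(m["content"]) // 4) for m in messages)

def truncate(messages, max_tokens):
    messages = list(messages)
    while estimate(messages) > max_tokens:
        if len(messages) <= 2:
            break
        messages.pop(1)  # drop oldest turn after the system prompt
    return messages

history = [{"role": "system", "content": "Be brief."}]
for i in range(10):
    history.append({"role": "user", "content": f"question {i} " * 20})
    history.append({"role": "assistant", "content": f"answer {i} " * 20})

trimmed = truncate(history, max_tokens=200)
print(len(history), "->", len(trimmed))  # → 21 -> 4
```

Twenty-one messages shrink to four, and the system prompt survives at index 0 — the model keeps its instructions while old turns age out.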

# --- Unified Client ---
class LLMClient:
    def __init__(self, providers="openai", retry_config=RetryConfig(), **kw):
        if isinstance(providers, str): providers = [providers]
        self.adapters = [get_provider(p, **kw) for p in providers]
        self.fallback = FallbackChain(self.adapters, retry_config)
        self.tracker = TokenTracker()
        self.parser = StructuredOutputParser()
        self.default_provider = self.adapters[0]
    def chat(self, messages, **kwargs):
        response = self.fallback.chat(messages, **kwargs)
        self.tracker.record(response)
        return response
    def chat_json(self, messages, **kwargs):
        response = self.chat(messages, **kwargs)
        return self.parser.parse_json(response.text), response
    def chat_list(self, messages, **kwargs):
        response = self.chat(messages, **kwargs)
        return self.parser.parse_list(response.text), response
    def stream(self, messages, **kwargs):
        yield from self.default_provider.chat_stream(messages, **kwargs)
    def estimate_cost(self, messages, model="", max_output_tokens=500):
        est = self.tracker.estimate_messages(messages)
        prov = self.default_provider.provider_name
        model = model or self.default_provider.default_model
        key = f"{prov}/{model}"
        pr = PRICING.get(key, {"input": 0, "output": 0})
        return {"estimated_input_tokens": est,
                "max_output_tokens": max_output_tokens,
                "estimated_cost": (est/1e6)*pr["input"] + (max_output_tokens/1e6)*pr["output"],
                "model": key}
    def spending_summary(self): return self.tracker.summary()
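The cost arithmetic in estimate_cost is simple per-million-token math. Here it is isolated, with illustrative pricing numbers (not the toolkit's actual PRICING table, and not current provider rates):

```python
# Illustrative per-million-token prices -- NOT real, current rates
PRICING = {"openai/gpt-4o-mini": {"input": 0.15, "output": 0.60}}

def estimate_cost(input_tokens, max_output_tokens, key):
    # Unknown models price to zero rather than raising
    pr = PRICING.get(key, {"input": 0, "output": 0})
    return ((input_tokens / 1e6) * pr["input"]
            + (max_output_tokens / 1e6) * pr["output"])

cost = estimate_cost(2000, 500, "openai/gpt-4o-mini")
print(f"${cost:.6f}")  # → $0.000600
```

2,000 input tokens at $0.15/M plus 500 output tokens at $0.60/M is $0.0003 + $0.0003 = $0.0006 — useful for pre-flight budget checks before you commit to a request.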

print("Script completed successfully.")

Summary

You’ve built a complete multi-provider LLM toolkit from scratch. Here’s what each piece does:

| Component | Purpose | Why it matters |
| --- | --- | --- |
| LLMResponse | Unified response from any provider | One format everywhere |
| BaseLLMProvider | Abstract interface for adapters | Adding a provider is one class |
| 4 adapters | Translate calls to provider HTTP | OpenAI, Anthropic, Google, Ollama |
| TokenTracker | Estimate and track token usage | Control your spending |
| RetryConfig + backoff | Exponential retry with jitter | Handle transient failures |
| FallbackChain | Try providers in priority order | Uptime when providers go down |
| StructuredOutputParser | Extract JSON from messy text | Type-safe data from LLM output |
| Conversation | Manage history with truncation | Multi-turn chat that scales |
| LLMClient | Single API for everything | One .chat() call for all |

Practice exercise: Extend the toolkit with a CachingLayer class. When the same messages are sent twice, return the cached response. Use a dictionary with a hash of the messages as the key. Add a ttl_seconds parameter for expiration.

Solution:
import hashlib                # not imported earlier in the article
from typing import Optional  # needed for the get() return annotation

class CachingLayer:
    """Cache LLM responses to avoid duplicate API calls."""

    def __init__(self, ttl_seconds: int = 3600):
        self.cache: dict[str, tuple[LLMResponse, float]] = {}
        self.ttl = ttl_seconds

    def _cache_key(self, messages: list[dict]) -> str:
        content = json.dumps(messages, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, messages: list[dict]) -> Optional[LLMResponse]:
        key = self._cache_key(messages)
        if key in self.cache:
            response, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return response
            del self.cache[key]
        return None

    def set(self, messages: list[dict], response: LLMResponse) -> None:
        key = self._cache_key(messages)
        self.cache[key] = (response, time.time())

The `_cache_key` hashes messages with SHA-256. Identical requests produce the same key. The `get` method checks both existence and TTL. Entries older than `ttl_seconds` are deleted and treated as a miss.

Frequently Asked Questions

Can I use this toolkit with async/await?

The current version uses synchronous urllib.request. For async, replace the HTTP calls with aiohttp or httpx.AsyncClient. The structure stays identical — change chat() to async def chat() and urlopen to await client.post(). The abstract base class works the same way with async methods.

How accurate is the 4-characters-per-token estimate?

For English text, it’s typically within 15-20% of actual counts — OpenAI’s GPT models average about 4 characters per token. Code, non-English text, and special characters can skew the ratio significantly. For exact counts, use tiktoken (OpenAI) or Anthropic’s token-counting API. The estimate is good enough for budgeting, not for context-window checks.

How do I add a new provider like Mistral or Cohere?

Write a class inheriting from BaseLLMProvider. Implement chat() and chat_stream(). Add it to PROVIDER_REGISTRY and its models to PRICING. Three changes. Every feature — retry, fallback, cost tracking — works automatically because they operate on the shared interface.

Does the fallback chain add latency?

Only on failure. If the first provider succeeds, zero overhead. If it fails after retries, those delays accumulate before the chain tries the next provider. Use short timeouts (5-10 seconds) on individual requests so fallback kicks in fast.

Why urllib instead of the requests library?

Zero dependencies. Every import comes from Python’s standard library. You can copy this into any project without installing anything. For production, switch to httpx for connection pooling and HTTP/2 support.

