Build a Multi-Provider LLM Toolkit (Python Project)
One client class, four providers, zero dependencies: built-in cost tracking, streaming, retries, and structured output, all from scratch.
You’re calling OpenAI today. Tomorrow your team wants Claude for long documents. Next week someone asks about Gemini because it’s cheaper. Suddenly you’ve got three different API integrations, three response formats, and three places where things break differently.
What if one Python class handled all of them? Same .chat() call, same response object, same error handling — regardless of which provider runs the request.
That’s what we’re building.
Before we write any code, here’s how the pieces connect. We start with a base interface — an abstract class that defines what every provider must do. Then we build four provider adapters: OpenAI, Anthropic, Google, and Ollama.
Each adapter translates the universal .chat() call into that provider’s HTTP format. It returns a unified response object. Your app code never cares which provider answered.
On top of the adapters, we layer a token counter. It estimates usage before you send a request. It also tracks actual usage from the response. Cost tracking plugs into that counter — it multiplies counts by each provider’s pricing and keeps a running total.
The retry/fallback system wraps individual calls with exponential backoff. If a provider stays down, it routes to the next one in your priority list.
Streaming lets you get tokens back word-by-word through a generator. Structured output parsing extracts typed Python objects from raw LLM text. And conversation management stores message history so multi-turn chats just work.
Each layer builds on the one before it. By the end, you’ll have a toolkit you can drop into any Python project.
LLM Toolkit Setup and Configuration
Every provider needs an API key. We’ll store them in environment variables and load them through a central config.
The toolkit also needs a pricing table. Each provider charges different rates per token.
Here’s the import block and configuration. We use os.getenv for API keys, dataclasses for clean structures, and typing for type hints. The PRICING dictionary maps each provider-model pair to its cost per million tokens.
import os
import json
import time
import random
import hashlib
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generator, Optional
from datetime import datetime
import urllib.request
import urllib.error
# API keys from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
OLLAMA_BASE_URL = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
# Pricing per million tokens (USD)
PRICING = {
"openai/gpt-4o": {"input": 2.50, "output": 10.00},
"openai/gpt-4o-mini": {"input": 0.15, "output": 0.60},
"anthropic/claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"anthropic/claude-haiku-3.5": {"input": 0.80, "output": 4.00},
"google/gemini-2.0-flash": {"input": 0.10, "output": 0.40},
"google/gemini-2.5-pro": {"input": 1.25, "output": 10.00},
"ollama/llama3": {"input": 0.00, "output": 0.00},
}
print("Config loaded.")
print(f"Providers configured: {len(PRICING)} models")
print(f"OpenAI key present: {bool(OPENAI_API_KEY)}")
Running this prints:
Config loaded.
Providers configured: 7 models
OpenAI key present: False
TIP Keep API keys out of your source code. Use .env files with python-dotenv or export them in your shell. The toolkit works fine with empty keys — it just fails gracefully when you try to call that provider.
The Unified Response Object
Every provider returns data in a different shape. OpenAI nests text inside choices[0].message.content. Anthropic puts it in content[0].text. Gemini uses candidates[0].content.parts[0].text.
Your application shouldn’t care about any of that.
We define one LLMResponse dataclass that every adapter returns. It holds the generated text, token counts, model name, latency, and raw provider data for debugging.
The cost property calculates the dollar cost from our pricing table. The summary() method gives a one-line overview.
@dataclass
class LLMResponse:
"""Unified response from any LLM provider."""
text: str
model: str
provider: str
input_tokens: int
output_tokens: int
latency_ms: float
raw: dict = field(default_factory=dict)
finish_reason: str = "stop"
@property
def total_tokens(self) -> int:
return self.input_tokens + self.output_tokens
@property
def cost(self) -> float:
key = f"{self.provider}/{self.model}"
if key not in PRICING:
return 0.0
p = PRICING[key]
input_cost = (self.input_tokens / 1_000_000) * p["input"]
output_cost = (self.output_tokens / 1_000_000) * p["output"]
return input_cost + output_cost
def summary(self) -> str:
return (
f"[{self.provider}/{self.model}] "
f"{self.total_tokens} tokens, "
f"{self.latency_ms:.0f}ms, "
f"${self.cost:.6f}"
)
Let’s test it. We create a response with 10 input tokens and 5 output tokens on gpt-4o. The cost should be: 10 tokens at $2.50/M = $0.000025, plus 5 tokens at $10.00/M = $0.000050. Total: $0.000075.
resp = LLMResponse(
text="Hello!", model="gpt-4o", provider="openai",
input_tokens=10, output_tokens=5, latency_ms=230.0
)
print(resp.summary())
print(f"Cost breakdown: ${resp.cost:.6f}")
Output:
[openai/gpt-4o] 15 tokens, 230ms, $0.000075
Cost breakdown: $0.000075
The math checks out. Every response carries its own cost. No external tracking needed.
Quick check: What would the cost be with gpt-4o-mini instead? Same token counts. Check the pricing table and calculate it before moving on.
(Answer: 10 * $0.15/M + 5 * $0.60/M = $0.0000045 — about 17x cheaper.)
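The same arithmetic, spelled out as a standalone snippet (rates copied from the PRICING table above):

```python
# Quick-check arithmetic: 10 input + 5 output tokens on gpt-4o-mini.
MINI_RATES = {"input": 0.15, "output": 0.60}  # USD per million tokens
GPT4O_COST = 0.000075                         # gpt-4o cost from the example above

input_cost = (10 / 1_000_000) * MINI_RATES["input"]   # $0.0000015
output_cost = (5 / 1_000_000) * MINI_RATES["output"]  # $0.0000030
total = input_cost + output_cost

print(f"gpt-4o-mini: ${total:.7f}")            # $0.0000045
print(f"ratio: {GPT4O_COST / total:.1f}x cheaper")  # 16.7x cheaper
```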
{
type: 'exercise',
id: 'response-cost-ex',
title: 'Exercise 1: Create and Compare LLM Responses',
difficulty: 'beginner',
exerciseType: 'write',
instructions: 'Create two LLMResponse objects — one for anthropic/claude-sonnet-4-20250514 with 100 input tokens and 50 output tokens, and one for google/gemini-2.0-flash with the same counts. Print each summary and then print which one is cheaper.',
starterCode: '# Create a Claude Sonnet response\nclaude_resp = LLMResponse(\n    text="Response from Claude",\n    model="claude-sonnet-4-20250514",\n    provider="anthropic",\n    input_tokens=100,\n    output_tokens=50,\n    latency_ms=450.0,\n)\n\n# Create a Gemini Flash response\ngemini_resp = # YOUR CODE HERE\n\nprint(claude_resp.summary())\nprint(gemini_resp.summary())\n\n# Print which is cheaper\ncheaper = "gemini" if gemini_resp.cost < claude_resp.cost else "claude"\nprint(f"Cheaper: {cheaper}")',
testCases: [
{ id: 'tc1', input: 'print(claude_resp.summary())', expectedOutput: '[anthropic/claude-sonnet-4-20250514] 150 tokens, 450ms, $0.001050', description: 'Claude response summary should match' },
{ id: 'tc2', input: 'print("Cheaper:", "gemini" if gemini_resp.cost < claude_resp.cost else "claude")', expectedOutput: 'Cheaper: gemini', description: 'Gemini should be cheaper' },
],
hints: [
'Use the same LLMResponse structure but change model to "gemini-2.0-flash" and provider to "google"',
'gemini_resp = LLMResponse(text="Response from Gemini", model="gemini-2.0-flash", provider="google", input_tokens=100, output_tokens=50, latency_ms=200.0)',
],
solution: 'claude_resp = LLMResponse(text="Response from Claude", model="claude-sonnet-4-20250514", provider="anthropic", input_tokens=100, output_tokens=50, latency_ms=450.0)\ngemini_resp = LLMResponse(text="Response from Gemini", model="gemini-2.0-flash", provider="google", input_tokens=100, output_tokens=50, latency_ms=200.0)\nprint(claude_resp.summary())\nprint(gemini_resp.summary())\ncheaper = "gemini" if gemini_resp.cost < claude_resp.cost else "claude"\nprint(f"Cheaper: {cheaper}")',
solutionExplanation: 'Claude Sonnet costs $3.00/M input + $15.00/M output. For 100 + 50 tokens, that is $0.000300 + $0.000750 = $0.001050. Gemini Flash costs $0.10/M + $0.40/M, giving $0.000010 + $0.000020 = $0.000030. Gemini is about 35x cheaper for the same request.',
xpReward: 15,
}
The Base Provider Interface
This is where the “unified” part comes from. We define an abstract base class called BaseLLMProvider. Every adapter inherits from it.
The base class enforces two methods: chat() for normal requests and chat_stream() for streaming. We also define custom error classes — LLMError for general failures, RateLimitError for 429s, and AuthenticationError for bad API keys. These let the retry system distinguish between retryable and fatal errors.
class LLMError(Exception):
"""Base error for LLM operations."""
def __init__(self, message: str, provider: str = "", status_code: int = 0):
self.provider = provider
self.status_code = status_code
super().__init__(message)
class RateLimitError(LLMError):
"""Raised when provider returns 429."""
pass
class AuthenticationError(LLMError):
"""Raised when API key is invalid."""
pass
The base class itself is short. It stores the API key, default model, and provider name. The abstract methods force each adapter to implement its own chat() and chat_stream().
class BaseLLMProvider(ABC):
"""Abstract base for all LLM provider adapters."""
def __init__(self, api_key: str, default_model: str):
self.api_key = api_key
self.default_model = default_model
self.provider_name = self.__class__.__name__.lower().replace("provider", "")
@abstractmethod
def chat(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
"""Send a chat request and return a unified response."""
...
@abstractmethod
def chat_stream(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
"""Stream response tokens as a generator."""
...
def _get_model(self, model: str) -> str:
return model or self.default_model
print("Base classes defined.")
print(f"LLMError subclasses: {[c.__name__ for c in LLMError.__subclasses__()]}")
This gives us:
Base classes defined.
LLMError subclasses: ['RateLimitError', 'AuthenticationError']
KEY INSIGHT The base class is the contract. When a new provider appears next month, you write one adapter class. Every feature — retry, fallback, cost tracking, streaming — works automatically because they all operate on this shared interface.
Building Multi-Provider LLM Adapters
Each adapter translates the universal messages format into the provider’s specific HTTP request. Before diving into code, here’s how the four providers differ:
| Provider | Endpoint | Auth Header | System Message | Response Text Path |
|---|---|---|---|---|
| OpenAI | /v1/chat/completions | Authorization: Bearer | In messages array | choices[0].message.content |
| Anthropic | /v1/messages | x-api-key | Separate system field | content[0].text |
| Google | /:model:generateContent | Query param ?key= | systemInstruction field | candidates[0].content.parts[0].text |
| Ollama | /api/chat | None (local) | In messages array | message.content |
Four APIs, four auth methods, four response shapes. The adapters hide all of this behind one chat() call.
OpenAI Adapter
The OpenAIProvider sends a POST request to https://api.openai.com/v1/chat/completions. It packs messages, model, temperature, and max_tokens into a JSON body. The response includes token counts in usage and generated text in choices[0].message.content.
First, the payload construction and HTTP call. Notice how error handling maps HTTP status codes to our custom error types — 429 becomes RateLimitError, 401 becomes AuthenticationError.
class OpenAIProvider(BaseLLMProvider):
"""Adapter for OpenAI's chat completions API."""
BASE_URL = "https://api.openai.com/v1/chat/completions"
def __init__(self, api_key: str = "", default_model: str = "gpt-4o-mini"):
super().__init__(api_key or OPENAI_API_KEY, default_model)
def chat(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
model = self._get_model(model)
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
self.BASE_URL, data=data,
headers={"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"},
method="POST",
)
start = time.time()
try:
with urllib.request.urlopen(req, timeout=60) as resp:
body = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
if e.code == 429:
raise RateLimitError("Rate limited by OpenAI", "openai", 429)
if e.code == 401:
raise AuthenticationError("Invalid OpenAI key", "openai", 401)
raise LLMError(f"OpenAI returned {e.code}", "openai", e.code)
latency = (time.time() - start) * 1000
choice = body["choices"][0]
usage = body.get("usage", {})
return LLMResponse(
text=choice["message"]["content"],
model=model, provider="openai",
input_tokens=usage.get("prompt_tokens", 0),
output_tokens=usage.get("completion_tokens", 0),
latency_ms=latency, raw=body,
finish_reason=choice.get("finish_reason", "stop"),
)
The streaming method adds "stream": True to the payload and reads Server-Sent Events (SSE) line by line. Each line starting with data: carries a JSON chunk with a delta containing the next token.
def chat_stream(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
model = self._get_model(model)
payload = {
"model": model, "messages": messages,
"temperature": temperature, "max_tokens": max_tokens,
"stream": True,
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
self.BASE_URL, data=data,
headers={"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=120) as resp:
for line in resp:
line = line.decode("utf-8").strip()
if not line.startswith("data: "):
continue
chunk = line[6:]
if chunk == "[DONE]":
break
delta = json.loads(chunk)["choices"][0].get("delta", {})
if "content" in delta:
yield delta["content"]
print("OpenAI adapter ready.")
Result:
OpenAI adapter ready.
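To make the SSE format concrete, here's a standalone sketch that parses a few simulated stream lines the same way chat_stream does. The JSON payloads are illustrative, not captured API output:

```python
import json

# Simulated SSE lines shaped like OpenAI's streaming responses.
lines = [
    'data: {"choices":[{"delta":{"content":"Hel"}}]}',
    'data: {"choices":[{"delta":{"content":"lo!"}}]}',
    "data: [DONE]",
]

tokens = []
for line in lines:
    if not line.startswith("data: "):
        continue                      # skip blank keep-alive lines
    chunk = line[6:]
    if chunk == "[DONE]":
        break                         # end-of-stream sentinel
    delta = json.loads(chunk)["choices"][0].get("delta", {})
    if "content" in delta:
        tokens.append(delta["content"])

print("".join(tokens))  # Hello!
```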
Anthropic (Claude) Adapter
Claude’s API differs in three ways. First, the system message goes in a separate system field — not in the messages array. Second, auth uses x-api-key instead of Authorization: Bearer. Third, the response text lives in content[0].text instead of choices[0].message.content.
The _split_system helper extracts the system message before building the payload.
class AnthropicProvider(BaseLLMProvider):
"""Adapter for Anthropic's Claude messages API."""
BASE_URL = "https://api.anthropic.com/v1/messages"
def __init__(self, api_key: str = "", default_model: str = "claude-haiku-3.5"):
super().__init__(api_key or ANTHROPIC_API_KEY, default_model)
def _split_system(self, messages: list[dict]) -> tuple[str, list[dict]]:
system = ""
filtered = []
for m in messages:
if m["role"] == "system":
system = m["content"]
else:
filtered.append(m)
return system, filtered
def chat(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
model = self._get_model(model)
system, msgs = self._split_system(messages)
payload = {"model": model, "messages": msgs,
"temperature": temperature, "max_tokens": max_tokens}
if system:
payload["system"] = system
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
self.BASE_URL, data=data,
headers={"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json"},
method="POST",
)
start = time.time()
try:
with urllib.request.urlopen(req, timeout=60) as resp:
body = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
if e.code == 429:
raise RateLimitError("Rate limited", "anthropic", 429)
if e.code == 401:
raise AuthenticationError("Invalid key", "anthropic", 401)
raise LLMError(f"Anthropic returned {e.code}", "anthropic", e.code)
latency = (time.time() - start) * 1000
usage = body.get("usage", {})
return LLMResponse(
text=body["content"][0]["text"], model=model,
provider="anthropic",
input_tokens=usage.get("input_tokens", 0),
output_tokens=usage.get("output_tokens", 0),
latency_ms=latency, raw=body,
finish_reason=body.get("stop_reason", "stop"),
)
Streaming for Claude uses event types instead of [DONE]. We look for content_block_delta events and extract text from the delta.
def chat_stream(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
model = self._get_model(model)
system, msgs = self._split_system(messages)
payload = {"model": model, "messages": msgs,
"temperature": temperature, "max_tokens": max_tokens,
"stream": True}
if system:
payload["system"] = system
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
self.BASE_URL, data=data,
headers={"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=120) as resp:
for line in resp:
line = line.decode("utf-8").strip()
if not line.startswith("data: "):
continue
parsed = json.loads(line[6:])
if parsed.get("type") == "content_block_delta":
yield parsed["delta"].get("text", "")
print("Anthropic adapter ready.")
Anthropic adapter ready.
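The system-message split is easy to verify in isolation. Here's a standalone copy of the _split_system logic showing what actually goes into Claude's separate system field:

```python
def split_system(messages: list[dict]) -> tuple[str, list[dict]]:
    # Same logic as AnthropicProvider._split_system: pull the system
    # message out of the list so it can go in the "system" field.
    system, filtered = "", []
    for m in messages:
        if m["role"] == "system":
            system = m["content"]
        else:
            filtered.append(m)
    return system, filtered

system, msgs = split_system([
    {"role": "system", "content": "Answer in one sentence."},
    {"role": "user", "content": "What is a token?"},
])
print(repr(system))  # 'Answer in one sentence.'
print(len(msgs))     # 1
```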
Google (Gemini) Adapter
Google takes yet another approach. Messages become contents with a parts array. The API key goes as a URL query parameter. The system message sits in a systemInstruction field. The response nests text inside candidates[0].content.parts[0].text.
class GoogleProvider(BaseLLMProvider):
"""Adapter for Google's Gemini API."""
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/models"
def __init__(self, api_key: str = "", default_model: str = "gemini-2.0-flash"):
super().__init__(api_key or GOOGLE_API_KEY, default_model)
def _convert_messages(self, messages: list[dict]) -> tuple[str, list[dict]]:
system = ""
contents = []
for m in messages:
if m["role"] == "system":
system = m["content"]
else:
role = "user" if m["role"] == "user" else "model"
contents.append({"role": role, "parts": [{"text": m["content"]}]})
return system, contents
def chat(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
model = self._get_model(model)
system, contents = self._convert_messages(messages)
url = f"{self.BASE_URL}/{model}:generateContent?key={self.api_key}"
payload: dict[str, Any] = {
"contents": contents,
"generationConfig": {"temperature": temperature,
"maxOutputTokens": max_tokens},
}
if system:
payload["systemInstruction"] = {"parts": [{"text": system}]}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
start = time.time()
try:
with urllib.request.urlopen(req, timeout=60) as resp:
body = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
if e.code == 429:
raise RateLimitError("Rate limited", "google", 429)
if e.code in (401, 403):
raise AuthenticationError("Invalid key", "google", e.code)
raise LLMError(f"Google returned {e.code}", "google", e.code)
latency = (time.time() - start) * 1000
candidate = body["candidates"][0]
usage = body.get("usageMetadata", {})
return LLMResponse(
text=candidate["content"]["parts"][0]["text"],
model=model, provider="google",
input_tokens=usage.get("promptTokenCount", 0),
output_tokens=usage.get("candidatesTokenCount", 0),
latency_ms=latency, raw=body,
finish_reason=candidate.get("finishReason", "STOP").lower(),
)
def chat_stream(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
model = self._get_model(model)
system, contents = self._convert_messages(messages)
url = (f"{self.BASE_URL}/{model}:streamGenerateContent"
f"?key={self.api_key}&alt=sse")
payload: dict[str, Any] = {
"contents": contents,
"generationConfig": {"temperature": temperature,
"maxOutputTokens": max_tokens},
}
if system:
payload["systemInstruction"] = {"parts": [{"text": system}]}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=120) as resp:
for line in resp:
line = line.decode("utf-8").strip()
if not line.startswith("data: "):
continue
parsed = json.loads(line[6:])
parts = (parsed.get("candidates", [{}])[0]
.get("content", {}).get("parts", []))
for part in parts:
if "text" in part:
yield part["text"]
print("Google Gemini adapter ready.")
Google Gemini adapter ready.
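The message conversion is worth seeing in isolation too. This standalone copy of the _convert_messages logic shows the two Gemini quirks: the assistant role becomes "model", and text gets wrapped in a parts array:

```python
def convert_messages(messages: list[dict]) -> tuple[str, list[dict]]:
    # Mirrors GoogleProvider._convert_messages: Gemini has no
    # "assistant" role (it uses "model") and nests text inside parts.
    system, contents = "", []
    for m in messages:
        if m["role"] == "system":
            system = m["content"]
        else:
            role = "user" if m["role"] == "user" else "model"
            contents.append({"role": role, "parts": [{"text": m["content"]}]})
    return system, contents

system, contents = convert_messages([
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there!"},
])
print(contents[1]["role"])              # model
print(contents[0]["parts"][0]["text"])  # Hello
```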
Ollama (Local) Adapter
Ollama runs on your machine. No API key, no cost, no rate limits. It uses an OpenAI-compatible format at localhost:11434. The main quirk: temperature and token limits go inside an options object.
class OllamaProvider(BaseLLMProvider):
"""Adapter for locally running Ollama models."""
def __init__(self, base_url: str = "", default_model: str = "llama3"):
self.base_url = base_url or OLLAMA_BASE_URL
super().__init__("", default_model)
def chat(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
model = self._get_model(model)
url = f"{self.base_url}/api/chat"
payload = {
"model": model, "messages": messages,
"options": {"temperature": temperature,
"num_predict": max_tokens},
"stream": False,
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
start = time.time()
try:
with urllib.request.urlopen(req, timeout=120) as resp:
body = json.loads(resp.read().decode("utf-8"))
except urllib.error.URLError:
raise LLMError(
"Ollama not running — start with 'ollama serve'",
"ollama", 0,
)
latency = (time.time() - start) * 1000
return LLMResponse(
text=body["message"]["content"],
model=model, provider="ollama",
input_tokens=body.get("prompt_eval_count", 0),
output_tokens=body.get("eval_count", 0),
latency_ms=latency, raw=body,
)
def chat_stream(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> Generator[str, None, None]:
model = self._get_model(model)
url = f"{self.base_url}/api/chat"
payload = {
"model": model, "messages": messages,
"options": {"temperature": temperature,
"num_predict": max_tokens},
"stream": True,
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=120) as resp:
for line in resp:
chunk = json.loads(line.decode("utf-8"))
if not chunk.get("done", False):
yield chunk["message"]["content"]
print("Ollama adapter ready.")
Ollama adapter ready.
Provider Registry
We need a way to look up any provider by name. The PROVIDER_REGISTRY dictionary maps string keys to classes. The get_provider() function creates adapters on the fly.
PROVIDER_REGISTRY = {
"openai": OpenAIProvider,
"anthropic": AnthropicProvider,
"google": GoogleProvider,
"ollama": OllamaProvider,
}
def get_provider(name: str, **kwargs) -> BaseLLMProvider:
"""Create a provider adapter by name."""
if name not in PROVIDER_REGISTRY:
available = list(PROVIDER_REGISTRY.keys())
raise ValueError(f"Unknown provider: {name}. Options: {available}")
return PROVIDER_REGISTRY[name](**kwargs)
print(f"Registry: {list(PROVIDER_REGISTRY.keys())}")
oai = get_provider("openai")
print(f"Created: {oai.provider_name}, default model: {oai.default_model}")
Output:
Registry: ['openai', 'anthropic', 'google', 'ollama']
Created: openai, default model: gpt-4o-mini
KEY INSIGHT The registry is what makes this a toolkit, not a script. Adding a fifth provider means writing one class and one line in the dictionary. Nothing else changes.
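Here's what that "one class, one line" claim looks like in practice. This is a self-contained sketch with a trimmed base class; MistralProvider is a hypothetical fifth adapter with placeholder names, not a tested integration:

```python
from abc import ABC, abstractmethod

class BaseLLMProvider(ABC):
    # Trimmed stand-in for the toolkit's base class, just enough
    # to demonstrate the registration pattern.
    def __init__(self, api_key: str, default_model: str):
        self.api_key = api_key
        self.default_model = default_model
        self.provider_name = self.__class__.__name__.lower().replace("provider", "")

    @abstractmethod
    def chat(self, messages: list[dict], model: str = "") -> str: ...

class MistralProvider(BaseLLMProvider):
    # Hypothetical fifth adapter -- the model name is a placeholder.
    def __init__(self, api_key: str = "", default_model: str = "mistral-small"):
        super().__init__(api_key, default_model)

    def chat(self, messages: list[dict], model: str = "") -> str:
        raise NotImplementedError("sketch only")

PROVIDER_REGISTRY = {"mistral": MistralProvider}  # the one new line

p = PROVIDER_REGISTRY["mistral"]()
print(p.provider_name, p.default_model)  # mistral mistral-small
```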
Token Counting and Cost Tracking
Knowing how many tokens a message will use — before you send it — helps you pick the right model and avoid bill surprises.
Running each provider's real tokenizer would mean pulling in extra dependencies (tiktoken and friends), which breaks our zero-dependency goal. So we use a heuristic: roughly 4 characters per token for English text. It's not exact, but it's close enough for budget estimation.
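The heuristic itself is one line. A quick sanity check on a couple of strings:

```python
def estimate_tokens(text: str) -> int:
    # ~4 characters per token is a common rule of thumb for English.
    return max(1, len(text) // 4)

print(estimate_tokens("Hi"))  # 1 (max() keeps the floor at one token)
print(estimate_tokens("Explain gradient descent in two sentences."))  # 10
```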
The TokenTracker class does three things. It estimates token counts before a request, records actual usage from each response, and gives you a spending summary broken down by provider.
class TokenTracker:
"""Estimates tokens, tracks usage, and reports costs."""
def __init__(self):
self.history: list[dict] = []
self.total_cost: float = 0.0
self.total_input_tokens: int = 0
self.total_output_tokens: int = 0
@staticmethod
def estimate_tokens(text: str) -> int:
"""Rough estimate: ~4 chars per token for English."""
return max(1, len(text) // 4)
def estimate_messages(self, messages: list[dict]) -> int:
"""Estimate token count for a message list."""
total = 0
for m in messages:
total += 4 # message overhead
total += self.estimate_tokens(m.get("content", ""))
return total
def record(self, response: LLMResponse) -> None:
"""Record a completed request's usage."""
self.total_input_tokens += response.input_tokens
self.total_output_tokens += response.output_tokens
self.total_cost += response.cost
self.history.append({
"provider": response.provider,
"model": response.model,
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"cost": response.cost,
"latency_ms": response.latency_ms,
"timestamp": datetime.now().isoformat(),
})
def summary(self) -> str:
"""Return a spending summary."""
lines = [f"Total requests: {len(self.history)}"]
lines.append(f"Total tokens: {self.total_input_tokens + self.total_output_tokens:,}")
lines.append(f"Total cost: ${self.total_cost:.4f}")
by_provider: dict[str, float] = {}
for h in self.history:
key = h["provider"]
by_provider[key] = by_provider.get(key, 0) + h["cost"]
for prov, cost in sorted(by_provider.items()):
lines.append(f" {prov}: ${cost:.4f}")
return "\n".join(lines)
Let’s test the estimation and recording flow. We’ll estimate tokens for a sample message, then simulate recording a response.
tracker = TokenTracker()
sample_messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain gradient descent in two sentences."},
]
estimated = tracker.estimate_messages(sample_messages)
print(f"Estimated input tokens: {estimated}")
fake_resp = LLMResponse(
text="Gradient descent is an optimization algorithm.",
model="gpt-4o-mini", provider="openai",
input_tokens=22, output_tokens=8, latency_ms=340.0,
)
tracker.record(fake_resp)
print(tracker.summary())
This prints:
Estimated input tokens: 25
Total requests: 1
Total tokens: 30
Total cost: $0.0000
openai: $0.0000
The cost shows $0.0000 because at gpt-4o-mini's rates ($0.15/M input, $0.60/M output), 30 tokens cost well under a hundredth of a cent. At scale, these fractions add up fast.
Retry with Exponential Backoff and Fallback
API calls fail. Servers go down. Rate limits kick in. Your toolkit needs two layers of defense: retry the same provider, then fall back to a different one.
Exponential Backoff
The RetryConfig dataclass controls retry behavior. The retry_with_backoff function wraps any callable. On each failure, it doubles the wait time. Random jitter prevents the “thundering herd” — where many clients retry at the same moment and overload the server again.
One important detail: we never retry AuthenticationError. If your API key is wrong, waiting won’t fix it.
@dataclass
class RetryConfig:
"""Configuration for retry behavior."""
max_retries: int = 3
base_delay: float = 1.0
max_delay: float = 30.0
retryable_errors: tuple = (RateLimitError, LLMError)
jitter: bool = True
def retry_with_backoff(fn, config: RetryConfig = RetryConfig()):
"""Call fn() with exponential backoff on failure."""
last_error = None
for attempt in range(config.max_retries + 1):
try:
return fn()
except config.retryable_errors as e:
last_error = e
if isinstance(e, AuthenticationError):
raise # Don't retry bad credentials
if attempt < config.max_retries:
delay = min(config.base_delay * (2 ** attempt), config.max_delay)
if config.jitter:
delay = random.uniform(0, delay)
print(f" Retry {attempt + 1}/{config.max_retries} "
f"after {delay:.1f}s — {e}")
time.sleep(delay)
raise last_error
Let’s see it work. We’ll create a function that fails twice, then succeeds on the third attempt. With jitter=False, the delays double predictably: 0.1s, then 0.2s.
call_count = 0
def flaky_function():
global call_count
call_count += 1
if call_count < 3:
raise LLMError(f"Simulated failure #{call_count}", "test", 500)
return f"Success on attempt {call_count}"
call_count = 0
config = RetryConfig(max_retries=3, base_delay=0.1, jitter=False)
result = retry_with_backoff(flaky_function, config)
print(f"Result: {result}")
Output:
Retry 1/3 after 0.1s — Simulated failure #1
Retry 2/3 after 0.2s — Simulated failure #2
Result: Success on attempt 3
Predict the output: What if call_count < 4 instead of < 3, with max_retries=2? Think about it.
The function would fail three times. But we only allow two retries — three total attempts. The last LLMError would be raised because all attempts are used up.
WARNING Never retry AuthenticationError. A bad API key won't fix itself with time. You'll burn your retry budget and delay the real error by several seconds of pointless backoff.
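The backoff schedule is easy to tabulate. With base_delay=1.0, max_delay=30.0, and jitter disabled, attempt n waits min(1.0 * 2**n, 30.0) seconds:

```python
base_delay, max_delay = 1.0, 30.0

# Delay before each retry attempt, jitter disabled: doubles until capped.
delays = [min(base_delay * (2 ** attempt), max_delay) for attempt in range(6)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```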
Fallback Chain
The FallbackChain takes a list of providers in priority order. It tries the first one with retries. If all retries fail, it moves to the next provider. If every provider fails, it raises a clear error.
class FallbackChain:
"""Tries providers in order until one succeeds."""
def __init__(self, providers: list[BaseLLMProvider],
retry_config: RetryConfig = RetryConfig()):
self.providers = providers
self.retry_config = retry_config
def chat(self, messages: list[dict], **kwargs) -> LLMResponse:
errors = []
for provider in self.providers:
try:
return retry_with_backoff(
lambda p=provider: p.chat(messages, **kwargs),
self.retry_config,
)
except LLMError as e:
errors.append(f"{provider.provider_name}: {e}")
print(f" Provider {provider.provider_name} failed, "
f"trying next...")
continue
tried = ", ".join(p.provider_name for p in self.providers)
raise LLMError(f"All providers failed ({tried}): {errors}")
chain = FallbackChain(
providers=[get_provider("openai"), get_provider("anthropic"),
get_provider("google")],
retry_config=RetryConfig(max_retries=2, base_delay=0.5),
)
print(f"Fallback chain: {[p.provider_name for p in chain.providers]}")
print("Route: openai -> anthropic -> google")
Result:
Fallback chain: ['openai', 'anthropic', 'google']
Route: openai -> anthropic -> google
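Since live calls would need real API keys, here's a standalone simulation of the fallback control flow using stub providers instead of the real adapters (retries omitted for brevity):

```python
class StubProvider:
    # Minimal stand-in for a BaseLLMProvider adapter.
    def __init__(self, name: str, fails: bool):
        self.provider_name = name
        self.fails = fails

    def chat(self, messages: list[dict]) -> str:
        if self.fails:
            raise RuntimeError(f"{self.provider_name} is down")
        return f"answer from {self.provider_name}"

def fallback_chat(providers: list, messages: list[dict]) -> str:
    # Same control flow as FallbackChain.chat, minus the retry wrapper.
    errors = []
    for p in providers:
        try:
            return p.chat(messages)
        except RuntimeError as e:
            errors.append(str(e))
    raise RuntimeError(f"All providers failed: {errors}")

result = fallback_chat(
    [StubProvider("openai", fails=True), StubProvider("anthropic", fails=False)],
    [{"role": "user", "content": "Hi"}],
)
print(result)  # answer from anthropic
```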
{
type: 'exercise',
id: 'retry-fallback-ex',
title: 'Exercise 2: Test the Retry System',
difficulty: 'intermediate',
exerciseType: 'write',
instructions: 'Create a function called sometimes_fails that uses a global counter. It should raise LLMError on the first call and return "OK" on the second call. Then call retry_with_backoff with max_retries=1 and base_delay=0.01 (fast for testing) with jitter=False. Print the result.',
starterCode: 'counter = 0\n\ndef sometimes_fails():\n    global counter\n    counter += 1\n    # YOUR CODE: fail on first call, succeed on second\n    pass\n\ncounter = 0\ncfg = RetryConfig(max_retries=1, base_delay=0.01, jitter=False)\nresult = retry_with_backoff(sometimes_fails, cfg)\nprint(result)',
testCases: [
{ id: 'tc1', input: 'counter = 0; result = retry_with_backoff(sometimes_fails, RetryConfig(max_retries=1, base_delay=0.01, jitter=False)); print(result)', expectedOutput: 'OK', description: 'Should succeed after one retry' },
{ id: 'tc2', input: 'counter = 0; result = retry_with_backoff(sometimes_fails, RetryConfig(max_retries=1, base_delay=0.01, jitter=False)); print(counter)', expectedOutput: '2', description: 'Counter should be 2 (failed once, succeeded once)' },
],
hints: [
'Check if counter == 1 to decide whether to raise or return',
'if counter < 2: raise LLMError("fail", "test", 500)\nreturn "OK"',
],
solution: 'counter = 0\n\ndef sometimes_fails():\n    global counter\n    counter += 1\n    if counter < 2:\n        raise LLMError("fail", "test", 500)\n    return "OK"\n\ncounter = 0\ncfg = RetryConfig(max_retries=1, base_delay=0.01, jitter=False)\nresult = retry_with_backoff(sometimes_fails, cfg)\nprint(result)',
solutionExplanation: 'On the first call, counter becomes 1, which is less than 2, so LLMError is raised. The retry system catches it and waits 0.01s. On the second call, counter becomes 2, the condition is false, and "OK" is returned.',
xpReward: 15,
}
Structured Output Parsing
LLMs return text. Your application needs data — dictionaries, lists, typed objects. Structured output parsing bridges that gap.
The approach: ask the LLM to return JSON, then extract it from the response. But LLMs sometimes wrap JSON in markdown fences or add text before and after it.
Our StructuredOutputParser handles these cases. It tries three extraction strategies in order — from strictest to most lenient. First, pure json.loads. Second, extract from triple-backtick fences. Third, find the first { and last } and try that substring.
class StructuredOutputParser:
"""Extracts structured data from LLM text responses."""
@staticmethod
def parse_json(text: str) -> dict:
"""Extract JSON from LLM response text."""
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
pass
fence_match = re.search(
r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL
)
if fence_match:
try:
return json.loads(fence_match.group(1))
except json.JSONDecodeError:
pass
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and end > start:
try:
return json.loads(text[start:end + 1])
except json.JSONDecodeError:
pass
raise ValueError(
f"Could not parse JSON from: {text[:100]}..."
)
@staticmethod
def parse_list(text: str) -> list:
"""Extract a JSON array from LLM response text."""
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
pass
fence_match = re.search(
r'```(?:json)?\s*\n(.*?)\n```', text, re.DOTALL
)
if fence_match:
try:
return json.loads(fence_match.group(1))
except json.JSONDecodeError:
pass
start = text.find('[')
end = text.rfind(']')
if start != -1 and end != -1 and end > start:
try:
return json.loads(text[start:end + 1])
except json.JSONDecodeError:
pass
raise ValueError(
f"Could not parse list from: {text[:100]}..."
)
Four test cases cover the real-world scenarios. Clean JSON, fenced JSON, embedded JSON with surrounding text, and a list inside a code block.
parser = StructuredOutputParser()
result1 = parser.parse_json('{"name": "Alice", "score": 95}')
print(f"Clean JSON: {result1}")
result2 = parser.parse_json(
'Here is the result:\n```json\n{"status": "ok", "count": 3}\n```\nDone!'
)
print(f"Fenced JSON: {result2}")
result3 = parser.parse_json(
'The analysis shows {"sentiment": "positive", "confidence": 0.92} based on the input.'
)
print(f"Embedded JSON: {result3}")
result4 = parser.parse_list('```json\n["python", "rust", "go"]\n```')
print(f"Parsed list: {result4}")
Output:
Clean JSON: {'name': 'Alice', 'score': 95}
Fenced JSON: {'status': 'ok', 'count': 3}
Embedded JSON: {'sentiment': 'positive', 'confidence': 0.92}
Parsed list: ['python', 'rust', 'go']
All four work. The parser tries strategies from strictest to most lenient. You get clean data every time — or a clear error if the LLM returned gibberish.
TIP When asking an LLM for JSON, include “Respond with valid JSON only. No explanation.” in your prompt. This reduces fallback parsing and makes responses faster.
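If parsing still fails, a common follow-up is to re-ask the model once with a corrective prompt. Here is a minimal sketch of that loop; `chat_fn` and `parse_fn` are hypothetical hooks standing in for a call that returns response text (e.g. `client.chat(...).text`) and for `parser.parse_json`:

```python
def chat_json_with_repair(chat_fn, parse_fn, messages, max_attempts=2):
    """Ask for JSON; on a parse failure, re-ask with a corrective prompt.

    chat_fn(messages) -> str and parse_fn(text) -> dict are hypothetical
    hooks: wire in your own chat call and StructuredOutputParser.parse_json.
    """
    last_err = ValueError("no attempts made")
    for _ in range(max_attempts):
        text = chat_fn(messages)
        try:
            return parse_fn(text)
        except ValueError as err:
            last_err = err
            # Feed the bad output back and ask again, JSON only
            messages = messages + [
                {"role": "assistant", "content": text},
                {"role": "user",
                 "content": "That was not valid JSON. Respond with valid JSON only."},
            ]
    raise last_err
```

Wiring something like this into chat_json() gives the client one self-repair round before surfacing the ValueError.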
{
  type: 'exercise',
  id: 'structured-parse-ex',
  title: 'Exercise 3: Parse Tricky LLM Output',
  difficulty: 'intermediate',
  exerciseType: 'write',
  instructions: 'The LLM returned this messy text:\n\n"Sure! Here\'s the analysis:\\n```json\\n{\\"language\\": \\"Python\\", \\"version\\": 3.11, \\"features\\": [\\"typing\\", \\"match\\"]}\\n```\\nHope that helps!"\n\nUse StructuredOutputParser.parse_json() to extract the data. Print the language value and the number of features.',
  starterCode: 'messy = "Sure! Here\'s the analysis:\\n```json\\n{\\"language\\": \\"Python\\", \\"version\\": 3.11, \\"features\\": [\\"typing\\", \\"match\\"]}\\n```\\nHope that helps!"\n\nparser = StructuredOutputParser()\ndata = # YOUR CODE HERE\nprint(data["language"])\nprint(len(data["features"]))',
  testCases: [
    { id: 'tc1', input: 'print(data["language"])', expectedOutput: 'Python', description: 'Language should be Python' },
    { id: 'tc2', input: 'print(len(data["features"]))', expectedOutput: '2', description: 'Should have 2 features' },
  ],
  hints: [
    'Call parser.parse_json(messy) — the parser will find the JSON inside the markdown fence',
    'data = parser.parse_json(messy)',
  ],
  solution: 'messy = "Sure! Here\'s the analysis:\\n```json\\n{\\"language\\": \\"Python\\", \\"version\\": 3.11, \\"features\\": [\\"typing\\", \\"match\\"]}\\n```\\nHope that helps!"\n\nparser = StructuredOutputParser()\ndata = parser.parse_json(messy)\nprint(data["language"])\nprint(len(data["features"]))',
  solutionExplanation: 'The parser first tries json.loads on the full string, which fails because of the surrounding text. It then looks for a markdown code fence, finds one, and parses the JSON inside it. The result is a dictionary with "language", "version", and "features" keys.',
  xpReward: 15,
}
Conversation Management
Multi-turn chat needs message history. You send the full conversation each time so the LLM has context. But conversations grow. Without management, you’ll hit the context window limit.
The Conversation class stores messages, tracks estimated tokens, and trims old messages when the history gets too long. It always keeps the system message (first position). When truncation kicks in, it drops the oldest user/assistant pairs from the middle.
class Conversation:
"""Manages multi-turn chat with automatic truncation."""
def __init__(self, system_prompt: str = "", max_tokens: int = 8000):
self.messages: list[dict] = []
self.max_tokens = max_tokens
self.token_tracker = TokenTracker()
if system_prompt:
self.messages.append({"role": "system", "content": system_prompt})
def add_user(self, content: str) -> None:
"""Add a user message."""
self.messages.append({"role": "user", "content": content})
self._truncate()
def add_assistant(self, content: str) -> None:
"""Add an assistant response."""
self.messages.append({"role": "assistant", "content": content})
def _truncate(self) -> None:
"""Remove oldest messages when tokens exceed limit."""
while (self.token_tracker.estimate_messages(self.messages)
> self.max_tokens):
if len(self.messages) <= 2:
break
self.messages.pop(1)
def get_messages(self) -> list[dict]:
return list(self.messages)
def clear(self) -> None:
"""Clear history, keeping only system message."""
system = [m for m in self.messages if m["role"] == "system"]
self.messages = system
def __len__(self) -> int:
return len(self.messages)
Here’s a conversation with five turns. Notice that the system message stays first, and all turns are preserved because we’re well under the 200-token limit.
convo = Conversation(system_prompt="You are a Python expert.", max_tokens=200)
convo.add_user("What is a decorator?")
convo.add_assistant("A decorator wraps a function to add behavior.")
convo.add_user("Show me an example.")
convo.add_assistant("@timer measures execution time.")
convo.add_user("Can decorators take arguments?")
print(f"Messages: {len(convo)}")
est = convo.token_tracker.estimate_messages(convo.get_messages())
print(f"Estimated tokens: {est}")
for m in convo.get_messages():
print(f" {m['role'].upper()}: {m['content'][:50]}")
This prints:
Messages: 6
Estimated tokens: 54
SYSTEM: You are a Python expert.
USER: What is a decorator?
ASSISTANT: A decorator wraps a function to add behavior.
USER: Show me an example.
ASSISTANT: @timer measures execution time.
USER: Can decorators take arguments?
Six messages, 54 estimated tokens — well under our 200-token cap. If we kept adding messages, the oldest pairs would get trimmed automatically.
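To watch truncation actually fire, shrink the budget. This standalone sketch mirrors the _truncate loop with the same roughly-4-characters-per-token estimate (an assumption here; the real class delegates to TokenTracker):

```python
def estimate(messages):
    # Same rough heuristic the toolkit uses: ~4 characters per token
    return sum(len(m["content"]) // 4 for m in messages)

messages = [{"role": "system", "content": "You are terse."}]
for i in range(20):
    messages.append({"role": "user", "content": f"question {i} " * 10})
    messages.append({"role": "assistant", "content": f"answer {i} " * 10})
    # Mirror of Conversation._truncate: drop oldest non-system messages
    while estimate(messages) > 100 and len(messages) > 2:
        messages.pop(1)

print(messages[0]["role"])  # the system message always survives
print(len(messages))        # only the most recent turns remain
```

Forty messages went in; only the system prompt plus the newest few turns come out, which is exactly the behavior that keeps long chats inside the context window.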
The Unified LLM Client
This is the main class. It ties every component together. The LLMClient takes a provider name (or a list for fallback), initializes adapters, wires up token tracking, and exposes clean methods.
The chat() method runs through retry/fallback and records usage. The chat_json() method adds structured output parsing. The stream() method returns a generator. And estimate_cost() shows what a request will cost before you send it.
class LLMClient:
"""Unified client for multi-provider LLM access."""
def __init__(self, providers: list[str] | str = "openai",
retry_config: RetryConfig = RetryConfig(),
**provider_kwargs):
if isinstance(providers, str):
providers = [providers]
self.adapters = [get_provider(p, **provider_kwargs) for p in providers]
self.fallback = FallbackChain(self.adapters, retry_config)
self.tracker = TokenTracker()
self.parser = StructuredOutputParser()
self.default_provider = self.adapters[0]
def chat(self, messages: list[dict], **kwargs) -> LLMResponse:
"""Send a chat request with retry and fallback."""
response = self.fallback.chat(messages, **kwargs)
self.tracker.record(response)
return response
def chat_json(self, messages: list[dict], **kwargs) -> tuple[dict, LLMResponse]:
"""Chat and parse response as JSON."""
response = self.chat(messages, **kwargs)
return self.parser.parse_json(response.text), response
def chat_list(self, messages: list[dict], **kwargs) -> tuple[list, LLMResponse]:
"""Chat and parse response as a JSON list."""
response = self.chat(messages, **kwargs)
return self.parser.parse_list(response.text), response
def stream(self, messages: list[dict], **kwargs) -> Generator[str, None, None]:
"""Stream tokens from the default provider."""
yield from self.default_provider.chat_stream(messages, **kwargs)
def estimate_cost(self, messages: list[dict],
model: str = "", max_output_tokens: int = 500) -> dict:
"""Estimate cost before sending a request."""
input_est = self.tracker.estimate_messages(messages)
provider = self.default_provider.provider_name
model = model or self.default_provider.default_model
key = f"{provider}/{model}"
pricing = PRICING.get(key, {"input": 0, "output": 0})
input_cost = (input_est / 1_000_000) * pricing["input"]
output_cost = (max_output_tokens / 1_000_000) * pricing["output"]
return {
"estimated_input_tokens": input_est,
"max_output_tokens": max_output_tokens,
"estimated_cost": input_cost + output_cost,
"model": key,
}
def spending_summary(self) -> str:
return self.tracker.summary()
Let’s create a client with three providers and estimate a cost.
client = LLMClient(
providers=["openai", "anthropic", "google"],
retry_config=RetryConfig(max_retries=2, base_delay=0.5),
)
print(f"Client ready with {len(client.adapters)} providers")
print(f"Primary: {client.default_provider.provider_name}")
messages = [
{"role": "system", "content": "You are a data science tutor."},
{"role": "user", "content": "Explain overfitting in three sentences."},
]
estimate = client.estimate_cost(messages, max_output_tokens=100)
print(f"\nCost estimate: {estimate}")
Output:
Client ready with 3 providers
Primary: openai
Cost estimate: {'estimated_input_tokens': 24, 'max_output_tokens': 100, 'estimated_cost': 6.36e-05, 'model': 'openai/gpt-4o-mini'}
Estimated cost for gpt-4o-mini: $0.0000636. That’s 24 input tokens at $0.15/M plus 100 output tokens at $0.60/M. When you switch to GPT-4o or Claude Sonnet, estimate_cost() shows the difference before you commit.
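You can reproduce that number by hand. The per-million rates below are the gpt-4o-mini figures quoted above:

```python
input_cost = (24 / 1_000_000) * 0.15    # 24 input tokens at $0.15 per 1M
output_cost = (100 / 1_000_000) * 0.60  # 100 output tokens at $0.60 per 1M
total = input_cost + output_cost
print(f"${total:.7f}")  # $0.0000636
```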
Putting It All Together — Full Demo
Here’s the toolkit in action. We’ll use a mock provider so the demo runs without API keys. The mock returns predefined responses, but every other component — conversation management, JSON parsing, cost tracking — works exactly as it would with a real provider.
class MockProvider(BaseLLMProvider):
"""Mock provider for testing without API keys."""
def __init__(self, responses: list[str] | None = None):
super().__init__("mock-key", "mock-model")
self.responses = responses or ["Mock response."]
self._call_idx = 0
def chat(self, messages: list[dict], model: str = "",
temperature: float = 0.7, max_tokens: int = 1024) -> LLMResponse:
text = self.responses[self._call_idx % len(self.responses)]
self._call_idx += 1
input_tokens = sum(len(m["content"]) // 4 for m in messages)
output_tokens = len(text) // 4
return LLMResponse(
text=text, model="mock-model", provider="openai",
input_tokens=input_tokens, output_tokens=output_tokens,
latency_ms=random.uniform(50, 200),
)
def chat_stream(self, messages: list[dict], **kwargs) -> Generator[str, None, None]:
text = self.responses[self._call_idx % len(self.responses)]
self._call_idx += 1
for word in text.split():
yield word + " "
mock = MockProvider(responses=[
"Overfitting happens when a model memorizes training data.",
'{"sentiment": "positive", "confidence": 0.87, "topics": ["AI", "ML"]}',
"A good model generalizes well to unseen data.",
])
client = LLMClient(providers=["openai"])
client.adapters = [mock]
client.fallback = FallbackChain([mock])
client.default_provider = mock
Now we run a three-turn conversation. Turn 1 is a plain chat. Turn 2 asks for structured JSON output. Turn 3 is another plain chat. The tracker accumulates everything.
convo = Conversation(system_prompt="You are a helpful ML tutor.")
# Turn 1 — plain chat
convo.add_user("What is overfitting?")
resp1 = client.chat(convo.get_messages())
convo.add_assistant(resp1.text)
print(f"Turn 1: {resp1.text}")
print(f" {resp1.summary()}")
# Turn 2 — structured output
convo.add_user('Analyze: "I love neural networks". Return JSON.')
resp2 = client.chat(convo.get_messages())
convo.add_assistant(resp2.text)
parsed = client.parser.parse_json(resp2.text)
print(f"\nTurn 2 (JSON): {parsed}")
print(f" Confidence: {parsed['confidence']}")
# Turn 3 — plain chat
convo.add_user("How do I prevent overfitting?")
resp3 = client.chat(convo.get_messages())
convo.add_assistant(resp3.text)
print(f"\nTurn 3: {resp3.text}")
Turn 1: Overfitting happens when a model memorizes training data.
[openai/mock-model] 24 tokens, 132ms, $0.000001
Turn 2 (JSON): {'sentiment': 'positive', 'confidence': 0.87, 'topics': ['AI', 'ML']}
Confidence: 0.87
Turn 3: A good model generalizes well to unseen data.
[openai/mock-model] 59 tokens, 89ms, $0.000002
Three turns. The second turn parsed JSON into a Python dictionary automatically. Now let’s see streaming and the spending summary.
print("Streaming: ", end="")
for token in client.stream(
[{"role": "user", "content": "Explain bias-variance tradeoff."}]
):
print(token, end="", flush=True)
print()
print()
print(client.spending_summary())
Result:
Streaming: Overfitting happens when a model memorizes training data.
Total requests: 3
Total tokens: 112
Total cost: $0.0000
openai: $0.0000
Streaming comes through word-by-word. In a real app, you’d pipe those tokens to a frontend for a typing effect. The spending summary shows all three requests — total tokens and cost broken down by provider.
KEY INSIGHT The unified client doesn’t just save you from learning four APIs. It gives you one place to track costs, one place to add retry logic, and one place to swap providers. Every feature benefits all providers at once.
Common Mistakes and How to Fix Them
Mistake 1: Hardcoding provider response paths
❌ Wrong:
# Breaks when you switch providers
text = response["choices"][0]["message"]["content"]
This only works for OpenAI. Anthropic uses content[0].text. Gemini uses a completely different path. Hardcoding means rewriting every call site when you add a provider.
✅ Correct:
resp = client.chat(messages)
text = resp.text  # Works for any provider
Mistake 2: Retrying authentication errors
❌ Wrong:
# Wastes 15 seconds retrying a bad API key
retry_with_backoff(lambda: provider.chat(msgs), RetryConfig(max_retries=3))
A 401 means your key is wrong. Waiting won’t fix it. You burn your retry budget and delay the real error.
✅ Correct:
# Our retry function already skips AuthenticationError
# It raises immediately on 401 — no wasted time
Mistake 3: Tracking costs per provider separately
❌ Wrong:
openai_cost = calculate_openai_cost(response)
claude_cost = calculate_claude_cost(response)
# Fragmented data — can't see total spending
When fallback chains hit two providers for one logical request, separate tracking loses that connection.
✅ Correct:
client.chat(messages)  # Tracked automatically
print(client.spending_summary())  # All providers in one place
When NOT to Build This Yourself
This toolkit teaches how multi-provider LLM clients work from the inside. But you don’t always need to build from scratch.
Use LiteLLM if you need 100+ models, detailed logging, and a proxy server. It handles edge cases we skipped — streaming error recovery, actual tokenizers, and rate limit headers.
Use official SDKs if you only need one provider. The openai Python package handles retries, streaming, and structured output natively. Adding our abstraction on top would be unnecessary complexity.
NOTE LiteLLM (pip install litellm) provides a litellm.completion() function that works like our LLMClient.chat() but supports 100+ models out of the box. It's the go-to choice for production multi-provider setups.
Build your own if you need custom routing logic. Route PII-free requests to the cheapest provider. Route sensitive data to your private Ollama instance. A custom toolkit gives you that control.
Complete Code
Summary
You’ve built a complete multi-provider LLM toolkit from scratch. Here’s what each piece does:
| Component | Purpose | Why It Matters |
|---|---|---|
| LLMResponse | Unified response from any provider | One format everywhere |
| BaseLLMProvider | Abstract interface for adapters | Adding providers is one class |
| 4 adapters | Translate calls to provider HTTP | OpenAI, Anthropic, Google, Ollama |
| TokenTracker | Estimate and track token usage | Control your spending |
| RetryConfig + backoff | Exponential retry with jitter | Handle transient failures |
| FallbackChain | Try providers in priority order | Uptime when providers go down |
| StructuredOutputParser | Extract JSON from messy text | Type-safe data from LLM output |
| Conversation | Manage history with truncation | Multi-turn chat that scales |
| LLMClient | Single API for everything | One .chat() call for all |
Practice exercise: Extend the toolkit with a CachingLayer class. When the same messages are sent twice, return the cached response. Use a dictionary with a hash of the messages as the key. Add a ttl_seconds parameter for expiration.
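If you want a nudge, here is one possible shape for that class. This is a hypothetical design, not the only answer; the key ideas are a stable hash over the messages and a timestamp check against the TTL:

```python
import hashlib
import json
import time

class CachingLayer:
    """One possible design for the practice exercise (sketch)."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._cache: dict[str, tuple[float, object]] = {}

    def _key(self, messages: list[dict]) -> str:
        # Stable hash: identical message lists always produce identical keys
        blob = json.dumps(messages, sort_keys=True).encode()
        return hashlib.sha256(blob).hexdigest()

    def get(self, messages: list[dict]):
        entry = self._cache.get(self._key(messages))
        if entry is not None and time.time() - entry[0] < self.ttl:
            return entry[1]
        return None  # miss, or entry expired

    def put(self, messages: list[dict], response) -> None:
        self._cache[self._key(messages)] = (time.time(), response)
```

A wrapper around client.chat() would then check get() first and call put() after every miss.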
Frequently Asked Questions
Can I use this toolkit with async/await?
The current version uses synchronous urllib.request. For async, replace the HTTP calls with aiohttp or httpx.AsyncClient. The structure stays identical — change chat() to async def chat() and urlopen to await client.post(). The abstract base class works the same way with async methods.
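A minimal sketch of that async shape. AsyncLLMProvider and EchoProvider are hypothetical stand-ins, with asyncio.sleep(0) marking where the awaited HTTP call would go:

```python
import abc
import asyncio

class AsyncLLMProvider(abc.ABC):
    """Async twin of the base interface: same methods, coroutine versions."""

    @abc.abstractmethod
    async def chat(self, messages: list[dict]) -> str: ...

class EchoProvider(AsyncLLMProvider):
    async def chat(self, messages: list[dict]) -> str:
        await asyncio.sleep(0)  # where the real awaited HTTP call would go
        return messages[-1]["content"]

print(asyncio.run(EchoProvider().chat([{"role": "user", "content": "hi"}])))
```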
How accurate is the 4-characters-per-token estimate?
For English text, it’s within 15-20% of actual counts. OpenAI’s GPT models average about 4 characters per token. Code, non-English text, and special characters differ significantly. For exact counts, use tiktoken (OpenAI) or Anthropic’s counting API. The estimate works for budgeting, not for context-window checks.
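The heuristic itself is one line, so it is easy to sanity-check on your own data:

```python
samples = [
    "The quick brown fox jumps over the lazy dog.",
    "def fib(n): return n if n < 2 else fib(n-1) + fib(n-2)",
]
for text in samples:
    # The toolkit's rough rule: one token per ~4 characters
    print(len(text), "chars ->", len(text) // 4, "estimated tokens")
```

A real tokenizer will diverge more on the code sample than on the prose, which is exactly why the estimate is for budgeting rather than context-window checks.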
How do I add a new provider like Mistral or Cohere?
Write a class inheriting from BaseLLMProvider. Implement chat() and chat_stream(). Add it to PROVIDER_REGISTRY and its models to PRICING. Three changes. Every feature — retry, fallback, cost tracking — works automatically because they operate on the shared interface.
Does the fallback chain add latency?
Only on failure. If the first provider succeeds, zero overhead. If it fails after retries, those delays accumulate before the chain tries the next provider. Use short timeouts (5-10 seconds) on individual requests so fallback kicks in fast.
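A sketch of that timeout idea using only the stdlib. post_with_timeout is a hypothetical helper; in the toolkit you would raise LLMError here, ConnectionError just keeps the sketch self-contained:

```python
import json
import urllib.error
import urllib.request

def post_with_timeout(url: str, payload: dict, timeout: float = 8.0) -> bytes:
    """POST JSON with a hard per-request timeout so fallback can kick in fast."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.read()
    except (urllib.error.URLError, TimeoutError) as err:
        # Surface failure quickly; the toolkit would raise LLMError here
        raise ConnectionError(f"provider unreachable: {err}") from err
```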
Why urllib instead of the requests library?
Zero dependencies. Every import comes from Python’s standard library. You can copy this into any project without installing anything. For production, switch to httpx for connection pooling and HTTP/2 support.
References
- OpenAI API Reference — Chat Completions. Link
- Anthropic API Reference — Messages. Link
- Google Gemini API — Generate Content. Link
- Ollama API Reference. Link
- Exponential Backoff and Jitter — AWS Architecture Blog. Link
- Circuit Breaker Pattern — Microsoft Cloud Design Patterns. Link
- Python urllib.request Documentation. Link
- LiteLLM — Multi-Provider LLM Gateway. Link