Menu

LLM API Retry & Fallback: Build a Resilient Client

Build a resilient LLM client in Python with retry, fallback chains, circuit breakers, and rate limiting — pure Python, runnable code, no SDKs needed.

Written by Selva Prabhakaran | 24 min read

Your LLM API call fails at 2 AM. The provider is down, your app crashes, and users see a blank screen. Here’s how to prevent that.

⚡ This post has interactive code — click ▶ Run or press Ctrl+Enter on any code block to execute it directly in your browser. The first run may take a few seconds to initialize.

You fire off an API request to your LLM provider. It times out. You retry immediately — and get rate-limited. Now two problems instead of one.

Production LLM apps need more than requests.post(). They need retry logic that backs off smartly, fallback chains that switch providers on their own, and rate limiters that stop you from hitting walls. This article builds all three from scratch — pure Python, no SDKs.

The Pipeline: What We’re Building

Before writing any code, here’s the full picture. We’re building a ResilientLLMClient in four stages. Each stage solves a specific failure mode.

Stage 1 — Retry with exponential backoff: When a request fails (timeout, 500, network glitch), wait and try again. Each retry waits longer than the last. Random jitter spreads retries apart.

Stage 2 — Provider fallback chain: If OpenAI is down, try Claude. If Claude is down, try Gemini. The client walks through a priority list until one responds.

Stage 3 — Rate limiter (token bucket): Before sending any request, check your remaining quota. If the bucket is empty, wait. This prevents 429 errors before they happen.

Stage 4 — Logging and cost tracking: Every request gets logged — provider, latency, tokens used, cost. You can’t optimize what you don’t measure.

Retry wraps each individual call. Fallback chains multiple providers. Rate limiting gates the whole pipeline. Logging observes everything.

Exponential Backoff with Jitter

When an API call fails, the worst response is retrying instantly. If the server is overloaded, you’re making things worse.

Exponential backoff doubles the wait after each attempt: 1s, 2s, 4s, 8s. But if 100 clients all back off on the same schedule, they all retry at the same moment. That’s the “thundering herd” problem.

Jitter fixes it by adding randomness. Instead of waiting exactly 4 seconds, you wait somewhere between 0 and 4. The retries spread out. The server gets breathing room.

Here’s a retry_with_backoff function that takes any callable and retries it with exponential backoff plus full jitter. It accepts max_retries, a base_delay in seconds, and a tuple of retryable_statuses — HTTP codes that trigger a retry. The core formula is delay = base * 2^attempt, capped at max_delay, with random.uniform(0, delay) for full jitter.

import micropip
await micropip.install(['requests'])


import time
import random
import json
from dataclasses import dataclass, field
from typing import Optional


class APIError(Exception):
    """Custom exception carrying HTTP status code."""
    def __init__(self, status_code, message=""):
        self.status_code = status_code
        self.message = message
        super().__init__(f"HTTP {status_code}: {message}")

The APIError class carries a status_code so retry logic can decide whether the error is retryable. A 503 gets retried. A 401 (bad credentials) does not.

def retry_with_backoff(
    func,
    max_retries=3,
    base_delay=1.0,
    max_delay=30.0,
    retryable_statuses=(429, 500, 502, 503, 504),
):
    """Retry a callable with exponential backoff + jitter."""
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return func()
        except APIError as e:
            last_exception = e
            if e.status_code not in retryable_statuses:
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay)
            print(f"  Attempt {attempt+1} failed ({e.status_code}). "
                  f"Retrying in {jitter:.1f}s...")
            time.sleep(jitter)
    raise last_exception

The key line is jitter = random.uniform(0, delay). This is “full jitter” — the wait lands anywhere between zero and the computed delay. AWS recommends this over “equal jitter” or “decorrelated jitter” for most workloads.

Let’s test it. This simulator fails twice, then succeeds on the third call.

def simulate_flaky_api(fail_count=2):
    """Simulate an API that fails N times then succeeds."""
    call_count = 0
    def call():
        nonlocal call_count
        call_count += 1
        if call_count <= fail_count:
            raise APIError(503, "Service unavailable")
        return {"content": "Hello from the LLM!", "call": call_count}
    return call


random.seed(42)
flaky = simulate_flaky_api(fail_count=2)
result = retry_with_backoff(flaky, max_retries=3, base_delay=0.01)
print(f"Success on call #{result['call']}: {result['content']}")
python
  Attempt 1 failed (503). Retrying in 0.0s...
  Attempt 2 failed (503). Retrying in 0.0s...
Success on call #3: Hello from the LLM!

Two failures, two retries, then success. The delays are tiny because base_delay=0.01. In production, start with 1-2 seconds.

Key Insight: Exponential backoff without jitter is only half the solution. Jitter prevents synchronized retry storms across clients hitting the same provider.

Quick check: If base_delay=2 and this is attempt 3, what’s the maximum possible wait? Answer: min(2 * 2^3, max_delay) = 16 seconds (before jitter randomizes it down).

typescript
{
  type: 'exercise',
  id: 'retry-custom-status',
  title: 'Exercise 1: Custom Retryable Status Codes',
  difficulty: 'intermediate',
  exerciseType: 'write',
  instructions: 'Modify the `retry_with_backoff` function call below so it retries on status 408 (Request Timeout) and 429 (Rate Limited) but NOT on 500 or 503. The flaky API raises 408 twice then succeeds. Print the final result.',
  starterCode: 'def timeout_api():\n    timeout_api.count = getattr(timeout_api, "count", 0) + 1\n    if timeout_api.count <= 2:\n        raise APIError(408, "Request Timeout")\n    return {"status": "ok", "attempt": timeout_api.count}\n\n# Fix the retryable_statuses tuple\nresult = retry_with_backoff(\n    timeout_api,\n    max_retries=3,\n    base_delay=0.01,\n    retryable_statuses=(500, 503),  # <-- change this\n)\nprint(result["status"])\nprint(result["attempt"])',
  testCases: [
    { id: 'tc1', input: '', expectedOutput: 'ok', description: 'Should print ok after retries', hidden: false },
    { id: 'tc2', input: '', expectedOutput: '3', description: 'Should succeed on attempt 3', hidden: false },
  ],
  hints: [
    'Replace (500, 503) with a tuple that includes 408 and 429.',
    'retryable_statuses=(408, 429) — only retry on timeout and rate limit errors.',
  ],
  solution: 'def timeout_api():\n    timeout_api.count = getattr(timeout_api, "count", 0) + 1\n    if timeout_api.count <= 2:\n        raise APIError(408, "Request Timeout")\n    return {"status": "ok", "attempt": timeout_api.count}\n\nresult = retry_with_backoff(\n    timeout_api,\n    max_retries=3,\n    base_delay=0.01,\n    retryable_statuses=(408, 429),\n)\nprint(result["status"])\nprint(result["attempt"])',
  solutionExplanation: 'By setting retryable_statuses=(408, 429), the retry function catches 408 errors and retries. After 2 failures it succeeds on attempt 3. If we left (500, 503), the 408 would raise immediately without retrying.',
  xpReward: 15,
}

The Provider Fallback Chain

Retry handles transient blips — the server hiccuped but came back. What if the entire provider is down?

A fallback chain is a priority list of LLM providers. When the primary fails after all retries, the client moves to the next one. Think of a phone tree — first number doesn’t answer, try the second.

Each provider needs its own configuration: endpoint, model, headers, body format. The ProviderConfig dataclass below stores these details. Its build_headers method returns provider-specific headers, and build_body creates the JSON payload.

@dataclass
class ProviderConfig:
    """Configuration for one LLM provider."""
    name: str
    api_url: str
    model: str
    api_key: str
    max_retries: int = 3
    timeout: float = 30.0

    def build_headers(self):
        if self.name == "openai":
            return {"Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"}
        elif self.name == "anthropic":
            return {"x-api-key": self.api_key,
                    "anthropic-version": "2023-06-01",
                    "Content-Type": "application/json"}
        return {"Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"}

    def build_body(self, prompt, max_tokens=256):
        return {"model": self.model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": max_tokens}

OpenAI uses Authorization: Bearer. Anthropic uses x-api-key. The body format is similar here, but Gemini and Cohere differ more — you’d add branches to build_body for those.

Here’s a chain with three providers. The ordering matters — best model first, cheapest last.

providers = [
    ProviderConfig(
        name="openai",
        api_url="https://api.openai.com/v1/chat/completions",
        model="gpt-4o", api_key="sk-demo"),
    ProviderConfig(
        name="anthropic",
        api_url="https://api.anthropic.com/v1/messages",
        model="claude-sonnet-4-20250514", api_key="sk-ant-demo"),
    ProviderConfig(
        name="openai",
        api_url="https://api.openai.com/v1/chat/completions",
        model="gpt-4o-mini", api_key="sk-demo"),
]

print(f"Fallback chain: {' -> '.join(p.model for p in providers)}")
python
Fallback chain: gpt-4o -> claude-sonnet-4-20250514 -> gpt-4o-mini

GPT-4o is the primary. If it’s down, Claude takes over. If Claude fails too, GPT-4o-mini is the safety net — cheaper, faster, and often on separate infrastructure.

The Circuit Breaker

Here’s a scenario that wastes time. OpenAI has been down for 10 minutes. Your client still tries it first on every request — retrying 3 times, waiting 15 seconds total — before falling back. Every single request pays that penalty.

A circuit breaker fixes this. It tracks failures per provider. After N consecutive failures, the breaker “opens” and skips that provider entirely. After a cooldown, it lets one test request through. Success closes the breaker. Failure reopens it.

Three states: CLOSED (healthy), OPEN (skip this provider), HALF-OPEN (testing recovery).

@dataclass
class CircuitBreaker:
    """Track provider health, skip broken providers."""
    failure_threshold: int = 3
    cooldown_seconds: float = 60.0
    failure_count: int = 0
    last_failure_time: float = 0.0
    state: str = "CLOSED"

    def can_execute(self):
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            elapsed = time.time() - self.last_failure_time
            if elapsed >= self.cooldown_seconds:
                self.state = "HALF_OPEN"
                return True
            return False
        return True  # HALF_OPEN: allow test request

    def record_success(self):
        self.failure_count = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            print(f"  Circuit OPEN — skipping for "
                  f"{self.cooldown_seconds}s")

Watch the breaker trip after 3 consecutive failures.

breaker = CircuitBreaker(failure_threshold=3, cooldown_seconds=5.0)

for i in range(5):
    if breaker.can_execute():
        print(f"Request {i+1}: Allowed (state={breaker.state})")
        breaker.record_failure()
    else:
        print(f"Request {i+1}: BLOCKED (state={breaker.state})")
python
Request 1: Allowed (state=CLOSED)
Request 2: Allowed (state=CLOSED)
Request 3: Allowed (state=CLOSED)
  Circuit OPEN — skipping for 5.0s
Request 4: BLOCKED (state=OPEN)
Request 5: BLOCKED (state=OPEN)

Requests 4 and 5 skip the provider instantly. No retries. No wasted seconds. After the 5-second cooldown, the breaker moves to HALF_OPEN and tests with one request.

Warning: Don’t set the threshold too low. A threshold of 1 means one transient 500 opens the circuit. For LLM APIs that occasionally hiccup under load, 3-5 consecutive failures is safer.
typescript
{
  type: 'exercise',
  id: 'circuit-breaker-halfopen',
  title: 'Exercise 2: Test the HALF_OPEN Recovery',
  difficulty: 'intermediate',
  exerciseType: 'write',
  instructions: 'Create a CircuitBreaker with failure_threshold=2 and cooldown_seconds=0.1. Record 2 failures to open it. Then sleep past the cooldown, check that `can_execute()` returns True (HALF_OPEN), and record a success to close it. Print the final state.',
  starterCode: 'breaker = CircuitBreaker(failure_threshold=2, cooldown_seconds=0.1)\n\n# Step 1: Record failures to open the circuit\n# YOUR CODE HERE\n\n# Step 2: Sleep past cooldown\n# YOUR CODE HERE\n\n# Step 3: Check can_execute and record success\nif breaker.can_execute():\n    print(f"Test request allowed (state={breaker.state})")\n    # YOUR CODE HERE\n\nprint(f"Final state: {breaker.state}")',
  testCases: [
    { id: 'tc1', input: '', expectedOutput: 'HALF_OPEN', description: 'Should show HALF_OPEN after cooldown', hidden: false },
    { id: 'tc2', input: '', expectedOutput: 'Final state: CLOSED', description: 'Should close after success', hidden: false },
  ],
  hints: [
    'Call breaker.record_failure() twice, then time.sleep(0.2) to pass the cooldown.',
    'After can_execute() returns True in HALF_OPEN, call breaker.record_success() to close the circuit.',
  ],
  solution: 'breaker = CircuitBreaker(failure_threshold=2, cooldown_seconds=0.1)\n\nbreaker.record_failure()\nbreaker.record_failure()\n\ntime.sleep(0.2)\n\nif breaker.can_execute():\n    print(f"Test request allowed (state={breaker.state})")\n    breaker.record_success()\n\nprint(f"Final state: {breaker.state}")',
  solutionExplanation: 'Two failures open the circuit. After sleeping past the 0.1s cooldown, can_execute() transitions the breaker to HALF_OPEN and returns True. Recording a success then resets the breaker to CLOSED.',
  xpReward: 15,
}

Rate Limiting with Token Bucket

LLM providers cap your usage. OpenAI enforces requests per minute (RPM) and tokens per minute (TPM). Exceed the cap, and you get a 429. Your retry logic kicks in — but retrying a rate limit just extends the wait.

Better approach: prevent 429s before they happen. A token bucket tracks remaining capacity and pauses proactively.

Picture a bucket holding tokens. New tokens drip in at a steady rate. Each request removes one. Empty bucket? Wait for the next drip. The _refill method adds tokens based on elapsed time. The acquire method checks availability and returns the wait time (zero means “go ahead”).

@dataclass
class TokenBucketRateLimiter:
    """Rate limiter using the token bucket algorithm."""
    max_tokens: float
    refill_rate: float  # tokens per second
    tokens: float = 0.0
    last_refill: float = field(default_factory=time.time)

    def __post_init__(self):
        self.tokens = self.max_tokens

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.max_tokens,
                         self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def acquire(self, count=1):
        """Consume tokens. Returns wait time (0 = go)."""
        self._refill()
        if self.tokens >= count:
            self.tokens -= count
            return 0.0
        return (count - self.tokens) / self.refill_rate

    def wait_and_acquire(self, count=1):
        """Block until tokens available, then consume."""
        wait = self.acquire(count)
        if wait > 0:
            print(f"  Rate limit: waiting {wait:.2f}s")
            time.sleep(wait)
            self._refill()
            self.tokens -= count

A bucket with 3 tokens, refilling at 2 per second. Five rapid requests — watch what happens.

limiter = TokenBucketRateLimiter(max_tokens=3, refill_rate=2.0)

for i in range(5):
    wait = limiter.acquire(1)
    if wait > 0:
        print(f"Request {i+1}: Wait {wait:.2f}s")
        time.sleep(wait)
        limiter._refill()
        limiter.tokens -= 1
    else:
        print(f"Request {i+1}: Sent ({limiter.tokens:.1f} left)")
python
Request 1: Sent (2.0 left)
Request 2: Sent (1.0 left)
Request 3: Sent (0.0 left)
Request 4: Wait 0.50s
Request 5: Wait 0.50s

Three fly through. The fourth and fifth wait for refill. In production, match max_tokens and refill_rate to your provider’s RPM.

Tip: Set your limiter slightly below the actual cap. If OpenAI allows 60 RPM, configure 50. This buffer absorbs clock drift and burst spikes.

Predict the output: If max_tokens=5 and refill_rate=1.0, how long would request #7 wait after 6 rapid requests? Answer: The bucket empties after 5 requests. Request 6 waits 1 second. Request 7 waits another 1 second (1 token per second refill).

Assembling the Resilient Client

Four pieces built. Time to wire them together. The ResilientLLMClient gives each provider its own circuit breaker and rate limiter.

The RequestLog dataclass records every call — provider, tokens, latency, cost, and whether it succeeded. The COST_TABLE maps models to per-1K-token pricing.

@dataclass
class RequestLog:
    """Log entry for a single LLM request."""
    provider: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    cost_usd: float
    success: bool
    error: str = ""

Each provider key in the client combines name and model. This lets you run two OpenAI models with independent breakers and limiters.

The complete method is the main entry point. For each provider, it checks the circuit breaker, acquires a rate limit token, builds the request, and calls with retry. On failure, it logs the error and tries the next provider.

class ResilientLLMClient:
    """LLM client with retry, fallback, circuit breaker,
    rate limiting, and cost tracking."""

    COST_TABLE = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    }

    def __init__(self, providers, rpm_limit=50):
        self.providers = providers
        self.breakers = {
            p.name + p.model: CircuitBreaker()
            for p in providers}
        self.limiters = {
            p.name + p.model: TokenBucketRateLimiter(
                max_tokens=rpm_limit,
                refill_rate=rpm_limit / 60.0)
            for p in providers}
        self.logs = []

    def _estimate_cost(self, model, p_tok, c_tok):
        rates = self.COST_TABLE.get(
            model, {"input": 0.001, "output": 0.002})
        return (p_tok / 1000 * rates["input"]
                + c_tok / 1000 * rates["output"])

The _estimate_cost method looks up per-1K-token pricing. If the model isn’t in the table, it falls back to a conservative default.

Here’s complete — the method that orchestrates the full pipeline. It tries each provider in order, using circuit breaker, rate limiter, and retry logic.

    def complete(self, prompt, max_tokens=256):
        """Send prompt through the resilient pipeline."""
        errors = []
        for provider in self.providers:
            key = provider.name + provider.model
            breaker = self.breakers[key]
            limiter = self.limiters[key]
            if not breaker.can_execute():
                errors.append(f"{provider.model}: circuit open")
                continue
            limiter.wait_and_acquire(1)
            headers = provider.build_headers()
            body = provider.build_body(prompt, max_tokens)
            start = time.time()
            try:
                result = retry_with_backoff(
                    lambda p=provider, h=headers, b=body:
                        self._send_request(p, h, b),
                    max_retries=provider.max_retries)
                latency = (time.time() - start) * 1000
                p_tok = int(len(prompt.split()) * 1.3)
                c_tok = int(max_tokens * 0.5)
                breaker.record_success()
                cost = self._estimate_cost(
                    provider.model, p_tok, c_tok)
                self.logs.append(RequestLog(
                    provider=provider.name,
                    model=provider.model,
                    prompt_tokens=p_tok,
                    completion_tokens=c_tok,
                    latency_ms=round(latency, 1),
                    cost_usd=round(cost, 6),
                    success=True))
                return result
            except APIError as e:
                breaker.record_failure()
                self.logs.append(RequestLog(
                    provider=provider.name,
                    model=provider.model,
                    prompt_tokens=0, completion_tokens=0,
                    latency_ms=round(
                        (time.time()-start)*1000, 1),
                    cost_usd=0.0, success=False,
                    error=str(e)))
                errors.append(f"{provider.model}: {e.message}")
        raise APIError(503,
            f"All providers failed: {'; '.join(errors)}")
Note: Token counting here is approximate. In production, use `tiktoken` for OpenAI models or the provider’s counting endpoint. The `words * 1.3` heuristic is a rough estimate.

Testing the Full Pipeline

We can’t make real API calls in the browser. Instead, we’ll mock the _send_request method to simulate OpenAI being down while Anthropic stays healthy.

def _mock_send(self, provider, headers, body):
    """OpenAI fails, Anthropic works."""
    if provider.model == "gpt-4o":
        raise APIError(503, "OpenAI is down")
    return {"provider": provider.name,
            "model": provider.model,
            "content": f"Response from {provider.model}"}


ResilientLLMClient._send_request = _mock_send
client = ResilientLLMClient(providers, rpm_limit=60)
random.seed(42)

result = client.complete("Explain gradient descent briefly")
print(f"Provider used: {result['model']}")
print(f"Response: {result['content']}")
python
  Attempt 1 failed (503). Retrying in 0.0s...
  Attempt 2 failed (503). Retrying in 0.0s...
  Attempt 3 failed (503). Retrying in 0.0s...
Provider used: claude-sonnet-4-20250514
Response: Response from claude-sonnet-4-20250514

OpenAI failed all 3 retries. The client moved to Anthropic automatically. Zero manual intervention.

Cost Summary and Logging

Every request in our client gets logged. Here’s a function that prints a spending and success report — essential for monitoring production LLM usage.

def print_cost_summary(client):
    total_cost = sum(l.cost_usd for l in client.logs)
    successes = sum(1 for l in client.logs if l.success)
    failures = len(client.logs) - successes
    print(f"\n{'='*45}")
    print(f"  Request Summary")
    print(f"{'='*45}")
    print(f"  Total requests:   {len(client.logs)}")
    print(f"  Successes:        {successes}")
    print(f"  Failures:         {failures}")
    print(f"  Total cost:       ${total_cost:.6f}")
    print(f"{'='*45}")
    for log in client.logs:
        status = "OK" if log.success else "FAIL"
        print(f"  [{status}] {log.model:30s} "
              f"{log.latency_ms:7.1f}ms ${log.cost_usd:.6f}")


print_cost_summary(client)
python
=============================================
  Request Summary
=============================================
  Total requests:   2
  Successes:        1
  Failures:         1
  Total cost:       $0.002295
=============================================
  [FAIL] gpt-4o                              0.0ms $0.000000
  [OK] claude-sonnet-4-20250514              0.0ms $0.002295

You see exactly which provider was used, the cost, and which calls failed. In production, pipe these to Datadog, Grafana, or a CSV.

Key Insight: Cost tracking isn’t optional — it’s survival. A single runaway loop can burn hundreds of dollars in minutes. Per-request logging lets you set alerts before that happens.

Common Mistakes and How to Fix Them

Mistake 1: Retrying 429s like 500s

Wrong:

if status_code in (429, 500, 503):
    time.sleep(base_delay * 2 ** attempt)

Why it’s wrong: A 429 includes a Retry-After header. It tells you exactly how long to wait. Ignoring it means you keep hitting the limit.

Correct:

if status_code == 429:
    wait = int(headers.get("Retry-After", 60))
    time.sleep(wait)
elif status_code in (500, 502, 503):
    time.sleep(base_delay * 2 ** attempt)

Mistake 2: No HTTP timeout

Wrong:

response = requests.post(url, json=body, headers=headers)

Why it’s wrong: A hung connection blocks your thread forever. This cascades into thread pool exhaustion.

Correct:

response = requests.post(
    url, json=body, headers=headers, timeout=30
)

Mistake 3: One rate limiter for all providers

Wrong:

global_limiter = TokenBucketRateLimiter(max_tokens=50)

Why it’s wrong: Each provider has separate limits. One shared limiter means hitting OpenAI’s cap also blocks Claude — even though Claude has capacity.

Correct: One limiter per provider, which ResilientLLMClient does by default.

When NOT to Build This Yourself

This article built everything from scratch for learning. In production, you have options.

Use a managed gateway when:
– You have 10+ developers calling LLM APIs
– You need centralized cost controls and audit logs
– LiteLLM, Portkey, or AWS Bedrock handle this at the infra layer

Use this custom approach when:
– You need full control over retry and fallback logic
– You’re building a library that others will consume
– Adding a gateway feels like overkill for your team size

Use Tenacity when:
– You only need retry (no fallbacks or circuit breakers)
– It’s battle-tested and handles async, callbacks, and jitter natively

I prefer the custom approach for small teams (1-5 developers). Once you cross 10+ services making LLM calls, a gateway pays for itself in operational simplicity.

Practice Exercise

Challenge: Extend the CircuitBreaker to support half_open_max_calls. In HALF_OPEN state, allow N test requests (not just one). If all N succeed, close the circuit. If any fail, reopen it.

Click to see the solution
@dataclass
class ImprovedCircuitBreaker:
    failure_threshold: int = 3
    cooldown_seconds: float = 60.0
    half_open_max_calls: int = 3
    failure_count: int = 0
    half_open_successes: int = 0
    last_failure_time: float = 0.0
    state: str = "CLOSED"

    def can_execute(self):
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if time.time() - self.last_failure_time >= self.cooldown_seconds:
                self.state = "HALF_OPEN"
                self.half_open_successes = 0
                return True
            return False
        return True

    def record_success(self):
        if self.state == "HALF_OPEN":
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_max_calls:
                self.failure_count = 0
                self.state = "CLOSED"
                print("  Circuit CLOSED — provider recovered")
        else:
            self.failure_count = 0

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == "HALF_OPEN":
            self.state = "OPEN"
            print("  Circuit reopened — test failed")
        elif self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

The key change: `record_success` tracks `half_open_successes` and only closes after N consecutive successes. One lucky request doesn’t mark a flaky provider as healthy.

Complete Code

Click to expand the full script (copy-paste and run)
# Complete code from: Build a Resilient LLM Client in Python
# Requires: No external dependencies (pure stdlib)
# Python 3.9+

import time
import random
import json
from dataclasses import dataclass, field
from typing import Optional


class APIError(Exception):
    def __init__(self, status_code, message=""):
        self.status_code = status_code
        self.message = message
        super().__init__(f"HTTP {status_code}: {message}")


def retry_with_backoff(func, max_retries=3, base_delay=1.0,
                       max_delay=30.0,
                       retryable_statuses=(429, 500, 502, 503, 504)):
    last_exception = None
    for attempt in range(max_retries + 1):
        try:
            return func()
        except APIError as e:
            last_exception = e
            if e.status_code not in retryable_statuses:
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay)
            print(f"  Attempt {attempt+1} failed ({e.status_code}). "
                  f"Retrying in {jitter:.1f}s...")
            time.sleep(jitter)
    raise last_exception


@dataclass
class CircuitBreaker:
    failure_threshold: int = 3
    cooldown_seconds: float = 60.0
    failure_count: int = 0
    last_failure_time: float = 0.0
    state: str = "CLOSED"

    def can_execute(self):
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if time.time() - self.last_failure_time >= self.cooldown_seconds:
                self.state = "HALF_OPEN"
                return True
            return False
        return True

    def record_success(self):
        self.failure_count = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            print(f"  Circuit OPEN — skipping for "
                  f"{self.cooldown_seconds}s")


@dataclass
class TokenBucketRateLimiter:
    max_tokens: float
    refill_rate: float
    tokens: float = 0.0
    last_refill: float = field(default_factory=time.time)

    def __post_init__(self):
        self.tokens = self.max_tokens

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.max_tokens,
                         self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def acquire(self, count=1):
        self._refill()
        if self.tokens >= count:
            self.tokens -= count
            return 0.0
        return (count - self.tokens) / self.refill_rate

    def wait_and_acquire(self, count=1):
        wait = self.acquire(count)
        if wait > 0:
            print(f"  Rate limit: waiting {wait:.2f}s")
            time.sleep(wait)
            self._refill()
            self.tokens -= count


@dataclass
class ProviderConfig:
    name: str
    api_url: str
    model: str
    api_key: str
    max_retries: int = 3
    timeout: float = 30.0

    def build_headers(self):
        if self.name == "openai":
            return {"Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"}
        elif self.name == "anthropic":
            return {"x-api-key": self.api_key,
                    "anthropic-version": "2023-06-01",
                    "Content-Type": "application/json"}
        return {"Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"}

    def build_body(self, prompt, max_tokens=256):
        return {"model": self.model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": max_tokens}


@dataclass
class RequestLog:
    provider: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    latency_ms: float
    cost_usd: float
    success: bool
    error: str = ""


class ResilientLLMClient:
    COST_TABLE = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "claude-sonnet-4-20250514": {"input": 0.003, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    }

    def __init__(self, providers, rpm_limit=50):
        self.providers = providers
        self.breakers = {p.name + p.model: CircuitBreaker()
                        for p in providers}
        self.limiters = {p.name + p.model: TokenBucketRateLimiter(
                            max_tokens=rpm_limit,
                            refill_rate=rpm_limit / 60.0)
                        for p in providers}
        self.logs = []

    def _estimate_cost(self, model, p_tok, c_tok):
        rates = self.COST_TABLE.get(
            model, {"input": 0.001, "output": 0.002})
        return (p_tok / 1000 * rates["input"]
                + c_tok / 1000 * rates["output"])

    def complete(self, prompt, max_tokens=256):
        errors = []
        for provider in self.providers:
            key = provider.name + provider.model
            breaker = self.breakers[key]
            limiter = self.limiters[key]
            if not breaker.can_execute():
                errors.append(f"{provider.model}: circuit open")
                continue
            limiter.wait_and_acquire(1)
            headers = provider.build_headers()
            body = provider.build_body(prompt, max_tokens)
            start = time.time()
            try:
                result = retry_with_backoff(
                    lambda p=provider, h=headers, b=body:
                        self._send_request(p, h, b),
                    max_retries=provider.max_retries)
                latency = (time.time() - start) * 1000
                p_tok = int(len(prompt.split()) * 1.3)
                c_tok = int(max_tokens * 0.5)
                breaker.record_success()
                self.logs.append(RequestLog(
                    provider=provider.name, model=provider.model,
                    prompt_tokens=p_tok, completion_tokens=c_tok,
                    latency_ms=round(latency, 1),
                    cost_usd=round(self._estimate_cost(
                        provider.model, p_tok, c_tok), 6),
                    success=True))
                return result
            except APIError as e:
                breaker.record_failure()
                self.logs.append(RequestLog(
                    provider=provider.name, model=provider.model,
                    prompt_tokens=0, completion_tokens=0,
                    latency_ms=round((time.time()-start)*1000, 1),
                    cost_usd=0.0, success=False, error=str(e)))
                errors.append(f"{provider.model}: {e.message}")
        raise APIError(503,
            f"All providers failed: {'; '.join(errors)}")


# --- Demo ---
providers = [
    ProviderConfig(name="openai",
        api_url="https://api.openai.com/v1/chat/completions",
        model="gpt-4o", api_key="sk-demo"),
    ProviderConfig(name="anthropic",
        api_url="https://api.anthropic.com/v1/messages",
        model="claude-sonnet-4-20250514", api_key="sk-ant-demo"),
    ProviderConfig(name="openai",
        api_url="https://api.openai.com/v1/chat/completions",
        model="gpt-4o-mini", api_key="sk-demo"),
]


def _mock_send(self, provider, headers, body):
    if provider.model == "gpt-4o":
        raise APIError(503, "OpenAI is down")
    return {"provider": provider.name,
            "model": provider.model,
            "content": f"Response from {provider.model}"}


ResilientLLMClient._send_request = _mock_send
client = ResilientLLMClient(providers, rpm_limit=60)
random.seed(42)
result = client.complete("Explain gradient descent briefly")
print(f"Provider: {result['model']}")
print(f"Response: {result['content']}")


def print_cost_summary(client):
    total_cost = sum(l.cost_usd for l in client.logs)
    successes = sum(1 for l in client.logs if l.success)
    failures = len(client.logs) - successes
    print(f"\n{'='*45}")
    print(f"  Request Summary")
    print(f"{'='*45}")
    print(f"  Total requests:   {len(client.logs)}")
    print(f"  Successes:        {successes}")
    print(f"  Failures:         {failures}")
    print(f"  Total cost:       ${total_cost:.6f}")
    print(f"{'='*45}")
    for log in client.logs:
        status = "OK" if log.success else "FAIL"
        print(f"  [{status}] {log.model:30s} "
              f"{log.latency_ms:7.1f}ms ${log.cost_usd:.6f}")


print_cost_summary(client)
print("\nScript completed successfully.")

Frequently Asked Questions

Should I use Tenacity instead of writing retry logic?

For retry-only use cases, yes. Tenacity handles edge cases like async support, retry state, and callback hooks. But it doesn’t give you fallback chains, circuit breakers, or cost tracking. This article combines all four.

How do I handle streaming with retry logic?

Streaming complicates retries because failure can happen mid-stream. Retry at the connection level before data arrives. Once streaming starts, don’t retry — buffer what you have and let the caller decide.

What’s the difference between a circuit breaker and removing a provider?

A circuit breaker is temporary and automatic. After the cooldown, it tests whether the provider recovered. Removing a provider is permanent — someone has to add it back manually.

How do I find the right rate limit when providers don’t publish numbers?

Start at 30 RPM and increase gradually. Watch your 429 rate. If it’s above 1%, lower the limit. Parse x-ratelimit-remaining headers for dynamic adjustment.

References

  1. AWS Builders’ Library — Timeouts, Retries and Backoff with Jitter — The definitive guide to jitter strategies.
  2. Martin Fowler — CircuitBreaker — Original circuit breaker pattern description.
  3. OpenAI Rate Limits — RPM and TPM limits per tier.
  4. Anthropic API — Errors — Status codes and retry guidance.
  5. Python Tenacity — Standard retry library with backoff and jitter.
  6. Google Cloud — Bulletproof LLM Applications — SRE best practices for LLM infra.
  7. LiteLLM — Fallbacks and Retries — Gateway-level provider failover.
  8. Portkey — Retries, Fallbacks, Circuit Breakers — Production resilience patterns.

Reviewed: March 2026. Tested with Python 3.11.

Free Course
Master Core Python — Your First Step into AI/ML

Build a strong Python foundation with hands-on exercises designed for aspiring Data Scientists and AI/ML Engineers.

Start Free Course
Trusted by 50,000+ learners
Related Course
Master Gen AI — Hands-On
Join 5,000+ students at edu.machinelearningplus.com
Explore Course
Free Callback - Limited Slots
Not Sure Which Course to Start With?
Talk to our AI Counsellors and Practitioners. We'll help you clear all your questions for your background and goals, bridging the gap between your current skills and a career in AI.
10-digit mobile number
📞
Thank You!
We'll Call You Soon!
Our learning advisor will reach out within 24 hours.
(Check your inbox too — we've sent a confirmation)
⚡ Before you go

Python.
SQL. NumPy.
All free.

Get the exact 10-course programming foundation that Data Science professionals use.

🐍
Core Python — from first line to expert level
📈
NumPy & Pandas — the #1 libraries every DS job needs
🗃️
SQL Levels I–III — basics to Window Functions
📄
Real industry data — Jupyter notebooks included
R A M S K
57,000+ students
★★★★★ Rated 4.9/5
⚡ Before you go
Python. SQL.
All Free.
R A M S K
57,000+ students  ★★★★★ 4.9/5
Get Free Access Now
10 courses. Real projects. Zero cost. No credit card.
New learners enrolling right now
🔒 100% free ☕ No spam, ever ✓ Instant access
🚀
You're in!
Check your inbox for your access link.
(Check Promotions or Spam if you don't see it)
Or start your first course right now:
Start Free Course →
Scroll to Top
Scroll to Top
Course Preview

Machine Learning A-Z™: Hands-On Python & R In Data Science

Free Sample Videos:

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science

Machine Learning A-Z™: Hands-On Python & R In Data Science