April 8, 2026 · 8 min read · Technical Deep-Dive · API · Python

API Rate Limits Will Wreck Your AI Project (Here's How to Handle Them)

Your automation is running smoothly. Then at 2 PM on Tuesday, everything stops. The Claude API is rejecting requests with 429 Too Many Requests.

Your automation was working fine yesterday at the same volume. What changed? Maybe nothing. Maybe Claude had brief capacity constraints. Maybe you spiked unexpectedly.

The point: rate limits will happen. And if you're not prepared, they'll break your system when you can't afford downtime.

I'm going to walk through the architecture that survives rate limits—and the code patterns that make it work.

Why Rate Limits Exist

First, context. Rate limits aren't punishment. They're protection.

  1. Capacity protection. Claude API has finite compute. They limit per-user request volume to ensure nobody monopolizes capacity.
  2. Cost control. Buggy code can spend $50K in 30 seconds if you're not careful. Rate limits give you time to notice and kill runaway jobs.
  3. Fair use. Without limits, one customer could starve everyone else.

For Claude, the limits come in two flavors (the exact numbers depend on your usage tier, so check your account):

  1. Requests per minute (RPM). How many API calls you can make, regardless of their size.
  2. Tokens per minute (TPM). The total input and output tokens across all requests.

Both matter. You could be under the RPM limit but over the TPM limit.
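A quick way to see which limit binds first: divide your TPM budget by your average tokens per request, and compare that to your RPM cap. The numbers below are illustrative, not Anthropic's actual limits:

```python
def effective_requests_per_minute(rpm_limit, tpm_limit, avg_tokens_per_request):
    """Throughput is capped by whichever limit binds first."""
    tpm_bound = tpm_limit // avg_tokens_per_request
    return min(rpm_limit, tpm_bound)

# Hypothetical limits: 1,000 RPM, 50,000 TPM
# Small requests (40 tokens each): RPM is the bottleneck
print(effective_requests_per_minute(1_000, 50_000, 40))     # → 1000
# Large requests (1,000 tokens each): TPM is the bottleneck
print(effective_requests_per_minute(1_000, 50_000, 1_000))  # → 50
```

Same API, same limits; the request size alone decides which ceiling you hit.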

The Naive Approach (That Fails)

Most people do this:

```python
for invoice in invoices:
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=100,
        messages=[{"role": "user", "content": f"Extract data: {invoice}"}]
    )
    save_to_db(response)
```

If you hit 500 invoices per minute and you're over the RPM limit, this crashes with a 429 error. The entire batch fails. The user sees "something went wrong." You're paged at 2 AM.

The Production Approach: Exponential Backoff with Jitter

Here's what actually works:

```python
import time
import random

from anthropic import Anthropic, RateLimitError

client = Anthropic()

def call_claude_with_retry(prompt, max_retries=5):
    """Call Claude with exponential backoff + jitter."""
    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=100,
                messages=[{"role": "user", "content": prompt}]
            )
            return response
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # Give up on the final attempt
            # Exponential backoff: 2^attempt seconds, plus random jitter
            backoff = 2 ** attempt
            jitter = random.uniform(0, 1)
            wait_time = backoff + jitter
            print(f"Rate limited. Retrying in {wait_time:.2f}s "
                  f"(attempt {attempt + 1}/{max_retries})")
            time.sleep(wait_time)
```

What's happening here:

  1. Try the request. If it succeeds, great.
  2. Catch rate limits. If you get a 429, don't fail immediately.
  3. Back off exponentially. First retry: wait 1–2 seconds. Second: 2–4 seconds. Third: 4–8 seconds. This gives the API time to recover.
  4. Add jitter. Without jitter, 100 clients all retry at the same time and hammer the API again. Jitter spreads retries randomly, preventing thundering herd.

With this pattern, temporary rate limit spikes are absorbed transparently. The user never sees an error.
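The same four steps work for any transient failure, not just Claude's 429s. Here's a self-contained sketch decoupled from the SDK; `retry_with_backoff` and `flaky_api` are illustrative names, and `base_delay` is shrunk so the demo runs fast (use roughly one second in production):

```python
import random
import time

def retry_with_backoff(fn, max_retries=5, base_delay=0.01,
                       retryable=(RuntimeError,)):
    """Generic exponential backoff + jitter around any callable."""
    for attempt in range(max_retries):
        try:
            return fn()
        except retryable:
            if attempt == max_retries - 1:
                raise  # Exhausted retries; surface the error
            # Doubling delay plus jitter, exactly as in the steps above
            wait = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            time.sleep(wait)

# Simulate an API that rate-limits the first two calls, then succeeds
calls = {"n": 0}
def flaky_api():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("429 Too Many Requests")
    return "ok"

result = retry_with_backoff(flaky_api)
print(result)  # → ok, after two absorbed failures
```

Wrapping the retry logic around a callable also makes it trivial to unit-test without hitting the real API.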

Token Budget: The Real Limit

Requests per minute is easy to track. Tokens per minute is sneakier.

A "token" is roughly 4 characters. A 500-word document might be 800 tokens. If you're processing invoices that average 1,000 tokens, and you have a 50K TPM limit, you can do 50 extractions per minute maximum. That's not about request volume; it's about token volume.
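A back-of-the-envelope estimator makes this concrete. The 4-characters-per-token rule is rough (real tokenizers vary by language and content), and the figures below are illustrative:

```python
def estimate_tokens(text, chars_per_token=4):
    """Rough token estimate from character count."""
    return len(text) // chars_per_token

invoice = "Invoice #1042 from Acme Corp, due 2026-05-01, total $1,250.00"
print(estimate_tokens(invoice))  # a small invoice: cheap per request

# With a 50K TPM budget and ~1,000-token invoices:
budget_per_minute = 50_000 // 1_000
print(budget_per_minute)  # → 50 extractions per minute, max
```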

To manage this, track tokens:

```python
def extract_invoice(invoice_text, client):
    """Extract with token tracking."""
    prompt = f"Extract vendor, amount, date: {invoice_text}"
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=100,
        messages=[{"role": "user", "content": prompt}]
    )
    # Access token usage from the response
    input_tokens = response.usage.input_tokens
    output_tokens = response.usage.output_tokens
    total_tokens = input_tokens + output_tokens
    print(f"Used {total_tokens} tokens ({input_tokens} in, {output_tokens} out)")
    return response
```

Then implement a token bucket:

```python
import time

class TokenBucket:
    """Rate limit by tokens, not requests."""

    def __init__(self, capacity, refill_rate_per_second):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate_per_second
        self.last_refill = time.time()

    def consume(self, tokens):
        """Try to consume tokens. Returns False if not enough capacity."""
        now = time.time()
        elapsed = now - self.last_refill
        refill = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + refill)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def wait_until_available(self, tokens):
        """Block until tokens are available."""
        while not self.consume(tokens):
            time.sleep(0.1)

# Usage
bucket = TokenBucket(capacity=50_000, refill_rate_per_second=50_000 / 60)

for invoice in invoices:
    prompt = f"Extract: {invoice}"
    # Estimate tokens (rough: 4 chars per token), plus room for the response
    estimated_tokens = len(prompt) // 4 + 100
    bucket.wait_until_available(estimated_tokens)
    response = extract_invoice(invoice, client)
```

This ensures you never exceed your TPM limit. You're self-throttling at 100% utilization, not crashing at 101%.

Queue-Based Architecture (For High Scale)

If you're processing thousands of documents per hour, a queue pattern is better:

```python
import json
import time
from concurrent.futures import ThreadPoolExecutor

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/invoices"

def producer(invoices):
    """Put invoices on the queue."""
    for invoice in invoices:
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps({"invoice_text": invoice})
        )

def consumer(token_bucket):
    """Pull from the queue and process with rate limiting."""
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL,
            MaxNumberOfMessages=10
        )
        if "Messages" not in response:
            time.sleep(1)
            continue
        for message in response["Messages"]:
            body = json.loads(message["Body"])
            invoice_text = body["invoice_text"]

            # Wait for token budget
            estimated_tokens = len(invoice_text) // 4 + 100
            token_bucket.wait_until_available(estimated_tokens)

            # Process
            extracted = extract_invoice(invoice_text, client)
            save_to_db(extracted)

            # Delete from the queue only after a successful save
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message["ReceiptHandle"]
            )

# Run the consumer with multiple workers sharing one bucket
token_bucket = TokenBucket(capacity=50_000, refill_rate_per_second=50_000 / 60)
with ThreadPoolExecutor(max_workers=3) as executor:
    for _ in range(3):
        executor.submit(consumer, token_bucket)
```

The queue decouples producers (your app submitting invoices) from consumers (workers processing them). If rate limits hit, the queue just backs up. Invoices don't fail; they wait. The user keeps using the app.
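You can prototype the same decoupling without AWS using the standard library's `queue.Queue`. This is a local stand-in, not the production SQS setup; `process_invoice` is a placeholder for the real extraction call:

```python
import queue
import threading

work_queue = queue.Queue()
results = []

def process_invoice(text):
    """Stand-in for the real Claude extraction call."""
    return f"extracted:{text}"

def worker():
    while True:
        item = work_queue.get()
        if item is None:          # Sentinel: shut down this worker
            work_queue.task_done()
            break
        results.append(process_invoice(item))
        work_queue.task_done()

# Producer side: enqueue work; it waits rather than failing
for invoice in ["inv-001", "inv-002", "inv-003"]:
    work_queue.put(invoice)

# Consumer side: a small worker pool, one sentinel per worker
workers = [threading.Thread(target=worker) for _ in range(2)]
for t in workers:
    t.start()
for _ in workers:
    work_queue.put(None)
for t in workers:
    t.join()

print(sorted(results))
# → ['extracted:inv-001', 'extracted:inv-002', 'extracted:inv-003']
```

Swapping `queue.Queue` for SQS changes the transport, not the architecture: producers never block on the API, and backlog absorbs the rate limit.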

Monitoring Rate Limits

Finally, track what's actually happening:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RateLimitMetrics:
    def __init__(self):
        self.rate_limit_hits = 0
        self.total_tokens = 0
        self.total_requests = 0

    def log_request(self, tokens, was_rate_limited):
        self.total_requests += 1
        self.total_tokens += tokens
        if was_rate_limited:
            self.rate_limit_hits += 1
        if self.total_requests % 100 == 0:
            logger.info(
                f"Metrics: {self.total_requests} requests, "
                f"{self.total_tokens} tokens, "
                f"{self.rate_limit_hits} rate limit hits"
            )
```

Ship these metrics to CloudWatch, and alert if you're hitting rate limits repeatedly; it means your token budget is too tight.
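The alert itself can be a simple threshold on the hit ratio. The 5% default here is an arbitrary starting point, not a recommendation; tune it to your traffic:

```python
def should_alert(rate_limit_hits, total_requests, threshold=0.05):
    """Alert when more than `threshold` of requests were rate limited."""
    if total_requests == 0:
        return False  # No traffic, nothing to alert on
    return rate_limit_hits / total_requests > threshold

print(should_alert(2, 100))   # → False (2% is within budget)
print(should_alert(12, 100))  # → True (12% means you're chronically over)
```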

Bottom Line

Rate limits are inevitable. But they don't have to crash your system.

The pattern:

  1. Exponential backoff + jitter for transient limits.
  2. Token bucket to self-throttle before hitting limits.
  3. Queue architecture for high volume.
  4. Monitoring to catch trends before they break.

Build this from day one. It's not complex. It's not costly. And it saves you the 2 AM page.

Get a production-ready rate-limiting library

I've packaged these patterns—exponential backoff, token bucket, and metrics—into a reusable Python module. Drop it into any Claude API project.



Need help building resilient AI systems?

We build API integrations with proper error handling, rate limiting, and monitoring baked in from day one.

Book Your Free Discovery Call