Rate Limits¶
Understanding and working with API rate limits.
Current Limits¶
| Endpoint | Limit | Window |
|---|---|---|
| `POST /diagnosis/` | 30 requests | per minute |
| `GET /health` | 100 requests | per minute |
Rate limits are applied per IP address.
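As a quick sanity check, the limits above translate into a sustained request budget. For example, 30 requests per minute means averaging no more than one request every two seconds:

```python
# Sustained budget implied by the published limits:
# endpoint -> (max requests, window in seconds)
limits = {
    "POST /diagnosis/": (30, 60),
    "GET /health": (100, 60),
}

for endpoint, (max_requests, window_seconds) in limits.items():
    spacing = window_seconds / max_requests
    print(f"{endpoint}: at most one request every {spacing:.1f}s on average")
```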
Rate Limit Headers¶
Every response includes rate limit information:
| Header | Description |
|---|---|
| `X-RateLimit-Limit` | Maximum requests allowed in the window |
| `X-RateLimit-Remaining` | Requests remaining in the current window |
| `X-RateLimit-Reset` | Unix timestamp when the window resets |
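A successful response's headers might look like this (values are illustrative, assuming the 30-requests-per-minute diagnosis limit):

```http
HTTP/1.1 200 OK
X-RateLimit-Limit: 30
X-RateLimit-Remaining: 27
X-RateLimit-Reset: 1717000000
```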
Rate Limit Exceeded Response¶
When you exceed the limit, you receive:
```http
HTTP/1.1 429 Too Many Requests
Retry-After: 30
Content-Type: application/json

{
  "detail": "Rate limit exceeded"
}
```
The `Retry-After` header indicates how many seconds to wait before retrying.
Handling Rate Limits¶
Python¶
```python
import requests
import time

def diagnose_with_rate_limit(url, files, data, max_retries=3):
    """Retry on 429 responses, honoring the Retry-After header."""
    for attempt in range(max_retries):
        response = requests.post(url, files=files, data=data)
        if response.status_code == 200:
            return response.json()
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 30))
            remaining = response.headers.get("X-RateLimit-Remaining", "0")
            print(f"Rate limited. Waiting {retry_after}s... (remaining: {remaining})")
            time.sleep(retry_after)
            continue
        response.raise_for_status()
    raise RuntimeError("Max retries exceeded")
```
JavaScript¶
```javascript
async function diagnoseWithRateLimit(formData, maxRetries = 3) {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const response = await fetch('https://api.tajirifarm.com/diagnosis/', {
      method: 'POST',
      body: formData
    });
    if (response.ok) {
      return response.json();
    }
    if (response.status === 429) {
      const retryAfter = parseInt(response.headers.get('Retry-After') || '30', 10);
      console.log(`Rate limited. Waiting ${retryAfter}s...`);
      await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
      continue;
    }
    throw new Error(`HTTP ${response.status}`);
  }
  throw new Error('Max retries exceeded');
}
```
Best Practices¶
1. Implement Client-Side Throttling¶
Don't wait for 429 errors. Track your requests proactively:
```python
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests=30, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()

    def wait_if_needed(self):
        now = time.time()
        # Remove old requests outside the window
        while self.requests and self.requests[0] < now - self.window_seconds:
            self.requests.popleft()
        # Wait if at the limit
        if len(self.requests) >= self.max_requests:
            oldest = self.requests[0]
            wait_time = oldest + self.window_seconds - now
            if wait_time > 0:
                print(f"Throttling: waiting {wait_time:.1f}s")
                time.sleep(wait_time)
        self.requests.append(now)

# Usage
limiter = RateLimiter(max_requests=30, window_seconds=60)
for image in images:
    limiter.wait_if_needed()
    result = diagnose(image)
```
2. Batch Processing with Delays¶
When processing multiple images:
```python
import time

def batch_diagnose(images, delay_seconds=2):
    """Process images with a delay between requests to avoid rate limits."""
    results = []
    for i, image in enumerate(images):
        print(f"Processing {i+1}/{len(images)}: {image}")
        result = diagnose(image)
        results.append(result)
        # Don't delay after the last image
        if i < len(images) - 1:
            time.sleep(delay_seconds)
    return results
```
3. Use Exponential Backoff¶
For retry logic, increase wait time with each attempt:
```python
import time
import random

def exponential_backoff(attempt, base_delay=1, max_delay=60):
    """Calculate a delay that doubles with each attempt, plus jitter."""
    delay = min(base_delay * (2 ** attempt), max_delay)
    jitter = random.uniform(0, delay * 0.1)
    return delay + jitter

# Usage in a retry loop
def request_with_backoff(max_retries=3):
    for attempt in range(max_retries):
        try:
            return make_request()
        except RateLimitError:
            time.sleep(exponential_backoff(attempt))
    raise RuntimeError("Max retries exceeded")
```
4. Cache Results¶
Don't re-diagnose the same image:
```python
import hashlib

class DiagnosisCache:
    def __init__(self):
        self.cache = {}

    def get_hash(self, image_data):
        return hashlib.md5(image_data).hexdigest()

    def get(self, image_data):
        return self.cache.get(self.get_hash(image_data))

    def set(self, image_data, result):
        self.cache[self.get_hash(image_data)] = result

# Usage
cache = DiagnosisCache()

def diagnose_cached(image_path):
    with open(image_path, 'rb') as f:
        image_data = f.read()
    cached = cache.get(image_data)
    if cached is not None:
        return cached
    result = diagnose(image_data)
    cache.set(image_data, result)
    return result
```
Enterprise Rate Limits¶
Need higher limits for production use?
Enterprise API access includes:
- Higher rate limits (up to 1000 req/min)
- Dedicated capacity
- Priority processing
- SLA guarantees
Contact: partners@tajirifarm.com