Production Client
Production-ready client implementations with retry logic, error handling, and best practices.
Synchronous Client
import requests
import time
from typing import Optional, Dict, Any
from pathlib import Path
class TajiriVisionClient:
"""Production-ready client for Tajiri Vision API."""
def __init__(
self,
base_url: str = "https://api.tajirifarm.com",
timeout: int = 60,
max_retries: int = 3
):
self.base_url = base_url
self.timeout = timeout
self.max_retries = max_retries
self.session = requests.Session()
def diagnose(
self,
image: str | bytes | Path,
crop_type: Optional[str] = None,
region: Optional[str] = None,
growth_stage: Optional[str] = None,
description: Optional[str] = None,
language: str = "en",
detail_level: str = "standard",
include_bbox: bool = False
) -> Dict[str, Any]:
"""
Diagnose plant disease from image.
Args:
image: File path, Path object, or bytes
crop_type: Type of crop (e.g., "tomato", "maize")
region: Geographic region (e.g., "Kenya")
growth_stage: Plant growth stage
description: Symptom description
language: Response language (en, fr, sw, es, pt, it)
detail_level: Detail level (simple, standard, expert)
include_bbox: Include bounding boxes
Returns:
Diagnosis response dictionary
Raises:
TajiriAPIError: On API errors after retries exhausted
"""
url = f"{self.base_url}/diagnoses/"
# Prepare image
if isinstance(image, (str, Path)):
image_data = Path(image).read_bytes()
filename = Path(image).name
else:
image_data = image
filename = "image.jpg"
# Prepare request
files = {"image": (filename, image_data)}
data = {
k: v for k, v in {
"crop_type": crop_type,
"region": region,
"growth_stage": growth_stage,
"description": description,
"language": language,
"detail_level": detail_level,
"include_bbox": str(include_bbox).lower(),
}.items() if v is not None
}
# Make request with retry
return self._request_with_retry(url, files, data)
def _request_with_retry(
self,
url: str,
files: dict,
data: dict
) -> Dict[str, Any]:
"""Execute request with exponential backoff retry."""
last_error = None
for attempt in range(self.max_retries):
try:
response = self.session.post(
url,
files=files,
data=data,
timeout=self.timeout
)
# Handle rate limiting
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 30))
time.sleep(retry_after)
continue
# Handle server errors with backoff
if response.status_code >= 500:
time.sleep(2 ** attempt)
continue
# Raise for client errors
if response.status_code >= 400:
raise TajiriAPIError(
response.status_code,
response.json().get("detail", "Unknown error")
)
return response.json()
except requests.exceptions.RequestException as e:
last_error = e
time.sleep(2 ** attempt)
raise TajiriAPIError(0, f"Request failed after {self.max_retries} retries: {last_error}")
def health_check(self) -> Dict[str, Any]:
"""Check API health status."""
response = self.session.get(f"{self.base_url}/health", timeout=10)
response.raise_for_status()
return response.json()
class TajiriAPIError(Exception):
    """Error raised for a failed Tajiri Vision API call.

    The exception text is ``"HTTP <status_code>: <message>"``; the two parts
    are also kept as the ``status_code`` and ``message`` attributes.
    """

    def __init__(self, status_code: int, message: str):
        rendered = f"HTTP {status_code}: {message}"
        super().__init__(rendered)
        # Expose the raw pieces so callers can branch on the status.
        self.message = message
        self.status_code = status_code
/**
 * Optional settings for `TajiriVisionClient.diagnose`.
 * Each property is sent as a multipart form field alongside the image.
 */
interface DiagnosisOptions {
/** Type of crop, e.g. 'tomato'; sent as `crop_type` only when non-empty. */
cropType?: string;
/** Geographic region, e.g. 'Kenya'; sent as `region` only when non-empty. */
region?: string;
/** Plant growth stage; sent as `growth_stage` only when non-empty. */
growthStage?: string;
/** Symptom description; sent as `description` only when non-empty. */
description?: string;
/** Response language; the client sends 'en' when omitted. */
language?: 'en' | 'fr' | 'sw' | 'es' | 'pt' | 'it';
/** Level of detail in the response; the client sends 'standard' when omitted. */
detailLevel?: 'simple' | 'standard' | 'expert';
/** Whether to request bounding boxes; the client sends 'false' when omitted. */
includeBbox?: boolean;
}
/**
 * Shape of the JSON body that `TajiriVisionClient.diagnose` resolves with.
 * Field meanings beyond their declared types are defined by the API, not by
 * this client; notes marked "presumably" should be confirmed against the API.
 */
interface DiagnosisResponse {
/** Presumably the server-assigned identifier for this request. */
request_id: string;
/** Image-level checks; `quality_issue` is null when none is reported. */
image_analysis: { is_plant: boolean; quality_issue: string | null };
/** Overall health verdict for the crop. */
crop_health: 'healthy' | 'unhealthy' | 'unknown';
/** Candidate diagnoses; ordering is not specified here. */
diagnoses: Array<{
scientific_name: string | null;
eppo_code: string | null;
/** Display name of the diagnosis. */
name: string;
category: string | null;
/** Presumably a 0-1 fraction: the usage example multiplies it by 100 to print a percentage. */
confidence: number;
/** Null when no urgency is given; the usage example prints it only when set. */
urgency: string | null;
description: string;
affected_parts: string[];
reference_images: string[] | null;
}>;
/** Treatment information; its structure is not typed by this client. */
treatment: object | null;
/** Presumably populated only when bounding boxes were requested; bbox coordinate layout is not specified here. */
detections: Array<{ name: string; bbox: number[]; confidence: number }> | null;
additional_notes: string | null;
/** Error text carried in the body, or null. */
error: string | null;
}
/**
 * Fetch-based client for the Tajiri Vision API.
 *
 * Diagnosis uploads are retried on HTTP 429, HTTP 5xx and network/timeout
 * failures; other non-OK responses are thrown immediately as TajiriAPIError.
 */
class TajiriVisionClient {
  private baseUrl: string;
  private timeout: number;
  private maxRetries: number;

  /**
   * @param baseUrl API root URL; endpoint paths are appended to it as-is.
   * @param timeout Per-attempt timeout in milliseconds for diagnosis uploads.
   * @param maxRetries Maximum number of attempts per diagnosis request.
   */
  constructor(
    baseUrl: string = 'https://api.tajirifarm.com',
    timeout: number = 60000,
    maxRetries: number = 3
  ) {
    this.baseUrl = baseUrl;
    this.timeout = timeout;
    this.maxRetries = maxRetries;
  }

  /**
   * Upload an image for diagnosis.
   *
   * @param image Image file or blob, sent as the `image` form field.
   * @param options Optional request fields; see DiagnosisOptions.
   * @returns The parsed diagnosis response.
   * @throws TajiriAPIError on a client error or once all attempts are used up.
   */
  async diagnose(
    image: File | Blob,
    options: DiagnosisOptions = {}
  ): Promise<DiagnosisResponse> {
    const formData = new FormData();
    formData.append('image', image);

    if (options.cropType) formData.append('crop_type', options.cropType);
    if (options.region) formData.append('region', options.region);
    if (options.growthStage) formData.append('growth_stage', options.growthStage);
    if (options.description) formData.append('description', options.description);
    formData.append('language', options.language || 'en');
    formData.append('detail_level', options.detailLevel || 'standard');
    formData.append('include_bbox', String(options.includeBbox || false));

    return this.requestWithRetry(formData);
  }

  /**
   * POST the form to `/diagnoses/`, retrying on 429, 5xx and network errors.
   *
   * When every attempt fails, the thrown TajiriAPIError carries the status of
   * the last response, or 0 if the last attempt failed without a response.
   */
  private async requestWithRetry(formData: FormData): Promise<DiagnosisResponse> {
    let lastStatus = 0;
    let lastReason = 'no attempts were made';

    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      // Exponential backoff by default; a 429 overrides it with Retry-After.
      let delayMs = Math.pow(2, attempt) * 1000;
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), this.timeout);

      try {
        const response = await fetch(`${this.baseUrl}/diagnoses/`, {
          method: 'POST',
          body: formData,
          signal: controller.signal
        });

        if (response.status === 429) {
          // Handle rate limiting
          lastStatus = response.status;
          lastReason = 'rate limited (HTTP 429)';
          delayMs = this.parseRetryAfter(response.headers.get('Retry-After'));
        } else if (response.status >= 500) {
          // Handle server errors with backoff
          lastStatus = response.status;
          lastReason = `server returned HTTP ${response.status}`;
        } else if (!response.ok) {
          // Client errors will not succeed on retry; fail fast.
          throw new TajiriAPIError(response.status, await this.errorDetail(response));
        } else {
          // `await` keeps body-read failures inside this try/catch and under
          // the timeout, instead of escaping as an unhandled rejection.
          return await response.json();
        }
      } catch (error) {
        if (error instanceof TajiriAPIError) throw error;
        lastStatus = 0;
        lastReason = error instanceof Error ? error.message : String(error);
      } finally {
        // Always release the timer, including when fetch itself throws.
        clearTimeout(timeoutId);
      }

      // Sleeping after the final attempt would only delay the error.
      if (attempt < this.maxRetries - 1) {
        await this.sleep(delayMs);
      }
    }

    throw new TajiriAPIError(
      lastStatus,
      `Request failed after ${this.maxRetries} retries: ${lastReason}`
    );
  }

  /**
   * Convert a Retry-After header (whole seconds or an HTTP-date) into a
   * non-negative delay in milliseconds; missing or unparseable values fall
   * back to `defaultSeconds`.
   */
  private parseRetryAfter(header: string | null, defaultSeconds: number = 30): number {
    const value = (header || '').trim();
    if (/^\d+$/.test(value)) {
      return parseInt(value, 10) * 1000;
    }
    const retryAt = Date.parse(value);
    if (!Number.isNaN(retryAt)) {
      return Math.max(0, retryAt - Date.now());
    }
    return defaultSeconds * 1000;
  }

  /**
   * Extract `detail` from an error response body, falling back to
   * 'Unknown error' when the body is not JSON or has no detail.
   */
  private async errorDetail(response: Response): Promise<string> {
    try {
      const body = await response.json();
      const detail = body ? body.detail : undefined;
      if (!detail) return 'Unknown error';
      return typeof detail === 'string' ? detail : JSON.stringify(detail);
    } catch (_error) {
      return 'Unknown error';
    }
  }

  /** Resolve after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Check API health status via `GET /health` with a 10-second timeout.
   *
   * @throws TajiriAPIError when the endpoint answers with a non-OK status.
   */
  async healthCheck(): Promise<{ status: string; version: string }> {
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), 10000);
    try {
      const response = await fetch(`${this.baseUrl}/health`, {
        signal: controller.signal
      });
      if (!response.ok) {
        throw new TajiriAPIError(response.status, 'Health check failed');
      }
      return await response.json();
    } finally {
      clearTimeout(timeoutId);
    }
  }
}
/**
 * Error thrown for a failed Tajiri Vision API call.
 *
 * `message` is formatted as `HTTP <statusCode>: <message>` and the numeric
 * status is exposed as `statusCode`.
 */
class TajiriAPIError extends Error {
  constructor(public statusCode: number, message: string) {
    super(`HTTP ${statusCode}: ${message}`);
    // When compiled to ES5, `super(...)` on Error returns a plain Error and
    // drops this class's prototype, which makes `instanceof TajiriAPIError`
    // false. Restoring the prototype is a no-op on ES2015+ targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = 'TajiriAPIError';
  }
}
Usage
from tajiri_client import TajiriVisionClient, TajiriAPIError
# Diagnose one local image and print a short summary of the findings.
client = TajiriVisionClient()

try:
    report = client.diagnose(
        "plant.jpg",
        crop_type="tomato",
        region="Kenya",
        language="en",
    )
except TajiriAPIError as e:
    print(f"API Error: {e}")
else:
    print(f"Health: {report['crop_health']}")
    for finding in report['diagnoses']:
        print(f"- {finding['name']}: {finding['confidence']:.0%}")
        # Urgency is optional; only mention it when the API supplied one.
        urgency = finding.get('urgency')
        if urgency:
            print(f" Urgency: {urgency}")
// Diagnose one image and print a short summary of the findings.
const client = new TajiriVisionClient();

try {
  const result = await client.diagnose(imageFile, {
    cropType: 'tomato',
    region: 'Kenya',
    language: 'en'
  });

  console.log(`Health: ${result.crop_health}`);
  for (const diagnosis of result.diagnoses) {
    console.log(`- ${diagnosis.name}: ${(diagnosis.confidence * 100).toFixed(0)}%`);
    if (diagnosis.urgency) {
      console.log(` Urgency: ${diagnosis.urgency}`);
    }
  }
} catch (error) {
  if (error instanceof TajiriAPIError) {
    console.error(`API Error: ${error.message}`);
  } else {
    // Anything else is unexpected (e.g. a bug in this script); do not hide it.
    throw error;
  }
}
Async Client (Python)
Use the async client for high-throughput applications that process multiple images concurrently.
import aiohttp
import asyncio
from typing import Optional, Dict, Any
class AsyncTajiriClient:
    """Async client for high-throughput applications.

    By default each ``diagnose`` call opens and closes its own
    ``aiohttp.ClientSession``. To reuse one session (and its connection pool)
    across calls, either pass ``session=`` or use the client as an async
    context manager: ``async with AsyncTajiriClient() as client: ...``.
    """

    def __init__(
        self,
        base_url: str = "https://api.tajirifarm.com",
        timeout: int = 60,
        session: Optional["aiohttp.ClientSession"] = None
    ):
        """Create a client.

        Args:
            base_url: API root URL; endpoint paths are appended to it as-is.
            timeout: Total per-request timeout in seconds.
            session: Optional caller-owned session to reuse. The client never
                closes a session it did not create.
        """
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session = session
        self._owns_session = False

    async def __aenter__(self) -> "AsyncTajiriClient":
        """Open a shared session for the ``async with`` block if none was given."""
        if self._session is None:
            self._session = aiohttp.ClientSession(timeout=self.timeout)
            self._owns_session = True
        return self

    async def __aexit__(self, *exc_info) -> None:
        """Close the shared session opened by ``__aenter__``."""
        await self.close()

    async def close(self) -> None:
        """Close the shared session if this client created it."""
        if self._owns_session and self._session is not None:
            await self._session.close()
            self._session = None
            self._owns_session = False

    async def diagnose(
        self,
        image_bytes: bytes,
        crop_type: Optional[str] = None,
        region: Optional[str] = None,
        language: str = "en"
    ) -> Dict[str, Any]:
        """Async diagnosis request.

        Args:
            image_bytes: Raw image content, uploaded as ``image.jpg``.
            crop_type: Type of crop; omitted from the request when empty.
            region: Geographic region; omitted from the request when empty.
            language: Response language.

        Returns:
            The parsed JSON response.

        Raises:
            aiohttp.ClientResponseError: On a 4xx/5xx status
                (via ``raise_for_status``).
        """
        data = aiohttp.FormData()
        data.add_field("image", image_bytes, filename="image.jpg")
        if crop_type:
            data.add_field("crop_type", crop_type)
        if region:
            data.add_field("region", region)
        data.add_field("language", language)

        if self._session is not None:
            return await self._post(self._session, data)

        # No shared session: fall back to a short-lived one for this call.
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            return await self._post(session, data)

    async def _post(self, session, data) -> Dict[str, Any]:
        """POST the form to ``/diagnoses/`` on ``session`` and return the JSON body."""
        url = f"{self.base_url}/diagnoses/"
        # The timeout is passed per request so it also applies to a
        # caller-supplied session configured differently.
        async with session.post(url, data=data, timeout=self.timeout) as response:
            response.raise_for_status()
            return await response.json()
async def batch_diagnose(
    image_paths: list[str],
    *,
    max_concurrency: int = 10,
    **kwargs
) -> list[dict]:
    """Diagnose multiple images concurrently.

    Args:
        image_paths: Paths of the image files to upload.
        max_concurrency: Upper bound on images being read and uploaded at the
            same time, so a large batch does not open every file and
            connection at once.
        **kwargs: Forwarded unchanged to ``AsyncTajiriClient.diagnose``.

    Returns:
        One diagnosis result per path, in the same order as ``image_paths``.

    Raises:
        ValueError: If ``max_concurrency`` is less than 1.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")
    if not image_paths:
        return []

    client = AsyncTajiriClient()
    limiter = asyncio.Semaphore(max_concurrency)

    async def process_one(path: str):
        async with limiter:
            # Close the file before awaiting the network call so handles are
            # not held open for the duration of every in-flight request.
            with open(path, "rb") as f:
                image_bytes = f.read()
            return await client.diagnose(image_bytes, **kwargs)

    return await asyncio.gather(*[process_one(p) for p in image_paths])
# Usage: diagnose three local images concurrently and print each crop's health.
image_files = ["plant1.jpg", "plant2.jpg", "plant3.jpg"]
batch = batch_diagnose(image_files, crop_type="tomato", region="Kenya")
diagnoses = asyncio.run(batch)

# Results come back in input order, so they pair up with the paths.
for image_file, diagnosis in zip(image_files, diagnoses):
    print(f"{image_file}: {diagnosis['crop_health']}")
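Configuration
Environment Variables
Store configuration in environment variables for production (for example, the base URL, request timeout, and maximum retry count that the client constructors accept) instead of hard-coding them.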
Error Handling Best Practices
from tajiri_client import TajiriVisionClient, TajiriAPIError
# Map each failure mode of a diagnosis call to a user-facing message.
client = TajiriVisionClient()

try:
    result = client.diagnose("plant.jpg")
except FileNotFoundError:
    print("Image file not found")
except TajiriAPIError as e:
    status = e.status_code
    if status == 400:
        message = "Invalid request - check image format"
    elif status == 429:
        message = "Rate limited - slow down requests"
    elif status >= 500:
        message = "Server error - retry later"
    else:
        message = f"API error: {e}"
    print(message)
except Exception as e:
    # Last-resort boundary for anything the cases above did not anticipate.
    print(f"Unexpected error: {e}")