How to Implement API Gateway Patterns -- Step-by-Step
An API gateway sits between clients and your backend services, handling cross-cutting concerns like authentication, rate limiting, routing, and logging. This tutorial walks through the most common API gateway patterns and shows you how to implement them with Python.
Key Takeaways
- API gateways centralize cross-cutting concerns (auth, rate limiting, logging) away from business logic
- The five core patterns are: reverse proxy, aggregator, offloader, circuit breaker, and BFF (Backend for Frontend)
- You can build a functional gateway in under 200 lines of Python using FastAPI
- SearchHive's own API architecture uses these patterns to serve SwiftSearch, ScrapeForge, and DeepDive from a single endpoint
Prerequisites
- Python 3.10+
- pip install fastapi uvicorn httpx
- Basic understanding of REST APIs and HTTP
Step 1: Understand the Core Problem
Without a gateway, every microservice handles its own authentication, rate limiting, logging, and error handling. This leads to duplicated code, inconsistent behavior, and harder maintenance.
A gateway centralizes these concerns:
Client -> API Gateway -> Service A (SwiftSearch)
                      -> Service B (ScrapeForge)
                      -> Service C (DeepDive)
Step 2: Build a Basic Reverse Proxy Gateway
The simplest gateway pattern is a reverse proxy -- it forwards requests to backend services based on the URL path:
from fastapi import FastAPI, Request, Response
import httpx

app = FastAPI(title="API Gateway")

# Backend service registry
SERVICES = {
    "search": "http://localhost:8001",
    "scrape": "http://localhost:8002",
    "research": "http://localhost:8003",
}

async def proxy_request(request: Request, service_url: str) -> Response:
    """Forward a request to a backend service."""
    target_url = f"{service_url}{request.url.path}"
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.request(
            method=request.method,
            url=target_url,
            headers={k: v for k, v in request.headers.items()
                     if k.lower() != "host"},
            params=dict(request.query_params),
            content=await request.body(),
        )
    # Pass the backend response through as-is. Don't assume JSON --
    # an HTML error page or binary payload would crash resp.json().
    return Response(
        status_code=resp.status_code,
        content=resp.content,
        media_type=resp.headers.get("content-type"),
    )

@app.api_route("/api/v1/search/{path:path}", methods=["GET", "POST"])
async def search_gateway(request: Request):
    """Route to the search service."""
    return await proxy_request(request, SERVICES["search"])

@app.api_route("/api/v1/scrape/{path:path}", methods=["GET", "POST"])
async def scrape_gateway(request: Request):
    """Route to the scrape service."""
    return await proxy_request(request, SERVICES["scrape"])

@app.api_route("/api/v1/research/{path:path}", methods=["GET", "POST"])
async def research_gateway(request: Request):
    """Route to the research service."""
    return await proxy_request(request, SERVICES["research"])
Run with: uvicorn gateway:app --port 8080
Step 3: Add Authentication
Add API key validation at the gateway level so backend services don't need to handle it:
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.security import APIKeyHeader

app = FastAPI(title="API Gateway")
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# In production, load keys from a database or environment variable
VALID_API_KEYS = {"sk-dev-abc123", "sk-prod-xyz789"}

async def verify_api_key(request: Request) -> str:
    """Extract and validate the API key from the request."""
    api_key = request.headers.get("X-API-Key")
    if not api_key:
        raise HTTPException(status_code=401, detail="API key required")
    if api_key not in VALID_API_KEYS:
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key

# Middleware to enforce auth on protected routes
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Skip auth for health and docs endpoints, enforce it everywhere else."""
    if request.url.path in ["/health", "/docs", "/openapi.json"]:
        return await call_next(request)
    if request.url.path.startswith("/api/v1/"):
        api_key = request.headers.get("X-API-Key")
        if not api_key or api_key not in VALID_API_KEYS:
            return JSONResponse(
                status_code=401,
                content={"error": "Valid X-API-Key header required"},
            )
    return await call_next(request)
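The accept/reject decision can be exercised without spinning up a server. A sketch using a hypothetical pure-function mirror of the logic (check_api_key and its (status, detail) return shape are illustrative, not part of FastAPI):

```python
def check_api_key(headers: dict, valid_keys: set) -> tuple[int, str]:
    """Pure-function mirror of verify_api_key, isolated for unit testing."""
    key = headers.get("X-API-Key")
    if not key:
        return 401, "API key required"   # missing key
    if key not in valid_keys:
        return 403, "Invalid API key"    # unknown key
    return 200, "ok"
```

Keeping the decision logic in a plain function like this makes the middleware a thin adapter and the auth rules trivially testable.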
Step 4: Implement Rate Limiting
Rate limiting protects your backend services from abuse. A simple in-memory approach:
from collections import defaultdict
import time

class RateLimiter:
    """Sliding-window rate limiter, tracked per API key."""
    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, key: str) -> bool:
        """Check whether a request is allowed under the rate limit."""
        now = time.time()
        # Drop timestamps that have aged out of the window
        self.requests[key] = [
            t for t in self.requests[key]
            if now - t < self.window_seconds
        ]
        if len(self.requests[key]) >= self.max_requests:
            return False
        self.requests[key].append(now)
        return True

    def get_remaining(self, key: str) -> int:
        """Get the number of requests remaining in the current window."""
        now = time.time()
        active = [
            t for t in self.requests[key]
            if now - t < self.window_seconds
        ]
        return max(0, self.max_requests - len(active))

rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

# Add to middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    if request.url.path.startswith("/api/v1/"):
        api_key = request.headers.get("X-API-Key", "anonymous")
        if not rate_limiter.is_allowed(api_key):
            return JSONResponse(
                status_code=429,
                content={
                    "error": "Rate limit exceeded",
                    "retry_after": rate_limiter.window_seconds,
                    "remaining": rate_limiter.get_remaining(api_key),
                },
                headers={"Retry-After": str(rate_limiter.window_seconds)},
            )
    return await call_next(request)
For production, use Redis instead of in-memory storage for distributed rate limiting across multiple gateway instances.
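A sketch of what the Redis version might look like, using a fixed window built on INCR + EXPIRE. Here redis_client is any object exposing incr(key) and expire(key, ttl) -- redis.Redis in production, a stub in tests. The allow_request function and the key format are assumptions for illustration:

```python
import time

def allow_request(redis_client, api_key: str, max_requests: int = 100,
                  window_seconds: int = 60) -> bool:
    """Fixed-window rate limit check backed by Redis-style INCR + EXPIRE."""
    # Bucket requests by window number so keys roll over automatically
    window = int(time.time()) // window_seconds
    bucket = f"ratelimit:{api_key}:{window}"
    count = redis_client.incr(bucket)
    if count == 1:
        # First hit in this window: expire the key with the window
        redis_client.expire(bucket, window_seconds)
    return count <= max_requests
```

Because INCR is atomic, this works correctly even with many gateway instances hitting the same Redis; the trade-off versus the sliding window above is a burst of up to 2x max_requests across a window boundary.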
Step 5: Add Request Logging and Metrics
Observability is critical for any gateway. Log every request with timing:
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gateway")

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Log request method, path, status code, and duration."""
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start
    logger.info(
        f"{request.method} {request.url.path} "
        f"-> {response.status_code} "
        f"({duration:.3f}s)"
    )
    response.headers["X-Response-Time"] = f"{duration:.3f}"
    return response
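The same middleware hook can also feed lightweight metrics. A hypothetical in-memory collector (RouteMetrics is illustrative; in production you would more likely export counters to Prometheus or a similar system):

```python
from collections import defaultdict

class RouteMetrics:
    """Hypothetical in-memory request counts and latency totals per route."""
    def __init__(self):
        self.counts = defaultdict(int)
        self.total_seconds = defaultdict(float)

    def record(self, route: str, duration: float):
        """Record one request's duration for a route."""
        self.counts[route] += 1
        self.total_seconds[route] += duration

    def average(self, route: str) -> float:
        """Mean latency in seconds for a route (0.0 if never seen)."""
        n = self.counts[route]
        return self.total_seconds[route] / n if n else 0.0
```

Calling metrics.record(request.url.path, duration) from the logging middleware is enough to power a basic /metrics endpoint.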
Step 6: Implement the Aggregator Pattern
The aggregator pattern combines responses from multiple backend services into a single response. This is useful when a client needs data from several services at once:
import asyncio

async def aggregated_search(request: Request) -> Response:
    """Fan one gateway call out to search and research in parallel."""
    body = await request.json()
    query = body.get("query", "")
    async with httpx.AsyncClient(timeout=60.0) as client:
        # Kick off both backend calls without awaiting them yet
        search_task = client.post(
            f"{SERVICES['search']}/api/v1/search",
            json={"query": query, "num_results": 10},
        )
        research_task = client.post(
            f"{SERVICES['research']}/api/v1/research",
            json={"query": query},
        )
        # Run both in parallel
        search_resp, research_resp = await asyncio.gather(
            search_task, research_task
        )
    return JSONResponse(content={
        "search_results": search_resp.json().get("results", []),
        "research": research_resp.json().get("results", []),
    })

@app.post("/api/v1/aggregated")
async def aggregated_endpoint(request: Request):
    """Combined search + research endpoint."""
    return await aggregated_search(request)
This is the pattern SearchHive's own API uses -- SwiftSearch, ScrapeForge, and DeepDive are all accessible through the same gateway.
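One wrinkle the example above glosses over: if either backend call raises, the whole aggregate fails. A sketch of partial-failure handling with asyncio.gather(return_exceptions=True), where gather_partial is a hypothetical helper:

```python
import asyncio

async def gather_partial(tasks: dict):
    """Run named coroutines concurrently; a failed one yields None
    instead of failing the whole aggregate response."""
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    return {
        name: (None if isinstance(result, Exception) else result)
        for name, result in zip(tasks.keys(), results)
    }
```

The aggregator can then return whatever succeeded and mark the rest as unavailable, which is usually better UX than a blanket 502.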
Step 7: Add Circuit Breaker Pattern
A circuit breaker prevents cascading failures by stopping requests to a service that's consistently failing:
class CircuitBreaker:
    """Prevents requests to failing services."""
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = defaultdict(int)
        self.last_failure = defaultdict(float)
        self.state = defaultdict(lambda: "closed")  # closed = healthy

    def can_execute(self, service: str) -> bool:
        """Check whether a request to the service is allowed."""
        now = time.time()
        if self.state[service] == "open":
            # Allow a single probe once the recovery timeout has passed
            if now - self.last_failure[service] > self.recovery_timeout:
                self.state[service] = "half-open"
                return True
            return False
        return True

    def record_success(self, service: str):
        """Record a successful request and close the circuit."""
        self.failures[service] = 0
        self.state[service] = "closed"

    def record_failure(self, service: str):
        """Record a failed request; open the circuit at the threshold."""
        self.failures[service] += 1
        self.last_failure[service] = time.time()
        if self.failures[service] >= self.failure_threshold:
            self.state[service] = "open"
            logger.warning(
                f"Circuit breaker OPEN for {service} "
                f"({self.failures[service]} failures)"
            )

circuit_breaker = CircuitBreaker()

@app.middleware("http")
async def circuit_breaker_middleware(request: Request, call_next):
    """Apply the circuit breaker to backend service calls."""
    for service_name, prefix in [("search", "/api/v1/search"),
                                 ("scrape", "/api/v1/scrape"),
                                 ("research", "/api/v1/research")]:
        if request.url.path.startswith(prefix):
            if not circuit_breaker.can_execute(service_name):
                return JSONResponse(
                    status_code=503,
                    content={
                        "error": f"Service {service_name} is temporarily unavailable",
                        "retry_after": circuit_breaker.recovery_timeout,
                    },
                )
            break
    return await call_next(request)
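To watch the closed -> open -> half-open state machine without a failing backend, here is a condensed, self-contained version of the breaker (MiniBreaker is illustrative, with a short timeout so the transitions are visible in a quick run):

```python
import time
from collections import defaultdict

class MiniBreaker:
    """Condensed circuit breaker for illustrating state transitions."""
    def __init__(self, threshold=2, timeout=0.05):
        self.threshold = threshold
        self.timeout = timeout
        self.failures = defaultdict(int)
        self.last_failure = defaultdict(float)
        self.state = defaultdict(lambda: "closed")

    def can_execute(self, svc: str) -> bool:
        if self.state[svc] == "open":
            # After the timeout, let one probe request through
            if time.time() - self.last_failure[svc] > self.timeout:
                self.state[svc] = "half-open"
                return True
            return False
        return True

    def record_failure(self, svc: str):
        self.failures[svc] += 1
        self.last_failure[svc] = time.time()
        if self.failures[svc] >= self.threshold:
            self.state[svc] = "open"
```

Two failures trip the breaker; once the timeout elapses, the next can_execute call lets a single probe through in the half-open state.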
Step 8: Complete Example
Here's the complete gateway combining all patterns:
# gateway.py -- Complete API Gateway with all patterns
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import httpx
import time
import logging
from collections import defaultdict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gateway")

app = FastAPI(title="API Gateway")

SERVICES = {
    "search": "http://localhost:8001",
    "scrape": "http://localhost:8002",
    "research": "http://localhost:8003",
}
VALID_API_KEYS = {"sk-dev-abc123"}

# Rate limiter (sliding window per API key)
class RateLimiter:
    def __init__(self, max_requests=100, window=60):
        self.max = max_requests
        self.window = window
        self.requests = defaultdict(list)

    def check(self, key):
        now = time.time()
        self.requests[key] = [t for t in self.requests[key]
                              if now - t < self.window]
        if len(self.requests[key]) >= self.max:
            return False
        self.requests[key].append(now)
        return True

limiter = RateLimiter()

# Circuit breaker
class CircuitBreaker:
    def __init__(self, threshold=5, timeout=30):
        self.threshold = threshold
        self.timeout = timeout
        self.failures = defaultdict(int)
        self.last_fail = {}
        self.state = {}

    def ok(self, service):
        if self.state.get(service) == "open":
            if time.time() - self.last_fail.get(service, 0) > self.timeout:
                self.state[service] = "half-open"
                return True
            return False
        return True

    def success(self, service):
        self.failures[service] = 0
        self.state[service] = "closed"

    def fail(self, service):
        self.failures[service] += 1
        self.last_fail[service] = time.time()
        if self.failures[service] >= self.threshold:
            self.state[service] = "open"

breaker = CircuitBreaker()

@app.middleware("http")
async def gateway_middleware(request: Request, call_next):
    start = time.time()
    # Skip auth/limits for health and docs
    if request.url.path in ["/health", "/docs"]:
        return await call_next(request)
    # Auth check
    api_key = request.headers.get("X-API-Key")
    if not api_key or api_key not in VALID_API_KEYS:
        return JSONResponse(status_code=401,
                            content={"error": "Valid X-API-Key required"})
    # Rate limit
    if not limiter.check(api_key):
        return JSONResponse(status_code=429,
                            content={"error": "Rate limit exceeded"})
    # Circuit breaker for API routes
    for svc in SERVICES:
        if request.url.path.startswith(f"/api/v1/{svc}"):
            if not breaker.ok(svc):
                return JSONResponse(status_code=503,
                                    content={"error": f"{svc} unavailable"})
            break
    response = await call_next(request)
    duration = time.time() - start
    logger.info(f"{request.method} {request.url.path} "
                f"-> {response.status_code} ({duration:.3f}s)")
    return response

@app.get("/health")
async def health():
    return {"status": "ok", "services": list(SERVICES.keys())}

@app.api_route("/api/v1/{service}/{path:path}",
               methods=["GET", "POST"])
async def proxy(request: Request, service: str, path: str):
    if service not in SERVICES:
        return JSONResponse(status_code=404,
                            content={"error": "Unknown service"})
    target = f"{SERVICES[service]}/api/v1/{service}/{path}"
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            resp = await client.request(
                method=request.method,
                url=target,
                headers={k: v for k, v in request.headers.items()
                         if k.lower() != "host"},
                content=await request.body(),
            )
    except httpx.HTTPError:
        # Connection failures count against the circuit breaker too
        breaker.fail(service)
        return JSONResponse(status_code=502,
                            content={"error": f"{service} unreachable"})
    if resp.status_code >= 500:
        breaker.fail(service)
    else:
        breaker.success(service)
    # Pass the backend response through without assuming JSON
    return Response(status_code=resp.status_code, content=resp.content,
                    media_type=resp.headers.get("content-type"))
Common Issues
1. Request body consumed by middleware. Request bodies are a one-shot stream: if your middleware reads request.body(), the downstream handler gets an empty body. Read it once, cache it, and pass it along explicitly.
2. WebSocket support. Standard HTTP reverse proxying doesn't handle WebSocket upgrades, and httpx itself doesn't speak WebSocket. Use a dedicated proxy like Nginx, or a WebSocket-capable library, for those routes.
3. HTTPS termination. In production, terminate TLS at the gateway (Nginx, Cloudflare) and proxy to backend services over HTTP. Don't manage certificates on every microservice.
4. Distributed rate limiting. In-memory rate limiting only works with a single gateway instance. For multiple instances, use Redis with INCR + EXPIRE.
Next Steps
Once your gateway is working, consider adding:
- Request/response transformation (format conversion, field mapping)
- Caching (Redis or in-memory for frequently requested data)
- API versioning at the gateway level
- OpenAPI documentation aggregation from all backend services
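For the caching item, a minimal in-memory sketch (TTLCache is hypothetical; Redis with EXPIRE fills the same role when you run multiple gateway instances):

```python
import time

class TTLCache:
    """Hypothetical in-memory response cache with per-entry expiry."""
    def __init__(self, ttl_seconds: float = 30.0):
        self.ttl = ttl_seconds
        self.store = {}

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        entry = self.store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.time() >= expires_at:
            del self.store[key]
            return None
        return value

    def set(self, key, value):
        """Cache a value, stamped with its expiry time."""
        self.store[key] = (value, time.time() + self.ttl)
```

Keyed on (method, path, query), this can short-circuit the proxy for hot GET routes before any backend call is made.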
For a real-world example of these patterns in production, SearchHive serves SwiftSearch, ScrapeForge, and DeepDive through a unified API gateway. Explore the docs to see how it works.