Webhooks and API events are how modern services communicate asynchronously. Stripe sends payment events, GitHub fires repository webhooks, Slack posts notification payloads -- and you need a way to receive, log, and analyze all of it. This tutorial covers building a webhook monitoring system with Python, from receiving events to storing and querying them with a real-time dashboard.
Key Takeaways
- A webhook monitor needs three components: a receiver (HTTP server), a storage layer, and a query/display interface
- Flask or FastAPI can receive webhook events in under 20 lines of code
- Store events in SQLite for simplicity or PostgreSQL for production scale
- Use SearchHive's SwiftSearch API to correlate webhook events with external data
- Alert on anomalies by tracking event frequency and failure patterns
Prerequisites
- Python 3.8+
- pip install flask fastapi uvicorn requests
- A public URL for webhook delivery (ngrok, Cloudflare Tunnel, or a VPS)
- Optional: pip install sqlalchemy alembic (for database ORM)
- A SearchHive API key (free at searchhive.dev)
Step 1: Build a Webhook Receiver with Flask
The simplest webhook receiver is an HTTP endpoint that accepts POST requests and logs the payload:
```python
from flask import Flask, request, jsonify
import uuid
from datetime import datetime, timezone

app = Flask(__name__)

# In-memory event store (replace with database in production)
event_store = []

@app.route("/webhook/<source>", methods=["POST"])
def receive_webhook(source):
    """Receive a webhook from any source."""
    event_id = str(uuid.uuid4())[:8]
    timestamp = datetime.now(timezone.utc).isoformat()
    event = {
        "id": event_id,
        "source": source,
        "timestamp": timestamp,
        "headers": dict(request.headers),
        "body": request.get_json(silent=True) or request.form.to_dict(),
        "raw_body": request.get_data(as_text=True)[:5000]
    }
    event_store.append(event)
    print(f"[{event_id}] {source}: {request.content_type} ({len(event['raw_body'])} chars)")
    # Always return 200 quickly to prevent retries
    return jsonify({"received": event_id}), 200

@app.route("/events", methods=["GET"])
def list_events():
    """List all stored events."""
    return jsonify(event_store[-100:])  # Last 100 events

@app.route("/events/<source>", methods=["GET"])
def list_events_by_source(source):
    """List events from a specific source."""
    filtered = [e for e in event_store if e["source"] == source]
    return jsonify(filtered[-50:])

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
```
Run this with `python app.py` and expose it with ngrok: `ngrok http 8080`. Use the ngrok URL as your webhook endpoint.
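With the server running locally, you can simulate a delivery with curl before wiring up a real provider. The payload below is a made-up GitHub-style ping, not an actual GitHub event:

```shell
# Simulate a webhook delivery to the local receiver
curl -X POST http://localhost:8080/webhook/github \
  -H "Content-Type: application/json" \
  -d '{"event": "ping", "repository": "example/repo"}'

# Inspect what was stored for that source
curl http://localhost:8080/events/github
```

The first call should return a JSON body like {"received": "<id>"}, and the second should list the stored event.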
Step 2: Add FastAPI with Async Support
For production workloads, FastAPI offers async support, automatic docs, and better performance:
```python
from fastapi import FastAPI, Request
from typing import Optional
from collections import Counter
from datetime import datetime, timezone

app = FastAPI(title="Webhook Monitor")

# Replace with database in production
events = []

@app.post("/webhook/{source}")
async def receive_webhook(source: str, request: Request):
    body = await request.json()
    event = {
        "source": source,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "body": body
    }
    events.append(event)
    return {"status": "received", "event_id": len(events)}

@app.get("/events")
async def list_events(source: Optional[str] = None):
    if source:
        return [e for e in events if e["source"] == source][-100:]
    return events[-100:]

@app.get("/stats")
async def stats():
    source_counts = Counter(e["source"] for e in events)
    return {"total_events": len(events), "by_source": dict(source_counts)}
```
Run with `uvicorn app:app --host 0.0.0.0 --port 8080`. FastAPI auto-generates interactive docs at `/docs`.
Step 3: Persist Events to SQLite
An in-memory list loses data on restart. Use SQLite for persistent storage:
```python
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone

DB_PATH = "webhooks.db"

@contextmanager
def get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()

def init_db():
    with get_conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                event_type TEXT,
                status TEXT DEFAULT 'received',
                body TEXT,
                headers TEXT,
                raw_body TEXT
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON events(source)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp)")

def store_event(source, body, headers=None, event_type=None):
    with get_conn() as conn:
        conn.execute(
            "INSERT INTO events (source, timestamp, event_type, body, headers) VALUES (?, ?, ?, ?, ?)",
            (source, datetime.now(timezone.utc).isoformat(), event_type,
             json.dumps(body), json.dumps(headers or {}))
        )

def query_events(source=None, limit=100, since=None):
    with get_conn() as conn:
        sql = "SELECT * FROM events"
        params = []
        conditions = []
        if source:
            conditions.append("source = ?")
            params.append(source)
        if since:
            conditions.append("timestamp >= ?")
            params.append(since)
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        sql += " ORDER BY id DESC LIMIT ?"
        params.append(limit)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]

def get_event_stats():
    with get_conn() as conn:
        total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
        by_source = dict(conn.execute(
            "SELECT source, COUNT(*) as cnt FROM events GROUP BY source"
        ).fetchall())
        latest = conn.execute(
            "SELECT source, MAX(timestamp) as last FROM events GROUP BY source"
        ).fetchall()
        return {"total": total, "by_source": by_source, "latest": [dict(r) for r in latest]}
```
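The `since` parameter works because ISO-8601 UTC timestamps of equal length sort chronologically as plain strings. A cutoff for "the last hour" can be built with `timedelta`; the helper name `since_cutoff` below is illustrative, not part of the module above:

```python
from datetime import datetime, timedelta, timezone

def since_cutoff(hours=1):
    """Return an ISO-8601 UTC timestamp `hours` in the past,
    suitable for passing as query_events(since=...)."""
    return (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()

cutoff = since_cutoff(1)
print(cutoff)
# e.g. query_events(source="stripe", since=cutoff)
```

Because both sides of the SQL `timestamp >= ?` comparison come from `datetime.isoformat()`, string comparison and chronological comparison agree.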
Step 4: Add Event Processing and Alerting
Beyond storing events, process them to trigger actions and detect anomalies:
```python
from collections import defaultdict
import time

class EventProcessor:
    def __init__(self):
        self.event_counts = defaultdict(lambda: {"count": 0, "last_seen": 0})
        self.error_counts = defaultdict(int)
        self.alert_threshold = 100  # Alert after 100 events from one source in 60s

    def process(self, event):
        """Process an event: count, detect anomalies, trigger alerts."""
        source = event.get("source", "unknown")
        now = time.time()
        counts = self.event_counts[source]
        # Reset counter if more than 60 seconds since last event
        if now - counts["last_seen"] > 60:
            counts["count"] = 0
        counts["count"] += 1
        counts["last_seen"] = now
        # Check for anomaly -- burst of events
        if counts["count"] >= self.alert_threshold:
            self.trigger_alert(source, counts["count"])
        # Check for error events
        if self.is_error_event(event):
            self.error_counts[source] += 1
            if self.error_counts[source] >= 10:
                self.trigger_alert(source, f"{self.error_counts[source]} errors")
        return {"processed": True, "source": source}

    def is_error_event(self, event):
        body = event.get("body", {})
        if isinstance(body, dict):
            return body.get("type") == "error" or body.get("status") == "failed"
        return False

    def trigger_alert(self, source, detail):
        print(f"[ALERT] Anomaly detected from {source}: {detail}")
        # In production, send to Slack, PagerDuty, email, etc.

# Integrate with Flask/FastAPI receiver
processor = EventProcessor()
# In your receive_webhook function, add:
# processor.process(event)
```
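One caveat with the counter above: it only resets after a source goes quiet for 60 seconds, so a steady trickle of events can eventually trip the threshold. A stricter alternative is a true sliding window that keeps individual timestamps in a deque and evicts anything older than the window. A minimal sketch (the class name and API here are illustrative, not part of the processor above):

```python
import time
from collections import defaultdict, deque

class SlidingWindowCounter:
    """Count events per source within a fixed time window."""
    def __init__(self, window_seconds=60):
        self.window = window_seconds
        self.hits = defaultdict(deque)  # source -> recent timestamps

    def record(self, source, now=None):
        """Record one event and return the count inside the window."""
        now = time.time() if now is None else now
        dq = self.hits[source]
        dq.append(now)
        # Evict timestamps that fell out of the window
        while dq and now - dq[0] > self.window:
            dq.popleft()
        return len(dq)

counter = SlidingWindowCounter(window_seconds=60)
for t in (0, 10, 30, 90):  # simulated clock, in seconds
    n = counter.record("stripe", now=t)
print(n)  # 2 -- at t=90 only the events at t=30 and t=90 remain in the window
```

Trading a single counter for a deque costs memory proportional to the window size, which is usually acceptable for webhook volumes.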
Step 5: Correlate Events with External Data Using SearchHive
Sometimes you need context beyond the webhook payload. If Stripe sends a charge.failed event with a customer email, you might want to look up that customer or research the error. SearchHive's SwiftSearch API provides real-time web search for enrichment:
```python
import requests

SH_KEY = "sk_live_your_key_here"
SH_HEADERS = {"Authorization": f"Bearer {SH_KEY}", "Content-Type": "application/json"}

def enrich_event(event):
    """Search for context about a webhook event."""
    source = event.get("source", "")
    body = event.get("body", {})
    event_type = body.get("type", body.get("event", ""))
    if not event_type:
        return event
    # Build a search query from the event data
    query = f"{source} {event_type} troubleshooting documentation"
    resp = requests.post(
        "https://www.searchhive.dev/api/v1/swiftsearch",
        headers=SH_HEADERS,
        json={"query": query, "max_results": 3, "auto_scrape_top": 1},
        timeout=15
    )
    if resp.status_code == 200:
        data = resp.json()
        event["enrichment"] = {
            "search_query": query,
            "top_results": [
                {"title": r.get("title"), "url": r.get("url")}
                for r in data.get("results", [])[:3]
            ]
        }
    return event

# Usage: enrich a Stripe error event
enriched = enrich_event({
    "source": "stripe",
    "body": {"type": "charge.failed", "error": {"code": "card_declined"}}
})
print(enriched.get("enrichment", {}).get("top_results", []))
```
This pattern is useful for ops teams who want to see relevant documentation or community discussions alongside raw webhook data.
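The enrichment code reads the event type from the body, which works for Stripe-style payloads (`type`) and some generic ones (`event`). GitHub, by contrast, delivers the event name in the `X-GitHub-Event` header rather than the body. A small best-effort extractor covering both conventions (a sketch; the fallback order is an assumption you should adjust for the services you actually receive):

```python
def extract_event_type(event):
    """Best-effort event type from common body fields or known headers."""
    body = event.get("body") or {}
    if isinstance(body, dict):
        for key in ("type", "event", "action"):
            value = body.get(key)
            if value:
                return str(value)
    # GitHub delivers the event name in a header instead of the body
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    return headers.get("x-github-event", "unknown")

print(extract_event_type({"source": "stripe",
                          "body": {"type": "charge.failed"}}))  # charge.failed
print(extract_event_type({"source": "github",
                          "body": {"ref": "refs/heads/main"},
                          "headers": {"X-GitHub-Event": "push"}}))  # push
```

Normalizing header names to lowercase matters because proxies and frameworks disagree on header casing.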
Step 6: Build a Simple Dashboard Endpoint
Add a summary endpoint that returns HTML for a monitoring dashboard:
```python
from fastapi.responses import HTMLResponse

# Uses json, get_event_stats() and query_events() from Step 3

@app.get("/dashboard")
async def dashboard():
    stats = get_event_stats()
    recent = query_events(limit=20)
    html = f"""
    <html>
    <head><title>Webhook Monitor</title>
    <style>
      body {{ font-family: monospace; background: #1a1a2e; color: #eee; padding: 20px; }}
      .stat {{ display: inline-block; background: #16213e; padding: 15px 25px; margin: 5px; border-radius: 8px; }}
      .stat h3 {{ color: #e94560; margin: 0; }}
      table {{ border-collapse: collapse; width: 100%; margin-top: 20px; }}
      th, td {{ border: 1px solid #333; padding: 8px 12px; text-align: left; }}
      th {{ background: #16213e; color: #e94560; }}
    </style></head>
    <body>
    <h1>Webhook Monitor</h1>
    <div class="stat"><h3>{stats['total']}</h3><p>Total Events</p></div>
    """
    for source, count in stats["by_source"].items():
        html += f'<div class="stat"><h3>{count}</h3><p>{source}</p></div>'
    html += "<h2>Recent Events</h2><table><tr><th>ID</th><th>Source</th><th>Timestamp</th><th>Type</th></tr>"
    for e in recent:
        body = json.loads(e["body"]) if isinstance(e["body"], str) else e["body"]
        etype = body.get("type", body.get("event", "N/A")) if isinstance(body, dict) else "N/A"
        html += f'<tr><td>{e["id"]}</td><td>{e["source"]}</td><td>{e["timestamp"]}</td><td>{etype}</td></tr>'
    html += "</table></body></html>"
    return HTMLResponse(content=html)
```
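One caveat: the f-strings above interpolate webhook-supplied values (source names, event types) straight into HTML, so a malicious payload could inject markup into your dashboard. Escaping untrusted values with the standard library's `html.escape` before interpolation avoids this; `safe_cell` is a hypothetical helper, not part of the endpoint above:

```python
import html

def safe_cell(value):
    """Escape an untrusted value before embedding it in an HTML table cell."""
    return html.escape(str(value), quote=True)

row = f"<td>{safe_cell('<script>alert(1)</script>')}</td>"
print(row)  # <td>&lt;script&gt;alert(1)&lt;/script&gt;</td>
```

Wrapping every `{e["source"]}`-style interpolation in a call like this keeps the dashboard safe even when payloads are hostile.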
Complete Code Example
Here is the complete webhook monitoring system combining all components:
```python
from fastapi import FastAPI, Request
from typing import Optional
import sqlite3, json
from contextlib import contextmanager
from datetime import datetime, timezone

app = FastAPI(title="Webhook Monitor")
DB_PATH = "webhooks.db"

@contextmanager
def get_db():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()

def init_db():
    with get_db() as conn:
        conn.execute("""CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT, timestamp TEXT, body TEXT, headers TEXT
        )""")

init_db()

@app.post("/webhook/{source}")
async def webhook(source: str, request: Request):
    body = await request.json()
    headers = dict(request.headers)
    ts = datetime.now(timezone.utc).isoformat()
    with get_db() as conn:
        conn.execute("INSERT INTO events (source, timestamp, body, headers) VALUES (?,?,?,?)",
                     (source, ts, json.dumps(body), json.dumps(headers)))
    return {"status": "ok", "source": source, "timestamp": ts}

@app.get("/events")
async def events(source: Optional[str] = None, limit: int = 50):
    with get_db() as conn:
        if source:
            rows = conn.execute("SELECT * FROM events WHERE source=? ORDER BY id DESC LIMIT ?",
                                (source, limit)).fetchall()
        else:
            rows = conn.execute("SELECT * FROM events ORDER BY id DESC LIMIT ?", (limit,)).fetchall()
    return [dict(r) for r in rows]

@app.get("/stats")
async def stats():
    with get_db() as conn:
        total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
        by_source = dict(conn.execute("SELECT source, COUNT(*) as c FROM events GROUP BY source").fetchall())
    return {"total_events": total, "by_source": by_source}
```
Common Issues
1. Webhook delivery failures. If your server is down when a webhook fires, most services retry with exponential backoff. Return 200 quickly and process asynchronously to avoid timeouts.
2. Payload size limits. Large payloads (Stripe file uploads, GitHub push events with many commits) can exceed default Flask limits. Set app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 (10 MB).
3. Duplicate events. Services often redeliver webhooks if they do not receive a 200 within timeout. Deduplicate by storing an event ID and checking before processing.
4. Security. Verify webhook signatures. Stripe sends Stripe-Signature, GitHub sends X-Hub-Signature-256. Validate these before processing to prevent spoofed events.
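For item 4, GitHub's X-Hub-Signature-256 header is an HMAC-SHA256 of the raw request body, keyed with your webhook secret and prefixed with "sha256=". A verification sketch (compare with hmac.compare_digest, never ==, to avoid timing attacks):

```python
import hashlib
import hmac

def verify_github_signature(secret: str, raw_body: bytes, signature_header) -> bool:
    """Check a GitHub X-Hub-Signature-256 header against the raw request body."""
    expected = "sha256=" + hmac.new(
        secret.encode(), raw_body, hashlib.sha256
    ).hexdigest()
    # Constant-time comparison prevents timing attacks
    return hmac.compare_digest(expected, signature_header or "")

# Example with a known secret and body
secret, body = "mysecret", b'{"event": "ping"}'
good = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
print(verify_github_signature(secret, body, good))          # True
print(verify_github_signature(secret, body, "sha256=bad"))  # False
```

Note that the HMAC must be computed over the raw bytes as received; re-serializing a parsed JSON body will usually produce different bytes and a failed check. Stripe's Stripe-Signature scheme is similar but adds a timestamp to the signed payload, so follow Stripe's docs for that one.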
Next Steps
- How to Build a Data Lake from Web Scraping with Python -- store processed webhook data in a data lake
- SearchHive SwiftSearch API docs -- enrich webhook events with real-time search
- SearchHive blog -- more tutorials on API integration and data pipelines
Start monitoring webhooks today with SearchHive's free tier -- 500 credits for search enrichment, no credit card required.