Complete Guide to Data Extraction ETL Pipeline
Building a data extraction ETL pipeline means creating a system that automatically Extracts data from web sources, Transforms it into a clean, structured format, and Loads it into your database or data warehouse. This is the backbone of competitive intelligence, market research, pricing analytics, and lead generation programs.
This case study walks through how we built a production extraction pipeline using SearchHive's APIs -- and what we learned along the way.
Background
A mid-market SaaS company needed to track competitor pricing, feature changes, and customer reviews across 200+ competitor websites. Their existing approach involved manual spreadsheet updates by a team of three analysts -- slow, error-prone, and impossible to scale.
The problem: Data was stale within days of collection. Pricing changed faster than the team could update it. By the time a pricing insight reached the product team, it was already outdated.
The goal: Build an automated pipeline that extracts competitor data daily, transforms it into a consistent schema, and loads it into a PostgreSQL database for analytics and alerting.
Challenge
The extraction pipeline needed to handle several complexities:
- 200+ source URLs across different website architectures (Shopify, WordPress, custom builds)
- Dynamic JavaScript rendering on ~40% of target sites
- Anti-bot protection including Cloudflare, PerimeterX, and rate limiting
- Schema normalization -- each site had different HTML structures for the same data types (price, features, reviews)
- Error handling -- sites go down, change layouts, or block requests unexpectedly
- Cost control -- the pipeline needed to run within a predictable budget
Previous attempts with self-hosted Playwright instances and proxy services had failed. Proxy costs alone were running $500/month, CAPTCHA solving added another $200, and the team was spending hours maintaining browser instances.
Solution with SearchHive
SearchHive provided three key APIs for this pipeline:
- ScrapeForge -- for individual page extraction with built-in JS rendering and anti-bot bypass
- DeepDive -- for crawling entire competitor sites and extracting structured data
- SwiftSearch -- for finding competitor URLs and discovering new sources
Architecture
Schedule (daily 6am UTC)
|
v
[1] Source List (PostgreSQL)
|
v
[2] ScrapeForge API (parallel, 20 concurrent)
|
v
[3] Transform (Python -- normalize schemas)
|
v
[4] Validate (schema checks + anomaly detection)
|
v
[5] Load (PostgreSQL -- upsert with timestamps)
|
v
[6] Alert (Slack webhook for price changes > 10%)
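The six stages above can be wired together with a thin orchestrator. This is an illustrative sketch, not the team's actual scheduler code: each stage is passed in as a function, which keeps the pipeline itself trivial to test with stubs.

```python
# Illustrative orchestrator for the six-stage flow. Each stage is injected as a
# callable so the wiring stays decoupled from any one stage's implementation.
def run_pipeline(get_sources, extract, transform, validate, load, alert):
    loaded = []
    for source in get_sources():            # [1] source list
        result = extract(source)            # [2] extraction
        for record in transform(result):    # [3] transform
            ok, reason = validate(record)   # [4] validate
            if not ok:
                continue                    # skip bad records, keep going
            load(record)                    # [5] load
            alert(record)                   # [6] alert
            loaded.append(record)
    return loaded
```

In production the same shape runs under a scheduler (cron, Airflow, or similar) triggering at 6am UTC.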
Implementation
Step 1: Source management
Store competitor URLs in PostgreSQL with metadata about what to extract:
import psycopg2
from datetime import datetime

conn = psycopg2.connect("dbname=competitors user=api")
cursor = conn.cursor()

def get_sources():
    cursor.execute(
        "SELECT id, url, extract_fields, source_type "
        "FROM competitor_sources WHERE active = true"
    )
    return cursor.fetchall()

sources = get_sources()
# Returns: [(1, 'https://competitor1.com/pricing',
#            '["plan_name","price","features"]', 'pricing_page'), ...]
Step 2: Extraction with ScrapeForge
Parallel extraction using Python's concurrent.futures:
import requests
import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

API_KEY = "your-searchhive-api-key"
BASE_URL = "https://api.searchhive.dev/v1/scrape"

def extract_page(source_id, url, extract_fields):
    response = requests.post(
        BASE_URL,
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "url": url,
            "extract": json.loads(extract_fields)
        },
        timeout=60
    )
    if response.status_code == 200:
        data = response.json()
        return {
            "source_id": source_id,
            "url": url,
            "status": "success",
            "data": data.get("results", []),
            "extracted_at": datetime.utcnow().isoformat()
        }
    return {
        "source_id": source_id,
        "url": url,
        "status": "error",
        "error": f"HTTP {response.status_code}",
        "extracted_at": datetime.utcnow().isoformat()
    }

# Run extractions in parallel
results = []
with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {
        executor.submit(extract_page, src[0], src[1], src[2]): src
        for src in sources
    }
    for future in as_completed(futures):
        results.append(future.result())
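Transient failures (timeouts, brief 5xx spells, rate limiting) are common at 20 concurrent requests, so it pays to wrap each extraction call in a retry. A minimal sketch of exponential backoff with jitter -- the attempt count and delays here are illustrative, not tuned values from the case study:

```python
import random
import time

def with_retries(fn, attempts=3, base_delay=1.0):
    # Call fn(), retrying on any exception with exponential backoff plus
    # jitter; re-raise after the final attempt so failures stay visible.
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
```

Usage would look like `with_retries(lambda: extract_page(sid, url, fields))` inside the thread pool submission.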
Step 3: Transformation
Normalize data from different sources into a consistent schema:
def transform_pricing(raw_data, source_type):
    # Normalize pricing data from various source formats
    transformed = []
    for item in raw_data:
        entry = {
            "plan_name": item.get("plan_name") or item.get("tier"),
            "price_monthly": normalize_price(item.get("price")),
            "features": item.get("features", []),
            "currency": detect_currency(item.get("price")),
        }
        transformed.append(entry)
    return transformed

def normalize_price(price_str):
    # Handle various price formats: "$49/mo", "49 USD", "Free", etc.
    if not price_str or "free" in str(price_str).lower():
        return 0.0
    cleaned = "".join(c for c in str(price_str) if c.isdigit() or c == ".")
    return float(cleaned) if cleaned else 0.0
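The `detect_currency` helper called above isn't shown in the case study. A minimal sketch of what it might look like, inferring the currency from symbols or ISO codes in the raw price string (the fallback default is an assumption):

```python
def detect_currency(price_str, default="USD"):
    # Hypothetical helper: infer currency from symbols or ISO codes in the
    # raw price string, falling back to a default when nothing matches.
    if not price_str:
        return default
    s = str(price_str).upper()
    for token, code in (("€", "EUR"), ("£", "GBP"), ("$", "USD"),
                        ("EUR", "EUR"), ("GBP", "GBP"), ("USD", "USD")):
        if token in s:
            return code
    return default
```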
Step 4: Validation with anomaly detection
Flag data that looks wrong before loading:
def validate_record(record, historical_prices):
    price = record["price_monthly"]
    # Check for missing critical fields
    if not record["plan_name"]:
        return False, "Missing plan name"
    # Flag extreme price changes (>50% from historical average)
    if historical_prices:
        avg = sum(historical_prices) / len(historical_prices)
        if avg > 0 and abs(price - avg) / avg > 0.5:
            return False, f"Price anomaly: {price} vs avg {avg:.2f}"
    return True, "OK"
Step 5: Load into PostgreSQL
Upsert with timestamps to maintain history:
def load_records(records):
    for record in records:
        cursor.execute("""
            INSERT INTO competitor_pricing
                (source_id, plan_name, price_monthly, currency, features, extracted_at)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (source_id, plan_name, DATE(extracted_at))
            DO UPDATE SET
                price_monthly = EXCLUDED.price_monthly,
                features = EXCLUDED.features
        """, (
            record["source_id"],
            record["plan_name"],
            record["price_monthly"],
            record["currency"],
            json.dumps(record["features"]),
            record["extracted_at"]
        ))
    conn.commit()
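One detail the snippet glosses over: PostgreSQL only accepts an expression in an ON CONFLICT target if a matching unique index exists. A sketch of the assumed DDL (table and index names are illustrative; this also assumes extracted_at is a plain timestamp, since a timestamptz-to-date cast is not immutable and cannot be indexed directly):

```python
# Assumed supporting DDL for the upsert's ON CONFLICT target. Run once at
# setup time, e.g. via cursor.execute(CREATE_INDEX_SQL).
CREATE_INDEX_SQL = """
CREATE UNIQUE INDEX IF NOT EXISTS competitor_pricing_daily_uniq
    ON competitor_pricing (source_id, plan_name, DATE(extracted_at));
"""
```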
Step 6: Alerting
Send alerts for significant pricing changes:
def check_price_alerts(source_id):
    # Compare the two most recent prices recorded for this source
    cursor.execute("""
        SELECT price_monthly FROM competitor_pricing
        WHERE source_id = %s
        ORDER BY extracted_at DESC LIMIT 2
    """, (source_id,))
    prices = [row[0] for row in cursor.fetchall()]
    if len(prices) >= 2:
        old, new = prices[1], prices[0]
        if old > 0:
            change = (new - old) / old * 100
            if abs(change) >= 10:
                send_slack_alert(
                    f"Price alert: Source {source_id} changed "
                    f"{change:+.1f}% (${old} -> ${new})"
                )
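The `send_slack_alert` helper referenced above isn't shown. A minimal stdlib-only sketch posting to a Slack incoming webhook (the webhook URL is a placeholder; Slack's incoming webhooks accept a JSON body with a "text" field):

```python
import json
import urllib.request

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/T000/B000/XXXX"  # placeholder

def build_alert_payload(text):
    # Slack incoming webhooks accept {"text": "..."} as the message body.
    return json.dumps({"text": text}).encode("utf-8")

def send_slack_alert(text, webhook_url=SLACK_WEBHOOK_URL):
    req = urllib.request.Request(
        webhook_url,
        data=build_alert_payload(text),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return 200 <= resp.status < 300
```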
Results
After deploying the pipeline:
- 200+ sources extracted daily in under 15 minutes (vs. 3 analysts taking 2 days)
- 99.2% extraction success rate -- SearchHive handled JS rendering and anti-bot automatically
- Monthly cost dropped from $700 to $49 -- SearchHive's Builder plan vs. proxy + CAPTCHA + infra
- Data latency reduced from 48 hours to under 1 hour
- Zero maintenance -- no browser instances to patch, no proxies to rotate
Lessons Learned
1. Start with a schema, not a scraper. Define your output schema first. Different sources will need different extraction logic, but the output should always conform to the same structure. This makes the Transform and Load steps much simpler.
2. Batch parallel, validate serial. Run extractions in parallel for speed, but validate and load sequentially. A bad record shouldn't block other extractions, but it shouldn't silently corrupt your database either.
3. Log everything. Store raw responses alongside transformed data. When a source's layout changes, you'll need the raw data to debug and update your extraction logic.
4. Use credits wisely. ScrapeForge costs 1 credit per page. For sites that update daily, scrape once daily, not hourly. For sites that rarely change, scrape weekly. The SearchHive pricing page shows credit costs per operation.
5. Monitor your monitors. Set up health checks on the pipeline itself. Track extraction success rates per source, flag sources that start failing, and alert when the pipeline doesn't complete on schedule.
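Lesson 1 in practice: pinning the output schema down in code makes "conform to the same structure" enforceable rather than aspirational. A sketch using a dataclass (field names match the ones used throughout this post; the case study doesn't specify how the team actually declared their schema):

```python
from dataclasses import dataclass, field, asdict

@dataclass
class PricingRecord:
    # The one schema every source's extraction must normalize into.
    source_id: int
    plan_name: str
    price_monthly: float
    currency: str = "USD"
    features: list = field(default_factory=list)
```

Transform functions return `PricingRecord` instances, and `asdict()` hands the Load step a uniform dict regardless of which site the data came from.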
Conclusion
Building a data extraction ETL pipeline doesn't have to mean managing proxy farms and browser clusters. With SearchHive's APIs, you get production-grade extraction infrastructure that handles the hard parts -- JS rendering, anti-bot, proxy rotation -- so you can focus on the data itself.
Get started with 500 free credits on SearchHive's free tier. No credit card required. Full access to SwiftSearch, ScrapeForge, and DeepDive endpoints. Check the docs for pipeline setup guides and see our Python web scraping guide for more extraction patterns.