How to Build a Data Extraction Pipeline — Step-by-Step
A data extraction pipeline automates the process of fetching data from websites, transforming it into a usable format, and storing it for analysis. Whether you're monitoring competitor prices, building a training dataset, or feeding data to an AI model, a reliable pipeline is essential.
This tutorial walks through building a production-ready pipeline using Python and the SearchHive API.
Prerequisites
- Python 3.9+ installed
- A SearchHive API key (get one free)
- Basic familiarity with Python and HTTP requests
Install the required packages:
pip install httpx tenacity sqlalchemy python-dotenv
Create a .env file:
SEARCHHIVE_API_KEY=your_key_here
DATABASE_URL=sqlite:///pipeline.db
Step 1: Define Your Data Sources
Start by listing what you want to extract. Create a configuration that maps sources to extraction rules:
import os
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class ExtractionSource:
    """Configuration for a single extraction target.

    Maps a URL to how it should be extracted and how often.
    """
    # Page URL to fetch via ScrapeForge
    url: str
    # Stable identifier; used to look up a parser in PARSERS
    name: str
    extract_pattern: Optional[str] = None  # CSS selector or None for full page
    # How often this source should be re-fetched
    frequency_hours: int = 24
    # Disabled sources are skipped by the pipeline without being removed
    enabled: bool = True
# Define sources to extract from.
# Each entry pairs a URL with a parser name (see PARSERS) and a refresh cadence.
SOURCES = [
    ExtractionSource(
        url="https://news.ycombinator.com",
        name="hackernews_front",
        frequency_hours=1  # front page churns quickly; refresh hourly
    ),
    ExtractionSource(
        url="https://example.com/blog",
        name="example_blog",
        frequency_hours=12
    ),
]
Step 2: Fetch Page Content
Use SearchHive's ScrapeForge API to extract clean content from each URL. ScrapeForge handles JavaScript rendering, anti-bot bypass, and returns structured markdown:
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
import os
from dotenv import load_dotenv
load_dotenv()  # pull SEARCHHIVE_API_KEY (and friends) from .env into os.environ
# API credentials and base endpoint; API_KEY is None if the env var is missing,
# which will surface as a 401 from the API rather than a local error.
API_KEY = os.environ.get("SEARCHHIVE_API_KEY")
API_BASE = "https://api.searchhive.dev/v1"
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def fetch_page(url: str, format: str = "markdown") -> str:
    """Extract page content from *url* via the ScrapeForge endpoint.

    Retries up to three times with exponential backoff (2-10s) on any
    exception, including HTTP error statuses raised below.
    """
    payload = {"url": url, "format": format}
    auth = {"Authorization": f"Bearer {API_KEY}"}
    response = httpx.post(
        f"{API_BASE}/scrapeforge",
        json=payload,
        headers=auth,
        timeout=30.0
    )
    response.raise_for_status()
    body = response.json()
    return body.get("content", "")
Step 3: Parse and Transform Data
Raw markdown needs structure. Parse it into a consistent format based on your needs:
import re
from datetime import datetime
def parse_hackernews(content: str) -> list:
    """Parse Hacker News front-page markdown into structured records.

    Each numbered markdown link line ("1. [Title](url)") becomes one
    record; lines that do not match the pattern are ignored.

    Args:
        content: Markdown text as returned by ScrapeForge.

    Returns:
        List of dicts with keys: title, url, source, extracted_at
        (ISO-8601 string), in page order.
    """
    records = []
    # Compile once instead of re-matching the pattern string per line.
    link_pattern = re.compile(r'^\d+\.\s+\[([^\]]+)\]\(([^)]+)\)')
    for line in content.strip().split("\n"):
        line = line.strip()
        if not line:
            continue
        match = link_pattern.match(line)
        if not match:
            continue
        # The original buffered each match and appended it when the NEXT
        # match arrived, then flushed the last one after the loop — two
        # identical dict-building blocks. Appending directly on match
        # produces the same records in the same order with no duplication.
        records.append({
            "title": match.group(1),
            "url": match.group(2),
            "source": "hackernews",
            # NOTE(review): datetime.utcnow() is deprecated since 3.12;
            # datetime.now(timezone.utc) is preferred, but it changes the
            # isoformat() output (adds "+00:00") — confirm downstream
            # consumers before switching.
            "extracted_at": datetime.utcnow().isoformat()
        })
    return records
def parse_generic_blog(content: str) -> list:
    """Parse a generic blog page into article records.

    Treats every markdown H2 heading as an article title; each record
    carries the same 500-character preview of the full page content.
    """
    heading_re = re.compile(r'^##\s+(.+)$', re.MULTILINE)
    preview = content[:500]
    return [
        {
            "title": heading.strip(),
            "content_preview": preview,
            "source": "blog",
            "extracted_at": datetime.utcnow().isoformat(),
        }
        for heading in heading_re.findall(content)
    ]
# Map source names (ExtractionSource.name) to parser callables.
# Sources without an entry here fall back to raw-content storage
# in run_pipeline.
PARSERS = {
    "hackernews_front": parse_hackernews,
    "example_blog": parse_generic_blog,
}
Step 4: Set Up Storage
Store extracted data in a database for querying and deduplication:
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
# Falls back to a local SQLite file when DATABASE_URL is not set.
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///pipeline.db")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)  # session factory; one session per save batch
Base = declarative_base()
class ExtractedRecord(Base):
    """One extracted item (e.g. an HN story or a blog article)."""
    __tablename__ = "extracted_records"
    id = Column(Integer, primary_key=True)
    title = Column(String(500))
    # unique=True gives database-level URL deduplication; also indexed
    # because save_records looks records up by URL.
    url = Column(String(1000), unique=True, index=True)
    content = Column(Text)
    source = Column(String(100))
    extracted_at = Column(DateTime)

# Create the table on import if it does not already exist.
Base.metadata.create_all(engine)
def save_records(records: list) -> int:
    """Save records to the database, skipping duplicates by URL.

    Records without a title are dropped. Records without a URL are
    stored with a NULL url and are never deduplicated against each
    other. (The original stored "" for a missing URL, which — combined
    with the duplicate check and the unique constraint — silently
    discarded every url-less record after the first one, i.e. all
    parse_generic_blog output.)

    Args:
        records: dicts with optional keys title, url, content_preview,
            source.

    Returns:
        Number of newly inserted rows.
    """
    db = SessionLocal()
    saved = 0
    try:
        for record in records:
            title = record.get("title", "")
            # Skip empty records
            if not title:
                continue
            # Normalize missing/empty URL to None: SQL unique indexes
            # permit multiple NULLs but not multiple "" values.
            url = record.get("url") or None
            if url is not None:
                # Check for duplicate URL
                existing = db.query(ExtractedRecord).filter_by(url=url).first()
                if existing:
                    continue
            db.add(ExtractedRecord(
                title=title,
                url=url,
                content=record.get("content_preview", ""),
                source=record.get("source", ""),
                extracted_at=datetime.utcnow()
            ))
            saved += 1
        db.commit()
    finally:
        # Release the session even when a query or commit raises;
        # the original leaked the session on any exception.
        db.close()
    return saved
Step 5: Build the Pipeline Orchestrator
Tie everything together into a pipeline that processes all sources:
import logging
# Root logging config: timestamped INFO-level messages (default handler: stderr).
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def run_pipeline():
    """Run one complete extraction pipeline cycle over all enabled SOURCES.

    For each enabled source: fetch the page, parse it with the parser
    registered in PARSERS (or store raw content when no parser exists),
    and persist new records. A failure in one source is logged and does
    not stop the remaining sources.

    Returns:
        Total number of newly saved records across all sources.
    """
    logger.info("Starting pipeline cycle")
    total_saved = 0
    for source in SOURCES:
        if not source.enabled:
            continue
        # Lazy %-style args: formatting only happens if the record is emitted.
        logger.info("Fetching: %s (%s)", source.name, source.url)
        try:
            content = fetch_page(source.url)
            logger.info("  Fetched %d characters", len(content))
            parser = PARSERS.get(source.name)
            if parser:
                records = parser(content)
                logger.info("  Parsed %d records", len(records))
                saved = save_records(records)
                total_saved += saved
                logger.info("  Saved %d new records", saved)
            else:
                logger.warning("  No parser for %s, saving raw content", source.name)
                # BUG FIX: the original discarded this save count, so
                # raw-content saves never contributed to total_saved.
                total_saved += save_records([{
                    "title": source.name,
                    "url": source.url,
                    "content_preview": content[:1000],
                    "source": source.name
                }])
        except Exception as e:
            logger.error("  Failed: %s", e)
            continue
    logger.info("Pipeline complete: %d new records saved", total_saved)
    return total_saved


if __name__ == "__main__":
    run_pipeline()
Step 6: Add Search-Based Discovery
Static source lists miss new pages. Add search-based discovery to find relevant URLs dynamically:
def discover_new_sources(query: str, limit: int = 10) -> list:
    """Return URLs matching *query* using the SwiftSearch endpoint.

    Results without a "url" field are silently skipped.
    """
    response = httpx.get(
        f"{API_BASE}/swiftsearch",
        params={"q": query, "limit": limit},
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=15.0
    )
    response.raise_for_status()
    found = response.json().get("results", [])
    urls = []
    for item in found:
        if "url" in item:
            urls.append(item["url"])
    return urls
# Example: Find new blog posts about web scraping, then fetch, parse,
# and store the first few hits.
new_urls = discover_new_sources("web scraping tutorial 2025 site:medium.com")
for url in new_urls[:3]:  # only the first three hits, to conserve API credits
    content = fetch_page(url)
    records = parse_generic_blog(content)
    saved = save_records(records)
    print(f"Discovered and saved {saved} records from {url}")
Step 7: Schedule the Pipeline
Use a simple scheduler to run the pipeline at regular intervals:
import time
import schedule
def scheduled_job():
    """Zero-argument wrapper so `schedule` can invoke the pipeline."""
    run_pipeline()

# Schedule based on source frequencies
# NOTE(review): this runs the whole pipeline hourly regardless of each
# source's frequency_hours setting — per-source cadence is not
# implemented here.
schedule.every(1).hours.do(scheduled_job)

if __name__ == "__main__":
    logger.info("Pipeline scheduler started")
    run_pipeline()  # Run immediately on start
    while True:
        schedule.run_pending()
        time.sleep(60)  # poll the scheduler once per minute
For production, consider using cron, Celery, or a cloud scheduler (AWS EventBridge, GCP Cloud Scheduler) instead of an in-process scheduler.
Complete Pipeline Code
Put it all together in a single file for quick setup:
# pipeline.py — Complete data extraction pipeline
import os
import re
import time
import logging
import httpx
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv
load_dotenv()  # read SEARCHHIVE_API_KEY from a local .env file
# API_KEY is None when the env var is missing; requests will then fail with 401.
API_KEY = os.environ.get("SEARCHHIVE_API_KEY")
API_BASE = "https://api.searchhive.dev/v1"
# Timestamped INFO-level logging (default handler: stderr).
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def fetch_page(url: str) -> str:
    """Fetch *url* through ScrapeForge and return its markdown content.

    Retries up to three times with exponential backoff on any exception.
    """
    request_body = {"url": url, "format": "markdown"}
    auth = {"Authorization": f"Bearer {API_KEY}"}
    response = httpx.post(
        f"{API_BASE}/scrapeforge",
        json=request_body,
        headers=auth,
        timeout=30.0
    )
    response.raise_for_status()
    return response.json().get("content", "")
def extract_links_from_markdown(content: str) -> list:
    """Return every markdown link in *content* as a (text, target) tuple."""
    link_pattern = re.compile(r'\[([^\]]+)\]\(([^)]+)\)')
    return link_pattern.findall(content)
def run_pipeline():
    """Run one cycle: fetch each hard-coded source and count its links.

    A failure in one source is logged and does not abort the cycle.

    Returns:
        Total number of markdown links found across all sources.
    """
    logger.info("Pipeline cycle started")
    sources = [
        "https://news.ycombinator.com",
        "https://example.com",
    ]
    total_records = 0
    for url in sources:
        try:
            content = fetch_page(url)
            links = extract_links_from_markdown(content)
            # Lazy %-style args: only formatted when the record is emitted.
            logger.info("%s: fetched %d chars, found %d links", url, len(content), len(links))
            total_records += len(links)
        except Exception as e:
            logger.error("%s failed: %s", url, e)
    logger.info("Pipeline cycle complete: %d records processed", total_records)
    return total_records


if __name__ == "__main__":
    run_pipeline()
Common Issues and Fixes
Timeout errors: Increase the timeout parameter for slow sites. Some pages take 10-15 seconds to render with JavaScript.
Empty content: Some sites block scraping. ScrapeForge handles most anti-bot measures, but extremely protected sites (Cloudflare Enterprise, Datadome) may require additional configuration.
Duplicate records: Always check your database for existing URLs before inserting. The unique=True constraint on the URL column handles this at the database level.
Rate limiting: Space out requests if you're hitting many sources. Add time.sleep(1) between fetches or use async I/O with httpx.AsyncClient.
Next Steps
Once your basic pipeline is running, consider these enhancements:
- Add content classification using a local model or SearchHive DeepDive
- Set up alerts via webhooks when specific patterns are detected
- Export data to CSV, JSON, or a data warehouse for analysis
- Monitor pipeline health with logging and alerting on failures
For more on search-based data discovery, see /blog/complete-guide-to-news-monitoring-automation and /blog/best-search-api-pricing-tools-2025.
Start Building with SearchHive
SearchHive's ScrapeForge API handles the hardest part of any extraction pipeline — reliably fetching page content. Get 500 free credits and build your first pipeline today.