A data lake built from web scraping gives you a centralized repository of structured and semi-structured data from across the internet -- product catalogs, job listings, real estate listings, news articles, financial data, and more. Unlike traditional ETL pipelines that pull from databases and APIs, a web scraping data lake ingests unstructured HTML, transforms it into clean data, and stores it in a queryable format. This tutorial shows you how to build one with Python, from scraping to storage.
Key Takeaways
- A web scraping data lake has four stages: ingest (scrape), transform (clean), store (persist), and query (analyze)
- SearchHive's ScrapeForge API provides reliable data ingestion with JS rendering and anti-bot bypass
- Use Parquet files for storage -- they are columnar, compressed, and supported by virtually every data tool
- Query with DuckDB for fast analytical SQL on your scraped data, no database server required
- Partition data by source and date for efficient querying as the lake grows
- Schedule regular ingestion jobs to keep your data fresh
Prerequisites
- Python 3.9+
- pip install requests beautifulsoup4 lxml pandas pyarrow duckdb schedule
- A SearchHive API key (free at searchhive.dev)
- 5-10 GB of free disk space (or a cloud storage bucket)
Step 1: Design Your Data Lake Structure
A practical data lake for web scraping uses a simple directory-based layout with parquet files. This approach scales from hundreds to millions of records without needing a database server:
data_lake/
raw/
products/
2025-04-17/
amazon.parquet
bestbuy.parquet
jobs/
2025-04-17/
linkedin.parquet
indeed.parquet
curated/
products_all.parquet
jobs_all.parquet
metadata.json
The raw/ directory holds source-specific data partitioned by date. The curated/ directory holds merged, deduplicated datasets ready for analysis.
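Before the first ingestion run, it helps to create this skeleton up front. A minimal sketch -- the `init_lake` helper name and the metadata schema are assumptions, not part of the pipeline below:

```python
import json
from pathlib import Path

def init_lake(lake_path="data_lake"):
    """Create the raw/ and curated/ skeleton plus an empty metadata file."""
    root = Path(lake_path)
    for sub in ("raw", "curated"):
        (root / sub).mkdir(parents=True, exist_ok=True)
    meta = root / "metadata.json"
    if not meta.exists():
        # start with an empty run log; this schema is just an assumption
        meta.write_text(json.dumps({"runs": []}, indent=2))
    return root
```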
Step 2: Build the Scraper Ingestion Layer
Create a generic ingestion function that scrapes URLs and stores results as parquet:
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import date
from pathlib import Path
import json
SH_KEY = "sk_live_your_key_here"
SH_HEADERS = {"Authorization": f"Bearer {SH_KEY}", "Content-Type": "application/json"}
def scrape_url(url, wait_for=None):
"""Scrape a URL using SearchHive ScrapeForge with JS rendering."""
payload = {"url": url, "render_js": True, "follow_redirects": True}
if wait_for:
payload["wait_for"] = wait_for
    resp = requests.post(
        "https://www.searchhive.dev/api/v1/scrapeforge",
        headers=SH_HEADERS,
        json=payload,
        timeout=60,  # rendered pages can take a while; avoid hanging forever
    )
resp.raise_for_status()
return resp.json()
def ingest_to_lake(records, domain, category, lake_path="data_lake"):
"""Write scraped records to parquet in the data lake."""
today = date.today().isoformat()
partition_dir = Path(lake_path) / "raw" / category / today
partition_dir.mkdir(parents=True, exist_ok=True)
df = pd.DataFrame(records)
parquet_path = partition_dir / f"{domain}.parquet"
df.to_parquet(parquet_path, index=False, engine="pyarrow")
print(f"Ingested {len(records)} records -> {parquet_path}")
return parquet_path
The SearchHive ScrapeForge call handles JavaScript rendering, proxy rotation, and CAPTCHA solving automatically. This means your ingestion layer does not need to manage any browser infrastructure.
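Even with managed scraping, transient network errors still happen. One way to harden the ingestion layer is a small retry wrapper with exponential backoff -- a sketch, not part of the SearchHive API; `with_retries` is a name chosen here:

```python
import time
import requests

def with_retries(fn, *args, retries=3, backoff=2.0, **kwargs):
    """Call fn(*args, **kwargs), retrying transient network errors."""
    for attempt in range(retries):
        try:
            return fn(*args, **kwargs)
        except requests.RequestException:
            if attempt == retries - 1:
                raise  # out of attempts; surface the error
            time.sleep(backoff * 2 ** attempt)  # wait 2s, 4s, 8s, ...

# usage: result = with_retries(scrape_url, url, wait_for=".product-item")
```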
Step 3: Build Source-Specific Extractors
Each data source needs a parser that extracts structured records from HTML. The key principle is to map each site's unique DOM structure into your common schema. Here are examples for product data and job listings:
def extract_products(url, domain):
"""Scrape and extract product data from a product listing page."""
result = scrape_url(url, wait_for=".product-item, .product-card")
soup = BeautifulSoup(result.get("content", ""), "lxml")
records = []
for item in soup.select(".product-item, .product-card"):
name_el = item.select_one("h2, h3, .title, .product-name")
price_el = item.select_one(".price, .product-price, [data-price]")
records.append({
"name": name_el.text.strip() if name_el else "",
"price": price_el.text.strip() if price_el else "",
"url": item.select_one("a")["href"] if item.select_one("a") else "",
"source": domain,
"scraped_at": pd.Timestamp.now().isoformat()
})
return records
def extract_jobs(url, domain):
"""Scrape and extract job listing data."""
result = scrape_url(url, wait_for=".job-card, .job-listing, [data-job]")
soup = BeautifulSoup(result.get("content", ""), "lxml")
records = []
for item in soup.select(".job-card, .job-listing"):
title_el = item.select_one("h2, h3, .job-title")
company_el = item.select_one(".company, .employer")
location_el = item.select_one(".location, .job-location")
salary_el = item.select_one(".salary, .compensation")
records.append({
"title": title_el.text.strip() if title_el else "",
"company": company_el.text.strip() if company_el else "",
"location": location_el.text.strip() if location_el else "",
"salary": salary_el.text.strip() if salary_el else "",
"source": domain,
"scraped_at": pd.Timestamp.now().isoformat()
})
return records
Step 4: Create the Data Lake Manager
Orchestrate ingestion and curate merged datasets with a manager class:
import duckdb
from glob import glob
class DataLakeManager:
def __init__(self, lake_path="data_lake"):
self.lake_path = Path(lake_path)
self.raw_path = self.lake_path / "raw"
self.curated_path = self.lake_path / "curated"
self.raw_path.mkdir(parents=True, exist_ok=True)
self.curated_path.mkdir(parents=True, exist_ok=True)
def ingest(self, records, domain, category):
"""Ingest records into the raw layer."""
return ingest_to_lake(records, domain, category, str(self.lake_path))
def curate(self, category):
"""Merge all raw partitions for a category into a curated dataset."""
pattern = str(self.raw_path / category / "*" / "*.parquet")
files = glob(pattern)
if not files:
print(f"No raw data found for category: {category}")
return
dfs = [pd.read_parquet(f) for f in files]
combined = pd.concat(dfs, ignore_index=True)
        # Deduplicate by URL when present, otherwise by name (products) or title (jobs)
        for dedup_col in ("url", "name", "title"):
            if dedup_col in combined.columns:
                combined = combined.drop_duplicates(subset=[dedup_col], keep="last")
                break
output = self.curated_path / f"{category}_all.parquet"
combined.to_parquet(output, index=False, engine="pyarrow")
print(f"Curated {len(combined)} records -> {output}")
return combined
def query(self, sql):
"""Run SQL queries against curated parquet files using DuckDB."""
conn = duckdb.connect()
result = conn.execute(sql).fetchdf()
conn.close()
return result
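The Step 1 layout includes a metadata.json file that the manager does not yet touch. One plausible use is a run log so you can see when each source was last ingested; this `record_run` helper and its schema are assumptions, not part of the manager above:

```python
import json
from datetime import datetime
from pathlib import Path

def record_run(category, domain, record_count, lake_path="data_lake"):
    """Append one ingestion-run entry to the lake's metadata.json."""
    meta_path = Path(lake_path) / "metadata.json"
    meta = json.loads(meta_path.read_text()) if meta_path.exists() else {"runs": []}
    meta["runs"].append({
        "category": category,
        "domain": domain,
        "records": record_count,
        "at": datetime.now().isoformat(),
    })
    meta_path.parent.mkdir(parents=True, exist_ok=True)
    meta_path.write_text(json.dumps(meta, indent=2))
```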
Step 5: Query Your Data Lake with SQL
DuckDB lets you run SQL queries directly on parquet files without loading everything into memory:
lake = DataLakeManager()
# Query products per source, newest first
results = lake.query("""
SELECT name, price, source, scraped_at
FROM read_parquet('data_lake/curated/products_all.parquet')
WHERE price != ''
ORDER BY source, scraped_at DESC
LIMIT 50
""")
print(results)
# Job market analysis
results = lake.query("""
SELECT
location,
COUNT(*) as job_count,
        AVG(CASE WHEN salary != '' THEN 1 ELSE 0 END) as salary_posted_pct
FROM read_parquet('data_lake/curated/jobs_all.parquet')
GROUP BY location
ORDER BY job_count DESC
LIMIT 20
""")
print(results)
# Time-based analysis -- when was data last scraped per source
results = lake.query("""
SELECT
source,
MIN(scraped_at) as first_scrape,
MAX(scraped_at) as last_scrape,
COUNT(*) as record_count
FROM read_parquet('data_lake/curated/products_all.parquet')
GROUP BY source
""")
print(results)
Step 6: Schedule Daily Ingestion
Keep your data lake fresh with scheduled ingestion jobs:
import schedule
import time
SOURCES = {
"products": [
("amazon", "https://www.amazon.com/s?k=laptops", extract_products),
("bestbuy", "https://www.bestbuy.com/site/searchpage.jsp?st=laptops", extract_products),
],
"jobs": [
("linkedin", "https://www.linkedin.com/jobs/search/?keywords=python", extract_jobs),
("indeed", "https://www.indeed.com/jobs?q=python+developer", extract_jobs),
]
}
def daily_ingestion():
lake = DataLakeManager()
for category, sources in SOURCES.items():
for domain, url, extractor in sources:
try:
records = extractor(url, domain)
if records:
lake.ingest(records, domain, category)
except Exception as e:
print(f"Ingestion failed for {domain}: {e}")
lake.curate(category)
print(f"Ingestion complete at {pd.Timestamp.now()}")
# Run immediately, then daily at 6 AM
daily_ingestion()
schedule.every().day.at("06:00").do(daily_ingestion)
while True:
schedule.run_pending()
    time.sleep(60)  # check once a minute so the 06:00 run is not delayed
Complete Code Example
Here is the full data lake pipeline in one script:
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import date
from pathlib import Path
import duckdb
from glob import glob
SH_KEY = "sk_live_your_key_here"
SH_HEADERS = {"Authorization": f"Bearer {SH_KEY}", "Content-Type": "application/json"}
def scrape_url(url, wait_for=None):
payload = {"url": url, "render_js": True, "follow_redirects": True}
if wait_for:
payload["wait_for"] = wait_for
    resp = requests.post(
        "https://www.searchhive.dev/api/v1/scrapeforge",
        headers=SH_HEADERS, json=payload, timeout=60
    )
resp.raise_for_status()
return resp.json()
def extract_generic(url, domain, item_selector, fields):
"""Generic extractor: scrape a page and pull fields from each item."""
result = scrape_url(url, wait_for=item_selector)
soup = BeautifulSoup(result.get("content", ""), "lxml")
records = []
for item in soup.select(item_selector):
row = {"source": domain, "scraped_at": pd.Timestamp.now().isoformat()}
for field_name, selector in fields.items():
el = item.select_one(selector)
row[field_name] = el.text.strip() if el else ""
records.append(row)
return records
def ingest(records, domain, category, lake_path="data_lake"):
today = date.today().isoformat()
d = Path(lake_path) / "raw" / category / today
d.mkdir(parents=True, exist_ok=True)
path = d / f"{domain}.parquet"
pd.DataFrame(records).to_parquet(path, index=False)
return path
def curate(category, lake_path="data_lake"):
files = glob(str(Path(lake_path) / "raw" / category / "*" / "*.parquet"))
if not files:
return None
combined = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
dedup_col = "url" if "url" in combined.columns else "name"
if dedup_col in combined.columns:
combined = combined.drop_duplicates(subset=[dedup_col], keep="last")
out = Path(lake_path) / "curated" / f"{category}_all.parquet"
combined.to_parquet(out, index=False)
return combined
# Run
records = extract_generic(
"https://books.toscrape.com/", "books.toscrape.com",
"article.product_pod",
{"title": "h3 a", "price": ".price_color", "rating": "p.star-rating"}
)
ingest(records, "books.toscrape.com", "products")
df = curate("products")
if df is not None:
print(f"Total products in lake: {len(df)}")
Common Issues
1. Running out of ScrapeForge credits. Monitor your usage at searchhive.dev/dashboard. The Builder plan ($49/mo) gives you 100K credits -- enough for scraping thousands of pages daily.
2. Large parquet files. If a single category exceeds 1M records, consider further partitioning by month or source. DuckDB handles partitioned parquet with wildcard queries like read_parquet('data_lake/raw/products/*/*.parquet').
3. Schema drift. When a source site changes its HTML, your extractor will return empty fields. Add validation checks after extraction and alert on unusually sparse records. A simple check like asserting that more than half the extracted records have non-empty names catches parser breakage immediately.
4. Deduplication across dates. Products or jobs may appear on multiple scrape dates with updated data. The curate() step keeps the latest record per unique key, so you always have the freshest version of each record.
5. Scaling beyond parquet. For lakes exceeding 100M records, consider ClickHouse or Apache Iceberg. DuckDB and parquet handle up to ~50M records comfortably on a single machine before query performance degrades.
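The schema-drift check from issue 3 can be sketched as a small validation gate that runs before `ingest()`; `validate_batch` and its 50% threshold are illustrative choices, not a fixed rule:

```python
def validate_batch(records, key_field, min_fill=0.5):
    """Return True if enough records have a non-empty key field.

    A sudden drop below the threshold usually means the source site
    changed its HTML and the CSS selectors no longer match."""
    if not records:
        return False
    filled = sum(1 for r in records if r.get(key_field, "").strip())
    return filled / len(records) >= min_fill

# usage: skip ingestion (and alert) when validate_batch(records, "name") is False
```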
Next Steps
- How to Extract Structured Data from HTML with Python -- detailed parsing techniques
- How to Monitor Webhooks and API Events with Python -- set up monitoring for your ingestion pipeline
- SearchHive API docs -- full parameter reference
Start building your data lake with SearchHive's free tier -- 500 credits to experiment with, no credit card required.