How to Build a Web Scraping API Pipeline — Step-by-Step
A web scraping API lets you programmatically extract data from websites without managing proxies, handling CAPTCHAs, or parsing HTML yourself. Instead of writing and maintaining fragile scrapers, you send an HTTP request with a URL and get clean, structured data back.
This tutorial walks you through building a complete web scraping API pipeline with SearchHive's ScrapeForge API -- from your first API call to a production-ready batch processing system.
Key Takeaways
- Web scraping APIs handle the hard parts -- proxy rotation, CAPTCHA solving, and HTML rendering
- SearchHive ScrapeForge returns clean text or structured JSON from any URL with a single API call
- A production pipeline needs error handling, rate limiting, deduplication, and data storage
- Batch processing with parallelism dramatically improves throughput
- Start with the free tier -- 500 credits/month is enough to build and test your pipeline
Prerequisites
- Python 3.8+ installed
- A SearchHive account (free)
- Your API key from the dashboard
- Basic familiarity with Python requests library
pip install requests
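To avoid hardcoding the key in your scripts, you can load it from an environment variable. Here's a minimal sketch -- the SEARCHHIVE_API_KEY variable name is just a convention for this tutorial, not something the API requires:

import os

# Read the API key from the environment and fail fast if it isn't set
API_KEY = os.environ.get("SEARCHHIVE_API_KEY")
if not API_KEY:
    raise RuntimeError("Set the SEARCHHIVE_API_KEY environment variable first")

HEADERS = {"Authorization": f"Bearer {API_KEY}"}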
Step 1: Make Your First Scrape Request
The simplest possible call to SearchHive's ScrapeForge API extracts text content from a URL:
import requests

API_KEY = "YOUR_API_KEY"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

response = requests.post(
    "https://api.searchhive.dev/v1/scrape",
    headers=HEADERS,
    json={
        "url": "https://example.com",
        "format": "markdown",
        "only_text": True
    }
)

if response.status_code == 200:
    data = response.json()
    print(f"Extracted {len(data['content'])} characters")
    print(data["content"][:500])
else:
    print(f"Error: {response.status_code} - {response.text}")
This returns the page content as clean markdown text with all HTML tags, scripts, and navigation removed. No proxy setup, no CAPTCHA handling, no HTML parsing on your end.
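If you want to eyeball the output before building anything on top of it, you can dump the returned markdown to a local file. A quick sketch reusing the response from above (the filename is arbitrary):

from pathlib import Path

# Write the extracted markdown to disk so you can review it in an editor
if response.status_code == 200:
    content = response.json()["content"]
    Path("example_com.md").write_text(content, encoding="utf-8")
    print(f"Wrote {len(content)} characters to example_com.md")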
Step 2: Extract Specific Data Fields
For more control, extract specific data fields from a page using CSS selectors:
response = requests.post(
    "https://api.searchhive.dev/v1/scrape",
    headers=HEADERS,
    json={
        "url": "https://news.ycombinator.com",
        "format": "json",
        "extract": {
            "headlines": ".titleline > a",
            "scores": ".score",
            "comments": ".subtext a:last-child"
        }
    }
)

if response.status_code == 200:
    data = response.json()
    headlines = data.get("headlines", [])
    print(f"Found {len(headlines)} headlines")
    for h in headlines[:5]:
        print(f" - {h}")
Step 3: Search Then Scrape
Combine SwiftSearch and ScrapeForge to discover relevant pages and extract their content:
import requests
import time

def search_and_scrape(query, num_results=5):
    """Search for pages matching a query, then scrape each one."""
    # Step A: Search for relevant URLs
    search_resp = requests.get(
        "https://api.searchhive.dev/v1/search",
        headers=HEADERS,
        params={"q": query, "limit": num_results}
    )

    urls = []
    for result in search_resp.json().get("results", []):
        urls.append(result["url"])
        print(f" Found: {result['title']}")

    # Step B: Scrape each URL
    scraped_data = []
    for url in urls:
        try:
            scrape_resp = requests.post(
                "https://api.searchhive.dev/v1/scrape",
                headers=HEADERS,
                json={"url": url, "format": "markdown", "only_text": True}
            )
            if scrape_resp.status_code == 200:
                content = scrape_resp.json()["content"]
                if len(content) > 100:
                    scraped_data.append({"url": url, "content": content})
                    print(f" Scraped: {url} ({len(content)} chars)")
            time.sleep(0.5)
        except Exception as e:
            print(f" Failed: {url} - {e}")

    return scraped_data

results = search_and_scrape("python data engineering best practices", num_results=3)
print(f"\nSuccessfully scraped {len(results)} pages")
Step 4: Build a Batch Processing Pipeline
For production workloads, you need parallel processing with error handling and retry logic:
import requests
import time
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

def scrape_url(url, max_retries=3):
    """Scrape a single URL with retry logic."""
    for attempt in range(max_retries):
        try:
            resp = requests.post(
                "https://api.searchhive.dev/v1/scrape",
                headers=HEADERS,
                json={"url": url, "format": "markdown", "only_text": True},
                timeout=30
            )
            if resp.status_code == 200:
                return {"url": url, "content": resp.json()["content"], "status": "ok"}
            elif resp.status_code == 429:
                wait = 2 ** attempt
                print(f" Rate limited, waiting {wait}s...")
                time.sleep(wait)
            else:
                return {"url": url, "status": f"error_{resp.status_code}"}
        except requests.exceptions.Timeout:
            print(f" Timeout: {url}")
            if attempt < max_retries - 1:
                time.sleep(2)
        except Exception as e:
            return {"url": url, "status": f"error: {str(e)}"}
    return {"url": url, "status": "max_retries_exceeded"}

def batch_scrape(urls, max_workers=3):
    """Scrape multiple URLs in parallel with controlled concurrency."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(scrape_url, url): url for url in urls}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            status = "OK" if result.get("status") == "ok" else "FAIL"
            print(f" [{status}] {result['url'][:60]}")
    return results

# Run the batch pipeline
urls = [
    "https://en.wikipedia.org/wiki/Web_scraping",
    "https://en.wikipedia.org/wiki/API",
    "https://en.wikipedia.org/wiki/JSON",
    "https://en.wikipedia.org/wiki/REST",
    "https://en.wikipedia.org/wiki/Data_extraction",
]

print("Starting batch scrape...")
results = batch_scrape(urls, max_workers=3)

success = sum(1 for r in results if r.get("status") == "ok")
print(f"\nCompleted: {success}/{len(results)} successful")

# Save results
with open("scraped_data.json", "w") as f:
    json.dump(results, f, indent=2)
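The retry logic above reacts to 429s after they happen. You can also throttle proactively by spacing requests out across worker threads. A small sketch using a shared lock to enforce a minimum gap between requests -- the 0.5-second interval is arbitrary, so tune it to your plan's rate limit:

import threading
import time

_throttle_lock = threading.Lock()
_last_request = [0.0]  # wrapped in a list so all threads share one mutable value
MIN_INTERVAL = 0.5     # seconds between request starts

def throttled_scrape(url):
    """Wait until MIN_INTERVAL has passed since the last request, then scrape."""
    with _throttle_lock:
        elapsed = time.time() - _last_request[0]
        if elapsed < MIN_INTERVAL:
            time.sleep(MIN_INTERVAL - elapsed)
        _last_request[0] = time.time()
    return scrape_url(url)

Pass throttled_scrape instead of scrape_url to executor.submit and the rest of the batch code stays the same.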
Step 5: Add Deduplication and Quality Filtering
Filter out stub pages, duplicates, and low-quality content:
def deduplicate(results):
    """Remove near-duplicate results based on content fingerprint."""
    seen = set()
    unique = []
    for item in results:
        content = item.get("content", "")
        # Skip very short pages
        if len(content) < 200:
            continue
        # Fingerprint based on first 300 chars
        fingerprint = content[:300].strip().lower()
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(item)
    return unique

def quality_filter(results, min_chars=300, max_chars=100000):
    """Filter results by content quality metrics."""
    filtered = []
    for item in results:
        content = item.get("content", "")
        length = len(content)
        if min_chars <= length <= max_chars:
            # Check for actual content (not just boilerplate)
            word_count = len(content.split())
            if word_count > 50:
                filtered.append(item)
    return filtered

# Apply filters
unique = deduplicate(results)
quality = quality_filter(unique)
print(f"After dedup: {len(unique)} | After quality filter: {len(quality)}")
Step 6: Save to Structured Storage
Store your scraped data in a format that's easy to query and use downstream:
import json
from datetime import datetime

def save_results(results, filepath="scraped_data.json"):
    """Save results with metadata for downstream processing."""
    output = {
        "scrape_date": datetime.utcnow().isoformat(),
        "total_pages": len(results),
        "successful": sum(1 for r in results if r.get("status") == "ok"),
        "pages": []
    }
    for r in results:
        if r.get("status") == "ok":
            output["pages"].append({
                "url": r["url"],
                "content_length": len(r["content"]),
                "word_count": len(r["content"].split()),
                "content": r["content"]
            })
    with open(filepath, "w") as f:
        json.dump(output, f, indent=2)
    print(f"Saved {len(output['pages'])} pages to {filepath}")
    return output

data = save_results(quality)
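If downstream jobs process pages one at a time, JSON Lines (one object per line) is often easier to stream than a single large JSON document. A small variant of the save step:

def save_results_jsonl(results, filepath="scraped_data.jsonl"):
    """Write one JSON object per line so downstream jobs can stream the file."""
    count = 0
    with open(filepath, "w", encoding="utf-8") as f:
        for r in results:
            if r.get("status") == "ok":
                f.write(json.dumps({"url": r["url"], "content": r["content"]}) + "\n")
                count += 1
    print(f"Saved {count} pages to {filepath}")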
Step 7: Complete Pipeline
Here's the full pipeline combining all steps:
import requests
import time
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

API_KEY = "YOUR_API_KEY"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

# Reuses batch_scrape, deduplicate, quality_filter, and save_results from Steps 4-6

def full_pipeline(query, num_results=10, max_workers=3):
    """Complete search-scrape-filter-save pipeline."""
    print(f"=== Pipeline: {query} ===")

    # 1. Search
    print("\n[1/4] Searching...")
    resp = requests.get(
        "https://api.searchhive.dev/v1/search",
        headers=HEADERS,
        params={"q": query, "limit": num_results}
    )
    urls = [r["url"] for r in resp.json().get("results", [])]
    print(f" Found {len(urls)} URLs")

    # 2. Scrape
    print("\n[2/4] Scraping...")
    raw = batch_scrape(urls, max_workers=max_workers)

    # 3. Filter
    print("\n[3/4] Filtering...")
    unique = deduplicate(raw)
    quality = quality_filter(unique)
    print(f" {len(raw)} raw -> {len(unique)} unique -> {len(quality)} quality")

    # 4. Save
    print("\n[4/4] Saving...")
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    output = save_results(quality, f"pipeline_{timestamp}.json")

    return output

# Run it
results = full_pipeline("web scraping API comparison 2025", num_results=8)
Common Issues and Fixes
Rate Limiting (HTTP 429)
Reduce max_workers and add exponential backoff. The retry logic in Step 4 handles this automatically.
Timeout Errors
Increase the timeout parameter in your request. Large pages take longer to render and extract.
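For example, raise the per-request timeout from 30 to 60 seconds:

resp = requests.post(
    "https://api.searchhive.dev/v1/scrape",
    headers=HEADERS,
    json={"url": url, "format": "markdown", "only_text": True},
    timeout=60  # allow more time for large or slow pages
)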
Empty Content
Some pages require JavaScript rendering. If you get empty results, the page may use client-side rendering that isn't supported. Try requesting the raw HTML format instead.
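A sketch of that fallback -- note that "html" as a format value is an assumption here, so check the API docs for the exact parameter name:

resp = requests.post(
    "https://api.searchhive.dev/v1/scrape",
    headers=HEADERS,
    json={"url": url, "format": "html"}  # "html" format name is an assumption; verify in the docs
)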
Duplicate Results
The deduplication step handles this. If you see many duplicates, your search query may be too broad -- narrow it with site-specific operators.
Next Steps
Once your pipeline is working, consider these enhancements:
- Scheduled runs -- use cron or a task queue (Celery, RQ) to run your pipeline on a schedule
- Database storage -- save results to PostgreSQL or SQLite instead of JSON files (see the SQLite sketch after this list)
- Embedding pipeline -- chunk the extracted text and generate embeddings for RAG
- Monitoring -- track success rates, response times, and credit usage
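For the database option mentioned above, a minimal SQLite sketch -- the table name and columns are just one possible schema:

import sqlite3

def save_to_sqlite(results, db_path="scraped.db"):
    """Store successful scrapes in a SQLite table keyed by URL."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pages (url TEXT PRIMARY KEY, content TEXT, scraped_at TEXT)"
    )
    for r in results:
        if r.get("status") == "ok":
            conn.execute(
                "INSERT OR REPLACE INTO pages VALUES (?, ?, datetime('now'))",
                (r["url"], r["content"]),
            )
    conn.commit()
    conn.close()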
Get started with the SearchHive free tier -- 500 credits per month, no credit card required. The API docs cover all parameters and response formats.
For more tutorials, see /blog/building-a-web-scraper-api-with-python and /compare/firecrawl.