How to Monitor Competitor Prices with Python — Automated System
Tracking competitor pricing manually doesn't scale. If you're managing an e-commerce operation, SaaS product, or marketplace listing, you need an automated system to monitor competitor prices and alert you to changes in real time.
This tutorial builds a complete price monitoring system in Python — from scraping competitor pages to storing historical data, detecting price changes, and sending alerts. We'll use SearchHive for reliable scraping and show you how to deploy it as a scheduled job.
Key Takeaways
- SearchHive ScrapeForge extracts prices from JS-rendered product pages with built-in proxy rotation
- SQLite stores historical price data for trend analysis without requiring a database server
- APScheduler runs the monitoring pipeline on any schedule (hourly, daily, weekly)
- Price change detection with configurable thresholds triggers alerts via email or Slack
- The complete system runs in under 200 lines of Python with zero external infrastructure
Prerequisites
Install the required packages:
pip install searchhive apscheduler pandas
- searchhive — Web scraping API with free tier (50K requests/month)
- apscheduler — Job scheduling library for running periodic tasks
- pandas — Data analysis for price trend calculations
You also need:
- A list of competitor product URLs to monitor
- (Optional) Slack webhook URL or SMTP credentials for alerts
Step 1: Define Your Competitor Product List
Start by creating a structured configuration of the products and competitors you want to track:
# competitors.py
# Declarative configuration of every competitor/product pair to monitor.
# Each product entry carries the CSS selectors the scraper uses, so new
# competitors can be added without touching any scraping code.
COMPETITORS = [
    {
        "name": "Competitor A",  # label stored in the DB and shown in alerts
        "products": [
            {
                "name": "Widget Pro",
                "url": "https://competitor-a.com/products/widget-pro",
                "price_selector": ".price-current",    # CSS selector for the price element
                "in_stock_selector": ".stock-status",  # CSS selector for availability text
            },
            {
                "name": "Gadget X",
                "url": "https://competitor-a.com/products/gadget-x",
                "price_selector": ".product-price",
                "in_stock_selector": ".availability",
            },
        ]
    },
    {
        "name": "Competitor B",
        "products": [
            {
                # Same product as Competitor A's "Widget Pro" -- matching
                # names lets the analysis step compare across competitors.
                "name": "Widget Pro",
                "url": "https://competitor-b.com/widget-pro",
                "price_selector": "[data-price]",
                "in_stock_selector": ".in-stock",
            },
        ]
    },
]
Keeping your configuration separate from your scraping logic makes it easy to add or remove competitors without touching code.
Step 2: Scrape Prices with SearchHive
SearchHive ScrapeForge handles JavaScript rendering and proxy rotation — essential for e-commerce sites that load prices dynamically and aggressively block scrapers:
# scraper.py
from searchhive import ScrapeForge
import re
from datetime import datetime
def parse_price(price_text: str) -> float:
    """Extract a numeric price from text like '$29.99', '29,99 EUR' or '$1,299.99'.

    The original implementation turned every comma into a decimal point, so
    US-style thousands separators broke ("$1,299.99" parsed as 1.29), and
    whole-number prices ("From $29") returned 0.0. Here the final '.' or ','
    is treated as the decimal point only when followed by exactly two digits;
    all other separators are stripped as thousands separators.

    Args:
        price_text: Raw text containing a price.

    Returns:
        The parsed price, or 0.0 if no number is present.
    """
    match = re.search(r'\d+(?:[.,]\d+)*', price_text)
    if not match:
        return 0.0
    token = match.group()
    last_sep = max(token.rfind('.'), token.rfind(','))
    # Final separator + exactly two trailing digits => decimal point.
    if last_sep != -1 and len(token) - last_sep - 1 == 2:
        integer_part = re.sub(r'[.,]', '', token[:last_sep])
        return float(integer_part + '.' + token[last_sep + 1:])
    # No decimal part: drop all separators and parse as a whole number.
    return float(re.sub(r'[.,]', '', token))
def scrape_competitor_prices(competitors_config: list[dict]) -> list[dict]:
    """Scrape current prices from all configured competitors.

    Args:
        competitors_config: Competitor dicts, each with "name" and a
            "products" list of {"name", "url", "price_selector"} entries.

    Returns:
        One dict per successfully scraped page with keys
        timestamp, competitor, product, url, price.
    """
    client = ScrapeForge()
    # url -> metadata needed to label the scraped price.
    # dicts preserve insertion order, so the keys double as the URL list.
    url_map = {}
    for comp in competitors_config:
        for product in comp["products"]:
            url_map[product["url"]] = {
                "competitor": comp["name"],
                "product": product["name"],
                "price_selector": product["price_selector"],
            }

    # Batch scrape all URLs concurrently. We parse prices out of the raw
    # HTML ourselves, so no extraction selectors are passed (the previous
    # `selectors={"price": "TEXT_PLACEHOLDER"}` was dead configuration).
    scrape_results = client.scrape_batch(
        list(url_map),
        render_js=True,
        wait_for=".price-current, .product-price, [data-price]",
        concurrency=3
    )

    # NOTE(review): utcnow() is deprecated on Python 3.12+, but it keeps
    # timestamps as naive-UTC ISO strings, matching what the storage and
    # alerting layers compare against.
    timestamp = datetime.utcnow().isoformat()
    results = []
    for r in scrape_results:
        meta = url_map.get(r.url)
        if not meta or not r.success:
            continue
        # Parse the first price-looking number out of the page HTML.
        # assumes r.raw_html holds the rendered page source -- TODO confirm
        price = parse_price(r.raw_html or "")
        results.append({
            "timestamp": timestamp,
            "competitor": meta["competitor"],
            "product": meta["product"],
            "url": r.url,
            "price": price,
        })
    return results
For a more targeted approach, use per-URL selectors:
from searchhive import ScrapeForge
def scrape_single_product(url: str, price_selector: str) -> dict:
    """Scrape one product page; returns price, stock flag, and title."""
    forge = ScrapeForge()
    response = forge.scrape(
        url=url,
        render_js=True,
        selectors={
            "price": price_selector,
            "in_stock": ".stock-status, .availability, .in-stock",
            "title": "h1",
        }
    )
    # No extracted data -> report an unavailable product with a zero price.
    if not response.data:
        return {"price": 0.0, "in_stock": False, "title": ""}

    extracted = response.data
    stock_text = str(extracted.get("in_stock", "")).lower()
    return {
        "price": parse_price(str(extracted.get("price", "0"))),
        # Any availability text containing "out" is treated as out of stock.
        "in_stock": "out" not in stock_text,
        "title": extracted.get("title", ""),
    }
Step 3: Store Price History in SQLite
SQLite is perfect for this — zero setup, file-based, and handles time-series queries well:
# storage.py
import sqlite3
from datetime import datetime
def init_db(db_path: str = "prices.db"):
    """Initialize the price history database.

    Creates the price_history table and its lookup index if they do not
    already exist; safe to call on every startup. The connection is now
    closed in a finally block so a failing DDL statement no longer leaks it.

    Args:
        db_path: Path to the SQLite database file.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                competitor TEXT NOT NULL,
                product TEXT NOT NULL,
                url TEXT,
                price REAL NOT NULL,
                in_stock INTEGER DEFAULT 1
            )
        """)
        # Composite index matches the GROUP BY / ORDER BY used by readers.
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_product
            ON price_history(competitor, product, timestamp)
        """)
        conn.commit()
    finally:
        conn.close()
def save_prices(prices: list[dict], db_path: str = "prices.db"):
    """Save scraped prices to the database.

    Uses a single executemany() instead of one execute() per row, commits
    once, and closes the connection even if an insert fails. An empty list
    is a no-op (no connection opened).

    Args:
        prices: Dicts with timestamp/competitor/product/price and optional
            url and in_stock keys.
        db_path: Path to the SQLite database file.
    """
    if not prices:
        return
    rows = [
        (p["timestamp"], p["competitor"], p["product"],
         p.get("url", ""), p["price"], int(p.get("in_stock", True)))
        for p in prices
    ]
    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(
            "INSERT INTO price_history (timestamp, competitor, product, url, price, in_stock) VALUES (?, ?, ?, ?, ?, ?)",
            rows
        )
        conn.commit()
    finally:
        conn.close()
def get_latest_prices(db_path: str = "prices.db") -> list[dict]:
    """Return the most recent recorded price per (competitor, product) pair."""
    # Self-join against the per-pair MAX(timestamp) to pick the newest row.
    query = """
        SELECT ph.* FROM price_history ph
        INNER JOIN (
            SELECT competitor, product, MAX(timestamp) as max_ts
            FROM price_history
            GROUP BY competitor, product
        ) latest ON ph.competitor = latest.competitor
            AND ph.product = latest.product
            AND ph.timestamp = latest.max_ts
        ORDER BY ph.competitor, ph.product
    """
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row  # rows become dict-convertible
    records = connection.execute(query).fetchall()
    connection.close()
    return list(map(dict, records))
def get_price_history(product: str, competitor: str, days: int = 30, db_path: str = "prices.db") -> list[dict]:
    """Get price history for a specific product over the last `days` days.

    Stored timestamps are ISO-8601 with a 'T' separator (from
    datetime.isoformat()), while SQLite's datetime('now', ...) produces a
    space-separated string. The original raw string comparison could
    misorder same-day boundaries, so both sides are normalized through
    SQLite's datetime() before comparing.

    Args:
        product: Product name as stored in the table.
        competitor: Competitor name as stored in the table.
        days: Lookback window in days.
        db_path: Path to the SQLite database file.

    Returns:
        Chronologically ordered dicts with "timestamp" and "price" keys.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute("""
            SELECT timestamp, price FROM price_history
            WHERE product = ? AND competitor = ?
              AND datetime(timestamp) >= datetime('now', ?)
            ORDER BY timestamp
        """, (product, competitor, f'-{days} days')).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
Step 4: Detect Price Changes and Generate Alerts
Compare each new scrape against the previous recorded price to detect changes:
# alerts.py
import sqlite3
from datetime import datetime
def check_price_changes(new_prices: list[dict], threshold: float = 0.05, db_path: str = "prices.db") -> list[dict]:
    """Check for price changes exceeding the threshold.

    Args:
        new_prices: Latest scraped prices (already persisted to the DB).
        threshold: Minimum relative change to trigger an alert (0.05 = 5%).

    Returns:
        One alert dict per change whose magnitude meets the threshold.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    alerts = []
    for current in new_prices:
        # OFFSET 1 skips the row just saved this cycle, yielding the
        # previously recorded price for this competitor/product pair.
        previous = conn.execute("""
            SELECT price FROM price_history
            WHERE competitor = ? AND product = ?
            ORDER BY timestamp DESC LIMIT 1 OFFSET 1
        """, (current["competitor"], current["product"])).fetchone()
        if previous is None:
            continue  # first observation: nothing to compare against
        old_price = previous["price"]
        if old_price <= 0:
            continue  # avoid division by zero on unparsed prices
        change_pct = (current["price"] - old_price) / old_price
        if abs(change_pct) < threshold:
            continue
        alerts.append({
            "competitor": current["competitor"],
            "product": current["product"],
            "old_price": old_price,
            "new_price": current["price"],
            "change_pct": round(change_pct * 100, 1),
            "direction": "dropped" if change_pct < 0 else "increased",
            "timestamp": current["timestamp"],
        })
    conn.close()
    return alerts
def format_alert(alert: dict) -> str:
    """Render one alert dict as a three-line human-readable message."""
    marker = "📉" if alert["direction"] == "dropped" else "📈"
    lines = [
        f"{marker} Price {alert['direction'].upper()}!",
        f"{alert['competitor']} — {alert['product']}",
        f"${alert['old_price']:.2f} -> ${alert['new_price']:.2f} ({alert['change_pct']}%)",
    ]
    return "\n".join(lines)
def send_slack_alert(alerts: list[dict], webhook_url: str):
    """Send price alerts to Slack via an incoming-webhook POST."""
    import requests

    # One mrkdwn section block per alert, using the shared formatter.
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": format_alert(alert),
            },
        }
        for alert in alerts
    ]
    if blocks:
        requests.post(webhook_url, json={"blocks": blocks}, timeout=10)
        print(f"Sent {len(alerts)} alerts to Slack")
Step 5: Schedule Automated Monitoring
Use APScheduler to run the monitoring pipeline on a regular schedule:
# monitor.py
import os
from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler

from alerts import check_price_changes, send_slack_alert
from competitors import COMPETITORS
from scraper import scrape_competitor_prices
from storage import init_db, save_prices
def run_monitoring_cycle():
    """Complete monitoring cycle: scrape, store, alert.

    Scrapes every configured competitor page, persists the prices, diffs
    them against the previous cycle, and pushes Slack alerts when the
    SLACK_WEBHOOK_URL environment variable is set.

    Note: this function uses `datetime`, which the original snippet never
    imported (NameError at runtime); it must be imported at module top.
    """
    print(f"[{datetime.now().isoformat()}] Starting price monitoring cycle...")

    # Step 1: Scrape current prices
    prices = scrape_competitor_prices(COMPETITORS)
    print(f"  Scraped {len(prices)} prices")

    # Step 2: Store in the database. This must happen BEFORE the diff:
    # check_price_changes reads the previous row with OFFSET 1.
    save_prices(prices)

    # Step 3: Alert on changes of 5% or more
    alerts = check_price_changes(prices, threshold=0.05)
    if alerts:
        print(f"  {len(alerts)} price changes detected!")
        for alert in alerts:
            print(f"    {alert['competitor']} - {alert['product']}: {alert['direction']} {alert['change_pct']}%")
        # Send to Slack only when a webhook is configured
        webhook = os.environ.get("SLACK_WEBHOOK_URL")
        if webhook:
            send_slack_alert(alerts, webhook)
    else:
        print("  No significant price changes")
    print(f"[{datetime.now().isoformat()}] Cycle complete\n")
if __name__ == "__main__":
    # Ensure the SQLite schema exists before the first cycle writes to it.
    init_db()
    # BlockingScheduler runs jobs in the foreground and blocks this process.
    scheduler = BlockingScheduler()
    # Run every 6 hours
    scheduler.add_job(run_monitoring_cycle, 'interval', hours=6)
    # Also run once immediately on startup
    # (a job added with no trigger is scheduled to fire a single time, now)
    scheduler.add_job(run_monitoring_cycle)
    print("Price monitoring started. Running every 6 hours.")
    print("Press Ctrl+C to stop.")
    # Blocks until interrupted (Ctrl+C raises KeyboardInterrupt).
    scheduler.start()
Step 6: Analyze Price Trends
Once you have historical data, analyze trends to understand competitor pricing strategy:
# analysis.py
import sqlite3
import pandas as pd
def get_price_dataframe(db_path: str = "prices.db") -> pd.DataFrame:
    """Load the full price_history table, chronologically ordered, with
    the timestamp column parsed to pandas datetimes."""
    connection = sqlite3.connect(db_path)
    frame = pd.read_sql_query("SELECT * FROM price_history ORDER BY timestamp", connection)
    connection.close()
    # assign() returns a new frame with the string timestamps parsed.
    return frame.assign(timestamp=pd.to_datetime(frame["timestamp"]))
def competitor_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Build a table of each competitor/product pair's most recent price."""
    # Sorting by timestamp first makes .last() pick the newest row per group.
    latest_rows = (
        df.sort_values("timestamp")
        .groupby(["competitor", "product"])
        .last()
        .reset_index()
    )
    summary = latest_rows.loc[:, ["competitor", "product", "price", "timestamp"]].copy()
    summary.columns = ["Competitor", "Product", "Current Price", "Last Checked"]
    return summary.sort_values(["Product", "Competitor"])
# Usage
df = get_price_dataframe()        # entire price_history table as a DataFrame
summary = competitor_summary(df)  # latest price per competitor/product pair
print(summary.to_string(index=False))

# Calculate average price by competitor
# (std is NaN for competitors with a single observation)
avg_prices = df.groupby("competitor")["price"].agg(["mean", "min", "max", "std"])
print("\nPrice statistics by competitor:")
print(avg_prices.round(2))
Complete Code Example
Here's the entire monitoring system in a single runnable script:
from searchhive import ScrapeForge
from apscheduler.schedulers.blocking import BlockingScheduler
import sqlite3
import re
import json
from datetime import datetime
class PriceMonitor:
    """Self-contained competitor price monitor: scrape, store, diff, alert."""

    def __init__(self, db_path: str = "prices.db"):
        self.db_path = db_path
        self.client = ScrapeForge()
        self._init_db()

    def _init_db(self):
        """Create the price_history table if it does not exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS price_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT, competitor TEXT, product TEXT,
                    url TEXT, price REAL
                )
            """)
            conn.commit()
        finally:
            conn.close()

    def parse_price(self, text: str) -> float:
        """Extract a numeric price from arbitrary text.

        Fixed: the original comma->dot substitution mangled thousands
        separators ("$1,299.99" parsed as 1.29) and returned 0.0 for whole
        numbers. The final '.' or ',' counts as the decimal point only when
        followed by exactly two digits; other separators are stripped.
        """
        match = re.search(r'\d+(?:[.,]\d+)*', text)
        if not match:
            return 0.0
        token = match.group()
        last_sep = max(token.rfind('.'), token.rfind(','))
        if last_sep != -1 and len(token) - last_sep - 1 == 2:
            integer_part = re.sub(r'[.,]', '', token[:last_sep])
            return float(integer_part + '.' + token[last_sep + 1:])
        return float(re.sub(r'[.,]', '', token))

    def monitor(self, competitors: list[dict], threshold: float = 0.05):
        """Run one scrape/store/diff cycle and return the alerts raised.

        Args:
            competitors: Competitor dicts with "name" and "products" lists.
            threshold: Minimum relative price change to alert on (0.05 = 5%).
        """
        url_meta = {}  # url -> labels; dict keys double as the URL list
        for comp in competitors:
            for p in comp["products"]:
                url_meta[p["url"]] = {"competitor": comp["name"], "product": p["name"]}
        results = self.client.scrape_batch(
            list(url_meta), render_js=True,
            selectors={"raw": "body"}, concurrency=3
        )
        # NOTE(review): utcnow() is deprecated on 3.12+, kept so timestamps
        # stay naive-UTC ISO strings consistent with existing rows.
        ts = datetime.utcnow().isoformat()
        alerts = []
        conn = sqlite3.connect(self.db_path)
        try:
            for r in results:
                meta = url_meta.get(r.url)
                # Skip failures AND unknown URLs -- the original defaulted
                # meta to {} and could KeyError in the alert branch.
                if not r.success or not meta:
                    continue
                # Pull the extracted "raw" body text when present instead of
                # stringifying the whole data dict (its repr polluted parsing).
                raw = r.data.get("raw", "") if isinstance(r.data, dict) else (r.data or "")
                price = self.parse_price(str(raw))
                prev = conn.execute(
                    "SELECT price FROM price_history WHERE competitor=? AND product=? ORDER BY timestamp DESC LIMIT 1",
                    (meta["competitor"], meta["product"])
                ).fetchone()
                if prev and prev[0] > 0:
                    change = (price - prev[0]) / prev[0]
                    if abs(change) >= threshold:
                        alerts.append({
                            "competitor": meta["competitor"],
                            "product": meta["product"],
                            "old": prev[0], "new": price,
                            "change": f"{change*100:+.1f}%"
                        })
                conn.execute(
                    "INSERT INTO price_history (timestamp, competitor, product, url, price) VALUES (?,?,?,?,?)",
                    (ts, meta["competitor"], meta["product"], r.url, price)
                )
            conn.commit()
        finally:
            conn.close()
        if alerts:
            print(f"ALERTS: {json.dumps(alerts, indent=2)}")
        return alerts
if __name__ == "__main__":
    # Minimal demo configuration -- replace with real competitor URLs.
    # PriceMonitor.monitor() only reads "name" and "url" per product.
    competitors = [
        {"name": "Competitor A", "products": [
            {"name": "Widget Pro", "url": "https://example.com/product1"},
        ]},
    ]
    # Creates prices.db (and its schema) in the working directory.
    monitor = PriceMonitor()
    alerts = monitor.monitor(competitors)
    print(f"Monitoring complete. {len(alerts)} alerts.")
Common Issues
Competitor blocks your IP after a few requests
SearchHive's automatic proxy rotation handles this. Each request routes through a different residential proxy, so your IP never appears repeatedly to the target site.
Prices load after a delay (AJAX)
Use render_js=True and set wait_for to the price element's CSS selector. SearchHive will wait until the element appears before extracting data.
Prices in non-standard formats ("From $29", "$29/mo", "Save 20%")
Your parse_price function needs to handle these cases. Extract the first numeric value with a decimal point — it's almost always the base price.
Database grows too large
SQLite handles millions of rows fine. If you need more, consider PostgreSQL with TimescaleDB extension for efficient time-series queries.
Next Steps
- Set up the scheduled monitor as a systemd service or deploy to a cloud VM for 24/7 operation
- Use SearchHive SwiftSearch to discover new competitor product pages automatically
- Check /blog/how-to-extract-contact-info-from-websites-with-python for extracting competitor contact information
- Read /compare/brightdata to see how SearchHive compares to other enterprise scraping solutions
Start monitoring competitor prices today with SearchHive's free tier — 50,000 requests/month with JS rendering and proxy rotation. No credit card required. Read the docs.