08 - Web Scraping in Practice
Scrape dynamically rendered pages with browser automation, handle anti-bot mechanisms, and implement proxy rotation and data extraction.
8.1 Why Browser-Based Scraping
Traditional HTTP scrapers (e.g. requests + BeautifulSoup) only see the raw HTML the server returns. Modern sites render much of their content with JavaScript, so a traditional scraper never sees the final page.
Traditional HTTP scraper:
HTTP request → raw HTML → parse
✅ Fast, low resource usage
❌ Cannot see JS-rendered content
Browser automation scraper:
Launch browser → load page → run JS → read rendered DOM → parse
✅ Captures dynamic content
✅ Can simulate user interaction
❌ Resource-heavy and slower
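A quick way to see the gap is to fetch the same single-page app both ways. A minimal sketch, assuming a hypothetical JS-rendered page at example.com/app:
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

URL = "https://example.com/app"  # hypothetical SPA page

# Plain HTTP: you get the empty app shell, e.g. <div id="root"></div>
raw_html = requests.get(URL, timeout=10).text
print("raw HTML length:", len(raw_html))

# Browser: the DOM after JavaScript has rendered the content
options = Options()
options.add_argument("--headless=new")
driver = webdriver.Chrome(options=options)
driver.get(URL)
print("rendered DOM length:", len(driver.page_source))
driver.quit()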
When to Use a Browser-Based Scraper
| Scenario | Recommendation |
|---|---|
| Static HTML pages | ❌ Use requests/httpx |
| A REST API is available | ❌ Call the API directly |
| SPA (React/Vue/Angular) | ✅ Browser scraper |
| Login/interaction required | ✅ Browser scraper |
| Infinite scroll | ✅ Browser scraper |
| Canvas/WebGL rendering | ✅ Browser scraper |
| Screenshots needed as evidence | ✅ Browser scraper |
8.2 Basic Scraper Architecture
┌──────────────────────────────────────────────────────────┐
│                     Scraper scheduler                     │
│  ┌───────────┐    ┌──────────────┐    ┌───────────────┐  │
│  │ URL queue │ →  │ Browser pool │ →  │ Data pipeline │  │
│  └───────────┘    └──────────────┘    └───────────────┘  │
│        ↑                 │                    │           │
│  ┌─────────────────┐ ┌────────────────┐ ┌──────────────┐ │
│  │ Link extraction │ │ Anti-detection │ │ Data storage │ │
│  └─────────────────┘ └────────────────┘ └──────────────┘ │
└──────────────────────────────────────────────────────────┘
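As a rough Python sketch of that pipeline — a URL queue feeding a small pool of reused drivers, with a trivial stand-in for the data pipeline (create_driver is defined in 8.3 below):
from queue import Queue
from threading import Thread

def worker(url_queue, results):
    """One pooled browser per worker, reused across many URLs."""
    driver = create_driver()  # defined in section 8.3 below
    try:
        while True:
            url = url_queue.get()
            if url is None:  # poison pill: shut this worker down
                break
            try:
                driver.get(url)
                # Stand-in for the data pipeline: extract and collect
                results.append({"url": url, "title": driver.title})
            except Exception as e:
                print(f"failed: {url}: {e}")
    finally:
        driver.quit()

def crawl(urls, pool_size=3):
    url_queue, results = Queue(), []
    workers = [Thread(target=worker, args=(url_queue, results)) for _ in range(pool_size)]
    for w in workers:
        w.start()
    for url in urls:
        url_queue.put(url)
    for _ in workers:
        url_queue.put(None)  # one pill per worker
    for w in workers:
        w.join()
    return results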
8.3 Selenium Scraper
Basic Example — Scraping a News List
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import json
import time
def create_driver():
options = Options()
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_argument("--window-size=1920,1080")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
return webdriver.Chrome(options=options)
def scrape_news(url, max_pages=5):
driver = create_driver()
all_articles = []
try:
for page_num in range(1, max_pages + 1):
driver.get(f"{url}?page={page_num}")
wait = WebDriverWait(driver, 15)
            # Wait for the article list to load
articles = wait.until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".article-item"))
)
for article in articles:
try:
title = article.find_element(By.CSS_SELECTOR, ".title").text
link = article.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
summary = article.find_element(By.CSS_SELECTOR, ".summary").text
date = article.find_element(By.CSS_SELECTOR, ".date").text
all_articles.append({
"title": title,
"link": link,
"summary": summary,
"date": date,
})
                except Exception as e:
                    print(f"Failed to parse article: {e}")
                    continue
            print(f"Page {page_num}: fetched {len(articles)} articles")
            time.sleep(2)  # polite delay
finally:
driver.quit()
return all_articles
if __name__ == "__main__":
articles = scrape_news("https://example.com/news", max_pages=3)
print(f"\n共获取 {len(articles)} 篇文章")
with open("/tmp/articles.json", "w", encoding="utf-8") as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
Infinite Scroll
def scrape_infinite_scroll(url, max_scrolls=20):
"""抓取无限滚动页面"""
driver = create_driver()
items = set()
try:
driver.get(url)
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".item")))
for i in range(max_scrolls):
            # Record the current element count
current_count = len(driver.find_elements(By.CSS_SELECTOR, ".item"))
            # Scroll to the bottom
driver.execute_script(
"window.scrollTo(0, document.body.scrollHeight);"
)
            # Wait for new content to load
try:
WebDriverWait(driver, 5).until(
lambda d: len(d.find_elements(By.CSS_SELECTOR, ".item")) > current_count
)
            except TimeoutException:
                print(f"No new content after scroll {i+1}; stopping")
break
print(f"滚动 {i+1}: 当前 {len(driver.find_elements(By.CSS_SELECTOR, '.item'))} 条")
# 提取所有数据
elements = driver.find_elements(By.CSS_SELECTOR, ".item")
for el in elements:
items.add(el.text)
finally:
driver.quit()
return list(items)
Click-Based Pagination
def scrape_with_clicks(url):
    """Paginate by clicking the next-page button."""
driver = create_driver()
results = []
try:
driver.get(url)
while True:
            # Extract data from the current page
items = driver.find_elements(By.CSS_SELECTOR, ".product")
for item in items:
results.append({
"name": item.find_element(By.CSS_SELECTOR, ".name").text,
"price": item.find_element(By.CSS_SELECTOR, ".price").text,
})
            # Find the next-page button
try:
next_btn = driver.find_element(By.CSS_SELECTOR, ".pagination .next:not(.disabled)")
next_btn.click()
                # Wait for the page to refresh
WebDriverWait(driver, 10).until(
EC.staleness_of(items[0])
)
time.sleep(1)
            except (NoSuchElementException, TimeoutException):
                print("No more pages")
break
finally:
driver.quit()
return results
8.4 Playwright Scraper
from playwright.sync_api import sync_playwright
import json
def scrape_with_playwright(url):
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
viewport={"width": 1920, "height": 1080},
locale="zh-CN",
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
)
page = context.new_page()
        # Route interception: block images and fonts to speed up loads
page.route("**/*.{png,jpg,jpeg,gif,webp,svg,woff,woff2}", lambda route: route.abort())
page.goto(url, wait_until="networkidle")
        # Auto-waiting + data extraction
page.wait_for_selector(".article-list")
articles = page.locator(".article-item").all()
results = []
for article in articles:
results.append({
"title": article.locator(".title").text_content(),
"link": article.locator("a").get_attribute("href"),
"summary": article.locator(".summary").text_content(),
})
browser.close()
return results
# Async version
from playwright.async_api import async_playwright

async def scrape_async(url):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url)
        # ... async operations
        await browser.close()
Infinite Scroll (Playwright)
def scrape_infinite_scroll_pw(url):
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto(url)
while True:
            # Record the current count
count = page.locator(".item").count()
            # Scroll to the bottom
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
page.wait_for_timeout(2000)
            # Check whether new content arrived
new_count = page.locator(".item").count()
if new_count == count:
break
        # Extract all data
items = page.locator(".item").all()
results = [item.text_content() for item in items]
browser.close()
return results
8.5 Anti-Detection Strategies
8.5.1 Basic Anti-Detection
def create_stealth_driver():
    """Create a browser that is harder to detect."""
options = Options()
options.add_argument("--headless=new") # 新版无头,指纹与有头一致
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
    # Custom user agent
options.add_argument(
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
driver = webdriver.Chrome(options=options)
    # Remove the webdriver flag before any page script runs
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en-US', 'en']
});
window.chrome = { runtime: {} };
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters)
);
"""
})
return driver
8.5.2 Using undetected-chromedriver
pip install undetected-chromedriver
import undetected_chromedriver as uc
def scrape_with_uc(url):
options = uc.ChromeOptions()
options.add_argument("--headless=new")
driver = uc.Chrome(options=options)
driver.get(url)
    # Handles Cloudflare and similar anti-bot checks automatically
    title = driver.title
    print(f"Title: {title}")
driver.quit()
return title
8.5.3 Playwright Stealth
npm install playwright-extra puppeteer-extra-plugin-stealth
const { chromium } = require('playwright-extra');
const stealth = require('puppeteer-extra-plugin-stealth')();
chromium.use(stealth);
(async () => {
const browser = await chromium.launch({ headless: true });
const page = await browser.newPage();
await page.goto('https://example.com');
  // ... actions
await browser.close();
})();
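If you would rather stay in Python, the community playwright-stealth package applies a similar set of evasions. A minimal sketch — note that the helper names may vary between package versions:
pip install playwright-stealth
from playwright.sync_api import sync_playwright
from playwright_stealth import stealth_sync

def scrape_stealth(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        stealth_sync(page)  # patches navigator.webdriver, plugins, languages, ...
        page.goto(url)
        title = page.title()
        browser.close()
        return title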
Anti-Detection Strategy Summary
| Detection technique | Countermeasure |
|---|---|
| UserAgent check | Set a real UA; remove the HeadlessChrome token |
| navigator.webdriver | Override via Object.defineProperty |
| Plugin count | Fake navigator.plugins |
| Language settings | Set navigator.languages |
| Canvas fingerprint | Use --headless=new (matches headed mode) |
| WebGL fingerprint | Patch WEBGL_debug_renderer_info |
| TLS fingerprint | Use curl_cffi or tls-client (see the sketch below) |
| Cloudflare | undetected-chromedriver, or wait it out |
| Rate limiting | Space out requests + random delays |
| IP bans | Use a proxy pool |
| CAPTCHAs | Manual intervention or a third-party solving service |
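For the TLS-fingerprint row, a browser is often unnecessary: curl_cffi can impersonate Chrome's TLS/HTTP2 handshake from plain Python. A minimal sketch — the exact set of impersonation targets depends on the installed version:
pip install curl_cffi
from curl_cffi import requests

# A plain requests/httpx call can be blocked by fingerprint-based WAFs even
# with a perfect User-Agent; impersonating Chrome's handshake gets past that.
resp = requests.get("https://example.com", impersonate="chrome")
print(resp.status_code, len(resp.text))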
8.6 Proxy Configuration
Selenium Proxy
# HTTP proxy
options.add_argument("--proxy-server=http://proxy.example.com:8080")
# SOCKS5 proxy
options.add_argument("--proxy-server=socks5://proxy.example.com:1080")
# Authenticated proxy:
# Chrome does not accept proxy credentials on the command line; use an
# authentication extension (below), or a tool with native support such as Playwright.
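Playwright, by contrast, supports authenticated proxies natively. A minimal sketch with a hypothetical proxy host and credentials:
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(
        headless=True,
        proxy={
            "server": "http://proxy.example.com:8080",
            "username": "user",  # credentials handled by Playwright,
            "password": "pass",  # not by Chrome's command line
        },
    )
    page = browser.new_page()
    page.goto("https://example.com")
    browser.close()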
Proxy Authentication Extension
Note that this classic trick relies on Manifest V2, which newer Chrome releases are phasing out.
import zipfile

def create_proxy_extension(host, port, username, password):
    """Build a Chrome extension that supplies proxy credentials."""
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Proxy Auth",
"permissions": ["proxy", "webRequest", "webRequestBlocking", "<all_urls>"],
"background": { "scripts": ["background.js"] }
}
"""
background_js = f"""
chrome.webRequest.onAuthRequired.addListener(
(details, callback) => {{
callback({{
authCredentials: {{ username: "{username}", password: "{password}" }}
}});
}},
{{ urls: ["<all_urls>"] }},
["blocking"]
);
chrome.proxy.settings.set({{
value: {{
mode: "fixed_servers",
rules: {{
singleProxy: {{
scheme: "http",
host: "{host}",
port: parseInt("{port}")
}},
bypassList: ["localhost"]
}}
}},
scope: "regular"
}});
"""
plugin_path = "/tmp/proxy_auth_plugin.zip"
with zipfile.ZipFile(plugin_path, 'w') as zp:
zp.writestr("manifest.json", manifest_json)
zp.writestr("background.js", background_js)
return plugin_path
# Usage (hypothetical proxy host and credentials)
ext_path = create_proxy_extension("proxy.example.com", "8080", "user", "pass")
options.add_extension(ext_path)
Proxy Rotation
import random
import time
PROXY_LIST = [
"http://proxy1.example.com:8080",
"http://proxy2.example.com:8080",
"http://proxy3.example.com:8080",
"socks5://proxy4.example.com:1080",
]
def get_random_proxy():
return random.choice(PROXY_LIST)
def scrape_with_proxy_rotation(urls):
results = {}
for url in urls:
proxy = get_random_proxy()
options = Options()
options.add_argument(f"--proxy-server={proxy}")
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
driver = webdriver.Chrome(options=options)
try:
driver.get(url)
results[url] = driver.page_source
print(f"✅ {url} via {proxy}")
except Exception as e:
print(f"❌ {url} via {proxy}: {e}")
finally:
driver.quit()
time.sleep(random.uniform(1, 3))
return results
8.7 Data Extraction Techniques
8.7.1 Complex Data Extraction
def extract_product_data(driver):
    """Extract structured product data."""
products = []
items = driver.find_elements(By.CSS_SELECTOR, ".product-card")
for item in items:
product = {}
        # Name
        try:
            product["name"] = item.find_element(By.CSS_SELECTOR, ".product-name").text
        except NoSuchElementException:
            product["name"] = None
        # Price
        try:
            price_text = item.find_element(By.CSS_SELECTOR, ".price").text
            product["price"] = float(price_text.replace("¥", "").replace(",", "").strip())
        except (NoSuchElementException, ValueError):
            product["price"] = None
        # Image URL
        try:
            product["image"] = item.find_element(By.CSS_SELECTOR, "img").get_attribute("src")
        except NoSuchElementException:
            product["image"] = None
        # Rating (find_elements returns [] rather than raising, so no try needed)
        stars = item.find_elements(By.CSS_SELECTOR, ".star.filled")
        product["rating"] = len(stars)
        # Link
        try:
            product["url"] = item.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
        except NoSuchElementException:
            product["url"] = None
products.append(product)
return products
8.7.2 Extraction via JavaScript
def extract_with_js(driver):
    """Extract data with JavaScript in one round-trip (much faster than per-element calls)."""
data = driver.execute_script("""
return Array.from(document.querySelectorAll('.item')).map(item => ({
title: item.querySelector('.title')?.textContent?.trim(),
link: item.querySelector('a')?.href,
image: item.querySelector('img')?.src,
price: parseFloat(item.querySelector('.price')?.textContent?.replace(/[^0-9.]/g, '')),
tags: Array.from(item.querySelectorAll('.tag')).map(t => t.textContent.trim()),
}));
""")
return data
8.7.3 Playwright Data Extraction
def extract_with_playwright(url):
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto(url)
        # Extract via evaluate
data = page.evaluate("""
() => Array.from(document.querySelectorAll('.item')).map(item => ({
title: item.querySelector('.title')?.textContent?.trim(),
link: item.querySelector('a')?.href,
}))
""")
        # Extract via the Locator API (equivalent, element by element)
items = page.locator(".item").all()
for item in items:
title = item.locator(".title").text_content()
link = item.locator("a").get_attribute("href")
browser.close()
return data
8.8 Scraper Best Practices
Polite Scraping
import random
import time
def polite_delay(min_sec=1, max_sec=3):
"""随机延迟,模拟人类行为"""
time.sleep(random.uniform(min_sec, max_sec))
# Delay after each request (assumes `driver` and `url_list` from the earlier examples)
for url in url_list:
    driver.get(url)
    # ... extract data
    polite_delay(2, 5)  # random 2-5 second delay
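Politeness also means honoring robots.txt. The standard library's urllib.robotparser makes the check straightforward; a minimal sketch, where MyScraper is a hypothetical user-agent token:
from urllib.robotparser import RobotFileParser

rp = RobotFileParser()
rp.set_url("https://example.com/robots.txt")
rp.read()

# Check before fetching (assumes the driver from the earlier examples)
if rp.can_fetch("MyScraper", "https://example.com/news"):
    driver.get("https://example.com/news")
else:
    print("Disallowed by robots.txt; skipping")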
Retry on Error
from functools import wraps
import time
def retry(max_retries=3, delay=2):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
                    wait = delay * (2 ** attempt)  # exponential backoff
                    print(f"Retry {attempt + 1}/{max_retries}: {e}; waiting {wait}s")
time.sleep(wait)
return None
return wrapper
return decorator
@retry(max_retries=3, delay=2)
def fetch_page(url):
    driver.get(url)  # assumes a module-level `driver`
    # ... extract data
Deduplication
import hashlib
def deduplicate(items, key="url"):
    """Deduplicate items by the given field."""
seen = set()
unique = []
for item in items:
val = item.get(key, "")
hash_val = hashlib.md5(val.encode()).hexdigest()
if hash_val not in seen:
seen.add(hash_val)
unique.append(item)
return unique
8.9 Performance Optimization
| Optimization | Description | Effect |
|---|---|---|
| Block images | Intercept image requests | 30-60% less load time |
| Block CSS | Intercept CSS requests | 10-30% less load time |
| Block fonts | Intercept font requests | 5-10% less load time |
| eager load strategy | Don't wait for images/CSS (see the sketch below) | 20-40% less waiting |
| Connection reuse | Reuse the BrowserContext | No repeated browser startup |
| Concurrency | Run tabs in parallel | N× throughput |
| API first | Call discovered APIs directly | 10×+ speed-up |
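The eager strategy is a one-line switch in Selenium 4: get() returns at DOMContentLoaded instead of the full load event. A minimal sketch:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

options = Options()
options.add_argument("--headless=new")
# "eager" returns at DOMContentLoaded; images/stylesheets may still be loading,
# so pair it with explicit WebDriverWait calls for the elements you need.
options.page_load_strategy = "eager"

driver = webdriver.Chrome(options=options)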
Concurrent Scraping (Playwright)
import asyncio
from playwright.async_api import async_playwright
async def scrape_page(page, url):
await page.goto(url, wait_until="domcontentloaded")
title = await page.title()
return {"url": url, "title": title}
async def scrape_concurrent(urls, concurrency=5):
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
        # Concurrency-limited tasks
semaphore = asyncio.Semaphore(concurrency)
results = []
async def limited_scrape(url):
async with semaphore:
page = await context.new_page()
try:
result = await scrape_page(page, url)
results.append(result)
finally:
await page.close()
tasks = [limited_scrape(url) for url in urls]
await asyncio.gather(*tasks, return_exceptions=True)
await browser.close()
return results
# Run
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = asyncio.run(scrape_concurrent(urls, concurrency=5))
8.10 Key Takeaways
| Takeaway | Description |
|---|---|
| API first | If an API is available, skip the browser |
| Anti-detection basics | Set a real UA, remove the webdriver flag, randomize delays |
| Proxy rotation | Avoid IP bans |
| Random delays | Mimic human pacing; scrape politely |
| Retry on error | Exponential backoff with a retry cap |
| Deduplication | Hash URLs to drop duplicates |
| Request interception | Block images/CSS/fonts to speed up loads |
8.11 Caveats
⚠️ Legal compliance: respect the site's robots.txt and terms of service, and avoid scraping sensitive personal data.
⚠️ Server load: don't hammer the target; set reasonable delays between requests.
⚠️ Data storage: plan ahead for large crawls; JSON/CSV is fine for small jobs, use a database at scale.
⚠️ Headless fingerprint: although --headless=new matches the headed fingerprint, some advanced checks (e.g. BrowserLeaks) may still tell the two apart.
8.12 Further Reading
| Resource | Link |
|---|---|
| robots.txt specification | https://www.robotstxt.org/ |
| undetected-chromedriver | https://github.com/ultrafunkamsterdam/undetected-chromedriver |
| playwright-extra | https://github.com/berstend/puppeteer-extra/tree/master/packages/playwright-extra |
| Scrapy + Playwright | https://github.com/scrapy-plugins/scrapy-playwright |
| Browser fingerprint tests | https://browserleaks.com/ |
| Bot detection test | https://bot.sannysoft.com/ |