跳到主要内容

💜route.continue_()

route.continue_() 是 Playwright 中路由拦截功能的核心方法之一,用于让被拦截的请求继续正常执行。以下是详细的介绍:

基本概念

当你使用 page.route() 或 context.route() 拦截请求时,Playwright 会暂停该请求,等待你决定如何处理。route.continue_() 就是告诉 Playwright:"这个请求可以正常继续执行"。

基础用法

1. 简单的请求继续

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    page = context.new_page()

    def handle_route(route, request):
        """Log the intercepted URL, then let the request proceed unmodified."""
        print(f"拦截到请求: {request.url}")
        route.continue_()

    # Intercept every request issued by the page.
    page.route("**/*", handle_route)

    page.goto("https://example.com")
    browser.close()

2. 有条件的请求继续

def handle_route(route, request):
    """Log which category the request falls into, then let it continue.

    Every branch resumes the request unmodified; only the log message
    differs between API requests, static resources and everything else.
    """
    if "api" in request.url:
        print(f"API请求继续: {request.url}")
        route.continue_()
    elif "static" in request.url:
        print(f"静态资源继续: {request.url}")
        route.continue_()
    else:
        print(f"其他请求继续: {request.url}")
        route.continue_()

高级用法 - 修改请求

route.continue_() 可以接受参数来修改请求的各个方面:

1. 修改请求头

def handle_route(route, request):
    """Continue the request with a custom user agent and bearer token.

    Header names in ``request.headers`` are lower-cased, so the overrides
    use lower-case keys too. A mixed-case key such as ``"User-Agent"``
    would be added next to the existing ``"user-agent"`` entry instead of
    replacing it.
    """
    headers = {
        **request.headers,
        "user-agent": "Custom Bot 1.0",
        "authorization": "Bearer your-token-here",
    }

    route.continue_(headers=headers)

page.route("**/*", handle_route)

2. 修改请求方法

def handle_route(route, request):
    """Turn GET requests for ``/api/data`` into POSTs; pass the rest through."""
    if request.method == "GET" and "/api/data" in request.url:
        # Rewrite the HTTP method before the request is sent.
        route.continue_(method="POST")
    else:
        route.continue_()

page.route("**/api/**", handle_route)

3. 修改URL

def handle_route(route, request):
    """Send requests aimed at the production host to the staging host instead."""
    original_url = request.url

    if "production.example.com" in original_url:
        # Only the host changes, so the scheme of the new URL matches the
        # original one.
        new_url = original_url.replace("production.example.com", "staging.example.com")
        print(f"重定向: {original_url} -> {new_url}")
        route.continue_(url=new_url)
    else:
        route.continue_()

page.route("**/*", handle_route)

4. 修改POST数据

import json

def handle_route(route, request):
    """Inject test markers into the JSON body of ``/api/login`` POST requests.

    Requests that are not login POSTs, or whose body is not a JSON object,
    continue unmodified.
    """
    if request.method != "POST" or "/api/login" not in request.url:
        route.continue_()
        return

    try:
        payload = json.loads(request.post_data or "{}")
        payload["test_flag"] = True
        payload["environment"] = "testing"
        new_post_data = json.dumps(payload)
    except (json.JSONDecodeError, TypeError):
        # Body is not JSON, or is JSON but not an object: leave it alone.
        route.continue_()
        return

    route.continue_(
        post_data=new_post_data,
        # request.headers uses lower-case names; a "Content-Type" key would
        # be added alongside the existing "content-type" instead of
        # replacing it.
        headers={**request.headers, "content-type": "application/json"},
    )

page.route("**/api/**", handle_route)

完整的参数列表

route.continue_() 支持以下参数:

def handle_route(route, request):
    """Show every override that ``route.continue_()`` accepts.

    All four arguments are optional keyword arguments. There is no separate
    argument for binary bodies: ``post_data`` itself accepts ``str``,
    ``bytes`` or a JSON-serializable object.
    """
    route.continue_(
        url="https://new-url.com",  # new URL
        method="POST",  # new HTTP method
        headers={"Custom": "Header"},  # new request headers
        post_data="new post data",  # new body (str, bytes such as b"binary data", or JSON-serializable)
    )

实际应用场景

1. API测试环境切换

class EnvironmentSwitcher:
    """Route handler that points API requests at one target environment.

    ``target_env`` must be one of the keys of ``env_mapping``
    ("production", "staging" or "development").
    """

    def __init__(self, target_env="staging"):
        self.target_env = target_env
        # Environment name -> API host name.
        self.env_mapping = {
            "production": "prod.api.example.com",
            "staging": "staging.api.example.com",
            "development": "dev.api.example.com",
        }

    def handle_route(self, route, request):
        """Rewrite the host of API requests that target another environment.

        A request is rewritten when its URL contains ``/api/`` and the host
        of an environment other than ``target_env``. Everything else
        continues unmodified.
        """
        url = request.url

        if "/api/" in url:
            for env, domain in self.env_mapping.items():
                if domain in url and env != self.target_env:
                    new_url = url.replace(domain, self.env_mapping[self.target_env])
                    print(f"环境切换: {url} -> {new_url}")
                    route.continue_(url=new_url)
                    # A route may only be handled once, so stop here.
                    return

        route.continue_()

# Usage example
with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    page = context.new_page()

    switcher = EnvironmentSwitcher("staging")
    page.route("**/*", switcher.handle_route)

    page.goto("https://app.example.com")
    browser.close()

2. 请求日志记录

import time
from datetime import datetime
from urllib.parse import urlsplit


class RequestLogger:
    """Route handler that records every intercepted request and lets it continue."""

    def __init__(self):
        # One dict per intercepted request, in interception order.
        self.requests = []

    def handle_route(self, route, request):
        """Resume the request, then record and print its basic details."""
        start_time = time.time()

        request_info = {
            "url": request.url,
            "method": request.method,
            "headers": dict(request.headers),
            "timestamp": datetime.now().isoformat(),
            "start_time": start_time,
        }

        # Resume the request first so logging does not delay it.
        route.continue_()

        self.requests.append(request_info)
        print(f"[{request_info['timestamp']}] {request.method} {request.url}")

    def get_summary(self):
        """Return the request count, the method of each request and the hosts seen.

        ``domains`` is a sorted list of the distinct network locations
        (host, plus port if present). URLs without one, such as
        ``about:blank`` or ``data:`` URLs, contribute nothing.
        """
        domains = {urlsplit(req["url"]).netloc for req in self.requests}
        domains.discard("")
        return {
            "total_requests": len(self.requests),
            "methods": [req["method"] for req in self.requests],
            "domains": sorted(domains),
        }

# Usage example
logger = RequestLogger()

with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    page = context.new_page()

    page.route("**/*", logger.handle_route)

    page.goto("https://example.com")
    page.click("a[href='/about']")

    print("\n请求摘要:", logger.get_summary())
    browser.close()

3. 请求重试机制

import time

class RetryHandler:
    """Route handler that retries failed requests a limited number of times.

    ``route.continue_()`` hands the request back to the browser and a route
    can only be handled once, so it cannot be used to retry. The handler
    instead performs the request itself with ``route.fetch()``, retries when
    that raises, and passes the successful response to the page with
    ``route.fulfill()``.

    Args:
        max_retries: Maximum number of retries per URL. The count is kept
            per URL for the lifetime of the handler.
        retry_delay: Seconds to wait before each retry.
    """

    def __init__(self, max_retries=3, retry_delay=1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # URL -> number of retries used so far.
        self.retry_count = {}

    def handle_route(self, route, request):
        """Fetch the request, retrying on failure, then fulfill or abort it."""
        url = request.url
        self.retry_count.setdefault(url, 0)

        while True:
            try:
                response = route.fetch()
            except Exception:
                # NOTE(review): assumes route.fetch() raises on network
                # failure -- confirm against the Playwright version in use.
                if self.retry_count[url] >= self.max_retries:
                    print(f"请求最终失败: {url}")
                    route.abort()
                    return
                self.retry_count[url] += 1
                print(f"请求失败,重试 {self.retry_count[url]}/{self.max_retries}: {url}")
                time.sleep(self.retry_delay)
            else:
                route.fulfill(response=response)
                return

# Usage example
retry_handler = RetryHandler(max_retries=2)

with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    page = context.new_page()

    page.route("**/*", retry_handler.handle_route)

    page.goto("https://example.com")
    browser.close()

4. 缓存控制

class CacheController:
    """Route handler that overrides cache-related request headers.

    ``cache_policy`` may be ``"no-cache"`` or ``"max-cache"``; any other
    value leaves the headers untouched.
    """

    def __init__(self, cache_policy="no-cache"):
        self.cache_policy = cache_policy

    def handle_route(self, route, request):
        """Continue the request with headers adjusted for the configured policy."""
        # request.headers uses lower-case names, so the overrides below use
        # lower-case keys to replace existing entries rather than sit
        # alongside them.
        headers = dict(request.headers)

        if self.cache_policy == "no-cache":
            headers["cache-control"] = "no-cache, no-store, must-revalidate"
            headers["pragma"] = "no-cache"
            headers["expires"] = "0"
        elif self.cache_policy == "max-cache":
            headers["cache-control"] = "max-age=31536000"

        route.continue_(headers=headers)

# Usage example
cache_controller = CacheController("no-cache")

with sync_playwright() as p:
    browser = p.chromium.launch(headless=False)
    context = browser.new_context()
    page = context.new_page()

    page.route("**/*", cache_controller.handle_route)

    page.goto("https://example.com")
    browser.close()

错误处理

def safe_continue(route, request):
    """Continue a request defensively, aborting it when it cannot proceed.

    Requests with an empty or ``about:blank`` URL, or with an HTTP method
    outside GET/POST/PUT/DELETE/PATCH, are aborted. If handling raises, the
    error is printed, one more plain ``continue_()`` is attempted, and the
    request is aborted if that fails as well.
    """
    try:
        # Reject requests that have no usable URL.
        if not request.url or request.url == "about:blank":
            route.abort()
            return

        # Reject HTTP methods this handler does not support.
        if request.method not in ["GET", "POST", "PUT", "DELETE", "PATCH"]:
            print(f"不支持的HTTP方法: {request.method}")
            route.abort()
            return

        route.continue_()

    except Exception as e:
        print(f"处理请求时出错: {e}")
        # Best effort: try to continue without modifications.
        try:
            route.continue_()
        except Exception:
            # Last resort. `except Exception` rather than a bare `except`
            # lets KeyboardInterrupt and SystemExit propagate.
            route.abort()

page.route("**/*", safe_continue)

性能考虑

import fnmatch
from urllib.parse import urlsplit


class OptimizedRouter:
    """Route handler that skips work for static assets and tags API requests."""

    def __init__(self):
        # Glob patterns, matched against the path component of the URL.
        self.static_patterns = ["*.css", "*.js", "*.png", "*.jpg", "*.gif"]
        self.api_patterns = ["/api/*", "/graphql*"]

    def handle_route(self, route, request):
        """Dispatch on the URL path: static asset, API request, or anything else.

        Patterns are matched against the path only. Matching the full URL
        would make ``/api/*`` unreachable, because an absolute URL starts
        with its scheme, and would miss static assets that carry a query
        string. ``fnmatchcase`` keeps matching independent of the operating
        system.
        """
        path = urlsplit(request.url).path

        # Static assets continue immediately, with no extra processing.
        if self._matches(path, self.static_patterns):
            route.continue_()
            return

        if self._matches(path, self.api_patterns):
            self._handle_api_request(route, request)
        else:
            route.continue_()

    @staticmethod
    def _matches(path, patterns):
        """Return True if ``path`` matches any of the glob ``patterns``."""
        return any(fnmatch.fnmatchcase(path, pattern) for pattern in patterns)

    def _handle_api_request(self, route, request):
        """Continue an API request with an added ``X-Test-Mode`` header."""
        headers = request.headers.copy()
        headers["X-Test-Mode"] = "true"
        route.continue_(headers=headers)

# 使用示例
router = OptimizedRouter()
page.route("**/*", router.handle_route)

调试技巧

import json

def debug_route(route, request):
    """Print the main properties of the intercepted request, then continue it."""
    print("\n=== 请求调试信息 ===")
    print(f"URL: {request.url}")
    print(f"方法: {request.method}")
    print(f"头部: {json.dumps(dict(request.headers), indent=2)}")

    if request.post_data:
        # Show only the first 200 characters of the body.
        print(f"POST数据: {request.post_data[:200]}...")

    print(f"资源类型: {request.resource_type}")
    print(f"是否导航: {request.is_navigation_request()}")
    print("===================\n")

    route.continue_()

page.route("**/*", debug_route)

注意事项

  1. 必须调用处理方法:每个被拦截的请求都必须调用 continue_()、fulfill() 或 abort() 中的一个
  2. 只能调用一次:每个 route 对象只能调用一次处理方法
  3. 异步处理:在异步环境中使用时要注意 await
  4. 性能影响:拦截所有请求可能影响性能,建议使用具体的URL模式
  5. 错误处理:要妥善处理可能的异常情况

通过灵活使用 route.continue_(),你可以实现强大的请求拦截和修改功能,这对于测试、调试和模拟不同环境非常有用。