import json
import websocket
import requests
import time
import base64
import threading
def get_websocket_url(port=9222, timeout=5):
    """Return the WebSocket debugger URL of a debuggable Chrome target.

    Queries the DevTools HTTP endpoint ``http://localhost:{port}/json`` and
    picks the first target of type ``"page"`` that exposes a
    ``webSocketDebuggerUrl``. If no page target qualifies, the first target
    with a debugger URL is used instead.

    Args:
        port: Remote-debugging port Chrome was started with.
        timeout: Seconds to wait for the HTTP endpoint before giving up.

    Returns:
        The ``webSocketDebuggerUrl`` string of the selected target.

    Raises:
        requests.exceptions.RequestException: The endpoint is unreachable,
            timed out, or answered with an HTTP error status.
        RuntimeError: No listed target exposes a WebSocket debugger URL.
    """
    response = requests.get(f'http://localhost:{port}/json', timeout=timeout)
    response.raise_for_status()

    # The first listed target is not necessarily a tab, and a target may
    # come without a debugger URL, so filter instead of taking tabs[0].
    targets = [t for t in response.json() if t.get('webSocketDebuggerUrl')]
    if not targets:
        raise RuntimeError(
            f"No debuggable target found on port {port}; "
            "open a tab or detach other DevTools clients"
        )
    pages = [t for t in targets if t.get('type') == 'page']
    return (pages or targets)[0]['webSocketDebuggerUrl']
class CDPFetch:
    """Minimal Chrome DevTools Protocol client used by the Fetch examples.

    A single background thread reads every frame from the WebSocket and
    routes it: command responses (messages carrying an ``id``) are handed to
    the caller waiting in :meth:`send_command`, and events (messages carrying
    a ``method``) are queued for a second background thread that runs the
    registered handlers.

    Having exactly one reader avoids two threads racing on ``recv()``.
    Running handlers off the reader thread lets a handler call
    :meth:`send_command` itself without blocking delivery of its response.
    """

    def __init__(self, ws_url):
        """Connect to ``ws_url`` and start the reader and dispatcher threads."""
        self._init_state()
        self.ws = websocket.create_connection(ws_url)
        # Short socket timeout so the reader loop can notice `running`
        # turning False instead of blocking in recv() forever.
        self.ws.settimeout(0.1)
        self.event_thread = threading.Thread(
            target=self._event_listener, daemon=True
        )
        self._dispatch_thread = threading.Thread(
            target=self._event_dispatcher, daemon=True
        )
        self.event_thread.start()
        self._dispatch_thread.start()

    def _init_state(self):
        """Create all connection-independent bookkeeping."""
        self.command_id = 0
        self.event_handlers = {}
        self.running = True
        self._lock = threading.Lock()        # guards command_id and _pending
        self._send_lock = threading.Lock()   # serialises ws.send() calls
        self._pending = {}                   # command id -> [Event, response]
        self._events = []                    # FIFO of events awaiting dispatch
        self._events_ready = threading.Condition()

    def _register_waiter(self):
        """Allocate the next command id and a slot to receive its response.

        Returns:
            ``(command_id, slot)`` where ``slot`` is ``[threading.Event,
            response_or_None]``; the event is set once the slot is filled.
        """
        slot = [threading.Event(), None]
        with self._lock:
            self.command_id += 1
            command_id = self.command_id
            self._pending[command_id] = slot
        return command_id, slot

    def send_command(self, method, params=None, timeout=30.0):
        """Send a CDP command and block until its response arrives.

        Args:
            method: CDP method name, e.g. ``"Fetch.enable"``.
            params: Optional parameter dict; an empty dict is sent when
                omitted or empty.
            timeout: Seconds to wait for the matching response.

        Returns:
            The decoded response message (a dict with the same ``id``).

        Raises:
            TimeoutError: No response arrived within ``timeout`` seconds.
            ConnectionError: The client is closed, or the connection went
                away before the response arrived.
        """
        if not self.running:
            raise ConnectionError("CDP connection is closed")

        # Register before sending so the response cannot arrive unobserved.
        # The id is captured locally: handlers may send further commands and
        # advance self.command_id while this call is still waiting.
        command_id, slot = self._register_waiter()
        command = {
            "id": command_id,
            "method": method,
            "params": params or {}
        }
        print(f"\n>>> 发送命令: {method}")
        if params:
            print(f" 参数: {json.dumps(params, indent=6, ensure_ascii=False)}")
        try:
            with self._send_lock:
                self.ws.send(json.dumps(command))
            deadline = time.monotonic() + timeout
            # Wait in short slices so a shutdown is noticed promptly.
            while not slot[0].wait(0.1):
                if not self.running:
                    break
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"No response to {method} (id={command_id}) "
                        f"within {timeout}s"
                    )
        finally:
            with self._lock:
                self._pending.pop(command_id, None)

        response = slot[1]
        if response is None:
            raise ConnectionError(
                f"Connection closed before {method} (id={command_id}) was answered"
            )
        print(f"<<< 响应: {json.dumps(response, indent=4, ensure_ascii=False)}")
        return response

    def _route_message(self, data):
        """Deliver one decoded message to its waiter or to the event queue."""
        if not isinstance(data, dict):
            return
        if 'id' in data:
            with self._lock:
                slot = self._pending.get(data['id'])
            if slot is not None:
                slot[1] = data
                slot[0].set()
        elif 'method' in data:
            with self._events_ready:
                self._events.append(data)
                self._events_ready.notify()

    def _abort_pending(self):
        """Wake every waiting send_command call without a response."""
        with self._lock:
            waiters = list(self._pending.values())
        for slot in waiters:
            slot[0].set()

    def _event_listener(self):
        """Reader loop: the only place that calls ``recv()`` on the socket."""
        while self.running:
            try:
                message = self.ws.recv()
            except websocket.WebSocketTimeoutException:
                # Nothing arrived within the poll interval; re-check `running`.
                continue
            except (websocket.WebSocketException, OSError):
                # Connection is gone; stop instead of spinning on errors.
                self.running = False
                break
            if not message:
                continue
            try:
                data = json.loads(message)
            except ValueError:
                continue  # not JSON; nothing to route
            self._route_message(data)
        self._abort_pending()
        with self._events_ready:
            self._events_ready.notify_all()

    def _event_dispatcher(self):
        """Run registered handlers for queued events, one at a time."""
        while True:
            with self._events_ready:
                while self.running and not self._events:
                    self._events_ready.wait(0.1)
                if not self.running:
                    return
                event = self._events.pop(0)
            try:
                self._handle_event(event)
            except Exception as exc:
                # Thread boundary: report and keep dispatching so one failing
                # handler does not silently stop all later events.
                print(f"[CDPFetch] handler for {event.get('method')} failed: {exc!r}")

    def _handle_event(self, event):
        """Invoke the handler registered for ``event['method']``, if any."""
        handler = self.event_handlers.get(event['method'])
        if handler is not None:
            handler(event)

    def on_event(self, event_name, handler):
        """Register ``handler`` for ``event_name``, replacing any previous one."""
        self.event_handlers[event_name] = handler

    def close(self):
        """Stop both background threads and close the WebSocket."""
        self.running = False
        with self._events_ready:
            self._events_ready.notify_all()
        current = threading.current_thread()
        for worker in (self.event_thread, self._dispatch_thread):
            # A handler may call close(); a thread cannot join itself.
            if worker is not current:
                worker.join(timeout=1.0)
        self.ws.close()
def example_basic_fetch():
    """Demonstrate basic request interception with the Fetch domain.

    Steps performed:
      1. ``Fetch.enable`` with a catch-all pattern at the request stage.
      2. Handle each ``Fetch.requestPaused`` event by logging the request.
      3. ``Fetch.continueRequest`` to let every request proceed unchanged.

    Navigates to https://example.com, waits three seconds, then prints how
    many requests were intercepted.
    """
    print("\n" + "="*60)
    print("示例 1: 基础请求拦截")
    print("="*60)
    fetcher = CDPFetch(get_websocket_url())
    request_count = 0

    def handle_request_paused(event):
        """Log the paused request and resume it unmodified."""
        nonlocal request_count
        request_count += 1
        params = event['params']
        request_id = params['requestId']
        request = params['request']
        print(f"\n🔵 请求被拦截 #{request_count}:")
        print(f" URL: {request['url']}")
        print(f" 方法: {request['method']}")
        print(f" 请求ID: {request_id}")
        fetcher.send_command("Fetch.continueRequest", {
            "requestId": request_id
        })

    # try/finally so the WebSocket is released even if a command fails.
    try:
        fetcher.on_event("Fetch.requestPaused", handle_request_paused)
        fetcher.send_command("Fetch.enable", {
            "patterns": [
                {
                    "urlPattern": "*",
                    "requestStage": "Request"
                }
            ]
        })
        fetcher.send_command("Page.enable")
        fetcher.send_command("Page.navigate", {
            "url": "https://example.com"
        })
        # Give the page time to load while requests are being intercepted.
        time.sleep(3)
        print(f"\n✅ 总共拦截了 {request_count} 个请求")
    finally:
        fetcher.close()
def example_modify_request():
    """Demonstrate rewriting request headers before a request is sent.

    Every paused request is continued with its original headers plus an
    ``X-Custom-Header`` and an overridden ``User-Agent``. Only headers are
    modified here; the request method and POST data are left untouched.

    Navigates to https://httpbin.org/headers and waits three seconds.
    """
    print("\n" + "="*60)
    print("示例 2: 修改请求")
    print("="*60)
    fetcher = CDPFetch(get_websocket_url())

    def handle_request_paused(event):
        """Continue the paused request with the extra/overridden headers."""
        params = event['params']
        request_id = params['requestId']
        request = params['request']
        print(f"\n🔵 拦截到请求: {request['url']}")
        modified_headers = dict(request.get('headers', {}))
        modified_headers['X-Custom-Header'] = 'Modified by CDP'
        modified_headers['User-Agent'] = 'Custom CDP Bot/1.0'
        print(f" ✏️ 添加自定义请求头")
        # Headers are sent as a list of {name, value} entries, not a mapping.
        fetcher.send_command("Fetch.continueRequest", {
            "requestId": request_id,
            "headers": [
                {"name": name, "value": value}
                for name, value in modified_headers.items()
            ]
        })

    # try/finally so the WebSocket is released even if a command fails.
    try:
        fetcher.on_event("Fetch.requestPaused", handle_request_paused)
        fetcher.send_command("Fetch.enable", {
            "patterns": [{
                "urlPattern": "*",
                "requestStage": "Request"
            }]
        })
        fetcher.send_command("Page.enable")
        fetcher.send_command("Page.navigate", {
            "url": "https://httpbin.org/headers"
        })
        time.sleep(3)
    finally:
        fetcher.close()
def example_mock_response():
    """Demonstrate answering requests with a fabricated response.

    Requests whose URL contains ``api`` or ``json`` are fulfilled with a
    JSON payload, custom headers and status 200; all other requests continue
    normally. After loading https://example.com the page is made to
    ``fetch()`` an API URL so the mock path is exercised.
    """
    print("\n" + "="*60)
    print("示例 3: 模拟响应")
    print("="*60)
    fetcher = CDPFetch(get_websocket_url())

    def handle_request_paused(event):
        """Fulfil matching requests with mock JSON; continue the rest."""
        params = event['params']
        request_id = params['requestId']
        request = params['request']
        url = request['url']
        print(f"\n🔵 拦截到请求: {url}")
        if 'api' in url or 'json' in url:
            print(f" 🎭 返回模拟响应")
            mock_data = {
                "status": "success",
                "message": "This is a mocked response from CDP",
                "data": {
                    "id": 123,
                    "name": "Mock User",
                    "timestamp": time.time()
                }
            }
            response_body = json.dumps(mock_data)
            # The response body is passed to Fetch.fulfillRequest base64-encoded.
            encoded_body = base64.b64encode(response_body.encode()).decode()
            fetcher.send_command("Fetch.fulfillRequest", {
                "requestId": request_id,
                "responseCode": 200,
                "responseHeaders": [
                    {"name": "Content-Type", "value": "application/json"},
                    {"name": "X-Mocked-By", "value": "CDP"},
                    {"name": "Access-Control-Allow-Origin", "value": "*"}
                ],
                "body": encoded_body
            })
        else:
            fetcher.send_command("Fetch.continueRequest", {
                "requestId": request_id
            })

    # try/finally so the WebSocket is released even if a command fails.
    try:
        fetcher.on_event("Fetch.requestPaused", handle_request_paused)
        fetcher.send_command("Fetch.enable", {
            "patterns": [{
                "urlPattern": "*",
                "requestStage": "Request"
            }]
        })
        fetcher.send_command("Page.enable")
        fetcher.send_command("Runtime.enable")
        fetcher.send_command("Page.navigate", {"url": "https://example.com"})
        time.sleep(2)
        # Trigger a request from inside the page that matches the mock rule.
        fetcher.send_command("Runtime.evaluate", {
            "expression": (
                "\n"
                "fetch('https://api.example.com/users')\n"
                ".then(r => r.json())\n"
                ".then(data => console.log('Response:', data))\n"
            )
        })
        time.sleep(2)
    finally:
        fetcher.close()
def example_block_requests():
    """Demonstrate blocking requests by URL substring.

    Any request whose lower-cased URL contains one of ``blocked_patterns`` is
    failed with ``BlockedByClient``; everything else continues. Matching is
    plain substring search, so a short pattern such as ``ads`` also matches
    unrelated URLs that merely contain those letters.

    Navigates to https://www.nytimes.com, waits five seconds, then prints
    blocked/allowed totals.
    """
    print("\n" + "="*60)
    print("示例 4: 阻止特定请求")
    print("="*60)
    fetcher = CDPFetch(get_websocket_url())
    blocked_patterns = [
        'ads',
        'analytics',
        'tracking',
        'doubleclick',
        'googletagmanager'
    ]
    blocked_count = 0
    allowed_count = 0

    def handle_request_paused(event):
        """Fail the request if its URL matches a blocked pattern."""
        nonlocal blocked_count, allowed_count
        params = event['params']
        request_id = params['requestId']
        request = params['request']
        url = request['url'].lower()
        should_block = any(pattern in url for pattern in blocked_patterns)
        if should_block:
            blocked_count += 1
            print(f"\n🚫 阻止请求 #{blocked_count}: {request['url'][:80]}")
            fetcher.send_command("Fetch.failRequest", {
                "requestId": request_id,
                "errorReason": "BlockedByClient"
            })
        else:
            allowed_count += 1
            print(f"\n✅ 允许请求 #{allowed_count}: {request['url'][:80]}")
            fetcher.send_command("Fetch.continueRequest", {
                "requestId": request_id
            })

    # try/finally so the WebSocket is released even if a command fails.
    try:
        fetcher.on_event("Fetch.requestPaused", handle_request_paused)
        fetcher.send_command("Fetch.enable", {
            "patterns": [{
                "urlPattern": "*",
                "requestStage": "Request"
            }]
        })
        fetcher.send_command("Page.enable")
        fetcher.send_command("Page.navigate", {
            "url": "https://www.nytimes.com"
        })
        time.sleep(5)
        print(f"\n" + "="*60)
        print(f"📊 统计:")
        print(f" 阻止: {blocked_count} 个请求")
        print(f" 允许: {allowed_count} 个请求")
        print("="*60)
    finally:
        fetcher.close()
def example_intercept_response():
    """Demonstrate intercepting and rewriting a response body.

    Document requests are paused at the response stage. The body is read
    with ``Fetch.getResponseBody``, a ``<script>`` tag is injected before
    ``</body>``, and the modified document is returned with
    ``Fetch.fulfillRequest``. If the body cannot be read or decoded, the
    request continues unchanged.

    Navigates to https://example.com and waits three seconds.
    """
    print("\n" + "="*60)
    print("示例 5: 拦截和修改响应")
    print("="*60)
    fetcher = CDPFetch(get_websocket_url())

    def handle_request_paused(event):
        """Inject a script into the paused response, or let it continue."""
        params = event['params']
        request_id = params['requestId']
        # `responseStatusCode` is only present for response-stage pauses.
        if 'responseStatusCode' in params:
            print(f"\n🔵 拦截到响应:")
            print(f" 状态码: {params['responseStatusCode']}")
            print(f" URL: {params['request']['url'][:80]}")
            try:
                response = fetcher.send_command("Fetch.getResponseBody", {
                    "requestId": request_id
                })
                if 'result' in response and 'body' in response['result']:
                    original_body = response['result']['body']
                    if response['result'].get('base64Encoded'):
                        original_body = base64.b64decode(original_body).decode('utf-8')
                    print(f" 原始响应大小: {len(original_body)} 字节")
                    modified_body = original_body.replace(
                        '</body>',
                        '<script>console.log("Injected by CDP!");</script></body>'
                    )
                    body_bytes = modified_body.encode()
                    # The new body is plain UTF-8 with a different length, so
                    # the original Content-Length / Content-Encoding headers
                    # no longer describe it; drop them and send a fresh length.
                    stale_headers = {'content-length', 'content-encoding'}
                    response_headers = [
                        header
                        for header in params.get('responseHeaders', [])
                        if header.get('name', '').lower() not in stale_headers
                    ]
                    response_headers.append(
                        {"name": "Content-Length", "value": str(len(body_bytes))}
                    )
                    fetcher.send_command("Fetch.fulfillRequest", {
                        "requestId": request_id,
                        "responseCode": params['responseStatusCode'],
                        "responseHeaders": response_headers,
                        "body": base64.b64encode(body_bytes).decode()
                    })
                    print(f" ✏️ 已注入自定义脚本")
                    return
            except Exception as e:
                # Best effort: on any failure fall through and continue.
                print(f" ⚠️ 无法获取响应体: {e}")
        fetcher.send_command("Fetch.continueRequest", {
            "requestId": request_id
        })

    # try/finally so the WebSocket is released even if a command fails.
    try:
        fetcher.on_event("Fetch.requestPaused", handle_request_paused)
        fetcher.send_command("Fetch.enable", {
            "patterns": [{
                "urlPattern": "*",
                "resourceType": "Document",
                "requestStage": "Response"
            }]
        })
        fetcher.send_command("Page.enable")
        fetcher.send_command("Page.navigate", {
            "url": "https://example.com"
        })
        time.sleep(3)
    finally:
        fetcher.close()
def example_http_proxy():
    """Combined example: use Fetch interception like a small HTTP proxy.

    For every paused request this example:
      1. Records totals and a per-resource-type count.
      2. Serves the request from ``response_cache`` when the URL is present.
         Nothing in this example fills the cache; it is a hook for callers
         who store ``Fetch.fulfillRequest`` parameter dicts keyed by URL.
      3. Blocks images.
      4. Adds an ``X-Script-Modified`` header to script requests.
      5. Continues everything else unchanged.

    Navigates to https://news.ycombinator.com, waits five seconds, then
    prints the collected statistics.
    """
    print("\n" + "="*60)
    print("示例 6: HTTP 代理功能")
    print("="*60)
    fetcher = CDPFetch(get_websocket_url())
    stats = {
        "total_requests": 0,
        "blocked": 0,
        "modified": 0,
        "cached": 0,
        "by_type": {}
    }
    response_cache = {}

    def handle_request_paused(event):
        """Apply the cache/block/modify/continue policy to one request."""
        params = event['params']
        request_id = params['requestId']
        request = params['request']
        url = request['url']
        resource_type = params.get('resourceType', 'Other')
        stats["total_requests"] += 1
        stats["by_type"][resource_type] = stats["by_type"].get(resource_type, 0) + 1
        print(f"\n📦 请求 #{stats['total_requests']}")
        print(f" 类型: {resource_type}")
        print(f" URL: {url[:100]}")
        if url in response_cache:
            stats["cached"] += 1
            print(f" 💾 使用缓存")
            # A cached entry belongs to an earlier request; it must be
            # replayed against the id of the request that is paused now.
            cached = {**response_cache[url], "requestId": request_id}
            fetcher.send_command("Fetch.fulfillRequest", cached)
            return
        if resource_type == "Image":
            stats["blocked"] += 1
            print(f" 🚫 阻止图片")
            fetcher.send_command("Fetch.failRequest", {
                "requestId": request_id,
                "errorReason": "BlockedByClient"
            })
            return
        if resource_type == "Script":
            stats["modified"] += 1
            modified_headers = [
                {"name": name, "value": value}
                for name, value in request.get('headers', {}).items()
            ]
            modified_headers.append(
                {"name": "X-Script-Modified", "value": "true"}
            )
            print(f" ✏️ 修改脚本请求头")
            fetcher.send_command("Fetch.continueRequest", {
                "requestId": request_id,
                "headers": modified_headers
            })
            return
        fetcher.send_command("Fetch.continueRequest", {
            "requestId": request_id
        })

    def handle_auth_required(event):
        """Answer an auth challenge with the browser's default behaviour."""
        fetcher.send_command("Fetch.continueWithAuth", {
            "requestId": event['params']['requestId'],
            "authChallengeResponse": {"response": "Default"}
        })

    # try/finally so the WebSocket is released even if a command fails.
    try:
        fetcher.on_event("Fetch.requestPaused", handle_request_paused)
        # handleAuthRequests=True makes Chrome emit Fetch.authRequired and
        # hold the request until it is answered, so a handler is required.
        fetcher.on_event("Fetch.authRequired", handle_auth_required)
        fetcher.send_command("Fetch.enable", {
            "patterns": [{
                "urlPattern": "*",
                "requestStage": "Request"
            }],
            "handleAuthRequests": True
        })
        fetcher.send_command("Page.enable")
        fetcher.send_command("Page.navigate", {
            "url": "https://news.ycombinator.com"
        })
        time.sleep(5)
        print("\n" + "="*60)
        print("📊 代理统计:")
        print(f" 总请求数: {stats['total_requests']}")
        print(f" 已阻止: {stats['blocked']}")
        print(f" 已修改: {stats['modified']}")
        print(f" 使用缓存: {stats['cached']}")
        print(f"\n按类型分组:")
        # Most frequent resource types first.
        for res_type, count in sorted(stats['by_type'].items(), key=lambda x: -x[1]):
            print(f" {res_type}: {count}")
        print("="*60)
    finally:
        fetcher.close()
# Interactive entry point: show the menu, run the chosen example(s), and
# translate the most likely failures into readable messages.
if __name__ == "__main__":
    print("\n" + "="*60)
    print("CDP Fetch.enable 完整示例集")
    print("="*60)
    print("\n请确保 Chrome 已使用以下命令启动:")
    print("chrome --remote-debugging-port=9222 --user-data-dir=/tmp/chrome-debug")
    print("\n选择要运行的示例:")
    print("1. 基础请求拦截")
    print("2. 修改请求")
    print("3. 模拟响应")
    print("4. 阻止特定请求")
    print("5. 拦截和修改响应")
    print("6. HTTP 代理功能")
    print("0. 运行所有示例")
    try:
        choice = input("\n请输入选择 (0-6): ").strip()
        examples = {
            "1": example_basic_fetch,
            "2": example_modify_request,
            "3": example_mock_response,
            "4": example_block_requests,
            "5": example_intercept_response,
            "6": example_http_proxy
        }
        if choice == "0":
            for example in examples.values():
                example()
                # Short pause between examples when running all of them.
                time.sleep(2)
            print("\n✅ 示例执行完成!")
        elif choice in examples:
            examples[choice]()
            print("\n✅ 示例执行完成!")
        else:
            # Nothing ran, so do not report successful completion.
            print("无效选择")
    except requests.exceptions.ConnectionError:
        print("\n❌ 错误: 无法连接到 Chrome")
        print("请确保 Chrome 已使用 --remote-debugging-port=9222 启动")
    except KeyboardInterrupt:
        print("\n\n⚠️ 用户中断")
    except Exception as e:
        # Top-level boundary: report the error with a traceback.
        print(f"\n❌ 发生错误: {e}")
        import traceback
        traceback.print_exc()