跳到主要内容

🟠Page 教程

import json
import websocket
import requests
import time
import threading
import base64
from datetime import datetime

def get_websocket_url(port=9222):
    """Return the WebSocket debugger URL of a debuggable Chrome target.

    Queries the DevTools HTTP endpoint ``http://localhost:{port}/json`` and
    prefers the first target whose ``type`` is ``"page"``. The endpoint can
    also list other target kinds, so the first entry is not necessarily a tab.
    If no page target is present, the first target that exposes a
    ``webSocketDebuggerUrl`` is used instead.

    Args:
        port: Remote debugging port Chrome was started with.

    Returns:
        The ``webSocketDebuggerUrl`` string of the selected target.

    Raises:
        requests.exceptions.RequestException: The endpoint is unreachable,
            times out, or answers with an HTTP error status.
        RuntimeError: No listed target exposes a WebSocket debugger URL.
    """
    # A timeout keeps the script from hanging forever on a stalled endpoint.
    response = requests.get(f'http://localhost:{port}/json', timeout=5)
    response.raise_for_status()

    # Skip entries without a debugger URL instead of failing with a KeyError.
    debuggable = [
        target for target in response.json()
        if target.get('webSocketDebuggerUrl')
    ]
    for target in debuggable:
        if target.get('type') == 'page':
            return target['webSocketDebuggerUrl']
    if debuggable:
        return debuggable[0]['webSocketDebuggerUrl']

    raise RuntimeError(f"未找到可调试的目标 (端口 {port})")

class CDPPage:
    """Synchronous wrapper around one Chrome DevTools Protocol page session.

    Commands are sent with :meth:`send_command`, which blocks until the reply
    carrying the same ``id`` arrives. Protocol events (messages that carry a
    ``method`` key) are appended to ``events_log`` and dispatched to the
    handler registered through :meth:`on_event`.

    Both the calling thread and a background listener thread read from the
    same socket. Every read happens under one re-entrant lock so that the
    listener cannot consume a reply a command is waiting for, and so that an
    event handler may itself call :meth:`send_command`.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    # Seconds a single socket read may block; keeps both read loops responsive.
    _RECV_TIMEOUT = 0.1

    def __init__(self, ws_url):
        """Open the WebSocket at ``ws_url`` and start the event listener."""
        self.ws = websocket.create_connection(ws_url)
        self.ws.settimeout(self._RECV_TIMEOUT)
        self.command_id = 0
        self.event_handlers = {}
        self.running = True
        self.events_log = []

        # Re-entrant so a handler running inside send_command (or inside the
        # listener) can issue its own command from the same thread.
        self._io_lock = threading.RLock()
        # Command id -> reply; the value is None while the reply is awaited.
        self._responses = {}

        # Start the background event listener thread.
        self.event_thread = threading.Thread(target=self._event_listener, daemon=True)
        self.event_thread.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def send_command(self, method, params=None, timeout=30):
        """Send a CDP command and block until its reply arrives.

        Events received while waiting are logged and dispatched. Replies that
        belong to another in-flight command (a handler issued a nested
        command) are parked for that command instead of being dropped.

        Args:
            method: CDP method name, e.g. ``"Page.navigate"``.
            params: Optional parameter dict for the method.
            timeout: Seconds to wait for the reply; ``None`` waits forever.

        Returns:
            The decoded reply message (contains ``result`` or ``error``).

        Raises:
            TimeoutError: No reply arrived within ``timeout`` seconds.
        """
        with self._io_lock:
            self.command_id += 1
            # Captured locally: self.command_id moves on if a handler sends a
            # nested command while this one is still waiting.
            request_id = self.command_id
            command = {
                "id": request_id,
                "method": method,
                "params": params or {}
            }
            print(f"\n>>> 发送命令: {method}")
            if params:
                print(f" 参数: {json.dumps(params, indent=6, ensure_ascii=False)}")

            deadline = None if timeout is None else time.monotonic() + timeout
            self._responses[request_id] = None
            try:
                self.ws.send(json.dumps(command))

                # Wait for the reply.
                while self._responses[request_id] is None:
                    if deadline is not None and time.monotonic() > deadline:
                        raise TimeoutError(f"等待 {method} 的响应超时 ({timeout} 秒)")
                    message = self._receive()
                    if message is None:
                        continue
                    if 'id' in message:
                        # Only keep replies some in-flight command waits for.
                        if message['id'] in self._responses:
                            self._responses[message['id']] = message
                    elif 'method' in message:
                        self._handle_event(message)
                response = self._responses[request_id]
            finally:
                self._responses.pop(request_id, None)

            if 'error' not in response:
                print("<<< 响应成功")
                if 'result' in response and response['result']:
                    print(f" 结果: {json.dumps(response['result'], indent=6, ensure_ascii=False)[:200]}")
            else:
                print(f"<<< 错误: {response['error']}")
            return response

    def _receive(self):
        """Read one message from the socket.

        Returns the decoded message, or ``None`` when the read timed out or
        produced no payload.
        """
        try:
            raw = self.ws.recv()
        except websocket.WebSocketTimeoutException:
            return None
        if not raw:
            return None
        return json.loads(raw)

    def _event_listener(self):
        """Dispatch events in the background while no command is in flight."""
        while self.running:
            try:
                with self._io_lock:
                    message = self._receive()
                    # Replies seen here belong to no waiting command (commands
                    # hold the lock for their whole round trip), so only
                    # events are of interest.
                    if message is not None and 'method' in message:
                        self._handle_event(message)
            except (websocket.WebSocketConnectionClosedException, OSError):
                # The connection is gone; stop instead of spinning on errors.
                self.running = False
                break
            except Exception as exc:
                # Keep listening, but do not hide handler or decoding errors.
                print(f"⚠️ 事件处理出错: {exc}")
            # Brief pause outside the lock so a thread waiting to send a
            # command gets a chance to acquire it.
            time.sleep(0.01)

    def _handle_event(self, event):
        """Record ``event`` in ``events_log`` and call its handler, if any."""
        method = event['method']
        self.events_log.append({
            'timestamp': datetime.now().isoformat(),
            'method': method,
            'params': event.get('params', {})
        })

        handler = self.event_handlers.get(method)
        if handler is not None:
            handler(event)

    def on_event(self, event_name, handler):
        """Register ``handler`` for ``event_name``, replacing any previous one."""
        self.event_handlers[event_name] = handler

    def wait_for_event(self, event_name, timeout=10):
        """Poll ``events_log`` until an ``event_name`` entry exists.

        The whole log is searched, newest entry first, so an event recorded
        before this call also matches.

        Returns:
            The matching log entry, or ``None`` if none appeared within
            ``timeout`` seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            for entry in reversed(self.events_log):
                if entry['method'] == event_name:
                    return entry
            time.sleep(0.1)
        return None

    def close(self):
        """Stop the listener thread and close the WebSocket."""
        self.running = False
        listener = getattr(self, 'event_thread', None)
        # Joining from inside the listener itself (a handler calling close)
        # would raise, so only join from other threads.
        if listener is not None and listener is not threading.current_thread():
            listener.join(timeout=1)
        self.ws.close()


# ========================================
# 示例 1: 基础页面生命周期监听
# ========================================
def example_page_lifecycle():
    """
    Demonstrate page lifecycle events:
    1. Page.enable - enable Page domain events
    2. Page.lifecycleEvent - lifecycle events
    3. Page.loadEventFired - page load finished
    4. Page.domContentEventFired - DOM content loaded
    5. Page.frameStartedLoading - a frame started loading
    6. Page.frameStoppedLoading - a frame stopped loading

    The connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 1: 页面生命周期监听")
    print("="*60)

    page = CDPPage(get_websocket_url())

    # Filled by handle_lifecycle; summarised at the end.
    lifecycle_events = []

    def handle_lifecycle(event):
        params = event['params']
        lifecycle_events.append({
            'name': params.get('name'),
            'timestamp': params.get('timestamp'),
            'frameId': params.get('frameId', '')[:20]
        })
        print(f"\n🔄 生命周期事件: {params.get('name')}")
        print(f" 时间戳: {params.get('timestamp')}")

    def handle_load_event(event):
        print("\n✅ Page.loadEventFired - 页面完全加载")
        print(f" 时间戳: {event['params'].get('timestamp')}")

    def handle_dom_content(event):
        print("\n✅ Page.domContentEventFired - DOM 内容加载完成")
        print(f" 时间戳: {event['params'].get('timestamp')}")

    def handle_frame_started(event):
        frame_id = event['params'].get('frameId', '')[:20]
        print(f"\n🔵 框架开始加载: {frame_id}...")

    def handle_frame_stopped(event):
        frame_id = event['params'].get('frameId', '')[:20]
        print(f"\n🔵 框架停止加载: {frame_id}...")

    try:
        # Register the event handlers.
        page.on_event("Page.lifecycleEvent", handle_lifecycle)
        page.on_event("Page.loadEventFired", handle_load_event)
        page.on_event("Page.domContentEventFired", handle_dom_content)
        page.on_event("Page.frameStartedLoading", handle_frame_started)
        page.on_event("Page.frameStoppedLoading", handle_frame_stopped)

        # 1. Enable Page domain events.
        page.send_command("Page.enable")

        # 2. Turn on lifecycle event reporting.
        page.send_command("Page.setLifecycleEventsEnabled", {
            "enabled": True
        })

        # 3. Navigate to the page.
        page.send_command("Page.navigate", {
            "url": "https://example.com"
        })

        # Give the page time to load while events are collected.
        time.sleep(5)

        # Print the lifecycle summary.
        print("\n" + "="*60)
        print("📊 生命周期事件摘要:")
        for i, event in enumerate(lifecycle_events, 1):
            print(f" {i}. {event['name']}")
        print("="*60)
    finally:
        page.close()


# ========================================
# 示例 2: 页面导航控制
# ========================================
def example_page_navigation():
    """
    Demonstrate page navigation commands:
    1. Page.navigate - navigate to a URL
    2. Page.reload - reload the page
    3. Page.navigateToHistoryEntry - jump to a history entry
    4. Page.getNavigationHistory - read the navigation history

    The connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 2: 页面导航控制")
    print("="*60)

    page = CDPPage(get_websocket_url())

    def handle_navigation(event):
        # Page.frameNavigated carries the URL and id inside the nested
        # "frame" object, not at the top level of params.
        frame = event['params'].get('frame', {})
        print("\n🔄 页面导航")
        print(f" URL: {frame.get('url', 'N/A')}")
        print(f" Frame ID: {frame.get('id', 'N/A')[:20]}...")

    try:
        page.on_event("Page.frameNavigated", handle_navigation)
        page.send_command("Page.enable")

        # 1. Navigate to the first page.
        print("\n--- 导航到 Example.com ---")
        page.send_command("Page.navigate", {
            "url": "https://example.com"
        })
        time.sleep(2)

        # 2. Navigate to the second page.
        print("\n--- 导航到 Wikipedia ---")
        page.send_command("Page.navigate", {
            "url": "https://www.wikipedia.org"
        })
        time.sleep(2)

        # 3. Read the navigation history.
        print("\n--- 获取导航历史 ---")
        history = page.send_command("Page.getNavigationHistory")
        result = history.get('result')
        if result is not None:
            entries = result.get('entries', [])
            current_index = result.get('currentIndex', 0)
            print(f" 当前索引: {current_index}")
            print(f" 历史记录数: {len(entries)}")
            for i, entry in enumerate(entries):
                marker = " <- 当前" if i == current_index else ""
                print(f" {i}. {entry.get('url', 'N/A')[:60]}{marker}")

        # 4. Go back one entry, when there is one to go back to.
        if result is not None and result.get('currentIndex', 0) > 0:
            print("\n--- 返回上一页 ---")
            prev_index = result['currentIndex'] - 1
            prev_entry_id = result['entries'][prev_index]['id']
            page.send_command("Page.navigateToHistoryEntry", {
                "entryId": prev_entry_id
            })
            time.sleep(2)

        # 5. Reload the page.
        print("\n--- 重载页面 ---")
        page.send_command("Page.reload", {
            "ignoreCache": True  # bypass the cache
        })
        time.sleep(2)
    finally:
        page.close()


# ========================================
# 示例 3: 截图和 PDF 生成
# ========================================
def example_capture_screenshot():
    """
    Demonstrate page capture commands:
    1. Page.captureScreenshot - full-page PNG screenshot
    2. Page.captureScreenshot with "clip" - JPEG screenshot of a region
    3. Page.printToPDF - render the page as a PDF

    Each capture is written to a timestamped file in the current directory.
    The connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 3: 截图和 PDF 生成")
    print("="*60)

    page = CDPPage(get_websocket_url())

    def save_capture(response, prefix, extension):
        """Decode the base64 ``data`` of ``response`` into a timestamped file.

        Returns ``(filename, raw_bytes)``, or ``None`` when the response
        carries no ``result.data``.
        """
        if 'result' not in response or 'data' not in response['result']:
            return None
        raw = base64.b64decode(response['result']['data'])
        filename = f"{prefix}_{int(time.time())}.{extension}"
        with open(filename, 'wb') as f:
            f.write(raw)
        return filename, raw

    try:
        page.send_command("Page.enable")

        # Navigate to the page.
        page.send_command("Page.navigate", {
            "url": "https://example.com"
        })
        page.wait_for_event("Page.loadEventFired", timeout=10)
        time.sleep(1)

        # 1. Full-page screenshot. "quality" applies to JPEG only, so it is
        #    not sent with a PNG capture.
        print("\n--- 捕获全屏截图 ---")
        screenshot = page.send_command("Page.captureScreenshot", {
            "format": "png",
            "captureBeyondViewport": True  # capture the whole page
        })
        saved = save_capture(screenshot, "screenshot", "png")
        if saved is not None:
            filename, screenshot_data = saved
            print(f" ✅ 截图已保存: {filename}")
            print(f" 大小: {len(screenshot_data)} 字节")

        # 2. Screenshot of a fixed region.
        print("\n--- 捕获指定区域截图 ---")
        screenshot = page.send_command("Page.captureScreenshot", {
            "format": "jpeg",
            "quality": 80,
            "clip": {
                "x": 0,
                "y": 0,
                "width": 800,
                "height": 600,
                "scale": 1
            }
        })
        saved = save_capture(screenshot, "screenshot_clip", "jpg")
        if saved is not None:
            filename, _ = saved
            print(f" ✅ 区域截图已保存: {filename}")

        # 3. Render a PDF.
        print("\n--- 生成 PDF ---")
        pdf = page.send_command("Page.printToPDF", {
            "landscape": False,
            "displayHeaderFooter": True,
            "printBackground": True,
            "scale": 1.0,
            "paperWidth": 8.5,
            "paperHeight": 11,
            "marginTop": 0.4,
            "marginBottom": 0.4,
            "marginLeft": 0.4,
            "marginRight": 0.4
        })
        saved = save_capture(pdf, "page", "pdf")
        if saved is not None:
            filename, pdf_data = saved
            print(f" ✅ PDF 已保存: {filename}")
            print(f" 大小: {len(pdf_data)} 字节")
    finally:
        page.close()


# ========================================
# 示例 4: 页面资源管理
# ========================================
def example_page_resources():
    """
    Demonstrate page resource inspection:
    1. Page.getResourceTree - read the frame/resource tree
    2. Page.getResourceContent - read the content of one resource
    3. Page.frameAttached - a frame was attached
    4. Page.frameDetached - a frame was detached

    The connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 4: 页面资源管理")
    print("="*60)

    page = CDPPage(get_websocket_url())

    def handle_frame_attached(event):
        frame_id = event['params'].get('frameId', '')[:20]
        parent_id = event['params'].get('parentFrameId', '')[:20]
        print(f"\n🔗 框架附加: {frame_id}... (父: {parent_id}...)")

    def handle_frame_detached(event):
        frame_id = event['params'].get('frameId', '')[:20]
        print(f"\n❌ 框架分离: {frame_id}...")

    try:
        page.on_event("Page.frameAttached", handle_frame_attached)
        page.on_event("Page.frameDetached", handle_frame_detached)
        page.send_command("Page.enable")

        # Navigate to a page that loads several resources.
        page.send_command("Page.navigate", {
            "url": "https://news.ycombinator.com"
        })
        page.wait_for_event("Page.loadEventFired", timeout=10)
        time.sleep(2)

        # 1. Read the resource tree.
        print("\n--- 获取资源树 ---")
        resource_tree = page.send_command("Page.getResourceTree")

        if 'result' in resource_tree and 'frameTree' in resource_tree['result']:
            frame_tree = resource_tree['result']['frameTree']
            frame = frame_tree.get('frame', {})
            resources = frame_tree.get('resources', [])

            print("\n主框架:")
            print(f" ID: {frame.get('id', 'N/A')[:30]}...")
            print(f" URL: {frame.get('url', 'N/A')}")
            print(f" MIME 类型: {frame.get('mimeType', 'N/A')}")

            print(f"\n资源列表 (共 {len(resources)} 个):")
            for i, resource in enumerate(resources[:10], 1):  # first 10 only
                print(f" {i}. {resource.get('type', 'N/A'):15} - {resource.get('url', 'N/A')[:80]}")

            if len(resources) > 10:
                print(f" ... 还有 {len(resources) - 10} 个资源")

            # 2. Read the content of the first resource. Kept inside the
            #    branch above because "frame" and "resources" only exist here.
            if resources:
                first_resource = resources[0]
                print("\n--- 获取资源内容 ---")
                print(f" 资源 URL: {first_resource.get('url', 'N/A')[:80]}")

                content = page.send_command("Page.getResourceContent", {
                    "frameId": frame.get('id'),
                    "url": first_resource.get('url')
                })

                if 'result' in content:
                    content_data = content['result'].get('content', '')
                    is_base64 = content['result'].get('base64Encoded', False)
                    print(f" Base64 编码: {is_base64}")
                    print(f" 内容大小: {len(content_data)} 字符")
                    if not is_base64:
                        print(f" 内容预览: {content_data[:200]}...")
    finally:
        page.close()


# ========================================
# 示例 5: JavaScript 对话框处理
# ========================================
def example_javascript_dialogs():
    """
    Demonstrate JavaScript dialog handling:
    1. Page.javascriptDialogOpening - a dialog is about to open
    2. Page.handleJavaScriptDialog - accept or dismiss the dialog
    3. Intercepting alert, confirm and prompt

    Every dialog is accepted automatically; a prompt is answered with a fixed
    text. Dialog types other than alert/confirm/prompt are accepted too, so
    an unexpected dialog cannot leave the page blocked.
    The connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 5: JavaScript 对话框处理")
    print("="*60)

    page = CDPPage(get_websocket_url())

    dialog_count = 0

    def handle_dialog_opening(event):
        nonlocal dialog_count
        dialog_count += 1
        params = event['params']
        dialog_type = params.get('type', 'N/A')
        message = params.get('message', 'N/A')
        default_prompt = params.get('defaultPrompt', '')

        print(f"\n💬 对话框 #{dialog_count} 打开:")
        print(f" 类型: {dialog_type}")
        print(f" 消息: {message}")
        if default_prompt:
            print(f" 默认值: {default_prompt}")

        # Answer the dialog automatically.
        reply = {"accept": True}
        if dialog_type == "alert":
            print(" → 自动接受 alert")
        elif dialog_type == "confirm":
            print(" → 自动确认 confirm")
        elif dialog_type == "prompt":
            print(" → 自动输入 prompt: '自动回复'")
            reply["promptText"] = "自动回复"
        else:
            # e.g. a beforeunload dialog: still needs an answer.
            print(f" → 自动接受 {dialog_type}")
        page.send_command("Page.handleJavaScriptDialog", reply)

    try:
        page.on_event("Page.javascriptDialogOpening", handle_dialog_opening)
        page.send_command("Page.enable")
        page.send_command("Runtime.enable")

        # Navigate to the page.
        page.send_command("Page.navigate", {
            "url": "https://example.com"
        })
        page.wait_for_event("Page.loadEventFired", timeout=10)

        # 1. Trigger an alert.
        print("\n--- 触发 alert ---")
        page.send_command("Runtime.evaluate", {
            "expression": "alert('这是一个 alert 对话框')"
        })
        time.sleep(1)

        # 2. Trigger a confirm.
        print("\n--- 触发 confirm ---")
        page.send_command("Runtime.evaluate", {
            "expression": "confirm('确认操作吗?')",
            "awaitPromise": True
        })
        time.sleep(1)

        # 3. Trigger a prompt.
        print("\n--- 触发 prompt ---")
        page.send_command("Runtime.evaluate", {
            "expression": "prompt('请输入您的名字:', '默认名称')",
            "awaitPromise": True
        })
        time.sleep(1)

        print(f"\n📊 总共处理了 {dialog_count} 个对话框")
    finally:
        page.close()


# ========================================
# 示例 6: 页面视口和设备模拟
# ========================================
def example_viewport_and_emulation():
    """
    Demonstrate viewport and device emulation:
    1. Emulation.setDeviceMetricsOverride - override the device metrics
    2. Emulation.setUserAgentOverride - override the User-Agent
    3. Emulation.setTouchEmulationEnabled - enable touch emulation
    4. Emulation.setGeolocationOverride - override the geolocation

    A screenshot of the emulated page is written to a timestamped file in the
    current directory. The device metrics override is cleared and the
    connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 6: 视口和设备模拟")
    print("="*60)

    page = CDPPage(get_websocket_url())
    try:
        page.send_command("Page.enable")

        try:
            # 1. Mobile viewport (iPhone 12).
            print("\n--- 模拟 iPhone 12 ---")
            page.send_command("Emulation.setDeviceMetricsOverride", {
                "width": 390,
                "height": 844,
                "deviceScaleFactor": 3,
                "mobile": True,
                "screenOrientation": {
                    "type": "portraitPrimary",
                    "angle": 0
                }
            })

            # 2. User-Agent.
            page.send_command("Emulation.setUserAgentOverride", {
                "userAgent": "Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) AppleWebKit/605.1.15"
            })

            # 3. Touch emulation.
            page.send_command("Emulation.setTouchEmulationEnabled", {
                "enabled": True,
                "maxTouchPoints": 5
            })

            # 4. Geolocation.
            print("\n--- 设置地理位置 (纽约) ---")
            page.send_command("Emulation.setGeolocationOverride", {
                "latitude": 40.7128,
                "longitude": -74.0060,
                "accuracy": 100
            })

            # Navigate to the page.
            page.send_command("Page.navigate", {
                "url": "https://www.whatismybrowser.com"
            })
            page.wait_for_event("Page.loadEventFired", timeout=10)
            time.sleep(2)

            # Take a screenshot to see the effect.
            screenshot = page.send_command("Page.captureScreenshot", {
                "format": "png"
            })

            if 'result' in screenshot and 'data' in screenshot['result']:
                screenshot_data = base64.b64decode(screenshot['result']['data'])
                filename = f"mobile_screenshot_{int(time.time())}.png"
                with open(filename, 'wb') as f:
                    f.write(screenshot_data)
                print(f"\n✅ 移动端截图已保存: {filename}")
        finally:
            # 5. Restore the normal viewport, also after a failure above.
            print("\n--- 恢复桌面视口 ---")
            page.send_command("Emulation.clearDeviceMetricsOverride")
    finally:
        page.close()


# ========================================
# 示例 7: 综合应用 - 页面监控器
# ========================================
def example_page_monitor():
    """
    Combined example: a simple page monitor.
    1. Watch navigation, load and dialog events
    2. Collect performance metrics
    3. Count finished resource loads
    4. Capture console output and JavaScript exceptions

    Runs for ten seconds after navigation and then prints a summary.
    The connection is closed even when a command fails part-way through.
    """
    print("\n" + "="*60)
    print("示例 7: 页面监控器")
    print("="*60)

    page = CDPPage(get_websocket_url())

    # Counters and timings filled in by the handlers below.
    stats = {
        "navigation_count": 0,
        "load_time": None,
        "dom_content_time": None,
        "resources_loaded": 0,
        "javascript_errors": 0,
        "console_messages": 0,
        "dialogs_opened": 0
    }

    # Bound here so the handlers can always read it; reset right before the
    # navigation so the timings measure the page load, not the domain setup.
    start_time = time.time()

    # Event handlers.
    def handle_frame_navigated(event):
        stats["navigation_count"] += 1
        url = event['params'].get('frame', {}).get('url', 'N/A')
        print(f"\n📍 页面导航 #{stats['navigation_count']}: {url[:80]}")

    def handle_load_event(event):
        stats["load_time"] = time.time() - start_time
        print(f"\n✅ 页面加载完成: {stats['load_time']:.2f}秒")

    def handle_dom_content(event):
        stats["dom_content_time"] = time.time() - start_time
        print(f"\n✅ DOM 内容加载: {stats['dom_content_time']:.2f}秒")

    def handle_console_message(event):
        stats["console_messages"] += 1
        params = event['params']
        # Runtime.consoleAPICalled reports the call kind in "type" and the
        # logged values as a list of remote objects in "args".
        level = params.get('type', 'log')
        text = " ".join(
            str(arg.get('value', arg.get('description', '')))
            for arg in params.get('args', [])
        )
        print(f"\n💬 控制台 [{level}]: {text[:100]}")

    def handle_exception(event):
        stats["javascript_errors"] += 1
        params = event['params']
        description = params.get('exceptionDetails', {}).get('text', 'N/A')
        print(f"\n❌ JavaScript 错误 #{stats['javascript_errors']}: {description[:100]}")

    def handle_dialog(event):
        stats["dialogs_opened"] += 1
        dialog_type = event['params'].get('type', 'N/A')
        print(f"\n💬 对话框 #{stats['dialogs_opened']}: {dialog_type}")
        page.send_command("Page.handleJavaScriptDialog", {"accept": True})

    def handle_loading_finished(event):
        stats["resources_loaded"] += 1

    try:
        # Register all handlers.
        page.on_event("Page.frameNavigated", handle_frame_navigated)
        page.on_event("Page.loadEventFired", handle_load_event)
        page.on_event("Page.domContentEventFired", handle_dom_content)
        page.on_event("Runtime.consoleAPICalled", handle_console_message)
        page.on_event("Runtime.exceptionThrown", handle_exception)
        page.on_event("Page.javascriptDialogOpening", handle_dialog)
        page.on_event("Network.loadingFinished", handle_loading_finished)

        # Enable the required domains. Runtime.consoleAPICalled is an event
        # delivered once Runtime is enabled; it is not a command to send.
        page.send_command("Page.enable")
        page.send_command("Runtime.enable")
        page.send_command("Network.enable")
        # Metrics are only collected while the Performance domain is enabled.
        page.send_command("Performance.enable")

        # Navigate to the page.
        print("\n--- 开始监控 ---")
        start_time = time.time()
        page.send_command("Page.navigate", {
            "url": "https://news.ycombinator.com"
        })

        # Monitor for ten seconds.
        time.sleep(10)

        # Read the performance metrics.
        print("\n--- 获取性能指标 ---")
        metrics = page.send_command("Performance.getMetrics")

        if 'result' in metrics and 'metrics' in metrics['result']:
            print("\n📊 性能指标:")
            for metric in metrics['result']['metrics'][:10]:
                print(f" {metric['name']}: {metric['value']}")

        # Print the final summary. "is not None" so that a 0.00 s timing is
        # still reported as a number.
        print("\n" + "="*60)
        print("📊 监控摘要:")
        print(f" 导航次数: {stats['navigation_count']}")
        if stats['dom_content_time'] is not None:
            print(f" DOM 加载时间: {stats['dom_content_time']:.2f}秒")
        else:
            print(" DOM 加载时间: N/A")
        if stats['load_time'] is not None:
            print(f" 完全加载时间: {stats['load_time']:.2f}秒")
        else:
            print(" 完全加载时间: N/A")
        print(f" 资源加载数: {stats['resources_loaded']}")
        print(f" 控制台消息: {stats['console_messages']}")
        print(f" JavaScript 错误: {stats['javascript_errors']}")
        print(f" 对话框: {stats['dialogs_opened']}")
        print("="*60)
    finally:
        page.close()


# ========================================
# 主函数
# ========================================
def main():
    """Show the example menu, run the chosen example(s) and report errors.

    Entering ``0`` runs every example in order with a two-second pause after
    each; ``1``-``7`` runs a single example. The completion message is only
    printed when something actually ran, not after an invalid choice.
    """
    print("\n" + "="*60)
    print("CDP Page.enable 完整示例集")
    print("="*60)
    print("\n请确保 Chrome 已使用以下命令启动:")
    print("chrome --remote-debugging-port=9222 --user-data-dir=/tmp/chrome-debug")
    print("\n选择要运行的示例:")
    print("1. 页面生命周期监听")
    print("2. 页面导航控制")
    print("3. 截图和 PDF 生成")
    print("4. 页面资源管理")
    print("5. JavaScript 对话框处理")
    print("6. 视口和设备模拟")
    print("7. 页面监控器(综合)")
    print("0. 运行所有示例")

    examples = {
        "1": example_page_lifecycle,
        "2": example_page_navigation,
        "3": example_capture_screenshot,
        "4": example_page_resources,
        "5": example_javascript_dialogs,
        "6": example_viewport_and_emulation,
        "7": example_page_monitor
    }

    try:
        choice = input("\n请输入选择 (0-7): ").strip()

        if choice == "0":
            for name, example in examples.items():
                print(f"\n{'='*60}")
                print(f"运行示例 {name}...")
                print(f"{'='*60}")
                example()
                time.sleep(2)
        elif choice in examples:
            examples[choice]()
        else:
            print("无效选择")
            return

        print("\n✅ 示例执行完成!")

    except requests.exceptions.ConnectionError:
        print("\n❌ 错误: 无法连接到 Chrome")
        print("请确保 Chrome 已使用 --remote-debugging-port=9222 启动")
    except KeyboardInterrupt:
        print("\n\n⚠️ 用户中断")
    except Exception as e:
        # Top-level boundary: report the error with a traceback and exit.
        print(f"\n❌ 发生错误: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

# %%