import json
import websocket
import requests
import time
import threading
from datetime import datetime
def get_websocket_url(port=9222):
    """Return the WebSocket debugger URL of a debuggable browser target.

    Queries the DevTools HTTP endpoint at ``http://localhost:{port}/json``
    and prefers the first target whose ``type`` is ``"page"``. If no page
    target exposes a debugger URL, the first target that does is used.

    Args:
        port: Remote debugging port the browser was started with.

    Returns:
        The ``webSocketDebuggerUrl`` string of the selected target.

    Raises:
        requests.exceptions.RequestException: The endpoint could not be
            reached, did not answer within the timeout, or returned an
            HTTP error status.
        RuntimeError: No listed target exposes a ``webSocketDebuggerUrl``.
    """
    # A bounded timeout keeps the script from hanging on a port that accepts
    # the connection but never answers.
    response = requests.get(f'http://localhost:{port}/json', timeout=5)
    response.raise_for_status()
    targets = response.json()

    # Only entries that actually carry a debugger URL can be attached to.
    attachable = [t for t in targets if t.get('webSocketDebuggerUrl')]
    for target in attachable:
        if target.get('type') == 'page':
            return target['webSocketDebuggerUrl']
    if attachable:
        return attachable[0]['webSocketDebuggerUrl']

    raise RuntimeError(
        f"No debuggable target with a webSocketDebuggerUrl found on port {port}"
    )
class CDPRuntime:
    """Minimal Chrome DevTools Protocol client over a WebSocket.

    A single daemon thread owns every read from the socket. It routes command
    responses (messages carrying an ``id``) to the caller blocked in
    send_command() and passes events (messages carrying a ``method``) to
    _handle_event(). Keeping all reads on one thread means a response can
    never be consumed and dropped by a competing reader.
    """

    # Seconds recv() may block before the reader re-checks ``self.running``.
    _POLL_INTERVAL = 0.1

    def __init__(self, ws_url):
        """Connect to ``ws_url`` and start the background reader thread."""
        self.ws = websocket.create_connection(ws_url)
        self.ws.settimeout(self._POLL_INTERVAL)
        self.command_id = 0
        self.event_handlers = {}
        self.running = True
        self.events_log = []
        # Guards command_id and the table of commands awaiting a response.
        self._lock = threading.Lock()
        self._pending = {}
        self.event_thread = threading.Thread(target=self._event_listener, daemon=True)
        self.event_thread.start()

    @staticmethod
    def _preview(payload, limit=500):
        """Return ``payload`` as pretty JSON, truncated to ``limit`` characters."""
        text = json.dumps(payload, indent=6, ensure_ascii=False)
        if len(text) > limit:
            text = text[:limit] + "..."
        return text

    def send_command(self, method, params=None, timeout=10):
        """Send a CDP command and wait for its response.

        Args:
            method: CDP method name, e.g. ``"Runtime.evaluate"``.
            params: Optional dict of method parameters.
            timeout: Seconds to wait for the matching response.

        Returns:
            The decoded response message, ``{"error": "timeout"}`` when no
            response arrived in time, or ``{"error": "connection closed"}``
            when the reader thread stopped while the command was in flight.

        Raises:
            Whatever ``ws.send`` raises if the message cannot be written.
        """
        waiter = {"event": threading.Event(), "response": None}
        with self._lock:
            self.command_id += 1
            command_id = self.command_id
            self._pending[command_id] = waiter
        command = {
            "id": command_id,
            "method": method,
            "params": params or {}
        }
        print(f"\n>>> 发送命令: {method}")
        if params:
            print(f" 参数: {self._preview(params)}")

        try:
            self.ws.send(json.dumps(command))
            answered = waiter["event"].wait(timeout)
        finally:
            # Never leave a stale waiter behind, whatever happened above.
            with self._lock:
                self._pending.pop(command_id, None)

        if not answered:
            print("<<< 超时")
            return {"error": "timeout"}

        response = waiter["response"]
        if response is None:
            # The reader released us without a response: the socket is gone.
            print("<<< 错误: connection closed")
            return {"error": "connection closed"}

        if 'error' not in response:
            print("<<< 响应成功")
            if 'result' in response:
                print(f" 结果: {self._preview(response['result'])}")
        else:
            print(f"<<< 错误: {response['error']}")
        return response

    def _event_listener(self):
        """Read messages until stopped, dispatching responses and events."""
        while self.running:
            try:
                message = self.ws.recv()
            except websocket.WebSocketTimeoutException:
                # Nothing arrived within the poll interval; re-check running.
                continue
            except (websocket.WebSocketException, OSError):
                # The connection is unusable; stop instead of spinning.
                self.running = False
                break
            if not message:
                continue
            try:
                data = json.loads(message)
            except ValueError:
                continue
            if not isinstance(data, dict):
                continue
            if 'id' in data:
                self._deliver_response(data)
            elif 'method' in data:
                self._handle_event(data)
        self._release_waiters()

    def _deliver_response(self, response):
        """Hand ``response`` to the send_command() call waiting on its id."""
        with self._lock:
            waiter = self._pending.pop(response['id'], None)
        if waiter is not None:
            waiter["response"] = response
            waiter["event"].set()

    def _release_waiters(self):
        """Wake every pending send_command() without giving it a response."""
        with self._lock:
            waiters = list(self._pending.values())
            self._pending.clear()
        for waiter in waiters:
            waiter["event"].set()

    def _handle_event(self, event):
        """Record ``event`` in events_log and invoke its registered handler."""
        method = event['method']
        self.events_log.append({
            'timestamp': datetime.now().isoformat(),
            'method': method,
            'params': event.get('params', {})
        })
        handler = self.event_handlers.get(method)
        if handler is None:
            return
        try:
            handler(event)
        except Exception as exc:
            # A faulty handler must not kill the reader thread, but the
            # failure should be visible rather than silently dropped.
            print(f"[CDPRuntime] handler for {method} raised {exc!r}")

    def on_event(self, event_name, handler):
        """Register ``handler`` as the callback for ``event_name``.

        A later registration for the same event name replaces the earlier one.
        """
        self.event_handlers[event_name] = handler

    def close(self):
        """Stop the reader thread and close the WebSocket."""
        self.running = False
        # Joining a thread from itself raises, so skip it when close() is
        # invoked from inside an event handler.
        if self.event_thread is not threading.current_thread():
            self.event_thread.join(timeout=1.0)
        self.ws.close()
def example_basic_evaluation():
    """Demonstrate basic JavaScript evaluation through the Runtime domain.

    Covers:
    1. Runtime.enable - enabling the runtime domain
    2. Runtime.evaluate - evaluating expressions
    3. Different kinds of return values
    4. returnByValue versus objectId handles

    Connects via get_websocket_url(), navigates to https://example.com and
    prints each result. The connection is closed even if a step raises.
    """
    print("\n" + "="*60)
    print("示例 1: 基础 JavaScript 执行")
    print("="*60)

    def _remote_object(response):
        """Return the RemoteObject of an evaluate response, or None."""
        return response.get('result', {}).get('result')

    runtime = CDPRuntime(get_websocket_url())
    try:
        print("\n--- 启用 Runtime ---")
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")
        runtime.send_command("Page.navigate", {"url": "https://example.com"})
        # Fixed pause to give the page time to load before evaluating.
        time.sleep(2)

        print("\n--- 执行简单表达式 ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": "1 + 1",
            "returnByValue": True
        }))
        if remote is not None:
            # .get(): the 'value' key is not guaranteed to be present.
            print(f" 计算结果: {remote.get('value')}")

        print("\n--- 获取页面信息 ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": "document.title",
            "returnByValue": True
        }))
        if remote is not None:
            print(f" 页面标题: {remote.get('value')}")

        print("\n--- 返回对象 (returnByValue) ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": """
({
url: window.location.href,
width: window.innerWidth,
height: window.innerHeight,
userAgent: navigator.userAgent.substring(0, 50)
})
""",
            "returnByValue": True
        }))
        if remote is not None:
            obj = remote.get('value')
            print(f" 返回对象: {json.dumps(obj, indent=8, ensure_ascii=False)}")

        print("\n--- 获取对象引用 (objectId) ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": "document.body",
            "returnByValue": False
        }))
        if remote is not None:
            object_id = remote.get('objectId')
            if object_id:
                print(f" 对象 ID: {object_id[:50]}...")
                print(f" 类型: {remote.get('type')}")
                print(f" 子类型: {remote.get('subtype')}")

        print("\n--- 执行异步代码 (Promise) ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": """
new Promise((resolve) => {
setTimeout(() => {
resolve('异步结果: 延迟1秒');
}, 1000);
})
""",
            "awaitPromise": True,
            "returnByValue": True
        }))
        if remote is not None:
            print(f" 异步结果: {remote.get('value')}")
    finally:
        runtime.close()
def example_object_inspection():
    """Demonstrate inspecting remote objects through the Runtime domain.

    Covers:
    1. Runtime.getProperties - listing an object's properties
    2. Runtime.callFunctionOn - calling a function with the object as ``this``
    3. Runtime.releaseObject - releasing an object handle
    4. Runtime.releaseObjectGroup - releasing a group of handles
       (listed as a topic; this example only calls releaseObject)

    Connects via get_websocket_url(), navigates to https://example.com and
    prints what it finds. The connection is closed even if a step raises.
    """
    print("\n" + "="*60)
    print("示例 2: 对象属性检查")
    print("="*60)

    def _remote_object(response):
        """Return the RemoteObject of a response, or None."""
        return response.get('result', {}).get('result')

    def _property_list(response):
        """Return the property descriptors of a getProperties response."""
        return response.get('result', {}).get('result')

    runtime = CDPRuntime(get_websocket_url())
    try:
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")
        runtime.send_command("Page.navigate", {"url": "https://example.com"})
        # Fixed pause to give the page time to load before inspecting it.
        time.sleep(2)

        print("\n--- 获取 window 对象引用 ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": "window",
            "returnByValue": False
        }))
        window_object_id = remote.get('objectId') if remote is not None else None

        if window_object_id:
            # Slice only once the id is known to exist; the response may
            # carry no objectId at all.
            print(f" Window 对象 ID: {window_object_id[:50]}...")
            print("\n--- 获取 window 对象的属性 ---")
            properties = _property_list(runtime.send_command("Runtime.getProperties", {
                "objectId": window_object_id,
                "ownProperties": True,
                "accessorPropertiesOnly": False,
                "generatePreview": True
            }))
            if properties is not None:
                print(f" 属性数量: {len(properties)}")
                print("\n 前10个属性:")
                for i, prop in enumerate(properties[:10], 1):
                    name = prop.get('name', 'N/A')
                    prop_type = prop.get('value', {}).get('type', 'N/A')
                    print(f" {i}. {name}: {prop_type}")

        print("\n--- 检查 document 对象 ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": "document",
            "returnByValue": False
        }))
        doc_object_id = remote.get('objectId') if remote is not None else None

        if doc_object_id:
            properties = _property_list(runtime.send_command("Runtime.getProperties", {
                "objectId": doc_object_id,
                "ownProperties": False
            }))
            if properties is not None:
                interesting_props = ['title', 'URL', 'domain', 'readyState']
                print(" 文档属性:")
                for prop in properties:
                    if prop.get('name') in interesting_props:
                        value = prop.get('value', {})
                        if 'value' in value:
                            print(f" {prop['name']}: {value['value']}")

            print("\n--- 在 document 上调用函数 ---")
            remote = _remote_object(runtime.send_command("Runtime.callFunctionOn", {
                "objectId": doc_object_id,
                "functionDeclaration": """
function() {
return {
elementCount: this.getElementsByTagName('*').length,
linkCount: this.getElementsByTagName('a').length,
imageCount: this.getElementsByTagName('img').length
};
}
""",
                "returnByValue": True
            }))
            if remote is not None:
                stats = remote.get('value')
                print(f" 页面统计: {json.dumps(stats, indent=8)}")

        print("\n--- 释放对象引用 ---")
        if window_object_id:
            runtime.send_command("Runtime.releaseObject", {
                "objectId": window_object_id
            })
            print(" 已释放 window 对象")
        if doc_object_id:
            runtime.send_command("Runtime.releaseObject", {
                "objectId": doc_object_id
            })
            print(" 已释放 document 对象")
    finally:
        runtime.close()
def example_console_handling():
    """Demonstrate handling of console messages and exceptions.

    Covers:
    1. Runtime.consoleAPICalled - console API calls made by the page
    2. Runtime.exceptionThrown - exceptions reported by the runtime
    3. Different console message types

    Registers handlers for both events, triggers messages and errors with
    Runtime.evaluate, then prints a summary. The connection is closed even
    if a step raises.
    """
    print("\n" + "="*60)
    print("示例 3: 控制台消息处理")
    print("="*60)
    runtime = CDPRuntime(get_websocket_url())
    console_messages = []
    exceptions = []

    def handle_console_api(event):
        """Collect and print one Runtime.consoleAPICalled event."""
        params = event['params']
        msg_type = params.get('type', 'log')
        values = []
        for arg in params.get('args', []):
            # Prefer the primitive value, then the description, then the type.
            if 'value' in arg:
                values.append(str(arg['value']))
            elif 'description' in arg:
                values.append(arg['description'])
            else:
                values.append(str(arg.get('type', 'unknown')))
        message = ' '.join(values)
        console_messages.append({
            'type': msg_type,
            'message': message,
            'timestamp': params.get('timestamp')
        })
        icon = {
            'log': '📝',
            'info': 'ℹ️',
            'warn': '⚠️',
            'error': '❌',
            'debug': '🐛'
        }.get(msg_type, '📝')
        print(f"\n{icon} 控制台 [{msg_type}]: {message}")

    def handle_exception(event):
        """Collect and print one Runtime.exceptionThrown event."""
        params = event['params']
        exception_details = params.get('exceptionDetails', {})
        text = exception_details.get('text', 'N/A')
        description = exception_details.get('exception', {}).get('description', '')
        exceptions.append({
            'text': text,
            'description': description,
            'timestamp': params.get('timestamp')
        })
        print("\n💥 JavaScript 异常:")
        print(f" 错误: {text}")
        if description:
            print(f" 描述: {description[:200]}")

    try:
        # Handlers are registered before the domain is enabled.
        runtime.on_event("Runtime.consoleAPICalled", handle_console_api)
        runtime.on_event("Runtime.exceptionThrown", handle_exception)
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")
        runtime.send_command("Page.navigate", {"url": "https://example.com"})
        time.sleep(2)

        print("\n--- 触发控制台消息 ---")
        for expression in (
            "console.log('这是一条日志消息')",
            "console.info('这是一条信息消息')",
            "console.warn('这是一条警告消息')",
            "console.error('这是一条错误消息')",
        ):
            runtime.send_command("Runtime.evaluate", {"expression": expression})

        print("\n--- 输出对象 ---")
        runtime.send_command("Runtime.evaluate", {
            "expression": """
console.log('用户信息:', {
name: 'Alice',
age: 30,
role: 'Developer'
})
"""
        })

        print("\n--- 触发异常 ---")
        runtime.send_command("Runtime.evaluate", {
            "expression": "throw new Error('这是一个测试错误')"
        })
        runtime.send_command("Runtime.evaluate", {
            "expression": "undefinedFunction()"
        })
        # Leave time for trailing events to arrive before summarising.
        time.sleep(2)

        print("\n" + "="*60)
        print("📊 统计:")
        print(f" 控制台消息: {len(console_messages)} 条")
        print(f" 异常: {len(exceptions)} 个")
        if console_messages:
            print("\n控制台消息类型分布:")
            types_count = {}
            for msg in console_messages:
                types_count[msg['type']] = types_count.get(msg['type'], 0) + 1
            for msg_type, count in types_count.items():
                print(f" {msg_type}: {count}")
        print("="*60)
    finally:
        runtime.close()
def example_execution_contexts():
    """Demonstrate tracking and using execution contexts.

    Covers:
    1. Runtime.executionContextCreated - a context was created
    2. Runtime.executionContextDestroyed - a context was destroyed
    3. Evaluating code in a specific context
    4. iframe contexts

    Navigates to a data: URL containing an iframe, lists the contexts
    reported by the browser and evaluates an expression in each one. The
    connection is closed even if a step raises.
    """
    print("\n" + "="*60)
    print("示例 4: 执行上下文管理")
    print("="*60)
    runtime = CDPRuntime(get_websocket_url())
    contexts = {}

    def handle_context_created(event):
        """Remember and print a newly created execution context."""
        context = event['params']['context']
        context_id = context['id']
        contexts[context_id] = context
        print("\n🆕 执行上下文创建:")
        print(f" ID: {context_id}")
        print(f" 名称: {context.get('name', 'unnamed')}")
        print(f" 来源: {context.get('origin', 'N/A')}")
        print(f" 是否默认: {context.get('auxData', {}).get('isDefault', False)}")

    def handle_context_destroyed(event):
        """Forget a destroyed execution context and print its id."""
        context_id = event['params']['executionContextId']
        contexts.pop(context_id, None)
        print(f"\n❌ 执行上下文销毁: {context_id}")

    def _remote_object(response):
        """Return the RemoteObject of an evaluate response, or None."""
        return response.get('result', {}).get('result')

    try:
        runtime.on_event("Runtime.executionContextCreated", handle_context_created)
        runtime.on_event("Runtime.executionContextDestroyed", handle_context_destroyed)
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")

        print("\n--- 导航到包含 iframe 的页面 ---")
        runtime.send_command("Page.navigate", {
            "url": "data:text/html,<html><body><h1>主页面</h1><iframe src='data:text/html,<h1>iframe</h1>'></iframe></body></html>"
        })
        time.sleep(3)

        # The handlers above may still modify ``contexts`` from another
        # thread, so iterate over a snapshot instead of the live dict.
        known_contexts = list(contexts.items())

        print("\n--- 当前执行上下文 ---")
        print(f" 上下文数量: {len(known_contexts)}")
        for i, (ctx_id, ctx) in enumerate(known_contexts, 1):
            print(f"\n {i}. 上下文 ID: {ctx_id}")
            print(f" 名称: {ctx.get('name', 'N/A')}")
            print(f" 来源: {ctx.get('origin', 'N/A')}")

        print("\n--- 在主上下文中执行 ---")
        main_context = next(
            (ctx_id for ctx_id, ctx in known_contexts
             if ctx.get('auxData', {}).get('isDefault')),
            None,
        )
        if main_context:
            remote = _remote_object(runtime.send_command("Runtime.evaluate", {
                "expression": "document.querySelector('h1').textContent",
                "contextId": main_context,
                "returnByValue": True
            }))
            if remote is not None:
                print(f" 主页面 h1: {remote.get('value')}")

        print("\n--- 在所有上下文中执行 ---")
        for ctx_id, _ in known_contexts:
            remote = _remote_object(runtime.send_command("Runtime.evaluate", {
                "expression": "window.location.href",
                "contextId": ctx_id,
                "returnByValue": True
            }))
            if remote is not None:
                url = remote.get('value')
                # Only slice real strings; the value may be missing.
                if isinstance(url, str):
                    url = url[:80]
                print(f" 上下文 {ctx_id}: {url}")
    finally:
        runtime.close()
def example_advanced_operations():
    """Demonstrate advanced Runtime domain operations.

    Covers:
    1. Runtime.compileScript - compiling a script
    2. Runtime.runScript - running a compiled script
    3. Runtime.globalLexicalScopeNames - names in the global lexical scope
    4. Timing a command round trip

    Connects via get_websocket_url(), navigates to https://example.com and
    prints each result. The connection is closed even if a step raises.
    """
    print("\n" + "="*60)
    print("示例 5: 高级 JavaScript 操作")
    print("="*60)

    def _remote_object(response):
        """Return the RemoteObject of a response, or None."""
        return response.get('result', {}).get('result')

    runtime = CDPRuntime(get_websocket_url())
    try:
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")
        runtime.send_command("Page.navigate", {"url": "https://example.com"})
        time.sleep(2)

        print("\n--- 编译 JavaScript 脚本 ---")
        script_source = """
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
fibonacci(10);
"""
        compile_result = runtime.send_command("Runtime.compileScript", {
            "expression": script_source,
            "sourceURL": "fibonacci.js",
            "persistScript": True
        })
        script_id = compile_result.get('result', {}).get('scriptId')
        if script_id is not None:
            print(f" 编译成功,脚本 ID: {script_id}")

        if script_id:
            print("\n--- 运行编译的脚本 ---")
            remote = _remote_object(runtime.send_command("Runtime.runScript", {
                "scriptId": script_id,
                "returnByValue": True
            }))
            if remote is not None:
                print(f" 执行结果: fibonacci(10) = {remote.get('value')}")

        print("\n--- 获取全局词法作用域 ---")
        scope_result = runtime.send_command("Runtime.globalLexicalScopeNames")
        names = scope_result.get('result', {}).get('names')
        if names is not None:
            print(f" 全局变量数量: {len(names)}")
            if names:
                print(f" 前10个: {names[:10]}")

        print("\n--- 性能测量示例 ---")
        # perf_counter() is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments.
        started = time.perf_counter()
        runtime.send_command("Runtime.evaluate", {
            "expression": "Array.from({length: 1000}, (_, i) => i).reduce((a, b) => a + b, 0)",
            "returnByValue": True
        })
        eval_time = time.perf_counter() - started
        print(f" Runtime.evaluate 耗时: {eval_time:.3f}秒")

        print("\n--- 执行复杂 DOM 操作 ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": """
(function() {
const stats = {
totalElements: document.getElementsByTagName('*').length,
textNodes: 0,
maxDepth: 0
};
function getDepth(element, depth = 0) {
stats.maxDepth = Math.max(stats.maxDepth, depth);
for (let child of element.children) {
getDepth(child, depth + 1);
}
}
getDepth(document.body);
// 统计文本节点
const walker = document.createTreeWalker(
document.body,
NodeFilter.SHOW_TEXT
);
while (walker.nextNode()) {
if (walker.currentNode.textContent.trim()) {
stats.textNodes++;
}
}
return stats;
})()
""",
            "returnByValue": True
        }))
        if remote is not None:
            stats = remote.get('value')
            print(f" DOM 统计: {json.dumps(stats, indent=8)}")
    finally:
        runtime.close()
def example_async_operations():
    """Demonstrate evaluating asynchronous JavaScript.

    Covers:
    1. awaitPromise - waiting for a Promise to settle
    2. Promise chaining
    3. async/await syntax
    4. Timeout handling (listed as a topic; send_command applies its own
       response timeout)

    Each scenario is evaluated with ``awaitPromise`` and ``returnByValue``
    and its value printed. The connection is closed even if a step raises.
    """
    print("\n" + "="*60)
    print("示例 6: 异步操作和 Promise")
    print("="*60)
    runtime = CDPRuntime(get_websocket_url())

    def _await_value(expression):
        """Evaluate ``expression`` awaiting its Promise.

        Returns a ``(found, value)`` pair; ``found`` is False when the
        response carries no RemoteObject (for example after a timeout).
        """
        response = runtime.send_command("Runtime.evaluate", {
            "expression": expression,
            "awaitPromise": True,
            "returnByValue": True
        })
        remote = response.get('result', {}).get('result')
        if remote is None:
            return False, None
        return True, remote.get('value')

    try:
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")
        runtime.send_command("Page.navigate", {"url": "https://httpbin.org/delay/1"})
        time.sleep(3)

        print("\n--- 基础 Promise ---")
        found, value = _await_value("""
new Promise((resolve) => {
setTimeout(() => resolve('延迟500ms'), 500);
})
""")
        if found:
            print(f" 结果: {value}")

        print("\n--- Promise 链式调用 ---")
        found, value = _await_value("""
Promise.resolve(10)
.then(x => x * 2)
.then(x => x + 5)
.then(x => `最终结果: ${x}`)
""")
        if found:
            print(f" {value}")

        print("\n--- async/await 语法 ---")
        found, data = _await_value("""
(async function() {
await new Promise(r => setTimeout(r, 500));
const data = {
timestamp: Date.now(),
message: 'Async 操作完成'
};
return data;
})()
""")
        if found:
            print(f" 结果: {json.dumps(data, indent=8)}")

        print("\n--- Fetch API 示例 ---")
        found, data = _await_value("""
fetch('https://httpbin.org/json')
.then(response => response.json())
.then(data => ({
hasData: data !== null,
keys: Object.keys(data).slice(0, 5)
}))
.catch(error => ({
error: error.message
}))
""")
        if found:
            print(f" Fetch 结果: {json.dumps(data, indent=8, ensure_ascii=False)}")

        print("\n--- Promise.all 并行操作 ---")
        found, data = _await_value("""
Promise.all([
new Promise(r => setTimeout(() => r('任务1'), 300)),
new Promise(r => setTimeout(() => r('任务2'), 200)),
new Promise(r => setTimeout(() => r('任务3'), 100))
]).then(results => ({
completed: results.length,
results: results
}))
""")
        if found:
            print(f" 并行结果: {json.dumps(data, indent=8, ensure_ascii=False)}")

        print("\n--- 捕获 Promise 拒绝 ---")
        found, data = _await_value("""
new Promise((resolve, reject) => {
setTimeout(() => reject(new Error('故意拒绝')), 100);
}).catch(error => ({
caught: true,
message: error.message
}))
""")
        if found:
            print(f" 捕获结果: {json.dumps(data, indent=8, ensure_ascii=False)}")
    finally:
        runtime.close()
def example_debug_tool():
    """Combined example: a small JavaScript debugging session.

    Covers:
    1. Evaluating code on demand
    2. Monitoring exceptions
    3. Inspecting objects
    4. Simple performance analysis

    Injects a ``window.__debug__`` helper object into https://example.com,
    runs several queries against the page and prints a session summary. The
    connection is closed even if a step raises.
    """
    print("\n" + "="*60)
    print("示例 7: JavaScript 调试工具")
    print("="*60)
    runtime = CDPRuntime(get_websocket_url())
    debug_state = {
        "console_logs": [],
        "exceptions": [],
        "execution_count": 0,
        "object_cache": {}
    }

    def handle_console(event):
        """Record and print one Runtime.consoleAPICalled event."""
        params = event['params']
        msg_type = params.get('type', 'log')
        message_parts = []
        for arg in params.get('args', []):
            if 'value' in arg:
                message_parts.append(str(arg['value']))
            elif 'description' in arg:
                message_parts.append(arg['description'])
        message = ' '.join(message_parts)
        debug_state["console_logs"].append({
            'type': msg_type,
            'message': message
        })
        print(f"\n[CONSOLE] {msg_type.upper()}: {message}")

    def handle_exception(event):
        """Record and print one Runtime.exceptionThrown event."""
        exception = event['params']['exceptionDetails']
        text = exception.get('text', 'Unknown error')
        line = exception.get('lineNumber')
        debug_state["exceptions"].append({
            'text': text,
            'line': line,
            'column': exception.get('columnNumber')
        })
        print(f"\n[EXCEPTION] {text}")
        # Compare against None: line number 0 is a valid location and must
        # not be treated as "no location".
        if line is not None:
            print(f" 位置: 行 {line}, 列 {exception.get('columnNumber', 0)}")

    def _remote_object(response):
        """Return the RemoteObject of an evaluate response, or None."""
        return response.get('result', {}).get('result')

    try:
        runtime.on_event("Runtime.consoleAPICalled", handle_console)
        runtime.on_event("Runtime.exceptionThrown", handle_exception)

        print("\n--- 初始化调试环境 ---")
        runtime.send_command("Runtime.enable")
        runtime.send_command("Page.enable")
        runtime.send_command("Page.navigate", {"url": "https://example.com"})
        time.sleep(2)

        print("\n--- 注入调试辅助函数 ---")
        runtime.send_command("Runtime.evaluate", {
            "expression": """
window.__debug__ = {
inspect: function(obj) {
console.log('=== Object Inspection ===');
console.log('Type:', typeof obj);
console.log('Constructor:', obj?.constructor?.name);
if (typeof obj === 'object' && obj !== null) {
console.log('Keys:', Object.keys(obj));
}
return obj;
},
measure: function(fn, label = 'Operation') {
const start = performance.now();
const result = fn();
const duration = performance.now() - start;
console.log(`${label} took ${duration.toFixed(2)}ms`);
return result;
},
memory: function() {
if (performance.memory) {
return {
used: (performance.memory.usedJSHeapSize / 1048576).toFixed(2) + ' MB',
total: (performance.memory.totalJSHeapSize / 1048576).toFixed(2) + ' MB',
limit: (performance.memory.jsHeapSizeLimit / 1048576).toFixed(2) + ' MB'
};
}
return null;
}
};
console.log('Debug utilities loaded');
"""
        })
        time.sleep(1)

        print("\n--- 执行调试命令 ---")
        print("\n > 检查内存使用")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": "__debug__.memory()",
            "returnByValue": True
        }))
        if remote is not None:
            memory = remote.get('value')
            if memory:
                print(f" 内存使用: {json.dumps(memory, indent=8)}")

        print("\n > 检查 document 对象")
        runtime.send_command("Runtime.evaluate", {
            "expression": "__debug__.inspect(document)"
        })
        time.sleep(1)

        print("\n > 性能测量")
        runtime.send_command("Runtime.evaluate", {
            "expression": """
__debug__.measure(() => {
let sum = 0;
for (let i = 0; i < 1000000; i++) {
sum += i;
}
return sum;
}, 'Sum calculation')
"""
        })
        time.sleep(1)

        print("\n--- 查询页面信息 ---")
        queries = [
            ("标题", "document.title"),
            ("URL", "window.location.href"),
            ("元素数量", "document.getElementsByTagName('*').length"),
            # ''.split(';') yields [''] (length 1), so an empty cookie
            # string has to be special-cased to report 0 cookies.
            ("Cookie 数量", "document.cookie ? document.cookie.split(';').length : 0"),
            ("视口宽度", "window.innerWidth"),
            ("视口高度", "window.innerHeight")
        ]
        print("\n 页面信息:")
        for label, expression in queries:
            remote = _remote_object(runtime.send_command("Runtime.evaluate", {
                "expression": expression,
                "returnByValue": True
            }))
            if remote is not None:
                value = remote.get('value')
                if isinstance(value, str) and len(value) > 80:
                    value = value[:80] + "..."
                print(f" {label}: {value}")
                debug_state["execution_count"] += 1

        print("\n--- 测试异常处理 ---")
        runtime.send_command("Runtime.evaluate", {
            "expression": "throw new Error('测试异常捕获')"
        })
        time.sleep(1)

        print("\n--- 执行页面分析 ---")
        remote = _remote_object(runtime.send_command("Runtime.evaluate", {
            "expression": """
(function analyzeDOM() {
const analysis = {
structure: {
depth: 0,
totalNodes: 0,
elementTypes: {}
},
content: {
totalText: 0,
links: 0,
images: 0,
forms: 0
},
scripts: {
inline: document.querySelectorAll('script:not([src])').length,
external: document.querySelectorAll('script[src]').length
},
styles: {
inline: document.querySelectorAll('style').length,
external: document.querySelectorAll('link[rel="stylesheet"]').length
}
};
// 分析深度
function getDepth(element, depth = 0) {
analysis.structure.depth = Math.max(analysis.structure.depth, depth);
analysis.structure.totalNodes++;
const tag = element.tagName?.toLowerCase();
if (tag) {
analysis.structure.elementTypes[tag] =
(analysis.structure.elementTypes[tag] || 0) + 1;
}
for (let child of element.children) {
getDepth(child, depth + 1);
}
}
if (document.body) {
getDepth(document.body);
}
// 内容统计
analysis.content.links = document.getElementsByTagName('a').length;
analysis.content.images = document.getElementsByTagName('img').length;
analysis.content.forms = document.getElementsByTagName('form').length;
// 文本统计
const walker = document.createTreeWalker(
document.body || document,
NodeFilter.SHOW_TEXT
);
while (walker.nextNode()) {
const text = walker.currentNode.textContent.trim();
if (text) {
analysis.content.totalText += text.length;
}
}
return analysis;
})()
""",
            "returnByValue": True
        }))
        # The report needs the returned object; skip it when the evaluation
        # produced no value instead of failing on a None subscript.
        analysis = remote.get('value') if remote is not None else None
        if analysis:
            structure = analysis['structure']
            content = analysis['content']
            print("\n DOM 分析结果:")
            print(" 结构:")
            print(f" 最大深度: {structure['depth']}")
            print(f" 总节点数: {structure['totalNodes']}")
            print(f" 元素类型: {len(structure['elementTypes'])} 种")
            print("\n 内容:")
            print(f" 文本长度: {content['totalText']} 字符")
            print(f" 链接: {content['links']}")
            print(f" 图片: {content['images']}")
            print(f" 表单: {content['forms']}")
            print("\n 脚本:")
            print(f" 内联: {analysis['scripts']['inline']}")
            print(f" 外部: {analysis['scripts']['external']}")
            print("\n 样式:")
            print(f" 内联: {analysis['styles']['inline']}")
            print(f" 外部: {analysis['styles']['external']}")

        print("\n" + "="*60)
        print("📊 调试会话统计:")
        print(f" 执行次数: {debug_state['execution_count']}")
        print(f" 控制台日志: {len(debug_state['console_logs'])}")
        print(f" 捕获异常: {len(debug_state['exceptions'])}")
        if debug_state['console_logs']:
            print("\n 控制台日志类型:")
            log_types = {}
            for log in debug_state['console_logs']:
                log_types[log['type']] = log_types.get(log['type'], 0) + 1
            for log_type, count in log_types.items():
                print(f" {log_type}: {count}")
        if debug_state['exceptions']:
            print("\n 异常列表:")
            for i, exc in enumerate(debug_state['exceptions'], 1):
                print(f" {i}. {exc['text']}")
        print("="*60)
    finally:
        runtime.close()
def main():
    """Show the example menu, run the chosen example(s) and report errors.

    Entering ``0`` runs every example in menu order with a three second
    pause after each; ``1``-``7`` runs a single example. Any other input
    prints an "invalid choice" message and returns without claiming success.
    Connection failures, Ctrl-C and unexpected errors are reported on stdout
    rather than propagated.
    """
    print("\n" + "="*60)
    print("CDP Runtime 域完整示例集")
    print("="*60)
    print("\n请确保 Chrome 已使用以下命令启动:")
    print("chrome --remote-debugging-port=9222 --user-data-dir=/tmp/chrome-debug")
    print("\n选择要运行的示例:")
    print("1. 基础 JavaScript 执行")
    print("2. 对象属性检查")
    print("3. 控制台消息处理")
    print("4. 执行上下文管理")
    print("5. 高级 JavaScript 操作")
    print("6. 异步操作和 Promise")
    print("7. JavaScript 调试工具(综合)")
    print("0. 运行所有示例")

    examples = {
        "1": example_basic_evaluation,
        "2": example_object_inspection,
        "3": example_console_handling,
        "4": example_execution_contexts,
        "5": example_advanced_operations,
        "6": example_async_operations,
        "7": example_debug_tool
    }
    try:
        choice = input("\n请输入选择 (0-7): ").strip()
        if choice == "0":
            for name, example in examples.items():
                print(f"\n{'='*60}")
                print(f"运行示例 {name}...")
                print(f"{'='*60}")
                example()
                time.sleep(3)
        elif choice in examples:
            examples[choice]()
        else:
            print("无效选择")
            # Nothing ran, so do not print the completion message below.
            return
        print("\n✅ 示例执行完成!")
    except requests.exceptions.ConnectionError:
        print("\n❌ 错误: 无法连接到 Chrome")
        print("请确保 Chrome 已使用 --remote-debugging-port=9222 启动")
    except KeyboardInterrupt:
        print("\n\n⚠️ 用户中断")
    except Exception as e:
        print(f"\n❌ 发生错误: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()