跳到主要内容

🟠Target教程

  import json
import websocket
import requests
import time
import threading
from datetime import datetime

def get_websocket_url(port=9222):
"""获取主浏览器 WebSocket 调试 URL"""
response = requests.get(f'http://localhost:{port}/json')
tabs = response.json()
return tabs[0]['webSocketDebuggerUrl']

def get_browser_websocket_url(port=9222):
"""获取浏览器级别的 WebSocket URL"""
response = requests.get(f'http://localhost:{port}/json/version')
version_info = response.json()
return version_info['webSocketDebuggerUrl']

class CDPTarget:
"""CDP Target 封装类"""

def __init__(self, ws_url):
self.ws = websocket.create_connection(ws_url)
self.command_id = 0
self.event_handlers = {}
self.running = True
self.events_log = []
self.targets = {} # 存储所有 target 信息

# 启动事件监听线程
self.event_thread = threading.Thread(target=self._event_listener, daemon=True)
self.event_thread.start()

def send_command(self, method, params=None):
"""发送 CDP 命令"""
self.command_id += 1
command = {
"id": self.command_id,
"method": method,
"params": params or {}
}
print(f"\n>>> 发送命令: {method}")
if params:
# 截断过长的参数显示
params_str = json.dumps(params, indent=6, ensure_ascii=False)
if len(params_str) > 300:
params_str = params_str[:300] + "..."
print(f" 参数: {params_str}")

self.ws.send(json.dumps(command))

# 等待响应
while True:
try:
response = json.loads(self.ws.recv())
if 'id' in response and response['id'] == self.command_id:
if 'error' not in response:
print(f"<<< 响应成功")
if 'result' in response and response['result']:
result_str = json.dumps(response['result'], indent=6, ensure_ascii=False)
if len(result_str) > 300:
result_str = result_str[:300] + "..."
print(f" 结果: {result_str}")
else:
print(f"<<< 错误: {response['error']}")
return response
elif 'method' in response:
self._handle_event(response)
except Exception as e:
print(f" ⚠️ 接收响应出错: {e}")
return {"error": str(e)}

def _event_listener(self):
"""后台监听事件"""
while self.running:
try:
self.ws.settimeout(0.1)
message = self.ws.recv()
data = json.loads(message)
if 'method' in data:
self._handle_event(data)
except:
continue

def _handle_event(self, event):
"""处理事件"""
method = event['method']
self.events_log.append({
'timestamp': datetime.now().isoformat(),
'method': method,
'params': event.get('params', {})
})

if method in self.event_handlers:
self.event_handlers[method](event)

def on_event(self, event_name, handler):
"""注册事件处理器"""
self.event_handlers[event_name] = handler

def wait_for_event(self, event_name, timeout=10):
"""等待特定事件"""
start_time = time.time()
while time.time() - start_time < timeout:
for event in reversed(self.events_log):
if event['method'] == event_name:
return event
time.sleep(0.1)
return None

def close(self):
self.running = False
time.sleep(0.2)
self.ws.close()


# ========================================
# 示例 1: 基础 Target 发现和信息获取
# ========================================
def example_target_discovery():
"""
演示 Target 发现:
1. Target.setDiscoverTargets - 启用 target 发现
2. Target.getTargets - 获取所有 targets
3. Target.getTargetInfo - 获取单个 target 信息
4. Target.targetCreated - target 创建事件
5. Target.targetDestroyed - target 销毁事件
"""
print("\n" + "="*60)
print("示例 1: Target 发现和信息获取")
print("="*60)

# 使用浏览器级别的连接
browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

created_count = [0]
destroyed_count = [0]

def handle_target_created(event):
created_count[0] += 1
info = event['params']['targetInfo']
print(f"\n🆕 Target 创建 #{created_count[0]}:")
print(f" ID: {info.get('targetId', 'N/A')[:40]}...")
print(f" 类型: {info.get('type', 'N/A')}")
print(f" URL: {info.get('url', 'N/A')[:80]}")
print(f" 标题: {info.get('title', 'N/A')[:50]}")

def handle_target_destroyed(event):
destroyed_count[0] += 1
target_id = event['params']['targetId']
print(f"\n❌ Target 销毁 #{destroyed_count[0]}: {target_id[:40]}...")

def handle_target_info_changed(event):
info = event['params']['targetInfo']
print(f"\n🔄 Target 信息变化:")
print(f" ID: {info.get('targetId', 'N/A')[:40]}...")
print(f" URL: {info.get('url', 'N/A')[:80]}")

# 注册事件处理器
target.on_event("Target.targetCreated", handle_target_created)
target.on_event("Target.targetDestroyed", handle_target_destroyed)
target.on_event("Target.targetInfoChanged", handle_target_info_changed)

# 1. 启用 target 发现
print("\n--- 启用 Target 发现 ---")
target.send_command("Target.setDiscoverTargets", {
"discover": True
})

time.sleep(2)

# 2. 获取所有 targets
print("\n--- 获取所有 Targets ---")
result = target.send_command("Target.getTargets")

if 'result' in result and 'targetInfos' in result['result']:
targets = result['result']['targetInfos']
print(f"\n📊 当前活跃的 Targets (共 {len(targets)} 个):")

for i, info in enumerate(targets, 1):
print(f"\n {i}. Target:")
print(f" ID: {info.get('targetId', 'N/A')[:40]}...")
print(f" 类型: {info.get('type', 'N/A')}")
print(f" URL: {info.get('url', 'N/A')[:80]}")
print(f" 标题: {info.get('title', 'N/A')[:50]}")
print(f" 已附加: {info.get('attached', False)}")

# 3. 创建新标签页触发事件
print("\n--- 创建新标签页 ---")
new_target = target.send_command("Target.createTarget", {
"url": "https://example.com"
})

time.sleep(3)

# 4. 关闭新创建的标签页
if 'result' in new_target and 'targetId' in new_target['result']:
target_id = new_target['result']['targetId']
print(f"\n--- 关闭标签页 {target_id[:40]}... ---")
target.send_command("Target.closeTarget", {
"targetId": target_id
})

time.sleep(2)

print(f"\n📊 统计: 创建 {created_count[0]} 个, 销毁 {destroyed_count[0]} 个")

target.close()


# ========================================
# 示例 2: 创建和管理 Targets
# ========================================
def example_create_and_manage_targets():
"""
演示创建和管理 targets:
1. Target.createTarget - 创建新标签页
2. Target.closeTarget - 关闭标签页
3. Target.activateTarget - 激活标签页
4. Target.createBrowserContext - 创建隔离上下文
5. Target.disposeBrowserContext - 销毁上下文
"""
print("\n" + "="*60)
print("示例 2: 创建和管理 Targets")
print("="*60)

browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

target.send_command("Target.setDiscoverTargets", {"discover": True})
time.sleep(1)

# 1. 创建普通标签页
print("\n--- 创建普通标签页 ---")
tab1 = target.send_command("Target.createTarget", {
"url": "https://www.google.com",
"width": 1024,
"height": 768,
"enableBeginFrameControl": False
})

tab1_id = tab1['result']['targetId'] if 'result' in tab1 else None
print(f" 创建的 Target ID: {tab1_id[:40] if tab1_id else 'N/A'}...")
time.sleep(2)

# 2. 创建隔离的浏览器上下文
print("\n--- 创建隔离浏览器上下文 ---")
context = target.send_command("Target.createBrowserContext", {
"disposeOnDetach": True, # 断开连接时自动销毁
"proxyServer": "", # 可选:设置代理
"proxyBypassList": "" # 可选:代理绕过列表
})

context_id = context['result']['browserContextId'] if 'result' in context else None
print(f" 上下文 ID: {context_id[:40] if context_id else 'N/A'}...")

# 3. 在隔离上下文中创建标签页
if context_id:
print("\n--- 在隔离上下文中创建标签页 ---")
tab2 = target.send_command("Target.createTarget", {
"url": "https://www.wikipedia.org",
"browserContextId": context_id
})

tab2_id = tab2['result']['targetId'] if 'result' in tab2 else None
print(f" 创建的 Target ID: {tab2_id[:40] if tab2_id else 'N/A'}...")
time.sleep(2)

# 4. 激活第一个标签页
if tab1_id:
print("\n--- 激活第一个标签页 ---")
target.send_command("Target.activateTarget", {
"targetId": tab1_id
})
time.sleep(1)

# 5. 获取当前所有 targets
print("\n--- 当前所有 Targets ---")
all_targets = target.send_command("Target.getTargets")
if 'result' in all_targets:
print(f" 总数: {len(all_targets['result']['targetInfos'])}")

# 6. 关闭创建的标签页
print("\n--- 清理: 关闭创建的标签页 ---")
if tab1_id:
target.send_command("Target.closeTarget", {"targetId": tab1_id})
print(f" 已关闭: {tab1_id[:40]}...")

# 7. 销毁浏览器上下文(会自动关闭其中的所有标签页)
if context_id:
print("\n--- 销毁浏览器上下文 ---")
target.send_command("Target.disposeBrowserContext", {
"browserContextId": context_id
})
print(f" 已销毁上下文: {context_id[:40]}...")

time.sleep(2)
target.close()


# ========================================
# 示例 3: 附加到 Target 并控制
# ========================================
def example_attach_and_control():
"""
演示附加到 target 并控制:
1. Target.attachToTarget - 附加到 target
2. Target.detachFromTarget - 从 target 分离
3. Target.sendMessageToTarget - 向 target 发送消息
4. Target.setAutoAttach - 自动附加设置
"""
print("\n" + "="*60)
print("示例 3: 附加到 Target 并控制")
print("="*60)

browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

session_ids = {}

def handle_attached_to_target(event):
params = event['params']
session_id = params.get('sessionId')
target_info = params.get('targetInfo', {})

print(f"\n✅ 已附加到 Target:")
print(f" Session ID: {session_id[:40]}...")
print(f" Target ID: {target_info.get('targetId', 'N/A')[:40]}...")
print(f" 类型: {target_info.get('type', 'N/A')}")

session_ids[target_info.get('targetId')] = session_id

def handle_detached_from_target(event):
session_id = event['params'].get('sessionId')
print(f"\n❌ 已从 Target 分离: {session_id[:40]}...")

def handle_received_message(event):
params = event['params']
session_id = params.get('sessionId', '')[:20]
message = json.loads(params.get('message', '{}'))
method = message.get('method', 'N/A')

# 只显示重要消息
if method in ['Page.loadEventFired', 'Runtime.executionContextCreated']:
print(f"\n📨 收到消息 [Session: {session_id}...]: {method}")

target.on_event("Target.attachedToTarget", handle_attached_to_target)
target.on_event("Target.detachedFromTarget", handle_detached_from_target)
target.on_event("Target.receivedMessageFromTarget", handle_received_message)

target.send_command("Target.setDiscoverTargets", {"discover": True})

# 1. 设置自动附加到新 targets
print("\n--- 设置自动附加 ---")
target.send_command("Target.setAutoAttach", {
"autoAttach": True,
"waitForDebuggerOnStart": False,
"flatten": True # 扁平化,包括 iframe 等
})

time.sleep(1)

# 2. 创建新标签页(会自动附加)
print("\n--- 创建新标签页(自动附加)---")
new_tab = target.send_command("Target.createTarget", {
"url": "https://example.com"
})

tab_id = new_tab['result']['targetId'] if 'result' in new_tab else None
time.sleep(3)

# 3. 通过 session 向 target 发送命令
if tab_id and tab_id in session_ids:
session_id = session_ids[tab_id]

print(f"\n--- 向 Target 发送命令 ---")
print(f" Session ID: {session_id[:40]}...")

# 启用 Page 域
message = {
"id": 1,
"method": "Page.enable",
"params": {}
}

target.send_command("Target.sendMessageToTarget", {
"sessionId": session_id,
"message": json.dumps(message)
})

time.sleep(1)

# 导航到新 URL
message = {
"id": 2,
"method": "Page.navigate",
"params": {"url": "https://www.wikipedia.org"}
}

target.send_command("Target.sendMessageToTarget", {
"sessionId": session_id,
"message": json.dumps(message)
})

time.sleep(3)

# 4. 手动从 target 分离
print(f"\n--- 手动分离 ---")
target.send_command("Target.detachFromTarget", {
"sessionId": session_id
})

# 5. 清理
if tab_id:
print(f"\n--- 关闭标签页 ---")
target.send_command("Target.closeTarget", {"targetId": tab_id})

time.sleep(2)
target.close()


# ========================================
# 示例 4: 多标签页并行操作
# ========================================
def example_parallel_tabs():
"""
演示多标签页并行操作:
1. 创建多个标签页
2. 同时控制多个标签页
3. 收集多个标签页的结果
"""
print("\n" + "="*60)
print("示例 4: 多标签页并行操作")
print("="*60)

browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

target.send_command("Target.setDiscoverTargets", {"discover": True})
target.send_command("Target.setAutoAttach", {
"autoAttach": True,
"waitForDebuggerOnStart": False,
"flatten": True
})

urls = [
"https://example.com",
"https://www.wikipedia.org",
"https://news.ycombinator.com"
]

tabs = []
sessions = {}

def handle_attached(event):
session_id = event['params']['sessionId']
target_id = event['params']['targetInfo']['targetId']
sessions[target_id] = session_id

target.on_event("Target.attachedToTarget", handle_attached)

# 1. 创建多个标签页
print(f"\n--- 创建 {len(urls)} 个标签页 ---")
for i, url in enumerate(urls, 1):
print(f" 创建标签页 {i}: {url}")
result = target.send_command("Target.createTarget", {"url": url})
if 'result' in result:
tabs.append({
'targetId': result['result']['targetId'],
'url': url
})

time.sleep(4)

# 2. 对每个标签页执行操作
print(f"\n--- 并行操作 {len(tabs)} 个标签页 ---")
for i, tab in enumerate(tabs, 1):
target_id = tab['targetId']
if target_id in sessions:
session_id = sessions[target_id]

print(f"\n 操作标签页 {i} [Session: {session_id[:20]}...]")

# 启用 Runtime
target.send_command("Target.sendMessageToTarget", {
"sessionId": session_id,
"message": json.dumps({
"id": 100 + i,
"method": "Runtime.enable",
"params": {}
})
})

# 执行 JavaScript
target.send_command("Target.sendMessageToTarget", {
"sessionId": session_id,
"message": json.dumps({
"id": 200 + i,
"method": "Runtime.evaluate",
"params": {
"expression": f"console.log('标签页 {i} 正在执行')"
}
})
})

time.sleep(2)

# 3. 收集信息
print(f"\n--- 获取所有标签页信息 ---")
all_targets = target.send_command("Target.getTargets")
if 'result' in all_targets:
page_targets = [
t for t in all_targets['result']['targetInfos']
if t.get('type') == 'page'
]
print(f" 当前页面类型 Targets: {len(page_targets)} 个")

# 4. 清理所有标签页
print(f"\n--- 清理: 关闭所有创建的标签页 ---")
for i, tab in enumerate(tabs, 1):
target.send_command("Target.closeTarget", {
"targetId": tab['targetId']
})
print(f" 已关闭标签页 {i}")

time.sleep(2)
target.close()


# ========================================
# 示例 5: Service Worker 和后台 Targets
# ========================================
def example_service_workers():
"""
演示 Service Worker 管理:
1. 发现 Service Worker targets
2. 附加到 Service Worker
3. 监控 Worker 生命周期
"""
print("\n" + "="*60)
print("示例 5: Service Worker 和后台 Targets")
print("="*60)

browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

worker_targets = []

def handle_target_created(event):
info = event['params']['targetInfo']
target_type = info.get('type')

if target_type in ['service_worker', 'worker', 'shared_worker']:
worker_targets.append(info)
print(f"\n👷 发现 Worker:")
print(f" 类型: {target_type}")
print(f" URL: {info.get('url', 'N/A')[:80]}")
print(f" Target ID: {info.get('targetId', 'N/A')[:40]}...")

target.on_event("Target.targetCreated", handle_target_created)

# 1. 启用发现
target.send_command("Target.setDiscoverTargets", {"discover": True})
time.sleep(1)

# 2. 获取所有 targets 并过滤 workers
print("\n--- 扫描现有 Targets ---")
all_targets = target.send_command("Target.getTargets")

if 'result' in all_targets:
targets_by_type = {}
for info in all_targets['result']['targetInfos']:
t_type = info.get('type', 'unknown')
targets_by_type[t_type] = targets_by_type.get(t_type, 0) + 1

print(f"\n📊 Targets 分类统计:")
for t_type, count in sorted(targets_by_type.items()):
print(f" {t_type}: {count}")

# 显示所有 worker 类型的 targets
worker_types = ['service_worker', 'worker', 'shared_worker']
workers = [
t for t in all_targets['result']['targetInfos']
if t.get('type') in worker_types
]

if workers:
print(f"\n👷 当前活跃的 Workers ({len(workers)} 个):")
for i, worker in enumerate(workers, 1):
print(f"\n {i}. {worker.get('type', 'N/A')}")
print(f" URL: {worker.get('url', 'N/A')[:80]}")
print(f" Title: {worker.get('title', 'N/A')}")
else:
print(f"\n 暂无活跃的 Workers")

# 3. 创建一个包含 Service Worker 的页面
print("\n--- 创建包含 Service Worker 的测试页面 ---")
test_tab = target.send_command("Target.createTarget", {
"url": "https://mdn.github.io/sw-test/" # 一个有 Service Worker 的示例
})

print(" 等待 Service Worker 注册...")
time.sleep(5)

# 4. 再次检查 workers
print("\n--- 检查新的 Workers ---")
if worker_targets:
print(f" 发现了 {len(worker_targets)} 个新 Worker")

# 5. 清理
if 'result' in test_tab:
target.send_command("Target.closeTarget", {
"targetId": test_tab['result']['targetId']
})

time.sleep(2)
target.close()


# ========================================
# 示例 6: 浏览器上下文隔离(隐身模式模拟)
# ========================================
def example_browser_contexts():
"""
演示浏览器上下文隔离:
1. 创建多个隔离的浏览器上下文
2. 在不同上下文中操作
3. 验证 Cookie 和存储隔离
"""
print("\n" + "="*60)
print("示例 6: 浏览器上下文隔离")
print("="*60)

browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

target.send_command("Target.setDiscoverTargets", {"discover": True})

contexts = []

# 1. 创建第一个隔离上下文
print("\n--- 创建上下文 1 ---")
ctx1 = target.send_command("Target.createBrowserContext", {
"disposeOnDetach": True
})
if 'result' in ctx1:
ctx1_id = ctx1['result']['browserContextId']
contexts.append(ctx1_id)
print(f" 上下文 1 ID: {ctx1_id[:40]}...")

# 在上下文 1 中创建标签页
tab1 = target.send_command("Target.createTarget", {
"url": "https://httpbin.org/cookies/set/context1/value1",
"browserContextId": ctx1_id
})
print(f" 已在上下文 1 中创建标签页")

time.sleep(2)

# 2. 创建第二个隔离上下文
print("\n--- 创建上下文 2 ---")
ctx2 = target.send_command("Target.createBrowserContext", {
"disposeOnDetach": True
})
if 'result' in ctx2:
ctx2_id = ctx2['result']['browserContextId']
contexts.append(ctx2_id)
print(f" 上下文 2 ID: {ctx2_id[:40]}...")

# 在上下文 2 中创建标签页
tab2 = target.send_command("Target.createTarget", {
"url": "https://httpbin.org/cookies/set/context2/value2",
"browserContextId": ctx2_id
})
print(f" 已在上下文 2 中创建标签页")

time.sleep(2)

# 3. 创建默认上下文中的标签页
print("\n--- 在默认上下文中创建标签页 ---")
tab3 = target.send_command("Target.createTarget", {
"url": "https://httpbin.org/cookies/set/default/value3"
})

time.sleep(2)

# 4. 获取浏览器上下文列表
print("\n--- 获取所有浏览器上下文 ---")
all_contexts = target.send_command("Target.getBrowserContexts")
if 'result' in all_contexts:
context_ids = all_contexts['result']['browserContextIds']
print(f" 浏览器上下文数量: {len(context_ids) + 1}") # +1 是默认上下文
for i, ctx_id in enumerate(context_ids, 1):
print(f" {i}. {ctx_id[:40]}...")

# 5. 显示所有标签页及其所属上下文
print("\n--- 所有标签页及其上下文 ---")
all_targets = target.send_command("Target.getTargets")
if 'result' in all_targets:
pages = [t for t in all_targets['result']['targetInfos'] if t.get('type') == 'page']
for i, page in enumerate(pages, 1):
ctx_id = page.get('browserContextId', '默认上下文')
print(f" {i}. URL: {page.get('url', 'N/A')[:60]}")
print(f" 上下文: {ctx_id[:40] if ctx_id != '默认上下文' else ctx_id}...")

# 6. 清理上下文(会自动关闭其中的标签页)
print("\n--- 清理: 销毁所有创建的上下文 ---")
for i, ctx_id in enumerate(contexts, 1):
target.send_command("Target.disposeBrowserContext", {
"browserContextId": ctx_id
})
print(f" 已销毁上下文 {i}")

# 清理默认上下文中的标签页
if 'result' in tab3:
target.send_command("Target.closeTarget", {
"targetId": tab3['result']['targetId']
})

time.sleep(2)
target.close()


# ========================================
# 示例 7: 综合应用 - Target 管理器
# ========================================
def example_target_manager():
"""
综合示例: 实现完整的 Target 管理器
1. 实时监控所有 targets
2. 自动管理连接
3. 统计和报告
"""
print("\n" + "="*60)
print("示例 7: Target 管理器")
print("="*60)

browser_ws = get_browser_websocket_url()
target = CDPTarget(browser_ws)

# 管理器状态
manager_state = {
"all_targets": {},
"sessions": {},
"statistics": {
"created": 0,
"destroyed": 0,
"attached": 0,
"detached": 0
}
}

def handle_target_created(event):
info = event['params']['targetInfo']
target_id = info['targetId']

manager_state["all_targets"][target_id] = info
manager_state["statistics"]["created"] += 1

print(f"\n🆕 Target #{manager_state['statistics']['created']} 创建")
print(f" ID: {target_id[:30]}...")
print(f" 类型: {info.get('type', 'N/A')}")
print(f" URL: {info.get('url', 'N/A')[:60]}")

def handle_target_destroyed(event):
target_id = event['params']['targetId']

if target_id in manager_state["all_targets"]:
del manager_state["all_targets"][target_id]

manager_state["statistics"]["destroyed"] += 1
print(f"\n❌ Target 销毁: {target_id[:30]}...")

def handle_target_info_changed(event):
info = event['params']['targetInfo']
target_id = info['targetId']

if target_id in manager_state["all_targets"]:
old_url = manager_state["all_targets"][target_id].get('url', '')
new_url = info.get('url', '')

if old_url != new_url:
print(f"\n🔄 Target 导航")
print(f" 从: {old_url[:50]}")
print(f" 到: {new_url[:50]}")

manager_state["all_targets"][target_id] = info

def handle_attached_to_target(event):
session_id = event['params']['sessionId']
target_id = event['params']['targetInfo']['targetId']

manager_state["sessions"][target_id] = session_id
manager_state["statistics"]["attached"] += 1

print(f"\n✅ 附加到 Target")
print(f" Target: {target_id[:30]}...")
print(f" Session: {session_id[:30]}...")

def handle_detached_from_target(event):
session_id = event['params']['sessionId']
manager_state["statistics"]["detached"] += 1

# 找到对应的 target_id 并移除
for tid, sid in list(manager_state["sessions"].items()):
if sid == session_id:
del manager_state["sessions"][tid]
break

print(f"\n❌ 从 Target 分离: {session_id[:30]}...")

# 注册所有事件
target.on_event("Target.targetCreated", handle_target_created)
target.on_event("Target.targetDestroyed", handle_target_destroyed)
target.on_event("Target.targetInfoChanged", handle_target_info_changed)
target.on_event("Target.attachedToTarget", handle_attached_to_target)
target.on_event("Target.detachedFromTarget", handle_detached_from_target)

# 启动管理器
print("\n--- 启动 Target 管理器 ---")
target.send_command("Target.setDiscoverTargets", {"discover": True})
target.send_command("Target.setAutoAttach", {
"autoAttach": True,
"waitForDebuggerOnStart": False,
"flatten": True
})

time.sleep(2)

# 创建一些测试 targets
print("\n--- 创建测试 Targets ---")
test_urls = [
"https://example.com",
"https://www.wikipedia.org"
]

test_targets = []
for url in test_urls:
result = target.send_command("Target.createTarget", {"url": url})
if 'result' in result:
test_targets.append(result['result']['targetId'])

time.sleep(5)

# 显示当前状态
print("\n" + "="*60)
print("📊 当前管理器状态:")
print(f" 活跃 Targets: {len(manager_state['all_targets'])}")
print(f" 已附加会话: {len(manager_state['sessions'])}")

print("\n📈 统计数据:")
for key, value in manager_state['statistics'].items():
print(f" {key}: {value}")

print("\n📋 Targets 详情:")
for i, (tid, info) in enumerate(manager_state['all_targets'].items(), 1):
print(f"\n {i}. {info.get('type', 'N/A')}")
print(f" ID: {tid[:40]}...")
print(f" URL: {info.get('url', 'N/A')[:60]}")
print(f" 已附加: {'是' if tid in manager_state['sessions'] else '否'}")

# 通过 session 控制 target
print("\n--- 通过 Session 控制 Targets ---")
for tid, session_id in manager_state['sessions'].items():
if tid in test_targets[:1]: # 只控制第一个测试 target
print(f"\n控制 Target: {tid[:30]}...")

# 执行截图
message = {
"id": 999,
"method": "Page.captureScreenshot",
"params": {"format": "png"}
}

target.send_command("Target.sendMessageToTarget", {
"sessionId": session_id,
"message": json.dumps(message)
})
print(f" 已发送截图命令")

time.sleep(2)

# 清理
print("\n--- 清理所有测试 Targets ---")
for tid in test_targets:
target.send_command("Target.closeTarget", {"targetId": tid})

time.sleep(2)

# 最终报告
print("\n" + "="*60)
print("📊 最终统计:")
print(f" 总创建: {manager_state['statistics']['created']}")
print(f" 总销毁: {manager_state['statistics']['destroyed']}")
print(f" 总附加: {manager_state['statistics']['attached']}")
print(f" 总分离: {manager_state['statistics']['detached']}")
print(f" 剩余活跃: {len(manager_state['all_targets'])}")
print("="*60)

target.close()


# ========================================
# 主函数
# ========================================
if __name__ == "__main__":
print("\n" + "="*60)
print("CDP Target 域完整示例集")
print("="*60)
print("\n请确保 Chrome 已使用以下命令启动:")
print("chrome --remote-debugging-port=9222 --user-data-dir=/tmp/chrome-debug")
print("\n选择要运行的示例:")
print("1. Target 发现和信息获取")
print("2. 创建和管理 Targets")
print("3. 附加到 Target 并控制")
print("4. 多标签页并行操作")
print("5. Service Worker 和后台 Targets")
print("6. 浏览器上下文隔离")
print("7. Target 管理器(综合)")
print("0. 运行所有示例")

try:
choice = input("\n请输入选择 (0-7): ").strip()

examples = {
"1": example_target_discovery,
"2": example_create_and_manage_targets,
"3": example_attach_and_control,
"4": example_parallel_tabs,
"5": example_service_workers,
"6": example_browser_contexts,
"7": example_target_manager
}

if choice == "0":
for name, example in examples.items():
print(f"\n{'='*60}")
print(f"运行示例 {name}...")
print(f"{'='*60}")
example()
time.sleep(3)
elif choice in examples:
examples[choice]()
else:
print("无效选择")

print("\n✅ 示例执行完成!")

except requests.exceptions.ConnectionError:
print("\n❌ 错误: 无法连接到 Chrome")
print("请确保 Chrome 已使用 --remote-debugging-port=9222 启动")
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断")
except Exception as e:
print(f"\n❌ 发生错误: {e}")
import traceback
traceback.print_exc()