selenium-wire
1 安装
pip install selenium-wire
2 简单例子
from seleniumwire import webdriver
driver = webdriver.Chrome()
driver.get('https://www.baidu.com/')
# Access requests via the `requests` attribute
for request in driver.requests:
if request.response:
print(
request.url, # 请求的url
request.response.status_code, # 状态码
request.headers # 请求的headers
request.response.headers, # 返回的headers
)
3 安装SSL(winodws不需要安装)
# For apt based Linux systems
sudo apt install openssl
# For RPM based Linux systems
sudo yum install openssl
# For Linux alpine
sudo apk add openssl
4 远程网络驱动程序
from selenium.webdriver.common.by import By
from seleniumwire import webdriver
options = {
'suppress_connection_errors': False,
'auto_config': False,
'addr': '0.0.0.0',
'port': 8087,
'proxy': {
'http': <forward proxy details like scheme://user:pass@ip:port>,
'https': <forward proxy details like scheme://user:pass@ip:port>,,
},
}
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--proxy-server=kubernetes-service-name:8087')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument("--disable-dev-shm-usage");
chrome_options.add_argument("start-maximized");
chrome_options.add_argument("disable-infobars");
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-gpu");
chrome_options.add_argument("--no-sandbox");
chrome_options.add_argument("--user-data-dir=/root/chrome/data")
# 指定chrome的路径
chrome_options.binary_location = "/opt/google/chrome/chrome"
s = Service("/usr/bin/chromedriver")
browser = webdriver.Remote('http://selenium-service-name:4444/wd/hub',service=s, desired_capabilities=chrome_options.to_capabilities(), seleniumwire_options=options)
print("Browser setup done.")
# Use try/finally so the browser quits even if there is an exception
try:
print("Getting yt.")
browser.get("https://www.youtube.com/")
print("Saving screenshot for yt")
browser.save_screenshot('yt.png')
print("Extracting Xpath.")
text = browser.find_element(By.XPATH,'/html/body/ytd-app/div/ytd-page-manager/ytd-browse/ytd-two-column-browse'
'-results-renderer/div[1]/ytd-rich-grid-renderer/div['
'6]/ytd-rich-item-renderer[1]/div/ytd-rich-grid-media/div[1]/div/div['
'1]/h3/a/yt-formatted-string').text
print(f'The title of the first video on youtube is : {text}')
except Exception as e:
print(e)
finally:
browser.quit()
print(browser.requests)
5 访问请求
driver.requests # 按时间顺序排列的捕获请求列表。
driver.last_request # 最后一个请求 用于检索最近捕获的请求的便利属性。这比使用driver.requests[-1]更有效。
driver.wait_for_request(pat, timeout=10) # 此方法将等待,直到它看到与模式匹配的请求 pat可以是简单的子字符串或正则表达式
driver.proxy = { # 设置代理
'http': 'http://user:pass@192.168.10.100:8888',
'https': 'https://user:pass@192.168.10.100:8889',
}
driver.har # 已发生的 HTTP 事务的 JSON 格式的 HAR 存档
driver.iter_requests() # 返回捕获请求的迭代器。在处理大量请求时很有用
def request_interceptor(request, response): # A response interceptor takes two args
del request.headers['Referer'] # Remember to delete the header first
request.headers['Referer'] = 'some_referer' # Spoof the referer
driver.request_interceptor = request_interceptor # 用于设置请求拦截器。
def response_interceptor(request, response): # A response interceptor takes two args
if request.url == 'https://server.com/some/path':
if request.url == 'https://server.com/some/path':
response.headers['New-Header'] = 'Some Value'
driver.response_interceptor = response_interceptor # 用于设置响应拦截器。
6 Options
options = {
'addr': '192.168.0.10', # 运行的 IP 地址或主机名。这默认为127.0.0.1。如果您使用的是远程webdriver,您可能希望将其更改为机器(或容器)的公共 IP
'port': 9999, # 后端监听的端口号。您通常不需要指定端口,因为系统会自动选择随机端口号
'auto_config': True, # Selenium Wire 是否应该为请求捕获自动配置浏览器 默认为False
'ca_cert': '/path/to/ca.crt', # 如果您更喜欢使用自己的证书而不是使用默认证书,则为根 (CA) 证书的路径
'ca_key': '/path/to/ca.key', # 如果您使用自己的根证书,则为私钥的路径。使用您自己的证书时,必须始终提供密钥
'disable_capture': True, # 禁用请求捕获。当True时,没有任何内容被拦截或存储 默认False
'disable_encoding': True, # 要求服务器发回未压缩的数据。默认False。当为True时,这会将Accept-Encoding标头设置为所有出站请求的标识
'enable_har': True, # 当True时,将保留 HTTP 事务的 HAR 存档,可以使用driver.har检索。默认False
'exclude_hosts': ['google-analytics.com'], # 应完全绕过 Selenium Wire 的地址列表
'ignore_http_methods': [], # 忽略且不捕获的 HTTP 方法列表(指定为大写字符串)
'proxy': { # 如果您使用代理,则为上游代理服务器配置。
'http': 'http://user:pass@192.168.10.100:8888',
'https': 'https://user:pass@192.168.10.100:8889',
'no_proxy': 'localhost,127.0.0.1'
},
'request_storage': 'memory', # 要使用的存储类型。Selenium Wire 默认为基于磁盘的存储
'request_storage_base_dir': '/my/storage/folder', # 使用其默认的基于磁盘的存储时存储捕获的请求和响应的基本位置
'request_storage_max_size': 100, # 使用内存存储时要存储的最大请求数。默认无限制。使用默认的基于磁盘的存储时无效
'verify_ssl': True, # 是否应验证 SSL 证书。False默认情况下,这可以防止自签名证书出错
'suppress_connection_errors': False # 是否抑制与连接相关的回溯。True默认情况下,这意味着有时在浏览器关闭时发生的无害错误不会提醒用户
}
driver = webdriver.Chrome(seleniumwire_options=options)
7 Request 对象
body # 请求正文为bytes. 如果请求没有正文,则 的值为body空,即b''.
cert # 字典格式的有关服务器 SSL 证书的信息。对于非 HTTPS 请求为空。
date # 发出请求的日期时间。
headers # 请求标头的类似字典的对象。标头不区分大小写,并且允许重复。请求request.headers['user-agent']将返回标头的值User-Agent
host # 请求主机,例如www.example.com
method # HTTP 方法,例如GET或POST等。
params # 请求参数字典。如果同名参数在请求中多次出现,它在字典中的值将是一个列表。
path # 请求路径,例如/some/path/index.html
querystring # 查询字符串,例如foo=bar&spam=eggs
response # 与请求关联的响应对象。None如果请求没有响应,就会出现这种情况。
url # 请求网址,例如https://www.example.com/some/path/index.html?foo=bar&spam=eggs
ws_messages # 如果请求是 websocket 握手请求(通常以 URL 开头wss://),则将ws_messages包含发送和接收的所有 websocket 消息的列表
abort(error_code=403) # 使用提供的错误代码触发立即终止请求。在请求拦截器中使用
create_response(status_code, headers=(), body=b'') # 创建一个响应并返回它而不向远程服务器发送任何数据。在请求拦截器中使用
8 WebSocketMessage 对象
content # 消息内容可以是str或bytes
date # 消息的日期时间
from_client # True消息何时由客户端发送以及False何时由服务器发送
9 Response 对象
body # 响应主体为bytes. 如果响应没有正文,则 的值为body空,即b''。有时正文可能已被服务器压缩。您可以使用disable_encoding 选项来防止这种情况。要手动解码编码的响应主体,您可以执行以下操作:
from seleniumwire.utils import decode
body = decode(response.body, response.headers.get('Content-Encoding', 'identity'))
date # 收到响应的日期时间
headers # 类似字典的响应标头对象。标头不区分大小写,并且允许重复
reason # 原因短语,例如OK或Not Found等
status_code # 响应的状态代码,例如200或404等
10 拦截Requests and Responses
def interceptor(request): # 添加请求头
request.headers['New-Header'] = 'Some Value'
driver.request_interceptor = interceptor
driver.get(...)
def interceptor(request): # 替换现有请求标头
del request.headers['Referer'] # Remember to delete the header first
request.headers['Referer'] = 'some_referer' # Spoof the referer
driver.request_interceptor = interceptor
driver.get(...)
def interceptor(request, response): # 添加响应标头
if request.url == 'https://server.com/some/path':
response.headers['New-Header'] = 'Some Value'
driver.response_interceptor = interceptor
driver.get(...)
def interceptor(request): # 添加请求参数
params = request.params
params['foo'] = 'bar'
request.params = params
driver.request_interceptor = interceptor
driver.get(...)
import json
def interceptor(request): # 更新 POST 请求正文中的 JSON
if request.method == 'POST' and request.headers['Content-Type'] == 'application/json':
# The body is in bytes so convert to a string
body = request.body.decode('utf-8')
# Load the JSON
data = json.loads(body)
# Add a new property
data['foo'] = 'bar'
# Set the JSON back on the request
request.body = json.dumps(data).encode('utf-8')
# Update the content length
del request.headers['Content-Length']
request.headers['Content-Length'] = str(len(request.body))
driver.request_interceptor = interceptor
driver.get(...)
import base64 # 基本身份验证
auth = (
base64.encodebytes('my_username:my_password'.encode())
.decode()
.strip()
)
def interceptor(request):
if request.host == 'host_that_needs_auth':
request.headers['Authorization'] = f'Basic {auth}'
driver.request_interceptor = interceptor
driver.get(...)
def interceptor(request): # 阻止请求
# Block PNG, JPEG and GIF images
if request.path.endswith(('.png', '.jpg', '.gif')):
request.abort()
driver.request_interceptor = interceptor
driver.get(...)
def interceptor(request): # 模拟响应
if request.url == 'https://server.com/some/path':
request.create_response(
status_code=200,
headers={'Content-Type': 'text/html'}, # Optional headers dictionary
body='<html>Hello World!</html>' # Optional body
)
driver.request_interceptor = interceptor
driver.get(...)
del driver.request_interceptor # 取消设置拦截器
del driver.response_interceptor
11 限制Request
driver.scopes = [ # 这接受将匹配要捕获的 URL 的正则表达式列表。它应该在发出任何请求之前在驱动程序上设置。当为空(默认)时,将捕获所有 URL
'.*stackoverflow.*',
'.*github.*'
]
driver.get(...) # Start making requests
options = { # 使用此选项关闭请求捕获。请求仍将通过 Selenium Wire 和您配置的任何上游代理传递,但不会被拦截或存储
'disable_capture': True # Don't intercept/store any requests
}
driver = webdriver.Chrome(seleniumwire_options=options)
options = { # 使用此选项可以完全绕过 Selenium Wire。对此处列出的地址发出的任何请求都将直接从浏览器发送到服务器,而不涉及 Selenium Wire
'exclude_hosts': ['host1.com', 'host2.com'] # Bypass Selenium Wire for these hosts
}
driver = webdriver.Chrome(seleniumwire_options=options)
def interceptor(request): # 您可以通过在请求拦截器request.abort()中使用 from来提前中止请求。这将立即向客户端发送响应,而无需进一步传输请求。您可以使用此机制来阻止某些类型的请求(例如图像)以提高页面加载性能。
# Block PNG, JPEG and GIF images
if request.path.endswith(('.png', '.jpg', '.gif')):
request.abort()
driver.request_interceptor = interceptor
driver.get(...) # Start making requests
12 代理
options = { # 配置采用以下格式
'proxy': {
'http': 'http://192.168.10.100:8888',
'https': 'https://192.168.10.100:8888',
'no_proxy': 'localhost,127.0.0.1'
}
}
driver = webdriver.Chrome(seleniumwire_options=options)
options = { # 要将 HTTP Basic Auth 与您的代理一起使用,请在 URL 中指定用户名和密码:
'proxy': {
'https': 'https://user:pass@192.168.10.100:8888',
}
}
options = { # Proxy-Authorization对于 Basic 以外的身份验证,您可以使用该选项为标头提供完整值custom_authorization。例如,如果您的代理使用 Bearer 方案
'proxy': {
'https': 'https://192.168.10.100:8888', # No username or password used
'custom_authorization': 'Bearer mytoken123' # Custom Proxy-Authorization header value
}
}
$ export HTTP_PROXY="http://192.168.10.100:8888" # 代理配置也可以通过名为HTTP_PROXY,HTTPS_PROXY和的环境变量加载NO_PROXY
$ export HTTPS_PROXY="https://192.168.10.100:8888"
$ export NO_PROXY="localhost,127.0.0.1"
options = { # 使用 SOCKS 代理与使用基于 HTTP 的代理相同,但您将方案设置为socks5
'proxy': { # 如果您的代理不需要身份验证,您可以省略 和user。pass
'http': 'socks5://user:pass@192.168.10.100:8888',
'https': 'socks5://user:pass@192.168.10.100:8888',
'no_proxy': 'localhost,127.0.0.1'
}
}
driver = webdriver.Chrome(seleniumwire_options=options)
driver.get(...) # 动态切换
# Change the proxy
driver.proxy = {
'https': 'https://user:pass@192.168.10.100:8888',
}
driver.get(...) # These requests will use the new proxy
13 机器人检测
pip install undetected-chromedriver
import seleniumwire.undetected_chromedriver as uc
chrome_options = uc.ChromeOptions()
driver = uc.Chrome(
options=chrome_options,
seleniumwire_options={}
)