当使用selenium去某宝或其他网站进行爬虫或者模拟登陆时,会出现滑动验证码,并且无论是用ActionChains滑还是手动滑,都会很委婉的告诉你“哎呀网络错误,请刷新”等等。why?
爬虫都会碰到某些网站刚刚打开页面就被判定为:非人类行为,因为很多网站有对selenium的js监测机制。
经过科学上网,查阅众多资料,发现selenium有一些特征值, 例如下面:
window.navigator.webdriver
window.navigator.languages
window.navigator.plugins.length
1.“navigator.plugins.length”此参数可以检测selenium的headless模式,headless模式下为0,所以可以添加假的值来规避检测;
2.“navigator.languages”确保将此参数设置为chrome的默认值[“en-US”,“en”,“es”]
美团,大众,淘宝这些大站点都有这种技术能力。。对window.navigator.webdriver的检测机制。
正常情况下 window.navigator.webdriver的值为undefined。
而当我们使用selenium 的时候-window.navigator.webdriver的值为True。 如下图
手动安装
通过pip使用豆瓣源加速安装pyppeteer:
pip install -i https://pypi.douban.com/simple pypeteer
or
pip install pypeteer
按照官方手册,先来感受一下:
# -*- coding:utf-8 -*-
import asyncio
from pyppeteer import launch
async def main():
browser = await launch(headless=False)
page = await browser.newPage()
await page.goto('http://www.baidu.com/')
await asyncio.sleep(10)
await browser.close()
asyncio.get_event_loop().run_until_complete(main())
pyppeteer第一次运行时,会自动下载chromium浏览器,时间可能会有些长。不过,我第一次运行时,直接报错:
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1056)
尝试多种方法无果,无奈只能手动下载,但手动下载的方法网上资料也几乎没有,让我来做这个先行者吧。
上面代码运行虽然报错,但是控制台前两行却提供了很有用的信息:
[W:pyppeteer.chromium_downloader] start chromium download.
Download may take a few minutes.
可以看到,下载功能是由pyppeteer.chromium_downloader模块完成的,那么我们进入这个模块查看源码。
在这个模块源码中,我们可以看到downloadURLs、chromiumExecutable等变量,很明显指的就是下载链接和chromium的可执行文件路径。我们重点关注一下可执行文件路径
chromiumExecutable:
chromiumExecutable = {
'linux': DOWNLOADS_FOLDER / REVISION / 'chrome-linux' / 'chrome',
'mac': (DOWNLOADS_FOLDER / REVISION / 'chrome-mac' / 'Chromium.app' /
'Contents' / 'MacOS' / 'Chromium'),
'win32': DOWNLOADS_FOLDER / REVISION / 'chrome-win32' / 'chrome.exe',
'win64': DOWNLOADS_FOLDER / REVISION / 'chrome-win32' / 'chrome.exe',
}
可见,无论在哪个平台下,chromiumExecutable都是由是4个部分组成,其中 DOWNLOADS_FOLDER 和 REVISION是定义好的变量:
DOWNLOADS_FOLDER = Path.home() / '.pyppeteer' / 'local-chromium'
进一步查看可以发现:
from pathlib import Path
Path.home()
Out[3]: WindowsPath('C:/Users/WYXCz')
from pyppeteer import __chromimum_revision__ as REVISION
#__chromimum_revision__ = '543305'
REVISION = ‘543305’
所以,DOWNLOADS_FOLDER和 REVISION都是读取对应环境变量设置好的值,如果没有设置,就使用默认值。我们来输出一下,看看默认值:
import pyppeteer.chromium_downloader
print('默认版本是:{}'.format(pyppeteer.__chromimum_revision__))
print('可执行文件默认路径:{}'.format(pyppeteer.chromium_downloader.chromiumExecutable.get('win64')))
print('win64平台下载链接为:{}'.format(pyppeteer.chromium_downloader.downloadURLs.get('win64')))
输出结果如下:
默认版本是:543305
可执行文件默认路径:C:\Users\WYXCz\.pyppeteer\local-chromium\543305\chrome-win32\chrome.exe
win64平台下载链接为:https://storage.googleapis.com/chromium-browser-snapshots/Win_x64/543305/chrome-win32.zip
在使用上面代码的时候,你可以将win64换成你的平台就好了,有了上面的下载链接,这个时候就可以先开始下载着chromium浏览器(有些慢),然后继续往下看。
打开浏览器是通过pyppeteer.launcher.launch(options: dict = None, **kwargs) 方法,运行该函数后,会得到一个pyppeteer.browser.Browser实例,也就是说浏览器对象实例。launch方法是必须使用的方法,所以,详细学学它的参数,你也直接阅读官方文档,因为我也是直接翻译的:
一般来说我们只是会设置headless,devtools,和传入一些必要的args
newPage()方法,相当于我们在浏览器里点开了新建选项卡
goto(),里面传入我们想要的url,即可前往指定的网页
Page类选择器相关方法有5个,并且这五个都有别名,分别是:
J()别名querySelector()
JJ()别名querySelectorAll()
JJeval()别名querySelectorAllEval()
Jeval()别名querySelectorEval()
Jx()别名xpath()
NOTE 生成pdf的操作只有Chrome浏览器才有效。
page.pdf()以 print的 css media生成pdf,如果想生成一个 screenmedia的PDF,请在使用 page.pdf()之前调用page.emulateMedia(‘screen’)方法。
// Generates a PDF with 'screen' media type.
await page.emulateMedia('screen');
await page.pdf({path: 'page.pdf'});
width, height, 和 margin属性接受的值应该明确带上相应的单位,否则将会被默认为 px单位。
一些例子:
所有可选的单位:
format 属性的可选值:
如果你运行了上面的代码,你会发现,打开的页面只在窗口左上角一小块显示,看着很别扭,这是因为pyppeteer默认窗口大小是800*600,所以,调整一下吧。调整窗口大小通过方法实现,看下面代码,最大化窗口:
# -*- coding:utf-8 -*-
import asyncio
from pyppeteer import launch,chromium_downloader
def screen_size():
'使用tkinter获取屏幕大小'
import tkinter
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
async def main():
browser = await launch(headless=False)
page = await browser.newPage()
width, height = screen_size()
# 最大化窗口
await page.setViewport({
'width': width,
'height': height
})
await page.goto('http://www.baidu.com/')
await asyncio.sleep(10)
await browser.close()
asyncio.get_event_loop().run_until_complete(main())
3.3 设置userAgent
常规操作,不多说,上代码:
# -*- coding:utf-8 -*-
import asyncio
from pyppeteer import launch,chromium_downloader
def screen_size():
'使用tkinter获取屏幕大小'
import tkinter
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
async def main():
width, height = screen_size()
'''
利用launch方法传入args设定窗口大小,而后面那个disable-infobars则是去除那个浏览器的“chrome当前正在受自动化测试软件控制”这个选项卡
'''
browser = await launch(headless=False, args=[f'--window-size={width},{height}','--disable-infobars'])
page = await browser.newPage()
#设置请求头userAgent
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36')
# 最大化窗口
await page.setViewport({
'width': width,
'height': height
})
await page.goto('http://www.baidu.com/')
await asyncio.sleep(10)
await browser.close()
asyncio.get_event_loop().run_until_complete(main())
有时候,为了达成某些目的(例如屏蔽网站原有js),我们不可避免得需要执行一些js脚本。执行js脚本通过evaluate方法。如下所示,我们通过js来修改window.navigator.webdriver属性的值,由此绕过网站对webdriver的检测:
import asyncio
from pyppeteer import launch
async def main():
js1 = '''() =>{
Object.defineProperties(navigator,{
webdriver:{
get: () => false
}
})
}'''
js2 = '''() => {
alert (
window.navigator.webdriver
)
}'''
browser = await launch({'headless':False,'args':['–no-sandbox'],})
page = await browser.newPage()
await page.goto('https://h5.ele.me/login/')
await page.evaluate(js1)
await page.evaluate(js2)
asyncio.get_event_loop().run_until_complete(main())
在上面代码中,通过page.evalute方法执行了两段js脚本,第一段脚本将webdriver的属性值设为false,第二段代码在此读取 webdriver属性值,输出为false。
pyppeteer提供了Keyboard和Mouse两个类来实现模拟操作,前者是用来实现键盘模拟,后者实现鼠标模拟(还有其他触屏之类的就不说了)。
主要来说说输入和点击:
import asyncio
from pyppeteer import launch
async def main():
browser = await launch(headless=False, args=['–disable-infobars'])
page = await browser.newPage()
await page.goto('https://h5.ele.me/login/')
await page.type('form section input','12345678999') # 模拟键盘输入手机号
await page.click('form section button') # 模拟鼠标点击获取验证码
await asyncio.sleep(200)
await browser.close()
asyncio.get_event_loop().run_until_complete(main())
上面的模拟操作中,无论是模拟键盘输入还是鼠标点击定位都是通过css选择器,似乎pyppeteer的type和click直接模拟操作定位都只能通过css选择器(或者是我在官方文档中没找到方法),当然,要间接通过xpath先定位,然后再模拟操作也是可以的。下一小节中模拟登陆外卖平台就是用这种方法,不过,这种方法要麻烦一些,不推荐。
我曾经用selenium + chrome 实现了模拟登陆这个电商平台,但是实在是有些麻烦,绕过对webdriver的检测不难,但是,通过webdriver对浏览器的每一步操作都会留下特殊的痕迹,会被平台识别,这个必须通过重新编译chrome的webdriver才能实现,麻烦得让人想哭。不说了,都是泪,下面直接上用pyppeteer实现的代码:
import asyncio
from pyppeteer import launch
def screen_size():
#使用tkinter获取屏幕大小
import tkinter
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
async def main():
js1 = '''() =>{
Object.defineProperties(navigator,{
webdriver:{
get: () => false
}
})
}'''
js2 = '''() => {
alert (
window.navigator.webdriver
)
}'''
browser = await launch({'headless':False, 'args':['--no-sandbox'],})
page = await browser.newPage()
width, height = screen_size()
# 最大化窗口
await page.setViewport({
"width": width,
"height": height
})
await page.goto('https://h5.ele.me/login/')
await page.evaluate(js1)
await page.evaluate(js2)
input_sjh = await page.xpath('//form/section[1]/input[1]')
click_yzm = await page.xpath('//form/section[1]/button[1]')
input_yzm = await page.xpath('//form/section[2]/input[1]')
but = await page.xpath('//form/section[2]/input[1]')
print(input_sjh)
await input_sjh[0].type('*****手机号********')
await click_yzm[0].click()
ya = input('请输入验证码:')
await input_yzm[0].type(str(ya))
await but[0].click()
await asyncio.sleep(3)
await page.goto('https://www.ele.me/home/')
await asyncio.sleep(100)
await browser.close()
asyncio.get_event_loop().run_until_complete(main())
登录时,由于等待时间过长(我猜的)导致出现以下错误:
pyppeteer.errors.NetworkError: Protocol Error (Runtime.callFunctionOn): Session closed. Most likely the page has been closed.
在github上找到了解决方法,似乎只能改源码,找到pyppeteer包下的connection.py模块,在其43行和44行改为下面这样:
self._ws = websockets.client.connect(self._url, max_size=None, loop=self._loop)
self._url, max_size=None, loop=self._loop, ping_interval=None, ping_timeout=None)
再次运行就没问题了。可以成功绕过官方对webdriver的检测,登录成功,诸位可以自己尝试一下。
当使用selenium+webdriver写爬虫被检测到时,pyppeteer是你得不二选择,几乎所有能在人工操作浏览器进行的操作通过pyppeteer都能实现,且能完美避开官方对webdriver的检测。pyppeteer涉及的使用方法还很多,本文只介绍了常用方法的很小很小一部分,需要一说的是,pyppeteer的中文资料真的很少,多看看官方文档吧。
# -*- coding:utf-8 -*-
import asyncio
from pyppeteer import launch,chromium_downloader
def screen_size():
'使用tkinter获取屏幕大小'
import tkinter
tk = tkinter.Tk()
width = tk.winfo_screenwidth()
height = tk.winfo_screenheight()
tk.quit()
return width, height
async def main():
width, height = screen_size()
'''
利用launch方法传入args设定窗口大小,而后面那个disable-infobars则是去除那个浏览器的“chrome当前正在受自动化测试软件控制”这个选项卡
'''
browser = await launch(headless=False, args=[f'--window-size={width},{height}','--disable-infobars'])
page = await browser.newPage()
# 是否启用JS,enabled设为False,则无渲染效果
await page.setJavaScriptEnabled(enabled=True)
#设置请求头userAgent
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36')
# 最大化窗口
await page.setViewport({
'width': width,
'height': height
})
## 超时间见 1000 毫秒
res=await page.goto('http://www.baidu.com/',options={'timeout': 1000})
resp_headers = res.headers # 响应头
resp_status = res.status # 响应状态
#页面截图
await page.screenshot({'path': 'example.png'})
# 滚动到页面底部
await page.evaluate('window.scrollBy(0, document.body.scrollHeight)')
# 打印页面cookies
print(await page.cookies())
# 获取所有 html 内容
print(await page.content())
# 在网页上执行js 脚本
dimensions = await page.evaluate(pageFunction='''() => {
return {
width: document.documentElement.clientWidth, // 页面宽度
height: document.documentElement.clientHeight, // 页面高度
deviceScaleFactor: window.devicePixelRatio, // 像素比 1.0000000149011612
}
}''', force_expr=False) # force_expr=False 执行的是函数
print(dimensions)
# 只获取文本 执行 js 脚本 force_expr 为 True 则执行的是表达式
content = await page.evaluate(pageFunction='document.body.textContent', force_expr=True)
print(content)
# 打印当前页标题
print(await page.title())
# 抓取新闻内容 可以使用 xpath 表达式
"""
# Pyppeteer 三种解析方式
Page.querySelector() # 选择器
Page.querySelectorAll()
Page.xpath() # xpath 表达式
# 简写方式为:
Page.J(), Page.JJ(), and Page.Jx()
"""
element = await page.querySelector(".feed-infinite-wrapper > ul>li") # 只抓取一个
print(element)
# 获取所有文本内容 执行 js
content = await page.evaluate('(element) => element.textContent', element)
print(content)
# elements = await page.xpath('//div[@class="title-box"]/a')
elements = await page.querySelectorAll(".title-box a")
for item in elements:
print(await item.getProperty('textContent'))
# <pyppeteer.execution_context.JSHandle object at 0x000002220E7FE518>
# 获取文本
title_str = await (await item.getProperty('textContent')).jsonValue()
# 获取链接
title_link = await (await item.getProperty('href')).jsonValue()
print(title_str,title_link)
await page.click("#J_SubmitStatic")
# 使用page.pdf之前需要调用page.emulateMedia('screen')
await page.emulateMedia('screen')
await page.pdf({'path': 'page.pdf', 'width': '100px', 'format': 'A4'}) # 打印宽度设置为100像素
await page.pdf({'width': '10cm'}) # 打印宽度设置为100厘米
await asyncio.sleep(10)
await browser.close()#关闭浏览器对象
asyncio.get_event_loop().run_until_complete(main())
# -*- coding:utf-8 -*-
import asyncio
import time, random
from pyppeteer.launcher import launch # 控制模拟浏览器用
from retrying import retry # 设置重试次数用的
async def main(username, pwd, url): # 定义main协程函数,
# 以下使用await 可以针对耗时的操作进行挂起
# 启动pyppeteer 属于内存中实现交互的模拟器
browser = await launch({'headless': False, 'args': ['--no-sandbox'], })
page = await browser.newPage() # 启动个新的浏览器页面
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36')
await page.goto(url) # 访问登录页面
# 替换淘宝在检测浏览时采集的一些参数。就是在浏览器运行的时候,始终让window.navigator.webdriver=false
# navigator是windiw对象的一个属性,同时修改plugins,languages,navigator 且让
# 以下为插入中间js,将淘宝会为了检测浏览器而调用的js修改其结果。
await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }''')
await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {}, }; }''')
await page.evaluate('''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''')
await page.evaluate('''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''')
# 使用type选定页面元素,并修改其数值,用于输入账号密码,修改的速度仿人类操作,因为有个输入速度的检测机制
# 因为 pyppeteer 框架需要转换为js操作,而js和python的类型定义不同,所以写法与参数要用字典,类型导入
await page.type('.J_UserName', username, {'delay': input_time_random() - 50})
await page.type('#J_StandardPwd input', pwd, {'delay': input_time_random()})
# await page.screenshot({'path': './headless-test-result.png'}) # 截图测试
time.sleep(2)
# 检测页面是否有滑块。原理是检测页面元素。
slider = await page.Jeval('#nocaptcha', 'node => node.style') # 是否有滑块
if slider:
print('当前页面出现滑块')
# await page.screenshot({'path': './headless-login-slide.png'}) # 截图测试
flag, page = await mouse_slide(page=page) # js拉动滑块过去。
if flag:
await page.keyboard.press('Enter') # 确保内容输入完毕,少数页面会自动完成按钮点击
print("print enter", flag)
# 如果无法通过回车键完成点击,就调用js模拟点击登录按钮。
await page.evaluate('''document.getElementById("J_SubmitStatic").click()''')
time.sleep(2)
# cookies_list = await page.cookies()
# print(cookies_list)
await get_cookie(page) # 导出cookie 完成登陆后就可以拿着cookie玩各种各样的事情了。
else:
await page.keyboard.press('Enter')
print("print enter")
await page.evaluate('''document.getElementById("J_SubmitStatic").click()''')
await page.waitFor(20)
await page.waitForNavigation()
try:
global error # 检测是否是账号密码错误
print("error_1:", error)
error = await page.Jeval('.error', 'node => node.textContent')
print("error_2:", error)
except Exception as e:
error = None
finally:
if error:
print('确保账户安全重新入输入')
# 程序退出。
loop.close()
else:
print(page.url)
await get_cookie(page)
time.sleep(100)
# 获取登录后cookie
async def get_cookie(page):
# res = await page.content()
cookies_list = await page.cookies()
cookies = ''
for cookie in cookies_list:
str_cookie = '{0}={1};'
str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value'))
cookies += str_cookie
print(cookies)
return cookies
def retry_if_result_none(result):
return result is None
@retry(retry_on_result=retry_if_result_none, )
async def mouse_slide(page=None):
await asyncio.sleep(2)
try:
# 鼠标移动到滑块,按下,滑动到头(然后延时处理),松开按键
await page.hover('#nc_1_n1z') # 不同场景的验证码模块能名字不同。
await page.mouse.down()
await page.mouse.move(2000, 0, {'delay': random.randint(1000, 2000)})
await page.mouse.up()
except Exception as e:
print(e, ':验证失败')
return None, page
else:
await asyncio.sleep(2)
# 判断是否通过
slider_again = await page.Jeval('.nc-lang-cnt', 'node => node.textContent')
if slider_again != '验证通过':
return None, page
else:
# await page.screenshot({'path': './headless-slide-result.png'}) # 截图测试
print('验证通过')
return 1, page
def input_time_random():
return random.randint(100, 151)
if __name__ == '__main__':
username = 'xxxxxxxx' # 淘宝用户名
pwd = 'xxxxxxxxx' # 密码
url = 'https://login.taobao.com/member/login.jhtml?style=mini&css_style=b2b&from=b2b&full_redirect=true&redirect_url=https://login.1688.com/member/jump.htm?target=https://login.1688.com/member/marketSigninJump.htm?Done=http://login.1688.com/member/taobaoSellerLoginDispatch.htm®= http://member.1688.com/member/join/enterprise_join.htm?lead=http://login.1688.com/member/taobaoSellerLoginDispatch.htm&leadUrl=http://login.1688.com/member/'
# 协程,开启个无限循环的程序流程,把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
loop = asyncio.get_event_loop()
# 将协程注册到事件循环,并启动事件循环
loop.run_until_complete(main(username, pwd, url))
运行pyppeteer时不时会报这个错误,虽然不影响到程序得运行,但是会影响到程序进程得关闭,这个错误是代表kill chrome 进程时失败。
然后再查看进程 发现程序关闭了,chrome进程依然在
解决办法 :
不要设置'args': ['--no-sandbox']
我的问题是这样解决的,
browser = await launch({'headless': False,'userDataDir':r'D:\temp'})
如果设置了userDataDir,有人说,不要设置–no-sandbox这个参数,但是并不能解决这个问题,今天看了pyppeteer的文档,想起来这个问题,原来我项目的临时数据目录是存在了c盘,但是当删除它的时候,应该是遇到了权限问题,没有权限没法删除啊,所以,如有遇到类似错误的朋友,自己在一个有权限删除的路径下,创建一个存储临时数据的目录,记住这个路径要有权限删除的哈。
也有可能是忘记关闭页面导致的错误,
await page.waitFor(30)
await page.close()
pyppeteer地址:https://github.com/miyakogi/pyppeteer