【1】先确定是否为动态加载网站
【2】找URL规律
【3】正则表达式 | xpath表达式
【4】定义程序框架,补全并测试代码
【1】整体思路
1.1> 爬取一级页面,提取 所需数据+链接,继续跟进
1.2> 爬取二级页面,提取 所需数据+链接,继续跟进
1.3> ... ...
【2】代码实现思路
2.1> 避免重复代码 - 请求、解析需定义函数
【1】基于User-Agent反爬
1.1) 发送请求携带请求头: headers={'User-Agent' : 'Mozilla/5.0 xxxxxx'}
1.2) 多个请求时随机切换User-Agent
a) 定义py文件存放大量User-Agent,导入后使用random.choice()每次随机选择
b) 使用fake_useragent模块每次访问随机生成User-Agent
from fake_useragent import UserAgent
agent = UserAgent().random
细节要点:pycharm中下载fake-useragent
【2】响应内容存在特殊字符
解码时使用ignore参数
html = requests.get(url=url, headers=headers).content.decode('', 'ignore')
使用requests获取的resposne对象,具有cookies属性。该属性值是一个cookieJar类型,包含了对方服务器设置在本地的cookie。我们如何将其转换为cookies字典呢?
【1】方法一 : requests.get()
【2】参数
2.1) url
2.2) headers
2.3) timeout
2.4) proxies
【3】方法二 :requests.post()
【4】参数
data
【1】url
【2】proxies -> {}
proxies = {
'http':'http://1.1.1.1:8888',
'https':'https://1.1.1.1:8888'
}
【3】timeout
【4】headers
【5】cookies
【1】适用场景 : Post类型请求的网站
【2】参数 : data={}
2.1) Form表单数据: 字典
2.2) res = requests.post(url=url,data=data,headers=headers)
【3】POST请求特点 : Form表单提交数据
data : 字典,Form表单数据
【1】pycharm进入方法 :Ctrl + r ,选中 Regex
【2】处理headers和formdata
(.*): (.*)
"$1": "$2",
【3】点击 Replace All
session实例在请求了一个网站后,对方服务器设置在本地的cookie会保存在session中,下一次再使用session请求对方服务器的时候,会带上前一次的cookie
session = requests.session() # 实例化session对象
response = session.get(url, headers, ...)
response = session.post(url, data, ...)
【1】右键 -> 查看网页源码中没有具体数据
【2】滚动鼠标滑轮或其他动作时加载,或者页面局部刷新
【1】F12打开控制台,页面动作抓取网络数据包
【2】抓取json文件URL地址
2.1) 控制台中 XHR :异步加载的数据包
2.2) XHR -> QueryStringParameters(查询参数)
【1】作用 : 把json格式的字符串转为Python数据类型
【2】示例 : html = json.loads(res.text)
【1】作用
把python数据类型 转为 json格式的字符串,一般让你把抓取的数据保存为json文件时使用
【2】参数说明
2.1) 第1个参数: python类型的数据(字典,列表等)
2.2) 第2个参数: 文件对象
2.3) 第3个参数: ensure_ascii=False 序列化时编码
【3】示例代码
# 示例1
import json
item = {'name':'QQ','app_id':1}
with open('小米.json','a') as f:
json.dump(item,f,ensure_ascii=False)
# 示例2
import json
item_list = []
for i in range(3):
item = {'name':'QQ','id':i}
item_list.append(item)
with open('xiaomi.json','a') as f:
json.dump(item_list,f,ensure_ascii=False)
jsonpath使用示例
book_dict = {
"store": {
"book": [
{ "category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{ "category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{ "category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{ "category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
}
}
from jsonpath import jsonpath
print(jsonpath(book_dict, '$..author')) # 如果取不到将返回False # 返回列表,如果取不到将返回False
# 爬虫最常用
【1】数据抓取 - json.loads(html)
将响应内容由: json 转为 python
【2】数据保存 - json.dump(item_list,f,ensure_ascii=False)
将抓取的数据保存到本地 json文件
# 抓取数据一般处理方式
【1】txt文件
【2】csv文件
【3】json文件
【4】MySQL数据库
【5】MongoDB数据库
【6】Redis数据库
【1】打开浏览器,F12打开控制台,找到Network选项卡
【2】控制台常用选项
2.1) Network: 抓取网络数据包
a> ALL: 抓取所有的网络数据包
b> XHR:抓取异步加载的网络数据包
c> JS : 抓取所有的JS文件
2.2) Sources: 格式化输出并打断点调试JavaScript代码,助于分析爬虫中一些参数
2.3) Console: 交互模式,可对JavaScript中的代码进行测试
【3】抓取具体网络数据包后
3.1) 单击左侧网络数据包地址,进入数据包详情,查看右侧
3.2) 右侧:
a> Headers: 整个请求信息
General、Response Headers、Request Headers、Query String、Form Data
b> Preview: 对响应内容进行预览
c> Response:响应内容
代理ip的匿名程度,代理IP可以分为下面三类:
1.透明代理(Transparent Proxy):透明代理虽然可以直接“隐藏”你的IP地址,但是还是可以查到你是谁。
2.匿名代理(Anonymous Proxy):使用匿名代理,别人只能知道你用了代理,无法知道你是谁。
3.高匿代理(Elite proxy或High Anonymity Proxy):高匿代理让别人根本无法发现你是在用代理,所以是最好的选择。
代理服务请求使用的协议可以分为:
1.http代理:目标url为http协议
2.https代理:目标url为https协议
3.socks隧道代理(例如socks5代理)等:
socks 代理只是简单地传递数据包,不关心是何种应用协议(FTP、HTTP和HTTPS等)。
socks 代理比http、https代理耗时少。
socks 代理可以转发http和https的请求
【1】获取代理IP网站
西刺代理、快代理、全网代理、代理精灵、阿布云、芝麻代理... ...
【2】参数类型
proxies = { '协议':'协议://IP:端口号' }
proxies = {
'http':'http://IP:端口号',
'https':'https://IP:端口号',
}
# 使用免费普通代理IP访问测试网站: http://httpbin.org/get
import requests
url = 'http://httpbin.org/get'
headers = {'User-Agent':'Mozilla/5.0'}
# 定义代理,在代理IP网站中查找免费代理IP
proxies = {
'http':'http://112.85.164.220:9999',
'https':'https://112.85.164.220:9999'
}
html = requests.get(url,proxies=proxies,headers=headers,timeout=5).text
print(html)
【1】语法结构
proxies = { '协议':'协议://用户名:密码@IP:端口号' }
【2】示例
proxies = {
'http':'http://用户名:密码@IP:端口号',
'https':'https://用户名:密码@IP:端口号',
}
import requests
url = 'http://httpbin.org/get'
proxies = {
'http': 'http://309435365:szayclhp@106.75.71.140:16816',
'https':'https://309435365:szayclhp@106.75.71.140:16816',
}
headers = {
'User-Agent' : 'Mozilla/5.0',
}
html = requests.get(url,proxies=proxies,headers=headers,timeout=5).text
print(html)
"""
收费代理:
建立开放代理的代理IP池
思路:
1、获取到开放代理
2、依次对每个代理IP进行测试,能用的保存到文件中
"""
import requests
class ProxyPool:
def __init__(self):
self.url = '代理网站的API链接'
self.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36'}
# 打开文件,用来存放可用的代理IP
self.f = open('proxy.txt', 'w')
def get_html(self):
html = requests.get(url=self.url, headers=self.headers).text
proxy_list = html.split('\r\n')
for proxy in proxy_list:
# 依次测试每个代理IP是否可用
if self.check_proxy(proxy):
self.f.write(proxy + '\n')
def check_proxy(self, proxy):
"""测试1个代理IP是否可用,可用返回True,否则返回False"""
test_url = 'http://httpbin.org/get'
proxies = {
'http' : 'http://{}'.format(proxy),
'https': 'https://{}'.format(proxy)
}
try:
res = requests.get(url=test_url, proxies=proxies, headers=self.headers, timeout=2)
if res.status_code == 200:
print(proxy,'\033[31m可用\033[0m')
return True
else:
print(proxy,'无效')
return False
except:
print(proxy,'无效')
return False
def run(self):
self.get_html()
# 关闭文件
self.f.close()
if __name__ == '__main__':
spider = ProxyPool()
spider.run()
import json
import re
import time
import requests
import multiprocessing
from job_data_analysis.lagou_spider.handle_insert_data import lagou_mysql
class HandleLaGou(object):
def __init__(self):
#使用session保存cookies信息
self.lagou_session = requests.session()
self.header = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
}
self.city_list = ""
#获取全国所有城市列表的方法
def handle_city(self):
city_search = re.compile(r'www\.lagou\.com\/.*\/">(.*?)</a>')
city_url = "https://www.lagou.com/jobs/allCity.html"
city_result = self.handle_request(method="GET",url=city_url)
#使用正则表达式获取城市列表
self.city_list = set(city_search.findall(city_result))
self.lagou_session.cookies.clear()
def handle_city_job(self,city):
first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%city
first_response = self.handle_request(method="GET",url=first_request_url)
total_page_search = re.compile(r'class="span\stotalNum">(\d+)</span>')
try:
total_page = total_page_search.search(first_response).group(1)
print(city,total_page)
#由于没有岗位信息造成的exception
except:
return
else:
for i in range(1,int(total_page)+1):
data = {
"pn":i,
"kd":"web"
}
page_url = "https://www.lagou.com/jobs/positionAjax.json?city=%s&needAddtionalResult=false"%city
referer_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%city
#referer的URL需要进行encode
self.header['Referer'] = referer_url.encode()
response = self.handle_request(method="POST",url=page_url,data=data,info=city)
lagou_data = json.loads(response)
job_list = lagou_data['content']['positionResult']['result']
for job in job_list:
lagou_mysql.insert_item(job)
def handle_request(self,method,url,data=None,info=None):
while True:
#加入阿布云的动态代理
proxyinfo = "http://%s:%s@%s:%s" % ('账号', '密码', 'http-dyn.abuyun.com', '9020')
proxy = {
"http":proxyinfo,
"https":proxyinfo
}
try:
if method == "GET":
response = self.lagou_session.get(url=url,headers=self.header,proxies=proxy,timeout=6)
elif method == "POST":
response = self.lagou_session.post(url=url,headers=self.header,data=data,proxies=proxy,timeout=6)
except:
# 需要先清除cookies信息
self.lagou_session.cookies.clear()
# 重新获取cookies信息
first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput=" % info
self.handle_request(method="GET", url=first_request_url)
time.sleep(10)
continue
response.encoding = 'utf-8'
if '频繁' in response.text:
print(response.text)
#需要先清除cookies信息
self.lagou_session.cookies.clear()
# 重新获取cookies信息
first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%info
self.handle_request(method="GET",url=first_request_url)
time.sleep(10)
continue
return response.text
if __name__ == '__main__':
lagou = HandleLaGou()
#所有城市的方法
lagou.handle_city()
print(lagou.city_list)
#引入多进程加速抓取
pool = multiprocessing.Pool(2)
for city in lagou.city_list:
pool.apply_async(lagou.handle_city_job,args=(city,))
pool.close()
pool.join()
s = ("我爱中国,i love china")
s.decode("gb2312").encode("utf-8")
import sys
sys.getdefaultencoding()
import re
pattern = re.compile('正则表达式',re.S)
r_list = pattern.findall(html)
from lxml import etree
p = etree.HTML(res.text)
r_list = p.xpath('xpath表达式')
【谨记】只要调用了xpath,得到的结果一定为'列表'
【1】结果: 节点对象列表
1.1) xpath示例: //div、//div[@class="student"]、//div/a[@title="stu"]/span
【2】结果: 字符串列表
2.1) xpath表达式中末尾为: @src、@href、/text()
【1】基准xpath表达式: 得到节点对象列表
【2】for r in [节点对象列表]:
username = r.xpath('./xxxxxx')
【注意】遍历后继续xpath一定要以: . 开头,代表当前节点
import requests
from lxml import etree
from fake_useragent import UserAgent
import time
import random
class DoubanBookSpider:
def __init__(self):
self.url = 'https://book.douban.com/top250?start={}'
def get_html(self, url):
"""使用随机的User-Agent"""
headers = {'User-Agent':UserAgent().random}
html = requests.get(url=url, headers=headers).text
self.parse_html(html)
def parse_html(self, html):
"""lxml+xpath进行数据解析"""
parse_obj = etree.HTML(html)
# 1.基准xpath:提取每本书的节点对象列表
table_list = parse_obj.xpath('//div[@class="indent"]/table')
for table in table_list:
item = {}
# 书名
name_list = table.xpath('.//div[@class="pl2"]/a/@title')
item['name'] = name_list[0].strip() if name_list else None
# 描述
content_list = table.xpath('.//p[@class="pl"]/text()')
item['content'] = content_list[0].strip() if content_list else None
# 评分
score_list = table.xpath('.//span[@class="rating_nums"]/text()')
item['score'] = score_list[0].strip() if score_list else None
# 评价人数
nums_list = table.xpath('.//span[@class="pl"]/text()')
item['nums'] = nums_list[0][1:-1].strip() if nums_list else None
# 类别
type_list = table.xpath('.//span[@class="inq"]/text()')
item['type'] = type_list[0].strip() if type_list else None
print(item)
def run(self):
for i in range(5):
start = (i - 1) * 25
page_url = self.url.format(start)
self.get_html(page_url)
time.sleep(random.randint(1,2))
if __name__ == '__main__':
spider = DoubanBookSpider()
spider.run()
"""
sql 表创建
mysql -uroot -p
create database maoyandb charset utf8;
use maoyandb;
create table maoyantab(
name varchar(100),
star varchar(300),
time varchar(100)
)charset=utf8;
"""
"""
pymysql模块使用
"""
import pymysql
# 1.连接数据库
db = pymysql.connect(
'localhost','root','123456','maoyandb',charset='utf8'
)
cur = db.cursor()
# 2.执行sql命令
ins = 'insert into maoyantab values(%s,%s,%s)'
cur.execute(ins, ['肖申克的救赎','主演:张国荣,张曼玉,刘德华','2018-06-25'])
# 3.提交到数据库执行
db.commit()
cur.close()
db.close()
import pymysql
# __init__(self):
self.db = pymysql.connect('IP',... ...)
self.cursor = self.db.cursor()
# save_html(self,r_list):
self.cursor.execute('sql',[data1])
self.cursor.executemany('sql', [(),(),()])
self.db.commit()
# run(self):
self.cursor.close()
self.db.close()
"""
【1】非关系型数据库,数据以键值对方式存储,端口27017
【2】MongoDB基于磁盘存储
【3】MongoDB数据类型单一,值为JSON文档,而Redis基于内存,
3.1> MySQL数据类型:数值类型、字符类型、日期时间类型、枚举类型
3.2> Redis数据类型:字符串、列表、哈希、集合、有序集合
3.3> MongoDB数据类型:值为JSON文档
【4】MongoDB: 库 -> 集合 -> 文档
MySQL : 库 -> 表 -> 表记录
"""
"""
Linux进入: mongo
>show dbs - 查看所有库
>use 库名 - 切换库
>show collections - 查看当前库中所有集合
>db.集合名.find().pretty() - 查看集合中文档
>db.集合名.count() - 统计文档条数
>db.集合名.drop() - 删除集合
>db.dropDatabase() - 删除当前库
"""
import pymongo
# 创建连接对象
connect = pymongo.MongoClient(host='127.0.0.1',port=27017)
# 连接库对象
db = connect['maoyandb']
# 创建集合对象
myset = db['maoyanset']
# 插入单条数据
# myset.insert_one({'name':'钱天一'})
# 插入多条数据
myset.insert_many[{'name':'张三'},{'name':'李四'},{'name':'王五'}]
import requests
import re
import time
import random
import pymongo
class MaoyanSpider:
def __init__(self):
self.url = 'https://maoyan.com/board/4?offset={}'
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36'}
# 三个对象:连接对象、库对象、库集合
self.conn = pymongo.MongoClient('127.0.0.1',27017)
self.db = self.conn['maoyandb']
self.myset = self.db['maoyanset']
##提取网页数据
def get_html(self,url):
html = requests.get(url,url,headers=self.headers).text
self.parse_html(html)
##解析网页数据
def parse_html(self,html):
regex = '<div class="movie-item-info">.*?title="(.*?)".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
pattern = re.compile(regex,re.S)
r_list = pattern.findall(html)
self.save_html(r_list)
## 数据处理提取
def save_html(self,r_list):
for r in r_list:
item ={}
item['name'] = r[0].strip()
item['star'] = r[1].strip()
item['time'] = r[2].strip()
print(item)
# 存入到mongodb数据库
self.myset.insert_one(item)
def run(self):
"""程序入口函数"""
for offset in range(0, 91, 10):
url = self.url.format(offset)
self.get_html(url=url)
# 控制数据抓取频率:uniform()生成指定范围内的浮点数
time.sleep(random.uniform(0, 1))
if __name__ == '__main__':
spider = MaoyanSpider()
spider.run()
import json
# Demo1
item = {'name':'QQ','e-mail':99362}
with open('qq.json','a')as f:
json.dump(item,f,ensure_ascii=False)
# Demo2
# import json
# item_list =[]
# for i in range(3):
# item = {'name':'QQ','e-mail':'99362'}
# item_list.append(item)
#
# with open('demo2.json','a')as f:
# json.dump(item_list,f,ensure_ascii=False)
import requests
from fake_useragent import UserAgent
import time
import random
import re
import json
# 豆瓣电影全栈抓取
class DoubanSpider:
def __init__(self):
self.url = 'https://movie.douban.com/j/chart/top_list?'
self.i = 0
# 存入json文件
self.f = open('douban.json', 'w', encoding='utf-8')
self.all_film_list = []
def get_agent(self):
"""获取随机的User-Agent"""
return UserAgent().random
def get_html(self, params):
headers = {'User-Agent':self.get_agent()}
html = requests.get(url=self.url, params=params, headers=headers).text
# 把json格式的字符串转为python数据类型
html = json.loads(html)
self.parse_html(html)
def parse_html(self, html):
"""解析"""
# html: [{},{},{},{}]
item = {}
for one_film in html:
item['rank'] = one_film['rank']
item['title'] = one_film['title']
item['score'] = one_film['score']
print(item)
self.all_film_list.append(item)
self.i += 1
def run(self):
# d: {'剧情':'11','爱情':'13','喜剧':'5',...,...}
d = self.get_d()
# 1、给用户提示,让用户选择
menu = ''
for key in d:
menu += key + '|'
print(menu)
choice = input('请输入电影类别:')
if choice in d:
code = d[choice]
# 2、total: 电影总数
total = self.get_total(code)
for start in range(0,total,20):
params = {
'type': code,
'interval_id': '100:90',
'action': '',
'start': str(start),
'limit': '20'
}
self.get_html(params=params)
time.sleep(random.randint(1,2))
# 把数据存入json文件
json.dump(self.all_film_list, self.f, ensure_ascii=False)
self.f.close()
print('数量:',self.i)
else:
print('请做出正确的选择')
def get_d(self):
"""{'剧情':'11','爱情':'13','喜剧':'5',...,...}"""
url = 'https://movie.douban.com/chart'
html = requests.get(url=url,headers={'User-Agent':self.get_agent()}).text
regex = '<span><a href=".*?type_name=(.*?)&type=(.*?)&interval_id=100:90&action=">'
pattern = re.compile(regex, re.S)
# r_list: [('剧情','11'),('喜剧','5'),('爱情':'13')... ...]
r_list = pattern.findall(html)
# d: {'剧情': '11', '爱情': '13', '喜剧': '5', ..., ...}
d = {}
for r in r_list:
d[r[0]] = r[1]
return d
def get_total(self, code):
"""获取某个类别下的电影总数"""
url = 'https://movie.douban.com/j/chart/top_list_count?type={}&interval_id=100%3A90'.format(code)
html = requests.get(url=url,headers={'User-Agent':self.get_agent()}).text
html = json.loads(html)
return html['total']
if __name__ == '__main__':
spider = DoubanSpider()
spider.run()
import json
data = {
'name': 'pengjunlee',
'age': 32,
'vip': True,
'address': {'province': 'GuangDong', 'city': 'ShenZhen'}
}
# 将 Python 字典类型转换为 JSON 对象
json_str = json.dumps(data)
print(json_str) # 结果 {"name": "pengjunlee", "age": 32, "vip": true, "address": {"province": "GuangDong", "city": "ShenZhen"}}
# 将 JSON 对象类型转换为 Python 字典
user_dic = json.loads(json_str)
print(user_dic['address']) # 结果 {'province': 'GuangDong', 'city': 'ShenZhen'}
# 将 Python 字典直接输出到文件
with open('pengjunlee.json', 'w', encoding='utf-8') as f:
json.dump(user_dic, f, ensure_ascii=False, indent=4)
# 将类文件对象中的JSON字符串直接转换成 Python 字典
with open('pengjunlee.json', 'r', encoding='utf-8') as f:
ret_dic = json.load(f)
print(type(ret_dic)) # 结果 <class 'dict'>
print(ret_dic['name']) # 结果 pengjunlee
import json
all_app_list = [
{'name':'QQ'},
{'name':'王者荣耀'},
{'name':'和平精英'}
]
with open('xiaomi.json', 'w') as f:
json.dump(all_app_list, f, ensure_ascii=False)
import csv
from urllib import request, parse
import re
import time
import random
class MaoyanSpider(object):
def __init__(self):
self.url = 'https://maoyan.com/board/4?offset={}'
# 计数
self.num = 0
def get_html(self, url):
headers = {
'user-agent': 'Mozilla / 5.0(Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko'
}
req = request.Request(url=url, headers=headers)
res = request.urlopen(req)
html = res.read().decode('utf-8')
# 直接调用解析函数
self.parse_html(html)
def parse_html(self, html):
# 创建正则的编译对象
re_ = '<div class="movie-item-info">.*?title="(.*?)".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p> '
pattern = re.compile(re_, re.S)
# film_list:[('霸王别姬','张国荣','1993')]
film_list = pattern.findall(html)
self.write_html(film_list)
# 存入csv文件-writerrows
def write_html(self, film_list):
L = []
with open('maoyanfilm.csv', 'a',newline='') as f:
# 初始化写入对象,注意参数f不能忘
writer = csv.writer(f)
for film in film_list:
t = (
film[0].strip(),
film[1].strip(),
film[2].strip()[5:15]
)
self.num += 1
L.append(t)
# writerow()参数为列表
writer.writerows(L)
print(L)
def main(self):
for offset in range(0, 91, 10):
url = self.url.format(offset)
self.get_html(url)
time.sleep(random.randint(1, 2))
print('共抓取数据', self.num, "部")
if __name__ == '__main__':
start = time.time()
spider = MaoyanSpider()
spider.main()
end = time.time()
print('执行时间:%.2f' % (end - start))
注意: 如果需要换行要自己在写入内容中添加\n
# 以二进制方式写入
f = open("write.txt","wb")
# 以二进制方式追加
# f = open("write.txt","ab")
# 写入
res = f.write(b"Big Bang Bang\n")
print(res)
res = f.write(b"Big Bang Bang\n")
print(res)
#关闭
f.close()
f = open("write.txt","w")
# 写入信息
msg = f.writelines(["你好","我很好","那就好"])
# 关闭
f.close()
【1】原理
利用Redis集合特性,可将抓取过的指纹添加到redis集合中,根据返回值来判定是否需要抓取
返回值为1 : 代表之前未抓取过,需要进行抓取
返回值为0 : 代表已经抓取过,无须再次抓取
hash加密:md5 32位、sha1 40位
创建md5对象
用update将href转为byte类型
最后通过hexdigest获取加密码
【2】代码实现模板
import redis
from hashlib import md5
import sys
class XxxIncrSpider:
def __init__(self):
self.r = redis.Redis(host='localhost',port=6379,db=0)
def url_md5(self,url):
"""对URL进行md5加密函数"""
s = md5()
s.update(url.encode())
return s.hexdigest()
def run_spider(self):
href_list = ['url1','url2','url3','url4']
for href in href_list:
href_md5 = self.url_md5(href)
if self.r.sadd('spider:urls',href_md5) == 1:
返回值为1表示添加成功,即之前未抓取过,则开始抓取
else:
sys.exit()
# 控制数据抓取频率:uniform()生成制定范围内的浮点数
time.sleep(random.uniform(2,5))
# 生成整形的数组
time.sleep(random.randint(1,2))
import datetime
timestape = str(datetime.datetime.now()).split(".")[0]
import os,time,json,random,jieba,requests
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from wordcloud import WordCloud
# 词云形状图片
WC_MASK_IMG = 'wawa.jpg'
# 评论数据保存文件
COMMENT_FILE_PATH = 'jd_comment.txt'
# 词云字体
WC_FONT_PATH = '/Library/Fonts/Songti.ttc'
def spider_comment(page=0):
"""
爬取京东指定页的评价数据
:param page: 爬取第几,默认值为0
"""
url = 'https://sclub.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98vv4646&productId=1263013576&score=0&sortType=5&page=%s&pageSize=10&isShadowSku=0&fold=1' % page
kv = {'user-agent': 'Mozilla/5.0', 'Referer': 'https://item.jd.com/1263013576.html'}
# proxies = {
# '1.85.5.66':'8060',
# '171.11.178.223':'9999',
# '120.194.42.157':'38185',
# '161.35.4.201':'80',
# '175.42.123.196':'9999',
# }
try:
r = requests.get(url, headers=kv)#,proxies=proxies
r.raise_for_status()
except:
print('爬取失败')
# 截取json数据字符串
r_json_str = r.text[26:-2]
# 字符串转json对象
r_json_obj = json.loads(r_json_str)
# 获取评价列表数据
r_json_comments = r_json_obj['comments']
# 遍历评论对象列表
for r_json_comment in r_json_comments:
# 以追加模式换行写入每条评价
with open(COMMENT_FILE_PATH, 'a+') as file:
file.write(r_json_comment['content'] + '\n')
# 打印评论对象中的评论内容
print(r_json_comment['content'])
def batch_spider_comment():
"""
批量爬取某东评价
"""
# 写入数据前先清空之前的数据
if os.path.exists(COMMENT_FILE_PATH):
os.remove(COMMENT_FILE_PATH)
for i in range(3):
spider_comment(i)
# 模拟用户浏览,设置一个爬虫间隔,防止ip被封
time.sleep(random.random() * 5)
def cut_word():
"""
对数据分词
:return: 分词后的数据
"""
with open(COMMENT_FILE_PATH) as file:
comment_txt = file.read()
wordlist = jieba.cut(comment_txt, cut_all=True)
wl = " ".join(wordlist)
print(wl)
return wl
def create_word_cloud():
"""
生成词云
:return:
"""
# 设置词云形状图片
wc_mask = np.array(Image.open(WC_MASK_IMG))
# 设置词云的一些配置,如:字体,背景色,词云形状,大小
wc = WordCloud(background_color="white", max_words=2000, mask=wc_mask, scale=4,
max_font_size=50, random_state=42, font_path=WC_FONT_PATH)
# 生成词云
wc.generate(cut_word())
# 在只设置mask的情况下,你将会得到一个拥有图片形状的词云
plt.imshow(wc, interpolation="bilinear")
plt.axis("off")
plt.figure()
plt.show()
if __name__ == '__main__':
# 爬取数据
batch_spider_comment()
# 生成词云
create_word_cloud()
【1】多进程 :CPU密集程序
【2】多线程 :爬虫(网络I/O)、本地磁盘I/O
# 抓取豆瓣电影剧情类别下的电影信息
"""
豆瓣电影 - 剧情 - 抓取
"""
import requests
from fake_useragent import UserAgent
import time
import random
from threading import Thread,Lock
from queue import Queue
class DoubanSpider:
def __init__(self):
self.url = 'https://movie.douban.com/j/chart/top_list?type=13&interval_id=100%3A90&action=&start={}&limit=20'
self.i = 0
# 队列 + 锁
self.q = Queue()
self.lock = Lock()
def get_agent(self):
"""获取随机的User-Agent"""
return UserAgent().random
def url_in(self):
"""把所有要抓取的URL地址入队列"""
for start in range(0,684,20):
url = self.url.format(start)
# url入队列
self.q.put(url)
# 线程事件函数:请求+解析+数据处理
def get_html(self):
while True:
# 从队列中获取URL地址
# 一定要在判断队列是否为空 和 get() 地址 前后加锁,防止队列中只剩一个地址时出现重复判断
self.lock.acquire()
if not self.q.empty():
headers = {'User-Agent': self.get_agent()}
url = self.q.get()
self.lock.release()
html = requests.get(url=url, headers=headers).json()
self.parse_html(html)
else:
# 如果队列为空,则最终必须释放锁
self.lock.release()
break
def parse_html(self, html):
"""解析"""
# html: [{},{},{},{}]
item = {}
for one_film in html:
item['rank'] = one_film['rank']
item['title'] = one_film['title']
item['score'] = one_film['score']
print(item)
# 加锁 + 释放锁
self.lock.acquire()
self.i += 1
self.lock.release()
def run(self):
# 先让URL地址入队列
self.url_in()
# 创建多个线程,开干吧
t_list = []
for i in range(1):
t = Thread(target=self.get_html)
t_list.append(t)
t.start()
for t in t_list:
t.join()
print('数量:',self.i)
if __name__ == '__main__':
start_time = time.time()
spider = DoubanSpider()
spider.run()
end_time = time.time()
print('执行时间:%.2f' % (end_time-start_time))
【1】定义
1.1) 开源的Web自动化测试工具
【2】用途
2.1) 对Web系统进行功能性测试,版本迭代时避免重复劳动
2.2) 兼容性测试(测试web程序在不同操作系统和不同浏览器中是否运行正常)
2.3) 对web系统进行大数量测试
【3】特点
3.1) 可根据指令操控浏览器
3.2) 只是工具,必须与第三方浏览器结合使用
【4】安装
4.1) Linux: sudo pip3 install selenium
4.2) Windows: python -m pip install selenium
from selenium import webdriver
options = webdriver.ChromeOptions()
# 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
options.add_experimental_option('excludeSwitches', ['enable-automation'])
#停止加载图片
options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
browser = webdriver.Chrome(options=options)
browser.get('https://www.taobao.com/')
selenium能够帮助我们处理页面中的cookie,比如获取、删除,接下来我们就学习这部分知识
driver.get_cookies()返回列表,其中包含的是完整的cookie信息!不光有name、value,还有domain等cookie其他维度的信息。所以如果想要把获取的cookie信息和requests模块配合使用的话,需要转换为name、value作为键值对的cookie字典
# 获取当前标签页的全部cookie信息
print(driver.get_cookies())
# 把cookie转化为字典
cookies_dict = {cookie[‘name’]: cookie[‘value’] for cookie in driver.get_cookies()}
#删除一条cookie
driver.delete_cookie("CookieName")
# 删除所有的cookie
driver.delete_all_cookies()
selenium控制浏览器也是可以使用代理ip的!
from selenium import webdriver
options = webdriver.ChromeOptions() # 创建一个配置对象
options.add_argument('--proxy-server=http://202.20.16.82:9527') # 使用代理ip
driver = webdriver.Chrome(chrome_options=options) # 实例化带有配置的driver对象
driver.get('http://www.itcast.cn')
print(driver.title)
driver.quit()
selenium控制谷歌浏览器时,User-Agent默认是谷歌浏览器的,这一小节我们就来学习使用不同的User-Agent
from selenium import webdriver
options = webdriver.ChromeOptions() # 创建一个配置对象
options.add_argument('--user-agent=Mozilla/5.0 HAHA') # 替换User-Agent
driver = webdriver.Chrome('./chromedriver', chrome_options=options)
driver.get('http://www.itcast.cn')
print(driver.title)
driver.quit()
from selenium import webdriver
chrome_options = webdriver.ChromeOptions()
chrome_options.add_experimental_option('debuggerAddress','127.0.0.1:9222')
browser=webdriver.Chrome(executable_path=r'C:\Users\TR\AppData\Local\Google\Chrome
\Application\chromedriver.exe',chrome_options=chrome_options)
browser.get('http://www.zhihu.com')
【1】定义
phantomjs为无界面浏览器(又称无头浏览器),在内存中进行页面加载,高效
【2】下载地址
2.1) chromedriver : 下载对应版本
http://npm.taobao.org/mirrors/chromedriver/
2.2) geckodriver
https://github.com/mozilla/geckodriver/releases
2.3) phantomjs
https://phantomjs.org/download.html
【3】Ubuntu安装
3.1) 下载后解压 : tar -zxvf geckodriver.tar.gz
3.2) 拷贝解压后文件到 /usr/bin/ (添加环境变量)
sudo cp geckodriver /usr/bin/
3.3) 添加可执行权限
sudo chmod 777 /usr/bin/geckodriver
【4】Windows安装
4.1) 下载对应版本的phantomjs、chromedriver、geckodriver
4.2) 把chromedriver.exe拷贝到python安装目录的Scripts目录下(添加到系统环境变量)
# 查看python安装路径: where python
4.3) 验证
cmd命令行: chromedriver
***************************总结**************************************
【1】解压 - 放到用户主目录(chromedriver、geckodriver、phantomjs)
【2】拷贝 - sudo cp /home/tarena/chromedriver /usr/bin/
【3】权限 - sudo chmod 777 /usr/bin/chromedriver
# 验证
【Ubuntu | Windows】
ipython3
from selenium import webdriver
webdriver.Chrome()
或者
webdriver.Firefox()
【mac】
ipython3
from selenium import webdriver
webdriver.Chrome(executable_path='/Users/xxx/chromedriver')
或者
webdriver.Firefox(executable_path='/User/xxx/geckodriver')
"""示例代码一:使用 selenium+浏览器 打开百度"""
# 导入seleinum的webdriver接口
from selenium import webdriver
import time
# 创建浏览器对象
browser = webdriver.Chrome()
browser.get('http://www.baidu.com/')
# 5秒钟后关闭浏览器
time.sleep(5)
browser.quit()
"""示例代码二:打开百度,搜索赵丽颖,点击搜索,查看"""
from selenium import webdriver
import time
# 1.创建浏览器对象 - 已经打开了浏览器
browser = webdriver.Chrome()
# 2.输入: http://www.baidu.com/
browser.get('http://www.baidu.com/')
# 3.找到搜索框,向这个节点发送文字: 赵丽颖
browser.find_element_by_xpath('//*[@id="kw"]').send_keys('赵丽颖')
# 4.找到 百度一下 按钮,点击一下
browser.find_element_by_xpath('//*[@id="su"]').click()
【1】browser.get(url=url) - 地址栏输入url地址并确认
【2】browser.quit() - 关闭浏览器
【3】browser.close() - 关闭当前页
【4】browser.page_source - HTML结构源码
【5】browser.page_source.find('字符串')
从html源码中搜索指定字符串,没有找到返回:-1,经常用于判断是否为最后一页
【6】browser.maximize_window() - 浏览器窗口最大化
【1】单元素查找('结果为1个节点对象')
1.1) 【最常用】browser.find_element_by_id('id属性值')
1.2) 【最常用】browser.find_element_by_name('name属性值')
1.3) 【最常用】browser.find_element_by_class_name('class属性值')
1.4) 【最万能】browser.find_element_by_xpath('xpath表达式')
1.5) 【匹配a节点时常用】browser.find_element_by_link_text('链接文本')
1.6) 【匹配a节点时常用】browser.find_element_by_partical_link_text('部分链接文本')
1.7) 【最没用】browser.find_element_by_tag_name('标记名称')
1.8) 【较常用】browser.find_element_by_css_selector('css表达式')
【2】多元素查找('结果为[节点对象列表]')
2.1) browser.find_elements_by_id('id属性值')
2.2) browser.find_elements_by_name('name属性值')
2.3) browser.find_elements_by_class_name('class属性值')
2.4) browser.find_elements_by_xpath('xpath表达式')
2.5) browser.find_elements_by_link_text('链接文本')
2.6) browser.find_elements_by_partical_link_text('部分链接文本')
2.7) browser.find_elements_by_tag_name('标记名称')
2.8) browser.find_elements_by_css_selector('css表达式')
from selenium import webdriver
import time
url = 'https://maoyan.com/board/4'
browser = webdriver.Chrome()
browser.get(url)
def get_data():
# 基准xpath: [<selenium xxx li at xxx>,<selenium xxx li at>]
li_list = browser.find_elements_by_xpath('//*[@id="app"]/div/div/div[1]/dl/dd')
for li in li_list:
item = {}
# info_list: ['1', '霸王别姬', '主演:张国荣', '上映时间:1993-01-01', '9.5']
info_list = li.text.split('\n')
item['number'] = info_list[0]
item['name'] = info_list[1]
item['star'] = info_list[2]
item['time'] = info_list[3]
item['score'] = info_list[4]
print(item)
while True:
get_data()
try:
browser.find_element_by_link_text('下一页').click()
time.sleep(2)
except Exception as e:
print('恭喜你!抓取结束')
browser.quit()
break
【1】文本框操作
1.1) node.send_keys('') - 向文本框发送内容
1.2) node.clear() - 清空文本
1.3) node.get_attribute('value') - 获取文本内容
【2】按钮操作
1.1) node.click() - 点击
1.2) node.is_enabled() - 判断按钮是否可用
1.3) node.get_attribute('value') - 获取按钮文本
from selenium import webdriver
options = webdriver.ChromeOptions()
# 添加无界面参数
options.add_argument('--headless')
browser = webdriver.Chrome(options=options)
from selenium import webdriver
# 导入鼠标事件类
from selenium.webdriver import ActionChains
driver = webdriver.Chrome()
driver.get('http://www.baidu.com/')
# 移动到 设置,perform()是真正执行操作,必须有
element = driver.find_element_by_xpath('//*[@id="u1"]/a[8]')
ActionChains(driver).move_to_element(element).perform()
# 单击,弹出的Ajax元素,根据链接节点的文本内容查找
driver.find_element_by_link_text('高级搜索').click()
【1】鼠标操作
from selenium.webdriver import ActionChains
ActionChains(browser).move_to_element(node).perform()
【2】切换句柄
all_handles = browser.window_handles
time.sleep(1)
browser.switch_to.window(all_handles[1])
【3】iframe子框架
browser.switch_to.frame(iframe_element)
# 写法1 - 任何场景都可以:
iframe_node = browser.find_element_by_xpath('')
browser.switch_to.frame(iframe_node)
# 写法2 - 默认支持 id 和 name 两个属性值:
browser.switch_to.frame('id属性值|name属性值')