- 【1】先确定是否为动态加载网站
- 【2】找URL规律
- 【3】正则表达式 | xpath表达式
- 【4】定义程序框架,补全并测试代码
-
- 【1】整体思路
- 1.1> 爬取一级页面,提取 所需数据+链接,继续跟进
- 1.2> 爬取二级页面,提取 所需数据+链接,继续跟进
- 1.3> ... ...
-
- 【2】代码实现思路
- 2.1> 避免重复代码 - 请求、解析需定义函数
-
- 【1】基于User-Agent反爬
- 1.1) 发送请求携带请求头: headers={'User-Agent' : 'Mozilla/5.0 xxxxxx'}
- 1.2) 多个请求时随机切换User-Agent
- a) 定义py文件存放大量User-Agent,导入后使用random.choice()每次随机选择
- b) 使用fake_useragent模块每次访问随机生成User-Agent
- from fake_useragent import UserAgent
- agent = UserAgent().random
- 细节要点:pycharm中下载fake-useragent
-
- 【2】响应内容存在特殊字符
- 解码时使用ignore参数
- html = requests.get(url=url, headers=headers).content.decode('', 'ignore')
-
使用requests获取的resposne对象,具有cookies属性。该属性值是一个cookieJar类型,包含了对方服务器设置在本地的cookie。我们如何将其转换为cookies字典呢?
- 【1】方法一 : requests.get()
- 【2】参数
- 2.1) url
- 2.2) headers
- 2.3) timeout
- 2.4) proxies
-
- 【3】方法二 :requests.post()
- 【4】参数
- data
-
- 【1】url
- 【2】proxies -> {}
- proxies = {
- 'http':'http://1.1.1.1:8888',
- 'https':'https://1.1.1.1:8888'
- }
- 【3】timeout
- 【4】headers
- 【5】cookies
-
- 【1】适用场景 : Post类型请求的网站
-
- 【2】参数 : data={}
- 2.1) Form表单数据: 字典
- 2.2) res = requests.post(url=url,data=data,headers=headers)
-
- 【3】POST请求特点 : Form表单提交数据
-
- data : 字典,Form表单数据
-
- 【1】pycharm进入方法 :Ctrl + r ,选中 Regex
- 【2】处理headers和formdata
- (.*): (.*)
- "$1": "$2",
- 【3】点击 Replace All
-
session实例在请求了一个网站后,对方服务器设置在本地的cookie会保存在session中,下一次再使用session请求对方服务器的时候,会带上前一次的cookie
- session = requests.session() # 实例化session对象
- response = session.get(url, headers, ...)
- response = session.post(url, data, ...)
-
- 【1】右键 -> 查看网页源码中没有具体数据
- 【2】滚动鼠标滑轮或其他动作时加载,或者页面局部刷新
-
- 【1】F12打开控制台,页面动作抓取网络数据包
- 【2】抓取json文件URL地址
- 2.1) 控制台中 XHR :异步加载的数据包
- 2.2) XHR -> QueryStringParameters(查询参数)
-
- 【1】作用 : 把json格式的字符串转为Python数据类型
-
- 【2】示例 : html = json.loads(res.text)
-
- 【1】作用
- 把python数据类型 转为 json格式的字符串,一般让你把抓取的数据保存为json文件时使用
-
- 【2】参数说明
- 2.1) 第1个参数: python类型的数据(字典,列表等)
- 2.2) 第2个参数: 文件对象
- 2.3) 第3个参数: ensure_ascii=False 序列化时编码
-
- 【3】示例代码
- # 示例1
- import json
-
- item = {'name':'QQ','app_id':1}
- with open('小米.json','a') as f:
- json.dump(item,f,ensure_ascii=False)
-
- # 示例2
- import json
-
- item_list = []
- for i in range(3):
- item = {'name':'QQ','id':i}
- item_list.append(item)
-
- with open('xiaomi.json','a') as f:
- json.dump(item_list,f,ensure_ascii=False)
-
jsonpath使用示例
- book_dict = {
- "store": {
- "book": [
- { "category": "reference",
- "author": "Nigel Rees",
- "title": "Sayings of the Century",
- "price": 8.95
- },
- { "category": "fiction",
- "author": "Evelyn Waugh",
- "title": "Sword of Honour",
- "price": 12.99
- },
- { "category": "fiction",
- "author": "Herman Melville",
- "title": "Moby Dick",
- "isbn": "0-553-21311-3",
- "price": 8.99
- },
- { "category": "fiction",
- "author": "J. R. R. Tolkien",
- "title": "The Lord of the Rings",
- "isbn": "0-395-19395-8",
- "price": 22.99
- }
- ],
- "bicycle": {
- "color": "red",
- "price": 19.95
- }
- }
- }
-
- from jsonpath import jsonpath
-
- print(jsonpath(book_dict, '$..author')) # 如果取不到将返回False # 返回列表,如果取不到将返回False
-
- # 爬虫最常用
- 【1】数据抓取 - json.loads(html)
- 将响应内容由: json 转为 python
- 【2】数据保存 - json.dump(item_list,f,ensure_ascii=False)
- 将抓取的数据保存到本地 json文件
-
- # 抓取数据一般处理方式
- 【1】txt文件
- 【2】csv文件
- 【3】json文件
- 【4】MySQL数据库
- 【5】MongoDB数据库
- 【6】Redis数据库
-
- 【1】打开浏览器,F12打开控制台,找到Network选项卡
-
- 【2】控制台常用选项
- 2.1) Network: 抓取网络数据包
- a> ALL: 抓取所有的网络数据包
- b> XHR:抓取异步加载的网络数据包
- c> JS : 抓取所有的JS文件
- 2.2) Sources: 格式化输出并打断点调试JavaScript代码,助于分析爬虫中一些参数
- 2.3) Console: 交互模式,可对JavaScript中的代码进行测试
-
- 【3】抓取具体网络数据包后
- 3.1) 单击左侧网络数据包地址,进入数据包详情,查看右侧
- 3.2) 右侧:
- a> Headers: 整个请求信息
- General、Response Headers、Request Headers、Query String、Form Data
- b> Preview: 对响应内容进行预览
- c> Response:响应内容
-
- 代理ip的匿名程度,代理IP可以分为下面三类:
- 1.透明代理(Transparent Proxy):透明代理虽然可以直接“隐藏”你的IP地址,但是还是可以查到你是谁。
- 2.匿名代理(Anonymous Proxy):使用匿名代理,别人只能知道你用了代理,无法知道你是谁。
- 3.高匿代理(Elite proxy或High Anonymity Proxy):高匿代理让别人根本无法发现你是在用代理,所以是最好的选择。
-
- 代理服务请求使用的协议可以分为:
- 1.http代理:目标url为http协议
- 2.https代理:目标url为https协议
- 3.socks隧道代理(例如socks5代理)等:
- socks 代理只是简单地传递数据包,不关心是何种应用协议(FTP、HTTP和HTTPS等)。
- socks 代理比http、https代理耗时少。
- socks 代理可以转发http和https的请求
-
- 【1】获取代理IP网站
- 西刺代理、快代理、全网代理、代理精灵、阿布云、芝麻代理... ...
-
- 【2】参数类型
- proxies = { '协议':'协议://IP:端口号' }
- proxies = {
- 'http':'http://IP:端口号',
- 'https':'https://IP:端口号',
- }
-
- # 使用免费普通代理IP访问测试网站: http://httpbin.org/get
- import requests
-
- url = 'http://httpbin.org/get'
- headers = {'User-Agent':'Mozilla/5.0'}
- # 定义代理,在代理IP网站中查找免费代理IP
- proxies = {
- 'http':'http://112.85.164.220:9999',
- 'https':'https://112.85.164.220:9999'
- }
- html = requests.get(url,proxies=proxies,headers=headers,timeout=5).text
- print(html)
-
- 【1】语法结构
- proxies = { '协议':'协议://用户名:密码@IP:端口号' }
-
- 【2】示例
- proxies = {
- 'http':'http://用户名:密码@IP:端口号',
- 'https':'https://用户名:密码@IP:端口号',
- }
-
- import requests
- url = 'http://httpbin.org/get'
- proxies = {
- 'http': 'http://309435365:szayclhp@106.75.71.140:16816',
- 'https':'https://309435365:szayclhp@106.75.71.140:16816',
- }
- headers = {
- 'User-Agent' : 'Mozilla/5.0',
- }
-
- html = requests.get(url,proxies=proxies,headers=headers,timeout=5).text
- print(html)
-
- """
- 收费代理:
- 建立开放代理的代理IP池
- 思路:
- 1、获取到开放代理
- 2、依次对每个代理IP进行测试,能用的保存到文件中
- """
- import requests
-
- class ProxyPool:
- def __init__(self):
- self.url = '代理网站的API链接'
- self.headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36'}
- # 打开文件,用来存放可用的代理IP
- self.f = open('proxy.txt', 'w')
-
- def get_html(self):
- html = requests.get(url=self.url, headers=self.headers).text
- proxy_list = html.split('\r\n')
- for proxy in proxy_list:
- # 依次测试每个代理IP是否可用
- if self.check_proxy(proxy):
- self.f.write(proxy + '\n')
-
- def check_proxy(self, proxy):
- """测试1个代理IP是否可用,可用返回True,否则返回False"""
- test_url = 'http://httpbin.org/get'
- proxies = {
- 'http' : 'http://{}'.format(proxy),
- 'https': 'https://{}'.format(proxy)
- }
- try:
- res = requests.get(url=test_url, proxies=proxies, headers=self.headers, timeout=2)
- if res.status_code == 200:
- print(proxy,'\033[31m可用\033[0m')
- return True
- else:
- print(proxy,'无效')
- return False
- except:
- print(proxy,'无效')
- return False
-
- def run(self):
- self.get_html()
- # 关闭文件
- self.f.close()
-
- if __name__ == '__main__':
- spider = ProxyPool()
- spider.run()
-
- import json
- import re
- import time
- import requests
- import multiprocessing
- from job_data_analysis.lagou_spider.handle_insert_data import lagou_mysql
-
-
- class HandleLaGou(object):
- def __init__(self):
- #使用session保存cookies信息
- self.lagou_session = requests.session()
- self.header = {
- 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
- }
- self.city_list = ""
-
- #获取全国所有城市列表的方法
- def handle_city(self):
- city_search = re.compile(r'www\.lagou\.com\/.*\/">(.*?)</a>')
- city_url = "https://www.lagou.com/jobs/allCity.html"
- city_result = self.handle_request(method="GET",url=city_url)
- #使用正则表达式获取城市列表
- self.city_list = set(city_search.findall(city_result))
- self.lagou_session.cookies.clear()
-
- def handle_city_job(self,city):
- first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%city
- first_response = self.handle_request(method="GET",url=first_request_url)
- total_page_search = re.compile(r'class="span\stotalNum">(\d+)</span>')
- try:
- total_page = total_page_search.search(first_response).group(1)
- print(city,total_page)
- #由于没有岗位信息造成的exception
- except:
- return
- else:
- for i in range(1,int(total_page)+1):
- data = {
- "pn":i,
- "kd":"web"
- }
- page_url = "https://www.lagou.com/jobs/positionAjax.json?city=%s&needAddtionalResult=false"%city
- referer_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%city
- #referer的URL需要进行encode
- self.header['Referer'] = referer_url.encode()
- response = self.handle_request(method="POST",url=page_url,data=data,info=city)
- lagou_data = json.loads(response)
- job_list = lagou_data['content']['positionResult']['result']
- for job in job_list:
- lagou_mysql.insert_item(job)
-
-
-
- def handle_request(self,method,url,data=None,info=None):
- while True:
- #加入阿布云的动态代理
- proxyinfo = "http://%s:%s@%s:%s" % ('账号', '密码', 'http-dyn.abuyun.com', '9020')
- proxy = {
- "http":proxyinfo,
- "https":proxyinfo
- }
- try:
- if method == "GET":
- response = self.lagou_session.get(url=url,headers=self.header,proxies=proxy,timeout=6)
- elif method == "POST":
- response = self.lagou_session.post(url=url,headers=self.header,data=data,proxies=proxy,timeout=6)
- except:
- # 需要先清除cookies信息
- self.lagou_session.cookies.clear()
- # 重新获取cookies信息
- first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput=" % info
- self.handle_request(method="GET", url=first_request_url)
- time.sleep(10)
- continue
- response.encoding = 'utf-8'
- if '频繁' in response.text:
- print(response.text)
- #需要先清除cookies信息
- self.lagou_session.cookies.clear()
- # 重新获取cookies信息
- first_request_url = "https://www.lagou.com/jobs/list_python?city=%s&cl=false&fromSearch=true&labelWords=&suginput="%info
- self.handle_request(method="GET",url=first_request_url)
- time.sleep(10)
- continue
- return response.text
-
- if __name__ == '__main__':
- lagou = HandleLaGou()
- #所有城市的方法
- lagou.handle_city()
- print(lagou.city_list)
- #引入多进程加速抓取
- pool = multiprocessing.Pool(2)
- for city in lagou.city_list:
- pool.apply_async(lagou.handle_city_job,args=(city,))
- pool.close()
- pool.join()
-
- s = ("我爱中国,i love china")
- s.decode("gb2312").encode("utf-8")
-
- import sys
- sys.getdefaultencoding()
-
- import re
- pattern = re.compile('正则表达式',re.S)
- r_list = pattern.findall(html)
-
- from lxml import etree
- p = etree.HTML(res.text)
- r_list = p.xpath('xpath表达式')
-
- 【谨记】只要调用了xpath,得到的结果一定为'列表'
-
- 【1】结果: 节点对象列表
- 1.1) xpath示例: //div、//div[@class="student"]、//div/a[@title="stu"]/span
-
- 【2】结果: 字符串列表
- 2.1) xpath表达式中末尾为: @src、@href、/text()
-
- 【1】基准xpath表达式: 得到节点对象列表
- 【2】for r in [节点对象列表]:
- username = r.xpath('./xxxxxx')
-
- 【注意】遍历后继续xpath一定要以: . 开头,代表当前节点
-
- import requests
- from lxml import etree
- from fake_useragent import UserAgent
- import time
- import random
-
- class DoubanBookSpider:
- def __init__(self):
- self.url = 'https://book.douban.com/top250?start={}'
-
- def get_html(self, url):
- """使用随机的User-Agent"""
- headers = {'User-Agent':UserAgent().random}
- html = requests.get(url=url, headers=headers).text
- self.parse_html(html)
-
- def parse_html(self, html):
- """lxml+xpath进行数据解析"""
- parse_obj = etree.HTML(html)
- # 1.基准xpath:提取每本书的节点对象列表
- table_list = parse_obj.xpath('//div[@class="indent"]/table')
- for table in table_list:
- item = {}
- # 书名
- name_list = table.xpath('.//div[@class="pl2"]/a/@title')
- item['name'] = name_list[0].strip() if name_list else None
- # 描述
- content_list = table.xpath('.//p[@class="pl"]/text()')
- item['content'] = content_list[0].strip() if content_list else None
- # 评分
- score_list = table.xpath('.//span[@class="rating_nums"]/text()')
- item['score'] = score_list[0].strip() if score_list else None
- # 评价人数
- nums_list = table.xpath('.//span[@class="pl"]/text()')
- item['nums'] = nums_list[0][1:-1].strip() if nums_list else None
- # 类别
- type_list = table.xpath('.//span[@class="inq"]/text()')
- item['type'] = type_list[0].strip() if type_list else None
-
- print(item)
-
- def run(self):
- for i in range(5):
- start = (i - 1) * 25
- page_url = self.url.format(start)
- self.get_html(page_url)
- time.sleep(random.randint(1,2))
-
- if __name__ == '__main__':
- spider = DoubanBookSpider()
- spider.run()
-
- """
- sql 表创建
-
- mysql -uroot -p
- create database maoyandb charset utf8;
- use maoyandb;
- create table maoyantab(
- name varchar(100),
- star varchar(300),
- time varchar(100)
- )charset=utf8;
- """
- """
- pymysql模块使用
- """
- import pymysql
-
- # 1.连接数据库
- db = pymysql.connect(
- 'localhost','root','123456','maoyandb',charset='utf8'
- )
- cur = db.cursor()
- # 2.执行sql命令
- ins = 'insert into maoyantab values(%s,%s,%s)'
- cur.execute(ins, ['肖申克的救赎','主演:张国荣,张曼玉,刘德华','2018-06-25'])
- # 3.提交到数据库执行
- db.commit()
-
- cur.close()
- db.close()
-
- import pymysql
-
- # __init__(self):
- self.db = pymysql.connect('IP',... ...)
- self.cursor = self.db.cursor()
-
- # save_html(self,r_list):
- self.cursor.execute('sql',[data1])
- self.cursor.executemany('sql', [(),(),()])
- self.db.commit()
-
- # run(self):
- self.cursor.close()
- self.db.close()
-
- """
- 【1】非关系型数据库,数据以键值对方式存储,端口27017
- 【2】MongoDB基于磁盘存储
- 【3】MongoDB数据类型单一,值为JSON文档,而Redis基于内存,
- 3.1> MySQL数据类型:数值类型、字符类型、日期时间类型、枚举类型
- 3.2> Redis数据类型:字符串、列表、哈希、集合、有序集合
- 3.3> MongoDB数据类型:值为JSON文档
- 【4】MongoDB: 库 -> 集合 -> 文档
- MySQL : 库 -> 表 -> 表记录
- """
-
- """
- Linux进入: mongo
- >show dbs - 查看所有库
- >use 库名 - 切换库
- >show collections - 查看当前库中所有集合
- >db.集合名.find().pretty() - 查看集合中文档
- >db.集合名.count() - 统计文档条数
- >db.集合名.drop() - 删除集合
- >db.dropDatabase() - 删除当前库
- """
- import pymongo
-
- # 创建连接对象
- connect = pymongo.MongoClient(host='127.0.0.1',port=27017)
- # 连接库对象
- db = connect['maoyandb']
- # 创建集合对象
- myset = db['maoyanset']
- # 插入单条数据
- # myset.insert_one({'name':'钱天一'})
- # 插入多条数据
- myset.insert_many[{'name':'张三'},{'name':'李四'},{'name':'王五'}]
-
- import requests
- import re
- import time
- import random
- import pymongo
-
- class MaoyanSpider:
- def __init__(self):
- self.url = 'https://maoyan.com/board/4?offset={}'
- self.headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36'}
- # 三个对象:连接对象、库对象、库集合
- self.conn = pymongo.MongoClient('127.0.0.1',27017)
- self.db = self.conn['maoyandb']
- self.myset = self.db['maoyanset']
-
- ##提取网页数据
- def get_html(self,url):
- html = requests.get(url,url,headers=self.headers).text
- self.parse_html(html)
-
- ##解析网页数据
- def parse_html(self,html):
- regex = '<div class="movie-item-info">.*?title="(.*?)".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
- pattern = re.compile(regex,re.S)
- r_list = pattern.findall(html)
- self.save_html(r_list)
-
- ## 数据处理提取
- def save_html(self,r_list):
- for r in r_list:
- item ={}
- item['name'] = r[0].strip()
- item['star'] = r[1].strip()
- item['time'] = r[2].strip()
- print(item)
- # 存入到mongodb数据库
- self.myset.insert_one(item)
-
- def run(self):
- """程序入口函数"""
- for offset in range(0, 91, 10):
- url = self.url.format(offset)
- self.get_html(url=url)
- # 控制数据抓取频率:uniform()生成指定范围内的浮点数
- time.sleep(random.uniform(0, 1))
-
- if __name__ == '__main__':
- spider = MaoyanSpider()
- spider.run()
-
- import json
- # Demo1
- item = {'name':'QQ','e-mail':99362}
- with open('qq.json','a')as f:
- json.dump(item,f,ensure_ascii=False)
-
-
- # Demo2
- # import json
- # item_list =[]
- # for i in range(3):
- # item = {'name':'QQ','e-mail':'99362'}
- # item_list.append(item)
- #
- # with open('demo2.json','a')as f:
- # json.dump(item_list,f,ensure_ascii=False)
-
- import requests
- from fake_useragent import UserAgent
- import time
- import random
- import re
- import json
-
- # 豆瓣电影全栈抓取
- class DoubanSpider:
- def __init__(self):
- self.url = 'https://movie.douban.com/j/chart/top_list?'
- self.i = 0
- # 存入json文件
- self.f = open('douban.json', 'w', encoding='utf-8')
- self.all_film_list = []
-
- def get_agent(self):
- """获取随机的User-Agent"""
- return UserAgent().random
-
- def get_html(self, params):
- headers = {'User-Agent':self.get_agent()}
- html = requests.get(url=self.url, params=params, headers=headers).text
- # 把json格式的字符串转为python数据类型
- html = json.loads(html)
-
- self.parse_html(html)
-
- def parse_html(self, html):
- """解析"""
- # html: [{},{},{},{}]
- item = {}
- for one_film in html:
- item['rank'] = one_film['rank']
- item['title'] = one_film['title']
- item['score'] = one_film['score']
- print(item)
- self.all_film_list.append(item)
- self.i += 1
-
- def run(self):
- # d: {'剧情':'11','爱情':'13','喜剧':'5',...,...}
- d = self.get_d()
- # 1、给用户提示,让用户选择
- menu = ''
- for key in d:
- menu += key + '|'
- print(menu)
- choice = input('请输入电影类别:')
- if choice in d:
- code = d[choice]
- # 2、total: 电影总数
- total = self.get_total(code)
- for start in range(0,total,20):
- params = {
- 'type': code,
- 'interval_id': '100:90',
- 'action': '',
- 'start': str(start),
- 'limit': '20'
- }
- self.get_html(params=params)
- time.sleep(random.randint(1,2))
-
- # 把数据存入json文件
- json.dump(self.all_film_list, self.f, ensure_ascii=False)
- self.f.close()
- print('数量:',self.i)
- else:
- print('请做出正确的选择')
-
- def get_d(self):
- """{'剧情':'11','爱情':'13','喜剧':'5',...,...}"""
- url = 'https://movie.douban.com/chart'
- html = requests.get(url=url,headers={'User-Agent':self.get_agent()}).text
- regex = '<span><a href=".*?type_name=(.*?)&type=(.*?)&interval_id=100:90&action=">'
- pattern = re.compile(regex, re.S)
- # r_list: [('剧情','11'),('喜剧','5'),('爱情':'13')... ...]
- r_list = pattern.findall(html)
- # d: {'剧情': '11', '爱情': '13', '喜剧': '5', ..., ...}
- d = {}
- for r in r_list:
- d[r[0]] = r[1]
-
- return d
-
- def get_total(self, code):
- """获取某个类别下的电影总数"""
- url = 'https://movie.douban.com/j/chart/top_list_count?type={}&interval_id=100%3A90'.format(code)
- html = requests.get(url=url,headers={'User-Agent':self.get_agent()}).text
- html = json.loads(html)
-
- return html['total']
-
- if __name__ == '__main__':
- spider = DoubanSpider()
- spider.run()
-
- import json
-
- data = {
- 'name': 'pengjunlee',
- 'age': 32,
- 'vip': True,
- 'address': {'province': 'GuangDong', 'city': 'ShenZhen'}
- }
- # 将 Python 字典类型转换为 JSON 对象
- json_str = json.dumps(data)
- print(json_str) # 结果 {"name": "pengjunlee", "age": 32, "vip": true, "address": {"province": "GuangDong", "city": "ShenZhen"}}
-
- # 将 JSON 对象类型转换为 Python 字典
- user_dic = json.loads(json_str)
- print(user_dic['address']) # 结果 {'province': 'GuangDong', 'city': 'ShenZhen'}
-
- # 将 Python 字典直接输出到文件
- with open('pengjunlee.json', 'w', encoding='utf-8') as f:
- json.dump(user_dic, f, ensure_ascii=False, indent=4)
-
- # 将类文件对象中的JSON字符串直接转换成 Python 字典
- with open('pengjunlee.json', 'r', encoding='utf-8') as f:
- ret_dic = json.load(f)
- print(type(ret_dic)) # 结果 <class 'dict'>
- print(ret_dic['name']) # 结果 pengjunlee
-
- import json
-
- all_app_list = [
- {'name':'QQ'},
- {'name':'王者荣耀'},
- {'name':'和平精英'}
- ]
-
- with open('xiaomi.json', 'w') as f:
- json.dump(all_app_list, f, ensure_ascii=False)
-
- import csv
- from urllib import request, parse
- import re
- import time
- import random
-
-
- class MaoyanSpider(object):
- def __init__(self):
- self.url = 'https://maoyan.com/board/4?offset={}'
- # 计数
- self.num = 0
-
-
- def get_html(self, url):
- headers = {
- 'user-agent': 'Mozilla / 5.0(Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko'
- }
- req = request.Request(url=url, headers=headers)
- res = request.urlopen(req)
- html = res.read().decode('utf-8')
- # 直接调用解析函数
- self.parse_html(html)
-
- def parse_html(self, html):
- # 创建正则的编译对象
- re_ = '<div class="movie-item-info">.*?title="(.*?)".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p> '
- pattern = re.compile(re_, re.S)
- # film_list:[('霸王别姬','张国荣','1993')]
- film_list = pattern.findall(html)
- self.write_html(film_list)
-
- # 存入csv文件-writerrows
- def write_html(self, film_list):
- L = []
- with open('maoyanfilm.csv', 'a',newline='') as f:
- # 初始化写入对象,注意参数f不能忘
- writer = csv.writer(f)
- for film in film_list:
- t = (
- film[0].strip(),
- film[1].strip(),
- film[2].strip()[5:15]
- )
- self.num += 1
- L.append(t)
- # writerow()参数为列表
- writer.writerows(L)
- print(L)
-
-
- def main(self):
- for offset in range(0, 91, 10):
- url = self.url.format(offset)
- self.get_html(url)
- time.sleep(random.randint(1, 2))
- print('共抓取数据', self.num, "部")
-
-
- if __name__ == '__main__':
- start = time.time()
- spider = MaoyanSpider()
- spider.main()
- end = time.time()
- print('执行时间:%.2f' % (end - start))
-
注意: 如果需要换行要自己在写入内容中添加\n
- # 以二进制方式写入
- f = open("write.txt","wb")
- # 以二进制方式追加
- # f = open("write.txt","ab")
- # 写入
- res = f.write(b"Big Bang Bang\n")
- print(res)
- res = f.write(b"Big Bang Bang\n")
- print(res)
- #关闭
- f.close()
-
- f = open("write.txt","w")
- # 写入信息
- msg = f.writelines(["你好","我很好","那就好"])
- # 关闭
- f.close()
-
- 【1】原理
- 利用Redis集合特性,可将抓取过的指纹添加到redis集合中,根据返回值来判定是否需要抓取
- 返回值为1 : 代表之前未抓取过,需要进行抓取
- 返回值为0 : 代表已经抓取过,无须再次抓取
-
- hash加密:md5 32位、sha1 40位
-
- 创建md5对象
- 用update将href转为byte类型
- 最后通过hexdigest获取加密码
-
- 【2】代码实现模板
- import redis
- from hashlib import md5
- import sys
-
- class XxxIncrSpider:
- def __init__(self):
- self.r = redis.Redis(host='localhost',port=6379,db=0)
-
- def url_md5(self,url):
- """对URL进行md5加密函数"""
- s = md5()
- s.update(url.encode())
- return s.hexdigest()
-
- def run_spider(self):
- href_list = ['url1','url2','url3','url4']
- for href in href_list:
- href_md5 = self.url_md5(href)
- if self.r.sadd('spider:urls',href_md5) == 1:
- 返回值为1表示添加成功,即之前未抓取过,则开始抓取
- else:
- sys.exit()
-
- # 控制数据抓取频率:uniform()生成制定范围内的浮点数
- time.sleep(random.uniform(2,5))
- # 生成整形的数组
- time.sleep(random.randint(1,2))
-
- import datetime
- timestape = str(datetime.datetime.now()).split(".")[0]
-
- import os,time,json,random,jieba,requests
- import numpy as np
- from PIL import Image
- import matplotlib.pyplot as plt
- from wordcloud import WordCloud
-
- # 词云形状图片
- WC_MASK_IMG = 'wawa.jpg'
- # 评论数据保存文件
- COMMENT_FILE_PATH = 'jd_comment.txt'
- # 词云字体
- WC_FONT_PATH = '/Library/Fonts/Songti.ttc'
-
- def spider_comment(page=0):
- """
- 爬取京东指定页的评价数据
- :param page: 爬取第几,默认值为0
- """
- url = 'https://sclub.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98vv4646&productId=1263013576&score=0&sortType=5&page=%s&pageSize=10&isShadowSku=0&fold=1' % page
- kv = {'user-agent': 'Mozilla/5.0', 'Referer': 'https://item.jd.com/1263013576.html'}
- # proxies = {
- # '1.85.5.66':'8060',
- # '171.11.178.223':'9999',
- # '120.194.42.157':'38185',
- # '161.35.4.201':'80',
- # '175.42.123.196':'9999',
- # }
- try:
- r = requests.get(url, headers=kv)#,proxies=proxies
- r.raise_for_status()
- except:
- print('爬取失败')
- # 截取json数据字符串
- r_json_str = r.text[26:-2]
- # 字符串转json对象
- r_json_obj = json.loads(r_json_str)
- # 获取评价列表数据
- r_json_comments = r_json_obj['comments']
- # 遍历评论对象列表
- for r_json_comment in r_json_comments:
- # 以追加模式换行写入每条评价
- with open(COMMENT_FILE_PATH, 'a+') as file:
- file.write(r_json_comment['content'] + '\n')
- # 打印评论对象中的评论内容
- print(r_json_comment['content'])
-
- def batch_spider_comment():
- """
- 批量爬取某东评价
- """
- # 写入数据前先清空之前的数据
- if os.path.exists(COMMENT_FILE_PATH):
- os.remove(COMMENT_FILE_PATH)
- for i in range(3):
- spider_comment(i)
- # 模拟用户浏览,设置一个爬虫间隔,防止ip被封
- time.sleep(random.random() * 5)
-
- def cut_word():
- """
- 对数据分词
- :return: 分词后的数据
- """
- with open(COMMENT_FILE_PATH) as file:
- comment_txt = file.read()
- wordlist = jieba.cut(comment_txt, cut_all=True)
- wl = " ".join(wordlist)
- print(wl)
- return wl
-
- def create_word_cloud():
- """
- 生成词云
- :return:
- """
- # 设置词云形状图片
- wc_mask = np.array(Image.open(WC_MASK_IMG))
- # 设置词云的一些配置,如:字体,背景色,词云形状,大小
- wc = WordCloud(background_color="white", max_words=2000, mask=wc_mask, scale=4,
- max_font_size=50, random_state=42, font_path=WC_FONT_PATH)
- # 生成词云
- wc.generate(cut_word())
-
- # 在只设置mask的情况下,你将会得到一个拥有图片形状的词云
- plt.imshow(wc, interpolation="bilinear")
- plt.axis("off")
- plt.figure()
- plt.show()
-
- if __name__ == '__main__':
- # 爬取数据
- batch_spider_comment()
-
- # 生成词云
- create_word_cloud()
-
- 【1】多进程 :CPU密集程序
- 【2】多线程 :爬虫(网络I/O)、本地磁盘I/O
-
- # 抓取豆瓣电影剧情类别下的电影信息
- """
- 豆瓣电影 - 剧情 - 抓取
- """
- import requests
- from fake_useragent import UserAgent
- import time
- import random
- from threading import Thread,Lock
- from queue import Queue
-
- class DoubanSpider:
- def __init__(self):
- self.url = 'https://movie.douban.com/j/chart/top_list?type=13&interval_id=100%3A90&action=&start={}&limit=20'
- self.i = 0
- # 队列 + 锁
- self.q = Queue()
- self.lock = Lock()
-
- def get_agent(self):
- """获取随机的User-Agent"""
- return UserAgent().random
-
- def url_in(self):
- """把所有要抓取的URL地址入队列"""
- for start in range(0,684,20):
- url = self.url.format(start)
- # url入队列
- self.q.put(url)
-
- # 线程事件函数:请求+解析+数据处理
- def get_html(self):
- while True:
- # 从队列中获取URL地址
- # 一定要在判断队列是否为空 和 get() 地址 前后加锁,防止队列中只剩一个地址时出现重复判断
- self.lock.acquire()
- if not self.q.empty():
- headers = {'User-Agent': self.get_agent()}
- url = self.q.get()
- self.lock.release()
-
- html = requests.get(url=url, headers=headers).json()
- self.parse_html(html)
- else:
- # 如果队列为空,则最终必须释放锁
- self.lock.release()
- break
-
- def parse_html(self, html):
- """解析"""
- # html: [{},{},{},{}]
- item = {}
- for one_film in html:
- item['rank'] = one_film['rank']
- item['title'] = one_film['title']
- item['score'] = one_film['score']
- print(item)
- # 加锁 + 释放锁
- self.lock.acquire()
- self.i += 1
- self.lock.release()
-
- def run(self):
- # 先让URL地址入队列
- self.url_in()
- # 创建多个线程,开干吧
- t_list = []
- for i in range(1):
- t = Thread(target=self.get_html)
- t_list.append(t)
- t.start()
-
- for t in t_list:
- t.join()
-
- print('数量:',self.i)
-
- if __name__ == '__main__':
- start_time = time.time()
- spider = DoubanSpider()
- spider.run()
- end_time = time.time()
- print('执行时间:%.2f' % (end_time-start_time))
-
- 【1】定义
- 1.1) 开源的Web自动化测试工具
-
- 【2】用途
- 2.1) 对Web系统进行功能性测试,版本迭代时避免重复劳动
- 2.2) 兼容性测试(测试web程序在不同操作系统和不同浏览器中是否运行正常)
- 2.3) 对web系统进行大数量测试
-
- 【3】特点
- 3.1) 可根据指令操控浏览器
- 3.2) 只是工具,必须与第三方浏览器结合使用
-
- 【4】安装
- 4.1) Linux: sudo pip3 install selenium
- 4.2) Windows: python -m pip install selenium
-
- from selenium import webdriver
- options = webdriver.ChromeOptions()
- # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
- options.add_experimental_option('excludeSwitches', ['enable-automation'])
- #停止加载图片
- options.add_experimental_option("prefs", {"profile.managed_default_content_settings.images": 2})
- browser = webdriver.Chrome(options=options)
- browser.get('https://www.taobao.com/')
-
selenium能够帮助我们处理页面中的cookie,比如获取、删除,接下来我们就学习这部分知识
driver.get_cookies()返回列表,其中包含的是完整的cookie信息!不光有name、value,还有domain等cookie其他维度的信息。所以如果想要把获取的cookie信息和requests模块配合使用的话,需要转换为name、value作为键值对的cookie字典
- # 获取当前标签页的全部cookie信息
- print(driver.get_cookies())
- # 把cookie转化为字典
- cookies_dict = {cookie[‘name’]: cookie[‘value’] for cookie in driver.get_cookies()}
-
- #删除一条cookie
- driver.delete_cookie("CookieName")
-
- # 删除所有的cookie
- driver.delete_all_cookies()
-
selenium控制浏览器也是可以使用代理ip的!
- from selenium import webdriver
-
- options = webdriver.ChromeOptions() # 创建一个配置对象
- options.add_argument('--proxy-server=http://202.20.16.82:9527') # 使用代理ip
-
- driver = webdriver.Chrome(chrome_options=options) # 实例化带有配置的driver对象
-
- driver.get('http://www.itcast.cn')
- print(driver.title)
- driver.quit()
-
selenium控制谷歌浏览器时,User-Agent默认是谷歌浏览器的,这一小节我们就来学习使用不同的User-Agent
- from selenium import webdriver
-
- options = webdriver.ChromeOptions() # 创建一个配置对象
- options.add_argument('--user-agent=Mozilla/5.0 HAHA') # 替换User-Agent
-
- driver = webdriver.Chrome('./chromedriver', chrome_options=options)
-
- driver.get('http://www.itcast.cn')
- print(driver.title)
- driver.quit()
-
- from selenium import webdriver
- chrome_options = webdriver.ChromeOptions()
- chrome_options.add_experimental_option('debuggerAddress','127.0.0.1:9222')
- browser=webdriver.Chrome(executable_path=r'C:\Users\TR\AppData\Local\Google\Chrome
- \Application\chromedriver.exe',chrome_options=chrome_options)
- browser.get('http://www.zhihu.com')
-
- 【1】定义
- phantomjs为无界面浏览器(又称无头浏览器),在内存中进行页面加载,高效
-
- 【2】下载地址
- 2.1) chromedriver : 下载对应版本
- http://npm.taobao.org/mirrors/chromedriver/
-
- 2.2) geckodriver
- https://github.com/mozilla/geckodriver/releases
-
- 2.3) phantomjs
- https://phantomjs.org/download.html
-
- 【3】Ubuntu安装
- 3.1) 下载后解压 : tar -zxvf geckodriver.tar.gz
-
- 3.2) 拷贝解压后文件到 /usr/bin/ (添加环境变量)
- sudo cp geckodriver /usr/bin/
-
- 3.3) 添加可执行权限
- sudo chmod 777 /usr/bin/geckodriver
-
- 【4】Windows安装
- 4.1) 下载对应版本的phantomjs、chromedriver、geckodriver
- 4.2) 把chromedriver.exe拷贝到python安装目录的Scripts目录下(添加到系统环境变量)
- # 查看python安装路径: where python
- 4.3) 验证
- cmd命令行: chromedriver
-
- ***************************总结**************************************
- 【1】解压 - 放到用户主目录(chromedriver、geckodriver、phantomjs)
- 【2】拷贝 - sudo cp /home/tarena/chromedriver /usr/bin/
- 【3】权限 - sudo chmod 777 /usr/bin/chromedriver
-
- # 验证
- 【Ubuntu | Windows】
- ipython3
- from selenium import webdriver
- webdriver.Chrome()
- 或者
- webdriver.Firefox()
-
- 【mac】
- ipython3
- from selenium import webdriver
- webdriver.Chrome(executable_path='/Users/xxx/chromedriver')
- 或者
- webdriver.Firefox(executable_path='/User/xxx/geckodriver')
-
- """示例代码一:使用 selenium+浏览器 打开百度"""
-
- # 导入seleinum的webdriver接口
- from selenium import webdriver
- import time
-
- # 创建浏览器对象
- browser = webdriver.Chrome()
- browser.get('http://www.baidu.com/')
- # 5秒钟后关闭浏览器
- time.sleep(5)
- browser.quit()
-
- """示例代码二:打开百度,搜索赵丽颖,点击搜索,查看"""
-
- from selenium import webdriver
- import time
-
- # 1.创建浏览器对象 - 已经打开了浏览器
- browser = webdriver.Chrome()
- # 2.输入: http://www.baidu.com/
- browser.get('http://www.baidu.com/')
- # 3.找到搜索框,向这个节点发送文字: 赵丽颖
- browser.find_element_by_xpath('//*[@id="kw"]').send_keys('赵丽颖')
- # 4.找到 百度一下 按钮,点击一下
- browser.find_element_by_xpath('//*[@id="su"]').click()
-
- 【1】browser.get(url=url) - 地址栏输入url地址并确认
- 【2】browser.quit() - 关闭浏览器
- 【3】browser.close() - 关闭当前页
- 【4】browser.page_source - HTML结构源码
- 【5】browser.page_source.find('字符串')
- 从html源码中搜索指定字符串,没有找到返回:-1,经常用于判断是否为最后一页
- 【6】browser.maximize_window() - 浏览器窗口最大化
-
- 【1】单元素查找('结果为1个节点对象')
- 1.1) 【最常用】browser.find_element_by_id('id属性值')
- 1.2) 【最常用】browser.find_element_by_name('name属性值')
- 1.3) 【最常用】browser.find_element_by_class_name('class属性值')
- 1.4) 【最万能】browser.find_element_by_xpath('xpath表达式')
- 1.5) 【匹配a节点时常用】browser.find_element_by_link_text('链接文本')
- 1.6) 【匹配a节点时常用】browser.find_element_by_partical_link_text('部分链接文本')
- 1.7) 【最没用】browser.find_element_by_tag_name('标记名称')
- 1.8) 【较常用】browser.find_element_by_css_selector('css表达式')
-
- 【2】多元素查找('结果为[节点对象列表]')
- 2.1) browser.find_elements_by_id('id属性值')
- 2.2) browser.find_elements_by_name('name属性值')
- 2.3) browser.find_elements_by_class_name('class属性值')
- 2.4) browser.find_elements_by_xpath('xpath表达式')
- 2.5) browser.find_elements_by_link_text('链接文本')
- 2.6) browser.find_elements_by_partical_link_text('部分链接文本')
- 2.7) browser.find_elements_by_tag_name('标记名称')
- 2.8) browser.find_elements_by_css_selector('css表达式')
-
- from selenium import webdriver
- import time
-
- url = 'https://maoyan.com/board/4'
- browser = webdriver.Chrome()
- browser.get(url)
-
- def get_data():
- # 基准xpath: [<selenium xxx li at xxx>,<selenium xxx li at>]
- li_list = browser.find_elements_by_xpath('//*[@id="app"]/div/div/div[1]/dl/dd')
- for li in li_list:
- item = {}
- # info_list: ['1', '霸王别姬', '主演:张国荣', '上映时间:1993-01-01', '9.5']
- info_list = li.text.split('\n')
- item['number'] = info_list[0]
- item['name'] = info_list[1]
- item['star'] = info_list[2]
- item['time'] = info_list[3]
- item['score'] = info_list[4]
-
- print(item)
-
- while True:
- get_data()
- try:
- browser.find_element_by_link_text('下一页').click()
- time.sleep(2)
- except Exception as e:
- print('恭喜你!抓取结束')
- browser.quit()
- break
-
- 【1】文本框操作
- 1.1) node.send_keys('') - 向文本框发送内容
- 1.2) node.clear() - 清空文本
- 1.3) node.get_attribute('value') - 获取文本内容
-
- 【2】按钮操作
- 1.1) node.click() - 点击
- 1.2) node.is_enabled() - 判断按钮是否可用
- 1.3) node.get_attribute('value') - 获取按钮文本
-
- from selenium import webdriver
-
- options = webdriver.ChromeOptions()
- # 添加无界面参数
- options.add_argument('--headless')
- browser = webdriver.Chrome(options=options)
-
- from selenium import webdriver
- # 导入鼠标事件类
- from selenium.webdriver import ActionChains
-
- driver = webdriver.Chrome()
- driver.get('http://www.baidu.com/')
-
- # 移动到 设置,perform()是真正执行操作,必须有
- element = driver.find_element_by_xpath('//*[@id="u1"]/a[8]')
- ActionChains(driver).move_to_element(element).perform()
-
- # 单击,弹出的Ajax元素,根据链接节点的文本内容查找
- driver.find_element_by_link_text('高级搜索').click()
-
- 【1】鼠标操作
- from selenium.webdriver import ActionChains
- ActionChains(browser).move_to_element(node).perform()
-
- 【2】切换句柄
- all_handles = browser.window_handles
- time.sleep(1)
- browser.switch_to.window(all_handles[1])
-
- 【3】iframe子框架
- browser.switch_to.frame(iframe_element)
- # 写法1 - 任何场景都可以:
- iframe_node = browser.find_element_by_xpath('')
- browser.switch_to.frame(iframe_node)
-
- # 写法2 - 默认支持 id 和 name 两个属性值:
- browser.switch_to.frame('id属性值|name属性值')
-