应用场景
1、多进程 :CPU密集程序
2、多线程 :爬虫(网络I/O)、本地磁盘I/O
队列
# 导入模块
from queue import Queue
# 使用
q = Queue()
q.put(url)
q.get() # 当队列为空时,阻塞
q.empty() # 判断队列是否为空,True/False
线程模块
# 导入模块
from threading import Thread
# 使用流程
t = Thread(target=函数名) # 创建线程对象
t.start() # 创建并启动线程
t.join() # 阻塞等待回收线程
目标
实现步骤
1、确认是否为动态加载
1、页面局部刷新
2、右键查看网页源代码,搜索关键字未搜到,因此此网站为动态加载网站,需要抓取网络数据包分析
2、F12抓取网络数据包
1、抓取返回json数据的URL地址(Headers中的Request URL)
http://app.mi.com/categotyAllListApi?page={}&categoryId=2&pageSize=30
2、查看并分析查询参数(headers中的Query String Parameters)只有page在变,0 1 2 3 ... ... ,这样我们就可以通过控制page的值拼接多个返回json数据的URL地址
page: 1
categoryId: 2
pageSize: 30
3、将抓取数据保存到csv文件
注意多线程写入的线程锁问题
from threading import Lock
lock = Lock()
lock.acquire()
lock.release()
整体思路
import requests
from threading import Thread
from queue import Queue
import time
from lxml import etree
import csv
from threading import Lock
from fake_useragent import UserAgent
class XiaomiSpider(object):
def __init__(self):
self.url = 'http://app.mi.com/categotyAllListApi?page={}&categoryId={}&pageSize=30'
self.q = Queue() # 存放所有URL地址的队列
self.i = 0
self.id_list = [] # 存放所有类型id的空列表
# 打开文件
self.f = open('xiaomi.csv', 'a', newline="")
self.writer = csv.writer(self.f)
self.lock = Lock() # 创建锁
self.ua = UserAgent()
def get_cateid(self):
# 请求
url = 'http://app.mi.com/'
headers = {'User-Agent': self.ua.random}
html = requests.get(url=url, headers=headers).text
# 解析
parse_html = etree.HTML(html)
li_list = parse_html.xpath('//ul[@class="category-list"]/li')
for li in li_list:
typ_name = li.xpath('./a/text()')[0]
typ_id = li.xpath('./a/@href')[0].split('/')[-1]
pages = self.get_pages(typ_id) # 计算每个类型的页数
self.id_list.append((typ_id, pages))
self.url_in() # 入队列
# 获取counts的值并计算页数
def get_pages(self, typ_id):
# 每页返回的json数据中,都有count这个key
url = self.url.format(0, typ_id)
html = requests.get(url=url, headers={'User-Agent': self.ua.random}).json()
count = html['count'] # 类别中的数据总数
pages = int(count) // 30 + 1 # 每页30个,看有多少页
return pages
# url入队列
def url_in(self):
for id in self.id_list:
# id为元组,(typ_id, pages)-->('2',pages)
for page in range(2):
url = self.url.format(page, id[0])
print(url)
# 把URL地址入队列
self.q.put(url)
# 线程事件函数: get() - 请求 - 解析 - 处理数据
def get_data(self):
while True:
# 当队列不为空时,获取url地址
if not self.q.empty():
url = self.q.get()
headers = {'User-Agent': self.ua.random}
html = requests.get(url=url, headers=headers).json()
self.parse_html(html)
else:
break
# 解析函数
def parse_html(self, html):
# 存放1页的数据 - 写入到csv文件
app_list = []
for app in html['data']:
# 应用名称 + 链接 + 分类
name = app['displayName']
link = 'http://app.mi.com/details?id=' + app['packageName']
typ_name = app['level1CategoryName']
# 把每一条数据放到app_list中,目的为了 writerows()
app_list.append([name, typ_name, link])
print(name, typ_name)
self.i += 1
# 开始写入1页数据 - app_list
self.lock.acquire()
self.writer.writerows(app_list)
self.lock.release()
# 主函数
def main(self):
self.get_cateid() # URL入队列
t_list = []
# 创建多个线程
for i in range(1):
t = Thread(target=self.get_data)
t_list.append(t)
t.start()
# 统一回收线程
for t in t_list:
t.join()
# 关闭文件
self.f.close()
print('数量:', self.i)
if __name__ == '__main__':
start = time.time()
spider = XiaomiSpider()
spider.main()
end = time.time()
print('执行时间:%.2f' % (end - start))
确定URL地址及目标
要求与分析
一级页面json地址(pageIndex在变,timestamp未检查)
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn
二级页面地址(postId在变,在一级页面中可拿到)
https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn
useragents.py文件
ua_list = [
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1',
'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0',
'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; InfoPath.3)',
]
我们先来回忆一下原来的腾讯招聘爬虫代码
import time
import json
import random
import requests
from useragents import ua_list
class TencentSpider(object):
def __init__(self):
self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
self.f = open('tencent.json', 'a') # 打开文件
self.item_list = [] # 存放抓取的item字典数据
# 获取响应内容函数
def get_page(self, url):
headers = {'User-Agent': random.choice(ua_list)}
html = requests.get(url=url, headers=headers).text
html = json.loads(html) # json格式字符串转为Python数据类型
return html
# 主线函数: 获取所有数据
def parse_page(self, one_url):
html = self.get_page(one_url)
item = {}
for job in html['Data']['Posts']:
item['name'] = job['RecruitPostName'] # 名称
post_id = job['PostId'] # postId,拿postid为了拼接二级页面地址
# 拼接二级地址,获取职责和要求
two_url = self.two_url.format(post_id)
item['duty'], item['require'] = self.parse_two_page(two_url)
print(item)
self.item_list.append(item) # 添加到大列表中
# 解析二级页面函数
def parse_two_page(self, two_url):
html = self.get_page(two_url)
duty = html['Data']['Responsibility'] # 工作责任
duty = duty.replace('\r\n', '').replace('\n', '') # 去掉换行
require = html['Data']['Requirement'] # 工作要求
require = require.replace('\r\n', '').replace('\n', '') # 去掉换行
return duty, require
# 获取总页数
def get_numbers(self):
url = self.one_url.format(1)
html = self.get_page(url)
numbers = int(html['Data']['Count']) // 10 + 1 # 每页有10个推荐
return numbers
def main(self):
number = self.get_numbers()
for page in range(1, 3):
one_url = self.one_url.format(page)
self.parse_page(one_url)
# 保存到本地json文件:json.dump
json.dump(self.item_list, self.f, ensure_ascii=False)
self.f.close()
if __name__ == '__main__':
start = time.time()
spider = TencentSpider()
spider.main()
end = time.time()
print('执行时间:%.2f' % (end - start))
多线程即把所有一级页面链接提交到队列,进行多线程数据抓取
代码实现
import requests
import json
import time
import random
from useragents import ua_list
from threading import Thread
from queue import Queue
class TencentSpider(object):
def __init__(self):
self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
self.q = Queue()
self.i = 0 # 计数
# 获取响应内容函数
def get_page(self, url):
headers = {'User-Agent': random.choice(ua_list)}
html = requests.get(url=url, headers=headers).text
# json.loads()把json格式的字符串转为python数据类型
html = json.loads(html)
return html
# 主线函数: 获取所有数据
def parse_page(self):
while True:
if not self.q.empty():
one_url = self.q.get()
html = self.get_page(one_url)
item = {}
for job in html['Data']['Posts']:
item['name'] = job['RecruitPostName'] # 名称
post_id = job['PostId'] # 拿postid为了拼接二级页面地址
# 拼接二级地址,获取职责和要求
two_url = self.two_url.format(post_id)
item['duty'], item['require'] = self.parse_two_page(two_url)
print(item)
# 每爬取按完成1页随机休眠
time.sleep(random.uniform(0, 1))
else:
break
# 解析二级页面函数
def parse_two_page(self, two_url):
html = self.get_page(two_url)
# 用replace处理一下特殊字符
duty = html['Data']['Responsibility']
duty = duty.replace('\r\n', '').replace('\n', '')
# 处理要求
require = html['Data']['Requirement']
require = require.replace('\r\n', '').replace('\n', '')
return duty, require
# 获取总页数
def get_numbers(self):
url = self.one_url.format(1)
html = self.get_page(url)
numbers = int(html['Data']['Count']) // 10 + 1
return numbers
def main(self):
# one_url入队列
number = self.get_numbers()
for page in range(1, number + 1):
one_url = self.one_url.format(page)
self.q.put(one_url)
t_list = []
for i in range(5):
t = Thread(target=self.parse_page)
t_list.append(t)
t.start()
for t in t_list:
t.join()
print('数量:', self.i)
if __name__ == '__main__':
start = time.time()
spider = TencentSpider()
spider.main()
end = time.time()
print('执行时间:%.2f' % (end - start))
import requests
import json
import time
import random
from useragents import ua_list
from multiprocessing import Process
from queue import Queue
class TencentSpider(object):
def __init__(self):
self.one_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1563912271089&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn'
self.two_url = 'https://careers.tencent.com/tencentcareer/api/post/ByPostId?timestamp=1563912374645&postId={}&language=zh-cn'
self.q = Queue()
# 获取响应内容函数
def get_page(self, url):
headers = {'User-Agent': random.choice(ua_list)}
html = requests.get(url=url, headers=headers).text
# json格式字符串 -> Python
html = json.loads(html)
return html
# 主线函数: 获取所有数据
def parse_page(self):
while True:
if not self.q.empty():
one_url = self.q.get()
html = self.get_page(one_url)
item = {}
for job in html['Data']['Posts']:
# 名称
item['name'] = job['RecruitPostName']
# postId
post_id = job['PostId']
# 拼接二级地址,获取职责和要求
two_url = self.two_url.format(post_id)
item['duty'], item['require'] = self.parse_two_page(two_url)
print(item)
else:
break
# 解析二级页面函数
def parse_two_page(self, two_url):
html = self.get_page(two_url)
# 用replace处理一下特殊字符
duty = html['Data']['Responsibility']
duty = duty.replace('\r\n', '').replace('\n', '')
# 处理要求
require = html['Data']['Requirement']
require = require.replace('\r\n', '').replace('\n', '')
return duty, require
# 获取总页数
def get_numbers(self):
url = self.one_url.format(1)
html = self.get_page(url)
numbers = int(html['Data']['Count']) // 10 + 1
return numbers
def main(self):
# url入队列
number = self.get_numbers()
for page in range(1, number + 1):
one_url = self.one_url.format(page)
self.q.put(one_url)
t_list = []
for i in range(4):
t = Process(target=self.parse_page)
t_list.append(t)
t.start()
for t in t_list:
t.join()
if __name__ == '__main__':
start = time.time()
spider = TencentSpider()
spider.main()
end = time.time()
print('执行时间:%.2f' % (end - start))