# Scrape book information from a foreign book site with two parsers (bs4 and XPath)
# and save it to MongoDB, Redis and MySQL.
# Target: http://www.allitebooks.org/page/1/ -- author, title, cover-image URL
import requests
import json
from lxml import etree
from bs4 import BeautifulSoup
import pymongo
import redis
import pymysql
class AllBooks(object):
def __init__(self):
self.base_url = "http://www.allitebooks.org/page/{}"
self.headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
        # list of dicts holding the final book records
self.data_list = []
    # 1. Build all page URLs
def get_all_url(self,total_page):
url_list = []
for i in range(1,total_page+1):
url = self.base_url.format(i)
url_list.append(url)
return url_list
    # 2. Send the request
def send_request(self,url):
# print("处理地址:%s" % url)
response = requests.get(url,headers=self.headers)
data = response.content.decode('utf-8')
        # replace non-breaking spaces (\xa0) with normal spaces
        return data.replace("\xa0", " ")
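    # Note: for a long crawl it may be safer to pass a timeout and fail fast on
    # bad status codes -- a sketch of the standard requests calls, not what the
    # method above currently does:
    #     response = requests.get(url, headers=self.headers, timeout=10)
    #     response.raise_for_status()  # raises requests.HTTPError on 4xx/5xx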
    # 3. Parse the data with XPath
def parse_xpath_data(self,data):
html = etree.HTML(data)
        # 1. Grab all the books on the page
all_book_list = html.xpath('//div[@class="main-content-inner clearfix"]/article')
for book in all_book_list:
book_dic = {}
            # title
book_dic['book_name'] = book.xpath('.//h2[@class="entry-title"]//text()')[0]
            # cover-image URL
book_dic['image_url'] = book.xpath('.//div/a/img/@src')[0]
            # authors come back as a list, e.g. ['Adam Karneboge', 'Arek Dreyer']
author = book.xpath('.//h5[@class="entry-author"]/a/text()')
book_dic['author'] = ",".join(author)
            # book summary
book_dic['book_info'] = book.xpath('.//div[@class="entry-summary"]/p/text()')[0]
self.data_list.append(book_dic)
    # 3. Parse the data with bs4 (alternative to the XPath parser above)
def parse_bs4_data(self,data):
soup = BeautifulSoup(data,'lxml')
        # grab all the books on the page
all_book_list = soup.select("article")
for book in all_book_list:
book_dic = {}
            # title
book_dic["book_name"] = book.select_one(".entry-title").get_text()
            # cover-image URL
book_dic["image_url"] = book.select_one(".attachment-post-thumbnail").get("src")
            # authors; multiple authors look like "By: Adam Karneboge, Arek Dreyer"
book_dic["author"] = book.select_one(".entry-author").get_text().replace("By: ","")
            # summary
book_dic["book_info"] = book.select_one(".entry-summary p").get_text()
self.data_list.append(book_dic)
    # 4. Save the data as a JSON file
    def save_data_json(self):
        print("Saving to a JSON file")
        # use a context manager so the file handle is closed, and keep
        # non-ASCII characters readable in the output file
        with open("book_list.json", "w", encoding="utf-8") as f:
            json.dump(self.data_list, f, ensure_ascii=False)
    # 4. Save each fetched page as an HTML file
def save_data_html(self,data,page):
with open("book-"+str(page)+".html","w",encoding='utf-8')as f:
f.write(data)
    # 4. Save the data to MySQL; check for duplicates before inserting
    def save_mysql(self):
        # counters for inserted / skipped rows, initialised before the try
        # block so the summary print at the end works even when the connection fails
        count = 0
        no_count = 0
        try:
            # Option 1: connect with the database and table created in advance
            # self.db = pymysql.connect(host='localhost', port=3306, user='root', passwd='123456', db='python4')
            # Option 2: database and table not created in advance
            self.db = pymysql.connect(host='localhost', port=3306, user='root', passwd='123456')
            # create a cursor object
            self.cursor = self.db.cursor()
            # create the python4 database if it is missing; re-running the CREATE
            # against an existing database raises a warning, so drop it first
            try:
                print("Trying to create database: python4")
                self.cursor.execute('drop database if exists python4')  # drops the whole database -- use with care
                self.cursor.execute('CREATE database if not EXISTS python4 default charset utf8 COLLATE utf8_general_ci;')
            except Exception:
                print("Database already exists, no need to create it!")
            self.cursor.execute("use python4;")
            # create the books table if it is missing; re-running the CREATE
            # against an existing table raises a warning, and since the database
            # was just dropped there is no need to drop the table as well
            try:
                # self.cursor.execute('DROP table IF EXISTS books')
                sql = """
                CREATE TABLE IF not exists books(id int(11) not null auto_increment PRIMARY KEY,
                book_name VARCHAR(200) not null,
                author VARCHAR(200),
                image_url VARCHAR(300),
                book_info VARCHAR(800)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                """
                print("Trying to create the books table")
                self.cursor.execute(sql)
            except Exception:
                print("The books table already exists, no need to create it!")
            # insert the data (the counters were initialised at the top of the method)
for book in self.data_list:
                # duplicate check; parameters are passed as a tuple
                self.cursor.execute(
                    """SELECT book_name FROM books WHERE book_name = %s""", (book['book_name'],))
                # fetchone() returns a tuple such as ('The Art of Immutable Architecture',) on a hit
                repetition = self.cursor.fetchone()
                if repetition:
                    no_count += 1
                else:
                    # keys = book.keys()
                    # values = [book[k] for k in keys]
                    # the zip line below is equivalent to the two lines above
                    keys, values = zip(*book.items())
                    sql = "INSERT INTO {}({}) VALUES ({})".format('books', ','.join(keys), ','.join(['%s'] * len(keys)))
                    self.cursor.execute(sql, values)
                    # the dynamic INSERT above is equivalent to:
                    # self.cursor.execute(
                    #     """INSERT INTO books(book_name, author, image_url, book_info) VALUES(%s, %s, %s, %s)""",
                    #     (book['book_name'], book['author'], book['image_url'], book['book_info']))
count += 1
self.db.commit()
except Exception as e:
print("mysql数据库出错:%s"%e)
# 回滚事务
self.db.rollback()
print("数据库处理完毕,本次共计增加%d条数据,未增加%d条已存在数据,谢谢使用!" % (count,no_count))
    # query the saved book titles from MySQL
    def search_mysql(self):
sql = """
select book_name from books
"""
self.cursor.execute(sql)
result = self.cursor.fetchall()
for book_name in result:
print(book_name[0])
self.cursor.close()
self.db.close()
    # 4. Save the data to Redis; check for duplicates before inserting
def save_redis(self):
        try:
            # connect to Redis
            self.redis_client = redis.StrictRedis(host='localhost', port=6379, db=2, decode_responses=True)
            # 1) set storage: deduplicates automatically, unordered; the dict is
            #    serialised with json.dumps because redis-py cannot store a dict directly
            for book in self.data_list:
                self.redis_client.sadd('bookset:items', json.dumps(book))
            # check whether the set bookset:items already holds this book
            # val_sismember = self.redis_client.sismember('bookset:items', json.dumps(book))
            # if not val_sismember:
            #     self.redis_client.sadd('bookset:items', json.dumps(book))
            # 2) list storage: does not deduplicate, so duplicates are handled by hand
            key_exists = self.redis_client.exists('books:items')
            # count how many records get inserted
            num = 0
            if not key_exists:
                print("books list is empty, inserting directly!")
                for data in self.data_list:
                    self.redis_client.rpush('books:items', json.dumps(data))
                    num += 1
            else:
                # empty the list
                # self.redis_client.delete('books:items')
                # first collect the book titles already stored in Redis
                redis_all_book = self.redis_client.lrange('books:items', 0, -1)
                redis_book_list = []
                for redis_book in redis_all_book:
                    # JSON string back to dict (the old replace("'", '"') trick
                    # broke on titles containing apostrophes)
                    s = json.loads(redis_book)
                    redis_book_list.append(s['book_name'])
                for book in self.data_list:
                    book_name = book['book_name']
                    if book_name not in redis_book_list:
                        print("%s: saving to Redis..." % book_name)
                        self.redis_client.rpush('books:items', json.dumps(book))
                        num += 1
            print("Done, %s records saved to Redis this run!" % num)
except Exception as e:
print("链接redis出错:",e)
    # query the saved book titles from Redis
    def search_redis(self):
        # print the title of every stored book
        # 1) query the set storage
        book_set = self.redis_client.smembers('bookset:items')
        print("Titles from the set storage:")
        for redis_book in book_set:
            bookset_dic = json.loads(redis_book)
            print(bookset_dic['book_name'])
        # 2) query the list storage
        book_list = self.redis_client.lrange('books:items', 0, -1)
        print("Titles from the list storage:")
        for book in book_list:
            booklist_dic = json.loads(book)
            print(booklist_dic['book_name'])
    # 4. Save the data to MongoDB; check for duplicates before inserting
    def save_mongo(self):
        try:
            # connect to MongoDB
            self.client = pymongo.MongoClient('localhost', 27017)
            db_book = self.client['py3']  # pick the database
            self.book = db_book['book']  # pick the collection; created automatically on first insert
            # first check whether the collection holds anything; if it does,
            # check each new record for duplicates before adding it
            # (count_documents / insert_many / insert_one replace the count() /
            # insert() calls that modern pymongo no longer supports)
            count = self.book.count_documents({})
            # count how many records get inserted
            num = 0
            # an empty collection (count == 0) can be filled directly
            if count == 0:
                print("Collection is empty, inserting directly!")
                # note: insert_many adds an '_id' field to each dict in
                # self.data_list in place
                self.book.insert_many(self.data_list)
                num = self.book.count_documents({})
            else:
                # empty the book collection
                # self.book.delete_many({})
                for data in self.data_list:
                    book_name = data['book_name']
                    count = self.book.count_documents({'book_name': book_name})
                    # the title is not in the collection yet, so add it
                    if count == 0:
                        print("%s: saving to MongoDB..." % book_name)
                        self.book.insert_one(data)
                        num += 1
            print("Done, %s records saved to MongoDB this run!" % num)
            # the client is left open because search_mongo() still needs it
        except Exception as e:
            print("MongoDB error:", e)
    # MongoDB queries
    def search_mongo(self):
        # $push gathers the titles per author into a list, so $unwind splits
        # that list back into one document per title
        result = self.book.aggregate([
            {'$group': {
                '_id': '$author',
                'book_names': {'$push': '$book_name'}},
            },
            {'$unwind': '$book_names'}
        ])
        for dic in result:
            # rename the key _id to author in the printed result
            dic['author'] = dic.pop('_id')
            print(dic)
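    # The aggregation above yields one document per (author, title) pair,
    # shaped roughly like {'author': '<author>', 'book_names': '<title>'}
    # (placeholders -- the actual values depend on what was crawled).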
def start(self):
        # first find out how many pages there are
url = self.base_url.format(1)
data_ = self.send_request(url)
soup = BeautifulSoup(data_, 'lxml')
total_page = int(soup.select_one('a[title="Last Page →"]').get_text())
print("总共获取到%s页数据!" % total_page)
        # change this value to limit how many pages are crawled
# total_page = 3
        url_list = self.get_all_url(total_page)
        for page, url in enumerate(url_list, start=1):
            print("Page progress: %s/%s" % (page, total_page))
            data = self.send_request(url)
            self.parse_bs4_data(data)  # parse with bs4 (pick one of the two parsers)
            # self.parse_xpath_data(data)  # parse with XPath (pick one of the two parsers)
            self.save_data_html(data, page)  # save the raw page as HTML
        self.save_data_json()  # save everything into a single JSON file
        # self.save_mongo()  # then save to MongoDB
        # self.search_mongo()  # MongoDB queries
        # self.save_redis()  # then save to Redis
        # self.search_redis()  # Redis queries
        self.save_mysql()  # then save to MySQL
        self.search_mysql()  # MySQL queries
if __name__ == "__main__":
allbooks = AllBooks()
try:
allbooks.start()
except Exception as e:
print("出错了:",e)
# Book site: http://www.allitebooks.org/