2025年3月14日 星期五 甲辰(龙)年 月十三 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

采用bs4和xpath两种方法抓取国外图书网站关于书的信息,保存到MongoDB、Redis、MySQL数据库

时间:12-07来源:作者:点击数:18

采用bs4和xpath两种方法抓取国外图书网站关于书的信息,保存到MongoDB、Redis、MySQL数据库

  • # 抓取http://www.allitebooks.org/page/1/ 作者、书名、封面图链接
  • import requests
  • import json
  • from lxml import etree
  • from bs4 import BeautifulSoup
  • import pymongo
  • import redis
  • import pymysql
  • class AllBooks(object):
  • def __init__(self):
  • self.base_url = "http://www.allitebooks.org/page/{}"
  • self.headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
  • # 列表嵌套字典,保存最后书籍信息
  • self.data_list = []
  • # 1.构建所有url
  • def get_all_url(self,total_page):
  • url_list = []
  • for i in range(1,total_page+1):
  • url = self.base_url.format(i)
  • url_list.append(url)
  • return url_list
  • # 2.发送请求
  • def send_request(self,url):
  • # print("处理地址:%s" % url)
  • response = requests.get(url,headers=self.headers)
  • data = response.content.decode('utf-8')
  • return data.replace("\xa0"," ")
  • # 3.用xpath解析数据
  • def parse_xpath_data(self,data):
  • html = etree.HTML(data)
  • # 1.取出所有书
  • all_book_list = html.xpath('//div[@class="main-content-inner clearfix"]/article')
  • for book in all_book_list:
  • book_dic = {}
  • # 书名
  • book_dic['book_name'] = book.xpath('.//h2[@class="entry-title"]//text()')[0]
  • # 封面图链接地址
  • book_dic['image_url'] = book.xpath('.//div/a/img/@src')[0]
  • # 作者,取出是list,有多作者情况['Adam Karneboge', 'Arek Dreyer']
  • author = book.xpath('.//h5[@class="entry-author"]/a/text()')
  • book_dic['author'] = ",".join(author)
  • # 书的简介
  • book_dic['book_info'] = book.xpath('.//div[@class="entry-summary"]/p/text()')[0]
  • self.data_list.append(book_dic)
  • # 3.用bs4解析数据
  • def parse_bs4_data(self,data):
  • soup = BeautifulSoup(data,'lxml')
  • # 取出所有书
  • all_book_list = soup.select("article")
  • for book in all_book_list:
  • book_dic = {}
  • # 书名
  • book_dic["book_name"] = book.select_one(".entry-title").get_text()
  • # 封面图链接地址
  • book_dic["image_url"] = book.select_one(".attachment-post-thumbnail").get("src")
  • # 作者 有多名作者的情况 By: Adam Karneboge, Arek Dreyer
  • book_dic["author"] = book.select_one(".entry-author").get_text().replace("By: ","")
  • # 简介
  • book_dic["book_info"] = book.select_one(".entry-summary p").get_text()
  • self.data_list.append(book_dic)
  • # 4.保存数据为json
  • def save_data_json(self):
  • print("保存成json文件")
  • json.dump(self.data_list, open("book_list.json", "w"))
  • # 4.保存数据为网页
  • def save_data_html(self,data,page):
  • with open("book-"+str(page)+".html","w",encoding='utf-8')as f:
  • f.write(data)
  • # 4.保存数据到mysql数据库,要先查询有没有这条数据
  • def save_mysql(self):
  • try:
  • # 一、连接mysql数据库,提前创建好数据库和表
  • # self.db = pymysql.connect(host='localhost',port=3306,user='root',passwd='123456',db='python4')
  • # 二、未提前创建数据库和表情况
  • self.db = pymysql.connect(host='localhost',port=3306,user='root',passwd='123456')
  • # 创建游标对象
  • self.cursor = self.db.cursor()
  • # 如果没有则创建python3数据库,如果已经创建好再次执行会有warning警告,所以可以先删除
  • try:
  • print("尝试创建数据库:python4")
  • self.cursor.execute('drop database if exists python4') # 移除数据库,慎用
  • self.cursor.execute('CREATE database if not EXISTS python4 default charset utf8 COLLATE utf8_general_ci;')
  • except:
  • print("此数据库已存在无需创建!")
  • self.cursor.execute("use python4;")
  • # 如果没有则创建book表,如果已经创建好再次执行会有warning警告,如果删掉库就没必要再执行删除表操作
  • try:
  • # self.cursor.execute('DROP table IF EXISTS books')
  • sql = """
  • CREATE TABLE IF not exists books(id int(11) not null auto_increment PRIMARY KEY,
  • book_name VARCHAR(200) not null,
  • author VARCHAR(200),
  • image_url VARCHAR(300),
  • book_info VARCHAR(800))ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • """
  • print("尝试创建books表")
  • self.cursor.execute(sql)
  • except:
  • print("books表已存在,无需创建!")
  • # 记录已增加的条数
  • count = 0
  • # 记录未增加的条数
  • no_count = 0
  • # 插入数据
  • for book in self.data_list:
  • # 查重处理
  • self.cursor.execute(
  • """SELECT book_name FROM books WHERE book_name = %s""", book['book_name'])
  • # 是否有重复数据:元组类型('The Art of Immutable Architecture',)
  • repetition = self.cursor.fetchone()
  • if repetition:
  • no_count += 1
  • else:
  • # keys = book.keys()
  • # values = [book[k] for k in keys]
  • # 以上类似于下面zip这一行
  • keys,values = zip(*book.items())
  • sql = "INSERT INTO {}({})VALUES ({})".format('books',','.join(keys),','.join(['%s']*len(keys)))
  • self.cursor.execute(sql,values)
  • # 以上等同于下面
  • # self.cursor.execute(
  • # """INSERT INTO books(book_name, author, image_url, book_info)VALUES(%s, %s, %s, %s)""",
  • # (book['book_name'], book['author'], book['image_url'], book['book_info']))
  • count += 1
  • self.db.commit()
  • except Exception as e:
  • print("mysql数据库出错:%s"%e)
  • # 回滚事务
  • self.db.rollback()
  • print("数据库处理完毕,本次共计增加%d条数据,未增加%d条已存在数据,谢谢使用!" % (count,no_count))
  • def search_mysql(self):
  • sql = """
  • select book_name from books
  • """
  • self.cursor.execute(sql)
  • result = self.cursor.fetchall()
  • for book_name in result:
  • print(book_name[0])
  • self.cursor.close()
  • self.db.close()
  • # 4.保存数据到redis数据库,要先查询有没有这条数据
  • def save_redis(self):
  • try:
  • # 链接redis数据库
  • self.redis_client = redis.StrictRedis(host='localhost',port=6379,db=2,decode_responses=True)
  • # ①set类型存储,会自动去重,无序存储
  • for book in self.data_list:
  • self.redis_client.sadd('bookset:items', book)
  • # 判断集合bookset:items是否存在值book
  • # val_sismember = self.redis_client.sismember('bookset:items',book)
  • # if not val_sismember:
  • # self.redis_client.sadd('bookset:items',book)
  • # ②list类型存储,不会去重,需要手动处理
  • key_exists = self.redis_client.exists('books:items')
  • # 记录插入多少条数据
  • num = 0
  • if not key_exists:
  • print("books数据库无数据,直接插入数据!")
  • for data in self.data_list:
  • self.redis_client.rpush('books:items',data)
  • num += 1
  • else:
  • # 清空库
  • # self.redis_client.delete('books:items')
  • # 先获取redis数据库里面的书籍名称
  • redis_all_book = self.redis_client.lrange('books:items', 0, -1)
  • redis_book_list = []
  • for redis_book in redis_all_book:
  • # str转成字典
  • s = json.loads(redis_book.replace("'", '"'))
  • redis_book_list.append(s['book_name'])
  • for book in self.data_list:
  • book_name = book['book_name']
  • if book_name not in redis_book_list:
  • print("%s:保存数据库中..." % book_name)
  • self.redis_client.rpush('books:items', book)
  • num += 1
  • print("处理完成,本次共保存到redis数据库:%s条数据!" % num)
  • except Exception as e:
  • print("链接redis出错:",e)
  • def search_redis(self):
  • # 查询所有插入的书名数据
  • # ①set存储类型查询
  • book_set = self.redis_client.smembers('bookset:items')
  • print("set存储类型查询书名!")
  • for redis_book in book_set:
  • bookset_dic = json.loads(redis_book.replace("'", '"'))
  • print(bookset_dic['book_name'])
  • # ②list存储类型查询
  • book_list = self.redis_client.lrange('books:items',0,-1)
  • print("list存储类型查询书名!")
  • for book in book_list:
  • booklist_dic = json.loads(book.replace("'", '"'))
  • print(booklist_dic['book_name'])
  • # 4.保存数据到MongoDB数据库,要先查询有没有这条数据
  • def save_mongo(self):
  • try:
  • # 连接mongodb数据库
  • self.client = pymongo.MongoClient('localhost',27017)
  • db_book = self.client['py3'] # 选择数据库
  • self.book = db_book['book'] # 选择集合,如果没有,插入数据的时候会自动创建
  • # 先判断数据库里有没有值,如果有值,再去判断即将添加的值是否存在
  • count = self.book.find().count()
  • # 记录插入多少条数据
  • num = 0
  • # 如果是空的库count=0,则直接添加数据
  • if count == 0:
  • print("数据库无数据,直接插入数据!")
  • self.book.insert(self.data_list)
  • num = self.book.find().count()
  • else:
  • # 清空book数据库
  • # self.book.remove({})
  • for data in self.data_list:
  • book_name = data['book_name']
  • count = self.book.find({'book_name': book_name}).count()
  • # 从库里没有查到书名,则添加
  • if count == 0:
  • print("%s:保存数据库中..." % book_name)
  • self.book.insert(data)
  • num += 1
  • print("已完成,本次共保存到MongoDB数据库:%s条数据!" % num)
  • self.client.close()
  • except Exception as e:
  • print("MongoDB数据库出错:",e)
  • # mongodb数据库相关查询
  • def search_mongo(self):
  • # 书名的值是一个列表,所以用unwind拆分成字符串
  • result = self.book.aggregate([
  • {'$group': {
  • '_id': '$author',
  • '书名': {'$push': '$book_name'}},
  • },
  • {'$unwind': '$书名'}
  • ])
  • for dic in result:
  • # 修改打印结果的key值_id为作者
  • dic['作者'] = dic.pop('_id')
  • print(dic)
  • def start(self):
  • # 先获取总共多少页
  • url = self.base_url.format(1)
  • data_ = self.send_request(url)
  • soup = BeautifulSoup(data_, 'lxml')
  • total_page = int(soup.select_one('a[title="Last Page →"]').get_text())
  • print("总共获取到%s页数据!" % total_page)
  • # 可以修改这个值来控制多少页
  • # total_page = 3
  • url_list = self.get_all_url(total_page)
  • length = len(url_list)+1
  • for page in range(1,length):
  • print("处理页数进度:%s/%s"%(page,total_page))
  • data = self.send_request(url_list[page-1])
  • self.parse_bs4_data(data)# 用bs4解析数据(二者选其一)
  • # self.parse_xpath_data(data) # 用xpath解析数据(二者选其一)
  • self.save_data_html(data, page) # 保存成网页形式
  • self.save_data_json() # 统一保存成一个json文件
  • # self.save_mongo() # 最后再保存到MongoDB数据库
  • # self.search_mongo() # 数据库相关查询
  • # self.save_redis() # 最后再保存到redis数据库
  • # self.search_redis() # redis数据库相关查询
  • self.save_mysql() # 最后再保存到redis数据库
  • self.search_mysql() # mysql数据库相关查询
  • if __name__ == "__main__":
  • allbooks = AllBooks()
  • try:
  • allbooks.start()
  • except Exception as e:
  • print("出错了:",e)

图书网址:http://www.allitebooks.org/

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐