您当前的位置:首页 > 计算机 > 编程开发 > Python

采用bs4和xpath两种方法抓取国外图书网站关于书的信息,保存到MongoDB、Redis、MySQL数据库

时间:12-07来源:作者:点击数:

采用bs4和xpath两种方法抓取国外图书网站关于书的信息,保存到MongoDB、Redis、MySQL数据库

# 抓取http://www.allitebooks.org/page/1/ 作者、书名、封面图链接
import requests
import json
from lxml import etree
from bs4 import BeautifulSoup
import pymongo
import redis
import pymysql

class AllBooks(object):
    """Scrape book listings (title, author, cover image URL, summary) from
    http://www.allitebooks.org/ and save them to JSON/HTML files and to
    MySQL / Redis / MongoDB databases."""

    def __init__(self):
        # Listing-page URL template; the page number fills the placeholder.
        self.base_url = "http://www.allitebooks.org/page/{}"
        self.headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
        # List of per-book dicts collected by the parse_* methods.
        self.data_list = []

    # 1. Build every listing-page URL.
    def get_all_url(self, total_page):
        """Return listing URLs for pages 1..total_page (inclusive)."""
        return [self.base_url.format(i) for i in range(1, total_page + 1)]

    # 2. Fetch one page.
    def send_request(self, url):
        """GET *url* and return its HTML decoded as UTF-8, with
        non-breaking spaces (\\xa0) normalized to plain spaces."""
        # print("处理地址:%s" % url)
        response = requests.get(url, headers=self.headers)
        data = response.content.decode('utf-8')
        return data.replace("\xa0", " ")

    # 3. Parse one listing page with lxml/XPath.
    def parse_xpath_data(self, data):
        """Extract every book on the page and append it to self.data_list."""
        html = etree.HTML(data)
        # Each <article> under the main content area is one book.
        all_book_list = html.xpath('//div[@class="main-content-inner clearfix"]/article')
        for book in all_book_list:
            book_dic = {}
            # Title
            book_dic['book_name'] = book.xpath('.//h2[@class="entry-title"]//text()')[0]
            # Cover image URL
            book_dic['image_url'] = book.xpath('.//div/a/img/@src')[0]
            # Authors come back as a list, e.g. ['Adam Karneboge', 'Arek Dreyer']
            author = book.xpath('.//h5[@class="entry-author"]/a/text()')
            book_dic['author'] = ",".join(author)
            # Short description
            book_dic['book_info'] = book.xpath('.//div[@class="entry-summary"]/p/text()')[0]
            self.data_list.append(book_dic)

    # 3. Parse one listing page with BeautifulSoup (alternative to XPath).
    def parse_bs4_data(self, data):
        """Extract every book on the page and append it to self.data_list."""
        soup = BeautifulSoup(data, 'lxml')
        all_book_list = soup.select("article")
        for book in all_book_list:
            book_dic = {}
            # Title
            book_dic["book_name"] = book.select_one(".entry-title").get_text()
            # Cover image URL
            book_dic["image_url"] = book.select_one(".attachment-post-thumbnail").get("src")
            # Authors: "By: Adam Karneboge, Arek Dreyer" -> strip the prefix
            book_dic["author"] = book.select_one(".entry-author").get_text().replace("By: ", "")
            # Short description
            book_dic["book_info"] = book.select_one(".entry-summary p").get_text()
            self.data_list.append(book_dic)

    # 4. Save the collected data as one JSON file.
    def save_data_json(self):
        print("保存成json文件")
        # 'with' closes the file handle; the original leaked it by passing
        # open(...) directly into json.dump.
        with open("book_list.json", "w") as f:
            json.dump(self.data_list, f)

    # 4. Save the raw page HTML to disk.
    def save_data_html(self, data, page):
        with open("book-" + str(page) + ".html", "w", encoding='utf-8') as f:
            f.write(data)

    # 4. Save to MySQL, checking for an existing row before each insert.
    def save_mysql(self):
        # Counters are defined before the try-block so the summary print at
        # the bottom cannot raise NameError when the connection fails (the
        # original defined them inside the try).
        count = 0     # rows inserted this run
        no_count = 0  # rows skipped as duplicates
        self.db = None
        try:
            # Connect without selecting a database; database and table are
            # created below if missing. (To use a pre-created database pass
            # db='python4' here instead.)
            self.db = pymysql.connect(host='localhost', port=3306, user='root', passwd='123456')
            self.cursor = self.db.cursor()
            try:
                print("尝试创建数据库:python4")
                # NOTE(review): the original executed
                # "drop database if exists python4" here, which wiped all
                # data on every run and made the duplicate check below
                # pointless; the drop has been removed.
                self.cursor.execute('CREATE database if not EXISTS python4 default charset utf8 COLLATE utf8_general_ci;')
            except pymysql.MySQLError:
                print("此数据库已存在无需创建!")
            self.cursor.execute("use python4;")
            try:
                # self.cursor.execute('DROP table IF EXISTS books')
                sql = """
                CREATE TABLE IF not exists books(id int(11) not null auto_increment PRIMARY KEY,
                book_name VARCHAR(200) not null,
                author VARCHAR(200),
                image_url VARCHAR(300),
                book_info VARCHAR(800))ENGINE=InnoDB DEFAULT CHARSET=utf8;
                """
                print("尝试创建books表")
                self.cursor.execute(sql)
            except pymysql.MySQLError:
                print("books表已存在,无需创建!")

            for book in self.data_list:
                # Duplicate check; parameters are passed as a tuple, the
                # form the driver documents (a bare str works in pymysql
                # but is not portable).
                self.cursor.execute(
                    """SELECT book_name FROM books WHERE book_name = %s""", (book['book_name'],))
                # fetchone() returns e.g. ('The Art of Immutable Architecture',)
                # for a duplicate, or None when the name is new.
                if self.cursor.fetchone():
                    no_count += 1
                else:
                    # Build column and placeholder lists from the dict keys;
                    # the values stay parameterized, so no SQL injection risk.
                    keys, values = zip(*book.items())
                    sql = "INSERT INTO {}({})VALUES ({})".format('books', ','.join(keys), ','.join(['%s'] * len(keys)))
                    self.cursor.execute(sql, values)
                    count += 1
            self.db.commit()
        except Exception as e:
            print("mysql数据库出错:%s" % e)
            # Roll back only if a connection was actually established.
            if self.db is not None:
                self.db.rollback()
        print("数据库处理完毕,本次共计增加%d条数据,未增加%d条已存在数据,谢谢使用!" % (count, no_count))

    def search_mysql(self):
        """Print every stored book name, then close cursor and connection.

        Must be called after save_mysql(), which creates self.cursor/self.db.
        """
        sql = """
        select book_name from books
        """
        self.cursor.execute(sql)
        for row in self.cursor.fetchall():
            print(row[0])
        self.cursor.close()
        self.db.close()

    # 4. Save to Redis, checking for existing entries first.
    def save_redis(self):
        try:
            # decode_responses=True makes lrange/smembers return str, which
            # the json.loads round-trip below relies on.
            self.redis_client = redis.StrictRedis(host='localhost', port=6379, db=2, decode_responses=True)
            # (1) Set storage: sets deduplicate automatically, unordered.
            # str(book) is passed explicitly: modern redis-py raises
            # DataError for raw dicts, while old versions coerced them
            # with str() -- the stored value is identical either way.
            for book in self.data_list:
                self.redis_client.sadd('bookset:items', str(book))

            # (2) List storage: lists keep duplicates, so dedupe manually.
            num = 0  # entries pushed this run
            if not self.redis_client.exists('books:items'):
                print("books数据库无数据,直接插入数据!")
                for data in self.data_list:
                    self.redis_client.rpush('books:items', str(data))
                    num += 1
            else:
                # Collect the book names already stored in the list.
                redis_book_list = []
                for redis_book in self.redis_client.lrange('books:items', 0, -1):
                    # Entries were stored via str(dict); swapping quotes lets
                    # json.loads parse them. NOTE(review): this breaks on
                    # titles containing apostrophes -- kept for compatibility
                    # with data already written in this format.
                    s = json.loads(redis_book.replace("'", '"'))
                    redis_book_list.append(s['book_name'])
                for book in self.data_list:
                    book_name = book['book_name']
                    if book_name not in redis_book_list:
                        print("%s:保存数据库中..." % book_name)
                        self.redis_client.rpush('books:items', str(book))
                        num += 1
            print("处理完成,本次共保存到redis数据库:%s条数据!" % num)
        except Exception as e:
            print("链接redis出错:", e)

    def search_redis(self):
        """Print every stored book name from both Redis representations.

        Must be called after save_redis(), which creates self.redis_client.
        """
        # (1) Set storage
        print("set存储类型查询书名!")
        for redis_book in self.redis_client.smembers('bookset:items'):
            bookset_dic = json.loads(redis_book.replace("'", '"'))
            print(bookset_dic['book_name'])
        # (2) List storage
        print("list存储类型查询书名!")
        for book in self.redis_client.lrange('books:items', 0, -1):
            booklist_dic = json.loads(book.replace("'", '"'))
            print(booklist_dic['book_name'])

    # 4. Save to MongoDB, checking for existing entries first.
    def save_mongo(self):
        try:
            self.client = pymongo.MongoClient('localhost', 27017)
            db_book = self.client['py3']  # database
            self.book = db_book['book']   # collection; auto-created on insert
            # count_documents / insert_many / insert_one replace the
            # deprecated cursor.count() / Collection.insert() originally used.
            num = 0  # documents inserted this run
            if self.book.count_documents({}) == 0:
                # Empty collection: insert everything in one batch.
                print("数据库无数据,直接插入数据!")
                self.book.insert_many(self.data_list)
                num = self.book.count_documents({})
            else:
                for data in self.data_list:
                    book_name = data['book_name']
                    # Insert only names not already present.
                    if self.book.count_documents({'book_name': book_name}) == 0:
                        print("%s:保存数据库中..." % book_name)
                        self.book.insert_one(data)
                        num += 1
            print("已完成,本次共保存到MongoDB数据库:%s条数据!" % num)
            self.client.close()
        except Exception as e:
            print("MongoDB数据库出错:", e)

    # MongoDB queries.
    def search_mongo(self):
        """Group books by author and print one author/title pair per line.

        Must be called after save_mongo(), which creates self.book.
        """
        # $push collects titles into a list; $unwind flattens it back so
        # each document holds a single title.
        result = self.book.aggregate([
            {'$group': {
                '_id': '$author',
                '书名': {'$push': '$book_name'}},
            },
            {'$unwind': '$书名'}
        ])
        for dic in result:
            # Rename the grouping key _id to the author label for printing.
            dic['作者'] = dic.pop('_id')
            print(dic)

    def start(self):
        """Crawl every listing page, then persist the collected data."""
        # Read the total page count from the "Last Page" link on page 1.
        data_ = self.send_request(self.base_url.format(1))
        soup = BeautifulSoup(data_, 'lxml')
        total_page = int(soup.select_one('a[title="Last Page →"]').get_text())
        print("总共获取到%s页数据!" % total_page)
        # Override here to crawl fewer pages, e.g.:
        # total_page = 3
        url_list = self.get_all_url(total_page)
        for page, url in enumerate(url_list, start=1):
            print("处理页数进度:%s/%s" % (page, total_page))
            data = self.send_request(url)
            self.parse_bs4_data(data)        # parse with bs4 (pick one)
            # self.parse_xpath_data(data)    # parse with xpath (pick one)
            self.save_data_html(data, page)  # keep a local HTML copy
        self.save_data_json()  # one combined JSON file
        # self.save_mongo()    # save to MongoDB
        # self.search_mongo()  # MongoDB queries
        # self.save_redis()    # save to Redis
        # self.search_redis()  # Redis queries
        self.save_mysql()      # save to MySQL (original comment wrongly said redis)
        self.search_mysql()    # MySQL query

# Script entry point: build the spider and run the full crawl, reporting
# any uncaught error instead of dying with a traceback.
if __name__ == "__main__":
    spider = AllBooks()
    try:
        spider.start()
    except Exception as err:
        print("出错了:", err)

图书网址:http://www.allitebooks.org/

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门
本栏推荐