Today we're going after the data of Bilibili's top 100 UP主~
Don't overthink it, don't ask questions; this is purely for the tech, not for the data~ Believe me!!
As usual, on to the code. Let's see how to crawl the Bilibili UP主 info:
```python
# -*- coding:utf-8 -*-
"""
@ auth : carl_DJ
@ time : 2020-8-21
"""

import requests
import os
import json
import traceback

# root directory for the scraped data
dir_path = os.path.join('up_100')

def get_http_session(pool_connections=2, pool_maxsize=10, max_retries=3):
    """
    http connection pool
    :param pool_connections: number of connection pools
    :param pool_maxsize: maximum size of each pool
    :param max_retries: maximum number of retries
    :return: a session mounted with the pooled adapter
    """
    session = requests.session()
    # adapter that provides the pooling and retry behaviour
    adapter = requests.adapters.HTTPAdapter(pool_connections=pool_connections,
                                            pool_maxsize=pool_maxsize,
                                            max_retries=max_retries)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def save_file(filepath, content):
    """
    :param filepath: file path
    :param content: content to save
    :return:
    """
    # must open in write mode; the default mode is read-only
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(content)

def log(content, level, filepath):
    """
    :param content: the error message to record
    :param level: error level
    :param filepath: path of the log file to append to
    :return:
    """
    if level == 'error':
        with open(filepath, 'a', encoding='utf-8') as f:
            f.write(content + '\n')
    elif level == 'fail':
        with open(filepath, 'a', encoding='utf-8') as f:
            f.write(content + '\n')

def make_dir(name):
    up_dir = os.path.join(dir_path, name)
    if not os.path.exists(up_dir):
        os.makedirs(up_dir)
    return up_dir

def read_json(filepath):
    """
    :param filepath: file to read
    :return: the file content parsed as json
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        res = f.read()
    # parse the raw text into json
    return json.loads(res)

def get_up_base_info(name, uid):
    # placeholder kept from the original post: fill in the real
    # Bilibili UP主 info API url here (interpolating uid)
    url = f"B站UP主的url"
    # request the url with a timeout of 100 seconds
    try:
        r = get_http_session().get(url, timeout=100)
        if r.status_code == 200:
            up_dir = make_dir(name)
            # path of the file holding this UP主's info
            filepath = os.path.join(up_dir, f'{uid}_base_info.json')
            # dump the UP主's info, indented 4 spaces, without ascii-escaping
            content = json.dumps(r.json(), indent=4, ensure_ascii=False)
            save_file(filepath, content)
            print(f'UP主 {name}: info saved successfully!')
        else:
            # write the failed request to the log
            fail_str = f'name:[{name}],uid:[{uid}],url:[{url}]'
            log(fail_str, 'fail', 'base_info_fail.log')
    except Exception:
        log(traceback.format_exc(), 'error', 'base_info_error.log')
        # record the basic info of the failing entry
        error_str = f'name:[{name}],uid:[{uid}]'
        log(error_str, 'error', 'base_info_error.log')

def base_info_task(power_json):
    # entry method: pull the uid and name out of the json
    for d in power_json:
        uid = d['uid']
        name = d['name']
        get_up_base_info(name, uid)

def main():
    # read power_up.json
    power_json = read_json('power_up.json')
    base_info_task(power_json)

if __name__ == '__main__':
    main()
```
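The post doesn't show what power_up.json looks like, so here is a minimal sketch of the shape base_info_task() expects; the uids and names below are made-up placeholders:

```python
# Generate a sample power_up.json in the shape base_info_task() expects:
# a list of objects, each with a uid and a name (placeholder values here).
import json

sample = [
    {"uid": 100000001, "name": "up_name_1"},
    {"uid": 100000002, "name": "up_name_2"},
]
with open('power_up.json', 'w', encoding='utf-8') as f:
    json.dump(sample, f, ensure_ascii=False, indent=4)
```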
Notes:
1. To inspect the interface's returned data, use a capture tool such as Charles or Fiddler; use whichever feels handier.
2. All fetched data is stored in json format, which makes it easy to inspect.
3. Only the error and fail log levels are defined; you can wrap up any other levels yourself (see the logging sketch after this list).
4. Here we only need the uid and name fields; if you need other fields, append them yourself.
5. Likewise, no User-Agent is set here, so if you crawl too frequently, your IP may get banned.
6. If you crawl at scale, set a User-Agent and an IP proxy pool (see the session sketch after this list); a dedicated post will cover IP proxy pools.
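For note 3, one way to get more levels without extending the hand-rolled log() is to wrap the standard logging module instead; a minimal sketch, not from the original post:

```python
# A sketch of a logging-module wrapper; the logger name and default
# file path are made-up placeholders.
import logging

def get_logger(filepath='up_100.log'):
    logger = logging.getLogger('up_100')
    if not logger.handlers:  # avoid stacking duplicate handlers on re-calls
        handler = logging.FileHandler(filepath, encoding='utf-8')
        handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger

# usage: get_logger().error('...'), .warning('...'), .info('...')
```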
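For notes 5 and 6, a minimal sketch of attaching a User-Agent header and an optional proxy to the pooled session, assuming you reuse get_http_session() from the code above; the UA string and proxy addresses are placeholder values:

```python
def get_session_with_ua(proxies=None):
    """Return the pooled session with a User-Agent header and optional proxies."""
    session = get_http_session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',  # placeholder UA
    })
    if proxies:
        # e.g. {'http': 'http://127.0.0.1:8888', 'https': 'http://127.0.0.1:8888'}
        session.proxies.update(proxies)
    return session
```

With this in place, get_up_base_info() could call get_session_with_ua() instead of get_http_session() and every request would carry the header and route through the proxy.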