2025年2月14日 星期五 甲辰(龙)年 腊月十四 设为首页 加入收藏
rss
您当前的位置:首页 > 计算机 > 编程开发 > Python

python 小小的分布式爬虫

时间:04-01来源:作者:点击数:33

一, 编写URL管理器 与 数据存储器 URLManager.py

  • import pickle
  • import hashlib
  • import codecs
  • import time
  • class UrlManager(object):
  • """ URL 管理器 """
  • def __init__(self):
  • # 未爬取的 URL 集合
  • self.new_urls = self.load_progress('new_urls.txt')
  • # 已经爬取的 URL md5 集合
  • self.old_urls = self.load_progress('old_urls.txt')
  • def has_new_url(self):
  • """ 判断是否有未爬取的URL """
  • return self.new_url_size() != 0
  • def get_new_url(self):
  • """ 获取一个未爬取的URL"""
  • new_url = self.new_urls.pop()
  • m = hashlib.md5()
  • m.update(new_url.encode())
  • # 由于url较少,md5冲突不大,只取中间16为字符串
  • self.old_urls.add(m.hexdigest()[8:-8])
  • return new_url
  • def add_new_url(self,url):
  • """ 将新的URL添加到未爬取的URL集合中 """
  • if url is None:
  • return
  • m = hashlib.md5()
  • m.update(url.encode())
  • url_md5 = m.hexdigest()[8:-8]
  • if url not in self.new_urls and url_md5 not in self.old_urls:
  • self.new_urls.add(url)
  • def add_new_urls(self,urls):
  • """ 将新的 URL 列表添加到未爬取的 URL 集合中 """
  • if urls is None or len(urls)==0:
  • return
  • for url in urls:
  • self.add_new_url(url)
  • def new_url_size(self):
  • """ 获取未爬取的 URL 集合的大小 """
  • return len(self.new_urls)
  • def old_url_size(self):
  • """ 获取已经爬取的 URL 集合的大小 """
  • return len(self.old_urls)
  • def save_progress(self,path,data):
  • """ 保存进度 """
  • with open(path,'wb') as f:
  • pickle.dump(data, f)
  • def load_progress(self,path):
  • """ 从本地文件加载进度 """
  • print('[+] 从本地文件加载进度: %s'%path)
  • try:
  • with open(path,'rb') as f:
  • tmp = pickle.load(f)
  • return tmp
  • except:
  • print('[!] 无进度文件, 创建: %s'%path)
  • return set()
  • class DataOutput(object):
  • """ 数据存储器 """
  • def __init__(self):
  • self.filepath = 'baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime()))
  • self.output_head(self.filepath)
  • self.datas = []
  • def store_data(self,data):
  • if data is None:
  • return
  • self.datas.append(data)
  • if len(self.datas) > 10:
  • self.output_html(self.filepath)
  • def output_head(self,path):
  • """ 将 HTML 头写进去 """
  • fout = codecs.open(path,'w',encoding='utf-8')
  • fout.write("<html>")
  • fout.write("<head><meta charset='utf-8'/></head>")
  • fout.write("<body>")
  • fout.write("<table>")
  • fout.close()
  • def output_html(self,path):
  • """ 将数据写入 HTML 文件中 """
  • fout = codecs.open(path,'a',encoding='utf-8')
  • for data in self.datas:
  • fout.write("<tr>")
  • wurl = "<td><a href='%s'>%s</a></td>"%(data['url'],data['url']) if data.get('url') else "<td></td>"
  • wtitle = "<td>%s</td>"%data['title'] if data.get('title') else "<td></td>"
  • wsum = "<td>%s</td>"%data['summary'] if data.get('summary') else "<td></td>"
  • fout.write(wurl)
  • fout.write(wtitle)
  • fout.write(wsum)
  • fout.write("</tr>")
  • self.datas.remove(data)
  • fout.close()
  • def ouput_end(self,path):
  • """ 输出 HTML 结束 """
  • fout = codecs.open(path,'a',encoding='utf-8')
  • fout.write("</table>")
  • fout.write("</body>")
  • fout.write("</html>")
  • fout.close()

二 编写爬虫节点 SpiderNode.py

  • import re
  • import requests
  • from urllib.parse import urljoin
  • from bs4 import BeautifulSoup
  • from multiprocessing.managers import BaseManager
  • """
  • 爬虫节点
  • """
  • class HtmlDownloader(object):
  • """ HTML 下载器 """
  • def download(self,url):
  • if url is None:
  • return None
  • user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0"
  • headers = {'User-Agent':user_agent}
  • r = requests.get(url,headers=headers)
  • if r.status_code==200:
  • r.encoding = 'utf-8'
  • return r.text
  • return None
  • class HtmlParser(object):
  • """HTML 解析器"""
  • def parser(self,page_url,html_cont):
  • ''' 用于解析网页内容,抽取URL和数据'''
  • if page_url is None or html_cont is None:
  • return
  • soup = BeautifulSoup(html_cont,'html.parser') # ,from_encoding='utf-8'
  • new_urls = self._get_new_urls(page_url,soup)
  • new_data = self._get_new_data(page_url,soup)
  • return new_urls,new_data
  • def _get_new_urls(self,page_url,soup):
  • ''' 抽取新的URL集合'''
  • new_urls = set()
  • # 抽取符合要求的a标记
  • links = soup.find_all('a') # ,href=re.compile(r'/view/\d+\.htm')
  • for link in links:
  • try:
  • # 提取 href 属性
  • new_url = link['href']
  • # 拼接成完整网址
  • new_full_url = urljoin(page_url,new_url)
  • new_urls.add(new_full_url)
  • except Exception as exc:
  • print(exc)
  • return new_urls
  • def _get_new_data(self,page_url,soup):
  • ''' 抽取有效数据 '''
  • data = {}
  • data['url'] = page_url
  • try:
  • title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
  • data['title'] = title.get_text()
  • summary = soup.find('div',class_='lemma-summary')
  • # 获取 tag 中包含的所有文本内容,包括子孙 tag 中的内容
  • # 并将结果作为 Unicode 字符串返回
  • data['summary'] = summary.get_text()
  • except Exception as exc:
  • print(exc)
  • return data
  • class SpiderWork(object):
  • """ 爬虫调度器 """
  • def __init__(self):
  • # 初始化分布式进程中工作节点的连接工作
  • # 第一步:使用 BaseManager 注册用于获取 Queue 的方法名称
  • BaseManager.register('get_task_queue')
  • BaseManager.register('get_result_queue')
  • # 第二步:连接到服务器
  • server_addr = b'192.168.1.105'
  • print('Connect to sever %s...'%server_addr)
  • # 注意保持端口和验证口令与服务进程的设置完全一致
  • self.m = BaseManager(address=(server_addr,8001),authkey=b'isspider')
  • # 从网络连接
  • self.m.connect()
  • # 第三步:获取 Queue 对象
  • self.task = self.m.get_task_queue()
  • self.result = self.m.get_result_queue()
  • # 初始化网页下载器和解析器
  • self.downloader = HtmlDownloader()
  • self.parser = HtmlParser()
  • print('init finish')
  • def crawl(self):
  • while True:
  • try:
  • if not self.task.empty():
  • url = self.task.get()
  • if url == 'end':
  • print('控制节点通知爬虫节点停止工作...')
  • # 接着通知其他节点停止工作
  • self.result.put({'new_urls':'end','data':'end'})
  • return
  • print('爬虫节点正在解析:%s'%url.encode('utf-8'))
  • content = self.downloader.download(url)
  • new_urls,data = self.parser.parser(url,content)
  • self.result.put({'new_urls':new_urls,'data':data})
  • except EOFError as exc:
  • print('连接工作节点失败!')
  • return
  • except Exception as exc:
  • print(exc)
  • print('Crawl fali')
  • if __name__ == '__main__':
  • spider = SpiderWork()
  • spider.crawl()

三 编写控制调度器 NoderManager.py

  • import re
  • import requests
  • from urllib.parse import urljoin
  • from bs4 import BeautifulSoup
  • from multiprocessing.managers import BaseManager
  • """
  • 爬虫节点
  • """
  • class HtmlDownloader(object):
  • """ HTML 下载器 """
  • def download(self,url):
  • if url is None:
  • return None
  • user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0"
  • headers = {'User-Agent':user_agent}
  • r = requests.get(url,headers=headers)
  • if r.status_code==200:
  • r.encoding = 'utf-8'
  • return r.text
  • return None
  • class HtmlParser(object):
  • """HTML 解析器"""
  • def parser(self,page_url,html_cont):
  • ''' 用于解析网页内容,抽取URL和数据'''
  • if page_url is None or html_cont is None:
  • return
  • soup = BeautifulSoup(html_cont,'html.parser') # ,from_encoding='utf-8'
  • new_urls = self._get_new_urls(page_url,soup)
  • new_data = self._get_new_data(page_url,soup)
  • return new_urls,new_data
  • def _get_new_urls(self,page_url,soup):
  • ''' 抽取新的URL集合'''
  • new_urls = set()
  • # 抽取符合要求的a标记
  • links = soup.find_all('a') # ,href=re.compile(r'/view/\d+\.htm')
  • for link in links:
  • try:
  • # 提取 href 属性
  • new_url = link['href']
  • # 拼接成完整网址
  • new_full_url = urljoin(page_url,new_url)
  • new_urls.add(new_full_url)
  • except Exception as exc:
  • print(exc)
  • return new_urls
  • def _get_new_data(self,page_url,soup):
  • ''' 抽取有效数据 '''
  • data = {}
  • data['url'] = page_url
  • try:
  • title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
  • data['title'] = title.get_text()
  • summary = soup.find('div',class_='lemma-summary')
  • # 获取 tag 中包含的所有文本内容,包括子孙 tag 中的内容
  • # 并将结果作为 Unicode 字符串返回
  • data['summary'] = summary.get_text()
  • except Exception as exc:
  • print(exc)
  • return data
  • class SpiderWork(object):
  • """ 爬虫调度器 """
  • def __init__(self):
  • # 初始化分布式进程中工作节点的连接工作
  • # 第一步:使用 BaseManager 注册用于获取 Queue 的方法名称
  • BaseManager.register('get_task_queue')
  • BaseManager.register('get_result_queue')
  • # 第二步:连接到服务器
  • server_addr = b'192.168.1.105'
  • print('Connect to sever %s...'%server_addr)
  • # 注意保持端口和验证口令与服务进程的设置完全一致
  • self.m = BaseManager(address=(server_addr,8001),authkey=b'isspider')
  • # 从网络连接
  • self.m.connect()
  • # 第三步:获取 Queue 对象
  • self.task = self.m.get_task_queue()
  • self.result = self.m.get_result_queue()
  • # 初始化网页下载器和解析器
  • self.downloader = HtmlDownloader()
  • self.parser = HtmlParser()
  • print('init finish')
  • def crawl(self):
  • while True:
  • try:
  • if not self.task.empty():
  • url = self.task.get()
  • if url == 'end':
  • print('控制节点通知爬虫节点停止工作...')
  • # 接着通知其他节点停止工作
  • self.result.put({'new_urls':'end','data':'end'})
  • return
  • print('爬虫节点正在解析:%s'%url.encode('utf-8'))
  • content = self.downloader.download(url)
  • new_urls,data = self.parser.parser(url,content)
  • self.result.put({'new_urls':new_urls,'data':data})
  • except EOFError as exc:
  • print('连接工作节点失败!')
  • return
  • except Exception as exc:
  • print(exc)
  • print('Crawl fali')
  • if __name__ == '__main__':
  • spider = SpiderWork()
  • spider.crawl()

四 执行

1 运行控制调度器 NodeManager.py

python NodeManager.py

2 运行爬虫节点 (可在多台机器运行) SpiderNode.py

python NodeManager.py

执行后生成的HTML文件结果如下:


方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门