一, 编写URL管理器 与 数据存储器 URLManager.py
- import pickle
- import hashlib
- import codecs
- import time
-
- class UrlManager(object):
- """ URL 管理器 """
- def __init__(self):
- # 未爬取的 URL 集合
- self.new_urls = self.load_progress('new_urls.txt')
- # 已经爬取的 URL md5 集合
- self.old_urls = self.load_progress('old_urls.txt')
-
- def has_new_url(self):
- """ 判断是否有未爬取的URL """
- return self.new_url_size() != 0
-
- def get_new_url(self):
- """ 获取一个未爬取的URL"""
- new_url = self.new_urls.pop()
- m = hashlib.md5()
- m.update(new_url.encode())
- # 由于url较少,md5冲突不大,只取中间16为字符串
- self.old_urls.add(m.hexdigest()[8:-8])
- return new_url
-
- def add_new_url(self,url):
- """ 将新的URL添加到未爬取的URL集合中 """
- if url is None:
- return
- m = hashlib.md5()
- m.update(url.encode())
- url_md5 = m.hexdigest()[8:-8]
- if url not in self.new_urls and url_md5 not in self.old_urls:
- self.new_urls.add(url)
-
- def add_new_urls(self,urls):
- """ 将新的 URL 列表添加到未爬取的 URL 集合中 """
- if urls is None or len(urls)==0:
- return
- for url in urls:
- self.add_new_url(url)
-
- def new_url_size(self):
- """ 获取未爬取的 URL 集合的大小 """
- return len(self.new_urls)
-
- def old_url_size(self):
- """ 获取已经爬取的 URL 集合的大小 """
- return len(self.old_urls)
-
- def save_progress(self,path,data):
- """ 保存进度 """
- with open(path,'wb') as f:
- pickle.dump(data, f)
-
- def load_progress(self,path):
- """ 从本地文件加载进度 """
- print('[+] 从本地文件加载进度: %s'%path)
- try:
- with open(path,'rb') as f:
- tmp = pickle.load(f)
- return tmp
- except:
- print('[!] 无进度文件, 创建: %s'%path)
- return set()
-
-
- class DataOutput(object):
- """ 数据存储器 """
- def __init__(self):
- self.filepath = 'baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime()))
- self.output_head(self.filepath)
- self.datas = []
-
- def store_data(self,data):
- if data is None:
- return
- self.datas.append(data)
- if len(self.datas) > 10:
- self.output_html(self.filepath)
-
- def output_head(self,path):
- """ 将 HTML 头写进去 """
- fout = codecs.open(path,'w',encoding='utf-8')
- fout.write("<html>")
- fout.write("<head><meta charset='utf-8'/></head>")
- fout.write("<body>")
- fout.write("<table>")
- fout.close()
-
- def output_html(self,path):
- """ 将数据写入 HTML 文件中 """
- fout = codecs.open(path,'a',encoding='utf-8')
- for data in self.datas:
- fout.write("<tr>")
- wurl = "<td><a href='%s'>%s</a></td>"%(data['url'],data['url']) if data.get('url') else "<td></td>"
- wtitle = "<td>%s</td>"%data['title'] if data.get('title') else "<td></td>"
- wsum = "<td>%s</td>"%data['summary'] if data.get('summary') else "<td></td>"
- fout.write(wurl)
- fout.write(wtitle)
- fout.write(wsum)
- fout.write("</tr>")
- self.datas.remove(data)
- fout.close()
-
- def ouput_end(self,path):
- """ 输出 HTML 结束 """
- fout = codecs.open(path,'a',encoding='utf-8')
- fout.write("</table>")
- fout.write("</body>")
- fout.write("</html>")
- fout.close()
-
二 编写爬虫节点 SpiderNode.py
- import re
- import requests
- from urllib.parse import urljoin
- from bs4 import BeautifulSoup
- from multiprocessing.managers import BaseManager
-
- """
- 爬虫节点
- """
-
- class HtmlDownloader(object):
- """ HTML 下载器 """
-
- def download(self,url):
- if url is None:
- return None
- user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0"
- headers = {'User-Agent':user_agent}
- r = requests.get(url,headers=headers)
- if r.status_code==200:
- r.encoding = 'utf-8'
- return r.text
- return None
-
-
-
-
- class HtmlParser(object):
- """HTML 解析器"""
-
- def parser(self,page_url,html_cont):
- ''' 用于解析网页内容,抽取URL和数据'''
- if page_url is None or html_cont is None:
- return
- soup = BeautifulSoup(html_cont,'html.parser') # ,from_encoding='utf-8'
- new_urls = self._get_new_urls(page_url,soup)
- new_data = self._get_new_data(page_url,soup)
- return new_urls,new_data
-
- def _get_new_urls(self,page_url,soup):
- ''' 抽取新的URL集合'''
- new_urls = set()
- # 抽取符合要求的a标记
- links = soup.find_all('a') # ,href=re.compile(r'/view/\d+\.htm')
- for link in links:
- try:
- # 提取 href 属性
- new_url = link['href']
- # 拼接成完整网址
- new_full_url = urljoin(page_url,new_url)
- new_urls.add(new_full_url)
- except Exception as exc:
- print(exc)
- return new_urls
-
- def _get_new_data(self,page_url,soup):
- ''' 抽取有效数据 '''
- data = {}
- data['url'] = page_url
- try:
- title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
- data['title'] = title.get_text()
- summary = soup.find('div',class_='lemma-summary')
- # 获取 tag 中包含的所有文本内容,包括子孙 tag 中的内容
- # 并将结果作为 Unicode 字符串返回
- data['summary'] = summary.get_text()
- except Exception as exc:
- print(exc)
- return data
-
-
-
- class SpiderWork(object):
- """ 爬虫调度器 """
-
- def __init__(self):
- # 初始化分布式进程中工作节点的连接工作
- # 第一步:使用 BaseManager 注册用于获取 Queue 的方法名称
- BaseManager.register('get_task_queue')
- BaseManager.register('get_result_queue')
- # 第二步:连接到服务器
- server_addr = b'192.168.1.105'
- print('Connect to sever %s...'%server_addr)
- # 注意保持端口和验证口令与服务进程的设置完全一致
- self.m = BaseManager(address=(server_addr,8001),authkey=b'isspider')
- # 从网络连接
- self.m.connect()
- # 第三步:获取 Queue 对象
- self.task = self.m.get_task_queue()
- self.result = self.m.get_result_queue()
- # 初始化网页下载器和解析器
- self.downloader = HtmlDownloader()
- self.parser = HtmlParser()
- print('init finish')
-
- def crawl(self):
- while True:
- try:
- if not self.task.empty():
- url = self.task.get()
- if url == 'end':
- print('控制节点通知爬虫节点停止工作...')
- # 接着通知其他节点停止工作
- self.result.put({'new_urls':'end','data':'end'})
- return
- print('爬虫节点正在解析:%s'%url.encode('utf-8'))
- content = self.downloader.download(url)
- new_urls,data = self.parser.parser(url,content)
- self.result.put({'new_urls':new_urls,'data':data})
- except EOFError as exc:
- print('连接工作节点失败!')
- return
- except Exception as exc:
- print(exc)
- print('Crawl fali')
-
- if __name__ == '__main__':
- spider = SpiderWork()
- spider.crawl()
三 编写控制调度器 NoderManager.py
- import re
- import requests
- from urllib.parse import urljoin
- from bs4 import BeautifulSoup
- from multiprocessing.managers import BaseManager
-
- """
- 爬虫节点
- """
-
- class HtmlDownloader(object):
- """ HTML 下载器 """
-
- def download(self,url):
- if url is None:
- return None
- user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0"
- headers = {'User-Agent':user_agent}
- r = requests.get(url,headers=headers)
- if r.status_code==200:
- r.encoding = 'utf-8'
- return r.text
- return None
-
-
-
-
- class HtmlParser(object):
- """HTML 解析器"""
-
- def parser(self,page_url,html_cont):
- ''' 用于解析网页内容,抽取URL和数据'''
- if page_url is None or html_cont is None:
- return
- soup = BeautifulSoup(html_cont,'html.parser') # ,from_encoding='utf-8'
- new_urls = self._get_new_urls(page_url,soup)
- new_data = self._get_new_data(page_url,soup)
- return new_urls,new_data
-
- def _get_new_urls(self,page_url,soup):
- ''' 抽取新的URL集合'''
- new_urls = set()
- # 抽取符合要求的a标记
- links = soup.find_all('a') # ,href=re.compile(r'/view/\d+\.htm')
- for link in links:
- try:
- # 提取 href 属性
- new_url = link['href']
- # 拼接成完整网址
- new_full_url = urljoin(page_url,new_url)
- new_urls.add(new_full_url)
- except Exception as exc:
- print(exc)
- return new_urls
-
- def _get_new_data(self,page_url,soup):
- ''' 抽取有效数据 '''
- data = {}
- data['url'] = page_url
- try:
- title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1')
- data['title'] = title.get_text()
- summary = soup.find('div',class_='lemma-summary')
- # 获取 tag 中包含的所有文本内容,包括子孙 tag 中的内容
- # 并将结果作为 Unicode 字符串返回
- data['summary'] = summary.get_text()
- except Exception as exc:
- print(exc)
- return data
-
-
-
- class SpiderWork(object):
- """ 爬虫调度器 """
-
- def __init__(self):
- # 初始化分布式进程中工作节点的连接工作
- # 第一步:使用 BaseManager 注册用于获取 Queue 的方法名称
- BaseManager.register('get_task_queue')
- BaseManager.register('get_result_queue')
- # 第二步:连接到服务器
- server_addr = b'192.168.1.105'
- print('Connect to sever %s...'%server_addr)
- # 注意保持端口和验证口令与服务进程的设置完全一致
- self.m = BaseManager(address=(server_addr,8001),authkey=b'isspider')
- # 从网络连接
- self.m.connect()
- # 第三步:获取 Queue 对象
- self.task = self.m.get_task_queue()
- self.result = self.m.get_result_queue()
- # 初始化网页下载器和解析器
- self.downloader = HtmlDownloader()
- self.parser = HtmlParser()
- print('init finish')
-
- def crawl(self):
- while True:
- try:
- if not self.task.empty():
- url = self.task.get()
- if url == 'end':
- print('控制节点通知爬虫节点停止工作...')
- # 接着通知其他节点停止工作
- self.result.put({'new_urls':'end','data':'end'})
- return
- print('爬虫节点正在解析:%s'%url.encode('utf-8'))
- content = self.downloader.download(url)
- new_urls,data = self.parser.parser(url,content)
- self.result.put({'new_urls':new_urls,'data':data})
- except EOFError as exc:
- print('连接工作节点失败!')
- return
- except Exception as exc:
- print(exc)
- print('Crawl fali')
-
- if __name__ == '__main__':
- spider = SpiderWork()
- spider.crawl()
-
-
-
-
四 执行
1 运行控制调度器 NodeManager.py
python NodeManager.py
2 运行爬虫节点 (可在多台机器运行) SpiderNode.py
python NodeManager.py
执行后生成的HTML文件结果如下: