1. Writing the URL Manager and Data Output: URLManager.py
import pickle
import hashlib
import codecs
import time
class UrlManager(object):
    """ URL manager """

    def __init__(self):
        # Set of URLs not yet crawled
        self.new_urls = self.load_progress('new_urls.txt')
        # Set of md5 digests of URLs already crawled
        self.old_urls = self.load_progress('old_urls.txt')

    def has_new_url(self):
        """ Check whether there are any uncrawled URLs left """
        return self.new_url_size() != 0

    def get_new_url(self):
        """ Fetch one uncrawled URL """
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url.encode())
        # There are relatively few URLs, so md5 collisions are unlikely;
        # keep only the middle 16 characters of the digest
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url

    def add_new_url(self, url):
        """ Add a new URL to the uncrawled set """
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode())
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        """ Add a batch of new URLs to the uncrawled set """
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        """ Size of the uncrawled URL set """
        return len(self.new_urls)

    def old_url_size(self):
        """ Size of the crawled URL set """
        return len(self.old_urls)

    def save_progress(self, path, data):
        """ Save progress to a local file """
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        """ Load progress from a local file """
        print('[+] Loading progress from local file: %s' % path)
        try:
            with open(path, 'rb') as f:
                return pickle.load(f)
        except (FileNotFoundError, EOFError):
            print('[!] No progress file, creating: %s' % path)
            return set()
class DataOutput(object):
    """ Data storage """

    def __init__(self):
        self.filepath = 'baike_%s.html' % (time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        """ Buffer one record and flush to disk every 10 records """
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas) > 10:
            self.output_html(self.filepath)

    def output_head(self, path):
        """ Write the HTML header """
        fout = codecs.open(path, 'w', encoding='utf-8')
        fout.write("<html>")
        fout.write("<head><meta charset='utf-8'/></head>")
        fout.write("<body>")
        fout.write("<table>")
        fout.close()

    def output_html(self, path):
        """ Write the buffered records into the HTML file """
        fout = codecs.open(path, 'a', encoding='utf-8')
        for data in self.datas:
            fout.write("<tr>")
            wurl = "<td><a href='%s'>%s</a></td>" % (data['url'], data['url']) if data.get('url') else "<td></td>"
            wtitle = "<td>%s</td>" % data['title'] if data.get('title') else "<td></td>"
            wsum = "<td>%s</td>" % data['summary'] if data.get('summary') else "<td></td>"
            fout.write(wurl)
            fout.write(wtitle)
            fout.write(wsum)
            fout.write("</tr>")
        # Clear the buffer only after all rows have been written
        self.datas = []
        fout.close()

    def output_end(self, path):
        """ Write the closing HTML tags """
        fout = codecs.open(path, 'a', encoding='utf-8')
        fout.write("</table>")
        fout.write("</body>")
        fout.write("</html>")
        fout.close()
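Before wiring these two classes into the distributed crawler, they can be exercised on their own. The snippet below is a minimal illustrative check; the URL and the sample record are made up for demonstration.

# Standalone check of UrlManager and DataOutput (URL and data are illustrative)
from URLManager import UrlManager, DataOutput

url_manager = UrlManager()
url_manager.add_new_url('https://baike.baidu.com/item/Python')
print(url_manager.has_new_url())        # True
url = url_manager.get_new_url()         # also records the URL's md5 digest in old_urls

data_output = DataOutput()
data_output.store_data({'url': url, 'title': 'Python', 'summary': 'A programming language.'})
data_output.output_html(data_output.filepath)   # flush the buffered rows
data_output.output_end(data_output.filepath)    # close the <table>, <body>, <html> tags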
2. Writing the Spider Node: SpiderNode.py
import re
import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from multiprocessing.managers import BaseManager
"""
爬虫节点
"""
class HtmlDownloader(object):
    """ HTML downloader """

    def download(self, url):
        if url is None:
            return None
        user_agent = "Mozilla/5.0 (Windows NT 5.1; rv:52.0) Gecko/20100101 Firefox/52.0"
        headers = {'User-Agent': user_agent}
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            r.encoding = 'utf-8'
            return r.text
        return None
class HtmlParser(object):
    """ HTML parser """

    def parser(self, page_url, html_cont):
        """ Parse the page content and extract new URLs and data """
        if page_url is None or html_cont is None:
            return
        soup = BeautifulSoup(html_cont, 'html.parser')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)
        return new_urls, new_data

    def _get_new_urls(self, page_url, soup):
        """ Extract the set of new URLs """
        new_urls = set()
        # Collect the <a> tags (a filter such as href=re.compile(r'/view/\d+\.htm')
        # can be added here to restrict the links)
        links = soup.find_all('a')
        for link in links:
            try:
                # Read the href attribute
                new_url = link['href']
                # Join it with the page URL to get an absolute URL
                new_full_url = urljoin(page_url, new_url)
                new_urls.add(new_full_url)
            except Exception as exc:
                print(exc)
        return new_urls

    def _get_new_data(self, page_url, soup):
        """ Extract the useful data """
        data = {}
        data['url'] = page_url
        try:
            title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
            data['title'] = title.get_text()
            summary = soup.find('div', class_='lemma-summary')
            # get_text() returns all text inside the tag, including the text
            # of its descendants, as a single Unicode string
            data['summary'] = summary.get_text()
        except Exception as exc:
            print(exc)
        return data
class SpiderWork(object):
    """ Spider scheduler on the worker node """

    def __init__(self):
        # Set up the connection from this worker node to the distributed manager
        # Step 1: register the names of the methods used to fetch the Queues
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        # Step 2: connect to the server
        server_addr = '192.168.1.105'   # IP address of the control node
        print('Connect to server %s...' % server_addr)
        # The port and authkey must match the server process's settings exactly
        self.m = BaseManager(address=(server_addr, 8001), authkey=b'isspider')
        # Connect over the network
        self.m.connect()
        # Step 3: get the Queue objects
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        # Initialize the downloader and the parser
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print('init finish')

    def crawl(self):
        while True:
            try:
                if not self.task.empty():
                    url = self.task.get()
                    if url == 'end':
                        print('The control node told the spider nodes to stop...')
                        # Pass the stop signal on to the other nodes
                        self.result.put({'new_urls': 'end', 'data': 'end'})
                        return
                    print('Spider node is parsing: %s' % url)
                    content = self.downloader.download(url)
                    new_urls, data = self.parser.parser(url, content)
                    self.result.put({'new_urls': new_urls, 'data': data})
            except EOFError:
                print('Connection to the control node failed!')
                return
            except Exception as exc:
                print(exc)
                print('Crawl failed')


if __name__ == '__main__':
    spider = SpiderWork()
    spider.crawl()
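HtmlParser can be sanity-checked without going through the task queue by feeding it a static snippet that mimics the Baidu Baike markup it expects; the snippet below is made up for illustration.

# Standalone check of HtmlParser against a hand-written snippet (illustrative)
from SpiderNode import HtmlParser

html = """
<html><body>
<dd class="lemmaWgt-lemmaTitle-title"><h1>Python</h1></dd>
<div class="lemma-summary">Python is a programming language.</div>
<a href="/item/Guido">Guido</a>
</body></html>
"""
parser = HtmlParser()
new_urls, data = parser.parser('https://baike.baidu.com/item/Python', html)
print(new_urls)         # {'https://baike.baidu.com/item/Guido'}
print(data['title'])    # Python
print(data['summary'])  # Python is a programming language.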
3. Writing the Control Scheduler: NodeManager.py
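The control node must serve the two queues that SpiderNode.py looks up ('get_task_queue' and 'get_result_queue') on port 8001 with authkey b'isspider', feed URLs from UrlManager into the task queue, collect results into DataOutput, and finally push the 'end' sentinel so the spider nodes stop. The code below is a minimal single-file sketch along those lines; the class name NodeManager, the max_pages stop condition, and the seed URL are illustrative assumptions, not a fixed interface.

# NodeManager.py -- a minimal sketch of the control node (assumptions noted above)
import queue
import threading
from multiprocessing.managers import BaseManager

from URLManager import UrlManager, DataOutput


class NodeManager(object):
    """ Control node: hands out URLs to spider nodes and collects results """

    def __init__(self, root_url, max_pages=100):
        self.task_q = queue.Queue()      # URLs waiting to be crawled
        self.result_q = queue.Queue()    # results returned by spider nodes
        self.url_manager = UrlManager()
        self.data_output = DataOutput()
        self.url_manager.add_new_url(root_url)
        self.max_pages = max_pages       # stop after this many pages (assumed limit)

    def start_manager(self):
        # The registered names, port and authkey must match SpiderNode.py
        BaseManager.register('get_task_queue', callable=lambda: self.task_q)
        BaseManager.register('get_result_queue', callable=lambda: self.result_q)
        manager = BaseManager(address=('', 8001), authkey=b'isspider')
        server = manager.get_server()
        # Serve the queues in a background thread so the scheduling loop
        # below can keep running in the main thread
        threading.Thread(target=server.serve_forever, daemon=True).start()

    def run(self):
        self.start_manager()
        crawled = 0
        while crawled < self.max_pages:
            # Push every pending URL into the task queue
            while self.url_manager.has_new_url():
                self.task_q.put(self.url_manager.get_new_url())
            # Block until a spider node sends back one result
            result = self.result_q.get()
            self.url_manager.add_new_urls(result['new_urls'])
            self.data_output.store_data(result['data'])
            crawled += 1
        # Tell the spider nodes to stop; wait briefly for the echoed sentinel
        self.task_q.put('end')
        try:
            self.result_q.get(timeout=10)
        except queue.Empty:
            pass
        # Flush the remaining data and save crawl progress
        self.data_output.output_html(self.data_output.filepath)
        self.data_output.output_end(self.data_output.filepath)
        self.url_manager.save_progress('new_urls.txt', self.url_manager.new_urls)
        self.url_manager.save_progress('old_urls.txt', self.url_manager.old_urls)


if __name__ == '__main__':
    # The seed URL is illustrative; any Baidu Baike entry page works
    node = NodeManager('https://baike.baidu.com/item/Python')
    node.run()

Start NodeManager.py first so the queues are being served, then launch SpiderNode.py on one or more machines with server_addr pointing at the control node's IP.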
4. Running the Crawler
1. Run the control scheduler, NodeManager.py:
python NodeManager.py
2. Run the spider node, SpiderNode.py (it can run on multiple machines):
python SpiderNode.py
The HTML file generated after the run looks like this: