一、创建服务端,taskManager.py,代码如下:
import random
import time
import queue
from multiprocessing.managers import BaseManager
# 第一步:建立task_queue 和 result_queue,用来存放任务结果
task_queue = queue.Queue()
result_queue = queue.Queue()
def get_task():
return task_queue
def get_result():
return result_queue
class Queuemanager(BaseManager):
pass
def main(ip,port,kl):
# 第二步:把创建的两个队列注册在网络上,利用 register 方法,
# callable参数关联了 Queue 对象,将Queue对象在网络中暴露
Queuemanager.register('get_task_queue',callable=get_task)
Queuemanager.register('get_result_queue',callable=get_result)
# 第三步:初始化对象:绑定IP、端口、设置验证口令 。
manager = Queuemanager(address=(ip,port),authkey=kl)
# 第四步:启动管理,监听信息通道
manager.start()
# 第五步:通过管理实例的方法获得通过网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 第六步:添加任务
for url in ["ImageUrl_"+str(i) for i in range(10)]:
print('put task %s ...'%url)
task.put(url)
# 获得返回结果
print('try get result...')
for i in range(10):
print('result is %s'%result.get(timeout=10)) # 最大等待10秒
# 关闭管理
manager.shutdown()
if __name__ == '__main__':
ip = '127.0.0.1' # 要绑定的本机IP
port = 8001 # 端口号
passwd = b'distributed' # 口令
main(ip,port,passwd)
二、创建客户端,taskWorker.py,客户端代码可以放入多台电脑运行,以达到分布式需求,代码如下:
import time
from multiprocessing.managers import BaseManager
# 创建类似的 QueueManager
class QueueManager(BaseManager):
pass
def main(server_addr,port,kl):
# 第一步:使用 QueueManager 注册用于获取Queue的方法名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 第二步:连接到服务器
print('Connect to server %s...'%server_addr)
# 端口和验证口令注意保持与服务进程完全一致
m = QueueManager(address=(server_addr,port),authkey=kl)
# 从网络连接
m.connect()
# 第三步:获取Queue的对象
task = m.get_task_queue()
result = m.get_result_queue()
# 第四步:从task队列获取任务,并把结果写入result队列
while (not task.empty()):
image_url = task.get(True,timeout=5)
print('run task download %s...'%image_url)
time.sleep(1)
result.put('%s--->Computer 1 success'%image_url)
# 处理结束
print('worker exit.')
if __name__ == '__main__':
ip = '127.0.0.1' # 要连接的服务端的IP
port = 8001 # 服务端设定的端口号
passwd = b'distributed' # 口令
main(ip,port,passwd)