您的位置: 网站首页> python并发编程> 当前文章
分布式概念及分布式进程
老董-我爱我家房产SEO2019-07-17180围观,113赞
分布式是个高大上的概念,并不是规定死的概念,也不是难以理解的概念,分布式通俗点说就是多台机器协作完成任务!
假设原来用一台机器上的一个脚本抓取链接提取内容保存数据。现在任务量大,第1台机器负责抓取链接,第2台机器负责提取内容,第三台机器负责保存数据!机器之间通过网络传递信息,这就是一个简单的分布式爬虫了。
如果是数据库分布式存储,以clickhouse(牛逼的数据库)为例,可以准备4台机器,每台上创建物理表(存储数据),再选择一台创建分布表(管理分配数据),这样就成了一个分布式存储。具体实现上述配置需要参考官方文档。
Python的multiprocessing模块的子模块managers支持把多个进程分布到多台机器上,通过网络来通信。managers模块封装了网络通信的细节,我们可以很容易地编写分布式多进程程序。我们通过一台windows电脑模拟一个如下场景:
一台机器server负责生产任务产出任务队列task,另一台机器client负责从task队列取出并完成任务,然后把结果放进esult队列返回给server机器。
1、准备server_task.py负责生产任务。
2、准备client.py负责执行任务.
3、先运行server_task.py,然后运行client.py,再看server_task.py的结果变化。
# ‐*‐ coding: utf‐8 ‐*‐ import queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support # 任务个数 task_num = 10 # 任务队列,保存任务 task_queue = queue.Queue(task_num) # 结果队列,保存结果 result_queue = queue.Queue(task_num) def get_task(): return task_queue def get_result(): return result_queue # 创建类似的QueueManager class QueueManager(BaseManager): pass def win_distributed(): QueueManager.register('get_task_queue', callable=get_task) QueueManager.register('get_result_queue', callable=get_result) # 绑定端口并设置验证口令,windows下需要填写IP地址,Linux下不填,默认为本地 manager = QueueManager(address=('127.0.0.1', 8000),authkey='aaa'.encode('utf-8')) # 启动 manager.start() # 通过网络传递任务队列和结果队列 task = manager.get_task_queue() result = manager.get_result_queue() try: # 添加任务 for i in range(10): print("将整数%s放入待发送的消息列队..." %i) task.put(i) print('等待结果返回...') for i in range(10): print('返回结果 %s' % result.get(timeout=10)) except Exception as e: print(e) finally: # 一定要关闭,否则会报管理未关闭的错误 manager.shutdown() print('master exit!') if __name__ == '__main__': # windows下多进程可能会出现问题,添加这句可以缓解 freeze_support() win_distributed()
D:installpython3python.exe D:/pyscript/test/server_task.py 将整数0放入待发送的消息列队... 将整数1放入待发送的消息列队... 将整数2放入待发送的消息列队... 将整数3放入待发送的消息列队... 将整数4放入待发送的消息列队... 将整数5放入待发送的消息列队... 将整数6放入待发送的消息列队... 将整数7放入待发送的消息列队... 将整数8放入待发送的消息列队... 将整数9放入待发送的消息列队... 等待结果返回...
# ‐*‐ coding: utf‐8 ‐*‐ from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 第一步:使用QueueManager注册用于获取Queue的方法名称 QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 第二步:连接服务器 server_addr = '127.0.0.1' print("Connect to server %s" % server_addr) # 端口和验证口令注意保持与服务进程完全一致 m = QueueManager(address=(server_addr, 8000),authkey='aaa'.encode('utf-8')) # 从网络连接 m.connect() # 第三步:获取Queue的对象 task = m.get_task_queue() result = m.get_result_queue() # 第四步:从task队列获取任务,并把结果写入result队列: while not task.empty(): print("从%s:8000读取任务" % server_addr) i = task.get(True, timeout=10) print("开始计算平方结果") res = i * i result.put('{0}的平方是{1}'.format(i,res)) # 处理结束 print('worker exit')
D:installpython3python.exe D:/pyscript/test/client.py Connect to server 127.0.0.1 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 从127.0.0.1:8000读取任务 开始计算平方结果 worker exit Process finished with exit code 0
最终回头看server_task.py的效果:
D:installpython3python.exe D:/pyscript/test/server_task.py 将整数0放入待发送的消息列队... 将整数1放入待发送的消息列队... 将整数2放入待发送的消息列队... 将整数3放入待发送的消息列队... 将整数4放入待发送的消息列队... 将整数5放入待发送的消息列队... 将整数6放入待发送的消息列队... 将整数7放入待发送的消息列队... 将整数8放入待发送的消息列队... 将整数9放入待发送的消息列队... 等待结果返回... 返回结果 0的平方是0 返回结果 1的平方是1 返回结果 2的平方是4 返回结果 3的平方是9 返回结果 4的平方是16 返回结果 5的平方是25 返回结果 6的平方是36 返回结果 7的平方是49 返回结果 8的平方是64 返回结果 9的平方是81 master exit! Process finished with exit code 0
很赞哦!
python编程网提示:转载请注明来源www.python66.com。
有宝贵意见可添加站长微信(底部),获取技术资料请到公众号(底部)。同行交流请加群
下一篇:进程和线程的区别
相关文章
文章评论
-
分布式概念及分布式进程文章写得不错,值得赞赏
站点信息
- 网站程序:Laravel
- 客服微信:a772483200