您的位置: 网站首页> python并发编程> 当前文章

分布式概念及分布式进程

老董-我爱我家房产SEO2019-07-17163围观,131赞

  分布式是个高大上的概念,并不是规定死的概念,也不是难以理解的概念,分布式通俗点说就是多台机器协作完成任务!

  假设原来用一台机器上的一个脚本抓取链接提取内容保存数据。现在任务量大,第1台机器负责抓取链接,第2台机器负责提取内容,第三台机器负责保存数据!机器之间通过网络传递信息,这就是一个简单的分布式爬虫了。

  如果是数据库分布式存储,以clickhouse(牛逼的数据库)为例,可以准备4台机器,每台上创建物理表(存储数据),再选择一台创建分布表(管理分配数据),这样就成了一个分布式存储。具体实现上述配置需要参考官方文档。

  Python的multiprocessing模块的子模块managers支持把多个进程分布到多台机器上,通过网络来通信。managers模块封装了网络通信的细节,我们可以很容易地编写分布式多进程程序。我们通过一台windows电脑模拟一个如下场景:

  一台机器server负责生产任务产出任务队列task,另一台机器client负责从task队列取出并完成任务,然后把结果放进esult队列返回给server机器。

  1、准备server_task.py负责生产任务。

  2、准备client.py负责执行任务.

  3、先运行server_task.py,然后运行client.py,再看server_task.py的结果变化。
 

# ‐*‐ coding: utf‐8 ‐*‐

import queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# 任务个数
task_num = 10

# 任务队列,保存任务
task_queue = queue.Queue(task_num)
# 结果队列,保存结果
result_queue = queue.Queue(task_num)


def get_task():
    return task_queue


def get_result():
    return result_queue


# 创建类似的QueueManager

class QueueManager(BaseManager):
    pass


def win_distributed():
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # 绑定端口并设置验证口令,windows下需要填写IP地址,Linux下不填,默认为本地
    manager = QueueManager(address=('127.0.0.1', 8000),authkey='aaa'.encode('utf-8'))

    # 启动
    manager.start()

    # 通过网络传递任务队列和结果队列
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    try:
        # 添加任务
        for i in range(10):
            print("将整数%s放入待发送的消息列队..." %i)
            task.put(i)
        print('等待结果返回...')

        for i in range(10):
            print('返回结果 %s' % result.get(timeout=10))
    except Exception as e:
        print(e)
    finally:
        # 一定要关闭,否则会报管理未关闭的错误
        manager.shutdown()
        print('master exit!')


if __name__ == '__main__':
    # windows下多进程可能会出现问题,添加这句可以缓解
    freeze_support()
    win_distributed()

D:installpython3python.exe D:/pyscript/test/server_task.py
将整数0放入待发送的消息列队...
将整数1放入待发送的消息列队...
将整数2放入待发送的消息列队...
将整数3放入待发送的消息列队...
将整数4放入待发送的消息列队...
将整数5放入待发送的消息列队...
将整数6放入待发送的消息列队...
将整数7放入待发送的消息列队...
将整数8放入待发送的消息列队...
将整数9放入待发送的消息列队...
等待结果返回...

# ‐*‐ coding: utf‐8 ‐*‐

from multiprocessing.managers import BaseManager


# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass


# 第一步:使用QueueManager注册用于获取Queue的方法名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 第二步:连接服务器
server_addr = '127.0.0.1'
print("Connect to server %s" % server_addr)

# 端口和验证口令注意保持与服务进程完全一致
m = QueueManager(address=(server_addr, 8000),authkey='aaa'.encode('utf-8'))

# 从网络连接
m.connect()

# 第三步:获取Queue的对象
task = m.get_task_queue()
result = m.get_result_queue()

# 第四步:从task队列获取任务,并把结果写入result队列:

while not task.empty():
    print("从%s:8000读取任务" % server_addr)
    i = task.get(True, timeout=10)
    print("开始计算平方结果")
    res = i * i
    result.put('{0}的平方是{1}'.format(i,res))

# 处理结束
print('worker exit')


D:installpython3python.exe D:/pyscript/test/client.py
Connect to server 127.0.0.1
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
从127.0.0.1:8000读取任务
开始计算平方结果
worker exit

Process finished with exit code 0



最终回头看server_task.py的效果:

D:installpython3python.exe D:/pyscript/test/server_task.py
将整数0放入待发送的消息列队...
将整数1放入待发送的消息列队...
将整数2放入待发送的消息列队...
将整数3放入待发送的消息列队...
将整数4放入待发送的消息列队...
将整数5放入待发送的消息列队...
将整数6放入待发送的消息列队...
将整数7放入待发送的消息列队...
将整数8放入待发送的消息列队...
将整数9放入待发送的消息列队...
等待结果返回...
返回结果 0的平方是0
返回结果 1的平方是1
返回结果 2的平方是4
返回结果 3的平方是9
返回结果 4的平方是16
返回结果 5的平方是25
返回结果 6的平方是36
返回结果 7的平方是49
返回结果 8的平方是64
返回结果 9的平方是81
master exit!

Process finished with exit code 0

很赞哦!

python编程网提示:转载请注明来源www.python66.com。
有宝贵意见可添加站长微信(底部),获取技术资料请到公众号(底部)。同行交流请加群 python学习会

文章评论

    分布式概念及分布式进程文章写得不错,值得赞赏

站点信息

  • 网站程序:Laravel
  • 客服微信:a772483200