python分布式编程实现过程解析

yipeiwu_com5年前Python基础

分布式编程的难点在于:

1.服务器之间的通信,主节点如何了解从节点的执行进度,并在从节点之间进行负载均衡和任务调度;

2.如何让多个服务器上的进程访问同一资源的不同部分进行执行

第一部分涉及到网络编程的底层细节

第二个问题让我联想到hdfs的一些功能。

首先分布式进程还是解决的是单机单进程无法处理的大数据量大计算量的问题,希望能加通过一份代码(最多主+从两份)来并行执行一个大任务。

这就面临两个问题,首先将程序分布到多台服务器,其次将输入数据分配给多台服务器。

第一个问题相对比较简单,毕竟程序一般不会太长,即便是超级jar包的spark程序,也不过百兆。

但数据里不同,如今企业级别的数据动辄GB、TB,如果在分布式程序执行之前首先要进行大容量数据的转移,显然是不可取的。

这时候我们就需要一个中央共享数据源,所有服务器都可以对这个数据源进行并行存取(块block),这就已经非常接近hdfs的功能。

因为在hdfs中,集群中的多台服务器共享同一个hdfs,每台机器访问hdfs就像访问本地数据一样(还是稍微慢一点);

计算任务执行完之后,每台服务器还可以将自己的计算结果写回hdfs,每台服务器的结果被存储成了结果目录中的小文件。

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
  pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
  n = random.randint(0, 10000)
  print('Put task %d...' % n)
  task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
  r = result.get(timeout=10)
  print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
  pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
  try:
    n = task.get(timeout=1)
    print('run task %d * %d...' % (n, n))
    r = '%d * %d = %d' % (n, n, n*n)
    time.sleep(1)
    result.put(r)
  except Queue.Empty:
    print('task queue is empty.')
# 处理结束:
print('worker exit.')

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

python中字典(Dictionary)用法实例详解

本文实例讲述了python中字典(Dictionary)用法。分享给大家供大家参考。具体分析如下: 字典(Dictionary)是一种映射结构的数据类型,由无序的“键-值对”组成。字典的...

Python 除法小技巧

复制代码 代码如下:from __future__ import division print 7/3 输出结果: 2.3333333333...

基于pandas将类别属性转化为数值属性的方法

基于pandas将类别属性转化为数值属性的方法

离散特征的编码分为两种情况: 1、离散特征的取值之间没有大小的意义,比如color:[red,blue],那么就使用one-hot编码 2、离散特征的取值有大小的意义,比如size:[...

python2与python3的print及字符串格式化小结

python2与python3的print及字符串格式化小结

最近一直在用python写程序,对于python的print一直很恼火,老是不按照预期输出。在python2中print是一种输出语句,和if语句,while语句一样的东西,在pytho...

Python 面试中 8 个必考问题

1、下面这段代码的输出结果是什么?请解释。 def extendList(val, list=[]): list.append(val) return list list...