python分布式编程实现过程解析

yipeiwu_com6年前Python基础

分布式编程的难点在于:

1.服务器之间的通信,主节点如何了解从节点的执行进度,并在从节点之间进行负载均衡和任务调度;

2.如何让多个服务器上的进程访问同一资源的不同部分进行执行

第一部分涉及到网络编程的底层细节

第二个问题让我联想到hdfs的一些功能。

首先分布式进程还是解决的是单机单进程无法处理的大数据量大计算量的问题,希望能加通过一份代码(最多主+从两份)来并行执行一个大任务。

这就面临两个问题,首先将程序分布到多台服务器,其次将输入数据分配给多台服务器。

第一个问题相对比较简单,毕竟程序一般不会太长,即便是超级jar包的spark程序,也不过百兆。

但数据里不同,如今企业级别的数据动辄GB、TB,如果在分布式程序执行之前首先要进行大容量数据的转移,显然是不可取的。

这时候我们就需要一个中央共享数据源,所有服务器都可以对这个数据源进行并行存取(块block),这就已经非常接近hdfs的功能。

因为在hdfs中,集群中的多台服务器共享同一个hdfs,每台机器访问hdfs就像访问本地数据一样(还是稍微慢一点);

计算任务执行完之后,每台服务器还可以将自己的计算结果写回hdfs,每台服务器的结果被存储成了结果目录中的小文件。

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
  pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
  n = random.randint(0, 10000)
  print('Put task %d...' % n)
  task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
  r = result.get(timeout=10)
  print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
  pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
  try:
    n = task.get(timeout=1)
    print('run task %d * %d...' % (n, n))
    r = '%d * %d = %d' % (n, n, n*n)
    time.sleep(1)
    result.put(r)
  except Queue.Empty:
    print('task queue is empty.')
# 处理结束:
print('worker exit.')

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

pycharm设置鼠标悬停查看方法设置

pycharm设置鼠标悬停查看方法设置

我们使用pycharm的时候,有时遇到了不认识的方法习惯于将鼠标悬停在方法上查看方法介绍。那么如何设置呢?下面小编给大家分享一下。 首先假如我们要查看下图所示的方法,鼠标放上去并没有显示...

python 平衡二叉树实现代码示例

python 平衡二叉树实现代码示例

平衡二叉树: 在上一节二叉树的基础上我们实现,如何将生成平衡的二叉树 所谓平衡二叉树: 我自己定义就是:任何一个节点的左高度和右高度的差的绝对值都小于2 如图所示,此时a的左高度等于3,...

利用Python获取赶集网招聘信息前篇

如何获取一个网站的相关信息,获取赶集网的招聘信息,本文为大家介绍利用python获取赶集网招聘信息的关键代码,供大家参考,具体内容如下 import re import urllib...

Python pymongo模块用法示例

本文实例讲述了Python pymongo模块用法。分享给大家供大家参考,具体如下: MongoDB优点 MongoDB是一个为当代web应用而生的noSQL数据库,它有如下优点: 1...

python最长回文串算法

给定一个字符串,要求在这个字符串中找到符合回文性质的最长子串。所谓回文性是指诸如 “aba”,"ababa","abba"这类的字符串,当然单个字符以及两个相邻相同字符也满足回文性质。...