python 多进程队列数据处理详解

yipeiwu_com5年前Python基础

我就废话不多说了,直接上代码吧!

# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import time, random, os
import camera_person_num
 
MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue() 
 
 
# 连接MQTT服务器
def on_mqtt_connect():
  mqttClient.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient.loop_start()
 
 
# 消息处理函数
def on_message_come(lient, userdata, msg):
  # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))
  
  q.put(msg.payload.decode("utf-8")) # 放入队列
  print("产生消息", msg.payload.decode("utf-8"))
  # 消息处理开启多进程
  # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
  # p.start()
 
 
def consumer(q, pid):
  print("开启消费序列进程", pid)
  while True:
    msg = q.get()
    # p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
    # p.start()
    talk("/camera/person/num/result", msg, pid) 
 
 
# subscribe 消息订阅
def on_subscribe():
  mqttClient.subscribe("test123", 1) # 主题为"test"
  mqttClient.on_message = on_message_come # 消息到来处理函数
 
 
# publish 消息发布
def on_publish(topic, msg, qos):
  mqttClient.publish(topic, msg, qos);
 
 
# 多进程中发布消息需要重新初始化mqttClient
def talk(topic, msg, pid):
  cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
  t_max, t_mean, t_min = cameraPsersonNum.personNum()
  # time.sleep(20)
  print("消费消息", pid, msg) 
  mqttClient2 = mqtt.Client()
  mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient2.loop_start()
  mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
  mqttClient2.disconnect()
 
 
def main():
  
  on_mqtt_connect()
  on_subscribe()
  for i in range(1, 3):
    c1 = Process(target=consumer, args=(q, i))
    c1.start()
  while True:
    pass
 
 
if __name__ == '__main__':
  main()

以上这篇python 多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python实现简单石头剪刀布游戏

Python实现简单石头剪刀布游戏

近日在学习Python的一些基础知识,觉得还是很有趣的一个一门语言!就目前的学习的一些知识,编写了一些一个简单的石头剪刀布的游戏。主要是熟悉一些Python的一些控制语句。 impo...

Python加密方法小结【md5,base64,sha1】

本文实例总结了python加密方法。分享给大家供大家参考,具体如下: MD5加密: def md5(str): import hashlib m = hashlib.md5(...

python twilio模块实现发送手机短信功能

python twilio模块实现发送手机短信功能

前排提示:这个模块不是用于对陌生人进行短信轰炸和电话骚扰的,这个模块也没有这个功能,如果是抱着这个心态来的,可以关闭网页了 语言:python 步骤一:安装twilio模块 pip in...

Django2.1集成xadmin管理后台所遇到的错误集锦(填坑)

Django2.1集成xadmin管理后台所遇到的错误集锦(填坑)

django默认是有一个admin的后台管理模块,但是丑,功能也不齐全,但是大神给我们已经集成好了xadmin后台,我们拿来用即可,但是呢,django已经升级到2.1版本了,xadmi...

python中偏函数partial用法实例分析

本文实例讲述了python中偏函数partial用法。分享给大家供大家参考。具体如下: 函数在执行时,要带上所有必要的参数进行调用。但是,有时参数可以在函数被调用之前提前获知。这种情况下...