kafka-python批量发送数据的实例

yipeiwu_com6年前Python基础

如下所示:

from kafka import KafkaClient
from kafka.producer import SimpleProducer
def send_data_2_kafka(datas):
  '''
    向kafka解析队列发送数据
  '''
  client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30)
  producer = SimpleProducer(client, async=False)
 
  curcount = len(datas)/PARTNUM
  for i in range(0, PARTNUM):
    start = i*curcount
    if i != PARTNUM - 1:
      end = (i+1)*curcount
      curdata = datas[start:end]
      producer.send_messages(TOPICNAME, *curdata)
    else:
      curdata = datas[start:]
      producer.send_messages(TOPICNAME, *curdata)
    
  producer.stop()
  client.close()

其中PARTNUM为topic的partition的数目,这样保证批量发送的数据均匀的落在kafka的partition中。

以上这篇kafka-python批量发送数据的实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

详解python中xlrd包的安装与处理Excel表格

一、安装xlrd 地址 下载后,使用 pip install .whl 安装即好。 查看帮助: >>> import xlrd >>> help...

python实现多进程代码示例

想要充分利用多核CPU资源,Python中大部分情况下都需要使用多进程,Python中提供了multiprocessing这个包实现多进程。multiprocessing支持子进程、进程...

python 自定义异常和异常捕捉的方法

异常捕捉: try: XXXXX1 raise Exception(“xxxxx2”) except (Exception1,Exception2,……): xxxx3...

python中的全局变量用法分析

本文实例分析了python中的全局变量用法。分享给大家供大家参考。具体分析如下: Python是一种面向对象的开发语言,在函数中使用全局变量,一般应作全局变量说明,只有在函数内经过说明的...

Python处理JSON时的值报错及编码报错的两则解决实录

1、ValueError: Invalid control character at: line 1 column 8363 (char 8362) 使用json.loads(json_...