对python操作kafka写入json数据的简单demo分享

yipeiwu_com5年前Python基础

如下所示:

安装kafka支持库pip install kafka-python
from kafka import KafkaProducer
import json
 
'''
 生产者demo
 向test_lyl2主题中循环写入10条json数据
 注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667']
       )
for i in range(10):
 data={
  "name":"李四",
  "age":23,
  "gender":"男",
  "id":i
 }
 producer.send('test_lyl2', data)
producer.close()
from kafka import KafkaConsumer
import json
 
'''
 消费者demo
 消费test_lyl2主题中的数据
 注意事项:如需以json格式读取数据需加上value_deserializer参数
'''
 
 
consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1",
       bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'],
       auto_offset_reset='earliest',value_deserializer=json.loads
       )
for message in consumer:
 print(message.value)

以上这篇对python操作kafka写入json数据的简单demo分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python中sort和sorted函数代码解析

本文研究的主要是Python中sort和sorted函数的相关内容,具体如下。 一、sort函数 sort函数是序列的内部函数 函数原型: L.sort(cmp=None, key=No...

python 自动批量打开网页的示例

如下所示: import webbrowser import codecs import time with open("test.txt") as fp: for ebayno...

python字典多键值及重复键值的使用方法(详解)

python字典多键值及重复键值的使用方法(详解)

在Python中使用字典,格式如下: dict={ key1:value1 , key2;value2 ...} 在实际访问字典值时的使用格式如下: dict[key] 多...

python开发准备工作之配置虚拟环境(非常重要)

python开发准备工作之配置虚拟环境(非常重要)

之前作为菜鸟的我,从来不知道创建虚拟环境来开发python,都是使用全局的来开发项目,这样最后的结果是,所有的包全部安装在全局,也不能有好的在切换py2中切换,现在讲解在widow下使用...

python django集成cas验证系统

python django集成cas验证系统

加入cas的好处 cas是什么东西就不多说了,简而言之就是单点登陆系统,一处登陆,全网有权限的系统均可以访问. 一次登陆,多个系统互通 cas一般均放置在内网,加入cas验证则必须要求用...