Python测试Kafka集群(pykafka)实例

yipeiwu_com6年前Python基础

生产者代码:

# -* coding:utf8 *- 
from pykafka import KafkaClient 
 
host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host) 
 
print client.topics 
 
# 生产者 
topicdocu = client.topics['my-topic'] 
producer = topicdocu.get_producer() 
for i in range(100): 
  print i 
  producer.produce('test message ' + str(i ** 2)) 
producer.stop()

消费者代码:

# -* coding:utf8 *- 
from pykafka import KafkaClient 
 
host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts = host) 
 
print client.topics 
 
# 消费者 
topic = client.topics['my-topic'] 
consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True, auto_commit_interval_ms=1, 
                   consumer_id='test') 
for message in consumer: 
  if message is not None: 
    print message.offset, message.value 

以上这篇Python测试Kafka集群(pykafka)实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

浅谈django的render函数的参数问题

hello.html 文件代码如下: HelloWorld/templates/hello.html 文件代码: <h1>{{ hello }}</h1>...

Python和Perl绘制中国北京跑步地图的方法

Python和Perl绘制中国北京跑步地图的方法

当你在一个城市,穿越大街小巷,跑步跑了几千公里之后,一个显而易见的想法是,我到底和之前比快了多少,跑量有何变化,如果能把在这个城市的所有路线全部画出来,会是怎样的景象呢? 1.数据来源...

python装饰器深入学习

什么是装饰器 在我们的软件产品升级时,常常需要给各个函数新增功能,而在我们的软件产品中,相同的函数可能会被调用上百次,这种情况是很常见的,如果我们一个个的修改,那我们的码农岂不要挂掉了(...

Python中反射和描述器总结

反射 在Python中,能够通过一个对象,找出type、class、attribute或者method的能力,成为反射。 函数与方法 内建函数: getattr(object,name[...

Eclipse中Python开发环境搭建简单教程

Eclipse中Python开发环境搭建简单教程

一、背景介绍   Eclipse是一款基于Java的可扩展开发平台。其官方下载中包括J2EE方向版本、Java方向版本、C/C++方向版本、移动应用方向版本等诸多版本。除此之外,Ecli...