kafka-python 获取topic lag值方式

yipeiwu_com6年前Python基础

说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码。

直接上代码吧

from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition

def get_topic_offset(brokers, topic):
  """
  获取一个topic的offset值的和
  """
  client = SimpleClient(brokers)
  partitions = client.topic_partitions[topic]
  offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
  offsets_responses = client.send_offset_request(offset_requests)
  return sum([r.offsets[0] for r in offsets_responses])


def get_group_offset(brokers, group_id, topic):
  """
  获取一个topic特定group已经消费的offset值的和
  """
  consumer = KafkaConsumer(bootstrap_servers=brokers,
               group_id=group_id,
               )
  pts = [TopicPartition(topic=topic, partition=i) for i in
      consumer.partitions_for_topic(topic)]
  result = consumer._coordinator.fetch_committed_offsets(pts)
  return sum([r.offset for r in result.values()])


if __name__ == '__main__':
  topic_offset = get_topic_offset("brokers", "topic")
  group_offset = get_group_offset("brokers", "group_id", "topic")
  lag = topic_offset - group_offset

以上这篇kafka-python 获取topic lag值方式就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持【听图阁-专注于Python设计】。

相关文章

Python程序设计入门(5)类的使用简介

一、类的定义和使用 python定义一个类的基本语法是: 复制代码 代码如下:class classname([基类一,基类二...]):     [def...

Python使用re模块正则提取字符串中括号内的内容示例

本文实例讲述了Python使用re模块正则提取字符串中括号内的内容操作。分享给大家供大家参考,具体如下: 直接上代码吧: # -*- coding:utf-8 -*- #! pyth...

windows下python虚拟环境virtualenv安装和使用详解

前面介绍了python在ubuntu16.04环境下,python的虚拟环境virtualenv的安装,下面介绍在windows环境下的安装和使用。 环境信息 操作系统:windo...

python实现通过shelve修改对象实例

本文实例讲述了python实现通过shelve修改对象的方法,分享给大家供大家参考。 具体实现方法如下: import shelve she = shelve.open('try.s...

python发送HTTP请求的方法小结

本文实例讲述了python发送HTTP请求的方法。分享给大家供大家参考。具体如下: 这里包含 Python 使用 GET/HEAD/POST 方法进行 HTTP 请求 1. GET 方法...