Kafka入门(四)Kafka消息消费状态全解析:如何精准追踪你的数据流向?

本文将会介绍如何在Kafka中查看消息的消费状态。

在文章Kafka入门(三)Kafka 在 Web 应用中的实践:基于 FastAPI 实现图片 OCR中,笔者介绍了Kafka 在 Web 应用中的实践,也就是Kafka作为消息队列,FastAPI作为Web框架来实现图片上传、OCR识别及结果查看。但是在实际操作过程中,有一个问题困扰了我,那就是:

如何查看Kafka中的消息的消费状态,这些消息哪些已经被消费,哪些还未被消费呢?

本文使用的Kafka已在本地启动,topic为ocr-topic,consumer group为ocr-group。

我们的操作过程遵循如下流程:

  • 关闭consumer.py脚本,在图片上传页面上传3张图片,此时查看Kafka中的消息的消费状态;
  • 开启consumer.py脚本,等待所有图片处理完后,再查看Kafka中的消息的消费状态;

下面将会介绍几种常见的查询Kafka中的消息的消费状态的方法。

消费前

命令行

使用kafka-consumer-groups.sh命令来查看各个Consumer Group中的消息总数、未消费数量、已消费数量。

查询结果如下:

1
2
3
4
5
6
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ocr-group --describe

Consumer group 'ocr-group' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ocr-group ocr-topic 0 23 26 3 - - -

该结果表明在ocr-topic这个topic中共有26条消息,其中23条消息已被消费,最新加入的3条消息未被消费。

Python代码

使用Python代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# -*- coding: utf-8 -*-
# kafka-python==2.0.2

from kafka import KafkaConsumer, KafkaAdminClient, TopicPartition

topic = "ocr-topic"
group_id = "ocr-group"
bootstrap_servers = ["localhost:9092"]

# 创建 KafkaConsumer,但不订阅 topic,而是手动指定分区
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=group_id
)

# 获取 topic 的所有分区
partitions = consumer.partitions_for_topic(topic)
if partitions is None:
print(f"无法获取 topic {topic} 的分区信息")
exit(1)

# 为每个分区创建一个 TopicPartition 对象
topic_partitions = [TopicPartition(topic, p) for p in partitions]

# 查询每个分区的最新 offset(Log End Offset)
end_offsets = consumer.end_offsets(topic_partitions)

# 查询消费组的 committed offset(已消费的消息偏移量)
committed_offsets = {tp: consumer.committed(tp) for tp in topic_partitions}

# 计算 LAG(未消费的消息数量)
for tp in topic_partitions:
committed_offset = committed_offsets.get(tp, 0) or 0 # 可能返回 None,需要转换为 0
end_offset = end_offsets[tp]
lag = end_offset - committed_offset
print(f"Partition {tp.partition}: Log End Offset = {end_offset}, Committed Offset = {committed_offset}, LAG = {lag}")

# 关闭消费者
consumer.close()

运行结果如下:

1
Partition 0: Log End Offset = 26, Committed Offset = 23, LAG = 3

UI界面

在Offset Explorer中,打开Kafka连接,在Consumer中查看ocr-group的信息,如下:

消费后

命令行

开启consumer.py脚本,将Kafka中未消费的消息进行处理,等待处理完成后,再次查看ocr-group中的消息的消费状态,如下:

1
2
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
ocr-group ocr-topic 0 26 26 0 kafka-python-2.0.2-3bf95370-c58c-485d-93c3-1e0e5eb24b04 /192.168.0.100 kafka-python-2.0.2

Python代码

此时Python脚本的运行结果如下:

1
Partition 0: Log End Offset = 26, Committed Offset = 26, LAG = 0

UI界面

在Offset Explorer中查看,结果如下:

总结

本文将会介绍了在Kafka中查看消息的消费状态。

后续笔者将会更新更多关于Kafka相关的文章,欢迎关注~


Kafka入门(四)Kafka消息消费状态全解析:如何精准追踪你的数据流向?
https://percent4.github.io/Kafka入门(四)Kafka消息消费状态全解析:如何精准追踪你的数据流向?/
作者
Jclian91
发布于
2025年2月21日
许可协议