本文将会介绍如何在Kafka中查看消息的消费状态。
在文章Kafka入门(三)Kafka
在 Web 应用中的实践:基于 FastAPI 实现图片 OCR中,笔者介绍了Kafka 在
Web
应用中的实践,也就是Kafka作为消息队列,FastAPI作为Web框架来实现图片上传、OCR识别及结果查看。但是在实际操作过程中,有一个问题困扰了我,那就是:
如何查看Kafka中的消息的消费状态,这些消息哪些已经被消费,哪些还未被消费呢?
本文使用的Kafka已在本地启动,topic为ocr-topic,consumer
group为ocr-group。
我们的操作过程遵循如下流程:
- 关闭consumer.py脚本,在图片上传页面上传3张图片,此时查看Kafka中的消息的消费状态;
- 开启consumer.py脚本,等待所有图片处理完后,再查看Kafka中的消息的消费状态;
下面将会介绍几种常见的查询Kafka中的消息的消费状态的方法。
消费前
命令行
使用kafka-consumer-groups.sh
命令来查看各个Consumer
Group中的消息总数、未消费数量、已消费数量。
查询结果如下:
1 2 3 4 5 6
| $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ocr-group --describe
Consumer group 'ocr-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID ocr-group ocr-topic 0 23 26 3 - - -
|
该结果表明在ocr-topic这个topic中共有26条消息,其中23条消息已被消费,最新加入的3条消息未被消费。
Python代码
使用Python代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
from kafka import KafkaConsumer, KafkaAdminClient, TopicPartition
topic = "ocr-topic" group_id = "ocr-group" bootstrap_servers = ["localhost:9092"]
consumer = KafkaConsumer( bootstrap_servers=bootstrap_servers, group_id=group_id )
partitions = consumer.partitions_for_topic(topic) if partitions is None: print(f"无法获取 topic {topic} 的分区信息") exit(1)
topic_partitions = [TopicPartition(topic, p) for p in partitions]
end_offsets = consumer.end_offsets(topic_partitions)
committed_offsets = {tp: consumer.committed(tp) for tp in topic_partitions}
for tp in topic_partitions: committed_offset = committed_offsets.get(tp, 0) or 0 end_offset = end_offsets[tp] lag = end_offset - committed_offset print(f"Partition {tp.partition}: Log End Offset = {end_offset}, Committed Offset = {committed_offset}, LAG = {lag}")
consumer.close()
|
运行结果如下:
1
| Partition 0: Log End Offset = 26, Committed Offset = 23, LAG = 3
|
UI界面
在Offset
Explorer中,打开Kafka连接,在Consumer中查看ocr-group
的信息,如下:

消费后
命令行
开启consumer.py
脚本,将Kafka中未消费的消息进行处理,等待处理完成后,再次查看ocr-group中的消息的消费状态,如下:
1 2
| GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID ocr-group ocr-topic 0 26 26 0 kafka-python-2.0.2-3bf95370-c58c-485d-93c3-1e0e5eb24b04 /192.168.0.100 kafka-python-2.0.2
|
Python代码
此时Python脚本的运行结果如下:
1
| Partition 0: Log End Offset = 26, Committed Offset = 26, LAG = 0
|
UI界面
在Offset Explorer中查看,结果如下:

总结
本文将会介绍了在Kafka中查看消息的消费状态。
后续笔者将会更新更多关于Kafka相关的文章,欢迎关注~