本文将会介绍如何使用Docker来启动最新版Kafka和可视化工具kafka-ui,并使用Python连接Kafka进行基础操作。
前言
在文章Kafka入门(一)Kafak介绍、安装与简单使用中,笔者介绍了Kafka的基本概念、使用场景以及从安装到使用的完整流程,包括命令行模式、Offset
Explorer 可视化工具和 Python 脚本操作,适合初学者快速上手。
当时使用的Kafka版本为3.9.0,启动依赖于Zookeeper,且在本地系统中采用软件安装。
本文将会介绍如何使用Docker来启动最新版的Kafka服务和可视化工具kafka-ui.
Docker启动Kafka
拉取最新版Kafka的Docker镜像bitnami/kafka:latest,对应的Kafka版本为4.0.0,其服务启动已不再依赖于Zookeeper.
配置的docker-compose.yml文件如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 
 | version: '3.8'
 networks:
 kafka-net:
 
 services:
 kafka:
 image: bitnami/kafka:latest
 container_name: kafka
 networks:
 - kafka-net
 ports:
 - "9092:9092"
 volumes:
 - ./kafka_data:/bitnami/kafka
 environment:
 - KAFKA_KRAFT_MODE=true
 - KAFKA_CFG_NODE_ID=1
 - KAFKA_CFG_PROCESS_ROLES=broker,controller
 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
 - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
 - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka:9092
 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT
 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
 - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
 - ALLOW_PLAINTEXT_LISTENER=yes
 - KAFKA_CLIENT_USERS=jc
 - KAFKA_CLIENT_PASSWORDS=jckafka
 
 kafka-ui:
 image: provectuslabs/kafka-ui:latest
 container_name: kafka-ui
 depends_on:
 - kafka
 networks:
 - kafka-net
 ports:
 - "8090:8080"
 environment:
 - KAFKA_CLUSTERS_0_NAME=local
 - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
 - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
 - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
 - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="jc" password="jckafka";
 
 | 
该Kafka服务采用SASL/PLAIN认证方式。
Python操作Kafka
进入Kafka服务的Docker容器,新建school这个topic,命令如下:
| 12
 3
 4
 5
 6
 7
 
 | kafka-topics.sh \--bootstrap-server localhost:9092 \
 --command-config /tmp/client.properties \
 --create \
 --topic school \
 --partitions 1 \
 --replication-factor 1
 
 | 
生产者代码:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | from kafka import KafkaProducer
 producer = KafkaProducer(bootstrap_servers='localhost:9092',
 security_protocol='SASL_PLAINTEXT',
 sasl_mechanism='PLAIN',
 sasl_plain_username='jc',
 sasl_plain_password='jckafka')
 for i in range(10):
 message = f'Hello {i} from Kafka.'.encode('utf-8')
 producer.send(topic='school', value=message)
 producer.close()
 
 | 
消费者代码:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 
 | from kafka import KafkaConsumer
 consumer = KafkaConsumer(
 'school',
 bootstrap_servers='localhost:9092',
 security_protocol='SASL_PLAINTEXT',
 sasl_mechanism='PLAIN',
 sasl_plain_username='jc',
 sasl_plain_password='jckafka'
 )
 
 for message in consumer:
 print(f'Received message: {message.value.decode("utf-8")}')
 
 | 
在kafka-ui页面中查看message内容,如下图:

总结
本文最大的贡献在于通过与ChatGPT的交互,给出了Docker启动Kafka的docker-compose.yml配置文件,这样可以很方便地启动Kafka。之前在本地采用软件方式启动Kafka,过程较为麻烦且依赖于Zookeeper.
本文中的docker配置文件和Python脚本已上传至Github,网址为:https://github.com/percent4/ES_Learning
后续笔者将持续更新Kafka相关内容,欢迎关注~
欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。
 
欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。
