本文将会介绍如何使用Docker来启动最新版Kafka和可视化工具kafka-ui,并使用Python连接Kafka进行基础操作。
前言
在文章Kafka入门(一)Kafak介绍、安装与简单使用中,笔者介绍了Kafka的基本概念、使用场景以及从安装到使用的完整流程,包括命令行模式、Offset
Explorer 可视化工具和 Python 脚本操作,适合初学者快速上手。
当时使用的Kafka版本为3.9.0,启动依赖于Zookeeper,且在本地系统中采用软件安装。
本文将会介绍如何使用Docker来启动最新版的Kafka
服务和可视化工具kafka-ui
.
Docker启动Kafka
拉取最新版Kafka的Docker镜像bitnami/kafka:latest
,对应的Kafka版本为4.0.0,其服务启动已不再依赖于Zookeeper.
配置的docker-compose.yml文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| version: '3.8'
networks: kafka-net:
services: kafka: image: bitnami/kafka:latest container_name: kafka networks: - kafka-net ports: - "9092:9092" volumes: - ./kafka_data:/bitnami/kafka environment: - KAFKA_KRAFT_MODE=true - KAFKA_CFG_NODE_ID=1 - KAFKA_CFG_PROCESS_ROLES=broker,controller - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 - KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka:9092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CLIENT_USERS=jc - KAFKA_CLIENT_PASSWORDS=jckafka
kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui depends_on: - kafka networks: - kafka-net ports: - "8090:8080" environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="jc" password="jckafka";
|
该Kafka服务采用SASL/PLAIN认证方式。
Python操作Kafka
进入Kafka服务的Docker容器,新建school这个topic,命令如下:
1 2 3 4 5 6 7
| kafka-topics.sh \ --bootstrap-server localhost:9092 \ --command-config /tmp/client.properties \ --create \ --topic school \ --partitions 1 \ --replication-factor 1
|
生产者代码:
1 2 3 4 5 6 7 8 9 10 11
| from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092', security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN', sasl_plain_username='jc', sasl_plain_password='jckafka') for i in range(10): message = f'Hello {i} from Kafka.'.encode('utf-8') producer.send(topic='school', value=message) producer.close()
|
消费者代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| from kafka import KafkaConsumer
consumer = KafkaConsumer( 'school', bootstrap_servers='localhost:9092', security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN', sasl_plain_username='jc', sasl_plain_password='jckafka' )
for message in consumer: print(f'Received message: {message.value.decode("utf-8")}')
|
在kafka-ui
页面中查看message内容,如下图:

总结
本文最大的贡献在于通过与ChatGPT的交互,给出了Docker启动Kafka的docker-compose.yml配置文件,这样可以很方便地启动Kafka。之前在本地采用软件方式启动Kafka,过程较为麻烦且依赖于Zookeeper.
本文中的docker配置文件和Python脚本已上传至Github,网址为:https://github.com/percent4/ES_Learning
后续笔者将持续更新Kafka相关内容,欢迎关注~
欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。
欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。