Redis进阶(二)实现消息队列

本文将会介绍如何使用Redis中的List和Stream结构来实现消息队列。

欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。

欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。

在文章Redis进阶(一)使用Redis实现分布式锁中,笔者介绍了如何使用Redis来实现分布式锁。

本文将会介绍Redis的另一个应用场景:使用Redis实现消息队列。Redis中的List和Stream数据结构都适合用来实现消息队列。

下面将会详细介绍如何使用Redis中的List和Stream结构来实现消息队列,使用语言为Python.

List结构实现

使用Redis数据结构List中的lpush和brpop(阻塞式)命令分别实现消息队列的发送(product)和消费(consume)消息。

Python实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import redis
import time

# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# 定义队列名
QUEUE_NAME = 'list_queue_task'


# 生产者:向队列推送消息
def producer(message):
redis_client.lpush(QUEUE_NAME, message)
print(f"Produced: {message}")


# 消费者:从队列拉取消息(使用阻塞方式)
def consumer_blocking():
while True:
# brpop会阻塞直到有数据
result = redis_client.brpop(QUEUE_NAME)
if result:
queue_name, message = result
print(f"Consumed: {message.decode('utf-8')}")


if __name__ == "__main__":
# 示例:先生产一些消息
for i in range(5):
producer(f"Message {i}")
time.sleep(2)

# 启动消费者(阻塞式)
consumer_blocking()

Stream结构实现

Redis Stream是Redis 5.0版本中引入的一种新的数据结构,它主要用于高效地处理流式数据,特别适用于消息队列、日志记录和实时数据分析等场景,其优势如下:

  • 持久化存储:Stream中的消息可以被持久化存储,确保数据不会丢失,即使在Redis服务器重启后也能恢复消息。
  • 有序性:消息按照产生顺序生成消息ID, 被添加到Stream中,并且可以按照指定的条件检索消息,保证了消息的有序性。
  • 多播与分组消费:支持多个消费者同时消费同一流中的消息,并且可以将消费者组织成消费组,实现消息的分组消费。
  • 消息确认机制:消费者可以通过XACK命令确认是否成功消费消息,保证消息至少背消费一次,确保消息不会被重复处理。
  • 阻塞读取:消费者可以选择阻塞读取模式,当没有新消息时,消费者会等待直至新消息到达。
  • 消息可回溯: 方便补数、特殊数据处理, 以及问题回溯查询

使用Redis数据结构Stream中的xadd和xread命令分别实现消息队列的发送(product)和消费(consume)消息。

Python实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import redis
import time

# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# 定义Stream的名字
STREAM_NAME = 'my_stream'


# 生产者:向Stream写入消息
def producer(message):
redis_client.xadd(STREAM_NAME, {'message': message})
print(f"Produced: {message}")


# 消费者:从Stream读取消息
def consumer():
# 初始读取位置:从最新开始(也可以改成 '0' 表示从最早开始)
last_id = '0'
while True:
# XREAD block等待消息,streams是一个字典,key是stream名,value是起始ID
response = redis_client.xread({STREAM_NAME: last_id}, block=5000, count=1)
if response:
stream_name, messages = response[0]
for msg_id, msg_data in messages:
print(f"Consumed: {msg_data[b'message'].decode('utf-8')} (ID: {msg_id.decode('utf-8')})")
last_id = msg_id # 更新last_id,继续读取新消息
else:
print("No new message, waiting...")


if __name__ == "__main__":
# 示例:先生产一些消息
for i in range(5):
producer(f"Message {i}")
time.sleep(2.5)

# 启动消费者
consumer()

上述两种实现消息队列的方式的不同之处在于,List结构为空时该key在Redis不存在,无法找到该key;Stream结构是持久化存储的,因此在Redis中一直存在,能找到对应的key.

总结

本文主要介绍了如何使用Redis中的List和Stream结构来实现消息队列。


Redis进阶(二)实现消息队列
https://percent4.github.io/Redis进阶(二)实现消息队列/
作者
Jclian91
发布于
2025年7月13日
许可协议