本文将会介绍如何使用Redis中的List和Stream结构来实现消息队列。
欢迎关注我的公众号NLP奇幻之旅,原创技术文章第一时间推送。
欢迎关注我的知识星球“自然语言处理奇幻之旅”,笔者正在努力构建自己的技术社区。
在文章Redis进阶(一)使用Redis实现分布式锁中,笔者介绍了如何使用Redis来实现分布式锁。
本文将会介绍Redis的另一个应用场景:使用Redis实现消息队列
。Redis中的List和Stream数据结构都适合用来实现消息队列。
下面将会详细介绍如何使用Redis中的List和Stream结构来实现消息队列,使用语言为Python.
List结构实现
使用Redis数据结构List中的lpush和brpop(阻塞式)命令分别实现消息队列
的发送(product)和消费(consume)消息。
Python实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import redis import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
QUEUE_NAME = 'list_queue_task'
def producer(message): redis_client.lpush(QUEUE_NAME, message) print(f"Produced: {message}")
def consumer_blocking(): while True: result = redis_client.brpop(QUEUE_NAME) if result: queue_name, message = result print(f"Consumed: {message.decode('utf-8')}")
if __name__ == "__main__": for i in range(5): producer(f"Message {i}") time.sleep(2)
consumer_blocking()
|

Stream结构实现
Redis Stream是Redis 5.0版本中引入的一种新的数据结构,它主要用于高效地处理流式数据,特别适用于消息队列、日志记录和实时数据分析等场景,其优势如下:
- 持久化存储:Stream中的消息可以被持久化存储,确保数据不会丢失,即使在Redis服务器重启后也能恢复消息。
- 有序性:消息按照产生顺序生成消息ID, 被添加到Stream中,并且可以按照指定的条件检索消息,保证了消息的有序性。
- 多播与分组消费:支持多个消费者同时消费同一流中的消息,并且可以将消费者组织成消费组,实现消息的分组消费。
- 消息确认机制:消费者可以通过XACK命令确认是否成功消费消息,保证消息至少背消费一次,确保消息不会被重复处理。
- 阻塞读取:消费者可以选择阻塞读取模式,当没有新消息时,消费者会等待直至新消息到达。
- 消息可回溯: 方便补数、特殊数据处理, 以及问题回溯查询
使用Redis数据结构Stream中的xadd和xread命令分别实现消息队列
的发送(product)和消费(consume)消息。
Python实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import redis import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
STREAM_NAME = 'my_stream'
def producer(message): redis_client.xadd(STREAM_NAME, {'message': message}) print(f"Produced: {message}")
def consumer(): last_id = '0' while True: response = redis_client.xread({STREAM_NAME: last_id}, block=5000, count=1) if response: stream_name, messages = response[0] for msg_id, msg_data in messages: print(f"Consumed: {msg_data[b'message'].decode('utf-8')} (ID: {msg_id.decode('utf-8')})") last_id = msg_id else: print("No new message, waiting...")
if __name__ == "__main__": for i in range(5): producer(f"Message {i}") time.sleep(2.5)
consumer()
|

上述两种实现消息队列
的方式的不同之处在于,List结构为空时该key在Redis不存在,无法找到该key;Stream结构是持久化存储的,因此在Redis中一直存在,能找到对应的key.
总结
本文主要介绍了如何使用Redis中的List和Stream结构来实现消息队列。