Redis Stream:消息队列的典型案例

发表时间: 2018-10-23 12:19

Redis Stream

Redis最新的大版本5.0已经RC1了,其中最重要的Feature莫过于Redis Stream了,关于Redis Stream的基本使用介绍和设计理念可以看我之前的一篇文章(Redis Stream简介)。Redis Stream本质上是在Redis内核上(非Redis Module)实现的一个消息发布订阅功能组件。相比于现有的PUB/SUB、BLOCKED LIST,其虽然也可以在简单的场景下作为消息队列来使用,但是Redis Stream无疑要完善很多。Redis Stream提供了消息的持久化和主备复制功能、新的RadixTree数据结构来支持更高效的内存使用和消息读取、甚至是类似于Kafka的Consumer Group功能。今天我们重点关注怎么在实际业务场景下去使用Redis Stream。

Redis Stream实战——IRC系统

相信大家对IRC都比较了解了(还记得被和谐掉的xx聊天室吗:-)),很多知名的开源项目(包括Redis)都有自己的IRC频道,方便开发者和使用者实时的进行思想火花的碰撞,我们今天介绍的主角——Redis Stream,本身就是起源于IRC中一个用户的idea。IRC的模型如下,

在某个IRC频道中的用户,既可以向所有的其他用户自由的发送消息,也可以接收其他所有用户发送的消息。如果要基于Redis来构建一个IRC系统,那我们不由自主的会想到使用Redis的PUB/SUB功能,

可以看到,基于PUB/SUB,只需要所有的用户(client)都订阅(subscribe)同一个IRC频道(channel1),就可以接收所有用户发出的消息了。发出消息时,只需使用发布命令(publish)命令即可。整个业务逻辑非常的清晰简单,这也是Redis强大和流行的重要原因——提供的功能和数据结构能尽可能提升开发者的开发效率。

但是基于PUB/SUB构建的IRC,有一个问题是PUB/SUB的消息模型是Fire and Forgot。也就是说Redis本身并不保存任何历史消息,如果IRC中某个用户的网络连接出现异常,重新加入IRC后,他是看不到断链期间的聊天记录的,新加入的用户同样也看不到最近一段时间的历史记录,这个对用户迅速的理解当前讨论的问题非常不便。此外,如果Redis发生了重启,所有的用户也需要重新订阅频道。

那如果基于Redis Stream来构建IRC呢?

  • 创建频道
# 目前Redis还不支持创建空的stream,所以我们可以添加一个特殊消息,# 来创建一个新的stream(频道)ip:7000> xadd channel1 * create-channel null1528702126345-0
  • 发送消息
# 发送一条消息,只需要使用xadd命令即可,我们可以给每条消息命名,顺便带上消息来源,方便业务逻辑处理。# 我们也可以一次发送多条消息,可以作为优化网络开销的一种手段。ip:7000> xadd channel1 * msg1-tony "Hello everyone."1528702503377-0ip:7000> xadd channel1 * msg2-tony "I am a big Redis fan." msg3-tony "Hope we can learn from each other.:-)"1528702573546-0
  • 接收消息
# 新用户初次加入频道时,指定'$'作为一个特殊起始ID读取消息,表示只接收最新的频道消息# 之后如果新消息,只需从上一次的返回结果ID继续读取即可# 当没有新消息时,xread命令返回空集ip:7000> xread BLOCK 100 STREAMS channel1 ) 1) "channel1" 2) 1) 1) 1528703048021-0 2) 1) "msg1-tony" 2) "Hello everyone."ip:7000> xread BLOCK 100 STREAMS channel1 1528703048021-01) 1) "channel1" 2) 1) 1) 1528703061087-0 2) 1) "msg2-tony" 2) "I am a big Redis fan." 3) "msg3-tony" 4) "Hope we can learn from each other.:-)"ip:7000> xread BLOCK 100 STREAMS channel1 1528703061087-0(nil)
  • 获取历史消息
  • 前面我们提到了,Redis Stream和PUB/SUB相比,一个重要的区别是,Redis Stream可以获取历史发送的消息,所以当一个用户断开连接重新加入IRC时,可以通过如下方式获取历史消息:
# 1528703061087-0 为用户记录的最后接收的消息的IDip:7000> xrange channel1 1528703061087-0 +1) 1) 1528706457462-0 2) 1) "msg1-andy" 2) "Nice to meet you guys."2) 1) 1528706497200-0 2) 1) "msg4-tony" 2) "When will Redis 5.0 GA comes out?"3) 1) 1528706601973-0 2) 1) "msg1-antirez" 2) "I think it will arrive in the second half of 2018."

Redis Stream实战——IoT数据采集

Redis除了强大而且丰富的数据结构支持,还有一个很重要的能力是跨平台,甚至是作为一个嵌入式的存储系统跑在基于ARM的平台上,比如作者之前就宣称,Redis成功的跑在了“树莓派”上。

试想一下,在IoT时代,会有无数随时随地可以接入互联网的智能设备,你家里的冰箱会实时的汇报,冰箱里面有哪些食物,数量多少,新鲜程度如何,空调会汇报现在温度多少,空气质量如何,你的车会不断的汇报发动机的各项数据,变速箱的各项数据,车内空气的各项数据。这么多的IoT设备会形成巨大的数据洪流,采集完成后在云端进行分析,产生巨大的用户价值。

这些数据虽然内容各个不同,但是都有一个共同的特点,都是一种时序数据。看到这里,你可能会突然发现,Redis Stream从设计初就是为了支持时间序列数据而生(见第一部分Redis Stream介绍),Redis又成功的跑在了ARM平台,而未来物联网会有万亿级的设备基于ARM平台。所以,我们不由自主的可以猜想,除了现在在各种互联网服务中作为Cache和KV存储广泛应用,Redis下一个大放异彩的领域也许就在物联网。

上面这个图,就是一个典型的物联网设备信息采集,分析,展示的架构。Redis作为一个嵌入式的存储系统跑在各个IoT设备上,各个设备使用Redis Stream暂存产生的时序数据,然后再异步的推送到云端。云上部署的各个业务程序,会读取推送的原始数据,基于一定的规则进行分析,然后将结果写入可靠的数据存储系统。用户读取结果,在APP或者web页面上进行展示,从而整个系统形成一个闭环。

作者简介

夏德军,花名夏周,阿里云Redis技术专家,负责阿里云Redis内核开发和维护。活跃于开源社区,Redis Contributor,设计并实现了阿里云Redis开源项目ApsaraCache的部分核心feature,如时间点恢复,binlog同步等。