网站页面布局,ih5网页制作教程,wix做的网站 网址是什么,女网友叫我一起做优惠券网站Redis - PubSub、Stream流 文章目录 Redis - PubSub、Stream流1.基于List的消息队列2.基于PubSub的消息队列3.基于Stream的消息队列1.Redis Streams简介2.Redis Streams基本命令1.XADD 添加消息到末尾2.XLEN 获取消息长度3.XREAD 读取消息 #xff08;单消费模式#xff09;4…Redis - PubSub、Stream流 文章目录 Redis - PubSub、Stream流1.基于List的消息队列2.基于PubSub的消息队列3.基于Stream的消息队列1.Redis Streams简介2.Redis Streams基本命令1.XADD 添加消息到末尾2.XLEN 获取消息长度3.XREAD 读取消息 单消费模式4.XGROUP 消费组操作5.XREADGROUP GROUP 从消费组读取消息6.XACK 消息确认7.XPENDING 查看pend数据 1.基于List的消息队列
由于redis的list数据结构为双向链表则可以通过lpush和rpop来模拟队列效果由于队列没有消息时候需要阻塞获取队列数据而lpop和rpop在空队列获取数据时会返回null所以需要使用brpop和blpop来进行阻塞获取
#向data1的list存两个数据
lpush data1 aaa bbb#右监听data1 等待20秒
brpop data1 20缺点:
无法避免消息丢失,只支持单消费者无法广播
2.基于PubSub的消息队列
基于发布订阅形式可以广播生产者向channel(信道)发送消息可以由多个消息者去订阅订阅的消费者都可以收到消息 SUBSCRIBE channel [channel ...] #订阅一个或多个信道PUBLISH channel message #向一个信道发送消息PSUBSCRIBE pattern [pattern ...] #通过通配符匹配订阅的信道 匹配规则 代表一个字符 []代表中括号内的可选字符 *代表任意字符SUBSCRIBE log
PUBLISH log zhangsan 缺点:
不支持持久化消息有上限超出会导致消息丢失
3.基于Stream的消息队列
1.Redis Streams简介
官方文档https://redis.io/docs/latest/commands/xadd/
Redis Stream是redis在5.x版本引入的新特性Redis流是一种数据结构它类似于一个只可追加的日志但也实现了多种操作以克服典型只可追加日志的一些限制。这些操作包括O(1)时间的随机访问和复杂的消费策略如消费者组。你可以使用流来记录并同时实时分发事件。Redis流的使用案例包括
事件溯源例如跟踪用户操作、点击等传感器监测例如现场设备的读数通知例如将每个用户的通知记录存储在单独的流中
Redis为每个流条目生成一个唯一的ID。你可以使用这些ID在后续检索与其关联的条目或者读取并处理流中的所有后续条目。请注意由于这些ID与时间相关这里显示的ID可能会有所不同与你自己的Redis实例中看到的ID也会有所不同。 Redis流支持多种修剪策略以防止流无限制地增长和多种消费策略参见XREAD、XREADGROUP和XRANGE。
2.Redis Streams基本命令
stream消息队列相关命令
XADD - 添加消息到末尾XTRIM - 对流进行修剪限制长度XDEL - 删除消息XLEN - 获取流包含的元素数量即消息长度XRANGE - 获取消息列表会自动过滤已经删除的消息XREVRANGE - 反向获取消息列表ID 从大到小XREAD - 以阻塞或非阻塞方式获取消息列表
消费者组相关命令
XGROUP CREATE - 创建消费者组XGROUP CREATECONSUMER 给指定的消费者组添加消费者XREADGROUP GROUP - 读取消费者组中的消息XACK - 将消息标记为已处理XGROUP SETID - 为消费者组设置新的最后递送消息IDXGROUP DELCONSUMER - 删除消费者XGROUP DESTROY - 删除消费者组XPENDING - 显示待处理消息的相关信息XCLAIM - 转移消息的归属权XINFO - 查看流和消费者组的相关信息XINFO GROUPS - 打印消费者组的信息XINFO STREAM - 打印流信息
1.XADD 添加消息到末尾
1.基本语法
XADD是唯一可以向流中添加数据的 Redis 命令但 还有其他命令例如 XDEL 和 XTRIM能够 从流中删除数据。
XADD key [NOMKSTREAM] [MAXLEN | MINID [ | ~] threshold [LIMIT count]] * | id field value [field value ...]key: 队列名[NOMKSTREAM]队列不存在是否自动创建默认自动创建[MAXLEN | MINID [ | ~] threshold [LIMIT count]] :设置消息队列的最大消息数量* | id消息唯一id*代表消息由redis自动生成格式为时间戳-递增序列,可以手动指定field value字段和值键值对可以一次添加多个
XADD users * name zhangsan age 18 #向user发送一条name为zhangsanage为18的消息返回消息id2.指定stream的id参数
1526919030474-55ID标识流中的给定消息数据。 如果指定的ID参数是*字符XADD命令将自动生成一个唯一的ID。然而尽管仅在极少数情况下有用但可以指定一个格式良好的ID以便新条目将与指定的ID完全相同。
XADD mystream 1526919030474-55 message Hello, 当自动生成ID时第一部分是Redis实例生成ID的Unix时间毫秒。第二部分只是一个序列号用于区分在同一毫秒内生成的ID。
XADD mystream 1526919030474-* message World!还可以指定一个不完整的ID只自动生成序列号部分注意6.0版本不支持报错
stream的数据是有序的所以消息的id始终的递增的如果手动指定一个小于上一条数据的id则会出错
2.XLEN 获取消息长度
XLEN users #返回消息个数3.XREAD 读取消息 单消费模式
基础语法
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...][COUNT count] :每次读取的最大数量[BLOCK milliseconds]:当消息没有时是否阻塞阻塞时间毫秒不阻塞就不给值如果给0则永久阻塞等待STREAMS key: 从那个队列读取消息key为读取的队列名id [id ...]起始id代表从那个id的消息开始读取0代表从第一个$代表从最新的消息读取
测试 xread count 1 streams users 0 #读取users中最开始的一条数据消息读取后不会删除所有消费者都可以重复获取 xread count 1 streams users $ #读取最新消息 返回为nil空xread count 1 block 0 streams users $ #永久阻塞读取但是阻塞方式监听到消息后会关闭需要重新监听
此时在开发中我们可以使用死循环来无限读取最新消息进行监听
但是 当指定起始id为$时代表读取最新消息如果处理消息过程中又有超过一条以上的消息到达则下次也只能获取一条最新的消息会导致其他数据漏读
4.XGROUP 消费组操作 消费者组将多个消费者划分到同一个消费组监听同一个队列 分流消费队列中的消费将会分流给消费者组中的消费者不会重复消费加快消息消费速度 消息标识消费者组会维护一个标识记录最后一个被处理非最新的消息即使redis挂机重启也可以按照标识恢复读取确保消息消费 消息确认机制消费者获取消息后消息处于pending状态并存入一个pend-list当处理完成时通过XACK来确认消息标记为已处理才pend-list移除
XGROUP CREATE 创建消费者组
XGROUP CREATE key group id | $ [MKSTREAM] [ENTRIESREAD entries-read]key队列名称group 消费者组名称id | $起始id标识 0代表第一个 $代表最新消息[MKSTREAM]队列不存在时自动创建队列如果不存在且不指定会报错[ENTRIESREAD entries-read]: redis7.0后的参数
创建消费者组g1
XGROUP CREATE users g1 0XGROUP CREATECONSUMER 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key group consumerXGROUP DESTROY 删除指定的消费者组
XGROUP DESTROY key groupXGROUP DELCONSUMER 删除消费者组中指定的消费者
XGROUP DELCONSUMER key group consumer5.XREADGROUP GROUP 从消费组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]group 消费者组名称consumer消费者名称如果不存在会自动创建[COUNT count] :每次读取的最大数量[BLOCK milliseconds]:当消息没有时是否阻塞阻塞时间毫秒不阻塞就不给值如果给0则永久阻塞等待STREAMS key: 从那个队列读取消息key为读取的队列名[NOACK] 无需消息确认(类似自动确认)。id [id ...]起始id
**注意**id取值 从下一个未消费的消息开始非最新消息确保都消费 其他数字根据指定id从pend-list中获取已消费但未确认消息例如0从pend-list第一个消息开始
所以当正常处理时的id都采用 进行消费如果出现异常可以指定0每次都读取第一个pend-list的消息即每次都是读取最新的未处理数据将异常数据处理掉
测试
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS users 6.XACK 消息确认
XACK key group id [id ...]测试
XACK users g1 1733738565351-0 1733738570018-0 1733738567511-0 1733738587327-07.XPENDING 查看pend数据
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]key队列名称group 消费者组名称[IDLE min-idle-time]:查看过去空闲时间的以上的消息比如给5000则查询空闲时间5000ms以上的消息start end 消息起始范围 “- ”代表所有count 获取数量[consumer]: 获取那个消费者的
测试
XPENDING users g1 - 10参考来源https://www.bilibili.com/video/BV1cr4y1671t/?spm_id_from333.788.videopod.episodesvd_source97a7d9497f7eb9e537f6b50df8831e27p75