xadd 命令
XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。
一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。
语法
XADD key ID field value[field value ...]
参数 ID
ID 标识流内的给定条目。如果指定的 ID 参数是字符*(星号 ASCII 字符),XADD 命令会自动为您生成一个唯一的 ID。但是,也可以指定一个良好格式的 ID,以便新的条目以指定的 ID 准确存储,虽然仅在极少数情况下有用。
ID 是由-隔开的两个数字组成的:1526919030474-55。两个部分数字都是 64 位的,当自动生成 ID 时,第一部分是生成 ID 的 Redis 实例的毫秒格式的 Unix 时间。第二部分只是一个序列号,以及是用来区分同一毫秒内生成的 ID 的。
ID 保证始终是递增的:如果比较刚插入的条目的 ID,它将大于其他任何过去的 ID,因此条目在流中是完全排序的。为了保证这个特性,如果流中当前最大的 ID 的时间大于实例的当前本地时间,将会使用前者,并将 ID 的序列部分递增。例如,本地始终回调了,或者在故障转移之后新主机具有不同的绝对时间,则可能发生这种情况。
当用户为 XADD 命令指定显式 ID 时,最小有效的 ID 是0-1,并且用户必须指定一个比当前流中的任何 ID 都要大的 ID,否则命令将失败。通常使用特定 ID 仅在您有另一个系统生成唯一 ID(例如 SQL 表),并且您确实希望 Redis 流 ID 与该另一个系统的 ID 匹配时才有用。
上限流
可以使用 MAXLEN 选项来限制流中的最大元素数量。与使用 XADD 添加条目相比较,使用 MAXLEN 修整会很昂贵:流由宏节点表示为基数树,以便非常节省内存。改变由几十个元素组成的单个宏节点不是最佳的。因此可以使用以下特殊形式提供命令:
XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
在选项 MAXLEN 和实际计数中间的参数~的意思是,用户不是真的需要精确的 1000 个项目。它可以多几十个条目,但决不能少于 1000 个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。
返回值
该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。
示例
redis> XADD mystream * name Sara surname OConnor "1575742009761-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1575742009762-0" redis> XLEN mystream (integer) 2 #- 是最小,+ 是最大 redis> XRANGE mystream - + 1) 1) "1575742009761-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1575742009762-0" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3"
测试
第一步,插入数据stm是streams的key,*代表的是redis帮生成序列 id, stream的一条记录可以有多个字段及值,所以后面跟了 url和typo.
# xiaorui.cc redis> xadd stm * url xiaorui.cc typo 1 redis> xadd stm * url xiaorui.cc typo 2 redis> xadd stm * url xiaorui.cc typo 3 redis> xadd stm * url xiaorui.cc typo 4 redis> xadd stm * url xiaorui.cc typo 5 redis> xadd stm * url xiaorui.cc typo 6 redis> xadd stm * url xiaorui.cc typo 7 redis> xadd stm * url xiaorui.cc typo 8 redis> xadd stm * url xiaorui.cc typo 9 redis> xadd stm * url xiaorui.cc typo 10
第二步,查询最小到最大的数据,-是最小,+是最大
redis> xrange stm - + 1) 1) 1528271957975-0 2) 1) "url" 2) "xiaorui.cc" 3) "typo" 4) "1" 2) 1) 1528271958716-0 2) 1) "url" 2) "xiaorui.cc" 3) "typo" 4) "2"
第三步,创建一个消费组
redis> xgroup create stm cg1 0-0 OK
第四步,创建一个消费者并且消费
redis> xreadgroup GROUP cg1 c1 count 1 streams stm > 1) 1) "stm" 2) 1) 1) 1528271957975-0 2) 1) "url" 2) "xiaorui.cc" 3) "typo" 4) "1"
第五步,查看redis里的stream消费状态,可以看到当前只有一个消费者c1, pending状态的id只有一个。
redis> xinfo groups stm 1) 1) name 2) "cg1" 3) consumers 4) (integer) 1 5) pending 6) (integer) 1 redis> xinfo consumers stm cg1 1) 1) name 2) "c1" 3) pending 4) (integer) 1 5) idle 6) (integer) 46018
第六步,使用xack来确认该消息id,之后我们在通过xinfo groups发现pending为0了。
redis> xack stm cg1 1528271957975-0 (integer) 1 redis> xinfo groups stm 1) 1) name 2) "cg1" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0
上面说了基本的用法,我们再试试在一个consumer group里,多个消费者并发去消费请求会出现了什么?
# xiaorui.cc redis> xreadgroup GROUP cg1 c2 count 1 streams stm > 1) 1) "stm" 2) 1) 1) 1528271958716-0 2) 1) "url" 2) "xiaorui.cc" 3) "typo" 4) "2" redis> xreadgroup GROUP cg1 c1 count 1 streams stm > 1) 1) "stm" 2) 1) 1) 1528271960027-0 2) 1) "url" 2) "xiaorui.cc" 3) "typo" 4) "3"
通过上面的 result 可以看到,两个消费者不会消费到同一个消息 id,>表示从当前消费组的last_delivered_id后面开始读,每当消费者读取一条消息,last_delivered_id变量就会前进。这个get消息和set偏移量的过程是原子的,redis会帮你保证该组合指令的原子性。既然是消费队列,当消费端没有数据的时候,肯定不能忙轮询吧?所以说redis streams也支持类似long poll事件唤醒机制。 block 0 等于一直等待读事件,block 10 等待10s的意思。