• 首页
  • vue
  • TypeScript
  • JavaScript
  • scss
  • css3
  • html5
  • php
  • MySQL
  • redis
  • jQuery
  • xadd 命令

    XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在,将使用流的条目自动创建 key。

    一个条目是由一组键值对组成的,它基本上是一个小的字典。键值对以用户给定的顺序存储,并且读取流的命令(如 XRANGE 或者 XREAD)可以保证按照通过 XADD 添加的顺序返回。


    语法

    XADD key ID field value[field value ...]

    参数 ID

    ID 标识流内的给定条目。如果指定的 ID 参数是字符*(星号 ASCII 字符),XADD 命令会自动为您生成一个唯一的 ID。但是,也可以指定一个良好格式的 ID,以便新的条目以指定的 ID 准确存储,虽然仅在极少数情况下有用。

    ID 是由-隔开的两个数字组成的:1526919030474-55。两个部分数字都是 64 位的,当自动生成 ID 时,第一部分是生成 ID 的 Redis 实例的毫秒格式的 Unix 时间。第二部分只是一个序列号,以及是用来区分同一毫秒内生成的 ID 的。

    ID 保证始终是递增的:如果比较刚插入的条目的 ID,它将大于其他任何过去的 ID,因此条目在流中是完全排序的。为了保证这个特性,如果流中当前最大的 ID 的时间大于实例的当前本地时间,将会使用前者,并将 ID 的序列部分递增。例如,本地始终回调了,或者在故障转移之后新主机具有不同的绝对时间,则可能发生这种情况。

    当用户为 XADD 命令指定显式 ID 时,最小有效的 ID 是0-1,并且用户必须指定一个比当前流中的任何 ID 都要大的 ID,否则命令将失败。通常使用特定 ID 仅在您有另一个系统生成唯一 ID(例如 SQL 表),并且您确实希望 Redis 流 ID 与该另一个系统的 ID 匹配时才有用。

    上限流

    可以使用 MAXLEN 选项来限制流中的最大元素数量。与使用 XADD 添加条目相比较,使用 MAXLEN 修整会很昂贵:流由宏节点表示为基数树,以便非常节省内存。改变由几十个元素组成的单个宏节点不是最佳的。因此可以使用以下特殊形式提供命令:

    XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
    

    在选项 MAXLEN 和实际计数中间的参数~的意思是,用户不是真的需要精确的 1000 个项目。它可以多几十个条目,但决不能少于 1000 个。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。

    返回值

    该命令返回添加的条目的 ID。如果 ID 参数传的是*,那么 ID 是自动生成的,否则,命令仅返回用户在插入期间指定的相同的 ID。

    示例

    redis> XADD mystream * name Sara surname OConnor
    "1575742009761-0"
    
    redis> XADD mystream * field1 value1 field2 value2 field3 value3
    "1575742009762-0"
    
    redis> XLEN mystream
    (integer) 2
    
    #- 是最小,+ 是最大
    
    redis> XRANGE mystream - +
    1) 1) "1575742009761-0"
       2) 1) "name"
          2) "Sara"
          3) "surname"
          4) "OConnor"
    2) 1) "1575742009762-0"
       2) 1) "field1"
          2) "value1"
          3) "field2"
          4) "value2"
          5) "field3"
          6) "value3"
    

    测试

    第一步,插入数据stm是streams的key,*代表的是redis帮生成序列 id, stream的一条记录可以有多个字段及值,所以后面跟了 url和typo.

    # xiaorui.cc
    
    redis> xadd stm * url xiaorui.cc typo 1
    redis> xadd stm * url xiaorui.cc typo 2
    redis> xadd stm * url xiaorui.cc typo 3
    redis> xadd stm * url xiaorui.cc typo 4
    redis> xadd stm * url xiaorui.cc typo 5
    redis> xadd stm * url xiaorui.cc typo 6
    redis> xadd stm * url xiaorui.cc typo 7
    redis> xadd stm * url xiaorui.cc typo 8
    redis> xadd stm * url xiaorui.cc typo 9
    redis> xadd stm * url xiaorui.cc typo 10
    

    第二步,查询最小到最大的数据,-是最小,+是最大

    redis> xrange stm - +
     1) 1) 1528271957975-0
        2) 1) "url"
           2) "xiaorui.cc"
           3) "typo"
           4) "1"
     2) 1) 1528271958716-0
        2) 1) "url"
           2) "xiaorui.cc"
           3) "typo"
           4) "2"
    

    第三步,创建一个消费组

    redis> xgroup create stm cg1 0-0
    OK
    

    第四步,创建一个消费者并且消费

    redis> xreadgroup GROUP cg1 c1 count 1 streams stm >
    1) 1) "stm"
       2) 1) 1) 1528271957975-0
             2) 1) "url"
                2) "xiaorui.cc"
                3) "typo"
                4) "1"
    

    第五步,查看redis里的stream消费状态,可以看到当前只有一个消费者c1, pending状态的id只有一个。

    redis> xinfo groups stm
    1) 1) name
       2) "cg1"
       3) consumers
       4) (integer) 1
       5) pending
       6) (integer) 1
    redis> xinfo consumers stm cg1
    1) 1) name
       2) "c1"
       3) pending
       4) (integer) 1
       5) idle
       6) (integer) 46018
    

    第六步,使用xack来确认该消息id,之后我们在通过xinfo groups发现pending为0了。

    redis> xack stm cg1 1528271957975-0
    (integer) 1
    redis> xinfo groups stm
    1) 1) name
       2) "cg1"
       3) consumers
       4) (integer) 1
       5) pending
       6) (integer) 0
    

    上面说了基本的用法,我们再试试在一个consumer group里,多个消费者并发去消费请求会出现了什么?

    # xiaorui.cc
    
    redis> xreadgroup GROUP cg1 c2 count 1 streams stm >
    1) 1) "stm"
       2) 1) 1) 1528271958716-0
             2) 1) "url"
                2) "xiaorui.cc"
                3) "typo"
                4) "2"
    redis> xreadgroup GROUP cg1 c1 count 1 streams stm >
    1) 1) "stm"
       2) 1) 1) 1528271960027-0
             2) 1) "url"
                2) "xiaorui.cc"
                3) "typo"
                4) "3"
    

    通过上面的 result 可以看到,两个消费者不会消费到同一个消息 id,>表示从当前消费组的last_delivered_id后面开始读,每当消费者读取一条消息,last_delivered_id变量就会前进。这个get消息和set偏移量的过程是原子的,redis会帮你保证该组合指令的原子性。既然是消费队列,当消费端没有数据的时候,肯定不能忙轮询吧?所以说redis streams也支持类似long poll事件唤醒机制。 block 0 等于一直等待读事件,block 10 等待10s的意思。

    下篇:xtrim 命令