Skip to content

Commit

Permalink
Fix Stream Lag and Add EntriesAdded/EntriesRead
Browse files Browse the repository at this point in the history
Added them according to redis spec described here
https://redis.io/docs/latest/commands/xinfo-groups/
  • Loading branch information
sancar committed Jun 6, 2024
1 parent 4923b04 commit 971abad
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 28 deletions.
13 changes: 10 additions & 3 deletions cmd_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,11 @@ func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) {
return
}

c.WriteMapLen(1)
c.WriteMapLen(2)
c.WriteBulk("length")
c.WriteInt(len(s.entries))
c.WriteBulk("entries-added")
c.WriteInt(s.entriesAdded)
})
}

Expand Down Expand Up @@ -610,9 +612,14 @@ func (m *Miniredis) cmdXinfoGroups(c *server.Peer, args []string) {
c.WriteBulk("last-delivered-id")
c.WriteBulk(g.lastID)
c.WriteBulk("entries-read")
c.WriteNull()
c.WriteInt(g.entriesRead)
c.WriteBulk("lag")
c.WriteInt(len(g.stream.entries))
lag := g.lag()
if lag == -1 {
c.WriteNull()
} else {
c.WriteInt(lag)
}
}
})
}
Expand Down
144 changes: 126 additions & 18 deletions cmd_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestStream(t *testing.T) {

mustDo(t, c,
"XINFO", "STREAM", "s",
proto.Array(proto.String("length"), proto.Int(1)),
proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)),
)

now := time.Date(2001, 1, 1, 4, 4, 5, 4000000, time.UTC)
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestStream(t *testing.T) {
t.Run("resp3", func(t *testing.T) {
mustDo(t, c,
"XINFO", "STREAM", "s",
proto.Map(proto.String("length"), proto.Int(1)),
proto.Map(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)),
)
})
}
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestStreamInfo(t *testing.T) {

mustDo(t, c,
"XINFO", "STREAM", "planets",
proto.Array(proto.String("length"), proto.Int(1)),
proto.Array(proto.String("length"), proto.Int(1), proto.String("entries-added"), proto.Int(1)),
)

mustDo(t, c,
Expand Down Expand Up @@ -605,7 +605,7 @@ func TestStreamGroup(t *testing.T) {
proto.String("consumers"), proto.Int(0),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Nil,
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(0),
),
),
Expand All @@ -627,7 +627,7 @@ func TestStreamGroup(t *testing.T) {
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Nil,
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(0),
),
),
Expand Down Expand Up @@ -660,7 +660,7 @@ func TestStreamGroup(t *testing.T) {
proto.String("consumers"), proto.Int(0),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Nil,
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(0),
),
),
Expand Down Expand Up @@ -727,7 +727,7 @@ func TestStreamReadGroup(t *testing.T) {
proto.String("consumers"), proto.Int(0),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Nil,
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(0),
),
),
Expand All @@ -747,6 +747,20 @@ func TestStreamReadGroup(t *testing.T) {
"XLEN", "planets",
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(0),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(1),
),
),
)

mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">",
proto.Array(
Expand All @@ -762,8 +776,8 @@ func TestStreamReadGroup(t *testing.T) {
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(1),
proto.String("last-delivered-id"), proto.String("0-1"),
proto.String("entries-read"), proto.Nil,
proto.String("lag"), proto.Int(1),
proto.String("entries-read"), proto.Int(1),
proto.String("lag"), proto.Int(0),
),
),
)
Expand Down Expand Up @@ -806,6 +820,11 @@ func TestStreamDelete(t *testing.T) {
proto.String("0-1"),
)

mustDo(t, c,
"XADD", "planets", "0-2", "name", "Venus",
proto.String("0-2"),
)

mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">",
proto.Array(
Expand All @@ -816,24 +835,84 @@ func TestStreamDelete(t *testing.T) {
proto.String("0-1"),
proto.Strings("name", "Mercury"),
),
proto.Array(
proto.String("0-2"),
proto.Strings("name", "Venus"),
),
),
),
),
)

mustDo(t, c,
"XADD", "planets", "0-2", "name", "Mercury",
proto.String("0-2"),
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(2),
proto.String("last-delivered-id"), proto.String("0-2"),
proto.String("entries-read"), proto.Int(2),
proto.String("lag"), proto.Int(0),
),
),
)

mustDo(t, c,
"XADD", "planets", "0-3", "name", "Earth",
proto.String("0-3"),
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(2),
proto.String("last-delivered-id"), proto.String("0-2"),
proto.String("entries-read"), proto.Int(2),
proto.String("lag"), proto.Int(1),
),
),
)

must1(t, c,
"XDEL", "planets", "0-1",
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(1),
proto.String("last-delivered-id"), proto.String("0-2"),
proto.String("entries-read"), proto.Int(2),
proto.String("lag"), proto.Int(1),
),
),
)

must1(t, c,
"XDEL", "planets", "0-2",
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-2"),
proto.String("entries-read"), proto.Int(2),
proto.String("lag"), proto.Nil,
),
),
)

mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0",
proto.Array(
Expand Down Expand Up @@ -865,9 +944,38 @@ func TestStreamAck(t *testing.T) {
),
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(1),
proto.String("last-delivered-id"), proto.String("0-1"),
proto.String("entries-read"), proto.Int(1),
proto.String("lag"), proto.Int(0),
),
),
)

must1(t, c,
"XACK", "planets", "processing", "0-1",
)

mustDo(t, c,
"XINFO", "GROUPS", "planets",
proto.Array(
proto.Array(
proto.String("name"), proto.String("processing"),
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-1"),
proto.String("entries-read"), proto.Int(1),
proto.String("lag"), proto.Int(0),
),
),
)

mustDo(t, c,
"XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", "0-0",
proto.Array(
Expand All @@ -885,8 +993,8 @@ func TestStreamAck(t *testing.T) {
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-1"),
proto.String("entries-read"), proto.Nil,
proto.String("lag"), proto.Int(1),
proto.String("entries-read"), proto.Int(1),
proto.String("lag"), proto.Int(0),
),
),
)
Expand Down Expand Up @@ -1353,8 +1461,8 @@ func TestStreamAutoClaim(t *testing.T) {
proto.String("consumers"), proto.Int(0),
proto.String("pending"), proto.Int(0),
proto.String("last-delivered-id"), proto.String("0-2"),
proto.String("entries-read"), proto.Nil,
proto.String("lag"), proto.Int(2),
proto.String("entries-read"), proto.Int(2),
proto.String("lag"), proto.Int(0),
),
),
)
Expand Down Expand Up @@ -1428,7 +1536,7 @@ func TestStreamClaim(t *testing.T) {
proto.String("consumers"), proto.Int(1),
proto.String("pending"), proto.Int(2),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Nil,
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(2),
),
),
Expand Down Expand Up @@ -1479,8 +1587,8 @@ func TestStreamClaim(t *testing.T) {
proto.String("consumers"), proto.Int(2),
proto.String("pending"), proto.Int(1),
proto.String("last-delivered-id"), proto.String("0-0"),
proto.String("entries-read"), proto.Nil,
proto.String("lag"), proto.Int(1),
proto.String("entries-read"), proto.Int(0),
proto.String("lag"), proto.Int(2),
),
),
)
Expand Down
Loading

0 comments on commit 971abad

Please sign in to comment.