Skip to content

Commit

Permalink
Update XINFO STREAM reply (#3421)
Browse files Browse the repository at this point in the history
with new active-time field and seen-time meaning.
  • Loading branch information
sazzad16 authored May 18, 2023
1 parent 65d47fc commit 9803f89
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
13 changes: 8 additions & 5 deletions src/main/java/redis/clients/jedis/BuilderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,8 @@ public String toString() {
}
};

private static final Builder<List<StreamConsumerFullInfo>> STREAM_CONSUMER_FULL_INFO_LIST = new Builder<List<StreamConsumerFullInfo>>() {
private static final Builder<List<StreamConsumerFullInfo>> STREAM_CONSUMER_FULL_INFO_LIST
= new Builder<List<StreamConsumerFullInfo>>() {

final Map<String, Builder> mappingFunctions = createDecoderMap();

Expand Down Expand Up @@ -1340,7 +1341,8 @@ public List<StreamConsumerFullInfo> build(Object data) {
for (Object streamsEntry : streamsEntries) {
List<Object> consumerInfoList = (List<Object>) streamsEntry;
Iterator<Object> consumerInfoIterator = consumerInfoList.iterator();
StreamConsumerFullInfo consumerInfo = new StreamConsumerFullInfo(createMapFromDecodingFunctions(consumerInfoIterator, mappingFunctions));
StreamConsumerFullInfo consumerInfo = new StreamConsumerFullInfo(
createMapFromDecodingFunctions(consumerInfoIterator, mappingFunctions));
list.add(consumerInfo);
}
return list;
Expand All @@ -1352,7 +1354,8 @@ public String toString() {
}
};

private static final Builder<List<StreamGroupFullInfo>> STREAM_GROUP_FULL_INFO_LIST = new Builder<List<StreamGroupFullInfo>>() {
private static final Builder<List<StreamGroupFullInfo>> STREAM_GROUP_FULL_INFO_LIST
= new Builder<List<StreamGroupFullInfo>>() {

final Map<String, Builder> mappingFunctions = createDecoderMap();

Expand Down Expand Up @@ -1384,8 +1387,8 @@ public List<StreamGroupFullInfo> build(Object data) {

Iterator<Object> groupInfoIterator = groupInfo.iterator();

StreamGroupFullInfo groupFullInfo = new StreamGroupFullInfo(createMapFromDecodingFunctions(
groupInfoIterator, mappingFunctions));
StreamGroupFullInfo groupFullInfo = new StreamGroupFullInfo(
createMapFromDecodingFunctions(groupInfoIterator, mappingFunctions));
list.add(groupFullInfo);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ public class StreamConsumerFullInfo implements Serializable {

public static final String NAME = "name";
public static final String SEEN_TIME = "seen-time";
public static final String ACTIVE_TIME = "active-time";
public static final String PEL_COUNT = "pel-count";
public static final String PENDING = "pending";

private final String name;
private final Long seenTime;
private final Long activeTime; // since Redis 7.2
private final Long pelCount;
private final List<List<Object>> pending;
private final Map<String, Object> consumerInfo;
Expand All @@ -28,20 +30,29 @@ public StreamConsumerFullInfo(Map<String, Object> map) {
consumerInfo = map;
name = (String) map.get(NAME);
seenTime = (Long) map.get(SEEN_TIME);
activeTime = (Long) map.get(ACTIVE_TIME);
pending = (List<List<Object>>) map.get(PENDING);
pelCount = (Long) map.get(PEL_COUNT);

pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
pending.forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
}

public String getName() {
return name;
}

// TODO: Long
public long getSeenTime() {
return seenTime;
}

/**
* Since Redis 7.2.
*/
public Long getActiveTime() {
return activeTime;
}

public Long getPelCount() {
return pelCount;
}
Expand All @@ -50,6 +61,9 @@ public List<List<Object>> getPending() {
return pending;
}

/**
* All data.
*/
public Map<String, Object> getConsumerInfo() {
return consumerInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

import redis.clients.jedis.BuilderFactory;
Expand Down Expand Up @@ -917,6 +919,8 @@ public void xinfoStreamFullWithPending() {
assertEquals(1, group.getConsumers().size());
StreamConsumerFullInfo consumer = group.getConsumers().get(0);
assertEquals("xreadGroup-consumer", consumer.getName());
MatcherAssert.assertThat(consumer.getSeenTime(), Matchers.greaterThanOrEqualTo(0L));
MatcherAssert.assertThat(consumer.getActiveTime(), Matchers.greaterThanOrEqualTo(0L));
assertEquals(1, consumer.getPending().size());
List<Object> consumerPendingEntry = consumer.getPending().get(0);
assertEquals(id1, consumerPendingEntry.get(0));
Expand Down

0 comments on commit 9803f89

Please sign in to comment.