Skip to content

Commit

Permalink
Add support for XACK and XPENDING. Add command output parsers for XPE…
Browse files Browse the repository at this point in the history
…NDING #606
  • Loading branch information
mp911de committed May 14, 2018
1 parent 079eefd commit 64f2e6b
Show file tree
Hide file tree
Showing 24 changed files with 604 additions and 391 deletions.
59 changes: 50 additions & 9 deletions src/main/java/com/lambdaworks/redis/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@
*/
package com.lambdaworks.redis;

import java.util.Objects;

import com.lambdaworks.redis.internal.LettuceAssert;

/**
* Value object representing a Stream consumer within a group.
* Value object representing a Stream consumer within a consumer group. Group name and consumer name are encoded as keys.
*
* @author Mark Paluch
* @since 4.5
* @see com.lambdaworks.redis.codec.RedisCodec
*/
public class Consumer {
public class Consumer<K> {

final K group;
final K name;

final String group;
final String name;
private Consumer(K group, K name) {

public Consumer(String group, String name) {
this.group = group;
this.name = name;
}
Expand All @@ -37,11 +44,45 @@ public Consumer(String group, String name) {
* @param name name of the consumer, must not be {@literal null} or empty.
* @return the consumer {@link Consumer} object.
*/
public static Consumer from(String group, String name) {
public static <K> Consumer<K> from(K group, K name) {

LettuceAssert.notEmpty(group, "Group must not be empty");
LettuceAssert.notEmpty(name, "Name must not be empty");
LettuceAssert.notNull(group, "Group must not be null");
LettuceAssert.notNull(name, "Name must not be null");

return new Consumer<>(group, name);
}

/**
* @return name of the group.
*/
public K getGroup() {
return group;
}

/**
* @return name of the consumer.
*/
public K getName() {
return name;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (!(o instanceof Consumer))
return false;
Consumer<?> consumer = (Consumer<?>) o;
return Objects.equals(group, consumer.group) && Objects.equals(name, consumer.name);
}

@Override
public int hashCode() {
return Objects.hash(group, name);
}

return new Consumer(group, name);
@Override
public String toString() {
return String.format("%s:%s", group, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

import java.util.List;
import java.util.Map;

import com.lambdaworks.redis.*;
import com.lambdaworks.redis.Consumer;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.XReadArgs.StreamOffset;

/**
* Asynchronous executed commands for Streams.
Expand All @@ -40,7 +40,7 @@ public interface RedisStreamAsyncCommands<K, V> {
* @param messageIds message Ids to acknowledge.
* @return simple-reply the lenght of acknowledged messages.
*/
RedisFuture<Long> xack(K key, String group, String... messageIds);
RedisFuture<Long> xack(K key, K group, String... messageIds);

/**
* Append a message to the stream {@code key}.
Expand Down Expand Up @@ -89,7 +89,7 @@ public interface RedisStreamAsyncCommands<K, V> {
* @param args
* @return simple-reply the {@link StreamMessage}
*/
RedisFuture<StreamMessage<K, V>> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds);
RedisFuture<List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, XClaimArgs args, String... messageIds);

/**
* Create a consumer group.
Expand All @@ -99,7 +99,7 @@ public interface RedisStreamAsyncCommands<K, V> {
* @param offset read offset or {@literal $}.
* @return simple-reply {@literal true} if successful.
*/
RedisFuture<String> xgroupCreate(K key, String group, String offset);
RedisFuture<String> xgroupCreate(K key, K group, String offset);

/**
* Delete a consumer from a consumer group.
Expand All @@ -108,7 +108,7 @@ public interface RedisStreamAsyncCommands<K, V> {
* @param consumer consumer identified by group name and consumer key.
* @return simple-reply the number of pending messages
*/
RedisFuture<String> xgroupDelconsumer(K key, Consumer consumer);
RedisFuture<Boolean> xgroupDelconsumer(K key, Consumer<K> consumer);

/**
* Set the current {@code group} id.
Expand All @@ -118,7 +118,7 @@ public interface RedisStreamAsyncCommands<K, V> {
* @param offset read offset or {@literal $}.
* @return simple-reply the lenght of the stream.
*/
RedisFuture<String> xgroupSetid(K key, String group, String offset);
RedisFuture<Boolean> xgroupSetid(K key, K group, String offset);

/**
* Get the length of a steam.
Expand All @@ -129,22 +129,13 @@ public interface RedisStreamAsyncCommands<K, V> {
RedisFuture<Long> xlen(K key);

/**
* Read pending messages from a stream within a specific {@link Range}.
* Read pending messages from a stream for a {@code group}.
*
* @param key the stream key.
* @param group name of the consumer group.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<StreamMessage<K, V>>> xpending(K key, String group);

/**
* Read pending messages from a stream within a specific {@link Range}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
* @return List&lt;Object&gt; array-reply list pending entries.
*/
RedisFuture<List<StreamMessage<K, V>>> xpending(K key, Consumer consumer);
RedisFuture<List<Object>> xpending(K key, K group);

/**
* Read pending messages from a stream within a specific {@link Range}.
Expand All @@ -153,20 +144,9 @@ public interface RedisStreamAsyncCommands<K, V> {
* @param group name of the consumer group.
* @param range must not be {@literal null}.
* @param limit must not be {@literal null}.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<PendingEntry>> xpending(K key, String group, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link Range}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
* @param range must not be {@literal null}.
* @param limit must not be {@literal null}.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
* @return List&lt;Object&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<StreamMessage<K, V>>> xpending(K key, Consumer consumer, Range<String> range, Limit limit);
RedisFuture<List<Object>> xpending(K key, K group, Range<String> range, Limit limit);

/**
* Read messages from a stream within a specific {@link Range}.
Expand Down Expand Up @@ -207,38 +187,38 @@ public interface RedisStreamAsyncCommands<K, V> {
RedisFuture<List<StreamMessage<K, V>>> xrevrange(K key, Range<String> range, Limit limit);

/**
* Read messages from one or more {@link XReadArgs.StreamOffset}s.
* Read messages from one or more {@link StreamOffset}s.
*
* @param streams the streams to read from.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs.StreamOffset<K>... streams);
RedisFuture<List<StreamMessage<K, V>>> xread(StreamOffset<K>... streams);

/**
* Read messages from one or more {@link XReadArgs.StreamOffset}s.
* Read messages from one or more {@link StreamOffset}s.
*
* @param args read arguments.
* @param streams the streams to read from.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs args, XReadArgs.StreamOffset<K>... streams);
RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs args, StreamOffset<K>... streams);

/**
* Read messages from one or more {@link XReadArgs.StreamOffset}s using a consumer group.
* Read messages from one or more {@link StreamOffset}s using a consumer group.
*
* @param consumer consumer/group.
* @param streams the streams to read from.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer consumer, XReadArgs.StreamOffset<K>... streams);
RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, StreamOffset<K>... streams);

/**
* Read messages from one or more {@link XReadArgs.StreamOffset}s using a consumer group.
* Read messages from one or more {@link StreamOffset}s using a consumer group.
*
* @param consumer consumer/group.
* @param args read arguments.
* @param streams the streams to read from.
* @return List&lt;StreamMessage&gt; array-reply list with members of the resulting stream.
*/
RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer consumer, XReadArgs args, XReadArgs.StreamOffset<K>... streams);
RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs args, StreamOffset<K>... streams);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package com.lambdaworks.redis.api.rx;

import java.util.Map;

import rx.Observable;

import com.lambdaworks.redis.*;
import com.lambdaworks.redis.Consumer;
import com.lambdaworks.redis.XReadArgs.StreamOffset;
import rx.Observable;

/**
* Observable commands for Streams.
Expand All @@ -40,7 +41,7 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param messageIds message Ids to acknowledge.
* @return simple-reply the lenght of acknowledged messages.
*/
Observable<Long> xack(K key, String group, String... messageIds);
Observable<Long> xack(K key, K group, String... messageIds);

/**
* Append a message to the stream {@code key}.
Expand Down Expand Up @@ -89,7 +90,7 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param args
* @return simple-reply the {@link StreamMessage}
*/
Observable<StreamMessage<K, V>> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds);
Observable<StreamMessage<K, V>> xclaim(K key, Consumer<K> consumer, XClaimArgs args, String... messageIds);

/**
* Create a consumer group.
Expand All @@ -99,7 +100,7 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param offset read offset or {@literal $}.
* @return simple-reply {@literal true} if successful.
*/
Observable<Boolean> xgroupCreate(K key, String group, String offset);
Observable<String> xgroupCreate(K key, K group, String offset);

/**
* Delete a consumer from a consumer group.
Expand All @@ -108,7 +109,7 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param consumer consumer identified by group name and consumer key.
* @return simple-reply the number of pending messages
*/
Observable<Boolean> xgroupDelconsumer(K key, Consumer consumer);
Observable<Boolean> xgroupDelconsumer(K key, Consumer<K> consumer);

/**
* Set the current {@code group} id.
Expand All @@ -118,7 +119,7 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param offset read offset or {@literal $}.
* @return simple-reply the lenght of the stream.
*/
Observable<Boolean> xgroupSetid(K key, String group, String offset);
Observable<Boolean> xgroupSetid(K key, K group, String offset);

/**
* Get the length of a steam.
Expand All @@ -129,22 +130,13 @@ public interface RedisStreamReactiveCommands<K, V> {
Observable<Long> xlen(K key);

/**
* Read pending messages from a stream within a specific {@link Range}.
* Read pending messages from a stream for a {@code group}.
*
* @param key the stream key.
* @param group name of the consumer group.
* @return StreamMessage array-reply list with members of the resulting stream.
*/
Observable<StreamMessage<K, V>> xpending(K key, String group);

/**
* Read pending messages from a stream within a specific {@link Range}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
* @return StreamMessage array-reply list with members of the resulting stream.
* @return Object array-reply list pending entries.
*/
Observable<StreamMessage<K, V>> xpending(K key, Consumer consumer);
Observable<Object> xpending(K key, K group);

/**
* Read pending messages from a stream within a specific {@link Range}.
Expand All @@ -153,20 +145,9 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param group name of the consumer group.
* @param range must not be {@literal null}.
* @param limit must not be {@literal null}.
* @return StreamMessage array-reply list with members of the resulting stream.
* @return Object array-reply list with members of the resulting stream.
*/
Observable<StreamMessage<K, V>> xpending(K key, String group, Range<String> range, Limit limit);

/**
* Read pending messages from a stream within a specific {@link Range}.
*
* @param key the stream key.
* @param consumer consumer identified by group name and consumer key.
* @param range must not be {@literal null}.
* @param limit must not be {@literal null}.
* @return StreamMessage array-reply list with members of the resulting stream.
*/
Observable<StreamMessage<K, V>> xpending(K key, Consumer consumer, Range<String> range, Limit limit);
Observable<Object> xpending(K key, K group, Range<String> range, Limit limit);

/**
* Read messages from a stream within a specific {@link Range}.
Expand Down Expand Up @@ -215,7 +196,7 @@ public interface RedisStreamReactiveCommands<K, V> {
Observable<StreamMessage<K, V>> xread(StreamOffset<K>... streams);

/**
* Read messages from one or more {@link XReadArgs.StreamOffset}s.
* Read messages from one or more {@link StreamOffset}s.
*
* @param args read arguments.
* @param streams the streams to read from.
Expand All @@ -224,13 +205,13 @@ public interface RedisStreamReactiveCommands<K, V> {
Observable<StreamMessage<K, V>> xread(XReadArgs args, StreamOffset<K>... streams);

/**
* Read messages from one or more {@link XReadArgs.StreamOffset}s using a consumer group.
* Read messages from one or more {@link StreamOffset}s using a consumer group.
*
* @param consumer consumer/group.
* @param streams the streams to read from.
* @return StreamMessage array-reply list with members of the resulting stream.
*/
Observable<StreamMessage<K, V>> xreadgroup(Consumer consumer, StreamOffset<K>... streams);
Observable<StreamMessage<K, V>> xreadgroup(Consumer<K> consumer, StreamOffset<K>... streams);

/**
* Read messages from one or more {@link StreamOffset}s using a consumer group.
Expand All @@ -240,5 +221,5 @@ public interface RedisStreamReactiveCommands<K, V> {
* @param streams the streams to read from.
* @return StreamMessage array-reply list with members of the resulting stream.
*/
Observable<StreamMessage<K, V>> xreadgroup(Consumer consumer, XReadArgs args, XReadArgs.StreamOffset<K>... streams);
Observable<StreamMessage<K, V>> xreadgroup(Consumer<K> consumer, XReadArgs args, StreamOffset<K>... streams);
}
Loading

0 comments on commit 64f2e6b

Please sign in to comment.