Skip to content

Commit

Permalink
Polishing #606
Browse files Browse the repository at this point in the history
Post-cherrypick polishing. Sort inherited commands interfaces.
  • Loading branch information
mp911de committed May 14, 2018
1 parent c407687 commit cc11f92
Show file tree
Hide file tree
Showing 33 changed files with 478 additions and 225 deletions.

This file was deleted.

79 changes: 42 additions & 37 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,11 @@ public RedisFuture<String> watch(K... keys) {
return dispatch(commandBuilder.watch(keys));
}

@Override
public RedisFuture<Long> xack(K key, K group, String... messageIds) {
return dispatch(commandBuilder.xack(key, group, messageIds));
}

@Override
public RedisFuture<String> xadd(K key, Map<K, V> body) {
return dispatch(commandBuilder.xadd(key, null, body));
Expand All @@ -1490,94 +1495,94 @@ public RedisFuture<String> xadd(K key, XAddArgs args, Object... keysAndValues) {
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xrange(K key, Range<String> range) {
return dispatch(commandBuilder.xrange(key, range, Limit.unlimited()));
public RedisFuture<List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, long minIdleTime, String... messageIds) {
return dispatch(commandBuilder.xclaim(key, consumer, XClaimArgs.Builder.minIdleTime(minIdleTime), messageIds));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xrange(K key, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xrange(key, range, limit));
public RedisFuture<List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, XClaimArgs args, String... messageIds) {
return dispatch(commandBuilder.xclaim(key, consumer, args, messageIds));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xrevrange(K key, Range<String> range) {
return dispatch(commandBuilder.xrevrange(key, range, Limit.unlimited()));
public RedisFuture<Long> xdel(K key, String... messageIds) {
return dispatch(commandBuilder.xdel(key, messageIds));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xrevrange(K key, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xrevrange(key, range, limit));
public RedisFuture<String> xgroupCreate(K key, K group, String offset) {
return dispatch(commandBuilder.xgroupCreate(key, group, offset));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xread(streams, null));
public RedisFuture<Boolean> xgroupDelconsumer(K key, Consumer<K> consumer) {
return null;
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs args, XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xread(streams, args));
public RedisFuture<Boolean> xgroupSetid(K key, K group, String offset) {
return null;
}

@Override
public RedisFuture<Long> xack(K key, K group, String... messageIds) {
return dispatch(commandBuilder.xack(key, group, messageIds));
public RedisFuture<Long> xlen(K key) {
return dispatch(commandBuilder.xlen(key));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, long minIdleTime, String... messageIds) {
return dispatch(commandBuilder.xclaim(key, consumer, messageIds, XClaimArgs.Builder.minIdleTime(minIdleTime)));
public RedisFuture<List<Object>> xpending(K key, K group) {
return dispatch(commandBuilder.xpending(key, group, Range.unbounded(), Limit.unlimited()));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, XClaimArgs args, String... messageIds) {
return dispatch(commandBuilder.xclaim(key, consumer, messageIds, args));
public RedisFuture<List<Object>> xpending(K key, K group, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xpending(key, group, range, limit));
}

@Override
public RedisFuture<Long> xdel(K key, String... messageIds) {
return dispatch(commandBuilder.xdel(key, messageIds));
public RedisFuture<List<Object>> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xpending(key, consumer, range, limit));
}

@Override
public RedisFuture<String> xgroupCreate(K key, K group, String offset) {
return dispatch(commandBuilder.xgroupCreate(key, group, offset));
public RedisFuture<List<StreamMessage<K, V>>> xrange(K key, Range<String> range) {
return dispatch(commandBuilder.xrange(key, range, Limit.unlimited()));
}

@Override
public RedisFuture<Boolean> xgroupDelconsumer(K key, Consumer<K> consumer) {
return null;
public RedisFuture<List<StreamMessage<K, V>>> xrange(K key, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xrange(key, range, limit));
}

@Override
public RedisFuture<Boolean> xgroupSetid(K key, K group, String offset) {
return null;
public RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xread(null, streams));
}

@Override
public RedisFuture<List<Object>> xpending(K key, K group) {
return dispatch(commandBuilder.xpending(key, group, Range.unbounded(), Limit.unlimited()));
public RedisFuture<List<StreamMessage<K, V>>> xread(XReadArgs args, XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xread(args, streams));
}

@Override
public RedisFuture<List<Object>> xpending(K key, K group, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xpending(key, group, range, limit));
public RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xreadgroup(consumer, null, streams));
}

@Override
public RedisFuture<List<Object>> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xpending(key, consumer, range, limit));
public RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs args,
XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xreadgroup(consumer, args, streams));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xreadgroup(consumer, streams, null));
public RedisFuture<List<StreamMessage<K, V>>> xrevrange(K key, Range<String> range) {
return dispatch(commandBuilder.xrevrange(key, range, Limit.unlimited()));
}

@Override
public RedisFuture<List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs args,
XReadArgs.StreamOffset<K>... streams) {
return dispatch(commandBuilder.xreadgroup(consumer, streams, args));
public RedisFuture<List<StreamMessage<K, V>>> xrevrange(K key, Range<String> range, Limit limit) {
return dispatch(commandBuilder.xrevrange(key, range, limit));
}

@Override
Expand Down
121 changes: 121 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,127 @@ public Mono<String> watch(K... keys) {
return createMono(() -> commandBuilder.watch(keys));
}

@Override
public Mono<Long> xack(K key, K group, String... messageIds) {
return createMono(() -> commandBuilder.xack(key, group, messageIds));
}

@Override
public Mono<String> xadd(K key, Map<K, V> body) {
return createMono(() -> commandBuilder.xadd(key, null, body));
}

@Override
public Mono<String> xadd(K key, XAddArgs args, Map<K, V> body) {
return createMono(() -> commandBuilder.xadd(key, args, body));
}

@Override
public Mono<String> xadd(K key, Object... keysAndValues) {
return createMono(() -> commandBuilder.xadd(key, null, keysAndValues));
}

@Override
public Mono<String> xadd(K key, XAddArgs args, Object... keysAndValues) {
return createMono(() -> commandBuilder.xadd(key, args, keysAndValues));
}

@Override
public Flux<StreamMessage<K, V>> xclaim(K key, Consumer<K> consumer, long minIdleTime, String... messageIds) {
return createDissolvingFlux(() -> commandBuilder.xclaim(key, consumer, XClaimArgs.Builder.minIdleTime(minIdleTime),
messageIds));
}

@Override
public Flux<StreamMessage<K, V>> xclaim(K key, Consumer<K> consumer, XClaimArgs args, String... messageIds) {
return createDissolvingFlux(() -> commandBuilder.xclaim(key, consumer, args, messageIds));
}

@Override
public Mono<Long> xdel(K key, String... messageIds) {
return createMono(() -> commandBuilder.xdel(key, messageIds));
}

@Override
public Mono<String> xgroupCreate(K key, K group, String offset) {
return createMono(() -> commandBuilder.xgroupCreate(key, group, offset));
}

@Override
public Mono<Boolean> xgroupDelconsumer(K key, Consumer<K> consumer) {
throw new UnsupportedOperationException("Not supported yet");
}

@Override
public Mono<Boolean> xgroupSetid(K key, K group, String offset) {
throw new UnsupportedOperationException("Not supported yet");
}

@Override
public Mono<Long> xlen(K key) {
return createMono(() -> commandBuilder.xlen(key));
}

@Override
public Flux<Object> xpending(K key, K group) {
return xpending(key, group, Range.unbounded(), Limit.unlimited());
}

@Override
public Flux<Object> xpending(K key, K group, Range<String> range, Limit limit) {
return createDissolvingFlux(() -> commandBuilder.xpending(key, group, range, limit));
}

@Override
public Flux<Object> xpending(K key, Consumer<K> consumer, Range<String> range, Limit limit) {
return createDissolvingFlux(() -> commandBuilder.xpending(key, consumer, range, limit));
}

@Override
public Flux<StreamMessage<K, V>> xrange(K key, Range<String> range) {
return createDissolvingFlux(() -> commandBuilder.xrange(key, range, Limit.unlimited()));
}

@Override
public Flux<StreamMessage<K, V>> xrange(K key, Range<String> range, Limit limit) {
return createDissolvingFlux(() -> commandBuilder.xrange(key, range, limit));
}

@Override
public Flux<StreamMessage<K, V>> xread(XReadArgs.StreamOffset<K>... streams) {
return createDissolvingFlux(() -> commandBuilder.xread(null, streams));
}

@Override
public Flux<StreamMessage<K, V>> xread(XReadArgs args, XReadArgs.StreamOffset<K>... streams) {
return createDissolvingFlux(() -> commandBuilder.xread(args, streams));
}

@Override
public Flux<StreamMessage<K, V>> xreadgroup(Consumer<K> consumer, XReadArgs.StreamOffset<K>... streams) {
return createDissolvingFlux(() -> commandBuilder.xreadgroup(consumer, null, streams));
}

@Override
public Flux<StreamMessage<K, V>> xreadgroup(Consumer<K> consumer, XReadArgs args, XReadArgs.StreamOffset<K>... streams) {
return createDissolvingFlux(() -> commandBuilder.xreadgroup(consumer, args, streams));
}

@Override
public Flux<StreamMessage<K, V>> xrevrange(K key, Range<String> range) {
return xrevrange(key, range, Limit.unlimited());
}

@Override
public Flux<StreamMessage<K, V>> xrevrange(K key, Range<String> range, Limit limit) {
return createDissolvingFlux(() -> commandBuilder.xrevrange(key, range, limit));
}

@Override
public Mono<Long> xtrim(K key, long count) {
return createMono(() -> commandBuilder.xtrim(key, count));
}

@Override
public Mono<Long> zadd(K key, double score, V member) {
return createMono(() -> commandBuilder.zadd(key, null, score, member));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lambdaworks.redis;
package io.lettuce.core;

import java.util.Objects;

import com.lambdaworks.redis.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceAssert;

/**
* Value object representing a Stream consumer within a consumer group. Group name and consumer name are encoded as keys.
*
* @author Mark Paluch
* @since 4.5
* @see com.lambdaworks.redis.codec.RedisCodec
* @since 5.1
* @see io.lettuce.core.codec.RedisCodec
*/
public class Consumer<K> {

Expand Down
13 changes: 6 additions & 7 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import java.nio.ByteBuffer;
import java.util.*;

import com.lambdaworks.redis.Range.Boundary;

import io.lettuce.core.Range.Boundary;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.Utf8StringCodec;
Expand Down Expand Up @@ -2052,8 +2051,8 @@ public Command<K, V, Long> xack(K key, K group, String[] messageIds) {
return createCommand(XACK, new IntegerOutput<>(codec), args);
}

public Command<K, V, List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, String[] messageIds,
XClaimArgs xClaimArgs) {
public Command<K, V, List<StreamMessage<K, V>>> xclaim(K key, Consumer<K> consumer, XClaimArgs xClaimArgs,
String[] messageIds) {

notNullKey(key);
LettuceAssert.notNull(consumer, "Consumer " + MUST_NOT_BE_NULL);
Expand Down Expand Up @@ -2248,7 +2247,7 @@ private static String getUpperValue(Range<String> range) {
return range.getUpper().getValue();
}

public Command<K, V, List<StreamMessage<K, V>>> xread(StreamOffset<K>[] streams, XReadArgs xReadArgs) {
public Command<K, V, List<StreamMessage<K, V>>> xread(XReadArgs xReadArgs, StreamOffset<K>[] streams) {
LettuceAssert.notNull(streams, "Streams " + MUST_NOT_BE_NULL);
LettuceAssert.isTrue(streams.length > 0, "Streams " + MUST_NOT_BE_EMPTY);

Expand All @@ -2271,8 +2270,8 @@ public Command<K, V, List<StreamMessage<K, V>>> xread(StreamOffset<K>[] streams,
return createCommand(XREAD, new StreamReadOutput<>(codec), args);
}

public Command<K, V, List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs.StreamOffset<K>[] streams,
XReadArgs xReadArgs) {
public Command<K, V, List<StreamMessage<K, V>>> xreadgroup(Consumer<K> consumer, XReadArgs xReadArgs,
StreamOffset<K>[] streams) {
LettuceAssert.notNull(streams, "Streams " + MUST_NOT_BE_NULL);
LettuceAssert.isTrue(streams.length > 0, "Streams " + MUST_NOT_BE_EMPTY);
LettuceAssert.notNull(consumer, "Consumer " + MUST_NOT_BE_NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/StreamMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* A stream message and its id.
*
* @author Mark Paluch
* @since 4.5
* @since 5.1
*/
public class StreamMessage<K, V> {

Expand Down
Loading

0 comments on commit cc11f92

Please sign in to comment.