diff --git a/src/main/java/com/lambdaworks/redis/models/stream/package-info.java b/src/main/java/com/lambdaworks/redis/models/stream/package-info.java deleted file mode 100644 index 190ac27af3..0000000000 --- a/src/main/java/com/lambdaworks/redis/models/stream/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -/** - * Model and parser for the Stream-related command output such as {@literal XPENDING}. - */ -package com.lambdaworks.redis.models.stream; - diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index f36def365d..7e5d4215a3 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -1469,6 +1469,11 @@ public RedisFuture watch(K... keys) { return dispatch(commandBuilder.watch(keys)); } + @Override + public RedisFuture xack(K key, K group, String... messageIds) { + return dispatch(commandBuilder.xack(key, group, messageIds)); + } + @Override public RedisFuture xadd(K key, Map body) { return dispatch(commandBuilder.xadd(key, null, body)); @@ -1490,94 +1495,94 @@ public RedisFuture xadd(K key, XAddArgs args, Object... keysAndValues) { } @Override - public RedisFuture>> xrange(K key, Range range) { - return dispatch(commandBuilder.xrange(key, range, Limit.unlimited())); + public RedisFuture>> xclaim(K key, Consumer consumer, long minIdleTime, String... messageIds) { + return dispatch(commandBuilder.xclaim(key, consumer, XClaimArgs.Builder.minIdleTime(minIdleTime), messageIds)); } @Override - public RedisFuture>> xrange(K key, Range range, Limit limit) { - return dispatch(commandBuilder.xrange(key, range, limit)); + public RedisFuture>> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds) { + return dispatch(commandBuilder.xclaim(key, consumer, args, messageIds)); } @Override - public RedisFuture>> xrevrange(K key, Range range) { - return dispatch(commandBuilder.xrevrange(key, range, Limit.unlimited())); + public RedisFuture xdel(K key, String... messageIds) { + return dispatch(commandBuilder.xdel(key, messageIds)); } @Override - public RedisFuture>> xrevrange(K key, Range range, Limit limit) { - return dispatch(commandBuilder.xrevrange(key, range, limit)); + public RedisFuture xgroupCreate(K key, K group, String offset) { + return dispatch(commandBuilder.xgroupCreate(key, group, offset)); } @Override - public RedisFuture>> xread(XReadArgs.StreamOffset... streams) { - return dispatch(commandBuilder.xread(streams, null)); + public RedisFuture xgroupDelconsumer(K key, Consumer consumer) { + return null; } @Override - public RedisFuture>> xread(XReadArgs args, XReadArgs.StreamOffset... streams) { - return dispatch(commandBuilder.xread(streams, args)); + public RedisFuture xgroupSetid(K key, K group, String offset) { + return null; } @Override - public RedisFuture xack(K key, K group, String... messageIds) { - return dispatch(commandBuilder.xack(key, group, messageIds)); + public RedisFuture xlen(K key) { + return dispatch(commandBuilder.xlen(key)); } @Override - public RedisFuture>> xclaim(K key, Consumer consumer, long minIdleTime, String... messageIds) { - return dispatch(commandBuilder.xclaim(key, consumer, messageIds, XClaimArgs.Builder.minIdleTime(minIdleTime))); + public RedisFuture> xpending(K key, K group) { + return dispatch(commandBuilder.xpending(key, group, Range.unbounded(), Limit.unlimited())); } @Override - public RedisFuture>> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds) { - return dispatch(commandBuilder.xclaim(key, consumer, messageIds, args)); + public RedisFuture> xpending(K key, K group, Range range, Limit limit) { + return dispatch(commandBuilder.xpending(key, group, range, limit)); } @Override - public RedisFuture xdel(K key, String... messageIds) { - return dispatch(commandBuilder.xdel(key, messageIds)); + public RedisFuture> xpending(K key, Consumer consumer, Range range, Limit limit) { + return dispatch(commandBuilder.xpending(key, consumer, range, limit)); } @Override - public RedisFuture xgroupCreate(K key, K group, String offset) { - return dispatch(commandBuilder.xgroupCreate(key, group, offset)); + public RedisFuture>> xrange(K key, Range range) { + return dispatch(commandBuilder.xrange(key, range, Limit.unlimited())); } @Override - public RedisFuture xgroupDelconsumer(K key, Consumer consumer) { - return null; + public RedisFuture>> xrange(K key, Range range, Limit limit) { + return dispatch(commandBuilder.xrange(key, range, limit)); } @Override - public RedisFuture xgroupSetid(K key, K group, String offset) { - return null; + public RedisFuture>> xread(XReadArgs.StreamOffset... streams) { + return dispatch(commandBuilder.xread(null, streams)); } @Override - public RedisFuture> xpending(K key, K group) { - return dispatch(commandBuilder.xpending(key, group, Range.unbounded(), Limit.unlimited())); + public RedisFuture>> xread(XReadArgs args, XReadArgs.StreamOffset... streams) { + return dispatch(commandBuilder.xread(args, streams)); } @Override - public RedisFuture> xpending(K key, K group, Range range, Limit limit) { - return dispatch(commandBuilder.xpending(key, group, range, limit)); + public RedisFuture>> xreadgroup(Consumer consumer, XReadArgs.StreamOffset... streams) { + return dispatch(commandBuilder.xreadgroup(consumer, null, streams)); } @Override - public RedisFuture> xpending(K key, Consumer consumer, Range range, Limit limit) { - return dispatch(commandBuilder.xpending(key, consumer, range, limit)); + public RedisFuture>> xreadgroup(Consumer consumer, XReadArgs args, + XReadArgs.StreamOffset... streams) { + return dispatch(commandBuilder.xreadgroup(consumer, args, streams)); } @Override - public RedisFuture>> xreadgroup(Consumer consumer, XReadArgs.StreamOffset... streams) { - return dispatch(commandBuilder.xreadgroup(consumer, streams, null)); + public RedisFuture>> xrevrange(K key, Range range) { + return dispatch(commandBuilder.xrevrange(key, range, Limit.unlimited())); } @Override - public RedisFuture>> xreadgroup(Consumer consumer, XReadArgs args, - XReadArgs.StreamOffset... streams) { - return dispatch(commandBuilder.xreadgroup(consumer, streams, args)); + public RedisFuture>> xrevrange(K key, Range range, Limit limit) { + return dispatch(commandBuilder.xrevrange(key, range, limit)); } @Override diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index ce690ea601..28c2f8c441 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -1482,6 +1482,127 @@ public Mono watch(K... keys) { return createMono(() -> commandBuilder.watch(keys)); } + @Override + public Mono xack(K key, K group, String... messageIds) { + return createMono(() -> commandBuilder.xack(key, group, messageIds)); + } + + @Override + public Mono xadd(K key, Map body) { + return createMono(() -> commandBuilder.xadd(key, null, body)); + } + + @Override + public Mono xadd(K key, XAddArgs args, Map body) { + return createMono(() -> commandBuilder.xadd(key, args, body)); + } + + @Override + public Mono xadd(K key, Object... keysAndValues) { + return createMono(() -> commandBuilder.xadd(key, null, keysAndValues)); + } + + @Override + public Mono xadd(K key, XAddArgs args, Object... keysAndValues) { + return createMono(() -> commandBuilder.xadd(key, args, keysAndValues)); + } + + @Override + public Flux> xclaim(K key, Consumer consumer, long minIdleTime, String... messageIds) { + return createDissolvingFlux(() -> commandBuilder.xclaim(key, consumer, XClaimArgs.Builder.minIdleTime(minIdleTime), + messageIds)); + } + + @Override + public Flux> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds) { + return createDissolvingFlux(() -> commandBuilder.xclaim(key, consumer, args, messageIds)); + } + + @Override + public Mono xdel(K key, String... messageIds) { + return createMono(() -> commandBuilder.xdel(key, messageIds)); + } + + @Override + public Mono xgroupCreate(K key, K group, String offset) { + return createMono(() -> commandBuilder.xgroupCreate(key, group, offset)); + } + + @Override + public Mono xgroupDelconsumer(K key, Consumer consumer) { + throw new UnsupportedOperationException("Not supported yet"); + } + + @Override + public Mono xgroupSetid(K key, K group, String offset) { + throw new UnsupportedOperationException("Not supported yet"); + } + + @Override + public Mono xlen(K key) { + return createMono(() -> commandBuilder.xlen(key)); + } + + @Override + public Flux xpending(K key, K group) { + return xpending(key, group, Range.unbounded(), Limit.unlimited()); + } + + @Override + public Flux xpending(K key, K group, Range range, Limit limit) { + return createDissolvingFlux(() -> commandBuilder.xpending(key, group, range, limit)); + } + + @Override + public Flux xpending(K key, Consumer consumer, Range range, Limit limit) { + return createDissolvingFlux(() -> commandBuilder.xpending(key, consumer, range, limit)); + } + + @Override + public Flux> xrange(K key, Range range) { + return createDissolvingFlux(() -> commandBuilder.xrange(key, range, Limit.unlimited())); + } + + @Override + public Flux> xrange(K key, Range range, Limit limit) { + return createDissolvingFlux(() -> commandBuilder.xrange(key, range, limit)); + } + + @Override + public Flux> xread(XReadArgs.StreamOffset... streams) { + return createDissolvingFlux(() -> commandBuilder.xread(null, streams)); + } + + @Override + public Flux> xread(XReadArgs args, XReadArgs.StreamOffset... streams) { + return createDissolvingFlux(() -> commandBuilder.xread(args, streams)); + } + + @Override + public Flux> xreadgroup(Consumer consumer, XReadArgs.StreamOffset... streams) { + return createDissolvingFlux(() -> commandBuilder.xreadgroup(consumer, null, streams)); + } + + @Override + public Flux> xreadgroup(Consumer consumer, XReadArgs args, XReadArgs.StreamOffset... streams) { + return createDissolvingFlux(() -> commandBuilder.xreadgroup(consumer, args, streams)); + } + + @Override + public Flux> xrevrange(K key, Range range) { + return xrevrange(key, range, Limit.unlimited()); + } + + @Override + public Flux> xrevrange(K key, Range range, Limit limit) { + return createDissolvingFlux(() -> commandBuilder.xrevrange(key, range, limit)); + } + + @Override + public Mono xtrim(K key, long count) { + return createMono(() -> commandBuilder.xtrim(key, count)); + } + @Override public Mono zadd(K key, double score, V member) { return createMono(() -> commandBuilder.zadd(key, null, score, member)); diff --git a/src/main/java/com/lambdaworks/redis/Consumer.java b/src/main/java/io/lettuce/core/Consumer.java similarity index 93% rename from src/main/java/com/lambdaworks/redis/Consumer.java rename to src/main/java/io/lettuce/core/Consumer.java index 6550f60607..2a80616d2f 100644 --- a/src/main/java/com/lambdaworks/redis/Consumer.java +++ b/src/main/java/io/lettuce/core/Consumer.java @@ -13,18 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis; +package io.lettuce.core; import java.util.Objects; -import com.lambdaworks.redis.internal.LettuceAssert; +import io.lettuce.core.internal.LettuceAssert; /** * Value object representing a Stream consumer within a consumer group. Group name and consumer name are encoded as keys. * * @author Mark Paluch - * @since 4.5 - * @see com.lambdaworks.redis.codec.RedisCodec + * @since 5.1 + * @see io.lettuce.core.codec.RedisCodec */ public class Consumer { diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index 2eddb97c90..5959c02afc 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -22,8 +22,7 @@ import java.nio.ByteBuffer; import java.util.*; -import com.lambdaworks.redis.Range.Boundary; - +import io.lettuce.core.Range.Boundary; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.Utf8StringCodec; @@ -2052,8 +2051,8 @@ public Command xack(K key, K group, String[] messageIds) { return createCommand(XACK, new IntegerOutput<>(codec), args); } - public Command>> xclaim(K key, Consumer consumer, String[] messageIds, - XClaimArgs xClaimArgs) { + public Command>> xclaim(K key, Consumer consumer, XClaimArgs xClaimArgs, + String[] messageIds) { notNullKey(key); LettuceAssert.notNull(consumer, "Consumer " + MUST_NOT_BE_NULL); @@ -2248,7 +2247,7 @@ private static String getUpperValue(Range range) { return range.getUpper().getValue(); } - public Command>> xread(StreamOffset[] streams, XReadArgs xReadArgs) { + public Command>> xread(XReadArgs xReadArgs, StreamOffset[] streams) { LettuceAssert.notNull(streams, "Streams " + MUST_NOT_BE_NULL); LettuceAssert.isTrue(streams.length > 0, "Streams " + MUST_NOT_BE_EMPTY); @@ -2271,8 +2270,8 @@ public Command>> xread(StreamOffset[] streams, return createCommand(XREAD, new StreamReadOutput<>(codec), args); } - public Command>> xreadgroup(Consumer consumer, XReadArgs.StreamOffset[] streams, - XReadArgs xReadArgs) { + public Command>> xreadgroup(Consumer consumer, XReadArgs xReadArgs, + StreamOffset[] streams) { LettuceAssert.notNull(streams, "Streams " + MUST_NOT_BE_NULL); LettuceAssert.isTrue(streams.length > 0, "Streams " + MUST_NOT_BE_EMPTY); LettuceAssert.notNull(consumer, "Consumer " + MUST_NOT_BE_NULL); diff --git a/src/main/java/io/lettuce/core/StreamMessage.java b/src/main/java/io/lettuce/core/StreamMessage.java index 42cfe14065..2509410353 100644 --- a/src/main/java/io/lettuce/core/StreamMessage.java +++ b/src/main/java/io/lettuce/core/StreamMessage.java @@ -22,7 +22,7 @@ * A stream message and its id. * * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class StreamMessage { diff --git a/src/main/java/io/lettuce/core/XAddArgs.java b/src/main/java/io/lettuce/core/XAddArgs.java index 3b0ce91a3d..74700b0997 100644 --- a/src/main/java/io/lettuce/core/XAddArgs.java +++ b/src/main/java/io/lettuce/core/XAddArgs.java @@ -15,29 +15,41 @@ */ package io.lettuce.core; -import com.lambdaworks.redis.internal.LettuceAssert; -import com.lambdaworks.redis.protocol.CommandArgs; -import com.lambdaworks.redis.protocol.CommandKeyword; +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandKeyword; /** - * Args for the {@literal XADD} command. + * Argument list builder for the Redis XADD command. Static import the methods from + * {@link Builder} and call the methods: {@code maxlen(…)} . + *

+ * {@link XAddArgs} is a mutable object and instances should be used only once to avoid shared mutable state. * * @author Mark Paluch + * @since 5.1 */ public class XAddArgs { private String id; private Long maxlen; + /** + * Builder entry points for {@link XAddArgs}. + */ public static class Builder { /** * Utility constructor. */ private Builder() { - } + /** + * Creates new {@link XAddArgs} and setting {@literal MAXLEN}. + * + * @return new {@link XAddArgs} with {@literal MAXLEN} set. + * @see XAddArgs#maxlen(long) + */ public static XAddArgs maxlen(long count) { return new XAddArgs().maxlen(count); } @@ -50,7 +62,9 @@ public static XAddArgs maxlen(long count) { * @return {@code this} */ public XAddArgs id(String id) { + LettuceAssert.notNull(id, "Id must not be null"); + this.id = id; return this; } @@ -62,12 +76,15 @@ public XAddArgs id(String id) { * @return {@code this} */ public XAddArgs maxlen(long maxlen) { + LettuceAssert.isTrue(maxlen > 0, "Maxlen must be greater 0"); + this.maxlen = maxlen; return this; } public void build(CommandArgs args) { + if (maxlen != null) { args.add(CommandKeyword.MAXLEN).add(maxlen); } diff --git a/src/main/java/com/lambdaworks/redis/XClaimArgs.java b/src/main/java/io/lettuce/core/XClaimArgs.java similarity index 96% rename from src/main/java/com/lambdaworks/redis/XClaimArgs.java rename to src/main/java/io/lettuce/core/XClaimArgs.java index 9e41d1774a..f69a11d4de 100644 --- a/src/main/java/com/lambdaworks/redis/XClaimArgs.java +++ b/src/main/java/io/lettuce/core/XClaimArgs.java @@ -13,16 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis; +package io.lettuce.core; import java.time.Duration; import java.time.Instant; import java.time.temporal.TemporalAccessor; -import com.lambdaworks.redis.internal.LettuceAssert; -import com.lambdaworks.redis.protocol.CommandArgs; -import com.lambdaworks.redis.protocol.CommandKeyword; -import com.lambdaworks.redis.protocol.CommandType; +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandKeyword; +import io.lettuce.core.protocol.CommandType; /** * Argument list builder for the Redis XCLAIM command. Static import the methods @@ -31,7 +31,7 @@ * {@link XClaimArgs} is a mutable object and instances should be used only once to avoid shared mutable state. * * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class XClaimArgs { diff --git a/src/main/java/io/lettuce/core/XReadArgs.java b/src/main/java/io/lettuce/core/XReadArgs.java index 42c660a9d0..a30e1a8d64 100644 --- a/src/main/java/io/lettuce/core/XReadArgs.java +++ b/src/main/java/io/lettuce/core/XReadArgs.java @@ -15,14 +15,20 @@ */ package io.lettuce.core; -import com.lambdaworks.redis.internal.LettuceAssert; -import com.lambdaworks.redis.protocol.CommandArgs; -import com.lambdaworks.redis.protocol.CommandKeyword; +import java.time.Duration; + +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.CommandArgs; +import io.lettuce.core.protocol.CommandKeyword; /** - * Args for the {@literal XREAD} command. + * Argument list builder for the Redis XREAD and {@literal XREADGROUP} commands. + * Static import the methods from {@link XReadArgs.Builder} and call the methods: {@code block(…)} . + *

+ * {@link XReadArgs} is a mutable object and instances should be used only once to avoid shared mutable state. * * @author Mark Paluch + * @since 5.1 */ public class XReadArgs { @@ -30,39 +36,99 @@ public class XReadArgs { private Long count; private boolean noack; + /** + * Builder entry points for {@link XReadArgs}. + */ public static class Builder { /** * Utility constructor. */ private Builder() { - } + /** + * Create a new {@link XReadArgs} and set {@literal BLOCK}. + * + * @param milliseconds time to block. + * @return new {@link XReadArgs} with {@literal BLOCK} set. + * @see XReadArgs#block(long) + */ public static XReadArgs block(long milliseconds) { return new XReadArgs().block(milliseconds); } + /** + * Create a new {@link XReadArgs} and set {@literal BLOCK}. + * + * @param timeout time to block. + * @return new {@link XReadArgs} with {@literal BLOCK} set. + * @see XReadArgs#block(Duration) + */ + public static XReadArgs block(Duration timeout) { + + LettuceAssert.notNull(timeout, "Block timeout must not be null"); + + return block(timeout.toMillis()); + } + + /** + * Create a new {@link XReadArgs} and set {@literal COUNT}. + * + * @param count + * @return new {@link XReadArgs} with {@literal COUNT} set. + */ public static XReadArgs count(long count) { return new XReadArgs().count(count); } + /** + * Create a new {@link XReadArgs} and set {@literal NOACK}. + * + * @return new {@link XReadArgs} with {@literal NOACK} set. + * @see XReadArgs#noack(boolean) + */ + public static XReadArgs noack() { + return noack(true); + } + + /** + * Create a new {@link XReadArgs} and set {@literal NOACK}. + * + * @param noack + * @return new {@link XReadArgs} with {@literal NOACK} set. + * @see XReadArgs#noack(boolean) + */ public static XReadArgs noack(boolean noack) { return new XReadArgs().noack(noack); } } /** - * Wait up to {@code milliseconds} for a new stream message. + * Perform a blocking read and wait up to {@code milliseconds} for a new stream message. * * @param milliseconds max time to wait. * @return {@code this}. */ public XReadArgs block(long milliseconds) { + this.block = milliseconds; return this; } + /** + * Perform a blocking read and wait up to a {@link Duration timeout} for a new stream message. + * + * @param timeout max time to wait. + * @return {@code this}. + */ + public XReadArgs block(Duration timeout) { + + LettuceAssert.notNull(timeout, "Block timeout must not be null"); + + return block(timeout.toMillis()); + } + /** * Limit read to {@code count} messages. * @@ -70,17 +136,19 @@ public XReadArgs block(long milliseconds) { * @return {@code this}. */ public XReadArgs count(long count) { + this.count = count; return this; } /** - * Use NOACK option to disable auto-acknowledgement. + * Use NOACK option to disable auto-acknowledgement. Only valid for {@literal XREADGROUP}. * * @param noack {@literal true} to disable auto-ack. * @return {@code this}. */ public XReadArgs noack(boolean noack) { + this.noack = noack; return this; } @@ -101,7 +169,7 @@ public void build(CommandArgs args) { } /** - * Value object representing a Stream consumer group. + * Value object representing a Stream with its offset. */ public static class StreamOffset { @@ -127,12 +195,13 @@ public static StreamOffset latest(K name) { } /** - * Read all new arriving elements from the stream identified by {@code name}. + * Read all new arriving elements from the stream identified by {@code name} with ids greater than the last one consumed + * by the consumer group. * * @param name must not be {@literal null}. * @return the {@link StreamOffset} object without a specific offset. */ - public static StreamOffset latestConsumer(K name) { + public static StreamOffset lastConsumed(K name) { LettuceAssert.notNull(name, "Stream must not be null"); @@ -153,6 +222,18 @@ public static StreamOffset from(K name, String offset) { return new StreamOffset<>(name, offset); } - } + public K getName() { + return name; + } + + public String getOffset() { + return offset; + } + + @Override + public String toString() { + return String.format("%s:%s", name, offset); + } + } } diff --git a/src/main/java/io/lettuce/core/api/async/RedisAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisAsyncCommands.java index 21b231c32f..bb8b27c10f 100644 --- a/src/main/java/io/lettuce/core/api/async/RedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisAsyncCommands.java @@ -27,11 +27,11 @@ * @author Mark Paluch * @since 3.0 */ -public interface RedisAsyncCommands extends RedisHashAsyncCommands, RedisKeyAsyncCommands, - RedisStringAsyncCommands, RedisListAsyncCommands, RedisSetAsyncCommands, - RedisSortedSetAsyncCommands, RedisScriptingAsyncCommands, RedisServerAsyncCommands, - RedisHLLAsyncCommands, BaseRedisAsyncCommands, RedisClusterAsyncCommands, - RedisTransactionalAsyncCommands, RedisGeoAsyncCommands { +public interface RedisAsyncCommands extends BaseRedisAsyncCommands, RedisClusterAsyncCommands, + RedisGeoAsyncCommands, RedisHashAsyncCommands, RedisHLLAsyncCommands, RedisKeyAsyncCommands, + RedisListAsyncCommands, RedisScriptingAsyncCommands, RedisServerAsyncCommands, + RedisSetAsyncCommands, RedisSortedSetAsyncCommands, RedisStreamAsyncCommands, + RedisStringAsyncCommands, RedisTransactionalAsyncCommands { /** * Authenticate to the server. diff --git a/src/main/java/com/lambdaworks/redis/api/async/RedisStreamAsyncCommands.java b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java similarity index 97% rename from src/main/java/com/lambdaworks/redis/api/async/RedisStreamAsyncCommands.java rename to src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java index e6296385f2..6e80b8cc27 100644 --- a/src/main/java/com/lambdaworks/redis/api/async/RedisStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/api/async/RedisStreamAsyncCommands.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.api.async; +package io.lettuce.core.api.async; import java.util.List; import java.util.Map; -import com.lambdaworks.redis.*; -import com.lambdaworks.redis.XReadArgs.StreamOffset; +import io.lettuce.core.*; +import io.lettuce.core.XReadArgs.StreamOffset; /** * Asynchronous executed commands for Streams. @@ -27,8 +27,8 @@ * @param Key type. * @param Value type. * @author Mark Paluch - * @since 4.5 - * @generated by com.lambdaworks.apigenerator.CreateAsyncApi + * @since 5.1 + * @generated by io.lettuce.apigenerator.CreateAsyncApi */ public interface RedisStreamAsyncCommands { diff --git a/src/main/java/io/lettuce/core/api/reactive/RedisReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisReactiveCommands.java index 78a1c4486e..70c5c8aee1 100644 --- a/src/main/java/io/lettuce/core/api/reactive/RedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisReactiveCommands.java @@ -27,11 +27,11 @@ * @author Mark Paluch * @since 5.0 */ -public interface RedisReactiveCommands extends RedisHashReactiveCommands, RedisKeyReactiveCommands, - RedisStringReactiveCommands, RedisListReactiveCommands, RedisSetReactiveCommands, - RedisSortedSetReactiveCommands, RedisScriptingReactiveCommands, RedisServerReactiveCommands, - RedisHLLReactiveCommands, BaseRedisReactiveCommands, RedisClusterReactiveCommands, - RedisTransactionalReactiveCommands, RedisGeoReactiveCommands { +public interface RedisReactiveCommands extends BaseRedisReactiveCommands, RedisClusterReactiveCommands, + RedisGeoReactiveCommands, RedisHashReactiveCommands, RedisHLLReactiveCommands, + RedisKeyReactiveCommands, RedisListReactiveCommands, RedisScriptingReactiveCommands, + RedisServerReactiveCommands, RedisSetReactiveCommands, RedisSortedSetReactiveCommands, + RedisStreamReactiveCommands, RedisStringReactiveCommands, RedisTransactionalReactiveCommands { /** * Authenticate to the server. diff --git a/src/main/java/com/lambdaworks/redis/api/rx/RedisStreamReactiveCommands.java b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java similarity index 77% rename from src/main/java/com/lambdaworks/redis/api/rx/RedisStreamReactiveCommands.java rename to src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java index 41fa85fabc..50e9e1d2aa 100644 --- a/src/main/java/com/lambdaworks/redis/api/rx/RedisStreamReactiveCommands.java +++ b/src/main/java/io/lettuce/core/api/reactive/RedisStreamReactiveCommands.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,23 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.api.rx; +package io.lettuce.core.api.reactive; import java.util.Map; -import rx.Observable; - -import com.lambdaworks.redis.*; -import com.lambdaworks.redis.XReadArgs.StreamOffset; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import io.lettuce.core.*; +import io.lettuce.core.XReadArgs.StreamOffset; /** - * Observable commands for Streams. + * Reactive executed commands for Streams. * * @param Key type. * @param Value type. * @author Mark Paluch - * @since 4.5 - * @generated by com.lambdaworks.apigenerator.CreateReactiveApi + * @since 5.1 + * @generated by io.lettuce.apigenerator.CreateReactiveApi */ public interface RedisStreamReactiveCommands { @@ -41,7 +41,7 @@ public interface RedisStreamReactiveCommands { * @param messageIds message Id's to acknowledge. * @return simple-reply the lenght of acknowledged messages. */ - Observable xack(K key, K group, String... messageIds); + Mono xack(K key, K group, String... messageIds); /** * Append a message to the stream {@code key}. @@ -50,7 +50,7 @@ public interface RedisStreamReactiveCommands { * @param body message body. * @return simple-reply the message Id. */ - Observable xadd(K key, Map body); + Mono xadd(K key, Map body); /** * Append a message to the stream {@code key}. @@ -60,7 +60,7 @@ public interface RedisStreamReactiveCommands { * @param body message body. * @return simple-reply the message Id. */ - Observable xadd(K key, XAddArgs args, Map body); + Mono xadd(K key, XAddArgs args, Map body); /** * Append a message to the stream {@code key}. @@ -69,7 +69,7 @@ public interface RedisStreamReactiveCommands { * @param keysAndValues message body. * @return simple-reply the message Id. */ - Observable xadd(K key, Object... keysAndValues); + Mono xadd(K key, Object... keysAndValues); /** * Append a message to the stream {@code key}. @@ -79,7 +79,7 @@ public interface RedisStreamReactiveCommands { * @param keysAndValues message body. * @return simple-reply the message Id. */ - Observable xadd(K key, XAddArgs args, Object... keysAndValues); + Mono xadd(K key, XAddArgs args, Object... keysAndValues); /** * Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group. @@ -90,7 +90,7 @@ public interface RedisStreamReactiveCommands { * @param messageIds message Id's to claim. * @return simple-reply the {@link StreamMessage} */ - Observable> xclaim(K key, Consumer consumer, long minIdleTime, String... messageIds); + Flux> xclaim(K key, Consumer consumer, long minIdleTime, String... messageIds); /** * Gets ownership of one or multiple messages in the Pending Entries List of a given stream consumer group. @@ -101,7 +101,7 @@ public interface RedisStreamReactiveCommands { * @param messageIds message Id's to claim. * @return simple-reply the {@link StreamMessage} */ - Observable> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds); + Flux> xclaim(K key, Consumer consumer, XClaimArgs args, String... messageIds); /** * Removes the specified entries from the stream. Returns the number of items deleted, that may be different from the number @@ -111,7 +111,7 @@ public interface RedisStreamReactiveCommands { * @param messageIds stream message Id's. * @return simple-reply number of removed entries. */ - Observable xdel(K key, String... messageIds); + Mono xdel(K key, String... messageIds); /** * Create a consumer group. @@ -121,7 +121,7 @@ public interface RedisStreamReactiveCommands { * @param offset read offset or {@literal $}. * @return simple-reply {@literal true} if successful. */ - Observable xgroupCreate(K key, K group, String offset); + Mono xgroupCreate(K key, K group, String offset); /** * Delete a consumer from a consumer group. @@ -130,7 +130,7 @@ public interface RedisStreamReactiveCommands { * @param consumer consumer identified by group name and consumer key. * @return simple-reply the number of pending messages */ - Observable xgroupDelconsumer(K key, Consumer consumer); + Mono xgroupDelconsumer(K key, Consumer consumer); /** * Set the current {@code group} id. @@ -140,7 +140,7 @@ public interface RedisStreamReactiveCommands { * @param offset read offset or {@literal $}. * @return simple-reply the lenght of the stream. */ - Observable xgroupSetid(K key, K group, String offset); + Mono xgroupSetid(K key, K group, String offset); /** * Get the length of a steam. @@ -148,7 +148,7 @@ public interface RedisStreamReactiveCommands { * @param key the stream key. * @return simple-reply the lenght of the stream. */ - Observable xlen(K key); + Mono xlen(K key); /** * Read pending messages from a stream for a {@code group}. @@ -157,7 +157,7 @@ public interface RedisStreamReactiveCommands { * @param group name of the consumer group. * @return Object array-reply list pending entries. */ - Observable xpending(K key, K group); + Flux xpending(K key, K group); /** * Read pending messages from a stream within a specific {@link Range}. @@ -168,7 +168,7 @@ public interface RedisStreamReactiveCommands { * @param limit must not be {@literal null}. * @return Object array-reply list with members of the resulting stream. */ - Observable xpending(K key, K group, Range range, Limit limit); + Flux xpending(K key, K group, Range range, Limit limit); /** * Read pending messages from a stream within a specific {@link Range}. @@ -179,7 +179,7 @@ public interface RedisStreamReactiveCommands { * @param limit must not be {@literal null}. * @return Object array-reply list with members of the resulting stream. */ - Observable xpending(K key, Consumer consumer, Range range, Limit limit); + Flux xpending(K key, Consumer consumer, Range range, Limit limit); /** * Read messages from a stream within a specific {@link Range}. @@ -188,7 +188,7 @@ public interface RedisStreamReactiveCommands { * @param range must not be {@literal null}. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xrange(K key, Range range); + Flux> xrange(K key, Range range); /** * Read messages from a stream within a specific {@link Range} applying a {@link Limit}. @@ -198,7 +198,7 @@ public interface RedisStreamReactiveCommands { * @param limit must not be {@literal null}. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xrange(K key, Range range, Limit limit); + Flux> xrange(K key, Range range, Limit limit); /** * Read messages from one or more {@link StreamOffset}s. @@ -206,7 +206,7 @@ public interface RedisStreamReactiveCommands { * @param streams the streams to read from. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xread(StreamOffset... streams); + Flux> xread(StreamOffset... streams); /** * Read messages from one or more {@link StreamOffset}s. @@ -215,7 +215,7 @@ public interface RedisStreamReactiveCommands { * @param streams the streams to read from. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xread(XReadArgs args, StreamOffset... streams); + Flux> xread(XReadArgs args, StreamOffset... streams); /** * Read messages from one or more {@link StreamOffset}s using a consumer group. @@ -224,7 +224,7 @@ public interface RedisStreamReactiveCommands { * @param streams the streams to read from. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xreadgroup(Consumer consumer, StreamOffset... streams); + Flux> xreadgroup(Consumer consumer, StreamOffset... streams); /** * Read messages from one or more {@link StreamOffset}s using a consumer group. @@ -234,7 +234,7 @@ public interface RedisStreamReactiveCommands { * @param streams the streams to read from. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xreadgroup(Consumer consumer, XReadArgs args, StreamOffset... streams); + Flux> xreadgroup(Consumer consumer, XReadArgs args, StreamOffset... streams); /** * Read messages from a stream within a specific {@link Range} in reverse order. @@ -243,7 +243,7 @@ public interface RedisStreamReactiveCommands { * @param range must not be {@literal null}. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xrevrange(K key, Range range); + Flux> xrevrange(K key, Range range); /** * Read messages from a stream within a specific {@link Range} applying a {@link Limit} in reverse order. @@ -253,7 +253,7 @@ public interface RedisStreamReactiveCommands { * @param limit must not be {@literal null}. * @return StreamMessage array-reply list with members of the resulting stream. */ - Observable> xrevrange(K key, Range range, Limit limit); + Flux> xrevrange(K key, Range range, Limit limit); /** * Trims the stream to {@code count} elements. @@ -262,5 +262,5 @@ public interface RedisStreamReactiveCommands { * @param count length of the stream. * @return simple-reply number of removed entries. */ - Observable xtrim(K key, long count); + Mono xtrim(K key, long count); } diff --git a/src/main/java/io/lettuce/core/api/sync/RedisCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisCommands.java index 035c3d61c1..f93a46b29c 100644 --- a/src/main/java/io/lettuce/core/api/sync/RedisCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisCommands.java @@ -27,10 +27,10 @@ * @author Mark Paluch * @since 3.0 */ -public interface RedisCommands extends RedisHashCommands, RedisKeyCommands, RedisStringCommands, - RedisListCommands, RedisSetCommands, RedisSortedSetCommands, RedisScriptingCommands, - RedisServerCommands, RedisHLLCommands, BaseRedisCommands, RedisClusterCommands, - RedisTransactionalCommands, RedisGeoCommands { +public interface RedisCommands extends BaseRedisCommands, RedisClusterCommands, RedisGeoCommands, + RedisHashCommands, RedisHLLCommands, RedisKeyCommands, RedisListCommands, + RedisScriptingCommands, RedisServerCommands, RedisSetCommands, RedisSortedSetCommands, + RedisStreamCommands, RedisStringCommands, RedisTransactionalCommands { /** * Authenticate to the server. diff --git a/src/main/java/com/lambdaworks/redis/api/sync/RedisStreamCommands.java b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java similarity index 97% rename from src/main/java/com/lambdaworks/redis/api/sync/RedisStreamCommands.java rename to src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java index c48fdf2cae..984c57e639 100644 --- a/src/main/java/com/lambdaworks/redis/api/sync/RedisStreamCommands.java +++ b/src/main/java/io/lettuce/core/api/sync/RedisStreamCommands.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.api.sync; +package io.lettuce.core.api.sync; import java.util.List; import java.util.Map; -import com.lambdaworks.redis.*; -import com.lambdaworks.redis.XReadArgs.StreamOffset; +import io.lettuce.core.*; +import io.lettuce.core.XReadArgs.StreamOffset; /** * Synchronous executed commands for Streams. @@ -27,8 +27,8 @@ * @param Key type. * @param Value type. * @author Mark Paluch - * @since 4.5 - * @generated by com.lambdaworks.apigenerator.CreateSyncApi + * @since 5.1 + * @generated by io.lettuce.apigenerator.CreateSyncApi */ public interface RedisStreamCommands { diff --git a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionAsyncCommands.java index a24cda3419..905e2fb0a1 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionAsyncCommands.java @@ -16,6 +16,7 @@ package io.lettuce.core.cluster.api.async; import io.lettuce.core.cluster.api.NodeSelectionSupport; +import io.lettuce.core.cluster.api.sync.NodeSelectionStreamCommands; /** * Asynchronous and thread-safe Redis API to execute commands on a {@link NodeSelectionSupport}. @@ -23,9 +24,8 @@ * @author Mark Paluch */ public interface NodeSelectionAsyncCommands extends BaseNodeSelectionAsyncCommands, - NodeSelectionHashAsyncCommands, NodeSelectionHLLAsyncCommands, NodeSelectionKeyAsyncCommands, - NodeSelectionListAsyncCommands, NodeSelectionScriptingAsyncCommands, + NodeSelectionGeoAsyncCommands, NodeSelectionHashAsyncCommands, NodeSelectionHLLAsyncCommands, + NodeSelectionKeyAsyncCommands, NodeSelectionListAsyncCommands, NodeSelectionScriptingAsyncCommands, NodeSelectionServerAsyncCommands, NodeSelectionSetAsyncCommands, NodeSelectionSortedSetAsyncCommands, - NodeSelectionStringAsyncCommands, NodeSelectionGeoAsyncCommands { - + NodeSelectionStreamCommands, NodeSelectionStringAsyncCommands { } diff --git a/src/main/java/com/lambdaworks/redis/cluster/api/async/NodeSelectionStreamAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java similarity index 97% rename from src/main/java/com/lambdaworks/redis/cluster/api/async/NodeSelectionStreamAsyncCommands.java rename to src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java index 3d20685574..393a684fcd 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/api/async/NodeSelectionStreamAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/NodeSelectionStreamAsyncCommands.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.cluster.api.async; +package io.lettuce.core.cluster.api.async; import java.util.List; import java.util.Map; -import com.lambdaworks.redis.*; -import com.lambdaworks.redis.XReadArgs.StreamOffset; +import io.lettuce.core.*; +import io.lettuce.core.XReadArgs.StreamOffset; /** * Asynchronous executed commands on a node selection for Streams. @@ -27,8 +27,8 @@ * @param Key type. * @param Value type. * @author Mark Paluch - * @since 4.5 - * @generated by com.lambdaworks.apigenerator.CreateAsyncNodeSelectionClusterApi + * @since 5.1 + * @generated by io.lettuce.apigenerator.CreateAsyncNodeSelectionClusterApi */ public interface NodeSelectionStreamAsyncCommands { diff --git a/src/main/java/io/lettuce/core/cluster/api/async/RedisClusterAsyncCommands.java b/src/main/java/io/lettuce/core/cluster/api/async/RedisClusterAsyncCommands.java index b2cb494487..cfd2eb2d5c 100644 --- a/src/main/java/io/lettuce/core/cluster/api/async/RedisClusterAsyncCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/async/RedisClusterAsyncCommands.java @@ -32,11 +32,10 @@ * @author Mark Paluch * @since 4.0 */ -public interface RedisClusterAsyncCommands - extends RedisHashAsyncCommands, RedisKeyAsyncCommands, RedisStringAsyncCommands, - RedisListAsyncCommands, RedisSetAsyncCommands, RedisSortedSetAsyncCommands, - RedisScriptingAsyncCommands, RedisServerAsyncCommands, RedisHLLAsyncCommands, - RedisGeoAsyncCommands, BaseRedisAsyncCommands { +public interface RedisClusterAsyncCommands extends RedisHashAsyncCommands, RedisKeyAsyncCommands, + RedisStringAsyncCommands, RedisListAsyncCommands, RedisSetAsyncCommands, + RedisSortedSetAsyncCommands, RedisScriptingAsyncCommands, RedisServerAsyncCommands, + RedisHLLAsyncCommands, RedisGeoAsyncCommands, RedisStreamAsyncCommands, BaseRedisAsyncCommands { /** * Set the default timeout for operations. diff --git a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionCommands.java index 8926614f56..0a2026c180 100644 --- a/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionCommands.java @@ -22,9 +22,9 @@ * * @author Mark Paluch */ -public interface NodeSelectionCommands extends BaseNodeSelectionCommands, NodeSelectionHashCommands, - NodeSelectionHLLCommands, NodeSelectionKeyCommands, NodeSelectionListCommands, - NodeSelectionScriptingCommands, NodeSelectionServerCommands, NodeSelectionSetCommands, - NodeSelectionSortedSetCommands, NodeSelectionStringCommands, NodeSelectionGeoCommands { - +public interface NodeSelectionCommands extends BaseNodeSelectionCommands, NodeSelectionGeoCommands, + NodeSelectionHashCommands, NodeSelectionHLLCommands, NodeSelectionKeyCommands, + NodeSelectionListCommands, NodeSelectionScriptingCommands, NodeSelectionServerCommands, + NodeSelectionSetCommands, NodeSelectionSortedSetCommands, NodeSelectionStreamCommands, + NodeSelectionStringCommands { } diff --git a/src/main/java/com/lambdaworks/redis/cluster/api/sync/NodeSelectionStreamCommands.java b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java similarity index 97% rename from src/main/java/com/lambdaworks/redis/cluster/api/sync/NodeSelectionStreamCommands.java rename to src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java index 47dfe97957..825b6ad592 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/api/sync/NodeSelectionStreamCommands.java +++ b/src/main/java/io/lettuce/core/cluster/api/sync/NodeSelectionStreamCommands.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.cluster.api.sync; +package io.lettuce.core.cluster.api.sync; import java.util.List; import java.util.Map; -import com.lambdaworks.redis.*; -import com.lambdaworks.redis.XReadArgs.StreamOffset; +import io.lettuce.core.*; +import io.lettuce.core.XReadArgs.StreamOffset; /** * Synchronous executed commands on a node selection for Streams. @@ -27,8 +27,8 @@ * @param Key type. * @param Value type. * @author Mark Paluch - * @since 4.5 - * @generated by com.lambdaworks.apigenerator.CreateSyncNodeSelectionClusterApi + * @since 5.1 + * @generated by io.lettuce.apigenerator.CreateSyncNodeSelectionClusterApi */ public interface NodeSelectionStreamCommands { diff --git a/src/main/java/com/lambdaworks/redis/models/stream/PendingEntry.java b/src/main/java/io/lettuce/core/models/stream/PendingEntry.java similarity index 88% rename from src/main/java/com/lambdaworks/redis/models/stream/PendingEntry.java rename to src/main/java/io/lettuce/core/models/stream/PendingEntry.java index 6067c5af0d..78c3446031 100644 --- a/src/main/java/com/lambdaworks/redis/models/stream/PendingEntry.java +++ b/src/main/java/io/lettuce/core/models/stream/PendingEntry.java @@ -13,22 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.models.stream; +package io.lettuce.core.models.stream; /** * Value object representing an entry of the Pending Entry List retrieved via {@literal XPENDING}. * * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class PendingEntry { - private String messageId; - private String consumer; - private long millisSinceDelivery; - private long deliveryCount; + private final String messageId; + private final String consumer; + private final long millisSinceDelivery; + private final long deliveryCount; public PendingEntry(String messageId, String consumer, long millisSinceDelivery, long deliveryCount) { + this.messageId = messageId; this.consumer = consumer; this.millisSinceDelivery = millisSinceDelivery; diff --git a/src/main/java/com/lambdaworks/redis/models/stream/PendingMessage.java b/src/main/java/io/lettuce/core/models/stream/PendingMessage.java similarity index 96% rename from src/main/java/com/lambdaworks/redis/models/stream/PendingMessage.java rename to src/main/java/io/lettuce/core/models/stream/PendingMessage.java index 56bd88009e..304204f7e7 100644 --- a/src/main/java/com/lambdaworks/redis/models/stream/PendingMessage.java +++ b/src/main/java/io/lettuce/core/models/stream/PendingMessage.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.models.stream; +package io.lettuce.core.models.stream; import java.time.Duration; @@ -21,7 +21,7 @@ * Value object representing a pending message reported through XPENDING with range/limit. * * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class PendingMessage { diff --git a/src/main/java/com/lambdaworks/redis/models/stream/PendingMessages.java b/src/main/java/io/lettuce/core/models/stream/PendingMessages.java similarity index 86% rename from src/main/java/com/lambdaworks/redis/models/stream/PendingMessages.java rename to src/main/java/io/lettuce/core/models/stream/PendingMessages.java index 361415e713..ee02875321 100644 --- a/src/main/java/com/lambdaworks/redis/models/stream/PendingMessages.java +++ b/src/main/java/io/lettuce/core/models/stream/PendingMessages.java @@ -13,14 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.models.stream; +package io.lettuce.core.models.stream; import java.util.Map; -import com.lambdaworks.redis.Range; +import io.lettuce.core.Range; /** + * Value object representing the output of the Redis {@literal XPENDING} reporting a summary on pending messages. + * * @author Mark Paluch + * @since 5.1 */ public class PendingMessages { @@ -29,6 +32,7 @@ public class PendingMessages { private final Map consumerMessageCount; public PendingMessages(long count, Range messageIds, Map consumerMessageCount) { + this.count = count; this.messageIds = messageIds; this.consumerMessageCount = consumerMessageCount; diff --git a/src/main/java/com/lambdaworks/redis/models/stream/PendingParser.java b/src/main/java/io/lettuce/core/models/stream/PendingParser.java similarity index 94% rename from src/main/java/com/lambdaworks/redis/models/stream/PendingParser.java rename to src/main/java/io/lettuce/core/models/stream/PendingParser.java index d253bee5a4..d862acad79 100644 --- a/src/main/java/com/lambdaworks/redis/models/stream/PendingParser.java +++ b/src/main/java/io/lettuce/core/models/stream/PendingParser.java @@ -13,18 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.models.stream; +package io.lettuce.core.models.stream; import java.util.*; -import com.lambdaworks.redis.Range; -import com.lambdaworks.redis.internal.LettuceAssert; +import io.lettuce.core.Range; +import io.lettuce.core.internal.LettuceAssert; /** * Parser for redis XPENDING command output. * * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class PendingParser { @@ -35,7 +35,7 @@ private PendingParser() { } /** - * Parse the output of the Redis {@literal XPENDING} command with {@link com.lambdaworks.redis.Range}. + * Parse the output of the Redis {@literal XPENDING} command with {@link Range}. * * @param xpendingOutput output of the Redis {@literal XPENDING}. * @return list of {@link PendingMessage}s. diff --git a/src/main/java/io/lettuce/core/models/stream/package-info.java b/src/main/java/io/lettuce/core/models/stream/package-info.java new file mode 100644 index 0000000000..587dad7151 --- /dev/null +++ b/src/main/java/io/lettuce/core/models/stream/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Model and parser for the Stream-related command output such as {@literal XPENDING}. + */ +package io.lettuce.core.models.stream; + diff --git a/src/main/java/com/lambdaworks/redis/output/StreamMessageListOutput.java b/src/main/java/io/lettuce/core/output/StreamMessageListOutput.java similarity index 91% rename from src/main/java/com/lambdaworks/redis/output/StreamMessageListOutput.java rename to src/main/java/io/lettuce/core/output/StreamMessageListOutput.java index 8f5aa63d54..9da9c8ebab 100644 --- a/src/main/java/com/lambdaworks/redis/output/StreamMessageListOutput.java +++ b/src/main/java/io/lettuce/core/output/StreamMessageListOutput.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.output; +package io.lettuce.core.output; import java.nio.ByteBuffer; import java.util.Collections; @@ -21,15 +21,15 @@ import java.util.List; import java.util.Map; -import com.lambdaworks.redis.StreamMessage; -import com.lambdaworks.redis.codec.RedisCodec; -import com.lambdaworks.redis.internal.LettuceAssert; +import io.lettuce.core.StreamMessage; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.LettuceAssert; /** * {@link List} of {@link StreamMessage}s. * * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class StreamMessageListOutput extends CommandOutput>> implements StreamingOutput> { diff --git a/src/main/java/io/lettuce/core/output/StreamReadOutput.java b/src/main/java/io/lettuce/core/output/StreamReadOutput.java index 832687d76d..4c1c5456af 100644 --- a/src/main/java/io/lettuce/core/output/StreamReadOutput.java +++ b/src/main/java/io/lettuce/core/output/StreamReadOutput.java @@ -21,14 +21,13 @@ import java.util.List; import java.util.Map; -import com.lambdaworks.redis.codec.RedisCodec; -import com.lambdaworks.redis.internal.LettuceAssert; - import io.lettuce.core.StreamMessage; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.internal.LettuceAssert; /** * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public class StreamReadOutput extends CommandOutput>> implements StreamingOutput> { diff --git a/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java b/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java index 04fa1b27ee..6e6c2cf681 100644 --- a/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java +++ b/src/main/java/io/lettuce/core/protocol/RedisStateMachine.java @@ -383,7 +383,7 @@ protected void safeSet(CommandOutput output, ByteBuffer bytes, RedisCom * @param bytes * @param command */ - protected void safeSetSingle(CommandOutput output, ByteBuffer bytes, RedisCommand command) { + protected void safeSetSingle(CommandOutput output, ByteBuffer bytes, RedisCommand command) { try { output.set(bytes); diff --git a/src/main/templates/com/lambdaworks/redis/api/RedisStreamCommands.java b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java similarity index 97% rename from src/main/templates/com/lambdaworks/redis/api/RedisStreamCommands.java rename to src/main/templates/io/lettuce/core/api/RedisStreamCommands.java index 914a252405..8f49ec1c9a 100644 --- a/src/main/templates/com/lambdaworks/redis/api/RedisStreamCommands.java +++ b/src/main/templates/io/lettuce/core/api/RedisStreamCommands.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.api; +package io.lettuce.core.api; import java.util.List; import java.util.Map; -import com.lambdaworks.redis.*; -import com.lambdaworks.redis.XReadArgs.StreamOffset; +import io.lettuce.core.*; +import io.lettuce.core.XReadArgs.StreamOffset; /** * ${intent} for Streams. @@ -27,7 +27,7 @@ * @param Key type. * @param Value type. * @author Mark Paluch - * @since 4.5 + * @since 5.1 */ public interface RedisStreamCommands { diff --git a/src/test/java/io/lettuce/apigenerator/Constants.java b/src/test/java/io/lettuce/apigenerator/Constants.java index d12648b03f..9dbe29d9fb 100644 --- a/src/test/java/io/lettuce/apigenerator/Constants.java +++ b/src/test/java/io/lettuce/apigenerator/Constants.java @@ -22,10 +22,10 @@ */ class Constants { - public final static String[] TEMPLATE_NAMES = { "RedisHashCommands", "RedisHLLCommands", "RedisKeyCommands", - "RedisListCommands", "RedisScriptingCommands", "RedisServerCommands", "RedisSetCommands", "RedisSortedSetCommands", - "RedisStringCommands", "RedisTransactionalCommands", "RedisSentinelCommands", "BaseRedisCommands", - "RedisGeoCommands" }; + public final static String[] TEMPLATE_NAMES = { "BaseRedisCommands", "RedisGeoCommands", "RedisHashCommands", + "RedisHLLCommands", "RedisKeyCommands", "RedisListCommands", "RedisScriptingCommands", "RedisSentinelCommands", + "RedisServerCommands", "RedisSetCommands", "RedisSortedSetCommands", "RedisStreamCommands", "RedisStringCommands", + "RedisTransactionalCommands" }; public final static File TEMPLATES = new File("src/main/templates"); public final static File SOURCES = new File("src/main/java"); diff --git a/src/test/java/com/lambdaworks/redis/commands/StreamCommandTest.java b/src/test/java/io/lettuce/core/commands/StreamCommandTest.java similarity index 90% rename from src/test/java/com/lambdaworks/redis/commands/StreamCommandTest.java rename to src/test/java/io/lettuce/core/commands/StreamCommandTest.java index 45d2057b24..ab98179def 100644 --- a/src/test/java/com/lambdaworks/redis/commands/StreamCommandTest.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,32 +13,41 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.commands; +package io.lettuce.core.commands; -import static com.lambdaworks.redis.protocol.CommandType.XINFO; +import static io.lettuce.core.protocol.CommandType.XINFO; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; import java.time.Instant; import java.util.*; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import io.lettuce.core.StreamMessage; -import io.lettuce.core.XAddArgs; +import io.lettuce.RedisConditions; +import io.lettuce.core.*; import io.lettuce.core.XReadArgs.StreamOffset; -import com.lambdaworks.redis.codec.StringCodec; -import com.lambdaworks.redis.models.stream.PendingMessage; -import com.lambdaworks.redis.models.stream.PendingParser; -import com.lambdaworks.redis.output.NestedMultiOutput; -import com.lambdaworks.redis.protocol.CommandArgs; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.models.stream.PendingMessage; +import io.lettuce.core.models.stream.PendingParser; +import io.lettuce.core.output.NestedMultiOutput; +import io.lettuce.core.protocol.CommandArgs; /** * @author Mark Paluch */ public class StreamCommandTest extends AbstractRedisClientTest { + @Before + public void openConnection() throws Exception { + + super.openConnection(); + assumeTrue(RedisConditions.of(redis).hasCommand("XADD")); + } + @Test public void xadd() { @@ -189,11 +198,11 @@ public void xreadTransactional() { redis.xadd("stream-2", Collections.singletonMap("key4", "value4")); redis.xread(StreamOffset.from("stream-1", initial1), XReadArgs.StreamOffset.from("stream-2", initial2)); - List exec = redis.exec(); + TransactionResult exec = redis.exec(); - String message1 = (String) exec.get(0); - String message2 = (String) exec.get(1); - List> messages = (List) exec.get(2); + String message1 = exec.get(0); + String message2 = exec.get(1); + List> messages = exec.get(2); StreamMessage firstMessage = messages.get(0); @@ -229,7 +238,7 @@ public void xgroupread() { redis.xadd(key, Collections.singletonMap("key", "value")); List> read1 = redis.xreadgroup(Consumer.from("group", "consumer1"), - StreamOffset.latestConsumer(key)); + StreamOffset.lastConsumed(key)); assertThat(read1).hasSize(1); } @@ -241,7 +250,7 @@ public void xpendingWithGroup() { redis.xgroupCreate(key, "group", "$"); String id = redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.latestConsumer(key)); + redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); List pendingEntries = redis.xpending(key, "group"); assertThat(pendingEntries).hasSize(4).containsSequence(1L, id, id); @@ -254,7 +263,7 @@ public void xpending() { redis.xgroupCreate(key, "group", "$"); String id = redis.xadd(key, Collections.singletonMap("key", "value")); - redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.latestConsumer(key)); + redis.xreadgroup(Consumer.from("group", "consumer1"), StreamOffset.lastConsumed(key)); List pendingEntries = redis.xpending(key, "group", Range.unbounded(), Limit.from(10)); @@ -275,7 +284,7 @@ public void xack() { redis.xadd(key, Collections.singletonMap("key", "value")); List> messages = redis.xreadgroup(Consumer.from("group", "consumer1"), - StreamOffset.latestConsumer(key)); + StreamOffset.lastConsumed(key)); Long ackd = redis.xack(key, "group", messages.get(0).getId()); assertThat(ackd).isEqualTo(1); @@ -292,7 +301,7 @@ public void xclaim() { redis.xadd(key, Collections.singletonMap("key", "value")); List> messages = redis.xreadgroup(Consumer.from("group", "consumer1"), - StreamOffset.latestConsumer(key)); + StreamOffset.lastConsumed(key)); List> claimedMessages = redis.xclaim(key, Consumer.from("group", "consumer2"), 0, messages.get(0).getId()); @@ -311,7 +320,7 @@ public void xclaimWithArgs() { String id2 = redis.xadd(key, Collections.singletonMap("key", "value")); List> messages = redis.xreadgroup(Consumer.from("group", "consumer1"), - StreamOffset.latestConsumer(key)); + StreamOffset.lastConsumed(key)); List> claimedMessages = redis.xclaim(key, Consumer.from("group", "consumer2"), XClaimArgs.Builder.minIdleTime(0).time(Instant.now().minusSeconds(60)), id1, id2); @@ -323,7 +332,7 @@ public void xclaimWithArgs() { List pendingMessages = PendingParser.parseRange(xpending); PendingMessage message = pendingMessages.get(0); - assertThat(message.getMsSinceLastDelivery()).isGreaterThan(60000); + assertThat(message.getMsSinceLastDelivery()).isBetween(50000L, 80000L); } @Test diff --git a/src/test/java/com/lambdaworks/redis/commands/rx/StreamRxCommandTest.java b/src/test/java/io/lettuce/core/commands/reactive/StreamReactiveCommandTest.java similarity index 68% rename from src/test/java/com/lambdaworks/redis/commands/rx/StreamRxCommandTest.java rename to src/test/java/io/lettuce/core/commands/reactive/StreamReactiveCommandTest.java index ca43ea2802..b5db1da585 100644 --- a/src/test/java/com/lambdaworks/redis/commands/rx/StreamRxCommandTest.java +++ b/src/test/java/io/lettuce/core/commands/reactive/StreamReactiveCommandTest.java @@ -13,18 +13,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.commands.rx; +package io.lettuce.core.commands.reactive; -import com.lambdaworks.redis.api.sync.RedisCommands; -import com.lambdaworks.redis.commands.StreamCommandTest; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.commands.StreamCommandTest; +import io.lettuce.util.ReactiveSyncInvocationHandler; /** * @author Mark Paluch */ -public class StreamRxCommandTest extends StreamCommandTest { +public class StreamReactiveCommandTest extends StreamCommandTest { @Override protected RedisCommands connect() { - return RxSyncInvocationHandler.sync(client.connectAsync().getStatefulConnection()); + return ReactiveSyncInvocationHandler.sync(client.connect()); } } diff --git a/src/test/java/com/lambdaworks/redis/models/stream/PendingParserTest.java b/src/test/java/io/lettuce/core/models/stream/PendingParserTest.java similarity index 89% rename from src/test/java/com/lambdaworks/redis/models/stream/PendingParserTest.java rename to src/test/java/io/lettuce/core/models/stream/PendingParserTest.java index ff9eecf348..1eeebdf183 100644 --- a/src/test/java/com/lambdaworks/redis/models/stream/PendingParserTest.java +++ b/src/test/java/io/lettuce/core/models/stream/PendingParserTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.lambdaworks.redis.models.stream; +package io.lettuce.core.models.stream; import static org.assertj.core.api.Assertions.assertThat; @@ -24,7 +24,7 @@ import org.junit.Test; -import com.lambdaworks.redis.Range; +import io.lettuce.core.Range; /** * @author Mark Paluch @@ -34,7 +34,8 @@ public class PendingParserTest { @Test public void shouldParseXpendingWithRangeOutput() { - List result = PendingParser.parseRange(Collections.singletonList(Arrays.asList("foo", "consumer", 1L, + List result = PendingParser + .parseRange(Collections.singletonList(Arrays.asList("foo", "consumer", 1L, 2L))); assertThat(result).hasSize(1);