Skip to content

Commit

Permalink
Add support for SORT_RO #2145
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jul 8, 2022
1 parent b9d3a8c commit fb6e0f1
Show file tree
Hide file tree
Showing 17 changed files with 407 additions and 16 deletions.
22 changes: 21 additions & 1 deletion src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1763,7 +1763,27 @@ public RedisFuture<List<V>> sort(K key, SortArgs sortArgs) {

@Override
public RedisFuture<Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs) {
return dispatch(commandBuilder.sort(channel, key, sortArgs));
return dispatch(commandBuilder.sortReadOnly(channel, key, sortArgs));
}

@Override
public RedisFuture<List<V>> sortReadOnly(K key) {
return dispatch(commandBuilder.sortReadOnly(key));
}

@Override
public RedisFuture<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key) {
return dispatch(commandBuilder.sortReadOnly(channel, key));
}

@Override
public RedisFuture<List<V>> sortReadOnly(K key, SortArgs sortArgs) {
return dispatch(commandBuilder.sortReadOnly(key, sortArgs));
}

@Override
public RedisFuture<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs) {
return dispatch(commandBuilder.sortReadOnly(channel, key, sortArgs));
}

@Override
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,26 @@ public Mono<Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArg
return createMono(() -> commandBuilder.sort(channel, key, sortArgs));
}

@Override
public Flux<V> sortReadOnly(K key) {
return createDissolvingFlux(() -> commandBuilder.sortReadOnly(key));
}

@Override
public Mono<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key) {
return createMono(() -> commandBuilder.sortReadOnly(channel, key));
}

@Override
public Flux<V> sortReadOnly(K key, SortArgs sortArgs) {
return createDissolvingFlux(() -> commandBuilder.sortReadOnly(key, sortArgs));
}

@Override
public Mono<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs) {
return createMono(() -> commandBuilder.sortReadOnly(channel, key, sortArgs));
}

@Override
public Mono<Long> sortStore(K key, SortArgs sortArgs, K destination) {
return createMono(() -> commandBuilder.sortStore(key, sortArgs, destination));
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2440,6 +2440,38 @@ Command<K, V, Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortA
return createCommand(SORT, new ValueStreamingOutput<>(codec, channel), args);
}

Command<K, V, List<V>> sortReadOnly(K key) {
notNullKey(key);

return createCommand(SORT_RO, new ValueListOutput<>(codec), key);
}

Command<K, V, List<V>> sortReadOnly(K key, SortArgs sortArgs) {
notNullKey(key);
LettuceAssert.notNull(sortArgs, "SortArgs " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);
sortArgs.build(args, null);
return createCommand(SORT_RO, new ValueListOutput<>(codec), args);
}

Command<K, V, Long> sortReadOnly(ValueStreamingChannel<V> channel, K key) {
notNullKey(key);
notNull(channel);

return createCommand(SORT_RO, new ValueStreamingOutput<>(codec, channel), key);
}

Command<K, V, Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs) {
notNullKey(key);
notNull(channel);
LettuceAssert.notNull(sortArgs, "SortArgs " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);
sortArgs.build(args, null);
return createCommand(SORT_RO, new ValueStreamingOutput<>(codec, channel), args);
}

Command<K, V, Long> sortStore(K key, SortArgs sortArgs, K destination) {
notNullKey(key);
LettuceAssert.notNull(destination, "Destination " + MUST_NOT_BE_NULL);
Expand Down
40 changes: 40 additions & 0 deletions src/main/java/io/lettuce/core/api/async/RedisKeyAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,46 @@ public interface RedisKeyAsyncCommands<K, V> {
*/
RedisFuture<Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
RedisFuture<List<V>> sortReadOnly(K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @return Long number of values.
* @since 6.2
*/
RedisFuture<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @param sortArgs sort arguments.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
RedisFuture<List<V>> sortReadOnly(K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @param sortArgs sort arguments.
* @return Long number of values.
* @since 6.2
*/
RedisFuture<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import java.time.Instant;
import java.util.Date;

import io.lettuce.core.KeyScanArgs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import io.lettuce.core.CopyArgs;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.MigrateArgs;
import io.lettuce.core.RestoreArgs;
Expand Down Expand Up @@ -419,6 +419,52 @@ public interface RedisKeyReactiveCommands<K, V> {
@Deprecated
Mono<Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @return V array-reply list of sorted elements.
* @since 6.2
*/
Flux<V> sortReadOnly(K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @return Long number of values.
* @since 6.2
* @deprecated since 6.0 in favor of consuming large results through the {@link org.reactivestreams.Publisher} returned by
* {@link #sortReadOnly}.
*/
@Deprecated
Mono<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @param sortArgs sort arguments.
* @return V array-reply list of sorted elements.
* @since 6.2
*/
Flux<V> sortReadOnly(K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @param sortArgs sort arguments.
* @return Long number of values.
* @since 6.2
* @deprecated since 6.0 in favor of consuming large results through the {@link org.reactivestreams.Publisher} returned by
* {@link #sortReadOnly}.
*/
@Deprecated
Mono<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
Expand Down Expand Up @@ -538,5 +584,4 @@ public interface RedisKeyReactiveCommands<K, V> {
*/
@Deprecated
Mono<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor);

}
40 changes: 40 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/RedisKeyCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,46 @@ public interface RedisKeyCommands<K, V> {
*/
Long sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
List<V> sortReadOnly(K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @return Long number of values.
* @since 6.2
*/
Long sortReadOnly(ValueStreamingChannel<V> channel, K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @param sortArgs sort arguments.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
List<V> sortReadOnly(K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @param sortArgs sort arguments.
* @return Long number of values.
* @since 6.2
*/
Long sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/lettuce/core/cluster/ReadOnlyCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ enum CommandName {
ASKING, BITCOUNT, BITPOS, CLIENT, COMMAND, DUMP, ECHO, EVAL_RO, EVALSHA_RO, EXISTS, //
GEODIST, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, GEOHASH, GET, GETBIT, //
GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HRANDFIELD, HSCAN, HSTRLEN, //
HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, MGET, PFCOUNT, PTTL, //
HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, SORT_RO, MGET, PFCOUNT, PTTL, //
RANDOMKEY, READWRITE, SCAN, SCARD, SCRIPT, //
SDIFF, SINTER, SISMEMBER, SMEMBERS, SRANDMEMBER, SSCAN, STRLEN, //
SUNION, TIME, TTL, TYPE, //
XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, //
ZCARD, ZCOUNT, ZLEXCOUNT, ZRANGE, //
ZRANDMEMBER, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREVRANGE, ZREVRANGEBYLEX, ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE, //
ZRANDMEMBER, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREVRANGE, ZREVRANGEBYLEX, ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE,

// Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,46 @@ public interface NodeSelectionKeyAsyncCommands<K, V> {
*/
AsyncExecutions<Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
AsyncExecutions<List<V>> sortReadOnly(K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @return Long number of values.
* @since 6.2
*/
AsyncExecutions<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @param sortArgs sort arguments.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
AsyncExecutions<List<V>> sortReadOnly(K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @param sortArgs sort arguments.
* @return Long number of values.
* @since 6.2
*/
AsyncExecutions<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,46 @@ public interface NodeSelectionKeyCommands<K, V> {
*/
Executions<Long> sort(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
Executions<List<V>> sortReadOnly(K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @return Long number of values.
* @since 6.2
*/
Executions<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key);

/**
* Sort the elements in a list, set or sorted set.
*
* @param key the key.
* @param sortArgs sort arguments.
* @return List&lt;V&gt; array-reply list of sorted elements.
* @since 6.2
*/
Executions<List<V>> sortReadOnly(K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
* @param channel streaming channel that receives a call for every value.
* @param key the key.
* @param sortArgs sort arguments.
* @return Long number of values.
* @since 6.2
*/
Executions<Long> sortReadOnly(ValueStreamingChannel<V> channel, K key, SortArgs sortArgs);

/**
* Sort the elements in a list, set or sorted set.
*
Expand Down Expand Up @@ -516,5 +556,4 @@ public interface NodeSelectionKeyCommands<K, V> {
* @return StreamScanCursor scan cursor.
*/
Executions<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor);

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public static Set<CommandType> getReadOnlyCommands() {

enum CommandName {
ASKING, BITCOUNT, BITPOS, CLIENT, COMMAND, DUMP, ECHO, EVAL_RO, EVALSHA_RO, EXISTS, //
GEODIST, GEOPOS, GEORADIUS, GEORADIUSBYMEMBER, GEOHASH, GET, GETBIT, //
GEODIST, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, GEOHASH, GET, GETBIT, //
GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HRANDFIELD, HSCAN, HSTRLEN, //
HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, MGET, PFCOUNT, PTTL, //
HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, SORT_RO, MGET, PFCOUNT, PTTL, //
RANDOMKEY, READWRITE, SCAN, SCARD, SCRIPT, //
SDIFF, SINTER, SISMEMBER, SMEMBERS, SRANDMEMBER, SSCAN, STRLEN, //
SUNION, TIME, TTL, TYPE, //
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public enum CommandType implements ProtocolKeyword {

// List

BLMOVE, BLMPOP, BLPOP, BRPOP, BRPOPLPUSH, LINDEX, LINSERT, LLEN, LMOVE, LMPOP, LPOP, LPOS, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM, RPOP, RPOPLPUSH, RPUSH, RPUSHX, SORT,
BLMOVE, BLMPOP, BLPOP, BRPOP, BRPOPLPUSH, LINDEX, LINSERT, LLEN, LMOVE, LMPOP, LPOP, LPOS, LPUSH, LPUSHX, LRANGE, LREM, LSET, LTRIM, RPOP, RPOPLPUSH, RPUSH, RPUSHX, SORT, SORT_RO,

// Hash

Expand Down
Loading

0 comments on commit fb6e0f1

Please sign in to comment.