Skip to content

Commit

Permalink
Introduce support for ZMPOP and BZMPOP commands #2435
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Jul 6, 2023
1 parent 47bee1e commit 263e093
Show file tree
Hide file tree
Showing 16 changed files with 962 additions and 10 deletions.
30 changes: 30 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -2179,6 +2179,26 @@ public RedisFuture<Long> xtrim(K key, XTrimArgs args) {
return dispatch(commandBuilder.xtrim(key, args));
}

@Override
public RedisFuture<KeyValue<K, ScoredValue<V>>> bzmpop(long timeout, ZPopArgs args, K... keys) {
return dispatch(commandBuilder.bzmpop(timeout, args, keys));
}

@Override
public RedisFuture<KeyValue<K, List<ScoredValue<V>>>> bzmpop(long timeout, long count, ZPopArgs args, K... keys) {
return dispatch(commandBuilder.bzmpop(timeout, count, args, keys));
}

@Override
public RedisFuture<KeyValue<K, ScoredValue<V>>> bzmpop(double timeout, ZPopArgs args, K... keys) {
return dispatch(commandBuilder.bzmpop(timeout, args, keys));
}

@Override
public RedisFuture<KeyValue<K, List<ScoredValue<V>>>> bzmpop(double timeout, int count, ZPopArgs args, K... keys) {
return dispatch(commandBuilder.bzmpop(timeout, count, args, keys));
}

@Override
public RedisFuture<KeyValue<K, ScoredValue<V>>> bzpopmin(long timeout, K... keys) {
return dispatch(commandBuilder.bzpopmin(timeout, keys));
Expand Down Expand Up @@ -2334,6 +2354,16 @@ public RedisFuture<List<Double>> zmscore(K key, V... members) {
return dispatch(commandBuilder.zmscore(key, members));
}

@Override
public RedisFuture<KeyValue<K, ScoredValue<V>>> zmpop(ZPopArgs args, K... keys) {
return dispatch(commandBuilder.zmpop(args, keys));
}

@Override
public RedisFuture<KeyValue<K, List<ScoredValue<V>>>> zmpop(int count, ZPopArgs args, K... keys) {
return dispatch(commandBuilder.zmpop(count, args, keys));
}

@Override
public RedisFuture<ScoredValue<V>> zpopmin(K key) {
return dispatch(commandBuilder.zpopmin(key));
Expand Down
34 changes: 32 additions & 2 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.Set;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import io.lettuce.core.GeoArgs.Unit;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.reactive.*;
Expand Down Expand Up @@ -54,6 +52,8 @@
import io.lettuce.core.tracing.Tracing;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* A reactive and thread-safe API for a Redis connection.
Expand Down Expand Up @@ -2257,6 +2257,26 @@ public Mono<Long> xtrim(K key, XTrimArgs args) {
return createMono(() -> commandBuilder.xtrim(key, args));
}

@Override
public Mono<KeyValue<K, ScoredValue<V>>> bzmpop(long timeout, ZPopArgs args, K... keys) {
return createMono(() -> commandBuilder.bzmpop(timeout, args, keys));
}

@Override
public Mono<KeyValue<K, List<ScoredValue<V>>>> bzmpop(long timeout, long count, ZPopArgs args, K... keys) {
return createMono(() -> commandBuilder.bzmpop(timeout, count, args, keys));
}

@Override
public Mono<KeyValue<K, ScoredValue<V>>> bzmpop(double timeout, ZPopArgs args, K... keys) {
return createMono(() -> commandBuilder.bzmpop(timeout, args, keys));
}

@Override
public Mono<KeyValue<K, List<ScoredValue<V>>>> bzmpop(double timeout, int count, ZPopArgs args, K... keys) {
return createMono(() -> commandBuilder.bzmpop(timeout, count, args, keys));
}

@Override
public Mono<KeyValue<K, ScoredValue<V>>> bzpopmin(long timeout, K... keys) {
return createMono(() -> commandBuilder.bzpopmin(timeout, keys));
Expand Down Expand Up @@ -2411,6 +2431,16 @@ public Mono<List<Double>> zmscore(K key, V... members) {
return createMono(() -> commandBuilder.zmscore(key, members));
}

@Override
public Mono<KeyValue<K, ScoredValue<V>>> zmpop(ZPopArgs args, K... keys) {
return createMono(() -> commandBuilder.zmpop(args, keys));
}

@Override
public Mono<KeyValue<K, List<ScoredValue<V>>>> zmpop(int count, ZPopArgs args, K... keys) {
return createMono(() -> commandBuilder.zmpop(count, args, keys));
}

@Override
public Mono<ScoredValue<V>> zpopmin(K key) {
return createMono(() -> commandBuilder.zpopmin(key));
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -3110,6 +3110,48 @@ private byte[] encode(K k) {
return result;
}

Command<K, V, KeyValue<K, ScoredValue<V>>> bzmpop(long timeout, ZPopArgs popArgs, K[] keys) {
notEmpty(keys);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(timeout).add(keys.length).addKeys(keys);

popArgs.build(args);

return createCommand(BZMPOP, new KeyValueOfScoredValueOutput<>(codec, keys[0]), args);
}

Command<K, V, KeyValue<K, ScoredValue<V>>> bzmpop(double timeout, ZPopArgs popArgs, K[] keys) {
notEmpty(keys);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(timeout).add(keys.length).addKeys(keys);

popArgs.build(args);

return createCommand(BZMPOP, new KeyValueOfScoredValueOutput<>(codec, keys[0]), args);
}

Command<K, V, KeyValue<K, List<ScoredValue<V>>>> bzmpop(long timeout, long count, ZPopArgs popArgs, K[] keys) {
notEmpty(keys);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(timeout).add(keys.length).addKeys(keys);

popArgs.build(args);
args.add(COUNT).add(count);

return createCommand(BZMPOP, new KeyValueListScoredValueOutput<>(codec, keys[0]), args);
}

Command<K, V, KeyValue<K, List<ScoredValue<V>>>> bzmpop(double timeout, long count, ZPopArgs popArgs, K[] keys) {
notEmpty(keys);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(timeout).add(keys.length).addKeys(keys);

popArgs.build(args);
args.add(COUNT).add(count);

return createCommand(BZMPOP, new KeyValueListScoredValueOutput<>(codec, keys[0]), args);
}

Command<K, V, KeyValue<K, ScoredValue<V>>> bzpopmin(long timeout, K... keys) {
notEmpty(keys);

Expand Down Expand Up @@ -3349,6 +3391,27 @@ Command<K, V, List<Double>> zmscore(K key, V... members) {
return createCommand(ZMSCORE, new DoubleListOutput<>(codec), key, members);
}

Command<K, V, KeyValue<K, ScoredValue<V>>> zmpop(ZPopArgs popArgs, K[] keys) {
notEmpty(keys);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(keys.length).addKeys(keys);

popArgs.build(args);

return createCommand(ZMPOP, new KeyValueOfScoredValueOutput<>(codec, keys[0]), args);
}

Command<K, V, KeyValue<K, List<ScoredValue<V>>>> zmpop(long count, ZPopArgs popArgs, K[] keys) {
notEmpty(keys);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(keys.length).addKeys(keys);

popArgs.build(args);
args.add(COUNT).add(count);

return createCommand(ZMPOP, new KeyValueListScoredValueOutput<>(codec, keys[0]), args);
}

Command<K, V, ScoredValue<V>> zpopmin(K key) {
notNullKey(key);

Expand Down
95 changes: 95 additions & 0 deletions src/main/java/io/lettuce/core/ZPopArgs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core;

import static io.lettuce.core.protocol.CommandKeyword.*;

import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.ProtocolKeyword;

/**
* Argument list builder for the ZMPOP <a href="https://redis.io/commands/zmpop">ZMPOP</a> and
* <a href="https://redis.io/commands/bzmpop">BZMPOP</a> command starting. {@link ZPopArgs} is a mutable object and instances
* should be used only once to avoid shared mutable state.
*
* @author Mark Paluch
* @since 6.3
*/
public class ZPopArgs implements CompositeArgument {

private ProtocolKeyword modifier;

/**
* Builder entry points for {@link ScanArgs}.
*/
public static class Builder {

/**
* Utility constructor.
*/
private Builder() {
}

/**
* Creates new {@link ZPopArgs} and enabling {@literal MIN}.
*
* @return new {@link ZPopArgs} with {@literal MIN} enabled.
* @see ZPopArgs#min()
*/
public static ZPopArgs min() {
return new ZPopArgs().min();
}

/**
* Creates new {@link ZPopArgs} and enabling {@literal MAX}.
*
* @return new {@link ZPopArgs} with {@literal MAX} enabled.
* @see ZPopArgs#min()
*/
public static ZPopArgs max() {
return new ZPopArgs().max();
}

}

/**
* Elements popped are those with the lowest scores from the first non-empty sorted set
*
* @return {@code this} {@link ZPopArgs}.
*/
public ZPopArgs min() {

this.modifier = MIN;
return this;
}

/**
* Elements popped are those with the highest scores from the first non-empty sorted set
*
* @return {@code this} {@link ZPopArgs}.
*/
public ZPopArgs max() {

this.modifier = MAX;
return this;
}

@Override
public <K, V> void build(CommandArgs<K, V> args) {
args.add(modifier);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,50 @@
*/
public interface RedisSortedSetAsyncCommands<K, V> {

/**
* Pops one or more elements, that are member-score pairs, from the first non-empty sorted set in the provided list of keys.
*
* @param timeout the timeout in seconds.
* @param keys the keys.
* @return ScoredValue&lt;V&gt; the removed element or {@link KeyValue#empty()}.
* @since 6.3
*/
RedisFuture<KeyValue<K, ScoredValue<V>>> bzmpop(long timeout, ZPopArgs args, K... keys);

/**
* Pops one or more elements, that are member-score pairs, from the first non-empty sorted set in the provided list of keys.
*
* @param timeout the timeout in seconds.
* @param count number of elements to pop.
* @param args the command args.
* @param keys the keys.
* @return ScoredValue&lt;V&gt; the removed element or {@link KeyValue#empty()}.
* @since 6.3
*/
RedisFuture<KeyValue<K, List<ScoredValue<V>>>> bzmpop(long timeout, long count, ZPopArgs args, K... keys);

/**
* Pops one or more elements, that are member-score pairs, from the first non-empty sorted set in the provided list of keys.
*
* @param timeout the timeout in seconds.
* @param keys the keys.
* @return ScoredValue&lt;V&gt; the removed element or {@link KeyValue#empty()}.
* @since 6.3
*/
RedisFuture<KeyValue<K, ScoredValue<V>>> bzmpop(double timeout, ZPopArgs args, K... keys);

/**
* Pops one or more elements, that are member-score pairs, from the first non-empty sorted set in the provided list of keys.
*
* @param timeout the timeout in seconds.
* @param count number of elements to pop.
* @param args the command args.
* @param keys the keys.
* @return ScoredValue&lt;V&gt; the removed element or {@link KeyValue#empty()}.
* @since 6.3
*/
RedisFuture<KeyValue<K, List<ScoredValue<V>>>> bzmpop(double timeout, int count, ZPopArgs args, K... keys);

/**
* Removes and returns a member with the lowest scores in the sorted set stored at one of the keys.
*
Expand Down Expand Up @@ -370,6 +414,26 @@ public interface RedisSortedSetAsyncCommands<K, V> {
*/
RedisFuture<List<Double>> zmscore(K key, V... members);

/**
* Pops one or more elements, that are member-score pairs, from the first non-empty sorted set in the provided list of keys.
*
* @param keys the keys.
* @return ScoredValue&lt;V&gt; the removed element or {@link KeyValue#empty()}.
* @since 6.3
*/
RedisFuture<KeyValue<K, ScoredValue<V>>> zmpop(ZPopArgs args, K... keys);

/**
* Pops one or more elements, that are member-score pairs, from the first non-empty sorted set in the provided list of keys.
*
* @param count number of elements to pop.
* @param args the command args.
* @param keys the keys.
* @return ScoredValue&lt;V&gt; the removed element or {@link KeyValue#empty()}.
* @since 6.3
*/
RedisFuture<KeyValue<K, List<ScoredValue<V>>>> zmpop(int count, ZPopArgs args, K... keys);

/**
* Removes and returns up to count members with the lowest scores in the sorted set stored at key.
*
Expand Down
Loading

0 comments on commit 263e093

Please sign in to comment.