Skip to content

Commit

Permalink
DATAREDIS-1196 - Add lPos command to ReactiveListCommands.
Browse files Browse the repository at this point in the history
  • Loading branch information
christophstrobl committed Sep 16, 2020
1 parent 4916bc5 commit 5ea658f
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,125 @@ default Mono<Boolean> lTrim(ByteBuffer key, long start, long end) {
*/
Flux<BooleanResponse<RangeCommand>> lTrim(Publisher<RangeCommand> commands);

/**
* {@code LPOS} command parameters.
*
* @author Christoph Strobl
* @since 2.4
* @see <a href="https://redis.io/commands/lpos">Redis Documentation: LPOS</a>
*/
class LPosCommand extends KeyCommand {

private final ByteBuffer element;
private final @Nullable Integer count;
private final @Nullable Integer rank;

private LPosCommand(@Nullable ByteBuffer key, ByteBuffer element, @Nullable Integer count, @Nullable Integer rank) {

super(key);
this.element = element;
this.count = count;
this.rank = rank;
}

/**
* Creates a new {@link LPosCommand} for given {@literal element}.
*
* @param element
* @return a new {@link LPosCommand} for {@literal element}.
*/
public static LPosCommand lPosOf(ByteBuffer element) {
return new LPosCommand(null, element, null, null);
}

/**
* Applies the {@literal key}. Constructs a new command instance with all previously configured properties.
*
* @param key must not be {@literal null}.
* @return a new {@link LPosCommand} with {@literal key} applied.
*/
public LPosCommand from(ByteBuffer key) {

Assert.notNull(key, "Key must not be null!");
return new LPosCommand(key, element, count, rank);
}

/**
* Applies the {@literal count} parameter specifying the number of matches to return. Constructs a new command
* instance with all previously configured properties.
*
* @param count can be {@literal null}.
* @return a new {@link LPosCommand} with {@literal count} applied.
*/
public LPosCommand count(Integer count) {
return new LPosCommand(getKey(), element, count, rank);
}

/**
* Applies the {@literal rank} parameter specifying the "rank" of the first element to return. Constructs a new
* command instance with all previously configured properties.
*
* @param rank can be {@literal null}.
* @return a new {@link LPosCommand} with {@literal count} applied.
*/
public LPosCommand rank(Integer rank) {
return new LPosCommand(getKey(), element, count, rank);
}

@Nullable
public Integer getCount() {
return count;
}

@Nullable
public Integer getRank() {
return rank;
}

@Nullable
public ByteBuffer getElement() {
return element;
}
}

/**
* Get first index of the {@literal element} from list at {@literal key}.
*
* @param key must not be {@literal null}.
* @param element
* @return a {@link Mono} emitting the elements index.
* @since 2.4
* @see <a href="https://redis.io/commands/lindex">Redis Documentation: LINDEX</a>
*/
default Mono<Long> lPos(ByteBuffer key, ByteBuffer element) {

Assert.notNull(key, "Key must not be null!");

return lPos(LPosCommand.lPosOf(element).from(key)).next();
}

/**
* Get indices of the {@literal element} from list at {@link LPosCommand#getKey()}.
*
* @param command must not be {@literal null}.
* @return a {@link Flux} emitting the elements indices one by one.
* @since 2.4
* @see <a href="https://redis.io/commands/lindex">Redis Documentation: LINDEX</a>
*/
default Flux<Long> lPos(LPosCommand command) {
return lPos(Mono.just(command)).map(NumericResponse::getOutput);
}

/**
* Get indices of the {@literal element} from list at {@link LPosCommand#getKey()}.
*
* @param commands must not be {@literal null}.
* @return a {@link Flux} emitting the elements indices one by one.
* @since 2.4
* @see <a href="https://redis.io/commands/lindex">Redis Documentation: LINDEX</a>
*/
Flux<NumericResponse<LPosCommand, Long>> lPos(Publisher<LPosCommand> commands);

/**
* {@code LINDEX} command parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.LPosArgs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -146,8 +147,32 @@ public Flux<BooleanResponse<RangeCommand>> lTrim(Publisher<RangeCommand> command
LettuceConverters.getLowerBoundIndex(range), //
LettuceConverters.getUpperBoundIndex(range));

return result
.map(LettuceConverters::stringToBoolean).map(value -> new BooleanResponse<>(command, value));
return result.map(LettuceConverters::stringToBoolean).map(value -> new BooleanResponse<>(command, value));
}));
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.ReactiveListCommands#lPos(org.reactivestreams.Publisher)
*/
@Override
public Flux<NumericResponse<LPosCommand, Long>> lPos(Publisher<LPosCommand> commands) {

return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {

LPosArgs args = new LPosArgs();
if (command.getRank() != null) {
args.first(command.getRank());
}

Flux<Long> values;
if (command.getCount() != null) {
values = cmd.lpos(command.getKey(), command.getElement(), command.getCount(), args);
} else {
values = cmd.lpos(command.getKey(), command.getElement(), args).flux();
}

return values.map(value -> new NumericResponse<>(command, value));
}));
}

Expand Down Expand Up @@ -238,7 +263,8 @@ public Flux<ByteBufferResponse<PopCommand>> pop(Publisher<PopCommand> commands)
Assert.notNull(command.getDirection(), "Direction must not be null!");

Mono<ByteBuffer> popResult = ObjectUtils.nullSafeEquals(Direction.RIGHT, command.getDirection())
? cmd.rpop(command.getKey()) : cmd.lpop(command.getKey());
? cmd.rpop(command.getKey())
: cmd.lpop(command.getKey());

return popResult.map(value -> new ByteBufferResponse<>(command, value));
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,24 @@
import reactor.test.StepVerifier;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveListCommands.LPosCommand;
import org.springframework.data.redis.connection.ReactiveListCommands.PopResult;
import org.springframework.data.redis.connection.ReactiveListCommands.PushCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.RangeCommand;
import org.springframework.data.redis.connection.RedisListCommands.Position;
import org.springframework.data.redis.test.util.MinimumRedisVersionRule;
import org.springframework.test.annotation.IfProfileValue;

/**
* @author Christoph Strobl
Expand All @@ -44,6 +49,8 @@
*/
public class LettuceReactiveListCommandTests extends LettuceReactiveCommandsTestsBase {

@Rule public MinimumRedisVersionRule redisVersion = new MinimumRedisVersionRule();

@Test // DATAREDIS-525
public void rPushShouldAppendValuesCorrectly() {

Expand Down Expand Up @@ -304,4 +311,91 @@ public void brPopLPushShouldWorkCorrectly() {
assertThat(nativeCommands.llen(KEY_2)).isEqualTo(2L);
assertThat(nativeCommands.lindex(KEY_2, 0)).isEqualTo(VALUE_3);
}

@Test // DATAREDIS-1196
@IfProfileValue(name = "redisVersion", value = "6.0.6+")
public void lPos() {

nativeCommands.rpush(KEY_1, "a", "b", "c", "1", "2", "3", "c", "c");

connection.listCommands().lPos(KEY_1_BBUFFER, ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8))) //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();

}

@Test // DATAREDIS-1196
@IfProfileValue(name = "redisVersion", value = "6.0.6+")
@Ignore("https://github.com/lettuce-io/lettuce-core/issues/1410")
public void lPosRank() {

nativeCommands.rpush(KEY_1, "a", "b", "c", "1", "2", "3", "c", "c");

connection.listCommands()
.lPos(LPosCommand.lPosOf(ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8))).from(KEY_1_BBUFFER).rank(2)) //
.as(StepVerifier::create) //
.expectNext(6L) //
.verifyComplete();
}

@Test // DATAREDIS-1196
@IfProfileValue(name = "redisVersion", value = "6.0.6+")
@Ignore("https://github.com/lettuce-io/lettuce-core/issues/1410")
public void lPosNegativeRank() {

nativeCommands.rpush(KEY_1, "a", "b", "c", "1", "2", "3", "c", "c");

connection.listCommands()
.lPos(LPosCommand.lPosOf(ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8))).from(KEY_1_BBUFFER).rank(-1)) //
.as(StepVerifier::create) //
.expectNext(7L) //
.verifyComplete();
}

@Test // DATAREDIS-1196
@IfProfileValue(name = "redisVersion", value = "6.0.6+")
public void lPosCount() {

nativeCommands.rpush(KEY_1, "a", "b", "c", "1", "2", "3", "c", "c");

connection.listCommands()
.lPos(LPosCommand.lPosOf(ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8))).from(KEY_1_BBUFFER).count(2)) //
.as(StepVerifier::create) //
.expectNext(2L) //
.expectNext(6L) //
.verifyComplete();
}

@Test // DATAREDIS-1196
@IfProfileValue(name = "redisVersion", value = "6.0.6+")
@Ignore("https://github.com/lettuce-io/lettuce-core/issues/1410")
public void lPosRankCount() {

nativeCommands.rpush(KEY_1, "a", "b", "c", "1", "2", "3", "c", "c");

connection.listCommands()
.lPos(LPosCommand.lPosOf(ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8))).from(KEY_1_BBUFFER)
.from(KEY_1_BBUFFER).rank(-1).count(2)) //
.as(StepVerifier::create) //
.expectNext(7L) //
.expectNext(6L) //
.verifyComplete();
}

@Test // DATAREDIS-1196
@IfProfileValue(name = "redisVersion", value = "6.0.6+")
public void lPosCountZero() {

nativeCommands.rpush(KEY_1, "a", "b", "c", "1", "2", "3", "c", "c");

connection.listCommands()
.lPos(LPosCommand.lPosOf(ByteBuffer.wrap("c".getBytes(StandardCharsets.UTF_8))).from(KEY_1_BBUFFER).count(0)) //
.as(StepVerifier::create) //
.expectNext(2L) //
.expectNext(6L) //
.expectNext(7L) //
.verifyComplete();
}

}

0 comments on commit 5ea658f

Please sign in to comment.