Skip to content

Commit

Permalink
Consider value multiplicity in reactive output resolution #879
Browse files Browse the repository at this point in the history
Lettuce now checks for the value multiplicity when resolving the actual output type for reactive Redis commands methods.

Previously, all reactive types were considered streaming ones which caused usage of a not applicable output type. So Mono<String> used KeyListOutput which propagated null values.
  • Loading branch information
mp911de committed Oct 8, 2018
1 parent acdcb5c commit f57be1c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ private DeclaredCommandMethod(Method method, ExecutionSpecificParameters paramet

ResolvableType actualReturnType = this.returnType;

while (Future.class.isAssignableFrom(actualReturnType.getRawClass())
|| ReactiveTypes.supports(actualReturnType.getRawClass())) {
while (Future.class.isAssignableFrom(actualReturnType.getRawClass())) {
ResolvableType[] generics = actualReturnType.getGenerics();

if (generics.length != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ReactiveCommandSegmentCommandFactory extends CommandSegmentCommandFactory

private boolean streamingExecution;

public ReactiveCommandSegmentCommandFactory(CommandSegments commandSegments, CommandMethod commandMethod,
ReactiveCommandSegmentCommandFactory(CommandSegments commandSegments, CommandMethod commandMethod,
RedisCodec<?, ?> redisCodec, CommandOutputFactoryResolver outputResolver) {

super(commandSegments, commandMethod, redisCodec, outputResolver);
Expand All @@ -49,20 +49,27 @@ public ReactiveCommandSegmentCommandFactory(CommandSegments commandSegments, Com
@Override
protected CommandOutputFactory resolveCommandOutputFactory(OutputSelector outputSelector) {

CommandOutputFactory factory = getOutputResolver().resolveStreamingCommandOutput(outputSelector);
streamingExecution = ReactiveTypes.isMultiValueType(outputSelector.getOutputType().getRawClass());

if (factory != null) {
streamingExecution = true;
return factory;
OutputSelector componentType = new OutputSelector(outputSelector.getOutputType().getGeneric(0),
outputSelector.getRedisCodec());

if (streamingExecution) {

CommandOutputFactory streamingFactory = getOutputResolver().resolveStreamingCommandOutput(componentType);

if (streamingExecution && streamingFactory != null) {
return streamingFactory;
}
}

return super.resolveCommandOutputFactory(outputSelector);
return super.resolveCommandOutputFactory(componentType);
}

/**
* @return {@literal true} if the resolved {@link io.lettuce.core.output.CommandOutput} should use streaming.
*/
public boolean isStreamingExecution() {
boolean isStreamingExecution() {
return streamingExecution;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ void shouldResolveFluxComponentType() throws Exception {

CommandMethod commandMethod = DeclaredCommandMethod.create(getMethod("getFlux"));

assertThat(commandMethod.getActualReturnType().getRawClass()).isEqualTo(String.class);
assertThat(commandMethod.getActualReturnType().getRawClass()).isEqualTo(Flux.class);
assertThat(commandMethod.getReturnType().getRawClass()).isEqualTo(Flux.class);
}

private Method getMethod(String name) throws NoSuchMethodException {
return MyInterface.class.getDeclaredMethod(name);
}

private static interface MyInterface {
private interface MyInterface {

String getString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;

import java.lang.reflect.Method;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.dynamic.domain.Timeout;
import io.lettuce.core.dynamic.output.CommandOutputFactory;
import io.lettuce.core.dynamic.output.CommandOutputFactoryResolver;
import io.lettuce.core.dynamic.output.CodecAwareOutputFactoryResolver;
import io.lettuce.core.dynamic.output.OutputRegistry;
import io.lettuce.core.dynamic.output.OutputRegistryCommandOutputFactoryResolver;
import io.lettuce.core.dynamic.segment.AnnotationCommandSegmentFactory;
import io.lettuce.core.dynamic.segment.CommandSegments;
import io.lettuce.core.dynamic.support.ReflectionUtils;
import io.lettuce.core.output.StreamingOutput;
import io.lettuce.core.protocol.RedisCommand;

/**
Expand All @@ -44,16 +44,8 @@
@ExtendWith(MockitoExtension.class)
class ReactiveCommandSegmentCommandFactoryUnitTests {

@Mock
private CommandOutputFactoryResolver outputFactoryResolver;

@Mock
private CommandOutputFactory commandOutputFactory;

@BeforeEach
void before() {
when(outputFactoryResolver.resolveCommandOutput(any())).thenReturn(commandOutputFactory);
}
private CodecAwareOutputFactoryResolver outputFactoryResolver = new CodecAwareOutputFactoryResolver(
new OutputRegistryCommandOutputFactoryResolver(new OutputRegistry()), StringCodec.UTF8);

@Test
void commandCreationWithTimeoutShouldFail() {
Expand All @@ -66,6 +58,22 @@ void commandCreationWithTimeoutShouldFail() {
}
}

@Test
void shouldResolveNonStreamingOutput() {

RedisCommand<?, ?, ?> command = createCommand("getOne", ReactiveWithTimeout.class, String.class);

assertThat(command.getOutput()).isNotInstanceOf(StreamingOutput.class);
}

@Test
void shouldResolveStreamingOutput() {

RedisCommand<?, ?, ?> command = createCommand("getMany", ReactiveWithTimeout.class, String.class);

assertThat(command.getOutput()).isInstanceOf(StreamingOutput.class);
}

RedisCommand<?, ?, ?> createCommand(String methodName, Class<?> interfaceClass, Class<?>... parameterTypes) {

Method method = ReflectionUtils.findMethod(interfaceClass, methodName, parameterTypes);
Expand All @@ -78,11 +86,15 @@ void commandCreationWithTimeoutShouldFail() {
ReactiveCommandSegmentCommandFactory factory = new ReactiveCommandSegmentCommandFactory(commandSegments, commandMethod,
new StringCodec(), outputFactoryResolver);

return factory.createCommand(null);
return factory.createCommand(new Object[] { "foo" });
}

private static interface ReactiveWithTimeout {

Publisher<String> get(String key, Timeout timeout);

Mono<String> getOne(String key);

Flux<String> getMany(String key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import javax.inject.Inject;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand All @@ -27,6 +28,7 @@
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.dynamic.annotation.Command;
import io.lettuce.test.LettuceExtension;
import io.reactivex.Maybe;

/**
* @author Mark Paluch
Expand All @@ -41,6 +43,11 @@ class RedisCommandsReactiveIntegrationTests extends TestSupport {
this.redis = connection.sync();
}

@BeforeEach
void setUp() {
this.redis.flushall();
}

@Test
void reactive() {

Expand All @@ -51,8 +58,56 @@ void reactive() {
StepVerifier.create(api.setReactive(key, value)).expectNext("OK").verifyComplete();
}

static interface MultipleExecutionModels extends Commands {
@Test
void shouldHandlePresentValue() {

RedisCommandFactory factory = new RedisCommandFactory(redis.getStatefulConnection());

MultipleExecutionModels api = factory.getCommands(MultipleExecutionModels.class);

StepVerifier.create(api.setReactive(key, value)).expectNext("OK").verifyComplete();
StepVerifier.create(api.get(key)).expectNext(value).verifyComplete();
}

@Test
void shouldHandleAbsentValue() {

RedisCommandFactory factory = new RedisCommandFactory(redis.getStatefulConnection());

MultipleExecutionModels api = factory.getCommands(MultipleExecutionModels.class);

StepVerifier.create(api.get("unknown")).verifyComplete();
}

@Test
void shouldHandlePresentValueRxJava() throws InterruptedException {

RedisCommandFactory factory = new RedisCommandFactory(redis.getStatefulConnection());

MultipleExecutionModels api = factory.getCommands(MultipleExecutionModels.class);

StepVerifier.create(api.setReactive(key, value)).expectNext("OK").verifyComplete();
api.getRxJava(key).test().await().onSuccess(value);
}

@Test
void shouldHandleAbsentValueRxJava() throws InterruptedException {

RedisCommandFactory factory = new RedisCommandFactory(redis.getStatefulConnection());

MultipleExecutionModels api = factory.getCommands(MultipleExecutionModels.class);

api.getRxJava(key).test().await().onSuccess(null);
}

interface MultipleExecutionModels extends Commands {

@Command("SET")
Mono<String> setReactive(String key, String value);

Mono<String> get(String key);

@Command("GET")
Maybe<String> getRxJava(String key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.List;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import io.lettuce.core.GeoCoordinates;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.Value;
Expand All @@ -46,13 +49,23 @@ void shouldResolveStringListOutput() {
assertThat(getCommandOutput("stringIterable")).isInstanceOf(KeyListOutput.class);
}

@Test
void shouldResolveStreamingStringListOutput() {
assertThat(getStreamingCommandOutput("stringFlux")).isInstanceOf(KeyListOutput.class);
}

@Test
void shouldResolveVoidOutput() {

assertThat(getCommandOutput("voidMethod")).isInstanceOf(VoidOutput.class);
assertThat(getCommandOutput("voidWrapper")).isInstanceOf(VoidOutput.class);
}

@Test
void shouldResolveKeyOutput() {
assertThat(getCommandOutput("stringMono")).isInstanceOf(KeyOutput.class);
}

@Test
void shouldResolveStringValueListOutput() {

Expand Down Expand Up @@ -143,23 +156,40 @@ void stringWildcardValueCollectionIsAssignableFromOutputs() {
CommandOutput<?, ?, ?> getCommandOutput(String methodName) {

OutputSelector outputSelector = getOutputSelector(methodName);
CommandOutputFactory factory = resolver.resolveCommandOutput(outputSelector);
CommandOutputFactory factory = resolver.resolveCommandOutput(Publisher.class.isAssignableFrom(outputSelector
.getOutputType().getRawClass()) ? unwrapReactiveType(outputSelector) : outputSelector);

return factory.create(new StringCodec());
}

CommandOutput<?, ?, ?> getStreamingCommandOutput(String methodName) {

OutputSelector outputSelector = getOutputSelector(methodName);
CommandOutputFactory factory = resolver.resolveStreamingCommandOutput(unwrapReactiveType(outputSelector));

return factory.create(new StringCodec());
}

private OutputSelector unwrapReactiveType(OutputSelector outputSelector) {
return new OutputSelector(outputSelector.getOutputType().getGeneric(0), outputSelector.getRedisCodec());
}

private OutputSelector getOutputSelector(String methodName) {

Method method = ReflectionUtils.findMethod(CommandMethods.class, methodName);
return new OutputSelector(DeclaredCommandMethod.create(method).getActualReturnType(), StringCodec.UTF8);
}

private static interface CommandMethods {
private interface CommandMethods {

List<String> stringList();

Iterable<String> stringIterable();

Mono<String> stringMono();

Flux<String> stringFlux();

Collection<Value<String>> stringValueCollection();

Collection<? extends Value<String>> stringWildcardValueCollection();
Expand Down

0 comments on commit f57be1c

Please sign in to comment.