Skip to content

Commit

Permalink
Capture subscriber initialization exceptions in init future.
Browse files Browse the repository at this point in the history
We now capture exceptions during the connection initialization and subscription to avoid getting the container into an invalid state.

Closes spring-projects#2335
  • Loading branch information
mp911de committed Jun 2, 2022
1 parent e2e68aa commit 08ed1c4
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.ConnectionUtils;
import org.springframework.data.redis.connection.Message;
Expand All @@ -53,6 +54,7 @@
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -181,7 +183,6 @@ public void afterPropertiesSet() {
subscriptionExecutor = taskExecutor;
}


this.subscriber = createSubscriber(connectionFactory, this.subscriptionExecutor);

afterPropertiesSet = true;
Expand Down Expand Up @@ -269,6 +270,11 @@ private void lazyListen() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {

if (e.getCause() instanceof DataAccessException) {
throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
}

throw new CompletionException(e.getCause());
} catch (TimeoutException e) {
throw new IllegalStateException("Subscription registration timeout exceeded.", e);
Expand Down Expand Up @@ -670,7 +676,16 @@ else if (topic instanceof PatternTopic) {
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));

future.join();
try {
future.join();
} catch (CompletionException e) {

if (e.getCause() instanceof DataAccessException) {
throw new RedisListenerExecutionFailedException(e.getMessage(), e.getCause());
}

throw e;
}
}
}
}
Expand Down Expand Up @@ -1166,23 +1181,25 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col

synchronized (localMonitor) {

RedisConnection connection = connectionFactory.getConnection();
this.connection = connection;

if (connection.isSubscribed()) {
CompletableFuture<Void> initFuture = new CompletableFuture<>();
try {
RedisConnection connection = connectionFactory.getConnection();
this.connection = connection;

CompletableFuture<Void> failure = new CompletableFuture<>();
failure.completeExceptionally(
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
return failure;
}
if (connection.isSubscribed()) {

CompletableFuture<Void> initFuture = new CompletableFuture<>();
initFuture.completeExceptionally(
new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
return initFuture;
}

try {
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
} catch (Throwable t) {
handleSubscriptionException(initFuture, backOffExecution, t);
try {
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
} catch (Throwable t) {
handleSubscriptionException(initFuture, backOffExecution, t);
}
} catch (RuntimeException e) {
initFuture.completeExceptionally(e);
}

return initFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.junit.jupiter.api.Test;

import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;

/**
* Unit tests for {@link RedisMessageListenerContainer}.
Expand Down Expand Up @@ -106,6 +108,46 @@ void containerShouldStopGracefullyOnUnsubscribeErrors() {
verify(connectionMock).close();
}

@Test // GH-2335
void containerStartShouldReportFailureOnRedisUnavailability() {

when(connectionFactoryMock.getConnection()).thenThrow(new RedisConnectionFailureException("Booh!"));

doAnswer(it -> {

Runnable r = it.getArgument(0);
r.run();
return null;
}).when(executorMock).execute(any());

container.addMessageListener(adapter, new ChannelTopic("a"));
assertThatExceptionOfType(RedisListenerExecutionFailedException.class).isThrownBy(() -> container.start());

assertThat(container.isRunning()).isTrue();
assertThat(container.isListening()).isFalse();
}

@Test // GH-2335
void containerListenShouldReportFailureOnRedisUnavailability() {

when(connectionFactoryMock.getConnection()).thenThrow(new RedisConnectionFailureException("Booh!"));

doAnswer(it -> {

Runnable r = it.getArgument(0);
r.run();
return null;
}).when(executorMock).execute(any());

container.start();

assertThatExceptionOfType(RedisListenerExecutionFailedException.class)
.isThrownBy(() -> container.addMessageListener(adapter, new ChannelTopic("a")));

assertThat(container.isRunning()).isTrue();
assertThat(container.isListening()).isFalse();
}

@Test // GH-964
void failsOnDuplicateInit() {
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> container.afterPropertiesSet());
Expand Down

0 comments on commit 08ed1c4

Please sign in to comment.