Skip to content

Commit

Permalink
Close async closeables asynchronously #640
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Nov 11, 2017
1 parent ec67b2a commit 7118b2f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
19 changes: 13 additions & 6 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import reactor.core.publisher.Mono;
import io.lettuce.core.Transports.NativeTransports;
import io.lettuce.core.internal.AsyncCloseable;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.resource.ClientResources;
Expand Down Expand Up @@ -432,18 +433,24 @@ public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, Tim

if (shutdown.compareAndSet(false, true)) {

List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

while (!closeableResources.isEmpty()) {
Closeable closeableResource = closeableResources.iterator().next();
try {
closeableResource.close();
} catch (Exception e) {
logger.debug("Exception on Close: " + e.getMessage(), e);

if (closeableResource instanceof AsyncCloseable) {

closeFutures.add(((AsyncCloseable) closeableResource).closeAsync());
} else {
try {
closeableResource.close();
} catch (Exception e) {
logger.debug("Exception on Close: " + e.getMessage(), e);
}
}
closeableResources.remove(closeableResource);
}

List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

for (Channel c : channels) {

ChannelPipeline pipeline = c.pipeline();
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,14 @@ private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {

private Mono<SocketAddress> lookupRedis(RedisURI sentinelUri) {

return Mono.fromCompletionStage(connectSentinelAsync(newStringStringCodec(), sentinelUri, timeout)).flatMap(
c -> c.reactive().getMasterAddrByName(sentinelUri.getSentinelMasterId()).timeout(timeout)
.doFinally(s -> c.close()));
Mono<StatefulRedisSentinelConnection<String, String>> connection = Mono.fromCompletionStage(connectSentinelAsync(
newStringStringCodec(), sentinelUri, timeout));

return connection.flatMap(c -> c.reactive() //
.getMasterAddrByName(sentinelUri.getSentinelMasterId()) //
.timeout(this.timeout) //
.flatMap(it -> Mono.fromCompletionStage(c.closeAsync()) //
.then(Mono.just(it))));
}

private static void checkValidRedisURI(RedisURI redisURI) {
Expand Down

0 comments on commit 7118b2f

Please sign in to comment.