diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java index 852813e839..e0b913f76b 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisClient.java +++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java @@ -434,55 +434,65 @@ public CompletableFuture shutdownAsync(long quietPeriod, long timeout, Tim logger.debug("Initiate shutdown ({}, {}, {})", quietPeriod, timeout, timeUnit); - List> closeFutures = new ArrayList<>(); - - while (!closeableResources.isEmpty()) { - Closeable closeableResource = closeableResources.iterator().next(); + return closeOtherResources().thenCompose((value) -> + closeClientResources(quietPeriod, timeout, timeUnit) + ); + } - if (closeableResource instanceof AsyncCloseable) { + return completedFuture(null); + } - closeFutures.add(((AsyncCloseable) closeableResource).closeAsync()); - } else { - try { - closeableResource.close(); - } catch (Exception e) { - logger.debug("Exception on Close: " + e.getMessage(), e); - } - } - closeableResources.remove(closeableResource); - } + private CompletableFuture closeOtherResources() { + List> closeFutures = new ArrayList<>(); - for (Channel c : channels) { + while (!closeableResources.isEmpty()) { + Closeable closeableResource = closeableResources.iterator().next(); - ChannelPipeline pipeline = c.pipeline(); + if (closeableResource instanceof AsyncCloseable) { - ConnectionWatchdog commandHandler = pipeline.get(ConnectionWatchdog.class); - if (commandHandler != null) { - commandHandler.setListenOnChannelInactive(false); + closeFutures.add(((AsyncCloseable) closeableResource).closeAsync()); + } else { + try { + closeableResource.close(); + } catch (Exception e) { + logger.debug("Exception on Close: " + e.getMessage(), e); } } + closeableResources.remove(closeableResource); + } - try { - closeFutures.add(toCompletableFuture(channels.close())); - } catch (Exception e) { - logger.debug("Cannot close channels", e); - } + for (Channel c : channels) { - if (!sharedResources) { - Future groupCloseFuture = clientResources.shutdown(quietPeriod, timeout, timeUnit); - closeFutures.add(toCompletableFuture(groupCloseFuture)); - } else { - for (EventLoopGroup eventExecutors : eventLoopGroups.values()) { - Future groupCloseFuture = clientResources.eventLoopGroupProvider().release(eventExecutors, quietPeriod, - timeout, timeUnit); - closeFutures.add(toCompletableFuture(groupCloseFuture)); - } + ChannelPipeline pipeline = c.pipeline(); + + ConnectionWatchdog commandHandler = pipeline.get(ConnectionWatchdog.class); + if (commandHandler != null) { + commandHandler.setListenOnChannelInactive(false); } + } - return Futures.allOf(closeFutures); + try { + closeFutures.add(toCompletableFuture(channels.close())); + } catch (Exception e) { + logger.debug("Cannot close channels", e); } - return completedFuture(null); + return Futures.allOf(closeFutures); + } + + private CompletableFuture closeClientResources(long quietPeriod, long timeout, TimeUnit timeUnit) { + List> groupCloseFutures = new ArrayList<>(); + if (!sharedResources) { + Future groupCloseFuture = clientResources.shutdown(quietPeriod, timeout, timeUnit); + groupCloseFutures.add(toCompletableFuture(groupCloseFuture)); + } else { + for (EventLoopGroup eventExecutors : eventLoopGroups.values()) { + Future groupCloseFuture = clientResources.eventLoopGroupProvider().release(eventExecutors, quietPeriod, + timeout, timeUnit); + groupCloseFutures.add(toCompletableFuture(groupCloseFuture)); + } + } + return Futures.allOf(groupCloseFutures); } protected int getResourceCount() {