Skip to content

Commit

Permalink
Synchronize ClientResources shutdown with channel close completion #998
Browse files Browse the repository at this point in the history
ClientResources are released after awaiting channel close completion to avoid event loop termination before the channels had a chance to get closed.

Co-authored-by: Pritam Kadam <[email protected]>
Original pull request: #1000.
  • Loading branch information
Poorva17 authored and mp911de committed Mar 14, 2019
1 parent 5141b8c commit 30cfd1d
Showing 1 changed file with 46 additions and 36 deletions.
82 changes: 46 additions & 36 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,55 +434,65 @@ public CompletableFuture<Void> shutdownAsync(long quietPeriod, long timeout, Tim

logger.debug("Initiate shutdown ({}, {}, {})", quietPeriod, timeout, timeUnit);

List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

while (!closeableResources.isEmpty()) {
Closeable closeableResource = closeableResources.iterator().next();
return closeOtherResources().thenCompose((value) ->
closeClientResources(quietPeriod, timeout, timeUnit)
);
}

if (closeableResource instanceof AsyncCloseable) {
return completedFuture(null);
}

closeFutures.add(((AsyncCloseable) closeableResource).closeAsync());
} else {
try {
closeableResource.close();
} catch (Exception e) {
logger.debug("Exception on Close: " + e.getMessage(), e);
}
}
closeableResources.remove(closeableResource);
}
private CompletableFuture<Void> closeOtherResources() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

for (Channel c : channels) {
while (!closeableResources.isEmpty()) {
Closeable closeableResource = closeableResources.iterator().next();

ChannelPipeline pipeline = c.pipeline();
if (closeableResource instanceof AsyncCloseable) {

ConnectionWatchdog commandHandler = pipeline.get(ConnectionWatchdog.class);
if (commandHandler != null) {
commandHandler.setListenOnChannelInactive(false);
closeFutures.add(((AsyncCloseable) closeableResource).closeAsync());
} else {
try {
closeableResource.close();
} catch (Exception e) {
logger.debug("Exception on Close: " + e.getMessage(), e);
}
}
closeableResources.remove(closeableResource);
}

try {
closeFutures.add(toCompletableFuture(channels.close()));
} catch (Exception e) {
logger.debug("Cannot close channels", e);
}
for (Channel c : channels) {

if (!sharedResources) {
Future<?> groupCloseFuture = clientResources.shutdown(quietPeriod, timeout, timeUnit);
closeFutures.add(toCompletableFuture(groupCloseFuture));
} else {
for (EventLoopGroup eventExecutors : eventLoopGroups.values()) {
Future<?> groupCloseFuture = clientResources.eventLoopGroupProvider().release(eventExecutors, quietPeriod,
timeout, timeUnit);
closeFutures.add(toCompletableFuture(groupCloseFuture));
}
ChannelPipeline pipeline = c.pipeline();

ConnectionWatchdog commandHandler = pipeline.get(ConnectionWatchdog.class);
if (commandHandler != null) {
commandHandler.setListenOnChannelInactive(false);
}
}

return Futures.allOf(closeFutures);
try {
closeFutures.add(toCompletableFuture(channels.close()));
} catch (Exception e) {
logger.debug("Cannot close channels", e);
}

return completedFuture(null);
return Futures.allOf(closeFutures);
}

private CompletableFuture<Void> closeClientResources(long quietPeriod, long timeout, TimeUnit timeUnit) {
List<CompletableFuture<Void>> groupCloseFutures = new ArrayList<>();
if (!sharedResources) {
Future<?> groupCloseFuture = clientResources.shutdown(quietPeriod, timeout, timeUnit);
groupCloseFutures.add(toCompletableFuture(groupCloseFuture));
} else {
for (EventLoopGroup eventExecutors : eventLoopGroups.values()) {
Future<?> groupCloseFuture = clientResources.eventLoopGroupProvider().release(eventExecutors, quietPeriod,
timeout, timeUnit);
groupCloseFutures.add(toCompletableFuture(groupCloseFuture));
}
}
return Futures.allOf(groupCloseFutures);
}

protected int getResourceCount() {
Expand Down

0 comments on commit 30cfd1d

Please sign in to comment.