Skip to content

Commit

Permalink
Replace io.lettuce.core.resource.Futures utility with Netty's Promise…
Browse files Browse the repository at this point in the history
…Combiner #1283
  • Loading branch information
mp911de committed May 5, 2020
1 parent bd72456 commit 53c6268
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 44 deletions.
23 changes: 6 additions & 17 deletions src/main/java/io/lettuce/core/resource/DefaultClientResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package io.lettuce.core.resource;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand All @@ -28,7 +25,6 @@
import io.lettuce.core.event.EventPublisherOptions;
import io.lettuce.core.event.metrics.DefaultCommandLatencyEventPublisher;
import io.lettuce.core.event.metrics.MetricEventPublisher;
import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceLists;
import io.lettuce.core.metrics.CommandLatencyCollector;
Expand Down Expand Up @@ -601,8 +597,8 @@ public Future<Boolean> shutdown(long quietPeriod, long timeout, TimeUnit timeUni
logger.debug("Initiate shutdown ({}, {}, {})", quietPeriod, timeout, timeUnit);

shutdownCalled = true;
DefaultPromise<Boolean> overall = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
List<CompletionStage<?>> stages = new ArrayList<>();
DefaultPromise<Void> voidPromise = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
PromiseCombiner aggregator = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);

if (metricEventPublisher != null) {
metricEventPublisher.shutdown();
Expand All @@ -614,28 +610,21 @@ public Future<Boolean> shutdown(long quietPeriod, long timeout, TimeUnit timeUni

if (!sharedEventLoopGroupProvider) {
Future<Boolean> shutdown = eventLoopGroupProvider.shutdown(quietPeriod, timeout, timeUnit);
stages.add(Futures.toCompletionStage(shutdown));
aggregator.add(shutdown);
}

if (!sharedEventExecutor) {
Future<?> shutdown = eventExecutorGroup.shutdownGracefully(quietPeriod, timeout, timeUnit);
stages.add(Futures.toCompletionStage(shutdown));
aggregator.add(shutdown);
}

if (!sharedCommandLatencyCollector) {
commandLatencyCollector.shutdown();
}

Futures.allOf(stages).whenComplete((ignore, throwable) -> {
aggregator.finish(voidPromise);

if (throwable != null) {
overall.setFailure(throwable);
} else {
overall.setSuccess(true);
}
});

return overall;
return PromiseAdapter.toBooleanPromise(voidPromise);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@

import static io.lettuce.core.resource.PromiseAdapter.toBooleanPromise;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.lettuce.core.internal.Futures;
import io.lettuce.core.internal.LettuceAssert;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.*;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand Down Expand Up @@ -210,23 +209,24 @@ private static <T extends EventExecutorGroup> EventExecutorGroup createEventLoop

@Override
public Promise<Boolean> release(EventExecutorGroup eventLoopGroup, long quietPeriod, long timeout, TimeUnit unit) {
return toBooleanPromise(doRelease(eventLoopGroup, quietPeriod, timeout, unit));
}

private Future<?> doRelease(EventExecutorGroup eventLoopGroup, long quietPeriod, long timeout, TimeUnit unit) {

logger.debug("Release executor {}", eventLoopGroup);

Class<?> key = getKey(release(eventLoopGroup));

if ((key == null && eventLoopGroup.isShuttingDown()) || refCounter.containsKey(eventLoopGroup)) {
DefaultPromise<Boolean> promise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
promise.setSuccess(true);
return promise;
return new SucceededFuture<>(ImmediateEventExecutor.INSTANCE, true);
}

if (key != null) {
eventLoopGroups.remove(key);
}

Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(quietPeriod, timeout, unit);
return toBooleanPromise(shutdownFuture);
return eventLoopGroup.shutdownGracefully(quietPeriod, timeout, unit);
}

private Class<?> getKey(EventExecutorGroup eventLoopGroup) {
Expand All @@ -248,34 +248,24 @@ public int threadPoolSize() {
}

@Override
@SuppressWarnings("unchecked")
public Future<Boolean> shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {

logger.debug("Initiate shutdown ({}, {}, {})", quietPeriod, timeout, timeUnit);

shutdownCalled = true;

List<EventExecutorGroup> copy = new ArrayList<>(eventLoopGroups.values());
DefaultPromise<Boolean> overall = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
CompletableFuture[] futures = new CompletableFuture[copy.size()];
Map<Class<? extends EventExecutorGroup>, EventExecutorGroup> copy = new HashMap<>(eventLoopGroups);

for (int i = 0; i < copy.size(); i++) {
DefaultPromise<Void> overall = new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
PromiseCombiner combiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);

EventExecutorGroup executorGroup = copy.get(i);
futures[i] = Futures.toCompletionStage(release(executorGroup, quietPeriod, timeout, timeUnit))
.toCompletableFuture();
for (EventExecutorGroup executorGroup : copy.values()) {
combiner.add(doRelease(executorGroup, quietPeriod, timeout, timeUnit));
}

CompletableFuture.allOf(futures).whenComplete((ignore, throwable) -> {

if (throwable != null) {
overall.setFailure(throwable);
} else {
overall.setSuccess(true);
}
});
combiner.finish(overall);

return overall;
return PromiseAdapter.toBooleanPromise(overall);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public void afterChannelInitialized(Channel channel) {
assertThat(initial.isOpen()).isFalse();

Channel reconnect = ref.take();
Wait.untilTrue(() -> !reconnect.isOpen()).waitOrTimeout();
assertThat(reconnect.isOpen()).isFalse();

FastShutdown.shutdown(client);
Expand Down

0 comments on commit 53c6268

Please sign in to comment.