Skip to content

Commit

Permalink
Refactor cluster topology refresh to non-blocking operations #1107
Browse files Browse the repository at this point in the history
The cluster topology refresh now no longer uses a blocking task to obtain the topology but uses a non-blocking sequence of futures to connect to nodes, request client and topology details and to return results.

Previously, topology refresh was partially asynchronous by connecting to all nodes and sending commands in parallel. The synchronization to await all connections and command completion was blocking which kept a carrier thread busy until the refresh task completed.

The side effect of keeping a worker thread busy was that command expiry and other work got delayed until the topology refresh released the thread.
  • Loading branch information
mp911de committed Mar 13, 2020
1 parent efb0b6c commit 4b602e7
Show file tree
Hide file tree
Showing 19 changed files with 910 additions and 992 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/ExceptionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static RedisCommandTimeoutException createTimeoutException(String message
String.format("%s. Command timed out after %s", message, formatTimeout(timeout)));
}

static String formatTimeout(Duration duration) {
public static String formatTimeout(Duration duration) {

if (duration.isZero()) {
return "no timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,108 +16,102 @@
package io.lettuce.core.cluster;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.event.cluster.AdaptiveRefreshTriggeredEvent;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* Scheduler utility to schedule and initiate cluster topology refresh.
*
* @author Mark Paluch
*/
class ClusterTopologyRefreshScheduler implements Runnable, ClusterEventListener {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ClusterTopologyRefreshScheduler.class);
private static final ClusterTopologyRefreshOptions FALLBACK_OPTIONS = ClusterTopologyRefreshOptions.create();

private final RedisClusterClient redisClusterClient;
private final Supplier<ClusterClientOptions> clientOptions;
private final Supplier<Partitions> partitions;
private final ClientResources clientResources;
private final ClusterTopologyRefreshTask clusterTopologyRefreshTask;
private final AtomicReference<Timeout> timeoutRef = new AtomicReference<>();

ClusterTopologyRefreshScheduler(RedisClusterClient redisClusterClient, ClientResources clientResources) {
private final AtomicBoolean clusterTopologyRefreshActivated = new AtomicBoolean(false);
private final AtomicReference<ScheduledFuture<?>> clusterTopologyRefreshFuture = new AtomicReference<>();
private final EventExecutorGroup genericWorkerPool;

ClusterTopologyRefreshScheduler(Supplier<ClusterClientOptions> clientOptions, Supplier<Partitions> partitions,
Supplier<CompletionStage<?>> refreshTopology, ClientResources clientResources) {

this.redisClusterClient = redisClusterClient;
this.clientOptions = clientOptions;
this.partitions = partitions;
this.clientResources = clientResources;
this.clusterTopologyRefreshTask = new ClusterTopologyRefreshTask(redisClusterClient);
this.genericWorkerPool = this.clientResources.eventExecutorGroup();
this.clusterTopologyRefreshTask = new ClusterTopologyRefreshTask(refreshTopology);
}

@Override
public void run() {
protected void activateTopologyRefreshIfNeeded() {

logger.debug("ClusterTopologyRefreshScheduler.run()");
ClusterClientOptions options = clientOptions.get();
ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();

if (isEventLoopActive() && redisClusterClient.getClusterClientOptions() != null) {
if (!redisClusterClient.getClusterClientOptions().isRefreshClusterView()) {
logger.debug("Periodic ClusterTopologyRefresh is disabled");
return;
}
} else {
logger.debug("Periodic ClusterTopologyRefresh is disabled");
if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
return;
}

clientResources.eventExecutorGroup().submit(clusterTopologyRefreshTask);
}

private boolean indicateTopologyRefreshSignal() {

logger.debug("ClusterTopologyRefreshScheduler.indicateTopologyRefreshSignal()");

if (!acquireTimeout()) {
return false;
}

return scheduleRefresh();
}

private boolean scheduleRefresh() {

if (isEventLoopActive() && redisClusterClient.getClusterClientOptions() != null) {
clientResources.eventExecutorGroup().submit(clusterTopologyRefreshTask);
return true;
if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(this,
options.getRefreshPeriod().toNanos(), options.getRefreshPeriod().toNanos(), TimeUnit.NANOSECONDS);
clusterTopologyRefreshFuture.set(scheduledFuture);
}

logger.debug("ClusterTopologyRefresh is disabled");
return false;
}

/**
* Check if the {@link EventExecutorGroup} is active
*
* @return false if the worker pool is terminating, shutdown or terminated
* Disable periodic topology refresh.
*/
private boolean isEventLoopActive() {

EventExecutorGroup eventExecutors = clientResources.eventExecutorGroup();
if (eventExecutors.isShuttingDown() || eventExecutors.isShutdown() || eventExecutors.isTerminated()) {
return false;
}
public void shutdown() {

return true;
}

private boolean acquireTimeout() {
if (clusterTopologyRefreshActivated.compareAndSet(true, false)) {

Timeout existingTimeout = timeoutRef.get();
ScheduledFuture<?> scheduledFuture = clusterTopologyRefreshFuture.get();

if (existingTimeout != null) {
if (!existingTimeout.isExpired()) {
return false;
try {
scheduledFuture.cancel(false);
clusterTopologyRefreshFuture.set(null);
} catch (Exception e) {
logger.debug("Could not cancel Cluster topology refresh", e);
}
}
}

ClusterTopologyRefreshOptions refreshOptions = getClusterTopologyRefreshOptions();
Timeout timeout = new Timeout(refreshOptions.getAdaptiveRefreshTimeout());
@Override
public void run() {

if (timeoutRef.compareAndSet(existingTimeout, timeout)) {
return true;
logger.debug("ClusterTopologyRefreshScheduler.run()");

if (isEventLoopActive()) {

if (!clientOptions.get().isRefreshClusterView()) {
logger.debug("Periodic ClusterTopologyRefresh is disabled");
return;
}
} else {
logger.debug("Periodic ClusterTopologyRefresh is disabled");
return;
}

return false;
clientResources.eventExecutorGroup().submit(clusterTopologyRefreshTask);
}

@Override
Expand Down Expand Up @@ -171,18 +165,74 @@ public void onUnknownNode() {

private void emitAdaptiveRefreshScheduledEvent() {

AdaptiveRefreshTriggeredEvent event = new AdaptiveRefreshTriggeredEvent(redisClusterClient::getPartitions,
this::scheduleRefresh);
AdaptiveRefreshTriggeredEvent event = new AdaptiveRefreshTriggeredEvent(partitions, this::scheduleRefresh);

clientResources.eventBus().publish(event);
}

private boolean indicateTopologyRefreshSignal() {

logger.debug("ClusterTopologyRefreshScheduler.indicateTopologyRefreshSignal()");

if (!acquireTimeout()) {
return false;
}

return scheduleRefresh();
}

private boolean scheduleRefresh() {

if (isEventLoopActive()) {
clientResources.eventExecutorGroup().submit(clusterTopologyRefreshTask);
return true;
}

logger.debug("ClusterTopologyRefresh is disabled");
return false;
}

/**
* Check if the {@link EventExecutorGroup} is active
*
* @return false if the worker pool is terminating, shutdown or terminated
*/
private boolean isEventLoopActive() {

EventExecutorGroup eventExecutors = clientResources.eventExecutorGroup();
if (eventExecutors.isShuttingDown() || eventExecutors.isShutdown() || eventExecutors.isTerminated()) {
return false;
}

return true;
}

private boolean acquireTimeout() {

Timeout existingTimeout = timeoutRef.get();

if (existingTimeout != null) {
if (!existingTimeout.isExpired()) {
return false;
}
}

ClusterTopologyRefreshOptions refreshOptions = getClusterTopologyRefreshOptions();
Timeout timeout = new Timeout(refreshOptions.getAdaptiveRefreshTimeout());

if (timeoutRef.compareAndSet(existingTimeout, timeout)) {
return true;
}

return false;
}

private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {

ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
ClientOptions clientOptions = this.clientOptions.get();

if (clusterClientOptions != null) {
return clusterClientOptions.getTopologyRefreshOptions();
if (clientOptions instanceof ClusterClientOptions) {
return ((ClusterClientOptions) clientOptions).getTopologyRefreshOptions();
}

return FALLBACK_OPTIONS;
Expand Down Expand Up @@ -220,24 +270,19 @@ public long remaining() {
}
}

private static class ClusterTopologyRefreshTask implements Runnable {
private static class ClusterTopologyRefreshTask extends AtomicBoolean implements Runnable {

private final RedisClusterClient redisClusterClient;
private final AtomicBoolean unique = new AtomicBoolean();
private static final long serialVersionUID = -1337731371220365694L;
private final Supplier<CompletionStage<?>> reloadTopologyAsync;

ClusterTopologyRefreshTask(RedisClusterClient redisClusterClient) {
this.redisClusterClient = redisClusterClient;
ClusterTopologyRefreshTask(Supplier<CompletionStage<?>> reloadTopologyAsync) {
this.reloadTopologyAsync = reloadTopologyAsync;
}

public void run() {

if (unique.compareAndSet(false, true)) {
try {
doRun();
} finally {
unique.set(false);
}

if (compareAndSet(false, true)) {
doRun();
return;
}

Expand All @@ -249,11 +294,17 @@ public void run() {
void doRun() {

if (logger.isDebugEnabled()) {
logger.debug("ClusterTopologyRefreshTask requesting partitions from {}",
redisClusterClient.getTopologyRefreshSource());
logger.debug("ClusterTopologyRefreshTask requesting partitions");
}
try {
redisClusterClient.reloadPartitions();
reloadTopologyAsync.get().whenComplete((ignore, throwable) -> {

if (throwable != null) {
logger.warn("Cannot refresh Redis Cluster topology", throwable);
}

set(false);
});
} catch (Exception e) {
logger.warn("Cannot refresh Redis Cluster topology", e);
}
Expand Down
Loading

0 comments on commit 4b602e7

Please sign in to comment.