Skip to content

Commit

Permalink
Use HashedWheelTimer for command timeout scheduling to reduce threa…
Browse files Browse the repository at this point in the history
…d context switches and improve performance redis#2773
  • Loading branch information
yangty89 committed Mar 6, 2024
1 parent 18750c1 commit 388e0ac
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions src/main/java/io/lettuce/core/protocol/CommandExpiryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import io.lettuce.core.internal.ExceptionFactory;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.ClientResources;
import io.netty.util.Timeout;
import io.netty.util.Timer;

/**
* Extension to {@link RedisChannelWriter} that expires commands. Command timeout starts at the time the command is written
* regardless to {@link #setAutoFlushCommands(boolean) flushing mode} (user-controlled batching).
*
* @author Mark Paluch
* @author Tianyi Yang
* @since 5.1
* @see io.lettuce.core.TimeoutOptions
*/
Expand All @@ -49,6 +52,8 @@ public class CommandExpiryWriter implements RedisChannelWriter {

private final ScheduledExecutorService executorService;

private final Timer timer;

private final boolean applyConnectionTimeout;

private volatile long timeout = -1;
Expand All @@ -72,6 +77,7 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti
this.applyConnectionTimeout = timeoutOptions.isApplyConnectionTimeout();
this.timeUnit = source.getTimeUnit();
this.executorService = clientResources.eventExecutorGroup();
this.timer = clientResources.timer();
}

/**
Expand Down Expand Up @@ -168,24 +174,19 @@ private void potentiallyExpire(RedisCommand<?, ?, ?> command, ScheduledExecutorS
if (timeout <= 0) {
return;
}

ScheduledFuture<?> schedule = executors.schedule(() -> {


Timeout commandTimeout = timer.newTimeout(t -> {
if (!command.isDone()) {
command.completeExceptionally(
ExceptionFactory.createTimeoutException(Duration.ofNanos(timeUnit.toNanos(timeout))));
}
executors.submit(() -> command.completeExceptionally(
ExceptionFactory.createTimeoutException(Duration.ofNanos(timeUnit.toNanos(timeout)))));

}
}, timeout, timeUnit);

if (command instanceof CompleteableCommand) {
((CompleteableCommand) command).onComplete((o, o2) -> {

if (!schedule.isDone()) {
schedule.cancel(false);
}
});
((CompleteableCommand) command).onComplete((o, o2) -> commandTimeout.cancel());
}

}

}

0 comments on commit 388e0ac

Please sign in to comment.