diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientRequestContextBuilder.java b/core/src/main/java/com/linecorp/armeria/client/ClientRequestContextBuilder.java index ac80ec33d71..6ac8ca5ab1c 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientRequestContextBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientRequestContextBuilder.java @@ -15,6 +15,7 @@ */ package com.linecorp.armeria.client; +import static com.linecorp.armeria.internal.common.CancellationScheduler.noopCancellationTask; import static java.util.Objects.requireNonNull; import java.net.InetSocketAddress; @@ -35,12 +36,10 @@ import com.linecorp.armeria.common.util.SystemInfo; import com.linecorp.armeria.internal.client.DefaultClientRequestContext; import com.linecorp.armeria.internal.common.CancellationScheduler; -import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask; import io.micrometer.core.instrument.MeterRegistry; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.ImmediateEventExecutor; /** * Builds a new {@link ClientRequestContext}. Note that it is not usually required to create a new context by @@ -49,27 +48,6 @@ */ public final class ClientRequestContextBuilder extends AbstractRequestContextBuilder { - private static final CancellationTask noopCancellationTask = new CancellationTask() { - @Override - public boolean canSchedule() { - return true; - } - - @Override - public void run(Throwable cause) { /* no-op */ } - }; - - /** - * A cancellation scheduler that has been finished. - */ - static final CancellationScheduler noopResponseCancellationScheduler = new CancellationScheduler(0); - - static { - noopResponseCancellationScheduler - .init(ImmediateEventExecutor.INSTANCE, noopCancellationTask, 0, /* server */ false); - noopResponseCancellationScheduler.finishNow(); - } - @Nullable private Endpoint endpoint; private ClientOptions options = ClientOptions.of(); @@ -136,9 +114,9 @@ public ClientRequestContext build() { final CancellationScheduler responseCancellationScheduler; if (timedOut()) { - responseCancellationScheduler = noopResponseCancellationScheduler; + responseCancellationScheduler = CancellationScheduler.finished(false); } else { - responseCancellationScheduler = new CancellationScheduler(0); + responseCancellationScheduler = CancellationScheduler.of(0); final CountDownLatch latch = new CountDownLatch(1); eventLoop().execute(() -> { responseCancellationScheduler.init(eventLoop(), noopCancellationTask, 0, /* server */ false); diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java index bca1bec9fc0..1d6f6a781b2 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientPipelineConfigurator.java @@ -17,7 +17,6 @@ package com.linecorp.armeria.client; import static com.google.common.base.MoreObjects.firstNonNull; -import static com.linecorp.armeria.client.ClientRequestContextBuilder.noopResponseCancellationScheduler; import static com.linecorp.armeria.common.SessionProtocol.H1; import static com.linecorp.armeria.common.SessionProtocol.H1C; import static com.linecorp.armeria.common.SessionProtocol.H2; @@ -57,6 +56,7 @@ import com.linecorp.armeria.internal.client.UserAgentUtil; import com.linecorp.armeria.internal.common.ArmeriaHttp2HeadersDecoder; import com.linecorp.armeria.internal.common.ArmeriaHttpUtil; +import com.linecorp.armeria.internal.common.CancellationScheduler; import com.linecorp.armeria.internal.common.ReadSuppressingHandler; import com.linecorp.armeria.internal.common.TrafficLoggingHandler; import com.linecorp.armeria.internal.common.util.ChannelUtil; @@ -547,7 +547,7 @@ public void onComplete() {} com.linecorp.armeria.common.HttpMethod.OPTIONS, RequestTarget.forClient("*"), ClientOptions.of(), HttpRequest.of(com.linecorp.armeria.common.HttpMethod.OPTIONS, "*"), - null, REQUEST_OPTIONS_FOR_UPGRADE_REQUEST, noopResponseCancellationScheduler, + null, REQUEST_OPTIONS_FOR_UPGRADE_REQUEST, CancellationScheduler.noop(), System.nanoTime(), SystemInfo.currentTimeMicros()); // NB: No need to set the response timeout because we have session creation timeout. diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java index eef72c48c05..a8f75ce1494 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/DefaultClientRequestContext.java @@ -226,7 +226,7 @@ private DefaultClientRequestContext( responseTimeoutMillis = options().responseTimeoutMillis(); } this.responseCancellationScheduler = - new CancellationScheduler(TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis)); + CancellationScheduler.of(TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis)); } else { this.responseCancellationScheduler = responseCancellationScheduler; } @@ -504,7 +504,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx, log = RequestLog.builder(this); log.startRequest(); responseCancellationScheduler = - new CancellationScheduler(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis())); + CancellationScheduler.of(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis())); writeTimeoutMillis = ctx.writeTimeoutMillis(); maxResponseLength = ctx.maxResponseLength(); diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java index a0e51bb1797..6fd9015aea0 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/CancellationScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 LINE Corporation + * Copyright 2023 LINE Corporation * * LINE Corporation licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance @@ -16,481 +16,81 @@ package com.linecorp.armeria.internal.common; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import com.google.common.annotations.VisibleForTesting; -import com.google.common.math.LongMath; -import com.linecorp.armeria.client.ResponseTimeoutException; -import com.linecorp.armeria.common.TimeoutException; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.TimeoutMode; -import com.linecorp.armeria.common.util.UnmodifiableFuture; -import com.linecorp.armeria.server.RequestTimeoutException; import io.netty.util.concurrent.EventExecutor; -@SuppressWarnings("UnstableApiUsage") -public final class CancellationScheduler { - - private static final AtomicReferenceFieldUpdater - whenCancellingUpdater = AtomicReferenceFieldUpdater.newUpdater( - CancellationScheduler.class, CancellationFuture.class, "whenCancelling"); - - private static final AtomicReferenceFieldUpdater - whenCancelledUpdater = AtomicReferenceFieldUpdater.newUpdater( - CancellationScheduler.class, CancellationFuture.class, "whenCancelled"); - - private static final AtomicReferenceFieldUpdater - whenTimingOutUpdater = AtomicReferenceFieldUpdater.newUpdater( - CancellationScheduler.class, TimeoutFuture.class, "whenTimingOut"); +public interface CancellationScheduler { - private static final AtomicReferenceFieldUpdater - whenTimedOutUpdater = AtomicReferenceFieldUpdater.newUpdater( - CancellationScheduler.class, TimeoutFuture.class, "whenTimedOut"); - - private static final AtomicReferenceFieldUpdater - pendingTaskUpdater = AtomicReferenceFieldUpdater.newUpdater( - CancellationScheduler.class, Runnable.class, "pendingTask"); - - private static final AtomicLongFieldUpdater pendingTimeoutNanosUpdater = - AtomicLongFieldUpdater.newUpdater(CancellationScheduler.class, "pendingTimeoutNanos"); - - private static final Runnable noopPendingTask = () -> { - }; - - private State state = State.INIT; - private long timeoutNanos; - private long startTimeNanos; - @Nullable - private EventExecutor eventLoop; - @Nullable - private CancellationTask task; - @Nullable - private volatile Runnable pendingTask; - @Nullable - private ScheduledFuture scheduledFuture; - @Nullable - private volatile CancellationFuture whenCancelling; - @Nullable - private volatile CancellationFuture whenCancelled; - @Nullable - private volatile TimeoutFuture whenTimingOut; - @Nullable - private volatile TimeoutFuture whenTimedOut; - @SuppressWarnings("FieldMayBeFinal") - private volatile long pendingTimeoutNanos; - private boolean server; - @Nullable - private Throwable cause; - - public CancellationScheduler(long timeoutNanos) { - this.timeoutNanos = timeoutNanos; - pendingTimeoutNanos = timeoutNanos; + static CancellationScheduler of(long timeoutNanos) { + return new DefaultCancellationScheduler(timeoutNanos); } /** - * Initializes this {@link CancellationScheduler}. + * A {@link CancellationScheduler} that has already completed. */ - public void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) { - if (!eventLoop.inEventLoop()) { - eventLoop.execute(() -> init0(eventLoop, task, timeoutNanos, server)); + static CancellationScheduler finished(boolean server) { + if (server) { + return DefaultCancellationScheduler.serverFinishedCancellationScheduler; } else { - init0(eventLoop, task, timeoutNanos, server); + return DefaultCancellationScheduler.clientFinishedCancellationScheduler; } } - private void init0(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) { - if (state != State.INIT) { - return; - } - this.eventLoop = eventLoop; - this.task = task; - if (timeoutNanos > 0) { - this.timeoutNanos = timeoutNanos; - } - this.server = server; - startTimeNanos = System.nanoTime(); - if (this.timeoutNanos != 0) { - state = State.SCHEDULED; - scheduledFuture = - eventLoop.schedule(() -> invokeTask(null), this.timeoutNanos, NANOSECONDS); - } else { - state = State.INACTIVE; - } - for (;;) { - final Runnable pendingTask = this.pendingTask; - if (pendingTaskUpdater.compareAndSet(this, pendingTask, noopPendingTask)) { - if (pendingTask != null) { - pendingTask.run(); - } - break; - } - } - } - - public void clearTimeout() { - clearTimeout(true); - } - - public void clearTimeout(boolean resetTimeout) { - if (timeoutNanos() == 0) { - return; - } - if (isInitialized()) { - if (eventLoop.inEventLoop()) { - clearTimeout0(resetTimeout); - } else { - eventLoop.execute(() -> clearTimeout0(resetTimeout)); - } - } else { - if (resetTimeout) { - setPendingTimeoutNanos(0); - } - addPendingTask(() -> clearTimeout0(resetTimeout)); - } + /** + * A {@link CancellationScheduler} that never completes. + */ + static CancellationScheduler noop() { + return NoopCancellationScheduler.INSTANCE; } - private boolean clearTimeout0(boolean resetTimeout) { - assert eventLoop != null && eventLoop.inEventLoop(); - if (state != State.SCHEDULED) { + CancellationTask noopCancellationTask = new CancellationTask() { + @Override + public boolean canSchedule() { return true; } - if (resetTimeout) { - timeoutNanos = 0; - } - assert scheduledFuture != null; - final boolean cancelled = scheduledFuture.cancel(false); - scheduledFuture = null; - if (cancelled) { - state = State.INACTIVE; - } - return cancelled; - } - public void setTimeoutNanos(TimeoutMode mode, long timeoutNanos) { - switch (mode) { - case SET_FROM_NOW: - setTimeoutNanosFromNow(timeoutNanos); - break; - case SET_FROM_START: - setTimeoutNanosFromStart(timeoutNanos); - break; - case EXTEND: - extendTimeoutNanos(timeoutNanos); - break; - } - } - - private void setTimeoutNanosFromStart(long timeoutNanos) { - checkArgument(timeoutNanos >= 0, "timeoutNanos: %s (expected: >= 0)", timeoutNanos); - if (timeoutNanos == 0) { - clearTimeout(); - return; - } - if (isInitialized()) { - if (eventLoop.inEventLoop()) { - setTimeoutNanosFromStart0(timeoutNanos); - } else { - eventLoop.execute(() -> setTimeoutNanosFromStart0(timeoutNanos)); - } - } else { - setPendingTimeoutNanos(timeoutNanos); - addPendingTask(() -> setTimeoutNanosFromStart0(timeoutNanos)); - } - } - - private void setTimeoutNanosFromStart0(long timeoutNanos) { - assert eventLoop != null && eventLoop.inEventLoop(); - final long passedTimeNanos = System.nanoTime() - startTimeNanos; - final long newTimeoutNanos = LongMath.saturatedSubtract(timeoutNanos, passedTimeNanos); - if (newTimeoutNanos <= 0) { - invokeTask(null); - return; - } - // Cancel the previously scheduled timeout, if exists. - clearTimeout0(true); - this.timeoutNanos = timeoutNanos; - state = State.SCHEDULED; - scheduledFuture = eventLoop.schedule(() -> invokeTask(null), newTimeoutNanos, NANOSECONDS); - } - - private void extendTimeoutNanos(long adjustmentNanos) { - if (adjustmentNanos == 0 || timeoutNanos() == 0) { - return; - } - if (isInitialized()) { - if (eventLoop.inEventLoop()) { - extendTimeoutNanos0(adjustmentNanos); - } else { - eventLoop.execute(() -> extendTimeoutNanos0(adjustmentNanos)); - } - } else { - addPendingTimeoutNanos(adjustmentNanos); - addPendingTask(() -> extendTimeoutNanos0(adjustmentNanos)); - } - } - - private void extendTimeoutNanos0(long adjustmentNanos) { - assert eventLoop != null && eventLoop.inEventLoop() && task != null; - if (state != State.SCHEDULED || !task.canSchedule()) { - return; - } - final long timeoutNanos = this.timeoutNanos; - // Cancel the previously scheduled timeout, if exists. - clearTimeout0(true); - this.timeoutNanos = LongMath.saturatedAdd(timeoutNanos, adjustmentNanos); - if (timeoutNanos <= 0) { - invokeTask(null); - return; - } - state = State.SCHEDULED; - scheduledFuture = eventLoop.schedule(() -> invokeTask(null), this.timeoutNanos, NANOSECONDS); - } - - private void setTimeoutNanosFromNow(long timeoutNanos) { - checkArgument(timeoutNanos > 0, "timeoutNanos: %s (expected: > 0)", timeoutNanos); - if (isInitialized()) { - if (eventLoop.inEventLoop()) { - setTimeoutNanosFromNow0(timeoutNanos); - } else { - final long eventLoopStartTimeNanos = System.nanoTime(); - eventLoop.execute(() -> { - final long passedTimeNanos0 = System.nanoTime() - eventLoopStartTimeNanos; - final long timeoutNanos0 = Math.max(1, timeoutNanos - passedTimeNanos0); - setTimeoutNanosFromNow0(timeoutNanos0); - }); - } - } else { - final long pendingTaskRegisterTimeNanos = System.nanoTime(); - setPendingTimeoutNanos(timeoutNanos); - addPendingTask(() -> { - final long passedTimeNanos0 = System.nanoTime() - pendingTaskRegisterTimeNanos; - final long timeoutNanos0 = Math.max(1, timeoutNanos - passedTimeNanos0); - setTimeoutNanosFromNow0(timeoutNanos0); - }); - } - } + @Override + public void run(Throwable cause) { /* no-op */ } + }; - private void setTimeoutNanosFromNow0(long newTimeoutNanos) { - assert newTimeoutNanos > 0; - assert eventLoop != null && eventLoop.inEventLoop() && task != null; - if (isFinishing() || !task.canSchedule()) { - return; - } - // Cancel the previously scheduled timeout, if exists. - clearTimeout0(true); - final long passedTimeNanos = System.nanoTime() - startTimeNanos; - timeoutNanos = LongMath.saturatedAdd(newTimeoutNanos, passedTimeNanos); + void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server); - state = State.SCHEDULED; - scheduledFuture = eventLoop.schedule(() -> invokeTask(null), newTimeoutNanos, NANOSECONDS); - } + void clearTimeout(); - public void finishNow() { - finishNow(null); - } + void clearTimeout(boolean resetTimeout); - public void finishNow(@Nullable Throwable cause) { - if (isFinishing()) { - return; - } - if (isInitialized()) { - if (eventLoop.inEventLoop()) { - finishNow0(cause); - } else { - eventLoop.execute(() -> finishNow0(cause)); - } - } else { - addPendingTask(() -> finishNow0(cause)); - } - } + void setTimeoutNanos(TimeoutMode mode, long timeoutNanos); - private void finishNow0(@Nullable Throwable cause) { - assert eventLoop != null && eventLoop.inEventLoop() && task != null; - if (isFinishing() || !task.canSchedule()) { - return; - } - if (state == State.SCHEDULED) { - if (clearTimeout0(false)) { - invokeTask(cause); - } - } else { - invokeTask(cause); - } - } + void finishNow(); - public boolean isFinished() { - return state == State.FINISHED; - } + void finishNow(@Nullable Throwable cause); - private boolean isFinishing() { - return state == State.FINISHED || state == State.FINISHING; - } + boolean isFinished(); - @Nullable - public Throwable cause() { - return cause; - } + @Nullable Throwable cause(); - public long timeoutNanos() { - return isInitialized() ? timeoutNanos : pendingTimeoutNanos; - } + long timeoutNanos(); - public long startTimeNanos() { - return startTimeNanos; - } + long startTimeNanos(); - public CompletableFuture whenCancelling() { - final CancellationFuture whenCancelling = this.whenCancelling; - if (whenCancelling != null) { - return whenCancelling; - } - final CancellationFuture cancellationFuture = new CancellationFuture(); - if (whenCancellingUpdater.compareAndSet(this, null, cancellationFuture)) { - return cancellationFuture; - } else { - return this.whenCancelling; - } - } + CompletableFuture whenCancelling(); - public CompletableFuture whenCancelled() { - final CancellationFuture whenCancelled = this.whenCancelled; - if (whenCancelled != null) { - return whenCancelled; - } - final CancellationFuture cancellationFuture = new CancellationFuture(); - if (whenCancelledUpdater.compareAndSet(this, null, cancellationFuture)) { - return cancellationFuture; - } else { - return this.whenCancelled; - } - } + CompletableFuture whenCancelled(); @Deprecated - public CompletableFuture whenTimingOut() { - final TimeoutFuture whenTimingOut = this.whenTimingOut; - if (whenTimingOut != null) { - return whenTimingOut; - } - final TimeoutFuture timeoutFuture = new TimeoutFuture(); - if (whenTimingOutUpdater.compareAndSet(this, null, timeoutFuture)) { - whenCancelling().thenAccept(cause -> { - if (cause instanceof TimeoutException) { - timeoutFuture.doComplete(); - } - }); - return timeoutFuture; - } else { - return this.whenTimingOut; - } - } + CompletableFuture whenTimingOut(); @Deprecated - public CompletableFuture whenTimedOut() { - final TimeoutFuture whenTimedOut = this.whenTimedOut; - if (whenTimedOut != null) { - return whenTimedOut; - } - final TimeoutFuture timeoutFuture = new TimeoutFuture(); - if (whenTimedOutUpdater.compareAndSet(this, null, timeoutFuture)) { - whenCancelled().thenAccept(cause -> { - if (cause instanceof TimeoutException) { - timeoutFuture.doComplete(); - } - }); - return timeoutFuture; - } else { - return this.whenTimedOut; - } - } + CompletableFuture whenTimedOut(); @VisibleForTesting - public boolean isInitialized() { - return pendingTask == noopPendingTask && eventLoop != null; - } - - private void addPendingTask(Runnable pendingTask) { - if (!pendingTaskUpdater.compareAndSet(this, null, pendingTask)) { - for (;;) { - final Runnable oldPendingTask = this.pendingTask; - assert oldPendingTask != null; - if (oldPendingTask == noopPendingTask) { - assert eventLoop != null; - eventLoop.execute(pendingTask); - break; - } - final Runnable newPendingTask = () -> { - oldPendingTask.run(); - pendingTask.run(); - }; - if (pendingTaskUpdater.compareAndSet(this, oldPendingTask, newPendingTask)) { - break; - } - } - } - } - - private void setPendingTimeoutNanos(long pendingTimeoutNanos) { - for (;;) { - final long oldPendingTimeoutNanos = this.pendingTimeoutNanos; - if (pendingTimeoutNanosUpdater.compareAndSet(this, oldPendingTimeoutNanos, pendingTimeoutNanos)) { - break; - } - } - } - - private void addPendingTimeoutNanos(long pendingTimeoutNanos) { - for (;;) { - final long oldPendingTimeoutNanos = this.pendingTimeoutNanos; - final long newPendingTimeoutNanos = LongMath.saturatedAdd(oldPendingTimeoutNanos, - pendingTimeoutNanos); - if (pendingTimeoutNanosUpdater.compareAndSet(this, oldPendingTimeoutNanos, - newPendingTimeoutNanos)) { - break; - } - } - } - - private void invokeTask(@Nullable Throwable cause) { - if (task == null) { - return; - } - - if (cause == null) { - if (server) { - cause = RequestTimeoutException.get(); - } else { - cause = ResponseTimeoutException.get(); - } - } - - // Set FINISHING to preclude executing other timeout operations from the callbacks of `whenCancelling()` - state = State.FINISHING; - if (task.canSchedule()) { - ((CancellationFuture) whenCancelling()).doComplete(cause); - } - // Set state first to prevent duplicate execution - state = State.FINISHED; - - // The returned value of `canSchedule()` could've been changed by the callbacks of `whenCancelling()` - if (task.canSchedule()) { - task.run(cause); - } - this.cause = cause; - ((CancellationFuture) whenCancelled()).doComplete(cause); - } - - @VisibleForTesting - State state() { - return state; - } + boolean isInitialized(); enum State { INIT, @@ -503,7 +103,7 @@ enum State { /** * A cancellation task invoked by the scheduler when its timeout exceeds or invoke by the user. */ - public interface CancellationTask { + interface CancellationTask { /** * Returns {@code true} if the cancellation task can be scheduled. */ @@ -514,17 +114,4 @@ public interface CancellationTask { */ void run(Throwable cause); } - - private static class CancellationFuture extends UnmodifiableFuture { - @Override - protected void doComplete(@Nullable Throwable cause) { - super.doComplete(cause); - } - } - - private static class TimeoutFuture extends UnmodifiableFuture { - void doComplete() { - doComplete(null); - } - } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java new file mode 100644 index 00000000000..b3e31e5f349 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/common/DefaultCancellationScheduler.java @@ -0,0 +1,533 @@ +/* + * Copyright 2020 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; + +import com.linecorp.armeria.client.ResponseTimeoutException; +import com.linecorp.armeria.common.TimeoutException; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.TimeoutMode; +import com.linecorp.armeria.common.util.UnmodifiableFuture; +import com.linecorp.armeria.server.RequestTimeoutException; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ImmediateEventExecutor; + +final class DefaultCancellationScheduler implements CancellationScheduler { + + private static final AtomicReferenceFieldUpdater + whenCancellingUpdater = AtomicReferenceFieldUpdater.newUpdater( + DefaultCancellationScheduler.class, CancellationFuture.class, "whenCancelling"); + + private static final AtomicReferenceFieldUpdater + whenCancelledUpdater = AtomicReferenceFieldUpdater.newUpdater( + DefaultCancellationScheduler.class, CancellationFuture.class, "whenCancelled"); + + private static final AtomicReferenceFieldUpdater + whenTimingOutUpdater = AtomicReferenceFieldUpdater.newUpdater( + DefaultCancellationScheduler.class, TimeoutFuture.class, "whenTimingOut"); + + private static final AtomicReferenceFieldUpdater + whenTimedOutUpdater = AtomicReferenceFieldUpdater.newUpdater( + DefaultCancellationScheduler.class, TimeoutFuture.class, "whenTimedOut"); + + private static final AtomicReferenceFieldUpdater + pendingTaskUpdater = AtomicReferenceFieldUpdater.newUpdater( + DefaultCancellationScheduler.class, Runnable.class, "pendingTask"); + + private static final AtomicLongFieldUpdater pendingTimeoutNanosUpdater = + AtomicLongFieldUpdater.newUpdater(DefaultCancellationScheduler.class, "pendingTimeoutNanos"); + + private static final Runnable noopPendingTask = () -> { + }; + + static final CancellationScheduler serverFinishedCancellationScheduler = finished0(true); + static final CancellationScheduler clientFinishedCancellationScheduler = finished0(false); + + private State state = State.INIT; + private long timeoutNanos; + private long startTimeNanos; + @Nullable + private EventExecutor eventLoop; + @Nullable + private CancellationTask task; + @Nullable + private volatile Runnable pendingTask; + @Nullable + private ScheduledFuture scheduledFuture; + @Nullable + private volatile CancellationFuture whenCancelling; + @Nullable + private volatile CancellationFuture whenCancelled; + @Nullable + private volatile TimeoutFuture whenTimingOut; + @Nullable + private volatile TimeoutFuture whenTimedOut; + @SuppressWarnings("FieldMayBeFinal") + private volatile long pendingTimeoutNanos; + private boolean server; + @Nullable + private Throwable cause; + + DefaultCancellationScheduler(long timeoutNanos) { + this.timeoutNanos = timeoutNanos; + pendingTimeoutNanos = timeoutNanos; + } + + /** + * Initializes this {@link DefaultCancellationScheduler}. + */ + @Override + public void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) { + if (!eventLoop.inEventLoop()) { + eventLoop.execute(() -> init0(eventLoop, task, timeoutNanos, server)); + } else { + init0(eventLoop, task, timeoutNanos, server); + } + } + + private void init0(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) { + if (state != State.INIT) { + return; + } + this.eventLoop = eventLoop; + this.task = task; + if (timeoutNanos > 0) { + this.timeoutNanos = timeoutNanos; + } + this.server = server; + startTimeNanos = System.nanoTime(); + if (this.timeoutNanos != 0) { + state = State.SCHEDULED; + scheduledFuture = + eventLoop.schedule(() -> invokeTask(null), this.timeoutNanos, NANOSECONDS); + } else { + state = State.INACTIVE; + } + for (;;) { + final Runnable pendingTask = this.pendingTask; + if (pendingTaskUpdater.compareAndSet(this, pendingTask, noopPendingTask)) { + if (pendingTask != null) { + pendingTask.run(); + } + break; + } + } + } + + @Override + public void clearTimeout() { + clearTimeout(true); + } + + @Override + public void clearTimeout(boolean resetTimeout) { + if (timeoutNanos() == 0) { + return; + } + if (isInitialized()) { + if (eventLoop.inEventLoop()) { + clearTimeout0(resetTimeout); + } else { + eventLoop.execute(() -> clearTimeout0(resetTimeout)); + } + } else { + if (resetTimeout) { + setPendingTimeoutNanos(0); + } + addPendingTask(() -> clearTimeout0(resetTimeout)); + } + } + + private boolean clearTimeout0(boolean resetTimeout) { + assert eventLoop != null && eventLoop.inEventLoop(); + if (state != State.SCHEDULED) { + return true; + } + if (resetTimeout) { + timeoutNanos = 0; + } + assert scheduledFuture != null; + final boolean cancelled = scheduledFuture.cancel(false); + scheduledFuture = null; + if (cancelled) { + state = State.INACTIVE; + } + return cancelled; + } + + @Override + public void setTimeoutNanos(TimeoutMode mode, long timeoutNanos) { + switch (mode) { + case SET_FROM_NOW: + setTimeoutNanosFromNow(timeoutNanos); + break; + case SET_FROM_START: + setTimeoutNanosFromStart(timeoutNanos); + break; + case EXTEND: + extendTimeoutNanos(timeoutNanos); + break; + } + } + + private void setTimeoutNanosFromStart(long timeoutNanos) { + checkArgument(timeoutNanos >= 0, "timeoutNanos: %s (expected: >= 0)", timeoutNanos); + if (timeoutNanos == 0) { + clearTimeout(); + return; + } + if (isInitialized()) { + if (eventLoop.inEventLoop()) { + setTimeoutNanosFromStart0(timeoutNanos); + } else { + eventLoop.execute(() -> setTimeoutNanosFromStart0(timeoutNanos)); + } + } else { + setPendingTimeoutNanos(timeoutNanos); + addPendingTask(() -> setTimeoutNanosFromStart0(timeoutNanos)); + } + } + + private void setTimeoutNanosFromStart0(long timeoutNanos) { + assert eventLoop != null && eventLoop.inEventLoop(); + final long passedTimeNanos = System.nanoTime() - startTimeNanos; + final long newTimeoutNanos = LongMath.saturatedSubtract(timeoutNanos, passedTimeNanos); + if (newTimeoutNanos <= 0) { + invokeTask(null); + return; + } + // Cancel the previously scheduled timeout, if exists. + clearTimeout0(true); + this.timeoutNanos = timeoutNanos; + state = State.SCHEDULED; + scheduledFuture = eventLoop.schedule(() -> invokeTask(null), newTimeoutNanos, NANOSECONDS); + } + + private void extendTimeoutNanos(long adjustmentNanos) { + if (adjustmentNanos == 0 || timeoutNanos() == 0) { + return; + } + if (isInitialized()) { + if (eventLoop.inEventLoop()) { + extendTimeoutNanos0(adjustmentNanos); + } else { + eventLoop.execute(() -> extendTimeoutNanos0(adjustmentNanos)); + } + } else { + addPendingTimeoutNanos(adjustmentNanos); + addPendingTask(() -> extendTimeoutNanos0(adjustmentNanos)); + } + } + + private void extendTimeoutNanos0(long adjustmentNanos) { + assert eventLoop != null && eventLoop.inEventLoop() && task != null; + if (state != State.SCHEDULED || !task.canSchedule()) { + return; + } + final long timeoutNanos = this.timeoutNanos; + // Cancel the previously scheduled timeout, if exists. + clearTimeout0(true); + this.timeoutNanos = LongMath.saturatedAdd(timeoutNanos, adjustmentNanos); + if (timeoutNanos <= 0) { + invokeTask(null); + return; + } + state = State.SCHEDULED; + scheduledFuture = eventLoop.schedule(() -> invokeTask(null), this.timeoutNanos, NANOSECONDS); + } + + private void setTimeoutNanosFromNow(long timeoutNanos) { + checkArgument(timeoutNanos > 0, "timeoutNanos: %s (expected: > 0)", timeoutNanos); + if (isInitialized()) { + if (eventLoop.inEventLoop()) { + setTimeoutNanosFromNow0(timeoutNanos); + } else { + final long eventLoopStartTimeNanos = System.nanoTime(); + eventLoop.execute(() -> { + final long passedTimeNanos0 = System.nanoTime() - eventLoopStartTimeNanos; + final long timeoutNanos0 = Math.max(1, timeoutNanos - passedTimeNanos0); + setTimeoutNanosFromNow0(timeoutNanos0); + }); + } + } else { + final long pendingTaskRegisterTimeNanos = System.nanoTime(); + setPendingTimeoutNanos(timeoutNanos); + addPendingTask(() -> { + final long passedTimeNanos0 = System.nanoTime() - pendingTaskRegisterTimeNanos; + final long timeoutNanos0 = Math.max(1, timeoutNanos - passedTimeNanos0); + setTimeoutNanosFromNow0(timeoutNanos0); + }); + } + } + + private void setTimeoutNanosFromNow0(long newTimeoutNanos) { + assert newTimeoutNanos > 0; + assert eventLoop != null && eventLoop.inEventLoop() && task != null; + if (isFinishing() || !task.canSchedule()) { + return; + } + // Cancel the previously scheduled timeout, if exists. + clearTimeout0(true); + final long passedTimeNanos = System.nanoTime() - startTimeNanos; + timeoutNanos = LongMath.saturatedAdd(newTimeoutNanos, passedTimeNanos); + + state = State.SCHEDULED; + scheduledFuture = eventLoop.schedule(() -> invokeTask(null), newTimeoutNanos, NANOSECONDS); + } + + @Override + public void finishNow() { + finishNow(null); + } + + @Override + public void finishNow(@Nullable Throwable cause) { + if (isFinishing()) { + return; + } + if (isInitialized()) { + if (eventLoop.inEventLoop()) { + finishNow0(cause); + } else { + eventLoop.execute(() -> finishNow0(cause)); + } + } else { + addPendingTask(() -> finishNow0(cause)); + } + } + + private void finishNow0(@Nullable Throwable cause) { + assert eventLoop != null && eventLoop.inEventLoop() && task != null; + if (isFinishing() || !task.canSchedule()) { + return; + } + if (state == State.SCHEDULED) { + if (clearTimeout0(false)) { + invokeTask(cause); + } + } else { + invokeTask(cause); + } + } + + @Override + public boolean isFinished() { + return state == State.FINISHED; + } + + private boolean isFinishing() { + return state == State.FINISHED || state == State.FINISHING; + } + + @Override + @Nullable + public Throwable cause() { + return cause; + } + + @Override + public long timeoutNanos() { + return isInitialized() ? timeoutNanos : pendingTimeoutNanos; + } + + @Override + public long startTimeNanos() { + return startTimeNanos; + } + + @Override + public CompletableFuture whenCancelling() { + final CancellationFuture whenCancelling = this.whenCancelling; + if (whenCancelling != null) { + return whenCancelling; + } + final CancellationFuture cancellationFuture = new CancellationFuture(); + if (whenCancellingUpdater.compareAndSet(this, null, cancellationFuture)) { + return cancellationFuture; + } else { + return this.whenCancelling; + } + } + + @Override + public CompletableFuture whenCancelled() { + final CancellationFuture whenCancelled = this.whenCancelled; + if (whenCancelled != null) { + return whenCancelled; + } + final CancellationFuture cancellationFuture = new CancellationFuture(); + if (whenCancelledUpdater.compareAndSet(this, null, cancellationFuture)) { + return cancellationFuture; + } else { + return this.whenCancelled; + } + } + + @Override + @Deprecated + public CompletableFuture whenTimingOut() { + final TimeoutFuture whenTimingOut = this.whenTimingOut; + if (whenTimingOut != null) { + return whenTimingOut; + } + final TimeoutFuture timeoutFuture = new TimeoutFuture(); + if (whenTimingOutUpdater.compareAndSet(this, null, timeoutFuture)) { + whenCancelling().thenAccept(cause -> { + if (cause instanceof TimeoutException) { + timeoutFuture.doComplete(); + } + }); + return timeoutFuture; + } else { + return this.whenTimingOut; + } + } + + @Override + @Deprecated + public CompletableFuture whenTimedOut() { + final TimeoutFuture whenTimedOut = this.whenTimedOut; + if (whenTimedOut != null) { + return whenTimedOut; + } + final TimeoutFuture timeoutFuture = new TimeoutFuture(); + if (whenTimedOutUpdater.compareAndSet(this, null, timeoutFuture)) { + whenCancelled().thenAccept(cause -> { + if (cause instanceof TimeoutException) { + timeoutFuture.doComplete(); + } + }); + return timeoutFuture; + } else { + return this.whenTimedOut; + } + } + + @Override + @VisibleForTesting + public boolean isInitialized() { + return pendingTask == noopPendingTask && eventLoop != null; + } + + private void addPendingTask(Runnable pendingTask) { + if (!pendingTaskUpdater.compareAndSet(this, null, pendingTask)) { + for (;;) { + final Runnable oldPendingTask = this.pendingTask; + assert oldPendingTask != null; + if (oldPendingTask == noopPendingTask) { + assert eventLoop != null; + eventLoop.execute(pendingTask); + break; + } + final Runnable newPendingTask = () -> { + oldPendingTask.run(); + pendingTask.run(); + }; + if (pendingTaskUpdater.compareAndSet(this, oldPendingTask, newPendingTask)) { + break; + } + } + } + } + + private void setPendingTimeoutNanos(long pendingTimeoutNanos) { + for (;;) { + final long oldPendingTimeoutNanos = this.pendingTimeoutNanos; + if (pendingTimeoutNanosUpdater.compareAndSet(this, oldPendingTimeoutNanos, pendingTimeoutNanos)) { + break; + } + } + } + + private void addPendingTimeoutNanos(long pendingTimeoutNanos) { + for (;;) { + final long oldPendingTimeoutNanos = this.pendingTimeoutNanos; + final long newPendingTimeoutNanos = LongMath.saturatedAdd(oldPendingTimeoutNanos, + pendingTimeoutNanos); + if (pendingTimeoutNanosUpdater.compareAndSet(this, oldPendingTimeoutNanos, + newPendingTimeoutNanos)) { + break; + } + } + } + + private void invokeTask(@Nullable Throwable cause) { + if (task == null) { + return; + } + + if (cause == null) { + if (server) { + cause = RequestTimeoutException.get(); + } else { + cause = ResponseTimeoutException.get(); + } + } + + // Set FINISHING to preclude executing other timeout operations from the callbacks of `whenCancelling()` + state = State.FINISHING; + if (task.canSchedule()) { + ((CancellationFuture) whenCancelling()).doComplete(cause); + } + // Set state first to prevent duplicate execution + state = State.FINISHED; + + // The returned value of `canSchedule()` could've been changed by the callbacks of `whenCancelling()` + if (task.canSchedule()) { + task.run(cause); + } + this.cause = cause; + ((CancellationFuture) whenCancelled()).doComplete(cause); + } + + @VisibleForTesting + State state() { + return state; + } + + private static class CancellationFuture extends UnmodifiableFuture { + @Override + protected void doComplete(@Nullable Throwable cause) { + super.doComplete(cause); + } + } + + private static class TimeoutFuture extends UnmodifiableFuture { + void doComplete() { + doComplete(null); + } + } + + private static CancellationScheduler finished0(boolean server) { + final CancellationScheduler cancellationScheduler = CancellationScheduler.of(0); + cancellationScheduler + .init(ImmediateEventExecutor.INSTANCE, noopCancellationTask, 0, server); + cancellationScheduler.finishNow(); + return cancellationScheduler; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java b/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java new file mode 100644 index 00000000000..8a5f9f8d717 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/internal/common/NoopCancellationScheduler.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.common; + +import java.util.concurrent.CompletableFuture; + +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.util.TimeoutMode; +import com.linecorp.armeria.common.util.UnmodifiableFuture; + +import io.netty.util.concurrent.EventExecutor; + +final class NoopCancellationScheduler implements CancellationScheduler { + + static final CancellationScheduler INSTANCE = new NoopCancellationScheduler(); + + private static final CompletableFuture THROWABLE_FUTURE = + UnmodifiableFuture.wrap(new CompletableFuture<>()); + private static final CompletableFuture VOID_FUTURE = + UnmodifiableFuture.wrap(new CompletableFuture<>()); + + private NoopCancellationScheduler() { + } + + @Override + public void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) { + } + + @Override + public void clearTimeout() { + } + + @Override + public void clearTimeout(boolean resetTimeout) { + } + + @Override + public void setTimeoutNanos(TimeoutMode mode, long timeoutNanos) { + } + + @Override + public void finishNow() { + } + + @Override + public void finishNow(@Nullable Throwable cause) { + } + + @Override + public boolean isFinished() { + return false; + } + + @Override + @Nullable + public Throwable cause() { + return null; + } + + @Override + public long timeoutNanos() { + return 0; + } + + @Override + public long startTimeNanos() { + return 0; + } + + @Override + public CompletableFuture whenCancelling() { + return THROWABLE_FUTURE; + } + + @Override + public CompletableFuture whenCancelled() { + return THROWABLE_FUTURE; + } + + @Override + public CompletableFuture whenTimingOut() { + return VOID_FUTURE; + } + + @Override + public CompletableFuture whenTimedOut() { + return VOID_FUTURE; + } + + @Override + public boolean isInitialized() { + return false; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/server/DefaultServiceRequestContext.java b/core/src/main/java/com/linecorp/armeria/internal/server/DefaultServiceRequestContext.java index 59513b6f784..e1dec2e545f 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/server/DefaultServiceRequestContext.java +++ b/core/src/main/java/com/linecorp/armeria/internal/server/DefaultServiceRequestContext.java @@ -174,7 +174,7 @@ public DefaultServiceRequestContext( this.requestCancellationScheduler = requestCancellationScheduler; } else { this.requestCancellationScheduler = - new CancellationScheduler(TimeUnit.MILLISECONDS.toNanos(cfg.requestTimeoutMillis())); + CancellationScheduler.of(TimeUnit.MILLISECONDS.toNanos(cfg.requestTimeoutMillis())); } this.sslSession = sslSession; this.proxiedAddresses = requireNonNull(proxiedAddresses, "proxiedAddresses"); diff --git a/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContextBuilder.java b/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContextBuilder.java index 115c21ec731..7285401722e 100644 --- a/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContextBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/server/ServiceRequestContextBuilder.java @@ -15,6 +15,7 @@ */ package com.linecorp.armeria.server; +import static com.linecorp.armeria.internal.common.CancellationScheduler.noopCancellationTask; import static java.util.Objects.requireNonNull; import java.net.InetAddress; @@ -40,14 +41,12 @@ import com.linecorp.armeria.common.logging.RequestLogBuilder; import com.linecorp.armeria.common.util.SystemInfo; import com.linecorp.armeria.internal.common.CancellationScheduler; -import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask; import com.linecorp.armeria.internal.server.DefaultServiceRequestContext; import io.micrometer.core.instrument.MeterRegistry; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.EventLoop; -import io.netty.util.concurrent.ImmediateEventExecutor; /** * Builds a new {@link ServiceRequestContext}. Note that it is not usually required to create a new context by @@ -71,27 +70,6 @@ public void serverStarting(Server server) { } }; - private static final CancellationTask noopCancellationTask = new CancellationTask() { - @Override - public boolean canSchedule() { - return true; - } - - @Override - public void run(Throwable cause) { /* no-op */ } - }; - - /** - * A cancellation scheduler that has been finished. - */ - private static final CancellationScheduler noopRequestCancellationScheduler = new CancellationScheduler(0); - - static { - noopRequestCancellationScheduler.init(ImmediateEventExecutor.INSTANCE, noopCancellationTask, - 0, /* server */ true); - noopRequestCancellationScheduler.finishNow(); - } - private final List> serverConfigurators = new ArrayList<>(4); private HttpService service = fakeService; @@ -254,9 +232,9 @@ public ServiceRequestContext build() { final CancellationScheduler requestCancellationScheduler; if (timedOut()) { - requestCancellationScheduler = noopRequestCancellationScheduler; + requestCancellationScheduler = CancellationScheduler.finished(true); } else { - requestCancellationScheduler = new CancellationScheduler(0); + requestCancellationScheduler = CancellationScheduler.of(0); final CountDownLatch latch = new CountDownLatch(1); eventLoop().execute(() -> { requestCancellationScheduler.init(eventLoop(), noopCancellationTask, 0, /* server */ true); diff --git a/core/src/test/java/com/linecorp/armeria/internal/client/DefaultClientRequestContextTest.java b/core/src/test/java/com/linecorp/armeria/internal/client/DefaultClientRequestContextTest.java index ad0caae1917..fe9eba91a93 100644 --- a/core/src/test/java/com/linecorp/armeria/internal/client/DefaultClientRequestContextTest.java +++ b/core/src/test/java/com/linecorp/armeria/internal/client/DefaultClientRequestContextTest.java @@ -300,7 +300,7 @@ private static DefaultClientRequestContext newContext(ClientOptions clientOption return new DefaultClientRequestContext( mock(EventLoop.class), NoopMeterRegistry.get(), SessionProtocol.H2C, RequestId.random(), HttpMethod.POST, reqTarget, clientOptions, httpRequest, - null, RequestOptions.of(), new CancellationScheduler(0), System.nanoTime(), + null, RequestOptions.of(), CancellationScheduler.of(0), System.nanoTime(), SystemInfo.currentTimeMicros()); } diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/CancellationSchedulerTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/CancellationSchedulerTest.java index 21c7eb69fbe..dfa1c06a229 100644 --- a/core/src/test/java/com/linecorp/armeria/internal/common/CancellationSchedulerTest.java +++ b/core/src/test/java/com/linecorp/armeria/internal/common/CancellationSchedulerTest.java @@ -38,7 +38,6 @@ import com.linecorp.armeria.common.CommonPools; import com.linecorp.armeria.common.TimeoutException; import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask; -import com.linecorp.armeria.internal.common.CancellationScheduler.State; import com.linecorp.armeria.server.RequestTimeoutException; import io.netty.util.concurrent.EventExecutor; @@ -57,10 +56,11 @@ public boolean canSchedule() { public void run(Throwable cause) {} }; - private static void executeInEventLoop(long initTimeoutNanos, Consumer task) { + private static void executeInEventLoop(long initTimeoutNanos, + Consumer task) { final AtomicBoolean completed = new AtomicBoolean(); eventExecutor.execute(() -> { - final CancellationScheduler scheduler = new CancellationScheduler(0); + final DefaultCancellationScheduler scheduler = new DefaultCancellationScheduler(0); scheduler.init(eventExecutor, noopTask, initTimeoutNanos, true); task.accept(scheduler); completed.set(true); @@ -140,7 +140,7 @@ void cancelTimeoutAfterDeadline() { void cancelTimeoutBySettingTimeoutZero() { executeInEventLoop(1000, scheduler -> { scheduler.setTimeoutNanos(SET_FROM_START, 0); - assertThat(scheduler.state()).isEqualTo(State.INACTIVE); + assertThat(scheduler.state()).isEqualTo(CancellationScheduler.State.INACTIVE); }); } @@ -253,7 +253,7 @@ void whenTimingOutAndWhenTimedOut() { final AtomicBoolean completed = new AtomicBoolean(); final AtomicBoolean passed = new AtomicBoolean(); eventExecutor.execute(() -> { - final CancellationScheduler scheduler = new CancellationScheduler(0); + final DefaultCancellationScheduler scheduler = new DefaultCancellationScheduler(0); final CancellationTask task = new CancellationTask() { @Override public boolean canSchedule() { @@ -273,7 +273,7 @@ public void run(Throwable cause) { assertThat(scheduler.isFinished()).isFalse(); scheduler.setTimeoutNanos(SET_FROM_NOW, MILLISECONDS.toNanos(1000)); - assertThat(scheduler.state()).isEqualTo(State.SCHEDULED); + assertThat(scheduler.state()).isEqualTo(CancellationScheduler.State.SCHEDULED); schedulerRef.set(scheduler); whenTimedOutRef.set(scheduler.whenTimedOut()); @@ -319,7 +319,7 @@ void whenCancellingAndWhenCancelled(boolean server) { } eventExecutor.execute(() -> { - final CancellationScheduler scheduler = new CancellationScheduler(0); + final DefaultCancellationScheduler scheduler = new DefaultCancellationScheduler(0); final CancellationTask task = new CancellationTask() { @Override public boolean canSchedule() { @@ -339,7 +339,7 @@ public void run(Throwable cause) { assertThat(scheduler.isFinished()).isFalse(); scheduler.setTimeoutNanos(SET_FROM_NOW, MILLISECONDS.toNanos(1000)); - assertThat(scheduler.state()).isEqualTo(State.SCHEDULED); + assertThat(scheduler.state()).isEqualTo(CancellationScheduler.State.SCHEDULED); schedulerRef.set(scheduler); whenCancellingRef.set(scheduler.whenCancelling()); @@ -357,7 +357,7 @@ public void run(Throwable cause) { @Test void pendingTimeout() { - final CancellationScheduler scheduler = new CancellationScheduler(1000); + final CancellationScheduler scheduler = new DefaultCancellationScheduler(1000); scheduler.setTimeoutNanos(EXTEND, 1000); assertThat(scheduler.timeoutNanos()).isEqualTo(2000); scheduler.setTimeoutNanos(SET_FROM_NOW, 1000); @@ -376,28 +376,28 @@ void pendingTimeout() { void evaluatePendingTimeout() { final AtomicBoolean completed = new AtomicBoolean(); eventExecutor.execute(() -> { - CancellationScheduler scheduler = new CancellationScheduler(MILLISECONDS.toNanos(1000)); + CancellationScheduler scheduler = new DefaultCancellationScheduler(MILLISECONDS.toNanos(1000)); scheduler.setTimeoutNanos(EXTEND, MILLISECONDS.toNanos(1000)); scheduler.init(eventExecutor, noopTask, 0, false); assertThat(scheduler.timeoutNanos()).isEqualTo(MILLISECONDS.toNanos(2000)); - scheduler = new CancellationScheduler(MILLISECONDS.toNanos(1000)); + scheduler = new DefaultCancellationScheduler(MILLISECONDS.toNanos(1000)); scheduler.setTimeoutNanos(EXTEND, MILLISECONDS.toNanos(2000)); scheduler.setTimeoutNanos(SET_FROM_NOW, MILLISECONDS.toNanos(1000)); scheduler.init(eventExecutor, noopTask, 0, false); assertTimeoutWithTolerance(scheduler.timeoutNanos(), MILLISECONDS.toNanos(1000)); - scheduler = new CancellationScheduler(MILLISECONDS.toNanos(1000)); + scheduler = new DefaultCancellationScheduler(MILLISECONDS.toNanos(1000)); scheduler.clearTimeout(false); scheduler.init(eventExecutor, noopTask, 0, false); assertThat(scheduler.timeoutNanos()).isEqualTo(MILLISECONDS.toNanos(1000)); - scheduler = new CancellationScheduler(MILLISECONDS.toNanos(1000)); + scheduler = new DefaultCancellationScheduler(MILLISECONDS.toNanos(1000)); scheduler.clearTimeout(); scheduler.init(eventExecutor, noopTask, 0, false); assertThat(scheduler.timeoutNanos()).isZero(); - scheduler = new CancellationScheduler(MILLISECONDS.toNanos(1000)); + scheduler = new DefaultCancellationScheduler(MILLISECONDS.toNanos(1000)); scheduler.setTimeoutNanos(EXTEND, MILLISECONDS.toNanos(2000)); scheduler.setTimeoutNanos(SET_FROM_NOW, MILLISECONDS.toNanos(1000)); scheduler.setTimeoutNanos(SET_FROM_START, MILLISECONDS.toNanos(10000)); @@ -411,7 +411,7 @@ void evaluatePendingTimeout() { @Test void initializeOnlyOnce() { final AtomicBoolean completed = new AtomicBoolean(); - final CancellationScheduler scheduler = new CancellationScheduler(0); + final CancellationScheduler scheduler = new DefaultCancellationScheduler(0); eventExecutor.execute(() -> { scheduler.init(eventExecutor, noopTask, MILLISECONDS.toNanos(100), false); assertThat(scheduler.timeoutNanos()).isEqualTo(MILLISECONDS.toNanos(100)); @@ -427,7 +427,7 @@ void initializeOnlyOnce() { @Test void multiple_ClearTimeoutInWhenCancelling() { final AtomicBoolean completed = new AtomicBoolean(); - final CancellationScheduler scheduler = new CancellationScheduler(0); + final CancellationScheduler scheduler = new DefaultCancellationScheduler(0); scheduler.whenCancelling().thenRun(() -> { scheduler.clearTimeout(false); scheduler.clearTimeout(false);