Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CancellationScheduler finishes even if not started #5212

Merged
merged 24 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLSession;

Expand Down Expand Up @@ -116,17 +114,8 @@ public ClientRequestContext build() {
if (timedOut()) {
responseCancellationScheduler = CancellationScheduler.finished(false);
} else {
responseCancellationScheduler = CancellationScheduler.of(0);
final CountDownLatch latch = new CountDownLatch(1);
eventLoop().execute(() -> {
responseCancellationScheduler.init(eventLoop(), noopCancellationTask, 0, /* server */ false);
latch.countDown();
});

try {
latch.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
responseCancellationScheduler = CancellationScheduler.ofClient(0);
responseCancellationScheduler.initAndStart(eventLoop(), noopCancellationTask);
}

final DefaultClientRequestContext ctx = new DefaultClientRequestContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,7 @@ void initTimeout() {
if (ctxExtension != null) {
final CancellationScheduler responseCancellationScheduler =
ctxExtension.responseCancellationScheduler();
responseCancellationScheduler.init(
ctx.eventLoop(), newCancellationTask(),
TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis), /* server */ false);
responseCancellationScheduler.start(newCancellationTask());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,15 @@ private static SessionProtocol desiredSessionProtocol(SessionProtocol protocol,
* e.g. {@code System.currentTimeMillis() * 1000}.
*/
public DefaultClientRequestContext(
@Nullable EventLoop eventLoop, MeterRegistry meterRegistry, SessionProtocol sessionProtocol,
EventLoop eventLoop, MeterRegistry meterRegistry, SessionProtocol sessionProtocol,
RequestId id, HttpMethod method, RequestTarget reqTarget,
ClientOptions options, @Nullable HttpRequest req, @Nullable RpcRequest rpcReq,
RequestOptions requestOptions, CancellationScheduler responseCancellationScheduler,
long requestStartTimeNanos, long requestStartTimeMicros) {
this(eventLoop, meterRegistry, sessionProtocol,
this(requireNonNull(eventLoop, "eventLoop"), meterRegistry, sessionProtocol,
id, method, reqTarget, options, req, rpcReq, requestOptions, serviceRequestContext(),
responseCancellationScheduler, requestStartTimeNanos, requestStartTimeMicros);
requireNonNull(responseCancellationScheduler, "responseCancellationScheduler"),
requestStartTimeNanos, requestStartTimeMicros);
}

/**
Expand Down Expand Up @@ -226,6 +227,9 @@ private DefaultClientRequestContext(
firstNonNull(requestOptions.exchangeType(), ExchangeType.BIDI_STREAMING),
requestAutoAbortDelayMillis(options, requestOptions), req, rpcReq,
getAttributes(root), options.contextHook());
assert (eventLoop == null && responseCancellationScheduler == null) ||
(eventLoop != null && responseCancellationScheduler != null)
: "'eventLoop' and 'responseCancellationScheduler' should be both null or non-null";

this.eventLoop = eventLoop;
this.options = requireNonNull(options, "options");
Expand All @@ -240,7 +244,8 @@ private DefaultClientRequestContext(
responseTimeoutMillis = options().responseTimeoutMillis();
}
this.responseCancellationScheduler =
CancellationScheduler.of(TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis));
CancellationScheduler.ofClient(TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis));
// the cancellationScheduler is not initialized here since the eventLoop is guaranteed to be null
} else {
this.responseCancellationScheduler = responseCancellationScheduler;
}
Expand Down Expand Up @@ -427,6 +432,7 @@ private void acquireEventLoop(EndpointGroup endpointGroup) {
options().factory().acquireEventLoop(sessionProtocol(), endpointGroup, endpoint);
eventLoop = releasableEventLoop.get();
log.whenComplete().thenAccept(unused -> releasableEventLoop.release());
responseCancellationScheduler.init(eventLoop());
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -517,7 +523,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx,
log = RequestLog.builder(this);
log.startRequest();
responseCancellationScheduler =
CancellationScheduler.of(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis()));
CancellationScheduler.ofClient(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis()));
writeTimeoutMillis = ctx.writeTimeoutMillis();
maxResponseLength = ctx.maxResponseLength();

Expand All @@ -534,6 +540,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx,
// the root context.
if (endpoint == null || ctx.endpoint() == endpoint && ctx.log.children().isEmpty()) {
eventLoop = ctx.eventLoop().withoutContext();
responseCancellationScheduler.init(eventLoop());
} else {
acquireEventLoop(endpoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@

import java.util.concurrent.CompletableFuture;

import com.google.common.annotations.VisibleForTesting;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.TimeoutMode;

import io.netty.util.concurrent.EventExecutor;

public interface CancellationScheduler {

static CancellationScheduler of(long timeoutNanos) {
return new DefaultCancellationScheduler(timeoutNanos);
static CancellationScheduler ofClient(long timeoutNanos) {
return new DefaultCancellationScheduler(timeoutNanos, false);
}

static CancellationScheduler ofServer(long timeoutNanos) {
return new DefaultCancellationScheduler(timeoutNanos, true);
}

/**
Expand Down Expand Up @@ -59,7 +61,11 @@ public boolean canSchedule() {
public void run(Throwable cause) { /* no-op */ }
};

void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server);
void initAndStart(EventExecutor eventLoop, CancellationTask task);

void init(EventExecutor eventLoop);

void start(CancellationTask task);

void clearTimeout();

Expand Down Expand Up @@ -89,9 +95,6 @@ public void run(Throwable cause) { /* no-op */ }
@Deprecated
CompletableFuture<Void> whenTimedOut();

@VisibleForTesting
boolean isInitialized();

enum State {
INIT,
INACTIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ final class DefaultCancellationScheduler implements CancellationScheduler {
private static final AtomicLongFieldUpdater<DefaultCancellationScheduler> pendingTimeoutNanosUpdater =
AtomicLongFieldUpdater.newUpdater(DefaultCancellationScheduler.class, "pendingTimeoutNanos");

private static final AtomicReferenceFieldUpdater<DefaultCancellationScheduler, EventExecutor>
eventLoopUpdater = AtomicReferenceFieldUpdater.newUpdater(
DefaultCancellationScheduler.class, EventExecutor.class, "eventLoop");

private static final Runnable noopPendingTask = () -> {
};

Expand All @@ -74,7 +78,7 @@ final class DefaultCancellationScheduler implements CancellationScheduler {
private long timeoutNanos;
private long startTimeNanos;
@Nullable
private EventExecutor eventLoop;
private volatile EventExecutor eventLoop;
@Nullable
private CancellationTask task;
@Nullable
Expand All @@ -91,42 +95,61 @@ final class DefaultCancellationScheduler implements CancellationScheduler {
private volatile TimeoutFuture whenTimedOut;
@SuppressWarnings("FieldMayBeFinal")
private volatile long pendingTimeoutNanos;
private boolean server;
private final boolean server;
@Nullable
private Throwable cause;

@VisibleForTesting
DefaultCancellationScheduler(long timeoutNanos) {
this(timeoutNanos, true);
}

DefaultCancellationScheduler(long timeoutNanos, boolean server) {
this.timeoutNanos = timeoutNanos;
pendingTimeoutNanos = timeoutNanos;
this.server = server;
}

/**
* Initializes this {@link DefaultCancellationScheduler}.
*/
@Override
public void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) {
public void initAndStart(EventExecutor eventLoop, CancellationTask task) {
init(eventLoop);
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> init0(eventLoop, task, timeoutNanos, server));
eventLoop.execute(() -> start(task));
} else {
init0(eventLoop, task, timeoutNanos, server);
start(task);
}
}

@Override
public void init(EventExecutor eventLoop) {
if (!eventLoopUpdater.compareAndSet(this, null, eventLoop)) {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException("Can't init() more than once");
}
}

private void init0(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) {
if (state != State.INIT) {
@Override
public void start(CancellationTask task) {
assert eventLoop != null;
assert eventLoop.inEventLoop();
if (isFinished()) {
assert cause != null;
task.run(cause);
return;
}
this.eventLoop = eventLoop;
this.task = task;
if (timeoutNanos > 0) {
this.timeoutNanos = timeoutNanos;
if (this.task != null) {
// just replace the task
this.task = task;
return;
}
this.server = server;
this.task = task;
startTimeNanos = System.nanoTime();
if (this.timeoutNanos != 0) {
if (timeoutNanos != 0) {
state = State.SCHEDULED;
scheduledFuture =
eventLoop.schedule(() -> invokeTask(null), this.timeoutNanos, NANOSECONDS);
eventLoop.schedule(() -> invokeTask(null), timeoutNanos, NANOSECONDS);
} else {
state = State.INACTIVE;
}
Expand Down Expand Up @@ -312,14 +335,16 @@ public void finishNow(@Nullable Throwable cause) {
if (isFinishing()) {
return;
}
assert eventLoop != null;
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> finishNow(cause));
return;
}
if (isInitialized()) {
if (eventLoop.inEventLoop()) {
finishNow0(cause);
} else {
eventLoop.execute(() -> finishNow0(cause));
}
finishNow0(cause);
} else {
addPendingTask(() -> finishNow0(cause));
start(noopCancellationTask);
finishNow0(cause);
Copy link
Contributor

@ikhoon ikhoon Jan 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctx.cancel() called in a decorator before invoking delegate.execute() couldn't prevent the request headers from being written.
Would we abort the request in the following points if ctx.isCanceled() is true?

final Throwable throwable = ClientPendingThrowableUtil.pendingThrowable(ctx);

if (handleEarlyCancellation(ctx, req, res)) {

Copy link
Contributor Author

@jrhee17 jrhee17 Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not decided on how we want to set up the dependency between log, HttpResponse, and ctx yet but I think this is one possibility.

I think overall we want to

  1. Hook callbacks so that there is a dependency between the above three states
  2. Check whether a single state is completed or not

For now, I'm imagining that:

  1. If ctx is cancelled, then the corresponding HttpResponse is cancelled
  2. We check for res.isDone overall for early cancellation

But I will have to explore this further. For now, I think this PR doesn't change the previous behavior so it should probably be safe to merge.

Do you prefer this be done in this PR as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I'm imagining that:

Sounds good.

The case where ctx.cancel() is called before a normal CancellationTask is registered can be considered a separate issue.

}
}

Expand Down Expand Up @@ -430,9 +455,7 @@ public CompletableFuture<Void> whenTimedOut() {
}
}

@Override
@VisibleForTesting
public boolean isInitialized() {
private boolean isInitialized() {
return pendingTask == noopPendingTask && eventLoop != null;
}

Expand Down Expand Up @@ -531,9 +554,8 @@ void doComplete() {
}

private static CancellationScheduler finished0(boolean server) {
final CancellationScheduler cancellationScheduler = CancellationScheduler.of(0);
cancellationScheduler
.init(ImmediateEventExecutor.INSTANCE, noopCancellationTask, 0, server);
final CancellationScheduler cancellationScheduler = new DefaultCancellationScheduler(0, server);
cancellationScheduler.initAndStart(ImmediateEventExecutor.INSTANCE, noopCancellationTask);
cancellationScheduler.finishNow();
return cancellationScheduler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ private NoopCancellationScheduler() {
}

@Override
public void init(EventExecutor eventLoop, CancellationTask task, long timeoutNanos, boolean server) {
public void initAndStart(EventExecutor eventLoop, CancellationTask task) {
}

@Override
public void init(EventExecutor eventLoop) {
}

@Override
public void start(CancellationTask task) {
}

@Override
Expand Down Expand Up @@ -100,9 +108,4 @@ public CompletableFuture<Void> whenTimingOut() {
public CompletableFuture<Void> whenTimedOut() {
return VOID_FUTURE;
}

@Override
public boolean isInitialized() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public DefaultServiceRequestContext(
this.requestCancellationScheduler = requestCancellationScheduler;
} else {
this.requestCancellationScheduler =
CancellationScheduler.of(TimeUnit.MILLISECONDS.toNanos(cfg.requestTimeoutMillis()));
CancellationScheduler.ofServer(TimeUnit.MILLISECONDS.toNanos(cfg.requestTimeoutMillis()));
this.requestCancellationScheduler.init(eventLoop());
}
this.sslSession = sslSession;
this.proxiedAddresses = requireNonNull(proxiedAddresses, "proxiedAddresses");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ final void maybeWriteAccessLog() {
*/
final void scheduleTimeout() {
// Schedule the initial request timeout with the timeoutNanos in the CancellationScheduler
reqCtx.requestCancellationScheduler().init(reqCtx.eventLoop(), newCancellationTask(),
0, /* server */ true);
reqCtx.requestCancellationScheduler().start(newCancellationTask());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.net.ssl.SSLSession;
Expand Down Expand Up @@ -234,17 +232,8 @@ public ServiceRequestContext build() {
if (timedOut()) {
requestCancellationScheduler = CancellationScheduler.finished(true);
} else {
requestCancellationScheduler = CancellationScheduler.of(0);
final CountDownLatch latch = new CountDownLatch(1);
eventLoop().execute(() -> {
requestCancellationScheduler.init(eventLoop(), noopCancellationTask, 0, /* server */ true);
latch.countDown();
});

try {
latch.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
requestCancellationScheduler = CancellationScheduler.ofServer(0);
requestCancellationScheduler.initAndStart(eventLoop(), noopCancellationTask);
}

// Build the context with the properties set by a user and the fake objects.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ private static DefaultClientRequestContext newContext(ClientOptions clientOption
return new DefaultClientRequestContext(
mock(EventLoop.class), NoopMeterRegistry.get(), SessionProtocol.H2C,
RequestId.random(), HttpMethod.POST, reqTarget, clientOptions, httpRequest,
null, RequestOptions.of(), CancellationScheduler.of(0), System.nanoTime(),
null, RequestOptions.of(), CancellationScheduler.ofClient(0), System.nanoTime(),
SystemInfo.currentTimeMicros());
}

Expand Down
Loading