ClientRequestContext#cancel cancels the associated request immediately #5800

Merged
merged 11 commits into from
Aug 9, 2024
Original file line number Diff line number Diff line change
@@ -45,6 +45,8 @@
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

@@ -89,6 +91,7 @@ enum State {
private ScheduledFuture<?> timeoutFuture;
private State state = State.NEEDS_TO_WRITE_FIRST_HEADER;
private boolean loggedRequestFirstBytesTransferred;
private boolean failed;

AbstractHttpRequestHandler(Channel ch, ClientHttpObjectEncoder encoder, HttpResponseDecoder responseDecoder,
DecodedHttpResponse originalRes,
@@ -192,9 +195,30 @@ final boolean tryInitialize() {
() -> failAndReset(WriteTimeoutException.get()),
timeoutMillis, TimeUnit.MILLISECONDS);
}
final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
scheduler.updateTask(newCancellationTask());
}
if (ctx.isCancelled()) {
// The previous cancellation task wraps the cause with an UnprocessedRequestException
// so we return early
return false;
}
return true;
}

private CancellationTask newCancellationTask() {
return cause -> {
if (ch.eventLoop().inEventLoop()) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
failAndReset(cause);
}
} else {
ch.eventLoop().execute(() -> failAndReset(cause));
}
};
}

RequestHeaders mergedRequestHeaders(RequestHeaders headers) {
final HttpHeaders internalHeaders;
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
@@ -354,6 +378,10 @@ final void failRequest(Throwable cause) {
}

private void fail(Throwable cause) {
if (failed) {
return;
}
failed = true;
state = State.DONE;
cancel();
logBuilder.endRequest(cause);
@@ -368,9 +396,20 @@ private void fail(Throwable cause) {
logBuilder.endResponse(cause);
originalRes.close(cause);
}

final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
// best-effort attempt to cancel the scheduled timeout task so that RequestContext#cause
// isn't set unnecessarily
scheduler.cancelScheduled();
}
}

final void failAndReset(Throwable cause) {
if (failed) {
return;
}

if (cause instanceof WriteTimeoutException) {
final HttpSession session = HttpSession.get(ch);
// Mark the session as unhealthy so that subsequent requests do not use it.
@@ -394,7 +433,7 @@ final void failAndReset(Throwable cause) {
error = Http2Error.INTERNAL_ERROR;
}

if (error.code() != Http2Error.CANCEL.code()) {
if (error.code() != Http2Error.CANCEL.code() && cause != ctx.cancellationCause()) {
Exceptions.logIfUnexpected(logger, ch,
HttpSession.get(ch).protocol(),
"a request publisher raised an exception", cause);
@@ -415,4 +454,13 @@ final boolean cancelTimeout() {
this.timeoutFuture = null;
return timeoutFuture.cancel(false);
}

@Nullable
private CancellationScheduler cancellationScheduler() {
final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class);
if (ctxExt != null) {
return ctxExt.responseCancellationScheduler();
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -33,9 +33,9 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.ClientConnectionTimings;
import com.linecorp.armeria.common.logging.ClientConnectionTimingsBuilder;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.internal.client.PooledChannel;
@@ -63,13 +63,13 @@ final class HttpClientDelegate implements HttpClient {
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
final Throwable throwable = ClientPendingThrowableUtil.pendingThrowable(ctx);
if (throwable != null) {
return earlyFailedResponse(throwable, ctx, req);
return earlyFailedResponse(throwable, ctx);
}
if (req != ctx.request()) {
return earlyFailedResponse(
new IllegalStateException("ctx.request() does not match the actual request; " +
"did you forget to call ctx.updateRequest() in your decorator?"),
ctx, req);
ctx);
}

final Endpoint endpoint = ctx.endpoint();
@@ -84,21 +84,27 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
// and response created here will be exposed only when `EndpointGroup.select()` returned `null`.
//
// See `DefaultClientRequestContext.init()` for more information.
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx, req);
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx);
}

final SessionProtocol protocol = ctx.sessionProtocol();
final ProxyConfig proxyConfig;
try {
proxyConfig = getProxyConfig(protocol, endpoint);
} catch (Throwable t) {
return earlyFailedResponse(t, ctx, req);
return earlyFailedResponse(t, ctx);
}

final Throwable cancellationCause = ctx.cancellationCause();
if (cancellationCause != null) {
return earlyFailedResponse(cancellationCause, ctx);
}

final Endpoint endpointWithPort = endpoint.withDefaultPort(ctx.sessionProtocol());
final EventLoop eventLoop = ctx.eventLoop().withoutContext();
// TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
final DecodedHttpResponse res = new DecodedHttpResponse(eventLoop);
updateCancellationTask(ctx, req, res);

final ClientConnectionTimingsBuilder timingsBuilder = ClientConnectionTimings.builder();

@@ -115,14 +121,31 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
acquireConnectionAndExecute(ctx, resolved, req, res, timingsBuilder, proxyConfig);
} else {
ctx.logBuilder().session(null, ctx.sessionProtocol(), timingsBuilder.build());
earlyFailedResponse(cause, ctx, req, res);
ctx.cancel(cause);
}
});
}

return res;
}

private static void updateCancellationTask(ClientRequestContext ctx, HttpRequest req,
DecodedHttpResponse res) {
final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class);
if (ctxExt == null) {
return;
}
ctxExt.responseCancellationScheduler().updateTask(cause -> {
Contributor

I suggested removing the updateTask() method, but that would require too many changes to the existing code. As long as the implementation is correct, I'm okay with keeping the current style and focusing on implementing the response timeout mode.

Contributor Author

For the record, updateTask() doesn't necessarily have to be removed - it can still be used internally since the old CancellationScheduler#start acted similarly to updateTask().

It is probably more important to unify and simplify the request/response abortion mechanism. This seems difficult to do on the client side at the moment. In particular, anyone attempting this should take note of 1) consistency between [req|res].cause and log.[req|res]Cause, and 2) setting the correct exception type (e.g. UnprocessedRequestException).

try (SafeCloseable ignored = RequestContextUtil.pop()) {
final UnprocessedRequestException ure = UnprocessedRequestException.of(cause);
req.abort(ure);
ctx.logBuilder().endRequest(ure);
res.close(ure);
ctx.logBuilder().endResponse(ure);
}
});
}
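
To make the updateTask() discussion above more concrete, here is a minimal sketch of a scheduler whose cancellation task can be swapped as a request moves from one stage to the next. MiniCancellationScheduler, MiniCancellationDemo, and their methods are hypothetical names rather than Armeria's CancellationScheduler, and the rule that a task installed after cancellation runs immediately is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model; NOT Armeria's CancellationScheduler. */
final class MiniCancellationScheduler {
    private Consumer<Throwable> task = cause -> { };
    private Throwable cause; // non-null once cancelled

    /** Swaps the task. Assumption: if already cancelled, the new task runs right away. */
    synchronized void updateTask(Consumer<Throwable> newTask) {
        task = newTask;
        if (cause != null) {
            newTask.accept(cause);
        }
    }

    /** Cancels at most once, so the first cause wins and the task runs a single time. */
    synchronized boolean cancel(Throwable t) {
        if (cause != null) {
            return false;
        }
        cause = t;
        task.accept(t);
        return true;
    }

    synchronized Throwable cancellationCause() {
        return cause;
    }
}

final class MiniCancellationDemo {
    public static void main(String[] args) {
        final List<String> calls = new ArrayList<>();
        final MiniCancellationScheduler scheduler = new MiniCancellationScheduler();
        // Stage 1: no connection yet, so a cancellation only needs to fail the request early.
        scheduler.updateTask(c -> calls.add("early-fail: " + c.getMessage()));
        // Stage 2: a request handler exists, so a cancellation should reset the stream instead.
        scheduler.updateTask(c -> calls.add("fail-and-reset: " + c.getMessage()));
        scheduler.cancel(new IllegalStateException("user cancelled"));
        scheduler.cancel(new IllegalStateException("ignored")); // no effect, already cancelled
        System.out.println(calls); // prints [fail-and-reset: user cancelled]
    }
}
```

In this toy model a single cancel() call always reaches whichever task matches the current stage, which is roughly the property the PR relies on when both the delegate and the request handler call updateTask().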

private void resolveAddress(Endpoint endpoint, ClientRequestContext ctx,
BiConsumer<@Nullable Endpoint, @Nullable Throwable> onComplete) {

@@ -169,7 +192,7 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
try {
pool = factory.pool(ctx.eventLoop().withoutContext());
} catch (Throwable t) {
earlyFailedResponse(t, ctx, req, res);
ctx.cancel(t);
return;
}
final SessionProtocol protocol = ctx.sessionProtocol();
@@ -185,7 +208,7 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
if (cause == null) {
doExecute(newPooledChannel, ctx, req, res);
} else {
earlyFailedResponse(cause, ctx, req, res);
ctx.cancel(cause);
}
return null;
});
@@ -224,30 +247,12 @@ private static void logSession(ClientRequestContext ctx, @Nullable PooledChannel
}
}

private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req) {
private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
handleEarlyRequestException(ctx, req, cause);
ctx.cancel(cause);
return HttpResponse.ofFailure(cause);
}

private static void earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req,
DecodedHttpResponse res) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
handleEarlyRequestException(ctx, req, cause);
res.close(cause);
}

private static void handleEarlyRequestException(ClientRequestContext ctx,
HttpRequest req, Throwable cause) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
req.abort(cause);
final RequestLogBuilder logBuilder = ctx.logBuilder();
logBuilder.endRequest(cause);
logBuilder.endResponse(cause);
ctx.cancel(cause);
}
}

private static void doExecute(PooledChannel pooledChannel, ClientRequestContext ctx,
HttpRequest req, DecodedHttpResponse res) {
final Channel channel = pooledChannel.get();
Original file line number Diff line number Diff line change
@@ -37,10 +37,12 @@
import com.linecorp.armeria.common.stream.StreamWriter;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.channel.EventLoop;
@@ -213,7 +215,7 @@ void close(@Nullable Throwable cause, boolean cancel) {
}
done = true;
closed = true;
cancelTimeoutOrLog(cause, cancel);
cancelTimeoutAndLog(cause, cancel);
final HttpRequest request = ctx.request();
assert request != null;
if (cause != null) {
@@ -250,32 +252,24 @@ private void cancelAction(@Nullable Throwable cause) {
}
}

private void cancelTimeoutOrLog(@Nullable Throwable cause, boolean cancel) {
CancellationScheduler responseCancellationScheduler = null;
private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension != null) {
responseCancellationScheduler = ctxExtension.responseCancellationScheduler();
// best-effort attempt to cancel the scheduled timeout task so that RequestContext#cause
// isn't set unnecessarily
ctxExtension.responseCancellationScheduler().cancelScheduled();
}

if (responseCancellationScheduler == null || !responseCancellationScheduler.isFinished()) {
if (responseCancellationScheduler != null) {
responseCancellationScheduler.clearTimeout(false);
}
// There's no timeout or the response has not been timed out.
if (cancel) {
cancelAction(cause);
} else {
closeAction(cause);
}
if (cancel) {
cancelAction(cause);
return;
}
if (delegate.isOpen()) {
closeAction(cause);
}

// Response has been timed out already.
// Log only when it's not a ResponseTimeoutException.
if (cause instanceof ResponseTimeoutException) {
// the context has been cancelled either by timeout or by user invocation
if (cause == ctx.cancellationCause()) {
return;
}

@@ -299,7 +293,8 @@ void initTimeout() {
if (ctxExtension != null) {
final CancellationScheduler responseCancellationScheduler =
ctxExtension.responseCancellationScheduler();
responseCancellationScheduler.start(newCancellationTask());
responseCancellationScheduler.updateTask(newCancellationTask());
responseCancellationScheduler.start();
}
}

@@ -312,11 +307,13 @@ public boolean canSchedule() {

@Override
public void run(Throwable cause) {
delegate.close(cause);
final HttpRequest request = ctx.request();
assert request != null;
request.abort(cause);
ctx.logBuilder().endResponse(cause);
if (ctx.eventLoop().inEventLoop()) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
close(cause);
}
} else {
ctx.eventLoop().withoutContext().execute(() -> close(cause));
}
}
};
}
Original file line number Diff line number Diff line change
@@ -467,6 +467,9 @@ default void setRequestAutoAbortDelay(Duration delay) {

/**
* Returns the cause of cancellation, {@code null} if the request has not been cancelled.
* Note that there is no guarantee that the cancellation cause is equivalent to the cause of failure
* for {@link HttpRequest} or {@link HttpResponse}. Refer to {@link RequestLog#requestCause()}
* or {@link RequestLog#responseCause()} for the exact reason why a request or response failed.
*/
@Nullable
Throwable cancellationCause();
Original file line number Diff line number Diff line change
@@ -428,7 +428,8 @@ private void updateFlags(int flags) {
private static void completeSatisfiedFutures(RequestLogFuture[] satisfiedFutures, RequestLog log,
RequestContext ctx) {
if (!ctx.eventLoop().inEventLoop()) {
ctx.eventLoop().execute(() -> completeSatisfiedFutures(satisfiedFutures, log, ctx));
ctx.eventLoop().withoutContext().execute(
() -> completeSatisfiedFutures(satisfiedFutures, log, ctx));
Contributor

Q) Why was the context removed? Shouldn't the request context be propagated to the callbacks of satisfiedFutures?

Contributor Author

It seemed like we were making an effort to clear the request context when completing logs.

e.g.

try (SafeCloseable ignored = RequestContextUtil.pop()) {
    req.abort(cause);
    final RequestLogBuilder logBuilder = ctx.logBuilder();
    logBuilder.endRequest(cause);
    logBuilder.endResponse(cause);
}

I saw ContextCancellationTest failing occasionally due to this, so I changed the call to use withoutContext().

Contributor

It was an unavoidable choice: Netty can handle pending messages in batch mode that can cause a request context leak.

If an action is not performed sequentially within the callbacks of Netty operations, I think a context-aware executor is generally preferred.

I don't have a strong opinion here. If you have any other workarounds, it is okay to keep it.
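
As a rough illustration of the two options weighed in this comment, the sketch below contrasts a plain executor with a context-aware one. It uses a bare ThreadLocal<String> as a stand-in for the request context; ContextAwareExecutorSketch, CURRENT, and contextAware() are hypothetical names for this example only and do not reflect how Armeria implements ctx.eventLoop() or withoutContext().

```java
import java.util.concurrent.Executor;

/** Toy model of a context-propagating executor; all names here are hypothetical. */
final class ContextAwareExecutorSketch {
    // Stand-in for the thread-local "current request context".
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Wraps a delegate so that every task runs with the given context installed. */
    static Executor contextAware(Executor delegate, String ctx) {
        return task -> delegate.execute(() -> {
            final String previous = CURRENT.get();
            CURRENT.set(ctx);
            try {
                task.run();
            } finally {
                // Restore whatever was there before so the context does not leak.
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        });
    }

    public static void main(String[] args) {
        final Executor plain = Runnable::run; // runs tasks on the calling thread
        plain.execute(() -> System.out.println("plain: " + CURRENT.get()));
        contextAware(plain, "ctx_1")
                .execute(() -> System.out.println("context-aware: " + CURRENT.get()));
        System.out.println("afterwards: " + CURRENT.get());
    }
}
```

With the plain executor the callback sees no context at all, while the wrapped executor installs the context only for the duration of the task and restores the previous state afterwards.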

Contributor Author
@jrhee17, Jul 24, 2024

> Netty can handle pending messages in batch mode that can cause a request context leak

I see, I think I misunderstood the intention then. My original guess was that if request logs for multiple requests are waited on a single condition, users would easily see a request context leak.

I'm not sure I understood what you mean by batch mode though. I thought channels are tied to a single thread, and within a single thread as long as the try...close syntax is used context leaks weren't possible.

Contributor
@ikhoon, Jul 30, 2024

> try...close syntax is used context leaks weren't possible.

Context leaks could happen even with try...close.
"if request logs for multiple requests are waited on a single condition" seems to mean the same thing, but I'll explain it just in case.

Say we have a simple service that adds a callback to response.whenComplete().

try (SafeCloseable ignored = ctx.push()) {
    final HttpResponseWriter response = HttpResponse.streaming();
    response.write(...);
    response.whenComplete().thenAccept(unused -> {
        // Do something else.
    });
}
  • The code pushes ctx temporarily using a try-with-resources block.
  • response.whenComplete() may be completed when WriteFutureListener.operationComplete() is invoked if it is a unary type.
    future.addListener(new WriteFutureListener(response.content().isEmpty(), cause));
  • A Netty Channel stores write requests in a ChannelOutboundBuffer.
  • Suppose the Channel is busy. The response.write() calls for request_1 to request_9 are added to the ChannelOutboundBuffer, and operationComplete() isn't invoked for them.
  • When response.write() for request_10 is written, Channel can flush the outbound data so request_1 to request_10 are flushed in the same call (task).
  • Without try (SafeCloseable ignored = RequestContextUtil.pop()), the callbacks of request_1-9.whenComplete() will see the context of request_10.

However, it would be impossible to leak the context with ctx.eventLoop().execute(() -> completeSatisfiedFutures()) since it is executed asynchronously. So I thought it was safe to remove .withoutContext().

> It seemed like we were making an effort to clear the request context when completing logs.

As you said, contexts are not propagated in some cases. I agree with keeping the current style in terms of consistency.
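
The batched-flush scenario described in this comment can be reproduced with a toy model. The sketch below uses a ThreadLocal<String> in place of the real request context and a simple queue in place of ChannelOutboundBuffer; ContextLeakDemo, CURRENT, and simulate() are hypothetical names, and nothing here calls Netty or Armeria.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a thread-local context and a batched flush; all names are hypothetical. */
final class ContextLeakDemo {
    // Stand-in for the thread-local "current request context".
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Returns what each buffered completion callback observed as the current context. */
    static List<String> simulate(boolean popBeforeCallbacks) {
        final List<String> observed = new ArrayList<>();
        final Deque<Runnable> pending = new ArrayDeque<>();
        // request_1..request_9: the channel is busy, so their callbacks are only buffered.
        for (int i = 1; i <= 9; i++) {
            final int id = i;
            pending.add(() -> observed.add("request_" + id + " saw " + CURRENT.get()));
        }
        // request_10 pushes its own context and its write triggers one flush for everything.
        CURRENT.set("ctx_10");
        try {
            if (popBeforeCallbacks) {
                final String saved = CURRENT.get();
                CURRENT.remove(); // analogous to popping the context before callbacks
                try {
                    pending.forEach(Runnable::run);
                } finally {
                    CURRENT.set(saved);
                }
            } else {
                pending.forEach(Runnable::run);
            }
        } finally {
            CURRENT.remove();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false).get(0)); // prints "request_1 saw ctx_10"
        System.out.println(simulate(true).get(0));  // prints "request_1 saw null"
    }
}
```

The first call shows the leak: callbacks that belong to earlier requests run while the context of request_10 is still installed. Clearing the context around the callbacks, as the second call does, avoids it.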

Contributor Author

Thanks for the explanation 👍

> When response.write() for request_10 is written, Channel can flush the outbound data so request_1 to request_10 are flushed in the same call (task).

I understood that even if request_1 to request_10 are flushed together in a single task, they will be executed sequentially.
e.g.

ClientRequestContext ctx1 = null;
ClientRequestContext ctx2 = null;
// represents a single task
CommonPools.workerGroup().execute(() -> {
    // request 1
    try (SafeCloseable sf = ctx1.push()) {
        ctx1.logBuilder().endResponse();   
    }
    // request 2
    try (SafeCloseable sf = ctx2.push()) {
        ctx2.logBuilder().endResponse();
    }
});

> I agree with keeping the current style in terms of consistency.

👍 for me as well. Ultimately, my thought was that it depends more on how users are using the API - for this reason it is probably more important that the context is pushed (or not) consistently.

e.g. I can imagine the following scenario

HttpResponseWriter res1 = null;
HttpResponseWriter res2 = null;
CompletableFuture.allOf(ctx1.log().whenComplete(), ctx2.log().whenComplete())
                 .handle((v, t) -> {
                     res1.close();
                     res2.close();
                     // when the res is completed, the same thread may call ctx.push somewhere
                     return null;
                 });

Member

Sorry for chiming in late.
One thing we need to keep in mind is that if this method is initially called by the ctx.eventLoop(), it might have the context in the thread-local.
Could you let me know which test fails in ContextCancellationTest?

Contributor Author

I didn't expect this change to be so controversial 😅 Let me just revert it, since it isn't really critical to this PR.

return;
}
for (RequestLogFuture f : satisfiedFutures) {