Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClientRequestContext#cancel cancels the associated request immediately #5800

Merged
merged 11 commits into from
Aug 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

Expand Down Expand Up @@ -89,6 +91,7 @@ enum State {
private ScheduledFuture<?> timeoutFuture;
private State state = State.NEEDS_TO_WRITE_FIRST_HEADER;
private boolean loggedRequestFirstBytesTransferred;
private boolean failed;

AbstractHttpRequestHandler(Channel ch, ClientHttpObjectEncoder encoder, HttpResponseDecoder responseDecoder,
DecodedHttpResponse originalRes,
Expand Down Expand Up @@ -192,9 +195,30 @@ final boolean tryInitialize() {
() -> failAndReset(WriteTimeoutException.get()),
timeoutMillis, TimeUnit.MILLISECONDS);
}
final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
scheduler.updateTask(newCancellationTask());
}
if (ctx.isCancelled()) {
// The previous cancellation task wraps the cause with an UnprocessedRequestException
// so we return early
return false;
}
return true;
}

private CancellationTask newCancellationTask() {
return cause -> {
if (ch.eventLoop().inEventLoop()) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
failAndReset(cause);
}
} else {
ch.eventLoop().execute(() -> failAndReset(cause));
}
};
}

RequestHeaders mergedRequestHeaders(RequestHeaders headers) {
final HttpHeaders internalHeaders;
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
Expand Down Expand Up @@ -354,6 +378,10 @@ final void failRequest(Throwable cause) {
}

private void fail(Throwable cause) {
if (failed) {
return;
}
failed = true;
state = State.DONE;
cancel();
logBuilder.endRequest(cause);
Expand All @@ -368,9 +396,20 @@ private void fail(Throwable cause) {
logBuilder.endResponse(cause);
originalRes.close(cause);
}

final CancellationScheduler scheduler = cancellationScheduler();
if (scheduler != null) {
// best-effort attempt to cancel the scheduled timeout task so that RequestContext#cause
// isn't set unnecessarily
scheduler.cancelScheduled();
}
}

final void failAndReset(Throwable cause) {
if (failed) {
return;
}

if (cause instanceof WriteTimeoutException) {
final HttpSession session = HttpSession.get(ch);
// Mark the session as unhealthy so that subsequent requests do not use it.
Expand All @@ -394,7 +433,7 @@ final void failAndReset(Throwable cause) {
error = Http2Error.INTERNAL_ERROR;
}

if (error.code() != Http2Error.CANCEL.code()) {
if (error.code() != Http2Error.CANCEL.code() && cause != ctx.cancellationCause()) {
Exceptions.logIfUnexpected(logger, ch,
HttpSession.get(ch).protocol(),
"a request publisher raised an exception", cause);
Expand All @@ -415,4 +454,13 @@ final boolean cancelTimeout() {
this.timeoutFuture = null;
return timeoutFuture.cancel(false);
}

@Nullable
private CancellationScheduler cancellationScheduler() {
final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class);
if (ctxExt != null) {
return ctxExt.responseCancellationScheduler();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.ClientConnectionTimings;
import com.linecorp.armeria.common.logging.ClientConnectionTimingsBuilder;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.client.HttpSession;
import com.linecorp.armeria.internal.client.PooledChannel;
Expand Down Expand Up @@ -63,13 +63,13 @@ final class HttpClientDelegate implements HttpClient {
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
final Throwable throwable = ClientPendingThrowableUtil.pendingThrowable(ctx);
if (throwable != null) {
return earlyFailedResponse(throwable, ctx, req);
return earlyFailedResponse(throwable, ctx);
}
if (req != ctx.request()) {
return earlyFailedResponse(
new IllegalStateException("ctx.request() does not match the actual request; " +
"did you forget to call ctx.updateRequest() in your decorator?"),
ctx, req);
ctx);
}

final Endpoint endpoint = ctx.endpoint();
Expand All @@ -84,21 +84,27 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
// and response created here will be exposed only when `EndpointGroup.select()` returned `null`.
//
// See `DefaultClientRequestContext.init()` for more information.
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx, req);
return earlyFailedResponse(EmptyEndpointGroupException.get(ctx.endpointGroup()), ctx);
}

final SessionProtocol protocol = ctx.sessionProtocol();
final ProxyConfig proxyConfig;
try {
proxyConfig = getProxyConfig(protocol, endpoint);
} catch (Throwable t) {
return earlyFailedResponse(t, ctx, req);
return earlyFailedResponse(t, ctx);
}

final Throwable cancellationCause = ctx.cancellationCause();
if (cancellationCause != null) {
return earlyFailedResponse(cancellationCause, ctx);
}

final Endpoint endpointWithPort = endpoint.withDefaultPort(ctx.sessionProtocol());
final EventLoop eventLoop = ctx.eventLoop().withoutContext();
// TODO(ikhoon) Use ctx.exchangeType() to create an optimized HttpResponse for non-streaming response.
final DecodedHttpResponse res = new DecodedHttpResponse(eventLoop);
updateCancellationTask(ctx, req, res);

final ClientConnectionTimingsBuilder timingsBuilder = ClientConnectionTimings.builder();

Expand All @@ -115,14 +121,31 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex
acquireConnectionAndExecute(ctx, resolved, req, res, timingsBuilder, proxyConfig);
} else {
ctx.logBuilder().session(null, ctx.sessionProtocol(), timingsBuilder.build());
earlyFailedResponse(cause, ctx, req, res);
ctx.cancel(cause);
}
});
}

return res;
}

private static void updateCancellationTask(ClientRequestContext ctx, HttpRequest req,
DecodedHttpResponse res) {
final ClientRequestContextExtension ctxExt = ctx.as(ClientRequestContextExtension.class);
if (ctxExt == null) {
return;
}
ctxExt.responseCancellationScheduler().updateTask(cause -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggested removing updateTask() method, but that would result in too many changes to the existing code. As long as the implemetation is correct, I'm okay to maintain the current style and focus on implementing the response timeout mode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, updateTask() doesn't necessarily have to be removed - it can still be used internally since the old CancellationScheduler#start acted similarly to updateTask().

It is probably more important to unify and simplify the request/response abortion mechanism. This seems difficult to do at the moment for client-side. In particular, when one attempts this one should take note of 1) consistency between [req|res].cause and log.[req|res]Cause, 2) setting the correct exception type (e.g. UnprocessedRequestException).

try (SafeCloseable ignored = RequestContextUtil.pop()) {
final UnprocessedRequestException ure = UnprocessedRequestException.of(cause);
req.abort(ure);
ctx.logBuilder().endRequest(ure);
res.close(ure);
ctx.logBuilder().endResponse(ure);
}
});
}

private void resolveAddress(Endpoint endpoint, ClientRequestContext ctx,
BiConsumer<@Nullable Endpoint, @Nullable Throwable> onComplete) {

Expand Down Expand Up @@ -169,7 +192,7 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
try {
pool = factory.pool(ctx.eventLoop().withoutContext());
} catch (Throwable t) {
earlyFailedResponse(t, ctx, req, res);
ctx.cancel(t);
return;
}
final SessionProtocol protocol = ctx.sessionProtocol();
Expand All @@ -185,7 +208,7 @@ private void acquireConnectionAndExecute0(ClientRequestContext ctx, Endpoint end
if (cause == null) {
doExecute(newPooledChannel, ctx, req, res);
} else {
earlyFailedResponse(cause, ctx, req, res);
ctx.cancel(cause);
}
return null;
});
Expand Down Expand Up @@ -224,30 +247,12 @@ private static void logSession(ClientRequestContext ctx, @Nullable PooledChannel
}
}

private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req) {
private static HttpResponse earlyFailedResponse(Throwable t, ClientRequestContext ctx) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
handleEarlyRequestException(ctx, req, cause);
ctx.cancel(cause);
return HttpResponse.ofFailure(cause);
}

private static void earlyFailedResponse(Throwable t, ClientRequestContext ctx, HttpRequest req,
DecodedHttpResponse res) {
final UnprocessedRequestException cause = UnprocessedRequestException.of(t);
handleEarlyRequestException(ctx, req, cause);
res.close(cause);
}

private static void handleEarlyRequestException(ClientRequestContext ctx,
HttpRequest req, Throwable cause) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
req.abort(cause);
final RequestLogBuilder logBuilder = ctx.logBuilder();
logBuilder.endRequest(cause);
logBuilder.endResponse(cause);
ctx.cancel(cause);
}
}

private static void doExecute(PooledChannel pooledChannel, ClientRequestContext ctx,
HttpRequest req, DecodedHttpResponse res) {
final Channel channel = pooledChannel.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import com.linecorp.armeria.common.stream.StreamWriter;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.client.ClientRequestContextExtension;
import com.linecorp.armeria.internal.client.DecodedHttpResponse;
import com.linecorp.armeria.internal.common.CancellationScheduler;
import com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.channel.EventLoop;
Expand Down Expand Up @@ -213,7 +215,7 @@ void close(@Nullable Throwable cause, boolean cancel) {
}
done = true;
closed = true;
cancelTimeoutOrLog(cause, cancel);
cancelTimeoutAndLog(cause, cancel);
final HttpRequest request = ctx.request();
assert request != null;
if (cause != null) {
Expand Down Expand Up @@ -250,32 +252,24 @@ private void cancelAction(@Nullable Throwable cause) {
}
}

private void cancelTimeoutOrLog(@Nullable Throwable cause, boolean cancel) {
CancellationScheduler responseCancellationScheduler = null;
private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension != null) {
responseCancellationScheduler = ctxExtension.responseCancellationScheduler();
// best-effort attempt to cancel the scheduled timeout task so that RequestContext#cause
// isn't set unnecessarily
ctxExtension.responseCancellationScheduler().cancelScheduled();
}

if (responseCancellationScheduler == null || !responseCancellationScheduler.isFinished()) {
if (responseCancellationScheduler != null) {
responseCancellationScheduler.clearTimeout(false);
}
// There's no timeout or the response has not been timed out.
if (cancel) {
cancelAction(cause);
} else {
closeAction(cause);
}
if (cancel) {
cancelAction(cause);
return;
}
if (delegate.isOpen()) {
closeAction(cause);
}

// Response has been timed out already.
// Log only when it's not a ResponseTimeoutException.
if (cause instanceof ResponseTimeoutException) {
// the context has been cancelled either by timeout or by user invocation
if (cause == ctx.cancellationCause()) {
return;
}

Expand All @@ -299,7 +293,8 @@ void initTimeout() {
if (ctxExtension != null) {
final CancellationScheduler responseCancellationScheduler =
ctxExtension.responseCancellationScheduler();
responseCancellationScheduler.start(newCancellationTask());
responseCancellationScheduler.updateTask(newCancellationTask());
responseCancellationScheduler.start();
}
}

Expand All @@ -312,11 +307,13 @@ public boolean canSchedule() {

@Override
public void run(Throwable cause) {
delegate.close(cause);
final HttpRequest request = ctx.request();
assert request != null;
request.abort(cause);
ctx.logBuilder().endResponse(cause);
if (ctx.eventLoop().inEventLoop()) {
try (SafeCloseable ignored = RequestContextUtil.pop()) {
close(cause);
}
} else {
ctx.eventLoop().withoutContext().execute(() -> close(cause));
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ default void setRequestAutoAbortDelay(Duration delay) {

/**
* Returns the cause of cancellation, {@code null} if the request has not been cancelled.
* Note that there is no guarantee that the cancellation cause is equivalent to the cause of failure
* for {@link HttpRequest} or {@link HttpResponse}. Refer to {@link RequestLog#requestCause()}
* or {@link RequestLog#responseCause()} for the exact reason why a request or response failed.
*/
@Nullable
Throwable cancellationCause();
Expand Down
Loading
Loading