Skip to content

Commit

Permalink
Fix a bug where RequestLog does not complete when HttpResponse is…
Browse files Browse the repository at this point in the history
… cancelled (#5312)

Motivation:

An `HttpResponse` is cancelled if either the `ResponseHeaders` is `END_OF_STREAM` or the content is expected to be empty. https://github.com/line/armeria/blob/9ddd6c1b76fdc0fe8943dd417e80773cb92eebdc/core/src/main/java/com/linecorp/armeria/server/HttpResponseSubscriber.java#L66-L67

If `SubscriptionOption.NOTIFY_CANCELLATION` is set when subscribing, `CancellationSubscripitonException` should be propagated through `Subscriber.onError()`. However, there was a bug in `FuseableStreamMessage` where this error was not propagated. `onError()` signal is ignored if the `StreamMessage` has been canceled.
https://github.com/line/armeria/blob/756c17797f95c1ddc7afbb8038ad046c080ba605/core/src/main/java/com/linecorp/armeria/common/stream/FuseableStreamMessage.java#L297-L300

Apart from that, there are other problems. `CancellationSubscripitonException` may not be delivered to `Subscriber.onError()` even though `SubscriptionOption.NOTIFY_CANCELLATION` is set if a `StreamMessage` has already published all elements. `Subscription.cancel()` event will be silently ignored.
https://github.com/line/armeria/blob/871d87297e4d051241589cb1ae95641cbc83f880/core/src/main/java/com/linecorp/armeria/internal/common/stream/OneElementFixedStreamMessage.java#L92-L95

Modifications:

- Intercept using `Subscription.cancel()` in `FilteredStreamMessage` and complete `FilteredStreamMessage.whenComplete()` with `CancelledSubscriptionException` instead of relying on `NOTIFY_CANCELLATION` option.
- Fix `FuseableStreamMessage` to notify cancellation event via `Subscriber.onError()` when `NOTIFY_CANCELLATION` option is set.

Result:

Fix a bug where `RequestLog` may be not completed when an `HttpResponse` only has `ResponsesHeaders`.
  • Loading branch information
ikhoon authored Dec 8, 2023
1 parent a0197b5 commit 4d29604
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.linecorp.armeria.common.stream;

import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.CANCELLATION_AND_POOLED_OPTIONS;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.CANCELLATION_OPTION;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.EMPTY_OPTIONS;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.POOLED_OBJECTS;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsNotifyCancellation;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsWithPooledObjects;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.toSubscriptionOptions;
import static java.util.Objects.requireNonNull;

import java.util.List;
Expand Down Expand Up @@ -227,15 +226,12 @@ public final void subscribe(Subscriber<? super U> subscriber, EventExecutor exec
}

private void subscribe(Subscriber<? super U> subscriber, EventExecutor executor,
boolean withPooledObjects,
boolean notifyCancellation) {
boolean withPooledObjects, boolean notifyCancellation) {
final FilteringSubscriber filteringSubscriber = new FilteringSubscriber(
subscriber, withPooledObjects, notifyCancellation);
if (filterSupportsPooledObjects) {
upstream.subscribe(filteringSubscriber, executor, CANCELLATION_AND_POOLED_OPTIONS);
} else {
upstream.subscribe(filteringSubscriber, executor, CANCELLATION_OPTION);
}
subscriber, withPooledObjects);
final SubscriptionOption[] options = toSubscriptionOptions(filterSupportsPooledObjects,
notifyCancellation);
upstream.subscribe(filteringSubscriber, executor, options);
}

@Override
Expand All @@ -253,36 +249,47 @@ public final void abort(Throwable cause) {
upstream.abort(requireNonNull(cause, "cause"));
}

private final class FilteringSubscriber implements Subscriber<T> {
private final class FilteringSubscriber implements Subscription, Subscriber<T> {

private final Subscriber<? super U> delegate;
private final boolean subscribedWithPooledObjects;
private final boolean notifyCancellation;

private boolean completed;
@Nullable
private Subscription upstream;

FilteringSubscriber(Subscriber<? super U> delegate, boolean subscribedWithPooledObjects,
boolean notifyCancellation) {
FilteringSubscriber(Subscriber<? super U> delegate, boolean subscribedWithPooledObjects) {
this.delegate = requireNonNull(delegate, "delegate");
this.subscribedWithPooledObjects = subscribedWithPooledObjects;
this.notifyCancellation = notifyCancellation;
}

@Override
public void request(long n) {
assert upstream != null;
upstream.request(n);
}

@Override
public void cancel() {
assert upstream != null;
onCancellation(delegate);
completionFuture.completeExceptionally(CancelledSubscriptionException.get());
upstream.cancel();
}

@Override
public void onSubscribe(Subscription s) {
upstream = s;
try {
beforeSubscribe(delegate, s);
beforeSubscribe(delegate, this);
} catch (Throwable ex) {
s.cancel();
logger.warn("Unexpected exception from {}#beforeSubscribe()",
FilteredStreamMessage.this.getClass().getName(), ex);
return;
}

delegate.onSubscribe(s);
delegate.onSubscribe(this);
}

@Override
Expand Down Expand Up @@ -310,14 +317,6 @@ public void onNext(T o) {

@Override
public void onError(Throwable t) {
if (t instanceof CancelledSubscriptionException) {
onCancellation(delegate);
completionFuture.completeExceptionally(t);
if (!notifyCancellation) {
return;
}
}

if (completed) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.common.stream;

import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsNotifyCancellation;
import static com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil.containsWithPooledObjects;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -214,7 +215,9 @@ public void subscribe(Subscriber<? super U> subscriber, EventExecutor executor,
requireNonNull(options, "options");

source.subscribe(new FuseableSubscriber<>(subscriber, function, errorFunction,
containsWithPooledObjects(options)), executor, options);
containsWithPooledObjects(options),
containsNotifyCancellation(options)),
executor, options);
}

@Override
Expand All @@ -236,18 +239,21 @@ private static final class FuseableSubscriber<U> implements Subscriber<Object>,
@Nullable
private final Function<Throwable, Throwable> errorFunction;
private final boolean withPooledObjects;
private final boolean notifyCancellation;

@Nullable
private volatile Subscription upstream;
private volatile boolean canceled;

FuseableSubscriber(Subscriber<? super U> downstream, @Nullable MapperFunction<Object, U> function,
@Nullable Function<Throwable, Throwable> errorFunction, boolean withPooledObjects) {
@Nullable Function<Throwable, Throwable> errorFunction, boolean withPooledObjects,
boolean notifyCancellation) {
requireNonNull(downstream, "downstream");
this.downstream = downstream;
this.function = function;
this.errorFunction = errorFunction;
this.withPooledObjects = withPooledObjects;
this.notifyCancellation = notifyCancellation;
}

@Override
Expand Down Expand Up @@ -295,6 +301,9 @@ public void onNext(Object item) {
public void onError(Throwable cause) {
requireNonNull(cause, "cause");
if (canceled) {
if (notifyCancellation && cause instanceof CancelledSubscriptionException) {
downstream.onError(cause);
}
return;
}
canceled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,22 @@ public static boolean containsNotifyCancellation(SubscriptionOption... options)
return false;
}

public static SubscriptionOption[] toSubscriptionOptions(boolean withPooledObjects,
boolean notifyCancellation) {
if (withPooledObjects) {
if (notifyCancellation) {
return CANCELLATION_AND_POOLED_OPTIONS;
} else {
return POOLED_OBJECTS;
}
} else {
if (notifyCancellation) {
return CANCELLATION_OPTION;
} else {
return EMPTY_OPTIONS;
}
}
}

private InternalStreamMessageUtil() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/

package com.linecorp.armeria.server.logging;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.CompletionException;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.ContentPreviewerFactory;
import com.linecorp.armeria.common.metric.MeterIdPrefixFunction;
import com.linecorp.armeria.common.metric.PrometheusMeterRegistries;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.util.Functions;
import com.linecorp.armeria.internal.logging.ContentPreviewingUtil;
import com.linecorp.armeria.internal.testing.ImmediateEventLoop;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandler;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.cors.CorsService;
import com.linecorp.armeria.server.metric.MetricCollectingService;
import com.linecorp.armeria.server.metric.PrometheusExpositionService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

import io.micrometer.prometheus.PrometheusMeterRegistry;

class ContentPreviewerCancellationTest {

@RegisterExtension
static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
final PrometheusMeterRegistry registry = PrometheusMeterRegistries.newRegistry();
sb.meterRegistry(registry)
.decorator(
MetricCollectingService.newDecorator(
MeterIdPrefixFunction.ofDefault("armeria.server")))
.decorator(ContentPreviewingService.newDecorator(Integer.MAX_VALUE))
.decorator(
CorsService
.builderForAnyOrigin()
.allowCredentials()
.allowRequestMethods(HttpMethod.POST, HttpMethod.GET)
.maxAge(3600)
.newDecorator()
)
.annotatedService(new TestService())
.service("/metrics", PrometheusExpositionService.of(registry.getPrometheusRegistry()));
}
};

@Test
void shouldCompleteLogWithNoContentResponse() throws InterruptedException {
final BlockingWebClient client = server.blockingWebClient();
final AggregatedHttpResponse response = client.get("/test");
assertThat(response.status()).isEqualTo(HttpStatus.NO_CONTENT);
final ServiceRequestContext ctx = server.requestContextCaptor().take();
// Make sure the log is complete.
ctx.log().whenComplete().join();
}

@Test
void shouldCompleteContentPreviewerResponseWhenCancelled() {
final ContentPreviewerFactory previewerFactory = ContentPreviewerFactory.text(100);
final ServiceRequestContext ctx = ServiceRequestContext.of(HttpRequest.of(HttpMethod.GET, "/"));
HttpResponse response = HttpResponse.of(HttpStatus.NO_CONTENT);
response = response.recover(cause -> null);
response = response.mapHeaders(Function.identity());
final HttpResponse contentPreviewingResponse =
ContentPreviewingUtil.setUpResponseContentPreviewer(previewerFactory, ctx, response,
Functions.second());
contentPreviewingResponse.subscribe(new Subscriber<HttpObject>() {

@Nullable
private Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
subscription = s;
s.request(1);
}

@Override
public void onNext(HttpObject httpObject) {
assertThat(httpObject).isInstanceOf(ResponseHeaders.class);
subscription.cancel();
}

@Override
public void onError(Throwable t) {}

@Override
public void onComplete() {}
}, ImmediateEventLoop.INSTANCE);

assertThatThrownBy(() -> {
contentPreviewingResponse.whenComplete().join();
}).isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(CancelledSubscriptionException.class);
}

@ExceptionHandler(SomeExceptionHandler.class)
private static class TestService {
@Get("/test")
public void noContent() {}
}

private static final class SomeExceptionHandler implements ExceptionHandlerFunction {

@Override
public HttpResponse handleException(ServiceRequestContext ctx, HttpRequest req, Throwable cause) {
return ExceptionHandlerFunction.fallthrough();
}
}
}

0 comments on commit 4d29604

Please sign in to comment.