diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java index e1d9bd4e724..1e8e2d74936 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/DeferredStreamMessage.java @@ -482,10 +482,8 @@ public CompletableFuture> collect(EventExecutor executor, SubscriptionOp } private static SubscriptionImpl noopSubscription() { - final DefaultStreamMessage streamMessage = new DefaultStreamMessage<>(); - streamMessage.close(); - return new SubscriptionImpl(streamMessage, NoopSubscriber.get(), ImmediateEventExecutor.INSTANCE, - EMPTY_OPTIONS); + return new SubscriptionImpl(NoopCancellableStreamMessage.INSTANCE, NoopSubscriber.get(), + ImmediateEventExecutor.INSTANCE, EMPTY_OPTIONS); } private final class ForwardingSubscriber implements Subscriber { diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/NoopCancellableStreamMessage.java b/core/src/main/java/com/linecorp/armeria/common/stream/NoopCancellableStreamMessage.java new file mode 100644 index 00000000000..a9ffbc8f78c --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/common/stream/NoopCancellableStreamMessage.java @@ -0,0 +1,55 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.common.stream; + +final class NoopCancellableStreamMessage extends CancellableStreamMessage { + + static final NoopCancellableStreamMessage INSTANCE = new NoopCancellableStreamMessage(); + + @Override + SubscriptionImpl subscribe(SubscriptionImpl subscription) { + throw new UnsupportedOperationException(); + } + + @Override + void request(long n) {} + + @Override + void cancel() {} + + @Override + public boolean isOpen() { + return false; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public long demand() { + return 0; + } + + @Override + public void abort() {} + + @Override + public void abort(Throwable cause) {} + + private NoopCancellableStreamMessage() {} +} diff --git a/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java index 15e599d58a3..73331f1f252 100644 --- a/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/AggregatingDecodedHttpRequest.java @@ -56,6 +56,8 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage aggregationFuture = new CompletableFuture<>(); AggregatingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers, @@ -194,6 +196,7 @@ public void abortResponse(Throwable cause, boolean cancel) { @Override public void close() { + isNormallyClosed = true; super.close(); aggregationFuture.complete(null); } @@ -204,6 +207,11 @@ public void close(Throwable cause) { aggregationFuture.complete(null); } + @Override + public boolean isClosedSuccessfully() { + return isNormallyClosed; + } + @Override public void abort() { super.abort(); diff --git a/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java index fe10d9135c2..a298c53c690 100644 --- a/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/DecodedHttpRequest.java @@ -88,6 +88,8 @@ static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, i void close(Throwable cause); + boolean isClosedSuccessfully(); + /** * Sets the specified {@link HttpResponse} which responds to this request. This is always called * by the {@link HttpServerHandler} after the handler gets the {@link HttpResponse} from an diff --git a/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java b/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java index 163afa9f808..740a72e3148 100644 --- a/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java +++ b/core/src/main/java/com/linecorp/armeria/server/DefaultServerErrorHandler.java @@ -32,6 +32,7 @@ import com.linecorp.armeria.common.ResponseHeaders; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.Exceptions; +import com.linecorp.armeria.internal.common.RequestContextExtension; import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals; import com.linecorp.armeria.server.annotation.AnnotatedService; @@ -86,8 +87,17 @@ public HttpResponse onServiceException(ServiceRequestContext ctx, Throwable caus } if (cause instanceof RequestTimeoutException) { - return internalRenderStatus(ctx, ctx.request().headers(), - HttpStatus.SERVICE_UNAVAILABLE, cause); + final HttpStatus status; + final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class); + assert ctxExtension != null; + final DecodedHttpRequest request = (DecodedHttpRequest) ctxExtension.originalRequest(); + if (request.isClosedSuccessfully()) { + status = HttpStatus.SERVICE_UNAVAILABLE; + } else { + // The server didn't receive the request fully yet. + status = HttpStatus.REQUEST_TIMEOUT; + } + return internalRenderStatus(ctx, ctx.request().headers(), status, cause); } return internalRenderStatus(ctx, ctx.request().headers(), diff --git a/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java index 10fd7db7ff5..b71b57b64c5 100644 --- a/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/EmptyContentDecodedHttpRequest.java @@ -192,6 +192,11 @@ public void close() {} @Override public void close(Throwable cause) {} + @Override + public boolean isClosedSuccessfully() { + return true; + } + @Override public void setResponse(HttpResponse response) { // TODO(ikhoon): Dedup diff --git a/core/src/main/java/com/linecorp/armeria/server/StreamingDecodedHttpRequest.java b/core/src/main/java/com/linecorp/armeria/server/StreamingDecodedHttpRequest.java index f3ac24af3aa..99b894a5945 100644 --- a/core/src/main/java/com/linecorp/armeria/server/StreamingDecodedHttpRequest.java +++ b/core/src/main/java/com/linecorp/armeria/server/StreamingDecodedHttpRequest.java @@ -54,6 +54,8 @@ final class StreamingDecodedHttpRequest extends DefaultHttpRequest implements De @Nullable private Throwable abortResponseCause; + private boolean isNormallyClosed; + StreamingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers, boolean keepAlive, InboundTrafficController inboundTrafficController, long maxRequestLength, RoutingContext routingCtx, ExchangeType exchangeType, @@ -165,6 +167,17 @@ protected void onRemoval(HttpObject obj) { } } + @Override + public void close() { + isNormallyClosed = true; + super.close(); + } + + @Override + public boolean isClosedSuccessfully() { + return isNormallyClosed; + } + @Override public void setResponse(HttpResponse response) { if (abortResponseCause != null) { diff --git a/core/src/test/java/com/linecorp/armeria/server/AggregatingRequestTimeoutTest.java b/core/src/test/java/com/linecorp/armeria/server/AggregatingRequestTimeoutTest.java index 732518532a3..9e5f54b500e 100644 --- a/core/src/test/java/com/linecorp/armeria/server/AggregatingRequestTimeoutTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/AggregatingRequestTimeoutTest.java @@ -91,6 +91,6 @@ void asynchronouslyHandleFailedRequest(SessionProtocol protocol) { // The service waits for the full request body and then responds. assertThat(WebClient.of(protocol, server.endpoint(protocol)) .execute(request).aggregate().join().status()) - .isSameAs(HttpStatus.SERVICE_UNAVAILABLE); + .isSameAs(HttpStatus.REQUEST_TIMEOUT); } } diff --git a/core/src/test/java/com/linecorp/armeria/server/RequestTimeoutTest.java b/core/src/test/java/com/linecorp/armeria/server/RequestTimeoutTest.java index f695d732c56..377802d3509 100644 --- a/core/src/test/java/com/linecorp/armeria/server/RequestTimeoutTest.java +++ b/core/src/test/java/com/linecorp/armeria/server/RequestTimeoutTest.java @@ -103,9 +103,18 @@ public ExchangeType exchangeType(RoutingContext routingContext) { request.write(HttpData.ofUtf8("foo")); } request.close(); - assertThat(WebClient.of(protocol, server.endpoint(protocol)) - .execute(request).aggregate().join().status()) - .isSameAs(HttpStatus.SERVICE_UNAVAILABLE); + + if (!exchangeType.isRequestStreaming() || (!clientSendsOneBody && !useServiceEventLoop)) { + // The server received the request fully in these cases. + assertThat(WebClient.of(protocol, server.endpoint(protocol)) + .execute(request).aggregate().join().status()) + .isSameAs(HttpStatus.SERVICE_UNAVAILABLE); + } else { + // The server did or did not receive the request fully depending on the thread scheduling timing. + assertThat(WebClient.of(protocol, server.endpoint(protocol)) + .execute(request).aggregate().join().status()) + .isIn(HttpStatus.REQUEST_TIMEOUT, HttpStatus.SERVICE_UNAVAILABLE); + } } private static class TimeoutByTimerArgumentsProvider implements ArgumentsProvider { @@ -147,12 +156,16 @@ public ExchangeType exchangeType(RoutingContext routingContext) { }); }); final HttpRequestWriter request = HttpRequest.streaming(RequestHeaders.of(HttpMethod.POST, "/")); + final HttpStatus expectedStatus; if (clientCloseRequest) { request.close(); + expectedStatus = HttpStatus.SERVICE_UNAVAILABLE; + } else { + expectedStatus = HttpStatus.REQUEST_TIMEOUT; } assertThat(WebClient.builder(protocol, server.endpoint(protocol)) .build() .execute(request).aggregate().join().status()) - .isSameAs(HttpStatus.SERVICE_UNAVAILABLE); + .isSameAs(expectedStatus); } }