Skip to content

Commit

Permalink
Respond with 408 status when the server didn't receive the request fu…
Browse files Browse the repository at this point in the history
…lly (#5680)

Motivation:
Currently, the server always responds with a 503 status if a `RequestTimeoutException` is raised. However, according to the RFC 9110, the correct response in this case should be a 408 status code. https://httpwg.org/specs/rfc9110.html#status.408
```
The 408 (Request Timeout) status code indicates that the server did not receive a complete request message within the time that it was prepared to wait.
```

Modification:
- Introduced `DecodedHttpRequest.isNormallyClosed()` to check if the request was received fully.
- Updated the server to send a 408 response when a request times out and the service didn't receive the request fully.

Result:
- The server now returns a 408 status if a service didn't receive the request fully and the request times out.
- Issue #5579 has been closed.
  • Loading branch information
minwoox authored May 31, 2024
1 parent e263d76 commit 2112f50
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,8 @@ public CompletableFuture<List<T>> collect(EventExecutor executor, SubscriptionOp
}

private static SubscriptionImpl noopSubscription() {
final DefaultStreamMessage<?> streamMessage = new DefaultStreamMessage<>();
streamMessage.close();
return new SubscriptionImpl(streamMessage, NoopSubscriber.get(), ImmediateEventExecutor.INSTANCE,
EMPTY_OPTIONS);
return new SubscriptionImpl(NoopCancellableStreamMessage.INSTANCE, NoopSubscriber.get(),
ImmediateEventExecutor.INSTANCE, EMPTY_OPTIONS);
}

private final class ForwardingSubscriber implements Subscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.common.stream;

final class NoopCancellableStreamMessage extends CancellableStreamMessage<Object> {

static final NoopCancellableStreamMessage INSTANCE = new NoopCancellableStreamMessage();

@Override
SubscriptionImpl subscribe(SubscriptionImpl subscription) {
throw new UnsupportedOperationException();
}

@Override
void request(long n) {}

@Override
void cancel() {}

@Override
public boolean isOpen() {
return false;
}

@Override
public boolean isEmpty() {
return false;
}

@Override
public long demand() {
return 0;
}

@Override
public void abort() {}

@Override
public void abort(Throwable cause) {}

private NoopCancellableStreamMessage() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ final class AggregatingDecodedHttpRequest extends AggregatingStreamMessage<HttpO
@Nullable
private Throwable abortResponseCause;

private boolean isNormallyClosed;

private final CompletableFuture<Void> aggregationFuture = new CompletableFuture<>();

AggregatingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers,
Expand Down Expand Up @@ -194,6 +196,7 @@ public void abortResponse(Throwable cause, boolean cancel) {

@Override
public void close() {
isNormallyClosed = true;
super.close();
aggregationFuture.complete(null);
}
Expand All @@ -204,6 +207,11 @@ public void close(Throwable cause) {
aggregationFuture.complete(null);
}

@Override
public boolean isClosedSuccessfully() {
return isNormallyClosed;
}

@Override
public void abort() {
super.abort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ static DecodedHttpRequest of(boolean endOfStream, EventLoop eventLoop, int id, i

void close(Throwable cause);

boolean isClosedSuccessfully();

/**
* Sets the specified {@link HttpResponse} which responds to this request. This is always called
* by the {@link HttpServerHandler} after the handler gets the {@link HttpResponse} from an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.RequestContextExtension;
import com.linecorp.armeria.internal.common.util.TemporaryThreadLocals;
import com.linecorp.armeria.server.annotation.AnnotatedService;

Expand Down Expand Up @@ -86,8 +87,17 @@ public HttpResponse onServiceException(ServiceRequestContext ctx, Throwable caus
}

if (cause instanceof RequestTimeoutException) {
return internalRenderStatus(ctx, ctx.request().headers(),
HttpStatus.SERVICE_UNAVAILABLE, cause);
final HttpStatus status;
final RequestContextExtension ctxExtension = ctx.as(RequestContextExtension.class);
assert ctxExtension != null;
final DecodedHttpRequest request = (DecodedHttpRequest) ctxExtension.originalRequest();
if (request.isClosedSuccessfully()) {
status = HttpStatus.SERVICE_UNAVAILABLE;
} else {
// The server didn't receive the request fully yet.
status = HttpStatus.REQUEST_TIMEOUT;
}
return internalRenderStatus(ctx, ctx.request().headers(), status, cause);
}

return internalRenderStatus(ctx, ctx.request().headers(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ public void close() {}
@Override
public void close(Throwable cause) {}

@Override
public boolean isClosedSuccessfully() {
return true;
}

@Override
public void setResponse(HttpResponse response) {
// TODO(ikhoon): Dedup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ final class StreamingDecodedHttpRequest extends DefaultHttpRequest implements De
@Nullable
private Throwable abortResponseCause;

private boolean isNormallyClosed;

StreamingDecodedHttpRequest(EventLoop eventLoop, int id, int streamId, RequestHeaders headers,
boolean keepAlive, InboundTrafficController inboundTrafficController,
long maxRequestLength, RoutingContext routingCtx, ExchangeType exchangeType,
Expand Down Expand Up @@ -165,6 +167,17 @@ protected void onRemoval(HttpObject obj) {
}
}

@Override
public void close() {
isNormallyClosed = true;
super.close();
}

@Override
public boolean isClosedSuccessfully() {
return isNormallyClosed;
}

@Override
public void setResponse(HttpResponse response) {
if (abortResponseCause != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ void asynchronouslyHandleFailedRequest(SessionProtocol protocol) {
// The service waits for the full request body and then responds.
assertThat(WebClient.of(protocol, server.endpoint(protocol))
.execute(request).aggregate().join().status())
.isSameAs(HttpStatus.SERVICE_UNAVAILABLE);
.isSameAs(HttpStatus.REQUEST_TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,18 @@ public ExchangeType exchangeType(RoutingContext routingContext) {
request.write(HttpData.ofUtf8("foo"));
}
request.close();
assertThat(WebClient.of(protocol, server.endpoint(protocol))
.execute(request).aggregate().join().status())
.isSameAs(HttpStatus.SERVICE_UNAVAILABLE);

if (!exchangeType.isRequestStreaming() || (!clientSendsOneBody && !useServiceEventLoop)) {
// The server received the request fully in these cases.
assertThat(WebClient.of(protocol, server.endpoint(protocol))
.execute(request).aggregate().join().status())
.isSameAs(HttpStatus.SERVICE_UNAVAILABLE);
} else {
// The server did or did not receive the request fully depending on the thread scheduling timing.
assertThat(WebClient.of(protocol, server.endpoint(protocol))
.execute(request).aggregate().join().status())
.isIn(HttpStatus.REQUEST_TIMEOUT, HttpStatus.SERVICE_UNAVAILABLE);
}
}

private static class TimeoutByTimerArgumentsProvider implements ArgumentsProvider {
Expand Down Expand Up @@ -147,12 +156,16 @@ public ExchangeType exchangeType(RoutingContext routingContext) {
});
});
final HttpRequestWriter request = HttpRequest.streaming(RequestHeaders.of(HttpMethod.POST, "/"));
final HttpStatus expectedStatus;
if (clientCloseRequest) {
request.close();
expectedStatus = HttpStatus.SERVICE_UNAVAILABLE;
} else {
expectedStatus = HttpStatus.REQUEST_TIMEOUT;
}
assertThat(WebClient.builder(protocol, server.endpoint(protocol))
.build()
.execute(request).aggregate().join().status())
.isSameAs(HttpStatus.SERVICE_UNAVAILABLE);
.isSameAs(expectedStatus);
}
}

0 comments on commit 2112f50

Please sign in to comment.