Skip to content

Commit

Permalink
minimal impl
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhee17 committed Dec 9, 2024
1 parent fa76e99 commit 5e8111c
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public interface ClientRequestContextExtension extends ClientRequestContext, Req
* with default values on every request.
*/
HttpHeaders internalRequestHeaders();

long remainingTimeoutNanos();
}
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ private DefaultClientRequestContext(DefaultClientRequestContext ctx,
// Cancel the original timeout and create a new scheduler for the derived context.
ctx.responseCancellationScheduler.cancelScheduled();
responseCancellationScheduler =
CancellationScheduler.ofClient(TimeUnit.MILLISECONDS.toNanos(ctx.responseTimeoutMillis()));
CancellationScheduler.ofClient(ctx.remainingTimeoutNanos());
writeTimeoutMillis = ctx.writeTimeoutMillis();
maxResponseLength = ctx.maxResponseLength();

Expand Down Expand Up @@ -898,6 +898,11 @@ public HttpHeaders internalRequestHeaders() {
return internalRequestHeaders;
}

@Override
public long remainingTimeoutNanos() {
return responseCancellationScheduler().remainingTimeoutNanos();
}

@Override
public void setAdditionalRequestHeader(CharSequence name, Object value) {
requireNonNull(name, "name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ default void finishNow() {
*/
long timeoutNanos();

/**
* Before the scheduler has started, the configured timeout will be returned regardless of the
* {@link TimeoutMode}. If the scheduler has already started, the remaining time will be returned.
*/
long remainingTimeoutNanos();

long startTimeNanos();

CompletableFuture<Throwable> whenCancelling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,14 @@ public void start() {
if (state != State.INIT) {
return;
}
state = State.SCHEDULED;
startTimeNanos = ticker.read();
if (timeoutMode == TimeoutMode.SET_FROM_NOW) {
final long elapsedTimeNanos = startTimeNanos - setFromNowStartNanos;
timeoutNanos = Long.max(LongMath.saturatedSubtract(timeoutNanos, elapsedTimeNanos), 0);
}

// set the state after all timeout related fields are updated
state = State.SCHEDULED;
if (timeoutNanos != Long.MAX_VALUE) {
scheduledFuture = eventLoop().schedule(() -> invokeTask(null), timeoutNanos, NANOSECONDS);
}
Expand Down Expand Up @@ -292,6 +294,18 @@ public long timeoutNanos() {
return timeoutNanos == Long.MAX_VALUE ? 0 : timeoutNanos;
}

@Override
public long remainingTimeoutNanos() {
if (timeoutNanos == Long.MAX_VALUE) {
return 0;
}
if (!isStarted()) {
return timeoutNanos;
}
final long elapsed = ticker.read() - startTimeNanos;
return Math.max(1, LongMath.saturatedSubtract(timeoutNanos, elapsed));
}

@Override
public long startTimeNanos() {
return startTimeNanos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public long timeoutNanos() {
return 0;
}

@Override
public long remainingTimeoutNanos() {
return 0;
}

@Override
public long startTimeNanos() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client.retry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;

import java.time.Duration;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.ResponseTimeoutMode;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.QueryParams;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

class ResponseTimeoutFromStartTest {

private static final Logger logger = LoggerFactory.getLogger(ResponseTimeoutFromStartTest.class);

@RegisterExtension
static ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.service("/", (ctx, req) -> {
final String delayMillisStr = ctx.queryParam("delayMillis");
assertThat(delayMillisStr).isNotNull();
final int delayMillis = Integer.parseInt(delayMillisStr);
return HttpResponse.delayed(HttpResponse.of(500), Duration.ofMillis(delayMillis));
});
}
};

@ParameterizedTest
@CsvSource({
"0,2500,2000",
"0,1750,2000",
"5000,1500,2000",
})
void originalResponseTimeoutRespected(long backoffMillis, long attemptMillis, long delayMillis) {
final long timeoutSeconds = 3;
final WebClient webClient =
WebClient.builder(server.httpUri())
.responseTimeout(Duration.ofSeconds(timeoutSeconds))
.responseTimeoutMode(ResponseTimeoutMode.FROM_START)
.decorator((delegate, ctx, req) -> {
logger.info("ctx.responseTimeoutMillis: {}", ctx.responseTimeoutMillis());
return delegate.execute(ctx, req);
})
.decorator(
RetryingClient.builder(RetryRule.builder()
.onException()
.onServerErrorStatus()
.thenBackoff(Backoff.fixed(backoffMillis)))
.responseTimeoutForEachAttempt(Duration.ofMillis(attemptMillis))
.maxTotalAttempts(Integer.MAX_VALUE)
.newDecorator())
.build();

final long prev = System.nanoTime();
final Throwable throwable = catchThrowable(
() -> webClient.get("/", QueryParams.of("delayMillis", delayMillis)).aggregate().join());
assertThat(throwable)
.isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(ResponseTimeoutException.class);
logger.debug("elapsed time is: {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev));

if (backoffMillis > 0) {
assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev))
.isLessThan(TimeUnit.SECONDS.toMillis(timeoutSeconds));
} else {

assertThat(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - prev))
.isCloseTo(TimeUnit.SECONDS.toMillis(timeoutSeconds), Percentage.withPercentage(10));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void start(Listener<O> responseListener, Metadata metadata) {
ctx.setResponseTimeout(TimeoutMode.SET_FROM_NOW, Duration.ofNanos(remainingNanos));
}
} else {
remainingNanos = MILLISECONDS.toNanos(ctx.responseTimeoutMillis());
remainingNanos = ctx.remainingTimeoutNanos();
}

// Must come after handling deadline.
Expand Down

0 comments on commit 5e8111c

Please sign in to comment.