From 4de6ed3ac915ec6f086dfbc751c8439a196d4128 Mon Sep 17 00:00:00 2001 From: Chris K Wensel Date: Tue, 19 Dec 2023 15:38:09 -0800 Subject: [PATCH] improved cloudwatch exporter retry functionality the retry will now timeout before any configured lambda timeout period --- .../build.gradle.kts | 3 +- .../cls/substrate/aws/sdk/ClientRetry.java | 37 ++++++++++++++++++- .../substrate/aws/sdk/ClientRetryTest.java | 33 +++++++++++++++++ .../CloudWatchExportActivityConstruct.java | 1 + .../CloudWatchExportActivityProps.java | 16 +++++++- .../CloudWatchExportActivityHandlerTest.java | 1 + .../CloudWatchExportActivityHandler.java | 8 +++- 7 files changed, 93 insertions(+), 6 deletions(-) create mode 100644 clusterless-substrate-aws-common/src/test/java/clusterless/cls/substrate/aws/sdk/ClientRetryTest.java diff --git a/clusterless-substrate-aws-common/build.gradle.kts b/clusterless-substrate-aws-common/build.gradle.kts index 47354b02..8bef3ae2 100644 --- a/clusterless-substrate-aws-common/build.gradle.kts +++ b/clusterless-substrate-aws-common/build.gradle.kts @@ -22,8 +22,7 @@ dependencies { implementation("com.fasterxml.jackson.datatype:jackson-datatype-joda") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") - implementation("io.github.resilience4j:resilience4j-retry") - + api("io.github.resilience4j:resilience4j-retry") api("com.jayway.jsonpath:json-path") implementation("software.amazon.awssdk:s3") diff --git a/clusterless-substrate-aws-common/src/main/java/clusterless/cls/substrate/aws/sdk/ClientRetry.java b/clusterless-substrate-aws-common/src/main/java/clusterless/cls/substrate/aws/sdk/ClientRetry.java index f5576f88..5ff0018a 100644 --- a/clusterless-substrate-aws-common/src/main/java/clusterless/cls/substrate/aws/sdk/ClientRetry.java +++ b/clusterless-substrate-aws-common/src/main/java/clusterless/cls/substrate/aws/sdk/ClientRetry.java @@ -11,6 +11,7 @@ import io.github.resilience4j.core.IntervalFunction; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.AwsClient; @@ -24,11 +25,45 @@ public class ClientRetry { private final RetryConfig config; private final String client; + @NotNull + public static IntervalFunction exponentialBackoff(Duration initialInterval, double multiplier, Duration maxInterval) { + return IntervalFunction.ofExponentialBackoff(initialInterval, multiplier, maxInterval); + } + + public ClientRetry(String client, int maxAttempts, Duration fixed, Predicate.Response> predicate) { + this(client, maxAttempts, IntervalFunction.of(fixed), predicate); + } + public ClientRetry(String client, int maxAttempts, Predicate.Response> predicate) { + this(client, maxAttempts, exponentialBackoff(Duration.ofSeconds(30), 2.0, Duration.ofMinutes(5)), predicate); + } + + public ClientRetry(String client, Duration maxDuration, IntervalFunction function, Predicate.Response> predicate) { + this(client, maxAttempts(maxDuration, function), function, predicate); + } + + protected static int maxAttempts(Duration maxDuration, IntervalFunction function) { + Duration duration = Duration.ZERO; + + int count = 0; + while (duration.toMillis() < maxDuration.toMillis()) { + count++; + duration = duration.plus(Duration.ofMillis(function.apply(count))); + } + + if (count == 0) { + LOG.warn("calculated max attempts are zero, for maxDuration: {}, returning value of 1 max attempts", maxDuration); + return 1; + } + + return count; + } + + public ClientRetry(String client, int maxAttempts, IntervalFunction function, Predicate.Response> predicate) { this.client = client; this.config = RetryConfig..Response>custom() .maxAttempts(maxAttempts) - .intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(30), 2.0, Duration.ofMinutes(5))) + .intervalFunction(function) .consumeResultBeforeRetryAttempt((attempt, response) -> LOG.warn("got: {}, for retry attempt: {} of {}", response.errorMessage(), attempt, maxAttempts)) .retryOnResult(predicate) .failAfterMaxAttempts(true) diff --git a/clusterless-substrate-aws-common/src/test/java/clusterless/cls/substrate/aws/sdk/ClientRetryTest.java b/clusterless-substrate-aws-common/src/test/java/clusterless/cls/substrate/aws/sdk/ClientRetryTest.java new file mode 100644 index 00000000..d702432b --- /dev/null +++ b/clusterless-substrate-aws-common/src/test/java/clusterless/cls/substrate/aws/sdk/ClientRetryTest.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2023 Chris K Wensel . All Rights Reserved. + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package clusterless.cls.substrate.aws.sdk; + +import io.github.resilience4j.core.IntervalFunction; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ClientRetryTest { + @Test + void max() { + assertEquals(2, ClientRetry.maxAttempts(Duration.ofSeconds(60), IntervalFunction.of(Duration.ofSeconds(30)))); + assertEquals(2, ClientRetry.maxAttempts(Duration.ofSeconds(60), IntervalFunction.ofExponentialBackoff(30000, 2, 600000))); + + IntervalFunction intervalFunction = IntervalFunction.ofExponentialBackoff(15000, 2, 600000); +// System.out.println("IntStream.range(1,3).mapToLong(intervalFunction::apply).sum() = " + IntStream.range(1, 3).mapToLong(intervalFunction::apply).peek(System.out::println).sum()); + assertEquals(3, ClientRetry.maxAttempts(Duration.ofSeconds(60), intervalFunction)); + + IntervalFunction intervalFunction1 = IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(3), 2, Duration.ofMinutes(1)); +// System.out.println("IntStream.range(1,19).mapToLong(intervalFunction::apply).sum() = " + IntStream.range(1, 19).mapToLong(intervalFunction1::apply).peek(System.out::println).sum()); +// System.out.println("Duration.ofMinutes(15).toMillis() = " + Duration.ofMinutes(15).toMillis()); + assertEquals(19, ClientRetry.maxAttempts(Duration.ofMinutes(15), intervalFunction1)); + } +} diff --git a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/cls/substrate/aws/activity/cloudwatch/CloudWatchExportActivityConstruct.java b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/cls/substrate/aws/activity/cloudwatch/CloudWatchExportActivityConstruct.java index 847f38cb..96d95b47 100644 --- a/clusterless-substrate-aws-construct-core/src/main/java/clusterless/cls/substrate/aws/activity/cloudwatch/CloudWatchExportActivityConstruct.java +++ b/clusterless-substrate-aws-construct-core/src/main/java/clusterless/cls/substrate/aws/activity/cloudwatch/CloudWatchExportActivityConstruct.java @@ -108,6 +108,7 @@ public CloudWatchExportActivityConstruct(@NotNull ManagedComponentContext contex .withLogGroupName(model.logGroupName()) .withLogStreamPrefix(model.logStreamPrefix()) .withInterval(model.interval()) + .withTimeoutMin(model().runtimeProps().timeoutMin()) .build(); Map environment = Env.toEnv(activityProps); diff --git a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityProps.java b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityProps.java index b98f0e53..ca017c84 100644 --- a/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityProps.java +++ b/clusterless-substrate-aws-lambda-transform-model/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityProps.java @@ -21,6 +21,7 @@ public class CloudWatchExportActivityProps implements Struct { String logStreamPrefix; @JsonRequiredProperty URI pathURI; + int timeoutMin; public static Builder builder() { return Builder.builder(); @@ -42,11 +43,16 @@ public URI pathURI() { return pathURI; } + public int timeoutMin() { + return timeoutMin; + } + public static final class Builder { String interval; String logGroupName; String logStreamPrefix; URI pathURI; + int timeoutMin; private Builder() { } @@ -75,11 +81,17 @@ public Builder withPathURI(URI pathURI) { return this; } + public Builder withTimeoutMin(int timeoutMin) { + this.timeoutMin = timeoutMin; + return this; + } + public CloudWatchExportActivityProps build() { CloudWatchExportActivityProps cloudWatchExportActivityProps = new CloudWatchExportActivityProps(); - cloudWatchExportActivityProps.pathURI = this.pathURI; - cloudWatchExportActivityProps.logGroupName = this.logGroupName; cloudWatchExportActivityProps.logStreamPrefix = this.logStreamPrefix; + cloudWatchExportActivityProps.logGroupName = this.logGroupName; + cloudWatchExportActivityProps.pathURI = this.pathURI; + cloudWatchExportActivityProps.timeoutMin = this.timeoutMin; cloudWatchExportActivityProps.interval = this.interval; return cloudWatchExportActivityProps; } diff --git a/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandlerTest.java b/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandlerTest.java index 78a3ca93..ac8585bb 100644 --- a/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandlerTest.java +++ b/clusterless-substrate-aws-lambda-transform/src/integrationTest/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandlerTest.java @@ -42,6 +42,7 @@ protected CloudWatchExportActivityProps getProps() { .withLogGroupName("test-log-group") .withPathURI(URIs.create("s3", bucketName(), "/test-prefix/")) .withInterval(IntervalUnit.TWELFTHS.name()) + .withTimeoutMin(15) .build(); } diff --git a/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandler.java b/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandler.java index d53632a0..b355b71f 100644 --- a/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandler.java +++ b/clusterless-substrate-aws-lambda-transform/src/main/java/clusterless/aws/lambda/activity/cloudwatch/CloudWatchExportActivityHandler.java @@ -36,7 +36,13 @@ public class CloudWatchExportActivityHandler extends EventHandler retryClient = new ClientRetry<>("cloudwatchlogs", 5, r -> r.exception() instanceof LimitExceededException); + protected final ClientRetry retryClient = new ClientRetry<>( + "cloudwatchlogs", + // attempt to match the lambda timeout period, minus a small fudge factor + Duration.ofMinutes(activityProps.timeoutMin()).minusSeconds(5), + ClientRetry.exponentialBackoff(Duration.ofSeconds(3), 2.0, Duration.ofMinutes(3)), + r -> r.exception() instanceof LimitExceededException + ); protected final IntervalBuilder intervalBuilder = new IntervalBuilder(activityProps.interval);