diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java index 426fbddaf5..1c4d3609c4 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/GrpcRequestExceptionHandler.java @@ -39,12 +39,12 @@ public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFu private final Counter internalServerErrorCounter; private final GrpcRetryInfoCalculator retryInfoCalculator; - public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) { + public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) { requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE); internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR); - retryInfoCalculator = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(2)); + retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay); } @Override diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java index 2dc3310196..031861c50e 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/GrpcRequestExceptionHandlerTest.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import java.io.IOException; +import java.time.Duration; import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -75,7 +76,7 @@ public void setUp() { when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter); when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter); - grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2)); } @Test diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 96e0525f49..4a1e622b99 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -43,6 +43,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; @@ -79,7 +80,8 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig, this.certificateProviderFactory = certificateProviderFactory; this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + // TODO tlongo read from config + this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2)); this.byteDecoder = new OTelLogsDecoder(); } diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 4417740625..f85a55d2c0 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -45,6 +45,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -83,7 +84,8 @@ public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig, this.certificateProviderFactory = certificateProviderFactory; this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); + // TODO tlongo read from config + this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2)); this.byteDecoder = new OTelMetricDecoder(); } diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index fd514686dd..5277f5dbb3 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.oteltrace; +import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction; import com.linecorp.armeria.common.util.BlockingTaskExecutor; import com.linecorp.armeria.server.HttpService; import com.linecorp.armeria.server.Server; @@ -45,6 +46,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -62,7 +64,6 @@ public class OTelTraceSource implements Source> { private final PluginMetrics pluginMetrics; private final GrpcAuthenticationProvider authenticationProvider; private final CertificateProviderFactory certificateProviderFactory; - private final GrpcRequestExceptionHandler requestExceptionHandler; private final String pipelineName; private Server server; private final ByteDecoder byteDecoder; @@ -82,7 +83,6 @@ public OTelTraceSource(final OTelTraceSourceConfig oTelTraceSourceConfig, final this.certificateProviderFactory = certificateProviderFactory; this.pipelineName = pipelineDescription.getPipelineName(); this.authenticationProvider = createAuthenticationProvider(pluginFactory); - this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics); this.byteDecoder = new OTelTraceDecoder(); } @@ -112,7 +112,7 @@ public void start(Buffer> buffer) { .builder() .useClientTimeoutHeader(false) .useBlockingTaskExecutor(true) - .exceptionHandler(requestExceptionHandler); + .exceptionHandler(createGrpExceptionHandler()); final MethodDescriptor methodDescriptor = TraceServiceGrpc.getExportMethod(); final String oTelTraceSourcePath = oTelTraceSourceConfig.getPath(); @@ -205,6 +205,12 @@ public void start(Buffer> buffer) { LOG.info("Started otel_trace_source on port " + oTelTraceSourceConfig.getPort() + "..."); } + private GrpcExceptionHandlerFunction createGrpExceptionHandler() { + RetryInfo retryInfo = oTelTraceSourceConfig.getRetryInfo() != null ? oTelTraceSourceConfig.getRetryInfo() : new RetryInfo(100, 2000); + + return new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(retryInfo.getMinDelay()), Duration.ofMillis(retryInfo.getMaxDelay())); + } + @Override public void stop() { if (server != null) { diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java index 4760da34a4..61a9c3f205 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java @@ -32,6 +32,7 @@ public class OTelTraceSourceConfig { static final String ENABLE_UNFRAMED_REQUESTS = "unframed_requests"; static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check"; static final String COMPRESSION = "compression"; + static final String RETRY_INFO = "retry_info"; static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000; static final int DEFAULT_PORT = 21890; static final int DEFAULT_THREAD_COUNT = 200; @@ -107,6 +108,9 @@ public class OTelTraceSourceConfig { @JsonProperty("max_request_length") private ByteCount maxRequestLength; + @JsonProperty(RETRY_INFO) + private RetryInfo retryInfo; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -228,4 +232,8 @@ public CompressionOption getCompression() { public ByteCount getMaxRequestLength() { return maxRequestLength; } + + public RetryInfo getRetryInfo() { + return retryInfo; + } } diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfo.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfo.java new file mode 100644 index 0000000000..be2092f4aa --- /dev/null +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/RetryInfo.java @@ -0,0 +1,36 @@ +package org.opensearch.dataprepper.plugins.source.oteltrace; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RetryInfo { + + @JsonProperty("min_delay") + private Integer minDelay; + + @JsonProperty("max_delay") + private Integer maxDelay; + + // Jackson needs this constructor + public RetryInfo() {} + + public RetryInfo(int minDelay, int maxDelay) { + this.minDelay = minDelay; + this.maxDelay = maxDelay; + } + + public int getMinDelay() { + return minDelay; + } + + public void setMinDelay(Integer minDelay) { + this.minDelay = minDelay; + } + + public int getMaxDelay() { + return maxDelay; + } + + public void setMaxDelay(Integer maxDelay) { + this.maxDelay = maxDelay; + } +} diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index bace874047..86ace9dac3 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -131,6 +131,7 @@ class OTelTraceSourceTest { private static final String TEST_PATH = "${pipelineName}/v1/traces"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String TEST_PIPELINE_NAME = "test_pipeline"; + private static final RetryInfo TEST_RETRY_INFO = new RetryInfo(100, 2000); private static final ExportTraceServiceRequest SUCCESS_REQUEST = ExportTraceServiceRequest.newBuilder() .addResourceSpans(ResourceSpans.newBuilder() .addInstrumentationLibrarySpans(InstrumentationLibrarySpans.newBuilder() @@ -209,6 +210,7 @@ void beforeEach() { when(oTelTraceSourceConfig.getMaxConnectionCount()).thenReturn(10); when(oTelTraceSourceConfig.getThreadCount()).thenReturn(5); when(oTelTraceSourceConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(oTelTraceSourceConfig.getRetryInfo()).thenReturn(TEST_RETRY_INFO); when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) .thenReturn(authenticationProvider); @@ -845,7 +847,9 @@ void testRunAnotherSourceWithSamePort() { // starting server SOURCE.start(buffer); - testPluginSetting = new PluginSetting(null, Collections.singletonMap(SSL, false)); + + Map settingsMap = Map.of("retry_info", TEST_RETRY_INFO, SSL, false); + testPluginSetting = new PluginSetting(null, settingsMap); testPluginSetting.setPipelineName("pipeline"); oTelTraceSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelTraceSourceConfig.class); final OTelTraceSource source = new OTelTraceSource(oTelTraceSourceConfig, pluginMetrics, pluginFactory, pipelineDescription); diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java index ea5ce66b91..8b2595a605 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java @@ -302,6 +302,28 @@ void testInValidConfigWithCustomPath() { assertThat(otelTraceSourceConfig.isPathValid(), equalTo(false)); } + @Test + void testRetryInfoConfig() { + final PluginSetting customPathPluginSetting = completePluginSettingForOtelTraceSource( + DEFAULT_REQUEST_TIMEOUT_MS, + DEFAULT_PORT, + null, + false, + false, + false, + true, + TEST_KEY_CERT, + "", + DEFAULT_THREAD_COUNT, + DEFAULT_MAX_CONNECTION_COUNT); + + final OTelTraceSourceConfig otelTraceSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelTraceSourceConfig.class); + + + assertThat(otelTraceSourceConfig.getRetryInfo().getMaxDelay(), equalTo(100)); + assertThat(otelTraceSourceConfig.getRetryInfo().getMinDelay(), equalTo(50)); + } + private PluginSetting completePluginSettingForOtelTraceSource(final int requestTimeoutInMillis, final int port, final String path, @@ -325,6 +347,7 @@ private PluginSetting completePluginSettingForOtelTraceSource(final int requestT settings.put(OTelTraceSourceConfig.SSL_KEY_FILE, sslKeyFile); settings.put(OTelTraceSourceConfig.THREAD_COUNT, threadCount); settings.put(OTelTraceSourceConfig.MAX_CONNECTION_COUNT, maxConnectionCount); + settings.put(OTelTraceSourceConfig.RETRY_INFO, new RetryInfo(50, 100)); return new PluginSetting(PLUGIN_NAME, settings); } }