Skip to content

Commit

Permalink
Make backoff calculation configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasLongo committed Sep 9, 2024
1 parent 55b91da commit 0d45f77
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFu
private final Counter internalServerErrorCounter;
private final GrpcRetryInfoCalculator retryInfoCalculator;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
retryInfoCalculator = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(2));
retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void setUp() {
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter);

grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -79,7 +80,8 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
this.certificateProviderFactory = certificateProviderFactory;
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
// TODO tlongo read from config
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2));
this.byteDecoder = new OTelLogsDecoder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -83,7 +84,8 @@ public OTelMetricsSource(final OTelMetricsSourceConfig oTelMetricsSourceConfig,
this.certificateProviderFactory = certificateProviderFactory;
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
// TODO tlongo read from config
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2));
this.byteDecoder = new OTelMetricDecoder();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.oteltrace;

import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Server;
Expand Down Expand Up @@ -45,6 +46,7 @@

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -62,7 +64,6 @@ public class OTelTraceSource implements Source<Record<Object>> {
private final PluginMetrics pluginMetrics;
private final GrpcAuthenticationProvider authenticationProvider;
private final CertificateProviderFactory certificateProviderFactory;
private final GrpcRequestExceptionHandler requestExceptionHandler;
private final String pipelineName;
private Server server;
private final ByteDecoder byteDecoder;
Expand All @@ -82,7 +83,6 @@ public OTelTraceSource(final OTelTraceSourceConfig oTelTraceSourceConfig, final
this.certificateProviderFactory = certificateProviderFactory;
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
this.byteDecoder = new OTelTraceDecoder();
}

Expand Down Expand Up @@ -112,7 +112,7 @@ public void start(Buffer<Record<Object>> buffer) {
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionHandler(requestExceptionHandler);
.exceptionHandler(createGrpExceptionHandler());

final MethodDescriptor<ExportTraceServiceRequest, ExportTraceServiceResponse> methodDescriptor = TraceServiceGrpc.getExportMethod();
final String oTelTraceSourcePath = oTelTraceSourceConfig.getPath();
Expand Down Expand Up @@ -205,6 +205,12 @@ public void start(Buffer<Record<Object>> buffer) {
LOG.info("Started otel_trace_source on port " + oTelTraceSourceConfig.getPort() + "...");
}

private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
RetryInfo retryInfo = oTelTraceSourceConfig.getRetryInfo() != null ? oTelTraceSourceConfig.getRetryInfo() : new RetryInfo(100, 2000);

return new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(retryInfo.getMinDelay()), Duration.ofMillis(retryInfo.getMaxDelay()));
}

@Override
public void stop() {
if (server != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class OTelTraceSourceConfig {
static final String ENABLE_UNFRAMED_REQUESTS = "unframed_requests";
static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check";
static final String COMPRESSION = "compression";
static final String RETRY_INFO = "retry_info";
static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000;
static final int DEFAULT_PORT = 21890;
static final int DEFAULT_THREAD_COUNT = 200;
Expand Down Expand Up @@ -107,6 +108,9 @@ public class OTelTraceSourceConfig {
@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

@JsonProperty(RETRY_INFO)
private RetryInfo retryInfo;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
Expand Down Expand Up @@ -228,4 +232,8 @@ public CompressionOption getCompression() {
public ByteCount getMaxRequestLength() {
return maxRequestLength;
}

public RetryInfo getRetryInfo() {
return retryInfo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.opensearch.dataprepper.plugins.source.oteltrace;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RetryInfo {

@JsonProperty("min_delay")
private Integer minDelay;

@JsonProperty("max_delay")
private Integer maxDelay;

// Jackson needs this constructor
public RetryInfo() {}

public RetryInfo(int minDelay, int maxDelay) {
this.minDelay = minDelay;
this.maxDelay = maxDelay;
}

public int getMinDelay() {
return minDelay;
}

public void setMinDelay(Integer minDelay) {
this.minDelay = minDelay;
}

public int getMaxDelay() {
return maxDelay;
}

public void setMaxDelay(Integer maxDelay) {
this.maxDelay = maxDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class OTelTraceSourceTest {
private static final String TEST_PATH = "${pipelineName}/v1/traces";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String TEST_PIPELINE_NAME = "test_pipeline";
private static final RetryInfo TEST_RETRY_INFO = new RetryInfo(100, 2000);
private static final ExportTraceServiceRequest SUCCESS_REQUEST = ExportTraceServiceRequest.newBuilder()
.addResourceSpans(ResourceSpans.newBuilder()
.addInstrumentationLibrarySpans(InstrumentationLibrarySpans.newBuilder()
Expand Down Expand Up @@ -209,6 +210,7 @@ void beforeEach() {
when(oTelTraceSourceConfig.getMaxConnectionCount()).thenReturn(10);
when(oTelTraceSourceConfig.getThreadCount()).thenReturn(5);
when(oTelTraceSourceConfig.getCompression()).thenReturn(CompressionOption.NONE);
when(oTelTraceSourceConfig.getRetryInfo()).thenReturn(TEST_RETRY_INFO);

when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(authenticationProvider);
Expand Down Expand Up @@ -845,7 +847,9 @@ void testRunAnotherSourceWithSamePort() {
// starting server
SOURCE.start(buffer);

testPluginSetting = new PluginSetting(null, Collections.singletonMap(SSL, false));

Map<String, Object> settingsMap = Map.of("retry_info", TEST_RETRY_INFO, SSL, false);
testPluginSetting = new PluginSetting(null, settingsMap);
testPluginSetting.setPipelineName("pipeline");
oTelTraceSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelTraceSourceConfig.class);
final OTelTraceSource source = new OTelTraceSource(oTelTraceSourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,28 @@ void testInValidConfigWithCustomPath() {
assertThat(otelTraceSourceConfig.isPathValid(), equalTo(false));
}

@Test
void testRetryInfoConfig() {
final PluginSetting customPathPluginSetting = completePluginSettingForOtelTraceSource(
DEFAULT_REQUEST_TIMEOUT_MS,
DEFAULT_PORT,
null,
false,
false,
false,
true,
TEST_KEY_CERT,
"",
DEFAULT_THREAD_COUNT,
DEFAULT_MAX_CONNECTION_COUNT);

final OTelTraceSourceConfig otelTraceSourceConfig = OBJECT_MAPPER.convertValue(customPathPluginSetting.getSettings(), OTelTraceSourceConfig.class);


assertThat(otelTraceSourceConfig.getRetryInfo().getMaxDelay(), equalTo(100));
assertThat(otelTraceSourceConfig.getRetryInfo().getMinDelay(), equalTo(50));
}

private PluginSetting completePluginSettingForOtelTraceSource(final int requestTimeoutInMillis,
final int port,
final String path,
Expand All @@ -325,6 +347,7 @@ private PluginSetting completePluginSettingForOtelTraceSource(final int requestT
settings.put(OTelTraceSourceConfig.SSL_KEY_FILE, sslKeyFile);
settings.put(OTelTraceSourceConfig.THREAD_COUNT, threadCount);
settings.put(OTelTraceSourceConfig.MAX_CONNECTION_COUNT, maxConnectionCount);
settings.put(OTelTraceSourceConfig.RETRY_INFO, new RetryInfo(50, 100));
return new PluginSetting(PLUGIN_NAME, settings);
}
}

0 comments on commit 0d45f77

Please sign in to comment.