Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send RetryInfo on OTel Timeouts #4294

Merged
merged 20 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package org.opensearch.dataprepper;

import com.google.protobuf.Any;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
Expand All @@ -22,9 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction {
private static final Logger LOG = LoggerFactory.getLogger(GrpcRequestExceptionHandler.class);
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

Expand All @@ -37,53 +39,57 @@ public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;
private final GrpcRetryInfoCalculator retryInfoCalculator;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable,
Metadata metadata) {
final Throwable exceptionCause = throwable instanceof BufferWriteException ? throwable.getCause() : throwable;
return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
private com.google.rpc.Status handleExceptions(final Throwable e) {
String message = e.getMessage();
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
return createStatus(e, Status.Code.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
return createStatus(e, Status.Code.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
return createStatus(e, Status.Code.INVALID_ARGUMENT);
} else if ((e instanceof StatusRuntimeException) && (message.contains("Invalid protobuf byte sequence") || message.contains("Can't decode compressed frame"))) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
return createStatus(e, Status.Code.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
return createStatus(e, Status.Code.CANCELLED);
}

internalServerErrorCounter.increment();
LOG.error("Unexpected exception handling gRPC request", e);
return createStatus(e, Status.INTERNAL);
return createStatus(e, Status.Code.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) {
com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value());
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE);
} else {
message = e.getMessage() == null ? status.getCode().name() : e.getMessage();
builder.setMessage(e.getMessage() == null ? code.name() :e.getMessage());
}

return status.withDescription(message);
if (code == Status.Code.RESOURCE_EXHAUSTED) {
builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo()));
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.opensearch.dataprepper;

import com.google.rpc.RetryInfo;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

class GrpcRetryInfoCalculator {

private final Duration minimumDelay;
private final Duration maximumDelay;

private final AtomicReference<Instant> lastTimeCalled;
private final AtomicReference<Duration> nextDelay;

GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
this.minimumDelay = minimumDelay;
this.maximumDelay = maximumDelay;
// Create a cushion so that the calculator treats a first quick exception (after prepper startup) as normal request (e.g. does not calculate a backoff)
this.lastTimeCalled = new AtomicReference<>(Instant.now().minus(maximumDelay));
this.nextDelay = new AtomicReference<>(minimumDelay);
}

private static RetryInfo createProtoResult(Duration delay) {
return RetryInfo.newBuilder().setRetryDelay(mapDuration(delay)).build();
}

private static Duration minDuration(Duration left, Duration right) {
return left.compareTo(right) <= 0 ? left : right;
}

private static com.google.protobuf.Duration.Builder mapDuration(Duration duration) {
return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano());
}

RetryInfo createRetryInfo() {
Instant now = Instant.now();
// Is the last time we got called longer ago than the next delay?
if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) {
// Use minimum delay and reset the saved delay
nextDelay.set(minimumDelay);
return createProtoResult(minimumDelay);
}
Duration delay = nextDelay.getAndUpdate(d -> minDuration(maximumDelay, d.multipliedBy(2)));
return createProtoResult(delay);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper;

import com.google.protobuf.Any;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
Expand All @@ -13,6 +15,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.exceptions.BadRequestException;
Expand All @@ -22,11 +27,15 @@
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -55,6 +64,9 @@ public class GrpcRequestExceptionHandlerTest {
@Mock
private Metadata metadata;

@Captor
private ArgumentCaptor<com.google.rpc.Status> status;

private GrpcRequestExceptionHandler grpcRequestExceptionHandler;

@BeforeEach
Expand All @@ -64,7 +76,7 @@ public void setUp() {
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter);

grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2));
}

@Test
Expand Down Expand Up @@ -99,6 +111,12 @@ public void testHandleTimeoutException() {
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();

verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture());
for (com.google.rpc.Status currentStatus: status.getAllValues()) {
Optional<Any> retryInfo = currentStatus.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst();
assertTrue(retryInfo.isPresent(), "No RetryInfo at status:\n" + currentStatus.toString());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.opensearch.dataprepper;

import com.google.rpc.RetryInfo;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class GrpcRetryInfoCalculatorTest {

@Test
public void testMinimumDelayOnFirstCall() {
RetryInfo retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();

assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000));
assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L));
}

@Test
public void testExponentialBackoff() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();
RetryInfo retryInfo4 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L));
assertThat(retryInfo4.getRetryDelay().getSeconds(), equalTo(4L));
}

@Test
public void testUsesMaximumAsLongestDelay() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L));
}

@Test
public void testResetAfterDelayWearsOff() throws InterruptedException {
int minDelayNanos = 1_000_000;
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1));

RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();
sleep(retryInfo3);
RetryInfo retryInfo4 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getNanos(), equalTo(minDelayNanos));
assertThat(retryInfo2.getRetryDelay().getNanos(), equalTo(minDelayNanos));
assertThat(retryInfo3.getRetryDelay().getNanos(), equalTo(minDelayNanos * 2));
assertThat(retryInfo4.getRetryDelay().getNanos(), equalTo(minDelayNanos));
}

@Test
public void testQuickFirstExceptionDoesNotTriggerBackoffCalculationEvenWithLongMinDelay() throws InterruptedException {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20));

RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(10L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(10L));
}

private void sleep(RetryInfo retryInfo) throws InterruptedException {
// make sure we let enough time pass by adding a few milliseconds on top
Thread.sleep((retryInfo.getRetryDelay().getNanos() / 1_000_000) + 200 );
}
}
12 changes: 12 additions & 0 deletions data-prepper-plugins/otel-logs-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ source:
* `none`: no compression
* `gzip`: apply GZip de-compression on the incoming request.

### Retry Information

Data Prepper replies with a `RetryInfo` specifying how long to wait for the next request in case backpressure builds up. The retry information is implemented as exponential backoff, with a max delay of `retry_info.max_delay`.

```yaml
source:
otel_trace_source:
retry_info:
min_delay: 1000ms # defaults to 100ms
max_delay: 5s # defaults to 2s
```

### SSL

* ssl(Optional) => A boolean enables TLS/SSL. Default is ```true```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.otellogs;

import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.server.encoding.DecodingService;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
Expand Down Expand Up @@ -43,6 +44,7 @@

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -54,14 +56,16 @@ public class OTelLogsSource implements Source<Record<Object>> {
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
static final String SERVER_CONNECTIONS = "serverConnections";

// Default RetryInfo with minimum 100ms and maximum 2s
private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000));

private final OTelLogsSourceConfig oTelLogsSourceConfig;
private final String pipelineName;
private final PluginMetrics pluginMetrics;
private final GrpcAuthenticationProvider authenticationProvider;
private final CertificateProviderFactory certificateProviderFactory;
private final GrpcRequestExceptionHandler requestExceptionHandler;
private final ByteDecoder byteDecoder;
private Server server;
private ByteDecoder byteDecoder;

@DataPrepperPluginConstructor
public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
Expand All @@ -80,7 +84,6 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
this.certificateProviderFactory = certificateProviderFactory;
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
this.byteDecoder = new OTelLogsDecoder();
}

Expand Down Expand Up @@ -110,7 +113,7 @@ public void start(Buffer<Record<Object>> buffer) {
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionMapping(requestExceptionHandler);
.exceptionHandler(createGrpExceptionHandler());

final MethodDescriptor<ExportLogsServiceRequest, ExportLogsServiceResponse> methodDescriptor = LogsServiceGrpc.getExportMethod();
final String oTelLogsSourcePath = oTelLogsSourceConfig.getPath();
Expand Down Expand Up @@ -205,6 +208,14 @@ public void stop() {
LOG.info("Stopped otel_logs_source.");
}

private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo() != null
? oTelLogsSourceConfig.getRetryInfo()
: DEFAULT_RETRY_INFO;

return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
}

private List<ServerInterceptor> getAuthenticationInterceptor() {
final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor();
if (authenticationInterceptor == null) {
Expand Down
Loading
Loading