From b5f24c3323d1a6b849eb0b1860bfc899b39f5eac Mon Sep 17 00:00:00 2001 From: David Venable Date: Thu, 11 Jan 2024 20:16:03 -0600 Subject: [PATCH 1/2] Adds max_request_length as a configuration for the http, otel_trace_source, otel_metrics_source, and otel_logs_source sources. Resolves #3931 Signed-off-by: David Venable --- .../dataprepper/model/types/ByteCount.java | 14 +++++++++ .../model/types/ByteCountTest.java | 15 ++++++++++ .../plugin/ObjectMapperConfiguration.java | 3 ++ .../HttpRequestExceptionHandler.java | 4 +++ .../HttpRequestExceptionHandlerTest.java | 20 +++++++++++++ .../plugins/source/loghttp/HTTPSource.java | 3 ++ .../source/loghttp/HTTPSourceConfig.java | 8 +++++ .../source/loghttp/HTTPSourceTest.java | 30 +++++++++++++++++++ .../source/otellogs/OTelLogsSource.java | 3 ++ .../source/otellogs/OTelLogsSourceConfig.java | 8 +++++ .../source/otellogs/OTelLogsSourceTest.java | 20 +++++++++++++ .../source/otelmetrics/OTelMetricsSource.java | 3 ++ .../otelmetrics/OTelMetricsSourceConfig.java | 8 +++++ .../otelmetrics/OTelMetricsSourceTest.java | 20 +++++++++++++ .../source/oteltrace/OTelTraceSource.java | 3 ++ .../oteltrace/OTelTraceSourceConfig.java | 8 +++++ .../source/oteltrace/OTelTraceSourceTest.java | 22 ++++++++++++++ 17 files changed, 192 insertions(+) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java index 1681563675..034caa9f00 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java @@ -95,6 +95,20 @@ public static ByteCount parse(final String string) { return new ByteCount(byteCount.longValue()); } + /** + * Returns a {@link ByteCount} with the total number of bytes provided. + * + * @param bytes The number of bytes + * @return A new {@link ByteCount} + * @since 2.7 + */ + public static ByteCount ofBytes(final long bytes) { + if(bytes < 0) + throw new IllegalArgumentException("The argument provided for bytes is negative."); + + return new ByteCount(bytes); + } + public static ByteCount zeroBytes() { return ZERO_BYTES; } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java index b717289a7f..dee9dad549 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/types/ByteCountTest.java @@ -148,6 +148,21 @@ void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byte assertThat(byteCount.getBytes(), equalTo(expectedBytes)); } + @ParameterizedTest + @ValueSource(longs = {0, 1, 2, 1024, Integer.MAX_VALUE, (long) Integer.MAX_VALUE + 100}) + void ofBytes_returns_with_same_bytes(final long bytes) { + final ByteCount byteCount = ByteCount.ofBytes(bytes); + + assertThat(byteCount, notNullValue()); + assertThat(byteCount.getBytes(), equalTo(bytes)); + } + + @ParameterizedTest + @ValueSource(longs = {-1, -2, -1024, Integer.MIN_VALUE, (long) Integer.MIN_VALUE - 100}) + void ofBytes_throws_with_invalid_bytes(final long bytes) { + assertThrows(IllegalArgumentException.class, () -> ByteCount.ofBytes(bytes)); + } + @Test void zeroBytes_returns_bytes_with_getBytes_equal_to_0() { assertThat(ByteCount.zeroBytes(), notNullValue()); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java index a593950fea..14aafb13c3 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugin/ObjectMapperConfiguration.java @@ -3,6 +3,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.module.SimpleModule; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.parser.ByteCountDeserializer; import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer; import org.springframework.context.annotation.Bean; @@ -33,6 +35,7 @@ ObjectMapper extensionPluginConfigObjectMapper() { ObjectMapper pluginConfigObjectMapper(final VariableExpander variableExpander) { final SimpleModule simpleModule = new SimpleModule(); simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer()); + simpleModule.addDeserializer(ByteCount.class, new ByteCountDeserializer()); TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer( clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz))); diff --git a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java index cfffcc2f2b..2d0fde0196 100644 --- a/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java +++ b/data-prepper-plugins/armeria-common/src/main/java/org/opensearch/dataprepper/HttpRequestExceptionHandler.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper; import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.server.HttpStatusException; import com.linecorp.armeria.server.RequestTimeoutException; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction; @@ -56,6 +57,9 @@ public HttpResponse handleException(final ServiceRequestContext ctx, final HttpR } private HttpStatus handleException(final Throwable e) { + if(e instanceof HttpStatusException) { + return ((HttpStatusException) e).httpStatus(); + } if (e instanceof IOException) { badRequestsCounter.increment(); return HttpStatus.BAD_REQUEST; diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java index 9e75aa7012..5bf71e4cbe 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java @@ -6,8 +6,12 @@ package org.opensearch.dataprepper; import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.server.HttpStatusException; import com.linecorp.armeria.server.RequestTimeoutException; import com.linecorp.armeria.server.ServiceRequestContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import com.linecorp.armeria.common.AggregatedHttpResponse; @@ -24,7 +28,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -163,6 +171,18 @@ public void testHandleSizeOverflowException() throws ExecutionException, Interru verify(requestsTooLargeCounter, times(2)).increment(); } + @ParameterizedTest + @ValueSource(ints = {413, 429}) + void handleException_with_HttpStatusException(final int statusCode) throws ExecutionException, InterruptedException { + final HttpStatus httpStatus = HttpStatus.valueOf(statusCode); + final HttpStatusException httpStatusException = mock(HttpStatusException.class); + when(httpStatusException.httpStatus()).thenReturn(httpStatus); + final HttpResponse httpResponse = httpRequestExceptionHandler.handleException(serviceRequestContext, httpRequest, httpStatusException); + + assertThat(httpResponse, notNullValue()); + assertThat(httpResponse.aggregate().get().status(), equalTo(httpStatus)); + } + @Test public void testHandleUnknownException() throws ExecutionException, InterruptedException { // Prepare diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 1c80191c50..9022eaca60 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -124,6 +124,9 @@ public void start(final Buffer> buffer) { sb.maxNumConnections(sourceConfig.getMaxConnectionCount()); sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis())); + if(sourceConfig.getMaxRequestLength() != null) { + sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes()); + } final int threads = sourceConfig.getThreadCount(); final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads); sb.blockingTaskExecutor(blockingTaskExecutor, true); diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java index be54821c34..c8ad8397d0 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.loghttp; import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginModel; import com.fasterxml.jackson.annotation.JsonProperty; @@ -92,6 +93,9 @@ public class HTTPSourceConfig { @JsonProperty(COMPRESSION) private CompressionOption compression = CompressionOption.NONE; + @JsonProperty("max_request_length") + private ByteCount maxRequestLength; + private PluginModel authentication; public boolean isSslCertAndKeyFileInS3() { @@ -217,4 +221,8 @@ public boolean isUnauthenticatedHealthCheck() { public CompressionOption getCompression() { return compression; } + + public ByteCount getMaxRequestLength() { + return maxRequestLength; + } } diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index 06e8676e1c..142a0c0401 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.log.Log; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ResponseTimeoutException; @@ -72,6 +73,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -742,4 +744,32 @@ public void testRunAnotherSourceWithSamePort() { //Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer)); } + + @Test + public void request_that_exceeds_maxRequestLength_returns_413() { + lenient().when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + // Prepare + final String testData = "[{\"log\": \"somelog\"}]"; + + assertThat((long) testData.getBytes().length, greaterThan(sourceConfig.getMaxRequestLength().getBytes())); + HTTPSourceUnderTest.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:2021") + .method(HttpMethod.POST) + .path("/log/ingest") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join(); + + // Then + Assertions.assertTrue(testBuffer.isEmpty()); + } + } diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java index 2d560c7e0d..2d1275f68a 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java @@ -141,6 +141,9 @@ public void start(Buffer> buffer) { sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator()); } sb.requestTimeoutMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis()); + if(oTelLogsSourceConfig.getMaxRequestLength() != null) { + sb.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes()); + } // ACM Cert for SSL takes preference if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) { diff --git a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java index c150ce307e..bfca665d76 100644 --- a/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java +++ b/data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceConfig.java @@ -10,6 +10,7 @@ import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.Size; import org.apache.commons.lang3.StringUtils; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -100,6 +101,9 @@ public class OTelLogsSourceConfig { @JsonProperty(COMPRESSION) private CompressionOption compression = CompressionOption.NONE; + @JsonProperty("max_request_length") + private ByteCount maxRequestLength; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -209,5 +213,9 @@ public int getMaxConnectionCount() { public CompressionOption getCompression() { return compression; } + + public ByteCount getMaxRequestLength() { + return maxRequestLength; + } } diff --git a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java index f31dfe03f7..c1107f974d 100644 --- a/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java +++ b/data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceTest.java @@ -62,6 +62,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; @@ -761,6 +762,25 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer( assertThat(actualException.getStatus().getCode(), equalTo(expectedStatusCode)); } + @Test + void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException { + when(oTelLogsSourceConfig.enableUnframedRequests()).thenReturn(true); + when(oTelLogsSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); + SOURCE.start(buffer); + + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:21892") + .method(HttpMethod.POST) + .path("/opentelemetry.proto.collector.logs.v1.LogsService/Export") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.copyOf(JsonFormat.printer().print(LOGS_REQUEST).getBytes())) + .aggregate() + .whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable)) + .join(); + } + static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) { diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 33c4023e67..85e6982e23 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -159,6 +159,9 @@ public void start(Buffer> buffer) { } sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis()); + if(oTelMetricsSourceConfig.getMaxRequestLength() != null) { + sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes()); + } // ACM Cert for SSL takes preference if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) { diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java index f707eb0a5b..ea590fd80b 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java @@ -9,6 +9,7 @@ import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.Size; import org.apache.commons.lang3.StringUtils; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -103,6 +104,9 @@ public class OTelMetricsSourceConfig { @JsonProperty(COMPRESSION) private CompressionOption compression = CompressionOption.NONE; + @JsonProperty("max_request_length") + private ByteCount maxRequestLength; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -220,5 +224,9 @@ public boolean isUnauthenticatedHealthCheck() { public CompressionOption getCompression() { return compression; } + + public ByteCount getMaxRequestLength() { + return maxRequestLength; + } } diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index 0a537f9c40..5db85ed19d 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -57,6 +57,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.configuration.PipelineDescription; @@ -979,6 +980,25 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer( assertThat(actualException.getStatus().getCode(), equalTo(expectedStatusCode)); } + @Test + void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException { + when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true); + when(oTelMetricsSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); + SOURCE.start(buffer); + + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:21891") + .method(HttpMethod.POST) + .path("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.copyOf(JsonFormat.printer().print(METRICS_REQUEST).getBytes())) + .aggregate() + .whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable)) + .join(); + } + static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) { diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index a38a3f9119..d92c5fc092 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -161,6 +161,9 @@ public void start(Buffer> buffer) { } sb.requestTimeoutMillis(oTelTraceSourceConfig.getRequestTimeoutInMillis()); + if(oTelTraceSourceConfig.getMaxRequestLength() != null) { + sb.maxRequestLength(oTelTraceSourceConfig.getMaxRequestLength().getBytes()); + } // ACM Cert for SSL takes preference if (oTelTraceSourceConfig.isSsl() || oTelTraceSourceConfig.useAcmCertForSSL()) { diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java index 6e6c3c4de9..4760da34a4 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java @@ -7,6 +7,7 @@ import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.Size; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.model.configuration.PluginModel; import com.fasterxml.jackson.annotation.JsonProperty; @@ -103,6 +104,9 @@ public class OTelTraceSourceConfig { @JsonProperty(COMPRESSION) private CompressionOption compression = CompressionOption.NONE; + @JsonProperty("max_request_length") + private ByteCount maxRequestLength; + @AssertTrue(message = "path should start with /") boolean isPathValid() { return path == null || path.startsWith("/"); @@ -220,4 +224,8 @@ public boolean isUnauthenticatedHealthCheck() { public CompressionOption getCompression() { return compression; } + + public ByteCount getMaxRequestLength() { + return maxRequestLength; + } } diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index 8832b356a6..43849bdfd1 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -56,6 +56,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider; import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; +import org.opensearch.dataprepper.model.types.ByteCount; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -1057,6 +1058,27 @@ void gRPC_request_throws_InvalidArgument_for_malformed_trace_data() { verifyNoInteractions(buffer); } + @Test + void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException { + when(oTelTraceSourceConfig.enableUnframedRequests()).thenReturn(true); + when(oTelTraceSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4)); + configureObjectUnderTest(); + SOURCE.start(buffer); + + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:21890") + .method(HttpMethod.POST) + .path("/opentelemetry.proto.collector.trace.v1.TraceService/Export") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.copyOf(JsonFormat.printer().print(createExportTraceRequest()).getBytes())) + .aggregate() + .whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable)) + .join(); + } + + static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider { @Override public Stream provideArguments(final ExtensionContext context) { From 7c31ae343f8b7d16bde0aa6b6a83d755d9657ded Mon Sep 17 00:00:00 2001 From: David Venable Date: Fri, 12 Jan 2024 09:16:02 -0600 Subject: [PATCH 2/2] Corrects imports Signed-off-by: David Venable --- .../HttpRequestExceptionHandlerTest.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java index 5bf71e4cbe..6babcc37bb 100644 --- a/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java +++ b/data-prepper-plugins/armeria-common/src/test/java/org/opensearch/dataprepper/HttpRequestExceptionHandlerTest.java @@ -5,24 +5,23 @@ package org.opensearch.dataprepper; +import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.server.HttpStatusException; import com.linecorp.armeria.server.RequestTimeoutException; import com.linecorp.armeria.server.ServiceRequestContext; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import com.linecorp.armeria.common.AggregatedHttpResponse; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.common.HttpStatus; import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; import java.io.IOException; import java.util.concurrent.ExecutionException;