Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds max_request_length as a configuration for the http and OTel sources #3958

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ public static ByteCount parse(final String string) {
return new ByteCount(byteCount.longValue());
}

/**
* Returns a {@link ByteCount} with the total number of bytes provided.
*
* @param bytes The number of bytes
* @return A new {@link ByteCount}
* @since 2.7
*/
public static ByteCount ofBytes(final long bytes) {
if(bytes < 0)
throw new IllegalArgumentException("The argument provided for bytes is negative.");

return new ByteCount(bytes);
}

public static ByteCount zeroBytes() {
return ZERO_BYTES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@ void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byte
assertThat(byteCount.getBytes(), equalTo(expectedBytes));
}

@ParameterizedTest
@ValueSource(longs = {0, 1, 2, 1024, Integer.MAX_VALUE, (long) Integer.MAX_VALUE + 100})
void ofBytes_returns_with_same_bytes(final long bytes) {
final ByteCount byteCount = ByteCount.ofBytes(bytes);

assertThat(byteCount, notNullValue());
assertThat(byteCount.getBytes(), equalTo(bytes));
}

@ParameterizedTest
@ValueSource(longs = {-1, -2, -1024, Integer.MIN_VALUE, (long) Integer.MIN_VALUE - 100})
void ofBytes_throws_with_invalid_bytes(final long bytes) {
assertThrows(IllegalArgumentException.class, () -> ByteCount.ofBytes(bytes));
}

@Test
void zeroBytes_returns_bytes_with_getBytes_equal_to_0() {
assertThat(ByteCount.zeroBytes(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.parser.ByteCountDeserializer;
import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer;
import org.springframework.context.annotation.Bean;

Expand Down Expand Up @@ -33,6 +35,7 @@ ObjectMapper extensionPluginConfigObjectMapper() {
ObjectMapper pluginConfigObjectMapper(final VariableExpander variableExpander) {
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer());
simpleModule.addDeserializer(ByteCount.class, new ByteCountDeserializer());
TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer(
clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.HttpStatusException;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
Expand Down Expand Up @@ -56,6 +57,9 @@ public HttpResponse handleException(final ServiceRequestContext ctx, final HttpR
}

private HttpStatus handleException(final Throwable e) {
if(e instanceof HttpStatusException) {
return ((HttpStatusException) e).httpStatus();
}
if (e instanceof IOException) {
badRequestsCounter.increment();
return HttpStatus.BAD_REQUEST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,33 @@

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.HttpStatusException;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -163,6 +170,18 @@ public void testHandleSizeOverflowException() throws ExecutionException, Interru
verify(requestsTooLargeCounter, times(2)).increment();
}

@ParameterizedTest
@ValueSource(ints = {413, 429})
void handleException_with_HttpStatusException(final int statusCode) throws ExecutionException, InterruptedException {
final HttpStatus httpStatus = HttpStatus.valueOf(statusCode);
final HttpStatusException httpStatusException = mock(HttpStatusException.class);
when(httpStatusException.httpStatus()).thenReturn(httpStatus);
final HttpResponse httpResponse = httpRequestExceptionHandler.handleException(serviceRequestContext, httpRequest, httpStatusException);

assertThat(httpResponse, notNullValue());
assertThat(httpResponse.aggregate().get().status(), equalTo(httpStatus));
}

@Test
public void testHandleUnknownException() throws ExecutionException, InterruptedException {
// Prepare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public void start(final Buffer<Record<Log>> buffer) {

sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis()));
if(sourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes());
}
final int threads = sourceConfig.getThreadCount();
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
sb.blockingTaskExecutor(blockingTaskExecutor, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.loghttp;

import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class HTTPSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

private PluginModel authentication;

public boolean isSslCertAndKeyFileInS3() {
Expand Down Expand Up @@ -217,4 +221,8 @@ public boolean isUnauthenticatedHealthCheck() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.ResponseTimeoutException;
Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -742,4 +744,32 @@ public void testRunAnotherSourceWithSamePort() {
//Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException
Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer));
}

@Test
public void request_that_exceeds_maxRequestLength_returns_413() {
lenient().when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4));
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
// Prepare
final String testData = "[{\"log\": \"somelog\"}]";

assertThat((long) testData.getBytes().length, greaterThan(sourceConfig.getMaxRequestLength().getBytes()));
HTTPSourceUnderTest.start(testBuffer);
refreshMeasurements();

// When
WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path("/log/ingest")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.ofUtf8(testData))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join();

// Then
Assertions.assertTrue(testBuffer.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ public void start(Buffer<Record<Object>> buffer) {
sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator());
}
sb.requestTimeoutMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis());
if(oTelLogsSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;

Expand Down Expand Up @@ -100,6 +101,9 @@ public class OTelLogsSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
Expand Down Expand Up @@ -209,5 +213,9 @@ public int getMaxConnectionCount() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
Expand Down Expand Up @@ -761,6 +762,25 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer(
assertThat(actualException.getStatus().getCode(), equalTo(expectedStatusCode));
}

@Test
void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException {
when(oTelLogsSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelLogsSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4));
SOURCE.start(buffer);

WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21892")
.method(HttpMethod.POST)
.path("/opentelemetry.proto.collector.logs.v1.LogsService/Export")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.copyOf(JsonFormat.printer().print(LOGS_REQUEST).getBytes()))
.aggregate()
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable))
.join();
}

static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {
}

sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis());
if(oTelMetricsSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;

Expand Down Expand Up @@ -103,6 +104,9 @@ public class OTelMetricsSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
Expand Down Expand Up @@ -220,5 +224,9 @@ public boolean isUnauthenticatedHealthCheck() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
Expand Down Expand Up @@ -979,6 +980,25 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer(
assertThat(actualException.getStatus().getCode(), equalTo(expectedStatusCode));
}

@Test
void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException {
when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelMetricsSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4));
SOURCE.start(buffer);

WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21891")
.method(HttpMethod.POST)
.path("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.copyOf(JsonFormat.printer().print(METRICS_REQUEST).getBytes()))
.aggregate()
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable))
.join();
}

static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public void start(Buffer<Record<Object>> buffer) {
}

sb.requestTimeoutMillis(oTelTraceSourceConfig.getRequestTimeoutInMillis());
if(oTelTraceSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelTraceSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelTraceSourceConfig.isSsl() || oTelTraceSourceConfig.useAcmCertForSSL()) {
Expand Down
Loading
Loading