diff --git a/data-prepper-plugins/http-source/README.md b/data-prepper-plugins/http-source/README.md index c86ed89382..1c562082b6 100644 --- a/data-prepper-plugins/http-source/README.md +++ b/data-prepper-plugins/http-source/README.md @@ -5,7 +5,7 @@ This is a source plugin that supports HTTP protocol. Currently ONLY support Json ## Usages -Example `.yaml` configuration: +Currently, we are exposing `/log/ingest` URI path for http log ingestion. Example `.yaml` configuration: ``` source: - http: @@ -27,7 +27,18 @@ source: ## Metrics -TBD +### Counter +- `requestsReceived`: measures total number of requests received by `/log/ingest` endpoint. +- `requestsRejected`: measures total number of requests rejected by HTTP source plugin. +- `successRequests`: measures total number of requests successfully processed by HTTP source plugin. +- `badRequests`: measures total number of requests with invalid content type or format processed by HTTP source plugin. +- `requestTimeouts`: measures total number of requests that time out in the HTTP source server. + +### Timer +- `requestProcessDuration`: measures latency of requests processed by the HTTP source plugin in seconds. + +### Distribution Summary +- `payloadSize`: measures the distribution of incoming requests payload sizes in bytes. ## Developer Guide This plugin is compatible with Java 14. See diff --git a/data-prepper-plugins/http-source/build.gradle b/data-prepper-plugins/http-source/build.gradle index 64221fbbe0..87e865cb79 100644 --- a/data-prepper-plugins/http-source/build.gradle +++ b/data-prepper-plugins/http-source/build.gradle @@ -8,6 +8,7 @@ dependencies { implementation project(':data-prepper-plugins:common') implementation "com.linecorp.armeria:armeria:1.9.2" implementation "commons-io:commons-io:2.11.0" + testImplementation project(':data-prepper-api').sourceSets.test.output testImplementation 'org.assertj:assertj-core:3.20.2' testImplementation "org.hamcrest:hamcrest:2.2" } diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java index 2e1d3b119b..fd68a5640e 100644 --- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -11,6 +11,7 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.PluginType; import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; import com.amazon.dataprepper.model.buffer.Buffer; @@ -39,10 +40,12 @@ public class HTTPSource implements Source> { private final HTTPSourceConfig sourceConfig; private final CertificateProviderFactory certificateProviderFactory; private Server server; + private final PluginMetrics pluginMetrics; public HTTPSource(final PluginSetting pluginSetting) { sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting); certificateProviderFactory = new CertificateProviderFactory(sourceConfig); + pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting); } @Override @@ -75,10 +78,10 @@ public void start(Buffer> buffer) { final int maxPendingRequests = sourceConfig.getMaxPendingRequests(); final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( maxPendingRequests, blockingTaskExecutor.getQueue()); - final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests); + final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); // TODO: allow customization on URI path for log ingestion sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); - final LogHTTPService logHTTPService = new LogHTTPService(requestTimeoutInMillis, buffer); + final LogHTTPService logHTTPService = new LogHTTPService(requestTimeoutInMillis, buffer, pluginMetrics); sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService); // TODO: attach HealthCheckService diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPService.java index a90b858806..7686a0f740 100644 --- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPService.java @@ -11,15 +11,20 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.buffer.Buffer; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.plugins.source.loghttp.codec.JsonCodec; import com.linecorp.armeria.common.AggregatedHttpRequest; +import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.server.annotation.Blocking; import com.linecorp.armeria.server.annotation.Post; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.util.List; @@ -30,27 +35,53 @@ */ @Blocking public class LogHTTPService { + public static final String REQUESTS_RECEIVED = "requestsReceived"; + public static final String REQUEST_TIMEOUTS = "requestTimeouts"; + public static final String SUCCESS_REQUESTS = "successRequests"; + public static final String BAD_REQUESTS = "badRequests"; + public static final String PAYLOAD_SIZE = "payloadSize"; + public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration"; // TODO: support other data-types as request body, e.g. json_lines, msgpack private final JsonCodec jsonCodec = new JsonCodec(); private final Buffer> buffer; private final int bufferWriteTimeoutInMillis; + private final Counter requestsReceivedCounter; + private final Counter requestTimeoutsCounter; + private final Counter successRequestsCounter; + private final Counter badRequestsCounter; + private final DistributionSummary payloadSizeSummary; + private final Timer requestProcessDuration; - public LogHTTPService(final int bufferWriteTimeoutInMillis, final Buffer> buffer) { + public LogHTTPService(final int bufferWriteTimeoutInMillis, + final Buffer> buffer, + final PluginMetrics pluginMetrics) { this.buffer = buffer; this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; + + requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); + requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS); + successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); + badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS); + payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE); + requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION); } @Post public HttpResponse doPost(final AggregatedHttpRequest aggregatedHttpRequest) { - return processRequest(aggregatedHttpRequest); + return requestProcessDuration.record(() -> processRequest(aggregatedHttpRequest)); } private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) { + requestsReceivedCounter.increment(); + List jsonList; + final HttpData content = aggregatedHttpRequest.content(); + payloadSizeSummary.record(content.length()); try { - jsonList = jsonCodec.parse(aggregatedHttpRequest.content()); + jsonList = jsonCodec.parse(content); } catch (IOException e) { + badRequestsCounter.increment(); return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, "Bad request data format. Needs to be json array."); } for (String json: jsonList) { @@ -58,9 +89,11 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe // TODO: switch to new API writeAll once ready buffer.write(new Record<>(json), bufferWriteTimeoutInMillis); } catch (TimeoutException e) { + requestTimeoutsCounter.increment(); return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, e.getMessage()); } } + successRequestsCounter.increment(); return HttpResponse.of(HttpStatus.OK); } } diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java index ad5c7dcae0..9413ec89ef 100644 --- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java +++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandler.java @@ -11,6 +11,7 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.metrics.PluginMetrics; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; @@ -18,19 +19,26 @@ import com.linecorp.armeria.server.Service; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.throttling.ThrottlingRejectHandler; +import io.micrometer.core.instrument.Counter; import javax.annotation.Nullable; public class LogThrottlingRejectHandler implements ThrottlingRejectHandler { + public static final String REQUESTS_REJECTED = "requestsRejected"; + private final int maxPendingRequests; + private final Counter rejectedRequestsCounter; - public LogThrottlingRejectHandler(final int maxPendingRequests) { + public LogThrottlingRejectHandler(final int maxPendingRequests, final PluginMetrics pluginMetrics) { this.maxPendingRequests = maxPendingRequests; + + rejectedRequestsCounter = pluginMetrics.counter(REQUESTS_REJECTED); } @Override public HttpResponse handleRejected(final Service delegate, final ServiceRequestContext ctx, final HttpRequest req, final @Nullable Throwable cause) throws Exception { + rejectedRequestsCounter.increment(); return HttpResponse.of(HttpStatus.TOO_MANY_REQUESTS, MediaType.ANY_TYPE, "The number of pending requests in the work queue reaches max_pending_requests:%d. Please retry later", maxPendingRequests diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index 292c645397..d8541a961f 100644 --- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -11,6 +11,8 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.metrics.MetricNames; +import com.amazon.dataprepper.metrics.MetricsTestUtil; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; @@ -25,6 +27,8 @@ import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.ServerBuilder; +import io.micrometer.core.instrument.Measurement; +import io.micrometer.core.instrument.Statistic; import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -44,7 +48,9 @@ import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -58,6 +64,16 @@ @ExtendWith(MockitoExtension.class) class HTTPSourceTest { + /** + * TODO: according to the new coding guideline, consider refactoring the following test cases into HTTPSourceIT. + * - testHTTPJsonResponse200() + * - testHTTPJsonResponse400() + * - testHTTPJsonResponse415() + * - testHTTPJsonResponse429() + * - testHTTPSJsonResponse() + */ + private final String PLUGIN_NAME = "http"; + private final String TEST_PIPELINE_NAME = "test_pipeline"; private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); @@ -73,12 +89,51 @@ class HTTPSourceTest { private PluginSetting testPluginSetting; private BlockingBuffer> testBuffer; private HTTPSource HTTPSourceUnderTest; + private List requestsReceivedMeasurements; + private List successRequestsMeasurements; + private List requestTimeoutsMeasurements; + private List badRequestsMeasurements; + private List rejectedRequestsMeasurements; + private List requestProcessDurationMeasurements; + private List payloadSizeSummaryMeasurements; private BlockingBuffer> getBuffer() { final HashMap integerHashMap = new HashMap<>(); integerHashMap.put("buffer_size", 1); integerHashMap.put("batch_size", 1); - return new BlockingBuffer<>(new PluginSetting("blocking_buffer", integerHashMap)); + final PluginSetting pluginSetting = new PluginSetting("blocking_buffer", integerHashMap) {{ + setPipelineName(TEST_PIPELINE_NAME); + }}; + return new BlockingBuffer<>(pluginSetting); + } + + /** + * This method should be invoked after {@link HTTPSource::start(Buffer buffer)} to scrape metrics + */ + private void refreshMeasurements() { + final String metricNamePrefix = new StringJoiner(MetricNames.DELIMITER) + .add(TEST_PIPELINE_NAME).add(PLUGIN_NAME).toString(); + requestsReceivedMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogHTTPService.REQUESTS_RECEIVED).toString()); + successRequestsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogHTTPService.SUCCESS_REQUESTS).toString()); + requestTimeoutsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogHTTPService.REQUEST_TIMEOUTS).toString()); + badRequestsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogHTTPService.BAD_REQUESTS).toString()); + rejectedRequestsMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogThrottlingRejectHandler.REQUESTS_REJECTED).toString()); + requestProcessDurationMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogHTTPService.REQUEST_PROCESS_DURATION).toString()); + payloadSizeSummaryMeasurements = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(metricNamePrefix) + .add(LogHTTPService.PAYLOAD_SIZE).toString()); } @BeforeEach @@ -89,20 +144,30 @@ public void setUp() { lenient().when(serverBuilder.build()).thenReturn(server); lenient().when(server.start()).thenReturn(completableFuture); - testPluginSetting = new PluginSetting("http", new HashMap<>()); - testPluginSetting.setPipelineName("pipeline"); + MetricsTestUtil.initMetrics(); + testPluginSetting = new PluginSetting(PLUGIN_NAME, new HashMap<>()) {{ + setPipelineName(TEST_PIPELINE_NAME); + }}; testBuffer = getBuffer(); HTTPSourceUnderTest = new HTTPSource(testPluginSetting); } @AfterEach public void cleanUp() { - HTTPSourceUnderTest.stop(); + if (HTTPSourceUnderTest != null) { + HTTPSourceUnderTest.stop(); + } } @Test public void testHTTPJsonResponse200() { + // Prepare + final String testData = "[{\"log\": \"somelog\"}]"; + final int testPayloadSize = testData.getBytes().length; HTTPSourceUnderTest.start(testBuffer); + refreshMeasurements(); + + // When WebClient.of().execute(RequestHeaders.builder() .scheme(SessionProtocol.HTTP) .authority("127.0.0.1:2021") @@ -110,10 +175,111 @@ public void testHTTPJsonResponse200() { .path("/log/ingest") .contentType(MediaType.JSON_UTF_8) .build(), - HttpData.ofUtf8("[{\"log\": \"somelog\"}]")) + HttpData.ofUtf8(testData)) .aggregate() .whenComplete((i, ex) -> assertThat(i.status()).isEqualTo(HttpStatus.OK)).join(); + + // Then Assertions.assertFalse(testBuffer.isEmpty()); + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, successRequestsCount.getValue()); + final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestProcessDurationCount.getValue()); + final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.MAX); + Assertions.assertTrue(requestProcessDurationMax.getValue() > 0); + final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( + payloadSizeSummaryMeasurements, Statistic.MAX); + Assertions.assertEquals(testPayloadSize, payloadSizeMax.getValue()); + } + + @Test + public void testHTTPJsonResponse400() { + // Prepare + final String testBadData = "}"; + final int testPayloadSize = testBadData.getBytes().length; + HTTPSourceUnderTest.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:2021") + .method(HttpMethod.POST) + .path("/log/ingest") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testBadData)) + .aggregate() + .whenComplete((i, ex) -> assertThat(i.status()).isEqualTo(HttpStatus.BAD_REQUEST)).join(); + + // Then + Assertions.assertTrue(testBuffer.isEmpty()); + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement badRequestsCount = MetricsTestUtil.getMeasurementFromList( + badRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, badRequestsCount.getValue()); + } + + @Test + public void testHTTPJsonResponse415() throws InterruptedException { + // Prepare + final Map settings = new HashMap<>(); + final int testMaxPendingRequests = 1; + final int testThreadCount = 1; + final int serverTimeoutInMillis = 500; + settings.put(HTTPSourceConfig.REQUEST_TIMEOUT, serverTimeoutInMillis); + settings.put(HTTPSourceConfig.MAX_PENDING_REQUESTS, testMaxPendingRequests); + settings.put(HTTPSourceConfig.THREAD_COUNT, testThreadCount); + testPluginSetting = new PluginSetting(PLUGIN_NAME, settings); + testPluginSetting.setPipelineName(TEST_PIPELINE_NAME); + HTTPSourceUnderTest = new HTTPSource(testPluginSetting); + // Start the source + HTTPSourceUnderTest.start(testBuffer); + refreshMeasurements(); + final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:2021") + .method(HttpMethod.POST) + .path("/log/ingest") + .contentType(MediaType.JSON_UTF_8) + .build(); + final HttpData testHttpData = HttpData.ofUtf8("[{\"log\": \"somelog\"}]"); + + // Fill in the buffer + WebClient.of().execute(testRequestHeaders, testHttpData).aggregate().whenComplete( + (response, ex) -> assertThat(response.status()).isEqualTo(HttpStatus.OK)).join(); + + // Disable client timeout + WebClient testWebClient = WebClient.builder().responseTimeoutMillis(0).build(); + + // When/Then + testWebClient.execute(testRequestHeaders, testHttpData) + .aggregate() + .whenComplete((i, ex) -> assertThat(i.status()).isEqualTo(HttpStatus.REQUEST_TIMEOUT)) + .join(); + // verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(2.0, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, successRequestsCount.getValue()); + final Measurement requestTimeoutsCount = MetricsTestUtil.getMeasurementFromList( + requestTimeoutsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, requestTimeoutsCount.getValue()); + final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.MAX); + final double maxDurationInMillis = 1000 * requestProcessDurationMax.getValue(); + Assertions.assertTrue(maxDurationInMillis > serverTimeoutInMillis); } @Test @@ -127,11 +293,12 @@ public void testHTTPJsonResponse429() throws InterruptedException { settings.put(HTTPSourceConfig.REQUEST_TIMEOUT, serverTimeoutInMillis); settings.put(HTTPSourceConfig.MAX_PENDING_REQUESTS, testMaxPendingRequests); settings.put(HTTPSourceConfig.THREAD_COUNT, testThreadCount); - testPluginSetting = new PluginSetting("http", settings); - testPluginSetting.setPipelineName("pipeline"); + testPluginSetting = new PluginSetting(PLUGIN_NAME, settings); + testPluginSetting.setPipelineName(TEST_PIPELINE_NAME); HTTPSourceUnderTest = new HTTPSource(testPluginSetting); // Start the source HTTPSourceUnderTest.start(testBuffer); + refreshMeasurements(); final RequestHeaders testRequestHeaders = RequestHeaders.builder().scheme(SessionProtocol.HTTP) .authority("127.0.0.1:2021") .method(HttpMethod.POST) @@ -156,12 +323,23 @@ public void testHTTPJsonResponse429() throws InterruptedException { // When/Then testWebClient.execute(testRequestHeaders, testHttpData).aggregate().whenComplete( (response, ex) -> assertThat(response.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS)).join(); + // Wait until source server timeout a request processing thread Thread.sleep(serverTimeoutInMillis); // New request should timeout instead of being rejected CompletionException actualException = Assertions.assertThrows( CompletionException.class, () -> testWebClient.execute(testRequestHeaders, testHttpData).aggregate().join()); assertThat(actualException.getCause()).isInstanceOf(ResponseTimeoutException.class); + // verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + Assertions.assertEquals(testMaxPendingRequests + testThreadCount + 2, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, successRequestsCount.getValue()); + final Measurement rejectedRequestsCount = MetricsTestUtil.getMeasurementFromList( + rejectedRequestsMeasurements, Statistic.COUNT); + Assertions.assertEquals(1.0, rejectedRequestsCount.getValue()); } @Test @@ -180,8 +358,8 @@ public void testServerStartCertFileSuccess() throws IOException { settingsMap.put(HTTPSourceConfig.SSL_CERTIFICATE_FILE, TEST_SSL_CERTIFICATE_FILE); settingsMap.put(HTTPSourceConfig.SSL_KEY_FILE, TEST_SSL_KEY_FILE); - testPluginSetting = new PluginSetting(null, settingsMap); - testPluginSetting.setPipelineName("pipeline"); + testPluginSetting = new PluginSetting(PLUGIN_NAME, settingsMap); + testPluginSetting.setPipelineName(TEST_PIPELINE_NAME); HTTPSourceUnderTest = new HTTPSource(testPluginSetting); HTTPSourceUnderTest.start(testBuffer); HTTPSourceUnderTest.stop(); @@ -198,14 +376,13 @@ public void testServerStartCertFileSuccess() throws IOException { @Test void testHTTPSJsonResponse() { - final Map settingsMap = new HashMap<>(); settingsMap.put(HTTPSourceConfig.REQUEST_TIMEOUT, 200); settingsMap.put(HTTPSourceConfig.SSL, true); settingsMap.put(HTTPSourceConfig.SSL_CERTIFICATE_FILE, TEST_SSL_CERTIFICATE_FILE); settingsMap.put(HTTPSourceConfig.SSL_KEY_FILE, TEST_SSL_KEY_FILE); - testPluginSetting = new PluginSetting("http", settingsMap); - testPluginSetting.setPipelineName("pipeline"); + testPluginSetting = new PluginSetting(PLUGIN_NAME, settingsMap); + testPluginSetting.setPipelineName(TEST_PIPELINE_NAME); HTTPSourceUnderTest = new HTTPSource(testPluginSetting); testBuffer = getBuffer(); @@ -233,8 +410,8 @@ public void testDoubleStart() { @Test public void testStartWithEmptyBuffer() { - testPluginSetting = new PluginSetting(null, Collections.emptyMap()); - testPluginSetting.setPipelineName("pipeline"); + testPluginSetting = new PluginSetting(PLUGIN_NAME, Collections.emptyMap()); + testPluginSetting.setPipelineName(TEST_PIPELINE_NAME); final HTTPSource source = new HTTPSource(testPluginSetting); Assertions.assertThrows(IllegalStateException.class, () -> source.start(null)); } diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java index 45ba3976ca..b02dd52ea2 100644 --- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java @@ -11,6 +11,7 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.buffer.Buffer; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; @@ -21,11 +22,21 @@ import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.HashMap; @@ -33,20 +44,62 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class LogHTTPServiceTest { private static final ObjectMapper mapper = new ObjectMapper(); private static final int TEST_BUFFER_CAPACITY = 3; private static final int TEST_TIMEOUT_IN_MILLIS = 500; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter requestsReceivedCounter; + + @Mock + private Counter requestTimeoutsCounter; + + @Mock + private Counter successRequestsCounter; + + @Mock + private Counter badRequestsCounter; + + @Mock + private DistributionSummary payloadSizeSummary; + + @Mock + private Timer requestProcessDuration; + private LogHTTPService logHTTPService; @BeforeEach public void setUp() { + when(pluginMetrics.counter(LogHTTPService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter); + when(pluginMetrics.counter(LogHTTPService.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter); + when(pluginMetrics.counter(LogHTTPService.SUCCESS_REQUESTS)).thenReturn(successRequestsCounter); + when(pluginMetrics.counter(LogHTTPService.BAD_REQUESTS)).thenReturn(badRequestsCounter); + when(pluginMetrics.summary(LogHTTPService.PAYLOAD_SIZE)).thenReturn(payloadSizeSummary); + when(pluginMetrics.timer(LogHTTPService.REQUEST_PROCESS_DURATION)).thenReturn(requestProcessDuration); + when(requestProcessDuration.record(ArgumentMatchers.>any())).thenAnswer( + (Answer) invocation -> { + final Object[] args = invocation.getArguments(); + @SuppressWarnings("unchecked") + final Supplier supplier = (Supplier) args[0]; + return supplier.get(); + } + ); + Buffer> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); - logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer); + logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics); } @Test @@ -59,6 +112,14 @@ public void testHTTPRequestSuccess() throws InterruptedException, ExecutionExcep // Then assertEquals(HttpStatus.OK, postResponse.status()); + verify(requestsReceivedCounter, times(1)).increment(); + verify(requestTimeoutsCounter, never()).increment(); + verify(successRequestsCounter, times(1)).increment(); + verify(badRequestsCounter, never()).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).record(ArgumentMatchers.>any()); } @Test @@ -71,6 +132,14 @@ public void testHTTPRequestBadRequest() throws ExecutionException, InterruptedEx // Then assertEquals(HttpStatus.BAD_REQUEST, postResponse.status()); + verify(requestsReceivedCounter, times(1)).increment(); + verify(requestTimeoutsCounter, never()).increment(); + verify(successRequestsCounter, never()).increment(); + verify(badRequestsCounter, times(1)).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testBadRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).record(ArgumentMatchers.>any()); } @Test @@ -86,6 +155,14 @@ public void testHTTPRequestTimeout() throws InterruptedException, ExecutionExcep // Then assertEquals(HttpStatus.REQUEST_TIMEOUT, timeoutPostResponse.status()); + verify(requestsReceivedCounter, times(2)).increment(); + verify(requestTimeoutsCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + verify(badRequestsCounter, never()).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(2)).record(payloadLengthCaptor.capture()); + assertEquals(timeoutRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(2)).record(ArgumentMatchers.>any()); } private AggregatedHttpRequest generateRandomValidHTTPRequest(int numJson) throws JsonProcessingException, diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java index 50ff2547db..2e9a0d739e 100644 --- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java +++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/LogThrottlingRejectHandlerTest.java @@ -11,12 +11,15 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.metrics.PluginMetrics; import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.server.Service; import com.linecorp.armeria.server.ServiceRequestContext; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -24,6 +27,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class LogThrottlingRejectHandlerTest { @@ -38,12 +44,23 @@ class LogThrottlingRejectHandlerTest { @Mock private HttpRequest httpRequest; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter rejectedRequestsCounter; + private LogThrottlingRejectHandler objectUnderTest; + @BeforeEach + public void setUp() { + when(pluginMetrics.counter(LogThrottlingRejectHandler.REQUESTS_REJECTED)).thenReturn(rejectedRequestsCounter); + } + @Test public void testHandleRejected() throws Exception { // Prepare - objectUnderTest = new LogThrottlingRejectHandler(TEST_MAX_PENDING_REQUEST); + objectUnderTest = new LogThrottlingRejectHandler(TEST_MAX_PENDING_REQUEST, pluginMetrics); // When HttpResponse httpResponse = objectUnderTest.handleRejected(service, serviceRequestContext, httpRequest, new Throwable()); @@ -52,5 +69,6 @@ public void testHandleRejected() throws Exception { // Then assertEquals(HttpStatus.TOO_MANY_REQUESTS, aggregatedHttpResponse.status()); assertTrue(aggregatedHttpResponse.contentUtf8().contains(String.format("max_pending_requests:%d", TEST_MAX_PENDING_REQUEST))); + verify(rejectedRequestsCounter, times(1)).increment(); } } \ No newline at end of file