From a233c9f2824071f5a884b6d438470ccd77d3f3c3 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Wed, 29 Jun 2022 11:11:48 -0500 Subject: [PATCH] Enable HTTP Health Check for OTelTraceSource and OTelMetricsSource. (#1547) * Enable HTTP Health Check for OTelTraceSource and OTelMetricsSource. * Updated Readme file and added unit test for configurations Signed-off-by: Dinu John --- .../otel-metrics-source/README.md | 4 +- .../source/otelmetrics/OTelMetricsSource.java | 6 ++ .../otelmetrics/OTelMetricsSourceConfig.java | 4 + .../otelmetrics/OTelMetricsSourceTest.java | 88 +++++++++++++++++++ .../OtelMetricsSourceConfigTests.java | 39 ++++++++ .../otel-trace-source/README.md | 4 +- .../source/oteltrace/OTelTraceSource.java | 6 ++ .../oteltrace/OTelTraceSourceConfig.java | 4 + .../source/oteltrace/OTelTraceSourceTest.java | 88 +++++++++++++++++++ .../oteltrace/OtelTraceSourceConfigTests.java | 39 ++++++++ 10 files changed, 278 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/otel-metrics-source/README.md b/data-prepper-plugins/otel-metrics-source/README.md index f8daed7d55..906561f3a4 100644 --- a/data-prepper-plugins/otel-metrics-source/README.md +++ b/data-prepper-plugins/otel-metrics-source/README.md @@ -16,9 +16,9 @@ source: * port(Optional) => An `int` represents the port Otel metrics source is running on. Default is ```21891```. * request_timeout(Optional) => An `int` represents request timeout in millis. Default is ```10_000```. -* health_check_service(Optional) => A boolean enables a gRPC health check service under ```grpc.health.v1 / Health / Check```. Default is ```false```. +* health_check_service(Optional) => A boolean enables health check service. When ```true``` enables a gRPC health check service under ```grpc.health.v1.Health/Check```. Default is ```false```. In order to use the health check service, you must also enable ```proto_reflection_service```. * proto_reflection_service(Optional) => A boolean enables a reflection service for Protobuf services (see [ProtoReflectionService](https://grpc.github.io/grpc-java/javadoc/io/grpc/protobuf/services/ProtoReflectionService.html) and [gRPC reflection](https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md) docs). Default is ```false```. -* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol. +* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol. When ```health_check_service``` is true and ```unframed_requests``` is true, enables HTTP health check service under ```/health```. * thread_count(Optional) => the number of threads to keep in the ScheduledThreadPool. Default is `200`. * max_connection_count(Optional) => the maximum allowed number of open connections. Default is `500`. diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java b/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java index 33ab7f224b..5191919a08 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java @@ -24,6 +24,7 @@ import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.protobuf.services.ProtoReflectionService; @@ -44,6 +45,7 @@ @DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class) public class OTelMetricsSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class); + private static final String HTTP_HEALTH_CHECK_PATH = "/health"; private final OTelMetricsSourceConfig oTelMetricsSourceConfig; private Server server; private final PluginMetrics pluginMetrics; @@ -106,6 +108,10 @@ public void start(Buffer> buffer) { sb.disableServerHeader(); sb.service(grpcServiceBuilder.build()); + if(oTelMetricsSourceConfig.enableHttpHealthCheck()) { + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.of()); + } + final Optional> optionalHttpAuthenticationService = authenticationProvider.getHttpAuthenticationService(); optionalHttpAuthenticationService.ifPresent(sb::decorator); diff --git a/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java b/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java index b718270e20..8a0ba611d1 100644 --- a/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java +++ b/data-prepper-plugins/otel-metrics-source/src/main/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java @@ -131,6 +131,10 @@ public boolean hasHealthCheck() { return healthCheck; } + public boolean enableHttpHealthCheck() { + return enableUnframedRequests() && hasHealthCheck(); + } + public boolean hasProtoReflectionService() { return protoReflectionService; } diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index ef5714352d..be76075e44 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -34,6 +34,7 @@ import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.grpc.BindableService; import io.grpc.ServerServiceDefinition; import io.netty.util.AsciiString; @@ -359,6 +360,50 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); verify(grpcServiceBuilder).addService(isA(HealthGrpcService.class)); + verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class)); + } + + @Test + void start_with_Health_configured_unframed_requests_includes_HealthCheck_service() throws IOException { + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class); + MockedStatic grpcServerMock = Mockito.mockStatic(GrpcService.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); + + when(server.stop()).thenReturn(completableFuture); + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final Path keyFilePath = Path.of("data/certificate/test_decrypted_key.key"); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + when(certificate.getCertificate()).thenReturn(certAsString); + when(certificate.getPrivateKey()).thenReturn(keyAsString); + when(certificateProvider.getCertificate()).thenReturn(certificate); + when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider); + final Map settingsMap = new HashMap<>(); + settingsMap.put(SSL, true); + settingsMap.put("useAcmCertForSSL", true); + settingsMap.put("awsRegion", "us-east-1"); + settingsMap.put("acmCertificateArn", "arn:aws:acm:us-east-1:account:certificate/1234-567-856456"); + settingsMap.put("sslKeyCertChainFile", "data/certificate/test_cert.crt"); + settingsMap.put("sslKeyFile", "data/certificate/test_decrypted_key.key"); + settingsMap.put("health_check_service", "true"); + settingsMap.put("unframed_requests", "true"); + + testPluginSetting = new PluginSetting(null, settingsMap); + testPluginSetting.setPipelineName("pipeline"); + + oTelMetricsSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelMetricsSourceConfig.class); + final OTelMetricsSource source = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory); + source.start(buffer); + source.stop(); + } + + verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); + verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); + verify(grpcServiceBuilder).addService(isA(HealthGrpcService.class)); + verify(serverBuilder).service(eq("/health"), isA(HealthCheckService.class)); } @Test @@ -399,6 +444,49 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); verify(grpcServiceBuilder, never()).addService(isA(HealthGrpcService.class)); + verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class)); + } + + @Test + void start_without_Health_configured_unframed_requests_does_not_include_HealthCheck_service() throws IOException { + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class); + MockedStatic grpcServerMock = Mockito.mockStatic(GrpcService.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); + + when(server.stop()).thenReturn(completableFuture); + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final Path keyFilePath = Path.of("data/certificate/test_decrypted_key.key"); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + when(certificate.getCertificate()).thenReturn(certAsString); + when(certificate.getPrivateKey()).thenReturn(keyAsString); + when(certificateProvider.getCertificate()).thenReturn(certificate); + when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider); + final Map settingsMap = new HashMap<>(); + settingsMap.put(SSL, true); + settingsMap.put("useAcmCertForSSL", true); + settingsMap.put("awsRegion", "us-east-1"); + settingsMap.put("acmCertificateArn", "arn:aws:acm:us-east-1:account:certificate/1234-567-856456"); + settingsMap.put("sslKeyCertChainFile", "data/certificate/test_cert.crt"); + settingsMap.put("sslKeyFile", "data/certificate/test_decrypted_key.key"); + settingsMap.put("health_check_service", "false"); + settingsMap.put("unframed_requests", "true"); + + testPluginSetting = new PluginSetting(null, settingsMap); + testPluginSetting.setPipelineName("pipeline"); + oTelMetricsSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelMetricsSourceConfig.class); + final OTelMetricsSource source = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory); + source.start(buffer); + source.stop(); + } + + verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); + verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); + verify(grpcServiceBuilder, never()).addService(isA(HealthGrpcService.class)); + verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class)); } @Test diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java b/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java index dbe06ac156..fe2e84a040 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/com/amazon/dataprepper/plugins/source/otelmetrics/OtelMetricsSourceConfigTests.java @@ -48,6 +48,7 @@ public void testDefault() { assertEquals(DEFAULT_THREAD_COUNT, otelMetricsSourceConfig.getThreadCount()); assertEquals(OTelMetricsSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, otelMetricsSourceConfig.getMaxConnectionCount()); assertFalse(otelMetricsSourceConfig.hasHealthCheck()); + assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck()); assertFalse(otelMetricsSourceConfig.hasProtoReflectionService()); assertFalse(otelMetricsSourceConfig.isSslCertAndKeyFileInS3()); assertTrue(otelMetricsSourceConfig.isSsl()); @@ -55,6 +56,42 @@ public void testDefault() { assertNull(otelMetricsSourceConfig.getSslKeyFile()); } + @Test + public void testHttpHealthCheckWithUnframedRequestEnabled() { + // Prepare + final Map settings = new HashMap<>(); + settings.put(OTelMetricsSourceConfig.ENABLE_UNFRAMED_REQUESTS, "true"); + settings.put(OTelMetricsSourceConfig.HEALTH_CHECK_SERVICE, "true"); + settings.put(OTelMetricsSourceConfig.PROTO_REFLECTION_SERVICE, "true"); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings); + final OTelMetricsSourceConfig otelMetricsSourceConfig = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTelMetricsSourceConfig.class); + + // When/Then + assertTrue(otelMetricsSourceConfig.hasHealthCheck()); + assertTrue(otelMetricsSourceConfig.enableUnframedRequests()); + assertTrue(otelMetricsSourceConfig.hasProtoReflectionService()); + assertTrue(otelMetricsSourceConfig.enableHttpHealthCheck()); + } + + @Test + public void testHttpHealthCheckWithUnframedRequestDisabled() { + // Prepare + final Map settings = new HashMap<>(); + settings.put(OTelMetricsSourceConfig.ENABLE_UNFRAMED_REQUESTS, "false"); + settings.put(OTelMetricsSourceConfig.HEALTH_CHECK_SERVICE, "true"); + settings.put(OTelMetricsSourceConfig.PROTO_REFLECTION_SERVICE, "true"); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings); + final OTelMetricsSourceConfig otelMetricsSourceConfig = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTelMetricsSourceConfig.class); + + // When/Then + assertTrue(otelMetricsSourceConfig.hasHealthCheck()); + assertFalse(otelMetricsSourceConfig.enableUnframedRequests()); + assertTrue(otelMetricsSourceConfig.hasProtoReflectionService()); + assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck()); + } + @Test public void testValidConfigWithoutS3CertAndKey() { // Prepare @@ -81,6 +118,7 @@ public void testValidConfigWithoutS3CertAndKey() { assertEquals(TEST_MAX_CONNECTION_COUNT, otelMetricsSourceConfig.getMaxConnectionCount()); assertTrue(otelMetricsSourceConfig.hasHealthCheck()); assertTrue(otelMetricsSourceConfig.hasProtoReflectionService()); + assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck()); assertTrue(otelMetricsSourceConfig.isSsl()); assertFalse(otelMetricsSourceConfig.isSslCertAndKeyFileInS3()); assertEquals(TEST_KEY_CERT, otelMetricsSourceConfig.getSslKeyCertChainFile()); @@ -113,6 +151,7 @@ public void testValidConfigWithS3CertAndKey() { assertEquals(TEST_THREAD_COUNT, otelMetricsSourceConfig.getThreadCount()); assertEquals(TEST_MAX_CONNECTION_COUNT, otelMetricsSourceConfig.getMaxConnectionCount()); assertFalse(otelMetricsSourceConfig.hasHealthCheck()); + assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck()); assertFalse(otelMetricsSourceConfig.hasProtoReflectionService()); assertTrue(otelMetricsSourceConfig.isSsl()); assertTrue(otelMetricsSourceConfig.isSslCertAndKeyFileInS3()); diff --git a/data-prepper-plugins/otel-trace-source/README.md b/data-prepper-plugins/otel-trace-source/README.md index cb6ce09bf0..7d777a1598 100644 --- a/data-prepper-plugins/otel-trace-source/README.md +++ b/data-prepper-plugins/otel-trace-source/README.md @@ -14,9 +14,9 @@ source: * port(Optional) => An `int` represents the port Otel trace source is running on. Default is ```21890```. * request_timeout(Optional) => An `int` represents request timeout in millis. Default is ```10_000```. -* health_check_service(Optional) => A boolean enables a gRPC health check service under ```grpc.health.v1 / Health / Check```. Default is ```false```. +* health_check_service(Optional) => A boolean enables health check service. When ```true``` enables a gRPC health check service under ```grpc.health.v1.Health/Check```. Default is ```false```. In order to use the health check service, you must also enable ```proto_reflection_service```. * proto_reflection_service(Optional) => A boolean enables a reflection service for Protobuf services (see [ProtoReflectionService](https://grpc.github.io/grpc-java/javadoc/io/grpc/protobuf/services/ProtoReflectionService.html) and [gRPC reflection](https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md) docs). Default is ```false```. -* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol. +* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol. When ```health_check_service``` is true and ```unframed_requests``` is true, enables HTTP health check service under ```/health```. * thread_count(Optional) => the number of threads to keep in the ScheduledThreadPool. Default is `200`. * max_connection_count(Optional) => the maximum allowed number of open connections. Default is `500`. * authentication(Optional) => An authentication configuration. By default, this runs an unauthenticated server. See below for more information. diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSource.java b/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSource.java index fdd5537b14..69f743b299 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSource.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSource.java @@ -25,6 +25,7 @@ import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.protobuf.services.ProtoReflectionService; @@ -43,6 +44,7 @@ @DataPrepperPlugin(name = "otel_trace_source", pluginType = Source.class, pluginConfigurationType = OTelTraceSourceConfig.class) public class OTelTraceSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(OTelTraceSource.class); + private static final String HTTP_HEALTH_CHECK_PATH = "/health"; private final OTelTraceSourceConfig oTelTraceSourceConfig; private Server server; private final PluginMetrics pluginMetrics; @@ -107,6 +109,10 @@ public void start(Buffer> buffer) { sb.disableServerHeader(); sb.service(grpcServiceBuilder.build()); + if(oTelTraceSourceConfig.enableHttpHealthCheck()) { + sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.of()); + } + final Optional> optionalHttpAuthenticationService = authenticationProvider.getHttpAuthenticationService(); optionalHttpAuthenticationService.ifPresent(sb::decorator); diff --git a/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java b/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java index 4cd7ae03bd..f61462f1ec 100644 --- a/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java +++ b/data-prepper-plugins/otel-trace-source/src/main/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceConfig.java @@ -135,6 +135,10 @@ public boolean hasHealthCheck() { return healthCheck; } + public boolean enableHttpHealthCheck() { + return enableUnframedRequests() && hasHealthCheck(); + } + public boolean hasProtoReflectionService() { return protoReflectionService; } diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index 1ac5a77309..90022e1774 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -32,6 +32,7 @@ import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.grpc.GrpcServiceBuilder; +import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.grpc.BindableService; import io.grpc.ServerServiceDefinition; import io.netty.util.AsciiString; @@ -415,6 +416,50 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); verify(grpcServiceBuilder).addService(isA(HealthGrpcService.class)); + verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class)); + } + + @Test + void start_with_Health_configured_unframed_requests_includes_HTTPHealthCheck_service() throws IOException { + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class); + MockedStatic grpcServerMock = Mockito.mockStatic(GrpcService.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); + + when(server.stop()).thenReturn(completableFuture); + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final Path keyFilePath = Path.of("data/certificate/test_decrypted_key.key"); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + when(certificate.getCertificate()).thenReturn(certAsString); + when(certificate.getPrivateKey()).thenReturn(keyAsString); + when(certificateProvider.getCertificate()).thenReturn(certificate); + when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider); + final Map settingsMap = new HashMap<>(); + settingsMap.put(SSL, true); + settingsMap.put("useAcmCertForSSL", true); + settingsMap.put("awsRegion", "us-east-1"); + settingsMap.put("acmCertificateArn", "arn:aws:acm:us-east-1:account:certificate/1234-567-856456"); + settingsMap.put("sslKeyCertChainFile", "data/certificate/test_cert.crt"); + settingsMap.put("sslKeyFile", "data/certificate/test_decrypted_key.key"); + settingsMap.put("health_check_service", "true"); + settingsMap.put("unframed_requests", "true"); + + testPluginSetting = new PluginSetting(null, settingsMap); + testPluginSetting.setPipelineName("pipeline"); + + oTelTraceSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelTraceSourceConfig.class); + final OTelTraceSource source = new OTelTraceSource(oTelTraceSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory); + source.start(buffer); + source.stop(); + } + + verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); + verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); + verify(grpcServiceBuilder).addService(isA(HealthGrpcService.class)); + verify(serverBuilder).service(eq("/health"), isA(HealthCheckService.class)); } @Test @@ -455,6 +500,49 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); verify(grpcServiceBuilder, never()).addService(isA(HealthGrpcService.class)); + verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class)); + } + + @Test + void start_without_Health_configured_unframed_requests_does_not_include_HealthCheck_service() throws IOException { + try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class); + MockedStatic grpcServerMock = Mockito.mockStatic(GrpcService.class)) { + armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); + grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder); + when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder); + + when(server.stop()).thenReturn(completableFuture); + final Path certFilePath = Path.of("data/certificate/test_cert.crt"); + final Path keyFilePath = Path.of("data/certificate/test_decrypted_key.key"); + final String certAsString = Files.readString(certFilePath); + final String keyAsString = Files.readString(keyFilePath); + when(certificate.getCertificate()).thenReturn(certAsString); + when(certificate.getPrivateKey()).thenReturn(keyAsString); + when(certificateProvider.getCertificate()).thenReturn(certificate); + when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider); + final Map settingsMap = new HashMap<>(); + settingsMap.put(SSL, true); + settingsMap.put("useAcmCertForSSL", true); + settingsMap.put("awsRegion", "us-east-1"); + settingsMap.put("acmCertificateArn", "arn:aws:acm:us-east-1:account:certificate/1234-567-856456"); + settingsMap.put("sslKeyCertChainFile", "data/certificate/test_cert.crt"); + settingsMap.put("sslKeyFile", "data/certificate/test_decrypted_key.key"); + settingsMap.put("health_check_service", "false"); + settingsMap.put("unframed_requests", "true"); + + testPluginSetting = new PluginSetting(null, settingsMap); + testPluginSetting.setPipelineName("pipeline"); + oTelTraceSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelTraceSourceConfig.class); + final OTelTraceSource source = new OTelTraceSource(oTelTraceSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory); + source.start(buffer); + source.stop(); + } + + verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false); + verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true); + verify(grpcServiceBuilder, never()).addService(isA(HealthGrpcService.class)); + verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class)); } @Test diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java b/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java index 25ed1fe382..872a07e87e 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/com/amazon/dataprepper/plugins/source/oteltrace/OtelTraceSourceConfigTests.java @@ -52,12 +52,49 @@ public void testDefault() { assertEquals(OTelTraceSourceConfig.DEFAULT_RECORD_TYPE, otelTraceSourceConfig.getRecordType()); assertFalse(otelTraceSourceConfig.hasHealthCheck()); assertFalse(otelTraceSourceConfig.hasProtoReflectionService()); + assertFalse(otelTraceSourceConfig.enableHttpHealthCheck()); assertFalse(otelTraceSourceConfig.isSslCertAndKeyFileInS3()); assertTrue(otelTraceSourceConfig.isSsl()); assertNull(otelTraceSourceConfig.getSslKeyCertChainFile()); assertNull(otelTraceSourceConfig.getSslKeyFile()); } + @Test + public void testHttpHealthCheckWithUnframedRequestEnabled() { + // Prepare + final Map settings = new HashMap<>(); + settings.put(OTelTraceSourceConfig.ENABLE_UNFRAMED_REQUESTS, "true"); + settings.put(OTelTraceSourceConfig.HEALTH_CHECK_SERVICE, "true"); + settings.put(OTelTraceSourceConfig.PROTO_REFLECTION_SERVICE, "true"); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings); + final OTelTraceSourceConfig otelTraceSourceConfig = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTelTraceSourceConfig.class); + + // When/Then + assertTrue(otelTraceSourceConfig.hasHealthCheck()); + assertTrue(otelTraceSourceConfig.enableUnframedRequests()); + assertTrue(otelTraceSourceConfig.hasProtoReflectionService()); + assertTrue(otelTraceSourceConfig.enableHttpHealthCheck()); + } + + @Test + public void testHttpHealthCheckWithUnframedRequestDisabled() { + // Prepare + final Map settings = new HashMap<>(); + settings.put(OTelTraceSourceConfig.ENABLE_UNFRAMED_REQUESTS, "false"); + settings.put(OTelTraceSourceConfig.HEALTH_CHECK_SERVICE, "true"); + settings.put(OTelTraceSourceConfig.PROTO_REFLECTION_SERVICE, "true"); + + final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings); + final OTelTraceSourceConfig otelTraceSourceConfig = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTelTraceSourceConfig.class); + + // When/Then + assertTrue(otelTraceSourceConfig.hasHealthCheck()); + assertFalse(otelTraceSourceConfig.enableUnframedRequests()); + assertTrue(otelTraceSourceConfig.hasProtoReflectionService()); + assertFalse(otelTraceSourceConfig.enableHttpHealthCheck()); + } + @Test public void testValidConfigWithoutS3CertAndKey() { // Prepare @@ -85,6 +122,7 @@ public void testValidConfigWithoutS3CertAndKey() { assertEquals(TEST_MAX_CONNECTION_COUNT, otelTraceSourceConfig.getMaxConnectionCount()); assertTrue(otelTraceSourceConfig.hasHealthCheck()); assertTrue(otelTraceSourceConfig.hasProtoReflectionService()); + assertFalse(otelTraceSourceConfig.enableHttpHealthCheck()); assertTrue(otelTraceSourceConfig.isSsl()); assertFalse(otelTraceSourceConfig.isSslCertAndKeyFileInS3()); assertEquals(TEST_KEY_CERT, otelTraceSourceConfig.getSslKeyCertChainFile()); @@ -119,6 +157,7 @@ public void testValidConfigWithS3CertAndKey() { assertEquals(TEST_MAX_CONNECTION_COUNT, otelTraceSourceConfig.getMaxConnectionCount()); assertFalse(otelTraceSourceConfig.hasHealthCheck()); assertFalse(otelTraceSourceConfig.hasProtoReflectionService()); + assertFalse(otelTraceSourceConfig.enableHttpHealthCheck()); assertTrue(otelTraceSourceConfig.isSsl()); assertTrue(otelTraceSourceConfig.isSslCertAndKeyFileInS3()); assertEquals(TEST_KEY_CERT_S3, otelTraceSourceConfig.getSslKeyCertChainFile());