Skip to content

Commit

Permalink
Enable HTTP Health Check for OTelTraceSource and OTelMetricsSource. (o…
Browse files Browse the repository at this point in the history
…pensearch-project#1547)

* Enable HTTP Health Check for OTelTraceSource and OTelMetricsSource.
* Updated Readme file and added unit test for configurations

Signed-off-by: Dinu John <[email protected]>
Signed-off-by: Finn Roblin <[email protected]>
  • Loading branch information
dinujoh authored and finnroblin committed Jul 11, 2022
1 parent 5728b53 commit 98941bf
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 4 deletions.
4 changes: 2 additions & 2 deletions data-prepper-plugins/otel-metrics-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ source:

* port(Optional) => An `int` represents the port Otel metrics source is running on. Default is ```21891```.
* request_timeout(Optional) => An `int` represents request timeout in millis. Default is ```10_000```.
* health_check_service(Optional) => A boolean enables a gRPC health check service under ```grpc.health.v1 / Health / Check```. Default is ```false```.
* health_check_service(Optional) => A boolean enables health check service. When ```true``` enables a gRPC health check service under ```grpc.health.v1.Health/Check```. Default is ```false```. In order to use the health check service, you must also enable ```proto_reflection_service```.
* proto_reflection_service(Optional) => A boolean enables a reflection service for Protobuf services (see [ProtoReflectionService](https://grpc.github.io/grpc-java/javadoc/io/grpc/protobuf/services/ProtoReflectionService.html) and [gRPC reflection](https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md) docs). Default is ```false```.
* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol.
* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol. When ```health_check_service``` is true and ```unframed_requests``` is true, enables HTTP health check service under ```/health```.
* thread_count(Optional) => the number of threads to keep in the ScheduledThreadPool. Default is `200`.
* max_connection_count(Optional) => the maximum allowed number of open connections. Default is `500`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.protobuf.services.ProtoReflectionService;
Expand All @@ -44,6 +45,7 @@
@DataPrepperPlugin(name = "otel_metrics_source", pluginType = Source.class, pluginConfigurationType = OTelMetricsSourceConfig.class)
public class OTelMetricsSource implements Source<Record<ExportMetricsServiceRequest>> {
private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsSource.class);
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private final OTelMetricsSourceConfig oTelMetricsSourceConfig;
private Server server;
private final PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -106,6 +108,10 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {
sb.disableServerHeader();
sb.service(grpcServiceBuilder.build());

if(oTelMetricsSourceConfig.enableHttpHealthCheck()) {
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.of());
}

final Optional<Function<? super HttpService, ? extends HttpService>> optionalHttpAuthenticationService =
authenticationProvider.getHttpAuthenticationService();
optionalHttpAuthenticationService.ifPresent(sb::decorator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ public boolean hasHealthCheck() {
return healthCheck;
}

public boolean enableHttpHealthCheck() {
return enableUnframedRequests() && hasHealthCheck();
}

public boolean hasProtoReflectionService() {
return protoReflectionService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import io.grpc.BindableService;
import io.grpc.ServerServiceDefinition;
import io.netty.util.AsciiString;
Expand Down Expand Up @@ -359,6 +360,50 @@ void start_with_Health_configured_includes_HealthCheck_service() throws IOExcept
verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false);
verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true);
verify(grpcServiceBuilder).addService(isA(HealthGrpcService.class));
verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class));
}

@Test
void start_with_Health_configured_unframed_requests_includes_HealthCheck_service() throws IOException {
try (MockedStatic<Server> armeriaServerMock = Mockito.mockStatic(Server.class);
MockedStatic<GrpcService> grpcServerMock = Mockito.mockStatic(GrpcService.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);

when(server.stop()).thenReturn(completableFuture);
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final Path keyFilePath = Path.of("data/certificate/test_decrypted_key.key");
final String certAsString = Files.readString(certFilePath);
final String keyAsString = Files.readString(keyFilePath);
when(certificate.getCertificate()).thenReturn(certAsString);
when(certificate.getPrivateKey()).thenReturn(keyAsString);
when(certificateProvider.getCertificate()).thenReturn(certificate);
when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider);
final Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put(SSL, true);
settingsMap.put("useAcmCertForSSL", true);
settingsMap.put("awsRegion", "us-east-1");
settingsMap.put("acmCertificateArn", "arn:aws:acm:us-east-1:account:certificate/1234-567-856456");
settingsMap.put("sslKeyCertChainFile", "data/certificate/test_cert.crt");
settingsMap.put("sslKeyFile", "data/certificate/test_decrypted_key.key");
settingsMap.put("health_check_service", "true");
settingsMap.put("unframed_requests", "true");

testPluginSetting = new PluginSetting(null, settingsMap);
testPluginSetting.setPipelineName("pipeline");

oTelMetricsSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelMetricsSourceConfig.class);
final OTelMetricsSource source = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory);
source.start(buffer);
source.stop();
}

verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false);
verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true);
verify(grpcServiceBuilder).addService(isA(HealthGrpcService.class));
verify(serverBuilder).service(eq("/health"), isA(HealthCheckService.class));
}

@Test
Expand Down Expand Up @@ -399,6 +444,49 @@ void start_without_Health_configured_does_not_include_HealthCheck_service() thro
verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false);
verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true);
verify(grpcServiceBuilder, never()).addService(isA(HealthGrpcService.class));
verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class));
}

@Test
void start_without_Health_configured_unframed_requests_does_not_include_HealthCheck_service() throws IOException {
try (MockedStatic<Server> armeriaServerMock = Mockito.mockStatic(Server.class);
MockedStatic<GrpcService> grpcServerMock = Mockito.mockStatic(GrpcService.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
grpcServerMock.when(GrpcService::builder).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.addService(any(ServerServiceDefinition.class))).thenReturn(grpcServiceBuilder);
when(grpcServiceBuilder.useClientTimeoutHeader(anyBoolean())).thenReturn(grpcServiceBuilder);

when(server.stop()).thenReturn(completableFuture);
final Path certFilePath = Path.of("data/certificate/test_cert.crt");
final Path keyFilePath = Path.of("data/certificate/test_decrypted_key.key");
final String certAsString = Files.readString(certFilePath);
final String keyAsString = Files.readString(keyFilePath);
when(certificate.getCertificate()).thenReturn(certAsString);
when(certificate.getPrivateKey()).thenReturn(keyAsString);
when(certificateProvider.getCertificate()).thenReturn(certificate);
when(certificateProviderFactory.getCertificateProvider()).thenReturn(certificateProvider);
final Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put(SSL, true);
settingsMap.put("useAcmCertForSSL", true);
settingsMap.put("awsRegion", "us-east-1");
settingsMap.put("acmCertificateArn", "arn:aws:acm:us-east-1:account:certificate/1234-567-856456");
settingsMap.put("sslKeyCertChainFile", "data/certificate/test_cert.crt");
settingsMap.put("sslKeyFile", "data/certificate/test_decrypted_key.key");
settingsMap.put("health_check_service", "false");
settingsMap.put("unframed_requests", "true");

testPluginSetting = new PluginSetting(null, settingsMap);
testPluginSetting.setPipelineName("pipeline");
oTelMetricsSourceConfig = OBJECT_MAPPER.convertValue(testPluginSetting.getSettings(), OTelMetricsSourceConfig.class);
final OTelMetricsSource source = new OTelMetricsSource(oTelMetricsSourceConfig, pluginMetrics, pluginFactory, certificateProviderFactory);
source.start(buffer);
source.stop();
}

verify(grpcServiceBuilder, times(1)).useClientTimeoutHeader(false);
verify(grpcServiceBuilder, times(1)).useBlockingTaskExecutor(true);
verify(grpcServiceBuilder, never()).addService(isA(HealthGrpcService.class));
verify(serverBuilder, never()).service(eq("/health"),isA(HealthCheckService.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,50 @@ public void testDefault() {
assertEquals(DEFAULT_THREAD_COUNT, otelMetricsSourceConfig.getThreadCount());
assertEquals(OTelMetricsSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, otelMetricsSourceConfig.getMaxConnectionCount());
assertFalse(otelMetricsSourceConfig.hasHealthCheck());
assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck());
assertFalse(otelMetricsSourceConfig.hasProtoReflectionService());
assertFalse(otelMetricsSourceConfig.isSslCertAndKeyFileInS3());
assertTrue(otelMetricsSourceConfig.isSsl());
assertNull(otelMetricsSourceConfig.getSslKeyCertChainFile());
assertNull(otelMetricsSourceConfig.getSslKeyFile());
}

@Test
public void testHttpHealthCheckWithUnframedRequestEnabled() {
// Prepare
final Map<String, Object> settings = new HashMap<>();
settings.put(OTelMetricsSourceConfig.ENABLE_UNFRAMED_REQUESTS, "true");
settings.put(OTelMetricsSourceConfig.HEALTH_CHECK_SERVICE, "true");
settings.put(OTelMetricsSourceConfig.PROTO_REFLECTION_SERVICE, "true");

final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings);
final OTelMetricsSourceConfig otelMetricsSourceConfig = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTelMetricsSourceConfig.class);

// When/Then
assertTrue(otelMetricsSourceConfig.hasHealthCheck());
assertTrue(otelMetricsSourceConfig.enableUnframedRequests());
assertTrue(otelMetricsSourceConfig.hasProtoReflectionService());
assertTrue(otelMetricsSourceConfig.enableHttpHealthCheck());
}

@Test
public void testHttpHealthCheckWithUnframedRequestDisabled() {
// Prepare
final Map<String, Object> settings = new HashMap<>();
settings.put(OTelMetricsSourceConfig.ENABLE_UNFRAMED_REQUESTS, "false");
settings.put(OTelMetricsSourceConfig.HEALTH_CHECK_SERVICE, "true");
settings.put(OTelMetricsSourceConfig.PROTO_REFLECTION_SERVICE, "true");

final PluginSetting pluginSetting = new PluginSetting(PLUGIN_NAME, settings);
final OTelMetricsSourceConfig otelMetricsSourceConfig = OBJECT_MAPPER.convertValue(pluginSetting.getSettings(), OTelMetricsSourceConfig.class);

// When/Then
assertTrue(otelMetricsSourceConfig.hasHealthCheck());
assertFalse(otelMetricsSourceConfig.enableUnframedRequests());
assertTrue(otelMetricsSourceConfig.hasProtoReflectionService());
assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck());
}

@Test
public void testValidConfigWithoutS3CertAndKey() {
// Prepare
Expand All @@ -81,6 +118,7 @@ public void testValidConfigWithoutS3CertAndKey() {
assertEquals(TEST_MAX_CONNECTION_COUNT, otelMetricsSourceConfig.getMaxConnectionCount());
assertTrue(otelMetricsSourceConfig.hasHealthCheck());
assertTrue(otelMetricsSourceConfig.hasProtoReflectionService());
assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck());
assertTrue(otelMetricsSourceConfig.isSsl());
assertFalse(otelMetricsSourceConfig.isSslCertAndKeyFileInS3());
assertEquals(TEST_KEY_CERT, otelMetricsSourceConfig.getSslKeyCertChainFile());
Expand Down Expand Up @@ -113,6 +151,7 @@ public void testValidConfigWithS3CertAndKey() {
assertEquals(TEST_THREAD_COUNT, otelMetricsSourceConfig.getThreadCount());
assertEquals(TEST_MAX_CONNECTION_COUNT, otelMetricsSourceConfig.getMaxConnectionCount());
assertFalse(otelMetricsSourceConfig.hasHealthCheck());
assertFalse(otelMetricsSourceConfig.enableHttpHealthCheck());
assertFalse(otelMetricsSourceConfig.hasProtoReflectionService());
assertTrue(otelMetricsSourceConfig.isSsl());
assertTrue(otelMetricsSourceConfig.isSslCertAndKeyFileInS3());
Expand Down
4 changes: 2 additions & 2 deletions data-prepper-plugins/otel-trace-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ source:
* port(Optional) => An `int` represents the port Otel trace source is running on. Default is ```21890```.
* request_timeout(Optional) => An `int` represents request timeout in millis. Default is ```10_000```.
* health_check_service(Optional) => A boolean enables a gRPC health check service under ```grpc.health.v1 / Health / Check```. Default is ```false```.
* health_check_service(Optional) => A boolean enables health check service. When ```true``` enables a gRPC health check service under ```grpc.health.v1.Health/Check```. Default is ```false```. In order to use the health check service, you must also enable ```proto_reflection_service```.
* proto_reflection_service(Optional) => A boolean enables a reflection service for Protobuf services (see [ProtoReflectionService](https://grpc.github.io/grpc-java/javadoc/io/grpc/protobuf/services/ProtoReflectionService.html) and [gRPC reflection](https://github.com/grpc/grpc-java/blob/master/documentation/server-reflection-tutorial.md) docs). Default is ```false```.
* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol.
* unframed_requests(Optional) => A boolean to enable requests not framed using the gRPC wire protocol. When ```health_check_service``` is true and ```unframed_requests``` is true, enables HTTP health check service under ```/health```.
* thread_count(Optional) => the number of threads to keep in the ScheduledThreadPool. Default is `200`.
* max_connection_count(Optional) => the maximum allowed number of open connections. Default is `500`.
* authentication(Optional) => An authentication configuration. By default, this runs an unauthenticated server. See below for more information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.protobuf.services.ProtoReflectionService;
Expand All @@ -43,6 +44,7 @@
@DataPrepperPlugin(name = "otel_trace_source", pluginType = Source.class, pluginConfigurationType = OTelTraceSourceConfig.class)
public class OTelTraceSource implements Source<Record<Object>> {
private static final Logger LOG = LoggerFactory.getLogger(OTelTraceSource.class);
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private final OTelTraceSourceConfig oTelTraceSourceConfig;
private Server server;
private final PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -107,6 +109,10 @@ public void start(Buffer<Record<Object>> buffer) {
sb.disableServerHeader();
sb.service(grpcServiceBuilder.build());

if(oTelTraceSourceConfig.enableHttpHealthCheck()) {
sb.service(HTTP_HEALTH_CHECK_PATH, HealthCheckService.of());
}

final Optional<Function<? super HttpService, ? extends HttpService>> optionalHttpAuthenticationService =
authenticationProvider.getHttpAuthenticationService();
optionalHttpAuthenticationService.ifPresent(sb::decorator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public boolean hasHealthCheck() {
return healthCheck;
}

public boolean enableHttpHealthCheck() {
return enableUnframedRequests() && hasHealthCheck();
}

public boolean hasProtoReflectionService() {
return protoReflectionService;
}
Expand Down
Loading

0 comments on commit 98941bf

Please sign in to comment.