Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Di server #846

Merged
merged 43 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
be3953c
Add Spring Core dependency
sbayer55 Dec 20, 2021
6d391b6
Added public constructor to DataPrepper Class
sbayer55 Dec 20, 2021
f637308
Added DataPrepper instance now constructed by Spring DI
sbayer55 Dec 20, 2021
ea86ade
Added DI based unit test for Data Prepper class
sbayer55 Dec 21, 2021
492dfd4
Create DataPrepperConfigurate in bean provider method
sbayer55 Dec 22, 2021
9fae630
Added DataPrepper unit tests
sbayer55 Dec 22, 2021
1d594a5
Decoupled system metrics from DataPrepper
sbayer55 Dec 22, 2021
bce2c26
Updated DP Test object scope
sbayer55 Dec 22, 2021
f4205e8
DataPrepperServerTest refactored to work with new DataPrepper constru…
sbayer55 Dec 23, 2021
8ce2584
Refactored DataPrepperConfigurationTests
sbayer55 Dec 23, 2021
68d73b1
Renamed DataPrepperConfigurationConfiguration to DataPrepperAppConfig…
sbayer55 Dec 23, 2021
5c1ff83
Remove debug messages
sbayer55 Jan 3, 2022
eab5623
Added unit tests
sbayer55 Jan 3, 2022
6cd7ded
Changed DataPrepperServer to Lazy bean
sbayer55 Jan 4, 2022
321d3eb
Fixed bug - never checked for logstash config
sbayer55 Jan 4, 2022
22be738
Added unit tests for command line args
sbayer55 Jan 4, 2022
d70ab4c
Removed unused import
sbayer55 Jan 4, 2022
2c4ad8e
Renamed test from foo
sbayer55 Jan 4, 2022
5f1a131
Added copywrite comments
sbayer55 Jan 5, 2022
6c54d84
Remove commented code
sbayer55 Jan 5, 2022
c5557b8
refactor test to assert against hard coded string
sbayer55 Jan 5, 2022
75e9e52
added final modifier to main class variables
sbayer55 Jan 5, 2022
9ef1060
Fixed misplaced Nullable
sbayer55 Jan 6, 2022
e6d8041
Add Spring Core dependency
sbayer55 Dec 20, 2021
85c84ae
Converted DataPrepperServer to use Spring Dependency Injection
sbayer55 Dec 23, 2021
835bee5
Added DataPrepperServer constructor test
sbayer55 Dec 23, 2021
71f13e8
remove unused swing dependency
sbayer55 Jan 5, 2022
ec61194
Added testing for new classes
sbayer55 Jan 5, 2022
eebc8c7
Added Server config unit test
sbayer55 Jan 6, 2022
7d8f1f6
Added DataPrepperServerConfiguration unit tests
sbayer55 Jan 6, 2022
a2ccce5
Added final modifier where possible
sbayer55 Jan 6, 2022
2b1ed15
Only create Cloud Watch Provider if configured
sbayer55 Jan 6, 2022
4614301
Changed when AWSCloudMeter are registered
sbayer55 Jan 7, 2022
ed97a93
Removed unused import
sbayer55 Jan 7, 2022
35729ff
Removed invalid unit test
sbayer55 Jan 7, 2022
70a8aa8
Removed unused import
sbayer55 Jan 7, 2022
73367ac
Added unit tests on ListPipelinesHandler
sbayer55 Jan 7, 2022
e2849a4
Add unit tests for PrometheusMetricsHandler
sbayer55 Jan 10, 2022
1b5aee4
Remove Optional types from Spring configuration files
sbayer55 Jan 18, 2022
2f116f5
Remove Optional types from Spring configuration files, split meter cr…
sbayer55 Jan 18, 2022
b4a4c11
Added CloudWatchMeterRegistryProvider bean factory method
sbayer55 Jan 18, 2022
6b5fc42
remove unused import
sbayer55 Jan 19, 2022
90d9e80
Removed duplicated call to withPluginSetting
sbayer55 Jan 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.amazon.dataprepper.parser.config;

import com.amazon.dataprepper.model.configuration.PluginModel;
import com.amazon.dataprepper.parser.model.DataPrepperConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
Expand All @@ -29,7 +30,7 @@ public DataPrepperArgs dataPrepperArgs(final Environment environment) {
LOG.info("Command line args: {}", commandLineArgs);

if (commandLineArgs != null) {
String[] args = commandLineArgs.split(COMMAND_LINE_ARG_DELIMITER);
final String[] args = commandLineArgs.split(COMMAND_LINE_ARG_DELIMITER);
return new DataPrepperArgs(args);
}
else {
Expand All @@ -55,4 +56,9 @@ public DataPrepperConfiguration dataPrepperConfiguration(
return new DataPrepperConfiguration();
}
}

@Bean
public PluginModel authentication(final DataPrepperConfiguration dataPrepperConfiguration) {
return dataPrepperConfiguration.getAuthentication();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private static String checkForLogstashConfigurationAndConvert(String configurati
try {
configurationFileLocation = logstashConfigConverter.convertLogstashConfigurationToPipeline(
configurationFileLocation, String.valueOf(configurationDirectory));
} catch (IOException e) {
} catch (final IOException e) {
LOG.warn("Unable to read the Logstash configuration file", e);
throw new IllegalArgumentException("Invalid Logstash configuration file", e);
}
Expand All @@ -51,7 +51,7 @@ else if (args.length > MAXIMUM_SUPPORTED_NUMBER_OF_ARGS) {
"Data Prepper supports a maximum of " + MAXIMUM_SUPPORTED_NUMBER_OF_ARGS + " command line arguments");
}

String configurationFileLocation = args[DATA_PREPPER_PIPELINE_CONFIG_POSITON];
final String configurationFileLocation = args[DATA_PREPPER_PIPELINE_CONFIG_POSITON];
LOG.info("Using {} configuration file", configurationFileLocation);

this.pipelineConfigFileLocation = DataPrepperArgs.checkForLogstashConfigurationAndConvert(configurationFileLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,41 @@

import com.amazon.dataprepper.parser.model.DataPrepperConfiguration;
import com.amazon.dataprepper.parser.model.MetricRegistryType;
import com.amazon.dataprepper.pipeline.server.CloudWatchMeterRegistryProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.micrometer.cloudwatch2.CloudWatchMeterRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.core.exception.SdkClientException;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;

import static com.amazon.dataprepper.DataPrepper.getServiceNameForMetrics;
import static com.amazon.dataprepper.metrics.MetricNames.SERVICE_NAME;

@Configuration
public class MetricsConfig {
private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
private static final String METRICS_CONTEXT_PREFIX = "/metrics";

@Bean
public YAMLFactory yamlFactory() {
return new YAMLFactory();
Expand Down Expand Up @@ -60,21 +77,80 @@ public JvmThreadMetrics jvmThreadMetrics() {
return new JvmThreadMetrics();
}

private void configureMetricRegistry(final MeterRegistry meterRegistry) {
meterRegistry.config()
.commonTags(Collections.singletonList(
Tag.of(SERVICE_NAME, getServiceNameForMetrics())
));

}

@Bean
public MeterRegistry prometheusMeterRegistry(final DataPrepperConfiguration dataPrepperConfiguration) {
if (dataPrepperConfiguration.getMetricRegistryTypes().contains(MetricRegistryType.Prometheus)) {
final PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
configureMetricRegistry(meterRegistry);

return meterRegistry;
}
else {
return null;
}
}

@Bean
public CloudWatchMeterRegistryProvider cloudWatchMeterRegistryProvider(
final DataPrepperConfiguration dataPrepperConfiguration
) {
if (dataPrepperConfiguration.getMetricRegistryTypes().contains(MetricRegistryType.CloudWatch)) {
return new CloudWatchMeterRegistryProvider();
}
else {
return null;
}
}

@Bean
public MeterRegistry cloudWatchMeterRegistry(
final DataPrepperConfiguration dataPrepperConfiguration,
@Autowired(required = false) @Nullable final CloudWatchMeterRegistryProvider cloudWatchMeterRegistryProvider
) {
if (dataPrepperConfiguration.getMetricRegistryTypes().contains(MetricRegistryType.CloudWatch)) {
if (cloudWatchMeterRegistryProvider == null) {
throw new IllegalStateException(
"configuration required configure cloudwatch meter registry but one could not be configured");
}

try {
final CloudWatchMeterRegistry meterRegistry = cloudWatchMeterRegistryProvider.getCloudWatchMeterRegistry();
configureMetricRegistry(meterRegistry);

return meterRegistry;
} catch (final SdkClientException e) {
LOG.warn("Unable to configure Cloud Watch Meter Registry but Meter Registry was requested in Data Prepper Configuration");
throw new RuntimeException("Unable to initialize Cloud Watch Meter Registry", e);
}
}
else {
return null;
}
}

@Bean
public CompositeMeterRegistry systemMeterRegistry(
final List<MeterBinder> meterBinders,
final DataPrepperConfiguration dataPrepperConfiguration
final List<MeterRegistry> meterRegistries
) {
final CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
final CompositeMeterRegistry compositeMeterRegistry = new CompositeMeterRegistry();

meterBinders.forEach(binder -> binder.bindTo(meterRegistry));
LOG.debug("{} Meter Binder beans registered.", meterBinders.size());
meterBinders.forEach(binder -> binder.bindTo(compositeMeterRegistry));

dataPrepperConfiguration.getMetricRegistryTypes().forEach(metricRegistryType -> {
MeterRegistry registryForType = MetricRegistryType.getDefaultMeterRegistryForType(metricRegistryType);
meterRegistry.add(registryForType);
Metrics.addRegistry(registryForType);
meterRegistries.forEach(meterRegistry -> {
compositeMeterRegistry.add(meterRegistry);
Metrics.addRegistry(meterRegistry);
});

return meterRegistry;
return compositeMeterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,7 @@

package com.amazon.dataprepper.parser.model;

import com.amazon.dataprepper.pipeline.server.CloudWatchMeterRegistryProvider;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

import java.util.Arrays;

import static com.amazon.dataprepper.DataPrepper.getServiceNameForMetrics;
import static com.amazon.dataprepper.metrics.MetricNames.SERVICE_NAME;
import static java.lang.String.format;

public enum MetricRegistryType {
Prometheus,
CloudWatch;

public static MeterRegistry getDefaultMeterRegistryForType(final MetricRegistryType metricRegistryType) {
MeterRegistry meterRegistry = null;
switch (metricRegistryType) {
case Prometheus:
meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
break;
case CloudWatch:
meterRegistry = new CloudWatchMeterRegistryProvider().getCloudWatchMeterRegistry();
break;
default:
throw new IllegalArgumentException(format("Invalid metricRegistryType %s", metricRegistryType));
}
meterRegistry.config().commonTags(Arrays.asList(Tag.of(SERVICE_NAME, getServiceNameForMetrics())));
return meterRegistry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,17 @@ public CloudWatchMeterRegistry getCloudWatchMeterRegistry() {
* Returns CloudWatchConfig using the properties from {@link #CLOUDWATCH_PROPERTIES}
*/
private CloudWatchConfig createCloudWatchConfig(final String cloudWatchPropertiesFilePath) {
CloudWatchConfig cloudWatchConfig = null;
try (final InputStream inputStream = requireNonNull(getClass().getClassLoader()
.getResourceAsStream(cloudWatchPropertiesFilePath))) {
final Properties cloudwatchProperties = new Properties();
cloudwatchProperties.load(inputStream);
cloudWatchConfig = new CloudWatchConfig() {
@Override
public String get(final String key) {
return cloudwatchProperties.getProperty(key);
}
};
} catch (IOException ex) {
return cloudwatchProperties::getProperty;
} catch (final IOException ex) {
LOG.error("Encountered exception in creating CloudWatchConfig for CloudWatchMeterRegistry, " +
"Proceeding without metrics", ex);

//If there is no registry attached, micrometer will make NoopMeters which are discarded.
return null;
}
return cloudWatchConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,12 @@

package com.amazon.dataprepper.pipeline.server;

import com.amazon.dataprepper.DataPrepper;
import com.amazon.dataprepper.model.configuration.PluginModel;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.plugin.PluginFactory;
import com.amazon.dataprepper.parser.model.DataPrepperConfiguration;
import com.sun.net.httpserver.Authenticator;
import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsParameters;
import com.sun.net.httpserver.HttpsServer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Named;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;



Expand All @@ -45,70 +25,9 @@ public class DataPrepperServer {

@Inject
public DataPrepperServer(
final DataPrepperConfiguration dataPrepperConfiguration,
final PluginFactory pluginFactory,
final DataPrepper dataPrepper,
final CompositeMeterRegistry systemMeterRegistry
final HttpServer server
) {
final int port = dataPrepperConfiguration.getServerPort();
final boolean ssl = dataPrepperConfiguration.ssl();
final String keyStoreFilePath = dataPrepperConfiguration.getKeyStoreFilePath();
final String keyStorePassword = dataPrepperConfiguration.getKeyStorePassword();
final String privateKeyPassword = dataPrepperConfiguration.getPrivateKeyPassword();

final PluginModel authenticationConfiguration = dataPrepperConfiguration.getAuthentication();
final PluginSetting authenticationPluginSetting;

if (authenticationConfiguration == null || authenticationConfiguration.getPluginName().equals(DataPrepperCoreAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME)) {
LOG.warn("Creating data prepper server without authentication. This is not secure.");
LOG.warn("In order to set up Http Basic authentication for the data prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/core_apis.md#authentication");
}

if(authenticationConfiguration != null) {
authenticationPluginSetting =
new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings());
} else {
authenticationPluginSetting =
new PluginSetting(DataPrepperCoreAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap());
}

final DataPrepperCoreAuthenticationProvider authenticationProvider = pluginFactory.loadPlugin(DataPrepperCoreAuthenticationProvider.class, authenticationPluginSetting);
final Authenticator authenticator = authenticationProvider.getAuthenticator();

try {
if (ssl) {
LOG.info("Creating Data Prepper server with TLS");
this.server = createHttpsServer(port, keyStoreFilePath, keyStorePassword, privateKeyPassword);
} else {
LOG.warn("Creating Data Prepper server without TLS. This is not secure.");
LOG.warn("In order to set up TLS for the Data Prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/configuration.md#server-configuration");
sbayer55 marked this conversation as resolved.
Show resolved Hide resolved
this.server = createHttpServer(port);
}
} catch (final IOException ex) {
throw new RuntimeException("Failed to create server", ex);
}

getPrometheusMeterRegistryFromRegistries(Metrics.globalRegistry.getRegistries()).ifPresent(meterRegistry -> {
final PrometheusMeterRegistry prometheusMeterRegistryForDataPrepper = (PrometheusMeterRegistry) meterRegistry;
server.createContext("/metrics/prometheus", new PrometheusMetricsHandler(prometheusMeterRegistryForDataPrepper))
.setAuthenticator(authenticator);
});

getPrometheusMeterRegistryFromRegistries(systemMeterRegistry.getRegistries()).ifPresent(
meterRegistry -> {
final PrometheusMeterRegistry prometheusMeterRegistryForSystem = (PrometheusMeterRegistry) meterRegistry;
server.createContext("/metrics/sys", new PrometheusMetricsHandler(prometheusMeterRegistryForSystem))
.setAuthenticator(authenticator);
});
server.createContext("/list", new ListPipelinesHandler(dataPrepper))
.setAuthenticator(authenticator);
server.createContext("/shutdown", new ShutdownHandler(dataPrepper))
.setAuthenticator(authenticator);
}

private Optional<MeterRegistry> getPrometheusMeterRegistryFromRegistries(final Set<MeterRegistry> meterRegistries) {
return meterRegistries.stream().filter(meterRegistry ->
meterRegistry instanceof PrometheusMeterRegistry).findFirst();
this.server = server;
}

/**
Expand All @@ -117,7 +36,6 @@ private Optional<MeterRegistry> getPrometheusMeterRegistryFromRegistries(final S
public void start() {
server.start();
LOG.info("Data Prepper server running at :{}", server.getAddress().getPort());

}

/**
Expand All @@ -127,26 +45,4 @@ public void stop() {
server.stop(0);
LOG.info("Data Prepper server stopped");
}

private HttpServer createHttpServer(final int port) throws IOException {
return HttpServer.create(new InetSocketAddress(port), 0);
}

private HttpServer createHttpsServer(final int port,
final String keyStoreFilePath,
final String keyStorePassword,
final String privateKeyPassword) throws IOException {
final SSLContext sslContext = SslUtil.createSslContext(keyStoreFilePath, keyStorePassword, privateKeyPassword);

final HttpsServer server = HttpsServer.create(new InetSocketAddress(port), 0);
server.setHttpsConfigurator(new HttpsConfigurator(sslContext) {
public void configure(HttpsParameters params) {
SSLContext context = getSSLContext();
SSLParameters sslparams = context.getDefaultSSLParameters();
params.setSSLParameters(sslparams);
}
});

return server;
}
}
Loading