, Object> newPluginFunction;
+
+ /**
+ * Please do not call this method. Only the DefaultPluginFactory should call it.
+ *
+ * This exists so that this class can still exhibit the correct behavior when creating new plugin
+ * classes. This whole class is going to be deleted in the next major version, as will this method.
+ */
+ public static void dangerousMethod_setPluginFunction(final BiFunction, Object> newPluginFunction) {
+ PluginFactory.newPluginFunction = newPluginFunction;
+ }
public static Object newPlugin(final PluginSetting pluginSetting, final Class> clazz) {
+ if(newPluginFunction != null) {
+ try {
+ return newPluginFunction.apply(pluginSetting, clazz);
+ } catch (final InvalidPluginDefinitionException | PluginInvocationException ex) {
+ throw new PluginInvocationException("Failed to create instance of new plugin.", ex);
+ }
+ }
+ return defaultFunction(pluginSetting, clazz);
+ }
+
+ private static Object defaultFunction(final PluginSetting pluginSetting, final Class> clazz) {
if (clazz == null) {
LOG.error("Failed to find the plugin with name {}. " +
"Please ensure that plugin is annotated with appropriate values", pluginSetting.getName());
diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/NoOpPrepper.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/NoOpPrepper.java
index 5be7f0651f..8160c31a6d 100644
--- a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/NoOpPrepper.java
+++ b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/NoOpPrepper.java
@@ -12,7 +12,6 @@
package com.amazon.dataprepper.plugins.prepper;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.prepper.Prepper;
import com.amazon.dataprepper.model.record.Record;
@@ -21,24 +20,12 @@
@DataPrepperPlugin(name = "no-op", pluginType = Prepper.class)
public class NoOpPrepper> implements Prepper {
- /**
- * Mandatory constructor for Data Prepper Component - This constructor is used by Data Prepper
- * runtime engine to construct an instance of {@link NoOpPrepper} using an instance of {@link PluginSetting} which
- * has access to pluginSetting metadata from pipeline
- * pluginSetting file.
- *
- * @param pluginSetting instance with metadata information from pipeline pluginSetting file.
- */
- public NoOpPrepper(final PluginSetting pluginSetting) {
- //no op
- }
-
public NoOpPrepper() {
}
@Override
- public Collection execute(Collection records) {
+ public Collection execute(final Collection records) {
return records;
}
diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/StringPrepper.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/StringPrepper.java
index 54fc917909..2564a7b0a7 100644
--- a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/StringPrepper.java
+++ b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/prepper/StringPrepper.java
@@ -12,6 +12,7 @@
package com.amazon.dataprepper.plugins.prepper;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
+import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.prepper.Prepper;
import com.amazon.dataprepper.model.record.Record;
@@ -50,6 +51,7 @@ public void setUpperCase(final boolean upperCase) {
*
* @param configuration instance with metadata information from pipeline pluginSetting file.
*/
+ @DataPrepperPluginConstructor
public StringPrepper(final Configuration configuration) {
this.upperCase = configuration.getUpperCase();
}
diff --git a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/RandomStringSource.java b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/RandomStringSource.java
index a46befa5ba..eeeb8c4194 100644
--- a/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/RandomStringSource.java
+++ b/data-prepper-plugins/common/src/main/java/com/amazon/dataprepper/plugins/source/RandomStringSource.java
@@ -13,7 +13,6 @@
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,15 +31,11 @@
@DataPrepperPlugin(name = "random", pluginType = Source.class)
public class RandomStringSource implements Source> {
- private static Logger LOG = LoggerFactory.getLogger(RandomStringSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RandomStringSource.class);
private ExecutorService executorService;
private boolean stop = false;
- public RandomStringSource(final PluginSetting pluginSetting) {
-
- }
-
private void setExecutorService() {
if(executorService == null || executorService.isShutdown()) {
executorService = Executors.newSingleThreadExecutor(
@@ -50,7 +45,7 @@ private void setExecutorService() {
}
@Override
- public void start(Buffer> buffer) {
+ public void start(final Buffer> buffer) {
setExecutorService();
executorService.execute(() -> {
while (!stop) {
@@ -58,9 +53,9 @@ public void start(Buffer> buffer) {
LOG.info("Writing to buffer");
buffer.write(new Record<>(UUID.randomUUID().toString()), 500);
Thread.sleep(500);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
break;
- } catch (TimeoutException e) {
+ } catch (final TimeoutException e) {
// Do nothing
}
}
@@ -75,7 +70,7 @@ public void stop() {
if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
- } catch (InterruptedException ex) {
+ } catch (final InterruptedException ex) {
executorService.shutdownNow();
}
}
diff --git a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/prepper/PrepperFactoryTests.java b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/prepper/PrepperFactoryTests.java
index 336281fdd0..454d3b199c 100644
--- a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/prepper/PrepperFactoryTests.java
+++ b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/prepper/PrepperFactoryTests.java
@@ -15,7 +15,8 @@
import com.amazon.dataprepper.model.prepper.Prepper;
import com.amazon.dataprepper.plugins.PluginException;
import com.amazon.dataprepper.plugins.sink.SinkFactory;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
@@ -31,11 +32,19 @@
@SuppressWarnings("rawtypes")
public class PrepperFactoryTests {
private static String TEST_PIPELINE = "test-pipeline";
+
+ @AfterEach
+ void cleanUp() {
+ PrepperFactory.dangerousMethod_setPluginFunction(null);
+ }
+
/**
* Tests if PrepperFactory is able to retrieve default Source plugins by name
*/
@Test
public void testNewSingletonPrepperClassByNameThatExists() {
+ PrepperFactory.dangerousMethod_setPluginFunction((s, c) -> new NoOpPrepper<>());
+
final PluginSetting noOpPrepperConfiguration = new PluginSetting("no-op", new HashMap<>());
noOpPrepperConfiguration.setPipelineName(TEST_PIPELINE);
final List actualPrepperSets = PrepperFactory.newPreppers(noOpPrepperConfiguration);
diff --git a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/RandomStringSourceTests.java b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/RandomStringSourceTests.java
index 292b271109..7c648ad20c 100644
--- a/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/RandomStringSourceTests.java
+++ b/data-prepper-plugins/common/src/test/java/com/amazon/dataprepper/plugins/source/RandomStringSourceTests.java
@@ -11,21 +11,20 @@
package com.amazon.dataprepper.plugins.source;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.plugins.buffer.TestBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Queue;
import org.junit.Assert;
import org.junit.Test;
+import java.util.LinkedList;
+import java.util.Queue;
+
public class RandomStringSourceTests {
@Test
public void testPutRecord() throws InterruptedException {
final RandomStringSource randomStringSource =
- new RandomStringSource(new PluginSetting("random", Collections.emptyMap()));
+ new RandomStringSource();
final Queue> bufferQueue = new LinkedList<>();
final TestBuffer buffer = new TestBuffer(bufferQueue, 1);
//Start source, and sleep for 100 millis
diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java
index 03cfeef130..5ece991db2 100644
--- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java
+++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java
@@ -13,8 +13,8 @@
import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
+import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.buffer.Buffer;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.model.source.Source;
import com.amazon.dataprepper.plugins.certificate.CertificateProvider;
@@ -32,7 +32,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-@DataPrepperPlugin(name = "http", pluginType = Source.class)
+@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
public class HTTPSource implements Source> {
private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
@@ -41,14 +41,17 @@ public class HTTPSource implements Source> {
private Server server;
private final PluginMetrics pluginMetrics;
- public HTTPSource(final PluginSetting pluginSetting) {
- sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
+ @DataPrepperPluginConstructor
+ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics) {
+ // TODO: Remove once JSR-303 validation is available.
+ sourceConfig.validate();
+ this.sourceConfig = sourceConfig;
+ this.pluginMetrics = pluginMetrics;
certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
- pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
}
@Override
- public void start(Buffer> buffer) {
+ public void start(final Buffer> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java
index 3ebc15f071..a118f7435f 100644
--- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java
+++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java
@@ -11,7 +11,7 @@
package com.amazon.dataprepper.plugins.source.loghttp;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.util.StringUtils;
@@ -19,41 +19,45 @@
import java.nio.file.Path;
public class HTTPSourceConfig {
- static final String PORT = "port";
- static final String REQUEST_TIMEOUT = "request_timeout";
- static final String THREAD_COUNT = "thread_count";
- static final String MAX_CONNECTION_COUNT = "max_connection_count";
- static final String MAX_PENDING_REQUESTS = "max_pending_requests";
static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
static final String SSL = "ssl";
static final String SSL_CERTIFICATE_FILE = "ssl_certificate_file";
static final String SSL_KEY_FILE = "ssl_key_file";
- static final String SSL_KEY_PASSWORD = "ssl_key_password";
static final int DEFAULT_PORT = 2021;
static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000;
static final int DEFAULT_THREAD_COUNT = 200;
static final int DEFAULT_MAX_CONNECTION_COUNT = 500;
static final int DEFAULT_MAX_PENDING_REQUESTS = 1024;
- private final int port;
- private final int requestTimeoutInMillis;
- private final int threadCount;
- private final int maxConnectionCount;
- private final int maxPendingRequests;
- private final boolean ssl;
- private final String sslCertificateFile;
- private final String sslKeyFile;
- private final String sslKeyPassword;
-
- private HTTPSourceConfig(final int port,
- final int requestTimeoutInMillis,
- final int threadCount,
- final int maxConnectionCount,
- final int maxPendingRequests,
- final boolean ssl,
- final String sslCertificateFile,
- final String sslKeyFile,
- final String sslKeyPassword) {
+ @JsonProperty("port")
+ private int port = DEFAULT_PORT;
+
+ @JsonProperty("request_timeout")
+ private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS;
+
+ @JsonProperty("thread_count")
+ private int threadCount = DEFAULT_THREAD_COUNT;
+
+ @JsonProperty("max_connection_count")
+ private int maxConnectionCount = DEFAULT_MAX_CONNECTION_COUNT;
+
+ @JsonProperty("max_pending_requests")
+ private int maxPendingRequests = DEFAULT_MAX_PENDING_REQUESTS;
+
+ @JsonProperty(SSL)
+ private boolean ssl;
+
+ @JsonProperty(SSL_CERTIFICATE_FILE)
+ private String sslCertificateFile;
+
+ @JsonProperty(SSL_KEY_FILE)
+ private String sslKeyFile;
+
+ @JsonProperty("ssl_key_password")
+ private String sslKeyPassword;
+
+ // TODO: Remove once JSR-303 validation is available
+ void validate() {
Preconditions.checkArgument(port >= 0 && port < 65535, "port must be between 0 and 65535.");
Preconditions.checkArgument(requestTimeoutInMillis > 0, "request_timeout must be greater than 0.");
Preconditions.checkArgument(threadCount > 0, "thread_count must be greater than 0.");
@@ -63,29 +67,7 @@ private HTTPSourceConfig(final int port,
validateFilePath(String.format("%s is enabled", SSL), sslCertificateFile, SSL_CERTIFICATE_FILE);
validateFilePath(String.format("%s is enabled", SSL), sslKeyFile, SSL_KEY_FILE);
}
- this.port = port;
- this.requestTimeoutInMillis = requestTimeoutInMillis;
- this.threadCount = threadCount;
- this.maxConnectionCount = maxConnectionCount;
- this.maxPendingRequests = maxPendingRequests;
- this.ssl = ssl;
- this.sslCertificateFile = sslCertificateFile;
- this.sslKeyFile = sslKeyFile;
- this.sslKeyPassword = sslKeyPassword;
- }
- public static HTTPSourceConfig buildConfig(final PluginSetting pluginSetting) {
- return new HTTPSourceConfig(
- pluginSetting.getIntegerOrDefault(PORT, DEFAULT_PORT),
- pluginSetting.getIntegerOrDefault(REQUEST_TIMEOUT, DEFAULT_REQUEST_TIMEOUT_MS),
- pluginSetting.getIntegerOrDefault(THREAD_COUNT, DEFAULT_THREAD_COUNT),
- pluginSetting.getIntegerOrDefault(MAX_CONNECTION_COUNT, DEFAULT_MAX_CONNECTION_COUNT),
- pluginSetting.getIntegerOrDefault(MAX_PENDING_REQUESTS, DEFAULT_MAX_PENDING_REQUESTS),
- pluginSetting.getBooleanOrDefault(SSL, false),
- pluginSetting.getStringOrDefault(SSL_CERTIFICATE_FILE, null),
- pluginSetting.getStringOrDefault(SSL_KEY_FILE, null),
- pluginSetting.getStringOrDefault(SSL_KEY_PASSWORD, null)
- );
}
private void validateFilePath(final String typeMessage, final String argument, final String argumentName) {
diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java
index 7ec02ed4f8..b63c1cf3a2 100644
--- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java
+++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java
@@ -11,33 +11,15 @@
package com.amazon.dataprepper.plugins.source.loghttp;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
public class HTTPSourceConfigTest {
- private static final String PLUGIN_NAME = "http";
- private static final int TEST_PORT = 45600;
- private static final int TEST_REQUEST_TIMEOUT_MS = 777;
- private static final int TEST_THREAD_COUNT = 888;
- private static final int TEST_MAX_CONNECTION_COUNT = 999;
- private static final int TEST_MAX_PENDING_REQUESTS = 666;
- private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile();
- private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile();
-
@Test
public void testDefault() {
// Prepare
- final HTTPSourceConfig sourceConfig = HTTPSourceConfig.buildConfig(
- new PluginSetting(PLUGIN_NAME, new HashMap<>()));
+ final HTTPSourceConfig sourceConfig = new HTTPSourceConfig();
// When/Then
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getPort());
@@ -46,194 +28,4 @@ public void testDefault() {
assertEquals(HTTPSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount());
assertEquals(HTTPSourceConfig.DEFAULT_MAX_PENDING_REQUESTS, sourceConfig.getMaxPendingRequests());
}
-
- @Test
- public void testValidConfigSSLDisabled() {
- // Prepare
- final PluginSetting pluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- TEST_MAX_PENDING_REQUESTS,
- false,
- null,
- null,
- null
- );
- final HTTPSourceConfig sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
-
- // When/Then
- assertEquals(TEST_PORT, sourceConfig.getPort());
- assertEquals(TEST_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis());
- assertEquals(TEST_THREAD_COUNT, sourceConfig.getThreadCount());
- assertEquals(TEST_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount());
- assertEquals(TEST_MAX_PENDING_REQUESTS, sourceConfig.getMaxPendingRequests());
- assertFalse(sourceConfig.isSsl());
- assertNull(sourceConfig.getSslCertificateFile());
- assertNull(sourceConfig.getSslKeyFile());
- assertNull(sourceConfig.getSslKeyPassword());
- }
-
- @Test
- public void testValidConfigSSLEnabled() {
- // Prepare
- final PluginSetting pluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- TEST_MAX_PENDING_REQUESTS,
- true,
- TEST_SSL_CERTIFICATE_FILE,
- TEST_SSL_KEY_FILE,
- null
- );
- final HTTPSourceConfig sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
-
- // When/Then
- assertEquals(TEST_PORT, sourceConfig.getPort());
- assertEquals(TEST_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis());
- assertEquals(TEST_THREAD_COUNT, sourceConfig.getThreadCount());
- assertEquals(TEST_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount());
- assertEquals(TEST_MAX_PENDING_REQUESTS, sourceConfig.getMaxPendingRequests());
- assertTrue(sourceConfig.isSsl());
- assertEquals(TEST_SSL_CERTIFICATE_FILE, sourceConfig.getSslCertificateFile());
- assertEquals(TEST_SSL_KEY_FILE, sourceConfig.getSslKeyFile());
- assertNull(sourceConfig.getSslKeyPassword());
- }
-
- @Test
- public void testInvalidPort() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- 65536,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- TEST_MAX_PENDING_REQUESTS,
- false,
- null,
- null,
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- @Test
- public void testInvalidRequestTimeout() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- -1,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- TEST_MAX_PENDING_REQUESTS,
- false,
- null,
- null,
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- @Test
- public void testInvalidThreadCount() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- 0,
- TEST_MAX_CONNECTION_COUNT,
- TEST_MAX_PENDING_REQUESTS,
- false,
- null,
- null,
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- @Test
- public void testInvalidMaxConnectionCount() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- 0,
- TEST_MAX_PENDING_REQUESTS,
- false,
- null,
- null,
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- @Test
- public void testInvalidMaxPendingRequests() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- 0,
- false,
- null,
- null,
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- @Test
- public void testInvalidSslCert() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- 0,
- true,
- "invalid path",
- TEST_SSL_KEY_FILE,
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- @Test
- public void testInvalidSslKey() {
- final PluginSetting invalidPluginSetting = completePluginSettingForLogHTTPSource(
- TEST_PORT,
- TEST_REQUEST_TIMEOUT_MS,
- TEST_THREAD_COUNT,
- TEST_MAX_CONNECTION_COUNT,
- 0,
- true,
- TEST_SSL_CERTIFICATE_FILE,
- "invalid path",
- null
- );
- assertThrows(IllegalArgumentException.class, () -> HTTPSourceConfig.buildConfig(invalidPluginSetting));
- }
-
- private PluginSetting completePluginSettingForLogHTTPSource(final int port,
- final int requestTimeoutInMillis,
- final int threadCount,
- final int maxConnectionCount,
- final int maxPendingRequests,
- final boolean ssl,
- final String sslCertificateFile,
- final String sslKeyFile,
- final String sslKeyPassword) {
- final Map settings = new HashMap<>();
- settings.put(HTTPSourceConfig.PORT, port);
- settings.put(HTTPSourceConfig.REQUEST_TIMEOUT, requestTimeoutInMillis);
- settings.put(HTTPSourceConfig.THREAD_COUNT, threadCount);
- settings.put(HTTPSourceConfig.MAX_CONNECTION_COUNT, maxConnectionCount);
- settings.put(HTTPSourceConfig.MAX_PENDING_REQUESTS, maxPendingRequests);
- settings.put(HTTPSourceConfig.SSL, ssl);
- settings.put(HTTPSourceConfig.SSL_CERTIFICATE_FILE, sslCertificateFile);
- settings.put(HTTPSourceConfig.SSL_KEY_FILE, sslKeyFile);
- settings.put(HTTPSourceConfig.SSL_KEY_PASSWORD, sslKeyPassword);
- return new PluginSetting(PLUGIN_NAME, settings);
- }
}
\ No newline at end of file
diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java
index 2b743b930d..43fb61f9f7 100644
--- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java
+++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java
@@ -13,11 +13,12 @@
import com.amazon.dataprepper.metrics.MetricNames;
import com.amazon.dataprepper.metrics.MetricsTestUtil;
+import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
-import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.ClientFactory;
+import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpMethod;
@@ -46,7 +47,6 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,6 +59,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -87,7 +89,6 @@ class HTTPSourceTest {
@Mock
private CompletableFuture completableFuture;
- private PluginSetting testPluginSetting;
private BlockingBuffer> testBuffer;
private HTTPSource HTTPSourceUnderTest;
private List requestsReceivedMeasurements;
@@ -98,6 +99,8 @@ class HTTPSourceTest {
private List rejectedRequestsMeasurements;
private List requestProcessDurationMeasurements;
private List payloadSizeSummaryMeasurements;
+ private HTTPSourceConfig sourceConfig;
+ private PluginMetrics pluginMetrics;
private BlockingBuffer> getBuffer() {
final HashMap integerHashMap = new HashMap<>();
@@ -149,12 +152,18 @@ public void setUp() {
lenient().when(serverBuilder.build()).thenReturn(server);
lenient().when(server.start()).thenReturn(completableFuture);
+ sourceConfig = mock(HTTPSourceConfig.class);
+ lenient().when(sourceConfig.getPort()).thenReturn(2021);
+ lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000);
+ lenient().when(sourceConfig.getThreadCount()).thenReturn(200);
+ lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
+ lenient().when(sourceConfig.getMaxPendingRequests()).thenReturn(1024);
+
MetricsTestUtil.initMetrics();
- testPluginSetting = new PluginSetting(PLUGIN_NAME, new HashMap<>()) {{
- setPipelineName(TEST_PIPELINE_NAME);
- }};
+ pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME);
+
testBuffer = getBuffer();
- HTTPSourceUnderTest = new HTTPSource(testPluginSetting);
+ HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics);
}
@AfterEach
@@ -279,18 +288,15 @@ public void testHTTPJsonResponse413() throws InterruptedException {
}
@Test
- public void testHTTPJsonResponse415() throws InterruptedException {
+ public void testHTTPJsonResponse415() {
// Prepare
- final Map settings = new HashMap<>();
final int testMaxPendingRequests = 1;
final int testThreadCount = 1;
final int serverTimeoutInMillis = 500;
- settings.put(HTTPSourceConfig.REQUEST_TIMEOUT, serverTimeoutInMillis);
- settings.put(HTTPSourceConfig.MAX_PENDING_REQUESTS, testMaxPendingRequests);
- settings.put(HTTPSourceConfig.THREAD_COUNT, testThreadCount);
- testPluginSetting = new PluginSetting(PLUGIN_NAME, settings);
- testPluginSetting.setPipelineName(TEST_PIPELINE_NAME);
- HTTPSourceUnderTest = new HTTPSource(testPluginSetting);
+ when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(serverTimeoutInMillis);
+ when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests);
+ when(sourceConfig.getThreadCount()).thenReturn(testThreadCount);
+ HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics);
// Start the source
HTTPSourceUnderTest.start(testBuffer);
refreshMeasurements();
@@ -338,12 +344,10 @@ public void testHTTPJsonResponse429() throws InterruptedException {
final int testThreadCount = 1;
final int clientTimeoutInMillis = 100;
final int serverTimeoutInMillis = (testMaxPendingRequests + testThreadCount + 1) * clientTimeoutInMillis;
- settings.put(HTTPSourceConfig.REQUEST_TIMEOUT, serverTimeoutInMillis);
- settings.put(HTTPSourceConfig.MAX_PENDING_REQUESTS, testMaxPendingRequests);
- settings.put(HTTPSourceConfig.THREAD_COUNT, testThreadCount);
- testPluginSetting = new PluginSetting(PLUGIN_NAME, settings);
- testPluginSetting.setPipelineName(TEST_PIPELINE_NAME);
- HTTPSourceUnderTest = new HTTPSource(testPluginSetting);
+ when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(serverTimeoutInMillis);
+ when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests);
+ when(sourceConfig.getThreadCount()).thenReturn(testThreadCount);
+ HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics);
// Start the source
HTTPSourceUnderTest.start(testBuffer);
refreshMeasurements();
@@ -401,14 +405,10 @@ public void testServerStartCertFileSuccess() throws IOException {
final String certAsString = Files.readString(certFilePath);
final String keyAsString = Files.readString(keyFilePath);
- final Map settingsMap = new HashMap<>();
- settingsMap.put(HTTPSourceConfig.SSL, true);
- settingsMap.put(HTTPSourceConfig.SSL_CERTIFICATE_FILE, TEST_SSL_CERTIFICATE_FILE);
- settingsMap.put(HTTPSourceConfig.SSL_KEY_FILE, TEST_SSL_KEY_FILE);
-
- testPluginSetting = new PluginSetting(PLUGIN_NAME, settingsMap);
- testPluginSetting.setPipelineName(TEST_PIPELINE_NAME);
- HTTPSourceUnderTest = new HTTPSource(testPluginSetting);
+ when(sourceConfig.isSsl()).thenReturn(true);
+ when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE);
+ when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
+ HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics);
HTTPSourceUnderTest.start(testBuffer);
HTTPSourceUnderTest.stop();
@@ -424,14 +424,16 @@ public void testServerStartCertFileSuccess() throws IOException {
@Test
void testHTTPSJsonResponse() {
- final Map settingsMap = new HashMap<>();
- settingsMap.put(HTTPSourceConfig.REQUEST_TIMEOUT, 200);
- settingsMap.put(HTTPSourceConfig.SSL, true);
- settingsMap.put(HTTPSourceConfig.SSL_CERTIFICATE_FILE, TEST_SSL_CERTIFICATE_FILE);
- settingsMap.put(HTTPSourceConfig.SSL_KEY_FILE, TEST_SSL_KEY_FILE);
- testPluginSetting = new PluginSetting(PLUGIN_NAME, settingsMap);
- testPluginSetting.setPipelineName(TEST_PIPELINE_NAME);
- HTTPSourceUnderTest = new HTTPSource(testPluginSetting);
+ reset(sourceConfig);
+ when(sourceConfig.getPort()).thenReturn(2021);
+ when(sourceConfig.getThreadCount()).thenReturn(200);
+ when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
+ when(sourceConfig.getMaxPendingRequests()).thenReturn(1024);
+ when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(200);
+ when(sourceConfig.isSsl()).thenReturn(true);
+ when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE);
+ when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
+ HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics);
testBuffer = getBuffer();
HTTPSourceUnderTest.start(testBuffer);
@@ -458,16 +460,14 @@ public void testDoubleStart() {
@Test
public void testStartWithEmptyBuffer() {
- testPluginSetting = new PluginSetting(PLUGIN_NAME, Collections.emptyMap());
- testPluginSetting.setPipelineName(TEST_PIPELINE_NAME);
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
Assertions.assertThrows(IllegalStateException.class, () -> source.start(null));
}
@Test
public void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException {
// Prepare
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
when(completableFuture.get()).thenThrow(new ExecutionException("", null));
@@ -480,7 +480,7 @@ public void testStartWithServerExecutionExceptionNoCause() throws ExecutionExcep
@Test
public void testStartWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException {
// Prepare
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
final NullPointerException expCause = new NullPointerException();
@@ -495,7 +495,7 @@ public void testStartWithServerExecutionExceptionWithCause() throws ExecutionExc
@Test
public void testStartWithInterruptedException() throws ExecutionException, InterruptedException {
// Prepare
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
when(completableFuture.get()).thenThrow(new InterruptedException());
@@ -509,7 +509,7 @@ public void testStartWithInterruptedException() throws ExecutionException, Inter
@Test
public void testStopWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException {
// Prepare
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
source.start(testBuffer);
@@ -524,7 +524,7 @@ public void testStopWithServerExecutionExceptionNoCause() throws ExecutionExcept
@Test
public void testStopWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException {
// Prepare
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
source.start(testBuffer);
@@ -541,7 +541,7 @@ public void testStopWithServerExecutionExceptionWithCause() throws ExecutionExce
@Test
public void testStopWithInterruptedException() throws ExecutionException, InterruptedException {
// Prepare
- final HTTPSource source = new HTTPSource(testPluginSetting);
+ final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics);
try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {
armeriaServerMock.when(Server::builder).thenReturn(serverBuilder);
source.start(testBuffer);
@@ -559,7 +559,7 @@ public void testRunAnotherSourceWithSamePort() {
// starting server
HTTPSourceUnderTest.start(testBuffer);
- final HTTPSource secondSource = new HTTPSource(testPluginSetting);
+ final HTTPSource secondSource = new HTTPSource(sourceConfig, pluginMetrics);
//Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException
Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer));
}
diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java
index 81188ac97b..b89a5ed729 100644
--- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java
+++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/certificate/CertificateProviderFactoryTest.java
@@ -11,17 +11,15 @@
package com.amazon.dataprepper.plugins.source.loghttp.certificate;
-import com.amazon.dataprepper.model.configuration.PluginSetting;
import com.amazon.dataprepper.plugins.certificate.CertificateProvider;
import com.amazon.dataprepper.plugins.certificate.file.FileCertificateProvider;
import com.amazon.dataprepper.plugins.source.loghttp.HTTPSourceConfig;
import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Test;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
class CertificateProviderFactoryTest {
private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile();
@@ -29,14 +27,10 @@ class CertificateProviderFactoryTest {
@Test
public void getFileCertificateProviderSuccess() {
- final Map settingsMap = new HashMap<>();
- settingsMap.put("ssl", true);
- settingsMap.put("ssl_certificate_file", TEST_SSL_CERTIFICATE_FILE);
- settingsMap.put("ssl_key_file", TEST_SSL_KEY_FILE);
-
- final PluginSetting pluginSetting = new PluginSetting(null, settingsMap);
- pluginSetting.setPipelineName("pipeline");
- final HTTPSourceConfig sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
+ final HTTPSourceConfig sourceConfig = mock(HTTPSourceConfig.class);
+ when(sourceConfig.isSsl()).thenReturn(true);
+ when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE);
+ when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
diff --git a/docs/developer_guide.md b/docs/developer_guide.md
index 0a979fef3a..b4d6fecc47 100644
--- a/docs/developer_guide.md
+++ b/docs/developer_guide.md
@@ -63,5 +63,8 @@ Optionally add `"-Dlog4j.configurationFile=config/log4j2.properties"` to the com
## More Information
-Please read our [Error Handling](error_handling.md) and [Logs](logs.md) guides for specific development guidance
-on those topics.
+We have the following pages for specific development guidance on the topics:
+
+* [Plugin Development](plugin_development.md)
+* [Error Handling](error_handling.md)
+* [Logs](logs.md)
diff --git a/docs/plugin_development.md b/docs/plugin_development.md
new file mode 100644
index 0000000000..cb7dfbd31e
--- /dev/null
+++ b/docs/plugin_development.md
@@ -0,0 +1,30 @@
+# Plugin Development
+
+Data Prepper supports plugins. All sources, buffers, preppers, and processors
+are created as Data Prepper plugins.
+
+## Plugin Requirements
+
+Plugins are created as Java classes. They must conform to the following.
+
+* The class must be annotated with [`@DataPrepperPlugin`](../data-prepper-api/src/main/java/com/amazon/dataprepper/model/annotations/DataPrepperPlugin.java)
+* The class must be in the `com.amazon.dataprepper.plugins` package
+* The class must implement the required interface
+* The class must have a valid constructor (see below)
+
+### Plugin Constructors
+
+The preferred way to create a plugin constructor is to choose a single
+constructor and annotate it with [`@DataPrepperConstructor`](../data-prepper-api/src/main/java/com/amazon/dataprepper/model/annotations/DataPrepperPluginConstructor.java).
+The constructor can only take in class types which are supported by the plugin framework.
+
+The plugin framework can inject the following types into this constructor:
+
+* An instance of the plugin configuration class type as defined by `DataPrepperPlugin::pluginConfigurationType`. The plugin framework will deserialize this type from the Pipeline configuration and supply it in the constructor if requested.
+* An instance of `PluginMetrics`.
+* An instance of `PluginSetting`.
+
+If your plugin requires no arguments, it can use a default constructor which will be chosen instead.
+
+Additionally, the plugin framework can create a plugin using a single parameter constructor with
+a single parameter of type `PluginSetting`. This behavior is deprecated and planned for removal.