diff --git a/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/DefaultPluginFactory.java b/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/DefaultPluginFactory.java index 30ad88fa84..cd8e59ae88 100644 --- a/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/DefaultPluginFactory.java +++ b/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/DefaultPluginFactory.java @@ -92,6 +92,7 @@ private PluginArgumentsContext getConstructionContext(final PluginSetting pl return new PluginArgumentsContext.Builder() .withPluginSetting(pluginSetting) .withPluginConfiguration(configuration) + .withPluginFactory(this) .build(); } diff --git a/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/PluginArgumentsContext.java b/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/PluginArgumentsContext.java index 267ad0fdbb..da6e45e8cb 100644 --- a/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/PluginArgumentsContext.java +++ b/data-prepper-core/src/main/java/com/amazon/dataprepper/plugin/PluginArgumentsContext.java @@ -3,6 +3,7 @@ import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.model.plugin.InvalidPluginDefinitionException; +import com.amazon.dataprepper.model.plugin.PluginFactory; import java.util.Arrays; import java.util.HashMap; @@ -30,6 +31,9 @@ private PluginArgumentsContext(final Builder builder) { } typedArgumentsSuppliers.put(PluginMetrics.class, () -> PluginMetrics.fromPluginSetting(builder.pluginSetting)); + + if(builder.pluginFactory != null) + typedArgumentsSuppliers.put(PluginFactory.class, () -> builder.pluginFactory); } Object[] createArguments(final Class[] parameterTypes) { @@ -50,6 +54,7 @@ private Supplier getRequiredArgumentSupplier(final Class parameterTyp static class Builder { private Object pluginConfiguration; private PluginSetting pluginSetting; + private PluginFactory pluginFactory; Builder withPluginConfiguration(final Object pluginConfiguration) { this.pluginConfiguration = pluginConfiguration; @@ -61,6 +66,11 @@ Builder withPluginSetting(final PluginSetting pluginSetting) { return this; } + Builder withPluginFactory(final PluginFactory pluginFactory) { + this.pluginFactory = pluginFactory; + return this; + } + PluginArgumentsContext build() { return new PluginArgumentsContext(this); } diff --git a/data-prepper-core/src/test/java/com/amazon/dataprepper/plugin/PluginArgumentsContextTest.java b/data-prepper-core/src/test/java/com/amazon/dataprepper/plugin/PluginArgumentsContextTest.java index ddd55e7d28..51c8824d8a 100644 --- a/data-prepper-core/src/test/java/com/amazon/dataprepper/plugin/PluginArgumentsContextTest.java +++ b/data-prepper-core/src/test/java/com/amazon/dataprepper/plugin/PluginArgumentsContextTest.java @@ -3,6 +3,7 @@ import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.configuration.PluginSetting; import com.amazon.dataprepper.model.plugin.InvalidPluginDefinitionException; +import com.amazon.dataprepper.model.plugin.PluginFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; @@ -70,6 +71,18 @@ void createArguments_with_two_classes_inverted_order() { equalTo(new Object[] { pluginSetting, testPluginConfiguration })); } + @Test + void createArguments_with_pluginFactory_should_return_the_instance_from_the_builder() { + final PluginFactory pluginFactory = mock(PluginFactory.class); + final PluginArgumentsContext objectUnderTest = new PluginArgumentsContext.Builder() + .withPluginSetting(pluginSetting) + .withPluginFactory(pluginFactory) + .build(); + + assertThat(objectUnderTest.createArguments(new Class[] { PluginFactory.class }), + equalTo(new Object[] { pluginFactory })); + } + @Test void createArguments_with_PluginMetrics() { final PluginArgumentsContext objectUnderTest = new PluginArgumentsContext.Builder() diff --git a/data-prepper-plugins/armeria-common/build.gradle b/data-prepper-plugins/armeria-common/build.gradle new file mode 100644 index 0000000000..7dbd125c60 --- /dev/null +++ b/data-prepper-plugins/armeria-common/build.gradle @@ -0,0 +1,6 @@ + +dependencies { + implementation project(':data-prepper-api') + implementation 'com.linecorp.armeria:armeria:1.9.2' + testImplementation 'com.linecorp.armeria:armeria-junit5:1.9.2' +} diff --git a/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/armeria/authentication/ArmeriaAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/armeria/authentication/ArmeriaAuthenticationProvider.java new file mode 100644 index 0000000000..bba7de0826 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/armeria/authentication/ArmeriaAuthenticationProvider.java @@ -0,0 +1,27 @@ +package com.amazon.dataprepper.armeria.authentication; + +import com.linecorp.armeria.server.ServerBuilder; + +/** + * An interface for providing authentication in Armeria-based HTTP servers. + *

+ * Plugin authors can use this interface for Armeria authentication in + * HTTP servers. + * + * @since 1.2 + */ +public interface ArmeriaAuthenticationProvider { + /** + * The plugin name for the plugin which allows unauthenticated + * requests. This plugin will disable authentication. + */ + String UNAUTHENTICATED_PLUGIN_NAME = "unauthenticated"; + + /** + * Adds an authentication decorator to an Armeria {@link ServerBuilder}. + * + * @param serverBuilder the builder for the server needing authentication + * @since 1.2 + */ + void addAuthenticationDecorator(ServerBuilder serverBuilder); +} diff --git a/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/armeria/authentication/HttpBasicAuthenticationConfig.java b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/armeria/authentication/HttpBasicAuthenticationConfig.java new file mode 100644 index 0000000000..6ea63044c2 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/armeria/authentication/HttpBasicAuthenticationConfig.java @@ -0,0 +1,30 @@ +package com.amazon.dataprepper.armeria.authentication; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Configuration for HTTP Basic Authentication. + * + * @since 1.2 + */ +public class HttpBasicAuthenticationConfig { + private final String username; + private final String password; + + @JsonCreator + public HttpBasicAuthenticationConfig( + @JsonProperty("username") final String username, + @JsonProperty("password") final String password) { + this.username = username; + this.password = password; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } +} diff --git a/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/plugins/HttpBasicArmeriaAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/plugins/HttpBasicArmeriaAuthenticationProvider.java new file mode 100644 index 0000000000..c7e5b712fe --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/plugins/HttpBasicArmeriaAuthenticationProvider.java @@ -0,0 +1,48 @@ +package com.amazon.dataprepper.plugins; + +import com.amazon.dataprepper.armeria.authentication.ArmeriaAuthenticationProvider; +import com.amazon.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; +import com.linecorp.armeria.server.HttpService; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.auth.AuthService; + +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * The plugin for HTTP Basic authentication of Armeria servers. + * + * @since 1.2 + */ +@DataPrepperPlugin(name = "http_basic", + pluginType = ArmeriaAuthenticationProvider.class, + pluginConfigurationType = HttpBasicAuthenticationConfig.class) +public class HttpBasicArmeriaAuthenticationProvider implements ArmeriaAuthenticationProvider { + + private final HttpBasicAuthenticationConfig httpBasicAuthenticationConfig; + + @DataPrepperPluginConstructor + public HttpBasicArmeriaAuthenticationProvider(final HttpBasicAuthenticationConfig httpBasicAuthenticationConfig) { + Objects.requireNonNull(httpBasicAuthenticationConfig); + Objects.requireNonNull(httpBasicAuthenticationConfig.getUsername()); + Objects.requireNonNull(httpBasicAuthenticationConfig.getPassword()); + this.httpBasicAuthenticationConfig = httpBasicAuthenticationConfig; + } + + @Override + public void addAuthenticationDecorator(final ServerBuilder serverBuilder) { + serverBuilder.decorator(createDecorator()); + } + + private Function createDecorator() { + return AuthService.builder() + .addBasicAuth((context, basic) -> + CompletableFuture.completedFuture( + httpBasicAuthenticationConfig.getUsername().equals(basic.username()) && + httpBasicAuthenticationConfig.getPassword().equals(basic.password()))) + .newDecorator(); + } +} diff --git a/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/plugins/UnauthenticatedArmeriaAuthenticationProvider.java b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/plugins/UnauthenticatedArmeriaAuthenticationProvider.java new file mode 100644 index 0000000000..02dd268388 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/main/java/com/amazon/dataprepper/plugins/UnauthenticatedArmeriaAuthenticationProvider.java @@ -0,0 +1,18 @@ +package com.amazon.dataprepper.plugins; + +import com.amazon.dataprepper.armeria.authentication.ArmeriaAuthenticationProvider; +import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; +import com.linecorp.armeria.server.ServerBuilder; + +/** + * The plugin to use for unauthenticated access to Armeria servers. It + * disables authentication on endpoints. + * + * @since 1.2 + */ +@DataPrepperPlugin(name = ArmeriaAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, pluginType = ArmeriaAuthenticationProvider.class) +public class UnauthenticatedArmeriaAuthenticationProvider implements ArmeriaAuthenticationProvider { + @Override + public void addAuthenticationDecorator(final ServerBuilder serverBuilder) { + } +} diff --git a/data-prepper-plugins/armeria-common/src/test/java/com/amazon/dataprepper/plugins/HttpBasicArmeriaAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/com/amazon/dataprepper/plugins/HttpBasicArmeriaAuthenticationProviderTest.java new file mode 100644 index 0000000000..8eba597d2e --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/com/amazon/dataprepper/plugins/HttpBasicArmeriaAuthenticationProviderTest.java @@ -0,0 +1,111 @@ +package com.amazon.dataprepper.plugins; + +import com.amazon.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.auth.BasicToken; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +class HttpBasicArmeriaAuthenticationProviderTest { + + private static final String USERNAME = UUID.randomUUID().toString(); + private static final String PASSWORD = UUID.randomUUID().toString(); + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(final ServerBuilder sb) { + sb.service("/test", (ctx, req) -> HttpResponse.of(200)); + + final HttpBasicAuthenticationConfig config = mock(HttpBasicAuthenticationConfig.class); + when(config.getUsername()).thenReturn(USERNAME); + when(config.getPassword()).thenReturn(PASSWORD); + new HttpBasicArmeriaAuthenticationProvider(config).addAuthenticationDecorator(sb); + } + }; + + @Nested + class ConstructorTests { + private HttpBasicAuthenticationConfig config; + + @BeforeEach + void setUp() { + config = mock(HttpBasicAuthenticationConfig.class); + + } + + private HttpBasicArmeriaAuthenticationProvider createObjectUnderTest() { + return new HttpBasicArmeriaAuthenticationProvider(config); + } + + @Test + void constructor_with_null_Config_throws() { + config = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_with_null_username_throws() { + reset(config); + when(config.getPassword()).thenReturn(UUID.randomUUID().toString()); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_with_null_password_throws() { + reset(config); + when(config.getUsername()).thenReturn(UUID.randomUUID().toString()); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + } + + @Nested + class WithServer { + @Test + void httpRequest_without_authentication_responds_Unauthorized() { + final WebClient client = WebClient.of(server.httpUri()); + + final AggregatedHttpResponse httpResponse = client.get("/test").aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.UNAUTHORIZED)); + } + + @Test + void httpRequest_with_incorrect_authentication_responds_Unauthorized() { + final WebClient client = WebClient.builder(server.httpUri()) + .auth(BasicToken.of(UUID.randomUUID().toString(), UUID.randomUUID().toString())) + .build(); + + final AggregatedHttpResponse httpResponse = client.get("/test").aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.UNAUTHORIZED)); + } + + @Test + void httpRequest_with_correct_authentication_responds_OK() { + final WebClient client = WebClient.builder(server.httpUri()) + .auth(BasicToken.of(USERNAME, PASSWORD)) + .build(); + + final AggregatedHttpResponse httpResponse = client.get("/test").aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/armeria-common/src/test/java/com/amazon/dataprepper/plugins/UnauthenticatedArmeriaAuthenticationProviderTest.java b/data-prepper-plugins/armeria-common/src/test/java/com/amazon/dataprepper/plugins/UnauthenticatedArmeriaAuthenticationProviderTest.java new file mode 100644 index 0000000000..a794effd45 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/java/com/amazon/dataprepper/plugins/UnauthenticatedArmeriaAuthenticationProviderTest.java @@ -0,0 +1,47 @@ +package com.amazon.dataprepper.plugins; + +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.auth.BasicToken; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +class UnauthenticatedArmeriaAuthenticationProviderTest { + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(final ServerBuilder sb) { + sb.service("/test", (ctx, req) -> HttpResponse.of(200)); + new UnauthenticatedArmeriaAuthenticationProvider().addAuthenticationDecorator(sb); + } + }; + + @Test + void httpRequest_without_authentication_responds_OK() { + final WebClient client = WebClient.of(server.httpUri()); + + final AggregatedHttpResponse httpResponse = client.get("/test").aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } + + @Test + void httpRequest_with_BasicAuthentication_responds_OK() { + final WebClient client = WebClient.builder(server.httpUri()) + .auth(BasicToken.of(UUID.randomUUID().toString(), UUID.randomUUID().toString())) + .build(); + + final AggregatedHttpResponse httpResponse = client.get("/test").aggregate().join(); + + assertThat(httpResponse.status(), equalTo(HttpStatus.OK)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/armeria-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/armeria-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..ca6ee9cea8 --- /dev/null +++ b/data-prepper-plugins/armeria-common/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/data-prepper-plugins/http-source/build.gradle b/data-prepper-plugins/http-source/build.gradle index 87e865cb79..17bbb1710b 100644 --- a/data-prepper-plugins/http-source/build.gradle +++ b/data-prepper-plugins/http-source/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:blocking-buffer') implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:armeria-common') implementation "com.linecorp.armeria:armeria:1.9.2" implementation "commons-io:commons-io:2.11.0" testImplementation project(':data-prepper-api').sourceSets.test.output diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java index 5ece991db2..3b8d6ce39b 100644 --- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -11,10 +11,14 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.armeria.authentication.ArmeriaAuthenticationProvider; import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; import com.amazon.dataprepper.model.buffer.Buffer; +import com.amazon.dataprepper.model.configuration.PluginModel; +import com.amazon.dataprepper.model.configuration.PluginSetting; +import com.amazon.dataprepper.model.plugin.PluginFactory; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.model.source.Source; import com.amazon.dataprepper.plugins.certificate.CertificateProvider; @@ -29,6 +33,7 @@ import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -38,16 +43,27 @@ public class HTTPSource implements Source> { private final HTTPSourceConfig sourceConfig; private final CertificateProviderFactory certificateProviderFactory; + private final ArmeriaAuthenticationProvider authenticationProvider; private Server server; private final PluginMetrics pluginMetrics; @DataPrepperPluginConstructor - public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics) { + public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory) { // TODO: Remove once JSR-303 validation is available. sourceConfig.validate(); this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; certificateProviderFactory = new CertificateProviderFactory(sourceConfig); + final PluginModel authenticationConfiguration = sourceConfig.getAuthentication(); + final PluginSetting authenticationPluginSetting; + if(authenticationConfiguration != null) { + authenticationPluginSetting = + new PluginSetting(authenticationConfiguration.getPluginName(), authenticationConfiguration.getPluginSettings()); + } else { + authenticationPluginSetting = + new PluginSetting(ArmeriaAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap()); + } + authenticationProvider = pluginFactory.loadPlugin(ArmeriaAuthenticationProvider.class, authenticationPluginSetting); } @Override @@ -70,6 +86,9 @@ public void start(final Buffer> buffer) { } else { sb.http(sourceConfig.getPort()); } + + authenticationProvider.addAuthenticationDecorator(sb); + sb.maxNumConnections(sourceConfig.getMaxConnectionCount()); final int requestTimeoutInMillis = sourceConfig.getRequestTimeoutInMillis(); // Allow 2*requestTimeoutInMillis to accommodate non-blocking operations other than buffer writing. diff --git a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java index a118f7435f..6ead38beed 100644 --- a/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java +++ b/data-prepper-plugins/http-source/src/main/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java @@ -11,6 +11,7 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.model.configuration.PluginModel; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.micrometer.core.instrument.util.StringUtils; @@ -56,6 +57,8 @@ public class HTTPSourceConfig { @JsonProperty("ssl_key_password") private String sslKeyPassword; + private PluginModel authentication; + // TODO: Remove once JSR-303 validation is available void validate() { Preconditions.checkArgument(port >= 0 && port < 65535, "port must be between 0 and 65535."); @@ -111,4 +114,8 @@ public String getSslKeyFile() { public String getSslKeyPassword() { return sslKeyPassword; } + + public PluginModel getAuthentication() { + return authentication; + } } diff --git a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index 43fb61f9f7..2fa7dc937a 100644 --- a/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/com/amazon/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -11,10 +11,12 @@ package com.amazon.dataprepper.plugins.source.loghttp; +import com.amazon.dataprepper.armeria.authentication.ArmeriaAuthenticationProvider; import com.amazon.dataprepper.metrics.MetricNames; import com.amazon.dataprepper.metrics.MetricsTestUtil; import com.amazon.dataprepper.metrics.PluginMetrics; import com.amazon.dataprepper.model.configuration.PluginSetting; +import com.amazon.dataprepper.model.plugin.PluginFactory; import com.amazon.dataprepper.model.record.Record; import com.amazon.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; import com.linecorp.armeria.client.ClientFactory; @@ -58,6 +60,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -101,6 +104,7 @@ class HTTPSourceTest { private List payloadSizeSummaryMeasurements; private HTTPSourceConfig sourceConfig; private PluginMetrics pluginMetrics; + private PluginFactory pluginFactory; private BlockingBuffer> getBuffer() { final HashMap integerHashMap = new HashMap<>(); @@ -162,8 +166,13 @@ public void setUp() { MetricsTestUtil.initMetrics(); pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); + pluginFactory = mock(PluginFactory.class); + final ArmeriaAuthenticationProvider authenticationProvider = mock(ArmeriaAuthenticationProvider.class); + when(pluginFactory.loadPlugin(eq(ArmeriaAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(authenticationProvider); + testBuffer = getBuffer(); - HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); } @AfterEach @@ -296,7 +305,7 @@ public void testHTTPJsonResponse415() { when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(serverTimeoutInMillis); when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests); when(sourceConfig.getThreadCount()).thenReturn(testThreadCount); - HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); // Start the source HTTPSourceUnderTest.start(testBuffer); refreshMeasurements(); @@ -347,7 +356,7 @@ public void testHTTPJsonResponse429() throws InterruptedException { when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(serverTimeoutInMillis); when(sourceConfig.getMaxPendingRequests()).thenReturn(testMaxPendingRequests); when(sourceConfig.getThreadCount()).thenReturn(testThreadCount); - HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); // Start the source HTTPSourceUnderTest.start(testBuffer); refreshMeasurements(); @@ -408,7 +417,7 @@ public void testServerStartCertFileSuccess() throws IOException { when(sourceConfig.isSsl()).thenReturn(true); when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); - HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); HTTPSourceUnderTest.start(testBuffer); HTTPSourceUnderTest.stop(); @@ -433,7 +442,7 @@ void testHTTPSJsonResponse() { when(sourceConfig.isSsl()).thenReturn(true); when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); - HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); testBuffer = getBuffer(); HTTPSourceUnderTest.start(testBuffer); @@ -460,14 +469,14 @@ public void testDoubleStart() { @Test public void testStartWithEmptyBuffer() { - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); Assertions.assertThrows(IllegalStateException.class, () -> source.start(null)); } @Test public void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { // Prepare - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); when(completableFuture.get()).thenThrow(new ExecutionException("", null)); @@ -480,7 +489,7 @@ public void testStartWithServerExecutionExceptionNoCause() throws ExecutionExcep @Test public void testStartWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException { // Prepare - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); final NullPointerException expCause = new NullPointerException(); @@ -495,7 +504,7 @@ public void testStartWithServerExecutionExceptionWithCause() throws ExecutionExc @Test public void testStartWithInterruptedException() throws ExecutionException, InterruptedException { // Prepare - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); when(completableFuture.get()).thenThrow(new InterruptedException()); @@ -509,7 +518,7 @@ public void testStartWithInterruptedException() throws ExecutionException, Inter @Test public void testStopWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException { // Prepare - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); source.start(testBuffer); @@ -524,7 +533,7 @@ public void testStopWithServerExecutionExceptionNoCause() throws ExecutionExcept @Test public void testStopWithServerExecutionExceptionWithCause() throws ExecutionException, InterruptedException { // Prepare - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); source.start(testBuffer); @@ -541,7 +550,7 @@ public void testStopWithServerExecutionExceptionWithCause() throws ExecutionExce @Test public void testStopWithInterruptedException() throws ExecutionException, InterruptedException { // Prepare - final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource source = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { armeriaServerMock.when(Server::builder).thenReturn(serverBuilder); source.start(testBuffer); @@ -559,7 +568,7 @@ public void testRunAnotherSourceWithSamePort() { // starting server HTTPSourceUnderTest.start(testBuffer); - final HTTPSource secondSource = new HTTPSource(sourceConfig, pluginMetrics); + final HTTPSource secondSource = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory); //Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer)); } diff --git a/settings.gradle b/settings.gradle index 9aa3c2b7a2..2f375581cc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -22,6 +22,7 @@ include 'data-prepper-api' include 'data-prepper-plugins' include 'data-prepper-core' include 'data-prepper-plugins:common' +include 'data-prepper-plugins:armeria-common' include 'data-prepper-plugins:opensearch' include 'data-prepper-plugins:service-map-stateful' include 'data-prepper-plugins:mapdb-prepper-state'