diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle new file mode 100644 index 0000000000..6e0d7b72e8 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +dependencies { + implementation project(path: ':data-prepper-api') + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2' + implementation 'software.amazon.awssdk:s3' + implementation 'software.amazon.awssdk:sts' + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java new file mode 100644 index 0000000000..80843f0bbb --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; + +@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class ) +public class OpenSearchSource implements Source> { + + private final OpenSearchSourceConfiguration openSearchSourceConfiguration; + + @DataPrepperPluginConstructor + public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + this.openSearchSourceConfiguration = openSearchSourceConfiguration; + } + + @Override + public void start(Buffer> buffer) { + if (buffer == null) { + throw new IllegalStateException("Buffer provided is null"); + } + startProcess(openSearchSourceConfiguration); + } + + private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + //Yet to implement + } + + @Override + public void stop() { + // Yet to implement + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java new file mode 100644 index 0000000000..ccbf0f88dc --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.QueryParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; + +import java.util.List; + +public class OpenSearchSourceConfiguration { + + @JsonProperty("max_retries") + private Integer maxRetries; + + @NotNull + @JsonProperty("hosts") + private List hosts; + + @JsonProperty("username") + private String username; + + @JsonProperty("password") + private String password; + + @JsonProperty("connection") + private ConnectionConfiguration connectionConfiguration; + + @JsonProperty("indices") + private IndexParametersConfiguration indexParametersConfiguration; + + @JsonProperty("aws") + private AwsAuthenticationConfiguration awsAuthenticationOptions; + + @JsonProperty("scheduling") + private SchedulingParameterConfiguration schedulingParameterConfiguration; + + @JsonProperty("query") + private QueryParameterConfiguration queryParameterConfiguration; + + @JsonProperty("search_options") + private SearchConfiguration searchConfiguration; + + public Integer getMaxRetries() { + return maxRetries; + } + + public List getHosts() { + return hosts; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public ConnectionConfiguration getConnectionConfiguration() { + return connectionConfiguration; + } + + public IndexParametersConfiguration getIndexParametersConfiguration() { + return indexParametersConfiguration; + } + + public AwsAuthenticationConfiguration getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public SchedulingParameterConfiguration getSchedulingParameterConfiguration() { + return schedulingParameterConfiguration; + } + + public QueryParameterConfiguration getQueryParameterConfiguration() { + return queryParameterConfiguration; + } + + public SearchConfiguration getSearchConfiguration() { + return searchConfiguration; + } + +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java new file mode 100644 index 0000000000..ab0cc09b7f --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java @@ -0,0 +1,97 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +public class AwsAuthenticationConfiguration { + private static final String AWS_IAM_ROLE = "role"; + private static final String AWS_IAM = "iam"; + + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + private void validateStsRoleArn() { + final Arn arn = getArn(); + if (!AWS_IAM.equals(arn.service())) { + throw new IllegalArgumentException("sts_role_arn must be an IAM Role"); + } + final Optional resourceType = arn.resource().resourceType(); + if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) { + throw new IllegalArgumentException("sts_role_arn must be an IAM Role"); + } + } + + private Arn getArn() { + try { + return Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn)); + } + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + + validateStsRoleArn(); + + final StsClient stsClient = StsClient.builder() + .region(getAwsRegion()) + .build(); + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("OpenSearch-Source-" + UUID.randomUUID()) + .roleArn(awsStsRoleArn); + if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) { + assumeRoleRequestBuilder = assumeRoleRequestBuilder + .overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader)); + } + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder() + .stsClient(stsClient) + .refreshRequest(assumeRoleRequestBuilder.build()) + .build(); + + } else { + // use default credential provider + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + + return awsCredentialsProvider; + } +} + diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/ConnectionConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/ConnectionConfiguration.java new file mode 100644 index 0000000000..ade2abadb9 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/ConnectionConfiguration.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.nio.file.Path; + +public class ConnectionConfiguration { + + @JsonProperty("cert") + private Path certPath; + + @JsonProperty("socket_timeout") + private Integer socketTimeout; + + @JsonProperty("connection_timeout") + private Integer connectTimeout; + + @JsonProperty("insecure") + private boolean insecure; + + public Path getCertPath() { + return certPath; + } + + public Integer getSocketTimeout() { + return socketTimeout; + } + + public Integer getConnectTimeout() { + return connectTimeout; + } + + public boolean isInsecure() { + return insecure; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/IndexParametersConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/IndexParametersConfiguration.java new file mode 100644 index 0000000000..cf09cf74ec --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/IndexParametersConfiguration.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public class IndexParametersConfiguration { + + @JsonProperty("include") + private List include; + + @JsonProperty("exclude") + private List exclude; + + public void setInclude(List include) { + this.include = include; + } + + public void setExclude(List exclude) { + this.exclude = exclude; + } + + public List getInclude() { + return include; + } + + public List getExclude() { + return exclude; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/QueryParameterConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/QueryParameterConfiguration.java new file mode 100644 index 0000000000..908442b5f7 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/QueryParameterConfiguration.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public class QueryParameterConfiguration { + + @JsonProperty("fields") + private List fields; + + public List getFields() { + return fields; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java new file mode 100644 index 0000000000..ba8b6c2b95 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; +import jakarta.validation.constraints.Min; + +import java.time.Duration; +import java.time.LocalDateTime; + +public class SchedulingParameterConfiguration { + + @JsonProperty("rate") + private Duration rate; + + @Min(1) + @JsonProperty("job_count") + private int jobCount = 1; + + @JsonSerialize(using = LocalDateTimeSerializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) + @JsonProperty("start_time") + private LocalDateTime startTime = LocalDateTime.now(); + + public Duration getRate() { + return rate; + } + + public int getJobCount() { + return jobCount; + } + + public LocalDateTime getStartTime() { + return startTime; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java new file mode 100644 index 0000000000..c3b2b70efc --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class SearchConfiguration { + + @JsonProperty("batch_size") + private Integer batchSize; + + @JsonProperty("expand_wildcards") + private WildCardOptions expandWildcards = WildCardOptions.ALL; + + @JsonProperty("sorting") + private List sorting; + + public Integer getBatchSize() { + return batchSize; + } + + public WildCardOptions getExpandWildcards() { + return expandWildcards; + } + + public List getSorting() { + return sorting; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortingConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortingConfiguration.java new file mode 100644 index 0000000000..8614f70597 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortingConfiguration.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +public class SortingConfiguration { + private static final String DEFAULT_SORT_ORDER = "asc"; + + @NotEmpty + @NotNull + @JsonProperty("sort_key") + private String sortKey; + @JsonProperty("order") + private String order = DEFAULT_SORT_ORDER; + + public String getSortKey() { + return sortKey; + } + + public String getOrder() { + return order; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/WildCardOptions.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/WildCardOptions.java new file mode 100644 index 0000000000..141e5e715e --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/WildCardOptions.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum WildCardOptions { + OPEN("open"), + CLOSED("closed"), + NONE("none"), + ALL("all"); + private static final Map OPTIONS_MAP = Arrays.stream(WildCardOptions.values()) + .collect(Collectors.toMap( + value -> value.option, + value -> value + )); + + private final String option; + + WildCardOptions(final String option) { + this.option = option; + } + + @JsonCreator + static WildCardOptions fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java new file mode 100644 index 0000000000..1d53dcfb17 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.Test; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.WildCardOptions; +import software.amazon.awssdk.regions.Region; + +import java.nio.file.Path; +import java.time.LocalDateTime; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class OpenSearchSourceConfigurationTest { + + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void open_search_source_config_values_test() throws JsonProcessingException { + + final String sourceConfigurationYaml = "max_retries: 5\n" + + "hosts: [\"http://localhost:9200\"]\n" + + "username: test\n" + + "password: test\n" + + "connection:\n" + + " insecure: true\n" + + " cert: \"cert\"\n" + + "indices:\n" + + " include:\n" + + " - \"shakespeare\"\n" + + "aws:\n" + + " region: \"us-east-1\"\n" + + " sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" + + "scheduling:\n" + + " job_count: 3\n" + + " start_time: 2023-05-05T18:00:00\n" + + "query:\n" + + " fields: [\"test_variable : test_value\"]\n" + + "search_options:\n" + + " batch_size: 1000\n" + + " expand_wildcards: \"open\"\n" + + " sorting:\n" + + " - sort_key: name\n" + + " order: desc"; + final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); + final ConnectionConfiguration connectionConfig = sourceConfiguration.getConnectionConfiguration(); + final SearchConfiguration searchConfiguration = sourceConfiguration.getSearchConfiguration(); + final AwsAuthenticationConfiguration awsAuthenticationOptions = sourceConfiguration.getAwsAuthenticationOptions(); + final SchedulingParameterConfiguration schedulingParameterConfiguration = sourceConfiguration.getSchedulingParameterConfiguration(); + + assertThat(sourceConfiguration.getMaxRetries(),equalTo(5)); + assertThat(awsAuthenticationOptions.getAwsRegion(),equalTo(Region.US_EAST_1)); + assertThat(sourceConfiguration.getHosts().get(0),equalTo("http://localhost:9200")); + assertThat(sourceConfiguration.getUsername(),equalTo("test")); + assertThat(sourceConfiguration.getPassword(),equalTo("test")); + assertThat(connectionConfig.getCertPath(),equalTo(Path.of("cert"))); + assertThat(connectionConfig.isInsecure(),equalTo(true)); + assertThat(searchConfiguration.getExpandWildcards(),equalTo(WildCardOptions.OPEN)); + assertThat(searchConfiguration.getBatchSize(),equalTo(1000)); + assertThat(sourceConfiguration.getQueryParameterConfiguration().getFields(),equalTo(List.of("test_variable : test_value"))); + assertThat(schedulingParameterConfiguration.getJobCount(),equalTo(3)); + assertThat(schedulingParameterConfiguration.getStartTime(),equalTo(LocalDateTime.parse("2023-05-05T18:00:00"))); + assertThat(sourceConfiguration.getIndexParametersConfiguration().getInclude().get(0),equalTo("shakespeare")); + assertThat(searchConfiguration.getSorting().get(0).getSortKey(),equalTo("name")); + assertThat(searchConfiguration.getSorting().get(0).getOrder(),equalTo("desc")); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java new file mode 100644 index 0000000000..b50a9f887d --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java @@ -0,0 +1,232 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class AwsAuthenticationConfigurationTest { + + private AwsAuthenticationConfiguration awsAuthenticationOptions; + + private final String TEST_ROLE = "arn:aws:iam::123456789012:role/test-role"; + + @BeforeEach + void setUp() { + awsAuthenticationOptions = new AwsAuthenticationConfiguration(); + } + + @Test + void getAwsRegion_returns_Region_of() throws NoSuchFieldException, IllegalAccessException { + final String regionString = UUID.randomUUID().toString(); + final Region expectedRegionObject = mock(Region.class); + reflectivelySetField(awsAuthenticationOptions, "awsRegion", regionString); + final Region actualRegion; + try(final MockedStatic regionMockedStatic = mockStatic(Region.class)) { + regionMockedStatic.when(() -> Region.of(regionString)).thenReturn(expectedRegionObject); + actualRegion = awsAuthenticationOptions.getAwsRegion(); + } + assertThat(actualRegion, equalTo(expectedRegionObject)); + } + + @Test + void getAwsRegion_returns_null_when_region_is_null() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", null); + assertThat(awsAuthenticationOptions.getAwsRegion(), nullValue()); + } + + @Test + void authenticateAWSConfiguration_should_return_s3Client_without_sts_role_arn() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", null); + + final DefaultCredentialsProvider mockedCredentialsProvider = mock(DefaultCredentialsProvider.class); + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic defaultCredentialsProviderMockedStatic = mockStatic(DefaultCredentialsProvider.class)) { + defaultCredentialsProviderMockedStatic.when(DefaultCredentialsProvider::create) + .thenReturn(mockedCredentialsProvider); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, sameInstance(mockedCredentialsProvider)); + } + + @Nested + class WithSts { + private StsClient stsClient; + private StsClientBuilder stsClientBuilder; + + @BeforeEach + void setUp() { + stsClient = mock(StsClient.class); + stsClientBuilder = mock(StsClientBuilder.class); + + when(stsClientBuilder.build()).thenReturn(stsClient); + } + + @Test + void authenticateAWSConfiguration_should_return_s3Client_with_sts_role_arn() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", TEST_ROLE); + + when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); + final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); + when(assumeRoleRequestBuilder.roleSessionName(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.roleArn(anyString())) + .thenReturn(assumeRoleRequestBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); + final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + + verify(assumeRoleRequestBuilder).roleArn(TEST_ROLE); + verify(assumeRoleRequestBuilder).roleSessionName(anyString()); + verify(assumeRoleRequestBuilder).build(); + verifyNoMoreInteractions(assumeRoleRequestBuilder); + } + + @Test + void authenticateAWSConfiguration_should_return_s3Client_with_sts_role_arn_when_no_region() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", null); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", TEST_ROLE); + assertThat(awsAuthenticationOptions.getAwsRegion(), equalTo(null)); + + when(stsClientBuilder.region(null)).thenReturn(stsClientBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + } + + @Test + void authenticateAWSConfiguration_should_override_STS_Headers_when_HeaderOverrides_when_set() throws NoSuchFieldException, IllegalAccessException { + final String headerName1 = UUID.randomUUID().toString(); + final String headerValue1 = UUID.randomUUID().toString(); + final String headerName2 = UUID.randomUUID().toString(); + final String headerValue2 = UUID.randomUUID().toString(); + final Map overrideHeaders = Map.of(headerName1, headerValue1, headerName2, headerValue2); + + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", TEST_ROLE); + reflectivelySetField(awsAuthenticationOptions, "awsStsHeaderOverrides", overrideHeaders); + + when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); + + final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); + when(assumeRoleRequestBuilder.roleSessionName(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.roleArn(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.overrideConfiguration(any(Consumer.class))) + .thenReturn(assumeRoleRequestBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); + final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + + final ArgumentCaptor> configurationCaptor = ArgumentCaptor.forClass(Consumer.class); + + verify(assumeRoleRequestBuilder).roleArn(TEST_ROLE); + verify(assumeRoleRequestBuilder).roleSessionName(anyString()); + verify(assumeRoleRequestBuilder).overrideConfiguration(configurationCaptor.capture()); + verify(assumeRoleRequestBuilder).build(); + verifyNoMoreInteractions(assumeRoleRequestBuilder); + + final Consumer actualOverride = configurationCaptor.getValue(); + + final AwsRequestOverrideConfiguration.Builder configurationBuilder = mock(AwsRequestOverrideConfiguration.Builder.class); + actualOverride.accept(configurationBuilder); + verify(configurationBuilder).putHeader(headerName1, headerValue1); + verify(configurationBuilder).putHeader(headerName2, headerValue2); + verifyNoMoreInteractions(configurationBuilder); + } + + @Test + void authenticateAWSConfiguration_should_not_override_STS_Headers_when_HeaderOverrides_are_empty() throws NoSuchFieldException, IllegalAccessException { + reflectivelySetField(awsAuthenticationOptions, "awsRegion", "us-east-1"); + reflectivelySetField(awsAuthenticationOptions, "awsStsRoleArn", TEST_ROLE); + reflectivelySetField(awsAuthenticationOptions, "awsStsHeaderOverrides", Collections.emptyMap()); + + when(stsClientBuilder.region(Region.US_EAST_1)).thenReturn(stsClientBuilder); + final AssumeRoleRequest.Builder assumeRoleRequestBuilder = mock(AssumeRoleRequest.Builder.class); + when(assumeRoleRequestBuilder.roleSessionName(anyString())) + .thenReturn(assumeRoleRequestBuilder); + when(assumeRoleRequestBuilder.roleArn(anyString())) + .thenReturn(assumeRoleRequestBuilder); + + final AwsCredentialsProvider actualCredentialsProvider; + try (final MockedStatic stsClientMockedStatic = mockStatic(StsClient.class); + final MockedStatic assumeRoleRequestMockedStatic = mockStatic(AssumeRoleRequest.class)) { + stsClientMockedStatic.when(StsClient::builder).thenReturn(stsClientBuilder); + assumeRoleRequestMockedStatic.when(AssumeRoleRequest::builder).thenReturn(assumeRoleRequestBuilder); + actualCredentialsProvider = awsAuthenticationOptions.authenticateAwsConfiguration(); + } + + assertThat(actualCredentialsProvider, instanceOf(AwsCredentialsProvider.class)); + + verify(assumeRoleRequestBuilder).roleArn(TEST_ROLE); + verify(assumeRoleRequestBuilder).roleSessionName(anyString()); + verify(assumeRoleRequestBuilder).build(); + verifyNoMoreInteractions(assumeRoleRequestBuilder); + } + } + + private void reflectivelySetField(final AwsAuthenticationConfiguration awsAuthenticationOptions, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { + final Field field = AwsAuthenticationConfiguration.class.getDeclaredField(fieldName); + try { + field.setAccessible(true); + field.set(awsAuthenticationOptions, value); + } finally { + field.setAccessible(false); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/ConnectionConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/ConnectionConfigurationTest.java new file mode 100644 index 0000000000..747d7495c2 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/ConnectionConfigurationTest.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.Test; + +import java.nio.file.Path; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ConnectionConfigurationTest { + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void connection_configuration_values_test() throws JsonProcessingException { + + final String connectionYaml = + " cert: \"cert\"\n" + + " insecure: true\n" + + " socket_timeout: 500\n" + + " connection_timeout: 500"; + final ConnectionConfiguration connectionConfig = objectMapper.readValue(connectionYaml, ConnectionConfiguration.class); + assertThat(connectionConfig.getCertPath(),equalTo(Path.of("cert"))); + assertThat(connectionConfig.getSocketTimeout(),equalTo(500)); + assertThat(connectionConfig.getConnectTimeout(),equalTo(500)); + assertThat(connectionConfig.isInsecure(),equalTo(true)); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/IndexParametersConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/IndexParametersConfigurationTest.java new file mode 100644 index 0000000000..e90bd6ec04 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/IndexParametersConfigurationTest.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.Test; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class IndexParametersConfigurationTest { + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void index_parameter_configuration_test() throws JsonProcessingException { + + final String indexParameterConfigYaml = + " include:\n" + + " - \"includeTest\"\n" + + " - \"includeTest1\"\n" + + " exclude:\n" + + " - \"excludeTest\""; + final IndexParametersConfiguration indexParametersConfiguration = objectMapper.readValue(indexParameterConfigYaml, IndexParametersConfiguration.class); + assertThat(indexParametersConfiguration.getInclude(),equalTo(List.of("includeTest","includeTest1"))); + assertThat(indexParametersConfiguration.getExclude(),equalTo(List.of("excludeTest"))); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/QueryParameterConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/QueryParameterConfigurationTest.java new file mode 100644 index 0000000000..5bc9ea179e --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/QueryParameterConfigurationTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.Test; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class QueryParameterConfigurationTest { + + + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void query_params_config_values_test() throws JsonProcessingException { + final String queryConfigurationYaml = + " fields: [\"test_variable : test_value\"]"; + final QueryParameterConfiguration queryParameterConfiguration = objectMapper.readValue(queryConfigurationYaml, QueryParameterConfiguration.class); + assertThat(queryParameterConfiguration.getFields(),equalTo(List.of("test_variable : test_value"))); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java new file mode 100644 index 0000000000..ef925e8ee0 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.Test; + +import java.time.LocalDateTime; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class SchedulingParameterConfigurationTest { + + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void scheduling_parameter_configuration_test() throws JsonProcessingException { + + final String schedulingParameterYaml = + " job_count: 3\n" + + " start_time: 2023-05-05T18:00:00\n"; + final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingParameterYaml, SchedulingParameterConfiguration.class); + assertThat(schedulingParameterConfiguration.getJobCount(),equalTo(3)); + assertThat(schedulingParameterConfiguration.getStartTime(),equalTo(LocalDateTime.parse("2023-05-05T18:00:00"))); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java new file mode 100644 index 0000000000..d8ee58ca92 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class SearchConfigurationTest { + + + private ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + public void search_config_values_test() throws JsonProcessingException { + + final String searchConfigurationYaml = + " batch_size: 1000\n" + + " expand_wildcards: \"open\"\n" + + " sorting:\n" + + " - sort_key: \"test\"\n" + + " order: asc\n" + + " - sort_key: \"name\"\n" + + " order: desc"; + final SearchConfiguration searchConfiguration = objectMapper.readValue(searchConfigurationYaml, SearchConfiguration.class); + assertThat(searchConfiguration.getExpandWildcards(),equalTo(WildCardOptions.OPEN)); + assertThat(searchConfiguration.getBatchSize(),equalTo(1000)); + assertThat(searchConfiguration.getSorting().get(0).getSortKey(),equalTo("test")); + assertThat(searchConfiguration.getSorting().get(0).getOrder(),equalTo("asc")); + assertThat(searchConfiguration.getSorting().get(1).getSortKey(),equalTo("name")); + assertThat(searchConfiguration.getSorting().get(1).getOrder(),equalTo("desc")); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/data-prepper-plugins/opensearch-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000000..23c33feb6d --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1,3 @@ +# To enable mocking of final classes with vanilla Mockito +# https://github.com/mockito/mockito/wiki/What%27s-new-in-Mockito-2#mock-the-unmockable-opt-in-mocking-of-final-classesmethods +mock-maker-inline diff --git a/settings.gradle b/settings.gradle index 6954951f45..e28dc10b02 100644 --- a/settings.gradle +++ b/settings.gradle @@ -114,3 +114,4 @@ include 'data-prepper-plugins:failures-common' include 'data-prepper-plugins:newline-codecs' include 'data-prepper-plugins:avro-codecs' include 'data-prepper-plugins:kafka-plugins' +include 'data-prepper-plugins:opensearch-source'