From fd9d714da8b16c0b7c2e057aa88cdd59bf571e7f Mon Sep 17 00:00:00 2001 From: rajeshLovesToCode Date: Tue, 9 May 2023 14:37:12 +0530 Subject: [PATCH] opensearch configuration changes for #1985,2264 Signed-off-by: rajeshLovesToCode --- .../opensearch-source/build.gradle | 28 ++++++++ .../plugins/source/OpenSearchSource.java | 40 +++++++++++ .../source/OpenSearchSourceConfiguration.java | 72 +++++++++++++++++++ .../AwsAuthenticationConfiguration.java | 60 ++++++++++++++++ .../ConnectionConfiguration.java | 60 ++++++++++++++++ .../IndexParametersConfiguration.java | 34 +++++++++ .../QueryParameterConfiguration.java | 19 +++++ .../configuration/RetryConfiguration.java | 18 +++++ .../SchedulingParameterConfiguration.java | 32 +++++++++ .../configuration/SearchConfiguration.java | 32 +++++++++ .../configuration/SortingConfiguration.java | 26 +++++++ 11 files changed, 421 insertions(+) create mode 100644 data-prepper-plugins/opensearch-source/build.gradle create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSource.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSourceConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/ConnectionConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/IndexParametersConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/QueryParameterConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/RetryConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SchedulingParameterConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SearchConfiguration.java create mode 100644 data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SortingConfiguration.java diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle new file mode 100644 index 0000000000..efb0e2a4ee --- /dev/null +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -0,0 +1,28 @@ +plugins { + id 'java' +} + +group 'org.opensearch.dataprepper' +version '2.3.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + implementation project(path: ':data-prepper-api') + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.1' + implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1' + implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1' + implementation "org.apache.commons:commons-lang3:3.12.0" + implementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: '2.6.0' + implementation 'org.opensearch.client:opensearch-java:2.4.0' + implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1' + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSource.java new file mode 100644 index 0000000000..6297f7fa40 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSource.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; + +@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class ) +public class OpenSearchSource implements Source> { + + private final OpenSearchSourceConfiguration openSearchSourceConfiguration; + + @DataPrepperPluginConstructor + public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + this.openSearchSourceConfiguration = openSearchSourceConfiguration; + } + + @Override + public void start(Buffer> buffer) { + if (buffer == null) { + throw new IllegalStateException("Buffer provided is null"); + } + startProcess(openSearchSourceConfiguration); + } + + private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + //Yet to implement + } + + @Override + public void stop() { + // Yet to implement + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSourceConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSourceConfiguration.java new file mode 100644 index 0000000000..02235bf620 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/OpenSearchSourceConfiguration.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.plugins.source.configuration.ConnectionConfiguration; +import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationConfiguration; +import org.opensearch.dataprepper.plugins.source.configuration.IndexParametersConfiguration; +import org.opensearch.dataprepper.plugins.source.configuration.QueryParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.configuration.RetryConfiguration; +import org.opensearch.dataprepper.plugins.source.configuration.SearchConfiguration; + + +import java.util.Map; + +public class OpenSearchSourceConfiguration { + + @JsonProperty("connection") + private ConnectionConfiguration connectionConfiguration; + + @JsonProperty("indices") + private IndexParametersConfiguration indexParametersConfiguration; + + @JsonProperty("aws") + private AwsAuthenticationConfiguration awsAuthenticationOptions; + + @JsonProperty("scheduling") + private SchedulingParameterConfiguration schedulingParameterConfiguration; + + @JsonProperty("query") + private QueryParameterConfiguration queryParameterConfiguration; + + @JsonProperty("search_options") + private SearchConfiguration searchConfiguration; + + @JsonProperty("retry") + private RetryConfiguration retryConfiguration; + + private Map indexNames; + + public ConnectionConfiguration getConnectionConfiguration() { + return connectionConfiguration; + } + + public IndexParametersConfiguration getIndexParametersConfiguration() { + return indexParametersConfiguration; + } + + public AwsAuthenticationConfiguration getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + public SchedulingParameterConfiguration getSchedulingParameterConfiguration() { + return schedulingParameterConfiguration; + } + + public QueryParameterConfiguration getQueryParameterConfiguration() { + return queryParameterConfiguration; + } + + public SearchConfiguration getSearchConfiguration() { + return searchConfiguration; + } + + public Map getIndexNames() { + return indexNames; + } + +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationConfiguration.java new file mode 100644 index 0000000000..bb5820ce3b --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/AwsAuthenticationConfiguration.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import java.util.UUID; + +public class AwsAuthenticationConfiguration { + + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public AwsCredentialsProvider authenticateAwsConfiguration() { + + final AwsCredentialsProvider awsCredentialsProvider; + if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) { + try { + Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn"); + } + + AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder() + .roleSessionName("OpenSearch-Source" + UUID.randomUUID()).roleArn(awsStsRoleArn); + + awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(getStsClient()) + .refreshRequest(assumeRoleRequestBuilder.build()).build(); + + } else { + awsCredentialsProvider = DefaultCredentialsProvider.create(); + } + return awsCredentialsProvider; + } + + private StsClient getStsClient() { + return StsClient.builder() + .region(getAwsRegion()) + .build(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/ConnectionConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/ConnectionConfiguration.java new file mode 100644 index 0000000000..c1bec8f559 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/ConnectionConfiguration.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotNull; +import java.nio.file.Path; +import java.util.List; + +public class ConnectionConfiguration { + + @NotNull + @JsonProperty("hosts") + private List hosts; + + @JsonProperty("username") + private String username; + + @JsonProperty("password") + private String password; + + @JsonProperty("cert") + private Path certPath; + + @JsonProperty("socket_timeout") + private Integer socketTimeout; + + @JsonProperty("connection_timeout") + private Integer connectTimeout; + + @JsonProperty("insecure") + private boolean insecure; + + List getHosts() { + return hosts; + } + + String getUsername() { + return username; + } + + String getPassword() { + return password; + } + + Path getCertPath() { + return certPath; + } + + Integer getSocketTimeout() { + return socketTimeout; + } + + Integer getConnectTimeout() { + return connectTimeout; + } + +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/IndexParametersConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/IndexParametersConfiguration.java new file mode 100644 index 0000000000..6ae58f22bf --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/IndexParametersConfiguration.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public class IndexParametersConfiguration { + + @JsonProperty("include") + private List include; + + @JsonProperty("exclude") + private List exclude; + + public void setInclude(List include) { + this.include = include; + } + + public void setExclude(List exclude) { + this.exclude = exclude; + } + + public List getInclude() { + return include; + } + + public List getExclude() { + return exclude; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/QueryParameterConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/QueryParameterConfiguration.java new file mode 100644 index 0000000000..8f1e6b132a --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/QueryParameterConfiguration.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public class QueryParameterConfiguration { + + @JsonProperty("fields") + private List fields; + + public List getFields() { + return fields; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/RetryConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/RetryConfiguration.java new file mode 100644 index 0000000000..13c3e8fb4c --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/RetryConfiguration.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RetryConfiguration { + + @JsonProperty("max_retries") + private Integer maxRetries; + + public Integer getMaxRetries() { + return maxRetries; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SchedulingParameterConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SchedulingParameterConfiguration.java new file mode 100644 index 0000000000..d27351e7ed --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SchedulingParameterConfiguration.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class SchedulingParameterConfiguration { + + @JsonProperty("rate") + private String rate; + + @JsonProperty("job_count") + private int jobCount; + + @JsonProperty("start_time") + private String startTime; + + public String getRate() { + return rate; + } + + public int getJobCount() { + return jobCount; + } + + public String getStartTime() { + return startTime; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SearchConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SearchConfiguration.java new file mode 100644 index 0000000000..39af0baa88 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SearchConfiguration.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class SearchConfiguration { + + @JsonProperty("batch_size") + private Integer batchSize; + + @JsonProperty("expand_wildcards") + private String expandWildcards; + + @JsonProperty("sorting") + private SortingConfiguration sorting; + + public Integer getBatchSize() { + return batchSize; + } + + public String getExpandWildcards() { + return expandWildcards; + } + + public SortingConfiguration getSorting() { + return sorting; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SortingConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SortingConfiguration.java new file mode 100644 index 0000000000..2d5daacb02 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/configuration/SortingConfiguration.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public class SortingConfiguration { + + @JsonProperty("sort_key") + private List sortKey; + + @JsonProperty("order") + private String order; + + public List getSortKey() { + return sortKey; + } + + public String getOrder() { + return order; + } +}