Skip to content

Commit

Permalink
opensearch configuration changes for opensearch-project#1985,2264
Browse files Browse the repository at this point in the history
Signed-off-by: rajeshLovesToCode <[email protected]>
  • Loading branch information
rajeshLovesToCode committed May 9, 2023
1 parent 7ef5ab4 commit fd9d714
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 0 deletions.
28 changes: 28 additions & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
plugins {
id 'java'
}

group 'org.opensearch.dataprepper'
version '2.3.0-SNAPSHOT'

repositories {
mavenCentral()
}

dependencies {
implementation project(path: ':data-prepper-api')
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.1'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: '2.6.0'
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
}

@Override
public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
//Yet to implement
}

@Override
public void stop() {
// Yet to implement
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.configuration.QueryParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.configuration.SchedulingParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.configuration.RetryConfiguration;
import org.opensearch.dataprepper.plugins.source.configuration.SearchConfiguration;


import java.util.Map;

public class OpenSearchSourceConfiguration {

@JsonProperty("connection")
private ConnectionConfiguration connectionConfiguration;

@JsonProperty("indices")
private IndexParametersConfiguration indexParametersConfiguration;

@JsonProperty("aws")
private AwsAuthenticationConfiguration awsAuthenticationOptions;

@JsonProperty("scheduling")
private SchedulingParameterConfiguration schedulingParameterConfiguration;

@JsonProperty("query")
private QueryParameterConfiguration queryParameterConfiguration;

@JsonProperty("search_options")
private SearchConfiguration searchConfiguration;

@JsonProperty("retry")
private RetryConfiguration retryConfiguration;

private Map<String,String> indexNames;

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
}

public IndexParametersConfiguration getIndexParametersConfiguration() {
return indexParametersConfiguration;
}

public AwsAuthenticationConfiguration getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}

public SchedulingParameterConfiguration getSchedulingParameterConfiguration() {
return schedulingParameterConfiguration;
}

public QueryParameterConfiguration getQueryParameterConfiguration() {
return queryParameterConfiguration;
}

public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
}

public Map<String, String> getIndexNames() {
return indexNames;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import java.util.UUID;

public class AwsAuthenticationConfiguration {

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {
try {
Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException("Invalid ARN format for awsStsRoleArn");
}

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("OpenSearch-Source" + UUID.randomUUID()).roleArn(awsStsRoleArn);

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(getStsClient())
.refreshRequest(assumeRoleRequestBuilder.build()).build();

} else {
awsCredentialsProvider = DefaultCredentialsProvider.create();
}
return awsCredentialsProvider;
}

private StsClient getStsClient() {
return StsClient.builder()
.region(getAwsRegion())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import java.nio.file.Path;
import java.util.List;

public class ConnectionConfiguration {

@NotNull
@JsonProperty("hosts")
private List<String> hosts;

@JsonProperty("username")
private String username;

@JsonProperty("password")
private String password;

@JsonProperty("cert")
private Path certPath;

@JsonProperty("socket_timeout")
private Integer socketTimeout;

@JsonProperty("connection_timeout")
private Integer connectTimeout;

@JsonProperty("insecure")
private boolean insecure;

List<String> getHosts() {
return hosts;
}

String getUsername() {
return username;
}

String getPassword() {
return password;
}

Path getCertPath() {
return certPath;
}

Integer getSocketTimeout() {
return socketTimeout;
}

Integer getConnectTimeout() {
return connectTimeout;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;

public class IndexParametersConfiguration {

@JsonProperty("include")
private List<String> include;

@JsonProperty("exclude")
private List<String> exclude;

public void setInclude(List<String> include) {
this.include = include;
}

public void setExclude(List<String> exclude) {
this.exclude = exclude;
}

public List<String> getInclude() {
return include;
}

public List<String> getExclude() {
return exclude;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;

public class QueryParameterConfiguration {

@JsonProperty("fields")
private List<String> fields;

public List<String> getFields() {
return fields;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;

public class RetryConfiguration {

@JsonProperty("max_retries")
private Integer maxRetries;

public Integer getMaxRetries() {
return maxRetries;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;

public class SchedulingParameterConfiguration {

@JsonProperty("rate")
private String rate;

@JsonProperty("job_count")
private int jobCount;

@JsonProperty("start_time")
private String startTime;

public String getRate() {
return rate;
}

public int getJobCount() {
return jobCount;
}

public String getStartTime() {
return startTime;
}
}
Loading

0 comments on commit fd9d714

Please sign in to comment.