Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…y: rajeshLovesToCode <[email protected]>

Signed-off-by: rajeshLovesToCode <[email protected]>
  • Loading branch information
rajeshLovesToCode committed May 18, 2023
1 parent a540932 commit 2eefca4
Show file tree
Hide file tree
Showing 20 changed files with 958 additions and 0 deletions.
16 changes: 16 additions & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
dependencies {
implementation project(path: ':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
}

@Override
public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
//Yet to implement
}

@Override
public void stop() {
// Yet to implement
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.QueryParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;

import java.util.List;

public class OpenSearchSourceConfiguration {

@JsonProperty("max_retries")
private Integer maxRetries;

@NotNull
@JsonProperty("hosts")
private List<String> hosts;

@JsonProperty("username")
private String username;

@JsonProperty("password")
private String password;

@JsonProperty("connection")
private ConnectionConfiguration connectionConfiguration;

@JsonProperty("indices")
private IndexParametersConfiguration indexParametersConfiguration;

@JsonProperty("aws")
private AwsAuthenticationConfiguration awsAuthenticationOptions;

@JsonProperty("scheduling")
private SchedulingParameterConfiguration schedulingParameterConfiguration;

@JsonProperty("query")
private QueryParameterConfiguration queryParameterConfiguration;

@JsonProperty("search_options")
private SearchConfiguration searchConfiguration;

public Integer getMaxRetries() {
return maxRetries;
}

public List<String> getHosts() {
return hosts;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
}

public IndexParametersConfiguration getIndexParametersConfiguration() {
return indexParametersConfiguration;
}

public AwsAuthenticationConfiguration getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}

public SchedulingParameterConfiguration getSchedulingParameterConfiguration() {
return schedulingParameterConfiguration;
}

public QueryParameterConfiguration getQueryParameterConfiguration() {
return queryParameterConfiguration;
}

public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class AwsAuthenticationConfiguration {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

private void validateStsRoleArn() {
final Arn arn = getArn();
if (!AWS_IAM.equals(arn.service())) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
final Optional<String> resourceType = arn.resource().resourceType();
if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
}
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {

validateStsRoleArn();

final StsClient stsClient = StsClient.builder()
.region(getAwsRegion())
.build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("OpenSearch-Source-" + UUID.randomUUID())
.roleArn(awsStsRoleArn);
if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) {
assumeRoleRequestBuilder = assumeRoleRequestBuilder
.overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
// use default credential provider
awsCredentialsProvider = DefaultCredentialsProvider.create();
}

return awsCredentialsProvider;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.nio.file.Path;

public class ConnectionConfiguration {

@JsonProperty("cert")
private Path certPath;

@JsonProperty("socket_timeout")
private Integer socketTimeout;

@JsonProperty("connection_timeout")
private Integer connectTimeout;

@JsonProperty("insecure")
private boolean insecure;

public Path getCertPath() {
return certPath;
}

public Integer getSocketTimeout() {
return socketTimeout;
}

public Integer getConnectTimeout() {
return connectTimeout;
}

public boolean isInsecure() {
return insecure;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;

public class IndexParametersConfiguration {

@JsonProperty("include")
private List<String> include;

@JsonProperty("exclude")
private List<String> exclude;

public void setInclude(List<String> include) {
this.include = include;
}

public void setExclude(List<String> exclude) {
this.exclude = exclude;
}

public List<String> getInclude() {
return include;
}

public List<String> getExclude() {
return exclude;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;

public class QueryParameterConfiguration {

@JsonProperty("fields")
private List<String> fields;

public List<String> getFields() {
return fields;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import jakarta.validation.constraints.Min;

import java.time.Duration;
import java.time.LocalDateTime;

public class SchedulingParameterConfiguration {

@JsonProperty("rate")
private Duration rate;

@Min(1)
@JsonProperty("job_count")
private int jobCount = 1;

@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonProperty("start_time")
private LocalDateTime startTime = LocalDateTime.now();

public Duration getRate() {
return rate;
}

public int getJobCount() {
return jobCount;
}

public LocalDateTime getStartTime() {
return startTime;
}
}
Loading

0 comments on commit 2eefca4

Please sign in to comment.