Skip to content

Commit

Permalink
Create OpenSearch source client with auth and lookup version to detec…
Browse files Browse the repository at this point in the history
…t search strategy (#2806)

Create OpenSearch source client with auth and lookup version to detect search strategy

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Jun 3, 2023
1 parent 40e60fb commit c2d7767
Show file tree
Hide file tree
Showing 23 changed files with 1,000 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.expression;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -17,13 +18,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.Random;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -34,7 +35,6 @@
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.commons.lang3.RandomStringUtils;

class GenericExpressionEvaluator_ConditionalIT {
/**
Expand Down Expand Up @@ -94,7 +94,7 @@ void testConditionalExpressionEvaluator(final String expression, final Event eve
void testGenericExpressionEvaluatorWithMultipleThreads(final String expression, final Event event, final Boolean expected) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);

final int numberOfThreads = 50;
final int numberOfThreads = 10;
final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

List<Boolean> evaluationResults = Collections.synchronizedList(new ArrayList<>());
Expand Down
6 changes: 6 additions & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
*/
dependencies {
implementation project(path: ':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:apache-client'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'org.opensearch.client:opensearch-rest-client:2.7.0'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation 'org.apache.maven:maven-artifact:3.0.3'
testImplementation testLibs.mockito.inline
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch;

public class OpenSearchIndexProgressState {
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,59 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;

public class OpenSearchService {

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;

private Thread searchWorkerThread;

public static OpenSearchService createOpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
}

private OpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
this.sourceCoordinator = sourceCoordinator;
this.sourceCoordinator.initialize();
}

public void start() {
// todo: to implement
// Leverages a runnable (SearchWorker) to perform the querying on the source cluster
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorkerThread = new Thread(new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
break;
case SCROLL:
searchWorkerThread = new Thread(new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
break;
default:
throw new IllegalArgumentException(
String.format("Search context type must be POINT_IN_TIME or SCROLL, type %s was given instead",
searchAccessor.getSearchContextType()));
}

searchWorkerThread.start();
}

public void stop() {
// todo: to implement

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,57 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {
@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType = OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordination {

private final AwsCredentialsSupplier awsCredentialsSupplier;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

private SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private OpenSearchService openSearchService;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final AwsCredentialsSupplier awsCredentialsSupplier) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;
}

@Override
public void start(Buffer<Record<Event>> buffer) {
public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration);
startProcess(openSearchSourceConfiguration, buffer);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
// todo: implement
// Should leverage OpenSearchService to run the actual plugin core logic.
private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer<Record<Event>> buffer) {
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, awsCredentialsSupplier);

final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
openSearchService.start();
}

@Override
public void stop() {
// Yet to implement
openSearchService.stop();
}

@Override
public <T> void setSourceCoordinator(final SourceCoordinator<T> sourceCoordinator) {
this.sourceCoordinator = (SourceCoordinator<OpenSearchIndexProgressState>) sourceCoordinator;
}

@Override
public Class<?> getPartitionProgressStateClass() {
return OpenSearchIndexProgressState.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class OpenSearchSourceConfiguration {

@JsonProperty("connection")
@Valid
private ConnectionConfiguration connectionConfiguration;
private ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();

@JsonProperty("indices")
@Valid
Expand Down Expand Up @@ -97,7 +97,7 @@ public SearchConfiguration getSearchConfiguration() {
boolean validateAwsConfigWithUsernameAndPassword() {

return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && Objects.isNull(username) && Objects.isNull(password)));
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class AwsAuthenticationConfiguration {
private static final String AWS_IAM_ROLE = "role";
Expand All @@ -35,25 +27,6 @@ public class AwsAuthenticationConfiguration {
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

private void validateStsRoleArn() {
final Arn arn = getArn();
if (!AWS_IAM.equals(arn.service())) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
final Optional<String> resourceType = arn.resource().resourceType();
if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for aws.sts_role_arn. Check the format of %s", awsStsRoleArn));
}
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}
Expand All @@ -62,36 +35,8 @@ public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {

validateStsRoleArn();

final StsClient stsClient = StsClient.builder()
.region(getAwsRegion())
.build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("OpenSearch-Source-" + UUID.randomUUID())
.roleArn(awsStsRoleArn);
if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) {
assumeRoleRequestBuilder = assumeRoleRequestBuilder
.overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
// use default credential provider
awsCredentialsProvider = DefaultCredentialsProvider.create();
}

return awsCredentialsProvider;
public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,36 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
*/
public class PitWorker implements Runnable {
public class PitWorker implements SearchWorker, Runnable {

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
}

@Override
public void run() {
//todo: to implement
// todo: implement
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,36 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;

/**
* ScrollWorker polls the source cluster via scroll contexts.
*/
public class ScrollWorker implements Runnable {
public class ScrollWorker implements SearchWorker {

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;

public ScrollWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
this.buffer = buffer;
}

@Override
public void run() {
//todo: to implement
// todo: implement
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

public interface SearchWorker extends Runnable {
}
Loading

0 comments on commit c2d7767

Please sign in to comment.