Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create OpenSearch source client with auth and lookup version to detect search strategy #2806

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.expression;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -17,13 +18,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.Random;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
Expand All @@ -34,7 +35,6 @@
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.commons.lang3.RandomStringUtils;

class GenericExpressionEvaluator_ConditionalIT {
/**
Expand Down Expand Up @@ -94,7 +94,7 @@ void testConditionalExpressionEvaluator(final String expression, final Event eve
void testGenericExpressionEvaluatorWithMultipleThreads(final String expression, final Event event, final Boolean expected) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);

final int numberOfThreads = 50;
final int numberOfThreads = 10;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this file being changed here? Why is the number of threads reduced?

Copy link
Member Author

@graytaylor0 graytaylor0 Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That test runs over 50 times, and it was spawning 50 threads for each test. My build failed every time due to that with an error about being out of threads. It didn’t fail after I lowered it. Is this test really necessary for every conditional expression?

final ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

List<Boolean> evaluationResults = Collections.synchronizedList(new ArrayList<>());
Expand Down
6 changes: 6 additions & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
*/
// Dependencies of the opensearch-source plugin module.
dependencies {
// Sibling Data Prepper modules.
implementation project(path: ':data-prepper-api')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:apache-client'
implementation 'com.fasterxml.jackson.core:jackson-databind'
// NOTE(review): this is the only Jackson artifact declared here with an explicit version;
// the other Jackson entries carry none — confirm the pin is intentional.
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
// OpenSearch client libraries.
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'org.opensearch.client:opensearch-rest-client:2.7.0'
implementation "org.apache.commons:commons-lang3:3.12.0"
// NOTE(review): presumably pulled in for version comparison when detecting the
// search strategy — confirm against the code that uses it.
implementation 'org.apache.maven:maven-artifact:3.0.3'
testImplementation testLibs.mockito.inline
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch;

/**
 * Progress state type for the OpenSearch source.
 * It currently declares no fields or methods.
 */
public class OpenSearchIndexProgressState {
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,59 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;

public class OpenSearchService {

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;

private Thread searchWorkerThread;

/**
 * Static factory for {@link OpenSearchService}; delegates directly to the private constructor.
 *
 * @param searchAccessor accessor handed to the search worker
 * @param sourceCoordinator coordinator handed to the search worker
 * @param openSearchSourceConfiguration configuration of the OpenSearch source
 * @param buffer buffer handed to the search worker
 * @return a newly constructed service
 */
public static OpenSearchService createOpenSearchService(final SearchAccessor searchAccessor,
                                                        final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
                                                        final OpenSearchSourceConfiguration openSearchSourceConfiguration,
                                                        final Buffer<Record<Event>> buffer) {
    final OpenSearchService service = new OpenSearchService(
            searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
    return service;
}

/**
 * Stores the collaborators and then calls {@code initialize()} on the source coordinator.
 *
 * @param searchAccessor accessor handed to the search worker
 * @param sourceCoordinator coordinator handed to the search worker; initialized here
 * @param openSearchSourceConfiguration configuration of the OpenSearch source
 * @param buffer buffer handed to the search worker
 */
private OpenSearchService(final SearchAccessor searchAccessor,
                          final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
                          final OpenSearchSourceConfiguration openSearchSourceConfiguration,
                          final Buffer<Record<Event>> buffer) {
    this.searchAccessor = searchAccessor;
    this.sourceCoordinator = sourceCoordinator;
    this.openSearchSourceConfiguration = openSearchSourceConfiguration;
    this.buffer = buffer;

    // The coordinator is initialized only after every field has been assigned.
    sourceCoordinator.initialize();
}

public void start() {
// todo: to implement
// Leverages a runnable (SearchWorker) to perform the querying on the source cluster
switch(searchAccessor.getSearchContextType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NONE is a supported type. We should throw an exception if NONE is returned or we will encounter an NPE on line 52.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do until we end up adding a search worker for this case

case POINT_IN_TIME:
searchWorkerThread = new Thread(new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
break;
case SCROLL:
searchWorkerThread = new Thread(new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
break;
default:
throw new IllegalArgumentException(
String.format("Search context type must be POINT_IN_TIME or SCROLL, type %s was given instead",
searchAccessor.getSearchContextType()));
}

searchWorkerThread.start();
}

/**
 * Stops the service. Not yet implemented: the body is empty.
 */
public void stop() {
// todo: to implement
// NOTE(review): the worker thread started by start() is neither interrupted nor joined here.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,57 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {
@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType = OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordination {

private final AwsCredentialsSupplier awsCredentialsSupplier;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

private SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private OpenSearchService openSearchService;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final AwsCredentialsSupplier awsCredentialsSupplier) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;
}

@Override
public void start(Buffer<Record<Event>> buffer) {
public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration);
startProcess(openSearchSourceConfiguration, buffer);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
// todo: implement
// Should leverage OpenSearchService to run the actual plugin core logic.
private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer<Record<Event>> buffer) {
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, awsCredentialsSupplier);

final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
openSearchService.start();
}

/**
 * Stops the underlying {@link OpenSearchService}, if one has been created.
 *
 * The service field is only assigned in {@code startProcess}, so it is still {@code null}
 * when this method runs before {@code start} or after a {@code start} call that threw.
 * In that case there is nothing to stop and the call is a no-op instead of a
 * {@link NullPointerException}.
 */
@Override
public void stop() {
    if (openSearchService != null) {
        openSearchService.stop();
    }
}

/**
 * Stores the source coordinator that is later passed to the {@link OpenSearchService}.
 *
 * @param sourceCoordinator the coordinator to use
 * @param <T> the progress state type of the supplied coordinator
 */
@Override
public <T> void setSourceCoordinator(final SourceCoordinator<T> sourceCoordinator) {
    // Unchecked cast: assumes the caller supplies a coordinator typed with the class returned by
    // getPartitionProgressStateClass() — TODO confirm against the coordination framework.
    @SuppressWarnings("unchecked")
    final SourceCoordinator<OpenSearchIndexProgressState> typedCoordinator =
            (SourceCoordinator<OpenSearchIndexProgressState>) sourceCoordinator;
    this.sourceCoordinator = typedCoordinator;
}

/**
 * Reports the progress state class used by this source.
 *
 * @return {@code OpenSearchIndexProgressState.class}
 */
@Override
public Class<?> getPartitionProgressStateClass() {
return OpenSearchIndexProgressState.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class OpenSearchSourceConfiguration {

@JsonProperty("connection")
@Valid
private ConnectionConfiguration connectionConfiguration;
private ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();

@JsonProperty("indices")
@Valid
Expand Down Expand Up @@ -97,7 +97,7 @@ public SearchConfiguration getSearchConfiguration() {
boolean validateAwsConfigWithUsernameAndPassword() {

return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && Objects.isNull(username) && Objects.isNull(password)));
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class AwsAuthenticationConfiguration {
private static final String AWS_IAM_ROLE = "role";
Expand All @@ -35,25 +27,6 @@ public class AwsAuthenticationConfiguration {
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

/**
 * Checks that the configured STS role ARN names an IAM role.
 *
 * @throws IllegalArgumentException if the ARN cannot be parsed, if its service does not equal
 *         {@code AWS_IAM}, or if its resource type is absent or differs from {@code AWS_IAM_ROLE}
 */
private void validateStsRoleArn() {
    final Arn roleArn = getArn();

    // The resource is only inspected once the service check has passed.
    final boolean isIamRole = AWS_IAM.equals(roleArn.service())
            && roleArn.resource().resourceType().filter(AWS_IAM_ROLE::equals).isPresent();

    if (!isIamRole) {
        throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
    }
}

/**
 * Parses the configured {@code awsStsRoleArn} string into an {@link Arn}.
 *
 * @return the parsed ARN
 * @throws IllegalArgumentException if {@code Arn.fromString} rejects the value; the original
 *         exception is attached as the cause so the parse failure is not lost
 */
private Arn getArn() {
    try {
        return Arn.fromString(awsStsRoleArn);
    } catch (final Exception e) {
        throw new IllegalArgumentException(
                String.format("Invalid ARN format for aws.sts_role_arn. Check the format of %s", awsStsRoleArn), e);
    }
}

/**
 * Returns the value of the {@code awsStsRoleArn} field.
 *
 * @return the configured STS role ARN string
 */
public String getAwsStsRoleArn() {
return awsStsRoleArn;
}
Expand All @@ -62,36 +35,8 @@ public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {

validateStsRoleArn();

final StsClient stsClient = StsClient.builder()
.region(getAwsRegion())
.build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("OpenSearch-Source-" + UUID.randomUUID())
.roleArn(awsStsRoleArn);
if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) {
assumeRoleRequestBuilder = assumeRoleRequestBuilder
.overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
// use default credential provider
awsCredentialsProvider = DefaultCredentialsProvider.create();
}

return awsCredentialsProvider;
public Map<String, String> getAwsStsHeaderOverrides() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks for pulling this work in too.

return awsStsHeaderOverrides;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,36 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
*/
public class PitWorker implements Runnable {
public class PitWorker implements SearchWorker, Runnable {

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;

/**
 * Creates a worker holding the collaborators it is given; no other work is done here.
 *
 * @param searchAccessor accessor for the source cluster
 * @param openSearchSourceConfiguration configuration of the OpenSearch source
 * @param sourceCoordinator coordinator for this source
 * @param buffer buffer supplied to the source
 */
public PitWorker(final SearchAccessor searchAccessor,
                 final OpenSearchSourceConfiguration openSearchSourceConfiguration,
                 final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
                 final Buffer<Record<Event>> buffer) {
    // Assignments follow the parameter order.
    this.searchAccessor = searchAccessor;
    this.openSearchSourceConfiguration = openSearchSourceConfiguration;
    this.sourceCoordinator = sourceCoordinator;
    this.buffer = buffer;
}

/**
 * Entry point invoked when this worker is run. Not yet implemented: the body is empty.
 */
@Override
public void run() {
//todo: to implement
// todo: implement
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,36 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;

/**
* ScrollWorker polls the source cluster via scroll contexts.
*/
public class ScrollWorker implements Runnable {
public class ScrollWorker implements SearchWorker {

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;

/**
 * Creates a worker holding the collaborators it is given; no other work is done here.
 *
 * @param searchAccessor accessor for the source cluster
 * @param openSearchSourceConfiguration configuration of the OpenSearch source
 * @param sourceCoordinator coordinator for this source
 * @param buffer buffer supplied to the source
 */
public ScrollWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final Buffer<Record<Event>> buffer) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
this.buffer = buffer;
}

/**
 * Entry point invoked when this worker is run. Not yet implemented: the body is empty.
 */
@Override
public void run() {
//todo: to implement
// todo: implement
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

public interface SearchWorker extends Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change. Thanks for making this more extendable.

}
Loading