Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create OpenSearch source client with auth and lookup version to detect search strategy #2806

Merged

Conversation

graytaylor0
Copy link
Member

@graytaylor0 graytaylor0 commented Jun 2, 2023

Description

This change mainly focused on implementing the OpenSearch client with auth and detecting the search strategy through a version lookup in SearchAccessorStrategy. The starting implementation here will only allow opensearch distributions for now.

Much of the code for the client construction and ssl was taken from the OpenSearch sink.

Tested with both a self managed cluster and Amazon OpenSearch domain successfully. I have not manually tested the cert and insecure: false

Issues Resolved

[List any issues this PR will resolve]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}. This is a copy from the opensearch sink
*/
final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create an issue to eliminate the code duplication between these two plugins and add unit tests

Copy link
Member Author

@graytaylor0 graytaylor0 Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. I started with trying to use the Extension Points for this class but it wasn't working and started taking up a lot of time. Will create the issue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

public class X509TrustAllManager implements X509TrustManager {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return new OpenSearchAccessor(openSearchClient, searchContextType);
}

private RestClient createOpenSearchRestClient() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 43 to 50
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorkerThread = new Thread(new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
break;
case SCROLL:
searchWorkerThread = new Thread(new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer));
break;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a future CR, let's pull this logic into a strategy class that is deleted in this PR and do something like this here. It will be easier to test this control logic separate from thread creation:

final SearchWorker worker = SearchWorkerStrategy.get(searchAccessor, ...);
searchWorkerThread = new Thread(worker);
searchWorkerThread.start()

}

return awsCredentialsProvider;
public Map<String, String> getAwsStsHeaderOverrides() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks for pulling this work in too.

throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can delete this too. I don't see it used anywhere now.

Comment on lines 14 to 37
//todo: to implement

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still a todo ;)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha it doesn't look done?


package org.opensearch.dataprepper.plugins.source.opensearch.worker;

public interface SearchWorker extends Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this change. Thanks for making this more extendable.

* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}. This is a copy from the opensearch sink
*/
final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create an issue to eliminate the code duplication between these two plugins and add unit tests

@@ -13,14 +59,242 @@
*/
public class SearchAccessorStrategy {

private static final Logger LOG = LoggerFactory.getLogger(SearchAccessorStrategy.class);

private static final String AOS_SERVICE_NAME = "es";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a future PR. We should support AOSS as well.

public void start() {
// todo: to implement
// Leverages a runnable (SearchWorker) to perform the querying on the source cluster
switch(searchAccessor.getSearchContextType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NONE is a supported type. We should throw an exception if NONE is returned or we will encounter an NPE on line 52.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do until we end up adding a search worker for this case

Comment on lines 51 to 68
@Test
void getAwsRegion_returns_Region_of() throws NoSuchFieldException, IllegalAccessException {
final String regionString = UUID.randomUUID().toString();
final Region expectedRegionObject = mock(Region.class);
reflectivelySetField(awsAuthenticationOptions, "awsRegion", regionString);
final Region actualRegion;
try(final MockedStatic<Region> regionMockedStatic = mockStatic(Region.class)) {
regionMockedStatic.when(() -> Region.of(regionString)).thenReturn(expectedRegionObject);
actualRegion = awsAuthenticationOptions.getAwsRegion();
}
assertThat(actualRegion, equalTo(expectedRegionObject));
}

@Test
void getAwsRegion_returns_null_when_region_is_null() throws NoSuchFieldException, IllegalAccessException {
reflectivelySetField(awsAuthenticationOptions, "awsRegion", null);
assertThat(awsAuthenticationOptions.getAwsRegion(), nullValue());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should keep these tests around and add tests for the getter methods in future CRs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah will add these basic getters back

Signed-off-by: Taylor Gray <[email protected]>
@graytaylor0 graytaylor0 requested a review from cmanning09 June 2, 2023 17:15
@@ -94,7 +94,7 @@ void testConditionalExpressionEvaluator(final String expression, final Event eve
void testGenericExpressionEvaluatorWithMultipleThreads(final String expression, final Event event, final Boolean expected) {
final GenericExpressionEvaluator evaluator = applicationContext.getBean(GenericExpressionEvaluator.class);

final int numberOfThreads = 50;
final int numberOfThreads = 10;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this file being changed here? Why are the number of threads reduced?

Copy link
Member Author

@graytaylor0 graytaylor0 Jun 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That test runs over 50 times, and it was spawning 50 threads for each test. My build failed every time due to that with an error about being out of threads. It didn’t fail after I lowered it. Is this test really necessary for every conditional expression?

@graytaylor0 graytaylor0 merged commit c2d7767 into opensearch-project:main Jun 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants