diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java index ffa78d5fa4..55ceb338a8 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java @@ -27,13 +27,13 @@ import java.util.stream.Stream; import static org.awaitility.Awaitility.await; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertThrows; class GenericExpressionEvaluator_ConditionalIT { diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle index 7310eb1d10..15cbd84e58 100644 --- a/data-prepper-plugins/opensearch-source/build.gradle +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -11,7 +11,7 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' - implementation 'org.opensearch.client:opensearch-java:2.4.0' + implementation 'org.opensearch.client:opensearch-java:2.5.0' implementation 'org.opensearch.client:opensearch-rest-client:2.7.0' implementation "org.apache.commons:commons-lang3:3.12.0" implementation 'org.apache.maven:maven-artifact:3.0.3' diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java index a67976130d..a55ddfea41 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java @@ -5,5 +5,57 @@ package org.opensearch.dataprepper.plugins.source.opensearch; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Instant; +import java.util.Objects; + public class OpenSearchIndexProgressState { + + private String pitId; + private Long pitCreationTime; + private Long keepAlive; + + public OpenSearchIndexProgressState() { + + } + + @JsonCreator + public OpenSearchIndexProgressState(@JsonProperty("pit_id") final String pitId, + @JsonProperty("pit_creation_time") final Long pitCreationTime, + @JsonProperty("pit_keep_alive") final Long pitKeepAlive) { + this.pitId = pitId; + this.pitCreationTime = pitCreationTime; + this.keepAlive = pitKeepAlive; + } + + public String getPitId() { + return pitId; + } + + public void setPitId(final String pitId) { + this.pitId = pitId; + } + + public Long getPitCreationTime() { + return pitCreationTime; + } + + public void setPitCreationTime(final Long pitCreationTime) { + this.pitCreationTime = pitCreationTime; + } + + public Long getKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(final Long keepAlive) { + this.keepAlive = keepAlive; + } + + public boolean hasValidPointInTime() { + return Objects.nonNull(pitId) && Objects.nonNull(pitCreationTime) && Objects.nonNull(keepAlive) + && Instant.ofEpochMilli(pitCreationTime + keepAlive).isAfter(Instant.now()); + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java index 4c4a1ec058..d1ff5e55e3 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java @@ -12,8 +12,6 @@ import java.util.Map; public class AwsAuthenticationConfiguration { - private static final String AWS_IAM_ROLE = "role"; - private static final String AWS_IAM = "iam"; @JsonProperty("region") @Size(min = 1, message = "Region cannot be empty string") @@ -27,6 +25,9 @@ public class AwsAuthenticationConfiguration { @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") private Map awsStsHeaderOverrides; + @JsonProperty("enable_sigv4") + private Boolean sigv4Enabled = false; + public String getAwsStsRoleArn() { return awsStsRoleArn; } @@ -38,5 +39,7 @@ public Region getAwsRegion() { public Map getAwsStsHeaderOverrides() { return awsStsHeaderOverrides; } + + public Boolean isSigv4Enabled() { return sigv4Enabled; } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 0161185661..da9f3eed0e 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -9,12 +9,20 @@ import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Optional; /** @@ -31,6 +39,8 @@ public class PitWorker implements SearchWorker, Runnable { private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private static final int STANDARD_BACKOFF_MILLIS = 30_000; + static final String DEFAULT_KEEP_ALIVE = "30m"; // 30 minutes (will be extended during search) + private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60); public PitWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, @@ -53,18 +63,69 @@ public void run() { try { Thread.sleep(STANDARD_BACKOFF_MILLIS); continue; - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing"); return; } } - // todo: process the index and write to the buffer + try { + processIndex(indexPartition.get()); - sourceCoordinator.closePartition( - indexPartition.get().getPartitionKey(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); + sourceCoordinator.closePartition( + indexPartition.get().getPartitionKey(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); + } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { + LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); + sourceCoordinator.giveUpPartitions(); + } catch (final SearchContextLimitException e) { + LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}", + indexPartition.get().getPartitionKey(), BACKOFF_ON_PIT_LIMIT_REACHED.getSeconds(), e.getMessage()); + sourceCoordinator.giveUpPartitions(); + try { + Thread.sleep(BACKOFF_ON_PIT_LIMIT_REACHED.toMillis()); + } catch (final InterruptedException ex) { + return; + } + } catch (final RuntimeException e) { + LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); + sourceCoordinator.giveUpPartitions(); + } + } + } + + private void processIndex(final SourcePartition openSearchIndexPartition) { + final String indexName = openSearchIndexPartition.getPartitionKey(); + Optional openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState(); + + if (openSearchIndexProgressStateOptional.isEmpty()) { + openSearchIndexProgressStateOptional = Optional.of(initializeProgressState()); + } + + final OpenSearchIndexProgressState openSearchIndexProgressState = openSearchIndexProgressStateOptional.get(); + + if (!openSearchIndexProgressState.hasValidPointInTime()) { + final CreatePointInTimeResponse createPointInTimeResponse = searchAccessor.createPit(CreatePointInTimeRequest.builder() + .withIndex(indexName) + .withKeepAlive(DEFAULT_KEEP_ALIVE) + .build()); + + LOG.debug("Created point in time for index {} with pit id {}", indexName, createPointInTimeResponse.getPitId()); + + openSearchIndexProgressState.setPitId(createPointInTimeResponse.getPitId()); + openSearchIndexProgressState.setPitCreationTime(createPointInTimeResponse.getPitCreationTime()); + openSearchIndexProgressState.setKeepAlive(createPointInTimeResponse.getKeepAlive()); } + + // todo: Implement search with pit and write documents to buffer, checkpoint with calls to saveState + + + // todo: This API call is failing with sigv4 enabled due to a mismatch in the signature + searchAccessor.deletePit(DeletePointInTimeRequest.builder().withPitId(openSearchIndexProgressState.getPitId()).build()); + } + + private OpenSearchIndexProgressState initializeProgressState() { + return new OpenSearchIndexProgressState(); } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java index dbc83b1127..5bfe6d9ea4 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java @@ -4,11 +4,11 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePitRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest; @@ -24,7 +24,7 @@ public SearchContextType getSearchContextType() { } @Override - public CreatePitResponse createPit(CreatePitRequest createPitRequest) { + public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest createPointInTimeRequest) { //todo: implement return null; } @@ -36,7 +36,7 @@ public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) { } @Override - public void deletePit(DeletePitRequest deletePitRequest) { + public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) { //todo: implement } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index bb149725ca..c49580daca 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -5,20 +5,37 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.Time; +import org.opensearch.client.opensearch.core.pit.CreatePitRequest; +import org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.core.pit.DeletePitRequest; +import org.opensearch.client.opensearch.core.pit.DeletePitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePitRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessor.class); + + static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception"; + private final OpenSearchClient openSearchClient; private final SearchContextType searchContextType; @@ -33,36 +50,68 @@ public SearchContextType getSearchContextType() { } @Override - public CreatePitResponse createPit(CreatePitRequest createPitRequest) { - //todo: implement - return null; + public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest createPointInTimeRequest) { + CreatePitResponse createPitResponse; + try { + createPitResponse = openSearchClient.createPit(CreatePitRequest.of(builder -> builder + .targetIndexes(createPointInTimeRequest.getIndex()) + .keepAlive(new Time.Builder().time(createPointInTimeRequest.getKeepAlive()).build()))); + } catch (final OpenSearchException e) { + if (isDueToPitLimitExceeded(e)) { + throw new SearchContextLimitException(String.format("There was an error creating a new point in time for index '%s': %s", createPointInTimeRequest.getIndex(), + e.error().causedBy().reason())); + } + LOG.error("There was an error creating a point in time for OpenSearch: ", e); + throw e; + } catch (final IOException e) { + LOG.error("There was an error creating a point in time for OpenSearch: ", e); + throw new RuntimeException(e); + } + + return CreatePointInTimeResponse.builder() + .withPitId(createPitResponse.pitId()) + .withCreationTime(createPitResponse.creationTime()) + .build(); } @Override - public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) { - //todo: implement + public SearchPitResponse searchWithPit(final SearchPitRequest searchPitRequest) { + // todo: implement return null; } @Override - public void deletePit(DeletePitRequest deletePitRequest) { - //todo: implement + public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) { + try { + final DeletePitResponse deletePitResponse = openSearchClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId())))); + if (isPitDeletedSuccessfully(deletePitResponse)) { + LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId()); + } else { + LOG.warn("Point in time id {} was not deleted successfully. It will expire from keep-alive", deletePointInTimeRequest.getPitId()); + } + } catch (final OpenSearchException e) { + LOG.error("There was an error deleting the point in time with id {} for OpenSearch: ", deletePointInTimeRequest.getPitId(), e); + throw e; + } catch (IOException e) { + LOG.error("There was an error deleting the point in time with id {} for OpenSearch: {}", deletePointInTimeRequest.getPitId(), e.getMessage()); + throw new RuntimeException(e); + } } @Override - public CreateScrollResponse createScroll(CreateScrollRequest createScrollRequest) { + public CreateScrollResponse createScroll(final CreateScrollRequest createScrollRequest) { //todo: implement return null; } @Override - public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollRequest) { + public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScrollRequest) { //todo: implement return null; } @Override - public void deleteScroll(DeleteScrollRequest deleteScrollRequest) { + public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) { //todo: implement } @@ -70,4 +119,14 @@ public void deleteScroll(DeleteScrollRequest deleteScrollRequest) { public Object getClient() { return openSearchClient; } + + private boolean isPitDeletedSuccessfully(final DeletePitResponse deletePitResponse) { + return Objects.nonNull(deletePitResponse.pits()) && deletePitResponse.pits().size() == 1 + && Objects.nonNull(deletePitResponse.pits().get(0)) && deletePitResponse.pits().get(0).successful(); + } + + private boolean isDueToPitLimitExceeded(final OpenSearchException e) { + return Objects.nonNull(e.error()) && Objects.nonNull(e.error().causedBy()) && Objects.nonNull(e.error().causedBy().type()) + && PIT_RESOURCE_LIMIT_ERROR_TYPE.equals(e.error().causedBy().type()); + } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java index 8c451e6995..b7550edc3e 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java @@ -4,11 +4,11 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePitRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest; @@ -30,11 +30,11 @@ public interface SearchAccessor { /** * Creates a Point-In-Time (PIT) context for searching - * @param createPitRequest the request for creating the PIT context - * @return + * @param createPointInTimeRequest - The {@link CreatePointInTimeRequest} details + * @return {@link CreatePointInTimeResponse} the result of the PIT creation * @since 2.4 */ - CreatePitResponse createPit(CreatePitRequest createPitRequest); + CreatePointInTimeResponse createPit(CreatePointInTimeRequest createPointInTimeRequest); /** * Searches using a PIT context @@ -46,10 +46,10 @@ public interface SearchAccessor { /** * Deletes PITs - * @param deletePitRequest contains the payload for deleting PIT contexts + * @param deletePointInTimeRequest The information on which point in time to delete * @since 2.4 */ - void deletePit(DeletePitRequest deletePitRequest); + void deletePit(DeletePointInTimeRequest deletePointInTimeRequest); /** * Creates scroll context diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java index 3a382b67d7..d7b19e4c63 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java @@ -228,7 +228,7 @@ private void setConnectAndSocketTimeout(final RestClientBuilder restClientBuilde private OpenSearchTransport createOpenSearchTransport(final RestClient restClient) { - if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { + if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) && openSearchSourceConfiguration.getAwsAuthenticationOptions().isSigv4Enabled()) { final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/exceptions/SearchContextLimitException.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/exceptions/SearchContextLimitException.java new file mode 100644 index 0000000000..1605914937 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/exceptions/SearchContextLimitException.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions; + +public class SearchContextLimitException extends RuntimeException { + public SearchContextLimitException(final String message, final Throwable cause) { + super(message, cause); + } + + public SearchContextLimitException(final String message) { super (message); } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePitRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePitRequest.java deleted file mode 100644 index 86d1e6728b..0000000000 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePitRequest.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; - -public class CreatePitRequest { - - // todo: model after https://opensearch.org/docs/latest/search-plugins/point-in-time-api/#create-a-pit & - // https://www.elastic.co/guide/en/elasticsearch/reference/7.10/point-in-time-api.html -} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePitResponse.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePitResponse.java deleted file mode 100644 index 513c5b8d32..0000000000 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePitResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; - -public class CreatePitResponse { - - //todo: model after https://opensearch.org/docs/latest/search-plugins/point-in-time-api/#create-a-pit - // & https://www.elastic.co/guide/en/elasticsearch/reference/7.10/point-in-time-api.html -} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePointInTimeRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePointInTimeRequest.java new file mode 100644 index 0000000000..c0b95adf32 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePointInTimeRequest.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +public class CreatePointInTimeRequest { + + private final String index; + private final String keepAlive; + + public String getIndex() { + return index; + } + + public String getKeepAlive() { return keepAlive; } + + private CreatePointInTimeRequest(final Builder builder) { + this.index = builder.index; + this.keepAlive = builder.keepAlive; + } + + public static CreatePointInTimeRequest.Builder builder() { + return new Builder(); + } + + public static class Builder { + + private String index; + private String keepAlive; + + public Builder() { + + } + + public CreatePointInTimeRequest.Builder withIndex(final String index) { + this.index = index; + return this; + } + + public CreatePointInTimeRequest.Builder withKeepAlive(final String keepAlive) { + this.keepAlive = keepAlive; + return this; + } + + public CreatePointInTimeRequest build() { + return new CreatePointInTimeRequest(this); + } + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePointInTimeResponse.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePointInTimeResponse.java new file mode 100644 index 0000000000..356a86f5b9 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreatePointInTimeResponse.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +public class CreatePointInTimeResponse { + + private final String pitId; + private final Long pitCreationTime; + private final Long keepAlive; + + public String getPitId() { + return pitId; + } + + public Long getPitCreationTime() { return pitCreationTime; } + + public Long getKeepAlive() { return keepAlive; } + + private CreatePointInTimeResponse(final CreatePointInTimeResponse.Builder builder) { + this.pitId = builder.pitId; + this.pitCreationTime = builder.pitCreationTime; + this.keepAlive = builder.keepAlive; + } + + public static CreatePointInTimeResponse.Builder builder() { + return new CreatePointInTimeResponse.Builder(); + } + + public static class Builder { + + private String pitId; + private Long pitCreationTime; + private Long keepAlive; + + public Builder() { + + } + + public CreatePointInTimeResponse.Builder withPitId(final String pitId) { + this.pitId = pitId; + return this; + } + + public CreatePointInTimeResponse.Builder withCreationTime(final Long pitCreationTime) { + this.pitCreationTime = pitCreationTime; + return this; + } + + public CreatePointInTimeResponse.Builder withKeepAlive(final Long keepAlive) { + this.keepAlive = keepAlive; + return this; + } + + public CreatePointInTimeResponse build() { + return new CreatePointInTimeResponse(this); + } + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeletePitRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeletePitRequest.java deleted file mode 100644 index a1f616b314..0000000000 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeletePitRequest.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; - -public class DeletePitRequest { - - //todo: model after https://opensearch.org/docs/latest/search-plugins/point-in-time-api/#delete-pits -} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeletePointInTimeRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeletePointInTimeRequest.java new file mode 100644 index 0000000000..1182fbba0c --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/DeletePointInTimeRequest.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +public class DeletePointInTimeRequest { + private final String pitId; + + public String getPitId() { + return pitId; + } + + private DeletePointInTimeRequest(final DeletePointInTimeRequest.Builder builder) { + this.pitId = builder.pitId; + } + + public static DeletePointInTimeRequest.Builder builder() { + return new DeletePointInTimeRequest.Builder(); + } + + public static class Builder { + + private String pitId; + + public Builder() { + + } + + public DeletePointInTimeRequest.Builder withPitId(final String pitId) { + this.pitId = pitId; + return this; + } + + public DeletePointInTimeRequest build() { + return new DeletePointInTimeRequest(this); + } + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java index 8adb0ab47c..c62d47b109 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java @@ -39,6 +39,7 @@ void getAwsRegion_returns_Region_of() throws NoSuchFieldException, IllegalAccess actualRegion = awsAuthenticationOptions.getAwsRegion(); } assertThat(actualRegion, equalTo(expectedRegionObject)); + assertThat(awsAuthenticationOptions.isSigv4Enabled(), equalTo(false)); } @Test diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java new file mode 100644 index 0000000000..592b98f848 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -0,0 +1,202 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; + +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.DEFAULT_KEEP_ALIVE; + +@ExtendWith(MockitoExtension.class) +public class PitWorkerTest { + + @Mock + private OpenSearchSourceConfiguration openSearchSourceConfiguration; + + @Mock + private OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; + + @Mock + private SourceCoordinator sourceCoordinator; + + @Mock + private SearchAccessor searchAccessor; + + @Mock + private Buffer> buffer; + + private ExecutorService executorService; + + @BeforeEach + void setup() { + executorService = Executors.newSingleThreadExecutor(); + } + + private PitWorker createObjectUnderTest() { + return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier); + } + + @Test + void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrupted() throws InterruptedException { + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.empty()); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_and_closes_that_partition() throws InterruptedException { + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final String pitId = UUID.randomUUID().toString(); + final ArgumentCaptor requestArgumentCaptor = ArgumentCaptor.forClass(CreatePointInTimeRequest.class); + final CreatePointInTimeResponse createPointInTimeResponse = mock(CreatePointInTimeResponse.class); + when(createPointInTimeResponse.getPitId()).thenReturn(pitId); + when(searchAccessor.createPit(requestArgumentCaptor.capture())).thenReturn(createPointInTimeResponse); + + final ArgumentCaptor deleteRequestArgumentCaptor = ArgumentCaptor.forClass(DeletePointInTimeRequest.class); + doNothing().when(searchAccessor).deletePit(deleteRequestArgumentCaptor.capture()); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); + when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, + Duration.ZERO, 1); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + final CreatePointInTimeRequest createPointInTimeRequest = requestArgumentCaptor.getValue(); + assertThat(createPointInTimeRequest, notNullValue()); + assertThat(createPointInTimeRequest.getIndex(), equalTo(partitionKey)); + assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(DEFAULT_KEEP_ALIVE)); + + final DeletePointInTimeRequest deletePointInTimeRequest = deleteRequestArgumentCaptor.getValue(); + assertThat(deletePointInTimeRequest, notNullValue()); + assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); + } + + @Test + void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create_another_point_in_time() throws InterruptedException { + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + + final OpenSearchIndexProgressState openSearchIndexProgressState = mock(OpenSearchIndexProgressState.class); + final String pitId = UUID.randomUUID().toString(); + when(openSearchIndexProgressState.getPitId()).thenReturn(pitId); + when(openSearchIndexProgressState.hasValidPointInTime()).thenReturn(true); + when(sourcePartition.getPartitionState()).thenReturn(Optional.of(openSearchIndexProgressState)); + + final ArgumentCaptor deleteRequestArgumentCaptor = ArgumentCaptor.forClass(DeletePointInTimeRequest.class); + doNothing().when(searchAccessor).deletePit(deleteRequestArgumentCaptor.capture()); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); + when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, + Duration.ZERO, 1); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + final DeletePointInTimeRequest deletePointInTimeRequest = deleteRequestArgumentCaptor.getValue(); + assertThat(deletePointInTimeRequest, notNullValue()); + assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); + + verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class)); + } + + @Test + void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitException() throws InterruptedException { + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + when(searchAccessor.createPit(any(CreatePointInTimeRequest.class))).thenThrow(SearchContextLimitException.class); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); + verify(sourceCoordinator).giveUpPartitions(); + verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java new file mode 100644 index 0000000000..1f699697c0 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.ErrorCause; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch.core.pit.CreatePitRequest; +import org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.core.pit.DeletePitRecord; +import org.opensearch.client.opensearch.core.pit.DeletePitRequest; +import org.opensearch.client.opensearch.core.pit.DeletePitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; + +import java.io.IOException; +import java.util.Collections; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchAccessorTest { + + @Mock + private OpenSearchClient openSearchClient; + + private SearchAccessor createObjectUnderTest() { + return new OpenSearchAccessor(openSearchClient, SearchContextType.POINT_IN_TIME); + } + + @Test + void create_pit_returns_expected_create_point_in_time_response() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String keepAlive = UUID.randomUUID().toString(); + + final CreatePointInTimeRequest createPointInTimeRequest = mock(CreatePointInTimeRequest.class); + when(createPointInTimeRequest.getIndex()).thenReturn(indexName); + when(createPointInTimeRequest.getKeepAlive()).thenReturn(keepAlive); + + final String pitId = UUID.randomUUID().toString(); + final Long creationTime = new Random().nextLong(); + final CreatePitResponse createPitResponse = mock(CreatePitResponse.class); + when(createPitResponse.creationTime()).thenReturn(creationTime); + when(createPitResponse.pitId()).thenReturn(pitId); + + when(openSearchClient.createPit(any(CreatePitRequest.class))).thenReturn(createPitResponse); + + final CreatePointInTimeResponse createPointInTimeResponse = createObjectUnderTest().createPit(createPointInTimeRequest); + assertThat(createPointInTimeResponse, notNullValue()); + assertThat(createPointInTimeResponse.getPitCreationTime(), equalTo(creationTime)); + assertThat(createPointInTimeResponse.getPitId(), equalTo(pitId)); + } + + @Test + void create_pit_with_exception_for_pit_limit_throws_SearchContextLimitException() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String keepAlive = UUID.randomUUID().toString(); + + final CreatePointInTimeRequest createPointInTimeRequest = mock(CreatePointInTimeRequest.class); + when(createPointInTimeRequest.getIndex()).thenReturn(indexName); + when(createPointInTimeRequest.getKeepAlive()).thenReturn(keepAlive); + + final OpenSearchException openSearchException = mock(OpenSearchException.class); + final ErrorCause errorCause = mock(ErrorCause.class); + final ErrorCause rootCause = mock(ErrorCause.class); + when(rootCause.type()).thenReturn(PIT_RESOURCE_LIMIT_ERROR_TYPE); + when(rootCause.reason()).thenReturn(UUID.randomUUID().toString()); + when(errorCause.causedBy()).thenReturn(rootCause); + when(openSearchException.error()).thenReturn(errorCause); + + when(openSearchClient.createPit(any(CreatePitRequest.class))).thenThrow(openSearchException); + + assertThrows(SearchContextLimitException.class, () -> createObjectUnderTest().createPit(createPointInTimeRequest)); + } + + @Test + void createPit_throws_OpenSearch_exception_throws_that_exception() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String keepAlive = UUID.randomUUID().toString(); + + final CreatePointInTimeRequest createPointInTimeRequest = mock(CreatePointInTimeRequest.class); + when(createPointInTimeRequest.getIndex()).thenReturn(indexName); + when(createPointInTimeRequest.getKeepAlive()).thenReturn(keepAlive); + + final OpenSearchException openSearchException = mock(OpenSearchException.class); + final ErrorCause errorCause = mock(ErrorCause.class); + when(errorCause.causedBy()).thenReturn(null); + when(openSearchException.error()).thenReturn(errorCause); + + when(openSearchClient.createPit(any(CreatePitRequest.class))).thenThrow(openSearchException); + + assertThrows(OpenSearchException.class, () -> createObjectUnderTest().createPit(createPointInTimeRequest)); + } + + @Test + void createPit_throws_runtime_exception_throws_IO_Exception() throws IOException { + final String indexName = UUID.randomUUID().toString(); + final String keepAlive = UUID.randomUUID().toString(); + + final CreatePointInTimeRequest createPointInTimeRequest = mock(CreatePointInTimeRequest.class); + when(createPointInTimeRequest.getIndex()).thenReturn(indexName); + when(createPointInTimeRequest.getKeepAlive()).thenReturn(keepAlive); + + when(openSearchClient.createPit(any(CreatePitRequest.class))).thenThrow(IOException.class); + + assertThrows(RuntimeException.class, () -> createObjectUnderTest().createPit(createPointInTimeRequest)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void delete_pit_with_no_exception_does_not_throw(final boolean successful) throws IOException { + final String pitId = UUID.randomUUID().toString(); + + final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); + when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); + + final DeletePitResponse deletePitResponse = mock(DeletePitResponse.class); + final DeletePitRecord deletePitRecord = mock(DeletePitRecord.class); + when(deletePitRecord.successful()).thenReturn(successful); + when(deletePitResponse.pits()).thenReturn(Collections.singletonList(deletePitRecord)); + + when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenReturn(deletePitResponse); + + createObjectUnderTest().deletePit(deletePointInTimeRequest); + } + + @Test + void delete_pit_throws_opensearch_exception() throws IOException { + final String pitId = UUID.randomUUID().toString(); + + final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); + when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); + + when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(OpenSearchException.class); + + assertThrows(OpenSearchException.class, () -> createObjectUnderTest().deletePit(deletePointInTimeRequest)); + } + + @Test + void delete_pit_throws_runtime_exception_when_client_throws_IOException() throws IOException { + final String pitId = UUID.randomUUID().toString(); + + final DeletePointInTimeRequest deletePointInTimeRequest = mock(DeletePointInTimeRequest.class); + when(deletePointInTimeRequest.getPitId()).thenReturn(pitId); + + when(openSearchClient.deletePit(any(DeletePitRequest.class))).thenThrow(IOException.class); + + assertThrows(RuntimeException.class, () -> createObjectUnderTest().deletePit(deletePointInTimeRequest)); + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/PitWorkerTest.java deleted file mode 100644 index 2e2b20ddeb..0000000000 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/PitWorkerTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; -import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; -import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; -import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; -import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; - -import java.time.Duration; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class PitWorkerTest { - - @Mock - private OpenSearchSourceConfiguration openSearchSourceConfiguration; - - @Mock - private OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; - - @Mock - private SourceCoordinator sourceCoordinator; - - @Mock - private SearchAccessor searchAccessor; - - @Mock - private Buffer> buffer; - - private ExecutorService executorService; - - @BeforeEach - void setup() { - executorService = Executors.newSingleThreadExecutor(); - } - - private PitWorker createObjectUnderTest() { - return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier); - } - - @Test - void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrupted() throws InterruptedException { - when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.empty()); - - - final Future future = executorService.submit(() -> createObjectUnderTest().run()); - Thread.sleep(100); - executorService.shutdown(); - future.cancel(true); - assertThat(future.isCancelled(), equalTo(true)); - - assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); - } - - @Test - void run_with_getNextPartition_with_non_empty_partition_closes_that_partition() throws InterruptedException { - final SourcePartition sourcePartition = mock(SourcePartition.class); - final String partitionKey = UUID.randomUUID().toString(); - when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); - - when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); - - final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); - when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); - - doNothing().when(sourceCoordinator).closePartition(partitionKey, - Duration.ZERO, 1); - - - final Future future = executorService.submit(() -> createObjectUnderTest().run()); - Thread.sleep(100); - executorService.shutdown(); - future.cancel(true); - assertThat(future.isCancelled(), equalTo(true)); - - assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); - } -}