diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java index e8776ff9c0..8e2fd42384 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java @@ -55,7 +55,7 @@ public class OpenSearchSourceConfiguration { @JsonProperty("search_options") @Valid - private SearchConfiguration searchConfiguration; + private SearchConfiguration searchConfiguration = new SearchConfiguration(); public Integer getMaxRetries() { return maxRetries; @@ -95,7 +95,6 @@ public SearchConfiguration getSearchConfiguration() { @AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.") boolean validateAwsConfigWithUsernameAndPassword() { - return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) || (Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password)))); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java index d1ff5e55e3..b319db6ed2 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfiguration.java @@ -25,9 +25,6 @@ public class AwsAuthenticationConfiguration { @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") private Map awsStsHeaderOverrides; - @JsonProperty("enable_sigv4") - private Boolean sigv4Enabled = false; - public String getAwsStsRoleArn() { return awsStsRoleArn; } @@ -39,7 +36,5 @@ public Region getAwsRegion() { public Map getAwsStsHeaderOverrides() { return awsStsHeaderOverrides; } - - public Boolean isSigv4Enabled() { return sigv4Enabled; } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java index e3186429bc..ce7ad8c856 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java @@ -20,10 +20,10 @@ public class SearchConfiguration { private static final Logger LOG = LoggerFactory.getLogger(SearchConfiguration.class); @JsonProperty("batch_size") - private Integer batchSize; + private Integer batchSize = 1000; @JsonProperty("query") - private String queryString; + private String queryString = "{ \"query\": { \"match_all\": {} }}"; @JsonIgnore private Map queryMap; diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index da9f3eed0e..ab40f90c1a 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -14,16 +14,22 @@ import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** * PitWorker polls the source cluster via Point-In-Time contexts. @@ -32,16 +38,18 @@ public class PitWorker implements SearchWorker, Runnable { private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class); + static final int BUFFER_TIMEOUT_MILLIS = 180_000; + private static final int STANDARD_BACKOFF_MILLIS = 30_000; + private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60); + static final String STARTING_KEEP_ALIVE = "15m"; + static final String EXTEND_KEEP_ALIVE_TIME = "1m"; + private final SearchAccessor searchAccessor; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final SourceCoordinator sourceCoordinator; private final Buffer> buffer; private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; - private static final int STANDARD_BACKOFF_MILLIS = 30_000; - static final String DEFAULT_KEEP_ALIVE = "30m"; // 30 minutes (will be extended during search) - private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60); - public PitWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, @@ -108,7 +116,7 @@ private void processIndex(final SourcePartition op if (!openSearchIndexProgressState.hasValidPointInTime()) { final CreatePointInTimeResponse createPointInTimeResponse = searchAccessor.createPit(CreatePointInTimeRequest.builder() .withIndex(indexName) - .withKeepAlive(DEFAULT_KEEP_ALIVE) + .withKeepAlive(STARTING_KEEP_ALIVE) .build()); LOG.debug("Created point in time for index {} with pit id {}", indexName, createPointInTimeResponse.getPitId()); @@ -118,10 +126,32 @@ private void processIndex(final SourcePartition op openSearchIndexProgressState.setKeepAlive(createPointInTimeResponse.getKeepAlive()); } - // todo: Implement search with pit and write documents to buffer, checkpoint with calls to saveState + final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration(); + SearchPointInTimeResponse searchPointInTimeResponse = null; + + // todo: Pass query and sort options from SearchConfiguration to the search request + do { + try { + searchPointInTimeResponse = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder() + .withPitId(openSearchIndexProgressState.getPitId()) + .withKeepAlive(EXTEND_KEEP_ALIVE_TIME) + .withPaginationSize(searchConfiguration.getBatchSize()) + .withSearchAfter(Objects.nonNull(searchPointInTimeResponse) ? searchPointInTimeResponse.getNextSearchAfter() : null) + .build()); + buffer.writeAll(searchPointInTimeResponse.getDocuments().stream().map(Record::new).collect(Collectors.toList()), BUFFER_TIMEOUT_MILLIS); + } catch (final TimeoutException e) { + // todo: implement backoff and retry, can reuse buffer accumulator code from the s3 source + } catch (final Exception e) { + LOG.error("Received an exception while searching with PIT for index '{}'", indexName); + throw new RuntimeException(e); + } + + // todo: Don't save state on every iteration of paginating, save search_after to state to pick up where left off in case of crash + sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + } while (searchPointInTimeResponse.getDocuments().size() == searchConfiguration.getBatchSize()); - // todo: This API call is failing with sigv4 enabled due to a mismatch in the signature + // todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521) searchAccessor.deletePit(DeletePointInTimeRequest.builder().withPitId(openSearchIndexProgressState.getPitId()).build()); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/AwsRequestSigningApacheInterceptor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/AwsRequestSigningApacheInterceptor.java deleted file mode 100644 index b3460553a0..0000000000 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/AwsRequestSigningApacheInterceptor.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright OpenSearch Contributors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with - * the License. A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; - -import org.apache.http.Header; -import org.apache.http.HttpEntityEnclosingRequest; -import org.apache.http.HttpHost; -import org.apache.http.HttpRequest; -import org.apache.http.HttpRequestInterceptor; -import org.apache.http.NameValuePair; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.entity.BasicHttpEntity; -import org.apache.http.message.BasicHeader; -import org.apache.http.protocol.HttpContext; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute; -import software.amazon.awssdk.core.interceptor.ExecutionAttributes; -import software.amazon.awssdk.core.signer.Signer; -import software.amazon.awssdk.http.SdkHttpFullRequest; -import software.amazon.awssdk.http.SdkHttpMethod; -import software.amazon.awssdk.regions.Region; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.TreeMap; - -import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST; - -/** - * An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer} - * and {@link AwsCredentialsProvider}. This is a copy from the opensearch sink - */ -final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor { - - /** - * Constant to check content-length - */ - private static final String CONTENT_LENGTH = "content-length"; - /** - * Constant to check Zero content length - */ - private static final String ZERO_CONTENT_LENGTH = "0"; - /** - * Constant to check if host is the endpoint - */ - private static final String HOST = "host"; - - /** - * The service that we're connecting to. - */ - private final String service; - - /** - * The particular signer implementation. - */ - private final Signer signer; - - /** - * The source of AWS credentials for signing. - */ - private final AwsCredentialsProvider awsCredentialsProvider; - - /** - * The region signing region. - */ - private final Region region; - - /** - * - * @param service service that we're connecting to - * @param signer particular signer implementation - * @param awsCredentialsProvider source of AWS credentials for signing - * @param region signing region - */ - public AwsRequestSigningApacheInterceptor(final String service, - final Signer signer, - final AwsCredentialsProvider awsCredentialsProvider, - final Region region) { - this.service = Objects.requireNonNull(service); - this.signer = Objects.requireNonNull(signer); - this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider); - this.region = Objects.requireNonNull(region); - } - - /** - * {@inheritDoc} - */ - @Override - public void process(final HttpRequest request, final HttpContext context) - throws IOException { - URIBuilder uriBuilder; - try { - uriBuilder = new URIBuilder(request.getRequestLine().getUri()); - } catch (URISyntaxException e) { - throw new IOException("Invalid URI" , e); - } - - // Copy Apache HttpRequest to AWS Request - SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder() - .method(SdkHttpMethod.fromValue(request.getRequestLine().getMethod())) - .uri(buildUri(context, uriBuilder)); - - if (request instanceof HttpEntityEnclosingRequest) { - HttpEntityEnclosingRequest httpEntityEnclosingRequest = - (HttpEntityEnclosingRequest) request; - if (httpEntityEnclosingRequest.getEntity() != null) { - InputStream content = httpEntityEnclosingRequest.getEntity().getContent(); - requestBuilder.contentStreamProvider(() -> content); - } - } - requestBuilder.rawQueryParameters(nvpToMapParams(uriBuilder.getQueryParams())); - requestBuilder.headers(headerArrayToMap(request.getAllHeaders())); - - ExecutionAttributes attributes = new ExecutionAttributes(); - attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, awsCredentialsProvider.resolveCredentials()); - attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service); - attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region); - - // Sign it - SdkHttpFullRequest signedRequest = signer.sign(requestBuilder.build(), attributes); - - // Now copy everything back - request.setHeaders(mapToHeaderArray(signedRequest.headers())); - if (request instanceof HttpEntityEnclosingRequest) { - HttpEntityEnclosingRequest httpEntityEnclosingRequest = - (HttpEntityEnclosingRequest) request; - if (httpEntityEnclosingRequest.getEntity() != null) { - BasicHttpEntity basicHttpEntity = new BasicHttpEntity(); - basicHttpEntity.setContent(signedRequest.contentStreamProvider() - .orElseThrow(() -> new IllegalStateException("There must be content")) - .newStream()); - httpEntityEnclosingRequest.setEntity(basicHttpEntity); - } - } - } - - private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IOException { - try { - HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST); - - if (host != null) { - uriBuilder.setHost(host.getHostName()); - uriBuilder.setScheme(host.getSchemeName()); - uriBuilder.setPort(host.getPort()); - } - - return uriBuilder.build(); - } catch (URISyntaxException e) { - throw new IOException("Invalid URI", e); - } - } - - /** - * - * @param params list of HTTP query params as NameValuePairs - * @return a multimap of HTTP query params - */ - private static Map> nvpToMapParams(final List params) { - Map> parameterMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (NameValuePair nvp : params) { - List argsList = - parameterMap.computeIfAbsent(nvp.getName(), k -> new ArrayList<>()); - argsList.add(nvp.getValue()); - } - return parameterMap; - } - - /** - * @param headers modelled Header objects - * @return a Map of header entries - */ - private static Map> headerArrayToMap(final Header[] headers) { - Map> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - for (Header header : headers) { - if (!skipHeader(header)) { - headersMap.put(header.getName(), headersMap - .getOrDefault(header.getName(), - new LinkedList<>(Collections.singletonList(header.getValue())))); - } - } - return headersMap; - } - - /** - * @param header header line to check - * @return true if the given header should be excluded when signing - */ - private static boolean skipHeader(final Header header) { - return (CONTENT_LENGTH.equalsIgnoreCase(header.getName()) - && ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0 - || HOST.equalsIgnoreCase(header.getName()); // Host comes from endpoint - } - - /** - * @param mapHeaders Map of header entries - * @return modelled Header objects - */ - private static Header[] mapToHeaderArray(final Map> mapHeaders) { - Header[] headers = new Header[mapHeaders.size()]; - int i = 0; - for (Map.Entry> headerEntry : mapHeaders.entrySet()) { - for (String value : headerEntry.getValue()) { - headers[i++] = new BasicHeader(headerEntry.getKey(), value); - } - } - return headers; - } -} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java index 5bfe6d9ea4..c1da9abc65 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; @@ -30,7 +30,7 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create } @Override - public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) { + public SearchPointInTimeResponse searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) { //todo: implement return null; } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index c49580daca..2c25b720bc 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -4,13 +4,25 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.ScoreSort; +import org.opensearch.client.opensearch._types.SortOptions; +import org.opensearch.client.opensearch._types.SortOrder; import org.opensearch.client.opensearch._types.Time; +import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.core.pit.CreatePitRequest; import org.opensearch.client.opensearch.core.pit.CreatePitResponse; import org.opensearch.client.opensearch.core.pit.DeletePitRequest; import org.opensearch.client.opensearch.core.pit.DeletePitResponse; +import org.opensearch.client.opensearch.core.search.Pit; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventType; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; @@ -19,8 +31,8 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import org.slf4j.Logger; @@ -28,12 +40,18 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory { private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessor.class); + static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "document_id"; + static final String INDEX_METADATA_ATTRIBUTE_NAME = "index"; + static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception"; private final OpenSearchClient openSearchClient; @@ -75,9 +93,37 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create } @Override - public SearchPitResponse searchWithPit(final SearchPitRequest searchPitRequest) { - // todo: implement - return null; + public SearchPointInTimeResponse searchWithPit(final SearchPointInTimeRequest searchPointInTimeRequest) { + try { + final SearchResponse searchResponse = openSearchClient.search( + SearchRequest.of(builder -> { + builder + .pit(Pit.of(pitBuilder -> pitBuilder.id(searchPointInTimeRequest.getPitId()).keepAlive(searchPointInTimeRequest.getKeepAlive()))) + .size(searchPointInTimeRequest.getPaginationSize()) + .sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))) + .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery.queryName("*"))))); + + if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) { + builder.searchAfter(searchPointInTimeRequest.getSearchAfter()); + } + + return builder; + }), ObjectNode.class); + + final List documents = searchResponse.hits().hits().stream() + .map(hit -> JacksonEvent.builder() + .withData(hit.source()) + .withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index())) + .withEventType(EventType.DOCUMENT.toString()).build()) + .collect(Collectors.toList()); + + return SearchPointInTimeResponse.builder() + .withDocuments(documents) + .withNextSearchAfter(searchResponse.hits().hits().get(searchResponse.hits().hits().size() - 1).sort()) + .build(); + } catch (final IOException e) { + throw new RuntimeException(e); + } } @Override diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java index b7550edc3e..fc899bc516 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessor.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest; -import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; @@ -38,11 +38,11 @@ public interface SearchAccessor { /** * Searches using a PIT context - * @param searchPitRequest payload for searching with PIT context + * @param searchPointInTimeRequest payload for searching with PIT context * @return * @since 2.4 */ - SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest); + SearchPointInTimeResponse searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest); /** * Deletes PITs diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java index d7b19e4c63..1a4557353a 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessorStrategy.java @@ -5,7 +5,6 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; import org.apache.http.HttpHost; -import org.apache.http.HttpRequestInterceptor; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; @@ -35,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; @@ -85,8 +83,14 @@ private SearchAccessorStrategy(final OpenSearchSourceConfiguration openSearchSou * @since 2.4 */ public SearchAccessor getSearchAccessor() { - final RestClient restClient = createOpenSearchRestClient(); - final OpenSearchTransport transport = createOpenSearchTransport(restClient); + + OpenSearchTransport transport; + if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { + transport = createOpenSearchTransportForAws(); + } else { + final RestClient restClient = createOpenSearchRestClient(); + transport = createOpenSearchTransport(restClient); + } final OpenSearchClient openSearchClient = new OpenSearchClient(transport); InfoResponse infoResponse; @@ -129,13 +133,8 @@ private RestClient createOpenSearchRestClient() { final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); - if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { - LOG.info("Using aws credentials and sigv4 for auth for the OpenSearch source"); - attachSigv4Auth(restClientBuilder); - } else { - LOG.info("Using username and password for auth for the OpenSearch source"); - attachUsernamePassword(restClientBuilder); - } + LOG.info("Using username and password for auth for the OpenSearch source"); + attachUsernamePassword(restClientBuilder); setConnectAndSocketTimeout(restClientBuilder); @@ -179,27 +178,6 @@ private static TrustManager[] createTrustManagers(final Path certPath) { } } - private void attachSigv4Auth(final RestClientBuilder restClientBuilder) { - final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() - .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) - .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) - .withStsHeaderOverrides(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsHeaderOverrides()) - .build()); - - final HttpRequestInterceptor httpRequestInterceptor = new AwsRequestSigningApacheInterceptor( - AOS_SERVICE_NAME, - Aws4Signer.create(), - awsCredentialsProvider, - openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion() - ); - - restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { - httpClientBuilder.addInterceptorLast(httpRequestInterceptor); - attachSSLContext(httpClientBuilder); - return httpClientBuilder; - }); - } - private void attachUsernamePassword(final RestClientBuilder restClientBuilder) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, @@ -227,24 +205,23 @@ private void setConnectAndSocketTimeout(final RestClientBuilder restClientBuilde } private OpenSearchTransport createOpenSearchTransport(final RestClient restClient) { + return new RestClientTransport(restClient, new JacksonJsonpMapper()); + } - if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions()) && openSearchSourceConfiguration.getAwsAuthenticationOptions().isSigv4Enabled()) { - final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() - .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) - .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) - .withStsHeaderOverrides(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsHeaderOverrides()) - .build()); - - return new AwsSdk2Transport(createSdkHttpClient(), - HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), - AOS_SERVICE_NAME, openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), - AwsSdk2TransportOptions.builder() - .setCredentials(awsCredentialsProvider) - .setMapper(new JacksonJsonpMapper()) - .build()); - } + private OpenSearchTransport createOpenSearchTransportForAws() { + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() + .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) + .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) + .withStsHeaderOverrides(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsHeaderOverrides()) + .build()); - return new RestClientTransport(restClient, new JacksonJsonpMapper()); + return new AwsSdk2Transport(createSdkHttpClient(), + HttpHost.create(openSearchSourceConfiguration.getHosts().get(0)).getHostName(), + AOS_SERVICE_NAME, openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion(), + AwsSdk2TransportOptions.builder() + .setCredentials(awsCredentialsProvider) + .setMapper(new JacksonJsonpMapper()) + .build()); } private SdkHttpClient createSdkHttpClient() { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPitRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPitRequest.java deleted file mode 100644 index 566f181265..0000000000 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPitRequest.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; - -public class SearchPitRequest { - - // todo: model after https://opensearch.org/docs/latest/search-plugins/point-in-time/#pit-search - // & https://www.elastic.co/guide/en/elasticsearch/reference/7.10/point-in-time-api.html -} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPitResponse.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPitResponse.java deleted file mode 100644 index 0ab4829deb..0000000000 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPitResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; - -public class SearchPitResponse { - - // todo: model after https://opensearch.org/docs/latest/search-plugins/point-in-time/#pit-search & - // https://www.elastic.co/guide/en/elasticsearch/reference/7.10/point-in-time-api.html -} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPointInTimeRequest.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPointInTimeRequest.java new file mode 100644 index 0000000000..ae04123cff --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPointInTimeRequest.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +import java.util.List; + +public class SearchPointInTimeRequest { + + private final String pitId; + private final String keepAlive; + private final String index; + private final List searchAfter; + private final Integer paginationSize; + private final String query; + private final List sortingOptions; + + public String getIndex() { + return index; + } + + public List getSortOptions() { + return sortingOptions; + } + + public String getQuery() { + return query; + } + + public Integer getPaginationSize() { + return paginationSize; + } + + public List getSearchAfter() { + return searchAfter; + } + + public String getPitId() { + return pitId; + } + + public String getKeepAlive() { return keepAlive; } + + private SearchPointInTimeRequest(final SearchPointInTimeRequest.Builder builder) { + this.pitId = builder.pitId; + this.keepAlive = builder.keepAlive; + this.index = builder.index; + this.searchAfter = builder.searchAfter; + this.paginationSize = builder.paginationSize; + this.query = builder.query; + this.sortingOptions = builder.sortingOptions; + } + + public static SearchPointInTimeRequest.Builder builder() { + return new SearchPointInTimeRequest.Builder(); + } + + public static class Builder { + + private String pitId; + private String keepAlive; + private String index; + private List searchAfter; + private Integer paginationSize; + private String query; + private List sortingOptions; + + + public Builder() { + + } + + public SearchPointInTimeRequest.Builder withQuery(final String query) { + this.query = query; + return this; + } + + public SearchPointInTimeRequest.Builder withSortOptions(final List sortingOptions) { + this.sortingOptions = sortingOptions; + return this; + } + + public SearchPointInTimeRequest.Builder withPitId(final String pitId) { + this.pitId = pitId; + return this; + } + + public SearchPointInTimeRequest.Builder withPaginationSize(final Integer paginationSize) { + this.paginationSize = paginationSize; + return this; + } + + public SearchPointInTimeRequest.Builder withSearchAfter(final List searchAfter) { + this.searchAfter = searchAfter; + return this; + } + + public SearchPointInTimeRequest.Builder withKeepAlive(final String keepAlive) { + this.keepAlive = keepAlive; + return this; + } + + public SearchPointInTimeRequest.Builder withIndex(final String index) { + this.index = index; + return this; + } + + public SearchPointInTimeRequest build() { + return new SearchPointInTimeRequest(this); + } + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPointInTimeResponse.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPointInTimeResponse.java new file mode 100644 index 0000000000..29530c8ef9 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchPointInTimeResponse.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +import org.opensearch.dataprepper.model.event.Event; + +import java.util.List; + +public class SearchPointInTimeResponse { + + private final List documents; + private final List nextSearchAfter; + + public List getDocuments() { + return documents; + } + + public List getNextSearchAfter() { + return nextSearchAfter; + } + + private SearchPointInTimeResponse(final SearchPointInTimeResponse.Builder builder) { + this.documents = builder.documents; + this.nextSearchAfter = builder.nextSearchAfter; + } + + public static SearchPointInTimeResponse.Builder builder() { + return new SearchPointInTimeResponse.Builder(); + } + + public static class Builder { + + private List documents; + private List nextSearchAfter; + + public Builder() { + + } + + public SearchPointInTimeResponse.Builder withDocuments(final List documents) { + this.documents = documents; + return this; + } + + public SearchPointInTimeResponse.Builder withNextSearchAfter(final List nextSearchAfter) { + this.nextSearchAfter = nextSearchAfter; + return this; + } + + + public SearchPointInTimeResponse build() { + return new SearchPointInTimeResponse(this); + } + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java new file mode 100644 index 0000000000..d69ee9850f --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +// TODO: Convert the queryMap sort value from SearchConfiguration to this class to be passed to SearchWithPit or SearchWithScroll +public class SortingOptions { + + private String fieldName; + + @JsonProperty("order") + private String order = "asc"; + + @JsonProperty("format") + private String format; + + @JsonProperty("mode") + private String mode; + + public String getFieldName() { + return fieldName; + } + + public String getOrder() { + return order; + } + + public String getFormat() { + return format; + } + + public String getMode() { + return mode; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java index c62d47b109..8adb0ab47c 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/AwsAuthenticationConfigurationTest.java @@ -39,7 +39,6 @@ void getAwsRegion_returns_Region_of() throws NoSuchFieldException, IllegalAccess actualRegion = awsAuthenticationOptions.getAwsRegion(); } assertThat(actualRegion, equalTo(expectedRegionObject)); - assertThat(awsAuthenticationOptions.isSigv4Enabled(), equalTo(false)); } @Test diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java index 0867d59ee9..1c9d4deb7c 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java @@ -26,17 +26,17 @@ void default_search_configuration() { final SearchConfiguration searchConfiguration = new SearchConfiguration(); assertThat(searchConfiguration.getQuery(), equalTo(null)); - assertThat(searchConfiguration.getBatchSize(), equalTo(null)); + assertThat(searchConfiguration.getBatchSize(), equalTo(1000)); } @Test void non_default_search_configuration() { final Map pluginSettings = new HashMap<>(); - pluginSettings.put("batch_size", 1000); + pluginSettings.put("batch_size", 2000); pluginSettings.put("query", "{\"query\": {\"match_all\": {} }}"); final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class); - assertThat(searchConfiguration.getBatchSize(),equalTo(1000)); + assertThat(searchConfiguration.getBatchSize(),equalTo(2000)); assertThat(searchConfiguration.isQueryValid(), equalTo(true)); assertThat(searchConfiguration.getQuery(), notNullValue()); assertThat(searchConfiguration.getQuery().containsKey("query"), equalTo(true)); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index 592b98f848..6e1df10187 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -19,13 +19,18 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse; import java.time.Duration; +import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -37,14 +42,19 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.DEFAULT_KEEP_ALIVE; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.BUFFER_TIMEOUT_MILLIS; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.EXTEND_KEEP_ALIVE_TIME; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker.STARTING_KEEP_ALIVE; @ExtendWith(MockitoExtension.class) public class PitWorkerTest { @@ -90,7 +100,7 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup } @Test - void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_and_closes_that_partition() throws InterruptedException { + void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_and_closes_that_partition() throws Exception { final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -102,10 +112,24 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ when(createPointInTimeResponse.getPitId()).thenReturn(pitId); when(searchAccessor.createPit(requestArgumentCaptor.capture())).thenReturn(createPointInTimeResponse); + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final SearchPointInTimeResponse searchPointInTimeResponse = mock(SearchPointInTimeResponse.class); + when(searchPointInTimeResponse.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + when(searchPointInTimeResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) + .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + + final ArgumentCaptor searchPointInTimeRequestArgumentCaptor = ArgumentCaptor.forClass(SearchPointInTimeRequest.class); + when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchPointInTimeResponse); + + doNothing().when(buffer).writeAll(anyCollection(), eq(BUFFER_TIMEOUT_MILLIS)); + final ArgumentCaptor deleteRequestArgumentCaptor = ArgumentCaptor.forClass(DeletePointInTimeRequest.class); doNothing().when(searchAccessor).deletePit(deleteRequestArgumentCaptor.capture()); - when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); @@ -127,7 +151,25 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ final CreatePointInTimeRequest createPointInTimeRequest = requestArgumentCaptor.getValue(); assertThat(createPointInTimeRequest, notNullValue()); assertThat(createPointInTimeRequest.getIndex(), equalTo(partitionKey)); - assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(DEFAULT_KEEP_ALIVE)); + assertThat(createPointInTimeRequest.getKeepAlive(), equalTo(STARTING_KEEP_ALIVE)); + + verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); + verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), any(OpenSearchIndexProgressState.class)); + + final List searchPointInTimeRequestList = searchPointInTimeRequestArgumentCaptor.getAllValues(); + assertThat(searchPointInTimeRequestList.size(), equalTo(2)); + assertThat(searchPointInTimeRequestList.get(0), notNullValue()); + assertThat(searchPointInTimeRequestList.get(0).getPitId(), equalTo(pitId)); + assertThat(searchPointInTimeRequestList.get(0).getKeepAlive(), equalTo(EXTEND_KEEP_ALIVE_TIME)); + assertThat(searchPointInTimeRequestList.get(0).getPaginationSize(), equalTo(2)); + assertThat(searchPointInTimeRequestList.get(0).getSearchAfter(), equalTo(null)); + + assertThat(searchPointInTimeRequestList.get(1), notNullValue()); + assertThat(searchPointInTimeRequestList.get(1).getPitId(), equalTo(pitId)); + assertThat(searchPointInTimeRequestList.get(1).getKeepAlive(), equalTo(EXTEND_KEEP_ALIVE_TIME)); + assertThat(searchPointInTimeRequestList.get(1).getPaginationSize(), equalTo(2)); + assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(searchPointInTimeResponse.getNextSearchAfter())); + final DeletePointInTimeRequest deletePointInTimeRequest = deleteRequestArgumentCaptor.getValue(); assertThat(deletePointInTimeRequest, notNullValue()); @@ -135,7 +177,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ } @Test - void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create_another_point_in_time() throws InterruptedException { + void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create_another_point_in_time() throws Exception { final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -146,10 +188,23 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create when(openSearchIndexProgressState.hasValidPointInTime()).thenReturn(true); when(sourcePartition.getPartitionState()).thenReturn(Optional.of(openSearchIndexProgressState)); + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final SearchPointInTimeResponse searchPointInTimeResponse = mock(SearchPointInTimeResponse.class); + when(searchPointInTimeResponse.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + when(searchPointInTimeResponse.getDocuments()).thenReturn(List.of(mock(Event.class), mock(Event.class))).thenReturn(List.of(mock(Event.class), mock(Event.class))) + .thenReturn(List.of(mock(Event.class))).thenReturn(List.of(mock(Event.class))); + + when(searchAccessor.searchWithPit(any(SearchPointInTimeRequest.class))).thenReturn(searchPointInTimeResponse); + + doNothing().when(buffer).writeAll(anyCollection(), eq(BUFFER_TIMEOUT_MILLIS)); + final ArgumentCaptor deleteRequestArgumentCaptor = ArgumentCaptor.forClass(DeletePointInTimeRequest.class); doNothing().when(searchAccessor).deletePit(deleteRequestArgumentCaptor.capture()); - when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); @@ -173,6 +228,8 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class)); + verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); + verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); } @Test diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java index 1f699697c0..fd9cf19499 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -5,28 +5,38 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; import org.opensearch.client.opensearch.core.pit.CreatePitRequest; import org.opensearch.client.opensearch.core.pit.CreatePitResponse; import org.opensearch.client.opensearch.core.pit.DeletePitRecord; import org.opensearch.client.opensearch.core.pit.DeletePitRequest; import org.opensearch.client.opensearch.core.pit.DeletePitResponse; +import org.opensearch.client.opensearch.core.search.Hit; +import org.opensearch.client.opensearch.core.search.HitsMetadata; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.UUID; @@ -35,6 +45,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE; @@ -168,4 +179,54 @@ void delete_pit_throws_runtime_exception_when_client_throws_IOException() throws assertThrows(RuntimeException.class, () -> createObjectUnderTest().deletePit(deletePointInTimeRequest)); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean hasSearchAfter) throws IOException { + final String pitId = UUID.randomUUID().toString(); + final Integer paginationSize = new Random().nextInt(); + final List searchAfter = Collections.singletonList(UUID.randomUUID().toString()); + + final SearchPointInTimeRequest searchPointInTimeRequest = mock(SearchPointInTimeRequest.class); + when(searchPointInTimeRequest.getPitId()).thenReturn(pitId); + when(searchPointInTimeRequest.getPaginationSize()).thenReturn(paginationSize); + + if (hasSearchAfter) { + when(searchPointInTimeRequest.getSearchAfter()).thenReturn(searchAfter); + } else { + when(searchPointInTimeRequest.getSearchAfter()).thenReturn(null); + } + + final SearchResponse searchResponse = mock(SearchResponse.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final List> hits = new ArrayList<>(); + final Hit firstHit = mock(Hit.class); + when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); + when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); + when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + + final Hit secondHit = mock(Hit.class); + when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); + when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); + when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + when(secondHit.sort()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + + hits.add(firstHit); + hits.add(secondHit); + + when(hitsMetadata.hits()).thenReturn(hits); + when(searchResponse.hits()).thenReturn(hitsMetadata); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + + when(openSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final SearchPointInTimeResponse searchPointInTimeResponse = createObjectUnderTest().searchWithPit(searchPointInTimeRequest); + + assertThat(searchPointInTimeResponse, notNullValue()); + assertThat(searchPointInTimeResponse.getDocuments(), notNullValue()); + assertThat(searchPointInTimeResponse.getDocuments().size(), equalTo(2)); + + assertThat(searchPointInTimeResponse.getNextSearchAfter(), equalTo(secondHit.sort())); + } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java index 2d1821b59c..d43d3fb4aa 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/SearchAccessStrategyTest.java @@ -104,7 +104,6 @@ void testHappyPath_with_aws_credentials_for_different_scroll_versions_for_opense when(connectionConfiguration.getCertPath()).thenReturn(null); when(connectionConfiguration.getSocketTimeout()).thenReturn(null); when(connectionConfiguration.getConnectTimeout()).thenReturn(null); - when(connectionConfiguration.isInsecure()).thenReturn(true); final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); when(awsAuthenticationConfiguration.getAwsRegion()).thenReturn(Region.US_EAST_1);