Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement basic search with point in time and search after #2847

Merged
merged 2 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class OpenSearchSourceConfiguration {

@JsonProperty("search_options")
@Valid
private SearchConfiguration searchConfiguration;
private SearchConfiguration searchConfiguration = new SearchConfiguration();

public Integer getMaxRetries() {
return maxRetries;
Expand Down Expand Up @@ -95,7 +95,6 @@ public SearchConfiguration getSearchConfiguration() {

@AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.")
boolean validateAwsConfigWithUsernameAndPassword() {

return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ public class AwsAuthenticationConfiguration {
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("enable_sigv4")
private Boolean sigv4Enabled = false;

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}
Expand All @@ -39,7 +36,5 @@ public Region getAwsRegion() {
public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}

public Boolean isSigv4Enabled() { return sigv4Enabled; }
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public class SearchConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(SearchConfiguration.class);

@JsonProperty("batch_size")
private Integer batchSize;
private Integer batchSize = 1000;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit
DEFAULT_BATCH_SIZE=1000


@JsonProperty("query")
private String queryString;
private String queryString = "{ \"query\": { \"match_all\": {} }}";

@JsonIgnore
private Map<String, Object> queryMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
Expand All @@ -32,16 +38,18 @@ public class PitWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class);

static final int BUFFER_TIMEOUT_MILLIS = 180_000;
private static final int STANDARD_BACKOFF_MILLIS = 30_000;
private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60);
static final String STARTING_KEEP_ALIVE = "15m";
static final String EXTEND_KEEP_ALIVE_TIME = "1m";

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

private static final int STANDARD_BACKOFF_MILLIS = 30_000;
static final String DEFAULT_KEEP_ALIVE = "30m"; // 30 minutes (will be extended during search)
private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60);

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
Expand Down Expand Up @@ -108,7 +116,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
if (!openSearchIndexProgressState.hasValidPointInTime()) {
final CreatePointInTimeResponse createPointInTimeResponse = searchAccessor.createPit(CreatePointInTimeRequest.builder()
.withIndex(indexName)
.withKeepAlive(DEFAULT_KEEP_ALIVE)
.withKeepAlive(STARTING_KEEP_ALIVE)
.build());

LOG.debug("Created point in time for index {} with pit id {}", indexName, createPointInTimeResponse.getPitId());
Expand All @@ -118,10 +126,32 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
openSearchIndexProgressState.setKeepAlive(createPointInTimeResponse.getKeepAlive());
}

// todo: Implement search with pit and write documents to buffer, checkpoint with calls to saveState
final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
SearchPointInTimeResults searchPointInTimeResults = null;

// todo: Pass query and sort options from SearchConfiguration to the search request
do {
try {
searchPointInTimeResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
.withPitId(openSearchIndexProgressState.getPitId())
.withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(Objects.nonNull(searchPointInTimeResults) ? searchPointInTimeResults.getNextSearchAfter() : null)
.build());
buffer.writeAll(searchPointInTimeResults.getDocuments().stream().map(Record::new).collect(Collectors.toList()), BUFFER_TIMEOUT_MILLIS);
} catch (final TimeoutException e) {
// todo: implement backoff and retry, can reuse buffer accumulator code from the s3 source
} catch (final Exception e) {
LOG.error("Received an exception while searching with PIT for index '{}'", indexName);
throw new RuntimeException(e);
}

// todo: Don't save state on every iteration of paginating, save search_after to state to pick up where left off in case of crash
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
} while (searchPointInTimeResults.getDocuments().size() == searchConfiguration.getBatchSize());


// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature
// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521)
searchAccessor.deletePit(DeletePointInTimeRequest.builder().withPitId(openSearchIndexProgressState.getPitId()).build());
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

Expand All @@ -30,7 +30,7 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create
}

@Override
public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) {
public SearchPointInTimeResults searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
//todo: implement
return null;
}
Expand Down
Loading