Skip to content

Commit

Permalink
Implement basic search with point in time and search after
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Jun 8, 2023
1 parent 3598ba4 commit a858cee
Show file tree
Hide file tree
Showing 19 changed files with 459 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class OpenSearchSourceConfiguration {

@JsonProperty("search_options")
@Valid
private SearchConfiguration searchConfiguration;
private SearchConfiguration searchConfiguration = new SearchConfiguration();

public Integer getMaxRetries() {
return maxRetries;
Expand Down Expand Up @@ -95,7 +95,6 @@ public SearchConfiguration getSearchConfiguration() {

@AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.")
boolean validateAwsConfigWithUsernameAndPassword() {

return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ public class AwsAuthenticationConfiguration {
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("enable_sigv4")
private Boolean sigv4Enabled = false;

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}
Expand All @@ -39,7 +36,5 @@ public Region getAwsRegion() {
public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}

public Boolean isSigv4Enabled() { return sigv4Enabled; }
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public class SearchConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(SearchConfiguration.class);

@JsonProperty("batch_size")
private Integer batchSize;
private Integer batchSize = 1000;

@JsonProperty("query")
private String queryString;
private String queryString = "{ \"query\": { \"match_all\": {} }}";

@JsonIgnore
private Map<String, Object> queryMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
Expand All @@ -32,16 +38,18 @@ public class PitWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class);

static final int BUFFER_TIMEOUT_MILLIS = 180_000;
private static final int STANDARD_BACKOFF_MILLIS = 30_000;
private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60);
static final String STARTING_KEEP_ALIVE = "15m";
static final String EXTEND_KEEP_ALIVE_TIME = "1m";

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

private static final int STANDARD_BACKOFF_MILLIS = 30_000;
static final String DEFAULT_KEEP_ALIVE = "30m"; // 30 minutes (will be extended during search)
private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60);

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
Expand Down Expand Up @@ -108,7 +116,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
if (!openSearchIndexProgressState.hasValidPointInTime()) {
final CreatePointInTimeResponse createPointInTimeResponse = searchAccessor.createPit(CreatePointInTimeRequest.builder()
.withIndex(indexName)
.withKeepAlive(DEFAULT_KEEP_ALIVE)
.withKeepAlive(STARTING_KEEP_ALIVE)
.build());

LOG.debug("Created point in time for index {} with pit id {}", indexName, createPointInTimeResponse.getPitId());
Expand All @@ -118,10 +126,32 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
openSearchIndexProgressState.setKeepAlive(createPointInTimeResponse.getKeepAlive());
}

// todo: Implement search with pit and write documents to buffer, checkpoint with calls to saveState
final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
SearchPointInTimeResponse searchPointInTimeResponse = null;

// todo: Pass query and sort options from SearchConfiguration to the search request
do {
try {
searchPointInTimeResponse = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
.withPitId(openSearchIndexProgressState.getPitId())
.withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(Objects.nonNull(searchPointInTimeResponse) ? searchPointInTimeResponse.getNextSearchAfter() : null)
.build());
buffer.writeAll(searchPointInTimeResponse.getDocuments().stream().map(Record::new).collect(Collectors.toList()), BUFFER_TIMEOUT_MILLIS);
} catch (final TimeoutException e) {
// todo: implement backoff and retry, can reuse buffer accumulator code from the s3 source
} catch (final Exception e) {
LOG.error("Received an exception while searching with PIT for index '{}'", indexName);
throw new RuntimeException(e);
}

// todo: Don't save state on every iteration of paginating, save search_after to state to pick up where left off in case of crash
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
} while (searchPointInTimeResponse.getDocuments().size() == searchConfiguration.getBatchSize());


// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature
// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521)
searchAccessor.deletePit(DeletePointInTimeRequest.builder().withPitId(openSearchIndexProgressState.getPitId()).build());
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;

Expand All @@ -30,7 +30,7 @@ public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest create
}

@Override
public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) {
public SearchPointInTimeResponse searchWithPit(SearchPointInTimeRequest searchPointInTimeRequest) {
//todo: implement
return null;
}
Expand Down
Loading

0 comments on commit a858cee

Please sign in to comment.