Skip to content

Commit

Permalink
Create and delete point in time for processing an index
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Jun 6, 2023
1 parent 10d3984 commit b77da85
Show file tree
Hide file tree
Showing 20 changed files with 750 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertThrows;

class GenericExpressionEvaluator_ConditionalIT {
Expand Down
2 changes: 1 addition & 1 deletion data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'org.opensearch.client:opensearch-java:2.5.0'
implementation 'org.opensearch.client:opensearch-rest-client:2.7.0'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation 'org.apache.maven:maven-artifact:3.0.3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,57 @@

package org.opensearch.dataprepper.plugins.source.opensearch;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.Instant;
import java.util.Objects;

/**
 * Mutable progress state for a single OpenSearch index partition. Tracks the
 * point-in-time (PIT) search context used while processing the index so the
 * PIT can be reused if the partition is picked up again before it expires.
 *
 * Serialized and deserialized by Jackson via the annotated constructor. The
 * creation time is epoch milliseconds and the keep-alive is a duration in
 * milliseconds; the two are summed in {@link #hasValidPointInTime()}.
 */
public class OpenSearchIndexProgressState {

    private String pitId;            // PIT context id; null until a PIT has been created
    private Long pitCreationTime;    // epoch millis at which the PIT was created
    private Long keepAlive;          // keep-alive duration (millis) granted to the PIT

    /** Creates an empty state, used before any PIT exists for the index. */
    public OpenSearchIndexProgressState() {
    }

    /**
     * Jackson deserialization constructor.
     *
     * @param pitId           the PIT context id
     * @param pitCreationTime PIT creation time in epoch milliseconds
     * @param pitKeepAlive    PIT keep-alive duration in milliseconds
     */
    @JsonCreator
    public OpenSearchIndexProgressState(@JsonProperty("pit_id") final String pitId,
                                        @JsonProperty("pit_creation_time") final Long pitCreationTime,
                                        @JsonProperty("pit_keep_alive") final Long pitKeepAlive) {
        this.pitId = pitId;
        this.pitCreationTime = pitCreationTime;
        this.keepAlive = pitKeepAlive;
    }

    public String getPitId() {
        return pitId;
    }

    public void setPitId(final String pitId) {
        this.pitId = pitId;
    }

    public Long getPitCreationTime() {
        return pitCreationTime;
    }

    public void setPitCreationTime(final Long pitCreationTime) {
        this.pitCreationTime = pitCreationTime;
    }

    public Long getKeepAlive() {
        return keepAlive;
    }

    public void setKeepAlive(final Long keepAlive) {
        this.keepAlive = keepAlive;
    }

    /**
     * @return true when a PIT id is present and its creation time plus keep-alive
     *         is still in the future, i.e. the PIT context has not yet expired
     */
    public boolean hasValidPointInTime() {
        if (pitId == null || pitCreationTime == null || keepAlive == null) {
            return false;
        }
        final Instant pitExpirationTime = Instant.ofEpochMilli(pitCreationTime + keepAlive);
        return pitExpirationTime.isAfter(Instant.now());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.util.Map;

public class AwsAuthenticationConfiguration {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
Expand All @@ -27,6 +25,9 @@ public class AwsAuthenticationConfiguration {
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("enable_sigv4")
private Boolean sigv4Enabled = false;

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}
Expand All @@ -38,5 +39,7 @@ public Region getAwsRegion() {
public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}

// Whether AWS SigV4 request signing is enabled; the field defaults to false.
public Boolean isSigv4Enabled() { return sigv4Enabled; }
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,20 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotFoundException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionNotOwnedException;
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -31,6 +39,8 @@ public class PitWorker implements SearchWorker, Runnable {
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

private static final int STANDARD_BACKOFF_MILLIS = 30_000;
static final String DEFAULT_KEEP_ALIVE = "30m"; // 30 minutes (will be extended during search)
private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60);

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
Expand All @@ -53,18 +63,69 @@ public void run() {
try {
Thread.sleep(STANDARD_BACKOFF_MILLIS);
continue;
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
LOG.info("The PitWorker was interrupted while sleeping after acquiring no indices to process, stopping processing");
return;
}
}

// todo: process the index and write to the buffer
try {
processIndex(indexPartition.get());

sourceCoordinator.closePartition(
indexPartition.get().getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
sourceCoordinator.closePartition(
indexPartition.get().getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
} catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
sourceCoordinator.giveUpPartitions();
} catch (final SearchContextLimitException e) {
LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}",
indexPartition.get().getPartitionKey(), BACKOFF_ON_PIT_LIMIT_REACHED.getSeconds(), e.getMessage());
sourceCoordinator.giveUpPartitions();
try {
Thread.sleep(BACKOFF_ON_PIT_LIMIT_REACHED.toMillis());
} catch (final InterruptedException ex) {
return;
}
} catch (final RuntimeException e) {
LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e);
sourceCoordinator.giveUpPartitions();
}
}
}

/**
 * Processes one index partition: ensures a valid point-in-time (PIT) context exists for the
 * index (creating one when the saved state has none or it has expired), and deletes the PIT
 * when processing finishes. The actual search-and-write-to-buffer step is not yet implemented
 * (see todo below).
 *
 * @param openSearchIndexPartition partition whose key is the index name and whose optional
 *                                 state carries the PIT id, creation time, and keep-alive
 */
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition) {
    final String indexName = openSearchIndexPartition.getPartitionKey();
    Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();

    // First time this partition is picked up there is no saved state; start empty.
    if (openSearchIndexProgressStateOptional.isEmpty()) {
        openSearchIndexProgressStateOptional = Optional.of(initializeProgressState());
    }

    final OpenSearchIndexProgressState openSearchIndexProgressState = openSearchIndexProgressStateOptional.get();

    // Reuse an unexpired PIT from a previous attempt; otherwise create a new one with the
    // default keep-alive and record it in the state so it survives a partition handoff.
    if (!openSearchIndexProgressState.hasValidPointInTime()) {
        final CreatePointInTimeResponse createPointInTimeResponse = searchAccessor.createPit(CreatePointInTimeRequest.builder()
                .withIndex(indexName)
                .withKeepAlive(DEFAULT_KEEP_ALIVE)
                .build());

        LOG.debug("Created point in time for index {} with pit id {}", indexName, createPointInTimeResponse.getPitId());

        openSearchIndexProgressState.setPitId(createPointInTimeResponse.getPitId());
        openSearchIndexProgressState.setPitCreationTime(createPointInTimeResponse.getPitCreationTime());
        // NOTE(review): depends on the accessor populating keepAlive on the response —
        // confirm, otherwise hasValidPointInTime() will never report a valid PIT.
        openSearchIndexProgressState.setKeepAlive(createPointInTimeResponse.getKeepAlive());
    }

    // todo: Implement search with pit and write documents to buffer, checkpoint with calls to saveState


    // todo: This API call is failing with sigv4 enabled due to a mismatch in the signature
    searchAccessor.deletePit(DeletePointInTimeRequest.builder().withPitId(openSearchIndexProgressState.getPitId()).build());
}

/**
 * @return an empty progress state for an index that has not been processed before
 */
private OpenSearchIndexProgressState initializeProgressState() {
    return new OpenSearchIndexProgressState();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest;
Expand All @@ -24,7 +24,7 @@ public SearchContextType getSearchContextType() {
}

@Override
public CreatePitResponse createPit(CreatePitRequest createPitRequest) {
public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest createPointInTimeRequest) {
//todo: implement
return null;
}
Expand All @@ -36,7 +36,7 @@ public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) {
}

@Override
public void deletePit(DeletePitRequest deletePitRequest) {
public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
//todo: implement
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,37 @@
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client;

import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePitResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch._types.Time;
import org.opensearch.client.opensearch.core.pit.CreatePitRequest;
import org.opensearch.client.opensearch.core.pit.CreatePitResponse;
import org.opensearch.client.opensearch.core.pit.DeletePitRequest;
import org.opensearch.client.opensearch.core.pit.DeletePitResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreatePointInTimeResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.CreateScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPitResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchAccessor.class);

static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";

private final OpenSearchClient openSearchClient;
private final SearchContextType searchContextType;

Expand All @@ -33,41 +50,83 @@ public SearchContextType getSearchContextType() {
}

@Override
public CreatePitResponse createPit(CreatePitRequest createPitRequest) {
//todo: implement
return null;
/**
 * Creates a point-in-time (PIT) context for the requested index with the requested keep-alive.
 *
 * @param createPointInTimeRequest the index name and keep-alive to create the PIT with
 * @return the PIT id and creation time reported by the cluster
 * @throws SearchContextLimitException when the cluster rejects the request because the PIT
 *                                     limit has been reached (rejected_execution_exception)
 * @throws RuntimeException            wrapping any IOException from the client
 */
public CreatePointInTimeResponse createPit(final CreatePointInTimeRequest createPointInTimeRequest) {
    CreatePitResponse createPitResponse;
    try {
        createPitResponse = openSearchClient.createPit(CreatePitRequest.of(builder -> builder
                .targetIndexes(createPointInTimeRequest.getIndex())
                .keepAlive(new Time.Builder().time(createPointInTimeRequest.getKeepAlive()).build())));
    } catch (final OpenSearchException e) {
        // Translate a PIT-limit rejection into a dedicated exception so callers can back off.
        if (isDueToPitLimitExceeded(e)) {
            throw new SearchContextLimitException(String.format("There was an error creating a new point in time for index '%s': %s", createPointInTimeRequest.getIndex(),
                    e.error().causedBy().reason()));
        }
        LOG.error("There was an error creating a point in time for OpenSearch: ", e);
        throw e;
    } catch (final IOException e) {
        LOG.error("There was an error creating a point in time for OpenSearch: ", e);
        throw new RuntimeException(e);
    }

    // NOTE(review): keepAlive is not copied onto the response here, but PitWorker reads
    // getKeepAlive() from it to populate the index progress state — confirm whether the
    // builder should also set the requested keep-alive.
    return CreatePointInTimeResponse.builder()
            .withPitId(createPitResponse.pitId())
            .withCreationTime(createPitResponse.creationTime())
            .build();
}

@Override
public SearchPitResponse searchWithPit(SearchPitRequest searchPitRequest) {
//todo: implement
public SearchPitResponse searchWithPit(final SearchPitRequest searchPitRequest) {
// todo: implement
return null;
}

@Override
public void deletePit(DeletePitRequest deletePitRequest) {
//todo: implement
/**
 * Deletes the point-in-time (PIT) context with the given id. A delete that the cluster
 * reports as unsuccessful is only logged, since the PIT will expire via its keep-alive.
 *
 * @param deletePointInTimeRequest carries the PIT id to delete
 * @throws OpenSearchException when the cluster returns an error response
 * @throws RuntimeException    wrapping any IOException from the client
 */
public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
    try {
        final DeletePitResponse deletePitResponse = openSearchClient.deletePit(DeletePitRequest.of(builder -> builder.pitId(Collections.singletonList(deletePointInTimeRequest.getPitId()))));
        if (isPitDeletedSuccessfully(deletePitResponse)) {
            LOG.debug("Successfully deleted point in time id {}", deletePointInTimeRequest.getPitId());
        } else {
            LOG.warn("Point in time id {} was not deleted successfully. It will expire from keep-alive", deletePointInTimeRequest.getPitId());
        }
    } catch (final OpenSearchException e) {
        LOG.error("There was an error deleting the point in time with id {} for OpenSearch: ", deletePointInTimeRequest.getPitId(), e);
        throw e;
    } catch (final IOException e) {
        // Pass the exception object (not just getMessage()) so the stack trace is logged,
        // consistent with the OpenSearchException branch above and with createPit.
        LOG.error("There was an error deleting the point in time with id {} for OpenSearch: ", deletePointInTimeRequest.getPitId(), e);
        throw new RuntimeException(e);
    }
}

@Override
public CreateScrollResponse createScroll(CreateScrollRequest createScrollRequest) {
public CreateScrollResponse createScroll(final CreateScrollRequest createScrollRequest) {
//todo: implement
return null;
}

@Override
public SearchScrollResponse searchWithScroll(SearchScrollRequest searchScrollRequest) {
public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScrollRequest) {
//todo: implement
return null;
}

@Override
public void deleteScroll(DeleteScrollRequest deleteScrollRequest) {
public void deleteScroll(final DeleteScrollRequest deleteScrollRequest) {
//todo: implement
}

@Override
public Object getClient() {
    // Exposes the underlying OpenSearchClient per the ClusterClientFactory contract.
    return openSearchClient;
}

/**
 * A PIT delete is considered successful only when the response contains exactly one
 * non-null PIT record and that record reports success.
 */
private boolean isPitDeletedSuccessfully(final DeletePitResponse deletePitResponse) {
    if (deletePitResponse.pits() == null || deletePitResponse.pits().size() != 1) {
        return false;
    }
    return deletePitResponse.pits().get(0) != null && deletePitResponse.pits().get(0).successful();
}

/**
 * Checks whether the exception's root cause is the cluster's PIT resource limit
 * being exceeded (a rejected_execution_exception).
 */
private boolean isDueToPitLimitExceeded(final OpenSearchException e) {
    if (e.error() == null || e.error().causedBy() == null) {
        return false;
    }
    // String.equals is null-safe for its argument, so a null causedBy type yields false.
    return PIT_RESOURCE_LIMIT_ERROR_TYPE.equals(e.error().causedBy().type());
}
}
Loading

0 comments on commit b77da85

Please sign in to comment.