Skip to content

Commit

Permalink
Change vacuum statement semantic (opensearch-project#2606)
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Apr 5, 2024
1 parent e153609 commit 36b423c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,6 @@ private void executeIndexOp(
flintIndexOpAlter.apply(indexMetadata);
break;
case VACUUM:
// Try to perform drop operation first
FlintIndexOp tryDropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
try {
tryDropOp.apply(indexMetadata);
} catch (IllegalStateException e) {
// Drop failed possibly due to invalid initial state
}

// Continue to delete index data physically if state is DELETED
// which means previous transaction succeeds
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobRun;
import com.amazonaws.services.emrserverless.model.ValidationException;
import com.google.common.collect.Lists;
import java.util.Base64;
import java.util.List;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob;
Expand Down Expand Up @@ -63,22 +63,15 @@ public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec {
.isSpecialCharacter(true));

@Test
public void shouldVacuumIndexInRefreshingState() {
public void shouldVacuumIndexInDeletedState() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(REFRESHING),
List.of(DELETED),
List.of(
// Happy case that there is job running
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"))),
// Cancel EMR-S job, but not job running
Pair.<EMRApiCall, EMRApiCall>of(
() -> {
throw new ValidationException("Job run is not in a cancellable state");
},
DEFAULT_OP)));
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")))));

runVacuumTestSuite(
testCases,
Expand All @@ -90,32 +83,11 @@ public void shouldVacuumIndexInRefreshingState() {
}

@Test
public void shouldNotVacuumIndexInRefreshingStateIfCancelTimeout() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(REFRESHING),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Running")))));

runVacuumTestSuite(
testCases,
(mockDS, response) -> {
assertEquals("FAILED", response.getStatus());
assertEquals("Cancel job operation timed out.", response.getError());
assertTrue(indexExists(mockDS.indexName));
assertTrue(indexDocExists(mockDS.latestId));
});
}

@Test
public void shouldNotVacuumIndexInVacuumingState() {
public void shouldNotVacuumIndexInOtherStates() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(VACUUMING),
List.of(EMPTY, CREATING, ACTIVE, REFRESHING, VACUUMING),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
() -> {
Expand All @@ -134,39 +106,29 @@ public void shouldNotVacuumIndexInVacuumingState() {
});
}

@Test
public void shouldVacuumIndexWithoutJobRunning() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(EMPTY, CREATING, ACTIVE, DELETED),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
() -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")))));

runVacuumTestSuite(
testCases,
(mockDS, response) -> {
assertEquals("SUCCESS", response.getStatus());
assertFalse(flintIndexExists(mockDS.indexName));
assertFalse(indexDocExists(mockDS.latestId));
});
}

private void runVacuumTestSuite(
List<List<Object>> testCases,
BiConsumer<FlintDatasetMock, AsyncQueryExecutionResponse> assertion) {
testCases.forEach(
params -> {
FlintDatasetMock mockDS = (FlintDatasetMock) params.get(0);
FlintIndexState state = (FlintIndexState) params.get(1);
EMRApiCall cancelJobRun = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getLeft();
EMRApiCall getJobRunResult = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getRight();

AsyncQueryExecutionResponse response =
runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult);
assertion.accept(mockDS, response);
try {
FlintIndexState state = (FlintIndexState) params.get(1);
EMRApiCall cancelJobRun = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getLeft();
EMRApiCall getJobRunResult = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getRight();

AsyncQueryExecutionResponse response =
runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult);
assertion.accept(mockDS, response);
} finally {
// Clean up because we simulate parameterized test in single unit test method
if (flintIndexExists(mockDS.indexName)) {
mockDS.deleteIndex();
}
if (indexDocExists(mockDS.latestId)) {
deleteIndexDoc(mockDS.latestId);
}
}
});
}

Expand Down Expand Up @@ -229,6 +191,10 @@ private boolean indexDocExists(String docId) {
.isExists();
}

private void deleteIndexDoc(String docId) {
client.delete(new DeleteRequest(DATASOURCE_TO_REQUEST_INDEX.apply("mys3"), docId)).actionGet();
}

private FlintDatasetMock mockDataset(String query, FlintIndexType indexType, String indexName) {
FlintDatasetMock dataset = new FlintDatasetMock(query, "", indexType, indexName);
dataset.latestId(Base64.getEncoder().encodeToString(indexName.getBytes()));
Expand Down

0 comments on commit 36b423c

Please sign in to comment.