diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index c2351bcd0b..d1ebf21e24 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -16,6 +16,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; +import org.opensearch.client.Client; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; @@ -32,6 +33,7 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOp; import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter; import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** Handle Index DML query. includes * DROP * ALT? */ @@ -51,6 +53,8 @@ public class IndexDMLHandler extends AsyncQueryHandler { private final StateStore stateStore; + private final Client client; + public static boolean isIndexDMLQuery(String jobId) { return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId); } @@ -127,6 +131,23 @@ private void executeIndexOp( flintIndexMetadataService); flintIndexOpAlter.apply(indexMetadata); break; + case VACUUM: + // Try to perform drop operation first + FlintIndexOp tryDropOp = + new FlintIndexOpDrop( + stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); + try { + tryDropOp.apply(indexMetadata); + } catch (IllegalStateException e) { + // Drop failed possibly due to invalid initial state + } + + // Continue to delete index data physically if state is DELETED + // which means previous transaction succeeds + FlintIndexOp indexVacuumOp = + new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client); + indexVacuumOp.apply(indexMetadata); + break; default: throw new IllegalStateException( String.format( diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index f32c3433e8..2760b30123 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -112,6 +112,7 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails) private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) { return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType()) + || IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType()) || (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType()) && (indexQueryDetails .getFlintIndexOptions() @@ -161,7 +162,11 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessClient) { return new IndexDMLHandler( - emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, stateStore); + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataService, + stateStore, + client); } // TODO: Revisit this logic. diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java index 93e44f00ea..96e7d159af 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java @@ -12,5 +12,6 @@ public enum IndexQueryActionType { DESCRIBE, SHOW, DROP, + VACUUM, ALTER } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index e99087b24d..e50a2837d9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -24,6 +24,8 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; @@ -167,6 +169,33 @@ public T updateState( } } + /** + * Delete the index state document with the given ID. + * + * @param sid index state doc ID + * @param indexName index store index name + * @return true if deleted, otherwise false + */ + @VisibleForTesting + public boolean delete(String sid, String indexName) { + try { + // No action if the index doesn't exist + if (!this.clusterService.state().routingTable().hasIndex(indexName)) { + return true; + } + + try (ThreadContext.StoredContext ignored = + client.threadPool().getThreadContext().stashContext()) { + DeleteRequest deleteRequest = new DeleteRequest(indexName, sid); + DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet(); + return deleteResponse.getResult() == DocWriteResponse.Result.DELETED; + } + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to delete index state doc %s in index %s", sid, indexName), e); + } + } + private void createIndex(String indexName) { try { CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); @@ -328,6 +357,16 @@ public static Function createFlintIn st, FlintIndexStateModel::copy, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } + /** + * @param stateStore index state store + * @param datasourceName data source name + * @return function that accepts index state doc ID and perform the deletion + */ + public static Function deleteFlintIndexState( + StateStore stateStore, String datasourceName) { + return (docId) -> stateStore.delete(docId, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); + } + public static Function createIndexDMLResult( StateStore stateStore, String indexName) { return (result) -> stateStore.create(result, IndexDMLResult::copy, indexName); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java index 36ac8fe715..3d6532b8ea 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java @@ -37,7 +37,9 @@ public enum FlintIndexState { // stable state FAILED("failed"), // unknown state, if some state update in Spark side, not reflect in here. - UNKNOWN("unknown"); + UNKNOWN("unknown"), + // special state that instructs StateStore to purge the index state doc + NONE("none"); private final String state; diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java index 37d36a49db..0e99c18eef 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.flint.operation; +import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState; import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState; @@ -120,7 +121,12 @@ private void commit(FlintIndexStateModel flintIndex) { LOG.debug("Committing the transaction and moving to stable state."); FlintIndexState stableState = stableState(); try { - updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); + if (stableState == FlintIndexState.NONE) { + LOG.info("Deleting index state with docId: " + flintIndex.getLatestId()); + deleteFlintIndexState(stateStore, datasourceName).apply(flintIndex.getLatestId()); + } else { + updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); + } } catch (Exception e) { String errorMsg = String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java new file mode 100644 index 0000000000..cf204450e7 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +/** Flint index vacuum operation. */ +public class FlintIndexOpVacuum extends FlintIndexOp { + + private static final Logger LOG = LogManager.getLogger(); + + /** OpenSearch client. */ + private final Client client; + + public FlintIndexOpVacuum(StateStore stateStore, String datasourceName, Client client) { + super(stateStore, datasourceName); + this.client = client; + } + + @Override + boolean validate(FlintIndexState state) { + return state == FlintIndexState.DELETED; + } + + @Override + FlintIndexState transitioningState() { + return FlintIndexState.VACUUMING; + } + + @Override + public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) { + LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); + DeleteIndexRequest request = + new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName()); + AcknowledgedResponse response = client.admin().indices().delete(request).actionGet(); + LOG.info("OpenSearch index delete result: {}", response.isAcknowledged()); + } + + @Override + FlintIndexState stableState() { + // Instruct StateStore to purge the index state doc + return FlintIndexState.NONE; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index 1ac177771c..78978dcb71 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -190,6 +190,31 @@ public Void visitDropMaterializedViewStatement( return super.visitDropMaterializedViewStatement(ctx); } + @Override + public Void visitVacuumSkippingIndexStatement( + FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM); + indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); + return super.visitVacuumSkippingIndexStatement(ctx); + } + + @Override + public Void visitVacuumCoveringIndexStatement( + FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); + return super.visitVacuumCoveringIndexStatement(ctx); + } + + @Override + public Void visitVacuumMaterializedViewStatement( + FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); + return super.visitVacuumMaterializedViewStatement(ctx); + } + @Override public Void visitDescribeCoveringIndexStatement( FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 3acbfc439c..3a9b6e12a9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -47,7 +47,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { "REFRESH SKIPPING INDEX ON mys3.default.http_logs", FlintIndexType.SKIPPING, "flint_mys3_default_http_logs_skipping_index") - .latestId("skippingindexid"); + .latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA=="); private MockFlintSparkJob mockIndexState; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 25b94f2d11..132074de63 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -29,7 +29,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs"; public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs"; - public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mv"; + public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mys3.default.http_logs_metrics"; public final FlintDatasetMock LEGACY_SKIPPING = new FlintDatasetMock( @@ -47,7 +47,10 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { .isLegacy(true); public final FlintDatasetMock LEGACY_MV = new FlintDatasetMock( - "DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv") + "DROP MATERIALIZED VIEW mys3.default.http_logs_metrics", + REFRESH_MV, + FlintIndexType.MATERIALIZED_VIEW, + "flint_mys3_default_http_logs_metrics") .isLegacy(true); public final FlintDatasetMock SKIPPING = @@ -56,18 +59,21 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { REFRESH_SI, FlintIndexType.SKIPPING, "flint_mys3_default_http_logs_skipping_index") - .latestId("skippingindexid"); + .latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA=="); public final FlintDatasetMock COVERING = new FlintDatasetMock( "DROP INDEX covering ON mys3.default.http_logs", REFRESH_CI, FlintIndexType.COVERING, "flint_mys3_default_http_logs_covering_index") - .latestId("coveringid"); + .latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19jb3ZlcmluZ19pbmRleA=="); public final FlintDatasetMock MV = new FlintDatasetMock( - "DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv") - .latestId("mvid"); + "DROP MATERIALIZED VIEW mys3.default.http_logs_metrics", + REFRESH_MV, + FlintIndexType.MATERIALIZED_VIEW, + "flint_mys3_default_http_logs_metrics") + .latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz"); public final String CREATE_SI_AUTO = "CREATE SKIPPING INDEX ON mys3.default.http_logs" + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)"; @@ -77,7 +83,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { + "(l_orderkey, l_quantity) WITH (auto_refresh = true)"; public final String CREATE_MV_AUTO = - "CREATE MATERIALIZED VIEW mv AS select * " + "CREATE MATERIALIZED VIEW mys3.default.http_logs_metrics AS select * " + "from mys3.default.https WITH (auto_refresh = true)"; /** diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java new file mode 100644 index 0000000000..67c89c791c --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -0,0 +1,240 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery; + +import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; +import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE; +import static org.opensearch.sql.spark.flint.FlintIndexState.CREATING; +import static org.opensearch.sql.spark.flint.FlintIndexState.DELETED; +import static org.opensearch.sql.spark.flint.FlintIndexState.EMPTY; +import static org.opensearch.sql.spark.flint.FlintIndexState.REFRESHING; +import static org.opensearch.sql.spark.flint.FlintIndexState.VACUUMING; +import static org.opensearch.sql.spark.flint.FlintIndexType.COVERING; +import static org.opensearch.sql.spark.flint.FlintIndexType.MATERIALIZED_VIEW; +import static org.opensearch.sql.spark.flint.FlintIndexType.SKIPPING; + +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.google.common.collect.Lists; +import java.util.Base64; +import java.util.List; +import java.util.function.BiConsumer; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Test; +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.rest.model.LangType; + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec { + + private static final EMRApiCall DEFAULT_OP = () -> null; + + private final List FLINT_TEST_DATASETS = + List.of( + mockDataset( + "VACUUM SKIPPING INDEX ON mys3.default.http_logs", + SKIPPING, + "flint_mys3_default_http_logs_skipping_index"), + mockDataset( + "VACUUM INDEX covering ON mys3.default.http_logs", + COVERING, + "flint_mys3_default_http_logs_covering_index"), + mockDataset( + "VACUUM MATERIALIZED VIEW mys3.default.http_logs_metrics", + MATERIALIZED_VIEW, + "flint_mys3_default_http_logs_metrics")); + + @Test + public void shouldVacuumIndexInRefreshingState() { + List> testCases = + Lists.cartesianProduct( + FLINT_TEST_DATASETS, + List.of(REFRESHING), + List.of( + // Happy case that there is job running + Pair.of( + DEFAULT_OP, + () -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"))), + // Cancel EMR-S job, but not job running + Pair.of( + () -> { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + }, + DEFAULT_OP))); + + runVacuumTestSuite( + testCases, + (mockDS, response) -> { + assertEquals("SUCCESS", response.getStatus()); + assertFalse(flintIndexExists(mockDS.indexName)); + assertFalse(indexDocExists(mockDS.latestId)); + }); + } + + @Test + public void shouldNotVacuumIndexInRefreshingStateIfCancelTimeout() { + List> testCases = + Lists.cartesianProduct( + FLINT_TEST_DATASETS, + List.of(REFRESHING), + List.of( + Pair.of( + DEFAULT_OP, + () -> new GetJobRunResult().withJobRun(new JobRun().withState("Running"))))); + + runVacuumTestSuite( + testCases, + (mockDS, response) -> { + assertEquals("FAILED", response.getStatus()); + assertEquals("Cancel job operation timed out.", response.getError()); + assertTrue(indexExists(mockDS.indexName)); + assertTrue(indexDocExists(mockDS.latestId)); + }); + } + + @Test + public void shouldNotVacuumIndexInVacuumingState() { + List> testCases = + Lists.cartesianProduct( + FLINT_TEST_DATASETS, + List.of(VACUUMING), + List.of( + Pair.of( + () -> { + throw new AssertionError("should not call cancelJobRun"); + }, + () -> { + throw new AssertionError("should not call getJobRunResult"); + }))); + + runVacuumTestSuite( + testCases, + (mockDS, response) -> { + assertEquals("FAILED", response.getStatus()); + assertTrue(flintIndexExists(mockDS.indexName)); + assertTrue(indexDocExists(mockDS.latestId)); + }); + } + + @Test + public void shouldVacuumIndexWithoutJobRunning() { + List> testCases = + Lists.cartesianProduct( + FLINT_TEST_DATASETS, + List.of(EMPTY, CREATING, ACTIVE, DELETED), + List.of( + Pair.of( + DEFAULT_OP, + () -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"))))); + + runVacuumTestSuite( + testCases, + (mockDS, response) -> { + assertEquals("SUCCESS", response.getStatus()); + assertFalse(flintIndexExists(mockDS.indexName)); + assertFalse(indexDocExists(mockDS.latestId)); + }); + } + + private void runVacuumTestSuite( + List> testCases, + BiConsumer assertion) { + testCases.forEach( + params -> { + FlintDatasetMock mockDS = (FlintDatasetMock) params.get(0); + FlintIndexState state = (FlintIndexState) params.get(1); + EMRApiCall cancelJobRun = ((Pair) params.get(2)).getLeft(); + EMRApiCall getJobRunResult = ((Pair) params.get(2)).getRight(); + + AsyncQueryExecutionResponse response = + runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult); + assertion.accept(mockDS, response); + }); + } + + private AsyncQueryExecutionResponse runVacuumTest( + FlintDatasetMock mockDS, + FlintIndexState state, + EMRApiCall cancelJobRun, + EMRApiCall getJobRunResult) { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + if (cancelJobRun == DEFAULT_OP) { + return super.cancelJobRun(applicationId, jobId); + } + return cancelJobRun.call(); + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + if (getJobRunResult == DEFAULT_OP) { + return super.getJobRunResult(applicationId, jobId); + } + return getJobRunResult.call(); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Mock Flint index + mockDS.createIndex(); + + // Mock index state doc + MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(stateStore, mockDS.latestId, "mys3"); + flintIndexJob.transition(state); + + // Vacuum index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + return asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + } + + private boolean flintIndexExists(String flintIndexName) { + return client + .admin() + .indices() + .exists(new IndicesExistsRequest(flintIndexName)) + .actionGet() + .isExists(); + } + + private boolean indexDocExists(String docId) { + return client + .get(new GetRequest(DATASOURCE_TO_REQUEST_INDEX.apply("mys3"), docId)) + .actionGet() + .isExists(); + } + + private FlintDatasetMock mockDataset(String query, FlintIndexType indexType, String indexName) { + FlintDatasetMock dataset = new FlintDatasetMock(query, "", indexType, indexName); + dataset.latestId(Base64.getEncoder().encodeToString(indexName.getBytes())); + return dataset; + } + + /** + * EMR API call mock interface. + * + * @param API call response type + */ + @FunctionalInterface + public interface EMRApiCall { + V call(); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java index 0840ce975c..4cfdb6a9a9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java @@ -38,6 +38,11 @@ public MockFlintSparkJob(StateStore stateStore, String latestId, String datasour stateModel = StateStore.createFlintIndexState(stateStore, datasource).apply(stateModel); } + public void transition(FlintIndexState newState) { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource).apply(stateModel, newState); + } + public void refreshing() { stateModel = StateStore.updateFlintIndexState(stateStore, datasource) diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index ac03e817dd..045de66d0a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -24,6 +24,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.Client; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -46,10 +47,12 @@ class IndexDMLHandlerTest { @Mock private JobExecutionResponseReader jobExecutionResponseReader; @Mock private FlintIndexMetadataService flintIndexMetadataService; @Mock private StateStore stateStore; + @Mock private Client client; @Test public void getResponseFromExecutor() { - JSONObject result = new IndexDMLHandler(null, null, null, null).getResponseFromExecutor(null); + JSONObject result = + new IndexDMLHandler(null, null, null, null, null).getResponseFromExecutor(null); assertEquals("running", result.getString(STATUS_FIELD)); assertEquals("", result.getString(ERROR_FIELD)); @@ -59,7 +62,11 @@ public void getResponseFromExecutor() { public void testWhenIndexDetailsAreNotFound() { IndexDMLHandler indexDMLHandler = new IndexDMLHandler( - emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, stateStore); + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataService, + stateStore, + client); DispatchQueryRequest dispatchQueryRequest = new DispatchQueryRequest( EMRS_APPLICATION_ID, @@ -97,7 +104,11 @@ public void testWhenIndexDetailsWithInvalidQueryActionType() { FlintIndexMetadata flintIndexMetadata = mock(FlintIndexMetadata.class); IndexDMLHandler indexDMLHandler = new IndexDMLHandler( - emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, stateStore); + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataService, + stateStore, + client); DispatchQueryRequest dispatchQueryRequest = new DispatchQueryRequest( EMRS_APPLICATION_ID, diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json index 54ed5e05e1..811204847c 100644 --- a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_covering_index.json @@ -19,7 +19,7 @@ "columnName": "request_url" } ], - "name": "test", + "name": "covering", "options": { "auto_refresh": "true", "index_settings": "{\"number_of_shards\":1,\"number_of_replicas\":1}" @@ -32,6 +32,6 @@ "SERVERLESS_EMR_JOB_ID": "00fe3gu2tgad000q" } }, - "latestId": "coveringid" + "latestId": "ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19jb3ZlcmluZ19pbmRleA==" } } diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json index 1a9c74806a..1369f9c721 100644 --- a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_mv.json @@ -11,7 +11,7 @@ "columnName": "count" } ], - "name": "spark_catalog.default.http_logs_metrics_chen", + "name": "mys3.default.http_logs_metrics", "options": { "auto_refresh": "true", "checkpoint_location": "s3://flint-data-dp-eu-west-1-beta/data/checkpoint/chen-job-1", @@ -25,6 +25,6 @@ "SERVERLESS_EMR_JOB_ID": "00fe86mkk5q3u00q" } }, - "latestId": "mvid" + "latestId": "ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz" } } diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json index 5e7c9175fd..2f65b1d8ee 100644 --- a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_skipping_index.json @@ -18,6 +18,6 @@ "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" } }, - "latestId": "skippingindexid" + "latestId": "ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==" } }