diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index 5daf91b59c..fe848593a7 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -142,8 +142,6 @@ private FlintIndexOp getIndexOp( case ALTER: return flintIndexOpFactory.getAlter( indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource()); - case VACUUM: - return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource()); default: throw new IllegalStateException( String.format( diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 4df2b5450d..50e8403d36 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -150,7 +150,6 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails) private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) { return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType()) - || IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType()) || (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType()) && (indexQueryDetails .getFlintIndexOptions() diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java index 9f925e0bcf..d82b29e928 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java @@ -36,15 +36,6 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da asyncQueryScheduler); } - public FlintIndexOpVacuum getVacuum(String datasource) { - return new FlintIndexOpVacuum( - flintIndexStateModelService, - datasource, - flintIndexClient, - emrServerlessClientFactory, - asyncQueryScheduler); - } - public FlintIndexOpCancel getCancel(String datasource) { return new FlintIndexOpCancel( flintIndexStateModelService, datasource, emrServerlessClientFactory); diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java deleted file mode 100644 index 324ddb5720..0000000000 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint.operation; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.flint.FlintIndexClient; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; -import org.opensearch.sql.spark.flint.FlintIndexStateModelService; -import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; - -/** Flint index vacuum operation. */ -public class FlintIndexOpVacuum extends FlintIndexOp { - private static final Logger LOG = LogManager.getLogger(); - - private final AsyncQueryScheduler asyncQueryScheduler; - - /** OpenSearch client. */ - private final FlintIndexClient flintIndexClient; - - public FlintIndexOpVacuum( - FlintIndexStateModelService flintIndexStateModelService, - String datasourceName, - FlintIndexClient flintIndexClient, - EMRServerlessClientFactory emrServerlessClientFactory, - AsyncQueryScheduler asyncQueryScheduler) { - super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); - this.flintIndexClient = flintIndexClient; - this.asyncQueryScheduler = asyncQueryScheduler; - } - - @Override - boolean validate(FlintIndexState state) { - return state == FlintIndexState.DELETED; - } - - @Override - FlintIndexState transitioningState() { - return FlintIndexState.VACUUMING; - } - - @Override - public void runOp( - FlintIndexMetadata flintIndexMetadata, - FlintIndexStateModel flintIndex, - AsyncQueryRequestContext asyncQueryRequestContext) { - LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); - if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { - asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName()); - } - flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName()); - } - - @Override - FlintIndexState stableState() { - // Instruct StateStore to purge the index state doc - return FlintIndexState.NONE; - } -} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index ce3bcab06b..b1a8c3d4f6 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -267,31 +267,6 @@ public Void visitDropMaterializedViewStatement( return super.visitDropMaterializedViewStatement(ctx); } - @Override - public Void visitVacuumSkippingIndexStatement( - FlintSparkSqlExtensionsParser.VacuumSkippingIndexStatementContext ctx) { - indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM); - indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); - return super.visitVacuumSkippingIndexStatement(ctx); - } - - @Override - public Void visitVacuumCoveringIndexStatement( - FlintSparkSqlExtensionsParser.VacuumCoveringIndexStatementContext ctx) { - indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM); - indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); - return super.visitVacuumCoveringIndexStatement(ctx); - } - - @Override - public Void visitVacuumMaterializedViewStatement( - FlintSparkSqlExtensionsParser.VacuumMaterializedViewStatementContext ctx) { - indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.VACUUM); - indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); - indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); - return super.visitVacuumMaterializedViewStatement(ctx); - } - @Override public Void visitDescribeCoveringIndexStatement( FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index 9f78dd2cdb..3607482d3f 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -237,32 +237,12 @@ public void createDropIndexQueryWithScheduler() { public void createVacuumIndexQuery() { givenSparkExecutionEngineConfigIsSupplied(); givenValidDataSourceMetadataExist(); + givenSessionExists(); when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); - String indexName = "flint_datasource_name_table_name_index_name_index"; - givenFlintIndexMetadataExists(indexName); - - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - "VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL), - asyncQueryRequestContext); - - assertEquals(QUERY_ID, response.getQueryId()); - assertNull(response.getSessionId()); - verifyGetQueryIdCalled(); - verify(flintIndexClient).deleteIndex(indexName); - verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); - } - - @Test - public void createVacuumIndexQueryWithScheduler() { - givenSparkExecutionEngineConfigIsSupplied(); - givenValidDataSourceMetadataExist(); - when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); - - String indexName = "flint_datasource_name_table_name_index_name_index"; - givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + when(sessionIdProvider.getSessionId(any())).thenReturn(SESSION_ID); + givenSessionExists(); // called twice + when(awsemrServerless.startJobRun(any())) + .thenReturn(new StartJobRunResult().withApplicationId(APPLICATION_ID).withJobRunId(JOB_ID)); CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( @@ -271,14 +251,12 @@ public void createVacuumIndexQueryWithScheduler() { asyncQueryRequestContext); assertEquals(QUERY_ID, response.getQueryId()); - assertNull(response.getSessionId()); + assertEquals(SESSION_ID, response.getSessionId()); verifyGetQueryIdCalled(); - - verify(flintIndexClient).deleteIndex(indexName); - verifyCreateIndexDMLResultCalled(); + verifyGetSessionIdCalled(); + verify(leaseManager).borrow(any()); + verifyStartJobRunCalled(); verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); - - verify(asyncQueryScheduler).removeJob(indexName); } @Test diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 5154b71574..8b855c190c 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -163,84 +163,12 @@ void setUp() { @Test void testDispatchSelectQuery() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - DispatchQueryRequest.builder() - .applicationId(EMRS_APPLICATION_ID) - .query(query) - .datasource(MY_GLUE) - .langType(LangType.SQL) - .executionRoleARN(EMRS_EXECUTION_ROLE) - .clusterName(TEST_CLUSTER_NAME) - .sparkSubmitParameterModifier(sparkSubmitParameterModifier) - .build(), - asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); + testDispatchBatchQuery("select * from my_glue.default.http_logs"); } @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithBasicAuth(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); + testDispatchBatchQuery("select * from my_glue.default.http_logs"); } @Test @@ -354,41 +282,9 @@ void testDispatchCreateAutoRefreshIndexQuery() { @Test void testDispatchCreateManualRefreshIndexQuery() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, "my_glue"); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = + testDispatchBatchQuery( "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" - + " (auto_refresh = false)"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - "my_glue", asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - Assertions.assertEquals(JobType.BATCH, dispatchQueryResponse.getJobType()); - verifyNoInteractions(flintIndexMetadataService); + + " (auto_refresh = false)"); } @Test @@ -460,84 +356,12 @@ void testDispatchWithSparkUDFQuery() { @Test void testInvalidSQLQueryDispatchToSpark() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "myselect 1"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch( - DispatchQueryRequest.builder() - .applicationId(EMRS_APPLICATION_ID) - .query(query) - .datasource(MY_GLUE) - .langType(LangType.SQL) - .executionRoleARN(EMRS_EXECUTION_ROLE) - .clusterName(TEST_CLUSTER_NAME) - .sparkSubmitParameterModifier(sparkSubmitParameterModifier) - .build(), - asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); + testDispatchBatchQuery("myselect 1"); } @Test void testDispatchQueryWithoutATableAndDataSourceName() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "show tables"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); + testDispatchBatchQuery("show tables"); } @Test @@ -619,38 +443,7 @@ void testDispatchMaterializedViewQuery() { @Test void testDispatchShowMVQuery() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "SHOW MATERIALIZED VIEW IN mys3.default"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); + testDispatchBatchQuery("SHOW MATERIALIZED VIEW IN mys3.default"); } @Test @@ -692,38 +485,7 @@ void testRefreshIndexQuery() { @Test void testDispatchDescribeIndexQuery() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs"; - String sparkSubmitParameters = - constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); + testDispatchBatchQuery("DESCRIBE SKIPPING INDEX ON mys3.default.http_logs"); } @Test @@ -817,26 +579,7 @@ void testDispatchDropIndexQuery() { @Test void testDispatchVacuumIndexQuery() { - QueryHandlerFactory queryHandlerFactory = mock(QueryHandlerFactory.class); - sparkQueryDispatcher = - new SparkQueryDispatcher( - dataSourceService, sessionManager, queryHandlerFactory, queryIdProvider); - - String query = "VACUUM INDEX elb_and_requestUri ON my_glue.default.http_logs"; - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - "my_glue", asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - when(queryHandlerFactory.getIndexDMLHandler()) - .thenReturn( - new IndexDMLHandler( - jobExecutionResponseReader, - flintIndexMetadataService, - indexDMLResultStorageService, - flintIndexOpFactory)); - - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - verify(queryHandlerFactory, times(1)).getIndexDMLHandler(); + testDispatchBatchQuery("VACUUM INDEX elb_and_requestUri ON my_glue.default.http_logs"); } @Test @@ -1087,6 +830,42 @@ void testDispatchQueryWithExtraSparkSubmitParameters() { } } + private void testDispatchBatchQuery(String query) { + when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); + when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID); + HashMap tags = new HashMap<>(); + tags.put(DATASOURCE_TAG_KEY, MY_GLUE); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); + + String sparkSubmitParameters = + constructExpectedSparkSubmitParameterString(query, null, QUERY_ID); + StartJobRequest expected = + new StartJobRequest( + "TEST_CLUSTER:batch", + null, + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + "query_execution_result_my_glue"); + when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( + MY_GLUE, asyncQueryRequestContext)) + .thenReturn(dataSourceMetadata); + + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); + + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); + Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertEquals(JobType.BATCH, dispatchQueryResponse.getJobType()); + verifyNoInteractions(flintIndexMetadataService); + } + private String constructExpectedSparkSubmitParameterString(String query) { return constructExpectedSparkSubmitParameterString(query, null, null); } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java index 62ac98f1a2..e73c5614ae 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java @@ -41,11 +41,6 @@ void getAlter() { assertNotNull(flintIndexOpFactory.getAlter(new FlintIndexOptions(), DATASOURCE_NAME)); } - @Test - void getVacuum() { - assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME)); - } - @Test void getCancel() { assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME)); diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java deleted file mode 100644 index 08f8efd488..0000000000 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint.operation; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Optional; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; -import org.opensearch.sql.spark.flint.FlintIndexClient; -import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; -import org.opensearch.sql.spark.flint.FlintIndexStateModelService; -import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; - -@ExtendWith(MockitoExtension.class) -class FlintIndexOpVacuumTest { - - public static final String DATASOURCE_NAME = "DATASOURCE_NAME"; - public static final String LATEST_ID = "LATEST_ID"; - public static final String INDEX_NAME = "INDEX_NAME"; - - public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID = - createFlintIndexMetadataWithLatestId(); - - public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID = - createFlintIndexMetadataWithoutLatestId(); - - @Mock FlintIndexClient flintIndexClient; - @Mock FlintIndexStateModelService flintIndexStateModelService; - @Mock EMRServerlessClientFactory emrServerlessClientFactory; - @Mock FlintIndexStateModel flintIndexStateModel; - @Mock FlintIndexStateModel transitionedFlintIndexStateModel; - @Mock AsyncQueryRequestContext asyncQueryRequestContext; - @Mock AsyncQueryScheduler asyncQueryScheduler; - - RuntimeException testException = new RuntimeException("Test Exception"); - - FlintIndexOpVacuum flintIndexOpVacuum; - - @BeforeEach - public void setUp() { - flintIndexOpVacuum = - new FlintIndexOpVacuum( - flintIndexStateModelService, - DATASOURCE_NAME, - flintIndexClient, - emrServerlessClientFactory, - asyncQueryScheduler); - } - - private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() { - return FlintIndexMetadata.builder() - .latestId(LATEST_ID) - .opensearchIndexName(INDEX_NAME) - .flintIndexOptions(new FlintIndexOptions()) - .build(); - } - - private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() { - return FlintIndexMetadata.builder() - .opensearchIndexName(INDEX_NAME) - .flintIndexOptions(new FlintIndexOptions()) - .build(); - } - - private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() { - FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); - flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external"); - - return FlintIndexMetadata.builder() - .opensearchIndexName(INDEX_NAME) - .flintIndexOptions(flintIndexOptions) - .build(); - } - - @Test - public void testApplyWithEmptyLatestId() { - flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITHOUT_LATEST_ID, asyncQueryRequestContext); - - verify(flintIndexClient).deleteIndex(INDEX_NAME); - } - - @Test - public void testApplyWithFlintIndexStateNotFound() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.empty()); - - assertThrows( - IllegalStateException.class, - () -> - flintIndexOpVacuum.apply( - FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext)); - } - - @Test - public void testApplyWithNotDeletedState() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.of(flintIndexStateModel)); - when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.ACTIVE); - - assertThrows( - IllegalStateException.class, - () -> - flintIndexOpVacuum.apply( - FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext)); - } - - @Test - public void testApplyWithUpdateFlintIndexStateThrow() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.of(flintIndexStateModel)); - when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); - when(flintIndexStateModelService.updateFlintIndexState( - flintIndexStateModel, - FlintIndexState.VACUUMING, - DATASOURCE_NAME, - asyncQueryRequestContext)) - .thenThrow(testException); - - assertThrows( - IllegalStateException.class, - () -> - flintIndexOpVacuum.apply( - FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext)); - } - - @Test - public void testApplyWithRunOpThrow() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.of(flintIndexStateModel)); - when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); - when(flintIndexStateModelService.updateFlintIndexState( - flintIndexStateModel, - FlintIndexState.VACUUMING, - DATASOURCE_NAME, - asyncQueryRequestContext)) - .thenReturn(transitionedFlintIndexStateModel); - doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME); - - assertThrows( - Exception.class, - () -> - flintIndexOpVacuum.apply( - FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext)); - - verify(flintIndexStateModelService) - .updateFlintIndexState( - transitionedFlintIndexStateModel, - FlintIndexState.DELETED, - DATASOURCE_NAME, - asyncQueryRequestContext); - } - - @Test - public void testApplyWithRunOpThrowAndRollbackThrow() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.of(flintIndexStateModel)); - when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); - when(flintIndexStateModelService.updateFlintIndexState( - flintIndexStateModel, - FlintIndexState.VACUUMING, - DATASOURCE_NAME, - asyncQueryRequestContext)) - .thenReturn(transitionedFlintIndexStateModel); - doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME); - when(flintIndexStateModelService.updateFlintIndexState( - transitionedFlintIndexStateModel, - FlintIndexState.DELETED, - DATASOURCE_NAME, - asyncQueryRequestContext)) - .thenThrow(testException); - - assertThrows( - Exception.class, - () -> - flintIndexOpVacuum.apply( - FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext)); - } - - @Test - public void testApplyWithDeleteFlintIndexStateModelThrow() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.of(flintIndexStateModel)); - when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); - when(flintIndexStateModelService.updateFlintIndexState( - flintIndexStateModel, - FlintIndexState.VACUUMING, - DATASOURCE_NAME, - asyncQueryRequestContext)) - .thenReturn(transitionedFlintIndexStateModel); - when(flintIndexStateModelService.deleteFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenThrow(testException); - - assertThrows( - IllegalStateException.class, - () -> - flintIndexOpVacuum.apply( - FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext)); - } - - @Test - public void testApplyHappyPath() { - when(flintIndexStateModelService.getFlintIndexStateModel( - LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext)) - .thenReturn(Optional.of(flintIndexStateModel)); - when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); - when(flintIndexStateModelService.updateFlintIndexState( - flintIndexStateModel, - FlintIndexState.VACUUMING, - DATASOURCE_NAME, - asyncQueryRequestContext)) - .thenReturn(transitionedFlintIndexStateModel); - when(transitionedFlintIndexStateModel.getLatestId()).thenReturn(LATEST_ID); - - flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID, asyncQueryRequestContext); - - verify(flintIndexStateModelService) - .deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext); - verify(flintIndexClient).deleteIndex(INDEX_NAME); - } - - @Test - public void testRunOpWithExternalScheduler() { - FlintIndexMetadata flintIndexMetadata = createFlintIndexMetadataWithExternalScheduler(); - flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext); - - verify(asyncQueryScheduler).removeJob(INDEX_NAME); - verify(flintIndexClient).deleteIndex(INDEX_NAME); - } - - @Test - public void testRunOpWithoutExternalScheduler() { - FlintIndexMetadata flintIndexMetadata = FLINT_INDEX_METADATA_WITHOUT_LATEST_ID; - flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext); - - verify(asyncQueryScheduler, never()).removeJob(INDEX_NAME); - verify(flintIndexClient).deleteIndex(INDEX_NAME); - } -} diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index 235fe84c70..f1853f2c1e 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -142,7 +142,6 @@ void testExtractionFromFlintSkippingIndexQueries() { + " WHERE elb_status_code = 500 " + " WITH (auto_refresh = true)", "DROP SKIPPING INDEX ON myS3.default.alb_logs", - "VACUUM SKIPPING INDEX ON myS3.default.alb_logs", "ALTER SKIPPING INDEX ON myS3.default.alb_logs WITH (auto_refresh = false)", }; @@ -171,7 +170,6 @@ void testExtractionFromFlintCoveringIndexQueries() { + " WHERE elb_status_code = 500 " + " WITH (auto_refresh = true)", "DROP INDEX elb_and_requestUri ON myS3.default.alb_logs", - "VACUUM INDEX elb_and_requestUri ON myS3.default.alb_logs", "ALTER INDEX elb_and_requestUri ON myS3.default.alb_logs WITH (auto_refresh = false)" }; @@ -203,9 +201,7 @@ void testExtractionFromCreateMVQuery() { @Test void testExtractionFromFlintMVQuery() { String[] mvQueries = { - "DROP MATERIALIZED VIEW mv_1", - "VACUUM MATERIALIZED VIEW mv_1", - "ALTER MATERIALIZED VIEW mv_1 WITH (auto_refresh = false)", + "DROP MATERIALIZED VIEW mv_1", "ALTER MATERIALIZED VIEW mv_1 WITH (auto_refresh = false)", }; for (String query : mvQueries) { diff --git a/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java index 6dce09a406..f791b050a1 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java @@ -24,12 +24,12 @@ public void compose( SparkSubmitParameters sparkSubmitParameters, DispatchQueryRequest dispatchQueryRequest, AsyncQueryRequestContext context) { - String externalSchedulerEnabled = + Boolean externalSchedulerEnabled = settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED); String externalSchedulerInterval = settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL); sparkSubmitParameters.setConfigItem( - FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled); + FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, String.valueOf(externalSchedulerEnabled)); sparkSubmitParameters.setConfigItem( FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, externalSchedulerInterval); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java deleted file mode 100644 index e62b60bfd2..0000000000 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.asyncquery; - -import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE; -import static org.opensearch.sql.spark.flint.FlintIndexState.CREATING; -import static org.opensearch.sql.spark.flint.FlintIndexState.DELETED; -import static org.opensearch.sql.spark.flint.FlintIndexState.EMPTY; -import static org.opensearch.sql.spark.flint.FlintIndexState.REFRESHING; -import static org.opensearch.sql.spark.flint.FlintIndexState.VACUUMING; -import static org.opensearch.sql.spark.flint.FlintIndexType.COVERING; -import static org.opensearch.sql.spark.flint.FlintIndexType.MATERIALIZED_VIEW; -import static org.opensearch.sql.spark.flint.FlintIndexType.SKIPPING; - -import com.amazonaws.services.emrserverless.model.CancelJobRunResult; -import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import com.amazonaws.services.emrserverless.model.JobRun; -import com.google.common.collect.Lists; -import java.util.Base64; -import java.util.List; -import java.util.function.BiConsumer; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.Test; -import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.opensearch.action.delete.DeleteRequest; -import org.opensearch.action.get.GetRequest; -import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; -import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexType; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; -import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; -import org.opensearch.sql.spark.rest.model.LangType; - -@SuppressWarnings({"unchecked", "rawtypes"}) -public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec { - - private static final EMRApiCall DEFAULT_OP = () -> null; - - private final List FLINT_TEST_DATASETS = - List.of( - mockDataset( - "VACUUM SKIPPING INDEX ON mys3.default.http_logs", - SKIPPING, - "flint_mys3_default_http_logs_skipping_index"), - mockDataset( - "VACUUM INDEX covering ON mys3.default.http_logs", - COVERING, - "flint_mys3_default_http_logs_covering_index"), - mockDataset( - "VACUUM MATERIALIZED VIEW mys3.default.http_logs_metrics", - MATERIALIZED_VIEW, - "flint_mys3_default_http_logs_metrics"), - mockDataset( - "VACUUM SKIPPING INDEX ON mys3.default.`test ,:\"+/\\|?#><`", - SKIPPING, - "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index") - .isSpecialCharacter(true)); - - @Test - public void shouldVacuumIndexInDeletedState() { - List> testCases = - Lists.cartesianProduct( - FLINT_TEST_DATASETS, - List.of(DELETED), - List.of( - Pair.of( - DEFAULT_OP, - () -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"))))); - - runVacuumTestSuite( - testCases, - (mockDS, response) -> { - assertEquals("SUCCESS", response.getStatus()); - assertFalse(flintIndexExists(mockDS.indexName)); - assertFalse(indexDocExists(mockDS.latestId)); - }); - } - - @Test - public void shouldNotVacuumIndexInOtherStates() { - List> testCases = - Lists.cartesianProduct( - FLINT_TEST_DATASETS, - List.of(EMPTY, CREATING, ACTIVE, REFRESHING, VACUUMING), - List.of( - Pair.of( - () -> { - throw new AssertionError("should not call cancelJobRun"); - }, - () -> { - throw new AssertionError("should not call getJobRunResult"); - }))); - - runVacuumTestSuite( - testCases, - (mockDS, response) -> { - assertEquals("FAILED", response.getStatus()); - assertTrue(flintIndexExists(mockDS.indexName)); - assertTrue(indexDocExists(mockDS.latestId)); - }); - } - - private void runVacuumTestSuite( - List> testCases, - BiConsumer assertion) { - testCases.forEach( - params -> { - FlintDatasetMock mockDS = (FlintDatasetMock) params.get(0); - try { - FlintIndexState state = (FlintIndexState) params.get(1); - EMRApiCall cancelJobRun = ((Pair) params.get(2)).getLeft(); - EMRApiCall getJobRunResult = ((Pair) params.get(2)).getRight(); - - AsyncQueryExecutionResponse response = - runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult); - assertion.accept(mockDS, response); - } finally { - // Clean up because we simulate parameterized test in single unit test method - if (flintIndexExists(mockDS.indexName)) { - mockDS.deleteIndex(); - } - if (indexDocExists(mockDS.latestId)) { - deleteIndexDoc(mockDS.latestId); - } - } - }); - } - - private AsyncQueryExecutionResponse runVacuumTest( - FlintDatasetMock mockDS, - FlintIndexState state, - EMRApiCall cancelJobRun, - EMRApiCall getJobRunResult) { - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public CancelJobRunResult cancelJobRun( - String applicationId, String jobId, boolean allowExceptionPropagation) { - if (cancelJobRun == DEFAULT_OP) { - return super.cancelJobRun(applicationId, jobId, allowExceptionPropagation); - } - return cancelJobRun.call(); - } - - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - if (getJobRunResult == DEFAULT_OP) { - return super.getJobRunResult(applicationId, jobId); - } - return getJobRunResult.call(); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = (accountId) -> emrsClient; - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrServerlessClientFactory); - - // Mock Flint index - mockDS.createIndex(); - - // Mock index state doc - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(flintIndexStateModelService, mockDS.latestId, "mys3"); - flintIndexJob.transition(state); - - // Vacuum index - CreateAsyncQueryResponse response = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, MYS3_DATASOURCE, LangType.SQL, null), - asyncQueryRequestContext); - - return asyncQueryExecutorService.getAsyncQueryResults( - response.getQueryId(), asyncQueryRequestContext); - } - - private boolean flintIndexExists(String flintIndexName) { - return client - .admin() - .indices() - .exists(new IndicesExistsRequest(flintIndexName)) - .actionGet() - .isExists(); - } - - private boolean indexDocExists(String docId) { - return client - .get(new GetRequest(OpenSearchStateStoreUtil.getIndexName("mys3"), docId)) - .actionGet() - .isExists(); - } - - private void deleteIndexDoc(String docId) { - client - .delete(new DeleteRequest(OpenSearchStateStoreUtil.getIndexName("mys3"), docId)) - .actionGet(); - } - - private FlintDatasetMock mockDataset(String query, FlintIndexType indexType, String indexName) { - FlintDatasetMock dataset = new FlintDatasetMock(query, "", indexType, indexName); - dataset.latestId(Base64.getEncoder().encodeToString(indexName.getBytes())); - return dataset; - } - - /** - * EMR API call mock interface. - * - * @param API call response type - */ - @FunctionalInterface - public interface EMRApiCall { - V call(); - } -} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java index 7836c63b7a..1556d4db3f 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java @@ -31,7 +31,7 @@ public void setUp() { @Test public void testCompose() { when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) - .thenReturn("true"); + .thenReturn(true); when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)) .thenReturn("10 minutes"); @@ -46,7 +46,7 @@ public void testCompose() { @Test public void testComposeWithDisabledScheduler() { when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) - .thenReturn("false"); + .thenReturn(false); composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); @@ -57,7 +57,7 @@ public void testComposeWithDisabledScheduler() { @Test public void testComposeWithMissingInterval() { when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) - .thenReturn("true"); + .thenReturn(true); when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)) .thenReturn(""); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClientTest.java b/async-query/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClientTest.java new file mode 100644 index 0000000000..d9f2e58dba --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClientTest.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchFlintIndexClientTest { + + @Mock(answer = RETURNS_DEEP_STUBS) + private Client client; + + @Mock private AcknowledgedResponse acknowledgedResponse; + + @InjectMocks private OpenSearchFlintIndexClient openSearchFlintIndexClient; + + @Test + public void testDeleteIndex() { + when(client.admin().indices().delete(any(DeleteIndexRequest.class)).actionGet()) + .thenReturn(acknowledgedResponse); + when(acknowledgedResponse.isAcknowledged()).thenReturn(true); + + openSearchFlintIndexClient.deleteIndex("test-index"); + verify(client.admin().indices()).delete(any(DeleteIndexRequest.class)); + verify(acknowledgedResponse).isAcknowledged(); + } +}