From b8911d62e62e2daab656a89fd75710e8d6b820f8 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Thu, 5 Sep 2024 12:44:42 -0700 Subject: [PATCH] Throw exception for RECOVER INDEX JOB query Signed-off-by: Tomoyuki Morita --- .../sql/spark/dispatcher/SparkQueryDispatcher.java | 3 +++ .../spark/dispatcher/model/IndexQueryActionType.java | 3 ++- .../org/opensearch/sql/spark/utils/SQLQueryUtils.java | 7 +++++++ .../sql/spark/dispatcher/SparkQueryDispatcherTest.java | 10 ++++++++++ .../opensearch/sql/spark/utils/SQLQueryUtilsTest.java | 9 +++++++++ 5 files changed, 31 insertions(+), 1 deletion(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 4df2b5450d..8478dbfaa7 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -119,6 +119,9 @@ private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery( } else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) { // Manual refresh should be handled by batch handler return queryHandlerFactory.getRefreshQueryHandler(dispatchQueryRequest.getAccountId()); + } else if (IndexQueryActionType.RECOVER.equals(indexQueryDetails.getIndexQueryActionType())) { + // RECOVER INDEX JOB should not be executed from async-query-core + throw new IllegalArgumentException("RECOVER INDEX JOB is not allowed."); } else { return getDefaultAsyncQueryHandler(dispatchQueryRequest.getAccountId()); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java index 96e7d159af..51e0832217 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java @@ -13,5 +13,6 @@ public enum IndexQueryActionType { SHOW, DROP, VACUUM, - ALTER + ALTER, + RECOVER } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index ce3bcab06b..a67d1a8bc6 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -26,6 +26,7 @@ import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsLexer; import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser; import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser.MaterializedViewQueryContext; +import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser.RecoverIndexJobStatementContext; import org.opensearch.sql.spark.antlr.parser.SqlBaseLexer; import org.opensearch.sql.spark.antlr.parser.SqlBaseParser; import org.opensearch.sql.spark.antlr.parser.SqlBaseParser.IdentifierReferenceContext; @@ -411,6 +412,12 @@ public Void visitMaterializedViewQuery(MaterializedViewQueryContext ctx) { return super.visitMaterializedViewQuery(ctx); } + @Override + public Void visitRecoverIndexJobStatement(RecoverIndexJobStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.RECOVER); + return super.visitRecoverIndexJobStatement(ctx); + } + private String propertyKey(FlintSparkSqlExtensionsParser.PropertyKeyContext key) { if (key.STRING() != null) { return key.STRING().getText(); diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 5154b71574..81329838e5 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -839,6 +839,16 @@ void testDispatchVacuumIndexQuery() { verify(queryHandlerFactory, times(1)).getIndexDMLHandler(); } + @Test + void testDispatchRecoverIndexQuery() { + String query = "RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`"; + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + sparkQueryDispatcher.dispatch( + getBaseDispatchQueryRequest(query), asyncQueryRequestContext)); + } + @Test void testDispatchWithUnSupportedDataSourceType() { when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index 235fe84c70..4e32435e7d 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -408,6 +408,15 @@ void testAutoRefresh() { .autoRefresh()); } + @Test + void testRecoverIndex() { + String refreshSkippingIndex = + "RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`"; + assertTrue(SQLQueryUtils.isFlintExtensionQuery(refreshSkippingIndex)); + IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(refreshSkippingIndex); + assertEquals(IndexQueryActionType.RECOVER, indexDetails.getIndexQueryActionType()); + } + @Test void testValidateSparkSqlQuery_ValidQuery() { List errors =