From a79cdbd5fe4c73edb18ec9f01668302d37e59e4a Mon Sep 17 00:00:00 2001
From: Tomoyuki Morita
Date: Fri, 30 Aug 2024 13:20:21 -0700
Subject: [PATCH 1/2] Populate indexName for BatchQuery

Signed-off-by: Tomoyuki Morita
---
 .../spark/dispatcher/BatchQueryHandler.java   |   7 +
 .../dispatcher/model/IndexQueryDetails.java   |  20 +--
 .../dispatcher/SparkQueryDispatcherTest.java  | 132 +++++++++++-------
 .../sql/spark/utils/SQLQueryUtilsTest.java    |  45 +++++-
 4 files changed, 142 insertions(+), 62 deletions(-)

diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
index 661ebe27fc..bddfff5538 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
@@ -106,6 +106,13 @@ public DispatchQueryResponse submit(
         .resultIndex(dataSourceMetadata.getResultIndex())
         .datasourceName(dataSourceMetadata.getName())
         .jobType(JobType.INTERACTIVE)
+        .indexName(getIndexName(context))
         .build();
   }
+
+  private static String getIndexName(DispatchQueryContext context) {
+    return context.getIndexQueryDetails() != null
+        ? context.getIndexQueryDetails().openSearchIndexName()
+        : null;
+  }
 }
diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
index 2ca997f6b0..74118cd30a 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
@@ -97,20 +97,24 @@ public String openSearchIndexName() {
     String indexName = StringUtils.EMPTY;
     switch (getIndexType()) {
       case COVERING:
-        indexName =
-            "flint_"
-                + fullyQualifiedTableName.toFlintName()
-                + "_"
-                + strip(getIndexName(), STRIP_CHARS)
-                + "_"
-                + getIndexType().getSuffix();
+        if (getIndexName() != null) { // getIndexName will be null for SHOW INDEX query
+          indexName =
+              "flint_"
+                  + fullyQualifiedTableName.toFlintName()
+                  + "_"
+                  + strip(getIndexName(), STRIP_CHARS)
+                  + "_"
+                  + getIndexType().getSuffix();
+        }
         break;
       case SKIPPING:
         indexName =
             "flint_" + fullyQualifiedTableName.toFlintName() + "_" + getIndexType().getSuffix();
         break;
       case MATERIALIZED_VIEW:
-        indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
+        if (mvName != null) { // mvName is not available for SHOW MATERIALIZED VIEW query
+          indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
+        }
         break;
     }
     return percentEncode(indexName).toLowerCase();
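For readers following the hunk above, the sketch below is an illustrative, self-contained rendition of the Flint index naming convention that openSearchIndexName() encodes. It is not the patched class: the class and method names are invented for the example, and the real method additionally strips backticks via strip(getIndexName(), STRIP_CHARS) and percent-encodes special characters before lowercasing.

// Illustrative sketch only (not the patched class): how the three Flint index name shapes
// are derived. All names below are made up for demonstration.
public class FlintIndexNameSketch {

  // Covering index: flint_<catalog>_<schema>_<table>_<indexName>_index
  static String coveringIndexName(String fullyQualifiedTable, String indexName) {
    return ("flint_" + fullyQualifiedTable.replace(".", "_") + "_" + indexName + "_index")
        .toLowerCase();
  }

  // Skipping index: flint_<catalog>_<schema>_<table>_skipping_index
  static String skippingIndexName(String fullyQualifiedTable) {
    return ("flint_" + fullyQualifiedTable.replace(".", "_") + "_skipping_index").toLowerCase();
  }

  // Materialized view: flint_<mv name, dots replaced by underscores>
  static String materializedViewIndexName(String mvName) {
    return ("flint_" + mvName.replace(".", "_")).toLowerCase();
  }

  public static void main(String[] args) {
    // Matches the expectations asserted in SQLQueryUtilsTest further down in this patch:
    System.out.println(coveringIndexName("mys3.default.alb_logs", "elb_and_requestUri"));
    // -> flint_mys3_default_alb_logs_elb_and_requesturi_index
    System.out.println(skippingIndexName("mys3.default.alb_logs"));
    // -> flint_mys3_default_alb_logs_skipping_index
    System.out.println(materializedViewIndexName("mv_1"));
    // -> flint_mv_1
  }
}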
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
index ee840e8b4c..c1a2a384ae 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
@@ -5,6 +5,7 @@
 package org.opensearch.sql.spark.dispatcher;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
@@ -198,8 +199,8 @@ void testDispatchSelectQuery() {
             asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -232,8 +233,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -257,8 +258,8 @@ void testDispatchSelectQueryCreateNewSession() {
     verifyNoInteractions(emrServerlessClient);
     verify(sessionManager, never()).getSession(any(), any());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
-    Assertions.assertEquals(MOCK_SESSION_ID, dispatchQueryResponse.getSessionId());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(MOCK_SESSION_ID, dispatchQueryResponse.getSessionId());
   }
 
   @Test
@@ -284,8 +285,8 @@ void testDispatchSelectQueryReuseSession() {
     verifyNoInteractions(emrServerlessClient);
     verify(sessionManager, never()).createSession(any(), any());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
-    Assertions.assertEquals(MOCK_SESSION_ID, dispatchQueryResponse.getSessionId());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(MOCK_SESSION_ID, dispatchQueryResponse.getSessionId());
   }
 
   @Test
@@ -339,8 +340,8 @@ void testDispatchCreateAutoRefreshIndexQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -375,8 +376,8 @@ void testDispatchCreateManualRefreshIndexQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -411,8 +412,8 @@ void testDispatchWithPPLQuery() {
             asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -437,7 +438,7 @@ void testDispatchWithSparkUDFQuery() {
                 sparkQueryDispatcher.dispatch(
                     getBaseDispatchQueryRequestBuilder(query).langType(LangType.SQL).build(),
                     asyncQueryRequestContext));
-    Assertions.assertEquals(
+    assertEquals(
         "Query is not allowed: Creating user-defined functions is not allowed",
         illegalArgumentException.getMessage());
     verifyNoInteractions(emrServerlessClient);
@@ -484,8 +485,8 @@ void testInvalidSQLQueryDispatchToSpark() {
             asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -518,8 +519,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -555,8 +556,8 @@ void testDispatchIndexQueryWithoutADatasourceName() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -591,8 +592,45 @@ void testDispatchMaterializedViewQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    verifyNoInteractions(flintIndexMetadataService);
+  }
+
+  @Test
+  void testManualRefreshMaterializedViewQuery() {
+    when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
+    when(queryIdProvider.getQueryId(any(), any())).thenReturn(QUERY_ID);
+    HashMap<String, String> tags = new HashMap<>();
+    tags.put(DATASOURCE_TAG_KEY, MY_GLUE);
+    tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
+    tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
+    String query =
+        "CREATE MATERIALIZED VIEW mv_1 AS select * from logs WITH" + " (auto_refresh = false)";
+    String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query);
+    StartJobRequest expected =
+        new StartJobRequest(
+            "TEST_CLUSTER:batch",
+            null,
+            EMRS_APPLICATION_ID,
+            EMRS_EXECUTION_ROLE,
+            sparkSubmitParameters,
+            tags,
+            false,
+            "query_execution_result_my_glue");
+    when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID);
+    DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
+    when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
+            MY_GLUE, asyncQueryRequestContext))
+        .thenReturn(dataSourceMetadata);
+
+    DispatchQueryResponse dispatchQueryResponse =
+        sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
+
+    verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals("flint_mv_1", dispatchQueryResponse.getIndexName());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -625,8 +663,8 @@ void testDispatchShowMVQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -659,8 +697,8 @@ void testRefreshIndexQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -693,8 +731,8 @@ void testDispatchDescribeIndexQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -730,8 +768,8 @@ void testDispatchAlterToAutoRefreshIndexQuery() {
         sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
     verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
-    Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
-    Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
+    assertEquals(expected, startJobRequestArgumentCaptor.getValue());
+    assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
     verifyNoInteractions(flintIndexMetadataService);
   }
@@ -741,7 +779,6 @@ void testDispatchAlterToManualRefreshIndexQuery() {
     sparkQueryDispatcher =
         new SparkQueryDispatcher(
             dataSourceService, sessionManager, queryHandlerFactory, queryIdProvider);
-
     String query =
         "ALTER INDEX elb_and_requestUri ON my_glue.default.http_logs WITH"
            + " (auto_refresh = false)";
@@ -758,6 +795,7 @@ void testDispatchAlterToManualRefreshIndexQuery() {
             flintIndexOpFactory));
     sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
+    verify(queryHandlerFactory, times(1)).getIndexDMLHandler();
   }
@@ -767,7 +805,6 @@ void testDispatchDropIndexQuery() {
     sparkQueryDispatcher =
         new SparkQueryDispatcher(
             dataSourceService, sessionManager, queryHandlerFactory, queryIdProvider);
-
     String query = "DROP INDEX elb_and_requestUri ON my_glue.default.http_logs";
     DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
     when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
@@ -781,7 +818,9 @@ void testDispatchDropIndexQuery() {
             indexDMLResultStorageService,
             flintIndexOpFactory));
-    sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
+    DispatchQueryResponse response =
+        sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext);
+    verify(queryHandlerFactory, times(1)).getIndexDMLHandler();
   }
@@ -824,7 +863,7 @@ void testDispatchWithUnSupportedDataSourceType() {
                 getBaseDispatchQueryRequestBuilder(query).datasource("my_prometheus").build(),
                 asyncQueryRequestContext));
-    Assertions.assertEquals(
+    assertEquals(
         "UnSupported datasource type for async queries:: PROMETHEUS",
         unsupportedOperationException.getMessage());
   }
@@ -841,7 +880,7 @@ void testCancelJob() {
     String queryId =
         sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);
-    Assertions.assertEquals(QUERY_ID, queryId);
+    assertEquals(QUERY_ID, queryId);
   }
@@ -857,7 +896,7 @@ void testCancelQueryWithSession() {
     verifyNoInteractions(emrServerlessClient);
     verify(statement, times(1)).cancel();
-    Assertions.assertEquals(MOCK_STATEMENT_ID, queryId);
+    assertEquals(MOCK_STATEMENT_ID, queryId);
   }
@@ -874,7 +913,7 @@ void testCancelQueryWithInvalidSession() {
     verifyNoInteractions(emrServerlessClient);
     verifyNoInteractions(session);
-    Assertions.assertEquals("no session found. invalid", exception.getMessage());
+    assertEquals("no session found. invalid", exception.getMessage());
   }
@@ -891,8 +930,7 @@ void testCancelQueryWithInvalidStatementId() {
     verifyNoInteractions(emrServerlessClient);
     verifyNoInteractions(statement);
-    Assertions.assertEquals(
-        "no statement found. " + new StatementId("invalid"), exception.getMessage());
+    assertEquals("no statement found. " + new StatementId("invalid"), exception.getMessage());
   }
@@ -907,7 +945,7 @@ void testCancelQueryWithNoSessionId() {
     String queryId =
         sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);
-    Assertions.assertEquals(QUERY_ID, queryId);
+    assertEquals(QUERY_ID, queryId);
   }
@@ -921,7 +959,7 @@ void testGetQueryResponse() {
     JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata());
-    Assertions.assertEquals("PENDING", result.get("status"));
+    assertEquals("PENDING", result.get("status"));
   }
@@ -939,7 +977,7 @@ void testGetQueryResponseWithSession() {
             asyncQueryJobMetadataWithSessionId(MOCK_STATEMENT_ID, MOCK_SESSION_ID));
     verifyNoInteractions(emrServerlessClient);
-    Assertions.assertEquals("waiting", result.get("status"));
+    assertEquals("waiting", result.get("status"));
   }
@@ -957,7 +995,7 @@ void testGetQueryResponseWithInvalidSession() {
                 asyncQueryJobMetadataWithSessionId(MOCK_STATEMENT_ID, MOCK_SESSION_ID)));
     verifyNoInteractions(emrServerlessClient);
-    Assertions.assertEquals("no session found. " + MOCK_SESSION_ID, exception.getMessage());
" + MOCK_SESSION_ID, exception.getMessage()); + assertEquals("no session found. " + MOCK_SESSION_ID, exception.getMessage()); } @Test @@ -976,7 +1014,7 @@ void testGetQueryResponseWithStatementNotExist() { asyncQueryJobMetadataWithSessionId(MOCK_STATEMENT_ID, MOCK_SESSION_ID))); verifyNoInteractions(emrServerlessClient); - Assertions.assertEquals( + assertEquals( "no statement found. " + new StatementId(MOCK_STATEMENT_ID), exception.getMessage()); } @@ -992,7 +1030,7 @@ void testGetQueryResponseWithSuccess() { JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata()); verify(jobExecutionResponseReader, times(1)).getResultWithJobId(EMR_JOB_ID, null); - Assertions.assertEquals( + assertEquals( new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet()); JSONObject dataJson = new JSONObject(); dataJson.put(ERROR_FIELD, ""); @@ -1003,7 +1041,7 @@ void testGetQueryResponseWithSuccess() { // the same order. // We need similar. Assertions.assertTrue(dataJson.similar(result.get(DATA_FIELD))); - Assertions.assertEquals("SUCCESS", result.get(STATUS_FIELD)); + assertEquals("SUCCESS", result.get(STATUS_FIELD)); verifyNoInteractions(emrServerlessClient); } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index fe7777606c..b46f6f6de8 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -149,6 +149,8 @@ void testExtractionFromFlintSkippingIndexQueries() { assertNull(indexQueryDetails.getIndexName()); assertFullyQualifiedTableName("myS3", "default", "alb_logs", fullyQualifiedTableName); + assertEquals( + "flint_mys3_default_alb_logs_skipping_index", indexQueryDetails.openSearchIndexName()); } } @@ -178,6 +180,9 @@ void testExtractionFromFlintCoveringIndexQueries() { assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName()); assertFullyQualifiedTableName("myS3", "default", "alb_logs", fullyQualifiedTableName); + assertEquals( + "flint_mys3_default_alb_logs_elb_and_requesturi_index", + indexQueryDetails.openSearchIndexName()); } } @@ -192,6 +197,7 @@ void testExtractionFromCreateMVQuery() { assertNull(indexQueryDetails.getFullyQualifiedTableName()); assertEquals(mvQuery, indexQueryDetails.getMvQuery()); assertEquals("mv_1", indexQueryDetails.getMvName()); + assertEquals("flint_mv_1", indexQueryDetails.openSearchIndexName()); } @Test @@ -213,61 +219,86 @@ void testExtractionFromFlintMVQuery() { assertNull(fullyQualifiedTableName); assertNull(indexQueryDetails.getMvQuery()); assertEquals("mv_1", indexQueryDetails.getMvName()); + assertEquals("flint_mv_1", indexQueryDetails.openSearchIndexName()); } } @Test void testDescSkippingIndex() { String descSkippingIndex = "DESC SKIPPING INDEX ON mys3.default.http_logs"; + assertTrue(SQLQueryUtils.isFlintExtensionQuery(descSkippingIndex)); IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(descSkippingIndex); FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + assertNull(indexDetails.getIndexName()); assertNotNull(fullyQualifiedTableName); assertEquals(FlintIndexType.SKIPPING, indexDetails.getIndexType()); assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType()); + assertEquals("flint_mys3_default_http_logs_skipping_index", 
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
index fe7777606c..b46f6f6de8 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
@@ -149,6 +149,8 @@ void testExtractionFromFlintSkippingIndexQueries() {
       assertNull(indexQueryDetails.getIndexName());
       assertFullyQualifiedTableName("myS3", "default", "alb_logs", fullyQualifiedTableName);
+      assertEquals(
+          "flint_mys3_default_alb_logs_skipping_index", indexQueryDetails.openSearchIndexName());
     }
   }
@@ -178,6 +180,9 @@ void testExtractionFromFlintCoveringIndexQueries() {
       assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName());
       assertFullyQualifiedTableName("myS3", "default", "alb_logs", fullyQualifiedTableName);
+      assertEquals(
+          "flint_mys3_default_alb_logs_elb_and_requesturi_index",
+          indexQueryDetails.openSearchIndexName());
     }
   }
@@ -192,6 +197,7 @@ void testExtractionFromCreateMVQuery() {
     assertNull(indexQueryDetails.getFullyQualifiedTableName());
     assertEquals(mvQuery, indexQueryDetails.getMvQuery());
     assertEquals("mv_1", indexQueryDetails.getMvName());
+    assertEquals("flint_mv_1", indexQueryDetails.openSearchIndexName());
   }
 
   @Test
@@ -213,61 +219,86 @@ void testExtractionFromFlintMVQuery() {
       assertNull(fullyQualifiedTableName);
       assertNull(indexQueryDetails.getMvQuery());
       assertEquals("mv_1", indexQueryDetails.getMvName());
+      assertEquals("flint_mv_1", indexQueryDetails.openSearchIndexName());
     }
   }
 
   @Test
   void testDescSkippingIndex() {
     String descSkippingIndex = "DESC SKIPPING INDEX ON mys3.default.http_logs";
+    assertTrue(SQLQueryUtils.isFlintExtensionQuery(descSkippingIndex));
     IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(descSkippingIndex);
     FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    assertNull(indexDetails.getIndexName());
     assertNotNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.SKIPPING, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType());
+    assertEquals(
+        "flint_mys3_default_http_logs_skipping_index", indexDetails.openSearchIndexName());
+  }
+
+  @Test
+  void testDescCoveringIndex() {
     String descCoveringIndex = "DESC INDEX cv1 ON mys3.default.http_logs";
+    assertTrue(SQLQueryUtils.isFlintExtensionQuery(descCoveringIndex));
-    indexDetails = SQLQueryUtils.extractIndexDetails(descCoveringIndex);
-    fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(descCoveringIndex);
+    FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    assertEquals("cv1", indexDetails.getIndexName());
     assertNotNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.COVERING, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType());
+    assertEquals("flint_mys3_default_http_logs_cv1_index", indexDetails.openSearchIndexName());
+  }
+
+  @Test
+  void testDescMaterializedView() {
     String descMv = "DESC MATERIALIZED VIEW mv1";
+    assertTrue(SQLQueryUtils.isFlintExtensionQuery(descMv));
-    indexDetails = SQLQueryUtils.extractIndexDetails(descMv);
-    fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(descMv);
+    FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    assertNull(indexDetails.getIndexName());
     assertEquals("mv1", indexDetails.getMvName());
     assertNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.MATERIALIZED_VIEW, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType());
+    assertEquals("flint_mv1", indexDetails.openSearchIndexName());
   }
 
   @Test
   void testShowIndex() {
-    String showCoveringIndex = " SHOW INDEX ON myS3.default.http_logs";
+    String showCoveringIndex = "SHOW INDEX ON myS3.default.http_logs";
+    assertTrue(SQLQueryUtils.isFlintExtensionQuery(showCoveringIndex));
     IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(showCoveringIndex);
     FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    assertNull(indexDetails.getIndexName());
     assertNull(indexDetails.getMvName());
     assertNotNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.COVERING, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.SHOW, indexDetails.getIndexQueryActionType());
+    assertEquals("", indexDetails.openSearchIndexName());
+  }
+
+  @Test
+  void testShowMaterializedView() {
     String showMV = "SHOW MATERIALIZED VIEW IN my_glue.default";
+    assertTrue(SQLQueryUtils.isFlintExtensionQuery(showMV));
-    indexDetails = SQLQueryUtils.extractIndexDetails(showMV);
-    fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(showMV);
+    FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
+    assertNull(indexDetails.getIndexName());
     assertNull(indexDetails.getMvName());
     assertNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.MATERIALIZED_VIEW, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.SHOW, indexDetails.getIndexQueryActionType());
+    assertEquals("", indexDetails.openSearchIndexName());
   }
 
   @Test
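Note the contract change that the second patch (below) introduces: where patch 1 has openSearchIndexName() return an empty string for SHOW queries, patch 2 returns null instead (and also when no index type was parsed), so callers should null-check rather than test for emptiness. A minimal caller-side sketch, mirroring the null guard that patch 1 adds to BatchQueryHandler; the builder variable name here is an assumption:

    // After patch 2, null (not "") means "no backing OpenSearch index", e.g. for
    // SHOW INDEX / SHOW MATERIALIZED VIEW, or a request with no IndexQueryDetails at all.
    String indexName =
        context.getIndexQueryDetails() != null
            ? context.getIndexQueryDetails().openSearchIndexName()
            : null;
    if (indexName != null) {
      responseBuilder.indexName(indexName); // only set when a concrete Flint index is involved
    }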
From 6f440129eed00c2806da53961f67158f56cdf0b9 Mon Sep 17 00:00:00 2001
From: Tomoyuki Morita
Date: Fri, 6 Sep 2024 11:17:17 -0700
Subject: [PATCH 2/2] Fix test failure due to rebase

Signed-off-by: Tomoyuki Morita
---
 .../sql/spark/dispatcher/model/IndexQueryDetails.java     | 7 +++++++
 .../sql/spark/dispatcher/SparkQueryDispatcherTest.java    | 3 ++-
 .../org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java | 4 ++--
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
index 74118cd30a..50ce95ffe0 100644
--- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
+++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
@@ -93,6 +93,9 @@ public IndexQueryDetails build() {
   }
 
   public String openSearchIndexName() {
+    if (getIndexType() == null) {
+      return null;
+    }
     FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
     String indexName = StringUtils.EMPTY;
     switch (getIndexType()) {
@@ -105,6 +108,8 @@ public String openSearchIndexName() {
                   + strip(getIndexName(), STRIP_CHARS)
                   + "_"
                   + getIndexType().getSuffix();
+        } else {
+          return null;
         }
         break;
       case SKIPPING:
@@ -114,6 +119,8 @@ public String openSearchIndexName() {
       case MATERIALIZED_VIEW:
         if (mvName != null) { // mvName is not available for SHOW MATERIALIZED VIEW query
           indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
+        } else {
+          return null;
         }
         break;
     }
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
index 0666756d15..9f12ddf323 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
@@ -452,7 +452,8 @@ void testManualRefreshMaterializedViewQuery() {
     tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
     String query =
         "CREATE MATERIALIZED VIEW mv_1 AS select * from logs WITH" + " (auto_refresh = false)";
-    String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query);
+    String sparkSubmitParameters =
+        constructExpectedSparkSubmitParameterString(query, null, QUERY_ID);
     StartJobRequest expected =
         new StartJobRequest(
             "TEST_CLUSTER:batch",
diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
index 0680ba9c10..4608bce74e 100644
--- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
+++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
@@ -284,7 +284,7 @@ void testShowIndex() {
     assertNotNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.COVERING, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.SHOW, indexDetails.getIndexQueryActionType());
-    assertEquals("", indexDetails.openSearchIndexName());
+    assertNull(indexDetails.openSearchIndexName());
   }
 
   @Test
@@ -300,7 +300,7 @@ void testShowMaterializedView() {
     assertNull(fullyQualifiedTableName);
     assertEquals(FlintIndexType.MATERIALIZED_VIEW, indexDetails.getIndexType());
     assertEquals(IndexQueryActionType.SHOW, indexDetails.getIndexQueryActionType());
-    assertEquals("", indexDetails.openSearchIndexName());
+    assertNull(indexDetails.openSearchIndexName());
   }
 
   @Test