From 1b1a1b5e97def94504a75ffaedbd5d72ffc9614a Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Mon, 9 Sep 2024 10:22:58 -0700 Subject: [PATCH] Add query, langType, status, error in AsyncQueryJobMetadata (#2958) * Add query, langType, status, error in AsyncQueryJobMetadata Signed-off-by: Tomoyuki Morita * Fix test Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita Signed-off-by: Tomoyuki MORITA --- .../AsyncQueryExecutorServiceImpl.java | 4 ++ .../model/AsyncQueryJobMetadata.java | 9 ++++ .../spark/asyncquery/model/QueryState.java | 41 +++++++++++++++++++ .../spark/dispatcher/BatchQueryHandler.java | 2 + .../sql/spark/dispatcher/IndexDMLHandler.java | 4 ++ .../dispatcher/InteractiveQueryHandler.java | 2 + .../spark/dispatcher/RefreshQueryHandler.java | 2 + .../dispatcher/StreamingQueryHandler.java | 2 + .../model/DispatchQueryResponse.java | 3 ++ .../asyncquery/AsyncQueryCoreIntegTest.java | 24 ++++++----- .../AsyncQueryExecutorServiceImplTest.java | 15 +++---- .../asyncquery/model/QueryStateTest.java | 28 +++++++++++++ 12 files changed, 119 insertions(+), 17 deletions(-) create mode 100644 async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/QueryState.java create mode 100644 async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/model/QueryStateTest.java diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java index 5933343ba4..0e9e128896 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java @@ -67,6 +67,10 @@ public CreateAsyncQueryResponse createAsyncQuery( .datasourceName(dispatchQueryResponse.getDatasourceName()) .jobType(dispatchQueryResponse.getJobType()) .indexName(dispatchQueryResponse.getIndexName()) + .query(createAsyncQueryRequest.getQuery()) + .langType(createAsyncQueryRequest.getLang()) + .state(dispatchQueryResponse.getStatus()) + .error(dispatchQueryResponse.getError()) .build(), asyncQueryRequestContext); return new CreateAsyncQueryResponse( diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java index 1cfab4832d..46aa8ac898 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/AsyncQueryJobMetadata.java @@ -12,6 +12,7 @@ import lombok.experimental.SuperBuilder; import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateModel; +import org.opensearch.sql.spark.rest.model.LangType; import org.opensearch.sql.utils.SerializeUtils; /** This class models all the metadata required for a job. */ @@ -35,6 +36,10 @@ public class AsyncQueryJobMetadata extends StateModel { private final String datasourceName; // null if JobType is INTERACTIVE or null private final String indexName; + private final String query; + private final LangType langType; + private final QueryState state; + private final String error; @Override public String toString() { @@ -54,6 +59,10 @@ public static AsyncQueryJobMetadata copy( .datasourceName(copy.datasourceName) .jobType(copy.jobType) .indexName(copy.indexName) + .query(copy.query) + .langType(copy.langType) + .state(copy.state) + .error(copy.error) .metadata(metadata) .build(); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/QueryState.java b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/QueryState.java new file mode 100644 index 0000000000..62bceb6637 --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/model/QueryState.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; + +@Getter +public enum QueryState { + WAITING("waiting"), + RUNNING("running"), + SUCCESS("success"), + FAILED("failed"), + TIMEOUT("timeout"), + CANCELLED("cancelled"); + + private final String state; + + QueryState(String state) { + this.state = state; + } + + private static final Map STATES = + Arrays.stream(QueryState.values()) + .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); + + public static QueryState fromString(String key) { + for (QueryState ss : QueryState.values()) { + if (ss.getState().toLowerCase(Locale.ROOT).equals(key)) { + return ss; + } + } + throw new IllegalArgumentException("Invalid query state: " + key); + } +} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 33d78b174e..5a775aa243 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -17,6 +17,7 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.asyncquery.model.QueryState; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; @@ -111,6 +112,7 @@ public DispatchQueryResponse submit( .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.BATCH) + .status(QueryState.WAITING) .indexName(getIndexName(context)) .build(); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index 7211da0941..fe848593a7 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -18,6 +18,7 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.asyncquery.model.QueryState; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; @@ -83,6 +84,7 @@ public DispatchQueryResponse submit( .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.BATCH) + .status(QueryState.SUCCESS) .build(); } catch (Exception e) { LOG.error(e.getMessage()); @@ -101,6 +103,8 @@ public DispatchQueryResponse submit( .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.BATCH) + .status(QueryState.FAILED) + .error(e.getMessage()) .build(); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 7be6809912..75912f3a7c 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -17,6 +17,7 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.asyncquery.model.QueryState; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; @@ -151,6 +152,7 @@ public DispatchQueryResponse submit( .sessionId(session.getSessionId()) .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.INTERACTIVE) + .status(QueryState.WAITING) .build(); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index cf5a0c6c59..52cd863081 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -9,6 +9,7 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.asyncquery.model.QueryState; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @@ -85,6 +86,7 @@ public DispatchQueryResponse submit( .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.REFRESH) .indexName(context.getIndexQueryDetails().openSearchIndexName()) + .status(QueryState.WAITING) .build(); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 51e245b57c..58fb5244b4 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -13,6 +13,7 @@ import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.asyncquery.model.QueryState; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; @@ -102,6 +103,7 @@ public DispatchQueryResponse submit( .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.STREAMING) .indexName(indexQueryDetails.openSearchIndexName()) + .status(QueryState.WAITING) .build(); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java index b97d9fd7b0..c484236d6e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/DispatchQueryResponse.java @@ -2,6 +2,7 @@ import lombok.Builder; import lombok.Getter; +import org.opensearch.sql.spark.asyncquery.model.QueryState; @Getter @Builder @@ -13,4 +14,6 @@ public class DispatchQueryResponse { private final String datasourceName; private final JobType jobType; private final String indexName; + private final QueryState status; + private final String error; } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index 1214935dc6..52d805dd01 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -46,6 +46,7 @@ import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata.AsyncQueryJobMetadataBuilder; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.asyncquery.model.QueryState; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.client.EmrServerlessClientImpl; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; @@ -205,7 +206,7 @@ public void createDropIndexQuery() { verifyGetQueryIdCalled(); verifyCancelJobRunCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); } @Test @@ -227,7 +228,7 @@ public void createDropIndexQueryWithScheduler() { assertNull(response.getSessionId()); verifyGetQueryIdCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); verify(asyncQueryScheduler).unscheduleJob(indexName); } @@ -255,7 +256,7 @@ public void createVacuumIndexQuery() { verifyGetSessionIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE); + verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.INTERACTIVE); } @Test @@ -286,7 +287,7 @@ public void createAlterIndexQuery() { assertFalse(flintIndexOptions.autoRefresh()); verifyCancelJobRunCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); } @Test @@ -320,7 +321,7 @@ public void createAlterIndexQueryWithScheduler() { verify(asyncQueryScheduler).unscheduleJob(indexName); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH); } @Test @@ -345,7 +346,7 @@ public void createStreamingQuery() { verifyGetQueryIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID, JobType.STREAMING); + verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.STREAMING); } private void verifyStartJobRunCalled() { @@ -380,7 +381,7 @@ public void createCreateIndexQuery() { assertNull(response.getSessionId()); verifyGetQueryIdCalled(); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH); + verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.BATCH); } @Test @@ -402,7 +403,7 @@ public void createRefreshQuery() { verifyGetQueryIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID, JobType.REFRESH); + verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.REFRESH); } @Test @@ -428,7 +429,7 @@ public void createInteractiveQuery() { verifyGetSessionIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE); + verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.INTERACTIVE); } @Test @@ -644,7 +645,7 @@ private void verifyGetSessionIdCalled() { assertEquals(APPLICATION_ID, createSessionRequest.getApplicationId()); } - private void verifyStoreJobMetadataCalled(String jobId, JobType jobType) { + private void verifyStoreJobMetadataCalled(String jobId, QueryState state, JobType jobType) { verify(asyncQueryJobMetadataStorageService) .storeJobMetadata( asyncQueryJobMetadataArgumentCaptor.capture(), eq(asyncQueryRequestContext)); @@ -652,6 +653,9 @@ private void verifyStoreJobMetadataCalled(String jobId, JobType jobType) { assertEquals(QUERY_ID, asyncQueryJobMetadata.getQueryId()); assertEquals(jobId, asyncQueryJobMetadata.getJobId()); assertEquals(DATASOURCE_NAME, asyncQueryJobMetadata.getDatasourceName()); + assertNull(asyncQueryJobMetadata.getError()); + assertEquals(LangType.SQL, asyncQueryJobMetadata.getLangType()); + assertEquals(state, asyncQueryJobMetadata.getState()); assertEquals(jobType, asyncQueryJobMetadata.getJobType()); } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java index 1491f0bd61..73850db83d 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java @@ -47,6 +47,9 @@ @ExtendWith(MockitoExtension.class) public class AsyncQueryExecutorServiceImplTest { + private static final String QUERY = "select * from my_glue.default.http_logs"; + private static final String QUERY_ID = "QUERY_ID"; + @Mock private SparkQueryDispatcher sparkQueryDispatcher; @Mock private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService; private AsyncQueryExecutorService jobExecutorService; @@ -54,7 +57,6 @@ public class AsyncQueryExecutorServiceImplTest { @Mock private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier; @Mock private SparkSubmitParameterModifier sparkSubmitParameterModifier; @Mock private AsyncQueryRequestContext asyncQueryRequestContext; - private final String QUERY_ID = "QUERY_ID"; @BeforeEach void setUp() { @@ -68,8 +70,7 @@ void setUp() { @Test void testCreateAsyncQuery() { CreateAsyncQueryRequest createAsyncQueryRequest = - new CreateAsyncQueryRequest( - "select * from my_glue.default.http_logs", "my_glue", LangType.SQL); + new CreateAsyncQueryRequest(QUERY, "my_glue", LangType.SQL); when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())) .thenReturn( SparkExecutionEngineConfig.builder() @@ -82,7 +83,7 @@ void testCreateAsyncQuery() { DispatchQueryRequest expectedDispatchQueryRequest = DispatchQueryRequest.builder() .applicationId(EMRS_APPLICATION_ID) - .query("select * from my_glue.default.http_logs") + .query(QUERY) .datasource("my_glue") .langType(LangType.SQL) .executionRoleARN(EMRS_EXECUTION_ROLE) @@ -134,9 +135,7 @@ void testCreateAsyncQueryWithExtraSparkSubmitParameter() { .build()); jobExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - "select * from my_glue.default.http_logs", "my_glue", LangType.SQL), - asyncQueryRequestContext); + new CreateAsyncQueryRequest(QUERY, "my_glue", LangType.SQL), asyncQueryRequestContext); verify(sparkQueryDispatcher, times(1)) .dispatch( @@ -237,6 +236,8 @@ private AsyncQueryJobMetadata getAsyncQueryJobMetadata() { .queryId(QUERY_ID) .applicationId(EMRS_APPLICATION_ID) .jobId(EMR_JOB_ID) + .query(QUERY) + .langType(LangType.SQL) .build(); } } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/model/QueryStateTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/model/QueryStateTest.java new file mode 100644 index 0000000000..8e86e3b176 --- /dev/null +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/model/QueryStateTest.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +class QueryStateTest { + @Test + public void testFromString() { + assertEquals(QueryState.WAITING, QueryState.fromString("waiting")); + assertEquals(QueryState.RUNNING, QueryState.fromString("running")); + assertEquals(QueryState.SUCCESS, QueryState.fromString("success")); + assertEquals(QueryState.FAILED, QueryState.fromString("failed")); + assertEquals(QueryState.CANCELLED, QueryState.fromString("cancelled")); + assertEquals(QueryState.TIMEOUT, QueryState.fromString("timeout")); + } + + @Test + public void testFromStringWithUnknownState() { + assertThrows(IllegalArgumentException.class, () -> QueryState.fromString("UNKNOWN_STATE")); + } +}