diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index 9ded9bec9b..237ce9c7f6 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -16,6 +16,7 @@ public interface JobExecutionResponseReader { * * @param asyncQueryJobMetadata metadata will have jobId and resultLocation and other required * params. + * @param asyncQueryRequestContext request context passed to AsyncQueryExecutorService * @return A JSONObject containing the result data. */ JSONObject getResultFromResultIndex( diff --git a/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java b/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java index 537165a026..c969a3a6dc 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java @@ -34,12 +34,17 @@ public OpenSearchJobExecutionResponseReader(Client client) { } @Override - public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) { - return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, asyncQueryJobMetadata.getJobId()), asyncQueryJobMetadata.getResultIndex()); + public JSONObject getResultFromResultIndex( + AsyncQueryJobMetadata asyncQueryJobMetadata, + AsyncQueryRequestContext asyncQueryRequestContext) { + return searchInSparkIndex( + QueryBuilders.termQuery(JOB_ID_FIELD, asyncQueryJobMetadata.getJobId()), + asyncQueryJobMetadata.getResultIndex()); } @Override - public JSONObject getResultWithQueryId(String queryId, String resultLocation, AsyncQueryRequestContext asyncQueryRequestContext) { + public JSONObject getResultWithQueryId( + String queryId, String resultLocation, AsyncQueryRequestContext asyncQueryRequestContext) { return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultLocation); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index b094b33054..ef98e955f6 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -429,13 +429,22 @@ private class AssertionHelper { */ new JobExecutionResponseReader() { @Override - public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) { - return interaction.interact(new InteractionStep(emrClient, asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex())); + public JSONObject getResultFromResultIndex( + AsyncQueryJobMetadata asyncQueryJobMetadata, + AsyncQueryRequestContext asyncQueryRequestContext) { + return interaction.interact( + new InteractionStep( + emrClient, + asyncQueryJobMetadata.getJobId(), + asyncQueryJobMetadata.getResultIndex())); } @Override - public JSONObject getResultWithQueryId(String queryId, String resultIndex, AsyncQueryRequestContext asyncQueryRequestContext) { - return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex)); + public JSONObject getResultWithQueryId( + String queryId, + String resultIndex, + AsyncQueryRequestContext asyncQueryRequestContext) { + return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex)); } }); this.createQueryResponse = @@ -502,7 +511,7 @@ private InteractionStep(LocalEMRSClient emrClient, String queryId, String result /** Simulate PPL plugin search query_execution_result */ JSONObject pluginSearchQueryResult() { return new OpenSearchJobExecutionResponseReader(client) - .getResultWithQueryId(queryId, resultIndex, null); + .getResultWithQueryId(queryId, resultIndex, null); } /** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */ diff --git a/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java b/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java index c0ee122948..4de3a56dd9 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java @@ -51,7 +51,11 @@ public void testGetResultFromOpensearchIndex() { new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - assertFalse(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null).isEmpty()); + assertFalse( + jobExecutionResponseReader + .getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null) + .isEmpty()); } @Test @@ -65,7 +69,11 @@ public void testGetResultFromCustomIndex() { new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - assertFalse(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null).isEmpty()); + assertFalse( + jobExecutionResponseReader + .getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null) + .isEmpty()); } @Test @@ -77,7 +85,9 @@ public void testInvalidSearchResponse() { RuntimeException exception = assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); + () -> + jobExecutionResponseReader.getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); Assertions.assertEquals( "Fetching result from " @@ -93,13 +103,18 @@ public void testSearchFailure() { assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); + () -> + jobExecutionResponseReader.getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); } @Test public void testIndexNotFoundException() { when(client.search(any())).thenThrow(IndexNotFoundException.class); - - assertTrue(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null).isEmpty()); + assertTrue( + jobExecutionResponseReader + .getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null) + .isEmpty()); } }