From 2024867f03cd90e14285e95be30792bb5fbb17a5 Mon Sep 17 00:00:00 2001 From: AMIT YADAV Date: Wed, 9 Oct 2024 16:15:51 +0530 Subject: [PATCH] Updating JobExecutionResponseReader interface to add RequestContext --- .../response/JobExecutionResponseReader.java | 1 + .../OpenSearchJobExecutionResponseReader.java | 11 ++++++-- .../AsyncQueryGetResultSpecTest.java | 21 +++++++++++++- ...nSearchJobExecutionResponseReaderTest.java | 28 +++++++++++++++---- 4 files changed, 51 insertions(+), 10 deletions(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index 9ded9bec9b..237ce9c7f6 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -16,6 +16,7 @@ public interface JobExecutionResponseReader { * * @param asyncQueryJobMetadata metadata will have jobId and resultLocation and other required * params. + * @param asyncQueryRequestContext request context passed to AsyncQueryExecutorService * @return A JSONObject containing the result data. */ JSONObject getResultFromResultIndex( diff --git a/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java b/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java index 537165a026..c969a3a6dc 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReader.java @@ -34,12 +34,17 @@ public OpenSearchJobExecutionResponseReader(Client client) { } @Override - public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) { - return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, asyncQueryJobMetadata.getJobId()), asyncQueryJobMetadata.getResultIndex()); + public JSONObject getResultFromResultIndex( + AsyncQueryJobMetadata asyncQueryJobMetadata, + AsyncQueryRequestContext asyncQueryRequestContext) { + return searchInSparkIndex( + QueryBuilders.termQuery(JOB_ID_FIELD, asyncQueryJobMetadata.getJobId()), + asyncQueryJobMetadata.getResultIndex()); } @Override - public JSONObject getResultWithQueryId(String queryId, String resultLocation, AsyncQueryRequestContext asyncQueryRequestContext) { + public JSONObject getResultWithQueryId( + String queryId, String resultLocation, AsyncQueryRequestContext asyncQueryRequestContext) { return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultLocation); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index b094b33054..7d6bd3f016 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -429,6 +429,7 @@ private class AssertionHelper { */ new JobExecutionResponseReader() { @Override +<<<<<<< HEAD public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) { return interaction.interact(new InteractionStep(emrClient, asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex())); } @@ -436,6 +437,24 @@ public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMe @Override public JSONObject getResultWithQueryId(String queryId, String resultIndex, AsyncQueryRequestContext asyncQueryRequestContext) { return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex)); +======= + public JSONObject getResultFromResultIndex( + AsyncQueryJobMetadata asyncQueryJobMetadata, + AsyncQueryRequestContext asyncQueryRequestContext) { + return interaction.interact( + new InteractionStep( + emrClient, + asyncQueryJobMetadata.getJobId(), + asyncQueryJobMetadata.getResultIndex())); + } + + @Override + public JSONObject getResultWithQueryId( + String queryId, + String resultIndex, + AsyncQueryRequestContext asyncQueryRequestContext) { + return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex)); +>>>>>>> 0a112a13 (Updating JobExecutionResponseReader interface to add RequestContext) } }); this.createQueryResponse = @@ -502,7 +521,7 @@ private InteractionStep(LocalEMRSClient emrClient, String queryId, String result /** Simulate PPL plugin search query_execution_result */ JSONObject pluginSearchQueryResult() { return new OpenSearchJobExecutionResponseReader(client) - .getResultWithQueryId(queryId, resultIndex, null); + .getResultWithQueryId(queryId, resultIndex, null); } /** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */ diff --git a/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java b/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java index c0ee122948..0791d0c51c 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/response/OpenSearchJobExecutionResponseReaderTest.java @@ -51,7 +51,11 @@ public void testGetResultFromOpensearchIndex() { new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - assertFalse(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null).isEmpty()); + assertFalse( + jobExecutionResponseReader + .getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null) + .isEmpty()); } @Test @@ -65,7 +69,12 @@ public void testGetResultFromCustomIndex() { new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F)); Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID)); - assertFalse(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null).isEmpty()); + + assertFalse( + jobExecutionResponseReader + .getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null) + .isEmpty()); } @Test @@ -77,7 +86,9 @@ public void testInvalidSearchResponse() { RuntimeException exception = assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); + () -> + jobExecutionResponseReader.getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); Assertions.assertEquals( "Fetching result from " @@ -93,13 +104,18 @@ public void testSearchFailure() { assertThrows( RuntimeException.class, - () -> jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); + () -> + jobExecutionResponseReader.getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)); } @Test public void testIndexNotFoundException() { when(client.search(any())).thenThrow(IndexNotFoundException.class); - - assertTrue(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null).isEmpty()); + assertTrue( + jobExecutionResponseReader + .getResultFromResultIndex( + AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null) + .isEmpty()); } }