Skip to content

Commit

Permalink
Updating JobExecutionResponseReader interface to add RequestContext
Browse files Browse the repository at this point in the history
Signed-off-by: AMIT YADAV <[email protected]>
  • Loading branch information
AMIT YADAV committed Oct 13, 2024
1 parent 2683640 commit fccc556
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public interface JobExecutionResponseReader {
*
* @param asyncQueryJobMetadata metadata will have jobId and resultLocation and other required
* params.
* @param asyncQueryRequestContext request context passed to AsyncQueryExecutorService
* @return A JSONObject containing the result data.
*/
JSONObject getResultFromResultIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ public OpenSearchJobExecutionResponseReader(Client client) {
}

@Override
public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) {
return searchInSparkIndex(QueryBuilders.termQuery(JOB_ID_FIELD, asyncQueryJobMetadata.getJobId()), asyncQueryJobMetadata.getResultIndex());
public JSONObject getResultFromResultIndex(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
return searchInSparkIndex(
QueryBuilders.termQuery(JOB_ID_FIELD, asyncQueryJobMetadata.getJobId()),
asyncQueryJobMetadata.getResultIndex());
}

@Override
public JSONObject getResultWithQueryId(String queryId, String resultLocation, AsyncQueryRequestContext asyncQueryRequestContext) {
public JSONObject getResultWithQueryId(
String queryId, String resultLocation, AsyncQueryRequestContext asyncQueryRequestContext) {
return searchInSparkIndex(QueryBuilders.termQuery("queryId", queryId), resultLocation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,22 @@ private class AssertionHelper {
*/
new JobExecutionResponseReader() {
@Override
public JSONObject getResultFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) {
return interaction.interact(new InteractionStep(emrClient, asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex()));
public JSONObject getResultFromResultIndex(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
return interaction.interact(
new InteractionStep(
emrClient,
asyncQueryJobMetadata.getJobId(),
asyncQueryJobMetadata.getResultIndex()));
}

@Override
public JSONObject getResultWithQueryId(String queryId, String resultIndex, AsyncQueryRequestContext asyncQueryRequestContext) {
return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex));
public JSONObject getResultWithQueryId(
String queryId,
String resultIndex,
AsyncQueryRequestContext asyncQueryRequestContext) {
return interaction.interact(new InteractionStep(emrClient, queryId, resultIndex));
}
});
this.createQueryResponse =
Expand Down Expand Up @@ -502,7 +511,7 @@ private InteractionStep(LocalEMRSClient emrClient, String queryId, String result
/** Simulate PPL plugin search query_execution_result */
JSONObject pluginSearchQueryResult() {
return new OpenSearchJobExecutionResponseReader(client)
.getResultWithQueryId(queryId, resultIndex, null);
.getResultWithQueryId(queryId, resultIndex, null);
}

/** Simulate EMR-S bulk writes query_execution_result with refresh = wait_for */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ public void testGetResultFromOpensearchIndex() {
new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID));

assertFalse(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null).isEmpty());
assertFalse(
jobExecutionResponseReader
.getResultFromResultIndex(
AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null)
.isEmpty());
}

@Test
Expand All @@ -65,7 +69,11 @@ public void testGetResultFromCustomIndex() {
new SearchHit[] {searchHit}, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
Mockito.when(searchHit.getSourceAsMap()).thenReturn(Map.of("stepId", EMR_JOB_ID));

assertFalse(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null).isEmpty());
assertFalse(
jobExecutionResponseReader
.getResultFromResultIndex(
AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null)
.isEmpty());
}

@Test
Expand All @@ -77,7 +85,9 @@ public void testInvalidSearchResponse() {
RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null));
() ->
jobExecutionResponseReader.getResultFromResultIndex(
AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null));

Assertions.assertEquals(
"Fetching result from "
Expand All @@ -93,13 +103,18 @@ public void testSearchFailure() {

assertThrows(
RuntimeException.class,
() -> jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null));
() ->
jobExecutionResponseReader.getResultFromResultIndex(
AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).build(), null));
}

@Test
public void testIndexNotFoundException() {
when(client.search(any())).thenThrow(IndexNotFoundException.class);

assertTrue(jobExecutionResponseReader.getResultFromResultIndex(AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null).isEmpty());
assertTrue(
jobExecutionResponseReader
.getResultFromResultIndex(
AsyncQueryJobMetadata.builder().jobId(EMR_JOB_ID).resultIndex("foo").build(), null)
.isEmpty());
}
}

0 comments on commit fccc556

Please sign in to comment.