Skip to content

Commit

Permalink
Fix handler for existing query
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Sep 4, 2024
1 parent c13f770 commit f83351a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public String cancelJob(
@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
leaseManager.borrow(new LeaseRequest(JobType.REFRESH, dispatchQueryRequest.getDatasource()));

DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand All @@ -83,7 +83,7 @@ public DispatchQueryResponse submit(
.resultIndex(resp.getResultIndex())
.sessionId(resp.getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.BATCH)
.jobType(JobType.REFRESH)
.indexName(context.getIndexQueryDetails().openSearchIndexName())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private AsyncQueryHandler getAsyncQueryHandlerForExistingQuery(
return queryHandlerFactory.getInteractiveQueryHandler();
} else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
return queryHandlerFactory.getIndexDMLHandler();
} else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
} else if (asyncQueryJobMetadata.getJobType() == JobType.REFRESH) {
return queryHandlerFactory.getRefreshQueryHandler(asyncQueryJobMetadata.getAccountId());
} else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
return queryHandlerFactory.getStreamingQueryHandler(asyncQueryJobMetadata.getAccountId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
public enum JobType {
INTERACTIVE("interactive"),
STREAMING("streaming"),
REFRESH("refresh"),
BATCH("batch");

private String text;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ void testDispatchCreateManualRefreshIndexQuery() {
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertEquals(JobType.BATCH, dispatchQueryResponse.getJobType());
verifyNoInteractions(flintIndexMetadataService);
}

Expand Down Expand Up @@ -661,6 +662,7 @@ void testRefreshIndexQuery() {
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue());
Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
Assertions.assertEquals(JobType.REFRESH, dispatchQueryResponse.getJobType());
verifyNoInteractions(flintIndexMetadataService);
}

Expand Down Expand Up @@ -831,12 +833,7 @@ void testDispatchWithUnSupportedDataSourceType() {

@Test
void testCancelJob() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false))
.thenReturn(
new CancelJobRunResult()
.withJobRunId(EMR_JOB_ID)
.withApplicationId(EMRS_APPLICATION_ID));
givenCancelJobRunSucceed();

String queryId =
sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);
Expand Down Expand Up @@ -897,17 +894,32 @@ void testCancelQueryWithInvalidStatementId() {

@Test
void testCancelQueryWithNoSessionId() {
givenCancelJobRunSucceed();

String queryId =
sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);

Assertions.assertEquals(QUERY_ID, queryId);
}

@Test
void testCancelBatchJob() {
givenCancelJobRunSucceed();

String queryId =
sparkQueryDispatcher.cancelJob(
asyncQueryJobMetadata(JobType.BATCH), asyncQueryRequestContext);

Assertions.assertEquals(QUERY_ID, queryId);
}

private void givenCancelJobRunSucceed() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false))
.thenReturn(
new CancelJobRunResult()
.withJobRunId(EMR_JOB_ID)
.withApplicationId(EMRS_APPLICATION_ID));

String queryId =
sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);

Assertions.assertEquals(QUERY_ID, queryId);
}

@Test
Expand Down Expand Up @@ -1154,11 +1166,16 @@ private DispatchQueryRequest dispatchQueryRequestWithSessionId(String query, Str
}

private AsyncQueryJobMetadata asyncQueryJobMetadata() {
return asyncQueryJobMetadata(JobType.INTERACTIVE);
}

private AsyncQueryJobMetadata asyncQueryJobMetadata(JobType jobType) {
return AsyncQueryJobMetadata.builder()
.queryId(QUERY_ID)
.applicationId(EMRS_APPLICATION_ID)
.jobId(EMR_JOB_ID)
.datasourceName(MY_GLUE)
.jobType(jobType)
.build();
}

Expand Down

0 comments on commit f83351a

Please sign in to comment.