Skip to content

Commit

Permalink
Call LeaseManager for BatchQuery
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Nov 13, 2024
1 parent cfe38d7 commit dba09c4
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
Expand Down Expand Up @@ -75,13 +76,22 @@ public String cancelJob(
return asyncQueryJobMetadata.getQueryId();
}

/**
* This method allows RefreshQueryHandler to override the job type when calling leaseManager.borrow.
*/
protected void borrow(String datasource) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, datasource));
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
String clusterName = dispatchQueryRequest.getClusterName();
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();

this.borrow(dispatchQueryRequest.getDatasource());

tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ public String cancelJob(
return asyncQueryJobMetadata.getQueryId();
}

@Override
protected void borrow(String datasource) {
leaseManager.borrow(new LeaseRequest(JobType.REFRESH, datasource));
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.REFRESH, dispatchQueryRequest.getDatasource()));

DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkParameterComposerCollection;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
Expand Down Expand Up @@ -137,6 +138,7 @@ public class AsyncQueryCoreIntegTest {
@Captor ArgumentCaptor<FlintIndexOptions> flintIndexOptionsArgumentCaptor;
@Captor ArgumentCaptor<StartJobRunRequest> startJobRunRequestArgumentCaptor;
@Captor ArgumentCaptor<CreateSessionRequest> createSessionRequestArgumentCaptor;
@Captor ArgumentCaptor<LeaseRequest> leaseRequestArgumentCaptor;

AsyncQueryExecutorService asyncQueryExecutorService;

Expand Down Expand Up @@ -267,7 +269,8 @@ public void createVacuumIndexQuery() {
assertEquals(SESSION_ID, response.getSessionId());
verifyGetQueryIdCalled();
verifyGetSessionIdCalled();
verify(leaseManager).borrow(any());
verify(leaseManager).borrow(leaseRequestArgumentCaptor.capture());
assertEquals(JobType.INTERACTIVE, leaseRequestArgumentCaptor.getValue().getJobType());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.INTERACTIVE);
}
Expand Down Expand Up @@ -356,11 +359,38 @@ public void createStreamingQuery() {
assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verify(leaseManager).borrow(any());
verify(leaseManager).borrow(leaseRequestArgumentCaptor.capture());
assertEquals(JobType.STREAMING, leaseRequestArgumentCaptor.getValue().getJobType());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.STREAMING);
}

@Test
public void createBatchQuery() {
givenSparkExecutionEngineConfigIsSupplied();
givenValidDataSourceMetadataExist();
when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID);
when(awsemrServerless.startJobRun(any()))
.thenReturn(new StartJobRunResult().withApplicationId(APPLICATION_ID).withJobRunId(JOB_ID));

CreateAsyncQueryResponse response =
asyncQueryExecutorService.createAsyncQuery(
new CreateAsyncQueryRequest(
"CREATE INDEX index_name ON table_name(l_orderkey, l_quantity)"
+ " WITH (auto_refresh = false)",
DATASOURCE_NAME,
LangType.SQL),
asyncQueryRequestContext);

assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verify(leaseManager).borrow(leaseRequestArgumentCaptor.capture());
assertEquals(JobType.BATCH, leaseRequestArgumentCaptor.getValue().getJobType());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.BATCH);
}

private void verifyStartJobRunCalled() {
verify(awsemrServerless).startJobRun(startJobRunRequestArgumentCaptor.capture());
StartJobRunRequest startJobRunRequest = startJobRunRequestArgumentCaptor.getValue();
Expand Down Expand Up @@ -413,7 +443,8 @@ public void createRefreshQuery() {
assertEquals(QUERY_ID, response.getQueryId());
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verify(leaseManager).borrow(any());
verify(leaseManager).borrow(leaseRequestArgumentCaptor.capture());
assertEquals(JobType.REFRESH, leaseRequestArgumentCaptor.getValue().getJobType());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.REFRESH);
}
Expand All @@ -439,7 +470,8 @@ public void createInteractiveQuery() {
assertEquals(SESSION_ID, response.getSessionId());
verifyGetQueryIdCalled();
verifyGetSessionIdCalled();
verify(leaseManager).borrow(any());
verify(leaseManager).borrow(leaseRequestArgumentCaptor.capture());
assertEquals(JobType.INTERACTIVE, leaseRequestArgumentCaptor.getValue().getJobType());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID, QueryState.WAITING, JobType.INTERACTIVE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public String description() {

@Override
public boolean test(LeaseRequest leaseRequest) {
if (leaseRequest.getJobType() == JobType.INTERACTIVE) {
if (leaseRequest.getJobType() != JobType.REFRESH && leaseRequest.getJobType() != JobType.STREAMING) {
return true;
}
return activeRefreshJobCount(stateStore, ALL_DATASOURCE).get() < refreshJobLimit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ public void concurrentSessionRuleOnlyApplyToInteractiveQuery() {
}

@Test
public void concurrentRefreshRuleOnlyNotAppliedToInteractiveQuery() {
public void concurrentRefreshRuleNotAppliedToInteractiveAndBatchQuery() {
assertTrue(
new DefaultLeaseManager.ConcurrentRefreshJobRule(settings, stateStore)
.test(new LeaseRequest(JobType.INTERACTIVE, "mys3")));
assertTrue(
new DefaultLeaseManager.ConcurrentRefreshJobRule(settings, stateStore)
.test(new LeaseRequest(JobType.BATCH, "mys3")));
}
}

0 comments on commit dba09c4

Please sign in to comment.