Skip to content

Commit

Permalink
Fix integ test
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Jun 28, 2024
1 parent 5dac199 commit f2f0cf5
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -77,6 +77,8 @@
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkParameterComposerCollection;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
Expand Down Expand Up @@ -110,6 +112,7 @@ public class AsyncQueryCoreIntegTest {
@Mock FlintIndexClient flintIndexClient;
@Mock AsyncQueryRequestContext asyncQueryRequestContext;
@Mock MetricsService metricsService;
@Mock SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

// storage services
@Mock AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService;
Expand All @@ -134,6 +137,16 @@ public class AsyncQueryCoreIntegTest {
public void setUp() {
emrServerlessClientFactory =
() -> new EmrServerlessClientImpl(awsemrServerless, metricsService);
SparkParameterComposerCollection collection = new SparkParameterComposerCollection();
collection.register(
DataSourceType.S3GLUE,
(dataSourceMetadata, sparkSubmitParameters, dispatchQueryRequest, context) ->
sparkSubmitParameters.setConfigItem(
"key.from.datasource.composer", "value.from.datasource.composer"));
collection.register(
(sparkSubmitParameters, dispatchQueryRequest, context) ->
sparkSubmitParameters.setConfigItem(
"key.from.generic.composer", "value.from.generic.composer"));
SessionManager sessionManager =
new SessionManager(
sessionStorageService,
Expand All @@ -156,7 +169,8 @@ public void setUp() {
indexDMLResultStorageService,
flintIndexOpFactory,
emrServerlessClientFactory,
metricsService);
metricsService,
new SparkSubmitParametersBuilderProvider(collection));
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
dataSourceService, sessionManager, queryHandlerFactory, queryIdProvider);
Expand Down Expand Up @@ -272,7 +286,11 @@ private void verifyStartJobRunCalled() {
verify(awsemrServerless).startJobRun(startJobRunRequestArgumentCaptor.capture());
StartJobRunRequest startJobRunRequest = startJobRunRequestArgumentCaptor.getValue();
assertEquals(APPLICATION_ID, startJobRunRequest.getApplicationId());
assertNotNull(startJobRunRequest.getJobDriver().getSparkSubmit().getSparkSubmitParameters());
String submitParameters =
startJobRunRequest.getJobDriver().getSparkSubmit().getSparkSubmitParameters();
assertTrue(
submitParameters.contains("key.from.datasource.composer=value.from.datasource.composer"));
assertTrue(submitParameters.contains("key.from.generic.composer=value.from.generic.composer"));
}

@Test
Expand Down

0 comments on commit f2f0cf5

Please sign in to comment.