Add UT

noCharger committed Sep 3, 2024
1 parent 7edf0d3 commit 6b24d2f
Showing 17 changed files with 87 additions and 113 deletions.
AsyncQueryExecutorServiceImpl.java

@@ -13,8 +13,6 @@
 import java.util.List;
 import java.util.Optional;
 import lombok.AllArgsConstructor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.json.JSONObject;
 import org.opensearch.sql.data.model.ExprValue;
 import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
@@ -37,13 +35,10 @@ public class AsyncQueryExecutorServiceImpl implements AsyncQueryExecutorService
   private SparkQueryDispatcher sparkQueryDispatcher;
   private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
 
-  private static final Logger LOGGER = LogManager.getLogger(AsyncQueryExecutorServiceImpl.class);
-
   @Override
   public CreateAsyncQueryResponse createAsyncQuery(
       CreateAsyncQueryRequest createAsyncQueryRequest,
       AsyncQueryRequestContext asyncQueryRequestContext) {
-    LOGGER.info("CreateAsyncQueryRequest: " + createAsyncQueryRequest.getQuery());
     SparkExecutionEngineConfig sparkExecutionEngineConfig =
         sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(asyncQueryRequestContext);
     DispatchQueryResponse dispatchQueryResponse =

BatchQueryHandler.java

@@ -13,8 +13,6 @@
 import com.amazonaws.services.emrserverless.model.GetJobRunResult;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.json.JSONObject;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
@@ -42,8 +40,6 @@ public class BatchQueryHandler extends AsyncQueryHandler {
   protected final MetricsService metricsService;
   protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;
 
-  private static final Logger LOGGER = LogManager.getLogger(BatchQueryHandler.class);
-
   @Override
   protected JSONObject getResponseFromResultIndex(
       AsyncQueryJobMetadata asyncQueryJobMetadata,
@@ -106,9 +102,6 @@ public DispatchQueryResponse submit(
             tags,
             false,
             dataSourceMetadata.getResultIndex());
-
-    LOGGER.info("Submit batch query: " + dispatchQueryRequest.getQuery());
-
     String jobId = emrServerlessClient.startJobRun(startJobRequest);
     metricsService.incrementNumericalMetric(EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
     return DispatchQueryResponse.builder()

QueryHandlerFactory.java

@@ -15,7 +15,6 @@
 import org.opensearch.sql.spark.metrics.MetricsService;
 import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;
-import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 @RequiredArgsConstructor
 public class QueryHandlerFactory {
@@ -28,7 +27,6 @@ public class QueryHandlerFactory {
   private final FlintIndexOpFactory flintIndexOpFactory;
   private final EMRServerlessClientFactory emrServerlessClientFactory;
   private final MetricsService metricsService;
-  private final AsyncQueryScheduler asyncQueryScheduler;
   protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;
 
   public RefreshQueryHandler getRefreshQueryHandler(String accountId) {
@@ -48,7 +46,6 @@ public StreamingQueryHandler getStreamingQueryHandler(String accountId) {
         jobExecutionResponseReader,
         leaseManager,
         metricsService,
-        asyncQueryScheduler,
         sparkSubmitParametersBuilderProvider);
   }
 

RefreshQueryHandler.java

@@ -6,8 +6,6 @@
 package org.opensearch.sql.spark.dispatcher;
 
 import java.util.Map;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
@@ -31,7 +29,6 @@
  * index, and new job is submitted to Spark.
  */
 public class RefreshQueryHandler extends BatchQueryHandler {
-  private static final Logger LOGGER = LogManager.getLogger(RefreshQueryHandler.class);
 
   private final FlintIndexMetadataService flintIndexMetadataService;
   private final FlintIndexOpFactory flintIndexOpFactory;
@@ -78,8 +75,6 @@ public DispatchQueryResponse submit(
       DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
     leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
 
-    LOGGER.info("Submit refresh query: " + dispatchQueryRequest.getQuery());
-
     DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
     DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
     return DispatchQueryResponse.builder()

StreamingQueryHandler.java

@@ -9,12 +9,12 @@
 import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
 import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT;
 
-import java.time.Instant;
 import java.util.Map;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
 import org.opensearch.sql.spark.client.EMRServerlessClient;
+import org.opensearch.sql.spark.client.StartJobRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
@@ -25,31 +25,25 @@
 import org.opensearch.sql.spark.metrics.MetricsService;
 import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;
-import org.opensearch.sql.spark.rest.model.LangType;
-import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
-import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 
 /**
  * The handler for streaming query. Streaming query is a job to continuously update flint index.
  * Once started, the job can be stopped by IndexDML query.
  */
 public class StreamingQueryHandler extends BatchQueryHandler {
-  private final AsyncQueryScheduler asyncQueryScheduler;
-
   public StreamingQueryHandler(
       EMRServerlessClient emrServerlessClient,
       JobExecutionResponseReader jobExecutionResponseReader,
       LeaseManager leaseManager,
       MetricsService metricsService,
-      AsyncQueryScheduler asyncQueryScheduler,
       SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider) {
     super(
         emrServerlessClient,
        jobExecutionResponseReader,
         leaseManager,
         metricsService,
         sparkSubmitParametersBuilderProvider);
-    this.asyncQueryScheduler = asyncQueryScheduler;
   }
 
   @Override
@@ -67,57 +61,42 @@ public DispatchQueryResponse submit(
 
     leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource()));
 
-    // String clusterName = dispatchQueryRequest.getClusterName();
+    String clusterName = dispatchQueryRequest.getClusterName();
     IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails();
     Map<String, String> tags = context.getTags();
     tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
     DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
     tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
-    // String jobName =
-    //     clusterName
-    //         + ":"
-    //         + JobType.STREAMING.getText()
-    //         + ":"
-    //         + indexQueryDetails.openSearchIndexName();
-    // StartJobRequest startJobRequest =
-    //     new StartJobRequest(
-    //         jobName,
-    //         dispatchQueryRequest.getAccountId(),
-    //         dispatchQueryRequest.getApplicationId(),
-    //         dispatchQueryRequest.getExecutionRoleARN(),
-    //         sparkSubmitParametersBuilderProvider
-    //             .getSparkSubmitParametersBuilder()
-    //             .clusterName(clusterName)
-    //             .query(dispatchQueryRequest.getQuery())
-    //             .structuredStreaming(true)
-    //             .dataSource(
-    //                 dataSourceMetadata, dispatchQueryRequest,
-    //                 context.getAsyncQueryRequestContext())
-    //             .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
-    //             .acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
-    //             .toString(),
-    //         tags,
-    //         indexQueryDetails.getFlintIndexOptions().autoRefresh(),
-    //         dataSourceMetadata.getResultIndex());
-    // String jobId = emrServerlessClient.startJobRun(startJobRequest);
-    AsyncQuerySchedulerRequest request =
-        new AsyncQuerySchedulerRequest(
-            dispatchQueryRequest.getAccountId(),
-            "flint_myglue_test_default_count_by_status",
-            dataSourceMetadata.getName(),
-            "REFRESH MATERIALIZED VIEW myglue_test.default.count_by_status",
-            LangType.SQL,
-            "1 minute",
-            true,
-            Instant.now(),
-            Instant.now(),
-            -1L,
-            0.0);
-    asyncQueryScheduler.scheduleJob(request);
+    String jobName =
+        clusterName
+            + ":"
+            + JobType.STREAMING.getText()
+            + ":"
+            + indexQueryDetails.openSearchIndexName();
+    StartJobRequest startJobRequest =
+        new StartJobRequest(
+            jobName,
+            dispatchQueryRequest.getAccountId(),
+            dispatchQueryRequest.getApplicationId(),
+            dispatchQueryRequest.getExecutionRoleARN(),
+            sparkSubmitParametersBuilderProvider
+                .getSparkSubmitParametersBuilder()
+                .clusterName(clusterName)
+                .query(dispatchQueryRequest.getQuery())
+                .structuredStreaming(true)
+                .dataSource(
+                    dataSourceMetadata, dispatchQueryRequest, context.getAsyncQueryRequestContext())
+                .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
+                .acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
+                .toString(),
+            tags,
+            indexQueryDetails.getFlintIndexOptions().autoRefresh(),
+            dataSourceMetadata.getResultIndex());
+    String jobId = emrServerlessClient.startJobRun(startJobRequest);
     metricsService.incrementNumericalMetric(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
     return DispatchQueryResponse.builder()
         .queryId(context.getQueryId())
-        .jobId("test")
+        .jobId(jobId)
         .resultIndex(dataSourceMetadata.getResultIndex())
         .datasourceName(dataSourceMetadata.getName())
         .jobType(JobType.STREAMING)
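
For context, a sketch (not part of this commit) of how the restored submit path could be unit-tested: stub EMRServerlessClient.startJobRun, let Mockito deep stubs absorb the Spark submit parameter builder chain, and assert that the EMR-S job id flows into the response. Production-class imports follow the packages shown in the diff above and are omitted for brevity; the getJobId() accessor is assumed to be a Lombok getter.

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
// Production imports (EMRServerlessClient, StartJobRequest, DispatchQueryRequest,
// DispatchQueryContext, DispatchQueryResponse, etc.) follow the packages in the diff above.

@ExtendWith(MockitoExtension.class)
class StreamingQueryHandlerSubmitSketch {
  @Mock EMRServerlessClient emrServerlessClient;
  @Mock JobExecutionResponseReader jobExecutionResponseReader;
  @Mock LeaseManager leaseManager;
  @Mock MetricsService metricsService;

  // Deep stubs absorb the fluent builder chain without spelling out every step.
  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  DispatchQueryRequest dispatchQueryRequest;

  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  DispatchQueryContext context;

  @Test
  void submitPropagatesJobIdFromEmrServerless() {
    StreamingQueryHandler handler =
        new StreamingQueryHandler(
            emrServerlessClient,
            jobExecutionResponseReader,
            leaseManager,
            metricsService,
            sparkSubmitParametersBuilderProvider);
    when(emrServerlessClient.startJobRun(any(StartJobRequest.class))).thenReturn("job-123");

    DispatchQueryResponse response = handler.submit(dispatchQueryRequest, context);

    assertEquals("job-123", response.getJobId()); // assumed Lombok getter
    verify(metricsService)
        .incrementNumericalMetric(EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
  }
}

Deep stubs keep the sketch short; the repository's actual tests may construct the submit parameter chain explicitly instead.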

@@ -161,7 +161,8 @@ public void setUp() {
             flintIndexStateModelService,
             flintIndexClient,
             flintIndexMetadataService,
-            emrServerlessClientFactory);
+            emrServerlessClientFactory,
+            asyncQueryScheduler);
     QueryHandlerFactory queryHandlerFactory =
         new QueryHandlerFactory(
             jobExecutionResponseReader,
@@ -172,7 +173,6 @@
             flintIndexOpFactory,
             emrServerlessClientFactory,
             metricsService,
-            asyncQueryScheduler,
             new SparkSubmitParametersBuilderProvider(collection));
     SparkQueryDispatcher sparkQueryDispatcher =
         new SparkQueryDispatcher(
@@ -519,6 +519,7 @@ private void givenFlintIndexMetadataExists(String indexName) {
             .appId(APPLICATION_ID)
             .jobId(JOB_ID)
             .opensearchIndexName(indexName)
+            .flintIndexOptions(new FlintIndexOptions())
             .build()));
   }
 

@@ -155,7 +155,6 @@ void setUp() {
         flintIndexOpFactory,
         emrServerlessClientFactory,
         metricsService,
-        asyncQueryScheduler,
         sparkSubmitParametersBuilderProvider);
     sparkQueryDispatcher =
         new SparkQueryDispatcher(

FlintIndexOpFactoryTest.java

@@ -17,6 +17,7 @@
 import org.opensearch.sql.spark.flint.FlintIndexClient;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
+import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 @ExtendWith(MockitoExtension.class)
 class FlintIndexOpFactoryTest {

FlintIndexOpVacuumTest.java

@@ -18,11 +18,13 @@
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
 import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
+import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
 import org.opensearch.sql.spark.flint.FlintIndexClient;
 import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexState;
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
+import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 @ExtendWith(MockitoExtension.class)
 class FlintIndexOpVacuumTest {
@@ -31,9 +33,16 @@ class FlintIndexOpVacuumTest {
   public static final String LATEST_ID = "LATEST_ID";
   public static final String INDEX_NAME = "INDEX_NAME";
   public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID =
-      FlintIndexMetadata.builder().latestId(LATEST_ID).opensearchIndexName(INDEX_NAME).build();
+      FlintIndexMetadata.builder()
+          .latestId(LATEST_ID)
+          .opensearchIndexName(INDEX_NAME)
+          .flintIndexOptions(new FlintIndexOptions())
+          .build();
   public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID =
-      FlintIndexMetadata.builder().opensearchIndexName(INDEX_NAME).build();
+      FlintIndexMetadata.builder()
+          .opensearchIndexName(INDEX_NAME)
+          .flintIndexOptions(new FlintIndexOptions())
+          .build();
   @Mock FlintIndexClient flintIndexClient;
   @Mock FlintIndexStateModelService flintIndexStateModelService;
   @Mock EMRServerlessClientFactory emrServerlessClientFactory;
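
An aside, not in the commit: these fixtures imply that every FlintIndexMetadata used by the vacuum tests now carries a FlintIndexOptions. A minimal probe of that invariant, assuming a Lombok-style getFlintIndexOptions() getter and that an empty options object reports autoRefresh() as false (both assumptions):

// Sketch only: the getter name and the default value are assumptions.
FlintIndexMetadata metadata =
    FlintIndexMetadata.builder()
        .opensearchIndexName(INDEX_NAME)
        .flintIndexOptions(new FlintIndexOptions())
        .build();
assertFalse(metadata.getFlintIndexOptions().autoRefresh());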
2 changes: 1 addition & 1 deletion async-query/build.gradle
@@ -99,7 +99,7 @@ jacocoTestCoverageVerification {
                 // ignore because XContext IOException
                 'org.opensearch.sql.spark.execution.statestore.StateStore',
                 'org.opensearch.sql.spark.rest.*',
-                'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser',
+                'org.opensearch.sql.spark.scheduler.OpenSearchScheduleQueryJobRequestParser',
                 'org.opensearch.sql.spark.transport.model.*'
             ]
             limit {

SparkExecutionEngineConfigClusterSettingLoader.java

@@ -5,8 +5,13 @@
 
 package org.opensearch.sql.spark.config;
 
+import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.opensearch.sql.common.setting.Settings;
 
 /** Load SparkExecutionEngineConfigClusterSetting from settings with privilege check. */
@@ -15,20 +20,17 @@ public class SparkExecutionEngineConfigClusterSettingLoader {
   private final Settings settings;
 
   public Optional<SparkExecutionEngineConfigClusterSetting> load() {
-    // String sparkExecutionEngineConfigSettingString =
-    //     this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
-    SparkExecutionEngineConfigClusterSetting setting =
-        new SparkExecutionEngineConfigClusterSetting("test", "test", "test", "test", "test");
-    return Optional.of(setting);
-    // if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
-    //   return Optional.of(
-    //       AccessController.doPrivileged(
-    //           (PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
-    //               () ->
-    //                   SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
-    //                       sparkExecutionEngineConfigSettingString)));
-    // } else {
-    //   return Optional.empty();
-    // }
+    String sparkExecutionEngineConfigSettingString =
+        this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
+    if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
+      return Optional.of(
+          AccessController.doPrivileged(
+              (PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
+                  () ->
+                      SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
+                          sparkExecutionEngineConfigSettingString)));
+    } else {
+      return Optional.empty();
+    }
   }
 }
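
A minimal test sketch (not part of this commit) for the restored load(): stub the cluster setting and cover both branches. Only Settings.getSettingValue and toSparkExecutionEngineConfig are taken from the code above; the JSON field names in the stubbed value are assumptions about the setting's shape.

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import org.opensearch.sql.common.setting.Settings;

class SparkExecutionEngineConfigClusterSettingLoaderSketch {

  @Test
  void loadReturnsParsedSettingWhenPresent() {
    Settings settings = mock(Settings.class);
    when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG))
        .thenReturn("{\"applicationId\":\"app\",\"executionRoleARN\":\"role\"}"); // assumed fields
    assertTrue(new SparkExecutionEngineConfigClusterSettingLoader(settings).load().isPresent());
  }

  @Test
  void loadReturnsEmptyWhenSettingIsBlank() {
    Settings settings = mock(Settings.class);
    when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG)).thenReturn("");
    assertFalse(new SparkExecutionEngineConfigClusterSettingLoader(settings).load().isPresent());
  }
}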

@@ -35,7 +35,7 @@
 import org.opensearch.index.engine.DocumentMissingException;
 import org.opensearch.index.engine.VersionConflictEngineException;
 import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
-import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob;
+import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob;
 import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
 
@@ -197,6 +197,6 @@ private void assertIndexExists() {
 
   /** Returns the job runner instance for the scheduler. */
   public static ScheduledJobRunner getJobRunner() {
-    return AsyncQueryScheduledQueryJob.getJobRunnerInstance();
+    return ScheduledAsyncQueryJob.getJobRunnerInstance();
   }
 }
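
The last two hunks only swap in the renamed runner class. For orientation, a hedged sketch of the singleton shape a ScheduledJobRunner typically takes under the OpenSearch Job Scheduler SPI; ScheduledAsyncQueryJob's actual body is not part of this diff, so the run logic below is a labeled placeholder.

import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;

public class ScheduledAsyncQueryJob implements ScheduledJobRunner {
  private static final ScheduledAsyncQueryJob INSTANCE = new ScheduledAsyncQueryJob();

  /** Job Scheduler dispatches to a shared instance, hence the singleton accessor. */
  public static ScheduledAsyncQueryJob getJobRunnerInstance() {
    return INSTANCE;
  }

  private ScheduledAsyncQueryJob() {}

  @Override
  public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
    // Placeholder: the real implementation would resolve the stored request
    // (e.g., an OpenSearchScheduleQueryJobRequest) and re-dispatch the async query.
  }
}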