Skip to content

Commit

Permalink
Add UT
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 3, 2024
1 parent 7edf0d3 commit 5ab5fe5
Show file tree
Hide file tree
Showing 16 changed files with 76 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import java.util.List;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
Expand All @@ -37,13 +35,10 @@ public class AsyncQueryExecutorServiceImpl implements AsyncQueryExecutorService
private SparkQueryDispatcher sparkQueryDispatcher;
private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;

private static final Logger LOGGER = LogManager.getLogger(AsyncQueryExecutorServiceImpl.class);

@Override
public CreateAsyncQueryResponse createAsyncQuery(
CreateAsyncQueryRequest createAsyncQueryRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
LOGGER.info("CreateAsyncQueryRequest: " + createAsyncQueryRequest.getQuery());
SparkExecutionEngineConfig sparkExecutionEngineConfig =
sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(asyncQueryRequestContext);
DispatchQueryResponse dispatchQueryResponse =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
Expand Down Expand Up @@ -42,8 +40,6 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected final MetricsService metricsService;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

private static final Logger LOGGER = LogManager.getLogger(BatchQueryHandler.class);

@Override
protected JSONObject getResponseFromResultIndex(
AsyncQueryJobMetadata asyncQueryJobMetadata,
Expand Down Expand Up @@ -106,9 +102,6 @@ public DispatchQueryResponse submit(
tags,
false,
dataSourceMetadata.getResultIndex());

LOGGER.info("Submit batch query: " + dispatchQueryRequest.getQuery());

String jobId = emrServerlessClient.startJobRun(startJobRequest);
metricsService.incrementNumericalMetric(EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
return DispatchQueryResponse.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

@RequiredArgsConstructor
public class QueryHandlerFactory {
Expand All @@ -28,7 +27,6 @@ public class QueryHandlerFactory {
private final FlintIndexOpFactory flintIndexOpFactory;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final MetricsService metricsService;
private final AsyncQueryScheduler asyncQueryScheduler;
protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

public RefreshQueryHandler getRefreshQueryHandler(String accountId) {
Expand All @@ -48,7 +46,6 @@ public StreamingQueryHandler getStreamingQueryHandler(String accountId) {
jobExecutionResponseReader,
leaseManager,
metricsService,
asyncQueryScheduler,
sparkSubmitParametersBuilderProvider);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
package org.opensearch.sql.spark.dispatcher;

import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
Expand All @@ -31,7 +29,6 @@
* index, and new job is submitted to Spark.
*/
public class RefreshQueryHandler extends BatchQueryHandler {
private static final Logger LOGGER = LogManager.getLogger(RefreshQueryHandler.class);

private final FlintIndexMetadataService flintIndexMetadataService;
private final FlintIndexOpFactory flintIndexOpFactory;
Expand Down Expand Up @@ -78,8 +75,6 @@ public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));

LOGGER.info("Submit refresh query: " + dispatchQueryRequest.getQuery());

DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return DispatchQueryResponse.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT;

import java.time.Instant;
import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand All @@ -25,31 +25,24 @@
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/**
* The handler for streaming queries. A streaming query is a job that continuously updates a Flint
* index. Once started, the job can be stopped by an IndexDML query.
*/
public class StreamingQueryHandler extends BatchQueryHandler {
private final AsyncQueryScheduler asyncQueryScheduler;

public StreamingQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
LeaseManager leaseManager,
MetricsService metricsService,
AsyncQueryScheduler asyncQueryScheduler,
SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider) {
super(
emrServerlessClient,
jobExecutionResponseReader,
leaseManager,
metricsService,
sparkSubmitParametersBuilderProvider);
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -67,53 +60,38 @@ public DispatchQueryResponse submit(

leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource()));

// String clusterName = dispatchQueryRequest.getClusterName();
String clusterName = dispatchQueryRequest.getClusterName();
IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails();
Map<String, String> tags = context.getTags();
tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
// String jobName =
// clusterName
// + ":"
// + JobType.STREAMING.getText()
// + ":"
// + indexQueryDetails.openSearchIndexName();
// StartJobRequest startJobRequest =
// new StartJobRequest(
// jobName,
// dispatchQueryRequest.getAccountId(),
// dispatchQueryRequest.getApplicationId(),
// dispatchQueryRequest.getExecutionRoleARN(),
// sparkSubmitParametersBuilderProvider
// .getSparkSubmitParametersBuilder()
// .clusterName(clusterName)
// .query(dispatchQueryRequest.getQuery())
// .structuredStreaming(true)
// .dataSource(
// dataSourceMetadata, dispatchQueryRequest,
// context.getAsyncQueryRequestContext())
// .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
// .acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
// .toString(),
// tags,
// indexQueryDetails.getFlintIndexOptions().autoRefresh(),
// dataSourceMetadata.getResultIndex());
// String jobId = emrServerlessClient.startJobRun(startJobRequest);
AsyncQuerySchedulerRequest request =
new AsyncQuerySchedulerRequest(
String jobName =
clusterName
+ ":"
+ JobType.STREAMING.getText()
+ ":"
+ indexQueryDetails.openSearchIndexName();
StartJobRequest startJobRequest =
new StartJobRequest(
jobName,
dispatchQueryRequest.getAccountId(),
"flint_myglue_test_default_count_by_status",
dataSourceMetadata.getName(),
"REFRESH MATERIALIZED VIEW myglue_test.default.count_by_status",
LangType.SQL,
"1 minute",
true,
Instant.now(),
Instant.now(),
-1L,
0.0);
asyncQueryScheduler.scheduleJob(request);
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
sparkSubmitParametersBuilderProvider
.getSparkSubmitParametersBuilder()
.clusterName(clusterName)
.query(dispatchQueryRequest.getQuery())
.structuredStreaming(true)
.dataSource(
dataSourceMetadata, dispatchQueryRequest, context.getAsyncQueryRequestContext())
.acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
.acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
.toString(),
tags,
indexQueryDetails.getFlintIndexOptions().autoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
metricsService.incrementNumericalMetric(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public void setUp() {
flintIndexStateModelService,
flintIndexClient,
flintIndexMetadataService,
emrServerlessClientFactory);
emrServerlessClientFactory,
asyncQueryScheduler);
QueryHandlerFactory queryHandlerFactory =
new QueryHandlerFactory(
jobExecutionResponseReader,
Expand All @@ -172,7 +173,6 @@ public void setUp() {
flintIndexOpFactory,
emrServerlessClientFactory,
metricsService,
asyncQueryScheduler,
new SparkSubmitParametersBuilderProvider(collection));
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
Expand Down Expand Up @@ -519,6 +519,7 @@ private void givenFlintIndexMetadataExists(String indexName) {
.appId(APPLICATION_ID)
.jobId(JOB_ID)
.opensearchIndexName(indexName)
.flintIndexOptions(new FlintIndexOptions())
.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

@ExtendWith(MockitoExtension.class)
class FlintIndexOpFactoryTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

@ExtendWith(MockitoExtension.class)
class FlintIndexOpVacuumTest {
Expand Down
2 changes: 1 addition & 1 deletion async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jacocoTestCoverageVerification {
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser',
'org.opensearch.sql.spark.scheduler.OpenSearchScheduleQueryJobRequestParser',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@

package org.opensearch.sql.spark.config;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.common.setting.Settings;

/**
* Loads SparkExecutionEngineConfigClusterSetting from settings, parsing the setting value inside a
* privileged action.
*/
Expand All @@ -15,20 +20,17 @@ public class SparkExecutionEngineConfigClusterSettingLoader {
private final Settings settings;

public Optional<SparkExecutionEngineConfigClusterSetting> load() {
// String sparkExecutionEngineConfigSettingString =
// this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
SparkExecutionEngineConfigClusterSetting setting =
new SparkExecutionEngineConfigClusterSetting("test", "test", "test", "test", "test");
return Optional.of(setting);
// if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
// return Optional.of(
// AccessController.doPrivileged(
// (PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
// () ->
// SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
// sparkExecutionEngineConfigSettingString)));
// } else {
// return Optional.empty();
// }
String sparkExecutionEngineConfigSettingString =
this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
return Optional.of(
AccessController.doPrivileged(
(PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
() ->
SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigSettingString)));
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;

Expand Down Expand Up @@ -197,6 +197,6 @@ private void assertIndexExists() {

/** Returns the job runner instance for the scheduler. */
public static ScheduledJobRunner getJobRunner() {
return AsyncQueryScheduledQueryJob.getJobRunnerInstance();
return ScheduledAsyncQueryJob.getJobRunnerInstance();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.threadpool.ThreadPool;

/**
* The job runner class for scheduling refresh index query.
* The job runner class for scheduling async queries.
*
* <p>The job runner should be a singleton class if it uses OpenSearch client or other objects
* passed from OpenSearch. Because when registering the job runner to JobScheduler plugin,
Expand All @@ -33,12 +33,12 @@
* and using singleton job runner to ensure we register a usable job runner instance to JobScheduler
* plugin.
*/
public class AsyncQueryScheduledQueryJob implements ScheduledJobRunner {
private static final Logger LOGGER = LogManager.getLogger(AsyncQueryScheduledQueryJob.class);
public class ScheduledAsyncQueryJob implements ScheduledJobRunner {
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJob.class);

public static AsyncQueryScheduledQueryJob INSTANCE = new AsyncQueryScheduledQueryJob();
public static ScheduledAsyncQueryJob INSTANCE = new ScheduledAsyncQueryJob();

public static AsyncQueryScheduledQueryJob getJobRunnerInstance() {
public static ScheduledAsyncQueryJob getJobRunnerInstance() {
return INSTANCE;
}

Expand All @@ -47,7 +47,7 @@ public static AsyncQueryScheduledQueryJob getJobRunnerInstance() {
private Client client;
private AsyncQueryExecutorService asyncQueryExecutorService;

private AsyncQueryScheduledQueryJob() {
private ScheduledAsyncQueryJob() {
// Singleton class, use getJobRunnerInstance method instead of constructor
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ public QueryHandlerFactory queryhandlerFactory(
FlintIndexOpFactory flintIndexOpFactory,
EMRServerlessClientFactory emrServerlessClientFactory,
MetricsService metricsService,
AsyncQueryScheduler asyncQueryScheduler,
SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider) {
return new QueryHandlerFactory(
openSearchJobExecutionResponseReader,
Expand All @@ -131,7 +130,6 @@ public QueryHandlerFactory queryhandlerFactory(
flintIndexOpFactory,
emrServerlessClientFactory,
metricsService,
asyncQueryScheduler,
sparkSubmitParametersBuilderProvider);
}

Expand Down
Loading

0 comments on commit 5ab5fe5

Please sign in to comment.