Skip to content

Commit

Permalink
Refactor IndexDMLHandler and related classes (#2644)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored May 6, 2024
1 parent b454a2c commit 45122ec
Show file tree
Hide file tree
Showing 20 changed files with 486 additions and 564 deletions.
6 changes: 2 additions & 4 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
Expand Down Expand Up @@ -227,8 +226,7 @@ public Collection<Object> createComponents(
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataServiceImpl.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
injector.getInstance(FlintIndexOpFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -37,8 +36,7 @@ public class ClusterManagerEventListener implements LocalNodeClusterManagerListe
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private FlintIndexOpFactory flintIndexOpFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private TimeValue streamingJobHouseKeepingInterval;
Expand All @@ -56,17 +54,15 @@ public ClusterManagerEventListener(
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
FlintIndexOpFactory flintIndexOpFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;
this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.flintIndexOpFactory = flintIndexOpFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));
this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);
Expand Down Expand Up @@ -151,10 +147,7 @@ private void initializeStreamingJobHouseKeeperCron() {
flintStreamingJobHouseKeeperCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobHouseKeeperTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
dataSourceService, flintIndexMetadataService, flintIndexOpFactory),
streamingJobHouseKeepingInterval,
executorName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobHouseKeeperTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final FlintIndexOpFactory flintIndexOpFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);
Expand Down Expand Up @@ -95,9 +91,7 @@ private void dropAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
// When the datasource is deleted. Possibly Replace with VACUUM Operation.
LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
FlintIndexOpDrop flintIndexOpDrop =
new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient());
flintIndexOpDrop.apply(flintIndexMetadata);
flintIndexOpFactory.getDrop(datasourceName).apply(flintIndexMetadata);
LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
}

Expand All @@ -106,14 +100,7 @@ private void alterAutoRefreshIndex(
LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
datasourceName,
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
flintIndexOpFactory.getAlter(flintIndexOptions, datasourceName).apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
Expand All @@ -16,24 +15,20 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Index DML query. includes * DROP * ALT? */
Expand All @@ -45,15 +40,10 @@ public class IndexDMLHandler extends AsyncQueryHandler {
public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";

private final EMRServerlessClient emrServerlessClient;

private final JobExecutionResponseReader jobExecutionResponseReader;

private final FlintIndexMetadataService flintIndexMetadataService;

private final StateStore stateStore;

private final Client client;
private final IndexDMLResultStorageService indexDMLResultStorageService;
private final FlintIndexOpFactory flintIndexOpFactory;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
Expand All @@ -67,14 +57,16 @@ public DispatchQueryResponse submit(
try {
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);

getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);

AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
startTime);
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
} catch (Exception e) {
Expand All @@ -85,7 +77,7 @@ public DispatchQueryResponse submit(
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
startTime);
getElapsedTimeSince(startTime));
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
}
Expand All @@ -96,46 +88,34 @@ private AsyncQueryId storeIndexDMLResult(
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long startTime) {
long queryRunTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
new IndexDMLResult(
asyncQueryId.getId(),
status,
error,
dispatchQueryRequest.getDatasource(),
System.currentTimeMillis() - startTime,
queryRunTime,
System.currentTimeMillis());
createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName());
return asyncQueryId;
}

private void executeIndexOp(
DispatchQueryRequest dispatchQueryRequest,
IndexQueryDetails indexQueryDetails,
FlintIndexMetadata indexMetadata) {
private long getElapsedTimeSince(long startTime) {
return System.currentTimeMillis() - startTime;
}

private FlintIndexOp getIndexOp(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
switch (indexQueryDetails.getIndexQueryActionType()) {
case DROP:
FlintIndexOp dropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
dropOp.apply(indexMetadata);
break;
return flintIndexOpFactory.getDrop(dispatchQueryRequest.getDatasource());
case ALTER:
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
indexQueryDetails.getFlintIndexOptions(),
stateStore,
dispatchQueryRequest.getDatasource(),
emrServerlessClient,
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
return flintIndexOpFactory.getAlter(
indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
case VACUUM:
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
break;
return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
default:
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
package org.opensearch.sql.spark.dispatcher;

import lombok.RequiredArgsConstructor;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

Expand All @@ -19,19 +19,19 @@ public class QueryHandlerFactory {

private final JobExecutionResponseReader jobExecutionResponseReader;
private final FlintIndexMetadataService flintIndexMetadataService;
private final Client client;
private final SessionManager sessionManager;
private final LeaseManager leaseManager;
private final StateStore stateStore;
private final IndexDMLResultStorageService indexDMLResultStorageService;
private final FlintIndexOpFactory flintIndexOpFactory;
private final EMRServerlessClientFactory emrServerlessClientFactory;

public RefreshQueryHandler getRefreshQueryHandler() {
return new RefreshQueryHandler(
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
flintIndexMetadataService,
stateStore,
leaseManager);
leaseManager,
flintIndexOpFactory);
}

public StreamingQueryHandler getStreamingQueryHandler() {
Expand All @@ -50,10 +50,9 @@ public InteractiveQueryHandler getInteractiveQueryHandler() {

public IndexDMLHandler getIndexDMLHandler() {
return new IndexDMLHandler(
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
flintIndexMetadataService,
stateStore,
client);
indexDMLResultStorageService,
flintIndexOpFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,28 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {

private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;
private final FlintIndexOpFactory flintIndexOpFactory;

public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
LeaseManager leaseManager) {
LeaseManager leaseManager,
FlintIndexOpFactory flintIndexOpFactory) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
this.flintIndexOpFactory = flintIndexOpFactory;
}

@Override
Expand All @@ -51,8 +48,7 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;

public interface IndexDMLResultStorageService {
IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName);
}
Loading

0 comments on commit 45122ec

Please sign in to comment.