Skip to content

Commit

Permalink
Restrict the scope of cancel API (#2548)
Browse files Browse the repository at this point in the history
* Restric cancel the scope of cancel API

Signed-off-by: Peng Huo <[email protected]>

* Fix UT, batch query only been used for REFRESH

Signed-off-by: Peng Huo <[email protected]>

* Update style

Signed-off-by: Peng Huo <[email protected]>

* support cancel refresh query

Signed-off-by: Peng Huo <[email protected]>

* fix UT

Signed-off-by: Peng Huo <[email protected]>

* refactor code

Signed-off-by: Peng Huo <[email protected]>

* update doc

Signed-off-by: Peng Huo <[email protected]>

* refactor code

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo authored Mar 13, 2024
1 parent ec8a066 commit a84c3ef
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 19 deletions.
3 changes: 3 additions & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

Limitation: Flint index creation statement with auto_refresh = true can not be cancelled. User could submit ALTER statement to stop auto refresh query.
- flint index creation statement include, CREATE SKIPPING INDEX / CREATE INDEX / CREATE MATERIALIZED VIEW

HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``DELETE``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public CreateAsyncQueryResponse createAsyncQuery(
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId()));
dispatchQueryResponse.getSessionId(),
dispatchQueryResponse.getDatasourceName(),
dispatchQueryResponse.getJobType(),
dispatchQueryResponse.getIndexName()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,44 @@
package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.Locale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** This class models all the metadata required for a job. */
@Data
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
public static final String TYPE_JOBMETA = "jobmeta";
public static final String JOB_TYPE = "jobType";
public static final String INDEX_NAME = "indexName";

private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
private final String resultIndex;
// optional sessionId.
private final String sessionId;
// since 2.13
// jobType could be null before OpenSearch 2.12. SparkQueryDispatcher use jobType to choose
// cancel query handler. if jobType is null, it will invoke BatchQueryHandler.cancel().
private final JobType jobType;
// null if JobType is null
private final String datasourceName;
// null if JobType is INTERACTIVE or null
private final String indexName;

@EqualsAndHashCode.Exclude private final long seqNo;
@EqualsAndHashCode.Exclude private final long primaryTerm;
Expand All @@ -44,6 +58,9 @@ public AsyncQueryJobMetadata(
jobId,
resultIndex,
null,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}
Expand All @@ -60,6 +77,31 @@ public AsyncQueryJobMetadata(
jobId,
resultIndex,
sessionId,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
jobType,
indexName,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}
Expand All @@ -70,13 +112,19 @@ public AsyncQueryJobMetadata(
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName,
long seqNo,
long primaryTerm) {
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}
Expand All @@ -102,6 +150,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("applicationId", applicationId)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.field(DATASOURCE_NAME, datasourceName)
.field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT))
.field(INDEX_NAME, indexName)
.endObject();
return builder;
}
Expand All @@ -115,6 +166,9 @@ public static AsyncQueryJobMetadata copy(
copy.getJobId(),
copy.getResultIndex(),
copy.getSessionId(),
copy.datasourceName,
copy.jobType,
copy.indexName,
seqNo,
primaryTerm);
}
Expand All @@ -132,9 +186,11 @@ public static AsyncQueryJobMetadata fromXContent(
AsyncQueryId queryId = null;
String jobId = null;
String applicationId = null;
boolean isDropIndexQuery = false;
String resultIndex = null;
String sessionId = null;
String datasourceName = null;
String jobTypeStr = null;
String indexName = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
Expand All @@ -149,15 +205,18 @@ public static AsyncQueryJobMetadata fromXContent(
case "applicationId":
applicationId = parser.textOrNull();
break;
case "isDropIndexQuery":
isDropIndexQuery = parser.booleanValue();
break;
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
case DATASOURCE_NAME:
datasourceName = parser.textOrNull();
case JOB_TYPE:
jobTypeStr = parser.textOrNull();
case INDEX_NAME:
indexName = parser.textOrNull();
case "type":
break;
default:
Expand All @@ -168,7 +227,16 @@ public static AsyncQueryJobMetadata fromXContent(
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId, applicationId, jobId, resultIndex, sessionId, seqNo, primaryTerm);
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr),
indexName,
seqNo,
primaryTerm);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
StateStore stateStore,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataReader = flintIndexMetadataReader;
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return new DispatchQueryResponse(
resp.getQueryId(),
resp.getJobId(),
resp.getResultIndex(),
resp.getSessionId(),
dataSourceMetadata.getName(),
JobType.BATCH,
context.getIndexQueryDetails().openSearchIndexName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
Expand Down Expand Up @@ -90,7 +91,12 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
} else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
// manual refresh should be handled by batch handler
asyncQueryHandler =
new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
stateStore,
leaseManager);
}
}
return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build());
Expand All @@ -117,6 +123,17 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager);
} else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
queryHandler = createIndexDMLHandler(emrServerlessClient);
} else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
queryHandler =
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
stateStore,
leaseManager);
} else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
queryHandler =
new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
} else {
queryHandler =
new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
Expand All @@ -37,6 +38,13 @@ public StreamingQueryHandler(
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
throw new IllegalArgumentException(
"can't cancel index DML query, using ALTER auto_refresh=off statement to stop job, using"
+ " VACUUM statement to stop job and delete data");
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
Expand Down Expand Up @@ -77,6 +85,9 @@ public DispatchQueryResponse submit(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
dataSourceMetadata.getResultIndex(),
null);
null,
dataSourceMetadata.getName(),
JobType.STREAMING,
indexQueryDetails.openSearchIndexName());
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,37 @@
package org.opensearch.sql.spark.dispatcher.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;

@Data
@AllArgsConstructor
@Getter
public class DispatchQueryResponse {
private AsyncQueryId queryId;
private String jobId;
private String resultIndex;
private String sessionId;
private final AsyncQueryId queryId;
private final String jobId;
private final String resultIndex;
private final String sessionId;
private final String datasourceName;
private final JobType jobType;
private final String indexName;

public DispatchQueryResponse(
AsyncQueryId queryId, String jobId, String resultIndex, String sessionId) {
this(queryId, jobId, resultIndex, sessionId, null, JobType.INTERACTIVE, null);
}

public DispatchQueryResponse(
AsyncQueryId queryId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this.queryId = queryId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
}
}
Loading

0 comments on commit a84c3ef

Please sign in to comment.