Restrict the scope of cancel API #2548

Merged · Mar 13, 2024 · 9 commits
docs/user/interfaces/asyncqueryinterface.rst (3 additions, 0 deletions)
@@ -185,6 +185,9 @@ Async Query Cancellation API
======================================
If the security plugin is enabled, this API can only be invoked by users with the permission ``cluster:admin/opensearch/ql/jobs/delete``.

Limitation: a Flint index creation statement with ``auto_refresh = true`` cannot be cancelled. Submit an ALTER statement instead to stop the auto-refresh query.
- Flint index creation statements include CREATE SKIPPING INDEX, CREATE INDEX, and CREATE MATERIALIZED VIEW.

HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``DELETE``
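Sample request (hypothetical; the query ID below is illustrative)::

    curl -XDELETE 'http://localhost:9200/_plugins/_async_query/00fd796ut1a7eg0q'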
AsyncQueryExecutorServiceImpl.java
@@ -56,7 +56,10 @@ public CreateAsyncQueryResponse createAsyncQuery(
sparkExecutionEngineConfig.getApplicationId(),
dispatchQueryResponse.getJobId(),
dispatchQueryResponse.getResultIndex(),
dispatchQueryResponse.getSessionId(),
dispatchQueryResponse.getDatasourceName(),
dispatchQueryResponse.getJobType(),
dispatchQueryResponse.getIndexName()));
return new CreateAsyncQueryResponse(
dispatchQueryResponse.getQueryId().getId(), dispatchQueryResponse.getSessionId());
}
AsyncQueryJobMetadata.java
@@ -8,30 +8,44 @@
package org.opensearch.sql.spark.asyncquery.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.sql.spark.execution.session.SessionModel.DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.statement.StatementModel.QUERY_ID;

import com.google.gson.Gson;
import java.io.IOException;
import java.util.Locale;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.SneakyThrows;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateModel;

/** This class models all the metadata required for a job. */
@Data
@EqualsAndHashCode(callSuper = false)
public class AsyncQueryJobMetadata extends StateModel {
public static final String TYPE_JOBMETA = "jobmeta";
public static final String JOB_TYPE = "jobType";
public static final String INDEX_NAME = "indexName";

private final AsyncQueryId queryId;
private final String applicationId;
private final String jobId;
private final String resultIndex;
// optional sessionId.
private final String sessionId;
// since 2.13
// jobType could be null before OpenSearch 2.12. SparkQueryDispatcher uses jobType to choose the
// cancel query handler. If jobType is null, it invokes BatchQueryHandler.cancel().
private final JobType jobType;
// null when jobType is null
private final String datasourceName;
// null when jobType is INTERACTIVE or null
private final String indexName;

@EqualsAndHashCode.Exclude private final long seqNo;
@EqualsAndHashCode.Exclude private final long primaryTerm;
@@ -44,6 +58,9 @@ public AsyncQueryJobMetadata(
jobId,
resultIndex,
null,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}
@@ -60,6 +77,31 @@ public AsyncQueryJobMetadata(
jobId,
resultIndex,
sessionId,
null,
JobType.INTERACTIVE,
null,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public AsyncQueryJobMetadata(
AsyncQueryId queryId,
String applicationId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
jobType,
indexName,
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}
@@ -70,13 +112,19 @@ public AsyncQueryJobMetadata(
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName,
long seqNo,
long primaryTerm) {
this.queryId = queryId;
this.applicationId = applicationId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}
@@ -102,6 +150,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field("applicationId", applicationId)
.field("resultIndex", resultIndex)
.field("sessionId", sessionId)
.field(DATASOURCE_NAME, datasourceName)
.field(JOB_TYPE, jobType.getText().toLowerCase(Locale.ROOT))
.field(INDEX_NAME, indexName)
.endObject();
return builder;
}
@@ -115,6 +166,9 @@ public static AsyncQueryJobMetadata copy(
copy.getJobId(),
copy.getResultIndex(),
copy.getSessionId(),
copy.datasourceName,
copy.jobType,
copy.indexName,
seqNo,
primaryTerm);
}
@@ -132,9 +186,11 @@ public static AsyncQueryJobMetadata fromXContent(
AsyncQueryId queryId = null;
String jobId = null;
String applicationId = null;
String resultIndex = null;
String sessionId = null;
String datasourceName = null;
String jobTypeStr = null;
String indexName = null;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
@@ -149,15 +205,18 @@ public static AsyncQueryJobMetadata fromXContent(
case "applicationId":
applicationId = parser.textOrNull();
break;
case "isDropIndexQuery":
isDropIndexQuery = parser.booleanValue();
break;
case "resultIndex":
resultIndex = parser.textOrNull();
break;
case "sessionId":
sessionId = parser.textOrNull();
break;
case DATASOURCE_NAME:
datasourceName = parser.textOrNull();
break;
case JOB_TYPE:
jobTypeStr = parser.textOrNull();
break;
case INDEX_NAME:
indexName = parser.textOrNull();
break;
case "type":
break;
default:
@@ -168,7 +227,16 @@
throw new IllegalArgumentException("jobId and applicationId are required fields.");
}
return new AsyncQueryJobMetadata(
queryId,
applicationId,
jobId,
resultIndex,
sessionId,
datasourceName,
Strings.isNullOrEmpty(jobTypeStr) ? null : JobType.fromString(jobTypeStr),
indexName,
seqNo,
primaryTerm);
}
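The new fields exist so that the dispatcher can route a cancel request from the persisted metadata alone, without re-parsing the original query. A minimal, self-contained sketch of that routing decision, using local stand-in types rather than the plugin's classes (the real SparkQueryDispatcher.cancelJob further down also checks for an active session and for index DML job IDs first):

    // Local stand-in for org.opensearch.sql.spark.dispatcher.model.JobType.
    enum JobTypeSketch {
      INTERACTIVE,
      STREAMING,
      BATCH
    }

    class CancelRouting {
      // Mirrors the jobType branch of SparkQueryDispatcher.cancelJob: metadata
      // written before this change carries no jobType, so it falls back to the
      // plain batch cancel path.
      static String handlerFor(JobTypeSketch jobType) {
        if (jobType == JobTypeSketch.BATCH) {
          return "RefreshQueryHandler"; // manual refresh: cancel via Flint index op
        } else if (jobType == JobTypeSketch.STREAMING) {
          return "StreamingQueryHandler"; // auto refresh: cancellation is rejected
        }
        return "BatchQueryHandler"; // null (legacy metadata) or any other type
      }

      public static void main(String[] args) {
        System.out.println(handlerFor(null)); // BatchQueryHandler
        System.out.println(handlerFor(JobTypeSketch.STREAMING)); // StreamingQueryHandler
      }
    }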

RefreshQueryHandler.java (new file)
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.dispatcher;

import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
StateStore stateStore,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataReader = flintIndexMetadataReader;
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
return asyncQueryJobMetadata.getQueryId().getId();
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
return new DispatchQueryResponse(
resp.getQueryId(),
resp.getJobId(),
resp.getResultIndex(),
resp.getSessionId(),
dataSourceMetadata.getName(),
JobType.BATCH,
context.getIndexQueryDetails().openSearchIndexName());
}
}
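For context, a manual refresh statement that would reach this handler might look like the following; the datasource, table, and index names are hypothetical, and the statement grammar comes from Flint, not this PR:

    REFRESH SKIPPING INDEX ON myglue.default.http_logs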
SparkQueryDispatcher.java
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
@@ -90,7 +91,12 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
} else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
// manual refresh should be handled by batch handler
asyncQueryHandler =
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
stateStore,
leaseManager);
}
}
return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build());
@@ -117,6 +123,17 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager);
} else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
queryHandler = createIndexDMLHandler(emrServerlessClient);
} else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
queryHandler =
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
stateStore,
leaseManager);
} else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
queryHandler =
new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
} else {
queryHandler =
new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
StreamingQueryHandler.java
@@ -13,6 +13,7 @@
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
@@ -37,6 +38,13 @@ public StreamingQueryHandler(
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
throw new IllegalArgumentException(
"can't cancel streaming query; use an ALTER statement with auto_refresh=false to stop the"
+ " job, or a VACUUM statement to stop the job and delete its data");
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
@@ -77,6 +85,9 @@ public DispatchQueryResponse submit(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
dataSourceMetadata.getResultIndex(),
null,
dataSourceMetadata.getName(),
JobType.STREAMING,
indexQueryDetails.openSearchIndexName());
}
}
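As the exception message indicates, an auto-refresh (streaming) job is stopped with Flint statements rather than this API. Illustrative statements, with hypothetical datasource, table, and index names (syntax per the Flint SQL grammar, shown here as an assumption):

    ALTER INDEX elb_logs_idx ON myglue.default.http_logs WITH (auto_refresh = false)
    VACUUM INDEX elb_logs_idx ON myglue.default.http_logs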
DispatchQueryResponse.java
@@ -1,14 +1,37 @@
package org.opensearch.sql.spark.dispatcher.model;

import lombok.Getter;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;

@Getter
public class DispatchQueryResponse {
private final AsyncQueryId queryId;
private final String jobId;
private final String resultIndex;
private final String sessionId;
private final String datasourceName;
private final JobType jobType;
private final String indexName;

public DispatchQueryResponse(
AsyncQueryId queryId, String jobId, String resultIndex, String sessionId) {
this(queryId, jobId, resultIndex, sessionId, null, JobType.INTERACTIVE, null);
}

public DispatchQueryResponse(
AsyncQueryId queryId,
String jobId,
String resultIndex,
String sessionId,
String datasourceName,
JobType jobType,
String indexName) {
this.queryId = queryId;
this.jobId = jobId;
this.resultIndex = resultIndex;
this.sessionId = sessionId;
this.datasourceName = datasourceName;
this.jobType = jobType;
this.indexName = indexName;
}
}
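The four-argument convenience constructor keeps pre-existing call sites compiling while defaulting the new fields (jobType to INTERACTIVE, the rest to null). A self-contained sketch of the same telescoping-constructor pattern, with local stand-in types rather than the plugin's:

    // Stand-in showing how the shorter constructor delegates with defaults.
    final class ResponseSketch {
      private final String jobId;
      private final String jobType;

      ResponseSketch(String jobId) {
        this(jobId, "interactive"); // legacy callers keep the old behavior
      }

      ResponseSketch(String jobId, String jobType) {
        this.jobId = jobId;
        this.jobType = jobType;
      }

      public static void main(String[] args) {
        System.out.println(new ResponseSketch("job-1").jobType); // prints "interactive"
      }
    }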