Skip to content

Commit

Permalink
add interface of cancel load and export commands
Browse files Browse the repository at this point in the history
  • Loading branch information
LiBinfeng-01 committed Nov 27, 2024
1 parent e6e01ac commit 0a7d7fa
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -522,4 +524,91 @@ private static void addNeedCancelLoadJob(String label, String state,
// job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
// }
// }

/**
* used for nereids planner
*/
public void cancelLoadJob(String dbName, String label, String state, BinaryOperator operator)
throws JobException, AnalysisException, DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<InsertJob> unfinishedLoadJob;
readLock();
try {
List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db);
List<InsertJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new JobException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(InsertJob::isRunning)
.collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new JobException("There is no uncompleted job");
}
} finally {
readUnlock();
}
// check auth
if (unfinishedLoadJob.size() > 1 || unfinishedLoadJob.get(0).getTableNames().isEmpty()) {
if (Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName);
}
} else {
for (String tableName : unfinishedLoadJob.get(0).getTableNames()) {
if (Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ":" + tableName);
}
}
}
for (InsertJob loadJob : unfinishedLoadJob) {
try {
alterJobStatus(loadJob.getJobId(), JobStatus.STOPPED);
} catch (JobException e) {
log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName());
}
}
}

private static void addNeedCancelLoadJob(String label, String state,
BinaryOperator operator, List<InsertJob> loadJobs,
List<InsertJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> !job.isCancelled())
.filter(job -> {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state);
return operator instanceof And ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getJobStatus().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}
}
74 changes: 74 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.exception.JobException;

Expand Down Expand Up @@ -160,6 +162,78 @@ public void cancelExportJob(CancelExportStmt stmt) throws DdlException, Analysis
}
}

private List<ExportJob> getWaitingCancelJobs(String label, String state, BinaryOperator operator)
throws AnalysisException {
Predicate<ExportJob> jobFilter = buildCancelJobFilter(label, state, operator);
readLock();
try {
return getJobs().stream().filter(jobFilter).collect(Collectors.toList());
} finally {
readUnlock();
}
}

@VisibleForTesting
public static Predicate<ExportJob> buildCancelJobFilter(String label, String state, BinaryOperator operator)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());

return job -> {
boolean labelFilter = true;
boolean stateFilter = true;
if (StringUtils.isNotEmpty(label)) {
labelFilter = label.contains("%") ? matcher.match(job.getLabel()) :
job.getLabel().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
stateFilter = job.getState().name().equalsIgnoreCase(state);
}

if (operator != null && operator instanceof Or) {
return labelFilter || stateFilter;
}

return labelFilter && stateFilter;
};
}

/**
* used for Nereids planner
*/
public void cancelExportJob(String label, String state, BinaryOperator operator, String dbName)
throws DdlException, AnalysisException {
// List of export jobs waiting to be cancelled
List<ExportJob> matchExportJobs = getWaitingCancelJobs(label, state, operator);
if (matchExportJobs.isEmpty()) {
throw new DdlException("Export job(s) do not exist");
}
matchExportJobs = matchExportJobs.stream()
.filter(job -> !job.isFinalState()).collect(Collectors.toList());
if (matchExportJobs.isEmpty()) {
throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
}

// check auth
checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, dbName, matchExportJobs);
// Must add lock to protect export job.
// Because job may be cancelled when generating task executors,
// the cancel process may clear the task executor list at same time,
// which will cause ConcurrentModificationException
writeLock();
try {
for (ExportJob exportJob : matchExportJobs) {
// exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
} finally {
writeUnlock();
}
}

public void checkCancelExportJobAuth(String ctlName, String dbName, List<ExportJob> jobs) throws AnalysisException {
if (jobs.size() > 1) {
if (!Env.getCurrentEnv().getAccessManager()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
Expand Down Expand Up @@ -246,23 +248,22 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam
* Match need cancel loadJob by stmt.
**/
@VisibleForTesting
public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
public static void addNeedCancelLoadJob(String label, String state, BinaryOperator operator,
List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
throws AnalysisException {
String label = stmt.getLabel();
String state = stmt.getState();
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> job.getState() != JobState.CANCELLED)
.filter(job -> {
if (stmt.getOperator() != null) {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabel())
: job.getLabel().equalsIgnoreCase(label);
boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
return operator instanceof And ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
Expand All @@ -280,8 +281,9 @@ public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJ
/**
* Cancel load job by stmt.
**/
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
public void cancelLoadJob(String dbName, String label, String state, BinaryOperator operator)
throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<LoadJob> unfinishedLoadJob;
readLock();
Expand All @@ -291,7 +293,7 @@ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisExce
throw new DdlException("Load job does not exist");
}
List<LoadJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(stmt,
addNeedCancelLoadJob(label, state, operator,
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
Expand All @@ -318,6 +320,82 @@ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisExce
}
}

/**
* Match need cancel loadJob by stmt.
**/
@VisibleForTesting
public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
throws AnalysisException {
String label = stmt.getLabel();
String state = stmt.getState();
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> job.getState() != JobState.CANCELLED)
.filter(job -> {
if (stmt.getOperator() != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabel())
: job.getLabel().equalsIgnoreCase(label);
boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabel())
: job.getLabel().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getState().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}

/**
* Cancel load job by stmt.
**/
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
// List of load jobs waiting to be cancelled
List<LoadJob> unfinishedLoadJob;
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}
List<LoadJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(stmt,
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job");
}
} finally {
readUnlock();
}
for (LoadJob loadJob : unfinishedLoadJob) {
try {
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {
throw new DdlException(
"Cancel load job [" + loadJob.getId() + "] fail, " + "label=[" + loadJob.getLabel()
+
"] failed msg=" + e.getMessage());
}
}
}

/**
* Replay end load job.
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.CancelExportStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand All @@ -48,8 +46,6 @@ public class CancelExportCommand extends CancelCommand implements ForwardWithSyn

private Expression whereClause;

private Expr legacyWhereClause;

public CancelExportCommand(String dbName, Expression whereClause) {
super(PlanType.CANCEL_EXPORT_COMMAND);
this.dbName = dbName;
Expand All @@ -59,14 +55,7 @@ public CancelExportCommand(String dbName, Expression whereClause) {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
CancelExportStmt cancelStmt = null;
if (whereClause instanceof CompoundPredicate) {
cancelStmt = new CancelExportStmt(dbName, legacyWhereClause, label,
((org.apache.doris.analysis.CompoundPredicate) legacyWhereClause).getOp(), state);
} else {
cancelStmt = new CancelExportStmt(dbName, legacyWhereClause, label, null, state);
}
ctx.getEnv().getExportMgr().cancelExportJob(cancelStmt);
ctx.getEnv().getExportMgr().cancelExportJob(label, state, (BinaryOperator) whereClause, dbName);
}

private void validate(ConnectContext ctx) throws UserException {
Expand All @@ -91,8 +80,6 @@ private void validate(ConnectContext ctx) throws UserException {
throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
}
}

legacyWhereClause = translateToLegacyExpr(ctx, whereClause);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* base class for all drop commands
*/
public class CancelJobTaskCommand extends Command implements ForwardWithSync {
public class CancelJobTaskCommand extends CancelCommand implements ForwardWithSync {
private static final String jobNameKey = "jobName";

private static final String taskIdKey = "taskId";
Expand Down
Loading

0 comments on commit 0a7d7fa

Please sign in to comment.