Skip to content

Commit

Permalink
[Feature](executor)broker load support workload group (apache#30866)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo authored Feb 21, 2024
1 parent 04d27ad commit 058489e
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
task.init(loadId, attachment.getFileStatusByTable(aggKey),
attachment.getFileNumByTable(aggKey), getUserInfo());
task.settWorkloadGroups(tWorkloadGroups);
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -135,7 +136,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements

protected String comment = "";


protected List<TPipelineWorkloadGroup> tWorkloadGroups = null;

public LoadJob(EtlJobType jobType) {
this.jobType = jobType;
Expand Down Expand Up @@ -1167,4 +1168,8 @@ public EtlStatus getLoadingStatus() {
public LoadStatistic getLoadStatistic() {
return loadStatistic;
}

public void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.ErrorTabletInfo;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class LoadLoadingTask extends LoadTask {
private Profile jobProfile;
private long beginTime;

private List<TPipelineWorkloadGroup> tWorkloadGroups = null;

public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate,
Expand Down Expand Up @@ -164,6 +167,10 @@ private void executeOnce() throws Exception {
int timeoutS = Math.max((int) (leftTimeMs / 1000), 1);
curCoordinator.setTimeout(timeoutS);

if (tWorkloadGroups != null) {
curCoordinator.setTWorkloadGroups(tWorkloadGroups);
}

try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
actualExecute(curCoordinator, timeoutS);
Expand Down Expand Up @@ -221,4 +228,9 @@ public void updateRetryInfo() {
this.loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
planner.updateLoadId(this.loadId);
}

void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void start() {
/**
* This method will be invoked by the broker load(v2) now.
*/
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob;
Expand Down Expand Up @@ -144,6 +144,12 @@ public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
} finally {
writeUnlock();
}

if (Config.enable_workload_group) {
loadJob.settWorkloadGroups(
Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get()));
}

Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);

// The job must be submitted after edit log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void unload(String fullDbName, String label, String subLabel) throws DdlE

// 'db' and 'label' form a multiLabel used to
// user can pass commitLabel which use this string commit to jobmgr
public void commit(String fullDbName, String label) throws DdlException {
public void commit(String fullDbName, String label) throws DdlException, UserException {
LabelName multiLabel = new LabelName(fullDbName, label);
List<Long> jobIds = Lists.newArrayList();
lock.writeLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

suite("test_s3_load", "load_p0") {

sql "create workload group if not exists broker_load_test properties ( 'cpu_share'='1024'); "

sql "set workload_group=broker_load_test;"

def tables = [
"agg_tbl_basic",
"dup_tbl_array",
Expand Down

0 comments on commit 058489e

Please sign in to comment.