Skip to content

Commit

Permalink
Combine pendingTask and loadingTask into loadTask
Browse files Browse the repository at this point in the history
1. The load task callback includes two methods: onTaskFinished, onTaskFailed
  • Loading branch information
EmmyMiao87 committed Apr 30, 2019
1 parent 8176d86 commit 2cb16be
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 424 deletions.
7 changes: 1 addition & 6 deletions fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ public class BrokerFileGroup implements Writable {
// input
private DataDescription dataDescription;

// Now we don't save this in image, only use this to parse DataDescription;
// if we have this require later, we save this here.
private BrokerDesc brokerDesc;

private long tableId;
private String valueSeparator;
private String lineDelimiter;
Expand All @@ -83,9 +79,8 @@ public BrokerFileGroup(BrokerTable table) throws AnalysisException {
this.filePathes = table.getPaths();
}

public BrokerFileGroup(DataDescription dataDescription, BrokerDesc desc) {
public BrokerFileGroup(DataDescription dataDescription) {
this.dataDescription = dataDescription;
this.brokerDesc = desc;
exprColumnMap = dataDescription.getParsedExprMap();
}

Expand Down
8 changes: 4 additions & 4 deletions fe/src/main/java/org/apache/doris/load/EtlStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Map<String, String> getCounters() {
return counters;
}

public void updateCounter(String key, String value) {
public void replaceCounter(String key, String value) {
counters.put(key, value);
}

Expand All @@ -102,9 +102,9 @@ public void addAllFileMap(Map<String, Long> fileMap) {
}

public void reset() {
this.stats = Maps.newHashMap();
this.counters = Maps.newHashMap();
this.fileMap = Maps.newHashMap();
this.stats.clear();
this.counters.clear();
this.fileMap.clear();
}

@Override
Expand Down
25 changes: 18 additions & 7 deletions fe/src/main/java/org/apache/doris/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources = Maps.newHashMap();
for (DataDescription dataDescription : dataDescriptions) {
// create source
createSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag());
checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag());
job.addTableName(dataDescription.getTableName());
}
for (Entry<Long, Map<Long, List<Source>>> tableEntry : tableToPartitionSources.entrySet()) {
Expand All @@ -538,7 +538,7 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
if (etlJobType == EtlJobType.BROKER) {
PullLoadSourceInfo sourceInfo = new PullLoadSourceInfo();
for (DataDescription dataDescription : dataDescriptions) {
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription, stmt.getBrokerDesc());
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
fileGroup.parse(db);
sourceInfo.addFileGroup(fileGroup);
}
Expand Down Expand Up @@ -646,8 +646,10 @@ private LoadJob createLoadJob(LoadStmt stmt, EtlJobType etlJobType,
return job;
}

private void createSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, boolean deleteFlag) throws DdlException {
public static void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
boolean deleteFlag)
throws DdlException {
Source source = new Source(dataDescription.getFilePathes());
long tableId = -1;
Set<Long> sourcePartitionIds = Sets.newHashSet();
Expand Down Expand Up @@ -846,7 +848,7 @@ public void unprotectAddLoadJob(LoadJob job, boolean isReplay) throws DdlExcepti
checkMini = false;
}

isLabelUsed(dbId, label, -1, checkMini);
unprotectIsLabelUsed(dbId, label, -1, checkMini);

// add job
Map<String, List<LoadJob>> labelToLoadJobs = null;
Expand Down Expand Up @@ -976,7 +978,7 @@ public boolean registerMiniLabel(String fullDbName, String label, long timestamp
long dbId = db.getId();
writeLock();
try {
if (isLabelUsed(dbId, label, timestamp, true)) {
if (unprotectIsLabelUsed(dbId, label, timestamp, true)) {
// label is used and this is a retry request.
// no need to do further operation, just return.
return false;
Expand Down Expand Up @@ -1020,6 +1022,15 @@ public void deregisterMiniLabel(String fullDbName, String label) throws DdlExcep
}
}

// Returns whether the given label is already used by a load job in database dbId.
// Takes the read lock, then delegates to unprotectIsLabelUsed with timestamp -1 and
// checkMini=false, i.e. without the mini-load retry check. Per the contract of
// unprotectIsLabelUsed: throws DdlException if the label is used and this is not a
// retry request; returns false if the label is unused.
public boolean isLabelUsed(long dbId, String label) throws DdlException {
readLock();
try {
return unprotectIsLabelUsed(dbId, label, -1, false);
} finally {
// Always release the read lock, even when the delegate throws.
readUnlock();
}
}

/*
* 1. if label is already used, and this is not a retry request,
* throw exception ("Label already used")
Expand All @@ -1028,7 +1039,7 @@ public void deregisterMiniLabel(String fullDbName, String label) throws DdlExcep
* 3. if label is not used, return false
* 4. throw exception if encounter error.
*/
private boolean isLabelUsed(long dbId, String label, long timestamp, boolean checkMini)
private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, boolean checkMini)
throws DdlException {
// check dbLabelToLoadJobs
if (dbLabelToLoadJobs.containsKey(dbId)) {
Expand Down
97 changes: 41 additions & 56 deletions fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.PullLoadSourceInfo;
import org.apache.doris.thrift.TEtlState;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -70,35 +69,48 @@ public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getBrokerDesc());
brokerLoadJob.setJobProperties(stmt.getProperties());
brokerLoadJob.setLoadInfo(db, stmt.getDataDescriptions());
brokerLoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions());
brokerLoadJob.setDataSourceInfo(db, stmt.getDataDescriptions());
return brokerLoadJob;
}

private void setDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
for (DataDescription dataDescription : dataDescriptions) {
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription, brokerDesc);
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
fileGroup.parse(db);
dataSourceInfo.addFileGroup(fileGroup);
}
LOG.info("Source info is {}", dataSourceInfo);
}

@Override
public void createPendingTask() {
loadPendingTask = new BrokerLoadPendingTask(this, dataSourceInfo.getIdToFileGroups(), brokerDesc);
public void executeScheduleJob() {
LoadTask task = new BrokerLoadPendingTask(this, dataSourceInfo.getIdToFileGroups(), brokerDesc);
tasks.add(task);
Catalog.getCurrentCatalog().getLoadManager().submitTask(task);
}

/**
 * Callback invoked when a load task completes successfully.
 * Dispatches to the phase-specific handler based on the concrete attachment type:
 * pending-phase results go to onPendingTaskFinished, loading-phase results to
 * onLoadingTaskFinished.
 *
 * @param attachment result payload produced by the finished task
 */
@Override
public void onTaskFinished(TaskAttachment attachment) {
if (attachment instanceof BrokerPendingTaskAttachment) {
onPendingTaskFinished((BrokerPendingTaskAttachment) attachment);
} else if (attachment instanceof BrokerLoadingTaskAttachment) {
onLoadingTaskFinished((BrokerLoadingTaskAttachment) attachment);
} else {
// Defensive: an unknown attachment type indicates a programming error.
// Previously it was silently ignored, which hides the bug; log it instead.
LOG.warn("unexpected attachment type {} for load job {}", attachment.getClass().getName(), id);
}
}

/**
 * Callback invoked when any of this job's load tasks fails.
 * Cancels the whole job: transitions it to CANCELLED with cancel type
 * LOAD_RUN_FAIL, recording the task's error message as the failure reason.
 *
 * @param errMsg error message reported by the failed task
 */
@Override
public void onTaskFailed(String errMsg) {
updateState(JobState.CANCELLED, FailMsg.CancelType.LOAD_RUN_FAIL, errMsg);
}

/**
* divide job into loading task and init the plan of task
* submit tasks into loadingTaskExecutor in LoadManager
*
* @param attachment
* step1: divide job into loading task
* step2: init the plan of task
* step3: submit tasks into loadingTaskExecutor
* @param attachment BrokerPendingTaskAttachment
*/
@Override
public void onPendingTaskFinished(LoadPendingTaskAttachment attachment) {
public void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) {
// TODO(ml): check if task has been cancelled
BrokerPendingTaskAttachment brokerPendingTaskAttachment = (BrokerPendingTaskAttachment) attachment;
Database db = null;
try {
getDb();
Expand All @@ -114,7 +126,6 @@ public void onPendingTaskFinished(LoadPendingTaskAttachment attachment) {
// divide job into broker loading task by table
db.readLock();
try {
// tableId -> BrokerFileGroups
for (Map.Entry<Long, List<BrokerFileGroup>> entry :
dataSourceInfo.getIdToFileGroups().entrySet()) {
long tableId = entry.getKey();
Expand All @@ -130,14 +141,15 @@ public void onPendingTaskFinished(LoadPendingTaskAttachment attachment) {
return;
}

// Generate pull load task, one
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
entry.getValue(), getDeadlineMs(), execMemLimit,
transactionId, this);
task.init(brokerPendingTaskAttachment.getFileStatusByTable(tableId),
brokerPendingTaskAttachment.getFileNumByTable(tableId));
loadLoadingTaskList.add(task);
Catalog.getCurrentCatalog().getLoadManager().submitLoadingTask(task);
task.init(attachment.getFileStatusByTable(tableId),
attachment.getFileNumByTable(tableId));
// Add tasks into list and pool
tasks.add(task);
Catalog.getCurrentCatalog().getLoadManager().submitTask(task);
}
} catch (UserException e) {
updateState(JobState.CANCELLED, FailMsg.CancelType.ETL_RUN_FAIL, "failed to " + e.getMessage());
Expand All @@ -147,23 +159,16 @@ public void onPendingTaskFinished(LoadPendingTaskAttachment attachment) {
loadStartTimestamp = System.currentTimeMillis();
}

@Override
public void onPendingTaskFailed(String errMsg) {
// TODO(ml): check if task has been cancelled
updateState(JobState.CANCELLED, FailMsg.CancelType.ETL_RUN_FAIL, errMsg);
}

@Override
public void onLoadingTaskFinished(LoadLoadingTaskAttachment attachment) {
public void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
// TODO(ml): check if task has been cancelled
boolean commitTxn = false;
writeLock();
try {
// update loading status
unprotectedUpdateLoadingStatus((BrokerLoadingTaskAttachment) attachment);
updateLoadingStatus(attachment);

// begin commit txn when all of loading tasks have been finished
if (loadLoadingTaskList.size() == loadLoadingTaskList.stream()
if (tasks.size() == tasks.stream()
.filter(entity -> entity.isFinished()).count()) {
// check data quality
if (!checkDataQuality()) {
Expand Down Expand Up @@ -198,37 +203,17 @@ public void onLoadingTaskFinished(LoadLoadingTaskAttachment attachment) {
}
}

@Override
public void onLoadingTaskFailed(String errMsg) {
// TODO(ml): check if task has been cancelled
writeLock();
try {
// clean the loadingStatus
loadingStatus.reset();
loadingStatus.setState(TEtlState.CANCELLED);
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("error_msg", "Failed to execute load plan with error " + errMsg)
.build());
// cancel the job
unprotectedUpdateState(JobState.CANCELLED, FailMsg.CancelType.ETL_RUN_FAIL, errMsg);
return;
} finally {
writeUnlock();
}
}

private void unprotectedUpdateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
loadingStatus.updateCounter(DPP_ABNORMAL_ALL,
increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
loadingStatus.updateCounter(DPP_NORMAL_ALL,
increaseCounter(DPP_NORMAL_ALL, attachment.getCounter(DPP_NORMAL_ALL)));
private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
loadingStatus.replaceCounter(DPP_ABNORMAL_ALL,
increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
loadingStatus.replaceCounter(DPP_NORMAL_ALL,
increaseCounter(DPP_NORMAL_ALL, attachment.getCounter(DPP_NORMAL_ALL)));
if (attachment.getTrackingUrl() != null) {
loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
}
loadingStatus.addAllFileMap(attachment.getFileMap());
commitInfos.addAll(attachment.getCommitInfoList());
int finishedLoadingTaskNum = (int) loadLoadingTaskList.stream().filter(entity -> entity.isFinished()).count();
progress = finishedLoadingTaskNum / loadLoadingTaskList.size() * 100;
int finishedTaskNum = (int) tasks.stream().filter(entity -> entity.isFinished()).count();
progress = finishedTaskNum / tasks.size() * 100;
if (progress == 100) {
progress = 99;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.List;
import java.util.Map;

public class BrokerLoadPendingTask extends LoadPendingTask {
public class BrokerLoadPendingTask extends LoadTask {

private static final Logger LOG = LogManager.getLogger(BrokerLoadPendingTask.class);

Expand All @@ -47,9 +47,9 @@ public BrokerLoadPendingTask(LoadTaskCallback loadTaskCallback,
Map<Long, List<BrokerFileGroup>> tableToBrokerFileList,
BrokerDesc brokerDesc) {
super(loadTaskCallback);
this.attachment = new BrokerPendingTaskAttachment();
this.tableToBrokerFileList = tableToBrokerFileList;
this.brokerDesc = brokerDesc;
this.attachment = new BrokerPendingTaskAttachment();
}

@Override
Expand All @@ -72,7 +72,7 @@ private void getAllFileStatus()
fileStatusList.add(fileStatuses);
for (TBrokerFileStatus fstatus : fileStatuses) {
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, loadTaskCallback.getCallbackId())
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("file_status", fstatus)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@
import java.util.List;
import java.util.Map;

public class BrokerLoadingTaskAttachment implements LoadLoadingTaskAttachment {
public class BrokerLoadingTaskAttachment implements TaskAttachment{

private Map<String, Long> fileMap;
private Map<String, String> counters;
private String trackingUrl;
private List<TabletCommitInfo> commitInfoList;

public BrokerLoadingTaskAttachment(Map<String, Long> fileMap, Map<String, String> counters, String trackingUrl,
public BrokerLoadingTaskAttachment(Map<String, String> counters, String trackingUrl,
List<TabletCommitInfo> commitInfoList) {
this.fileMap = fileMap;
this.trackingUrl = trackingUrl;
this.counters = counters;
this.commitInfoList = commitInfoList;
Expand All @@ -48,10 +46,6 @@ public String getTrackingUrl() {
return trackingUrl;
}

public Map<String, Long> getFileMap() {
return fileMap;
}

public List<TabletCommitInfo> getCommitInfoList() {
return commitInfoList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;
import java.util.Map;

public class BrokerPendingTaskAttachment implements LoadPendingTaskAttachment {
public class BrokerPendingTaskAttachment implements TaskAttachment {

// table id -> file status
private Map<Long, List<List<TBrokerFileStatus>>> fileStatusMap = Maps.newHashMap();
Expand Down
Loading

0 comments on commit 2cb16be

Please sign in to comment.