Skip to content

Commit

Permalink
[fix](export) remove export task executor in TransientTaskExecutor (a…
Browse files Browse the repository at this point in the history
…pache#42880)

### What problem does this PR solve?

Problem Summary:

There is a memory leak on FE side when continue running `export`
command.
The `ExportTaskExecutor` instances will be added to
`TransientTaskManager` and never be removed.

This PR mainly changes:
1. Remove unused `ExportTaskRegister` in `Env`.
2. Remove `ExportTaskExecutor` from `TransientTaskManager` once export
job is finished or cancelled.

### Release note

Fix FE memory leak by removing `ExportTaskExecutor` from
`TransientTaskManager` once export job is finished or cancelled.
  • Loading branch information
morningman committed Oct 31, 2024
1 parent 298ddb5 commit 3c2d4bf
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 59 deletions.
8 changes: 0 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
Expand Down Expand Up @@ -395,7 +394,6 @@ public class Env {
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;

private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
Expand Down Expand Up @@ -709,7 +707,6 @@ public Env(boolean isCheckpointCatalog) {
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);

this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
Expand Down Expand Up @@ -4425,11 +4422,6 @@ public SyncJobManager getSyncJobManager() {
return this.syncJobManager;
}


public ExportTaskRegister getExportTaskRegister() {
return exportTaskRegister;
}

public JobManager getJobManager() {
return jobManager;
}
Expand Down
14 changes: 7 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Data
Expand Down Expand Up @@ -207,9 +206,7 @@ public class ExportJob implements Writable {
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();

private List<ExportTaskExecutor> jobExecutorList;

private ConcurrentHashMap<Long, TransientTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
private List<ExportTaskExecutor> jobExecutorList = Lists.newArrayList();

private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
Expand Down Expand Up @@ -690,11 +687,11 @@ private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws
}

// we need cancel all task
taskIdToExecutor.keySet().forEach(id -> {
jobExecutorList.forEach(executor -> {
try {
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(id);
Env.getCurrentEnv().getTransientTaskManager().cancelMemoryTask(executor.getId());
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", id, e);
LOG.warn("cancel export task {} exception: {}", executor.getId(), e);
}
});

Expand All @@ -705,6 +702,7 @@ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String ms
setExportJobState(ExportJobState.CANCELLED);
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
jobExecutorList.clear();
if (FeConstants.runningUnitTest) {
return;
}
Expand Down Expand Up @@ -749,6 +747,8 @@ private void finishExportJobUnprotected() {
setExportJobState(ExportJobState.FINISHED);
finishTimeMs = System.currentTimeMillis();
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
// Clear the jobExecutorList to release memory.
jobExecutorList.clear();
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
job.getBrokerDesc());
}
job.getTaskExecutors().forEach(executor -> {
Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
job.getTaskIdToExecutor().put(taskId, executor);
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
});
Env.getCurrentEnv().getEditLog().logExportCreate(job);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public void onTransientTaskHandle(TaskEvent taskEvent) {
taskExecutor.execute();
} catch (JobException e) {
log.warn("Memory task execute failed, taskId: {}, msg : {}", taskId, e.getMessage());
} finally {
transientTaskManager.removeMemoryTask(taskId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.doris.scheduler.executor.TransientTaskExecutor;

import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.ConcurrentHashMap;

public class TransientTaskManager {
private static final Logger LOG = LogManager.getLogger(TransientTaskManager.class);
/**
* key: taskId
* value: memory task executor of this task
Expand Down Expand Up @@ -57,10 +60,20 @@ public Long addMemoryTask(TransientTaskExecutor executor) {
Long taskId = executor.getId();
taskExecutorMap.put(taskId, executor);
disruptor.tryPublishTask(taskId);
LOG.info("add memory task, taskId: {}", taskId);
return taskId;
}

public void cancelMemoryTask(Long taskId) throws JobException {
taskExecutorMap.get(taskId).cancel();
try {
taskExecutorMap.get(taskId).cancel();
} finally {
removeMemoryTask(taskId);
}
}

public void removeMemoryTask(Long taskId) {
taskExecutorMap.remove(taskId);
LOG.info("remove memory task, taskId: {}", taskId);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ public void testExportMgrCancelJob() throws UserException {
exportMgr.unprotectAddJob(job3);
exportMgr.unprotectAddJob(job4);


// cancel export job where state = "PENDING"
Assert.assertTrue(job1.getState() == ExportJobState.PENDING);
SlotRef stateSlotRef = new SlotRef(null, "state");
Expand Down

0 comments on commit 3c2d4bf

Please sign in to comment.