Skip to content

Commit

Permalink
[Fix](TransientTask) Export tasks should only be run on the master node (#32700)
Browse files: browse the repository at this point in the history

* [Fix](TransientTask) Export tasks should only be run on the master node.
  Also add explicit thread names.

Export tasks run only on the master node, so the resources they depend on (the TransientTaskManager and its task disruptor) must be started explicitly together with the other master-only daemon threads. This change also refactors some code to avoid circular dependencies.

* TransientTaskManager was initialized twice in the Env constructor; the redundant second initialization is removed.
  • Loading branch information
CalvinKirs authored Mar 24, 2024
1 parent 44658e0 commit 91be50b
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 33 deletions.
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ public Env(boolean isCheckpointCatalog) {
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.transientTaskManager = new TransientTaskManager();

this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
Expand Down Expand Up @@ -1631,6 +1630,8 @@ protected void startMasterOnlyDaemonThreads() {
// Start txn cleaner
txnCleaner.start();
jobManager.start();
// transient task manager
transientTaskManager.start();
// Alter
getAlterInstance().start();
// Consistency checker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class InternalSchemaInitializer extends Thread {

private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);

/**
 * Creates the initializer thread, passing the explicit thread name
 * "InternalSchemaInitializer" to the {@link Thread} superclass constructor.
 */
public InternalSchemaInitializer() {
super("InternalSchemaInitializer");
}

public void run() {
if (!FeConstants.enableInternalSchemaDb) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private enum ReportType {
}

public ReportHandler() {
super("report-thread");
GaugeMetric<Long> gauge = new GaugeMetric<Long>(
"report_queue_size", MetricUnit.NOUNIT, "report queue size") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,14 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {

public void startUpdateThread() {
WorkloadGroupMgr wgMgr = this;
updatePropThread = new Thread(new Runnable() {
public void run() {
while (true) {
try {
wgMgr.resetQueryQueueProp();
Thread.sleep(Config.query_queue_update_interval_ms);
} catch (Throwable e) {
LOG.warn("reset query queue failed ", e);
}
updatePropThread = new Thread(() -> {
Thread.currentThread().setName("reset-query-queue-prop");
while (true) {
try {
wgMgr.resetQueryQueueProp();
Thread.sleep(Config.query_queue_update_interval_ms);
} catch (Throwable e) {
LOG.warn("reset query queue failed ", e);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
package org.apache.doris.scheduler.disruptor;

import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.manager.TransientTaskManager;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;

import java.io.Closeable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -43,11 +41,10 @@
*
* <p>The work handler also handles system events by scheduling batch scheduler tasks.
*/
@Slf4j
@Log4j2
public class TaskDisruptor implements Closeable {

private Disruptor<TaskEvent> disruptor;
private TransientTaskManager transientTaskManager;
private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size;

private static final int consumerThreadCount = Config.async_task_consumer_thread_num;
Expand All @@ -74,17 +71,13 @@ public class TaskDisruptor implements Closeable {
event.setTaskType(taskType);
};

public TaskDisruptor(TransientTaskManager transientTaskManager) {
this.transientTaskManager = transientTaskManager;
}

public void start() {
ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
ProducerType.MULTI, new BlockingWaitStrategy());
CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer");
disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());
WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
for (int i = 0; i < consumerThreadCount; i++) {
workers[i] = new TaskHandler(transientTaskManager);
workers[i] = new TaskHandler();
}
disruptor.handleEventsWithWorkerPool(workers);
disruptor.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.doris.scheduler.disruptor;

import org.apache.doris.catalog.Env;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.manager.TransientTaskManager;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;

/**
* This class represents a work handler for processing event tasks consumed by a Disruptor.
Expand All @@ -31,16 +32,10 @@
* If the event job execution fails, the work handler logs an error message and pauses the event job.
* The work handler also handles system events by scheduling batch scheduler tasks.
*/
@Slf4j
@Log4j2
public class TaskHandler implements WorkHandler<TaskEvent> {


private TransientTaskManager transientTaskManager;

public TaskHandler(TransientTaskManager transientTaskManager) {
this.transientTaskManager = transientTaskManager;
}

/**
* Processes an event task by retrieving the associated event job and executing it if it is running.
* If the event job is not running, it logs an error message.
Expand All @@ -62,6 +57,7 @@ public void onEvent(TaskEvent event) {

public void onTransientTaskHandle(TaskEvent taskEvent) {
Long taskId = taskEvent.getId();
TransientTaskManager transientTaskManager = Env.getCurrentEnv().getTransientTaskManager();
TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId);
if (taskExecutor == null) {
log.info("Memory task executor is null, task id: {}", taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public class TransientTaskManager {
private TaskDisruptor disruptor;

public TransientTaskManager() {
disruptor = new TaskDisruptor(this);
disruptor = new TaskDisruptor();
}

/**
 * Starts the task disruptor held by this manager.
 *
 * <p>In this change, {@code Env.startMasterOnlyDaemonThreads()} calls this method,
 * so the disruptor is started along with the other master-only daemon threads.
 */
public void start() {
disruptor.start();
}

Expand Down

0 comments on commit 91be50b

Please sign in to comment.