From 38f90fd811351b90438a7b08d0f7fa67c5d13e6d Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 21 Nov 2018 15:13:56 +0800 Subject: [PATCH] Add distributor which schedules tasks fairly Step1: updateBeIdTaskMaps, remove dead backends and add newly alive backends Step2: process timeout tasks; if a task has already been allocated to a backend but is not finished before DEFAULT_TASK_TIMEOUT_MINUTES, it will be discarded. At the same time, the partitions belonging to the old task will be allocated to a new task. The new task, with a new signature, will be added to the queue of needSchedulerRoutineLoadTask. Step3: process all of needSchedulerRoutineLoadTask, allocating tasks to backends. The tasks will be executed by the backends. --- .../org/apache/doris/catalog/Catalog.java | 10 +- ...neLoadProgress.java => KafkaProgress.java} | 19 +- .../load/routineload/KafkaRoutineLoadJob.java | 33 ++-- .../routineload/KafkaRoutineLoadTask.java | 30 +-- .../doris/load/routineload/KafkaTaskInfo.java | 48 +++++ .../load/routineload/RoutineLoadJob.java | 27 ++- ...utineLoad.java => RoutineLoadManager.java} | 179 ++++++++++++++++-- .../routineload/RoutineLoadScheduler.java | 28 +-- .../load/routineload/RoutineLoadTask.java | 16 +- .../load/routineload/RoutineLoadTaskInfo.java | 51 +++++ .../routineload/RoutineLoadTaskScheduler.java | 98 ++++++++++ .../routineload/KafkaRoutineLoadJobTest.java | 17 +- .../routineload/RoutineLoadManagerTest.java | 145 ++++++++++++++ .../routineload/RoutineLoadSchedulerTest.java | 20 +- .../RoutineLoadTaskSchedulerTest.java | 117 ++++++++++++ gensrc/thrift/Types.thrift | 3 +- 16 files changed, 748 insertions(+), 93 deletions(-) rename fe/src/main/java/org/apache/doris/load/routineload/{KafkaRoutineLoadProgress.java => KafkaProgress.java} (64%) create mode 100644 fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java rename fe/src/main/java/org/apache/doris/load/routineload/{RoutineLoad.java => RoutineLoadManager.java} (56%) create mode 100644 
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java create mode 100644 fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java create mode 100644 fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java create mode 100644 fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index e79aa1fa4e0a9f4..e253a0740ccd211 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -135,7 +135,7 @@ import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; -import org.apache.doris.load.routineload.RoutineLoad; +import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.metric.MetricRepo; @@ -241,7 +241,7 @@ public class Catalog { private ConcurrentHashMap nameToCluster; private Load load; - private RoutineLoad routineLoad; + private RoutineLoadManager routineLoadManager; private ExportMgr exportMgr; private Clone clone; private Alter alter; @@ -372,7 +372,7 @@ private Catalog() { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); this.load = new Load(); - this.routineLoad = new RoutineLoad(); + this.routineLoadManager = new RoutineLoadManager(); this.exportMgr = new ExportMgr(); this.clone = new Clone(); this.alter = new Alter(); @@ -4250,8 +4250,8 @@ public Load getLoadInstance() { return this.load; } - public RoutineLoad getRoutineLoadInstance() { - return routineLoad; + public RoutineLoadManager getRoutineLoadInstance() { + return routineLoadManager; } public ExportMgr getExportMgr() { diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java similarity index 64% rename from fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java rename to fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index ff46af630bea464..6f0f4243ca7a8ac 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,8 +17,21 @@ package org.apache.doris.load.routineload; -public class KafkaRoutineLoadProgress { +import java.util.Map; - private String partitionName; - private long offset; +/** + * this is description of kafka routine load progress + * the data before offset was already loaded in doris + */ +public class KafkaProgress { + + private Map partitionIdToOffset; + + public Map getPartitionIdToOffset() { + return partitionIdToOffset; + } + + public void setPartitionIdToOffset(Map partitionIdToOffset) { + this.partitionIdToOffset = partitionIdToOffset; + } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index cced4ddc1e48365..bba1c5d5f489804 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -18,13 +18,13 @@ package org.apache.doris.load.routineload; import com.google.common.base.Strings; +import com.google.gson.Gson; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.thrift.TTaskType; import 
org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.logging.log4j.LogManager; @@ -43,7 +43,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private String serverAddress; private String topic; - // optional + // optional, user want to load partitions. private List kafkaPartitions; public KafkaRoutineLoadJob() { @@ -59,21 +59,23 @@ public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, lon this.topic = topic; } + public KafkaProgress getProgress() { + Gson gson = new Gson(); + return gson.fromJson(this.progress, KafkaProgress.class); + } + @Override - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { + public List divideRoutineLoadJob(int currentConcurrentTaskNum) { // divide kafkaPartitions into tasks - List kafkaRoutineLoadTaskList = new ArrayList<>(); + List kafkaRoutineLoadTaskList = new ArrayList<>(); for (int i = 0; i < currentConcurrentTaskNum; i++) { - // TODO(ml): init load task - kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH, - dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId())); + kafkaRoutineLoadTaskList.add(new KafkaTaskInfo(SystemIdGenerator.getNextId())); } for (int i = 0; i < kafkaPartitions.size(); i++) { - kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i)); + ((KafkaTaskInfo) kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum)) + .addKafkaPartition(kafkaPartitions.get(i)); } - List result = new ArrayList<>(); - result.addAll(kafkaRoutineLoadTaskList); - return result; + return kafkaRoutineLoadTaskList; } @Override @@ -99,6 +101,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)); } + @Override + public RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId) { + return new 
KafkaRoutineLoadTask(getResourceInfo(), + beId, getDbId(), getTableId(), + 0L, 0L, 0L, getColumns(), getWhere(), getColumnSeparator(), + (KafkaTaskInfo) routineLoadTaskInfo, + getProgress()); + } + private void updatePartitions() { // fetch all of kafkaPartitions in topic if (kafkaPartitions == null || kafkaPartitions.size() == 0) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java index 89347745c48c20f..c561563a693f4a3 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java @@ -17,27 +17,29 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; -import java.util.List; +import java.util.HashMap; +import java.util.Map; -public class KafkaRoutineLoadTask extends RoutineLoadTask { - - private List kafkaPartitions; - public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { - super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); - this.kafkaPartitions = Lists.newArrayList(); - } +public class KafkaRoutineLoadTask extends RoutineLoadTask { - public void addKafkaPartition(int partition) { - kafkaPartitions.add(partition); + private Map partitionIdToOffset; + + public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, + long dbId, long tableId, long partitionId, long indexId, long tabletId, + String columns, String where, String columnSeparator, + KafkaTaskInfo kafkaTaskInfo, KafkaProgress kafkaProgress) { + super(resourceInfo, backendId, TTaskType.STREAM_LOAD, dbId, tableId, partitionId, indexId, tabletId, + kafkaTaskInfo.getSignature(), 
columns, where, columnSeparator, RoutineLoadJob.DataSourceType.KAFKA); + this.partitionIdToOffset = new HashMap<>(); + kafkaTaskInfo.getPartitions().parallelStream().forEach(entity -> + partitionIdToOffset.put(entity, kafkaProgress.getPartitionIdToOffset().get(entity))); } - public List getKafkaPartitions() { - return kafkaPartitions; + public Map getPartitionIdToOffset() { + return partitionIdToOffset; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java new file mode 100644 index 000000000000000..01bedebcf2cf4ea --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import org.apache.doris.common.SystemIdGenerator; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaTaskInfo extends RoutineLoadTaskInfo { + + private List partitions; + + public KafkaTaskInfo(long signature) { + super(signature); + this.partitions = new ArrayList<>(); + } + + public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) { + super(SystemIdGenerator.getNextId()); + this.partitions = kafkaTaskInfo.getPartitions(); + } + + public void addKafkaPartition(int partition) { + partitions.add(partition); + } + + public List getPartitions() { + return partitions; + } + + +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index af737356febf57e..97998cfb18f3b0a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -25,7 +25,6 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -35,7 +34,7 @@ * The desireTaskConcurrentNum means that user expect the number of concurrent stream load * The routine load job support different streaming medium such as KAFKA */ -public class RoutineLoadJob implements Writable { +public abstract class RoutineLoadJob implements Writable { public enum JobState { NEED_SCHEDULER, @@ -116,6 +115,26 @@ public long getId() { return id; } + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + public String getColumns() { + return columns; + } + + public String getWhere() { + return where; + } + + public String getColumnSeparator() { + return columnSeparator; + } + public JobState getState() { return state; } @@ -128,7 +147,7 @@ public TResourceInfo getResourceInfo() { return 
resourceInfo; } - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { + public List divideRoutineLoadJob(int currentConcurrentTaskNum) { return null; } @@ -145,4 +164,6 @@ public void write(DataOutput out) throws IOException { public void readFields(DataInput in) throws IOException { // TODO(ml) } + + abstract RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java similarity index 56% rename from fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java rename to fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 0dd6b3cc4319742..36042693799777f 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -18,6 +18,7 @@ package org.apache.doris.load.routineload; import com.google.common.collect.Maps; +import com.google.common.collect.Queues; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.LoadException; import org.apache.logging.log4j.LogManager; @@ -27,15 +28,18 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -public class RoutineLoad { - private static final Logger LOG = LogManager.getLogger(RoutineLoad.class); +public class RoutineLoadManager { + private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class); private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + private static final int DEFAULT_TASK_TIMEOUT_MINUTES = 5; - // TODO(ml): real-time calculate by be + // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks; + private Map beIdToConcurrentTasks; // stream load job meta private Map 
idToRoutineLoadJob; @@ -44,8 +48,11 @@ public class RoutineLoad { private Map idToCancelledRoutineLoadJob; // stream load tasks meta (not persistent) - private Map idToRoutineLoadTask; - private Map idToNeedSchedulerRoutineLoadTask; + private Map idToRoutineLoadTask; + // KafkaPartitions means that partitions belong to one task + // kafka partitions == routine load task (logical) + private Queue needSchedulerRoutineLoadTasks; + private Map taskIdToJobId; private ReentrantReadWriteLock lock; @@ -65,29 +72,75 @@ private void writeUnlock() { lock.writeLock().unlock(); } - public RoutineLoad() { + public RoutineLoadManager() { idToRoutineLoadJob = Maps.newHashMap(); idToNeedSchedulerRoutineLoadJob = Maps.newHashMap(); idToRunningRoutineLoadJob = Maps.newHashMap(); idToCancelledRoutineLoadJob = Maps.newHashMap(); idToRoutineLoadTask = Maps.newHashMap(); - idToNeedSchedulerRoutineLoadTask = Maps.newHashMap(); + needSchedulerRoutineLoadTasks = Queues.newLinkedBlockingQueue(); + beIdToConcurrentTasks = Maps.newHashMap(); + taskIdToJobId = Maps.newHashMap(); lock = new ReentrantReadWriteLock(true); } + public void initBeIdToMaxConcurrentTasks() { + if (beIdToMaxConcurrentTasks == null) { + beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) + .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); + } + } + public int getTotalMaxConcurrentTaskNum() { readLock(); try { - if (beIdToMaxConcurrentTasks == null) { - beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) - .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); - } + initBeIdToMaxConcurrentTasks(); return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum(); } finally { readUnlock(); } } + public void updateBeIdTaskMaps() { + writeLock(); + try { + initBeIdToMaxConcurrentTasks(); + List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); + + // diff beIds and 
beIdToMaxConcurrentTasks.keys() + List newBeIds = beIds.parallelStream().filter(entity -> beIdToMaxConcurrentTasks.get(entity) == null) + .collect(Collectors.toList()); + List unavailableBeIds = beIdToMaxConcurrentTasks.keySet().parallelStream() + .filter(entity -> !beIds.contains(entity)) + .collect(Collectors.toList()); + newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM)); + for (long beId : unavailableBeIds) { + beIdToMaxConcurrentTasks.remove(beId); + beIdToConcurrentTasks.remove(beId); + } + LOG.info("There are {} backends which participate in routine load scheduler. " + + "There are {} new backends and {} unavailable backends for routine load", + beIdToMaxConcurrentTasks.size(), newBeIds.size(), unavailableBeIds.size()); + } finally { + writeUnlock(); + } + } + + public void addNumOfConcurrentTasksByBeId(long beId) { + writeLock(); + try { + if (beIdToConcurrentTasks.get(beId) == null) { + beIdToConcurrentTasks.put(beId, 1); + } else { + int concurrentTaskNum = (int) beIdToConcurrentTasks.get(beId); + concurrentTaskNum++; + beIdToConcurrentTasks.put(beId, concurrentTaskNum); + } + } finally { + writeUnlock(); + } + } + public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { writeLock(); try { @@ -97,7 +150,7 @@ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { } } - public void addRoutineLoadTasks(List routineLoadTaskList) { + public void addRoutineLoadTasks(List routineLoadTaskList) { writeLock(); try { idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( @@ -107,37 +160,93 @@ public void addRoutineLoadTasks(List routineLoadTaskList) { } } - public Map getIdToRoutineLoadTask() { - return idToRoutineLoadTask; + public Map getIdToRoutineLoadTask() { + readLock(); + try { + return idToRoutineLoadTask; + } finally { + readUnlock(); + } } - public void addNeedSchedulerRoutineLoadTasks(List routineLoadTaskList) { + public void 
addNeedSchedulerRoutineLoadTasks(List routineLoadTaskList, long routineLoadJobId) { writeLock(); try { - idToNeedSchedulerRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( - Collectors.toMap(task -> task.getSignature(), task -> task))); + routineLoadTaskList.parallelStream().forEach(entity -> needSchedulerRoutineLoadTasks.add(entity)); + routineLoadTaskList.parallelStream().forEach(entity -> + taskIdToJobId.put(entity.getSignature(), routineLoadJobId)); } finally { writeUnlock(); } } - public void removeRoutineLoadTasks(List routineLoadTasks) { + public void removeRoutineLoadTasks(List routineLoadTasks) { if (routineLoadTasks != null) { writeLock(); try { routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature())); routineLoadTasks.parallelStream().forEach(task -> - idToNeedSchedulerRoutineLoadTask.remove(task.getSignature())); + needSchedulerRoutineLoadTasks.remove(task)); + routineLoadTasks.parallelStream().forEach(task -> taskIdToJobId.remove(task.getSignature())); } finally { writeUnlock(); } } } - public Map getIdToNeedSchedulerRoutineLoadTasks() { + public int getClusterIdleSlotNum() { + readLock(); + try { + int result = 0; + initBeIdToMaxConcurrentTasks(); + for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { + if (beIdToConcurrentTasks.get(entry.getKey()) == null) { + result += entry.getValue(); + } else { + result += entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + } + } + return result; + } finally { + readUnlock(); + } + } + + public long getMinTaskBeId() { + readLock(); + try { + long result = 0L; + int maxIdleSlotNum = 0; + initBeIdToMaxConcurrentTasks(); + for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { + if (beIdToConcurrentTasks.get(entry.getKey()) == null) { + result = maxIdleSlotNum < entry.getValue() ? 
entry.getKey() : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, entry.getValue()); + } else { + int idelTaskNum = entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + result = maxIdleSlotNum < idelTaskNum ? entry.getKey() : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum); + } + } + return result; + } finally { + readUnlock(); + } + } + + public Queue getNeedSchedulerRoutineLoadTasks() { + readLock(); + try { + return needSchedulerRoutineLoadTasks; + } finally { + readUnlock(); + } + } + + public RoutineLoadJob getJobByTaskId(long taskId) { readLock(); try { - return idToNeedSchedulerRoutineLoadTask; + return idToRoutineLoadJob.get(taskIdToJobId.get(taskId)); } finally { readUnlock(); } @@ -251,6 +360,34 @@ public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoad } } + public void processTimeOutTasks() { + writeLock(); + try { + List runningTasks = new ArrayList<>(idToRoutineLoadTask.values()); + runningTasks.removeAll(needSchedulerRoutineLoadTasks); + + for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { + if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) + > DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000) { + long oldSignature = routineLoadTaskInfo.getSignature(); + if (routineLoadTaskInfo instanceof KafkaTaskInfo) { + // remove old task + idToRoutineLoadTask.remove(routineLoadTaskInfo.getSignature()); + // add new task + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo); + idToRoutineLoadTask.put(kafkaTaskInfo.getSignature(), kafkaTaskInfo); + needSchedulerRoutineLoadTasks.add(kafkaTaskInfo); + } + LOG.debug("Task {} was ran more then {} minutes. 
It was removed and rescheduled", + oldSignature, DEFAULT_TASK_TIMEOUT_MINUTES); + } + + } + } finally { + writeUnlock(); + } + } + private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState) throws LoadException { if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 6c3cdefdf0a975a..276494b84b885f0 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -30,10 +30,11 @@ public class RoutineLoadScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class); - private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance(); + private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadInstance(); @Override protected void runOneCycle() { + // update // get need scheduler routine jobs List routineLoadJobList = null; try { @@ -45,38 +46,37 @@ protected void runOneCycle() { LOG.debug("there are {} job need scheduler", routineLoadJobList.size()); for (RoutineLoadJob routineLoadJob : routineLoadJobList) { // judge nums of tasks more then max concurrent tasks of cluster - List routineLoadTaskList = null; + List routineLoadTaskList = null; + routineLoadJob.writeLock(); try { - routineLoadJob.writeLock(); - if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) { int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); - int totalTaskNum = currentConcurrentTaskNum + routineLoad.getIdToRoutineLoadTask().size(); - if (totalTaskNum > routineLoad.getTotalMaxConcurrentTaskNum()) { + int totalTaskNum = currentConcurrentTaskNum + 
routineLoadManager.getIdToRoutineLoadTask().size(); + if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { LOG.info("job {} concurrent task num {}, current total task num {}. " + "desired total task num {} more then total max task num {}, " + "skip this turn of scheduler", routineLoadJob.getId(), currentConcurrentTaskNum, - routineLoad.getIdToRoutineLoadTask().size(), - totalTaskNum, routineLoad.getTotalMaxConcurrentTaskNum()); + routineLoadManager.getIdToRoutineLoadTask().size(), + totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); break; } // divide job into tasks routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); // update tasks meta - routineLoad.addRoutineLoadTasks(routineLoadTaskList); - routineLoad.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList); + routineLoadManager.addRoutineLoadTasks(routineLoadTaskList); + routineLoadManager.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList, routineLoadJob.getId()); // change job state to running - routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING); + routineLoadManager.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING); } } catch (MetaNotFoundException e) { - routineLoad.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED); + routineLoadManager.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED); } catch (LoadException e) { LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(), e.getMessage(), e); - routineLoad.removeRoutineLoadTasks(routineLoadTaskList); + routineLoadManager.removeRoutineLoadTasks(routineLoadTaskList); } finally { routineLoadJob.writeUnlock(); } @@ -85,7 +85,7 @@ protected void runOneCycle() { } private List getNeedSchedulerRoutineJobs() throws LoadException { - return routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); + return 
routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java index 184feee880db95a..a89ce5f84f094df 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java @@ -21,10 +21,22 @@ import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; -public class RoutineLoadTask extends AgentTask{ +public class RoutineLoadTask extends AgentTask { + + private String columns; + private String where; + private String columnSeparator; + private RoutineLoadJob.DataSourceType dataSourceType; + public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { + long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature, + String columns, String where, String columnSeparator, + RoutineLoadJob.DataSourceType dataSourceType) { super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); + this.columns = columns; + this.where = where; + this.columnSeparator = columnSeparator; + this.dataSourceType = dataSourceType; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java new file mode 100644 index 000000000000000..70fc27e39204ad0 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Routine load task info is the task info include the only id (signature). + * For the kafka type of task info, it also include partitions which will be obtained data in this task. + * The routine load task info and routine load task are the same thing logically. + * Differently, routine load task is a agent task include backendId which will execute this task. 
+ */ +public class RoutineLoadTaskInfo { + + private long signature; + + private long createTimeMs; + private long loadStartTimeMs; + + public RoutineLoadTaskInfo(long signature) { + this.signature = signature; + this.createTimeMs = System.currentTimeMillis(); + } + + public long getSignature() { + return signature; + } + + public void setLoadStartTimeMs(long loadStartTimeMs) { + this.loadStartTimeMs = loadStartTimeMs; + } + + public long getLoadStartTimeMs() { + return loadStartTimeMs; + } +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java new file mode 100644 index 000000000000000..07804b881e8605b --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.util.Daemon; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Queue; + +/** + * Routine load task scheduler is a function which allocate task to be. + * Step1: get total idle task num of backends. + * Step2: equally divide to be + * Step3: submit tasks to be + */ +public class RoutineLoadTaskScheduler extends Daemon { + + private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); + + private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadInstance(); + + @Override + protected void runOneCycle() { + try { + process(); + } catch (Throwable e) { + LOG.error("Failed to process one round of RoutineLoadTaskScheduler with error message {}", + e.getMessage(), e); + } + } + + private void process() { + // update current beIdMaps for tasks + routineLoadManager.updateBeIdTaskMaps(); + + // check timeout tasks + routineLoadManager.processTimeOutTasks(); + + // get idle be task num + int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum(); + int scheduledTaskNum = 0; + Queue routineLoadTaskList = routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + AgentBatchTask batchTask = new AgentBatchTask(); + + // allocate task to be + while (clusterIdleSlotNum > 0) { + RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskList.poll(); + // queue is not empty + if (routineLoadTaskInfo != null) { + // when routine load task is not abandoned + if (routineLoadManager.getIdToRoutineLoadTask().get(routineLoadTaskInfo.getSignature()) != null) { + long beId = routineLoadManager.getMinTaskBeId(); + RoutineLoadJob routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getSignature()); + 
RoutineLoadTask routineLoadTask = routineLoadJob.createTask(routineLoadTaskInfo, beId); + if (routineLoadTask != null) { + routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); + AgentTaskQueue.addTask(routineLoadTask); + batchTask.addTask(routineLoadTask); + clusterIdleSlotNum--; + scheduledTaskNum++; + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); + } + } else { + LOG.debug("Task {} for job has been already discarded", routineLoadTaskInfo.getSignature()); + } + } else { + LOG.debug("The queue of need scheduler tasks is empty."); + break; + } + } + LOG.info("{} tasks have bean allocated to be. There are {} remaining idle slot in cluster.", + scheduledTaskNum, routineLoadManager.getClusterIdleSlotNum()); + + if (batchTask.getTaskNum() > 0) { + AgentTaskExecutor.submit(batchTask); + } + } +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index ab88bff6be6ee36..1ce9644bb1bd256 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -19,7 +19,6 @@ import com.google.common.collect.Lists; import mockit.Deencapsulation; -import mockit.Delegate; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; @@ -89,15 +88,15 @@ public void testDivideRoutineLoadJob() { Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6)); - List result = kafkaRoutineLoadJob.divideRoutineLoadJob(2); + List result = kafkaRoutineLoadJob.divideRoutineLoadJob(2); Assert.assertEquals(2, result.size()); - for (RoutineLoadTask routineLoadTask : result) { - KafkaRoutineLoadTask kafkaRoutineLoadTask = (KafkaRoutineLoadTask) routineLoadTask; - if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 2) { - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(1)); - 
Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(6)); - } else if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 1) { - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(4)); + for (RoutineLoadTaskInfo routineLoadTaskInfo : result) { + KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; + if (kafkaTaskInfo.getPartitions().size() == 2) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(1)); + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(6)); + } else if (kafkaTaskInfo.getPartitions().size() == 1) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(4)); } else { Assert.fail(); } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java new file mode 100644 index 000000000000000..8e50e7014108447 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import com.google.common.collect.Lists; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.system.SystemInfoService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +public class RoutineLoadManagerTest { + + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + private static final int DEFAULT_TASK_TIMEOUT_MINUTES = 5; + + @Mocked + private SystemInfoService systemInfoService; + + @Test + public void testGetMinTaskBeId() { + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + result = beIds; + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addNumOfConcurrentTasksByBeId(1L); + Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId()); + } + + @Test + public void testGetTotalIdleTaskNum() { + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + result = beIds; + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addNumOfConcurrentTasksByBeId(1L); + Assert.assertEquals(DEFAULT_BE_CONCURRENT_TASK_NUM * 2 - 1, routineLoadManager.getClusterIdleSlotNum()); + } + + @Test + public void testUpdateBeIdTaskMaps() { + List oldBeIds = Lists.newArrayList(); + oldBeIds.add(1L); + oldBeIds.add(2L); + + List newBeIds = Lists.newArrayList(); + newBeIds.add(1L); + newBeIds.add(3L); + + new 
Expectations() { + { + systemInfoService.getBackendIds(true); + returns(oldBeIds, newBeIds); + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.updateBeIdTaskMaps(); + } + + @Test + public void testProcessTimeOutTasks() { + List routineLoadTaskInfoList = new ArrayList<>(); + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(1L); + kafkaTaskInfo.addKafkaPartition(100); + kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000); + routineLoadTaskInfoList.add(kafkaTaskInfo); + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addRoutineLoadTasks(routineLoadTaskInfoList); + + + new MockUp() { + @Mock + public long getNextId() { + return 2L; + } + }; + + + routineLoadManager.processTimeOutTasks(); + new Verifications() { + { + Map idToRoutineLoadTask = + Deencapsulation.getField(routineLoadManager, "idToRoutineLoadTask"); + Assert.assertNull(idToRoutineLoadTask.get(1L)); + Assert.assertEquals(1, idToRoutineLoadTask.size()); + Queue needSchedulerTask = + Deencapsulation.getField(routineLoadManager, "needSchedulerRoutineLoadTask"); + RoutineLoadTaskInfo routineLoadTaskInfo = needSchedulerTask.poll(); + Assert.assertNotNull(routineLoadTaskInfo); + Assert.assertEquals(100, (int) ((KafkaTaskInfo) routineLoadTaskInfo).getPartitions().get(0)); + } + }; + } + +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 2a9e4de35382c6d..9940b1b92b6a70b 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -41,8 +41,8 @@ public class RoutineLoadSchedulerTest { @Test public void testNormalRunOneCycle() throws LoadException, 
MetaNotFoundException { int taskNum = 1; - List routineLoadTaskList = new ArrayList<>(); - KafkaRoutineLoadTask kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaRoutineLoadTask.class); + List routineLoadTaskList = new ArrayList<>(); + KafkaTaskInfo kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaTaskInfo.class); EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes(); EasyMock.replay(kafkaRoutineLoadTask); routineLoadTaskList.add(kafkaRoutineLoadTask); @@ -66,21 +66,21 @@ public void testNormalRunOneCycle() throws LoadException, MetaNotFoundException PowerMock.replay(Catalog.class); - RoutineLoad routineLoad = new RoutineLoad(); + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes(); - EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoad).anyTimes(); + EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoadManager).anyTimes(); EasyMock.replay(catalog); - routineLoad.addRoutineLoadJob(routineLoadJob); - routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER); + routineLoadManager.addRoutineLoadJob(routineLoadJob); + routineLoadManager.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER); RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler(); routineLoadScheduler.runOneCycle(); - Assert.assertEquals(1, routineLoad.getIdToRoutineLoadTask().size()); - Assert.assertEquals(1, routineLoad.getIdToNeedSchedulerRoutineLoadTasks().size()); - Assert.assertEquals(1, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size()); - Assert.assertEquals(0, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size()); + Assert.assertEquals(1, routineLoadManager.getIdToRoutineLoadTask().size()); + Assert.assertEquals(1, routineLoadManager.getNeedSchedulerRoutineLoadTasks().size()); + Assert.assertEquals(1, 
routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size()); + Assert.assertEquals(0, routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size()); } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java new file mode 100644 index 000000000000000..69d3c6b7e3f09ca --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TTaskType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.Queue; + +public class RoutineLoadTaskSchedulerTest { + + @Mocked + private RoutineLoadManager routineLoadManager; + @Mocked + private Catalog catalog; + @Mocked + private AgentTaskExecutor agentTaskExecutor; + + @Test + public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1) { + long beId = 100L; + + Queue routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue(); + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(1L); + routineLoadTaskInfo1.addKafkaPartition(1); + routineLoadTaskInfo1.addKafkaPartition(2); + routineLoadTaskInfoQueue.add(routineLoadTaskInfo1); + + Map idToRoutineLoadTask = Maps.newHashMap(); + idToRoutineLoadTask.put(1L, routineLoadTaskInfo1); + + Map partitionIdToOffset = Maps.newHashMap(); + partitionIdToOffset.put(1, 100L); + partitionIdToOffset.put(2, 200L); + KafkaProgress kafkaProgress = new KafkaProgress(); + kafkaProgress.setPartitionIdToOffset(partitionIdToOffset); + + new Expectations() { + { + Catalog.getInstance(); + result = catalog; + catalog.getRoutineLoadInstance(); + result = routineLoadManager; + + routineLoadManager.getClusterIdleSlotNum(); + result = 3; + routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + result = routineLoadTaskInfoQueue; + routineLoadManager.getIdToRoutineLoadTask(); + result = idToRoutineLoadTask; + + kafkaRoutineLoadJob1.getDbId(); + result = 1L; + kafkaRoutineLoadJob1.getTableId(); + result = 1L; + 
kafkaRoutineLoadJob1.getColumns(); + result = "columns"; + kafkaRoutineLoadJob1.getColumnSeparator(); + result = ""; + kafkaRoutineLoadJob1.getProgress(); + result = kafkaProgress; + + + routineLoadManager.getMinTaskBeId(); + result = beId; + routineLoadManager.getJobByTaskId(1L); + result = kafkaRoutineLoadJob1; + } + }; + + RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); + routineLoadTaskScheduler.runOneCycle(); + + new Verifications() { + { + AgentTask routineLoadTask = + AgentTaskQueue.getTask(beId, TTaskType.STREAM_LOAD, routineLoadTaskInfo1.getSignature()); + + Assert.assertEquals(beId, routineLoadTask.getBackendId()); + Assert.assertEquals(100L, + (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(1)); + Assert.assertEquals(200L, + (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(2)); + + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); + times = 1; + } + }; + } +} diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 90e07e012e93695..15271c3997a8bbd 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -155,7 +155,8 @@ enum TTaskType { PUBLISH_VERSION, CLEAR_ALTER_TASK, CLEAR_TRANSACTION_TASK, - RECOVER_TABLET + RECOVER_TABLET, + STREAM_LOAD } enum TStmtType {