From 0cde55271bd0b88e5631127cde2748fe453273a0 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 21 Nov 2018 15:13:56 +0800 Subject: [PATCH 1/4] Add distributor which schedules tasks to backends fairly Step1: updateBeIdTaskMaps, remove unalive backends and add newly alive backends Step2: process timeout tasks; if a task has already been allocated to a backend but is not finished within DEFAULT_TASK_TIMEOUT_MINUTES, it will be discarded. At the same time, the partitions belonging to the old task will be allocated to a new task. The new task with a new signature will be added to the queue of needSchedulerRoutineLoadTask. Step3: process all of needSchedulerRoutineLoadTask, allocating each task to a backend. The task will be executed by the backend. --- .../org/apache/doris/catalog/Catalog.java | 10 +- ...neLoadProgress.java => KafkaProgress.java} | 19 +- .../load/routineload/KafkaRoutineLoadJob.java | 33 ++-- .../routineload/KafkaRoutineLoadTask.java | 30 +-- .../doris/load/routineload/KafkaTaskInfo.java | 48 +++++ .../load/routineload/RoutineLoadJob.java | 27 ++- ...utineLoad.java => RoutineLoadManager.java} | 179 ++++++++++++++++-- .../routineload/RoutineLoadScheduler.java | 28 +-- .../load/routineload/RoutineLoadTask.java | 16 +- .../load/routineload/RoutineLoadTaskInfo.java | 51 +++++ .../routineload/RoutineLoadTaskScheduler.java | 98 ++++++++++ .../routineload/KafkaRoutineLoadJobTest.java | 17 +- .../routineload/RoutineLoadManagerTest.java | 145 ++++++++++++++ .../routineload/RoutineLoadSchedulerTest.java | 20 +- .../RoutineLoadTaskSchedulerTest.java | 117 ++++++++++++ gensrc/thrift/Types.thrift | 3 +- 16 files changed, 748 insertions(+), 93 deletions(-) rename fe/src/main/java/org/apache/doris/load/routineload/{KafkaRoutineLoadProgress.java => KafkaProgress.java} (64%) create mode 100644 fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java rename fe/src/main/java/org/apache/doris/load/routineload/{RoutineLoad.java => RoutineLoadManager.java} (56%) create mode 100644 
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java create mode 100644 fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java create mode 100644 fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java create mode 100644 fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index e79aa1fa4e0a9f..e253a0740ccd21 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -135,7 +135,7 @@ import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; -import org.apache.doris.load.routineload.RoutineLoad; +import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.metric.MetricRepo; @@ -241,7 +241,7 @@ public class Catalog { private ConcurrentHashMap nameToCluster; private Load load; - private RoutineLoad routineLoad; + private RoutineLoadManager routineLoadManager; private ExportMgr exportMgr; private Clone clone; private Alter alter; @@ -372,7 +372,7 @@ private Catalog() { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); this.load = new Load(); - this.routineLoad = new RoutineLoad(); + this.routineLoadManager = new RoutineLoadManager(); this.exportMgr = new ExportMgr(); this.clone = new Clone(); this.alter = new Alter(); @@ -4250,8 +4250,8 @@ public Load getLoadInstance() { return this.load; } - public RoutineLoad getRoutineLoadInstance() { - return routineLoad; + public RoutineLoadManager getRoutineLoadInstance() { + return routineLoadManager; } public ExportMgr getExportMgr() { diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java similarity index 64% rename from fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java rename to fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index ff46af630bea46..6f0f4243ca7a8a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,8 +17,21 @@ package org.apache.doris.load.routineload; -public class KafkaRoutineLoadProgress { +import java.util.Map; - private String partitionName; - private long offset; +/** + * this is description of kafka routine load progress + * the data before offset was already loaded in doris + */ +public class KafkaProgress { + + private Map partitionIdToOffset; + + public Map getPartitionIdToOffset() { + return partitionIdToOffset; + } + + public void setPartitionIdToOffset(Map partitionIdToOffset) { + this.partitionIdToOffset = partitionIdToOffset; + } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index cced4ddc1e4836..bba1c5d5f48980 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -18,13 +18,13 @@ package org.apache.doris.load.routineload; import com.google.common.base.Strings; +import com.google.gson.Gson; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.thrift.TTaskType; import 
org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.logging.log4j.LogManager; @@ -43,7 +43,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private String serverAddress; private String topic; - // optional + // optional, user want to load partitions. private List kafkaPartitions; public KafkaRoutineLoadJob() { @@ -59,21 +59,23 @@ public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, lon this.topic = topic; } + public KafkaProgress getProgress() { + Gson gson = new Gson(); + return gson.fromJson(this.progress, KafkaProgress.class); + } + @Override - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { + public List divideRoutineLoadJob(int currentConcurrentTaskNum) { // divide kafkaPartitions into tasks - List kafkaRoutineLoadTaskList = new ArrayList<>(); + List kafkaRoutineLoadTaskList = new ArrayList<>(); for (int i = 0; i < currentConcurrentTaskNum; i++) { - // TODO(ml): init load task - kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH, - dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId())); + kafkaRoutineLoadTaskList.add(new KafkaTaskInfo(SystemIdGenerator.getNextId())); } for (int i = 0; i < kafkaPartitions.size(); i++) { - kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i)); + ((KafkaTaskInfo) kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum)) + .addKafkaPartition(kafkaPartitions.get(i)); } - List result = new ArrayList<>(); - result.addAll(kafkaRoutineLoadTaskList); - return result; + return kafkaRoutineLoadTaskList; } @Override @@ -99,6 +101,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)); } + @Override + public RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId) { + return new 
KafkaRoutineLoadTask(getResourceInfo(), + beId, getDbId(), getTableId(), + 0L, 0L, 0L, getColumns(), getWhere(), getColumnSeparator(), + (KafkaTaskInfo) routineLoadTaskInfo, + getProgress()); + } + private void updatePartitions() { // fetch all of kafkaPartitions in topic if (kafkaPartitions == null || kafkaPartitions.size() == 0) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java index 89347745c48c20..c561563a693f4a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java @@ -17,27 +17,29 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; -import java.util.List; +import java.util.HashMap; +import java.util.Map; -public class KafkaRoutineLoadTask extends RoutineLoadTask { - - private List kafkaPartitions; - public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { - super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); - this.kafkaPartitions = Lists.newArrayList(); - } +public class KafkaRoutineLoadTask extends RoutineLoadTask { - public void addKafkaPartition(int partition) { - kafkaPartitions.add(partition); + private Map partitionIdToOffset; + + public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, + long dbId, long tableId, long partitionId, long indexId, long tabletId, + String columns, String where, String columnSeparator, + KafkaTaskInfo kafkaTaskInfo, KafkaProgress kafkaProgress) { + super(resourceInfo, backendId, TTaskType.STREAM_LOAD, dbId, tableId, partitionId, indexId, tabletId, + kafkaTaskInfo.getSignature(), 
columns, where, columnSeparator, RoutineLoadJob.DataSourceType.KAFKA); + this.partitionIdToOffset = new HashMap<>(); + kafkaTaskInfo.getPartitions().parallelStream().forEach(entity -> + partitionIdToOffset.put(entity, kafkaProgress.getPartitionIdToOffset().get(entity))); } - public List getKafkaPartitions() { - return kafkaPartitions; + public Map getPartitionIdToOffset() { + return partitionIdToOffset; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java new file mode 100644 index 00000000000000..01bedebcf2cf4e --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import org.apache.doris.common.SystemIdGenerator; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaTaskInfo extends RoutineLoadTaskInfo { + + private List partitions; + + public KafkaTaskInfo(long signature) { + super(signature); + this.partitions = new ArrayList<>(); + } + + public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) { + super(SystemIdGenerator.getNextId()); + this.partitions = kafkaTaskInfo.getPartitions(); + } + + public void addKafkaPartition(int partition) { + partitions.add(partition); + } + + public List getPartitions() { + return partitions; + } + + +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index af737356febf57..97998cfb18f3b0 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -25,7 +25,6 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -35,7 +34,7 @@ * The desireTaskConcurrentNum means that user expect the number of concurrent stream load * The routine load job support different streaming medium such as KAFKA */ -public class RoutineLoadJob implements Writable { +public abstract class RoutineLoadJob implements Writable { public enum JobState { NEED_SCHEDULER, @@ -116,6 +115,26 @@ public long getId() { return id; } + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + public String getColumns() { + return columns; + } + + public String getWhere() { + return where; + } + + public String getColumnSeparator() { + return columnSeparator; + } + public JobState getState() { return state; } @@ -128,7 +147,7 @@ public TResourceInfo getResourceInfo() { return 
resourceInfo; } - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { + public List divideRoutineLoadJob(int currentConcurrentTaskNum) { return null; } @@ -145,4 +164,6 @@ public void write(DataOutput out) throws IOException { public void readFields(DataInput in) throws IOException { // TODO(ml) } + + abstract RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java similarity index 56% rename from fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java rename to fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 0dd6b3cc431974..36042693799777 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -18,6 +18,7 @@ package org.apache.doris.load.routineload; import com.google.common.collect.Maps; +import com.google.common.collect.Queues; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.LoadException; import org.apache.logging.log4j.LogManager; @@ -27,15 +28,18 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -public class RoutineLoad { - private static final Logger LOG = LogManager.getLogger(RoutineLoad.class); +public class RoutineLoadManager { + private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class); private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + private static final int DEFAULT_TASK_TIMEOUT_MINUTES = 5; - // TODO(ml): real-time calculate by be + // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks; + private Map beIdToConcurrentTasks; // stream load job meta private Map 
idToRoutineLoadJob; @@ -44,8 +48,11 @@ public class RoutineLoad { private Map idToCancelledRoutineLoadJob; // stream load tasks meta (not persistent) - private Map idToRoutineLoadTask; - private Map idToNeedSchedulerRoutineLoadTask; + private Map idToRoutineLoadTask; + // KafkaPartitions means that partitions belong to one task + // kafka partitions == routine load task (logical) + private Queue needSchedulerRoutineLoadTasks; + private Map taskIdToJobId; private ReentrantReadWriteLock lock; @@ -65,29 +72,75 @@ private void writeUnlock() { lock.writeLock().unlock(); } - public RoutineLoad() { + public RoutineLoadManager() { idToRoutineLoadJob = Maps.newHashMap(); idToNeedSchedulerRoutineLoadJob = Maps.newHashMap(); idToRunningRoutineLoadJob = Maps.newHashMap(); idToCancelledRoutineLoadJob = Maps.newHashMap(); idToRoutineLoadTask = Maps.newHashMap(); - idToNeedSchedulerRoutineLoadTask = Maps.newHashMap(); + needSchedulerRoutineLoadTasks = Queues.newLinkedBlockingQueue(); + beIdToConcurrentTasks = Maps.newHashMap(); + taskIdToJobId = Maps.newHashMap(); lock = new ReentrantReadWriteLock(true); } + public void initBeIdToMaxConcurrentTasks() { + if (beIdToMaxConcurrentTasks == null) { + beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) + .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); + } + } + public int getTotalMaxConcurrentTaskNum() { readLock(); try { - if (beIdToMaxConcurrentTasks == null) { - beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) - .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); - } + initBeIdToMaxConcurrentTasks(); return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum(); } finally { readUnlock(); } } + public void updateBeIdTaskMaps() { + writeLock(); + try { + initBeIdToMaxConcurrentTasks(); + List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); + + // diff beIds and 
beIdToMaxConcurrentTasks.keys() + List newBeIds = beIds.parallelStream().filter(entity -> beIdToMaxConcurrentTasks.get(entity) == null) + .collect(Collectors.toList()); + List unavailableBeIds = beIdToMaxConcurrentTasks.keySet().parallelStream() + .filter(entity -> !beIds.contains(entity)) + .collect(Collectors.toList()); + newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM)); + for (long beId : unavailableBeIds) { + beIdToMaxConcurrentTasks.remove(beId); + beIdToConcurrentTasks.remove(beId); + } + LOG.info("There are {} backends which participate in routine load scheduler. " + + "There are {} new backends and {} unavailable backends for routine load", + beIdToMaxConcurrentTasks.size(), newBeIds.size(), unavailableBeIds.size()); + } finally { + writeUnlock(); + } + } + + public void addNumOfConcurrentTasksByBeId(long beId) { + writeLock(); + try { + if (beIdToConcurrentTasks.get(beId) == null) { + beIdToConcurrentTasks.put(beId, 1); + } else { + int concurrentTaskNum = (int) beIdToConcurrentTasks.get(beId); + concurrentTaskNum++; + beIdToConcurrentTasks.put(beId, concurrentTaskNum); + } + } finally { + writeUnlock(); + } + } + public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { writeLock(); try { @@ -97,7 +150,7 @@ public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { } } - public void addRoutineLoadTasks(List routineLoadTaskList) { + public void addRoutineLoadTasks(List routineLoadTaskList) { writeLock(); try { idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( @@ -107,37 +160,93 @@ public void addRoutineLoadTasks(List routineLoadTaskList) { } } - public Map getIdToRoutineLoadTask() { - return idToRoutineLoadTask; + public Map getIdToRoutineLoadTask() { + readLock(); + try { + return idToRoutineLoadTask; + } finally { + readUnlock(); + } } - public void addNeedSchedulerRoutineLoadTasks(List routineLoadTaskList) { + public void 
addNeedSchedulerRoutineLoadTasks(List routineLoadTaskList, long routineLoadJobId) { writeLock(); try { - idToNeedSchedulerRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( - Collectors.toMap(task -> task.getSignature(), task -> task))); + routineLoadTaskList.parallelStream().forEach(entity -> needSchedulerRoutineLoadTasks.add(entity)); + routineLoadTaskList.parallelStream().forEach(entity -> + taskIdToJobId.put(entity.getSignature(), routineLoadJobId)); } finally { writeUnlock(); } } - public void removeRoutineLoadTasks(List routineLoadTasks) { + public void removeRoutineLoadTasks(List routineLoadTasks) { if (routineLoadTasks != null) { writeLock(); try { routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature())); routineLoadTasks.parallelStream().forEach(task -> - idToNeedSchedulerRoutineLoadTask.remove(task.getSignature())); + needSchedulerRoutineLoadTasks.remove(task)); + routineLoadTasks.parallelStream().forEach(task -> taskIdToJobId.remove(task.getSignature())); } finally { writeUnlock(); } } } - public Map getIdToNeedSchedulerRoutineLoadTasks() { + public int getClusterIdleSlotNum() { + readLock(); + try { + int result = 0; + initBeIdToMaxConcurrentTasks(); + for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { + if (beIdToConcurrentTasks.get(entry.getKey()) == null) { + result += entry.getValue(); + } else { + result += entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + } + } + return result; + } finally { + readUnlock(); + } + } + + public long getMinTaskBeId() { + readLock(); + try { + long result = 0L; + int maxIdleSlotNum = 0; + initBeIdToMaxConcurrentTasks(); + for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { + if (beIdToConcurrentTasks.get(entry.getKey()) == null) { + result = maxIdleSlotNum < entry.getValue() ? 
entry.getKey() : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, entry.getValue()); + } else { + int idelTaskNum = entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + result = maxIdleSlotNum < idelTaskNum ? entry.getKey() : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum); + } + } + return result; + } finally { + readUnlock(); + } + } + + public Queue getNeedSchedulerRoutineLoadTasks() { + readLock(); + try { + return needSchedulerRoutineLoadTasks; + } finally { + readUnlock(); + } + } + + public RoutineLoadJob getJobByTaskId(long taskId) { readLock(); try { - return idToNeedSchedulerRoutineLoadTask; + return idToRoutineLoadJob.get(taskIdToJobId.get(taskId)); } finally { readUnlock(); } @@ -251,6 +360,34 @@ public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoad } } + public void processTimeOutTasks() { + writeLock(); + try { + List runningTasks = new ArrayList<>(idToRoutineLoadTask.values()); + runningTasks.removeAll(needSchedulerRoutineLoadTasks); + + for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { + if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) + > DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000) { + long oldSignature = routineLoadTaskInfo.getSignature(); + if (routineLoadTaskInfo instanceof KafkaTaskInfo) { + // remove old task + idToRoutineLoadTask.remove(routineLoadTaskInfo.getSignature()); + // add new task + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo); + idToRoutineLoadTask.put(kafkaTaskInfo.getSignature(), kafkaTaskInfo); + needSchedulerRoutineLoadTasks.add(kafkaTaskInfo); + } + LOG.debug("Task {} was ran more then {} minutes. 
It was removed and rescheduled", + oldSignature, DEFAULT_TASK_TIMEOUT_MINUTES); + } + + } + } finally { + writeUnlock(); + } + } + private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState) throws LoadException { if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 6c3cdefdf0a975..276494b84b885f 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -30,10 +30,11 @@ public class RoutineLoadScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class); - private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance(); + private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadInstance(); @Override protected void runOneCycle() { + // update // get need scheduler routine jobs List routineLoadJobList = null; try { @@ -45,38 +46,37 @@ protected void runOneCycle() { LOG.debug("there are {} job need scheduler", routineLoadJobList.size()); for (RoutineLoadJob routineLoadJob : routineLoadJobList) { // judge nums of tasks more then max concurrent tasks of cluster - List routineLoadTaskList = null; + List routineLoadTaskList = null; + routineLoadJob.writeLock(); try { - routineLoadJob.writeLock(); - if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) { int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); - int totalTaskNum = currentConcurrentTaskNum + routineLoad.getIdToRoutineLoadTask().size(); - if (totalTaskNum > routineLoad.getTotalMaxConcurrentTaskNum()) { + int totalTaskNum = currentConcurrentTaskNum + 
routineLoadManager.getIdToRoutineLoadTask().size(); + if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { LOG.info("job {} concurrent task num {}, current total task num {}. " + "desired total task num {} more then total max task num {}, " + "skip this turn of scheduler", routineLoadJob.getId(), currentConcurrentTaskNum, - routineLoad.getIdToRoutineLoadTask().size(), - totalTaskNum, routineLoad.getTotalMaxConcurrentTaskNum()); + routineLoadManager.getIdToRoutineLoadTask().size(), + totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); break; } // divide job into tasks routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); // update tasks meta - routineLoad.addRoutineLoadTasks(routineLoadTaskList); - routineLoad.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList); + routineLoadManager.addRoutineLoadTasks(routineLoadTaskList); + routineLoadManager.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList, routineLoadJob.getId()); // change job state to running - routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING); + routineLoadManager.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING); } } catch (MetaNotFoundException e) { - routineLoad.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED); + routineLoadManager.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED); } catch (LoadException e) { LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(), e.getMessage(), e); - routineLoad.removeRoutineLoadTasks(routineLoadTaskList); + routineLoadManager.removeRoutineLoadTasks(routineLoadTaskList); } finally { routineLoadJob.writeUnlock(); } @@ -85,7 +85,7 @@ protected void runOneCycle() { } private List getNeedSchedulerRoutineJobs() throws LoadException { - return routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); + return 
routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java index 184feee880db95..a89ce5f84f094d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java @@ -21,10 +21,22 @@ import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; -public class RoutineLoadTask extends AgentTask{ +public class RoutineLoadTask extends AgentTask { + + private String columns; + private String where; + private String columnSeparator; + private RoutineLoadJob.DataSourceType dataSourceType; + public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { + long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature, + String columns, String where, String columnSeparator, + RoutineLoadJob.DataSourceType dataSourceType) { super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); + this.columns = columns; + this.where = where; + this.columnSeparator = columnSeparator; + this.dataSourceType = dataSourceType; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java new file mode 100644 index 00000000000000..70fc27e39204ad --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Routine load task info is the task info include the only id (signature). + * For the kafka type of task info, it also include partitions which will be obtained data in this task. + * The routine load task info and routine load task are the same thing logically. + * Differently, routine load task is a agent task include backendId which will execute this task. 
+ */ +public class RoutineLoadTaskInfo { + + private long signature; + + private long createTimeMs; + private long loadStartTimeMs; + + public RoutineLoadTaskInfo(long signature) { + this.signature = signature; + this.createTimeMs = System.currentTimeMillis(); + } + + public long getSignature() { + return signature; + } + + public void setLoadStartTimeMs(long loadStartTimeMs) { + this.loadStartTimeMs = loadStartTimeMs; + } + + public long getLoadStartTimeMs() { + return loadStartTimeMs; + } +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java new file mode 100644 index 00000000000000..07804b881e8605 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.util.Daemon; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Queue; + +/** + * Routine load task scheduler is a function which allocate task to be. + * Step1: get total idle task num of backends. + * Step2: equally divide to be + * Step3: submit tasks to be + */ +public class RoutineLoadTaskScheduler extends Daemon { + + private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); + + private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadInstance(); + + @Override + protected void runOneCycle() { + try { + process(); + } catch (Throwable e) { + LOG.error("Failed to process one round of RoutineLoadTaskScheduler with error message {}", + e.getMessage(), e); + } + } + + private void process() { + // update current beIdMaps for tasks + routineLoadManager.updateBeIdTaskMaps(); + + // check timeout tasks + routineLoadManager.processTimeOutTasks(); + + // get idle be task num + int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum(); + int scheduledTaskNum = 0; + Queue routineLoadTaskList = routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + AgentBatchTask batchTask = new AgentBatchTask(); + + // allocate task to be + while (clusterIdleSlotNum > 0) { + RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskList.poll(); + // queue is not empty + if (routineLoadTaskInfo != null) { + // when routine load task is not abandoned + if (routineLoadManager.getIdToRoutineLoadTask().get(routineLoadTaskInfo.getSignature()) != null) { + long beId = routineLoadManager.getMinTaskBeId(); + RoutineLoadJob routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getSignature()); + 
RoutineLoadTask routineLoadTask = routineLoadJob.createTask(routineLoadTaskInfo, beId); + if (routineLoadTask != null) { + routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); + AgentTaskQueue.addTask(routineLoadTask); + batchTask.addTask(routineLoadTask); + clusterIdleSlotNum--; + scheduledTaskNum++; + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); + } + } else { + LOG.debug("Task {} for job has been already discarded", routineLoadTaskInfo.getSignature()); + } + } else { + LOG.debug("The queue of need scheduler tasks is empty."); + break; + } + } + LOG.info("{} tasks have bean allocated to be. There are {} remaining idle slot in cluster.", + scheduledTaskNum, routineLoadManager.getClusterIdleSlotNum()); + + if (batchTask.getTaskNum() > 0) { + AgentTaskExecutor.submit(batchTask); + } + } +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index ab88bff6be6ee3..1ce9644bb1bd25 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -19,7 +19,6 @@ import com.google.common.collect.Lists; import mockit.Deencapsulation; -import mockit.Delegate; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; @@ -89,15 +88,15 @@ public void testDivideRoutineLoadJob() { Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6)); - List result = kafkaRoutineLoadJob.divideRoutineLoadJob(2); + List result = kafkaRoutineLoadJob.divideRoutineLoadJob(2); Assert.assertEquals(2, result.size()); - for (RoutineLoadTask routineLoadTask : result) { - KafkaRoutineLoadTask kafkaRoutineLoadTask = (KafkaRoutineLoadTask) routineLoadTask; - if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 2) { - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(1)); - 
Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(6)); - } else if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 1) { - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(4)); + for (RoutineLoadTaskInfo routineLoadTaskInfo : result) { + KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; + if (kafkaTaskInfo.getPartitions().size() == 2) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(1)); + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(6)); + } else if (kafkaTaskInfo.getPartitions().size() == 1) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(4)); } else { Assert.fail(); } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java new file mode 100644 index 00000000000000..8e50e701410844 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import com.google.common.collect.Lists; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.system.SystemInfoService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +public class RoutineLoadManagerTest { + + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + private static final int DEFAULT_TASK_TIMEOUT_MINUTES = 5; + + @Mocked + private SystemInfoService systemInfoService; + + @Test + public void testGetMinTaskBeId() { + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + result = beIds; + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addNumOfConcurrentTasksByBeId(1L); + Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId()); + } + + @Test + public void testGetTotalIdleTaskNum() { + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + result = beIds; + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addNumOfConcurrentTasksByBeId(1L); + Assert.assertEquals(DEFAULT_BE_CONCURRENT_TASK_NUM * 2 - 1, routineLoadManager.getClusterIdleSlotNum()); + } + + @Test + public void testUpdateBeIdTaskMaps() { + List oldBeIds = Lists.newArrayList(); + oldBeIds.add(1L); + oldBeIds.add(2L); + + List newBeIds = Lists.newArrayList(); + newBeIds.add(1L); + newBeIds.add(3L); + + new 
Expectations() { + { + systemInfoService.getBackendIds(true); + returns(oldBeIds, newBeIds); + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.updateBeIdTaskMaps(); + } + + @Test + public void testProcessTimeOutTasks() { + List routineLoadTaskInfoList = new ArrayList<>(); + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(1L); + kafkaTaskInfo.addKafkaPartition(100); + kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000); + routineLoadTaskInfoList.add(kafkaTaskInfo); + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addRoutineLoadTasks(routineLoadTaskInfoList); + + + new MockUp() { + @Mock + public long getNextId() { + return 2L; + } + }; + + + routineLoadManager.processTimeOutTasks(); + new Verifications() { + { + Map idToRoutineLoadTask = + Deencapsulation.getField(routineLoadManager, "idToRoutineLoadTask"); + Assert.assertNull(idToRoutineLoadTask.get(1L)); + Assert.assertEquals(1, idToRoutineLoadTask.size()); + Queue needSchedulerTask = + Deencapsulation.getField(routineLoadManager, "needSchedulerRoutineLoadTask"); + RoutineLoadTaskInfo routineLoadTaskInfo = needSchedulerTask.poll(); + Assert.assertNotNull(routineLoadTaskInfo); + Assert.assertEquals(100, (int) ((KafkaTaskInfo) routineLoadTaskInfo).getPartitions().get(0)); + } + }; + } + +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 2a9e4de35382c6..9940b1b92b6a70 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -41,8 +41,8 @@ public class RoutineLoadSchedulerTest { @Test public void testNormalRunOneCycle() throws LoadException, 
MetaNotFoundException { int taskNum = 1; - List routineLoadTaskList = new ArrayList<>(); - KafkaRoutineLoadTask kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaRoutineLoadTask.class); + List routineLoadTaskList = new ArrayList<>(); + KafkaTaskInfo kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaTaskInfo.class); EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes(); EasyMock.replay(kafkaRoutineLoadTask); routineLoadTaskList.add(kafkaRoutineLoadTask); @@ -66,21 +66,21 @@ public void testNormalRunOneCycle() throws LoadException, MetaNotFoundException PowerMock.replay(Catalog.class); - RoutineLoad routineLoad = new RoutineLoad(); + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes(); - EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoad).anyTimes(); + EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoadManager).anyTimes(); EasyMock.replay(catalog); - routineLoad.addRoutineLoadJob(routineLoadJob); - routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER); + routineLoadManager.addRoutineLoadJob(routineLoadJob); + routineLoadManager.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER); RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler(); routineLoadScheduler.runOneCycle(); - Assert.assertEquals(1, routineLoad.getIdToRoutineLoadTask().size()); - Assert.assertEquals(1, routineLoad.getIdToNeedSchedulerRoutineLoadTasks().size()); - Assert.assertEquals(1, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size()); - Assert.assertEquals(0, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size()); + Assert.assertEquals(1, routineLoadManager.getIdToRoutineLoadTask().size()); + Assert.assertEquals(1, routineLoadManager.getNeedSchedulerRoutineLoadTasks().size()); + Assert.assertEquals(1, 
routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size()); + Assert.assertEquals(0, routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size()); } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java new file mode 100644 index 00000000000000..69d3c6b7e3f09c --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TTaskType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.Queue; + +public class RoutineLoadTaskSchedulerTest { + + @Mocked + private RoutineLoadManager routineLoadManager; + @Mocked + private Catalog catalog; + @Mocked + private AgentTaskExecutor agentTaskExecutor; + + @Test + public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1) { + long beId = 100L; + + Queue routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue(); + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(1L); + routineLoadTaskInfo1.addKafkaPartition(1); + routineLoadTaskInfo1.addKafkaPartition(2); + routineLoadTaskInfoQueue.add(routineLoadTaskInfo1); + + Map idToRoutineLoadTask = Maps.newHashMap(); + idToRoutineLoadTask.put(1L, routineLoadTaskInfo1); + + Map partitionIdToOffset = Maps.newHashMap(); + partitionIdToOffset.put(1, 100L); + partitionIdToOffset.put(2, 200L); + KafkaProgress kafkaProgress = new KafkaProgress(); + kafkaProgress.setPartitionIdToOffset(partitionIdToOffset); + + new Expectations() { + { + Catalog.getInstance(); + result = catalog; + catalog.getRoutineLoadInstance(); + result = routineLoadManager; + + routineLoadManager.getClusterIdleSlotNum(); + result = 3; + routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + result = routineLoadTaskInfoQueue; + routineLoadManager.getIdToRoutineLoadTask(); + result = idToRoutineLoadTask; + + kafkaRoutineLoadJob1.getDbId(); + result = 1L; + kafkaRoutineLoadJob1.getTableId(); + result = 1L; + 
kafkaRoutineLoadJob1.getColumns(); + result = "columns"; + kafkaRoutineLoadJob1.getColumnSeparator(); + result = ""; + kafkaRoutineLoadJob1.getProgress(); + result = kafkaProgress; + + + routineLoadManager.getMinTaskBeId(); + result = beId; + routineLoadManager.getJobByTaskId(1L); + result = kafkaRoutineLoadJob1; + } + }; + + RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); + routineLoadTaskScheduler.runOneCycle(); + + new Verifications() { + { + AgentTask routineLoadTask = + AgentTaskQueue.getTask(beId, TTaskType.STREAM_LOAD, routineLoadTaskInfo1.getSignature()); + + Assert.assertEquals(beId, routineLoadTask.getBackendId()); + Assert.assertEquals(100L, + (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(1)); + Assert.assertEquals(200L, + (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(2)); + + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); + times = 1; + } + }; + } +} diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 90e07e012e9369..15271c3997a8bb 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -155,7 +155,8 @@ enum TTaskType { PUBLISH_VERSION, CLEAR_ALTER_TASK, CLEAR_TRANSACTION_TASK, - RECOVER_TABLET + RECOVER_TABLET, + STREAM_LOAD } enum TStmtType { From caf4c93bce7f58bdc6fe14725c349b67d0e03c5d Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 22 Nov 2018 19:04:04 +0800 Subject: [PATCH 2/4] Move RoutineLoadTaskInfo from RoutineLoadManager to RoutineLoadJob --- .../load/routineload/KafkaRoutineLoadJob.java | 34 ++- .../routineload/KafkaRoutineLoadTask.java | 2 +- .../doris/load/routineload/KafkaTaskInfo.java | 7 +- .../load/routineload/RoutineLoadJob.java | 85 +++++- .../load/routineload/RoutineLoadManager.java | 266 ++++-------------- .../routineload/RoutineLoadScheduler.java | 54 ++-- .../load/routineload/RoutineLoadTask.java | 10 +- .../load/routineload/RoutineLoadTaskInfo.java | 17 +- 
.../routineload/RoutineLoadTaskScheduler.java | 44 +-- .../routineload/KafkaRoutineLoadJobTest.java | 54 +++- .../routineload/RoutineLoadManagerTest.java | 40 +-- .../routineload/RoutineLoadSchedulerTest.java | 105 ++++--- .../RoutineLoadTaskSchedulerTest.java | 46 ++- 13 files changed, 381 insertions(+), 383 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index bba1c5d5f48980..b59bd12513d24b 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.UUID; public class KafkaRoutineLoadJob extends RoutineLoadJob { private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class); @@ -49,7 +50,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { public KafkaRoutineLoadJob() { } - public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, long tableId, + public KafkaRoutineLoadJob(String id, String name, String userName, long dbId, long tableId, String partitions, String columns, String where, String columnSeparator, int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType, int maxErrorNum, TResourceInfo resourceInfo, String serverAddress, String topic) { @@ -65,17 +66,28 @@ public KafkaProgress getProgress() { } @Override - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { - // divide kafkaPartitions into tasks - List kafkaRoutineLoadTaskList = new ArrayList<>(); - for (int i = 0; i < currentConcurrentTaskNum; i++) { - kafkaRoutineLoadTaskList.add(new KafkaTaskInfo(SystemIdGenerator.getNextId())); - } - for (int i = 0; i < kafkaPartitions.size(); i++) { - ((KafkaTaskInfo) kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum)) - 
.addKafkaPartition(kafkaPartitions.get(i)); + public void divideRoutineLoadJob(int currentConcurrentTaskNum) { + writeLock(); + try { + if (state == JobState.NEED_SCHEDULER) { + // divide kafkaPartitions into tasks + for (int i = 0; i < currentConcurrentTaskNum; i++) { + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID().toString(), id); + routineLoadTaskInfoList.add(kafkaTaskInfo); + needSchedulerTaskInfoList.add(kafkaTaskInfo); + } + for (int i = 0; i < kafkaPartitions.size(); i++) { + ((KafkaTaskInfo) routineLoadTaskInfoList.get(i % currentConcurrentTaskNum)) + .addKafkaPartition(kafkaPartitions.get(i)); + } + // change job state to running + state = JobState.RUNNING; + } else { + LOG.debug("Ignore to divide routine load job while job state {}", state); + } + } finally { + writeUnlock(); } - return kafkaRoutineLoadTaskList; } @Override diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java index c561563a693f4a..fe839ee879a7e5 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java @@ -33,7 +33,7 @@ public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, String columns, String where, String columnSeparator, KafkaTaskInfo kafkaTaskInfo, KafkaProgress kafkaProgress) { super(resourceInfo, backendId, TTaskType.STREAM_LOAD, dbId, tableId, partitionId, indexId, tabletId, - kafkaTaskInfo.getSignature(), columns, where, columnSeparator, RoutineLoadJob.DataSourceType.KAFKA); + kafkaTaskInfo.getId(), columns, where, columnSeparator, RoutineLoadJob.DataSourceType.KAFKA); this.partitionIdToOffset = new HashMap<>(); kafkaTaskInfo.getPartitions().parallelStream().forEach(entity -> partitionIdToOffset.put(entity, kafkaProgress.getPartitionIdToOffset().get(entity))); diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 01bedebcf2cf4e..7fe6b885d66726 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -21,18 +21,19 @@ import java.util.ArrayList; import java.util.List; +import java.util.UUID; public class KafkaTaskInfo extends RoutineLoadTaskInfo { private List partitions; - public KafkaTaskInfo(long signature) { - super(signature); + public KafkaTaskInfo(String id, String jobId) { + super(id, jobId); this.partitions = new ArrayList<>(); } public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) { - super(SystemIdGenerator.getNextId()); + super(UUID.randomUUID().toString(), kafkaTaskInfo.getJobId()); this.partitions = kafkaTaskInfo.getPartitions(); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 97998cfb18f3b0..2b708837c300cc 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -17,13 +17,17 @@ package org.apache.doris.load.routineload; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Writable; import org.apache.doris.thrift.TResourceInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -36,6 +40,10 @@ */ public abstract class RoutineLoadJob implements Writable { + private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); + + private static final int DEFAULT_TASK_TIMEOUT_SECONDS 
= 10; + public enum JobState { NEED_SCHEDULER, RUNNING, @@ -48,7 +56,7 @@ public enum DataSourceType { KAFKA } - protected long id; + protected String id; protected String name; protected String userName; protected long dbId; @@ -63,6 +71,11 @@ public enum DataSourceType { // max number of error data in ten thousand data protected int maxErrorNum; protected String progress; + + // The tasks belong to this job + protected List routineLoadTaskInfoList; + protected List needSchedulerTaskInfoList; + protected ReentrantReadWriteLock lock; // TODO(ml): error sample @@ -70,7 +83,7 @@ public enum DataSourceType { public RoutineLoadJob() { } - public RoutineLoadJob(long id, String name, String userName, long dbId, long tableId, + public RoutineLoadJob(String id, String name, String userName, long dbId, long tableId, String partitions, String columns, String where, String columnSeparator, int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType, int maxErrorNum, TResourceInfo resourceInfo) { @@ -89,6 +102,8 @@ public RoutineLoadJob(long id, String name, String userName, long dbId, long tab this.maxErrorNum = maxErrorNum; this.resourceInfo = resourceInfo; this.progress = ""; + this.routineLoadTaskInfoList = new ArrayList<>(); + this.needSchedulerTaskInfoList = new ArrayList<>(); lock = new ReentrantReadWriteLock(true); } @@ -111,7 +126,7 @@ public void writeUnlock() { // thrift object private TResourceInfo resourceInfo; - public long getId() { + public String getId() { return id; } @@ -147,8 +162,57 @@ public TResourceInfo getResourceInfo() { return resourceInfo; } - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { - return null; + public int getSizeOfRoutineLoadTaskInfoList() { + readLock(); + try { + return routineLoadTaskInfoList.size(); + } finally { + readUnlock(); + } + + } + + public List getNeedSchedulerTaskInfoList() { + return needSchedulerTaskInfoList; + } + + public void updateState(JobState jobState) { + writeLock(); + try { + 
state = jobState; + } finally { + writeUnlock(); + } + } + + public void processTimeoutTasks() { + writeLock(); + try { + List runningTasks = new ArrayList<>(routineLoadTaskInfoList); + runningTasks.removeAll(needSchedulerTaskInfoList); + + for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { + if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) + > DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000) { + String oldSignature = routineLoadTaskInfo.getId(); + if (routineLoadTaskInfo instanceof KafkaTaskInfo) { + // remove old task + routineLoadTaskInfoList.remove(routineLoadTaskInfo); + // add new task + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo); + routineLoadTaskInfoList.add(kafkaTaskInfo); + needSchedulerTaskInfoList.add(kafkaTaskInfo); + } + LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled", + oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS); + } + } + } finally { + writeUnlock(); + } + } + + public void divideRoutineLoadJob(int currentConcurrentTaskNum) { } public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { @@ -166,4 +230,15 @@ public void readFields(DataInput in) throws IOException { } abstract RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId); + + private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState) + throws LoadException { + if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) { + throw new LoadException("could not transform " + currentState + " to " + desireState); + } else if (currentState == RoutineLoadJob.JobState.CANCELLED || + currentState == RoutineLoadJob.JobState.STOPPED) { + throw new LoadException("could not transform " + currentState + " to " + desireState); + } + } + } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java 
b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 36042693799777..eb1e7e4d9c3ecd 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -35,24 +35,13 @@ public class RoutineLoadManager { private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class); private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; - private static final int DEFAULT_TASK_TIMEOUT_MINUTES = 5; // Long is beId, integer is the size of tasks in be private Map beIdToMaxConcurrentTasks; private Map beIdToConcurrentTasks; // stream load job meta - private Map idToRoutineLoadJob; - private Map idToNeedSchedulerRoutineLoadJob; - private Map idToRunningRoutineLoadJob; - private Map idToCancelledRoutineLoadJob; - - // stream load tasks meta (not persistent) - private Map idToRoutineLoadTask; - // KafkaPartitions means that partitions belong to one task - // kafka partitions == routine load task (logical) - private Queue needSchedulerRoutineLoadTasks; - private Map taskIdToJobId; + private Map idToRoutineLoadJob; private ReentrantReadWriteLock lock; @@ -73,14 +62,8 @@ private void writeUnlock() { } public RoutineLoadManager() { - idToRoutineLoadJob = Maps.newHashMap(); - idToNeedSchedulerRoutineLoadJob = Maps.newHashMap(); - idToRunningRoutineLoadJob = Maps.newHashMap(); - idToCancelledRoutineLoadJob = Maps.newHashMap(); - idToRoutineLoadTask = Maps.newHashMap(); - needSchedulerRoutineLoadTasks = Queues.newLinkedBlockingQueue(); + idToRoutineLoadJob = Maps.newConcurrentMap(); beIdToConcurrentTasks = Maps.newHashMap(); - taskIdToJobId = Maps.newHashMap(); lock = new ReentrantReadWriteLock(true); } @@ -142,56 +125,15 @@ public void addNumOfConcurrentTasksByBeId(long beId) { } public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { - writeLock(); - try { - idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); - } 
finally { - writeUnlock(); - } - } - - public void addRoutineLoadTasks(List routineLoadTaskList) { - writeLock(); - try { - idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( - Collectors.toMap(task -> task.getSignature(), task -> task))); - } finally { - writeUnlock(); - } - } - - public Map getIdToRoutineLoadTask() { - readLock(); - try { - return idToRoutineLoadTask; - } finally { - readUnlock(); - } - } - - public void addNeedSchedulerRoutineLoadTasks(List routineLoadTaskList, long routineLoadJobId) { - writeLock(); - try { - routineLoadTaskList.parallelStream().forEach(entity -> needSchedulerRoutineLoadTasks.add(entity)); - routineLoadTaskList.parallelStream().forEach(entity -> - taskIdToJobId.put(entity.getSignature(), routineLoadJobId)); - } finally { - writeUnlock(); - } + idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); } - public void removeRoutineLoadTasks(List routineLoadTasks) { - if (routineLoadTasks != null) { - writeLock(); - try { - routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature())); - routineLoadTasks.parallelStream().forEach(task -> - needSchedulerRoutineLoadTasks.remove(task)); - routineLoadTasks.parallelStream().forEach(task -> taskIdToJobId.remove(task.getSignature())); - } finally { - writeUnlock(); - } + public int getSizeOfIdToRoutineLoadTask() { + int sizeOfTasks = 0; + for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { + sizeOfTasks += routineLoadJob.getSizeOfRoutineLoadTaskInfoList(); } + return sizeOfTasks; } public int getClusterIdleSlotNum() { @@ -212,10 +154,10 @@ public int getClusterIdleSlotNum() { } } - public long getMinTaskBeId() { + public long getMinTaskBeId() throws LoadException { readLock(); try { - long result = 0L; + long result = -1L; int maxIdleSlotNum = 0; initBeIdToMaxConcurrentTasks(); for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { @@ -228,173 +170,65 @@ public long getMinTaskBeId() { maxIdleSlotNum = 
Math.max(maxIdleSlotNum, idelTaskNum); } } + if (result < 0) { + throw new LoadException("There is no empty slot in cluster"); + } return result; } finally { readUnlock(); } } - public Queue getNeedSchedulerRoutineLoadTasks() { - readLock(); - try { - return needSchedulerRoutineLoadTasks; - } finally { - readUnlock(); + public List getNeedSchedulerRoutineLoadTasks() { + List routineLoadTaskInfoList = new ArrayList<>(); + for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { + routineLoadTaskInfoList.addAll(routineLoadJob.getNeedSchedulerTaskInfoList()); } + return routineLoadTaskInfoList; } - public RoutineLoadJob getJobByTaskId(long taskId) { - readLock(); - try { - return idToRoutineLoadJob.get(taskIdToJobId.get(taskId)); - } finally { - readUnlock(); - } + public RoutineLoadJob getJob(String jobId) { + return idToRoutineLoadJob.get(jobId); } public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException { List jobs = new ArrayList<>(); Collection stateJobs = null; - readLock(); LOG.debug("begin to get routine load job by state {}", jobState.name()); - try { - switch (jobState) { - case NEED_SCHEDULER: - stateJobs = idToNeedSchedulerRoutineLoadJob.values(); - break; - case PAUSED: - throw new LoadException("not support getting paused routine load jobs"); - case RUNNING: - stateJobs = idToRunningRoutineLoadJob.values(); - break; - case STOPPED: - throw new LoadException("not support getting stopped routine load jobs"); - default: - break; - } - if (stateJobs != null) { - jobs.addAll(stateJobs); - LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); - } - } finally { - readUnlock(); + switch (jobState) { + case NEED_SCHEDULER: + stateJobs = idToRoutineLoadJob.values().stream() + .filter(entity -> entity.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) + .collect(Collectors.toList()); + break; + case PAUSED: + stateJobs = idToRoutineLoadJob.values().stream() + .filter(entity -> 
entity.getState() == RoutineLoadJob.JobState.PAUSED) + .collect(Collectors.toList()); + break; + case RUNNING: + stateJobs = idToRoutineLoadJob.values().stream() + .filter(entity -> entity.getState() == RoutineLoadJob.JobState.RUNNING) + .collect(Collectors.toList()); + break; + case STOPPED: + stateJobs = idToRoutineLoadJob.values().stream() + .filter(entity -> entity.getState() == RoutineLoadJob.JobState.STOPPED) + .collect(Collectors.toList()); + break; + default: + break; + } + if (stateJobs != null) { + jobs.addAll(stateJobs); + LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); } return jobs; } - public void updateRoutineLoadJobStateNoValid(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) { - writeLock(); - try { - RoutineLoadJob.JobState srcJobState = routineLoadJob.getState(); - long jobId = routineLoadJob.getId(); - LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState); - switch (jobState) { - case NEED_SCHEDULER: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob); - break; - case PAUSED: - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToRunningRoutineLoadJob.remove(jobId); - break; - case RUNNING: - idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob); - idToRunningRoutineLoadJob.put(jobId, routineLoadJob); - break; - case CANCELLED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToCancelledRoutineLoadJob.put(jobId, routineLoadJob); - break; - case STOPPED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - break; - default: - break; - } - routineLoadJob.setState(jobState); - Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob); - } finally { - writeUnlock(); - } - } - - public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) - throws LoadException { - 
writeLock(); - try { - RoutineLoadJob.JobState srcJobState = routineLoadJob.getState(); - long jobId = routineLoadJob.getId(); - LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState); - checkStateTransform(srcJobState, jobState); - switch (jobState) { - case NEED_SCHEDULER: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob); - break; - case PAUSED: - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToRunningRoutineLoadJob.remove(jobId); - break; - case RUNNING: - idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob); - idToRunningRoutineLoadJob.put(jobId, routineLoadJob); - break; - case CANCELLED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToCancelledRoutineLoadJob.put(jobId, routineLoadJob); - break; - case STOPPED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - break; - default: - break; - } - routineLoadJob.setState(jobState); - Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob); - } finally { - writeUnlock(); - } - } - - public void processTimeOutTasks() { - writeLock(); - try { - List runningTasks = new ArrayList<>(idToRoutineLoadTask.values()); - runningTasks.removeAll(needSchedulerRoutineLoadTasks); - - for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { - if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) - > DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000) { - long oldSignature = routineLoadTaskInfo.getSignature(); - if (routineLoadTaskInfo instanceof KafkaTaskInfo) { - // remove old task - idToRoutineLoadTask.remove(routineLoadTaskInfo.getSignature()); - // add new task - KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo); - idToRoutineLoadTask.put(kafkaTaskInfo.getSignature(), kafkaTaskInfo); - needSchedulerRoutineLoadTasks.add(kafkaTaskInfo); - } - LOG.debug("Task {} was ran more 
then {} minutes. It was removed and rescheduled", - oldSignature, DEFAULT_TASK_TIMEOUT_MINUTES); - } - - } - } finally { - writeUnlock(); - } - } - - private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState) - throws LoadException { - if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) { - throw new LoadException("could not transform " + currentState + " to " + desireState); - } else if (currentState == RoutineLoadJob.JobState.CANCELLED || - currentState == RoutineLoadJob.JobState.STOPPED) { - throw new LoadException("could not transform " + currentState + " to " + desireState); + public void processTimeoutTasks() { + for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { + routineLoadJob.processTimeoutTasks(); } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 276494b84b885f..c2a86d0c5d34f4 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -34,6 +34,14 @@ public class RoutineLoadScheduler extends Daemon { @Override protected void runOneCycle() { + try { + process(); + } catch (Throwable e) { + LOG.error("failed to scheduler jobs with error massage {}", e.getMessage(), e); + } + } + + private void process() { // update // get need scheduler routine jobs List routineLoadJobList = null; @@ -45,43 +53,25 @@ protected void runOneCycle() { LOG.debug("there are {} job need scheduler", routineLoadJobList.size()); for (RoutineLoadJob routineLoadJob : routineLoadJobList) { - // judge nums of tasks more then max concurrent tasks of cluster - List routineLoadTaskList = null; - routineLoadJob.writeLock(); try { - if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) { - int 
currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); - int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getIdToRoutineLoadTask().size(); - if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { - LOG.info("job {} concurrent task num {}, current total task num {}. " - + "desired total task num {} more then total max task num {}, " - + "skip this turn of scheduler", - routineLoadJob.getId(), currentConcurrentTaskNum, - routineLoadManager.getIdToRoutineLoadTask().size(), - totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); - break; - } - // divide job into tasks - routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); - - // update tasks meta - routineLoadManager.addRoutineLoadTasks(routineLoadTaskList); - routineLoadManager.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList, routineLoadJob.getId()); - - // change job state to running - routineLoadManager.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING); + // judge nums of tasks more then max concurrent tasks of cluster + int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); + int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask(); + if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { + LOG.info("job {} concurrent task num {}, current total task num {}. 
" + + "desired total task num {} more then total max task num {}, " + + "skip this turn of scheduler", + routineLoadJob.getId(), currentConcurrentTaskNum, + routineLoadManager.getSizeOfIdToRoutineLoadTask(), + totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); + break; } + // divide job into tasks + routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); } catch (MetaNotFoundException e) { - routineLoadManager.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED); - } catch (LoadException e) { - LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(), - e.getMessage(), e); - routineLoadManager.removeRoutineLoadTasks(routineLoadTaskList); - } finally { - routineLoadJob.writeUnlock(); + routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED); } } - } private List getNeedSchedulerRoutineJobs() throws LoadException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java index a89ce5f84f094d..18f18aa97a410a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java @@ -17,12 +17,16 @@ package org.apache.doris.load.routineload; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.CatalogIdGenerator; +import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.task.AgentTask; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; public class RoutineLoadTask extends AgentTask { + private String id; private String columns; private String where; private String columnSeparator; @@ -30,10 +34,12 @@ public class RoutineLoadTask extends AgentTask { public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long 
signature, + long dbId, long tableId, long partitionId, long indexId, long tabletId, String id, String columns, String where, String columnSeparator, RoutineLoadJob.DataSourceType dataSourceType) { - super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); + super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, + Catalog.getCurrentCatalog().getNextId()); + this.id = id; this.columns = columns; this.where = where; this.columnSeparator = columnSeparator; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 70fc27e39204ad..0eca2bb423a088 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -17,7 +17,6 @@ package org.apache.doris.load.routineload; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Routine load task info is the task info include the only id (signature). 
@@ -27,18 +26,24 @@ */ public class RoutineLoadTaskInfo { - private long signature; + private String id; + private String jobId; private long createTimeMs; private long loadStartTimeMs; - public RoutineLoadTaskInfo(long signature) { - this.signature = signature; + public RoutineLoadTaskInfo(String id, String jobId) { + this.id = id; + this.jobId = jobId; this.createTimeMs = System.currentTimeMillis(); } - public long getSignature() { - return signature; + public String getId() { + return id; + } + + public String getJobId() { + return jobId; } public void setLoadStartTimeMs(long loadStartTimeMs) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 07804b881e8605..46f7c6036b7772 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -18,6 +18,7 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; import org.apache.doris.common.util.Daemon; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -25,6 +26,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Iterator; +import java.util.List; import java.util.Queue; /** @@ -44,47 +47,44 @@ protected void runOneCycle() { try { process(); } catch (Throwable e) { - LOG.error("Failed to process one round of RoutineLoadTaskScheduler with error message {}", + LOG.warn("Failed to process one round of RoutineLoadTaskScheduler with error message {}", e.getMessage(), e); } } - private void process() { + private void process() throws LoadException { // update current beIdMaps for tasks routineLoadManager.updateBeIdTaskMaps(); // check timeout tasks - routineLoadManager.processTimeOutTasks(); + 
routineLoadManager.processTimeoutTasks(); // get idle be task num int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum(); int scheduledTaskNum = 0; - Queue routineLoadTaskList = routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + List routineLoadTaskList = routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + Iterator iterator = routineLoadTaskList.iterator(); AgentBatchTask batchTask = new AgentBatchTask(); // allocate task to be while (clusterIdleSlotNum > 0) { - RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskList.poll(); - // queue is not empty - if (routineLoadTaskInfo != null) { - // when routine load task is not abandoned - if (routineLoadManager.getIdToRoutineLoadTask().get(routineLoadTaskInfo.getSignature()) != null) { - long beId = routineLoadManager.getMinTaskBeId(); - RoutineLoadJob routineLoadJob = routineLoadManager.getJobByTaskId(routineLoadTaskInfo.getSignature()); - RoutineLoadTask routineLoadTask = routineLoadJob.createTask(routineLoadTaskInfo, beId); - if (routineLoadTask != null) { - routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); - AgentTaskQueue.addTask(routineLoadTask); - batchTask.addTask(routineLoadTask); - clusterIdleSlotNum--; - scheduledTaskNum++; - routineLoadManager.addNumOfConcurrentTasksByBeId(beId); - } + if (iterator.hasNext()) { + RoutineLoadTaskInfo routineLoadTaskInfo = iterator.next(); + long beId = routineLoadManager.getMinTaskBeId(); + RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId()); + RoutineLoadTask routineLoadTask = routineLoadJob.createTask(routineLoadTaskInfo, beId); + if (routineLoadTask != null) { + routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); + AgentTaskQueue.addTask(routineLoadTask); + batchTask.addTask(routineLoadTask); + clusterIdleSlotNum--; + scheduledTaskNum++; + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); } else { - LOG.debug("Task {} for job has been already discarded", 
routineLoadTaskInfo.getSignature()); + LOG.debug("Task {} for job has been already discarded", routineLoadTaskInfo.getId()); } } else { - LOG.debug("The queue of need scheduler tasks is empty."); + LOG.debug("All of tasks were scheduled."); break; } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 1ce9644bb1bd25..db0480f7fd97d9 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -21,10 +21,14 @@ import mockit.Deencapsulation; import mockit.Expectations; import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; import mockit.Mocked; +import mockit.Verifications; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -36,9 +40,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Queue; public class KafkaRoutineLoadJobTest { + private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; + @Test public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, @Injectable PartitionInfo partitionInfo1, @@ -70,7 +78,7 @@ public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, } }; - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L, + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, 1L, "1L", "v1", "", "", 3, RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), "", ""); @@ 
-81,14 +89,16 @@ public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, @Test public void testDivideRoutineLoadJob() { - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L, + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, 1L, "1L", "v1", "", "", 3, RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), "", ""); Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6)); - List result = kafkaRoutineLoadJob.divideRoutineLoadJob(2); + kafkaRoutineLoadJob.divideRoutineLoadJob(2); + + List result = kafkaRoutineLoadJob.getNeedSchedulerTaskInfoList(); Assert.assertEquals(2, result.size()); for (RoutineLoadTaskInfo routineLoadTaskInfo : result) { KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; @@ -102,4 +112,42 @@ public void testDivideRoutineLoadJob() { } } } + + @Test + public void testProcessTimeOutTasks() { + List routineLoadTaskInfoList = new ArrayList<>(); + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo("1", "1"); + kafkaTaskInfo.addKafkaPartition(100); + kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000); + routineLoadTaskInfoList.add(kafkaTaskInfo); + + RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, + 1L, "1L", "v1", "", "", 3, + RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), + "", ""); + Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList); + + new MockUp() { + @Mock + public long getNextId() { + return 2L; + } + }; + + + routineLoadJob.processTimeoutTasks(); + new Verifications() { + { + List idToRoutineLoadTask = + Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); + Assert.assertNotEquals("1", 
idToRoutineLoadTask.get(0).getId()); + Assert.assertEquals(1, idToRoutineLoadTask.size()); + List needSchedulerTask = + Deencapsulation.getField(routineLoadJob, "needSchedulerTaskInfoList"); + Assert.assertEquals(1, needSchedulerTask.size()); + Assert.assertEquals(100, (int) ((KafkaTaskInfo) (needSchedulerTask.get(0))) + .getPartitions().get(0)); + } + }; + } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 8e50e701410844..49f698e898877d 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -25,6 +25,7 @@ import mockit.Mocked; import mockit.Verifications; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.system.SystemInfoService; import org.junit.Assert; @@ -38,13 +39,12 @@ public class RoutineLoadManagerTest { private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; - private static final int DEFAULT_TASK_TIMEOUT_MINUTES = 5; @Mocked private SystemInfoService systemInfoService; @Test - public void testGetMinTaskBeId() { + public void testGetMinTaskBeId() throws LoadException { List beIds = Lists.newArrayList(); beIds.add(1L); beIds.add(2L); @@ -106,40 +106,4 @@ public void testUpdateBeIdTaskMaps() { routineLoadManager.updateBeIdTaskMaps(); } - @Test - public void testProcessTimeOutTasks() { - List routineLoadTaskInfoList = new ArrayList<>(); - KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(1L); - kafkaTaskInfo.addKafkaPartition(100); - kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_MINUTES * 60 * 1000); - routineLoadTaskInfoList.add(kafkaTaskInfo); - - RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - 
routineLoadManager.addRoutineLoadTasks(routineLoadTaskInfoList); - - - new MockUp() { - @Mock - public long getNextId() { - return 2L; - } - }; - - - routineLoadManager.processTimeOutTasks(); - new Verifications() { - { - Map idToRoutineLoadTask = - Deencapsulation.getField(routineLoadManager, "idToRoutineLoadTask"); - Assert.assertNull(idToRoutineLoadTask.get(1L)); - Assert.assertEquals(1, idToRoutineLoadTask.size()); - Queue needSchedulerTask = - Deencapsulation.getField(routineLoadManager, "needSchedulerRoutineLoadTask"); - RoutineLoadTaskInfo routineLoadTaskInfo = needSchedulerTask.poll(); - Assert.assertNotNull(routineLoadTaskInfo); - Assert.assertEquals(100, (int) ((KafkaTaskInfo) routineLoadTaskInfo).getPartitions().get(0)); - } - }; - } - } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 9940b1b92b6a70..8f6e22ecde8a6f 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -17,11 +17,20 @@ package org.apache.doris.load.routineload; +import com.google.common.collect.Lists; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.persist.EditLog; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TResourceInfo; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -34,53 +43,79 @@ import java.util.Arrays; import java.util.List; -@RunWith(PowerMockRunner.class) -@PrepareForTest({Catalog.class}) public class RoutineLoadSchedulerTest { @Test - 
public void testNormalRunOneCycle() throws LoadException, MetaNotFoundException { - int taskNum = 1; - List routineLoadTaskList = new ArrayList<>(); - KafkaTaskInfo kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaTaskInfo.class); - EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes(); - EasyMock.replay(kafkaRoutineLoadTask); - routineLoadTaskList.add(kafkaRoutineLoadTask); + public void testNormalRunOneCycle(@Mocked Catalog catalog, + @Injectable RoutineLoadManager routineLoadManager, + @Injectable SystemInfoService systemInfoService, + @Injectable Database database) + throws LoadException, MetaNotFoundException { - KafkaRoutineLoadJob routineLoadJob = EasyMock.createNiceMock(KafkaRoutineLoadJob.class); - EasyMock.expect(routineLoadJob.calculateCurrentConcurrentTaskNum()).andReturn(taskNum).anyTimes(); - EasyMock.expect(routineLoadJob.divideRoutineLoadJob(taskNum)).andReturn(routineLoadTaskList).anyTimes(); - EasyMock.expect(routineLoadJob.getState()).andReturn(RoutineLoadJob.JobState.NEED_SCHEDULER).anyTimes(); - EasyMock.replay(routineLoadJob); + String clusterName = "cluster1"; + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); - SystemInfoService systemInfoService = EasyMock.createNiceMock(SystemInfoService.class); - List beIds = Arrays.asList(1L, 2L, 3L); - EasyMock.expect(systemInfoService.getBackendIds(true)).andReturn(beIds).anyTimes(); - EasyMock.replay(systemInfoService); + List partitions = Lists.newArrayList(); + partitions.add(100); + partitions.add(200); + partitions.add(300); + RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, + 1L, "1L", "v1", "", "", 3, + RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), + "", ""); + routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULER); + List routineLoadJobList = new ArrayList<>(); + routineLoadJobList.add(routineLoadJob); - Catalog catalog = 
EasyMock.createNiceMock(Catalog.class); - EditLog editLog = EasyMock.createNiceMock(EditLog.class); - PowerMock.mockStatic(Catalog.class); - EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes(); - EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes(); - PowerMock.replay(Catalog.class); + Deencapsulation.setField(routineLoadJob, "kafkaPartitions", partitions); + Deencapsulation.setField(routineLoadJob, "desireTaskConcurrentNum", 3); + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return systemInfoService; + } - RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes(); - EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoadManager).anyTimes(); - EasyMock.replay(catalog); + @Mock + public Catalog getCurrentCatalog() { + return catalog; + } + }; - routineLoadManager.addRoutineLoadJob(routineLoadJob); - routineLoadManager.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER); + new Expectations() { + { + catalog.getRoutineLoadInstance(); + result = routineLoadManager; + routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); + result = routineLoadJobList; + catalog.getDb(anyLong); + result = database; + database.getClusterName(); + result = clusterName; + systemInfoService.getClusterBackendIds(clusterName, true); + result = beIds; + routineLoadManager.getSizeOfIdToRoutineLoadTask(); + result = 1; + routineLoadManager.getTotalMaxConcurrentTaskNum(); + result = 10; + } + }; RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler(); + Deencapsulation.setField(routineLoadScheduler, "routineLoadManager", routineLoadManager); routineLoadScheduler.runOneCycle(); - Assert.assertEquals(1, routineLoadManager.getIdToRoutineLoadTask().size()); - Assert.assertEquals(1, routineLoadManager.getNeedSchedulerRoutineLoadTasks().size()); - 
Assert.assertEquals(1, routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size()); - Assert.assertEquals(0, routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size()); - + Assert.assertEquals(2, routineLoadJob.getNeedSchedulerTaskInfoList().size()); + for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadJob.getNeedSchedulerTaskInfoList()) { + KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; + if (kafkaTaskInfo.getPartitions().size() == 2) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(100)); + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(300)); + } else { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(200)); + } + } } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java index 69d3c6b7e3f09c..6226a0b2699f3c 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -17,13 +17,16 @@ package org.apache.doris.load.routineload; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; +import mockit.Deencapsulation; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; import mockit.Verifications; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; @@ -31,6 +34,8 @@ import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; @@ -44,14 +49,16 @@ public class RoutineLoadTaskSchedulerTest { private AgentTaskExecutor agentTaskExecutor; @Test - public void 
testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1) { + public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1, + @Injectable KafkaRoutineLoadJob routineLoadJob) throws LoadException { long beId = 100L; - Queue routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue(); - KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(1L); + List routineLoadTaskInfoList = Lists.newArrayList(); + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo("1", "1"); routineLoadTaskInfo1.addKafkaPartition(1); routineLoadTaskInfo1.addKafkaPartition(2); - routineLoadTaskInfoQueue.add(routineLoadTaskInfo1); + routineLoadTaskInfoList.add(routineLoadTaskInfo1); + Map idToRoutineLoadTask = Maps.newHashMap(); idToRoutineLoadTask.put(1L, routineLoadTaskInfo1); @@ -62,19 +69,26 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 KafkaProgress kafkaProgress = new KafkaProgress(); kafkaProgress.setPartitionIdToOffset(partitionIdToOffset); + Map idToRoutineLoadJob = Maps.newConcurrentMap(); + idToRoutineLoadJob.put("1", routineLoadJob); + + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + new Expectations() { { Catalog.getInstance(); result = catalog; catalog.getRoutineLoadInstance(); result = routineLoadManager; + Catalog.getCurrentCatalog(); + result = catalog; + catalog.getNextId(); + result = 2L; routineLoadManager.getClusterIdleSlotNum(); result = 3; routineLoadManager.getNeedSchedulerRoutineLoadTasks(); - result = routineLoadTaskInfoQueue; - routineLoadManager.getIdToRoutineLoadTask(); - result = idToRoutineLoadTask; + result = routineLoadTaskInfoList; kafkaRoutineLoadJob1.getDbId(); result = 1L; @@ -90,18 +104,32 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1 routineLoadManager.getMinTaskBeId(); result = beId; - routineLoadManager.getJobByTaskId(1L); + routineLoadManager.getJob(anyString); result = kafkaRoutineLoadJob1; } }; + 
KafkaRoutineLoadTask kafkaRoutineLoadTask = new KafkaRoutineLoadTask(kafkaRoutineLoadJob1.getResourceInfo(), + beId, kafkaRoutineLoadJob1.getDbId(), kafkaRoutineLoadJob1.getTableId(), + 0L, 0L, 0L, kafkaRoutineLoadJob1.getColumns(), kafkaRoutineLoadJob1.getWhere(), + kafkaRoutineLoadJob1.getColumnSeparator(), + (KafkaTaskInfo) routineLoadTaskInfo1, + kafkaRoutineLoadJob1.getProgress()); + + new Expectations() { + { + kafkaRoutineLoadJob1.createTask((RoutineLoadTaskInfo) any, anyLong); + result = kafkaRoutineLoadTask; + } + }; + RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); routineLoadTaskScheduler.runOneCycle(); new Verifications() { { AgentTask routineLoadTask = - AgentTaskQueue.getTask(beId, TTaskType.STREAM_LOAD, routineLoadTaskInfo1.getSignature()); + AgentTaskQueue.getTask(beId, TTaskType.STREAM_LOAD, 2L); Assert.assertEquals(beId, routineLoadTask.getBackendId()); Assert.assertEquals(100L, From d8aea83f62f369d474f716d2594d477619594337 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 22 Nov 2018 19:56:38 +0800 Subject: [PATCH 3/4] Change DEFAULT_TASK_TIMEOUT_SECONDS * 60 to DEFAULT_TASK_TIMEOUT_SECONDS --- .../load/routineload/RoutineLoadJob.java | 2 +- .../load/routineload/RoutineLoadManager.java | 26 ++----------------- .../routineload/RoutineLoadTaskScheduler.java | 2 +- 3 files changed, 4 insertions(+), 26 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 2b708837c300cc..d2bdab8b637bcf 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -193,7 +193,7 @@ public void processTimeoutTasks() { for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) - > DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 
1000) { + > DEFAULT_TASK_TIMEOUT_SECONDS * 1000) { String oldSignature = routineLoadTaskInfo.getId(); if (routineLoadTaskInfo instanceof KafkaTaskInfo) { // remove old task diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index eb1e7e4d9c3ecd..72d6cbfa207e59 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -195,30 +195,8 @@ public List getRoutineLoadJobByState(RoutineLoadJob.JobState job List jobs = new ArrayList<>(); Collection stateJobs = null; LOG.debug("begin to get routine load job by state {}", jobState.name()); - switch (jobState) { - case NEED_SCHEDULER: - stateJobs = idToRoutineLoadJob.values().stream() - .filter(entity -> entity.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) - .collect(Collectors.toList()); - break; - case PAUSED: - stateJobs = idToRoutineLoadJob.values().stream() - .filter(entity -> entity.getState() == RoutineLoadJob.JobState.PAUSED) - .collect(Collectors.toList()); - break; - case RUNNING: - stateJobs = idToRoutineLoadJob.values().stream() - .filter(entity -> entity.getState() == RoutineLoadJob.JobState.RUNNING) - .collect(Collectors.toList()); - break; - case STOPPED: - stateJobs = idToRoutineLoadJob.values().stream() - .filter(entity -> entity.getState() == RoutineLoadJob.JobState.STOPPED) - .collect(Collectors.toList()); - break; - default: - break; - } + idToRoutineLoadJob.values().stream() + .filter(entity -> entity.getState() == jobState).collect(Collectors.toList()); if (stateJobs != null) { jobs.addAll(stateJobs); LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 
46f7c6036b7772..61b3d3aaf3e936 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.List; -import java.util.Queue; /** * Routine load task scheduler is a function which allocate task to be. @@ -36,6 +35,7 @@ * Step2: equally divide to be * Step3: submit tasks to be */ +// TODO(ml): change interval ms in constructor public class RoutineLoadTaskScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); From 2913e50dfea7860eaa91456d84e572462141dd28 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 23 Nov 2018 10:02:39 +0800 Subject: [PATCH 4/4] Fix bug of RoutineLoadManager --- .../org/apache/doris/load/routineload/RoutineLoadManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 72d6cbfa207e59..d419675af2b015 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -195,7 +195,7 @@ public List getRoutineLoadJobByState(RoutineLoadJob.JobState job List jobs = new ArrayList<>(); Collection stateJobs = null; LOG.debug("begin to get routine load job by state {}", jobState.name()); - idToRoutineLoadJob.values().stream() + stateJobs = idToRoutineLoadJob.values().stream() .filter(entity -> entity.getState() == jobState).collect(Collectors.toList()); if (stateJobs != null) { jobs.addAll(stateJobs);