diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index e79aa1fa4e0a9f..e253a0740ccd21 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -135,7 +135,7 @@ import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; -import org.apache.doris.load.routineload.RoutineLoad; +import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.metric.MetricRepo; @@ -241,7 +241,7 @@ public class Catalog { private ConcurrentHashMap nameToCluster; private Load load; - private RoutineLoad routineLoad; + private RoutineLoadManager routineLoadManager; private ExportMgr exportMgr; private Clone clone; private Alter alter; @@ -372,7 +372,7 @@ private Catalog() { this.idToDb = new ConcurrentHashMap<>(); this.fullNameToDb = new ConcurrentHashMap<>(); this.load = new Load(); - this.routineLoad = new RoutineLoad(); + this.routineLoadManager = new RoutineLoadManager(); this.exportMgr = new ExportMgr(); this.clone = new Clone(); this.alter = new Alter(); @@ -4250,8 +4250,8 @@ public Load getLoadInstance() { return this.load; } - public RoutineLoad getRoutineLoadInstance() { - return routineLoad; + public RoutineLoadManager getRoutineLoadInstance() { + return routineLoadManager; } public ExportMgr getExportMgr() { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java similarity index 64% rename from fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java rename to fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index ff46af630bea46..6f0f4243ca7a8a 100644 --- 
a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadProgress.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,8 +17,21 @@ package org.apache.doris.load.routineload; -public class KafkaRoutineLoadProgress { +import java.util.Map; - private String partitionName; - private long offset; +/** + * this is description of kafka routine load progress + * the data before offset was already loaded in doris + */ +public class KafkaProgress { + + private Map partitionIdToOffset; + + public Map getPartitionIdToOffset() { + return partitionIdToOffset; + } + + public void setPartitionIdToOffset(Map partitionIdToOffset) { + this.partitionIdToOffset = partitionIdToOffset; + } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index cced4ddc1e4836..b59bd12513d24b 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -18,13 +18,13 @@ package org.apache.doris.load.routineload; import com.google.common.base.Strings; +import com.google.gson.Gson; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.thrift.TTaskType; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.logging.log4j.LogManager; @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.UUID; public class KafkaRoutineLoadJob extends RoutineLoadJob { private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJob.class); @@ -43,13 +44,13 @@ public class 
KafkaRoutineLoadJob extends RoutineLoadJob { private String serverAddress; private String topic; - // optional + // optional, user want to load partitions. private List kafkaPartitions; public KafkaRoutineLoadJob() { } - public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, long tableId, + public KafkaRoutineLoadJob(String id, String name, String userName, long dbId, long tableId, String partitions, String columns, String where, String columnSeparator, int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType, int maxErrorNum, TResourceInfo resourceInfo, String serverAddress, String topic) { @@ -59,21 +60,34 @@ public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, lon this.topic = topic; } + public KafkaProgress getProgress() { + Gson gson = new Gson(); + return gson.fromJson(this.progress, KafkaProgress.class); + } + @Override - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { - // divide kafkaPartitions into tasks - List kafkaRoutineLoadTaskList = new ArrayList<>(); - for (int i = 0; i < currentConcurrentTaskNum; i++) { - // TODO(ml): init load task - kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH, - dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId())); - } - for (int i = 0; i < kafkaPartitions.size(); i++) { - kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i)); + public void divideRoutineLoadJob(int currentConcurrentTaskNum) { + writeLock(); + try { + if (state == JobState.NEED_SCHEDULER) { + // divide kafkaPartitions into tasks + for (int i = 0; i < currentConcurrentTaskNum; i++) { + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(UUID.randomUUID().toString(), id); + routineLoadTaskInfoList.add(kafkaTaskInfo); + needSchedulerTaskInfoList.add(kafkaTaskInfo); + } + for (int i = 0; i < kafkaPartitions.size(); i++) { + ((KafkaTaskInfo) routineLoadTaskInfoList.get(i % 
currentConcurrentTaskNum)) + .addKafkaPartition(kafkaPartitions.get(i)); + } + // change job state to running + state = JobState.RUNNING; + } else { + LOG.debug("Ignore to divide routine load job while job state {}", state); + } + } finally { + writeUnlock(); } - List result = new ArrayList<>(); - result.addAll(kafkaRoutineLoadTaskList); - return result; } @Override @@ -99,6 +113,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)); } + @Override + public RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId) { + return new KafkaRoutineLoadTask(getResourceInfo(), + beId, getDbId(), getTableId(), + 0L, 0L, 0L, getColumns(), getWhere(), getColumnSeparator(), + (KafkaTaskInfo) routineLoadTaskInfo, + getProgress()); + } + private void updatePartitions() { // fetch all of kafkaPartitions in topic if (kafkaPartitions == null || kafkaPartitions.size() == 0) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java index 89347745c48c20..fe839ee879a7e5 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java @@ -17,27 +17,29 @@ package org.apache.doris.load.routineload; -import com.google.common.collect.Lists; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; -import java.util.List; +import java.util.HashMap; +import java.util.Map; -public class KafkaRoutineLoadTask extends RoutineLoadTask { - - private List kafkaPartitions; - public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { - super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, 
indexId, tabletId, signature); - this.kafkaPartitions = Lists.newArrayList(); - } +public class KafkaRoutineLoadTask extends RoutineLoadTask { - public void addKafkaPartition(int partition) { - kafkaPartitions.add(partition); + private Map partitionIdToOffset; + + public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, + long dbId, long tableId, long partitionId, long indexId, long tabletId, + String columns, String where, String columnSeparator, + KafkaTaskInfo kafkaTaskInfo, KafkaProgress kafkaProgress) { + super(resourceInfo, backendId, TTaskType.STREAM_LOAD, dbId, tableId, partitionId, indexId, tabletId, + kafkaTaskInfo.getId(), columns, where, columnSeparator, RoutineLoadJob.DataSourceType.KAFKA); + this.partitionIdToOffset = new HashMap<>(); + kafkaTaskInfo.getPartitions().parallelStream().forEach(entity -> + partitionIdToOffset.put(entity, kafkaProgress.getPartitionIdToOffset().get(entity))); } - public List getKafkaPartitions() { - return kafkaPartitions; + public Map getPartitionIdToOffset() { + return partitionIdToOffset; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java new file mode 100644 index 00000000000000..7fe6b885d66726 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.common.SystemIdGenerator; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class KafkaTaskInfo extends RoutineLoadTaskInfo { + + private List partitions; + + public KafkaTaskInfo(String id, String jobId) { + super(id, jobId); + this.partitions = new ArrayList<>(); + } + + public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) { + super(UUID.randomUUID().toString(), kafkaTaskInfo.getJobId()); + this.partitions = kafkaTaskInfo.getPartitions(); + } + + public void addKafkaPartition(int partition) { + partitions.add(partition); + } + + public List getPartitions() { + return partitions; + } + + +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java deleted file mode 100644 index 0dd6b3cc431974..00000000000000 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoad.java +++ /dev/null @@ -1,264 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.load.routineload; - -import com.google.common.collect.Maps; -import org.apache.doris.catalog.Catalog; -import org.apache.doris.common.LoadException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -public class RoutineLoad { - private static final Logger LOG = LogManager.getLogger(RoutineLoad.class); - private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; - - // TODO(ml): real-time calculate by be - private Map beIdToMaxConcurrentTasks; - - // stream load job meta - private Map idToRoutineLoadJob; - private Map idToNeedSchedulerRoutineLoadJob; - private Map idToRunningRoutineLoadJob; - private Map idToCancelledRoutineLoadJob; - - // stream load tasks meta (not persistent) - private Map idToRoutineLoadTask; - private Map idToNeedSchedulerRoutineLoadTask; - - private ReentrantReadWriteLock lock; - - private void readLock() { - lock.readLock().lock(); - } - - private void readUnlock() { - lock.readLock().unlock(); - } - - private void writeLock() { - lock.writeLock().lock(); - } - - private void writeUnlock() { - lock.writeLock().unlock(); - } - - public RoutineLoad() { - idToRoutineLoadJob = Maps.newHashMap(); - idToNeedSchedulerRoutineLoadJob = Maps.newHashMap(); - idToRunningRoutineLoadJob = Maps.newHashMap(); - idToCancelledRoutineLoadJob = Maps.newHashMap(); - 
idToRoutineLoadTask = Maps.newHashMap(); - idToNeedSchedulerRoutineLoadTask = Maps.newHashMap(); - lock = new ReentrantReadWriteLock(true); - } - - public int getTotalMaxConcurrentTaskNum() { - readLock(); - try { - if (beIdToMaxConcurrentTasks == null) { - beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) - .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); - } - return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum(); - } finally { - readUnlock(); - } - } - - public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { - writeLock(); - try { - idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); - } finally { - writeUnlock(); - } - } - - public void addRoutineLoadTasks(List routineLoadTaskList) { - writeLock(); - try { - idToRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( - Collectors.toMap(task -> task.getSignature(), task -> task))); - } finally { - writeUnlock(); - } - } - - public Map getIdToRoutineLoadTask() { - return idToRoutineLoadTask; - } - - public void addNeedSchedulerRoutineLoadTasks(List routineLoadTaskList) { - writeLock(); - try { - idToNeedSchedulerRoutineLoadTask.putAll(routineLoadTaskList.parallelStream().collect( - Collectors.toMap(task -> task.getSignature(), task -> task))); - } finally { - writeUnlock(); - } - } - - public void removeRoutineLoadTasks(List routineLoadTasks) { - if (routineLoadTasks != null) { - writeLock(); - try { - routineLoadTasks.parallelStream().forEach(task -> idToRoutineLoadTask.remove(task.getSignature())); - routineLoadTasks.parallelStream().forEach(task -> - idToNeedSchedulerRoutineLoadTask.remove(task.getSignature())); - } finally { - writeUnlock(); - } - } - } - - public Map getIdToNeedSchedulerRoutineLoadTasks() { - readLock(); - try { - return idToNeedSchedulerRoutineLoadTask; - } finally { - readUnlock(); - } - } - - public List getRoutineLoadJobByState(RoutineLoadJob.JobState 
jobState) throws LoadException { - List jobs = new ArrayList<>(); - Collection stateJobs = null; - readLock(); - LOG.debug("begin to get routine load job by state {}", jobState.name()); - try { - switch (jobState) { - case NEED_SCHEDULER: - stateJobs = idToNeedSchedulerRoutineLoadJob.values(); - break; - case PAUSED: - throw new LoadException("not support getting paused routine load jobs"); - case RUNNING: - stateJobs = idToRunningRoutineLoadJob.values(); - break; - case STOPPED: - throw new LoadException("not support getting stopped routine load jobs"); - default: - break; - } - if (stateJobs != null) { - jobs.addAll(stateJobs); - LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); - } - } finally { - readUnlock(); - } - return jobs; - } - - public void updateRoutineLoadJobStateNoValid(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) { - writeLock(); - try { - RoutineLoadJob.JobState srcJobState = routineLoadJob.getState(); - long jobId = routineLoadJob.getId(); - LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState); - switch (jobState) { - case NEED_SCHEDULER: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob); - break; - case PAUSED: - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToRunningRoutineLoadJob.remove(jobId); - break; - case RUNNING: - idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob); - idToRunningRoutineLoadJob.put(jobId, routineLoadJob); - break; - case CANCELLED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToCancelledRoutineLoadJob.put(jobId, routineLoadJob); - break; - case STOPPED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - break; - default: - break; - } - routineLoadJob.setState(jobState); - Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob); - } finally { - writeUnlock(); - } - } - 
- public void updateRoutineLoadJobState(RoutineLoadJob routineLoadJob, RoutineLoadJob.JobState jobState) - throws LoadException { - writeLock(); - try { - RoutineLoadJob.JobState srcJobState = routineLoadJob.getState(); - long jobId = routineLoadJob.getId(); - LOG.info("begin to change job {} state from {} to {}", jobId, srcJobState, jobState); - checkStateTransform(srcJobState, jobState); - switch (jobState) { - case NEED_SCHEDULER: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.put(jobId, routineLoadJob); - break; - case PAUSED: - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToRunningRoutineLoadJob.remove(jobId); - break; - case RUNNING: - idToNeedSchedulerRoutineLoadJob.remove(jobId, routineLoadJob); - idToRunningRoutineLoadJob.put(jobId, routineLoadJob); - break; - case CANCELLED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - idToCancelledRoutineLoadJob.put(jobId, routineLoadJob); - break; - case STOPPED: - idToRunningRoutineLoadJob.remove(jobId); - idToNeedSchedulerRoutineLoadJob.remove(jobId); - break; - default: - break; - } - routineLoadJob.setState(jobState); - Catalog.getInstance().getEditLog().logRoutineLoadJob(routineLoadJob); - } finally { - writeUnlock(); - } - } - - private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState) - throws LoadException { - if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) { - throw new LoadException("could not transform " + currentState + " to " + desireState); - } else if (currentState == RoutineLoadJob.JobState.CANCELLED || - currentState == RoutineLoadJob.JobState.STOPPED) { - throw new LoadException("could not transform " + currentState + " to " + desireState); - } - } - -} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java 
index af737356febf57..d2bdab8b637bcf 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -17,15 +17,18 @@ package org.apache.doris.load.routineload; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Writable; import org.apache.doris.thrift.TResourceInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -35,7 +38,11 @@ * The desireTaskConcurrentNum means that user expect the number of concurrent stream load * The routine load job support different streaming medium such as KAFKA */ -public class RoutineLoadJob implements Writable { +public abstract class RoutineLoadJob implements Writable { + + private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); + + private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; public enum JobState { NEED_SCHEDULER, @@ -49,7 +56,7 @@ public enum DataSourceType { KAFKA } - protected long id; + protected String id; protected String name; protected String userName; protected long dbId; @@ -64,6 +71,11 @@ public enum DataSourceType { // max number of error data in ten thousand data protected int maxErrorNum; protected String progress; + + // The tasks belong to this job + protected List routineLoadTaskInfoList; + protected List needSchedulerTaskInfoList; + protected ReentrantReadWriteLock lock; // TODO(ml): error sample @@ -71,7 +83,7 @@ public enum DataSourceType { public RoutineLoadJob() { } - public RoutineLoadJob(long id, String name, String userName, long dbId, long tableId, + public RoutineLoadJob(String id, String name, String 
userName, long dbId, long tableId, String partitions, String columns, String where, String columnSeparator, int desireTaskConcurrentNum, JobState state, DataSourceType dataSourceType, int maxErrorNum, TResourceInfo resourceInfo) { @@ -90,6 +102,8 @@ public RoutineLoadJob(long id, String name, String userName, long dbId, long tab this.maxErrorNum = maxErrorNum; this.resourceInfo = resourceInfo; this.progress = ""; + this.routineLoadTaskInfoList = new ArrayList<>(); + this.needSchedulerTaskInfoList = new ArrayList<>(); lock = new ReentrantReadWriteLock(true); } @@ -112,10 +126,30 @@ public void writeUnlock() { // thrift object private TResourceInfo resourceInfo; - public long getId() { + public String getId() { return id; } + public long getDbId() { + return dbId; + } + + public long getTableId() { + return tableId; + } + + public String getColumns() { + return columns; + } + + public String getWhere() { + return where; + } + + public String getColumnSeparator() { + return columnSeparator; + } + public JobState getState() { return state; } @@ -128,8 +162,57 @@ public TResourceInfo getResourceInfo() { return resourceInfo; } - public List divideRoutineLoadJob(int currentConcurrentTaskNum) { - return null; + public int getSizeOfRoutineLoadTaskInfoList() { + readLock(); + try { + return routineLoadTaskInfoList.size(); + } finally { + readUnlock(); + } + + } + + public List getNeedSchedulerTaskInfoList() { + return needSchedulerTaskInfoList; + } + + public void updateState(JobState jobState) { + writeLock(); + try { + state = jobState; + } finally { + writeUnlock(); + } + } + + public void processTimeoutTasks() { + writeLock(); + try { + List runningTasks = new ArrayList<>(routineLoadTaskInfoList); + runningTasks.removeAll(needSchedulerTaskInfoList); + + for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) { + if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs()) + > DEFAULT_TASK_TIMEOUT_SECONDS * 1000) { + String oldSignature = 
routineLoadTaskInfo.getId(); + if (routineLoadTaskInfo instanceof KafkaTaskInfo) { + // remove old task + routineLoadTaskInfoList.remove(routineLoadTaskInfo); + // add new task + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo((KafkaTaskInfo) routineLoadTaskInfo); + routineLoadTaskInfoList.add(kafkaTaskInfo); + needSchedulerTaskInfoList.add(kafkaTaskInfo); + } + LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled", + oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS); + } + } + } finally { + writeUnlock(); + } + } + + public void divideRoutineLoadJob(int currentConcurrentTaskNum) { } public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { @@ -145,4 +228,17 @@ public void write(DataOutput out) throws IOException { public void readFields(DataInput in) throws IOException { // TODO(ml) } + + abstract RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId); + + private void checkStateTransform(RoutineLoadJob.JobState currentState, RoutineLoadJob.JobState desireState) + throws LoadException { + if (currentState == RoutineLoadJob.JobState.PAUSED && desireState == RoutineLoadJob.JobState.NEED_SCHEDULER) { + throw new LoadException("could not transform " + currentState + " to " + desireState); + } else if (currentState == RoutineLoadJob.JobState.CANCELLED || + currentState == RoutineLoadJob.JobState.STOPPED) { + throw new LoadException("could not transform " + currentState + " to " + desireState); + } + } + } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java new file mode 100644 index 00000000000000..d419675af2b015 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +public class RoutineLoadManager { + private static final Logger LOG = LogManager.getLogger(RoutineLoadManager.class); + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + + // Long is beId, integer is the size of tasks in be + private Map beIdToMaxConcurrentTasks; + private Map beIdToConcurrentTasks; + + // stream load job meta + private Map idToRoutineLoadJob; + + private ReentrantReadWriteLock lock; + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } + + public RoutineLoadManager() { + idToRoutineLoadJob = Maps.newConcurrentMap(); + 
beIdToConcurrentTasks = Maps.newHashMap(); + lock = new ReentrantReadWriteLock(true); + } + + public void initBeIdToMaxConcurrentTasks() { + if (beIdToMaxConcurrentTasks == null) { + beIdToMaxConcurrentTasks = Catalog.getCurrentSystemInfo().getBackendIds(true) + .parallelStream().collect(Collectors.toMap(beId -> beId, beId -> DEFAULT_BE_CONCURRENT_TASK_NUM)); + } + } + + public int getTotalMaxConcurrentTaskNum() { + readLock(); + try { + initBeIdToMaxConcurrentTasks(); + return beIdToMaxConcurrentTasks.values().stream().mapToInt(i -> i).sum(); + } finally { + readUnlock(); + } + } + + public void updateBeIdTaskMaps() { + writeLock(); + try { + initBeIdToMaxConcurrentTasks(); + List beIds = Catalog.getCurrentSystemInfo().getBackendIds(true); + + // diff beIds and beIdToMaxConcurrentTasks.keys() + List newBeIds = beIds.parallelStream().filter(entity -> beIdToMaxConcurrentTasks.get(entity) == null) + .collect(Collectors.toList()); + List unavailableBeIds = beIdToMaxConcurrentTasks.keySet().parallelStream() + .filter(entity -> !beIds.contains(entity)) + .collect(Collectors.toList()); + newBeIds.parallelStream().forEach(entity -> beIdToMaxConcurrentTasks.put(entity, DEFAULT_BE_CONCURRENT_TASK_NUM)); + for (long beId : unavailableBeIds) { + beIdToMaxConcurrentTasks.remove(beId); + beIdToConcurrentTasks.remove(beId); + } + LOG.info("There are {} backends which participate in routine load scheduler. 
" + + "There are {} new backends and {} unavailable backends for routine load", + beIdToMaxConcurrentTasks.size(), newBeIds.size(), unavailableBeIds.size()); + } finally { + writeUnlock(); + } + } + + public void addNumOfConcurrentTasksByBeId(long beId) { + writeLock(); + try { + if (beIdToConcurrentTasks.get(beId) == null) { + beIdToConcurrentTasks.put(beId, 1); + } else { + int concurrentTaskNum = (int) beIdToConcurrentTasks.get(beId); + concurrentTaskNum++; + beIdToConcurrentTasks.put(beId, concurrentTaskNum); + } + } finally { + writeUnlock(); + } + } + + public void addRoutineLoadJob(RoutineLoadJob routineLoadJob) { + idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); + } + + public int getSizeOfIdToRoutineLoadTask() { + int sizeOfTasks = 0; + for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { + sizeOfTasks += routineLoadJob.getSizeOfRoutineLoadTaskInfoList(); + } + return sizeOfTasks; + } + + public int getClusterIdleSlotNum() { + readLock(); + try { + int result = 0; + initBeIdToMaxConcurrentTasks(); + for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { + if (beIdToConcurrentTasks.get(entry.getKey()) == null) { + result += entry.getValue(); + } else { + result += entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + } + } + return result; + } finally { + readUnlock(); + } + } + + public long getMinTaskBeId() throws LoadException { + readLock(); + try { + long result = -1L; + int maxIdleSlotNum = 0; + initBeIdToMaxConcurrentTasks(); + for (Map.Entry entry : beIdToMaxConcurrentTasks.entrySet()) { + if (beIdToConcurrentTasks.get(entry.getKey()) == null) { + result = maxIdleSlotNum < entry.getValue() ? entry.getKey() : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, entry.getValue()); + } else { + int idelTaskNum = entry.getValue() - beIdToConcurrentTasks.get(entry.getKey()); + result = maxIdleSlotNum < idelTaskNum ? 
entry.getKey() : result; + maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum); + } + } + if (result < 0) { + throw new LoadException("There is no empty slot in cluster"); + } + return result; + } finally { + readUnlock(); + } + } + + public List getNeedSchedulerRoutineLoadTasks() { + List routineLoadTaskInfoList = new ArrayList<>(); + for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { + routineLoadTaskInfoList.addAll(routineLoadJob.getNeedSchedulerTaskInfoList()); + } + return routineLoadTaskInfoList; + } + + public RoutineLoadJob getJob(String jobId) { + return idToRoutineLoadJob.get(jobId); + } + + public List getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException { + List jobs = new ArrayList<>(); + Collection stateJobs = null; + LOG.debug("begin to get routine load job by state {}", jobState.name()); + stateJobs = idToRoutineLoadJob.values().stream() + .filter(entity -> entity.getState() == jobState).collect(Collectors.toList()); + if (stateJobs != null) { + jobs.addAll(stateJobs); + LOG.info("got {} routine load jobs by state {}", jobs.size(), jobState.name()); + } + return jobs; + } + + public void processTimeoutTasks() { + for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) { + routineLoadJob.processTimeoutTasks(); + } + } + +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 6c3cdefdf0a975..c2a86d0c5d34f4 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -30,10 +30,19 @@ public class RoutineLoadScheduler extends Daemon { private static final Logger LOG = LogManager.getLogger(RoutineLoadScheduler.class); - private RoutineLoad routineLoad = Catalog.getInstance().getRoutineLoadInstance(); + private RoutineLoadManager routineLoadManager = 
Catalog.getInstance().getRoutineLoadInstance(); @Override protected void runOneCycle() { + try { + process(); + } catch (Throwable e) { + LOG.error("failed to scheduler jobs with error massage {}", e.getMessage(), e); + } + } + + private void process() { + // update // get need scheduler routine jobs List routineLoadJobList = null; try { @@ -44,48 +53,29 @@ protected void runOneCycle() { LOG.debug("there are {} job need scheduler", routineLoadJobList.size()); for (RoutineLoadJob routineLoadJob : routineLoadJobList) { - // judge nums of tasks more then max concurrent tasks of cluster - List routineLoadTaskList = null; try { - routineLoadJob.writeLock(); - - if (routineLoadJob.getState() == RoutineLoadJob.JobState.NEED_SCHEDULER) { - int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); - int totalTaskNum = currentConcurrentTaskNum + routineLoad.getIdToRoutineLoadTask().size(); - if (totalTaskNum > routineLoad.getTotalMaxConcurrentTaskNum()) { - LOG.info("job {} concurrent task num {}, current total task num {}. 
" - + "desired total task num {} more then total max task num {}, " - + "skip this turn of scheduler", - routineLoadJob.getId(), currentConcurrentTaskNum, - routineLoad.getIdToRoutineLoadTask().size(), - totalTaskNum, routineLoad.getTotalMaxConcurrentTaskNum()); - break; - } - // divide job into tasks - routineLoadTaskList = routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); - - // update tasks meta - routineLoad.addRoutineLoadTasks(routineLoadTaskList); - routineLoad.addNeedSchedulerRoutineLoadTasks(routineLoadTaskList); - - // change job state to running - routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.RUNNING); + // judge nums of tasks more then max concurrent tasks of cluster + int currentConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum(); + int totalTaskNum = currentConcurrentTaskNum + routineLoadManager.getSizeOfIdToRoutineLoadTask(); + if (totalTaskNum > routineLoadManager.getTotalMaxConcurrentTaskNum()) { + LOG.info("job {} concurrent task num {}, current total task num {}. 
" + + "desired total task num {} more then total max task num {}, " + + "skip this turn of scheduler", + routineLoadJob.getId(), currentConcurrentTaskNum, + routineLoadManager.getSizeOfIdToRoutineLoadTask(), + totalTaskNum, routineLoadManager.getTotalMaxConcurrentTaskNum()); + break; } + // divide job into tasks + routineLoadJob.divideRoutineLoadJob(currentConcurrentTaskNum); } catch (MetaNotFoundException e) { - routineLoad.updateRoutineLoadJobStateNoValid(routineLoadJob, RoutineLoadJob.JobState.CANCELLED); - } catch (LoadException e) { - LOG.error("failed to scheduler job {} with error massage {}", routineLoadJob.getId(), - e.getMessage(), e); - routineLoad.removeRoutineLoadTasks(routineLoadTaskList); - } finally { - routineLoadJob.writeUnlock(); + routineLoadJob.updateState(RoutineLoadJob.JobState.CANCELLED); } } - } private List getNeedSchedulerRoutineJobs() throws LoadException { - return routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); + return routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java index 184feee880db95..18f18aa97a410a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTask.java @@ -17,14 +17,32 @@ package org.apache.doris.load.routineload; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.CatalogIdGenerator; +import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.task.AgentTask; import org.apache.doris.thrift.TResourceInfo; import org.apache.doris.thrift.TTaskType; -public class RoutineLoadTask extends AgentTask{ +public class RoutineLoadTask extends AgentTask { + + private String id; + private String columns; + private String where; + private String columnSeparator; + private 
RoutineLoadJob.DataSourceType dataSourceType; + public RoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType, - long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) { - super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature); + long dbId, long tableId, long partitionId, long indexId, long tabletId, String id, + String columns, String where, String columnSeparator, + RoutineLoadJob.DataSourceType dataSourceType) { + super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, + Catalog.getCurrentCatalog().getNextId()); + this.id = id; + this.columns = columns; + this.where = where; + this.columnSeparator = columnSeparator; + this.dataSourceType = dataSourceType; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java new file mode 100644 index 00000000000000..0eca2bb423a088 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + package org.apache.doris.load.routineload; + + +/** + * A routine load task info describes one task and carries its unique id (signature). + * A Kafka task info additionally includes the partitions from which this task will obtain data. + * Logically, a routine load task info and a routine load task are the same thing. + * The difference is that a routine load task is an agent task which also includes the id of the backend that will execute it. + */ +public class RoutineLoadTaskInfo { + + private String id; + private String jobId; + + private long createTimeMs; + private long loadStartTimeMs; + + public RoutineLoadTaskInfo(String id, String jobId) { + this.id = id; + this.jobId = jobId; + this.createTimeMs = System.currentTimeMillis(); + } + + public String getId() { + return id; + } + + public String getJobId() { + return jobId; + } + + public void setLoadStartTimeMs(long loadStartTimeMs) { + this.loadStartTimeMs = loadStartTimeMs; + } + + public long getLoadStartTimeMs() { + return loadStartTimeMs; + } +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java new file mode 100644 index 00000000000000..61b3d3aaf3e936 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.util.Daemon; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Iterator; +import java.util.List; + +/** + * Routine load task scheduler is a daemon which allocates tasks to backends (BE). + * Step1: get the total number of idle task slots of the backends. 
+ * Step2: divide the tasks equally among the backends + * Step3: submit the tasks to the backends + */ +// TODO(ml): change interval ms in constructor +public class RoutineLoadTaskScheduler extends Daemon { + + private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class); + + private RoutineLoadManager routineLoadManager = Catalog.getInstance().getRoutineLoadInstance(); + + @Override + protected void runOneCycle() { + try { + process(); + } catch (Throwable e) { + LOG.warn("Failed to process one round of RoutineLoadTaskScheduler with error message {}", + e.getMessage(), e); + } + } + + private void process() throws LoadException { + // update current beIdMaps for tasks + routineLoadManager.updateBeIdTaskMaps(); + + // check timeout tasks + routineLoadManager.processTimeoutTasks(); + + // get idle be task num + int clusterIdleSlotNum = routineLoadManager.getClusterIdleSlotNum(); + int scheduledTaskNum = 0; + List routineLoadTaskList = routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + Iterator iterator = routineLoadTaskList.iterator(); + AgentBatchTask batchTask = new AgentBatchTask(); + + // allocate task to be + while (clusterIdleSlotNum > 0) { + if (iterator.hasNext()) { + RoutineLoadTaskInfo routineLoadTaskInfo = iterator.next(); + long beId = routineLoadManager.getMinTaskBeId(); + RoutineLoadJob routineLoadJob = routineLoadManager.getJob(routineLoadTaskInfo.getJobId()); + RoutineLoadTask routineLoadTask = routineLoadJob.createTask(routineLoadTaskInfo, beId); + if (routineLoadTask != null) { + routineLoadTaskInfo.setLoadStartTimeMs(System.currentTimeMillis()); + AgentTaskQueue.addTask(routineLoadTask); + batchTask.addTask(routineLoadTask); + clusterIdleSlotNum--; + scheduledTaskNum++; + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); + } else { + LOG.debug("Task {} for job has been already discarded", routineLoadTaskInfo.getId()); + } + } else { + LOG.debug("All of tasks were scheduled."); + break; + } + } + LOG.info("{} tasks have 
bean allocated to be. 
There are {} remaining idle slot in cluster.", + scheduledTaskNum, routineLoadManager.getClusterIdleSlotNum()); + + if (batchTask.getTaskNum() > 0) { + AgentTaskExecutor.submit(batchTask); + } + } +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index ab88bff6be6ee3..db0480f7fd97d9 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -19,13 +19,16 @@ import com.google.common.collect.Lists; import mockit.Deencapsulation; -import mockit.Delegate; import mockit.Expectations; import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; import mockit.Mocked; +import mockit.Verifications; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.SystemIdGenerator; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -37,9 +40,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Queue; public class KafkaRoutineLoadJobTest { + private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; + @Test public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, @Injectable PartitionInfo partitionInfo1, @@ -71,7 +78,7 @@ public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, } }; - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L, + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, 1L, "1L", "v1", "", "", 3, RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), "", 
""); @@ -82,25 +89,65 @@ public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, @Test public void testDivideRoutineLoadJob() { - KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "miaoling", 1L, + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, 1L, "1L", "v1", "", "", 3, RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), "", ""); Deencapsulation.setField(kafkaRoutineLoadJob, "kafkaPartitions", Arrays.asList(1, 4, 6)); - List result = kafkaRoutineLoadJob.divideRoutineLoadJob(2); + kafkaRoutineLoadJob.divideRoutineLoadJob(2); + + List result = kafkaRoutineLoadJob.getNeedSchedulerTaskInfoList(); Assert.assertEquals(2, result.size()); - for (RoutineLoadTask routineLoadTask : result) { - KafkaRoutineLoadTask kafkaRoutineLoadTask = (KafkaRoutineLoadTask) routineLoadTask; - if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 2) { - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(1)); - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(6)); - } else if (kafkaRoutineLoadTask.getKafkaPartitions().size() == 1) { - Assert.assertTrue(kafkaRoutineLoadTask.getKafkaPartitions().contains(4)); + for (RoutineLoadTaskInfo routineLoadTaskInfo : result) { + KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; + if (kafkaTaskInfo.getPartitions().size() == 2) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(1)); + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(6)); + } else if (kafkaTaskInfo.getPartitions().size() == 1) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(4)); } else { Assert.fail(); } } } + + @Test + public void testProcessTimeOutTasks() { + List routineLoadTaskInfoList = new ArrayList<>(); + KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo("1", "1"); + kafkaTaskInfo.addKafkaPartition(100); + 
kafkaTaskInfo.setLoadStartTimeMs(System.currentTimeMillis() - DEFAULT_TASK_TIMEOUT_SECONDS * 60 * 1000); + routineLoadTaskInfoList.add(kafkaTaskInfo); + + RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, + 1L, "1L", "v1", "", "", 3, + RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), + "", ""); + Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList); + + new MockUp() { + @Mock + public long getNextId() { + return 2L; + } + }; + + + routineLoadJob.processTimeoutTasks(); + new Verifications() { + { + List idToRoutineLoadTask = + Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); + Assert.assertNotEquals("1", idToRoutineLoadTask.get(0).getId()); + Assert.assertEquals(1, idToRoutineLoadTask.size()); + List needSchedulerTask = + Deencapsulation.getField(routineLoadJob, "needSchedulerTaskInfoList"); + Assert.assertEquals(1, needSchedulerTask.size()); + Assert.assertEquals(100, (int) ((KafkaTaskInfo) (needSchedulerTask.get(0))) + .getPartitions().get(0)); + } + }; + } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java new file mode 100644 index 00000000000000..49f698e898877d --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import com.google.common.collect.Lists; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.system.SystemInfoService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +public class RoutineLoadManagerTest { + + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; + + @Mocked + private SystemInfoService systemInfoService; + + @Test + public void testGetMinTaskBeId() throws LoadException { + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + result = beIds; + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addNumOfConcurrentTasksByBeId(1L); + Assert.assertEquals(2L, routineLoadManager.getMinTaskBeId()); + } + + @Test + public void testGetTotalIdleTaskNum() { + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + result = beIds; + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager 
routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addNumOfConcurrentTasksByBeId(1L); + Assert.assertEquals(DEFAULT_BE_CONCURRENT_TASK_NUM * 2 - 1, routineLoadManager.getClusterIdleSlotNum()); + } + + @Test + public void testUpdateBeIdTaskMaps() { + List oldBeIds = Lists.newArrayList(); + oldBeIds.add(1L); + oldBeIds.add(2L); + + List newBeIds = Lists.newArrayList(); + newBeIds.add(1L); + newBeIds.add(3L); + + new Expectations() { + { + systemInfoService.getBackendIds(true); + returns(oldBeIds, newBeIds); + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + } + }; + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.updateBeIdTaskMaps(); + } + +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 2a9e4de35382c6..8f6e22ecde8a6f 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -17,11 +17,20 @@ package org.apache.doris.load.routineload; +import com.google.common.collect.Lists; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.persist.EditLog; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TResourceInfo; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -34,53 +43,79 @@ import java.util.Arrays; import java.util.List; -@RunWith(PowerMockRunner.class) -@PrepareForTest({Catalog.class}) public class RoutineLoadSchedulerTest { @Test - public void testNormalRunOneCycle() 
throws LoadException, MetaNotFoundException { - int taskNum = 1; - List routineLoadTaskList = new ArrayList<>(); - KafkaRoutineLoadTask kafkaRoutineLoadTask = EasyMock.createNiceMock(KafkaRoutineLoadTask.class); - EasyMock.expect(kafkaRoutineLoadTask.getSignature()).andReturn(1L).anyTimes(); - EasyMock.replay(kafkaRoutineLoadTask); - routineLoadTaskList.add(kafkaRoutineLoadTask); + public void testNormalRunOneCycle(@Mocked Catalog catalog, + @Injectable RoutineLoadManager routineLoadManager, + @Injectable SystemInfoService systemInfoService, + @Injectable Database database) + throws LoadException, MetaNotFoundException { - KafkaRoutineLoadJob routineLoadJob = EasyMock.createNiceMock(KafkaRoutineLoadJob.class); - EasyMock.expect(routineLoadJob.calculateCurrentConcurrentTaskNum()).andReturn(taskNum).anyTimes(); - EasyMock.expect(routineLoadJob.divideRoutineLoadJob(taskNum)).andReturn(routineLoadTaskList).anyTimes(); - EasyMock.expect(routineLoadJob.getState()).andReturn(RoutineLoadJob.JobState.NEED_SCHEDULER).anyTimes(); - EasyMock.replay(routineLoadJob); + String clusterName = "cluster1"; + List beIds = Lists.newArrayList(); + beIds.add(1L); + beIds.add(2L); - SystemInfoService systemInfoService = EasyMock.createNiceMock(SystemInfoService.class); - List beIds = Arrays.asList(1L, 2L, 3L); - EasyMock.expect(systemInfoService.getBackendIds(true)).andReturn(beIds).anyTimes(); - EasyMock.replay(systemInfoService); + List partitions = Lists.newArrayList(); + partitions.add(100); + partitions.add(200); + partitions.add(300); + RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", "miaoling", 1L, + 1L, "1L", "v1", "", "", 3, + RoutineLoadJob.JobState.NEED_SCHEDULER, RoutineLoadJob.DataSourceType.KAFKA, 0, new TResourceInfo(), + "", ""); + routineLoadJob.setState(RoutineLoadJob.JobState.NEED_SCHEDULER); + List routineLoadJobList = new ArrayList<>(); + routineLoadJobList.add(routineLoadJob); - Catalog catalog = 
EasyMock.createNiceMock(Catalog.class); - EditLog editLog = EasyMock.createNiceMock(EditLog.class); - PowerMock.mockStatic(Catalog.class); - EasyMock.expect(Catalog.getCurrentSystemInfo()).andReturn(systemInfoService).anyTimes(); - EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes(); - PowerMock.replay(Catalog.class); + Deencapsulation.setField(routineLoadJob, "kafkaPartitions", partitions); + Deencapsulation.setField(routineLoadJob, "desireTaskConcurrentNum", 3); + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return systemInfoService; + } - RoutineLoad routineLoad = new RoutineLoad(); - EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes(); - EasyMock.expect(catalog.getRoutineLoadInstance()).andReturn(routineLoad).anyTimes(); - EasyMock.replay(catalog); + @Mock + public Catalog getCurrentCatalog() { + return catalog; + } + }; - routineLoad.addRoutineLoadJob(routineLoadJob); - routineLoad.updateRoutineLoadJobState(routineLoadJob, RoutineLoadJob.JobState.NEED_SCHEDULER); + new Expectations() { + { + catalog.getRoutineLoadInstance(); + result = routineLoadManager; + routineLoadManager.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER); + result = routineLoadJobList; + catalog.getDb(anyLong); + result = database; + database.getClusterName(); + result = clusterName; + systemInfoService.getClusterBackendIds(clusterName, true); + result = beIds; + routineLoadManager.getSizeOfIdToRoutineLoadTask(); + result = 1; + routineLoadManager.getTotalMaxConcurrentTaskNum(); + result = 10; + } + }; RoutineLoadScheduler routineLoadScheduler = new RoutineLoadScheduler(); + Deencapsulation.setField(routineLoadScheduler, "routineLoadManager", routineLoadManager); routineLoadScheduler.runOneCycle(); - Assert.assertEquals(1, routineLoad.getIdToRoutineLoadTask().size()); - Assert.assertEquals(1, routineLoad.getIdToNeedSchedulerRoutineLoadTasks().size()); - Assert.assertEquals(1, 
routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.RUNNING).size()); - Assert.assertEquals(0, routineLoad.getRoutineLoadJobByState(RoutineLoadJob.JobState.NEED_SCHEDULER).size()); - + Assert.assertEquals(2, routineLoadJob.getNeedSchedulerTaskInfoList().size()); + for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadJob.getNeedSchedulerTaskInfoList()) { + KafkaTaskInfo kafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo; + if (kafkaTaskInfo.getPartitions().size() == 2) { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(100)); + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(300)); + } else { + Assert.assertTrue(kafkaTaskInfo.getPartitions().contains(200)); + } + } } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java new file mode 100644 index 00000000000000..6226a0b2699f3c --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; +import mockit.Verifications; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.LoadException; +import org.apache.doris.task.AgentTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.thrift.TTaskType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +public class RoutineLoadTaskSchedulerTest { + + @Mocked + private RoutineLoadManager routineLoadManager; + @Mocked + private Catalog catalog; + @Mocked + private AgentTaskExecutor agentTaskExecutor; + + @Test + public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1, + @Injectable KafkaRoutineLoadJob routineLoadJob) throws LoadException { + long beId = 100L; + + List routineLoadTaskInfoList = Lists.newArrayList(); + KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo("1", "1"); + routineLoadTaskInfo1.addKafkaPartition(1); + routineLoadTaskInfo1.addKafkaPartition(2); + routineLoadTaskInfoList.add(routineLoadTaskInfo1); + + + Map idToRoutineLoadTask = Maps.newHashMap(); + idToRoutineLoadTask.put(1L, routineLoadTaskInfo1); + + Map partitionIdToOffset = Maps.newHashMap(); + partitionIdToOffset.put(1, 100L); + partitionIdToOffset.put(2, 200L); + KafkaProgress kafkaProgress = new KafkaProgress(); + kafkaProgress.setPartitionIdToOffset(partitionIdToOffset); + + Map idToRoutineLoadJob = Maps.newConcurrentMap(); + idToRoutineLoadJob.put("1", routineLoadJob); + + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + + new Expectations() { + { + Catalog.getInstance(); + result = catalog; + 
catalog.getRoutineLoadInstance(); + result = routineLoadManager; + Catalog.getCurrentCatalog(); + result = catalog; + catalog.getNextId(); + result = 2L; + + routineLoadManager.getClusterIdleSlotNum(); + result = 3; + routineLoadManager.getNeedSchedulerRoutineLoadTasks(); + result = routineLoadTaskInfoList; + + kafkaRoutineLoadJob1.getDbId(); + result = 1L; + kafkaRoutineLoadJob1.getTableId(); + result = 1L; + kafkaRoutineLoadJob1.getColumns(); + result = "columns"; + kafkaRoutineLoadJob1.getColumnSeparator(); + result = ""; + kafkaRoutineLoadJob1.getProgress(); + result = kafkaProgress; + + + routineLoadManager.getMinTaskBeId(); + result = beId; + routineLoadManager.getJob(anyString); + result = kafkaRoutineLoadJob1; + } + }; + + KafkaRoutineLoadTask kafkaRoutineLoadTask = new KafkaRoutineLoadTask(kafkaRoutineLoadJob1.getResourceInfo(), + beId, kafkaRoutineLoadJob1.getDbId(), kafkaRoutineLoadJob1.getTableId(), + 0L, 0L, 0L, kafkaRoutineLoadJob1.getColumns(), kafkaRoutineLoadJob1.getWhere(), + kafkaRoutineLoadJob1.getColumnSeparator(), + (KafkaTaskInfo) routineLoadTaskInfo1, + kafkaRoutineLoadJob1.getProgress()); + + new Expectations() { + { + kafkaRoutineLoadJob1.createTask((RoutineLoadTaskInfo) any, anyLong); + result = kafkaRoutineLoadTask; + } + }; + + RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler(); + routineLoadTaskScheduler.runOneCycle(); + + new Verifications() { + { + AgentTask routineLoadTask = + AgentTaskQueue.getTask(beId, TTaskType.STREAM_LOAD, 2L); + + Assert.assertEquals(beId, routineLoadTask.getBackendId()); + Assert.assertEquals(100L, + (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(1)); + Assert.assertEquals(200L, + (long) ((KafkaRoutineLoadTask) routineLoadTask).getPartitionIdToOffset().get(2)); + + routineLoadManager.addNumOfConcurrentTasksByBeId(beId); + times = 1; + } + }; + } +} diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 
90e07e012e9369..15271c3997a8bb 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -155,7 +155,8 @@ enum TTaskType { PUBLISH_VERSION, CLEAR_ALTER_TASK, CLEAR_TRANSACTION_TASK, - RECOVER_TABLET + RECOVER_TABLET, + STREAM_LOAD } enum TStmtType {