Commit
Add a distributor that schedules routine load tasks to BEs fairly
Step 1: updateBeIdTaskMaps: remove BEs that are no longer alive and register newly alive BEs.
Step 2: Process timed-out tasks. If a task has been allocated to a BE but has not finished within DEFAULT_TASK_TIMEOUT_MINUTES, it is discarded.
        At the same time, the partitions belonging to the old task are allocated to a new task; the new task, carrying a fresh signature, is added to the needSchedulerRoutineLoadTask queue.
Step 3: Process every task in needSchedulerRoutineLoadTask and allocate each one to a BE, which then executes it. (A sketch of the whole cycle follows below.)
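
Taken together, the three steps form one periodic distribution cycle. Below is a minimal sketch of that cycle; the queue and map names follow the commit message, but TaskInfo, runOneCycle, the id counter, and the least-loaded BE selection are simplified, hypothetical stand-ins, and the timeout value is assumed.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class TaskDistributorSketch {

    static final long DEFAULT_TASK_TIMEOUT_MINUTES = 5; // assumed value, not from the commit

    // Hypothetical task descriptor: signature, allocation time, owned partitions.
    static class TaskInfo {
        long signature;
        long allocatedMs;
        List<Integer> partitions = new ArrayList<>();
    }

    // beId -> tasks currently running on that backend (Step 1 keeps this fresh).
    private final Map<Long, List<TaskInfo>> beIdToTaskMaps = new HashMap<>();
    private final Queue<TaskInfo> needSchedulerRoutineLoadTask = new ArrayDeque<>();
    private long idCounter = 0;

    void runOneCycle(List<Long> aliveBeIds, long nowMs) {
        // Step 1: updateBeIdTaskMaps - drop BEs that died, register newly alive ones.
        beIdToTaskMaps.keySet().retainAll(aliveBeIds);
        for (long beId : aliveBeIds) {
            beIdToTaskMaps.putIfAbsent(beId, new ArrayList<>());
        }

        // Step 2: discard timed-out tasks; their partitions move to a new task
        // with a fresh signature, which is queued for rescheduling.
        long timeoutMs = TimeUnit.MINUTES.toMillis(DEFAULT_TASK_TIMEOUT_MINUTES);
        for (List<TaskInfo> running : beIdToTaskMaps.values()) {
            Iterator<TaskInfo> it = running.iterator();
            while (it.hasNext()) {
                TaskInfo old = it.next();
                if (nowMs - old.allocatedMs > timeoutMs) {
                    it.remove();
                    TaskInfo renewed = new TaskInfo();
                    renewed.signature = ++idCounter;      // new signature
                    renewed.partitions = old.partitions;  // same partitions
                    needSchedulerRoutineLoadTask.add(renewed);
                }
            }
        }

        // Step 3: fairness - always hand the next queued task to the BE that
        // currently runs the fewest tasks; the BE then executes it.
        while (!needSchedulerRoutineLoadTask.isEmpty() && !beIdToTaskMaps.isEmpty()) {
            TaskInfo task = needSchedulerRoutineLoadTask.poll();
            List<TaskInfo> leastLoaded = beIdToTaskMaps.values().stream()
                    .min(Comparator.comparingInt(List::size))
                    .get();
            task.allocatedMs = nowMs;
            leastLoaded.add(task);
        }
    }
}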
EmmyMiao87 committed Nov 21, 2018
1 parent b57dfd4 commit 38f90fd
Showing 16 changed files with 748 additions and 93 deletions.
10 changes: 5 additions & 5 deletions fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -135,7 +135,7 @@
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.LoadJob.JobState;
-import org.apache.doris.load.routineload.RoutineLoad;
+import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.master.MetaHelper;
 import org.apache.doris.metric.MetricRepo;
@@ -241,7 +241,7 @@ public class Catalog {
     private ConcurrentHashMap<String, Cluster> nameToCluster;
 
     private Load load;
-    private RoutineLoad routineLoad;
+    private RoutineLoadManager routineLoadManager;
     private ExportMgr exportMgr;
     private Clone clone;
     private Alter alter;
@@ -372,7 +372,7 @@ private Catalog() {
         this.idToDb = new ConcurrentHashMap<>();
         this.fullNameToDb = new ConcurrentHashMap<>();
         this.load = new Load();
-        this.routineLoad = new RoutineLoad();
+        this.routineLoadManager = new RoutineLoadManager();
        this.exportMgr = new ExportMgr();
         this.clone = new Clone();
         this.alter = new Alter();
@@ -4250,8 +4250,8 @@ public Load getLoadInstance() {
         return this.load;
     }
 
-    public RoutineLoad getRoutineLoadInstance() {
-        return routineLoad;
+    public RoutineLoadManager getRoutineLoadInstance() {
+        return routineLoadManager;
     }
 
     public ExportMgr getExportMgr() {
fe/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -17,8 +17,21 @@
 
 package org.apache.doris.load.routineload;
 
-public class KafkaRoutineLoadProgress {
+import java.util.Map;
 
-    private String partitionName;
-    private long offset;
+/**
+ * Describes the progress of a Kafka routine load.
+ * Data before the recorded offset has already been loaded into Doris.
+ */
+public class KafkaProgress {
+
+    private Map<Integer, Long> partitionIdToOffset;
+
+    public Map<Integer, Long> getPartitionIdToOffset() {
+        return partitionIdToOffset;
+    }
+
+    public void setPartitionIdToOffset(Map<Integer, Long> partitionIdToOffset) {
+        this.partitionIdToOffset = partitionIdToOffset;
+    }
 }
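
The contract of partitionIdToOffset is that, per Kafka partition, every message before the stored offset is already loaded, so the stored value is exactly where consumption resumes. A small usage sketch of the class above; the demo class and the literal offsets are hypothetical.

import java.util.HashMap;
import java.util.Map;

public class KafkaProgressDemo {
    public static void main(String[] args) {
        // Partition 0 has loaded offsets 0..99, so consumption resumes at 100.
        KafkaProgress progress = new KafkaProgress();
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 100L);
        offsets.put(1, 42L); // partition 1 resumes at offset 42
        progress.setPartitionIdToOffset(offsets);

        long resumeAt = progress.getPartitionIdToOffset().get(0); // 100
        System.out.println("partition 0 resumes at offset " + resumeAt);
    }
}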
fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -18,13 +18,13 @@
 package org.apache.doris.load.routineload;
 
 import com.google.common.base.Strings;
+import com.google.gson.Gson;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.SystemIdGenerator;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TResourceInfo;
-import org.apache.doris.thrift.TTaskType;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.logging.log4j.LogManager;
@@ -43,7 +43,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     private String serverAddress;
     private String topic;
-    // optional
+    // optional, the partitions the user wants to load
     private List<Integer> kafkaPartitions;
 
     public KafkaRoutineLoadJob() {
@@ -59,21 +59,23 @@ public KafkaRoutineLoadJob(long id, String name, String userName, long dbId, lon
         this.topic = topic;
     }
 
+    public KafkaProgress getProgress() {
+        Gson gson = new Gson();
+        return gson.fromJson(this.progress, KafkaProgress.class);
+    }
+
     @Override
-    public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+    public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskNum) {
         // divide kafkaPartitions into tasks
-        List<KafkaRoutineLoadTask> kafkaRoutineLoadTaskList = new ArrayList<>();
+        List<RoutineLoadTaskInfo> kafkaRoutineLoadTaskList = new ArrayList<>();
         for (int i = 0; i < currentConcurrentTaskNum; i++) {
-            // TODO(ml): init load task
-            kafkaRoutineLoadTaskList.add(new KafkaRoutineLoadTask(getResourceInfo(), 0L, TTaskType.PUSH,
-                    dbId, tableId, 0L, 0L, 0L, SystemIdGenerator.getNextId()));
+            kafkaRoutineLoadTaskList.add(new KafkaTaskInfo(SystemIdGenerator.getNextId()));
         }
         for (int i = 0; i < kafkaPartitions.size(); i++) {
-            kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum).addKafkaPartition(kafkaPartitions.get(i));
+            ((KafkaTaskInfo) kafkaRoutineLoadTaskList.get(i % currentConcurrentTaskNum))
+                    .addKafkaPartition(kafkaPartitions.get(i));
         }
-        List<RoutineLoadTask> result = new ArrayList<>();
-        result.addAll(kafkaRoutineLoadTaskList);
-        return result;
+        return kafkaRoutineLoadTaskList;
     }
 
     @Override
@@ -99,6 +101,15 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         return Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum));
     }
 
+    @Override
+    public RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId) {
+        return new KafkaRoutineLoadTask(getResourceInfo(),
+                beId, getDbId(), getTableId(),
+                0L, 0L, 0L, getColumns(), getWhere(), getColumnSeparator(),
+                (KafkaTaskInfo) routineLoadTaskInfo,
+                getProgress());
+    }
+
     private void updatePartitions() {
         // fetch all of kafkaPartitions in topic
         if (kafkaPartitions == null || kafkaPartitions.size() == 0) {
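
The division in divideRoutineLoadJob is plain round-robin: partition i goes to task i % currentConcurrentTaskNum, so 5 partitions over 2 tasks yield {0, 2, 4} and {1, 3}. A standalone sketch of the same loop, substituting plain JDK lists for KafkaTaskInfo:

import java.util.ArrayList;
import java.util.List;

public class RoundRobinDivideDemo {
    public static void main(String[] args) {
        List<Integer> kafkaPartitions = List.of(0, 1, 2, 3, 4);
        int currentConcurrentTaskNum = 2;

        // One partition list per task, filled round-robin as in divideRoutineLoadJob.
        List<List<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < currentConcurrentTaskNum; i++) {
            tasks.add(new ArrayList<>());
        }
        for (int i = 0; i < kafkaPartitions.size(); i++) {
            tasks.get(i % currentConcurrentTaskNum).add(kafkaPartitions.get(i));
        }

        System.out.println(tasks); // [[0, 2, 4], [1, 3]]
    }
}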
fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadTask.java
@@ -17,27 +17,29 @@
 
 package org.apache.doris.load.routineload;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.thrift.TResourceInfo;
 import org.apache.doris.thrift.TTaskType;
 
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 public class KafkaRoutineLoadTask extends RoutineLoadTask {
 
-    private List<Integer> kafkaPartitions;
-
-    public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId, TTaskType taskType,
-            long dbId, long tableId, long partitionId, long indexId, long tabletId, long signature) {
-        super(resourceInfo, backendId, taskType, dbId, tableId, partitionId, indexId, tabletId, signature);
-        this.kafkaPartitions = Lists.newArrayList();
-    }
+    private Map<Integer, Long> partitionIdToOffset;
 
-    public void addKafkaPartition(int partition) {
-        kafkaPartitions.add(partition);
+    public KafkaRoutineLoadTask(TResourceInfo resourceInfo, long backendId,
+                                long dbId, long tableId, long partitionId, long indexId, long tabletId,
+                                String columns, String where, String columnSeparator,
+                                KafkaTaskInfo kafkaTaskInfo, KafkaProgress kafkaProgress) {
+        super(resourceInfo, backendId, TTaskType.STREAM_LOAD, dbId, tableId, partitionId, indexId, tabletId,
+                kafkaTaskInfo.getSignature(), columns, where, columnSeparator, RoutineLoadJob.DataSourceType.KAFKA);
+        this.partitionIdToOffset = new HashMap<>();
+        kafkaTaskInfo.getPartitions().parallelStream().forEach(entity ->
+                partitionIdToOffset.put(entity, kafkaProgress.getPartitionIdToOffset().get(entity)));
     }
 
-    public List<Integer> getKafkaPartitions() {
-        return kafkaPartitions;
+    public Map<Integer, Long> getPartitionIdToOffset() {
+        return partitionIdToOffset;
     }
 }
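
The new constructor joins the task's partition list with the job-level KafkaProgress so each task carries a starting offset for every partition it owns. One caveat: the committed code fills a plain HashMap from a parallelStream, and that combination is not thread-safe. A sequential sketch of the same join, with made-up partitions and offsets:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetJoinDemo {
    public static void main(String[] args) {
        List<Integer> taskPartitions = List.of(0, 2, 4);      // from a KafkaTaskInfo
        Map<Integer, Long> jobProgress =                       // from a KafkaProgress
                Map.of(0, 100L, 1, 7L, 2, 55L, 3, 9L, 4, 0L);

        // Sequential, thread-safe equivalent of the constructor's parallelStream.
        Map<Integer, Long> partitionIdToOffset = new HashMap<>();
        for (Integer partition : taskPartitions) {
            partitionIdToOffset.put(partition, jobProgress.get(partition));
        }

        System.out.println(partitionIdToOffset); // {0=100, 2=55, 4=0}
    }
}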
fe/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.routineload;

import org.apache.doris.common.SystemIdGenerator;

import java.util.ArrayList;
import java.util.List;

public class KafkaTaskInfo extends RoutineLoadTaskInfo {

    private List<Integer> partitions;

    public KafkaTaskInfo(long signature) {
        super(signature);
        this.partitions = new ArrayList<>();
    }

    public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo) {
        super(SystemIdGenerator.getNextId());
        this.partitions = kafkaTaskInfo.getPartitions();
    }

    public void addKafkaPartition(int partition) {
        partitions.add(partition);
    }

    public List<Integer> getPartitions() {
        return partitions;
    }
}
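
The copy constructor is what Step 2 of the commit message relies on: a timed-out task's partitions are carried into a replacement task while SystemIdGenerator supplies a fresh signature. Note that getPartitions() is stored by reference, so the old and the renewed task share one partition list. A plain-JDK illustration of that sharing, and of the defensive copy that would avoid it (the defensive copy is not part of the commit):

import java.util.ArrayList;
import java.util.List;

public class SharedListDemo {
    public static void main(String[] args) {
        List<Integer> partitions = new ArrayList<>(List.of(0, 2, 4));

        // As committed: the renewed task reuses the old task's list reference...
        List<Integer> renewedShared = partitions;
        partitions.add(6);
        // ...so a later mutation is visible through both references.
        System.out.println(renewedShared); // [0, 2, 4, 6]

        // A defensive copy isolates the renewed task from the discarded one.
        List<Integer> renewedCopied = new ArrayList<>(partitions);
        partitions.clear();
        System.out.println(renewedCopied); // [0, 2, 4, 6]
    }
}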
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -25,7 +25,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -35,7 +34,7 @@
  * The desireTaskConcurrentNum means that user expect the number of concurrent stream load
  * The routine load job support different streaming medium such as KAFKA
  */
-public class RoutineLoadJob implements Writable {
+public abstract class RoutineLoadJob implements Writable {
 
     public enum JobState {
         NEED_SCHEDULER,
@@ -116,6 +115,26 @@ public long getId() {
         return id;
     }
 
+    public long getDbId() {
+        return dbId;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public String getColumns() {
+        return columns;
+    }
+
+    public String getWhere() {
+        return where;
+    }
+
+    public String getColumnSeparator() {
+        return columnSeparator;
+    }
+
     public JobState getState() {
         return state;
     }
@@ -128,7 +147,7 @@ public TResourceInfo getResourceInfo() {
         return resourceInfo;
     }
 
-    public List<RoutineLoadTask> divideRoutineLoadJob(int currentConcurrentTaskNum) {
+    public List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskNum) {
         return null;
     }
 
@@ -145,4 +164,6 @@ public void write(DataOutput out) throws IOException {
     public void readFields(DataInput in) throws IOException {
         // TODO(ml)
     }
+
+    abstract RoutineLoadTask createTask(RoutineLoadTaskInfo routineLoadTaskInfo, long beId);
 }
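
The abstract createTask makes RoutineLoadJob a template: a scheduler can hold jobs of any data-source type and ask each one to materialize a BE-executable task, with KafkaRoutineLoadJob plugging in the Kafka-specific construction shown earlier. A compact sketch of that pattern with simplified, hypothetical types:

import java.util.List;

// Simplified mirror of the abstract hook in RoutineLoadJob.
abstract class Job {
    abstract Task createTask(long taskSignature, long beId);
}

class Task {
    final long signature;
    final long beId;
    Task(long signature, long beId) { this.signature = signature; this.beId = beId; }
}

class KafkaJob extends Job {
    @Override
    Task createTask(long taskSignature, long beId) {
        // The real code would also bundle columns, the where clause, the column
        // separator, and per-partition offsets derived from KafkaProgress.
        return new Task(taskSignature, beId);
    }
}

public class SchedulerDemo {
    public static void main(String[] args) {
        List<Job> jobs = List.of(new KafkaJob());
        long beId = 10001L; // hypothetical backend id
        for (Job job : jobs) {
            Task t = job.createTask(System.nanoTime(), beId);
            System.out.println("task " + t.signature + " -> BE " + t.beId);
        }
    }
}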