[Feature-13511] Submit Spark task directly on Kubernetes (#13550)
Radeity authored Feb 21, 2023
1 parent aea8405 commit 047fa2f
Showing 33 changed files with 996 additions and 428 deletions.
2 changes: 1 addition & 1 deletion deploy/kubernetes/dolphinscheduler/values.yaml
@@ -313,7 +313,7 @@ master:
MASTER_MAX_CPU_LOAD_AVG: "-1"
MASTER_RESERVED_MEMORY: "0.3"
MASTER_FAILOVER_INTERVAL: "10m"
MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true"
MASTER_KILL_APPLICATION_WHEN_HANDLE_FAILOVER: "true"

worker:
## PodManagementPolicy controls how pods are created during initial scale up, when replacing pods on nodes, or when scaling down.
2 changes: 1 addition & 1 deletion docs/docs/en/architecture/configuration.md
@@ -276,7 +276,7 @@ Location: `master-server/conf/application.yaml`
|master.max-cpu-load-avg|-1|master max CPU load avg, only higher than the system CPU load average, master server can schedule. default value -1: the number of CPU cores * 2|
|master.reserved-memory|0.3|master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G|
|master.failover-interval|10|failover interval, the unit is minute|
|master.kill-yarn-job-when-task-failover|true|whether to kill yarn job when failover taskInstance|
|master.kill-application-when-task-failover|true|whether to kill yarn/k8s application when failover taskInstance|
|master.registry-disconnect-strategy.strategy|stop|Used when the master disconnect from registry, default value: stop. Optional values include stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|Used when the master disconnect from registry, and the disconnect strategy is waiting, this config means the master will waiting to reconnect to registry in given times, and after the waiting times, if the master still cannot connect to registry, will stop itself, if the value is 0s, the Master will wait infinitely|
|master.worker-group-refresh-interval|10s|The interval to refresh worker group from db to memory|
2 changes: 1 addition & 1 deletion docs/docs/zh/architecture/configuration.md
@@ -271,7 +271,7 @@ The common.properties file currently mainly configures hadoop/s3/yarn/applicationId
|master.max-cpu-load-avg|-1|master max cpuload average; the master can schedule tasks only when it is higher than the system cpuload average. Default value -1: cpu cores * 2|
|master.reserved-memory|0.3|master reserved memory; the master can schedule tasks only when this is lower than the available system memory. The unit is G|
|master.failover-interval|10|failover interval, the unit is minute|
|master.kill-yarn-job-when-task-failover|true|whether to kill the yarn job when a task instance fails over|
|master.kill-application-when-task-failover|true|whether to kill the yarn or k8s application when a task instance fails over|
|master.registry-disconnect-strategy.strategy|stop|strategy taken after the master disconnects from the registry, default value: stop. Optional values include stop, waiting|
|master.registry-disconnect-strategy.max-waiting-time|100s|reconnect window after the master disconnects from the registry; takes effect when the strategy is waiting. The master tries to reconnect within the given time, stops itself if it still cannot reconnect afterwards, and drops the workflows it is currently executing while reconnecting. A value of 0 means wait indefinitely|
|master.master.worker-group-refresh-interval|10s|interval for periodically syncing workerGroup from the database into memory|
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.enums;

public enum ResourceManagerType {

YARN,

KUBERNETES;

}
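The new `ResourceManagerType` enum is what lets failover and kill logic treat YARN and Kubernetes applications uniformly. A minimal sketch of how a caller might branch on it; the two `kill*` helpers are hypothetical, not part of this commit:

```java
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;

public class ApplicationKiller {

    // Dispatch the kill to the right resource manager (illustrative only).
    public void kill(ResourceManagerType type, String applicationId) {
        switch (type) {
            case YARN:
                killYarnApplication(applicationId); // hypothetical: e.g. `yarn application -kill <id>`
                break;
            case KUBERNETES:
                killKubernetesApplication(applicationId); // hypothetical: e.g. delete the driver pod
                break;
        }
    }

    private void killYarnApplication(String applicationId) { /* ... */ }

    private void killKubernetesApplication(String applicationId) { /* ... */ }
}
```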
@@ -19,6 +19,7 @@

import static org.apache.dolphinscheduler.common.constants.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.constants.Constants.UTF_8;
@@ -49,6 +50,8 @@ public class FileUtils {

public static final String APPINFO_PATH = "appInfo.log";

public static final String KUBE_CONFIG_FILE = "config";

private FileUtils() {
throw new UnsupportedOperationException("Construct FileUtils");
}
@@ -116,6 +119,16 @@ public static String getProcessExecDir(String tenant,
taskInstanceId);
}

/**
 * Absolute path of the kubernetes configuration file under the task execute directory.
 *
 * @param execPath task execute directory
 * @return absolute path of the kube config file inside execPath
 */
public static String getKubeConfigPath(String execPath) {
return String.format(FORMAT_S_S, execPath, KUBE_CONFIG_FILE);
}

/**
* absolute path of appInfo file
*
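A short usage sketch for the new helper, assuming `FORMAT_S_S` is the usual `"%s/%s"` path format; the call site below is illustrative, not taken from this commit:

```java
import org.apache.dolphinscheduler.common.utils.FileUtils;

// Illustrative worker-side preparation code:
String execPath = "/tmp/dolphinscheduler/exec/process/1/2/3/4";

// Assuming FORMAT_S_S == "%s/%s", this resolves to
// /tmp/dolphinscheduler/exec/process/1/2/3/4/config
String kubeConfigPath = FileUtils.getKubeConfigPath(execPath);

// The task can then export KUBECONFIG=<kubeConfigPath> so that
// `spark-submit --master k8s://...` picks up the per-task cluster credentials.
```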
@@ -88,7 +88,7 @@ public class MasterConfig implements Validator {
private double maxCpuLoadAvg = -1;
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover = true;
private boolean killApplicationWhenTaskFailover = true;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();

private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
@@ -163,7 +163,7 @@ private void printConfig() {
log.info("Master config: maxCpuLoadAvg -> {} ", maxCpuLoadAvg);
log.info("Master config: reservedMemory -> {} ", reservedMemory);
log.info("Master config: failoverInterval -> {} ", failoverInterval);
log.info("Master config: killYarnJobWhenTaskFailover -> {} ", killYarnJobWhenTaskFailover);
log.info("Master config: killApplicationWhenTaskFailover -> {} ", killApplicationWhenTaskFailover);
log.info("Master config: registryDisconnectStrategy -> {} ", registryDisconnectStrategy);
log.info("Master config: masterAddress -> {} ", masterAddress);
log.info("Master config: masterRegistryPath -> {} ", masterRegistryPath);
@@ -25,8 +25,8 @@
import static org.apache.dolphinscheduler.common.constants.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.constants.Constants.USER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE;
@@ -72,6 +72,7 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.JdbcUrlParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -328,11 +329,8 @@ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance
dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode());
}
K8sTaskExecutionContext k8sTaskExecutionContext = null;
if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) {
k8sTaskExecutionContext = new K8sTaskExecutionContext();
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
}

K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance);

Map<String, Property> businessParamsMap = curingParamsService.preBuildBusinessParams(processInstance);

@@ -635,18 +633,39 @@ public Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
}

/**
* set k8s task relation
* @param k8sTaskExecutionContext k8sTaskExecutionContext
* get k8s task execution context based on task type and deploy mode
*
* @param taskInstance taskInstance
*/
private void setK8sTaskRelation(K8sTaskExecutionContext k8sTaskExecutionContext, TaskInstance taskInstance) {
K8sTaskParameters k8sTaskParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
Map<String, String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String clusterName = namespace.get(CLUSTER);
String configYaml = processService.findConfigYamlByName(clusterName);
if (configYaml != null) {
k8sTaskExecutionContext.setConfigYaml(configYaml);
private K8sTaskExecutionContext setK8sTaskRelation(TaskInstance taskInstance) {
K8sTaskExecutionContext k8sTaskExecutionContext = null;
String namespace = "";
switch (taskInstance.getTaskType()) {
case "K8S":
case "KUBEFLOW":
K8sTaskParameters k8sTaskParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), K8sTaskParameters.class);
namespace = k8sTaskParameters.getNamespace();
break;
case "SPARK":
SparkParameters sparkParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), SparkParameters.class);
if (StringUtils.isNotEmpty(sparkParameters.getNamespace())) {
namespace = sparkParameters.getNamespace();
}
break;
default:
break;
}

if (StringUtils.isNotEmpty(namespace)) {
String clusterName = JSONUtils.toMap(namespace).get(CLUSTER);
String configYaml = processService.findConfigYamlByName(clusterName);
if (configYaml != null) {
k8sTaskExecutionContext =
new K8sTaskExecutionContext(configYaml, JSONUtils.toMap(namespace).get(NAMESPACE_NAME));
}
}
return k8sTaskExecutionContext;
}
}
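For context, the `namespace` task parameter is stored as a small JSON object, which is why the code above calls `JSONUtils.toMap(namespace)` twice to pull out the cluster name and the namespace name. A hedged sketch of the shape this expects, assuming the `CLUSTER` and `NAMESPACE_NAME` constants map to the literal keys `"cluster"` and `"name"` (the values are made up):

```java
import java.util.Map;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;

// Illustrative only: the JSON shape the factory expects for a SPARK task
// submitted to Kubernetes.
String namespace = "{\"name\":\"spark-jobs\",\"cluster\":\"test-cluster\"}";

Map<String, String> parsed = JSONUtils.toMap(namespace);
String clusterName = parsed.get("cluster"); // keyed by CLUSTER (assumed to be "cluster")
String namespaceName = parsed.get("name");  // keyed by NAMESPACE_NAME (assumed to be "name")

// The kube config YAML registered for that cluster is looked up from the db
// (processService.findConfigYamlByName(clusterName)); both pieces then travel
// to the worker inside the task execution context:
String configYaml = "apiVersion: v1\n..."; // placeholder for the stored kube config
K8sTaskExecutionContext ctx = new K8sTaskExecutionContext(configYaml, namespaceName);
```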
@@ -227,7 +227,7 @@ private Optional<Date> getServerStartupTime(List<Server> servers, String host) {
/**
* failover task instance
* <p>
* 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
@@ -248,10 +248,10 @@ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @Non
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();

if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(logClient, taskExecutionContext);
if (masterConfig.isKillApplicationWhenTaskFailover()) {
// only kill yarn/k8s job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn or k8s job");
ProcessUtils.killApplication(logClient, taskExecutionContext);
}
// kill worker task, When the master failover and worker failover happened in the same time,
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
@@ -154,7 +154,7 @@ public void failoverWorker(@NonNull String workerHost) {
/**
* failover task instance
* <p>
* 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 1. kill yarn/k8s job if run on worker and there are yarn/k8s jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
@@ -175,10 +175,10 @@ private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @Non
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();

if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(logClient, taskExecutionContext);
if (masterConfig.isKillApplicationWhenTaskFailover()) {
// only kill yarn/k8s job if exists , the local thread has exited
log.info("TaskInstance failover begin kill the task related yarn or k8s job");
ProcessUtils.killApplication(logClient, taskExecutionContext);
}
} else {
log.info("The failover taskInstance is a master task");
4 changes: 2 additions & 2 deletions dolphinscheduler-master/src/main/resources/application.yaml
@@ -108,8 +108,8 @@ master:
reserved-memory: 0.3
# failover interval, the unit is minute
failover-interval: 10m
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
# kill yarn / k8s application when failover taskInstance, default true
kill-application-when-task-failover: true
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: waiting
@@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClient;
@@ -161,8 +162,8 @@ public static String getPidsStr(int processId) throws Exception {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static @Nullable List<String> killYarnJob(@NonNull LogClient logClient,
@NonNull TaskExecutionContext taskExecutionContext) {
public static @Nullable List<String> killApplication(@NonNull LogClient logClient,
@NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
@@ -172,6 +173,7 @@ public static String getPidsStr(int processId) throws Exception {
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath(),
taskExecutionContext.getAppInfoPath());
if (CollectionUtils.isNotEmpty(appIds)) {
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext
.setExecutePath(FileUtils.getProcessExecDir(
@@ -183,9 +185,7 @@ public static String getPidsStr(int processId) throws Exception {
taskExecutionContext.getTaskInstanceId()));
}
FileUtils.createWorkDirIfAbsent(taskExecutionContext.getExecutePath());
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(appIds, log,
taskExecutionContext.getTenantCode(),
taskExecutionContext.getExecutePath());
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext);
return appIds;
} else {
log.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}",
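Note that `cancelApplication` now takes the whole `TaskExecutionContext` rather than appIds, tenant code, and execute path separately: the context also carries the `K8sTaskExecutionContext` needed to kill a Spark-on-K8s driver. A rough sketch of what such a dispatch could look like; the body is an assumption, not the committed implementation:

```java
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;

// Assumed shape of the new task-api ProcessUtils.cancelApplication(TaskExecutionContext)
// overload; illustrative only.
public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
    if (taskExecutionContext.getK8sTaskExecutionContext() != null) {
        // Kubernetes-backed task (e.g. Spark submitted to k8s in cluster mode):
        // kill via the Kubernetes API, e.g. by deleting the driver pod.
        cancelKubernetesApplication(taskExecutionContext); // hypothetical helper
    } else if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
        // YARN-backed task: kill each collected application id.
        for (String appId : taskExecutionContext.getAppIds().split(",")) {
            cancelYarnApplication(appId, taskExecutionContext); // hypothetical helper
        }
    }
}
```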
@@ -146,8 +146,8 @@ master:
reserved-memory: 0.03
# failover interval
failover-interval: 10m
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
# kill yarn/k8s application when failover taskInstance, default true
kill-application-when-task-failover: true
worker-group-refresh-interval: 10s

worker:
@@ -23,6 +23,7 @@

import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
@@ -38,6 +39,7 @@
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -229,8 +231,11 @@ public TaskResponse run(String execCommand) throws IOException, InterruptedExcep
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);

TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());

// if SHELL task exit
if (status) {
if (status && kubernetesStatus.isSuccess()) {

// SHELL task state
result.setExitStatusCode(process.exitValue());
@@ -334,7 +339,11 @@ private void clear() {

LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(1);
markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());

String logs = appendPodLogIfNeeded();
if (StringUtils.isNotEmpty(logs)) {
logBuffer.add("Dump logs from driver pod:");
logBuffer.add(logs);
}
if (!logBuffer.isEmpty()) {
// log handle
logHandler.accept(logBuffer);
Expand All @@ -348,6 +357,13 @@ private void clear() {
}
}

private String appendPodLogIfNeeded() {
if (Objects.isNull(taskRequest.getK8sTaskExecutionContext())) {
return "";
}
return ProcessUtils.getPodLog(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
}

/**
* get the standard output of the process
*
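The `getPodLog` helper called by `appendPodLogIfNeeded` lives in the task-api `ProcessUtils`. A minimal sketch of how such a helper could be built on the fabric8 Kubernetes client; the getter names on `K8sTaskExecutionContext` and the `spark-app-selector` label key are assumptions, not taken from this commit:

```java
import java.util.List;

import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

// Minimal sketch: locate the driver pod by label and dump its log.
public static String getPodLog(K8sTaskExecutionContext k8sContext, String taskAppId) {
    try (KubernetesClient client =
            new DefaultKubernetesClient(Config.fromKubeconfig(k8sContext.getConfigYaml()))) {
        List<Pod> pods = client.pods()
                .inNamespace(k8sContext.getNamespace())
                .withLabel("spark-app-selector", taskAppId) // label key is an assumption
                .list()
                .getItems();
        if (pods.isEmpty()) {
            return "";
        }
        return client.pods()
                .inNamespace(k8sContext.getNamespace())
                .withName(pods.get(0).getMetadata().getName())
                .getLog();
    }
}
```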