Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-#14148][Task] Added status check interval and dependent failure policy parameters for dependent task nodes #14150

Merged
merged 5 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions docs/docs/en/guide/task/dependent.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ Dependent nodes are **dependency check nodes**. For example, process A depends o

- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.

| **Parameter** | **Description** |
|------------------|---------------------------------------------|
| Predecessor Task | The upstream task of the current task node. |
| **Parameter** | **Description** |
|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Add Dependencies | Configure dependent upstream tasks. |
| Check interval | Check the dependent upstream task status interval, the default is 10s. |
| Dependency failure policy | Failure: The dependent upstream task failure and the current task directly failure; Wait: The dependent upstream task failure and the current task continues to wait; |
| Dependency failure waiting time | When the dependency failure policy chooses to wait, the current task wait time. |

## Task Examples

Expand Down
8 changes: 7 additions & 1 deletion docs/docs/zh/guide/task/dependent.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ Dependent 节点,就是**依赖检查节点**。比如 A 流程依赖昨天的
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。)

- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
- 此任务除上述链接中的默认参数外无其他参数。

| **任务参数** | **描述** |
|----------|----------------------------------------------|
| 添加依赖 | 配置依赖的上游任务. |
| 检查间隔 | 检查依赖的上游任务状态间隔,默认10s. |
| 依赖失败策略 | 失败: 依赖的上游任务失败当前任务直接失败;等待: 依赖的上游任务失败当前任务继续等待; |
| 依赖失败等待时间 | 当依赖失败策略选择等待时,当前任务等待的时间. |

## 任务样例

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
@Slf4j
public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction {

private static final Duration DEPENDENT_TASK_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);
private static final Duration DEFAULT_STATE_CHECK_INTERVAL = Duration.ofSeconds(10);

private final TaskExecutionContext taskExecutionContext;
private final DependentParameters dependentParameters;
Expand Down Expand Up @@ -195,6 +195,10 @@ private DependResult calculateDependResult() {
private boolean isAllDependentTaskFinished() {
boolean isAllDependentTaskFinished = true;
for (DependentExecute dependentExecute : dependentTaskList) {
if (!dependentExecute.finish(dependentDate, processInstance.getTestFlag(),
dependentParameters.getFailurePolicy(), dependentParameters.getFailureWaitingTime())) {
isAllDependentTaskFinished = false;
}
dependentExecute.getDependResultMap().forEach((dependentKey, dependResult) -> {
if (!dependResultMap.containsKey(dependentKey)) {
dependResultMap.put(dependentKey, dependResult);
Expand All @@ -206,15 +210,13 @@ private boolean isAllDependentTaskFinished() {
dependResult, dependentDate);
}
});
if (!dependentExecute.finish(dependentDate, processInstance.getTestFlag())) {
isAllDependentTaskFinished = false;
}
}
return isAllDependentTaskFinished;
}

@Override
public @NonNull Duration getAsyncTaskStateCheckInterval() {
return DEPENDENT_TASK_STATE_CHECK_INTERVAL;
return dependentParameters.getCheckInterval() == null ? DEFAULT_STATE_CHECK_INTERVAL
: Duration.ofSeconds(dependentParameters.getCheckInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.server.master.utils;

import static org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters.DependentFailurePolicyEnum.DEPENDENT_FAILURE_WAITING;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
Expand All @@ -27,9 +29,12 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -62,11 +67,6 @@ public class DependentExecute {

private TaskInstance taskInstance;

/**
* depend result
*/
private DependResult modelDependResult = DependResult.WAITING;

/**
* depend result map
*/
Expand Down Expand Up @@ -231,10 +231,15 @@ private DependResult getDependResultByState(TaskExecutionStatus state) {
* @param currentTime current time
* @return boolean
*/
public boolean finish(Date currentTime, int testFlag) {
public boolean finish(Date currentTime, int testFlag, DependentParameters.DependentFailurePolicyEnum failurePolicy,
Integer failureWaitingTime) {
DependResult modelDependResult = getModelDependResult(currentTime, testFlag);
if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime, testFlag);
return false;
} else if (modelDependResult == DependResult.FAILED && DEPENDENT_FAILURE_WAITING == failurePolicy
&& failureWaitingTime != null) {
return Duration.between(currentTime.toInstant(), Instant.now())
.compareTo(Duration.ofMinutes(failureWaitingTime)) > 0;
}
return true;
}
Expand All @@ -260,13 +265,12 @@ public DependResult getModelDependResult(Date currentTime, int testFlag) {
continue;
}
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
if (dependResult != DependResult.WAITING) {
if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
dependResultList.add(dependResult);
}
modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList);
return modelDependResult;
return DependentUtils.getDependResultForRelation(this.relation, dependResultList);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,23 @@ public class DependentParameters extends AbstractParameters {

private List<DependentTaskModel> dependTaskList;
private DependentRelation relation;
/** Time unit is second */
private Integer checkInterval;
private DependentFailurePolicyEnum failurePolicy;
/** Time unit is minutes */
private Integer failureWaitingTime;

@Override
public boolean checkParameters() {
return true;
}

/**
* the dependent task failure policy.
*/
public enum DependentFailurePolicyEnum {
DEPENDENT_FAILURE_FAILURE,
DEPENDENT_FAILURE_WAITING
}

}
15 changes: 12 additions & 3 deletions dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ export default {
times: 'Times',
failed_retry_interval: 'Failed retry interval',
minute: 'Minute',
second: 'Second',
delay_execution_time: 'Delay execution time',
namespace_cluster: 'Namespace(Cluster)',
min_cpu: 'Min cpu',
Expand All @@ -385,7 +386,8 @@ export default {
command_tips:
'Please enter the container execution command, for example: ["printenv"]',
args: 'Args',
args_tips: 'Please enter the container execution command args, for example: ["HOSTNAME", "KUBERNETES_PORT"]',
args_tips:
'Please enter the container execution command args, for example: ["HOSTNAME", "KUBERNETES_PORT"]',
min_memory_tips: 'Please enter min memory',
state: 'State',
branch_flow: 'Branch flow',
Expand Down Expand Up @@ -622,6 +624,7 @@ export default {
add_dependency: 'Add dependency',
waiting_dependent_start: 'Waiting Dependent start',
check_interval: 'Check interval',
check_interval_tips: 'Check interval must be a positive integer',
waiting_dependent_complete: 'Waiting Dependent complete',
project_name: 'Project Name',
project_name_tips: 'Please select a project(required)',
Expand Down Expand Up @@ -694,7 +697,7 @@ export default {
zeppelin_username: 'zeppelinUsername',
zeppelin_username_tips: 'Please enter the zeppelin server username',
zeppelin_password: 'zeppelinPassword',
zeppelin_password_tips: 'Please enter the zeppelin server password',
zeppelin_password_tips: 'Please enter the zeppelin server password',
hive_cli_task_execution_type: 'Hive Cli Task Execution Type',
hive_sql_script: 'Hive SQL Script',
hive_cli_options: 'Hive Cli Options',
Expand Down Expand Up @@ -820,7 +823,13 @@ export default {
pipeline_name: 'Pipeline Name',
factory_tips: 'Please select factory',
resource_group_tips: 'Please select resource group',
pipeline_tips: 'Please select pipeline'
pipeline_tips: 'Please select pipeline',
dependent_failure_policy: 'Dependent failure policy',
dependent_failure_policy_failure: 'failure',
dependent_failure_policy_waiting: 'waiting',
dependent_failure_waiting_time: 'Dependent failure waiting time',
dependent_failure_waiting_time_tips:
'Failure waiting time must be a positive integer'
},
menu: {
fav: 'Favorites',
Expand Down
19 changes: 13 additions & 6 deletions dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ export default {
confirm: '确定',
cancel: '取消',
delete_confirm: '确定删除吗?',
authorize_level:'权限等级',
authorize_level: '权限等级',
no_permission: '无权限',
read_permission: '读权限',
all_permission: '所有权限',
all_permission: '所有权限'
},
workflow: {
on_line: '线上',
Expand Down Expand Up @@ -220,7 +220,7 @@ export default {
workflow_relation_no_data_result_desc:
'目前没有任何工作流,请先创建工作流,再访问该页面',
ready_to_block: '准备锁定',
block: '锁定',
block: '锁定'
},
task: {
on_line: '线上',
Expand Down Expand Up @@ -334,7 +334,7 @@ export default {
online: '已上线'
},
node: {
is_cache: "缓存执行",
is_cache: '缓存执行',
is_module_path: '使用模块路径',
run_type: '运行类型',
jvm_args: '虚拟机参数',
Expand Down Expand Up @@ -372,6 +372,7 @@ export default {
times: '次',
failed_retry_interval: '失败重试间隔',
minute: '分',
second: '秒',
delay_execution_time: '延时执行时间',
namespace_cluster: '命名空间(集群)',
min_cpu: '最小cpu',
Expand Down Expand Up @@ -614,6 +615,7 @@ export default {
add_dependency: '添加依赖',
waiting_dependent_start: '等待依赖启动',
check_interval: '检查间隔',
check_interval_tips: '检查间隔必须为正整数',
waiting_dependent_complete: '等待依赖完成',
project_name: '项目名称',
project_name_tips: '项目名称(必填)',
Expand Down Expand Up @@ -798,7 +800,12 @@ export default {
pipeline_name: 'pipeline名称',
factory_tips: '请选择工厂',
resource_group_tips: '请选择资源组',
pipeline_tips: '请选择pipeline'
pipeline_tips: '请选择pipeline',
dependent_failure_policy: '依赖失败策略',
dependent_failure_policy_failure: '失败',
dependent_failure_policy_waiting: '等待',
dependent_failure_waiting_time: '依赖失败等待时间',
dependent_failure_waiting_time_tips: '失败等待时间必须为正整数'
},
menu: {
fav: '收藏组件',
Expand All @@ -808,6 +815,6 @@ export default {
di: '数据集成',
dq: '数据质量',
ml: '机器学习',
other: '其他',
other: '其他'
}
}
Loading