diff --git a/docs/docs/en/guide/task/dependent.md b/docs/docs/en/guide/task/dependent.md index f13fed59c1a1..a0bdc6fcdf7c 100644 --- a/docs/docs/en/guide/task/dependent.md +++ b/docs/docs/en/guide/task/dependent.md @@ -16,9 +16,12 @@ Dependent nodes are **dependency check nodes**. For example, process A depends o - Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. -| **Parameter** | **Description** | -|------------------|---------------------------------------------| -| Predecessor Task | The upstream task of the current task node. | +| **Parameter** | **Description** | +|---------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Add Dependencies | Configure dependent upstream tasks. | +| Check interval | Check the dependent upstream task status interval, the default is 10s. | +| Dependency failure policy | Failure: The dependent upstream task failure and the current task directly failure; Wait: The dependent upstream task failure and the current task continues to wait; | +| Dependency failure waiting time | When the dependency failure policy chooses to wait, the current task wait time. | ## Task Examples diff --git a/docs/docs/zh/guide/task/dependent.md b/docs/docs/zh/guide/task/dependent.md index ccdadd5aa996..c3318ae3d220 100644 --- a/docs/docs/zh/guide/task/dependent.md +++ b/docs/docs/zh/guide/task/dependent.md @@ -15,7 +15,13 @@ Dependent 节点,就是**依赖检查节点**。比如 A 流程依赖昨天的 [//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。) - 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。 -- 此任务除上述链接中的默认参数外无其他参数。 + +| **任务参数** | **描述** | +|----------|----------------------------------------------| +| 添加依赖 | 配置依赖的上游任务. | +| 检查间隔 | 检查依赖的上游任务状态间隔,默认10s. | +| 依赖失败策略 | 失败: 依赖的上游任务失败当前任务直接失败;等待: 依赖的上游任务失败当前任务继续等待; | +| 依赖失败等待时间 | 当依赖失败策略选择等待时,当前任务等待的时间. | ## 任务样例 diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java index 86517f5a1dd6..858057dbb89f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dependent/DependentAsyncTaskExecuteFunction.java @@ -56,7 +56,7 @@ @Slf4j public class DependentAsyncTaskExecuteFunction implements AsyncTaskExecuteFunction { - private static final Duration DEPENDENT_TASK_STATE_CHECK_INTERVAL = Duration.ofSeconds(10); + private static final Duration DEFAULT_STATE_CHECK_INTERVAL = Duration.ofSeconds(10); private final TaskExecutionContext taskExecutionContext; private final DependentParameters dependentParameters; @@ -195,6 +195,10 @@ private DependResult calculateDependResult() { private boolean isAllDependentTaskFinished() { boolean isAllDependentTaskFinished = true; for (DependentExecute dependentExecute : dependentTaskList) { + if (!dependentExecute.finish(dependentDate, processInstance.getTestFlag(), + dependentParameters.getFailurePolicy(), dependentParameters.getFailureWaitingTime())) { + isAllDependentTaskFinished = false; + } dependentExecute.getDependResultMap().forEach((dependentKey, dependResult) -> { if (!dependResultMap.containsKey(dependentKey)) { dependResultMap.put(dependentKey, dependResult); @@ -206,15 +210,13 @@ private boolean isAllDependentTaskFinished() { dependResult, dependentDate); } }); - if (!dependentExecute.finish(dependentDate, processInstance.getTestFlag())) { - isAllDependentTaskFinished = false; - } } return isAllDependentTaskFinished; } @Override public @NonNull Duration getAsyncTaskStateCheckInterval() { - return DEPENDENT_TASK_STATE_CHECK_INTERVAL; + return dependentParameters.getCheckInterval() == null ? DEFAULT_STATE_CHECK_INTERVAL + : Duration.ofSeconds(dependentParameters.getCheckInterval()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index 38a71d00ec04..cd212cbff86c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.utils; +import static org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters.DependentFailurePolicyEnum.DEPENDENT_FAILURE_WAITING; + import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -27,9 +29,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; +import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -62,11 +67,6 @@ public class DependentExecute { private TaskInstance taskInstance; - /** - * depend result - */ - private DependResult modelDependResult = DependResult.WAITING; - /** * depend result map */ @@ -231,10 +231,15 @@ private DependResult getDependResultByState(TaskExecutionStatus state) { * @param currentTime current time * @return boolean */ - public boolean finish(Date currentTime, int testFlag) { + public boolean finish(Date currentTime, int testFlag, DependentParameters.DependentFailurePolicyEnum failurePolicy, + Integer failureWaitingTime) { + DependResult modelDependResult = getModelDependResult(currentTime, testFlag); if (modelDependResult == DependResult.WAITING) { - modelDependResult = getModelDependResult(currentTime, testFlag); return false; + } else if (modelDependResult == DependResult.FAILED && DEPENDENT_FAILURE_WAITING == failurePolicy + && failureWaitingTime != null) { + return Duration.between(currentTime.toInstant(), Instant.now()) + .compareTo(Duration.ofMinutes(failureWaitingTime)) > 0; } return true; } @@ -260,13 +265,12 @@ public DependResult getModelDependResult(Date currentTime, int testFlag) { continue; } DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag); - if (dependResult != DependResult.WAITING) { + if (dependResult != DependResult.WAITING && dependResult != DependResult.FAILED) { dependResultMap.put(dependentItem.getKey(), dependResult); } dependResultList.add(dependResult); } - modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList); - return modelDependResult; + return DependentUtils.getDependResultForRelation(this.relation, dependResultList); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java index 2b90f4a1cc66..1e648d57757e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java @@ -31,10 +31,23 @@ public class DependentParameters extends AbstractParameters { private List dependTaskList; private DependentRelation relation; + /** Time unit is second */ + private Integer checkInterval; + private DependentFailurePolicyEnum failurePolicy; + /** Time unit is minutes */ + private Integer failureWaitingTime; @Override public boolean checkParameters() { return true; } + /** + * the dependent task failure policy. + */ + public enum DependentFailurePolicyEnum { + DEPENDENT_FAILURE_FAILURE, + DEPENDENT_FAILURE_WAITING + } + } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 09dbd1156bcf..6c2b8c147c7f 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -373,6 +373,7 @@ export default { times: 'Times', failed_retry_interval: 'Failed retry interval', minute: 'Minute', + second: 'Second', delay_execution_time: 'Delay execution time', namespace_cluster: 'Namespace(Cluster)', min_cpu: 'Min cpu', @@ -385,7 +386,8 @@ export default { command_tips: 'Please enter the container execution command, for example: ["printenv"]', args: 'Args', - args_tips: 'Please enter the container execution command args, for example: ["HOSTNAME", "KUBERNETES_PORT"]', + args_tips: + 'Please enter the container execution command args, for example: ["HOSTNAME", "KUBERNETES_PORT"]', min_memory_tips: 'Please enter min memory', state: 'State', branch_flow: 'Branch flow', @@ -622,6 +624,7 @@ export default { add_dependency: 'Add dependency', waiting_dependent_start: 'Waiting Dependent start', check_interval: 'Check interval', + check_interval_tips: 'Check interval must be a positive integer', waiting_dependent_complete: 'Waiting Dependent complete', project_name: 'Project Name', project_name_tips: 'Please select a project(required)', @@ -694,7 +697,7 @@ export default { zeppelin_username: 'zeppelinUsername', zeppelin_username_tips: 'Please enter the zeppelin server username', zeppelin_password: 'zeppelinPassword', - zeppelin_password_tips: 'Please enter the zeppelin server password', + zeppelin_password_tips: 'Please enter the zeppelin server password', hive_cli_task_execution_type: 'Hive Cli Task Execution Type', hive_sql_script: 'Hive SQL Script', hive_cli_options: 'Hive Cli Options', @@ -820,7 +823,13 @@ export default { pipeline_name: 'Pipeline Name', factory_tips: 'Please select factory', resource_group_tips: 'Please select resource group', - pipeline_tips: 'Please select pipeline' + pipeline_tips: 'Please select pipeline', + dependent_failure_policy: 'Dependent failure policy', + dependent_failure_policy_failure: 'failure', + dependent_failure_policy_waiting: 'waiting', + dependent_failure_waiting_time: 'Dependent failure waiting time', + dependent_failure_waiting_time_tips: + 'Failure waiting time must be a positive integer' }, menu: { fav: 'Favorites', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index b0c780386a26..bf36550e237e 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -37,10 +37,10 @@ export default { confirm: '确定', cancel: '取消', delete_confirm: '确定删除吗?', - authorize_level:'权限等级', + authorize_level: '权限等级', no_permission: '无权限', read_permission: '读权限', - all_permission: '所有权限', + all_permission: '所有权限' }, workflow: { on_line: '线上', @@ -220,7 +220,7 @@ export default { workflow_relation_no_data_result_desc: '目前没有任何工作流,请先创建工作流,再访问该页面', ready_to_block: '准备锁定', - block: '锁定', + block: '锁定' }, task: { on_line: '线上', @@ -334,7 +334,7 @@ export default { online: '已上线' }, node: { - is_cache: "缓存执行", + is_cache: '缓存执行', is_module_path: '使用模块路径', run_type: '运行类型', jvm_args: '虚拟机参数', @@ -372,6 +372,7 @@ export default { times: '次', failed_retry_interval: '失败重试间隔', minute: '分', + second: '秒', delay_execution_time: '延时执行时间', namespace_cluster: '命名空间(集群)', min_cpu: '最小cpu', @@ -614,6 +615,7 @@ export default { add_dependency: '添加依赖', waiting_dependent_start: '等待依赖启动', check_interval: '检查间隔', + check_interval_tips: '检查间隔必须为正整数', waiting_dependent_complete: '等待依赖完成', project_name: '项目名称', project_name_tips: '项目名称(必填)', @@ -798,7 +800,12 @@ export default { pipeline_name: 'pipeline名称', factory_tips: '请选择工厂', resource_group_tips: '请选择资源组', - pipeline_tips: '请选择pipeline' + pipeline_tips: '请选择pipeline', + dependent_failure_policy: '依赖失败策略', + dependent_failure_policy_failure: '失败', + dependent_failure_policy_waiting: '等待', + dependent_failure_waiting_time: '依赖失败等待时间', + dependent_failure_waiting_time_tips: '失败等待时间必须为正整数' }, menu: { fav: '收藏组件', @@ -808,6 +815,6 @@ export default { di: '数据集成', dq: '数据质量', ml: '机器学习', - other: '其他', + other: '其他' } } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts index 50d228cc99e7..433895f861cf 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dependent.ts @@ -15,7 +15,7 @@ * limitations under the License. */ -import { ref, onMounted, watch, h } from 'vue' +import { ref, onMounted, watch, h, computed } from 'vue' import { useI18n } from 'vue-i18n' import { NEllipsis, NIcon } from 'naive-ui' import { useRelationCustomParams, useDependentTimeout } from '.' @@ -36,13 +36,28 @@ import type { ITaskState, IDateType } from '../types' -import {IRenderOption} from "../types"; +import { IRenderOption } from '../types' export function useDependent(model: { [field: string]: any }): IJsonItem[] { const { t } = useI18n() const router: Router = useRouter() const nodeStore = useTaskNodeStore() + const dependentFailurePolicyOptions = computed(() => { + return [ + { + label: t('project.node.dependent_failure_policy_failure'), + value: 'DEPENDENT_FAILURE_FAILURE' + }, + { + label: t('project.node.dependent_failure_policy_waiting'), + value: 'DEPENDENT_FAILURE_WAITING' + } + ] + }) + const failureWaitingTimeSpan = computed(() => + model.failurePolicy === 'DEPENDENT_FAILURE_WAITING' ? 12 : 0 + ) const dependentResult = nodeStore.getDependentResult const TasksStateConfig = tasksState(t) const projectList = ref([] as IRenderOption[]) @@ -248,23 +263,26 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { if (!item.dependItemList?.length) return const itemListOptions = ref([] as IDependentItemOptions[]) - item.dependItemList?.forEach(async (dependItem: IDependentItem, itemIndex: number) => { - itemListOptions.value[itemIndex] = {} - if (dependItem.projectCode) { - itemListOptions.value[itemIndex].definitionCodeOptions = await getProcessList( - dependItem.projectCode - ) - } - if (dependItem.projectCode && dependItem.definitionCode) { - itemListOptions.value[itemIndex].depTaskCodeOptions = await getTaskList( - dependItem.projectCode, - dependItem.definitionCode - ) - } - if (dependItem.cycle) { - itemListOptions.value[itemIndex].dateOptions = DATE_LIST[dependItem.cycle] + item.dependItemList?.forEach( + async (dependItem: IDependentItem, itemIndex: number) => { + itemListOptions.value[itemIndex] = {} + if (dependItem.projectCode) { + itemListOptions.value[itemIndex].definitionCodeOptions = + await getProcessList(dependItem.projectCode) + } + if (dependItem.projectCode && dependItem.definitionCode) { + itemListOptions.value[itemIndex].depTaskCodeOptions = + await getTaskList( + dependItem.projectCode, + dependItem.definitionCode + ) + } + if (dependItem.cycle) { + itemListOptions.value[itemIndex].dateOptions = + DATE_LIST[dependItem.cycle] + } } - }) + ) selectOptions.value[taskIndex] = {} as IDependTaskOptions selectOptions.value[taskIndex].dependItemList = itemListOptions.value }) @@ -297,7 +315,9 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { const options = selectOptions?.value[i] || {} const itemListOptions = options?.dependItemList || [] const itemOptions = {} as IDependentItemOptions - itemOptions.definitionCodeOptions = await getProcessList(projectCode) + itemOptions.definitionCodeOptions = await getProcessList( + projectCode + ) itemListOptions[j] = itemOptions options.dependItemList = itemListOptions selectOptions.value[i] = options @@ -331,15 +351,14 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { }, onUpdateValue: async (processCode: number) => { const item = model.dependTaskList[i].dependItemList[j] - selectOptions.value[i].dependItemList[j].depTaskCodeOptions = await getTaskList( - item.projectCode, - processCode - ) + selectOptions.value[i].dependItemList[j].depTaskCodeOptions = + await getTaskList(item.projectCode, processCode) item.depTaskCode = 0 } }, - options: selectOptions.value[i]?.dependItemList[j] - ?.definitionCodeOptions || [], + options: + selectOptions.value[i]?.dependItemList[j] + ?.definitionCodeOptions || [], path: `dependTaskList.${i}.dependItemList.${j}.definitionCode`, rule: { required: true, @@ -430,6 +449,56 @@ export function useDependent(model: { [field: string]: any }): IJsonItem[] { }), childrenField: 'dependItemList', name: 'add_dependency' - }) + }), + { + type: 'input-number', + field: 'checkInterval', + name: t('project.node.check_interval'), + span: 12, + props: { + max: Math.pow(9, 10) - 1 + }, + slots: { + suffix: () => t('project.node.second') + }, + validate: { + trigger: ['input'], + validator(validate: any, value: number) { + if (!value && !/^[1-9]\d*$/.test(String(value))) { + return new Error(t('project.node.check_interval_tips')) + } + } + } + }, + { + type: 'radio', + field: 'failurePolicy', + name: t('project.node.dependent_failure_policy'), + options: dependentFailurePolicyOptions, + span: 24 + }, + { + type: 'input-number', + field: 'failureWaitingTime', + name: t('project.node.dependent_failure_waiting_time'), + span: failureWaitingTimeSpan, + props: { + max: Math.pow(9, 10) - 1 + }, + slots: { + suffix: () => t('project.node.minute') + }, + validate: { + trigger: ['input'], + required: true, + validator(validate: any, value: number) { + if (model.timeoutFlag && !/^[1-9]\d*$/.test(String(value))) { + return new Error( + t('project.node.dependent_failure_waiting_time_tips') + ) + } + } + } + } ] } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 90ad5b9fe315..959a99a14274 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -23,8 +23,7 @@ import type { ISqoopTargetParams, ISqoopSourceParams, ILocalParam, - IDependTask, - RelationType + IDependentParameters } from './types' export function formatParams(data: INodeData): { @@ -279,6 +278,9 @@ export function formatParams(data: INodeData): { } if (data.taskType === 'DEPENDENT') { taskParams.dependence = { + checkInterval: data.checkInterval, + failurePolicy: data.failurePolicy, + failureWaitingTime: data.failureWaitingTime, relation: data.relation, dependTaskList: data.dependTaskList } @@ -651,7 +653,12 @@ export function formatModel(data: ITaskData) { } if (data.taskParams?.dependence) { - const dependence: { relation?: RelationType, dependTaskList?: IDependTask[] } = JSON.parse(JSON.stringify(data.taskParams.dependence)) + const dependence: IDependentParameters = JSON.parse( + JSON.stringify(data.taskParams.dependence) + ) + params.checkInterval = dependence.checkInterval + params.failurePolicy = dependence.failurePolicy + params.failureWaitingTime = dependence.failureWaitingTime params.dependTaskList = dependence.dependTaskList || [] params.relation = dependence.relation } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dependent.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dependent.ts index 2551d1076f68..02bfcf7ebc9a 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dependent.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dependent.ts @@ -48,6 +48,8 @@ export function useDependent({ timeoutNotifyStrategy: [], timeout: 30, timeoutFlag: false, + failurePolicy: 'DEPENDENT_FAILURE_FAILURE', + checkInterval: 10, ...data } as INodeData) diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 43ff0596ffdf..e3fb2569bd3c 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -107,6 +107,14 @@ interface ISwitchResult { nextNode?: number } +interface IDependentParameters { + checkInterval?: number + failurePolicy?: 'DEPENDENT_FAILURE_FAILURE' | 'DEPENDENT_FAILURE_WAITING' + failureWaitingTime?: number + relation?: RelationType + dependTaskList?: IDependTask[] +} + /* * resourceName: resource full name * res: resource file name @@ -301,10 +309,7 @@ interface ITaskParams { switchResult?: ISwitchResult dependTaskList?: IDependTask[] nextNode?: number - dependence?: { - relation?: RelationType - dependTaskList?: IDependTask[] - } + dependence?: IDependentParameters customConfig?: number json?: string dsType?: string @@ -440,6 +445,7 @@ interface INodeData >, ISqoopTargetData, ISqoopSourceData, + IDependentParameters, Omit { id?: string taskType?: ITaskType @@ -518,5 +524,6 @@ export { FormRules, IJsonItemParams, IResponseJsonItem, - IDateType + IDateType, + IDependentParameters }