Skip to content

Commit

Permalink
Merge branch 'dev' into fix_ui_taskgroup_switchstatus
Browse files Browse the repository at this point in the history
  • Loading branch information
SbloodyS authored Mar 11, 2024
2 parents a151b66 + c61b81e commit c283aa9
Show file tree
Hide file tree
Showing 28 changed files with 696 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ProjectWorkerGroupController extends BaseController {
@ @RequestParam(value = "workerGroups", required = false) String workerGroups
* @return create result code
*/
@Operation(summary = "assignWorkerGroups", description = "CREATE_PROCESS_DEFINITION_NOTES")
@Operation(summary = "assignWorkerGroups", description = "ASSIGN_WORKER_GROUPS_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", schema = @Schema(implementation = long.class, example = "123456")),
@Parameter(name = "workerGroups", description = "WORKER_GROUP_LIST", schema = @Schema(implementation = List.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,20 @@ public Result verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(va
putMsg(result, Status.SUCCESS);
return result;
}

@Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES")
@Parameters({
@Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")),
})
@GetMapping(value = "/query-dependent-tasks")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<Map<String, Object>> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam(value = "workFlowCode") Long workFlowCode,
@RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) {
Map<String, Object> result =
workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode);
return returnDataList(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public interface WorkFlowLineageService {
*/
Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode);

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode);

/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,7 @@ public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long wor
// do nothing if the workflow is already offline
return;
}

workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinitionDao.updateById(workflowDefinition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -278,11 +279,29 @@ public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitio
public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) {
Set<TaskMainInfo> taskMainInfos = new HashSet<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, processDefinitionCode);
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0);
List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode);
taskMainInfos.addAll(taskDependents);
taskMainInfos.addAll(taskSubProcess);
return taskMainInfos;
}

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
@Override
public Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode,
Objects.isNull(taskCode) ? 0 : taskCode.longValue());
result.put(Constants.DATA_LIST, taskDependents);
putMsg(result, Status.SUCCESS);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ public void testQueryWorkFlowLineageByCode() {
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>());
assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code));
}

@Test
public void testQueryDownstreamDependentTaskList() {
long code = 1L;
long taskCode = 1L;
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode))
.thenReturn(result);

assertDoesNotThrow(
() -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -494,9 +495,8 @@ public void testUpdateResourceContent() throws Exception {
ServiceException serviceException =
Assertions.assertThrows(ServiceException.class, () -> resourcesService.updateResourceContent(getUser(),
"/dolphinscheduler/123/resources/ResourcesServiceTest.jar", "123", "content"));
assertEquals(
"Internal Server Error: Resource file: /dolphinscheduler/123/resources/ResourcesServiceTest.jar is illegal",
serviceException.getMessage());
assertTrue(serviceException.getMessage()
.contains("Resource file: /dolphinscheduler/123/resources/ResourcesServiceTest.jar is illegal"));

// RESOURCE_NOT_EXIST
when(storageOperate.getResDir(Mockito.anyString())).thenReturn("/dolphinscheduler/123/resources");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class TaskMainInfo {
*/
private Date taskUpdateTime;

/**
* projectCode
*/
private long projectCode;

/**
* processDefinitionCode
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") long pr
* current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in
* `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
List<TaskMainInfo> queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);

/**
* Query all tasks depend on task, only downstream task support currently(from dependent task type).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@
</where>
</select>

<select id="queryTaskDependentDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
<select id="queryTaskDependentOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, pd.project_code as projectCode
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
Expand All @@ -205,16 +206,16 @@
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For dependnet task type, using `like concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'DEPENDENT'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and (td.task_params like concat('%"depTaskCode":', #{taskCode}, '%') or td.task_params like concat('%"depTaskCode":-1%'))
</if>
</where>
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DependentItem {
private String dateValue;
private DependResult dependResult;
private TaskExecutionStatus status;
private Boolean parameterPassing;
private Boolean parameterPassing = false;

public String getKey() {
return String.format("%d-%d-%s-%s",
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ export default {
confirm_to_offline: 'Confirm to make the workflow offline?',
time_to_online: 'Confirm to make the Scheduler online?',
time_to_offline: 'Confirm to make the Scheduler offline?',
warning_dependent_tasks_title: 'Warning',
warning_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the workflow offline?',
warning_dependencies: 'Dependencies:',
delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the workflow.',
warning_offline_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the scheduler offline?',
delete_task_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.',
warning_delete_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to delete the scheduler?',
},
task: {
on_line: 'Online',
Expand Down Expand Up @@ -306,7 +313,8 @@ export default {
startup_parameter: 'Startup Parameter',
whether_dry_run: 'Whether Dry-Run',
please_choose: 'Please Choose',
remove_task_cache: 'Clear cache'
remove_task_cache: 'Clear cache',
delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.',
},
dag: {
create: 'Create Workflow',
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ export default {
confirm_to_offline: '是否确定下线该工作流?',
time_to_online: '是否确定上线该定时?',
time_to_offline: '是否确定下线该定时?',
warning_dependent_tasks_title: '警告',
warning_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该工作流嘛?',
warning_dependencies: '依赖如下:',
delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该工作流',
warning_offline_scheduler_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该定时嘛?',
delete_task_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务.',
warning_delete_scheduler_dependent_tasks_desc: '下游存在依赖, 删除定时可能会对下游任务产生影响. 你确定要删除该定时嘛?',
},
task: {
on_line: '线上',
Expand Down Expand Up @@ -304,7 +311,8 @@ export default {
startup_parameter: '启动参数',
whether_dry_run: '是否空跑',
please_choose: '请选择',
remove_task_cache: '清除缓存'
remove_task_cache: '清除缓存',
delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务定义',
},
dag: {
create: '创建工作流',
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/service/modules/lineages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

import { axios } from '@/service/service'
import { ProjectCodeReq, WorkflowCodeReq } from './types'
import {DependentTaskReq, ProjectCodeReq, WorkflowCodeReq} from './types'

export function queryWorkFlowList(projectCode: ProjectCodeReq): any {
return axios({
Expand All @@ -41,3 +41,11 @@ export function queryLineageByWorkFlowCode(
method: 'get'
})
}

export function queryDependentTasks(projectCode: number, params: DependentTaskReq): any {
return axios({
url: `/projects/${projectCode}/lineages/query-dependent-tasks`,
method: 'get',
params
})
}
5 changes: 5 additions & 0 deletions dolphinscheduler-ui/src/service/modules/lineages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ interface WorkflowRes {
workFlowRelationList: WorkFlowRelationList[]
}

interface DependentTaskReq extends WorkflowCodeReq {
taskCode?: number
}

export {
ProjectCodeReq,
WorkflowCodeReq,
WorkFlowNameReq,
DependentTaskReq,
WorkflowRes,
WorkFlowListRes
}
Loading

0 comments on commit c283aa9

Please sign in to comment.