Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time #15528

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ jobs:
fail-fast: false
matrix:
db: ["mysql", "postgresql"]
version: ["2.0.9", "3.0.6", "3.1.8", "3.2.0"]
version: ["2.0.9", "3.0.6", "3.1.9", "3.2.0"]
steps:
- name: Set up JDK 8
uses: actions/setup-java@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
* @param pageSize page size
* @return queue list
*/
@Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES")
@Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES")
@Parameters({
@Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")),
@Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")),
Expand All @@ -310,15 +310,21 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
@GetMapping(value = "/query-list-by-group-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR)
public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status,
groupId, pageNo, pageSize);
public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(
loginUser,
taskName,
processName,
status,
groupId,
pageNo,
pageSize);
return returnDataList(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ Map<String, Object> createTaskGroup(User loginUser, Long projectCode, String nam
Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
String description, int groupSize);

/**
* get task group status
*
* @param id task group id
* @return the result code and msg
*/
boolean isTheTaskGroupAvailable(int id);

/**
* query all task group by user id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
Expand Down Expand Up @@ -83,14 +82,12 @@
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.service.command.CommandService;
Expand Down Expand Up @@ -552,16 +549,20 @@
Map<String, Object> result = new HashMap<>();
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId);
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId())
.orElseThrow(
() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()));
Comment on lines +552 to +554

Check notice

Code scanning / CodeQL

Unread local variable Note

Variable 'ProcessInstance processInstance' is never read.
checkMasterExists();

if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueMapper.updateById(taskGroupQueue);

checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}

public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
Expand Down Expand Up @@ -664,32 +665,6 @@
return result;
}

/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @return update result
*/
private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
Map<String, Object> result = new HashMap<>();
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}

taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
processService.updateTaskGroupQueue(taskGroupQueue);
log.info("Sending force start command to master: {}.", processInstance.getHost());
ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class);
iLogicTaskInstanceOperator.forceStartTaskInstance(
new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId()));
putMsg(result, Status.SUCCESS);
return result;
}

/**
* check whether sub processes are offline before starting process definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status,
int groupId, int pageNo, int pageSize) {
public Map<String, Object> queryTasksByGroupId(User loginUser,
String taskName,
String processName,
Integer status,
int groupId,
int pageNo,
int pageSize) {
Map<String, Object> result = new HashMap<>();
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
Expand All @@ -79,8 +84,13 @@ public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName,
return result;
}
List<Project> projects = projectMapper.selectBatchIds(projectIds);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page,
taskName, processName, status, groupId, projects);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(
page,
taskName,
processName,
status,
groupId,
projects);

pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Map<String, Object> createTaskGroup(User loginUser, Long projectCode, Str
.description(description)
.groupSize(groupSize)
.userId(loginUser.getId())
.status(Flag.YES.getCode())
.status(Flag.YES)
.createTime(now)
.updateTime(now)
.build();
Expand Down Expand Up @@ -180,7 +180,7 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
if (taskGroup.getStatus() != Flag.YES.getCode()) {
if (taskGroup.getStatus() != Flag.YES) {
log.warn("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
Expand All @@ -202,17 +202,6 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
return result;
}

/**
* get task group status
*
* @param id task group id
* @return is the task group available
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}

/**
* query all task group by user id
*
Expand Down Expand Up @@ -331,12 +320,12 @@ public Map<String, Object> closeTaskGroup(User loginUser, int id) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.NO.getCode()) {
if (taskGroup.getStatus() == Flag.NO) {
log.info("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_CLOSED);
return result;
}
taskGroup.setStatus(Flag.NO.getCode());
taskGroup.setStatus(Flag.NO);
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
log.info("Task group close complete, taskGroupId:{}.", id);
Expand Down Expand Up @@ -364,12 +353,12 @@ public Map<String, Object> startTaskGroup(User loginUser, int id) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.YES.getCode()) {
if (taskGroup.getStatus() == Flag.YES) {
log.info("Task group has been started, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_OPENED);
return result;
}
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private TaskGroup getTaskGroup() {
.description(taskGroupDesc)
.groupSize(100)
.userId(1)
.status(Flag.YES.getCode())
.status(Flag.YES)
.build();

return taskGroup;
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testUpdate() {

User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
// Task group status error

Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.TASK_GROUP,
Expand All @@ -218,7 +218,6 @@ public void testUpdate() {
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

taskGroup.setStatus(0);
}

@Test
Expand All @@ -236,12 +235,12 @@ public void testCloseAndStart() {
Map<String, Object> result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

taskGroup.setStatus(0);
taskGroup.setStatus(Flag.NO);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_CLOSED, result.get(Constants.STATUS));

taskGroup.setStatus(1);
taskGroup.setStatus(Flag.YES);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.startTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.enums.Flag;

import java.io.Serializable;
import java.util.Date;

Expand All @@ -36,44 +38,17 @@
@TableName("t_ds_task_group")
public class TaskGroup implements Serializable {

/**
* key
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* task_group name
*/
private String name;

private long projectCode;
private String description;
/**
* 作业组大小
*/
private int groupSize;
/**
* 已使用作业组大小
*/
private int useSize;
/**
* creator id
*/
private int userId;
/**
* 0 not available, 1 available
*/
private Integer status;
/**
* create time
*/
private Flag status;
private Date createTime;
/**
* update time
*/
private Date updateTime;
/**
* project code
*/
private long projectCode;

}
Loading
Loading