Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl #12493

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -28,7 +30,7 @@
public interface WorkerGroupService {

/**
* create or update a worker group
* Create or update a worker group
*
* @param loginUser login user
* @param id worker group id
Expand All @@ -42,7 +44,7 @@ Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String
String otherParamsJson);

/**
* query worker group paging
* Query worker group paging
*
* @param loginUser login user
* @param pageNo page number
Expand All @@ -53,25 +55,40 @@ Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String
Result queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal);

/**
* query all worker group
* Query all worker group
*
* @param loginUser
* @param loginUser login user
* @return all worker group list
*/
Map<String, Object> queryAllGroup(User loginUser);

/**
* delete worker group by id
* Delete worker group by id
* @param loginUser login user
* @param id worker group id
* @return delete result code
*/
Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id);

/**
* query all worker address list
* Query all worker address list
*
* @return all worker address list
*/
Map<String, Object> getWorkerAddressList();

/**
* Get task instance's worker group
* @param taskInstance task instance
* @return worker group
*/
String getTaskWorkerGroup(TaskInstance taskInstance);

/**
* Query worker group by process definition codes
* @param processDefinitionCodeList processDefinitionCodeList
* @return worker group map
*/
Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
Expand Down Expand Up @@ -150,6 +151,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private TaskGroupQueueMapper taskGroupQueueMapper;

@Autowired
private WorkerGroupService workerGroupService;

/**
* execute process instance
*
Expand Down Expand Up @@ -1030,7 +1034,7 @@ private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(
.collect(Collectors.toList());

Map<Long, String> processDefinitionWorkerGroupMap =
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
workerGroupService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);

for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;

import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -74,6 +78,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private RegistryClient registryClient;

@Autowired
private ProcessService processService;

@Autowired
private ScheduleMapper scheduleMapper;

/**
* create or update a worker group
*
Expand Down Expand Up @@ -354,4 +364,33 @@ public Map<String, Object> getWorkerAddressList() {
return result;
}

@Override
public String getTaskWorkerGroup(TaskInstance taskInstance) {
if (taskInstance == null) {
return null;
}

String workerGroup = taskInstance.getWorkerGroup();

if (StringUtils.isNotEmpty(workerGroup)) {
return workerGroup;
}
int processInstanceId = taskInstance.getProcessInstanceId();
ProcessInstance processInstance = processService.findProcessInstanceById(processInstanceId);

if (processInstance != null) {
return processInstance.getWorkerGroup();
}
logger.info("task : {} will use default worker group", taskInstance.getId());
return Constants.DEFAULT_WORKER_GROUP;
}

@Override
public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList =
scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
Schedule::getWorkerGroup));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public class ExecutorServiceTest {
@Mock
private CommandService commandService;

@Mock
private WorkerGroupService workerGroupService;

@Mock
private ProcessDefinitionMapper processDefinitionMapper;

Expand Down Expand Up @@ -289,7 +292,7 @@ public void testComplementWithDependentMode() {

Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
.thenReturn(processDefinitionWorkerGroupMap);

Command command = new Command();
Expand Down
Loading