Skip to content

Commit

Permalink
[Improvement][Batch Query] Batch query ProcessDefinitions belongs to …
Browse files Browse the repository at this point in the history
…need failover ProcessInstance. (#12506)
  • Loading branch information
BongBongBang authored Nov 3, 2022
1 parent 44e0935 commit 7cdb926
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.dao.repository;

import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.model.PageListingResult;

import java.util.List;

import javax.annotation.Nullable;

public interface ProcessDefinitionDao {
Expand All @@ -37,4 +40,10 @@ PageListingResult<ProcessDefinition> listingProcessDefinition(
int userId,
long projectCode);

/**
* Query process definitions by the definition code and definition version carried by each
* of the given process instances.
*
* @param processInstances process instances whose process definition code and process
*                         definition version are used as the lookup keys
* @return the process definitions found for the code/version pairs of the given instances
*/
List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@
package org.apache.dolphinscheduler.dao.repository.impl;

import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

Expand All @@ -33,6 +41,8 @@ public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {

@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;

@Override
public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal,
Expand All @@ -48,4 +58,25 @@ public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNum
.records(processDefinitions.getRecords())
.build();
}

/**
 * Query the process definitions referenced by the given process instances.
 *
 * <p>Each distinct (definition code, definition version) pair is looked up once in the
 * process definition log. Pairs for which no log record is found are skipped, so the
 * returned list never contains {@code null}. The list may still contain several entries
 * with the same code when the instances reference different versions of one definition.
 *
 * @param processInstances process instances where codes and versions come from; may be
 *                         {@code null} or empty
 * @return a mutable list of the definitions found, in first-seen order; empty when the
 *         input is {@code null} or empty
 */
@Override
public List<ProcessDefinition> queryProcessDefinitionsByCodesAndVersions(List<ProcessInstance> processInstances) {
    if (Objects.isNull(processInstances) || processInstances.isEmpty()) {
        return new ArrayList<>();
    }
    // Many instances usually share one definition; remember which pairs were already
    // queried so each pair hits the database (and appears in the result) only once.
    Set<String> queriedCodeAndVersions = new HashSet<>();
    List<ProcessDefinition> processDefinitions = new ArrayList<>();
    // Deliberately sequential: the mapper call is blocking I/O and should not occupy the
    // shared common ForkJoinPool that parallelStream() would use.
    for (ProcessInstance processInstance : processInstances) {
        String codeAndVersion = processInstance.getProcessDefinitionCode() + "#"
                + processInstance.getProcessDefinitionVersion();
        if (!queriedCodeAndVersions.add(codeAndVersion)) {
            continue;
        }
        ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
                .queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
                        processInstance.getProcessDefinitionVersion());
        // A missing log record must not leak into the result as null; callers
        // dereference every element (e.g. to read its code).
        if (processDefinitionLog != null) {
            processDefinitions.add(processDefinitionLog);
        }
    }
    return processDefinitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
Expand All @@ -48,14 +49,17 @@

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.NonNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import io.micrometer.core.annotation.Counted;
Expand All @@ -78,6 +82,9 @@ public class MasterFailoverService {

private final TaskInstanceDao taskInstanceDao;

@Autowired
private ProcessDefinitionDao processDefinitionDao;

public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
Expand Down Expand Up @@ -153,6 +160,12 @@ private void doFailoverMaster(@NonNull String masterHost) {
needFailoverProcessInstanceList.size(),
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));

List<ProcessDefinition> processDefinitions =
processDefinitionDao.queryProcessDefinitionsByCodesAndVersions(needFailoverProcessInstanceList);
Map<Long, ProcessDefinition> codeDefinitionMap = processDefinitions
.stream()
.collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity()));

for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
Expand All @@ -161,10 +174,7 @@ private void doFailoverMaster(@NonNull String masterHost) {
LOGGER.info("WorkflowInstance doesn't need to failover");
continue;
}
// todo: use batch query
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
ProcessDefinition processDefinition = codeDefinitionMap.get(processInstance.getProcessDefinitionCode());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
Expand All @@ -42,6 +43,7 @@
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
Expand All @@ -56,6 +58,7 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ReflectionUtils;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -92,6 +95,9 @@ public class FailoverServiceTest {
@Mock
private LogClient logClient;

@Mock
private ProcessDefinitionDao processDefinitionDao;

private static int masterPort = 5678;
private static int workerPort = 1234;

Expand All @@ -113,6 +119,9 @@ public void before() throws Exception {
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
processInstanceExecCacheManager, logClient, taskInstanceDao);
Field processDefinitionDaoField = masterFailoverService.getClass().getDeclaredField("processDefinitionDao");
processDefinitionDaoField.setAccessible(true);
ReflectionUtils.setField(processDefinitionDaoField, masterFailoverService, processDefinitionDao);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
Expand Down

0 comments on commit 7cdb926

Please sign in to comment.