[Improvement][MasterServer] event response handle parallel (#7560)
* [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238)

* to #7237

* rerun test

Co-authored-by: honghuo.zw <[email protected]>

* cherry-pick 05aef27 and handle conflicts

* to #7065: fix ExecutorService and schedulerService (#7072)

Co-authored-by: honghuo.zw <[email protected]>

* [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081  (#7082)

* to #7081

* fix #7081

* to #7081

Co-authored-by: honghuo.zw <[email protected]>

* cherry-pick 8ebe060 and handle conflicts

* cherry-pick 1f18444 and handle conflicts

* fix #6807: dolphinscheduler.zookeeper.env_vars -> dolphinscheduler.registry.env_vars (#6808)

Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>

* add default constructor (#6780)

Co-authored-by: honghuo.zw <[email protected]>

* to #7108 (#7109)

* to #7450

* to #7450: fix parallel bug

* add index

* expose config to user

* fix bug

* fix bug

* add delay delete

* fix bug

* add License

* fix ut

* fix ut

* fix name

Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
3 people authored Dec 24, 2021
1 parent 8fa61cf commit f8942bf
Showing 12 changed files with 383 additions and 107 deletions.
4 changes: 3 additions & 1 deletion docker/build/conf/dolphinscheduler/master.properties.tpl
@@ -47,4 +47,6 @@ master.reserved.memory=${MASTER_RESERVED_MEMORY}
# master failover interval minutes
master.failover.interval=${MASTER_FAILOVER_INTERVAL}
# master kill yarn job when handle failover
master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
# master.persist.event.state.threads
master.persist.event.state.threads=${MASTER_PERSIST_EVENT_STATE_THREADS}
1 change: 1 addition & 0 deletions docker/kubernetes/dolphinscheduler/values.yaml
@@ -166,6 +166,7 @@ master:
ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
SESSION_TIMEOUT_MS: 60000
MASTER_PERSIST_EVENT_STATE_THREADS: 10
## Periodic probe of container liveness. Container will be restarted if the probe fails. Cannot be updated.
## More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
livenessProbe:
@@ -532,7 +532,8 @@ CREATE TABLE `t_ds_process_task_relation` (
`condition_params` text COMMENT 'condition params(json)',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`)
PRIMARY KEY (`id`),
KEY `project_code_process_definition_code_index` (`project_code`,`process_definition_code`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

-- ----------------------------
@@ -440,6 +440,8 @@ CREATE TABLE t_ds_process_task_relation (
PRIMARY KEY (id)
) ;

create index project_code_process_definition_code_index on t_ds_process_task_relation (project_code,process_definition_code);

DROP TABLE IF EXISTS t_ds_process_task_relation_log;
CREATE TABLE t_ds_process_task_relation_log (
id int NOT NULL ,
@@ -17,6 +17,7 @@

SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));


-- uc_dolphin_T_t_ds_process_instance_A_restart_time
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time;
delimiter d//
@@ -35,4 +36,25 @@ d//

delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_restart_time();
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;


-- uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
WHERE TABLE_NAME='t_ds_process_task_relation'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND INDEX_NAME ='project_code_process_definition_code_index')
THEN
ALTER TABLE `t_ds_process_task_relation` ADD KEY `project_code_process_definition_code_index`(`project_code`,`process_definition_code`) USING BTREE;
END IF;
END;

d//

delimiter ;
CALL uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index();
DROP PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
@@ -29,6 +29,9 @@ BEGIN
v_schema =current_schema();

EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';

EXECUTE 'CREATE INDEX IF NOT EXISTS project_code_process_definition_code_index ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation USING Btree("project_code","process_definition_code")';

return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
@@ -66,9 +66,12 @@ public class MasterConfig {
@Value("${master.failover.interval:10}")
private int failoverInterval;

@Value("${master.kill.yarn.job.when.handle.fail.over:true}")
@Value("${master.kill.yarn.job.when.handle.failover:true}")
private boolean masterKillYarnJobWhenHandleFailOver;

@Value("${master.persist.event.state.threads:10}")
private int masterPersistEventStateThreads;

public int getListenPort() {
return listenPort;
}
@@ -183,4 +186,12 @@ public boolean getMasterKillYarnJobWhenHandleFailOver() {
public void setMasterKillYarnJobWhenHandleFailOver(boolean masterKillYarnJobWhenHandleFailOver) {
this.masterKillYarnJobWhenHandleFailOver = masterKillYarnJobWhenHandleFailOver;
}

public int getMasterPersistEventStateThreads() {
return masterPersistEventStateThreads;
}

public void setMasterPersistEventStateThreads(int masterPersistEventStateThreads) {
this.masterPersistEventStateThreads = masterPersistEventStateThreads;
}
}
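
The consumer of this new setting is in one of the changed files that is not shown on this page. As a rough sketch only: the Helm value MASTER_PERSIST_EVENT_STATE_THREADS is rendered into master.persist.event.state.threads in master.properties, injected by the @Value annotation above, and would typically be used to size a fixed thread pool for persisting task events. The class name below is hypothetical and not part of this commit.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: size the shared event-persistence pool from the value
// exposed by MasterConfig#getMasterPersistEventStateThreads() (default 10).
public class PersistEventPoolSketch {

    private final ExecutorService eventExecService;

    public PersistEventPoolSketch(int masterPersistEventStateThreads) {
        this.eventExecService = Executors.newFixedThreadPool(masterPersistEventStateThreads);
    }

    public ExecutorService getEventExecService() {
        return eventExecService;
    }
}
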
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.processor.queue;

import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;

public class TaskResponsePersistThread implements Runnable {

/**
* logger of TaskResponsePersistThread
*/
private static final Logger logger = LoggerFactory.getLogger(TaskResponsePersistThread.class);

private final ConcurrentLinkedQueue<TaskResponseEvent> events = new ConcurrentLinkedQueue<>();

private final Integer processInstanceId;

/**
* process service
*/
private ProcessService processService;

private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;

public TaskResponsePersistThread(ProcessService processService,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper,
Integer processInstanceId) {
this.processService = processService;
this.processInstanceMapper = processInstanceMapper;
this.processInstanceId = processInstanceId;
}

@Override
public void run() {
while (!this.events.isEmpty()) {
TaskResponseEvent event = this.events.peek();
try {
boolean result = persist(event);
if (!result) {
logger.error("persist meta error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId());
}
} catch (Exception e) {
logger.error("persist error, task id:{}, instance id:{}", event.getTaskInstanceId(), event.getProcessInstanceId(), e);
} finally {
this.events.remove(event);
}
}
}

/**
* persist taskResponseEvent
*
* @param taskResponseEvent taskResponseEvent
*/
private boolean persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel();

TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());

boolean result = true;

switch (event) {
case ACK:
try {
if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} catch (Exception e) {
result = false;
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId());
channel.writeAndFlush(taskAckCommand.convert2Command());
}
break;
case RESULT:
try {
if (taskInstance != null) {
result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
}
if (!result) {
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
} else {
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
}
} catch (Exception e) {
result = false;
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
}
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}

WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThread.addStateEvent(stateEvent);
}
return result;
}

public boolean addEvent(TaskResponseEvent event) {
if (event.getProcessInstanceId() != this.processInstanceId) {
logger.info("event would be abandoned, task instance id:{}, process instance id:{}, this.processInstanceId:{}",
event.getTaskInstanceId(), event.getProcessInstanceId(), this.processInstanceId);
return false;
}
return this.events.add(event);
}

public int eventSize() {
return this.events.size();
}

public boolean isEmpty() {
return this.events.isEmpty();
}

public Integer getProcessInstanceId() {
return processInstanceId;
}

public String getKey() {
return String.valueOf(processInstanceId);
}
}
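
The dispatcher that feeds this new class (the TaskResponseService side of the change) is among the changed files not shown on this page. The sketch below only illustrates the parallel handling this commit is about, under the assumption that incoming events are grouped by process instance: events of one workflow instance land in a single TaskResponsePersistThread and keep their arrival order, while different instances are persisted in parallel on a shared pool. Apart from the classes that appear in this diff (TaskResponsePersistThread, TaskResponseEvent, WorkflowExecuteThread, ProcessService), every name here is made up for the example.

package org.apache.dolphinscheduler.server.master.processor.queue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService;

// Hypothetical dispatch sketch, not the committed TaskResponseService.
public class TaskResponseDispatchSketch {

    private final ProcessService processService;
    private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper;

    // shared pool; in practice its size would come from master.persist.event.state.threads
    private final ExecutorService eventExecService = Executors.newFixedThreadPool(10);

    public TaskResponseDispatchSketch(ProcessService processService,
                                      ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
        this.processService = processService;
        this.processInstanceMapper = processInstanceMapper;
    }

    public void dispatch(List<TaskResponseEvent> batch) {
        // group the batch by process instance id: one persist queue per instance
        Map<Integer, TaskResponsePersistThread> groups = new HashMap<>();
        for (TaskResponseEvent event : batch) {
            groups.computeIfAbsent(event.getProcessInstanceId(),
                    id -> new TaskResponsePersistThread(processService, processInstanceMapper, id))
                    .addEvent(event);
        }
        // one runnable per instance: instances persist in parallel, while events
        // inside a single instance keep their order in its queue
        for (TaskResponsePersistThread persistThread : groups.values()) {
            eventExecService.submit(persistThread);
        }
    }
}
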