Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/dev' into ob-datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiajie committed Mar 7, 2023
2 parents 833926e + c9066e8 commit fbfeaab
Show file tree
Hide file tree
Showing 65 changed files with 595 additions and 1,130 deletions.
6 changes: 3 additions & 3 deletions docs/docs/en/guide/resource/file-manage.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ Create a shell file, print `hello world`.

In the workflow definition module of project Manage, create a new workflow using a shell task.

- Script: 'sh hello.sh'
- Resource: Select 'hello.sh'
- Script: 'sh resource/hello.sh'
- Resource: Select 'resource/hello.sh'

> Notice: When using a resource file in the script, the file name needs to be the same as the full path of the selected resource:
> For example: if the resource path is `/resource/hello.sh`, you need to use the full path of `/resource/hello.sh` to use it in the script.
> For example: if the resource path is `resource/hello.sh`, you need to use the full path of `resource/hello.sh` to use it in the script.
![use-shell](../../../../img/new_ui/dev/resource/demo/file-demo02.png)

Expand Down
6 changes: 3 additions & 3 deletions docs/docs/zh/guide/resource/file-manage.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@

在项目管理的工作流定义模块,创建一个新的工作流,使用 shell 任务。

- 脚本:`sh hello.sh`
- 资源:选择 `hello.sh`
- 脚本:`sh resource/hello.sh`
- 资源:选择 `resource/hello.sh`

> 注意:脚本中选择资源文件时文件名称需要保持和所选择资源全路径一致:
> 例如:资源路径为`/resource/hello.sh` 则脚本中调用需要使用`/resource/hello.sh`全路径
> 例如:资源路径为`resource/hello.sh` 则脚本中调用需要使用`resource/hello.sh`全路径
![use-shell](../../../../img/new_ui/dev/resource/demo/file-demo02.png)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ public Map<String, Object> queryResourceList(User loginUser, ResourceType type,
List<User> userList = userMapper.selectList(null);
Set<String> visitedTenantEntityCode = new HashSet<>();
for (User userEntity : userList) {
Tenant tt = tenantMapper.queryById(userEntity.getTenantId());

String tenantEntityCode = tenantMapper.queryById(userEntity.getTenantId()).getTenantCode();
if (!visitedTenantEntityCode.contains(tenantEntityCode)) {
defaultPath = storageOperate.getResDir(tenantEntityCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@
select
<include refid="baseSql" />
from t_ds_task_group_queue
where priority = (select max(priority) from t_ds_task_group_queue where group_id = #{groupId}
and status = #{status} and in_queue = #{inQueue} and force_start = #{forceStart} ) and group_id = #{groupId}
and status = #{status} and in_queue = #{inQueue} and force_start = #{forceStart} limit 1
where group_id = #{groupId} and status = #{status} and in_queue = #{inQueue} and force_start = #{forceStart}
order by priority desc
limit 1
</select>

<select id="queryByTaskId" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
Expand All @@ -36,18 +35,15 @@

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;

/**
* TaskExecutionContext builder
*/

@Slf4j
public class TaskExecutionContextBuilder {

protected final Logger log =
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));

public static TaskExecutionContextBuilder get() {
return new TaskExecutionContextBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
Expand All @@ -28,7 +29,6 @@
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -62,14 +62,11 @@ public void process(Channel channel, Command command) {
stateEvent = createTaskStateEvent(workflowStateEventChangeCommand);
}

try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId());

try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(
stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId())) {
log.info("Received state change command, event: {}", stateEvent);
stateEventResponseService.addStateChangeEvent(stateEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -59,13 +59,11 @@ public void process(Channel channel, Command command) {
.key(taskEventChangeCommand.getKey())
.type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId());
try (
LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(
stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId())) {
log.info("Received task event change command, event: {}", stateEvent);
stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.dolphinscheduler.server.master.processor;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -61,14 +61,12 @@ public void process(Channel channel, Command command) {
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());
try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(
taskResultEvent.getProcessInstanceId(), taskResultEvent.getTaskInstanceId())) {
log.info("Received task execute result, event: {}", taskResultEvent);

taskEventService.addEvent(taskResultEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package org.apache.dolphinscheduler.server.master.processor;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -56,13 +56,11 @@ public void process(Channel channel, Command command) {
String.format("invalid command type : %s", command.getType()));
TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class);
TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(),
recallCommand.getTaskInstanceId());
try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(
recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId())) {
log.info("Receive task recall command: {}", recallCommand);
taskEventService.addEvent(taskEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -74,13 +74,11 @@ public void stop() {
List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
for (StateEvent event : remainEvents) {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(),
event.getTaskInstanceId());
try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext =
LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(),
event.getTaskInstanceId())) {
this.persist(event);

} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
Expand Down Expand Up @@ -112,18 +110,20 @@ protected StateEventResponseWorker() {
public void run() {
log.info("State event loop service started");
while (!ServerLifeCycleManager.isStopped()) {
StateEvent stateEvent;
try {
// if not task , blocking here
StateEvent stateEvent = eventQueue.take();
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId());
persist(stateEvent);
stateEvent = eventQueue.take();
} catch (InterruptedException e) {
log.warn("State event loop service interrupted, will stop this loop", e);
Thread.currentThread().interrupt();
break;
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext =
LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId())) {
// if not task , blocking here
persist(stateEvent);
}
}
log.info("State event loop service stopped");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ public void run() {
while (!this.events.isEmpty()) {
// we handle the task event belongs to one task serial, so if the event comes in wrong order,
TaskEvent event = this.events.peek();
try {
LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
try (
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils
.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId())) {
log.info("Handle task event begin: {}", event);
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
events.remove(event);
Expand All @@ -68,8 +69,6 @@ public void run() {
log.error("Handle task event error, get a unknown exception, this event will be removed, event: {}",
event, unknownException);
events.remove(event);
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -78,23 +78,23 @@ public void run() {
private void workflowEventHandler() {
for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
LogUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);

} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
LogUtils.removeWorkflowInstanceIdMDC();
}
}
}

private void streamTaskEventHandler() {
for (StreamTaskExecuteRunnable streamTaskExecuteRunnable : streamTaskInstanceExecCacheManager.getAll()) {
try {
LoggerUtils.setTaskInstanceIdMDC(streamTaskExecuteRunnable.getTaskInstance().getId());
LogUtils.setTaskInstanceIdMDC(streamTaskExecuteRunnable.getTaskInstance().getId());
streamTaskExecuteThreadPool.executeEvent(streamTaskExecuteRunnable);

} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
LogUtils.removeWorkflowInstanceIdMDC();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
Expand All @@ -43,7 +44,6 @@
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;

import org.apache.commons.collections4.CollectionUtils;

Expand Down Expand Up @@ -180,7 +180,7 @@ public void run() {

processInstances.forEach(processInstance -> {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
LogUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
log.error(
"The workflow instance is already been cached, this case shouldn't be happened");
Expand All @@ -200,7 +200,7 @@ public void run() {
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
LogUtils.removeWorkflowInstanceIdMDC();
}
});
} catch (InterruptedException interruptedException) {
Expand Down
Loading

0 comments on commit fbfeaab

Please sign in to comment.