Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DS-6829][WorkerServer] skip create log dir and print log in dryRun #6852

Merged
merged 1 commit into from
Nov 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -762,4 +762,5 @@ private Constants() {
* dry run flag
*/
public static final int DRY_RUN_FLAG_NO = 0;
public static final int DRY_RUN_FLAG_YES = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.worker.processor;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
Expand Down Expand Up @@ -135,19 +136,21 @@ public void process(Channel channel, Command command) {
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);

try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);

try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}

taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,16 @@ public TaskExecuteThread(TaskExecutionContext taskExecutionContext,

@Override
public void run() {

TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
responseCommand.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
return;
}

try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// check if the OS user exists
Expand All @@ -146,13 +154,8 @@ public void run() {
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

int dryRun = taskExecutionContext.getDryRun();
// copy hdfs/minio file to local
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
logger);
}
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);

taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
Expand All @@ -176,31 +179,28 @@ public void run() {
taskRequest.setTaskLogName(taskLogName);

task = taskChannel.createTask(taskRequest);

// task init
this.task.init();

//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());

if (dryRun == Constants.DRY_RUN_FLAG_NO) {
// task handle
this.task.handle();
// task handle
this.task.handle();

// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
} else {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}

responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {

logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
Expand Down