Skip to content

Commit

Permalink
[DS-6829][WorkerServer] skip create log dir and print log in dryRun m…
Browse files Browse the repository at this point in the history
…odel (#6852)

Co-authored-by: caishunfeng <[email protected]>
  • Loading branch information
caishunfeng and caishunfeng authored Nov 21, 2021
1 parent 1b6b526 commit e6239e8
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -766,4 +766,5 @@ private Constants() {
* dry run flag
*/
public static final int DRY_RUN_FLAG_NO = 0;
public static final int DRY_RUN_FLAG_YES = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.worker.processor;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
Expand Down Expand Up @@ -135,19 +136,21 @@ public void process(Channel channel, Command command) {
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);

try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);

try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}

taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,16 @@ public TaskExecuteThread(TaskExecutionContext taskExecutionContext,

@Override
public void run() {

TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
responseCommand.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
return;
}

try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// check if the OS user exists
Expand All @@ -146,13 +154,8 @@ public void run() {
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());

int dryRun = taskExecutionContext.getDryRun();
// copy hdfs/minio file to local
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
logger);
}
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);

taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
Expand All @@ -177,31 +180,28 @@ public void run() {
taskRequest.setTaskLogName(taskLogName);

task = taskChannel.createTask(taskRequest);

// task init
this.task.init();

//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());

if (dryRun == Constants.DRY_RUN_FLAG_NO) {
// task handle
this.task.handle();
// task handle
this.task.handle();

// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
} else {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}

responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {

logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
Expand Down

0 comments on commit e6239e8

Please sign in to comment.