Skip to content

Commit

Permalink
[fix#12195] Change date to timestamp to support cross diff time zones (
Browse files Browse the repository at this point in the history
…apache#12239)

* fix DS support cross time zone, use timestamp replace date
  • Loading branch information
DarkAssassinator authored and xdu-chenrj committed Oct 13, 2022
1 parent 6c58d64 commit 8e747bd
Show file tree
Hide file tree
Showing 20 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,4 +646,23 @@ public static long toDurationHours(long d) {

}

/**
* transform timeStamp to local date
*
* @param timeStamp time stamp (milliseconds)
* @return local date
*/
public static @Nullable Date timeStampToDate(long timeStamp) {
return timeStamp <= 0L ? null : new Date(timeStamp);
}

/**
* transform date to timeStamp
* @param date date
* @return time stamp (milliseconds)
*/
public static long dateToTimeStamp(Date date) {
return date == null ? 0L : date.getTime();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,41 @@ public void testDateToString() {
String utcNowStr = DateUtils.dateToString(asiaShNow, utc);
Assert.assertEquals(asiaShNowStr, utcNowStr);
}

@Test
public void testDateToTimeStamp() throws ParseException {
// Beijing Date
String timeString = "2022-09-29 21:00:00";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
Date date = sdf.parse(timeString);
long timeStamp = DateUtils.dateToTimeStamp(date);
Assert.assertEquals(1664456400000L, timeStamp);

// Tokyo Date
String tokyoTime = "2022-09-29 22:00:00";
sdf.setTimeZone(TimeZone.getTimeZone("Asia/Tokyo"));
date = sdf.parse(tokyoTime);
timeStamp = DateUtils.dateToTimeStamp(date);
Assert.assertEquals(1664456400000L, timeStamp);

date = null;
Assert.assertEquals(0L, DateUtils.dateToTimeStamp(date));
}

@Test
public void testTimeStampToDate() {
long timeStamp = 1664456400000L;
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
sdf.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
String sd = sdf.format(new Date(timeStamp));
Assert.assertEquals("2022-09-29 21:00:00", sd);

sdf.setTimeZone(TimeZone.getTimeZone("Asia/Tokyo"));
sd = sdf.format(new Date(timeStamp));
Assert.assertEquals("2022-09-29 22:00:00", sd);

Date date = DateUtils.timeStampToDate(0L);
Assert.assertNull(date);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;

import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
Expand Down Expand Up @@ -61,8 +62,8 @@ public static TaskExecutionContextBuilder get() {
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance) {
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setFirstSubmitTime(DateUtils.dateToTimeStamp(taskInstance.getFirstSubmitTime()));
taskExecutionContext.setStartTime(DateUtils.dateToTimeStamp(taskInstance.getStartTime()));
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
Expand Down Expand Up @@ -102,7 +103,7 @@ public TaskExecutionContextBuilder buildTaskDefinitionRelatedInfo(TaskDefinition
*/
public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance) {
taskExecutionContext.setProcessInstanceId(processInstance.getId());
taskExecutionContext.setScheduleTime(processInstance.getScheduleTime());
taskExecutionContext.setScheduleTime(DateUtils.dateToTimeStamp(processInstance.getScheduleTime()));
taskExecutionContext.setGlobalParams(processInstance.getGlobalParams());
taskExecutionContext.setExecutorId(processInstance.getExecutorId());
taskExecutionContext.setCmdTypeIfComplement(processInstance.getCmdTypeIfComplement().getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;

import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
Expand Down Expand Up @@ -110,7 +111,7 @@ public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Chann
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
event.setState(command.getStatus());
event.setStartTime(command.getStartTime());
event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setAppIds(command.getAppIds());
Expand All @@ -125,10 +126,10 @@ public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
event.setState(TaskExecutionStatus.of(command.getStatus()));
event.setStartTime(command.getStartTime());
event.setStartTime(DateUtils.timeStampToDate(command.getStartTime()));
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setEndTime(command.getEndTime());
event.setEndTime(DateUtils.timeStampToDate(command.getEndTime()));
event.setProcessId(command.getProcessId());
event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void before() {
taskExecuteRunningMessage.setExecutePath("/dolphinscheduler/worker");
taskExecuteRunningMessage.setHost("localhost");
taskExecuteRunningMessage.setLogPath("/temp/worker.log");
taskExecuteRunningMessage.setStartTime(new Date());
taskExecuteRunningMessage.setStartTime(System.currentTimeMillis());
taskExecuteRunningMessage.setTaskInstanceId(1);
taskExecuteRunningMessage.setProcessInstanceId(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void before() {
taskExecuteRunningMessage.setExecutePath("path");
taskExecuteRunningMessage.setLogPath("logPath");
taskExecuteRunningMessage.setHost("127.*.*.*");
taskExecuteRunningMessage.setStartTime(new Date());
taskExecuteRunningMessage.setStartTime(System.currentTimeMillis());

ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
channel,
Expand All @@ -94,7 +94,7 @@ public void before() {
taskExecuteResultMessage.setProcessInstanceId(1);
taskExecuteResultMessage.setTaskInstanceId(22);
taskExecuteResultMessage.setStatus(TaskExecutionStatus.SUCCESS.getCode());
taskExecuteResultMessage.setEndTime(new Date());
taskExecuteResultMessage.setEndTime(System.currentTimeMillis());
taskExecuteResultMessage.setVarPool("varPol");
taskExecuteResultMessage.setAppIds("ids");
taskExecuteResultMessage.setProcessId(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TaskExecuteResultCommand(String messageSenderAddress, String messageRecei
/**
* startTime
*/
private Date startTime;
private long startTime;

/**
* host
Expand All @@ -78,7 +78,7 @@ public TaskExecuteResultCommand(String messageSenderAddress, String messageRecei
/**
* end time
*/
private Date endTime;
private long endTime;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;

import java.util.Date;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -49,7 +47,7 @@ public class TaskExecuteRunningCommand extends BaseCommand {
/**
* startTime
*/
private Date startTime;
private long startTime;

/**
* host
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public static String getTaskLogPath(Date firstSubmitTime, Long processDefineCode
* get task log path by TaskExecutionContext
*/
public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
return getTaskLogPath(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getProcessDefineCode(),
return getTaskLogPath(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testGetTaskLogPath() {
taskExecutionContext.setTaskInstanceId(1000);
taskExecutionContext.setProcessDefineCode(1L);
taskExecutionContext.setProcessDefineVersion(1);
taskExecutionContext.setFirstSubmitTime(firstSubmitTime);
taskExecutionContext.setFirstSubmitTime(firstSubmitTime.getTime());

Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT");
Assert.assertNotNull(rootLogger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.TimeZone;

import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -437,4 +438,15 @@ public static TimeZone getTimezone(String timezoneId) {
public static String getTimestampString() {
return String.valueOf(System.currentTimeMillis());
}

/**
* transform timeStamp to local date
*
* @param timeStamp time stamp (milliseconds)
* @return local date
*/
public static @Nullable
Date timeStampToDate(long timeStamp) {
return timeStamp <= 0L ? null : new Date(timeStamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ private String findVarPool(String line) {
* @return remain time
*/
private long getRemainTime() {
long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime().getTime()) / 1000;
long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime()) / 1000;
long remainTime = taskRequest.getTaskTimeout() - usedTime;

if (remainTime < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;

import java.io.Serializable;
import java.util.Date;
import java.util.Map;

import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -55,12 +54,12 @@ public class TaskExecutionContext implements Serializable {
/**
* task first submit time.
*/
private Date firstSubmitTime;
private long firstSubmitTime;

/**
* task start time
*/
private Date startTime;
private long startTime;

/**
* task type
Expand Down Expand Up @@ -115,7 +114,7 @@ public class TaskExecutionContext implements Serializable {
/**
* process instance schedule time
*/
private Date scheduleTime;
private long scheduleTime;

/**
* process instance global parameters
Expand Down Expand Up @@ -223,7 +222,7 @@ public class TaskExecutionContext implements Serializable {
/**
* endTime
*/
private Date endTime;
private long endTime;

/**
* sql TaskExecutionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void before() throws Exception {
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString());
Mockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
Mockito.when(taskExecutionContext.getStartTime()).thenReturn(System.currentTimeMillis());
Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
// Mockito.when(taskExecutionContext.getVarPool())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;

Expand Down Expand Up @@ -483,7 +484,7 @@ private SqlBinds getSqlAndSqlParamsMap(String sql) {

//new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
sql = ParameterUtils.replaceScheduleTime(sql, DateUtils.timeStampToDate(taskExecutionContext.getScheduleTime()));
// special characters need to be escaped, ${} needs to be escaped
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
//Replace the original value in sql !{...} ,Does not participate in precompilation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void process(Channel channel, Command command) {
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));

// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
long remainTime = DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public long getDelay(TimeUnit unit) {
TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
return unit.convert(
DateUtils.getRemainTime(
taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
Expand All @@ -44,6 +45,7 @@
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.service.utils.ProcessUtils;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -90,7 +92,7 @@ protected WorkerTaskExecuteRunnable(
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
String taskLogName = LoggerUtils.buildTaskId(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
Expand Down Expand Up @@ -118,7 +120,7 @@ protected void afterThrowing(Throwable throwable) throws TaskException {
cancelTask();
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
taskExecutionContext.setEndTime(System.currentTimeMillis());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info(
"Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}",
Expand Down Expand Up @@ -157,7 +159,7 @@ public void run() {

if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(new Date());
taskExecutionContext.setEndTime(System.currentTimeMillis());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress,
CommandType.TASK_EXECUTE_RESULT);
Expand Down Expand Up @@ -185,7 +187,7 @@ public void run() {
protected void initializeTask() {
logger.info("Begin to initialize task");

Date taskStartTime = new Date();
long taskStartTime = System.currentTimeMillis();
taskExecutionContext.setStartTime(taskStartTime);
logger.info("Set task startTime: {}", taskStartTime);

Expand Down Expand Up @@ -251,7 +253,7 @@ protected void sendAlertIfNeeded() {

protected void sendTaskResult() {
taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
taskExecutionContext.setEndTime(new Date());
taskExecutionContext.setEndTime(System.currentTimeMillis());
taskExecutionContext.setProcessId(task.getProcessId());
taskExecutionContext.setAppIds(task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public TaskExecutionContext getTaskExecutionContext() {
taskExecutionContext.setProcessDefineCode(1L);
taskExecutionContext.setProcessDefineVersion(1);
taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setFirstSubmitTime(System.currentTimeMillis());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
Expand Down
Loading

0 comments on commit 8e747bd

Please sign in to comment.