From d298cb4bf70c6430d69e4f2e3f4144ce7ccae414 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 25 Jun 2024 16:44:59 +0800 Subject: [PATCH 1/2] [feat](job)Internal job cancellation immediately and the strong association with the STARTS parameter For internal tasks, such as MTMV, the start time may already be set, or the time may be adjusted immediately. --- .../java/org/apache/doris/analysis/CreateJobStmt.java | 3 +++ .../apache/doris/job/base/JobExecutionConfiguration.java | 8 +++----- .../java/org/apache/doris/job/base/TimerDefinition.java | 8 +------- .../java/org/apache/doris/job/scheduler/JobScheduler.java | 4 ++++ 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index 367d03fa867b49..eae7efca987e78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -128,6 +128,7 @@ public void analyze(Analyzer analyzer) throws UserException { if (null != onceJobStartTimestamp) { if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); + timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100L); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp)); } @@ -149,6 +150,8 @@ public void analyze(Analyzer analyzer) throws UserException { if (null != startsTimeStamp) { if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); + //To avoid immediate re-scheduling, set the start time of the timer 100ms before the current time. + timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100L); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java index 46bc2c71ea2221..301222d5434ea3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java @@ -57,9 +57,7 @@ public void checkParams() { if (executeType == JobExecuteType.INSTANT || executeType == JobExecuteType.MANUAL) { return; } - - checkTimerDefinition(immediate); - + checkTimerDefinition(); if (executeType == JobExecuteType.ONE_TIME) { validateStartTimeMs(); return; @@ -80,12 +78,12 @@ public void checkParams() { } } - private void checkTimerDefinition(boolean immediate) { + private void checkTimerDefinition() { if (timerDefinition == null) { throw new IllegalArgumentException( "timerDefinition cannot be null when executeType is not instant or manual"); } - timerDefinition.checkParams(immediate); + timerDefinition.checkParams(); } private void validateStartTimeMs() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java index bcff4216c6e7cd..9068a18f693e12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java @@ -38,13 +38,7 @@ public class TimerDefinition { private Long latestSchedulerTimeMs; - public void checkParams(boolean immediate) { - if (null != startTimeMs && immediate) { - throw new IllegalArgumentException("startTimeMs must be null when immediate is true"); - } - if (null == startTimeMs && immediate) { - startTimeMs = System.currentTimeMillis(); - } + public void checkParams() { if (null == startTimeMs) { startTimeMs = System.currentTimeMillis() + intervalUnit.getIntervalMs(interval); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 2100511d22bd2f..7f0133bf957e77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -124,6 +124,10 @@ public void scheduleOneJob(T job) throws JobException { schedulerInstantJob(job, TaskType.SCHEDULED, null); } } + if (job.getJobConfig().isImmediate() && JobExecuteType.ONE_TIME.equals(job.getJobConfig().getExecuteType())) { + schedulerInstantJob(job, TaskType.SCHEDULED, null); + return; + } //RECURRING job and immediate is true if (job.getJobConfig().isImmediate()) { job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis()); From 834394b2d65a3cf75c10a52dcd56418495a7d717 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 27 Jun 2024 11:35:26 +0800 Subject: [PATCH 2/2] add ut --- .../main/java/org/apache/doris/analysis/CreateJobStmt.java | 4 ++-- .../apache/doris/job/base/JobExecutionConfigurationTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index eae7efca987e78..8a8db0a3d1eaea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -128,7 +128,7 @@ public void analyze(Analyzer analyzer) throws UserException { if (null != onceJobStartTimestamp) { if (onceJobStartTimestamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); - timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100L); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp)); } @@ -151,7 +151,7 @@ public void analyze(Analyzer analyzer) throws UserException { if (startsTimeStamp.equalsIgnoreCase(CURRENT_TIMESTAMP_STRING)) { jobExecutionConfiguration.setImmediate(true); //To avoid immediate re-scheduling, set the start time of the timer 100ms before the current time. - timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100L); + timerDefinition.setStartTimeMs(System.currentTimeMillis()); } else { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 6d01f09c5ea087..24c486baff81bc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -75,8 +75,8 @@ public void testImmediate() { JobExecutionConfiguration configuration = new JobExecutionConfiguration(); configuration.setExecuteType(JobExecuteType.ONE_TIME); configuration.setImmediate(true); - configuration.setImmediate(true); TimerDefinition timerDefinition = new TimerDefinition(); + timerDefinition.setStartTimeMs(0L); configuration.setTimerDefinition(timerDefinition); configuration.checkParams(); }