From 9f59f1e0af9abed25ccd5dba28300cd59356e16b Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 3 Jun 2024 17:55:09 +0200 Subject: [PATCH] [FLINK-35551][runtime] Makes default value for maximum rescale trigger delay be dependent on the checkpointing interval. --- .../generated/all_jobmanager_section.html | 6 ++++++ .../generated/expert_scheduling_section.html | 6 ++++++ .../generated/job_manager_configuration.html | 6 ++++++ .../configuration/JobManagerOptions.java | 9 ++++++-- .../scheduler/adaptive/AdaptiveScheduler.java | 21 ++++++++++++++++++- .../adaptive/AdaptiveSchedulerFactory.java | 3 ++- 6 files changed, 47 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 626aa50f386865..9219592aef05a6 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -8,6 +8,12 @@ + +
jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger
+ (none) + Duration + The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). +
jobmanager.adaptive-scheduler.min-parallelism-increase
1 diff --git a/docs/layouts/shortcodes/generated/expert_scheduling_section.html b/docs/layouts/shortcodes/generated/expert_scheduling_section.html index 86682f4fbfd2ce..3d547251af40ee 100644 --- a/docs/layouts/shortcodes/generated/expert_scheduling_section.html +++ b/docs/layouts/shortcodes/generated/expert_scheduling_section.html @@ -86,6 +86,12 @@ MemorySize The size of the write buffer of JobEventStore. The content will be flushed to external file system once the buffer is full + +
jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger
+ (none) + Duration + The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). +
jobmanager.adaptive-scheduler.min-parallelism-increase
1 diff --git a/docs/layouts/shortcodes/generated/job_manager_configuration.html b/docs/layouts/shortcodes/generated/job_manager_configuration.html index c01601a031a342..36e15791f43dd1 100644 --- a/docs/layouts/shortcodes/generated/job_manager_configuration.html +++ b/docs/layouts/shortcodes/generated/job_manager_configuration.html @@ -8,6 +8,12 @@ + +
jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger
+ (none) + Duration + The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled). +
jobmanager.adaptive-scheduler.min-parallelism-increase
1 diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 997484dd19c8be..26aa41830ec2e2 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -39,6 +39,7 @@ public class JobManagerOptions { public static final MemorySize MIN_JVM_HEAP_SIZE = MemorySize.ofMebiBytes(128); + public static final int FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER = 3; /** * The config parameter defining the network address to connect to for communication with the @@ -580,11 +581,15 @@ public InlineElement getDescription() { public static final ConfigOption MAXIMUM_DELAY_FOR_RESCALE_TRIGGER = key("jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger") .durationType() - .defaultValue(Duration.ofMinutes(5)) + .noDefaultValue() .withDescription( Description.builder() .text( - "The maximum time the JobManager will wait with evaluating previously observed events for rescaling.") + "The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled " + + "and %dx of the checkpointing interval if checkpointing is enabled).", + text( + String.valueOf( + FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER))) .build()); @Documentation.Section({ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 683272c3bdb88c..099a1512011b96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -74,6 +74,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmaster.LogicalSlot; @@ -186,6 +187,12 @@ public class AdaptiveScheduler public static class Settings { public static Settings of(Configuration configuration) { + return of(configuration, null); + } + + public static Settings of( + Configuration configuration, + @Nullable JobCheckpointingSettings checkpointingConfiguration) { final SchedulerExecutionMode executionMode = configuration.get(JobManagerOptions.SCHEDULER_MODE); Duration allocationTimeoutDefault = @@ -215,6 +222,16 @@ public static Settings of(Configuration configuration) { scalingIntervalMin); } + final Duration maximumDelayForRescaleTriggerDefault = + checkpointingConfiguration != null + ? Duration.ofMillis( + JobManagerOptions + .FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER + * checkpointingConfiguration + .getCheckpointCoordinatorConfiguration() + .getCheckpointInterval()) + : Duration.ZERO; + return new Settings( executionMode, configuration @@ -227,7 +244,9 @@ public static Settings of(Configuration configuration) { scalingIntervalMin, scalingIntervalMax, configuration.get(MIN_PARALLELISM_INCREASE), - configuration.get(MAXIMUM_DELAY_FOR_RESCALE_TRIGGER)); + configuration.get( + MAXIMUM_DELAY_FOR_RESCALE_TRIGGER, + maximumDelayForRescaleTriggerDefault)); } private final SchedulerExecutionMode executionMode; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java index a2552f1103a1e4..f6e6b6e882cfe8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java @@ -119,7 +119,8 @@ public SchedulerNG createInstance( partitionTracker); return new AdaptiveScheduler( - AdaptiveScheduler.Settings.of(jobMasterConfiguration), + AdaptiveScheduler.Settings.of( + jobMasterConfiguration, jobGraph.getCheckpointingSettings()), jobGraph, JobResourceRequirements.readFromJobGraph(jobGraph).orElse(null), jobMasterConfiguration,