Skip to content

Commit

Permalink
[FLINK-35551][runtime] Makes default value for maximum rescale trigge…
Browse files Browse the repository at this point in the history
…r delay be dependent on the checkpointing interval.
  • Loading branch information
XComp committed Jun 7, 2024
1 parent 192e0ed commit 9f59f1e
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
<td style="word-wrap: break-word;">1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<td>MemorySize</td>
<td>The size of the write buffer of JobEventStore. The content will be flushed to external file system once the buffer is full</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
<td style="word-wrap: break-word;">1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and %dx of the checkpointing interval if checkpointing is enabled).</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
<td style="word-wrap: break-word;">1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public class JobManagerOptions {

public static final MemorySize MIN_JVM_HEAP_SIZE = MemorySize.ofMebiBytes(128);
public static final int FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER = 3;

/**
* The config parameter defining the network address to connect to for communication with the
Expand Down Expand Up @@ -580,11 +581,15 @@ public InlineElement getDescription() {
public static final ConfigOption<Duration> MAXIMUM_DELAY_FOR_RESCALE_TRIGGER =
key("jobmanager.adaptive-scheduler.max-delay-for-rescale-trigger")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The maximum time the JobManager will wait with evaluating previously observed events for rescaling.")
"The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled "
+ "and %dx of the checkpointing interval if checkpointing is enabled).",
text(
String.valueOf(
FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER)))
.build());

@Documentation.Section({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
Expand Down Expand Up @@ -186,6 +187,12 @@ public class AdaptiveScheduler
public static class Settings {

public static Settings of(Configuration configuration) {
return of(configuration, null);
}

public static Settings of(
Configuration configuration,
@Nullable JobCheckpointingSettings checkpointingConfiguration) {
final SchedulerExecutionMode executionMode =
configuration.get(JobManagerOptions.SCHEDULER_MODE);
Duration allocationTimeoutDefault =
Expand Down Expand Up @@ -215,6 +222,16 @@ public static Settings of(Configuration configuration) {
scalingIntervalMin);
}

final Duration maximumDelayForRescaleTriggerDefault =
checkpointingConfiguration != null
? Duration.ofMillis(
JobManagerOptions
.FACTOR_FOR_DEFAULT_MAXIMUM_DELAY_FOR_RESCALE_TRIGGER
* checkpointingConfiguration
.getCheckpointCoordinatorConfiguration()
.getCheckpointInterval())
: Duration.ZERO;

return new Settings(
executionMode,
configuration
Expand All @@ -227,7 +244,9 @@ public static Settings of(Configuration configuration) {
scalingIntervalMin,
scalingIntervalMax,
configuration.get(MIN_PARALLELISM_INCREASE),
configuration.get(MAXIMUM_DELAY_FOR_RESCALE_TRIGGER));
configuration.get(
MAXIMUM_DELAY_FOR_RESCALE_TRIGGER,
maximumDelayForRescaleTriggerDefault));
}

private final SchedulerExecutionMode executionMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public SchedulerNG createInstance(
partitionTracker);

return new AdaptiveScheduler(
AdaptiveScheduler.Settings.of(jobMasterConfiguration),
AdaptiveScheduler.Settings.of(
jobMasterConfiguration, jobGraph.getCheckpointingSettings()),
jobGraph,
JobResourceRequirements.readFromJobGraph(jobGraph).orElse(null),
jobMasterConfiguration,
Expand Down

0 comments on commit 9f59f1e

Please sign in to comment.