diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 2275326de35bc7..38e3b2f13dcbc8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; @@ -50,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -364,10 +364,10 @@ private CompletableFuture getJobResult( final JobID jobId, final ScheduledExecutor scheduledExecutor, final boolean tolerateMissingResult) { - final Time timeout = - Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); - final Time retryPeriod = - Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis()); + final Duration timeout = + Duration.ofMillis(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); + final Duration retryPeriod = + Duration.ofMillis(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis()); final CompletableFuture jobResultFuture = JobStatusPollingUtils.getJobResult( dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java index e379aa02f0f68a..97bda863b79854 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.dispatcher.DispatcherGateway; @@ -40,6 +39,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -59,7 +59,7 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway private final ScheduledExecutor retryExecutor; - private final Time timeout; + private final Duration timeout; private final ClassLoader classLoader; @@ -67,7 +67,7 @@ public EmbeddedJobClient( final JobID jobId, final DispatcherGateway dispatcherGateway, final ScheduledExecutor retryExecutor, - final Time rpcTimeout, + final Duration rpcTimeout, final ClassLoader classLoader) { this.jobId = checkNotNull(jobId); this.dispatcherGateway = checkNotNull(dispatcherGateway); @@ -136,7 +136,7 @@ public CompletableFuture> getAccumulators() { public CompletableFuture getJobExecutionResult() { checkNotNull(classLoader); - final Time retryPeriod = Time.milliseconds(100L); + final Duration retryPeriod = Duration.ofMillis(100L); return JobStatusPollingUtils.getJobResult( dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod) .thenApply( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java index 79168bee80e81b..ddd89fe465b4c3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java @@ -21,11 +21,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.concurrent.ScheduledExecutor; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -50,8 +50,8 @@ static CompletableFuture getJobResult( final DispatcherGateway dispatcherGateway, final JobID jobId, final ScheduledExecutor scheduledExecutor, - final Time rpcTimeout, - final Time retryPeriod) { + final Duration rpcTimeout, + final Duration retryPeriod) { return pollJobResultAsync( () -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout), diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index 408bc69ad31ffd..97c1d86678ea2d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.executors.PipelineExecutorUtils; @@ -43,6 +42,7 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -136,8 +136,8 @@ private CompletableFuture submitAndGetJobClientFuture( final Configuration configuration, final ClassLoader userCodeClassloader) throws MalformedURLException { - final Time timeout = - Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); + final Duration timeout = + Duration.ofMillis(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader); @@ -191,7 +191,7 @@ private static CompletableFuture submitJob( final Configuration configuration, final DispatcherGateway dispatcherGateway, final JobGraph jobGraph, - final Time rpcTimeout) { + final Duration rpcTimeout) { checkNotNull(jobGraph); LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java index 790fffb0a1fdc2..e291015140d8cf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.application.EmbeddedJobClient; import org.apache.flink.configuration.Configuration; @@ -29,6 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.util.concurrent.ScheduledExecutor; +import java.time.Duration; import java.util.Collection; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -81,8 +81,8 @@ public PipelineExecutor getExecutor(final Configuration configuration) { dispatcherGateway, configuration, (jobId, userCodeClassloader) -> { - final Time timeout = - Time.milliseconds( + final Duration timeout = + Duration.ofMillis( configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); return new EmbeddedJobClient( jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 74dcceee40ddc7..36aa9050453d8e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.program.ClusterClient; @@ -300,7 +299,7 @@ public void close() { TimeUnit.MILLISECONDS, retryExecutorService); - this.restClient.shutdown(Time.seconds(5)); + this.restClient.shutdown(Duration.ofSeconds(5)); ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); try { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java index c3f8a4e7388654..4d83093a8200c1 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.base.sink; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -29,6 +28,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; + import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */ @@ -66,7 +67,7 @@ public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() { @Test public void testThatNoIssuesOccurWhenCheckpointingIsEnabled() throws Exception { env.enableCheckpointing(20); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(200))); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(200))); env.fromSequence(1, 10_000).map(Object::toString).sinkTo(new ArrayListAsyncSink()); env.execute("Integration Test: AsyncSinkBaseITCase"); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java index 3a5226e62142d0..e16ebb1fe78d75 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.configuration.Configuration; @@ -34,6 +33,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.StreamSource; +import java.time.Duration; + /** Tests the functionality of the {@link FileSink} in BATCH mode. */ class BatchExecutionFileSinkITCase extends FileSinkITBase { @@ -49,7 +50,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) { env.configure(config, getClass().getClassLoader()); if (triggerFailover) { - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100))); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100))); } else { env.setRestartStrategy(RestartStrategies.noRestart()); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java index c20c70491a7ba9..1ea0a7b592217a 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.execution.CheckpointingMode; @@ -39,6 +38,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -82,7 +82,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) { env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE); if (triggerFailover) { - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100))); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100))); } else { env.setRestartStrategy(RestartStrategies.noRestart()); } diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java index db048af9eedbe0..dc627a0cd55743 100644 --- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java +++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.wikiedits; -import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.testutils.junit.RetryOnFailure; @@ -31,6 +30,7 @@ import java.net.InetSocketAddress; import java.net.Socket; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -53,7 +53,7 @@ class WikipediaEditsSourceTest { @RetryOnFailure(times = 1) void testWikipediaEditsSource() throws Exception { if (canConnect(1, TimeUnit.SECONDS)) { - final Time testTimeout = Time.seconds(60); + final Duration testTimeout = Duration.ofSeconds(60); final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource(); ExecutorService executorService = null; @@ -116,8 +116,8 @@ void testWikipediaEditsSource() throws Exception { } } - private long deadlineNanos(Time testTimeout) { - return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds()); + private long deadlineNanos(Duration testTimeout) { + return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMillis()); } private static class CollectingSourceContext implements SourceFunction.SourceContext { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index b099d944892164..1dd56fdb55aed9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.restartstrategy; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType; @@ -28,7 +27,6 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** * This class defines methods to generate RestartStrategyConfigurations. These configurations are @@ -81,22 +79,7 @@ public static RestartStrategyConfiguration fallBackRestart() { */ public static RestartStrategyConfiguration fixedDelayRestart( int restartAttempts, long delayBetweenAttempts) { - return fixedDelayRestart( - restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS)); - } - - /** - * Generates a FixedDelayRestartStrategyConfiguration. - * - * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy - * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy - * @return FixedDelayRestartStrategy - * @deprecated Use {@link #fixedDelayRestart(int, Duration)} - */ - @Deprecated - public static RestartStrategyConfiguration fixedDelayRestart( - int restartAttempts, Time delayInterval) { - return fixedDelayRestart(restartAttempts, Time.toDuration(delayInterval)); + return fixedDelayRestart(restartAttempts, Duration.ofMillis(delayBetweenAttempts)); } /** @@ -111,22 +94,6 @@ public static RestartStrategyConfiguration fixedDelayRestart( return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval); } - /** - * Generates a FailureRateRestartStrategyConfiguration. - * - * @param failureRate Maximum number of restarts in given interval {@code failureInterval} - * before failing a job - * @param failureInterval Time interval for failures - * @param delayInterval Delay in-between restart attempts - * @deprecated Use {@link #failureRateRestart(int, Duration, Duration)} - */ - @Deprecated - public static FailureRateRestartStrategyConfiguration failureRateRestart( - int failureRate, Time failureInterval, Time delayInterval) { - return failureRateRestart( - failureRate, Time.toDuration(failureInterval), Time.toDuration(delayInterval)); - } - /** * Generates a FailureRateRestartStrategyConfiguration. * @@ -141,32 +108,6 @@ public static FailureRateRestartStrategyConfiguration failureRateRestart( failureRate, failureInterval, delayInterval); } - /** - * Generates a ExponentialDelayRestartStrategyConfiguration. - * - * @param initialBackoff Starting duration between restarts - * @param maxBackoff The highest possible duration between restarts - * @param backoffMultiplier Delay multiplier how many times is the delay longer than before - * @param resetBackoffThreshold How long the job must run smoothly to reset the time interval - * @param jitterFactor How much the delay may differ (in percentage) - * @deprecated Use {@link #exponentialDelayRestart(Duration, Duration, double, Duration, - * double)} - */ - @Deprecated - public static ExponentialDelayRestartStrategyConfiguration exponentialDelayRestart( - Time initialBackoff, - Time maxBackoff, - double backoffMultiplier, - Time resetBackoffThreshold, - double jitterFactor) { - return exponentialDelayRestart( - Time.toDuration(initialBackoff), - Time.toDuration(maxBackoff), - backoffMultiplier, - Time.toDuration(resetBackoffThreshold), - jitterFactor); - } - /** * Generates a ExponentialDelayRestartStrategyConfiguration. * @@ -249,12 +190,6 @@ public int getRestartAttempts() { return restartAttempts; } - /** @deprecated Use {@link #getDurationBetweenAttempts()} */ - @Deprecated - public Time getDelayBetweenAttemptsInterval() { - return Time.fromDuration(getDurationBetweenAttempts()); - } - public Duration getDurationBetweenAttempts() { return delayBetweenAttemptsInterval; } @@ -303,26 +238,6 @@ public static final class ExponentialDelayRestartStrategyConfiguration private final Duration resetBackoffThreshold; private final double jitterFactor; - /** - * @deprecated Use {@link - * ExponentialDelayRestartStrategyConfiguration#ExponentialDelayRestartStrategyConfiguration(Duration, - * Duration, double, Duration, double)} - */ - @Deprecated - public ExponentialDelayRestartStrategyConfiguration( - Time initialBackoff, - Time maxBackoff, - double backoffMultiplier, - Time resetBackoffThreshold, - double jitterFactor) { - this( - Time.toDuration(initialBackoff), - Time.toDuration(maxBackoff), - backoffMultiplier, - Time.toDuration(resetBackoffThreshold), - jitterFactor); - } - public ExponentialDelayRestartStrategyConfiguration( Duration initialBackoff, Duration maxBackoff, @@ -336,22 +251,10 @@ public ExponentialDelayRestartStrategyConfiguration( this.jitterFactor = jitterFactor; } - /** @deprecated Use {@link #getInitialBackoffDuration()} */ - @Deprecated - public Time getInitialBackoff() { - return Time.fromDuration(getInitialBackoffDuration()); - } - public Duration getInitialBackoffDuration() { return initialBackoff; } - /** @deprecated Use {@link #getMaxBackoffDuration()} */ - @Deprecated - public Time getMaxBackoff() { - return Time.fromDuration(maxBackoff); - } - public Duration getMaxBackoffDuration() { return maxBackoff; } @@ -360,12 +263,6 @@ public double getBackoffMultiplier() { return backoffMultiplier; } - /** @deprecated Use {@link #getResetBackoffDurationThreshold()} */ - @Deprecated - public Time getResetBackoffThreshold() { - return Time.fromDuration(resetBackoffThreshold); - } - public Duration getResetBackoffDurationThreshold() { return resetBackoffThreshold; } @@ -424,18 +321,6 @@ public static final class FailureRateRestartStrategyConfiguration private final Duration failureInterval; private final Duration delayBetweenAttemptsInterval; - /** - * @deprecated Use {@link #FailureRateRestartStrategyConfiguration(int, Duration, Duration)} - */ - @Deprecated - public FailureRateRestartStrategyConfiguration( - int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) { - this( - maxFailureRate, - Time.toDuration(failureInterval), - Time.toDuration(delayBetweenAttemptsInterval)); - } - public FailureRateRestartStrategyConfiguration( int maxFailureRate, Duration failureInterval, @@ -449,22 +334,10 @@ public int getMaxFailureRate() { return maxFailureRate; } - /** @deprecated Use {@link #getFailureIntervalDuration()} */ - @Deprecated - public Time getFailureInterval() { - return Time.fromDuration(getFailureIntervalDuration()); - } - public Duration getFailureIntervalDuration() { return failureInterval; } - /** @deprecated Use {@link #getDurationBetweenAttempts()} */ - @Deprecated - public Time getDelayBetweenAttemptsInterval() { - return Time.fromDuration(getDurationBetweenAttempts()); - } - public Duration getDurationBetweenAttempts() { return delayBetweenAttemptsInterval; } @@ -594,10 +467,7 @@ private static RestartStrategyConfiguration parseConfiguration( Duration failureRateDelay = configuration.get( RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY); - return failureRateRestart( - maxFailures, - Time.milliseconds(failureRateInterval.toMillis()), - Time.milliseconds(failureRateDelay.toMillis())); + return failureRateRestart(maxFailures, failureRateInterval, failureRateDelay); default: throw new IllegalArgumentException( "Unknown restart strategy " + restartstrategyKind + "."); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java index d035df43a64a4f..8c80c1f5b4cb5e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SerializerConfigImpl; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; @@ -37,6 +36,7 @@ import org.junit.jupiter.api.Test; import java.io.Serializable; +import java.time.Duration; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -304,7 +304,7 @@ void testNotOverridingRegisteredPojoTypesWithDefaultsFromConfiguration() { void testNotOverridingRestartStrategiesWithDefaultsFromConfiguration() { ExecutionConfig config = new ExecutionConfig(); RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = - RestartStrategies.fixedDelayRestart(10, Time.minutes(2)); + RestartStrategies.fixedDelayRestart(10, Duration.ofMinutes(2)); config.setRestartStrategy(restartStrategyConfiguration); // mutate config according to configuration diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java index e8e2449b91c172..584efd48c218b8 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/TestingUtils.java @@ -18,7 +18,6 @@ package org.apache.flink.testutils; -import org.apache.flink.api.common.time.Time; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.testutils.executor.TestExecutorResource; @@ -41,11 +40,11 @@ public class TestingUtils { private static final UUID ZERO_UUID = new UUID(0L, 0L); public static final Duration TESTING_DURATION = Duration.ofMinutes(2L); - public static final Time TIMEOUT = Time.minutes(1L); + public static final Duration TIMEOUT = Duration.ofMinutes(1L); public static final Duration DEFAULT_ASK_TIMEOUT = Duration.ofSeconds(200); - public static Time infiniteTime() { - return Time.milliseconds(Integer.MAX_VALUE); + public static Duration infiniteTime() { + return Duration.ofMillis(Integer.MAX_VALUE); } public static Duration infiniteDuration() { diff --git a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java index 56b1a81881b6af..f759f60e874bf7 100644 --- a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java +++ b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.concurrent.FutureUtils; @@ -46,7 +45,7 @@ public class RpcUtils { * private method {@code checkMaxDelay()} in {@link * org.apache.pekko.actor.LightArrayRevolverScheduler}. */ - public static final Time INF_TIMEOUT = Time.seconds(21474835); + public static final Duration INF_TIMEOUT = Duration.ofSeconds(21474835); public static final Duration INF_DURATION = Duration.ofSeconds(21474835); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index bc6fb2be71372a..e81b5fba107ab9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.operators.ResourceSpec; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.CheckpointingOptions; @@ -198,7 +197,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint private final ResourceCleaner localResourceCleaner; private final ResourceCleaner globalResourceCleaner; - private final Time webTimeout; + private final Duration webTimeout; private final Map jobClientExpiredTimestamp = new HashMap<>(); private final Map uninitializedJobClientHeartbeatTimeout = new HashMap<>(); @@ -327,7 +326,7 @@ protected Dispatcher( this.globalResourceCleaner = resourceCleanerFactory.createGlobalResourceCleaner(this.getMainThreadExecutor()); - this.webTimeout = Time.fromDuration(configuration.get(WebOptions.TIMEOUT)); + this.webTimeout = configuration.get(WebOptions.TIMEOUT); this.jobClientAlivenessCheckInterval = configuration.get(CLIENT_ALIVENESS_CHECK_DURATION).toMillis(); @@ -515,7 +514,7 @@ private void stopDispatcherServices() throws Exception { // ------------------------------------------------------ @Override - public CompletableFuture submitJob(JobGraph jobGraph, Time timeout) { + public CompletableFuture submitJob(JobGraph jobGraph, Duration timeout) { final JobID jobID = jobGraph.getJobID(); try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) { log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID); @@ -784,13 +783,13 @@ private CleanupJobState jobManagerRunnerFailed( } @Override - public CompletableFuture> listJobs(Time timeout) { + public CompletableFuture> listJobs(Duration timeout) { return CompletableFuture.completedFuture( Collections.unmodifiableSet(jobManagerRunnerRegistry.getRunningJobIds())); } @Override - public CompletableFuture disposeSavepoint(String savepointPath, Time timeout) { + public CompletableFuture disposeSavepoint(String savepointPath, Duration timeout) { final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); return CompletableFuture.supplyAsync( @@ -814,7 +813,7 @@ public CompletableFuture disposeSavepoint(String savepointPath, Tim } @Override - public CompletableFuture cancelJob(JobID jobId, Time timeout) { + public CompletableFuture cancelJob(JobID jobId, Duration timeout) { Optional maybeJob = getJobManagerRunner(jobId); if (maybeJob.isPresent()) { @@ -837,7 +836,7 @@ public CompletableFuture cancelJob(JobID jobId, Time timeout) { } @Override - public CompletableFuture requestClusterOverview(Time timeout) { + public CompletableFuture requestClusterOverview(Duration timeout) { CompletableFuture taskManagerOverviewFuture = runResourceManagerCommand( resourceManagerGateway -> @@ -865,7 +864,7 @@ public CompletableFuture requestClusterOverview(Time timeout) { } @Override - public CompletableFuture requestMultipleJobDetails(Time timeout) { + public CompletableFuture requestMultipleJobDetails(Duration timeout) { List>> individualOptionalJobDetails = queryJobMastersForInformation( jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout)); @@ -891,7 +890,7 @@ public CompletableFuture requestMultipleJobDetails(Time tim } @Override - public CompletableFuture requestJobStatus(JobID jobId, Time timeout) { + public CompletableFuture requestJobStatus(JobID jobId, Duration timeout) { Optional maybeJob = getJobManagerRunner(jobId); return maybeJob.map(job -> job.requestJobStatus(timeout)) .orElseGet( @@ -910,7 +909,7 @@ public CompletableFuture requestJobStatus(JobID jobId, Time timeout) @Override public CompletableFuture requestExecutionGraphInfo( - JobID jobId, Time timeout) { + JobID jobId, Duration timeout) { Optional maybeJob = getJobManagerRunner(jobId); return maybeJob.map(job -> job.requestJob(timeout)) .orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId))) @@ -929,7 +928,7 @@ private ExecutionGraphInfo getExecutionGraphInfoFromStore(Throwable t, JobID job @Override public CompletableFuture requestCheckpointStats( - JobID jobId, Time timeout) { + JobID jobId, Duration timeout) { return performOperationOnJobMasterGateway( jobId, gateway -> gateway.requestCheckpointStats(timeout)) .exceptionally( @@ -940,7 +939,7 @@ public CompletableFuture requestCheckpointStats( } @Override - public CompletableFuture requestJobResult(JobID jobId, Time timeout) { + public CompletableFuture requestJobResult(JobID jobId, Duration timeout) { if (!jobManagerRunnerRegistry.isRegistered(jobId)) { final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); @@ -964,7 +963,8 @@ public CompletableFuture requestJobResult(JobID jobId, Time timeout) } @Override - public CompletableFuture> requestMetricQueryServiceAddresses(Time timeout) { + public CompletableFuture> requestMetricQueryServiceAddresses( + Duration timeout) { if (metricServiceQueryAddress != null) { return CompletableFuture.completedFuture( Collections.singleton(metricServiceQueryAddress)); @@ -975,7 +975,7 @@ public CompletableFuture> requestMetricQueryServiceAddresses( @Override public CompletableFuture>> - requestTaskManagerMetricQueryServiceAddresses(Time timeout) { + requestTaskManagerMetricQueryServiceAddresses(Duration timeout) { return runResourceManagerCommand( resourceManagerGateway -> resourceManagerGateway.requestTaskManagerMetricQueryServiceAddresses( @@ -983,25 +983,27 @@ public CompletableFuture> requestMetricQueryServiceAddresses( } @Override - public CompletableFuture requestThreadDump(Time timeout) { + public CompletableFuture requestThreadDump(Duration timeout) { int stackTraceMaxDepth = configuration.get(ClusterOptions.THREAD_DUMP_STACKTRACE_MAX_DEPTH); return CompletableFuture.completedFuture(ThreadDumpInfo.dumpAndCreate(stackTraceMaxDepth)); } @Override - public CompletableFuture getBlobServerPort(Time timeout) { + public CompletableFuture getBlobServerPort(Duration timeout) { return CompletableFuture.completedFuture(blobServer.getPort()); } @Override - public CompletableFuture triggerCheckpoint(JobID jobID, Time timeout) { + public CompletableFuture triggerCheckpoint(JobID jobID, Duration timeout) { return performOperationOnJobMasterGateway( jobID, gateway -> gateway.triggerCheckpoint(timeout)); } @Override public CompletableFuture triggerCheckpoint( - AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, Time timeout) { + AsynchronousJobOperationKey operationKey, + CheckpointType checkpointType, + Duration timeout) { return dispatcherCachedOperationsHandler.triggerCheckpoint( operationKey, checkpointType, timeout); } @@ -1014,7 +1016,7 @@ public CompletableFuture> getTriggeredCheckpointStatus( @Override public CompletableFuture triggerCheckpointAndGetCheckpointID( - final JobID jobID, final CheckpointType checkpointType, final Time timeout) { + final JobID jobID, final CheckpointType checkpointType, final Duration timeout) { return performOperationOnJobMasterGateway( jobID, gateway -> @@ -1028,7 +1030,7 @@ public CompletableFuture triggerSavepoint( final String targetDirectory, SavepointFormatType formatType, final TriggerSavepointMode savepointMode, - final Time timeout) { + final Duration timeout) { return dispatcherCachedOperationsHandler.triggerSavepoint( operationKey, targetDirectory, formatType, savepointMode, timeout); } @@ -1039,7 +1041,7 @@ public CompletableFuture triggerSavepointAndGetLocation( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout) { + Duration timeout) { return performOperationOnJobMasterGateway( jobId, gateway -> @@ -1062,7 +1064,7 @@ public CompletableFuture stopWithSavepoint( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - final Time timeout) { + final Duration timeout) { return dispatcherCachedOperationsHandler.stopWithSavepoint( operationKey, targetDirectory, formatType, savepointMode, timeout); } @@ -1073,7 +1075,7 @@ public CompletableFuture stopWithSavepointAndGetLocation( final String targetDirectory, final SavepointFormatType formatType, final TriggerSavepointMode savepointMode, - final Time timeout) { + final Duration timeout) { return performOperationOnJobMasterGateway( jobId, gateway -> @@ -1101,7 +1103,7 @@ public CompletableFuture deliverCoordinationRequestToCoord JobID jobId, OperatorID operatorId, SerializedValue serializedRequest, - Time timeout) { + Duration timeout) { return performOperationOnJobMasterGateway( jobId, gateway -> @@ -1111,7 +1113,7 @@ public CompletableFuture deliverCoordinationRequestToCoord @Override public CompletableFuture reportJobClientHeartbeat( - JobID jobId, long expiredTimestamp, Time timeout) { + JobID jobId, long expiredTimestamp, Duration timeout) { if (!getJobManagerRunner(jobId).isPresent()) { log.warn("Fail to find job {} for client.", jobId); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java index 046e51ec404d80..5b3a6e8991cc99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandler.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.messages.Acknowledge; @@ -28,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey; import org.apache.flink.util.concurrent.FutureUtils; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -76,7 +76,9 @@ public class DispatcherCachedOperationsHandler { } public CompletableFuture triggerCheckpoint( - AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, Time timeout) { + AsynchronousJobOperationKey operationKey, + CheckpointType checkpointType, + Duration timeout) { if (!checkpointTriggerCache.containsOperation(operationKey)) { checkpointTriggerCache.registerOngoingOperation( @@ -103,7 +105,7 @@ public CompletableFuture triggerSavepoint( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout) { + Duration timeout) { return registerOperationIdempotently( operationKey, () -> @@ -120,7 +122,7 @@ public CompletableFuture stopWithSavepoint( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout) { + Duration timeout) { return registerOperationIdempotently( operationKey, () -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index ccb5f037d5a60d..c04fbc708d80ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -29,6 +28,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -42,7 +42,7 @@ public interface DispatcherGateway extends FencedRpcGateway, Restf * @param timeout RPC timeout * @return A future acknowledge if the submission succeeded */ - CompletableFuture submitJob(JobGraph jobGraph, @RpcTimeout Time timeout); + CompletableFuture submitJob(JobGraph jobGraph, @RpcTimeout Duration timeout); CompletableFuture submitFailedJob( JobID jobId, String jobName, Throwable exception); @@ -53,7 +53,7 @@ CompletableFuture submitFailedJob( * @param timeout RPC timeout * @return A future collection of currently submitted jobs */ - CompletableFuture> listJobs(@RpcTimeout Time timeout); + CompletableFuture> listJobs(@RpcTimeout Duration timeout); /** * Returns the port of the blob server. @@ -61,13 +61,13 @@ CompletableFuture submitFailedJob( * @param timeout of the operation * @return A future integer of the blob server port */ - CompletableFuture getBlobServerPort(@RpcTimeout Time timeout); + CompletableFuture getBlobServerPort(@RpcTimeout Duration timeout); default CompletableFuture shutDownCluster(ApplicationStatus applicationStatus) { return shutDownCluster(); } - default CompletableFuture triggerCheckpoint(JobID jobID, @RpcTimeout Time timeout) { + default CompletableFuture triggerCheckpoint(JobID jobID, @RpcTimeout Duration timeout) { throw new UnsupportedOperationException(); } @@ -87,7 +87,7 @@ default CompletableFuture triggerSavepointAndGetLocation( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - @RpcTimeout Time timeout) { + @RpcTimeout Duration timeout) { throw new UnsupportedOperationException(); } @@ -106,7 +106,7 @@ default CompletableFuture stopWithSavepointAndGetLocation( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - @RpcTimeout final Time timeout) { + @RpcTimeout final Duration timeout) { throw new UnsupportedOperationException(); } @@ -120,7 +120,7 @@ default CompletableFuture stopWithSavepointAndGetLocation( * @return Future which is completed once the operation is triggered successfully */ default CompletableFuture triggerCheckpointAndGetCheckpointID( - final JobID jobId, final CheckpointType checkpointType, final Time timeout) { + final JobID jobId, final CheckpointType checkpointType, final Duration timeout) { throw new UnsupportedOperationException(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java index a874972f91c0cc..1da39a5fa833cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerCheckpointFunction.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.CheckpointType; +import java.time.Duration; import java.util.concurrent.CompletableFuture; /** @@ -30,5 +30,5 @@ */ @FunctionalInterface public interface TriggerCheckpointFunction { - CompletableFuture apply(JobID jobId, CheckpointType checkpointType, Time timeout); + CompletableFuture apply(JobID jobId, CheckpointType checkpointType, Duration timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointFunction.java index f5bc0ebd1abec4..f63149d4b70610 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointFunction.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.SavepointFormatType; +import java.time.Duration; import java.util.concurrent.CompletableFuture; /** @@ -35,5 +35,5 @@ CompletableFuture apply( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout); + Duration timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 5dfeb4ab8b647e..e44c4ff44f479e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.entrypoint; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.ConfigOption; @@ -93,6 +92,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.UUID; @@ -124,7 +124,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro protected static final int STARTUP_FAILURE_RETURN_CODE = 1; protected static final int RUNTIME_FAILURE_RETURN_CODE = 2; - private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L); + private static final Duration INITIALIZATION_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30L); /** The lock to guard startup / shutdown / manipulation methods. */ private final Object lock = new Object(); @@ -252,9 +252,7 @@ public void startCluster() throws ClusterEntrypointException { ShutdownBehaviour.GRACEFUL_SHUTDOWN, ExceptionUtils.stringifyException(strippedThrowable), false) - .get( - INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), - TimeUnit.MILLISECONDS); + .get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { strippedThrowable.addSuppressed(e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index d50d45a734dd43..bf7df41b339b3d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent; import org.apache.flink.core.execution.JobStatusChangedListener; @@ -102,6 +101,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -184,7 +184,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final long[] stateTimestamps; /** The timeout for all messages that require a response/acknowledgement. */ - private final Time rpcTimeout; + private final Duration rpcTimeout; /** The classloader for the user code. Needed for calls into user code classes. */ private final ClassLoader userClassLoader; @@ -315,7 +315,7 @@ public DefaultExecutionGraph( JobInformation jobInformation, ScheduledExecutorService futureExecutor, Executor ioExecutor, - Time rpcTimeout, + Duration rpcTimeout, int executionHistorySizeLimit, ClassLoader userClassLoader, BlobWriter blobWriter, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index 7ba3e91929a77f..a222404a471e6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -58,6 +58,7 @@ import org.slf4j.Logger; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -82,7 +83,7 @@ public static DefaultExecutionGraph buildGraph( CompletedCheckpointStore completedCheckpointStore, CheckpointsCleaner checkpointsCleaner, CheckpointIDCounter checkpointIdCounter, - Time rpcTimeout, + Duration rpcTimeout, BlobWriter blobWriter, Logger log, ShuffleMaster shuffleMaster, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 46769a45f8906c..566bbe949e62b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; @@ -63,6 +62,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -140,7 +140,7 @@ public class Execution */ private final long[] stateEndTimestamps; - private final Time rpcTimeout; + private final Duration rpcTimeout; private final Collection partitionInfos; @@ -206,7 +206,7 @@ public Execution( ExecutionVertex vertex, int attemptNumber, long startTimestamp, - Time rpcTimeout) { + Duration rpcTimeout) { this.executor = checkNotNull(executor); this.vertex = checkNotNull(vertex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index f9c77f19e947a8..2521914c872dce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.MemorySize; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; @@ -61,6 +60,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -187,7 +187,7 @@ public ExecutionJobVertex( protected void initialize( int executionHistorySizeLimit, - Time timeout, + Duration timeout, long createTimestamp, SubtaskAttemptNumberStore initialAttemptCounts) throws JobException { @@ -271,7 +271,7 @@ protected ExecutionVertex createExecutionVertex( ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, - Time timeout, + Duration timeout, long createTimestamp, int executionHistorySizeLimit, int initialAttemptCount) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 28f1d745e3732d..13e3fb3564ad6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.runtime.JobException; @@ -39,6 +38,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -74,7 +74,7 @@ public class ExecutionVertex final ExecutionHistory executionHistory; - private final Time timeout; + private final Duration timeout; /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */ private final String taskNameWithSubtask; @@ -110,7 +110,7 @@ public ExecutionVertex( ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, - Time timeout, + Duration timeout, long createTimestamp, int executionHistorySizeLimit, int initialAttemptCount) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index 7ec9405f6244a4..78c53f17390159 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmanager.slots; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway; import org.apache.flink.util.SerializedValue; +import java.time.Duration; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -53,7 +53,7 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway { * @param timeout of the submit operation * @return Future acknowledge of the successful operation */ - CompletableFuture submitTask(TaskDeploymentDescriptor tdd, Time timeout); + CompletableFuture submitTask(TaskDeploymentDescriptor tdd, Duration timeout); /** * Cancel the given task. @@ -62,7 +62,8 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway { * @param timeout of the submit operation * @return Future acknowledge if the task is successfully canceled */ - CompletableFuture cancelTask(ExecutionAttemptID executionAttemptID, Time timeout); + CompletableFuture cancelTask( + ExecutionAttemptID executionAttemptID, Duration timeout); /** * Update the task where the given partitions can be found. @@ -75,7 +76,7 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway { CompletableFuture updatePartitions( ExecutionAttemptID executionAttemptID, Iterable partitionInfos, - Time timeout); + Duration timeout); /** * Batch release intermediate result partitions. @@ -144,7 +145,9 @@ CompletableFuture triggerCheckpoint( * @return Future acknowledge which is returned once the slot has been freed */ CompletableFuture freeSlot( - final AllocationID allocationId, final Throwable cause, @RpcTimeout final Time timeout); + final AllocationID allocationId, + final Throwable cause, + @RpcTimeout final Duration timeout); @Override CompletableFuture sendOperatorEventToTask( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index cc8373f3fa5fb0..4e71e781e760ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -20,12 +20,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.AutoCloseableAsync; +import java.time.Duration; import java.util.concurrent.CompletableFuture; /** Interface for a runner which executes a {@link JobMaster}. */ @@ -70,7 +70,7 @@ public interface JobManagerRunner extends AutoCloseableAsync { * @param timeout of this operation * @return Future acknowledge of the operation */ - CompletableFuture cancel(Time timeout); + CompletableFuture cancel(Duration timeout); /** * Requests the current job status. @@ -78,7 +78,7 @@ public interface JobManagerRunner extends AutoCloseableAsync { * @param timeout for the rpc call * @return Future containing the current job status */ - CompletableFuture requestJobStatus(Time timeout); + CompletableFuture requestJobStatus(Duration timeout); /** * Request the details of the executed job. @@ -86,7 +86,7 @@ public interface JobManagerRunner extends AutoCloseableAsync { * @param timeout for the rpc call * @return Future details of the executed job */ - CompletableFuture requestJobDetails(Time timeout); + CompletableFuture requestJobDetails(Duration timeout); /** * Requests the {@link ExecutionGraphInfo} of the executed job. @@ -94,7 +94,7 @@ public interface JobManagerRunner extends AutoCloseableAsync { * @param timeout for the rpc call * @return Future which is completed with the {@link ExecutionGraphInfo} of the executed job */ - CompletableFuture requestJob(Time timeout); + CompletableFuture requestJob(Duration timeout); /** * Flag indicating if the JobManagerRunner has been initialized. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 8fb28695d2ec2b..f0278c48eb2104 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -165,7 +165,7 @@ public class JobMaster extends FencedRpcEndpoint private final JobGraph jobGraph; - private final Time rpcTimeout; + private final Duration rpcTimeout; private final HighAvailabilityServices highAvailabilityServices; @@ -710,7 +710,9 @@ public CompletableFuture notifyKvStateUnregistered( @Override public CompletableFuture> offerSlots( - final ResourceID taskManagerId, final Collection slots, final Time timeout) { + final ResourceID taskManagerId, + final Collection slots, + final Duration timeout) { TaskManagerRegistration taskManagerRegistration = registeredTaskManagers.get(taskManagerId); @@ -773,7 +775,7 @@ private void releaseEmptyTaskManager(ResourceID resourceId) { public CompletableFuture registerTaskManager( final JobID jobId, final TaskManagerRegistrationInformation taskManagerRegistrationInformation, - final Time timeout) { + final Duration timeout) { if (!jobGraph.getJobID().equals(jobId)) { log.debug( @@ -919,7 +921,7 @@ public CompletableFuture requestCheckpointStats(Time ti @Override public CompletableFuture triggerCheckpoint( - final CheckpointType checkpointType, final Time timeout) { + final CheckpointType checkpointType, final Duration timeout) { return schedulerNG.triggerCheckpoint(checkpointType); } @@ -928,7 +930,7 @@ public CompletableFuture triggerSavepoint( @Nullable final String targetDirectory, final boolean cancelJob, final SavepointFormatType formatType, - final Time timeout) { + final Duration timeout) { return schedulerNG.triggerSavepoint(targetDirectory, cancelJob, formatType); } @@ -938,7 +940,7 @@ public CompletableFuture stopWithSavepoint( @Nullable final String targetDirectory, final SavepointFormatType formatType, final boolean terminate, - final Time timeout) { + final Duration timeout) { return schedulerNG.stopWithSavepoint(targetDirectory, terminate, formatType); } @@ -976,7 +978,7 @@ public CompletableFuture updateGlobalAggregate( public CompletableFuture deliverCoordinationRequestToCoordinator( OperatorID operatorId, SerializedValue serializedRequest, - Time timeout) { + Duration timeout) { return this.sendRequestToCoordinator(operatorId, serializedRequest); } @@ -1539,7 +1541,7 @@ protected CompletableFuture invokeRegistration( ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) { - Time timeout = Time.milliseconds(timeoutMillis); + Duration timeout = Duration.ofMillis(timeoutMillis); return gateway.registerJobMaster( jobMasterId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java index 956c138043d2c8..04b3d52f24ed1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.JobManagerOptions; @@ -26,12 +25,14 @@ import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.util.Preconditions; +import java.time.Duration; + /** Configuration for the {@link JobMaster}. */ public class JobMasterConfiguration { - private final Time rpcTimeout; + private final Duration rpcTimeout; - private final Time slotRequestTimeout; + private final Duration slotRequestTimeout; private final String tmpDirectory; @@ -40,8 +41,8 @@ public class JobMasterConfiguration { private final Configuration configuration; public JobMasterConfiguration( - Time rpcTimeout, - Time slotRequestTimeout, + Duration rpcTimeout, + Duration slotRequestTimeout, String tmpDirectory, RetryingRegistrationConfiguration retryingRegistrationConfiguration, Configuration configuration) { @@ -52,11 +53,11 @@ public JobMasterConfiguration( this.configuration = Preconditions.checkNotNull(configuration); } - public Time getRpcTimeout() { + public Duration getRpcTimeout() { return rpcTimeout; } - public Time getSlotRequestTimeout() { + public Duration getSlotRequestTimeout() { return slotRequestTimeout; } @@ -74,11 +75,10 @@ public Configuration getConfiguration() { public static JobMasterConfiguration fromConfiguration(Configuration configuration) { - final Time rpcTimeout = - Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); + final Duration rpcTimeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION); - final Time slotRequestTimeout = - Time.fromDuration(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT)); + final Duration slotRequestTimeout = + configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT); final String tmpDirectory = ConfigurationUtils.parseTempDirectories(configuration)[0]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 38772fbadae4c5..968904a3408457 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.blocklist.BlocklistListener; @@ -76,7 +75,7 @@ public interface JobMasterGateway * @param timeout of this operation * @return Future acknowledge of the operation */ - CompletableFuture cancel(@RpcTimeout Time timeout); + CompletableFuture cancel(@RpcTimeout Duration timeout); /** * Updates the task execution state for a given task. @@ -141,7 +140,7 @@ void disconnectResourceManager( CompletableFuture> offerSlots( final ResourceID taskManagerId, final Collection slots, - @RpcTimeout final Time timeout); + @RpcTimeout final Duration timeout); /** * Fails the slot with the given allocation id and cause. @@ -166,7 +165,7 @@ void failSlot( CompletableFuture registerTaskManager( final JobID jobId, final TaskManagerRegistrationInformation taskManagerRegistrationInformation, - @RpcTimeout final Time timeout); + @RpcTimeout final Duration timeout); /** * Sends the heartbeat to job manager from task manager. @@ -192,7 +191,7 @@ CompletableFuture heartbeatFromTaskManager( * @param timeout for the rpc call * @return Future containing the current job status */ - CompletableFuture requestJobStatus(@RpcTimeout Time timeout); + CompletableFuture requestJobStatus(@RpcTimeout Duration timeout); /** * Requests the {@link ExecutionGraphInfo} of the executed job. @@ -200,7 +199,7 @@ CompletableFuture heartbeatFromTaskManager( * @param timeout for the rpc call * @return Future which is completed with the {@link ExecutionGraphInfo} of the executed job */ - CompletableFuture requestJob(@RpcTimeout Time timeout); + CompletableFuture requestJob(@RpcTimeout Duration timeout); /** * Requests the {@link CheckpointStatsSnapshot} of the job. @@ -208,7 +207,7 @@ CompletableFuture heartbeatFromTaskManager( * @param timeout for the rpc call * @return Future which is completed with the {@link CheckpointStatsSnapshot} of the job */ - CompletableFuture requestCheckpointStats(@RpcTimeout Time timeout); + CompletableFuture requestCheckpointStats(@RpcTimeout Duration timeout); /** * Triggers taking a savepoint of the executed job. @@ -223,7 +222,7 @@ CompletableFuture triggerSavepoint( @Nullable final String targetDirectory, final boolean cancelJob, final SavepointFormatType formatType, - @RpcTimeout final Time timeout); + @RpcTimeout final Duration timeout); /** * Triggers taking a checkpoint of the executed job. @@ -233,7 +232,7 @@ CompletableFuture triggerSavepoint( * @return Future which is completed with the CompletedCheckpoint once completed */ CompletableFuture triggerCheckpoint( - final CheckpointType checkpointType, @RpcTimeout final Time timeout); + final CheckpointType checkpointType, @RpcTimeout final Duration timeout); /** * Triggers taking a checkpoint of the executed job. @@ -241,7 +240,7 @@ CompletableFuture triggerCheckpoint( * @param timeout for the rpc call * @return Future which is completed with the checkpoint path once completed */ - default CompletableFuture triggerCheckpoint(@RpcTimeout final Time timeout) { + default CompletableFuture triggerCheckpoint(@RpcTimeout final Duration timeout) { return triggerCheckpoint(CheckpointType.DEFAULT, timeout) .thenApply(CompletedCheckpoint::getExternalPointer); } @@ -259,7 +258,7 @@ CompletableFuture stopWithSavepoint( @Nullable final String targetDirectory, final SavepointFormatType formatType, final boolean terminate, - @RpcTimeout final Time timeout); + @RpcTimeout final Duration timeout); /** * Notifies that not enough resources are available to fulfill the resource requirements of a @@ -295,7 +294,7 @@ CompletableFuture updateGlobalAggregate( CompletableFuture deliverCoordinationRequestToCoordinator( OperatorID operatorId, SerializedValue serializedRequest, - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Notifies the {@link org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java index 86a3200e421a0b..6e1a35b5015740 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -127,7 +128,7 @@ public void removeMetric(Metric metric) { @Override public CompletableFuture queryMetrics( - Time timeout) { + Duration timeout) { return callAsync( () -> enforceSizeLimit(serializer.serialize(counters, gauges, histograms, meters)), TimeUtils.toDuration(timeout)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 2bfe899380dfb7..2a1914d4e814f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; @@ -160,7 +159,7 @@ public class MiniCluster implements AutoCloseableAsync { /** The configuration for this mini cluster. */ private final MiniClusterConfiguration miniClusterConfiguration; - private final Time rpcTimeout; + private final Duration rpcTimeout; @GuardedBy("lock") private final List taskManagers; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java index 693e04bd31a350..7f46a2b107ab35 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdService.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; @@ -34,6 +33,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -57,7 +57,7 @@ public class DefaultJobLeaderIdService implements JobLeaderIdService { private final ScheduledExecutor scheduledExecutor; - private final Time jobTimeout; + private final Duration jobTimeout; /** Map of currently monitored jobs. */ private final Map jobLeaderIdListeners; @@ -68,7 +68,7 @@ public class DefaultJobLeaderIdService implements JobLeaderIdService { public DefaultJobLeaderIdService( HighAvailabilityServices highAvailabilityServices, ScheduledExecutor scheduledExecutor, - Time jobTimeout) { + Duration jobTimeout) { this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices"); this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor"); @@ -333,7 +333,7 @@ public void run() { jobId, newTimeoutId); } }, - jobTimeout.toMilliseconds(), + jobTimeout.toMillis(), TimeUnit.MILLISECONDS); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 275bc6299e8436..938f3c58621bfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.TransientBlobKey; @@ -76,7 +75,7 @@ CompletableFuture registerJobMaster( ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Declares the absolute resource requirements for a job. @@ -98,7 +97,7 @@ CompletableFuture declareRequiredResources( * @return The future to the response by the ResourceManager. */ CompletableFuture registerTaskExecutor( - TaskExecutorRegistration taskExecutorRegistration, @RpcTimeout Time timeout); + TaskExecutorRegistration taskExecutorRegistration, @RpcTimeout Duration timeout); /** * Sends the given {@link SlotReport} to the ResourceManager. @@ -114,7 +113,7 @@ CompletableFuture sendSlotReport( ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Sent by the TaskExecutor to notify the ResourceManager that a slot has become available. @@ -183,7 +182,8 @@ CompletableFuture heartbeatFromTaskManager( * @param timeout of the request * @return Future collection of TaskManager information */ - CompletableFuture> requestTaskManagerInfo(@RpcTimeout Time timeout); + CompletableFuture> requestTaskManagerInfo( + @RpcTimeout Duration timeout); /** * Requests detail information about the given {@link TaskExecutor}. @@ -193,7 +193,7 @@ CompletableFuture heartbeatFromTaskManager( * @return Future TaskManager information and its allocated slots */ CompletableFuture requestTaskManagerDetailsInfo( - ResourceID taskManagerId, @RpcTimeout Time timeout); + ResourceID taskManagerId, @RpcTimeout Duration timeout); /** * Requests the resource overview. The resource overview provides information about the @@ -202,7 +202,7 @@ CompletableFuture requestTaskManagerDetailsInfo( * @param timeout of the request * @return Future containing the resource overview */ - CompletableFuture requestResourceOverview(@RpcTimeout Time timeout); + CompletableFuture requestResourceOverview(@RpcTimeout Duration timeout); /** * Requests the paths for the TaskManager's {@link MetricQueryService} to query. @@ -212,7 +212,7 @@ CompletableFuture requestTaskManagerDetailsInfo( * service path */ CompletableFuture>> - requestTaskManagerMetricQueryServiceAddresses(@RpcTimeout Time timeout); + requestTaskManagerMetricQueryServiceAddresses(@RpcTimeout Duration timeout); /** * Request the file upload from the given {@link TaskExecutor} to the cluster's {@link @@ -225,7 +225,7 @@ CompletableFuture requestTaskManagerDetailsInfo( * to the {@link BlobServer}. */ CompletableFuture requestTaskManagerFileUploadByType( - ResourceID taskManagerId, FileType fileType, @RpcTimeout Time timeout); + ResourceID taskManagerId, FileType fileType, @RpcTimeout Duration timeout); /** * Request the file upload from the given {@link TaskExecutor} to the cluster's {@link @@ -252,7 +252,7 @@ CompletableFuture requestTaskManagerFileUploadByNameAndType( * @return Future which is completed with the historical log list */ CompletableFuture> requestTaskManagerLogList( - ResourceID taskManagerId, @RpcTimeout Time timeout); + ResourceID taskManagerId, @RpcTimeout Duration timeout); /** * Requests the thread dump from the given {@link TaskExecutor}. @@ -263,7 +263,7 @@ CompletableFuture> requestTaskManagerLogList( * @return Future containing the thread dump information */ CompletableFuture requestThreadDump( - ResourceID taskManagerId, @RpcTimeout Time timeout); + ResourceID taskManagerId, @RpcTimeout Duration timeout); /** * Requests the {@link TaskExecutorGateway}. @@ -272,7 +272,7 @@ CompletableFuture requestThreadDump( * @return Future containing the task executor gateway. */ CompletableFuture requestTaskExecutorThreadInfoGateway( - ResourceID taskManagerId, @RpcTimeout Time timeout); + ResourceID taskManagerId, @RpcTimeout Duration timeout); /** * Request profiling list from the given {@link TaskExecutor}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java index c35059c5be580d..6a5b35119ad6c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.resourcemanager; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration; @@ -30,17 +29,17 @@ /** Configuration class for the {@link ResourceManagerRuntimeServices} class. */ public class ResourceManagerRuntimeServicesConfiguration { - private final Time jobTimeout; + private final Duration jobTimeout; private final SlotManagerConfiguration slotManagerConfiguration; public ResourceManagerRuntimeServicesConfiguration( - Time jobTimeout, SlotManagerConfiguration slotManagerConfiguration) { + Duration jobTimeout, SlotManagerConfiguration slotManagerConfiguration) { this.jobTimeout = Preconditions.checkNotNull(jobTimeout); this.slotManagerConfiguration = Preconditions.checkNotNull(slotManagerConfiguration); } - public Time getJobTimeout() { + public Duration getJobTimeout() { return jobTimeout; } @@ -55,10 +54,10 @@ public static ResourceManagerRuntimeServicesConfiguration fromConfiguration( throws ConfigurationException { final Duration strJobTimeout = configuration.get(ResourceManagerOptions.JOB_TIMEOUT); - final Time jobTimeout; + final Duration jobTimeout; try { - jobTimeout = Time.milliseconds(strJobTimeout.toMillis()); + jobTimeout = strJobTimeout; } catch (IllegalArgumentException e) { throw new ConfigurationException( "Could not parse the resource manager's job timeout " diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 477418cfaa45ff..152609b6a72837 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rest; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; @@ -98,6 +97,7 @@ import java.nio.channels.spi.SelectorProvider; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -298,21 +298,21 @@ List getOutboundChannelHandlerFactories() { @Override public CompletableFuture closeAsync() { - return shutdownInternally(Time.seconds(10L)); + return shutdownInternally(Duration.ofSeconds(10L)); } - public void shutdown(Time timeout) { + public void shutdown(Duration timeout) { final CompletableFuture shutDownFuture = shutdownInternally(timeout); try { - shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + shutDownFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); LOG.debug("Rest endpoint shutdown complete."); } catch (Exception e) { LOG.warn("Rest endpoint shutdown failed.", e); } } - private CompletableFuture shutdownInternally(Time timeout) { + private CompletableFuture shutdownInternally(Duration timeout) { if (isRunning.compareAndSet(true, false)) { LOG.debug("Shutting down rest endpoint."); @@ -321,7 +321,7 @@ private CompletableFuture shutdownInternally(Time timeout) { bootstrap .config() .group() - .shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .shutdownGracefully(0L, timeout.toMillis(), TimeUnit.MILLISECONDS) .addListener( finished -> { notifyResponseFuturesOfShutdown(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 1593dd86ed71b8..5bde205afc3ede 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -77,7 +76,7 @@ CompletableFuture requestSlot( ResourceProfile resourceProfile, String targetAddress, ResourceManagerId resourceManagerId, - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Submit a {@link Task} to the {@link TaskExecutor}. @@ -88,7 +87,7 @@ CompletableFuture requestSlot( * @return Future acknowledge of the successful operation */ CompletableFuture submitTask( - TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout); + TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Duration timeout); /** * Update the task where the given partitions can be found. @@ -101,7 +100,7 @@ CompletableFuture submitTask( CompletableFuture updatePartitions( ExecutionAttemptID executionAttemptID, Iterable partitionInfos, - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Batch release intermediate result partitions. @@ -129,7 +128,7 @@ CompletableFuture promotePartitions( * @return Future acknowledge that the request was received */ CompletableFuture releaseClusterPartitions( - Collection dataSetsToRelease, @RpcTimeout Time timeout); + Collection dataSetsToRelease, @RpcTimeout Duration timeout); /** * Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID @@ -187,7 +186,7 @@ CompletableFuture abortCheckpoint( * @return Future acknowledge if the task is successfully canceled */ CompletableFuture cancelTask( - ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout); + ExecutionAttemptID executionAttemptID, @RpcTimeout Duration timeout); /** * Heartbeat request from the job manager. @@ -230,7 +229,9 @@ CompletableFuture heartbeatFromJobManager( * @return Future acknowledge which is returned once the slot has been freed */ CompletableFuture freeSlot( - final AllocationID allocationId, final Throwable cause, @RpcTimeout final Time timeout); + final AllocationID allocationId, + final Throwable cause, + @RpcTimeout final Duration timeout); /** * Frees all currently inactive slot allocated for the given job. @@ -238,7 +239,7 @@ CompletableFuture freeSlot( * @param jobId job for which all inactive slots should be released * @param timeout for the operation */ - void freeInactiveSlots(JobID jobId, @RpcTimeout Time timeout); + void freeInactiveSlots(JobID jobId, @RpcTimeout Duration timeout); /** * Requests the file upload of the specified type to the cluster's {@link BlobServer}. @@ -248,7 +249,7 @@ CompletableFuture freeSlot( * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file. */ CompletableFuture requestFileUploadByType( - FileType fileType, @RpcTimeout Time timeout); + FileType fileType, @RpcTimeout Duration timeout); /** * Requests the file upload of the specified name to the cluster's {@link BlobServer}. @@ -278,7 +279,7 @@ CompletableFuture requestFileUploadByNameAndType( * @return Future gateway of Metric Query Service on the TaskManager. */ CompletableFuture> requestMetricQueryServiceAddress( - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Checks whether the task executor can be released. It cannot be released if there're @@ -293,7 +294,7 @@ CompletableFuture> requestMetricQueryServiceAddress * * @return A Tuple2 Array with all log file names with its length. */ - CompletableFuture> requestLogList(@RpcTimeout Time timeout); + CompletableFuture> requestLogList(@RpcTimeout Duration timeout); @Override CompletableFuture sendOperatorEventToTask( @@ -305,7 +306,7 @@ CompletableFuture sendOperatorEventToTask( * @param timeout timeout for the asynchronous operation * @return the {@link ThreadDumpInfo} for this TaskManager. */ - CompletableFuture requestThreadDump(@RpcTimeout Time timeout); + CompletableFuture requestThreadDump(@RpcTimeout Duration timeout); /** * Sends new delegation tokens to this TaskManager. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index 7aa246bd705a68..0031131d78fbc2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; @@ -48,6 +47,7 @@ import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -66,7 +66,7 @@ public interface RestfulGateway extends RpcGateway { * @param timeout of the operation * @return A future acknowledge if the cancellation succeeded */ - CompletableFuture cancelJob(JobID jobId, @RpcTimeout Time timeout); + CompletableFuture cancelJob(JobID jobId, @RpcTimeout Duration timeout); /** * Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph, @@ -78,7 +78,7 @@ public interface RestfulGateway extends RpcGateway { * {@link FlinkJobNotFoundException} */ default CompletableFuture requestJob( - JobID jobId, @RpcTimeout Time timeout) { + JobID jobId, @RpcTimeout Duration timeout) { return requestExecutionGraphInfo(jobId, timeout) .thenApply(ExecutionGraphInfo::getArchivedExecutionGraph); } @@ -94,7 +94,7 @@ default CompletableFuture requestJob( * {@link FlinkJobNotFoundException} */ CompletableFuture requestExecutionGraphInfo( - JobID jobId, @RpcTimeout Time timeout); + JobID jobId, @RpcTimeout Duration timeout); /** * Requests the {@link CheckpointStatsSnapshot} containing checkpointing information. @@ -104,7 +104,7 @@ CompletableFuture requestExecutionGraphInfo( * @return Future containing the {@link CheckpointStatsSnapshot} for the given jobId */ CompletableFuture requestCheckpointStats( - JobID jobId, @RpcTimeout Time timeout); + JobID jobId, @RpcTimeout Duration timeout); /** * Requests the {@link JobResult} of a job specified by the given jobId. @@ -113,7 +113,7 @@ CompletableFuture requestCheckpointStats( * @param timeout for the asynchronous operation * @return Future which is completed with the job's {@link JobResult} once the job has finished */ - CompletableFuture requestJobResult(JobID jobId, @RpcTimeout Time timeout); + CompletableFuture requestJobResult(JobID jobId, @RpcTimeout Duration timeout); /** * Requests job details currently being executed on the Flink cluster. @@ -121,7 +121,7 @@ CompletableFuture requestCheckpointStats( * @param timeout for the asynchronous operation * @return Future containing the job details */ - CompletableFuture requestMultipleJobDetails(@RpcTimeout Time timeout); + CompletableFuture requestMultipleJobDetails(@RpcTimeout Duration timeout); /** * Requests the cluster status overview. @@ -129,7 +129,7 @@ CompletableFuture requestCheckpointStats( * @param timeout for the asynchronous operation * @return Future containing the status overview */ - CompletableFuture requestClusterOverview(@RpcTimeout Time timeout); + CompletableFuture requestClusterOverview(@RpcTimeout Duration timeout); /** * Requests the addresses of the {@link MetricQueryService} to query. @@ -138,7 +138,7 @@ CompletableFuture requestCheckpointStats( * @return Future containing the collection of metric query service addresses to query */ CompletableFuture> requestMetricQueryServiceAddresses( - @RpcTimeout Time timeout); + @RpcTimeout Duration timeout); /** * Requests the addresses for the TaskManagers' {@link MetricQueryService} to query. @@ -148,7 +148,7 @@ CompletableFuture> requestMetricQueryServiceAddresses( * service address */ CompletableFuture>> - requestTaskManagerMetricQueryServiceAddresses(@RpcTimeout Time timeout); + requestTaskManagerMetricQueryServiceAddresses(@RpcTimeout Duration timeout); /** * Requests the thread dump from the JobManager. @@ -156,7 +156,7 @@ CompletableFuture> requestMetricQueryServiceAddresses( * @param timeout timeout of the asynchronous operation * @return Future containing the thread dump information */ - CompletableFuture requestThreadDump(@RpcTimeout Time timeout); + CompletableFuture requestThreadDump(@RpcTimeout Duration timeout); /** * Triggers a checkpoint with the given savepoint directory as a target. @@ -170,7 +170,7 @@ CompletableFuture> requestMetricQueryServiceAddresses( default CompletableFuture triggerCheckpoint( AsynchronousJobOperationKey operationKey, CheckpointType checkpointType, - @RpcTimeout Time timeout) { + @RpcTimeout Duration timeout) { throw new UnsupportedOperationException(); } @@ -202,7 +202,7 @@ default CompletableFuture triggerSavepoint( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - @RpcTimeout Time timeout) { + @RpcTimeout Duration timeout) { throw new UnsupportedOperationException(); } @@ -222,7 +222,7 @@ default CompletableFuture stopWithSavepoint( final String targetDirectory, SavepointFormatType formatType, final TriggerSavepointMode savepointMode, - @RpcTimeout final Time timeout) { + @RpcTimeout final Duration timeout) { throw new UnsupportedOperationException(); } @@ -246,7 +246,7 @@ default CompletableFuture> getTriggeredSavepointStatus( * @return A future acknowledge if the disposal succeeded */ default CompletableFuture disposeSavepoint( - final String savepointPath, @RpcTimeout final Time timeout) { + final String savepointPath, @RpcTimeout final Duration timeout) { throw new UnsupportedOperationException(); } @@ -257,7 +257,8 @@ default CompletableFuture disposeSavepoint( * @param timeout for the asynchronous operation * @return A future to the {@link JobStatus} of the given job */ - default CompletableFuture requestJobStatus(JobID jobId, @RpcTimeout Time timeout) { + default CompletableFuture requestJobStatus( + JobID jobId, @RpcTimeout Duration timeout) { throw new UnsupportedOperationException(); } @@ -281,13 +282,13 @@ default CompletableFuture deliverCoordinationRequestToCoor JobID jobId, OperatorID operatorId, SerializedValue serializedRequest, - @RpcTimeout Time timeout) { + @RpcTimeout Duration timeout) { throw new UnsupportedOperationException(); } /** The client reports the heartbeat to the dispatcher for aliveness. */ default CompletableFuture reportJobClientHeartbeat( - JobID jobId, long expiredTimestamp, Time timeout) { + JobID jobId, long expiredTimestamp, Duration timeout) { return FutureUtils.completedVoidFuture(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java index b36e2d7ce93bab..cba78d3d66fe5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/MetricQueryServiceGateway.java @@ -18,15 +18,16 @@ package org.apache.flink.runtime.webmonitor.retriever; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.rpc.RpcGateway; +import java.time.Duration; import java.util.concurrent.CompletableFuture; /** {@link MetricQueryService} rpc gateway interface. */ public interface MetricQueryServiceGateway extends RpcGateway { - CompletableFuture queryMetrics(Time timeout); + CompletableFuture queryMetrics( + Duration timeout); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java index 7550372c353bcd..ce191522db5fba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java @@ -19,7 +19,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; @@ -45,12 +44,14 @@ import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; +import java.time.Duration; + /** Abstract test for the {@link Dispatcher} component. */ public class AbstractDispatcherTest extends TestLogger { static TestingRpcService rpcService; - static final Time TIMEOUT = Time.minutes(1L); + static final Duration TIMEOUT = Duration.ofMinutes(1L); @BeforeClass public static void setupClass() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java index 1272f41e3c9be7..7768e36724c024 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCachedOperationsHandlerTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.RestOptions; @@ -51,7 +50,7 @@ /** Tests for the {@link DispatcherCachedOperationsHandler} component. */ public class DispatcherCachedOperationsHandlerTest extends TestLogger { - private static final Time TIMEOUT = Time.minutes(10); + private static final Duration TIMEOUT = Duration.ofMinutes(10); private CompletedOperationCache checkpointTriggerCache; private CompletedOperationCache savepointTriggerCache; @@ -76,7 +75,7 @@ public void setup() { new TriggerCheckpointSpyFunction() { @Override CompletableFuture applyWrappedFunction( - JobID jobID, CheckpointType checkpointType, Time timeout) { + JobID jobID, CheckpointType checkpointType, Duration timeout) { return checkpointIdFuture; } }); @@ -243,13 +242,13 @@ private abstract static class TriggerCheckpointSpyFunction @Override public CompletableFuture apply( - JobID jobID, CheckpointType checkpointType, Time timeout) { + JobID jobID, CheckpointType checkpointType, Duration timeout) { invocations.add(new Tuple2<>(jobID, checkpointType)); return applyWrappedFunction(jobID, checkpointType, timeout); } abstract CompletableFuture applyWrappedFunction( - JobID jobID, CheckpointType checkpointType, Time timeout); + JobID jobID, CheckpointType checkpointType, Duration timeout); public List> getInvocationParameters() { return invocations; @@ -264,7 +263,7 @@ public static TriggerCheckpointSpyFunction wrap( return new TriggerCheckpointSpyFunction() { @Override CompletableFuture applyWrappedFunction( - JobID jobID, CheckpointType checkpointType, Time timeout) { + JobID jobID, CheckpointType checkpointType, Duration timeout) { return wrappedFunction.apply(jobID, checkpointType, timeout); } }; @@ -282,7 +281,7 @@ public CompletableFuture apply( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout) { + Duration timeout) { invocations.add(new Tuple4<>(jobID, targetDirectory, formatType, savepointMode)); return applyWrappedFunction(jobID, targetDirectory, formatType, savepointMode, timeout); } @@ -292,7 +291,7 @@ abstract CompletableFuture applyWrappedFunction( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout); + Duration timeout); public List> getInvocationParameters() { @@ -311,7 +310,7 @@ CompletableFuture applyWrappedFunction( String targetDirectory, SavepointFormatType formatType, TriggerSavepointMode savepointMode, - Time timeout) { + Duration timeout) { return wrappedFunction.apply( jobID, targetDirectory, formatType, savepointMode, timeout); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index bd7f18f3ba922a..10d822c49434f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -71,6 +71,7 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; +import java.time.Duration; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collection; @@ -103,7 +104,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); - private static final Time timeout = Time.seconds(10L); + private static final Duration timeout = Duration.ofSeconds(10L); private static TestingRpcService rpcService; @@ -647,7 +648,7 @@ public void testFailingJobManagerRunnerCleanup() throws Exception { // submit and fail during job master runner construction queue.offer(Optional.of(testException)); try { - dispatcherGateway.submitJob(jobGraph, Time.minutes(1)).get(); + dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1)).get(); fail("A FlinkException is expected"); } catch (Throwable expectedException) { assertThat(expectedException, containsCause(FlinkException.class)); @@ -659,7 +660,7 @@ public void testFailingJobManagerRunnerCleanup() throws Exception { // don't fail this time queue.offer(Optional.empty()); // submit job again - dispatcherGateway.submitJob(jobGraph, Time.minutes(1L)).get(); + dispatcherGateway.submitJob(jobGraph, Duration.ofMinutes(1L)).get(); blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING); // Ensure job is running @@ -684,7 +685,7 @@ public void testArchivingFinishedJobToHistoryServer() throws Exception { // terminated assertThatNoCleanupWasTriggered(); final CompletableFuture jobTerminationFuture = - dispatcher.getJobTerminationFuture(jobId, Time.hours(1)); + dispatcher.getJobTerminationFuture(jobId, Duration.ofHours(1)); assertFalse(jobTerminationFuture.isDone()); archiveFuture.complete(Acknowledge.get()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java index 1e3b678f103999..be3dd535d755c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; @@ -51,6 +50,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -70,7 +70,7 @@ */ public class JobMasterTester implements Closeable { - private static final Time TIMEOUT = Time.minutes(1); + private static final Duration TIMEOUT = Duration.ofMinutes(1); private static TaskStateSnapshot createNonEmptyStateSnapshot(TaskInformation taskInformation) { final TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 2e8c1b5a31f8fe..cce17718175872 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; @@ -60,6 +59,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -73,7 +73,7 @@ /** Tests for the {@link MiniDispatcher}. */ public class MiniDispatcherTest extends TestLogger { - private static final Time timeout = Time.seconds(10L); + private static final Duration timeout = Duration.ofSeconds(10L); @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -309,7 +309,7 @@ public void testShutdownIfJobCancelledInNormalMode() throws Exception { final DispatcherGateway dispatcherGateway = miniDispatcher.getSelfGateway(DispatcherGateway.class); - dispatcherGateway.cancelJob(jobGraph.getJobID(), Time.seconds(10L)); + dispatcherGateway.cancelJob(jobGraph.getJobID(), Duration.ofSeconds(10L)); testingJobManagerRunner.completeResultFuture( new ExecutionGraphInfo( new ArchivedExecutionGraphBuilder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 0e6424a5cfed17..cc12ce5d389348 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.dispatcher.cleanup.CleanupRunnerFactory; @@ -47,11 +46,11 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TimeUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.concurrent.Callable; @@ -148,13 +147,14 @@ CompletableFuture callAsyncInMainThread(Callable> ca return callAsync(callable, TestingUtils.TESTING_DURATION).thenCompose(Function.identity()); } - CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) { - return callAsync(() -> getJobTerminationFuture(jobId), TimeUtils.toDuration(timeout)) + CompletableFuture getJobTerminationFuture( + @Nonnull JobID jobId, @Nonnull Duration timeout) { + return callAsync(() -> getJobTerminationFuture(jobId), timeout) .thenCompose(Function.identity()); } - CompletableFuture getNumberJobs(Time timeout) { - return callAsync(() -> listJobs(timeout).get().size(), TimeUtils.toDuration(timeout)); + CompletableFuture getNumberJobs(Duration timeout) { + return callAsync(() -> listJobs(timeout).get().size(), timeout); } void waitUntilStarted() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java index 882177e419b7b0..cebb193b8d6b29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.core.testutils.OneShotLatch; @@ -46,6 +45,7 @@ import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -63,7 +63,7 @@ */ class CheckpointResourcesCleanupRunnerTest { - private static final Time TIMEOUT_FOR_REQUESTS = Time.milliseconds(0); + private static final Duration TIMEOUT_FOR_REQUESTS = Duration.ofMillis(0); private static final ThrowingConsumer BEFORE_START = ignored -> {}; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java index 641bf7fc866a24..fa0f41efc2743b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.dispatcher.runner; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.AllCallbackWrapper; import org.apache.flink.runtime.dispatcher.Dispatcher; @@ -62,6 +61,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.UUID; @@ -76,7 +76,7 @@ class DefaultDispatcherRunnerITCase { private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunnerITCase.class); - private static final Time TIMEOUT = Time.seconds(10L); + private static final Duration TIMEOUT = Duration.ofSeconds(10L); @RegisterExtension public static AllCallbackWrapper rpcServiceExtensionWrapper = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index e0192d57d900ef..a13d805322e4f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.dispatcher.runner; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.AllCallbackWrapper; @@ -74,6 +73,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.Duration; import java.util.Collections; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -88,7 +88,7 @@ class ZooKeeperDefaultDispatcherRunnerTest { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDefaultDispatcherRunnerTest.class); - private static final Time TESTING_TIMEOUT = Time.seconds(10L); + private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10L); @RegisterExtension public static AllCallbackWrapper zooKeeperExtensionWrapper = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index e4271234171048..5e0fc608002555 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -48,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -62,7 +63,7 @@ public static TestingDefaultExecutionGraphBuilder newBuilder() { return new TestingDefaultExecutionGraphBuilder(); } - private Time rpcTimeout = Time.fromDuration(RpcOptions.ASK_TIMEOUT_DURATION.defaultValue()); + private Duration rpcTimeout = RpcOptions.ASK_TIMEOUT_DURATION.defaultValue(); private ClassLoader userClassLoader = DefaultExecutionGraph.class.getClassLoader(); private BlobWriter blobWriter = VoidBlobWriter.getInstance(); private ShuffleMaster shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER; @@ -99,7 +100,7 @@ public TestingDefaultExecutionGraphBuilder setJobGraph(JobGraph jobGraph) { return this; } - public TestingDefaultExecutionGraphBuilder setRpcTimeout(Time rpcTimeout) { + public TestingDefaultExecutionGraphBuilder setRpcTimeout(Duration rpcTimeout) { this.rpcTimeout = rpcTimeout; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java index edd34a136ffa00..a365786d5b5401 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java @@ -53,6 +53,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -67,7 +68,7 @@ /** Tests for the execution deployment-reconciliation logic in the {@link JobMaster}. */ class JobMasterExecutionDeploymentReconciliationTest { - private static final Time testingTimeout = Time.seconds(10L); + private static final Duration testingTimeout = Duration.ofSeconds(10L); private final HeartbeatServices heartbeatServices = new HeartbeatServicesImpl(Integer.MAX_VALUE, Integer.MAX_VALUE); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java index 3477f74fde40e7..00a738c0fdc931 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; @@ -61,6 +60,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -78,7 +78,7 @@ class JobMasterPartitionReleaseTest { @TempDir private static File temporaryFolder; - private static final Time testingTimeout = Time.seconds(10L); + private static final Duration testingTimeout = Duration.ofSeconds(10L); private static TestingRpcService rpcService; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java index 052216d6644c27..b5459086235ee7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java @@ -49,6 +49,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -59,7 +60,7 @@ /** Tests for the queryable-state logic of the {@link JobMaster}. */ class JobMasterQueryableStateTest { - private static final Time testingTimeout = Time.seconds(10L); + private static final Duration testingTimeout = Duration.ofSeconds(10L); private static TestingRpcService rpcService; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index 20fa28e3e4e1c1..dfe5869581b518 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -80,7 +79,7 @@ /** Tests for the {@link JobMasterServiceLeadershipRunner}. */ class JobMasterServiceLeadershipRunnerTest { - private static final Time TESTING_TIMEOUT = Time.seconds(10); + private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10); private static JobGraph jobGraph; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 7bfc1cbf54a9ae..288200da872dfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -189,7 +189,7 @@ class JobMasterTest { @TempDir private Path temporaryFolder; - private static final Time testingTimeout = Time.seconds(10L); + private static final Duration testingTimeout = Duration.ofSeconds(10L); private static final long fastHeartbeatInterval = 1L; private static final long fastHeartbeatTimeout = 10L; @@ -369,7 +369,7 @@ private void runHeartbeatTest( final JobID disconnectedJobManager = disconnectedJobManagerFuture.get( - testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + testingTimeout.toMillis(), TimeUnit.MILLISECONDS); assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID()); } @@ -704,7 +704,7 @@ void testHeartbeatTimeoutWithResourceManager() throws Exception { // register job manager success will trigger monitor heartbeat target between jm and rm final Tuple3 registrationInformation = jobManagerRegistrationFuture.get( - testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + testingTimeout.toMillis(), TimeUnit.MILLISECONDS); assertThat(registrationInformation.f0).isEqualTo(jobMasterId); assertThat(registrationInformation.f1).isEqualTo(jmResourceId); @@ -712,7 +712,7 @@ void testHeartbeatTimeoutWithResourceManager() throws Exception { final JobID disconnectedJobManager = disconnectedJobManagerFuture.get( - testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + testingTimeout.toMillis(), TimeUnit.MILLISECONDS); // heartbeat timeout should trigger disconnect JobManager from ResourceManager assertThat(disconnectedJobManager).isEqualTo(jobGraph.getJobID()); @@ -1458,7 +1458,7 @@ void testTriggerSavepointTimeout() throws Exception { jobMaster.getSelfGateway(JobMasterGateway.class); final CompletableFuture savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint( - "/tmp", false, SavepointFormatType.CANONICAL, Time.milliseconds(1)); + "/tmp", false, SavepointFormatType.CANONICAL, Duration.ofMillis(1)); final CompletableFuture savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint( "/tmp", false, SavepointFormatType.CANONICAL, RpcUtils.INF_TIMEOUT); @@ -1466,7 +1466,7 @@ void testTriggerSavepointTimeout() throws Exception { assertThatThrownBy( () -> savepointFutureLowTimeout.get( - testingTimeout.getSize(), testingTimeout.getUnit())) + testingTimeout.toMillis(), TimeUnit.MILLISECONDS)) .hasRootCauseInstanceOf(TimeoutException.class); assertThat(savepointFutureHighTimeout).isNotDone(); @@ -1597,7 +1597,7 @@ void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throw // trigger some request to guarantee ensure the slotAllocationFailure processing if // complete - jobMasterGateway.requestJobStatus(Time.seconds(5)).get(); + jobMasterGateway.requestJobStatus(Duration.ofSeconds(5)).get(); assertThat(disconnectTaskExecutorFuture).isNotDone(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java index c846de6888f408..c1be38661183f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; import org.apache.flink.testutils.TestingUtils; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -42,7 +43,7 @@ public static void registerTaskExecutorAndOfferSlots( JobID jobId, int numSlots, TaskExecutorGateway taskExecutorGateway, - Time testingTimeout) + Duration estingTimeout) throws ExecutionException, InterruptedException { final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java index 51652b843079e8..8bc367b3ed8301 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.metrics; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; @@ -50,6 +49,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -102,7 +102,7 @@ void testMetricQueryServiceSetup() throws Exception { for (int x = 0; x < 10; x++) { MetricDumpSerialization.MetricSerializationResult metricSerializationResult = metricQueryServiceGateway - .queryMetrics(Time.seconds(5)) + .queryMetrics(Duration.ofSeconds(5)) .get(5, TimeUnit.SECONDS); if (metricSerializationResult.numCounters == 1) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java index 3d03820cc7f87f..16be2ac175d35a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.metrics.dump; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -37,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.List; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -46,7 +46,7 @@ /** Tests for the {@link MetricQueryService}. */ class MetricQueryServiceTest { - private static final Time TIMEOUT = Time.seconds(1); + private static final Duration TIMEOUT = Duration.ofSeconds(1); private static TestingRpcService rpcService; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/TestingMetricQueryServiceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/TestingMetricQueryServiceGateway.java index 74a844688b787f..70f87e8ed63e49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/TestingMetricQueryServiceGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/TestingMetricQueryServiceGateway.java @@ -18,11 +18,11 @@ package org.apache.flink.runtime.metrics.dump; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -46,7 +46,7 @@ public TestingMetricQueryServiceGateway( @Override public CompletableFuture queryMetrics( - Time timeout) { + Duration timeout) { return queryMetricsSupplier.get(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java index 83801e4f9e66ab..4412b66f7ef991 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/DefaultJobLeaderIdServiceTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; @@ -30,6 +29,7 @@ import org.junit.jupiter.api.Timeout; import org.mockito.ArgumentCaptor; +import java.time.Duration; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Queue; @@ -69,7 +69,7 @@ void testAddingJob() throws Exception { highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); - Time timeout = Time.milliseconds(5000L); + Duration timeout = Duration.ofMillis(5000L); JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); JobLeaderIdService jobLeaderIdService = @@ -102,7 +102,7 @@ void testRemovingJob() throws Exception { highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); - Time timeout = Time.milliseconds(5000L); + Duration timeout = Duration.ofMillis(5000L); JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); JobLeaderIdService jobLeaderIdService = @@ -140,7 +140,7 @@ void testInitialJobTimeout() throws Exception { highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); - Time timeout = Time.milliseconds(5000L); + Duration timeout = Duration.ofMillis(5000L); JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); JobLeaderIdService jobLeaderIdService = @@ -200,7 +200,7 @@ void jobTimeoutAfterLostLeadership() throws Exception { .when(scheduledExecutor) .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)); - Time timeout = Time.milliseconds(5000L); + Duration timeout = Duration.ofMillis(5000L); JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); final AtomicReference lastTimeoutId = new AtomicReference<>(); @@ -286,7 +286,7 @@ void testLeaderFutureWaitsForValidLeader() throws Exception { new DefaultJobLeaderIdService( highAvailabilityServices, new ManuallyTriggeredScheduledExecutor(), - Time.milliseconds(5000L)); + Duration.ofMillis(5000L)); jobLeaderIdService.start(new NoOpJobLeaderIdActions()); @@ -320,7 +320,7 @@ void testIsStarted() throws Exception { new SettableLeaderRetrievalService(null, null); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); - Time timeout = Time.milliseconds(5000L); + Duration timeout = Duration.ofMillis(5000L); JobLeaderIdActions jobLeaderIdActions = mock(JobLeaderIdActions.class); DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, timeout); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 873e9293c6eb8d..2f140ac1d1426f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMaster; @@ -43,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -56,7 +56,7 @@ /** Tests for the interaction between the {@link ResourceManager} and the {@link JobMaster}. */ class ResourceManagerJobMasterTest { - private static final Time TIMEOUT = Time.seconds(10L); + private static final Duration TIMEOUT = Duration.ofMillis(10L); private TestingRpcService rpcService; @@ -148,7 +148,7 @@ void testRegisterJobMaster() { jobId, TIMEOUT); assertThatFuture(successfulFuture) - .succeedsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) + .succeedsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) .isInstanceOf(JobMasterRegistrationSuccess.class); } @@ -186,7 +186,7 @@ void testDisconnectTaskManagerInResourceManager() jobId, TIMEOUT); assertThatFuture(successfulFuture) - .succeedsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) + .succeedsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) .isInstanceOf(JobMasterRegistrationSuccess.class); final TaskExecutorGateway taskExecutorGateway = @@ -200,9 +200,9 @@ void testDisconnectTaskManagerInResourceManager() resourceManagerGateway.disconnectTaskManager(taskExecutorId, new Exception("for test")); assertThatFuture(disconnectRMFuture) - .succeedsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + .succeedsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); final ResourceID resourceId = - disconnectTMFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + disconnectTMFuture.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); assertThat(resourceId).isEqualTo(taskExecutorId); } @@ -215,7 +215,7 @@ void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception { resourceManagerGateway.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class) - .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + .get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); // test throw exception when receive a registration from job master which takes unmatched // leaderSessionId @@ -288,7 +288,7 @@ void testRegisterJobMasterWithFailureLeaderListener() { assertThatFuture(registrationFuture) .as("Expected to fail with a ResourceManagerException.") - .failsWithin(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) + .failsWithin(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) .withThrowableOfType(ExecutionException.class) .withCauseInstanceOf(ResourceManagerException.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java index 9253ef9e844a89..33ab238627bab0 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.security.token.DelegationTokenManager; import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager; +import java.time.Duration; + import static org.apache.flink.util.Preconditions.checkNotNull; /** Mock services needed by the resource manager. */ @@ -53,6 +55,6 @@ public MockResourceManagerRuntimeServices(RpcService rpcService, SlotManager slo new DefaultJobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), - Time.minutes(5L)); + Duration.ofMinutes(5L)); } }