diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md index 2635c11f1ecd77..a58b5a0807dcfc 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md @@ -302,7 +302,7 @@ env.execute() ```java import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; +import java.time.Duration; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -318,7 +318,7 @@ stateDescriptor.enableTimeToLive(ttlConfig); ```scala import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.state.ValueStateDescriptor -import org.apache.flink.api.common.time.Time +import java.time.Duration val ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -337,7 +337,7 @@ from pyflink.common.typeinfo import Types from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \ .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \ .build() @@ -420,7 +420,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .disable_cleanup_in_background() \ .build() ``` @@ -438,7 +438,7 @@ ttl_config = StateTtlConfig \ {{< tab "Java" >}} ```java import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.time.Time; +import java.time.Duration; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -449,7 +449,7 @@ StateTtlConfig ttlConfig = StateTtlConfig {{< tab "Scala" >}} ```scala import org.apache.flink.api.common.state.StateTtlConfig -import org.apache.flink.api.common.time.Time +import java.time.Duration val ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -463,7 +463,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .cleanup_full_snapshot() \ .build() ``` @@ -507,7 +507,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .cleanup_incrementally(10, True) \ .build() ``` diff --git a/docs/content.zh/docs/ops/state/task_failure_recovery.md b/docs/content.zh/docs/ops/state/task_failure_recovery.md index a2ed511ce97904..9eb4b0a309cff7 100644 --- a/docs/content.zh/docs/ops/state/task_failure_recovery.md +++ b/docs/content.zh/docs/ops/state/task_failure_recovery.md @@ -69,7 +69,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 - Time.of(10, TimeUnit.SECONDS) // 延时 + Duration.ofSeconds(10) // 延时 )) ``` {{< /tab >}} @@ -126,7 +126,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 - Time.of(10, TimeUnit.SECONDS) // 延时 + Duration.ofSeconds(10) // 延时 )) ``` {{< /tab >}} @@ -184,10 +184,10 @@ env.setRestartStrategy(RestartStrategies.exponentialDelayRestart( ```scala val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.exponentialDelayRestart( - Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts - Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts + Duration.ofMillis(1), // initial delay between restarts + Duration.ofMillis(1000), // maximum delay between restarts 1.1, // exponential multiplier - Time.of(2, TimeUnit.SECONDS), // 重置延迟时间到初始值的阈值 + Duration.ofSeconds(2), // 重置延迟时间到初始值的阈值 0.1 // jitter )) ``` @@ -279,8 +279,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每个时间间隔的最大故障次数 - Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔 - Time.of(10, TimeUnit.SECONDS) // 延时 + Duration.ofMinutes(5), // 测量故障率的时间间隔 + Duration.ofSeconds(10) // 延时 )) ``` {{< /tab >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md b/docs/content/docs/dev/datastream/fault-tolerance/state.md index 233f9bdc01b5b4..97115f7bba0daf 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md @@ -341,7 +341,7 @@ functionality can then be enabled in any state descriptor by passing the configu ```java import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.time.Time; +import java.time.Duration; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -357,7 +357,7 @@ stateDescriptor.enableTimeToLive(ttlConfig); ```scala import org.apache.flink.api.common.state.StateTtlConfig import org.apache.flink.api.common.state.ValueStateDescriptor -import org.apache.flink.api.common.time.Time +import java.time.Duration val ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -376,7 +376,7 @@ from pyflink.common.typeinfo import Types from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \ .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \ .build() @@ -467,7 +467,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .disable_cleanup_in_background() \ .build() ``` @@ -488,7 +488,7 @@ It can be configured in `StateTtlConfig`: {{< tab "Java" >}} ```java import org.apache.flink.api.common.state.StateTtlConfig; -import org.apache.flink.api.common.time.Time; +import java.time.Duration; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -499,7 +499,7 @@ StateTtlConfig ttlConfig = StateTtlConfig {{< tab "Scala" >}} ```scala import org.apache.flink.api.common.state.StateTtlConfig -import org.apache.flink.api.common.time.Time +import java.time.Duration val ttlConfig = StateTtlConfig .newBuilder(Duration.ofSeconds(1)) @@ -513,7 +513,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .cleanup_full_snapshot() \ .build() ``` @@ -563,7 +563,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .cleanup_incrementally(10, True) \ .build() ``` @@ -622,7 +622,7 @@ from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfig ttl_config = StateTtlConfig \ - .new_builder(Time.seconds(1)) \ + .new_builder(Duration.ofSeconds(1)) \ .cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \ .build() ``` diff --git a/docs/content/docs/ops/state/task_failure_recovery.md b/docs/content/docs/ops/state/task_failure_recovery.md index a091d79e35b8da..cb16172beb3903 100644 --- a/docs/content/docs/ops/state/task_failure_recovery.md +++ b/docs/content/docs/ops/state/task_failure_recovery.md @@ -73,7 +73,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // number of restart attempts - Time.of(10, TimeUnit.SECONDS) // delay + Duration.ofSeconds(10) // delay )) ``` {{< /tab >}} @@ -129,7 +129,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // number of restart attempts - Time.of(10, TimeUnit.SECONDS) // delay + Duration.ofSeconds(10) // delay )) ``` {{< /tab >}} @@ -188,10 +188,10 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm ```scala val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.exponentialDelayRestart( - Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts - Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts + Duration.ofMillis(1), // initial delay between restarts + Duration.ofMillis(1000), // maximum delay between restarts 1.1, // exponential multiplier - Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value + Duration.ofSeconds(2), // threshold duration to reset delay to its initial value 0.1 // jitter )) ``` @@ -281,8 +281,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // max failures per unit - Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate - Time.of(10, TimeUnit.SECONDS) // delay + Duration.ofMinutes(5), //time interval for measuring failure rate + Duration.ofSeconds(10) // delay )) ``` {{< /tab >}} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 2275326de35bc7..97336c7b26577d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; @@ -50,6 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -364,10 +364,8 @@ private CompletableFuture getJobResult( final JobID jobId, final ScheduledExecutor scheduledExecutor, final boolean tolerateMissingResult) { - final Time timeout = - Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); - final Time retryPeriod = - Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis()); + final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); + final Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD); final CompletableFuture jobResultFuture = JobStatusPollingUtils.getJobResult( dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java index e379aa02f0f68a..97bda863b79854 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.dispatcher.DispatcherGateway; @@ -40,6 +39,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.time.Duration; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -59,7 +59,7 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway private final ScheduledExecutor retryExecutor; - private final Time timeout; + private final Duration timeout; private final ClassLoader classLoader; @@ -67,7 +67,7 @@ public EmbeddedJobClient( final JobID jobId, final DispatcherGateway dispatcherGateway, final ScheduledExecutor retryExecutor, - final Time rpcTimeout, + final Duration rpcTimeout, final ClassLoader classLoader) { this.jobId = checkNotNull(jobId); this.dispatcherGateway = checkNotNull(dispatcherGateway); @@ -136,7 +136,7 @@ public CompletableFuture> getAccumulators() { public CompletableFuture getJobExecutionResult() { checkNotNull(classLoader); - final Time retryPeriod = Time.milliseconds(100L); + final Duration retryPeriod = Duration.ofMillis(100L); return JobStatusPollingUtils.getJobResult( dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod) .thenApply( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java index 79168bee80e81b..b61cb18080c003 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/JobStatusPollingUtils.java @@ -21,11 +21,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.concurrent.ScheduledExecutor; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -50,14 +50,14 @@ static CompletableFuture getJobResult( final DispatcherGateway dispatcherGateway, final JobID jobId, final ScheduledExecutor scheduledExecutor, - final Time rpcTimeout, - final Time retryPeriod) { + final Duration rpcTimeout, + final Duration retryPeriod) { return pollJobResultAsync( () -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout), () -> dispatcherGateway.requestJobResult(jobId, rpcTimeout), scheduledExecutor, - retryPeriod.toMilliseconds()); + retryPeriod.toMillis()); } @VisibleForTesting diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index 408bc69ad31ffd..596c703cc1ff7e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.executors.PipelineExecutorUtils; @@ -43,6 +42,7 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -136,8 +136,7 @@ private CompletableFuture submitAndGetJobClientFuture( final Configuration configuration, final ClassLoader userCodeClassloader) throws MalformedURLException { - final Time timeout = - Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); + final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader); @@ -191,7 +190,7 @@ private static CompletableFuture submitJob( final Configuration configuration, final DispatcherGateway dispatcherGateway, final JobGraph jobGraph, - final Time rpcTimeout) { + final Duration rpcTimeout) { checkNotNull(jobGraph); LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java index 790fffb0a1fdc2..15c11cae27068c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.client.cli.ClientOptions; import org.apache.flink.client.deployment.application.EmbeddedJobClient; import org.apache.flink.configuration.Configuration; @@ -29,6 +28,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.util.concurrent.ScheduledExecutor; +import java.time.Duration; import java.util.Collection; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -81,9 +81,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) { dispatcherGateway, configuration, (jobId, userCodeClassloader) -> { - final Time timeout = - Time.milliseconds( - configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); + final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT); return new EmbeddedJobClient( jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader); }); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 74dcceee40ddc7..36aa9050453d8e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.cache.DistributedCache; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.program.ClusterClient; @@ -300,7 +299,7 @@ public void close() { TimeUnit.MILLISECONDS, retryExecutorService); - this.restClient.shutdown(Time.seconds(5)); + this.restClient.shutdown(Duration.ofSeconds(5)); ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); try { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java index c3f8a4e7388654..4d83093a8200c1 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.connector.base.sink; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -29,6 +28,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; + import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */ @@ -66,7 +67,7 @@ public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() { @Test public void testThatNoIssuesOccurWhenCheckpointingIsEnabled() throws Exception { env.enableCheckpointing(20); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(200))); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(200))); env.fromSequence(1, 10_000).map(Object::toString).sinkTo(new ArrayListAsyncSink()); env.execute("Integration Test: AsyncSinkBaseITCase"); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java index 3a5226e62142d0..e16ebb1fe78d75 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.configuration.Configuration; @@ -34,6 +33,8 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.StreamSource; +import java.time.Duration; + /** Tests the functionality of the {@link FileSink} in BATCH mode. */ class BatchExecutionFileSinkITCase extends FileSinkITBase { @@ -49,7 +50,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) { env.configure(config, getClass().getClassLoader()); if (triggerFailover) { - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100))); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100))); } else { env.setRestartStrategy(RestartStrategies.noRestart()); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java index c20c70491a7ba9..1ea0a7b592217a 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.execution.CheckpointingMode; @@ -39,6 +38,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -82,7 +82,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) { env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE); if (triggerFailover) { - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100))); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100))); } else { env.setRestartStrategy(RestartStrategies.noRestart()); } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java index 7e1aacfee040c2..703831fe878a7f 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java @@ -236,11 +236,7 @@ void testTableRollingOnProcessingTime(@TempDir java.nio.file.Path tmpDir) throws Path path = new Path(outDir.toURI()); FileSystemTableSink.TableRollingPolicy tableRollingPolicy = - new FileSystemTableSink.TableRollingPolicy( - false, - Long.MAX_VALUE, - Duration.ofMillis(20).toMillis(), - Duration.ofMillis(10).toMillis()); + new FileSystemTableSink.TableRollingPolicy(false, Long.MAX_VALUE, 20L, 10L); TestRecoverableWriter recoverableWriter = getRecoverableWriter(path); FileWriterBucket bucket = diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java index db048af9eedbe0..dc627a0cd55743 100644 --- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java +++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.wikiedits; -import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.testutils.junit.RetryOnFailure; @@ -31,6 +30,7 @@ import java.net.InetSocketAddress; import java.net.Socket; +import java.time.Duration; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -53,7 +53,7 @@ class WikipediaEditsSourceTest { @RetryOnFailure(times = 1) void testWikipediaEditsSource() throws Exception { if (canConnect(1, TimeUnit.SECONDS)) { - final Time testTimeout = Time.seconds(60); + final Duration testTimeout = Duration.ofSeconds(60); final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource(); ExecutorService executorService = null; @@ -116,8 +116,8 @@ void testWikipediaEditsSource() throws Exception { } } - private long deadlineNanos(Time testTimeout) { - return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds()); + private long deadlineNanos(Duration testTimeout) { + return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMillis()); } private static class CollectingSourceContext implements SourceFunction.SourceContext { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index b099d944892164..1dd56fdb55aed9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.restartstrategy; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType; @@ -28,7 +27,6 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** * This class defines methods to generate RestartStrategyConfigurations. These configurations are @@ -81,22 +79,7 @@ public static RestartStrategyConfiguration fallBackRestart() { */ public static RestartStrategyConfiguration fixedDelayRestart( int restartAttempts, long delayBetweenAttempts) { - return fixedDelayRestart( - restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS)); - } - - /** - * Generates a FixedDelayRestartStrategyConfiguration. - * - * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy - * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy - * @return FixedDelayRestartStrategy - * @deprecated Use {@link #fixedDelayRestart(int, Duration)} - */ - @Deprecated - public static RestartStrategyConfiguration fixedDelayRestart( - int restartAttempts, Time delayInterval) { - return fixedDelayRestart(restartAttempts, Time.toDuration(delayInterval)); + return fixedDelayRestart(restartAttempts, Duration.ofMillis(delayBetweenAttempts)); } /** @@ -111,22 +94,6 @@ public static RestartStrategyConfiguration fixedDelayRestart( return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval); } - /** - * Generates a FailureRateRestartStrategyConfiguration. - * - * @param failureRate Maximum number of restarts in given interval {@code failureInterval} - * before failing a job - * @param failureInterval Time interval for failures - * @param delayInterval Delay in-between restart attempts - * @deprecated Use {@link #failureRateRestart(int, Duration, Duration)} - */ - @Deprecated - public static FailureRateRestartStrategyConfiguration failureRateRestart( - int failureRate, Time failureInterval, Time delayInterval) { - return failureRateRestart( - failureRate, Time.toDuration(failureInterval), Time.toDuration(delayInterval)); - } - /** * Generates a FailureRateRestartStrategyConfiguration. * @@ -141,32 +108,6 @@ public static FailureRateRestartStrategyConfiguration failureRateRestart( failureRate, failureInterval, delayInterval); } - /** - * Generates a ExponentialDelayRestartStrategyConfiguration. - * - * @param initialBackoff Starting duration between restarts - * @param maxBackoff The highest possible duration between restarts - * @param backoffMultiplier Delay multiplier how many times is the delay longer than before - * @param resetBackoffThreshold How long the job must run smoothly to reset the time interval - * @param jitterFactor How much the delay may differ (in percentage) - * @deprecated Use {@link #exponentialDelayRestart(Duration, Duration, double, Duration, - * double)} - */ - @Deprecated - public static ExponentialDelayRestartStrategyConfiguration exponentialDelayRestart( - Time initialBackoff, - Time maxBackoff, - double backoffMultiplier, - Time resetBackoffThreshold, - double jitterFactor) { - return exponentialDelayRestart( - Time.toDuration(initialBackoff), - Time.toDuration(maxBackoff), - backoffMultiplier, - Time.toDuration(resetBackoffThreshold), - jitterFactor); - } - /** * Generates a ExponentialDelayRestartStrategyConfiguration. * @@ -249,12 +190,6 @@ public int getRestartAttempts() { return restartAttempts; } - /** @deprecated Use {@link #getDurationBetweenAttempts()} */ - @Deprecated - public Time getDelayBetweenAttemptsInterval() { - return Time.fromDuration(getDurationBetweenAttempts()); - } - public Duration getDurationBetweenAttempts() { return delayBetweenAttemptsInterval; } @@ -303,26 +238,6 @@ public static final class ExponentialDelayRestartStrategyConfiguration private final Duration resetBackoffThreshold; private final double jitterFactor; - /** - * @deprecated Use {@link - * ExponentialDelayRestartStrategyConfiguration#ExponentialDelayRestartStrategyConfiguration(Duration, - * Duration, double, Duration, double)} - */ - @Deprecated - public ExponentialDelayRestartStrategyConfiguration( - Time initialBackoff, - Time maxBackoff, - double backoffMultiplier, - Time resetBackoffThreshold, - double jitterFactor) { - this( - Time.toDuration(initialBackoff), - Time.toDuration(maxBackoff), - backoffMultiplier, - Time.toDuration(resetBackoffThreshold), - jitterFactor); - } - public ExponentialDelayRestartStrategyConfiguration( Duration initialBackoff, Duration maxBackoff, @@ -336,22 +251,10 @@ public ExponentialDelayRestartStrategyConfiguration( this.jitterFactor = jitterFactor; } - /** @deprecated Use {@link #getInitialBackoffDuration()} */ - @Deprecated - public Time getInitialBackoff() { - return Time.fromDuration(getInitialBackoffDuration()); - } - public Duration getInitialBackoffDuration() { return initialBackoff; } - /** @deprecated Use {@link #getMaxBackoffDuration()} */ - @Deprecated - public Time getMaxBackoff() { - return Time.fromDuration(maxBackoff); - } - public Duration getMaxBackoffDuration() { return maxBackoff; } @@ -360,12 +263,6 @@ public double getBackoffMultiplier() { return backoffMultiplier; } - /** @deprecated Use {@link #getResetBackoffDurationThreshold()} */ - @Deprecated - public Time getResetBackoffThreshold() { - return Time.fromDuration(resetBackoffThreshold); - } - public Duration getResetBackoffDurationThreshold() { return resetBackoffThreshold; } @@ -424,18 +321,6 @@ public static final class FailureRateRestartStrategyConfiguration private final Duration failureInterval; private final Duration delayBetweenAttemptsInterval; - /** - * @deprecated Use {@link #FailureRateRestartStrategyConfiguration(int, Duration, Duration)} - */ - @Deprecated - public FailureRateRestartStrategyConfiguration( - int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) { - this( - maxFailureRate, - Time.toDuration(failureInterval), - Time.toDuration(delayBetweenAttemptsInterval)); - } - public FailureRateRestartStrategyConfiguration( int maxFailureRate, Duration failureInterval, @@ -449,22 +334,10 @@ public int getMaxFailureRate() { return maxFailureRate; } - /** @deprecated Use {@link #getFailureIntervalDuration()} */ - @Deprecated - public Time getFailureInterval() { - return Time.fromDuration(getFailureIntervalDuration()); - } - public Duration getFailureIntervalDuration() { return failureInterval; } - /** @deprecated Use {@link #getDurationBetweenAttempts()} */ - @Deprecated - public Time getDelayBetweenAttemptsInterval() { - return Time.fromDuration(getDurationBetweenAttempts()); - } - public Duration getDurationBetweenAttempts() { return delayBetweenAttemptsInterval; } @@ -594,10 +467,7 @@ private static RestartStrategyConfiguration parseConfiguration( Duration failureRateDelay = configuration.get( RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY); - return failureRateRestart( - maxFailures, - Time.milliseconds(failureRateInterval.toMillis()), - Time.milliseconds(failureRateDelay.toMillis())); + return failureRateRestart(maxFailures, failureRateInterval, failureRateDelay); default: throw new IllegalArgumentException( "Unknown restart strategy " + restartstrategyKind + "."); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index c7009bf7c5ede7..0ba3ea00e1feb4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -19,7 +19,6 @@ package org.apache.flink.api.common.state; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.time.Time; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnegative; @@ -122,13 +121,6 @@ public StateVisibility getStateVisibility() { return stateVisibility; } - /** @deprecated Use {@link #getTimeToLive()} */ - @Deprecated - @Nonnull - public Time getTtl() { - return Time.fromDuration(getTimeToLive()); - } - public Duration getTimeToLive() { return ttl; } @@ -161,13 +153,6 @@ public String toString() { + '}'; } - /** @deprecated Use {@link #newBuilder(Duration)} */ - @Deprecated - @Nonnull - public static Builder newBuilder(@Nonnull Time ttl) { - return new Builder(ttl); - } - public static Builder newBuilder(Duration ttl) { return new Builder(ttl); } @@ -184,12 +169,6 @@ public static class Builder { private final EnumMap strategies = new EnumMap<>(CleanupStrategies.Strategies.class); - /** @deprecated Use {@link #newBuilder(Duration)} */ - @Deprecated - public Builder(@Nonnull Time ttl) { - this(Time.toDuration(ttl)); - } - private Builder(Duration ttl) { this.ttl = ttl; } @@ -361,18 +340,6 @@ public Builder disableCleanupInBackground() { return this; } - /** - * Sets the ttl time. - * - * @param ttl The ttl time. - * @deprecated Use {@link #setTimeToLive(Duration)} - */ - @Deprecated - @Nonnull - public Builder setTtl(@Nonnull Time ttl) { - return setTimeToLive(Time.toDuration(ttl)); - } - public Builder setTimeToLive(Duration ttl) { this.ttl = Preconditions.checkNotNull(ttl); return this; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java index 526e52b42ed420..0fdbef00471f98 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java @@ -18,7 +18,7 @@ package org.apache.flink.api.common.time; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import javax.annotation.Nullable; @@ -32,10 +32,10 @@ /** * The definition of a time interval. Similar to a simpler version of {@link java.time.Duration}. * - * @deprecated Use {@link Duration} + * @deprecated Use {@link Duration}. Time needs to be kept until all external libraries remove this. */ @Deprecated -@PublicEvolving +@Internal public final class Time implements Serializable { private static final long serialVersionUID = -350254188460915999L; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index a8ecc139a7930c..338f9b660dc342 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.configuration; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Time; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TimeUtils; @@ -52,13 +51,12 @@ public class ConfigurationUtils { * @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code * Optional.empty()} if {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled. */ - public static Optional