[FLINK-14068][core] Removes API that uses org.apache.flink.api.common.time.Time
XComp committed Sep 12, 2024
1 parent 8dc40ab commit 3be4cba
Showing 390 changed files with 1,348 additions and 1,648 deletions.
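The commit replaces user-facing `org.apache.flink.api.common.time.Time` parameters with the standard `java.time.Duration`. For orientation only (this sketch is not part of the diff), here is minimal migrated user code assembled from the state-TTL documentation snippets changed below; the class name `TtlMigrationSketch` and the state name are illustrative.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

import java.time.Duration;

public class TtlMigrationSketch {
    public static void main(String[] args) {
        // Previously the builder was fed Time.seconds(1) from org.apache.flink.api.common.time.Time;
        // after this commit it takes java.time.Duration instead.
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Duration.ofSeconds(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

        // Attach the TTL configuration to a state descriptor, as in the docs updated below.
        ValueStateDescriptor<String> stateDescriptor =
                new ValueStateDescriptor<>("text state", Types.STRING);
        stateDescriptor.enableTimeToLive(ttlConfig);
    }
}
```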
16 changes: 8 additions & 8 deletions docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
@@ -302,7 +302,7 @@ env.execute()
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
+import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -318,7 +318,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.common.time.Time
+import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -337,7 +337,7 @@ from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
.build()
@@ -420,7 +420,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.disable_cleanup_in_background() \
.build()
```
@@ -438,7 +438,7 @@ ttl_config = StateTtlConfig \
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.time.Time;
+import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -449,7 +449,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.state.StateTtlConfig
-import org.apache.flink.api.common.time.Time
+import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -463,7 +463,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.cleanup_full_snapshot() \
.build()
```
@@ -507,7 +507,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.cleanup_incrementally(10, True) \
.build()
```
14 changes: 7 additions & 7 deletions docs/content.zh/docs/ops/state/task_failure_recovery.md
@@ -69,7 +69,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
-Time.of(10, TimeUnit.SECONDS) // delay
+Duration.ofSeconds(10) // delay
))
```
{{< /tab >}}
@@ -126,7 +126,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
-Time.of(10, TimeUnit.SECONDS) // delay
+Duration.ofSeconds(10) // delay
))
```
{{< /tab >}}
@@ -184,10 +184,10 @@ env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
-Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
-Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
+Duration.ofMillis(1), // initial delay between restarts
+Duration.ofMillis(1000), // maximum delay between restarts
1.1, // exponential multiplier
-Time.of(2, TimeUnit.SECONDS), // threshold to reset the delay back to its initial value
+Duration.ofSeconds(2), // threshold to reset the delay back to its initial value
0.1 // jitter
))
```
@@ -279,8 +279,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per time interval
-Time.of(5, TimeUnit.MINUTES), // time interval for measuring the failure rate
-Time.of(10, TimeUnit.SECONDS) // delay
+Duration.ofMinutes(5), // time interval for measuring the failure rate
+Duration.ofSeconds(10) // delay
))
```
{{< /tab >}}
18 changes: 9 additions & 9 deletions docs/content/docs/dev/datastream/fault-tolerance/state.md
@@ -341,7 +341,7 @@ functionality can then be enabled in any state descriptor by passing the configu
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.time.Time;
+import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -357,7 +357,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.common.time.Time
+import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -376,7 +376,7 @@ from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
.build()
@@ -467,7 +467,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.disable_cleanup_in_background() \
.build()
```
@@ -488,7 +488,7 @@ It can be configured in `StateTtlConfig`:
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.time.Time;
+import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -499,7 +499,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.state.StateTtlConfig
-import org.apache.flink.api.common.time.Time
+import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
@@ -513,7 +513,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.cleanup_full_snapshot() \
.build()
```
@@ -563,7 +563,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.cleanup_incrementally(10, True) \
.build()
```
@@ -622,7 +622,7 @@ from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig

ttl_config = StateTtlConfig \
-.new_builder(Time.seconds(1)) \
+.new_builder(Duration.ofSeconds(1)) \
.cleanup_in_rocksdb_compact_filter(1000, Duration.of_hours(1)) \
.build()
```
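Pulling the fragments of this documentation diff together, the sketch below shows the Duration-based cleanup configuration as one self-contained Java snippet. It only uses builder calls that appear in the surrounding hunks (`newBuilder`, `cleanupFullSnapshot`, `cleanupIncrementally`); the wrapper class is invented for the example.

```java
import org.apache.flink.api.common.state.StateTtlConfig;

import java.time.Duration;

public class TtlCleanupSketch {
    public static void main(String[] args) {
        // Clean up expired state when a full snapshot is taken.
        StateTtlConfig fullSnapshotCleanup =
                StateTtlConfig.newBuilder(Duration.ofSeconds(1))
                        .cleanupFullSnapshot()
                        .build();

        // Incremental cleanup: check 10 entries per state access and also clean up per processed record.
        StateTtlConfig incrementalCleanup =
                StateTtlConfig.newBuilder(Duration.ofSeconds(1))
                        .cleanupIncrementally(10, true)
                        .build();

        System.out.println(fullSnapshotCleanup + " / " + incrementalCleanup);
    }
}
```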
14 changes: 7 additions & 7 deletions docs/content/docs/ops/state/task_failure_recovery.md
@@ -73,7 +73,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
-Time.of(10, TimeUnit.SECONDS) // delay
+Duration.ofSeconds(10) // delay
))
```
{{< /tab >}}
@@ -129,7 +129,7 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
-Time.of(10, TimeUnit.SECONDS) // delay
+Duration.ofSeconds(10) // delay
))
```
{{< /tab >}}
@@ -188,10 +188,10 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
-Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
-Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
+Duration.ofMillis(1), // initial delay between restarts
+Duration.ofMillis(1000), // maximum delay between restarts
1.1, // exponential multiplier
-Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
+Duration.ofSeconds(2), // threshold duration to reset delay to its initial value
0.1 // jitter
))
```
@@ -281,8 +281,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
-Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
-Time.of(10, TimeUnit.SECONDS) // delay
+Duration.ofMinutes(5), //time interval for measuring failure rate
+Duration.ofSeconds(10) // delay
))
```
{{< /tab >}}
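Consolidating the restart-strategy hunks above, here is a self-contained Java sketch of the Duration-based overloads; the `main` scaffold is illustrative and not part of the commit.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fixed delay: at most 3 restart attempts, 10 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Duration.ofSeconds(10)));

        // Failure rate: at most 3 failures within 5 minutes, with a 10 second delay between restarts.
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(3, Duration.ofMinutes(5), Duration.ofSeconds(10)));
    }
}
```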
(Another changed file; path not shown.)
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -50,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -364,10 +364,8 @@ private CompletableFuture<JobResult> getJobResult(
final JobID jobId,
final ScheduledExecutor scheduledExecutor,
final boolean tolerateMissingResult) {
-final Time timeout =
-Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
-final Time retryPeriod =
-Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
+final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
+final Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
final CompletableFuture<JobResult> jobResultFuture =
JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
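The hunk above can drop the `Time.milliseconds(...)` round-trip because the client options are already `Duration`-typed. A small standalone sketch of reading them directly (example code, not from the commit):

```java
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.configuration.Configuration;

import java.time.Duration;

public class ClientTimeoutSketch {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();

        // CLIENT_TIMEOUT and CLIENT_RETRY_PERIOD are Duration-typed config options,
        // so no conversion through org.apache.flink.api.common.time.Time is needed.
        Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
        Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);

        System.out.println("client timeout: " + timeout + ", retry period: " + retryPeriod);
    }
}
```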
(Another changed file; path not shown.)
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -40,6 +39,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -59,15 +59,15 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway

private final ScheduledExecutor retryExecutor;

-private final Time timeout;
+private final Duration timeout;

private final ClassLoader classLoader;

public EmbeddedJobClient(
final JobID jobId,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor retryExecutor,
-final Time rpcTimeout,
+final Duration rpcTimeout,
final ClassLoader classLoader) {
this.jobId = checkNotNull(jobId);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
@@ -136,7 +136,7 @@ public CompletableFuture<Map<String, Object>> getAccumulators() {
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
checkNotNull(classLoader);

-final Time retryPeriod = Time.milliseconds(100L);
+final Duration retryPeriod = Duration.ofMillis(100L);
return JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod)
.thenApply(
(Another changed file; path not shown.)
@@ -21,11 +21,11 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.concurrent.ScheduledExecutor;

+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -50,14 +50,14 @@ static CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
final ScheduledExecutor scheduledExecutor,
-final Time rpcTimeout,
-final Time retryPeriod) {
+final Duration rpcTimeout,
+final Duration retryPeriod) {

return pollJobResultAsync(
() -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout),
() -> dispatcherGateway.requestJobResult(jobId, rpcTimeout),
scheduledExecutor,
-retryPeriod.toMilliseconds());
+retryPeriod.toMillis());
}

@VisibleForTesting
(Another changed file; path not shown.)
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
@@ -43,6 +42,7 @@

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
+import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -136,8 +136,7 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
final Configuration configuration,
final ClassLoader userCodeClassloader)
throws MalformedURLException {
-final Time timeout =
-Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);

final JobGraph jobGraph =
PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
@@ -191,7 +190,7 @@ private static CompletableFuture<JobID> submitJob(
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final JobGraph jobGraph,
-final Time rpcTimeout) {
+final Duration rpcTimeout) {
checkNotNull(jobGraph);

LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());
(Remaining changed files not shown.)