[FLINK-14068] Removes org.apache.flink.api.common.time.Time
XComp committed Aug 23, 2024
1 parent 4faf096 commit d646f5b
Showing 61 changed files with 289 additions and 405 deletions.
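
The pattern repeated across these files is mechanical: construction through the removed org.apache.flink.api.common.time.Time factory methods becomes the corresponding java.time.Duration factory, and value reads switch from toMilliseconds() to toMillis(). A minimal sketch of the pattern, using illustrative values rather than code from any particular file in this commit:

import java.time.Duration;

public class TimeToDurationSketch {
    public static void main(String[] args) {
        // Before (org.apache.flink.api.common.time.Time, removed by this commit):
        //   Time timeout   = Time.milliseconds(30_000L);
        //   Time retry     = Time.seconds(5);
        //   long timeoutMs = timeout.toMilliseconds();

        // After (JDK java.time.Duration):
        Duration timeout = Duration.ofMillis(30_000L);
        Duration retry = Duration.ofSeconds(5);
        long timeoutMs = timeout.toMillis();

        System.out.println(timeout + ", " + retry + ", " + timeoutMs + " ms");
    }
}
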
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
@@ -50,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -364,10 +364,10 @@ private CompletableFuture<JobResult> getJobResult(
final JobID jobId,
final ScheduledExecutor scheduledExecutor,
final boolean tolerateMissingResult) {
-final Time timeout =
-Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
-final Time retryPeriod =
-Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
+final Duration timeout =
+Duration.ofMillis(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+final Duration retryPeriod =
+Duration.ofMillis(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
final CompletableFuture<JobResult> jobResultFuture =
JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -40,6 +39,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -59,15 +59,15 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway

private final ScheduledExecutor retryExecutor;

-private final Time timeout;
+private final Duration timeout;

private final ClassLoader classLoader;

public EmbeddedJobClient(
final JobID jobId,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor retryExecutor,
-final Time rpcTimeout,
+final Duration rpcTimeout,
final ClassLoader classLoader) {
this.jobId = checkNotNull(jobId);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
@@ -136,7 +136,7 @@ public CompletableFuture<Map<String, Object>> getAccumulators() {
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
checkNotNull(classLoader);

-final Time retryPeriod = Time.milliseconds(100L);
+final Duration retryPeriod = Duration.ofMillis(100L);
return JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod)
.thenApply(
@@ -21,11 +21,11 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.concurrent.ScheduledExecutor;

+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -50,8 +50,8 @@ static CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
final ScheduledExecutor scheduledExecutor,
-final Time rpcTimeout,
-final Time retryPeriod) {
+final Duration rpcTimeout,
+final Duration retryPeriod) {

return pollJobResultAsync(
() -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout),
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
@@ -43,6 +42,7 @@

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
+import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -136,8 +136,8 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
final Configuration configuration,
final ClassLoader userCodeClassloader)
throws MalformedURLException {
-final Time timeout =
-Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+final Duration timeout =
+Duration.ofMillis(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());

final JobGraph jobGraph =
PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
@@ -191,7 +191,7 @@ private static CompletableFuture<JobID> submitJob(
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final JobGraph jobGraph,
-final Time rpcTimeout) {
+final Duration rpcTimeout) {
checkNotNull(jobGraph);

LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());
@@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.configuration.Configuration;
@@ -29,6 +28,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.util.concurrent.ScheduledExecutor;

+import java.time.Duration;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -81,8 +81,8 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
dispatcherGateway,
configuration,
(jobId, userCodeClassloader) -> {
-final Time timeout =
-Time.milliseconds(
+final Duration timeout =
+Duration.ofMillis(
configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
return new EmbeddedJobClient(
jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader);
@@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
@@ -300,7 +299,7 @@ public void close() {
TimeUnit.MILLISECONDS,
retryExecutorService);

-this.restClient.shutdown(Time.seconds(5));
+this.restClient.shutdown(Duration.ofSeconds(5));
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);

try {
@@ -18,7 +18,6 @@
package org.apache.flink.connector.base.sink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -29,6 +28,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

+import java.time.Duration;
+
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */
@@ -66,7 +67,7 @@ public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
@Test
public void testThatNoIssuesOccurWhenCheckpointingIsEnabled() throws Exception {
env.enableCheckpointing(20);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(200)));
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(200)));
env.fromSequence(1, 10_000).map(Object::toString).sinkTo(new ArrayListAsyncSink());
env.execute("Integration Test: AsyncSinkBaseITCase");
}
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
@@ -34,6 +33,8 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSource;

+import java.time.Duration;
+
/** Tests the functionality of the {@link FileSink} in BATCH mode. */
class BatchExecutionFileSinkITCase extends FileSinkITBase {

@@ -49,7 +50,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) {
env.configure(config, getClass().getClassLoader());

if (triggerFailover) {
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100)));
} else {
env.setRestartStrategy(RestartStrategies.noRestart());
}
@@ -22,7 +22,6 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.CheckpointingMode;
@@ -39,6 +38,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -82,7 +82,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) {
env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);

if (triggerFailover) {
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100)));
} else {
env.setRestartStrategy(RestartStrategies.noRestart());
}
@@ -18,7 +18,6 @@

package org.apache.flink.streaming.connectors.wikiedits;

-import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.testutils.junit.RetryOnFailure;
@@ -31,6 +30,7 @@

import java.net.InetSocketAddress;
import java.net.Socket;
+import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -53,7 +53,7 @@ class WikipediaEditsSourceTest {
@RetryOnFailure(times = 1)
void testWikipediaEditsSource() throws Exception {
if (canConnect(1, TimeUnit.SECONDS)) {
-final Time testTimeout = Time.seconds(60);
+final Duration testTimeout = Duration.ofSeconds(60);
final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();

ExecutorService executorService = null;
@@ -116,8 +116,8 @@ void testWikipediaEditsSource() throws Exception {
}
}

-private long deadlineNanos(Time testTimeout) {
-return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds());
+private long deadlineNanos(Duration testTimeout) {
+return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMillis());
}

private static class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {