From 7edfd9c315405c7703509fb9b7bc4f0d2686d870 Mon Sep 17 00:00:00 2001 From: Diego Marquez Date: Thu, 21 Nov 2024 13:19:53 -0500 Subject: [PATCH] feat: introduce `java.time` variables and methods (#2271) * feat: introduce `java.time` variables and methods * ignore package private public interface changes --- .../clirr-ignored-differences.xml | 14 +++ .../cloud/pubsub/v1/MessageDispatcher.java | 6 +- .../com/google/cloud/pubsub/v1/Publisher.java | 20 ++--- .../v1/StreamingSubscriberConnection.java | 2 +- .../google/cloud/pubsub/v1/Subscriber.java | 88 ++++++++++++++----- .../pubsub/v1/FakePublisherServiceImpl.java | 2 +- .../v1/FakeScheduledExecutorService.java | 4 +- .../pubsub/v1/MessageDispatcherTest.java | 2 +- .../cloud/pubsub/v1/PublisherImplTest.java | 74 ++++++++-------- .../v1/StreamingSubscriberConnectionTest.java | 2 +- .../cloud/pubsub/v1/SubscriberTest.java | 11 ++- 11 files changed, 145 insertions(+), 80 deletions(-) diff --git a/google-cloud-pubsub/clirr-ignored-differences.xml b/google-cloud-pubsub/clirr-ignored-differences.xml index fc73daacd..866962660 100644 --- a/google-cloud-pubsub/clirr-ignored-differences.xml +++ b/google-cloud-pubsub/clirr-ignored-differences.xml @@ -1,4 +1,18 @@ + + 7005 + + com/google/cloud/pubsub/v1/MessageDispatcher$Builder + *(org.threeten.bp.Duration) + *(java.time.Duration) + + + 7005 + + com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder + *(org.threeten.bp.Duration) + *(java.time.Duration) + diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 860fcbcf9..7112d4c02 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -28,6 +28,9 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -48,9 +51,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; -import org.threeten.bp.Duration; -import org.threeten.bp.Instant; -import org.threeten.bp.temporal.ChronoUnit; /** * Dispatches messages to a message receiver while handling the messages acking and lease diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 3713cf69b..af7a57471 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -57,6 +57,7 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -73,7 +74,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; -import org.threeten.bp.Duration; /** * A Cloud Pub/Sub publisher, that is @@ -198,7 +198,7 @@ private Publisher(Builder builder) throws IOException { // key? retrySettingsBuilder .setMaxAttempts(Integer.MAX_VALUE) - .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); + .setTotalTimeoutDuration(Duration.ofNanos(Long.MAX_VALUE)); } PublisherStubSettings.Builder stubSettings = @@ -740,7 +740,7 @@ public static final class Builder { private static final double DEFAULT_MULTIPLIER = 4; static final BatchingSettings DEFAULT_BATCHING_SETTINGS = BatchingSettings.newBuilder() - .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) + .setDelayThresholdDuration(DEFAULT_DELAY_THRESHOLD) .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) .setFlowControlSettings( @@ -750,13 +750,13 @@ public static final class Builder { .build(); static final RetrySettings DEFAULT_RETRY_SETTINGS = RetrySettings.newBuilder() - .setTotalTimeout(DEFAULT_TOTAL_TIMEOUT) - .setInitialRetryDelay(DEFAULT_INITIAL_RETRY_DELAY) + .setTotalTimeoutDuration(DEFAULT_TOTAL_TIMEOUT) + .setInitialRetryDelayDuration(DEFAULT_INITIAL_RETRY_DELAY) .setRetryDelayMultiplier(DEFAULT_MULTIPLIER) - .setMaxRetryDelay(DEFAULT_MAX_RETRY_DELAY) - .setInitialRpcTimeout(DEFAULT_INITIAL_RPC_TIMEOUT) + .setMaxRetryDelayDuration(DEFAULT_MAX_RETRY_DELAY) + .setInitialRpcTimeoutDuration(DEFAULT_INITIAL_RPC_TIMEOUT) .setRpcTimeoutMultiplier(DEFAULT_MULTIPLIER) - .setMaxRpcTimeout(DEFAULT_MAX_RPC_TIMEOUT) + .setMaxRpcTimeoutDuration(DEFAULT_MAX_RPC_TIMEOUT) .build(); static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; private static final int THREADS_PER_CPU = 5; @@ -876,9 +876,9 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) { /** Configures the Publisher's retry parameters. */ public Builder setRetrySettings(RetrySettings retrySettings) { Preconditions.checkArgument( - retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0); + retrySettings.getTotalTimeoutDuration().compareTo(MIN_TOTAL_TIMEOUT) >= 0); Preconditions.checkArgument( - retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0); + retrySettings.getInitialRpcTimeoutDuration().compareTo(MIN_RPC_TIMEOUT) >= 0); this.retrySettings = retrySettings; return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 60da55cee..3ad124f80 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -51,6 +51,7 @@ import io.grpc.Status; import io.grpc.protobuf.StatusProto; import io.opentelemetry.api.trace.Span; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -68,7 +69,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** Implementation of {@link AckProcessor} based on Cloud Pub/Sub streaming pull. */ final class StreamingSubscriberConnection extends AbstractApiService implements AckProcessor { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index e9926fa58..4b9ea3825 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -16,12 +16,15 @@ package com.google.cloud.pubsub.v1; +import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeDuration; + import com.google.api.core.AbstractApiService; import com.google.api.core.ApiClock; import com.google.api.core.ApiService; import com.google.api.core.BetaApi; import com.google.api.core.CurrentMillisClock; import com.google.api.core.InternalApi; +import com.google.api.core.ObsoleteApi; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; @@ -55,7 +58,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** * A Cloud Pub/Sub subscriber that is @@ -98,24 +100,37 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private static final int MAX_INBOUND_METADATA_SIZE = 4 * 1024 * 1024; // 4MB API maximum metadata size - @InternalApi static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); + @InternalApi + static final java.time.Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = + java.time.Duration.ofMinutes(60); @InternalApi - static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY = - Duration.ofMinutes(1); + static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION_EXACTLY_ONCE_DELIVERY = + java.time.Duration.ofMinutes(1); - @InternalApi static final Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = Duration.ofMinutes(0); - @InternalApi static final Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = Duration.ofSeconds(0); + @InternalApi + static final java.time.Duration DEFAULT_MIN_ACK_DEADLINE_EXTENSION = + java.time.Duration.ofMinutes(0); - @InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10); - @InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600); + @InternalApi + static final java.time.Duration DEFAULT_MAX_ACK_DEADLINE_EXTENSION = + java.time.Duration.ofSeconds(0); - @InternalApi static final Duration STREAM_ACK_DEADLINE_DEFAULT = Duration.ofSeconds(60); + @InternalApi + static final java.time.Duration MIN_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(10); @InternalApi - static final Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = Duration.ofSeconds(60); + static final java.time.Duration MAX_STREAM_ACK_DEADLINE = java.time.Duration.ofSeconds(600); - @InternalApi static final Duration ACK_EXPIRATION_PADDING_DEFAULT = Duration.ofSeconds(5); + @InternalApi + static final java.time.Duration STREAM_ACK_DEADLINE_DEFAULT = java.time.Duration.ofSeconds(60); + + @InternalApi + static final java.time.Duration STREAM_ACK_DEADLINE_EXACTLY_ONCE_DELIVERY_DEFAULT = + java.time.Duration.ofSeconds(60); + + @InternalApi + static final java.time.Duration ACK_EXPIRATION_PADDING_DEFAULT = java.time.Duration.ofSeconds(5); private static final Logger logger = Logger.getLogger(Subscriber.class.getName()); @@ -124,10 +139,10 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final boolean useLegacyFlowControl; - private final Duration maxAckExtensionPeriod; - private final Duration maxDurationPerAckExtension; + private final java.time.Duration maxAckExtensionPeriod; + private final java.time.Duration maxDurationPerAckExtension; private final boolean maxDurationPerAckExtensionDefaultUsed; - private final Duration minDurationPerAckExtension; + private final java.time.Duration minDurationPerAckExtension; private final boolean minDurationPerAckExtensionDefaultUsed; // The ExecutorProvider used to generate executors for processing messages. @@ -490,10 +505,10 @@ public static final class Builder { private MessageReceiver receiver; private MessageReceiverWithAckResponse receiverWithAckResponse; - private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; - private Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION; + private java.time.Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; + private java.time.Duration minDurationPerAckExtension = DEFAULT_MIN_ACK_DEADLINE_EXTENSION; private boolean minDurationPerAckExtensionDefaultUsed = true; - private Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION; + private java.time.Duration maxDurationPerAckExtension = DEFAULT_MAX_ACK_DEADLINE_EXTENSION; private boolean maxDurationPerAckExtensionDefaultUsed = true; private boolean useLegacyFlowControl = false; @@ -505,7 +520,7 @@ public static final class Builder { SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) .setMaxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE) - .setKeepAliveTime(Duration.ofMinutes(5)) + .setKeepAliveTimeDuration(java.time.Duration.ofMinutes(5)) .build(); private HeaderProvider headerProvider = new NoHeaderProvider(); private CredentialsProvider credentialsProvider = @@ -596,6 +611,15 @@ public Builder setUseLegacyFlowControl(boolean value) { return this; } + /** + * This method is obsolete. Use {@link #setMaxAckExtensionPeriodDuration(java.time.Duration)} + * instead. + */ + @ObsoleteApi("Use setMaxAckExtensionPeriodDuration(java.time.Duration) instead") + public Builder setMaxAckExtensionPeriod(org.threeten.bp.Duration maxAckExtensionPeriod) { + return setMaxAckExtensionPeriodDuration(toJavaTimeDuration(maxAckExtensionPeriod)); + } + /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. * @@ -605,12 +629,22 @@ public Builder setUseLegacyFlowControl(boolean value) { * *

A zero duration effectively disables auto deadline extensions. */ - public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + public Builder setMaxAckExtensionPeriodDuration(java.time.Duration maxAckExtensionPeriod) { Preconditions.checkArgument(maxAckExtensionPeriod.toMillis() >= 0); this.maxAckExtensionPeriod = maxAckExtensionPeriod; return this; } + /** + * This method is obsolete. Use {@link + * #setMaxDurationPerAckExtensionDuration(java.time.Duration)} instead. + */ + @ObsoleteApi("Use setMaxDurationPerAckExtensionDuration(java.time.Duration) instead") + public Builder setMaxDurationPerAckExtension( + org.threeten.bp.Duration maxDurationPerAckExtension) { + return setMaxDurationPerAckExtensionDuration(toJavaTimeDuration(maxDurationPerAckExtension)); + } + /** * Set the upper bound for a single mod ack extention period. * @@ -621,7 +655,8 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { * *

MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration. */ - public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { + public Builder setMaxDurationPerAckExtensionDuration( + java.time.Duration maxDurationPerAckExtension) { // If a non-default min is set, make sure min is less than max Preconditions.checkArgument( maxDurationPerAckExtension.toMillis() >= 0 @@ -633,6 +668,16 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension return this; } + /** + * This method is obsolete. Use {@link + * #setMinDurationPerAckExtensionDuration(java.time.Duration)} instead. + */ + @ObsoleteApi("Use setMinDurationPerAckExtensionDuration(java.time.Duration) instead") + public Builder setMinDurationPerAckExtension( + org.threeten.bp.Duration minDurationPerAckExtension) { + return setMinDurationPerAckExtensionDuration(toJavaTimeDuration(minDurationPerAckExtension)); + } + /** * Set the lower bound for a single mod ack extention period. * @@ -643,7 +688,8 @@ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension * *

MinDurationPerAckExtension configuration can be disabled by specifying a zero duration. */ - public Builder setMinDurationPerAckExtension(Duration minDurationPerAckExtension) { + public Builder setMinDurationPerAckExtensionDuration( + java.time.Duration minDurationPerAckExtension) { // If a non-default max is set, make sure min is less than max Preconditions.checkArgument( minDurationPerAckExtension.toMillis() >= 0 diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java index 23817f558..9ab1dec73 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java @@ -21,13 +21,13 @@ import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; import io.grpc.stub.StreamObserver; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.threeten.bp.Duration; /** * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java index cf067e2da..65e199e92 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java @@ -18,6 +18,8 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.SettableFuture; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Deque; import java.util.LinkedList; @@ -32,8 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.threeten.bp.Duration; -import org.threeten.bp.Instant; /** * Fake implementation of {@link ScheduledExecutorService} that allows tests control the reference diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index c608ee8d5..bd3dccccf 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -26,13 +26,13 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.threeten.bp.Duration; public class MessageDispatcherTest { private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 411b61d15..cdefa84f7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -54,6 +54,7 @@ import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -66,7 +67,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class PublisherImplTest { @@ -120,7 +120,7 @@ public void testPublishByDuration() throws Exception { .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setElementCountThreshold(10L) .build()) .build(); @@ -151,7 +151,7 @@ public void testPublishByNumBatchedMessages() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .build()) .build(); @@ -190,7 +190,7 @@ public void testSinglePublishByNumBytes() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .build()) .build(); @@ -225,7 +225,7 @@ public void testPublishByShutdown() throws Exception { .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .setElementCountThreshold(10L) .build()) .build(); @@ -259,7 +259,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .build()) .build(); @@ -300,7 +300,7 @@ public void testPublishWithCompression() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .build()) .setEnableCompression(true) .setCompressionBytesThreshold(100) @@ -331,7 +331,7 @@ public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(3L) - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .build()) .setEnableMessageOrdering(true) .build(); @@ -384,7 +384,7 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(10L) - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .build()) .setEnableMessageOrdering(true) .build(); @@ -448,7 +448,7 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception { .toBuilder() .setElementCountThreshold(10L) .setRequestByteThreshold(64L) - .setDelayThreshold(Duration.ofSeconds(100)) + .setDelayThresholdDuration(Duration.ofSeconds(100)) .build()) .setEnableMessageOrdering(true) .build(); @@ -490,7 +490,7 @@ public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { .setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) + .setTotalTimeoutDuration(Duration.ofSeconds(10)) .setMaxAttempts(1) .build()) .setEnableMessageOrdering(true) @@ -613,7 +613,7 @@ public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws E Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(500)) + .setDelayThresholdDuration(Duration.ofSeconds(500)) .build()) .setEnableMessageOrdering(true) .build(); @@ -674,7 +674,7 @@ public void testErrorPropagation() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .build()) .build(); testPublisherServiceImpl.addPublishError(Status.DATA_LOSS.asException()); @@ -695,7 +695,7 @@ public void testPublishFailureRetries() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .build()) .build(); // To demonstrate that reaching duration will trigger publish @@ -718,7 +718,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception { .setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) + .setTotalTimeoutDuration(Duration.ofSeconds(10)) .setMaxAttempts(1) .build()) .build(); @@ -743,7 +743,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception { .setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) + .setTotalTimeoutDuration(Duration.ofSeconds(10)) .setMaxAttempts(3) .build()) .build(); @@ -768,7 +768,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception .setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) + .setTotalTimeoutDuration(Duration.ofSeconds(10)) .setMaxAttempts(0) .build()) .build(); @@ -794,13 +794,13 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce .setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) + .setTotalTimeoutDuration(Duration.ofSeconds(10)) .build()) .setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .build()) .build(); // To demonstrate that reaching duration will trigger publish @@ -825,7 +825,7 @@ public void testPublisherGetters() throws Exception { builder.setBatchingSettings( BatchingSettings.newBuilder() .setRequestByteThreshold(10L) - .setDelayThreshold(Duration.ofMillis(11)) + .setDelayThresholdDuration(Duration.ofMillis(11)) .setElementCountThreshold(12L) .build()); builder.setCredentialsProvider(NoCredentialsProvider.create()); @@ -833,7 +833,8 @@ public void testPublisherGetters() throws Exception { assertEquals(TEST_TOPIC, publisher.getTopicName()); assertEquals(10, (long) publisher.getBatchingSettings().getRequestByteThreshold()); - assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); + assertEquals( + Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThresholdDuration()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); @@ -848,7 +849,8 @@ public void testBuilderParametersAndDefaults() { Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD, builder.batchingSettings.getRequestByteThreshold().longValue()); assertEquals( - Publisher.Builder.DEFAULT_DELAY_THRESHOLD, builder.batchingSettings.getDelayThreshold()); + Publisher.Builder.DEFAULT_DELAY_THRESHOLD, + builder.batchingSettings.getDelayThresholdDuration()); assertEquals( Publisher.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD, builder.batchingSettings.getElementCountThreshold().longValue()); @@ -906,7 +908,7 @@ public void testBuilderInvalidArguments() { builder.setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() - .setDelayThreshold(Duration.ofMillis(1)) + .setDelayThresholdDuration(Duration.ofMillis(1)) .build()); try { builder.setBatchingSettings( @@ -919,7 +921,7 @@ public void testBuilderInvalidArguments() { builder.setBatchingSettings( Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() - .setDelayThreshold(Duration.ofMillis(-1)) + .setDelayThresholdDuration(Duration.ofMillis(-1)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { @@ -965,13 +967,13 @@ public void testBuilderInvalidArguments() { builder.setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT) + .setInitialRpcTimeoutDuration(Publisher.Builder.MIN_RPC_TIMEOUT) .build()); try { builder.setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT.minusMillis(1)) + .setInitialRpcTimeoutDuration(Publisher.Builder.MIN_RPC_TIMEOUT.minusMillis(1)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { @@ -980,13 +982,13 @@ public void testBuilderInvalidArguments() { builder.setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT) + .setTotalTimeoutDuration(Publisher.Builder.MIN_TOTAL_TIMEOUT) .build()); try { builder.setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT.minusMillis(1)) + .setTotalTimeoutDuration(Publisher.Builder.MIN_TOTAL_TIMEOUT.minusMillis(1)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { @@ -1031,7 +1033,7 @@ public void testAwaitTermination() throws Exception { .setRetrySettings( Publisher.Builder.DEFAULT_RETRY_SETTINGS .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) + .setTotalTimeoutDuration(Duration.ofSeconds(10)) .setMaxAttempts(0) .build()) .build(); @@ -1066,7 +1068,7 @@ public void invalidFlowControlBytes_throwException() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior( @@ -1091,7 +1093,7 @@ public void invalidFlowControlElementCount_throwException() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior( @@ -1116,7 +1118,7 @@ public void testMessageExceedsFlowControlLimits_throwException() throws Exceptio Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) @@ -1144,7 +1146,7 @@ public void testPublishFlowControl_throwException() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior( @@ -1186,7 +1188,7 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior( @@ -1233,7 +1235,7 @@ public void testPublishFlowControl_block() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) @@ -1325,7 +1327,7 @@ public void testPublishOpenTelemetryTracing() throws Exception { Publisher.Builder.DEFAULT_BATCHING_SETTINGS .toBuilder() .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) + .setDelayThresholdDuration(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index 95f8897a4..412dd2ad8 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -34,6 +34,7 @@ import com.google.rpc.Status; import io.grpc.StatusException; import io.grpc.protobuf.StatusProto; +import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import org.junit.After; @@ -41,7 +42,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.threeten.bp.Duration; /** Tests for {@link StreamingSubscriberConnection}. */ public class StreamingSubscriberConnectionTest { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 612c244fe..679d37e40 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -35,13 +35,13 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import java.time.Duration; import java.util.concurrent.*; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.threeten.bp.Duration; /** Tests for {@link Subscriber}. */ public class SubscriberTest { @@ -241,7 +241,8 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { Subscriber subscriber = startSubscriber( getTestSubscriberBuilder(testReceiver) - .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + .setMaxDurationPerAckExtensionDuration( + Duration.ofSeconds(maxDurationPerAckExtension))); assertEquals( expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); assertEquals( @@ -255,7 +256,8 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { subscriber = startSubscriber( getTestSubscriberBuilder(testReceiver) - .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + .setMaxDurationPerAckExtensionDuration( + Duration.ofSeconds(maxDurationPerAckExtension))); assertEquals( expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); assertEquals( @@ -269,7 +271,8 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { subscriber = startSubscriber( getTestSubscriberBuilder(testReceiver) - .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + .setMaxDurationPerAckExtensionDuration( + Duration.ofSeconds(maxDurationPerAckExtension))); assertEquals( expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); assertEquals(