From 5ee13e7b7586169b9ccd5beead2f52d7d6920b57 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 19 Jan 2017 10:18:51 +1100 Subject: [PATCH] make Publisher uses RetrySettings (#1538) * make Publisher uses RetrySettings * fix tests --- .../google/cloud/pubsub/spi/v1/Publisher.java | 56 +++++++------ .../cloud/pubsub/spi/v1/PublisherImpl.java | 80 ++++++++++--------- .../pubsub/spi/v1/PublisherImplTest.java | 48 +++++++---- 3 files changed, 106 insertions(+), 78 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index a552dc65682e..0176f4c4f0d7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -17,6 +17,7 @@ package com.google.cloud.pubsub.spi.v1; import com.google.api.gax.bundling.FlowController; +import com.google.api.gax.core.RetrySettings; import com.google.api.gax.grpc.BundlingSettings; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; @@ -145,18 +146,30 @@ public interface Publisher { /** A builder of {@link Publisher}s. */ public final class Builder { + static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds + static final Duration MIN_RPC_TIMEOUT = new Duration(10); // 10 milliseconds + // Meaningful defaults. - static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; - static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB - static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms - static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds - static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds - static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds + static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L; + static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 1000L; // 1 kB + static final Duration DEFAULT_DELAY_THRESHOLD = new Duration(1); // 1ms + static final Duration DEFAULT_RPC_TIMEOUT = new Duration(10 * 1000); // 10 seconds + static final Duration DEFAULT_TOTAL_TIMEOUT = MIN_TOTAL_TIMEOUT; static final BundlingSettings DEFAULT_BUNDLING_SETTINGS = BundlingSettings.newBuilder() - .setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION) - .setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES) - .setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES) + .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) + .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) + .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) + .build(); + static final RetrySettings DEFAULT_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setTotalTimeout(DEFAULT_TOTAL_TIMEOUT) + .setInitialRetryDelay(Duration.millis(5)) + .setRetryDelayMultiplier(2) + .setMaxRetryDelay(Duration.millis(Long.MAX_VALUE)) + .setInitialRpcTimeout(DEFAULT_RPC_TIMEOUT) + .setRpcTimeoutMultiplier(2) + .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); String topic; @@ -168,11 +181,7 @@ public final class Builder { FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; boolean failOnFlowControlLimits = false; - // Send bundle deadline - Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; - - // RPC options - Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT; + RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; // Channels and credentials Optional userCredentials = Optional.absent(); @@ -258,18 +267,13 @@ public Builder setFailOnFlowControlLimits(boolean fail) { return this; } - /** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */ - public Builder setSendBundleDeadline(Duration deadline) { - Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0); - sendBundleDeadline = deadline; - return this; - } - - // Runtime options - /** Time to wait for a publish call to return from the server. */ - public Builder setRequestTimeout(Duration timeout) { - Preconditions.checkArgument(timeout.compareTo(MIN_REQUEST_TIMEOUT) >= 0); - requestTimeout = timeout; + /** Configures the Publisher's retry parameters. */ + public Builder setRetrySettings(RetrySettings retrySettings) { + Preconditions.checkArgument( + retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0); + Preconditions.checkArgument( + retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0); + this.retrySettings = retrySettings; return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherImpl.java index f9ad503dfe2c..a7c28a5cf643 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherImpl.java @@ -17,10 +17,11 @@ package com.google.cloud.pubsub.spi.v1; import com.google.api.gax.bundling.FlowController; +import com.google.api.gax.core.RetrySettings; +import com.google.api.gax.grpc.BundlingSettings; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; -import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -44,9 +45,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.joda.time.Duration; @@ -56,17 +58,13 @@ /** Implementation of {@link Publisher}. */ final class PublisherImpl implements Publisher { private static final int DEFAULT_MIN_THREAD_POOL_SIZE = 5; - private static final double INITIAL_BACKOFF_MS = 5; - private static final double BACKOFF_RANDOMNESS_FACTOR = 0.2; private static final Logger logger = LoggerFactory.getLogger(PublisherImpl.class); private final String topic; - private final long maxBundleMessages; - private final long maxBundleBytes; - private final Duration maxBundleDuration; - private final boolean hasBundlingBytes; + private final BundlingSettings bundlingSettings; + private final RetrySettings retrySettings; private final FlowController.Settings flowControlSettings; private final boolean failOnFlowControlLimits; @@ -79,32 +77,24 @@ final class PublisherImpl implements Publisher { private final FlowController flowController; private final Channel[] channels; - private final AtomicLong channelIndex; + private final AtomicInteger channelIndex; private final CallCredentials credentials; - private final Duration requestTimeout; private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; private final MessagesWaiter messagesWaiter; - private final Duration sendBundleDeadline; private ScheduledFuture currentAlarmFuture; PublisherImpl(Builder builder) throws IOException { topic = builder.topic; - maxBundleMessages = builder.bundlingSettings.getElementCountThreshold(); - maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold(); - maxBundleDuration = builder.bundlingSettings.getDelayThreshold(); - hasBundlingBytes = maxBundleBytes > 0; + this.bundlingSettings = builder.bundlingSettings; + this.retrySettings = builder.retrySettings; flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); - sendBundleDeadline = builder.sendBundleDeadline; - - requestTimeout = builder.requestTimeout; - messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); @@ -119,7 +109,7 @@ final class PublisherImpl implements Publisher { .setNameFormat("cloud-pubsub-publisher-thread-%d") .build()); channels = new Channel[numCores]; - channelIndex = new AtomicLong(0); + channelIndex = new AtomicInteger(0); for (int i = 0; i < numCores; i++) { channels[i] = builder.channelBuilder.isPresent() @@ -150,17 +140,21 @@ public PublisherStats getStats() { @Override public Duration getMaxBundleDuration() { - return maxBundleDuration; + return bundlingSettings.getDelayThreshold(); } @Override public long getMaxBundleBytes() { - return maxBundleBytes; + return bundlingSettings.getRequestByteThreshold(); + } + + private boolean hasBundlingBytes() { + return getMaxBundleBytes() > 0; } @Override public long getMaxBundleMessages() { - return maxBundleMessages; + return bundlingSettings.getElementCountThreshold(); } @Override @@ -212,7 +206,7 @@ public ListenableFuture publish(PubsubMessage message) { try { // Check if the next message makes the bundle exceed the current bundle byte size. if (!messagesBundle.isEmpty() - && hasBundlingBytes + && hasBundlingBytes() && bundledBytes + messageSize >= getMaxBundleBytes()) { bundleToSend = new OutstandingBundle(messagesBundle, bundledBytes); messagesBundle = new LinkedList<>(); @@ -221,7 +215,7 @@ public ListenableFuture publish(PubsubMessage message) { // Border case if the message to send is greater equals to the max bundle size then can't be // included in the current bundle and instead sent immediately. - if (!hasBundlingBytes || messageSize < getMaxBundleBytes()) { + if (!hasBundlingBytes() || messageSize < getMaxBundleBytes()) { bundledBytes += messageSize; messagesBundle.add(outstandingPublish); @@ -262,7 +256,7 @@ public void run() { // If the message is over the size limit, it was not added to the pending messages and it will // be sent in its own bundle immediately. - if (hasBundlingBytes && messageSize >= getMaxBundleBytes()) { + if (hasBundlingBytes() && messageSize >= getMaxBundleBytes()) { logger.debug("Message exceeds the max bundle bytes, scheduling it for immediate send."); executor.execute( new Runnable() { @@ -317,11 +311,18 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) { publishRequest.addMessages(outstandingPublish.message); } - int currentChannel = (int) (channelIndex.getAndIncrement() % channels.length); + int currentChannel = channelIndex.getAndIncrement() % channels.length; + + long rpcTimeoutMs = + Math.round( + retrySettings.getInitialRpcTimeout().getMillis() + * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1)); + rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis()); + Futures.addCallback( PublisherGrpc.newFutureStub(channels[currentChannel]) .withCallCredentials(credentials) - .withDeadlineAfter(requestTimeout.getMillis(), TimeUnit.MILLISECONDS) + .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS) .publish(publishRequest.build()), new FutureCallback() { @Override @@ -335,7 +336,8 @@ public void onSuccess(PublishResponse result) { + "the expected %s results. Please contact Cloud Pub/Sub support " + "if this frequently occurs", result.getMessageIdsCount(), outstandingBundle.size())); - for (OutstandingPublish oustandingMessage : outstandingBundle.outstandingPublishes) { + for (OutstandingPublish oustandingMessage : + outstandingBundle.outstandingPublishes) { oustandingMessage.publishResult.setException(t); } return; @@ -354,12 +356,12 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { - long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle); + long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle, retrySettings); if (!isRetryable(t) || System.currentTimeMillis() + nextBackoffDelay > outstandingBundle.creationTime - + PublisherImpl.this.sendBundleDeadline.getMillis()) { + + retrySettings.getTotalTimeout().getMillis()) { try { for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) { @@ -424,13 +426,15 @@ public void shutdown() { messagesWaiter.waitNoMessages(); } - private static long computeNextBackoffDelayMs(OutstandingBundle outstandingBundle) { - long delayMillis = Math.round(Math.scalb(INITIAL_BACKOFF_MS, outstandingBundle.attempt)); - int randomWaitMillis = - Ints.saturatedCast( - (long) ((Math.random() - 0.5) * 2 * delayMillis * BACKOFF_RANDOMNESS_FACTOR)); - ++outstandingBundle.attempt; - return delayMillis + randomWaitMillis; + private static long computeNextBackoffDelayMs( + OutstandingBundle outstandingBundle, RetrySettings retrySettings) { + long delayMillis = + Math.round( + retrySettings.getInitialRetryDelay().getMillis() + * Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1)); + delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis); + outstandingBundle.attempt++; + return ThreadLocalRandom.current().nextLong(0, delayMillis); } private boolean isRetryable(Throwable t) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index 1e88d8885489..b518cb2d50e7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -279,7 +279,11 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception Publisher publisher = getTestPublisherBuilder() .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBundleDeadline(Duration.standardSeconds(10)) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.standardSeconds(10)) + .build()) .setBundlingSettings( Publisher.Builder.DEFAULT_BUNDLING_SETTINGS .toBuilder() @@ -308,7 +312,11 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce Publisher publisher = getTestPublisherBuilder() .setExecutor(Executors.newSingleThreadScheduledExecutor()) - .setSendBundleDeadline(Duration.standardSeconds(10)) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.standardSeconds(10)) + .build()) .setBundlingSettings( Publisher.Builder.DEFAULT_BUNDLING_SETTINGS .toBuilder() @@ -350,8 +358,6 @@ public void testPublisherGetters() throws Exception { .setMaxOutstandingRequestBytes(Optional.of(13)) .setMaxOutstandingElementCount(Optional.of(14)) .build()); - builder.setRequestTimeout(new Duration(15)); - builder.setSendBundleDeadline(new Duration(16000)); Publisher publisher = builder.build(); assertEquals(TEST_TOPIC, publisher.getTopic()); @@ -371,17 +377,15 @@ public void testBuilderParametersAndDefaults() { assertEquals(Optional.absent(), builder.executor); assertFalse(builder.failOnFlowControlLimits); assertEquals( - Publisher.Builder.DEFAULT_MAX_BUNDLE_BYTES, + Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD, builder.bundlingSettings.getRequestByteThreshold().longValue()); assertEquals( - Publisher.Builder.DEFAULT_MAX_BUNDLE_DURATION, - builder.bundlingSettings.getDelayThreshold()); + Publisher.Builder.DEFAULT_DELAY_THRESHOLD, builder.bundlingSettings.getDelayThreshold()); assertEquals( - Publisher.Builder.DEFAULT_MAX_BUNDLE_MESSAGES, + Publisher.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD, builder.bundlingSettings.getElementCountThreshold().longValue()); assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings); - assertEquals(Publisher.Builder.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); - assertEquals(Publisher.Builder.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); + assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings); assertEquals(Optional.absent(), builder.userCredentials); } @@ -551,16 +555,32 @@ public void testBuilderInvalidArguments() { // Expected } - builder.setRequestTimeout(Publisher.Builder.MIN_REQUEST_TIMEOUT); + builder.setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT) + .build()); try { - builder.setRequestTimeout(Publisher.Builder.MIN_REQUEST_TIMEOUT.minus(1)); + builder.setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT.minus(1)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setSendBundleDeadline(Publisher.Builder.MIN_SEND_BUNDLE_DURATION); + builder.setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT) + .build()); try { - builder.setSendBundleDeadline(Publisher.Builder.MIN_SEND_BUNDLE_DURATION.minus(1)); + builder.setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT.minus(1)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected