Skip to content

Commit

Permalink
make Publisher uses RetrySettings (#1538)
Browse files Browse the repository at this point in the history
* make Publisher uses RetrySettings

* fix tests
  • Loading branch information
pongad authored Jan 18, 2017
1 parent 46bc1db commit 5ee13e7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
Expand Down Expand Up @@ -145,18 +146,30 @@ public interface Publisher {

/** A builder of {@link Publisher}s. */
public final class Builder {
static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
static final Duration MIN_RPC_TIMEOUT = new Duration(10); // 10 milliseconds

// Meaningful defaults.
static final long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
static final long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
static final Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
static final Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
static final Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
static final Duration MIN_REQUEST_TIMEOUT = new Duration(10); // 10 milliseconds
static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L;
static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 1000L; // 1 kB
static final Duration DEFAULT_DELAY_THRESHOLD = new Duration(1); // 1ms
static final Duration DEFAULT_RPC_TIMEOUT = new Duration(10 * 1000); // 10 seconds
static final Duration DEFAULT_TOTAL_TIMEOUT = MIN_TOTAL_TIMEOUT;
static final BundlingSettings DEFAULT_BUNDLING_SETTINGS =
BundlingSettings.newBuilder()
.setDelayThreshold(DEFAULT_MAX_BUNDLE_DURATION)
.setRequestByteThreshold(DEFAULT_MAX_BUNDLE_BYTES)
.setElementCountThreshold(DEFAULT_MAX_BUNDLE_MESSAGES)
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.build();
static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setTotalTimeout(DEFAULT_TOTAL_TIMEOUT)
.setInitialRetryDelay(Duration.millis(5))
.setRetryDelayMultiplier(2)
.setMaxRetryDelay(Duration.millis(Long.MAX_VALUE))
.setInitialRpcTimeout(DEFAULT_RPC_TIMEOUT)
.setRpcTimeoutMultiplier(2)
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
.build();

String topic;
Expand All @@ -168,11 +181,7 @@ public final class Builder {
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
boolean failOnFlowControlLimits = false;

// Send bundle deadline
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;

// RPC options
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;

// Channels and credentials
Optional<Credentials> userCredentials = Optional.absent();
Expand Down Expand Up @@ -258,18 +267,13 @@ public Builder setFailOnFlowControlLimits(boolean fail) {
return this;
}

/** Maximum time to attempt sending (and retrying) a bundle of messages before giving up. */
public Builder setSendBundleDeadline(Duration deadline) {
Preconditions.checkArgument(deadline.compareTo(MIN_SEND_BUNDLE_DURATION) >= 0);
sendBundleDeadline = deadline;
return this;
}

// Runtime options
/** Time to wait for a publish call to return from the server. */
public Builder setRequestTimeout(Duration timeout) {
Preconditions.checkArgument(timeout.compareTo(MIN_REQUEST_TIMEOUT) >= 0);
requestTimeout = timeout;
/** Configures the Publisher's retry parameters. */
public Builder setRetrySettings(RetrySettings retrySettings) {
Preconditions.checkArgument(
retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0);
Preconditions.checkArgument(
retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0);
this.retrySettings = retrySettings;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.core.RetrySettings;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -44,9 +45,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.joda.time.Duration;
Expand All @@ -56,17 +58,13 @@
/** Implementation of {@link Publisher}. */
final class PublisherImpl implements Publisher {
private static final int DEFAULT_MIN_THREAD_POOL_SIZE = 5;
private static final double INITIAL_BACKOFF_MS = 5;
private static final double BACKOFF_RANDOMNESS_FACTOR = 0.2;

private static final Logger logger = LoggerFactory.getLogger(PublisherImpl.class);

private final String topic;

private final long maxBundleMessages;
private final long maxBundleBytes;
private final Duration maxBundleDuration;
private final boolean hasBundlingBytes;
private final BundlingSettings bundlingSettings;
private final RetrySettings retrySettings;

private final FlowController.Settings flowControlSettings;
private final boolean failOnFlowControlLimits;
Expand All @@ -79,32 +77,24 @@ final class PublisherImpl implements Publisher {

private final FlowController flowController;
private final Channel[] channels;
private final AtomicLong channelIndex;
private final AtomicInteger channelIndex;
private final CallCredentials credentials;
private final Duration requestTimeout;

private final ScheduledExecutorService executor;
private final AtomicBoolean shutdown;
private final MessagesWaiter messagesWaiter;
private final Duration sendBundleDeadline;
private ScheduledFuture<?> currentAlarmFuture;

PublisherImpl(Builder builder) throws IOException {
topic = builder.topic;

maxBundleMessages = builder.bundlingSettings.getElementCountThreshold();
maxBundleBytes = builder.bundlingSettings.getRequestByteThreshold();
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
hasBundlingBytes = maxBundleBytes > 0;
this.bundlingSettings = builder.bundlingSettings;
this.retrySettings = builder.retrySettings;

flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);

sendBundleDeadline = builder.sendBundleDeadline;

requestTimeout = builder.requestTimeout;

messagesBundle = new LinkedList<>();
messagesBundleLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
Expand All @@ -119,7 +109,7 @@ final class PublisherImpl implements Publisher {
.setNameFormat("cloud-pubsub-publisher-thread-%d")
.build());
channels = new Channel[numCores];
channelIndex = new AtomicLong(0);
channelIndex = new AtomicInteger(0);
for (int i = 0; i < numCores; i++) {
channels[i] =
builder.channelBuilder.isPresent()
Expand Down Expand Up @@ -150,17 +140,21 @@ public PublisherStats getStats() {

@Override
public Duration getMaxBundleDuration() {
return maxBundleDuration;
return bundlingSettings.getDelayThreshold();
}

@Override
public long getMaxBundleBytes() {
return maxBundleBytes;
return bundlingSettings.getRequestByteThreshold();
}

private boolean hasBundlingBytes() {
return getMaxBundleBytes() > 0;
}

@Override
public long getMaxBundleMessages() {
return maxBundleMessages;
return bundlingSettings.getElementCountThreshold();
}

@Override
Expand Down Expand Up @@ -212,7 +206,7 @@ public ListenableFuture<String> publish(PubsubMessage message) {
try {
// Check if the next message makes the bundle exceed the current bundle byte size.
if (!messagesBundle.isEmpty()
&& hasBundlingBytes
&& hasBundlingBytes()
&& bundledBytes + messageSize >= getMaxBundleBytes()) {
bundleToSend = new OutstandingBundle(messagesBundle, bundledBytes);
messagesBundle = new LinkedList<>();
Expand All @@ -221,7 +215,7 @@ public ListenableFuture<String> publish(PubsubMessage message) {

// Border case if the message to send is greater equals to the max bundle size then can't be
// included in the current bundle and instead sent immediately.
if (!hasBundlingBytes || messageSize < getMaxBundleBytes()) {
if (!hasBundlingBytes() || messageSize < getMaxBundleBytes()) {
bundledBytes += messageSize;
messagesBundle.add(outstandingPublish);

Expand Down Expand Up @@ -262,7 +256,7 @@ public void run() {

// If the message is over the size limit, it was not added to the pending messages and it will
// be sent in its own bundle immediately.
if (hasBundlingBytes && messageSize >= getMaxBundleBytes()) {
if (hasBundlingBytes() && messageSize >= getMaxBundleBytes()) {
logger.debug("Message exceeds the max bundle bytes, scheduling it for immediate send.");
executor.execute(
new Runnable() {
Expand Down Expand Up @@ -317,11 +311,18 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)
for (OutstandingPublish outstandingPublish : outstandingBundle.outstandingPublishes) {
publishRequest.addMessages(outstandingPublish.message);
}
int currentChannel = (int) (channelIndex.getAndIncrement() % channels.length);
int currentChannel = channelIndex.getAndIncrement() % channels.length;

long rpcTimeoutMs =
Math.round(
retrySettings.getInitialRpcTimeout().getMillis()
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1));
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis());

Futures.addCallback(
PublisherGrpc.newFutureStub(channels[currentChannel])
.withCallCredentials(credentials)
.withDeadlineAfter(requestTimeout.getMillis(), TimeUnit.MILLISECONDS)
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
.publish(publishRequest.build()),
new FutureCallback<PublishResponse>() {
@Override
Expand All @@ -335,7 +336,8 @@ public void onSuccess(PublishResponse result) {
+ "the expected %s results. Please contact Cloud Pub/Sub support "
+ "if this frequently occurs",
result.getMessageIdsCount(), outstandingBundle.size()));
for (OutstandingPublish oustandingMessage : outstandingBundle.outstandingPublishes) {
for (OutstandingPublish oustandingMessage :
outstandingBundle.outstandingPublishes) {
oustandingMessage.publishResult.setException(t);
}
return;
Expand All @@ -354,12 +356,12 @@ public void onSuccess(PublishResponse result) {

@Override
public void onFailure(Throwable t) {
long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle);
long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle, retrySettings);

if (!isRetryable(t)
|| System.currentTimeMillis() + nextBackoffDelay
> outstandingBundle.creationTime
+ PublisherImpl.this.sendBundleDeadline.getMillis()) {
+ retrySettings.getTotalTimeout().getMillis()) {
try {
for (OutstandingPublish outstandingPublish :
outstandingBundle.outstandingPublishes) {
Expand Down Expand Up @@ -424,13 +426,15 @@ public void shutdown() {
messagesWaiter.waitNoMessages();
}

private static long computeNextBackoffDelayMs(OutstandingBundle outstandingBundle) {
long delayMillis = Math.round(Math.scalb(INITIAL_BACKOFF_MS, outstandingBundle.attempt));
int randomWaitMillis =
Ints.saturatedCast(
(long) ((Math.random() - 0.5) * 2 * delayMillis * BACKOFF_RANDOMNESS_FACTOR));
++outstandingBundle.attempt;
return delayMillis + randomWaitMillis;
private static long computeNextBackoffDelayMs(
OutstandingBundle outstandingBundle, RetrySettings retrySettings) {
long delayMillis =
Math.round(
retrySettings.getInitialRetryDelay().getMillis()
* Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1));
delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis);
outstandingBundle.attempt++;
return ThreadLocalRandom.current().nextLong(0, delayMillis);
}

private boolean isRetryable(Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
Publisher publisher =
getTestPublisherBuilder()
.setExecutor(Executors.newSingleThreadScheduledExecutor())
.setSendBundleDeadline(Duration.standardSeconds(10))
.setRetrySettings(
Publisher.Builder.DEFAULT_RETRY_SETTINGS
.toBuilder()
.setTotalTimeout(Duration.standardSeconds(10))
.build())
.setBundlingSettings(
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
Expand Down Expand Up @@ -308,7 +312,11 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
Publisher publisher =
getTestPublisherBuilder()
.setExecutor(Executors.newSingleThreadScheduledExecutor())
.setSendBundleDeadline(Duration.standardSeconds(10))
.setRetrySettings(
Publisher.Builder.DEFAULT_RETRY_SETTINGS
.toBuilder()
.setTotalTimeout(Duration.standardSeconds(10))
.build())
.setBundlingSettings(
Publisher.Builder.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
Expand Down Expand Up @@ -350,8 +358,6 @@ public void testPublisherGetters() throws Exception {
.setMaxOutstandingRequestBytes(Optional.of(13))
.setMaxOutstandingElementCount(Optional.of(14))
.build());
builder.setRequestTimeout(new Duration(15));
builder.setSendBundleDeadline(new Duration(16000));
Publisher publisher = builder.build();

assertEquals(TEST_TOPIC, publisher.getTopic());
Expand All @@ -371,17 +377,15 @@ public void testBuilderParametersAndDefaults() {
assertEquals(Optional.absent(), builder.executor);
assertFalse(builder.failOnFlowControlLimits);
assertEquals(
Publisher.Builder.DEFAULT_MAX_BUNDLE_BYTES,
Publisher.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,
builder.bundlingSettings.getRequestByteThreshold().longValue());
assertEquals(
Publisher.Builder.DEFAULT_MAX_BUNDLE_DURATION,
builder.bundlingSettings.getDelayThreshold());
Publisher.Builder.DEFAULT_DELAY_THRESHOLD, builder.bundlingSettings.getDelayThreshold());
assertEquals(
Publisher.Builder.DEFAULT_MAX_BUNDLE_MESSAGES,
Publisher.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD,
builder.bundlingSettings.getElementCountThreshold().longValue());
assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings);
assertEquals(Publisher.Builder.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout);
assertEquals(Publisher.Builder.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline);
assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
assertEquals(Optional.absent(), builder.userCredentials);
}

Expand Down Expand Up @@ -551,16 +555,32 @@ public void testBuilderInvalidArguments() {
// Expected
}

builder.setRequestTimeout(Publisher.Builder.MIN_REQUEST_TIMEOUT);
builder.setRetrySettings(
Publisher.Builder.DEFAULT_RETRY_SETTINGS
.toBuilder()
.setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT)
.build());
try {
builder.setRequestTimeout(Publisher.Builder.MIN_REQUEST_TIMEOUT.minus(1));
builder.setRetrySettings(
Publisher.Builder.DEFAULT_RETRY_SETTINGS
.toBuilder()
.setInitialRpcTimeout(Publisher.Builder.MIN_RPC_TIMEOUT.minus(1))
.build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
}
builder.setSendBundleDeadline(Publisher.Builder.MIN_SEND_BUNDLE_DURATION);
builder.setRetrySettings(
Publisher.Builder.DEFAULT_RETRY_SETTINGS
.toBuilder()
.setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT)
.build());
try {
builder.setSendBundleDeadline(Publisher.Builder.MIN_SEND_BUNDLE_DURATION.minus(1));
builder.setRetrySettings(
Publisher.Builder.DEFAULT_RETRY_SETTINGS
.toBuilder()
.setTotalTimeout(Publisher.Builder.MIN_TOTAL_TIMEOUT.minus(1))
.build());
fail("Should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException expected) {
// Expected
Expand Down

0 comments on commit 5ee13e7

Please sign in to comment.