diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java index 6478289a44bd..b27a6c800bd5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java @@ -32,12 +32,9 @@ class FlowController { private final Optional maxOutstandingMessages; private final Optional maxOutstandingBytes; - FlowController( - Optional maxOutstandingMessages, - Optional maxOutstandingBytes, - boolean failOnFlowControlLimits) { - this.maxOutstandingMessages = Preconditions.checkNotNull(maxOutstandingMessages); - this.maxOutstandingBytes = Preconditions.checkNotNull(maxOutstandingBytes); + FlowController(PubSub.FlowControlSettings settings, boolean failOnFlowControlLimits) { + this.maxOutstandingMessages = settings.getMaxOutstandingMessages(); + this.maxOutstandingBytes = settings.getMaxOutstandingBytes(); outstandingMessageCount = maxOutstandingMessages.isPresent() ? new Semaphore(maxOutstandingMessages.get()) : null; outstandingByteCount = @@ -47,7 +44,7 @@ class FlowController { void reserve(int messages, int bytes) throws CloudPubsubFlowControlException { Preconditions.checkArgument(messages > 0); - + if (outstandingMessageCount != null) { if (!failOnLimits) { outstandingMessageCount.acquireUninterruptibly(messages); @@ -70,7 +67,7 @@ void reserve(int messages, int bytes) throws CloudPubsubFlowControlException { void release(int messages, int bytes) { Preconditions.checkArgument(messages > 0); - + if (outstandingMessageCount != null) { outstandingMessageCount.release(messages); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 9439e4665baf..852b27a35c9f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -16,11 +16,14 @@ package com.google.cloud.pubsub; +import com.google.auto.value.AutoValue; import com.google.cloud.AsyncPage; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.Policy; import com.google.cloud.Service; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import java.util.Map; @@ -137,6 +140,49 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) { } } + @AutoValue + public abstract static class FlowControlSettings { + static FlowControlSettings DEFAULT = + newBuilder() + .setMaxOutstandingBytes(Optional.absent()) + .setMaxOutstandingMessages(Optional.absent()) + .build(); + + /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */ + abstract Optional getMaxOutstandingMessages(); + + /** Maximum number of outstanding bytes to keep in memory before enforcing flow control. */ + abstract Optional getMaxOutstandingBytes(); + + Builder toBuilder() { + return new AutoValue_PubSub_FlowControlSettings.Builder(this); + } + + static Builder newBuilder() { + return new AutoValue_PubSub_FlowControlSettings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + abstract Builder setMaxOutstandingMessages(Optional value); + + abstract Builder setMaxOutstandingBytes(Optional value); + + abstract FlowControlSettings autoBuild(); + + FlowControlSettings build() { + FlowControlSettings settings = autoBuild(); + Preconditions.checkArgument( + settings.getMaxOutstandingMessages().or(1) > 0, + "maxOutstandingMessages limit is disabled by default, but if set it must be set to a value greater than 0."); + Preconditions.checkArgument( + settings.getMaxOutstandingBytes().or(1) > 0, + "maxOutstandingBytes limit is disabled by default, but if set it must be set to a value greater than 0."); + return settings; + } + } + } + /** * Creates a new topic. * diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index efc63b14faa9..cadb4f3bcddc 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -164,24 +164,24 @@ final class Builder { String topic; // Bundling options - BundlingSettings bundlingSettings; + BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS; // Client-side flow control options - Optional maxOutstandingMessages; - Optional maxOutstandingBytes; - boolean failOnFlowControlLimits; + PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT; + boolean failOnFlowControlLimits = false; // Send bundle deadline - Duration sendBundleDeadline; + Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; // RPC options - Duration requestTimeout; + Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT; // Channels and credentials - Optional userCredentials; - Optional>> channelBuilder; + Optional userCredentials = Optional.absent(); + Optional>> channelBuilder = + Optional.absent(); - Optional executor; + Optional executor = Optional.absent(); /** Constructs a new {@link Builder} using the given topic. */ public static Builder newBuilder(String topic) { @@ -190,19 +190,6 @@ public static Builder newBuilder(String topic) { Builder(String topic) { this.topic = Preconditions.checkNotNull(topic); - setDefaults(); - } - - private void setDefaults() { - userCredentials = Optional.absent(); - channelBuilder = Optional.absent(); - maxOutstandingMessages = Optional.absent(); - maxOutstandingBytes = Optional.absent(); - bundlingSettings = DEFAULT_BUNDLING_SETTINGS; - requestTimeout = DEFAULT_REQUEST_TIMEOUT; - sendBundleDeadline = MIN_SEND_BUNDLE_DURATION; - failOnFlowControlLimits = false; - executor = Optional.absent(); } /** @@ -254,17 +241,9 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) { // Flow control options - /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */ - public Builder setMaxOutstandingMessages(int messages) { - Preconditions.checkArgument(messages > 0); - maxOutstandingMessages = Optional.of(messages); - return this; - } - - /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */ - public Builder setMaxOutstandingBytes(int bytes) { - Preconditions.checkArgument(bytes > 0); - maxOutstandingBytes = Optional.of(bytes); + /** Sets the flow control settings. */ + public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) { + this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index 355cfffee265..7e5ffbd265ab 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -68,8 +68,7 @@ final class PublisherImpl implements Publisher { private final Duration maxBundleDuration; private final boolean hasBundlingBytes; - private final Optional maxOutstandingMessages; - private final Optional maxOutstandingBytes; + private final PubSub.FlowControlSettings flowControlSettings; private final boolean failOnFlowControlLimits; private final Lock messagesBundleLock; @@ -98,11 +97,9 @@ final class PublisherImpl implements Publisher { maxBundleDuration = builder.bundlingSettings.getDelayThreshold(); hasBundlingBytes = maxBundleBytes > 0; - maxOutstandingMessages = builder.maxOutstandingMessages; - maxOutstandingBytes = builder.maxOutstandingBytes; + flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; - this.flowController = - new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits); + this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); sendBundleDeadline = builder.sendBundleDeadline; @@ -166,12 +163,12 @@ public long getMaxBundleMessages() { @Override public Optional getMaxOutstandingMessages() { - return maxOutstandingMessages; + return flowControlSettings.getMaxOutstandingMessages(); } @Override public Optional getMaxOutstandingBytes() { - return maxOutstandingBytes; + return flowControlSettings.getMaxOutstandingBytes(); } @Override @@ -181,12 +178,12 @@ public boolean failOnFlowControlLimits() { /** Whether flow control kicks in on a per outstanding messages basis. */ boolean isPerMessageEnforced() { - return maxOutstandingMessages.isPresent(); + return getMaxOutstandingMessages().isPresent(); } /** Whether flow control kicks in on a per outstanding bytes basis. */ boolean isPerBytesEnforced() { - return maxOutstandingBytes.isPresent(); + return getMaxOutstandingBytes().isPresent(); } @Override diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java index cf24e2eca2d5..6cd11cd60e16 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java @@ -143,17 +143,17 @@ final class Builder { private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500); String subscription; - Optional credentials; + Optional credentials = Optional.absent(); MessageReceiver receiver; - Duration ackExpirationPadding; + Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; - Optional maxOutstandingMessages; - Optional maxOutstandingBytes; + PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT; - Optional executor; - Optional>> channelBuilder; - Optional clock; + Optional executor = Optional.absent(); + Optional>> channelBuilder = + Optional.absent(); + Optional clock = Optional.absent(); /** * Constructs a new {@link Builder}. @@ -170,21 +170,10 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) } Builder(String subscription, MessageReceiver receiver) { - setDefaults(); this.subscription = subscription; this.receiver = receiver; } - private void setDefaults() { - credentials = Optional.absent(); - channelBuilder = Optional.absent(); - ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; - maxOutstandingBytes = Optional.absent(); - maxOutstandingMessages = Optional.absent(); - executor = Optional.absent(); - clock = Optional.absent(); - } - /** * Credentials to authenticate with. * @@ -208,33 +197,9 @@ public Builder setChannelBuilder( return this; } - /** - * Sets the maximum number of outstanding messages; messages delivered to the {@link - * MessageReceiver} that have not been acknowledged or rejected. - * - * @param maxOutstandingMessages must be greater than 0 - */ - public Builder setMaxOutstandingMessages(int maxOutstandingMessages) { - Preconditions.checkArgument( - maxOutstandingMessages > 0, - "maxOutstandingMessages limit is disabled by default, but if set it must be set to a " - + "value greater to 0."); - this.maxOutstandingMessages = Optional.of(maxOutstandingMessages); - return this; - } - - /** - * Sets the maximum number of outstanding bytes; bytes delivered to the {@link MessageReceiver} - * that have not been acknowledged or rejected. - * - * @param maxOutstandingBytes must be greater than 0 - */ - public Builder setMaxOutstandingBytes(int maxOutstandingBytes) { - Preconditions.checkArgument( - maxOutstandingBytes > 0, - "maxOutstandingBytes limit is disabled by default, but if set it must be set to a value " - + "greater than 0."); - this.maxOutstandingBytes = Optional.of(maxOutstandingBytes); + /** Sets the flow control settings. */ + public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) { + this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java index 913028931af3..c842fa37e7ff 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java @@ -59,8 +59,7 @@ class SubscriberImpl extends AbstractService implements Subscriber { private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class); private final String subscription; - private final Optional maxOutstandingBytes; - private final Optional maxOutstandingMessages; + private final PubSub.FlowControlSettings flowControlSettings; private final Duration ackExpirationPadding; private final ScheduledExecutorService executor; private final Distribution ackLatencyDistribution = @@ -78,8 +77,7 @@ class SubscriberImpl extends AbstractService implements Subscriber { public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException { receiver = builder.receiver; - maxOutstandingBytes = builder.maxOutstandingBytes; - maxOutstandingMessages = builder.maxOutstandingMessages; + flowControlSettings = builder.flowControlSettings; subscription = builder.subscription; ackExpirationPadding = builder.ackExpirationPadding; streamAckDeadlineSeconds = @@ -88,8 +86,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException { Ints.saturatedCast(ackExpirationPadding.getStandardSeconds())); clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock(); - flowController = - new FlowController(builder.maxOutstandingBytes, builder.maxOutstandingBytes, false); + flowController = new FlowController(builder.flowControlSettings, false); numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; executor = @@ -316,11 +313,11 @@ public Duration getAckExpirationPadding() { @Override public Optional getMaxOutstandingMessages() { - return maxOutstandingMessages; + return flowControlSettings.getMaxOutstandingMessages(); } @Override public Optional getMaxOutstandingBytes() { - return maxOutstandingBytes; + return flowControlSettings.getMaxOutstandingBytes(); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java index 7fd9dfd81818..10679cd91f59 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java @@ -36,7 +36,13 @@ public class FlowControllerTest { @Test public void testReserveRelease_ok() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(10), false); + FlowController flowController = + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(10)) + .setMaxOutstandingBytes(Optional.of(10)) + .build(), + false); flowController.reserve(1, 1); flowController.release(1, 1); @@ -44,25 +50,31 @@ public void testReserveRelease_ok() throws Exception { @Test public void testInvalidArguments() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(10), false); + FlowController flowController = + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(10)) + .setMaxOutstandingBytes(Optional.of(10)) + .build(), + false); flowController.reserve(1, 0); try { flowController.reserve(-1, 1); fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { + } catch (IllegalArgumentException expected) { // Expected } try { flowController.reserve(1, -1); fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { + } catch (IllegalArgumentException expected) { // Expected } try { flowController.reserve(0, 1); fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { + } catch (IllegalArgumentException expected) { // Expected } } @@ -70,7 +82,12 @@ public void testInvalidArguments() throws Exception { @Test public void testReserveRelease_noLimits_ok() throws Exception { FlowController flowController = - new FlowController(Optional.absent(), Optional.absent(), false); + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.absent()) + .setMaxOutstandingBytes(Optional.absent()) + .build(), + false); flowController.reserve(1, 1); flowController.release(1, 1); @@ -78,7 +95,13 @@ public void testReserveRelease_noLimits_ok() throws Exception { @Test public void testReserveRelease_blockedByNumberOfMessages() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(100), false); + FlowController flowController = + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(10)) + .setMaxOutstandingBytes(Optional.of(100)) + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -86,14 +109,25 @@ public void testReserveRelease_blockedByNumberOfMessages() throws Exception { @Test public void testReserveRelease_blockedByNumberOfMessages_noBytesLimit() throws Exception { FlowController flowController = - new FlowController(Optional.of(10), Optional.absent(), false); + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(10)) + .setMaxOutstandingBytes(Optional.absent()) + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @Test public void testReserveRelease_blockedByNumberOfBytes() throws Exception { - FlowController flowController = new FlowController(Optional.of(100), Optional.of(10), false); + FlowController flowController = + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(100)) + .setMaxOutstandingBytes(Optional.of(10)) + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -101,7 +135,12 @@ public void testReserveRelease_blockedByNumberOfBytes() throws Exception { @Test public void testReserveRelease_blockedByNumberOfBytes_noMessagesLimit() throws Exception { FlowController flowController = - new FlowController(Optional.absent(), Optional.of(10), false); + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.absent()) + .setMaxOutstandingBytes(Optional.of(10)) + .build(), + false); testBlockingReserveRelease(flowController, 10, 10); } @@ -136,7 +175,13 @@ public void run() { @Test public void testReserveRelease_rejectedByNumberOfMessages() throws Exception { - FlowController flowController = new FlowController(Optional.of(10), Optional.of(100), true); + FlowController flowController = + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(10)) + .setMaxOutstandingBytes(Optional.of(100)) + .build(), + true); testRejectedReserveRelease( flowController, 10, 10, MaxOutstandingMessagesReachedException.class); @@ -145,7 +190,12 @@ public void testReserveRelease_rejectedByNumberOfMessages() throws Exception { @Test public void testReserveRelease_rejectedByNumberOfMessages_noBytesLimit() throws Exception { FlowController flowController = - new FlowController(Optional.of(10), Optional.absent(), true); + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(10)) + .setMaxOutstandingBytes(Optional.absent()) + .build(), + true); testRejectedReserveRelease( flowController, 10, 10, MaxOutstandingMessagesReachedException.class); @@ -153,7 +203,13 @@ public void testReserveRelease_rejectedByNumberOfMessages_noBytesLimit() throws @Test public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { - FlowController flowController = new FlowController(Optional.of(100), Optional.of(10), true); + FlowController flowController = + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.of(100)) + .setMaxOutstandingBytes(Optional.of(10)) + .build(), + true); testRejectedReserveRelease(flowController, 10, 10, MaxOutstandingBytesReachedException.class); } @@ -161,7 +217,12 @@ public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { @Test public void testReserveRelease_rejectedByNumberOfBytes_noMessagesLimit() throws Exception { FlowController flowController = - new FlowController(Optional.absent(), Optional.of(10), true); + new FlowController( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingMessages(Optional.absent()) + .setMaxOutstandingBytes(Optional.of(10)) + .build(), + true); testRejectedReserveRelease(flowController, 10, 10, MaxOutstandingBytesReachedException.class); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index 29fbf99be07c..f242933345e2 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -344,8 +344,11 @@ public void testPublisherGetters() throws Exception { .setDelayThreshold(new Duration(11)) .setElementCountThreshold(12) .build()); - builder.setMaxOutstandingBytes(13); - builder.setMaxOutstandingMessages(14); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.newBuilder() + .setMaxOutstandingBytes(Optional.of(13)) + .setMaxOutstandingMessages(Optional.of(14)) + .build()); builder.setRequestTimeout(new Duration(15)); builder.setSendBundleDeadline(new Duration(16000)); Publisher publisher = builder.build(); @@ -374,8 +377,7 @@ public void testBuilderParametersAndDefaults() { assertEquals( Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, builder.bundlingSettings.getElementCountThreshold().longValue()); - assertEquals(Optional.absent(), builder.maxOutstandingBytes); - assertEquals(Optional.absent(), builder.maxOutstandingMessages); + assertEquals(PubSub.FlowControlSettings.DEFAULT, builder.flowControlSettings); assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); assertEquals(Optional.absent(), builder.userCredentials); @@ -471,29 +473,53 @@ public void testBuilderInvalidArguments() { // Expected } - builder.setMaxOutstandingBytes(1); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.DEFAULT + .toBuilder() + .setMaxOutstandingBytes(Optional.of(1)) + .build()); try { - builder.setMaxOutstandingBytes(0); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.DEFAULT + .toBuilder() + .setMaxOutstandingBytes(Optional.of(0)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } try { - builder.setMaxOutstandingBytes(-1); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.DEFAULT + .toBuilder() + .setMaxOutstandingBytes(Optional.of(-1)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } - builder.setMaxOutstandingMessages(1); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.DEFAULT + .toBuilder() + .setMaxOutstandingMessages(Optional.of(1)) + .build()); try { - builder.setMaxOutstandingMessages(0); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.DEFAULT + .toBuilder() + .setMaxOutstandingMessages(Optional.of(0)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected } try { - builder.setMaxOutstandingMessages(-1); + builder.setFlowControlSettings( + PubSub.FlowControlSettings.DEFAULT + .toBuilder() + .setMaxOutstandingMessages(Optional.of(-1)) + .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { // Expected