diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 489d27cdf126..190cbe8ec854 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -111,7 +111,7 @@ com.google.api gax - 0.0.25 + 0.0.27-SNAPSHOT io.grpc diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java deleted file mode 100644 index b27a6c800bd5..000000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/FlowController.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2016 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub; - -import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingBytesReachedException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingMessagesReachedException; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import java.util.concurrent.Semaphore; -import javax.annotation.Nullable; - -/** Provides flow control capability for Pub/Sub client classes. */ -class FlowController { - @Nullable private final Semaphore outstandingMessageCount; - @Nullable private final Semaphore outstandingByteCount; - private final boolean failOnLimits; - private final Optional maxOutstandingMessages; - private final Optional maxOutstandingBytes; - - FlowController(PubSub.FlowControlSettings settings, boolean failOnFlowControlLimits) { - this.maxOutstandingMessages = settings.getMaxOutstandingMessages(); - this.maxOutstandingBytes = settings.getMaxOutstandingBytes(); - outstandingMessageCount = - maxOutstandingMessages.isPresent() ? new Semaphore(maxOutstandingMessages.get()) : null; - outstandingByteCount = - maxOutstandingBytes.isPresent() ? new Semaphore(maxOutstandingBytes.get()) : null; - this.failOnLimits = failOnFlowControlLimits; - } - - void reserve(int messages, int bytes) throws CloudPubsubFlowControlException { - Preconditions.checkArgument(messages > 0); - - if (outstandingMessageCount != null) { - if (!failOnLimits) { - outstandingMessageCount.acquireUninterruptibly(messages); - } else if (!outstandingMessageCount.tryAcquire(messages)) { - throw new MaxOutstandingMessagesReachedException(maxOutstandingMessages.get()); - } - } - - // Will always allow to send a message even if it is larger than the flow control limit, - // if it doesn't then it will deadlock the thread. - if (outstandingByteCount != null) { - int permitsToDraw = Math.min(bytes, maxOutstandingBytes.get()); - if (!failOnLimits) { - outstandingByteCount.acquireUninterruptibly(permitsToDraw); - } else if (!outstandingByteCount.tryAcquire(permitsToDraw)) { - throw new MaxOutstandingBytesReachedException(maxOutstandingBytes.get()); - } - } - } - - void release(int messages, int bytes) { - Preconditions.checkArgument(messages > 0); - - if (outstandingMessageCount != null) { - outstandingMessageCount.release(messages); - } - if (outstandingByteCount != null) { - // Need to return at most as much bytes as it can be drawn. - int permitsToReturn = Math.min(bytes, maxOutstandingBytes.get()); - outstandingByteCount.release(permitsToReturn); - } - } -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java index ea0ddd46926a..855f2d3b5545 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java @@ -16,9 +16,9 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.cloud.Clock; -import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException; import com.google.cloud.pubsub.Subscriber.MessageReceiver; import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply; import com.google.common.annotations.VisibleForTesting; @@ -287,7 +287,7 @@ public void run() { } try { flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (CloudPubsubFlowControlException unexpectedException) { + } catch (FlowController.FlowControlException unexpectedException) { throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java index 9fa0a9eefa41..ec292451029e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.StatusUtil.isRetryable; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.cloud.Clock; import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 852b27a35c9f..ee1d6cfe0f56 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -140,49 +140,6 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) { } } - @AutoValue - public abstract static class FlowControlSettings { - static FlowControlSettings DEFAULT = - newBuilder() - .setMaxOutstandingBytes(Optional.absent()) - .setMaxOutstandingMessages(Optional.absent()) - .build(); - - /** Maximum number of outstanding messages to keep in memory before enforcing flow control. */ - abstract Optional getMaxOutstandingMessages(); - - /** Maximum number of outstanding bytes to keep in memory before enforcing flow control. */ - abstract Optional getMaxOutstandingBytes(); - - Builder toBuilder() { - return new AutoValue_PubSub_FlowControlSettings.Builder(this); - } - - static Builder newBuilder() { - return new AutoValue_PubSub_FlowControlSettings.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - abstract Builder setMaxOutstandingMessages(Optional value); - - abstract Builder setMaxOutstandingBytes(Optional value); - - abstract FlowControlSettings autoBuild(); - - FlowControlSettings build() { - FlowControlSettings settings = autoBuild(); - Preconditions.checkArgument( - settings.getMaxOutstandingMessages().or(1) > 0, - "maxOutstandingMessages limit is disabled by default, but if set it must be set to a value greater than 0."); - Preconditions.checkArgument( - settings.getMaxOutstandingBytes().or(1) > 0, - "maxOutstandingBytes limit is disabled by default, but if set it must be set to a value greater than 0."); - return settings; - } - } - } - /** * Creates a new topic. * diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index cadb4f3bcddc..8428cce91e90 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; @@ -86,8 +87,8 @@ public interface Publisher { int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) // Meaningful defaults. - int DEFAULT_MAX_BUNDLE_MESSAGES = 100; - int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB + long DEFAULT_MAX_BUNDLE_MESSAGES = 100L; + long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds @@ -129,20 +130,20 @@ public interface Publisher { * Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced. * See {@link #failOnFlowControlLimits()}. */ - Optional getMaxOutstandingMessages(); + Optional getMaxOutstandingElementCount(); /** * Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See * {@link #failOnFlowControlLimits()}. */ - Optional getMaxOutstandingBytes(); + Optional getMaxOutstandingRequestBytes(); /** * Whether to block publish calls when reaching flow control limits (see {@link - * #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}). + * #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}). * *

If set to false, a publish call will fail with either {@link - * MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as + * RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException}, as * appropriate, when flow control limits are reached. */ boolean failOnFlowControlLimits(); @@ -167,7 +168,7 @@ final class Builder { BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS; // Client-side flow control options - PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT; + FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; boolean failOnFlowControlLimits = false; // Send bundle deadline @@ -242,14 +243,14 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) { // Flow control options /** Sets the flow control settings. */ - public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) { + public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) { this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; } /** * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as + * RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException} as * appropriate. * *

If set to false, then publish operations will block the current thread until the @@ -285,51 +286,4 @@ public Publisher build() throws IOException { return new PublisherImpl(this); } } - - /** Base exception that signals a flow control state. */ - abstract class CloudPubsubFlowControlException extends Exception {} - - /** - * Returned as a future exception when client-side flow control is enforced based on the maximum - * number of outstanding in-memory messages. - */ - final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException { - private final int currentMaxMessages; - - public MaxOutstandingMessagesReachedException(int currentMaxMessages) { - this.currentMaxMessages = currentMaxMessages; - } - - public int getCurrentMaxBundleMessages() { - return currentMaxMessages; - } - - @Override - public String toString() { - return String.format( - "The maximum number of bundle messages: %d have been reached.", currentMaxMessages); - } - } - - /** - * Returned as a future exception when client-side flow control is enforced based on the maximum - * number of unacknowledged in-memory bytes. - */ - final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException { - private final int currentMaxBytes; - - public MaxOutstandingBytesReachedException(int currentMaxBytes) { - this.currentMaxBytes = currentMaxBytes; - } - - public int getCurrentMaxBundleBytes() { - return currentMaxBytes; - } - - @Override - public String toString() { - return String.format( - "The maximum number of bundle bytes: %d have been reached.", currentMaxBytes); - } - } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java index 7e5ffbd265ab..d9e375b2dedd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -68,7 +69,7 @@ final class PublisherImpl implements Publisher { private final Duration maxBundleDuration; private final boolean hasBundlingBytes; - private final PubSub.FlowControlSettings flowControlSettings; + private final FlowController.Settings flowControlSettings; private final boolean failOnFlowControlLimits; private final Lock messagesBundleLock; @@ -162,13 +163,13 @@ public long getMaxBundleMessages() { } @Override - public Optional getMaxOutstandingMessages() { - return flowControlSettings.getMaxOutstandingMessages(); + public Optional getMaxOutstandingElementCount() { + return flowControlSettings.getMaxOutstandingElementCount(); } @Override - public Optional getMaxOutstandingBytes() { - return flowControlSettings.getMaxOutstandingBytes(); + public Optional getMaxOutstandingRequestBytes() { + return flowControlSettings.getMaxOutstandingRequestBytes(); } @Override @@ -178,12 +179,12 @@ public boolean failOnFlowControlLimits() { /** Whether flow control kicks in on a per outstanding messages basis. */ boolean isPerMessageEnforced() { - return getMaxOutstandingMessages().isPresent(); + return getMaxOutstandingElementCount().isPresent(); } /** Whether flow control kicks in on a per outstanding bytes basis. */ boolean isPerBytesEnforced() { - return getMaxOutstandingBytes().isPresent(); + return getMaxOutstandingRequestBytes().isPresent(); } @Override @@ -200,7 +201,7 @@ public ListenableFuture publish(PubsubMessage message) { final int messageSize = message.getSerializedSize(); try { flowController.reserve(1, messageSize); - } catch (CloudPubsubFlowControlException e) { + } catch (FlowController.FlowControlException e) { return Futures.immediateFailedFuture(e); } OutstandingBundle bundleToSend = null; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java index e8f0e63c04d9..d0d3cd4b3e6c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/StreamingSubscriberConnection.java @@ -18,6 +18,7 @@ import static com.google.cloud.pubsub.StatusUtil.isRetryable; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.cloud.Clock; import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java index 6cd11cd60e16..a193510f398a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; @@ -130,12 +131,12 @@ public static enum AckReply { * MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window * management, still some extra bytes could be kept at lower layers. */ - Optional getMaxOutstandingMessages(); + Optional getMaxOutstandingElementCount(); /** * Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */ - Optional getMaxOutstandingBytes(); + Optional getMaxOutstandingRequestBytes(); /** Builder of {@link Subscriber Subscribers}. */ final class Builder { @@ -148,7 +149,7 @@ final class Builder { Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING; - PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT; + FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT; Optional executor = Optional.absent(); Optional>> channelBuilder = @@ -198,7 +199,7 @@ public Builder setChannelBuilder( } /** Sets the flow control settings. */ - public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) { + public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) { this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings); return this; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java index c842fa37e7ff..a8b98d6d6707 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriberImpl.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub; +import com.google.api.gax.bundling.FlowController; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; @@ -59,7 +60,7 @@ class SubscriberImpl extends AbstractService implements Subscriber { private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class); private final String subscription; - private final PubSub.FlowControlSettings flowControlSettings; + private final FlowController.Settings flowControlSettings; private final Duration ackExpirationPadding; private final ScheduledExecutorService executor; private final Distribution ackLatencyDistribution = @@ -312,12 +313,12 @@ public Duration getAckExpirationPadding() { } @Override - public Optional getMaxOutstandingMessages() { - return flowControlSettings.getMaxOutstandingMessages(); + public Optional getMaxOutstandingElementCount() { + return flowControlSettings.getMaxOutstandingElementCount(); } @Override - public Optional getMaxOutstandingBytes() { - return flowControlSettings.getMaxOutstandingBytes(); + public Optional getMaxOutstandingRequestBytes() { + return flowControlSettings.getMaxOutstandingRequestBytes(); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java deleted file mode 100644 index 10679cd91f59..000000000000 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/FlowControllerTest.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Copyright 2016 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingBytesReachedException; -import com.google.cloud.pubsub.Publisher.MaxOutstandingMessagesReachedException; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.SettableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link FlowController}. */ -@RunWith(JUnit4.class) -public class FlowControllerTest { - - @Test - public void testReserveRelease_ok() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(10)) - .setMaxOutstandingBytes(Optional.of(10)) - .build(), - false); - - flowController.reserve(1, 1); - flowController.release(1, 1); - } - - @Test - public void testInvalidArguments() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(10)) - .setMaxOutstandingBytes(Optional.of(10)) - .build(), - false); - - flowController.reserve(1, 0); - try { - flowController.reserve(-1, 1); - fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - flowController.reserve(1, -1); - fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - flowController.reserve(0, 1); - fail("Must have thrown an illegal argument error"); - } catch (IllegalArgumentException expected) { - // Expected - } - } - - @Test - public void testReserveRelease_noLimits_ok() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.absent()) - .setMaxOutstandingBytes(Optional.absent()) - .build(), - false); - - flowController.reserve(1, 1); - flowController.release(1, 1); - } - - @Test - public void testReserveRelease_blockedByNumberOfMessages() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(10)) - .setMaxOutstandingBytes(Optional.of(100)) - .build(), - false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - @Test - public void testReserveRelease_blockedByNumberOfMessages_noBytesLimit() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(10)) - .setMaxOutstandingBytes(Optional.absent()) - .build(), - false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - @Test - public void testReserveRelease_blockedByNumberOfBytes() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(100)) - .setMaxOutstandingBytes(Optional.of(10)) - .build(), - false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - @Test - public void testReserveRelease_blockedByNumberOfBytes_noMessagesLimit() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.absent()) - .setMaxOutstandingBytes(Optional.of(10)) - .build(), - false); - - testBlockingReserveRelease(flowController, 10, 10); - } - - private static void testBlockingReserveRelease( - final FlowController flowController, final int maxNumMessages, final int maxNumBytes) - throws Exception { - - flowController.reserve(1, 1); - - final SettableFuture permitsReserved = SettableFuture.create(); - Future finished = - Executors.newCachedThreadPool() - .submit( - new Runnable() { - @Override - public void run() { - try { - permitsReserved.set(null); - flowController.reserve(maxNumMessages, maxNumBytes); - } catch (CloudPubsubFlowControlException e) { - throw new AssertionError(e); - } - } - }); - - permitsReserved.get(); - flowController.release(1, 1); - - finished.get(); - } - - @Test - public void testReserveRelease_rejectedByNumberOfMessages() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(10)) - .setMaxOutstandingBytes(Optional.of(100)) - .build(), - true); - - testRejectedReserveRelease( - flowController, 10, 10, MaxOutstandingMessagesReachedException.class); - } - - @Test - public void testReserveRelease_rejectedByNumberOfMessages_noBytesLimit() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(10)) - .setMaxOutstandingBytes(Optional.absent()) - .build(), - true); - - testRejectedReserveRelease( - flowController, 10, 10, MaxOutstandingMessagesReachedException.class); - } - - @Test - public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.of(100)) - .setMaxOutstandingBytes(Optional.of(10)) - .build(), - true); - - testRejectedReserveRelease(flowController, 10, 10, MaxOutstandingBytesReachedException.class); - } - - @Test - public void testReserveRelease_rejectedByNumberOfBytes_noMessagesLimit() throws Exception { - FlowController flowController = - new FlowController( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingMessages(Optional.absent()) - .setMaxOutstandingBytes(Optional.of(10)) - .build(), - true); - - testRejectedReserveRelease(flowController, 10, 10, MaxOutstandingBytesReachedException.class); - } - - private void testRejectedReserveRelease( - FlowController flowController, - int maxNumMessages, - int maxNumBytes, - Class expectedException) - throws CloudPubsubFlowControlException { - - flowController.reserve(1, 1); - - try { - flowController.reserve(maxNumMessages, maxNumBytes); - fail("Should thrown a CloudPubsubFlowControlException"); - } catch (CloudPubsubFlowControlException e) { - assertTrue(expectedException.isInstance(e)); - } - - flowController.release(1, 1); - - flowController.reserve(maxNumMessages, maxNumBytes); - } -} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index f242933345e2..74bf9e28f53b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.times; +import com.google.api.gax.bundling.FlowController; import com.google.api.gax.grpc.BundlingSettings; import com.google.cloud.pubsub.Publisher.Builder; import com.google.common.base.Optional; @@ -345,9 +346,9 @@ public void testPublisherGetters() throws Exception { .setElementCountThreshold(12) .build()); builder.setFlowControlSettings( - PubSub.FlowControlSettings.newBuilder() - .setMaxOutstandingBytes(Optional.of(13)) - .setMaxOutstandingMessages(Optional.of(14)) + FlowController.Settings.newBuilder() + .setMaxOutstandingRequestBytes(Optional.of(13)) + .setMaxOutstandingElementCount(Optional.of(14)) .build()); builder.setRequestTimeout(new Duration(15)); builder.setSendBundleDeadline(new Duration(16000)); @@ -357,8 +358,8 @@ public void testPublisherGetters() throws Exception { assertEquals(10, publisher.getMaxBundleBytes()); assertEquals(new Duration(11), publisher.getMaxBundleDuration()); assertEquals(12, publisher.getMaxBundleMessages()); - assertEquals(Optional.of(13), publisher.getMaxOutstandingBytes()); - assertEquals(Optional.of(14), publisher.getMaxOutstandingMessages()); + assertEquals(Optional.of(13), publisher.getMaxOutstandingRequestBytes()); + assertEquals(Optional.of(14), publisher.getMaxOutstandingElementCount()); assertTrue(publisher.failOnFlowControlLimits()); } @@ -377,7 +378,7 @@ public void testBuilderParametersAndDefaults() { assertEquals( Publisher.DEFAULT_MAX_BUNDLE_MESSAGES, builder.bundlingSettings.getElementCountThreshold().longValue()); - assertEquals(PubSub.FlowControlSettings.DEFAULT, builder.flowControlSettings); + assertEquals(FlowController.Settings.DEFAULT, builder.flowControlSettings); assertEquals(Publisher.DEFAULT_REQUEST_TIMEOUT, builder.requestTimeout); assertEquals(Publisher.MIN_SEND_BUNDLE_DURATION, builder.sendBundleDeadline); assertEquals(Optional.absent(), builder.userCredentials); @@ -409,7 +410,7 @@ public void testBuilderInvalidArguments() { } try { builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold(null).build()); + Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold((Long)null).build()); fail("Should have thrown an NullPointerException"); } catch (NullPointerException expected) { // Expected @@ -453,7 +454,7 @@ public void testBuilderInvalidArguments() { Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build()); try { builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(null).build()); + Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold((Long)null).build()); fail("Should have thrown an NullPointerException"); } catch (NullPointerException expected) { // Expected @@ -474,15 +475,15 @@ public void testBuilderInvalidArguments() { } builder.setFlowControlSettings( - PubSub.FlowControlSettings.DEFAULT + FlowController.Settings.DEFAULT .toBuilder() - .setMaxOutstandingBytes(Optional.of(1)) + .setMaxOutstandingRequestBytes(Optional.of(1)) .build()); try { builder.setFlowControlSettings( - PubSub.FlowControlSettings.DEFAULT + FlowController.Settings.DEFAULT .toBuilder() - .setMaxOutstandingBytes(Optional.of(0)) + .setMaxOutstandingRequestBytes(Optional.of(0)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { @@ -490,9 +491,9 @@ public void testBuilderInvalidArguments() { } try { builder.setFlowControlSettings( - PubSub.FlowControlSettings.DEFAULT + FlowController.Settings.DEFAULT .toBuilder() - .setMaxOutstandingBytes(Optional.of(-1)) + .setMaxOutstandingRequestBytes(Optional.of(-1)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { @@ -500,15 +501,15 @@ public void testBuilderInvalidArguments() { } builder.setFlowControlSettings( - PubSub.FlowControlSettings.DEFAULT + FlowController.Settings.DEFAULT .toBuilder() - .setMaxOutstandingMessages(Optional.of(1)) + .setMaxOutstandingElementCount(Optional.of(1)) .build()); try { builder.setFlowControlSettings( - PubSub.FlowControlSettings.DEFAULT + FlowController.Settings.DEFAULT .toBuilder() - .setMaxOutstandingMessages(Optional.of(0)) + .setMaxOutstandingElementCount(Optional.of(0)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) { @@ -516,9 +517,9 @@ public void testBuilderInvalidArguments() { } try { builder.setFlowControlSettings( - PubSub.FlowControlSettings.DEFAULT + FlowController.Settings.DEFAULT .toBuilder() - .setMaxOutstandingMessages(Optional.of(-1)) + .setMaxOutstandingElementCount(Optional.of(-1)) .build()); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException expected) {