diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index a851797500a0..3400c259e950 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -24,7 +24,6 @@ import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.api.gax.core.Distribution; -import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; @@ -33,14 +32,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -91,9 +87,6 @@ class MessageDispatcher { private final Lock jobLock; private ScheduledFuture backgroundJob; - private final LinkedBlockingDeque outstandingMessageBatches = - new LinkedBlockingDeque<>(); - // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; @@ -155,7 +148,6 @@ private void forget() { } flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); - processOutstandingBatches(); } @Override @@ -296,50 +288,19 @@ int getMessageDeadlineSeconds() { return messageDeadlineSeconds.get(); } - static class OutstandingMessageBatch { - private final Deque messages; - private final Runnable doneCallback; - - static class OutstandingMessage { - private final ReceivedMessage receivedMessage; - private final AckHandler ackHandler; - - public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { - this.receivedMessage = receivedMessage; - this.ackHandler = ackHandler; - } - - public ReceivedMessage receivedMessage() { - return receivedMessage; - } - - public AckHandler ackHandler() { - return ackHandler; - } - } + static class OutstandingMessage { + private final ReceivedMessage receivedMessage; + private final AckHandler ackHandler; - public OutstandingMessageBatch(Runnable doneCallback) { - this.messages = new LinkedList<>(); - this.doneCallback = doneCallback; - } - - public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { - this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); - } - - public Deque messages() { - return messages; + public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.receivedMessage = receivedMessage; + this.ackHandler = ackHandler; } } - public void processReceivedMessages(List messages, Runnable doneCallback) { - if (messages.isEmpty()) { - doneCallback.run(); - return; - } - + public void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); - OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback); + List outstandingBatch = new ArrayList<>(messages.size()); for (ReceivedMessage message : messages) { AckHandler ackHandler = new AckHandler( @@ -355,42 +316,25 @@ public void processReceivedMessages(List messages, Runnable don // totally expire so that pubsub service sends us the message again. continue; } - outstandingBatch.addMessage(message, ackHandler); + outstandingBatch.add(new OutstandingMessage(message, ackHandler)); pendingReceipts.add(message.getAckId()); } - if (outstandingBatch.messages.isEmpty()) { - doneCallback.run(); - return; - } - - messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size()); - outstandingMessageBatches.add(outstandingBatch); - processOutstandingBatches(); + processBatch(outstandingBatch); } - private void processOutstandingBatches() { - for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); - nextBatch != null; - nextBatch = outstandingMessageBatches.poll()) { - for (OutstandingMessage nextMessage = nextBatch.messages.poll(); - nextMessage != null; - nextMessage = nextBatch.messages.poll()) { - try { - // This is a non-blocking flow controller. - flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); - } catch (FlowController.MaxOutstandingElementCountReachedException - | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { - // Unwind previous changes in the batches outstanding. - nextBatch.messages.addFirst(nextMessage); - outstandingMessageBatches.addFirst(nextBatch); - return; - } catch (FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); - } - processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); + private void processBatch(List batch) { + messagesWaiter.incrementPendingMessages(batch.size()); + for (OutstandingMessage message : batch) { + // This is a blocking flow controller. We have already incremented MessageWaiter, so + // shutdown will block on processing of all these messages anyway. + try { + flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize()); + } catch (FlowControlException unexpectedException) { + // This should be a blocking flow controller and never throw an exception. + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - nextBatch.doneCallback.run(); + processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler); } } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index ba12e97fb8a1..0f273c3429a8 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -151,26 +151,20 @@ public void onStart(StreamController controller) { @Override public void onResponse(StreamingPullResponse response) { channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); - messageDispatcher.processReceivedMessages( - response.getReceivedMessagesList(), - new Runnable() { - @Override - public void run() { - // Only request more if we're not shutdown. - // If errorFuture is done, the stream has either failed or hung up, - // and we don't need to request. - if (isAlive() && !errorFuture.isDone()) { - lock.lock(); - try { - thisController.request(1); - } catch (Exception e) { - logger.log(Level.WARNING, "cannot request more messages", e); - } finally { - lock.unlock(); - } - } - } - }); + messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); + // Only request more if we're not shutdown. + // If errorFuture is done, the stream has either failed or hung up, + // and we don't need to request. + if (isAlive() && !errorFuture.isDone()) { + lock.lock(); + try { + thisController.request(1); + } catch (Exception e) { + logger.log(Level.WARNING, "cannot request more messages", e); + } finally { + lock.unlock(); + } + } } @Override diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 70d17a4c9884..eb42ad82b7b6 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -130,7 +130,7 @@ private Subscriber(Builder builder) { builder .flowControlSettings .toBuilder() - .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .setLimitExceededBehavior(LimitExceededBehavior.Block) .build()); this.numPullers = builder.parallelPullCount; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 494945e028df..c4ca6e51a023 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -105,7 +105,7 @@ public void sendAckOperations( new FlowController( FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(1L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .build()); dispatcher = @@ -124,7 +124,7 @@ public void sendAckOperations( @Test public void testReceipt() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.processOutstandingAckOperations(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -132,7 +132,7 @@ public void testReceipt() throws Exception { @Test public void testAck() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); consumers.take().ack(); dispatcher.processOutstandingAckOperations(); assertThat(sentAcks).contains(TEST_MESSAGE.getAckId()); @@ -140,7 +140,7 @@ public void testAck() throws Exception { @Test public void testNack() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); consumers.take().nack(); dispatcher.processOutstandingAckOperations(); assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); @@ -148,7 +148,7 @@ public void testNack() throws Exception { @Test public void testExtension() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.extendDeadlines(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -161,7 +161,7 @@ public void testExtension() throws Exception { @Test public void testExtension_Close() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.extendDeadlines(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception { @Test public void testExtension_GiveUp() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.extendDeadlines(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception { dispatcher.extendDeadlines(); assertThat(sentModAcks).isEmpty(); - // We should be able to reserve another item in the flow controller and not block shutdown + // We should be able to reserve another item in the flow controller and not block. flowController.reserve(1, 0); dispatcher.stop(); } @@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception { public void testDeadlineAdjustment() throws Exception { assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10); - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); clock.advance(42, TimeUnit.SECONDS); consumers.take().ack();