diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5f5ebbaee204..5c6f1cdc9414 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -90,7 +91,8 @@ class MessageDispatcher { private final Lock jobLock; private ScheduledFuture backgroundJob; - private final Deque outstandingMessageBatches; + private final LinkedBlockingDeque outstandingMessageBatches = + new LinkedBlockingDeque<>(); // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; @@ -200,7 +202,6 @@ void sendAckOperations( Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, FlowController flowController, - Deque outstandingMessageBatches, Executor executor, ScheduledExecutorService systemExecutor, ApiClock clock) { @@ -211,7 +212,6 @@ void sendAckOperations( this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; - this.outstandingMessageBatches = outstandingMessageBatches; // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS this.ackLatencyDistribution = ackLatencyDistribution; jobLock = new ReentrantLock(); diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index defd66806a84..ba12e97fb8a1 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -42,7 +42,6 @@ import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; import io.grpc.Status; -import java.util.Deque; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -84,7 +83,6 @@ public StreamingSubscriberConnection( SubscriberStub stub, int channelAffinity, FlowController flowController, - Deque outstandingMessageBatches, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, ApiClock clock) { @@ -100,7 +98,6 @@ public StreamingSubscriberConnection( maxAckExtensionPeriod, ackLatencyDistribution, flowController, - outstandingMessageBatches, executor, systemExecutor, clock); diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index ed5e8551d75e..b505681b195b 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -44,8 +44,6 @@ import com.google.pubsub.v1.ProjectSubscriptionName; import java.io.IOException; import java.util.ArrayList; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; @@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService { private final MessageReceiver receiver; private final List streamingSubscriberConnections; - private final Deque outstandingMessageBatches = - new LinkedList<>(); private final ApiClock clock; private final List closeables = new ArrayList<>(); @@ -329,7 +325,6 @@ private void startStreamingConnections() { subStub, i, flowController, - outstandingMessageBatches, executor, alarmsExecutor, clock)); diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 785368bb13cb..494945e028df 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -28,7 +28,6 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -117,7 +116,6 @@ public void sendAckOperations( Duration.ofMinutes(60), new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1), flowController, - new LinkedList(), MoreExecutors.directExecutor(), systemExecutor, clock);