Skip to content

Commit

Permalink
pubsub: all connections share one message queue (#2459)
Browse files Browse the repository at this point in the history
Fixes #2452.
  • Loading branch information
pongad authored Sep 23, 2017
1 parent 20678f4 commit 7864376
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessagesBatch.OutstandingMessage;
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -81,7 +81,7 @@ class MessageDispatcher {
private Instant nextAckDeadlineExtensionAlarmTime;
private ScheduledFuture<?> pendingAcksAlarm;

private final Deque<OutstandingMessagesBatch> outstandingMessageBatches;
private final Deque<OutstandingMessageBatch> outstandingMessageBatches;

// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;
Expand Down Expand Up @@ -238,6 +238,7 @@ void sendAckOperations(
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
FlowController flowController,
Deque<OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
Expand All @@ -248,7 +249,7 @@ void sendAckOperations(
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
outstandingMessageBatches = new LinkedList<>();
this.outstandingMessageBatches = outstandingMessageBatches;
outstandingAckHandlers = new PriorityQueue<>();
pendingAcks = new HashSet<>();
pendingNacks = new HashSet<>();
Expand Down Expand Up @@ -282,7 +283,7 @@ public int getMessageDeadlineSeconds() {
return messageDeadlineSeconds;
}

static class OutstandingMessagesBatch {
static class OutstandingMessageBatch {
private final Deque<OutstandingMessage> messages;
private final Runnable doneCallback;

Expand All @@ -304,7 +305,7 @@ public AckHandler ackHandler() {
}
}

public OutstandingMessagesBatch(Runnable doneCallback) {
public OutstandingMessageBatch(Runnable doneCallback) {
this.messages = new LinkedList<>();
this.doneCallback = doneCallback;
}
Expand All @@ -325,7 +326,7 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
}
messagesWaiter.incrementPendingMessages(messages.size());

OutstandingMessagesBatch outstandingBatch = new OutstandingMessagesBatch(doneCallback);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
final ArrayList<AckHandler> ackHandlers = new ArrayList<>(messages.size());
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
Expand Down Expand Up @@ -358,7 +359,7 @@ public void processOutstandingBatches() {
Runnable batchCallback = null;
OutstandingMessage outstandingMessage;
synchronized (outstandingMessageBatches) {
OutstandingMessagesBatch nextBatch = outstandingMessageBatches.peek();
OutstandingMessageBatch nextBatch = outstandingMessageBatches.peek();
if (nextBatch == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
import com.google.pubsub.v1.Subscription;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -70,6 +71,7 @@ public PollingSubscriberConnection(
SubscriberFutureStub stub,
FlowController flowController,
@Nullable Long maxDesiredPulledMessages,
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
Expand All @@ -84,6 +86,7 @@ public PollingSubscriberConnection(
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
outstandingMessageBatches,
executor,
systemExecutor,
clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -71,6 +72,7 @@ public StreamingSubscriberConnection(
Distribution ackLatencyDistribution,
SubscriberStub asyncStub,
FlowController flowController,
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
@Nullable ScheduledExecutorService alarmsExecutor,
ApiClock clock) {
Expand All @@ -85,6 +87,7 @@ public StreamingSubscriberConnection(
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
outstandingMessageBatches,
executor,
alarmsExecutor,
clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -115,6 +117,8 @@ public class Subscriber extends AbstractApiService {
private final MessageReceiver receiver;
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
new LinkedList<>();
private final ApiClock clock;
private final List<AutoCloseable> closeables = new ArrayList<>();
private final boolean useStreaming;
Expand Down Expand Up @@ -328,6 +332,7 @@ private void startPollingConnections() throws IOException {
stub,
flowController,
flowControlSettings.getMaxOutstandingElementCount(),
outstandingMessageBatches,
executor,
alarmsExecutor,
clock));
Expand Down Expand Up @@ -374,6 +379,7 @@ private void startStreamingConnections() throws IOException {
ackLatencyDistribution,
stub,
flowController,
outstandingMessageBatches,
executor,
alarmsExecutor,
clock));
Expand Down

0 comments on commit 7864376

Please sign in to comment.