Skip to content

Commit

Permalink
Change MessageDispatcher to own its own queue. (#4590)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google authored and sduskis committed Feb 28, 2019
1 parent 1e18a7d commit 54aa55d
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -90,7 +91,8 @@ class MessageDispatcher {
private final Lock jobLock;
private ScheduledFuture<?> backgroundJob;

private final Deque<OutstandingMessageBatch> outstandingMessageBatches;
private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
new LinkedBlockingDeque<>();

// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;
Expand Down Expand Up @@ -200,7 +202,6 @@ void sendAckOperations(
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
FlowController flowController,
Deque<OutstandingMessageBatch> outstandingMessageBatches,
Executor executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
Expand All @@ -211,7 +212,6 @@ void sendAckOperations(
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
this.outstandingMessageBatches = outstandingMessageBatches;
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
this.ackLatencyDistribution = ackLatencyDistribution;
jobLock = new ReentrantLock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import io.grpc.Status;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -84,7 +83,6 @@ public StreamingSubscriberConnection(
SubscriberStub stub,
int channelAffinity,
FlowController flowController,
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
ApiClock clock) {
Expand All @@ -100,7 +98,6 @@ public StreamingSubscriberConnection(
maxAckExtensionPeriod,
ackLatencyDistribution,
flowController,
outstandingMessageBatches,
executor,
systemExecutor,
clock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
import com.google.pubsub.v1.ProjectSubscriptionName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
Expand Down Expand Up @@ -115,8 +113,6 @@ public class Subscriber extends AbstractApiService {

private final MessageReceiver receiver;
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
private final Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches =
new LinkedList<>();
private final ApiClock clock;
private final List<AutoCloseable> closeables = new ArrayList<>();

Expand Down Expand Up @@ -329,7 +325,6 @@ private void startStreamingConnections() {
subStub,
i,
flowController,
outstandingMessageBatches,
executor,
alarmsExecutor,
clock));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -117,7 +116,6 @@ public void sendAckOperations(
Duration.ofMinutes(60),
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
flowController,
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
MoreExecutors.directExecutor(),
systemExecutor,
clock);
Expand Down

0 comments on commit 54aa55d

Please sign in to comment.