diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java index ac120147d98e..249eab662bae 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/MessageConsumerImpl.java @@ -74,11 +74,10 @@ public void close(ScheduledExecutorService instance) { private final AtomicInteger queuedCallbacks; private final int maxQueuedCallbacks; private final Object futureLock = new Object(); - private final Runnable scheduleRunnable; + private final Runnable consumerRunnable; private boolean closed; private Future scheduledFuture; private PullFuture pullerFuture; - private boolean stopped = true; /** * Default executor factory for the message processor executor. By default a single-threaded @@ -99,6 +98,37 @@ public void release(ExecutorService executor) { } } + class ConsumerRunnable implements Runnable { + + @Override + public void run() { + if (closed) { + return; + } + pullerFuture = pubsubRpc.pull(createPullRequest()); + pullerFuture.addCallback(new PullCallback() { + @Override + public void success(PullResponse response) { + List messages = response.getReceivedMessagesList(); + queuedCallbacks.addAndGet(messages.size()); + for (com.google.pubsub.v1.ReceivedMessage message : messages) { + deadlineRenewer.add(subscription, message.getAckId()); + ReceivedMessage receivedMessage = ReceivedMessage.fromPb(pubsub, subscription, message); + executor.execute(ackingRunnable(receivedMessage)); + } + nextPull(); + } + + @Override + public void failure(Throwable error) { + if (!(error instanceof CancellationException)) { + nextPull(); + } + } + }); + } + } + private MessageConsumerImpl(Builder builder) { this.pubsubOptions = builder.pubsubOptions; this.subscription = builder.subscription; @@ -111,17 +141,7 @@ private MessageConsumerImpl(Builder builder) { this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory()); this.executor = executorFactory.get(); this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS); - this.scheduleRunnable = new Runnable() { - @Override - public void run() { - synchronized (futureLock) { - if (closed) { - return; - } - pull(); - } - } - }; + this.consumerRunnable = new ConsumerRunnable(); nextPull(); } @@ -155,51 +175,23 @@ private PullRequest createPullRequest() { private void scheduleNextPull(long delay, TimeUnit timeUnit) { synchronized (futureLock) { - if (!closed && stopped) { - scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit); + if (closed || scheduledFuture != null) { + return; } + scheduledFuture = timer.schedule(consumerRunnable, delay, timeUnit); } } private void nextPull() { synchronized (futureLock) { - if (closed) { + if (closed || queuedCallbacks.get() == maxQueuedCallbacks) { + scheduledFuture = null; return; } - if (queuedCallbacks.get() == maxQueuedCallbacks) { - stopped = true; - } else { - stopped = false; - scheduledFuture = timer.submit(scheduleRunnable); - } + scheduledFuture = timer.submit(consumerRunnable); } } - private void pull() { - pullerFuture = pubsubRpc.pull(createPullRequest()); - pullerFuture.addCallback(new PullCallback() { - @Override - public void success(PullResponse response) { - List messages = response.getReceivedMessagesList(); - queuedCallbacks.addAndGet(messages.size()); - for (com.google.pubsub.v1.ReceivedMessage message : messages) { - deadlineRenewer.add(subscription, message.getAckId()); - final ReceivedMessage receivedMessage = - ReceivedMessage.fromPb(pubsub, subscription, message); - executor.execute(ackingRunnable(receivedMessage)); - } - nextPull(); - } - - @Override - public void failure(Throwable error) { - if (!(error instanceof CancellationException)) { - nextPull(); - } - } - }); - } - @Override public void close() { synchronized (futureLock) { @@ -268,14 +260,4 @@ static Builder builder(PubSubOptions pubsubOptions, String subscription, AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); } - - /** - * Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from - * which messages must be pulled, the acknowledge deadline renewer and a message processor used to - * process messages. - */ - static Builder of(PubSubOptions pubsubOptions, String subscription, - AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) { - return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor); - } }