Skip to content

Commit

Permalink
Refactor MessageConsumerImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 20, 2016
1 parent 5260293 commit 28512a3
Showing 1 changed file with 39 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ public void close(ScheduledExecutorService instance) {
private final AtomicInteger queuedCallbacks;
private final int maxQueuedCallbacks;
private final Object futureLock = new Object();
private final Runnable scheduleRunnable;
private final Runnable consumerRunnable;
private boolean closed;
private Future<?> scheduledFuture;
private PullFuture pullerFuture;
private boolean stopped = true;

/**
* Default executor factory for the message processor executor. By default a single-threaded
Expand All @@ -99,6 +98,37 @@ public void release(ExecutorService executor) {
}
}

class ConsumerRunnable implements Runnable {

@Override
public void run() {
if (closed) {
return;
}
pullerFuture = pubsubRpc.pull(createPullRequest());
pullerFuture.addCallback(new PullCallback() {
@Override
public void success(PullResponse response) {
List<com.google.pubsub.v1.ReceivedMessage> messages = response.getReceivedMessagesList();
queuedCallbacks.addAndGet(messages.size());
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
deadlineRenewer.add(subscription, message.getAckId());
ReceivedMessage receivedMessage = ReceivedMessage.fromPb(pubsub, subscription, message);
executor.execute(ackingRunnable(receivedMessage));
}
nextPull();
}

@Override
public void failure(Throwable error) {
if (!(error instanceof CancellationException)) {
nextPull();
}
}
});
}
}

private MessageConsumerImpl(Builder builder) {
this.pubsubOptions = builder.pubsubOptions;
this.subscription = builder.subscription;
Expand All @@ -111,17 +141,7 @@ private MessageConsumerImpl(Builder builder) {
this.executorFactory = firstNonNull(builder.executorFactory, new DefaultExecutorFactory());
this.executor = executorFactory.get();
this.maxQueuedCallbacks = firstNonNull(builder.maxQueuedCallbacks, MAX_QUEUED_CALLBACKS);
this.scheduleRunnable = new Runnable() {
@Override
public void run() {
synchronized (futureLock) {
if (closed) {
return;
}
pull();
}
}
};
this.consumerRunnable = new ConsumerRunnable();
nextPull();
}

Expand Down Expand Up @@ -155,51 +175,23 @@ private PullRequest createPullRequest() {

private void scheduleNextPull(long delay, TimeUnit timeUnit) {
synchronized (futureLock) {
if (!closed && stopped) {
scheduledFuture = timer.schedule(scheduleRunnable, delay, timeUnit);
if (closed || scheduledFuture != null) {
return;
}
scheduledFuture = timer.schedule(consumerRunnable, delay, timeUnit);
}
}

private void nextPull() {
synchronized (futureLock) {
if (closed) {
if (closed || queuedCallbacks.get() == maxQueuedCallbacks) {
scheduledFuture = null;
return;
}
if (queuedCallbacks.get() == maxQueuedCallbacks) {
stopped = true;
} else {
stopped = false;
scheduledFuture = timer.submit(scheduleRunnable);
}
scheduledFuture = timer.submit(consumerRunnable);
}
}

private void pull() {
pullerFuture = pubsubRpc.pull(createPullRequest());
pullerFuture.addCallback(new PullCallback() {
@Override
public void success(PullResponse response) {
List<com.google.pubsub.v1.ReceivedMessage> messages = response.getReceivedMessagesList();
queuedCallbacks.addAndGet(messages.size());
for (com.google.pubsub.v1.ReceivedMessage message : messages) {
deadlineRenewer.add(subscription, message.getAckId());
final ReceivedMessage receivedMessage =
ReceivedMessage.fromPb(pubsub, subscription, message);
executor.execute(ackingRunnable(receivedMessage));
}
nextPull();
}

@Override
public void failure(Throwable error) {
if (!(error instanceof CancellationException)) {
nextPull();
}
}
});
}

@Override
public void close() {
synchronized (futureLock) {
Expand Down Expand Up @@ -268,14 +260,4 @@ static Builder builder(PubSubOptions pubsubOptions, String subscription,
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
}

/**
* Returns a {@code MessageConsumerImpl} objects given the service options, the subscription from
* which messages must be pulled, the acknowledge deadline renewer and a message processor used to
* process messages.
*/
static Builder of(PubSubOptions pubsubOptions, String subscription,
AckDeadlineRenewer deadlineRenewer, MessageProcessor messageProcessor) {
return new Builder(pubsubOptions, subscription, deadlineRenewer, messageProcessor);
}
}

0 comments on commit 28512a3

Please sign in to comment.