From f2461243898e16bb26b27a4794d3f82bab086795 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 1 Nov 2017 11:50:01 +1100 Subject: [PATCH 1/3] pubsub: create experimental message dispatcher We emphasize simplicity, not optimal performance. Of course, performance is still important, hence the many concurrent data structures used here. Major differences from the original and some justifications: - It modacks immediately after receiving messages to take advantage of server-side duplicate-mitigation feature that should soon be available. - In the original impl, we create many dispatchers. They share some data structures and keep others separate. I find this too confusing. The new impl creates only one dispatcher, to be shared among all connections. This obviously increases contention but should be at least partially alleviated by some lock-free algorithms used here. - It makes deadline a constant of 1 minute. With the dup-mitigation feature, the server might need to wait on the order of minutes before it could redeliver messages. I opine that setting the deadline to 1 minute shouldn't drastically worsen the redelivery latency. Also unlike the original, it does not periodically adjust deadline. I have some ideas on how this could be simply implemented; we can add this feature back if necessary. - Modack time is also set to 1 minute and doesn't exponentially back off. Since the deadline is already 1 minute, it seems silly to bicker over a few extra seconds. [1] - Modacks run on fixed schedule, giving 15 seconds padding, and modacks all pending messages, not just the ones about to expire. While clearly suboptimal, it's not very expensive since it only happens once every 45 seconds. [1] This caused a bug. If the padding is set too large, we'd schedule modacks to occur in the past, creating a modack storm. I believe the benefits of reduced complexity outweighs the cost. Load test shows the current implementation still has no trouble catching up with the publisher. --- .../cloud/pubsub/v1/MessageDispatcher2.java | 278 ++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java new file mode 100644 index 000000000000..6a974f986fad --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java @@ -0,0 +1,278 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.batching.FlowController.FlowControlException; +import com.google.common.collect.Lists; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.StreamingPullRequest; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +class MessageDispatcher2 { + interface Connection { + void send(StreamingPullRequest request); + } + + private static class WorkItem { + final ReceivedMessage message; + final AtomicInteger remaining; + final Runnable callback; + + WorkItem(ReceivedMessage message, AtomicInteger remaining, Runnable callback) { + this.message = message; + this.remaining = remaining; + this.callback = callback; + } + } + + private class Consumer implements AckReplyConsumer { + final AtomicReference completed = new AtomicReference<>(); + final WorkItem workItem; + + Consumer(WorkItem workItem) { + this.workItem = workItem; + } + + private void complete(String reason) { + if (!completed.compareAndSet(null, reason)) { + throw new IllegalStateException( + String.format( + "the message has already been completed (%s), cannot %s", completed.get(), reason)); + } + extensionSet.remove(workItem.message.getAckId()); + flowController.release(1, workItem.message.getMessage().getSerializedSize()); + if (workItem.remaining.decrementAndGet() == 0) { + workItem.callback.run(); + } + workMessages(); + } + + @Override + public void ack() { + complete("ack"); + idsToAck.add(workItem.message.getAckId()); + } + + @Override + public void nack() { + complete("nack"); + idsToNack.add(workItem.message.getAckId()); + } + + void throwException(Throwable t) { + complete("throw exception"); + String ackId = workItem.message.getAckId(); + logger.log( + Level.WARNING, + "MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.", + t); + idsToNack.add(ackId); + } + } + + private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName()); + private static final int DEADLINE_EXTENSION_SEC = 60; + private static final int KEEPALIVE_SEC = 45; + private static final int MAX_CHANGE_PER_REQUEST = 1000; + + private final ExecutorService executor; + private final ScheduledExecutorService systemExecutor; + private final MessageReceiver receiver; + private final FlowController flowController; + + private final ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue idsToAck = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue idsToNack = new ConcurrentLinkedQueue<>(); + + // Boolean, not Void, because the map doesn't allow null values. + private final ConcurrentHashMap extensionSet = new ConcurrentHashMap<>(); + + private final LinkedBlockingQueue connections = new LinkedBlockingQueue<>(); + + private ScheduledFuture ackNackJob; + private ScheduledFuture extensionJob; + + MessageDispatcher2( + ExecutorService executor, + ScheduledExecutorService systemExecutor, + MessageReceiver receiver, + FlowController flowController) { + this.executor = executor; + this.systemExecutor = systemExecutor; + this.receiver = receiver; + this.flowController = flowController; + } + + synchronized void start(Collection connections) { + this.connections.addAll(connections); + this.ackNackJob = + systemExecutor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + StreamingPullRequest.Builder builder = StreamingPullRequest.newBuilder(); + boolean more = true; + while (more) { + builder.clear(); + more = drainAckNacks(builder); + // If both are 0, there's nothing to send. + if (builder.getAckIdsCount() != 0 + || builder.getModifyDeadlineAckIdsCount() != 0) { + sendRequest(builder.build()); + } + } + } + }, + 100, + 100, + TimeUnit.MILLISECONDS); + + this.extensionJob = + systemExecutor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + List extendIds = new ArrayList<>(extensionSet.keySet()); + for (List batch : Lists.partition(extendIds, MAX_CHANGE_PER_REQUEST)) { + sendRequest( + StreamingPullRequest.newBuilder() + .addAllModifyDeadlineAckIds(batch) + .addAllModifyDeadlineSeconds( + Collections.nCopies(batch.size(), DEADLINE_EXTENSION_SEC)) + .build()); + } + } + }, + KEEPALIVE_SEC, + KEEPALIVE_SEC, + TimeUnit.SECONDS); + } + + synchronized void shutdown() { + ackNackJob.cancel(false); + extensionJob.cancel(false); + } + + private boolean drainAckNacks(StreamingPullRequest.Builder builder) { + while (builder.getAckIdsCount() < MAX_CHANGE_PER_REQUEST) { + String id = idsToAck.poll(); + if (id == null) { + break; + } + builder.addAckIds(id); + } + + while (builder.getModifyDeadlineAckIdsCount() < MAX_CHANGE_PER_REQUEST) { + String id = idsToNack.poll(); + if (id == null) { + break; + } + builder.addModifyDeadlineAckIds(id); + } + builder.addAllModifyDeadlineSeconds( + Collections.nCopies(builder.getModifyDeadlineAckIdsCount(), 0)); + + return builder.getAckIdsCount() == MAX_CHANGE_PER_REQUEST + || builder.getModifyDeadlineAckIdsCount() == MAX_CHANGE_PER_REQUEST; + } + + void processReceivedMessages(List messages, Runnable doneCallback) { + if (messages.isEmpty()) { + doneCallback.run(); + return; + } + + StreamingPullRequest.Builder builder = + StreamingPullRequest.newBuilder() + .addAllModifyDeadlineSeconds( + Collections.nCopies(messages.size(), DEADLINE_EXTENSION_SEC)); + for (ReceivedMessage message : messages) { + builder.addModifyDeadlineAckIds(message.getAckId()); + } + sendRequest(builder.build()); + + AtomicInteger remaining = new AtomicInteger(messages.size()); + for (ReceivedMessage message : messages) { + workQueue.add(new WorkItem(message, remaining, doneCallback)); + extensionSet.put(message.getAckId(), true); + } + workMessages(); + } + + private synchronized void workMessages() { + for (; ; ) { + final WorkItem item = workQueue.peek(); + if (item == null) { + return; + } + + try { + flowController.reserve(1, item.message.getMessage().getSerializedSize()); + } catch (FlowControlException e) { + return; + } + workQueue.poll(); + + final Consumer consumer = new Consumer(item); + executor.submit( + new Runnable() { + @Override + public void run() { + try { + receiver.receiveMessage(item.message.getMessage(), consumer); + } catch (Exception e) { + consumer.throwException(e); + } + } + }); + } + } + + private void sendRequest(StreamingPullRequest request) { + Connection connection = null; + try { + connection = connections.take(); + connection.send(request); + } catch (InterruptedException e) { + logger.log( + Level.WARNING, "interrupted while waiting for available connection; message not sent", e); + } finally { + if (connection != null) { + connections.add(connection); + } + } + } + + int getMessageDeadlineSeconds() { + return DEADLINE_EXTENSION_SEC; + } +} From a9c1812183dc42217332220f1092b6ca019d12ca Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 2 Nov 2017 18:36:41 +1100 Subject: [PATCH 2/3] make codacy happy --- .../cloud/pubsub/v1/MessageDispatcher2.java | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java index 6a974f986fad..f250faec5669 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java @@ -38,6 +38,29 @@ import java.util.logging.Logger; class MessageDispatcher2 { + + private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName()); + private static final int DEADLINE_EXTENSION_SEC = 60; + private static final int KEEPALIVE_SEC = 45; + private static final int MAX_CHANGE_PER_REQUEST = 1000; + + private final ExecutorService executor; + private final ScheduledExecutorService systemExecutor; + private final MessageReceiver receiver; + private final FlowController flowController; + + private final ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue idsToAck = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue idsToNack = new ConcurrentLinkedQueue<>(); + + // Boolean, not Void, because the map doesn't allow null values. + private final ConcurrentHashMap extensionSet = new ConcurrentHashMap<>(); + + private final LinkedBlockingQueue connections = new LinkedBlockingQueue<>(); + + private ScheduledFuture ackNackJob; + private ScheduledFuture extensionJob; + interface Connection { void send(StreamingPullRequest request); } @@ -99,28 +122,6 @@ void throwException(Throwable t) { } } - private static final Logger logger = Logger.getLogger(MessageDispatcher.class.getName()); - private static final int DEADLINE_EXTENSION_SEC = 60; - private static final int KEEPALIVE_SEC = 45; - private static final int MAX_CHANGE_PER_REQUEST = 1000; - - private final ExecutorService executor; - private final ScheduledExecutorService systemExecutor; - private final MessageReceiver receiver; - private final FlowController flowController; - - private final ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue idsToAck = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue idsToNack = new ConcurrentLinkedQueue<>(); - - // Boolean, not Void, because the map doesn't allow null values. - private final ConcurrentHashMap extensionSet = new ConcurrentHashMap<>(); - - private final LinkedBlockingQueue connections = new LinkedBlockingQueue<>(); - - private ScheduledFuture ackNackJob; - private ScheduledFuture extensionJob; - MessageDispatcher2( ExecutorService executor, ScheduledExecutorService systemExecutor, From 06fecbbf024e6c0398423ec7da845cda4cec2587 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 2 Nov 2017 18:49:43 +1100 Subject: [PATCH 3/3] batch modack better --- .../cloud/pubsub/v1/MessageDispatcher2.java | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java index f250faec5669..d4fa9a95c38a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher2.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; +import com.google.auto.value.AutoValue; class MessageDispatcher2 { @@ -51,7 +52,7 @@ class MessageDispatcher2 { private final ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue idsToAck = new ConcurrentLinkedQueue<>(); - private final ConcurrentLinkedQueue idsToNack = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue modAcks = new ConcurrentLinkedQueue<>(); // Boolean, not Void, because the map doesn't allow null values. private final ConcurrentHashMap extensionSet = new ConcurrentHashMap<>(); @@ -60,11 +61,21 @@ class MessageDispatcher2 { private ScheduledFuture ackNackJob; private ScheduledFuture extensionJob; - + interface Connection { void send(StreamingPullRequest request); } +@AutoValue + static abstract class ModAckItem { + abstract String ackId(); + abstract int seconds(); + + static ModAckItem create(String ackId, int seconds) { + return new AutoValue_MessageDispatcher2_ModAckItem(ackId, seconds); + } + } + private static class WorkItem { final ReceivedMessage message; final AtomicInteger remaining; @@ -108,7 +119,7 @@ public void ack() { @Override public void nack() { complete("nack"); - idsToNack.add(workItem.message.getAckId()); + modAcks.add(ModAckItem.create(workItem.message.getAckId(), 0)); } void throwException(Throwable t) { @@ -118,7 +129,7 @@ void throwException(Throwable t) { Level.WARNING, "MessageReceiver failed to processes ack ID: " + ackId + ", the message will be nacked.", t); - idsToNack.add(ackId); + modAcks.add(ModAckItem.create(ackId, 0)); } } @@ -193,14 +204,13 @@ private boolean drainAckNacks(StreamingPullRequest.Builder builder) { } while (builder.getModifyDeadlineAckIdsCount() < MAX_CHANGE_PER_REQUEST) { - String id = idsToNack.poll(); - if (id == null) { + ModAckItem modAck = modAcks.poll(); + if (modAck == null) { break; } - builder.addModifyDeadlineAckIds(id); + builder.addModifyDeadlineAckIds(modAck.ackId()); + builder.addModifyDeadlineSeconds(modAck.seconds()); } - builder.addAllModifyDeadlineSeconds( - Collections.nCopies(builder.getModifyDeadlineAckIdsCount(), 0)); return builder.getAckIdsCount() == MAX_CHANGE_PER_REQUEST || builder.getModifyDeadlineAckIdsCount() == MAX_CHANGE_PER_REQUEST; @@ -212,14 +222,9 @@ void processReceivedMessages(List messages, Runnable doneCallba return; } - StreamingPullRequest.Builder builder = - StreamingPullRequest.newBuilder() - .addAllModifyDeadlineSeconds( - Collections.nCopies(messages.size(), DEADLINE_EXTENSION_SEC)); for (ReceivedMessage message : messages) { - builder.addModifyDeadlineAckIds(message.getAckId()); + modAcks.add(ModAckItem.create(message.getAckId(), DEADLINE_EXTENSION_SEC)); } - sendRequest(builder.build()); AtomicInteger remaining = new AtomicInteger(messages.size()); for (ReceivedMessage message : messages) {