-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pubsub ordering keys #5055
Pubsub ordering keys #5055
Changes from 53 commits
41adb13
7793aa3
41430b2
81ea157
fab0188
858d4e9
c2c8b5c
ad54d69
e026c5c
cc44d4b
e492b04
3f9a663
e88432a
7732506
b68add0
54420a4
7f96c1a
6be9e3d
7487e88
708d785
d556ce4
4a26abe
18aca86
b06fd9e
40e83a1
43f2ff3
d661492
dd8a5ce
97ba17c
02e06ae
c90b83c
956537c
75acad9
5480455
d98f981
3bdbf77
41b9975
42697e2
fd07e9e
74daec8
dd8db2e
9259fe6
8573e5a
ec2ecc6
0b00228
142196c
92a7bf4
32f4e23
f6fcbed
c918164
6073e7e
5a81613
e9e754c
33ebb40
cec8e94
2400120
52c7c93
e463a05
c01aa11
3087968
da6dd3c
ede8436
ae78886
99d1396
cf6d528
3ea3dc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,9 +49,13 @@ | |
import com.google.pubsub.v1.TopicNames; | ||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.EnumSet; | ||
import java.util.HashMap; | ||
import java.util.Iterator; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.ScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
|
@@ -85,15 +89,17 @@ public class Publisher { | |
private final String topicName; | ||
|
||
private final BatchingSettings batchingSettings; | ||
private final boolean enableMessageOrdering; | ||
|
||
private final Lock messagesBatchLock; | ||
private MessagesBatch messagesBatch; | ||
final Map<String, MessagesBatch> messagesBatches; | ||
|
||
private final AtomicBoolean activeAlarm; | ||
|
||
private final PublisherStub publisherStub; | ||
|
||
private final ScheduledExecutorService executor; | ||
final SequentialExecutorService.CallbackExecutor sequentialExecutor; | ||
private final AtomicBoolean shutdown; | ||
private final BackgroundResource backgroundResources; | ||
private final MessageWaiter messagesWaiter; | ||
|
@@ -114,22 +120,33 @@ private Publisher(Builder builder) throws IOException { | |
topicName = builder.topicName; | ||
|
||
this.batchingSettings = builder.batchingSettings; | ||
this.enableMessageOrdering = builder.enableMessageOrdering; | ||
this.messageTransform = builder.messageTransform; | ||
|
||
messagesBatch = new MessagesBatch(batchingSettings); | ||
messagesBatches = new HashMap<>(); | ||
messagesBatchLock = new ReentrantLock(); | ||
activeAlarm = new AtomicBoolean(false); | ||
executor = builder.executorProvider.getExecutor(); | ||
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor); | ||
List<BackgroundResource> backgroundResourceList = new ArrayList<>(); | ||
if (builder.executorProvider.shouldAutoClose()) { | ||
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor)); | ||
} | ||
|
||
// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. | ||
// We post-process this here to keep backward-compatibility. | ||
RetrySettings retrySettings = builder.retrySettings; | ||
if (retrySettings.getMaxAttempts() == 0) { | ||
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); | ||
// Also, if "message ordering" is enabled, the publisher should retry sending the failed | ||
// message infinitely rather than sending the next one. | ||
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder(); | ||
if (retrySettingsBuilder.getMaxAttempts() == 0) { | ||
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE); | ||
} | ||
if (enableMessageOrdering) { | ||
// TODO: is there a way to have the default retry settings for requests without an ordering | ||
// key? | ||
retrySettingsBuilder | ||
.setMaxAttempts(Integer.MAX_VALUE) | ||
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); | ||
} | ||
|
||
PublisherStubSettings.Builder stubSettings = | ||
|
@@ -140,14 +157,15 @@ private Publisher(Builder builder) throws IOException { | |
stubSettings | ||
.publishSettings() | ||
.setRetryableCodes( | ||
StatusCode.Code.ABORTED, | ||
StatusCode.Code.CANCELLED, | ||
StatusCode.Code.DEADLINE_EXCEEDED, | ||
StatusCode.Code.INTERNAL, | ||
StatusCode.Code.RESOURCE_EXHAUSTED, | ||
StatusCode.Code.UNKNOWN, | ||
StatusCode.Code.UNAVAILABLE) | ||
.setRetrySettings(retrySettings) | ||
EnumSet.of( | ||
StatusCode.Code.ABORTED, | ||
StatusCode.Code.CANCELLED, | ||
StatusCode.Code.DEADLINE_EXCEEDED, | ||
StatusCode.Code.INTERNAL, | ||
StatusCode.Code.RESOURCE_EXHAUSTED, | ||
StatusCode.Code.UNKNOWN, | ||
StatusCode.Code.UNAVAILABLE)) | ||
.setRetrySettings(retrySettingsBuilder.build()) | ||
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); | ||
this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); | ||
backgroundResourceList.add(publisherStub); | ||
|
@@ -194,13 +212,27 @@ public String getTopicNameString() { | |
public ApiFuture<String> publish(PubsubMessage message) { | ||
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); | ||
|
||
final String orderingKey = message.getOrderingKey(); | ||
Preconditions.checkState( | ||
orderingKey.isEmpty() || enableMessageOrdering, | ||
"Cannot publish a message with an ordering key when message ordering is not enabled."); | ||
|
||
final OutstandingPublish outstandingPublish = | ||
new OutstandingPublish(messageTransform.apply(message)); | ||
List<OutstandingBatch> batchesToSend; | ||
messagesBatchLock.lock(); | ||
try { | ||
// Check if the next message makes the current batch exceed the max batch byte size. | ||
MessagesBatch messagesBatch = messagesBatches.get(orderingKey); | ||
if (messagesBatch == null) { | ||
messagesBatch = new MessagesBatch(batchingSettings, orderingKey); | ||
messagesBatches.put(orderingKey, messagesBatch); | ||
} | ||
|
||
batchesToSend = messagesBatch.add(outstandingPublish); | ||
// Setup the next duration based delivery alarm if there are messages batched. | ||
if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) { | ||
messagesBatches.remove(orderingKey); | ||
} | ||
setupAlarm(); | ||
} finally { | ||
messagesBatchLock.unlock(); | ||
|
@@ -209,6 +241,7 @@ public ApiFuture<String> publish(PubsubMessage message) { | |
messagesWaiter.incrementPendingMessages(1); | ||
|
||
if (!batchesToSend.isEmpty()) { | ||
publishAllWithoutInflight(); | ||
for (final OutstandingBatch batch : batchesToSend) { | ||
logger.log(Level.FINER, "Scheduling a batch for immediate sending."); | ||
executor.execute( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that this execute() call was removed in the original implementation of ordering keys, but was added back when master is merged to this branch. Please see 3bdbf77#diff-c482c19af8c46aea82546fa17236dfd2 publishOutstandingBatch() should not be called by the executor here because it can reorder sequence of invoking the function. Inside publishOutstandingBatch(), we run an executor, a regular one for non-ordered messages and the sequential executor for ordered messages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woops. I'll change it back. What do you think about moving the |
||
|
@@ -224,8 +257,20 @@ public void run() { | |
return outstandingPublish.publishResult; | ||
} | ||
|
||
/** | ||
* There may be non-recoverable problems with a request for an ordering key. In that case, all | ||
* subsequent requests will fail until this method is called. If the key is not currently paused, | ||
* calling this method will be a no-op. | ||
* | ||
* @param key The key for which to resume publishing. | ||
*/ | ||
public void resumePublish(String key) { | ||
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); | ||
sequentialExecutor.resumePublish(key); | ||
} | ||
|
||
private void setupAlarm() { | ||
if (!messagesBatch.isEmpty()) { | ||
if (!messagesBatches.isEmpty()) { | ||
if (!activeAlarm.getAndSet(true)) { | ||
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); | ||
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs); | ||
|
@@ -236,7 +281,7 @@ private void setupAlarm() { | |
public void run() { | ||
logger.log(Level.FINER, "Sending messages based on schedule."); | ||
activeAlarm.getAndSet(false); | ||
publishAllOutstanding(); | ||
publishAllWithoutInflight(); | ||
} | ||
}, | ||
delayThresholdMs, | ||
|
@@ -257,16 +302,49 @@ public void run() { | |
*/ | ||
public void publishAllOutstanding() { | ||
messagesBatchLock.lock(); | ||
OutstandingBatch batchToSend; | ||
try { | ||
if (messagesBatch.isEmpty()) { | ||
return; | ||
for (MessagesBatch batch : messagesBatches.values()) { | ||
if (!batch.isEmpty()) { | ||
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If | ||
// it's released, the order of publishing cannot be guaranteed if `publish()` is called | ||
// while this function is running. This locking mechanism needs to be improved if it | ||
// causes any performance degradation. | ||
publishOutstandingBatch(batch.popOutstandingBatch()); | ||
} | ||
} | ||
messagesBatches.clear(); | ||
} finally { | ||
messagesBatchLock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Publish any outstanding batches if non-empty and there are no other batches in flight. This | ||
* method sends buffered messages, but does not wait for the send operations to complete. To wait | ||
* for messages to send, call {@code get} on the futures returned from {@code publish}. | ||
*/ | ||
private void publishAllWithoutInflight() { | ||
messagesBatchLock.lock(); | ||
try { | ||
Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator(); | ||
while (it.hasNext()) { | ||
Map.Entry<String, MessagesBatch> entry = it.next(); | ||
MessagesBatch batch = entry.getValue(); | ||
String key = entry.getKey(); | ||
if (batch.isEmpty()) { | ||
it.remove(); | ||
} else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) { | ||
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I think this comment is confusing. Can you remove it? I think you already addressed the potential performance issue in the TODO comment below. |
||
// it's released, the order of publishing cannot be guaranteed if `publish()` is called | ||
// while this function is running. This locking mechanism needs to be improved if it | ||
// causes any performance degradation. | ||
publishOutstandingBatch(batch.popOutstandingBatch()); | ||
it.remove(); | ||
} | ||
} | ||
batchToSend = messagesBatch.popOutstandingBatch(); | ||
} finally { | ||
messagesBatchLock.unlock(); | ||
} | ||
publishOutstandingBatch(batchToSend); | ||
} | ||
|
||
private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) { | ||
|
@@ -280,12 +358,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch | |
} | ||
|
||
private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { | ||
ApiFutureCallback<PublishResponse> futureCallback = | ||
final ApiFutureCallback<PublishResponse> futureCallback = | ||
new ApiFutureCallback<PublishResponse>() { | ||
@Override | ||
public void onSuccess(PublishResponse result) { | ||
try { | ||
if (result.getMessageIdsCount() != outstandingBatch.size()) { | ||
if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) { | ||
outstandingBatch.onFailure( | ||
new IllegalStateException( | ||
String.format( | ||
|
@@ -311,20 +389,36 @@ public void onFailure(Throwable t) { | |
} | ||
}; | ||
|
||
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor()); | ||
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) { | ||
ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor()); | ||
} else { | ||
// If ordering key is specified, publish the batch using the sequential executor. | ||
ApiFuture<PublishResponse> future = | ||
sequentialExecutor.submit( | ||
outstandingBatch.orderingKey, | ||
new Callable<ApiFuture<PublishResponse>>() { | ||
public ApiFuture<PublishResponse> call() { | ||
return publishCall(outstandingBatch); | ||
} | ||
}); | ||
ApiFutures.addCallback(future, futureCallback, directExecutor()); | ||
} | ||
} | ||
|
||
private static final class OutstandingBatch { | ||
final List<OutstandingPublish> outstandingPublishes; | ||
final long creationTime; | ||
int attempt; | ||
int batchSizeBytes; | ||
final String orderingKey; | ||
|
||
OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) { | ||
OutstandingBatch( | ||
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) { | ||
this.outstandingPublishes = outstandingPublishes; | ||
attempt = 1; | ||
creationTime = System.currentTimeMillis(); | ||
this.batchSizeBytes = batchSizeBytes; | ||
this.orderingKey = orderingKey; | ||
} | ||
|
||
int size() { | ||
|
@@ -468,7 +562,7 @@ public static final class Builder { | |
.setRpcTimeoutMultiplier(2) | ||
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) | ||
.build(); | ||
|
||
static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; | ||
private static final int THREADS_PER_CPU = 5; | ||
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = | ||
InstantiatingExecutorProvider.newBuilder() | ||
|
@@ -482,6 +576,8 @@ public static final class Builder { | |
|
||
RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; | ||
|
||
private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING; | ||
|
||
private TransportChannelProvider channelProvider = | ||
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); | ||
|
||
|
@@ -576,6 +672,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) { | |
return this; | ||
} | ||
|
||
/** Sets the message ordering option. */ | ||
public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { | ||
this.enableMessageOrdering = enableMessageOrdering; | ||
return this; | ||
} | ||
|
||
/** Gives the ability to set a custom executor to be used by the library. */ | ||
public Builder setExecutorProvider(ExecutorProvider executorProvider) { | ||
this.executorProvider = Preconditions.checkNotNull(executorProvider); | ||
|
@@ -601,15 +703,17 @@ public Publisher build() throws IOException { | |
private static class MessagesBatch { | ||
private List<OutstandingPublish> messages; | ||
private int batchedBytes; | ||
private String orderingKey; | ||
private final BatchingSettings batchingSettings; | ||
|
||
public MessagesBatch(BatchingSettings batchingSettings) { | ||
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) { | ||
this.batchingSettings = batchingSettings; | ||
this.orderingKey = orderingKey; | ||
reset(); | ||
} | ||
|
||
private OutstandingBatch popOutstandingBatch() { | ||
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes); | ||
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey); | ||
reset(); | ||
return batch; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment can be removed now.