Skip to content

Commit

Permalink
Refactoring of the Pub/Sub Ordering keys client (#4962)
Browse files Browse the repository at this point in the history
  • Loading branch information
sduskis authored Apr 17, 2019
1 parent e88432a commit 7732506
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,30 +234,30 @@ public ApiFuture<String> publish(PubsubMessage message) {
messagesBatchLock.lock();
try {
// Check if the next message makes the current batch exceed the max batch byte size.
MessagesBatch batch = messagesBatches.get(orderingKey);
if (batch == null) {
batch = new MessagesBatch(orderingKey);
messagesBatches.put(orderingKey, batch);
MessagesBatch messageBatch = messagesBatches.get(orderingKey);
if (messageBatch == null) {
messageBatch = new MessagesBatch(orderingKey);
messagesBatches.put(orderingKey, messageBatch);
}
if (!batch.isEmpty()
if (!messageBatch.isEmpty()
&& hasBatchingBytes()
&& batch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
batchToSend = batch.popOutstandingBatch();
&& messageBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
batchToSend = messageBatch.popOutstandingBatch();
}

// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
batch.addMessage(outstandingPublish, messageSize);
messageBatch.addMessage(outstandingPublish, messageSize);
// If after adding the message we have reached the batch max messages then we have a batch
// to send.
if (batch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = batch.popOutstandingBatch();
if (messageBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = messageBatch.popOutstandingBatch();
}
}

// Setup the next duration based delivery alarm if there are messages batched.
if (!batch.isEmpty()) {
if (!messageBatch.isEmpty()) {
setupDurationBasedPublishAlarm();
} else {
messagesBatches.remove(orderingKey);
Expand Down Expand Up @@ -405,7 +405,9 @@ public ApiFuture call() {
}
};
ApiFutures.addCallback(
sequentialExecutor.submit(outstandingBatch.orderingKey, func), futureCallback);
sequentialExecutor.submit(outstandingBatch.orderingKey, func),
futureCallback,
directExecutor());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void cancel(Throwable e) {
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
public void submit(final String key, final Runnable runnable) {
void submit(final String key, final Runnable runnable) {
autoSequentialExecutor.execute(key, runnable);
}

Expand All @@ -128,7 +128,7 @@ enum TaskCompleteAction {
* Creates a AutoSequentialExecutor which executes the next queued task automatically when the
* previous task has completed.
*/
public static SequentialExecutor newAutoSequentialExecutor(Executor executor) {
static SequentialExecutor newAutoSequentialExecutor(Executor executor) {
return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK);
}

Expand All @@ -147,7 +147,7 @@ private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAct
this.tasksByKey = new HashMap<>();
}

public void execute(final String key, Runnable task) {
void execute(final String key, Runnable task) {
Deque<Runnable> newTasks;
synchronized (tasksByKey) {
newTasks = tasksByKey.get(key);
Expand Down Expand Up @@ -182,7 +182,7 @@ public void run() {
}

/** Cancels every task in the queue assoicated with {@code key}. */
public void cancelQueuedTasks(final String key, Throwable e) {
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
synchronized (tasksByKey) {
Expand All @@ -204,7 +204,7 @@ public void cancelQueuedTasks(final String key, Throwable e) {
}

/** Executes the next queued task associated with {@code key}. */
public void resume(final String key) {
void resume(final String key) {
if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) {
// resume() is no-op since tasks are executed automatically.
return;
Expand Down

0 comments on commit 7732506

Please sign in to comment.