From 7732506172df79022a938126c45be98d96d2a88e Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Wed, 17 Apr 2019 08:19:55 -0400 Subject: [PATCH] Refactoring of the Pub/Sub Ordering keys client (#4962) --- .../com/google/cloud/pubsub/v1/Publisher.java | 26 ++++++++++--------- .../pubsub/v1/SequentialExecutorService.java | 10 +++---- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 44268b2c9fb8..c7d461ea678f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -234,30 +234,30 @@ public ApiFuture publish(PubsubMessage message) { messagesBatchLock.lock(); try { // Check if the next message makes the current batch exceed the max batch byte size. - MessagesBatch batch = messagesBatches.get(orderingKey); - if (batch == null) { - batch = new MessagesBatch(orderingKey); - messagesBatches.put(orderingKey, batch); + MessagesBatch messageBatch = messagesBatches.get(orderingKey); + if (messageBatch == null) { + messageBatch = new MessagesBatch(orderingKey); + messagesBatches.put(orderingKey, messageBatch); } - if (!batch.isEmpty() + if (!messageBatch.isEmpty() && hasBatchingBytes() - && batch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) { - batchToSend = batch.popOutstandingBatch(); + && messageBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) { + batchToSend = messageBatch.popOutstandingBatch(); } // Border case if the message to send is greater or equals to the max batch size then can't // be included in the current batch and instead sent immediately. if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) { - batch.addMessage(outstandingPublish, messageSize); + messageBatch.addMessage(outstandingPublish, messageSize); // If after adding the message we have reached the batch max messages then we have a batch // to send. - if (batch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { - batchToSend = batch.popOutstandingBatch(); + if (messageBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) { + batchToSend = messageBatch.popOutstandingBatch(); } } // Setup the next duration based delivery alarm if there are messages batched. - if (!batch.isEmpty()) { + if (!messageBatch.isEmpty()) { setupDurationBasedPublishAlarm(); } else { messagesBatches.remove(orderingKey); @@ -405,7 +405,9 @@ public ApiFuture call() { } }; ApiFutures.addCallback( - sequentialExecutor.submit(outstandingBatch.orderingKey, func), futureCallback); + sequentialExecutor.submit(outstandingBatch.orderingKey, func), + futureCallback, + directExecutor()); } } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 7ec61c5f8063..c9557fb65c77 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -102,7 +102,7 @@ public void cancel(Throwable e) { } /** Runs synchronous {@code Runnable} tasks sequentially. */ - public void submit(final String key, final Runnable runnable) { + void submit(final String key, final Runnable runnable) { autoSequentialExecutor.execute(key, runnable); } @@ -128,7 +128,7 @@ enum TaskCompleteAction { * Creates a AutoSequentialExecutor which executes the next queued task automatically when the * previous task has completed. */ - public static SequentialExecutor newAutoSequentialExecutor(Executor executor) { + static SequentialExecutor newAutoSequentialExecutor(Executor executor) { return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK); } @@ -147,7 +147,7 @@ private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAct this.tasksByKey = new HashMap<>(); } - public void execute(final String key, Runnable task) { + void execute(final String key, Runnable task) { Deque newTasks; synchronized (tasksByKey) { newTasks = tasksByKey.get(key); @@ -182,7 +182,7 @@ public void run() { } /** Cancels every task in the queue assoicated with {@code key}. */ - public void cancelQueuedTasks(final String key, Throwable e) { + void cancelQueuedTasks(final String key, Throwable e) { // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, // so that no more tasks are scheduled. synchronized (tasksByKey) { @@ -204,7 +204,7 @@ public void cancelQueuedTasks(final String key, Throwable e) { } /** Executes the next queued task associated with {@code key}. */ - public void resume(final String key) { + void resume(final String key) { if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) { // resume() is no-op since tasks are executed automatically. return;