diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 4b6d02d90867..53b979d5e447 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -62,6 +62,7 @@ class MessageDispatcher { @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); private final Executor executor; + private final SequentialExecutorService.AutoExecutor sequentialExecutor; private final ScheduledExecutorService systemExecutor; private final ApiClock clock; @@ -206,6 +207,7 @@ void sendAckOperations( jobLock = new ReentrantLock(); messagesWaiter = new MessageWaiter(); this.clock = clock; + this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor); } void start() { @@ -358,7 +360,7 @@ public void nack() { } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); - executor.execute( + Runnable deliverMessageTask = new Runnable() { @Override public void run() { @@ -379,7 +381,12 @@ public void run() { response.setException(e); } } - }); + }; + if (message.getOrderingKey().isEmpty()) { + executor.execute(deliverMessageTask); + } else { + sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); + } } /** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */ diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 31a57fcc9daa..8a5802420854 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -49,9 +49,12 @@ import com.google.pubsub.v1.TopicNames; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -85,15 +88,20 @@ public class Publisher { private final String topicName; private final BatchingSettings batchingSettings; + private final boolean enableMessageOrdering; private final Lock messagesBatchLock; - private MessagesBatch messagesBatch; + + private final Map messagesBatches; private final AtomicBoolean activeAlarm; private final PublisherStub publisherStub; private final ScheduledExecutorService executor; + + private final SequentialExecutorService.CallbackExecutor sequentialExecutor; + private final AtomicBoolean shutdown; private final BackgroundResource backgroundResources; private final MessageWaiter messagesWaiter; @@ -114,12 +122,14 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; - messagesBatch = new MessagesBatch(batchingSettings); + messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); + sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor); List backgroundResourceList = new ArrayList<>(); if (builder.executorProvider.shouldAutoClose()) { backgroundResourceList.add(new ExecutorAsBackgroundResource(executor)); @@ -127,9 +137,18 @@ private Publisher(Builder builder) throws IOException { // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. // We post-process this here to keep backward-compatibility. - RetrySettings retrySettings = builder.retrySettings; - if (retrySettings.getMaxAttempts() == 0) { - retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); + // Also, if "message ordering" is enabled, the publisher should retry sending the failed + // message infinitely rather than sending the next one. + RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder(); + if (retrySettingsBuilder.getMaxAttempts() == 0) { + retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE); + } + if (enableMessageOrdering) { + // TODO: is there a way to have the default retry settings for requests without an ordering + // key? + retrySettingsBuilder + .setMaxAttempts(Integer.MAX_VALUE) + .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); } PublisherStubSettings.Builder stubSettings = @@ -147,7 +166,7 @@ private Publisher(Builder builder) throws IOException { StatusCode.Code.RESOURCE_EXHAUSTED, StatusCode.Code.UNKNOWN, StatusCode.Code.UNAVAILABLE) - .setRetrySettings(retrySettings) + .setRetrySettings(retrySettingsBuilder.build()) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); backgroundResourceList.add(publisherStub); @@ -194,38 +213,71 @@ public String getTopicNameString() { public ApiFuture publish(PubsubMessage message) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); + final String orderingKey = message.getOrderingKey(); + Preconditions.checkState( + orderingKey.isEmpty() || enableMessageOrdering, + "Cannot publish a message with an ordering key when message ordering is not enabled."); + final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); List batchesToSend; messagesBatchLock.lock(); try { + MessagesBatch messagesBatch = messagesBatches.get(orderingKey); + if (messagesBatch == null) { + messagesBatch = new MessagesBatch(batchingSettings, orderingKey); + messagesBatches.put(orderingKey, messagesBatch); + } + batchesToSend = messagesBatch.add(outstandingPublish); + if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) { + messagesBatches.remove(orderingKey); + } // Setup the next duration based delivery alarm if there are messages batched. setupAlarm(); + // For messages with an ordering key, we need to publish with messagesBatchLock held in order + // to ensure another publish doesn't slip in and send a batch before these batches we already + // want to send. + if (!batchesToSend.isEmpty() && !orderingKey.isEmpty()) { + for (final OutstandingBatch batch : batchesToSend) { + logger.log(Level.FINER, "Scheduling a batch for immediate sending."); + publishOutstandingBatch(batch); + } + } } finally { messagesBatchLock.unlock(); } messagesWaiter.incrementPendingMessages(1); - if (!batchesToSend.isEmpty()) { + // For messages without ordering keys, it is okay to send batches without holding + // messagesBatchLock. + if (!batchesToSend.isEmpty() && orderingKey.isEmpty()) { for (final OutstandingBatch batch : batchesToSend) { logger.log(Level.FINER, "Scheduling a batch for immediate sending."); - executor.execute( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(batch); - } - }); + publishOutstandingBatch(batch); } } return outstandingPublish.publishResult; } + /** + * There may be non-recoverable problems with a request for an ordering key. In that case, all + * subsequent requests will fail until this method is called. If the key is not currently paused, + * calling this method will be a no-op. + * + * @param key The key for which to resume publishing. + */ + // TODO: make this public when Ordering keys is live + @BetaApi + void resumePublish(String key) { + Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); + sequentialExecutor.resumePublish(key); + } + private void setupAlarm() { - if (!messagesBatch.isEmpty()) { + if (!messagesBatches.isEmpty()) { if (!activeAlarm.getAndSet(true)) { long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs); @@ -236,7 +288,7 @@ private void setupAlarm() { public void run() { logger.log(Level.FINER, "Sending messages based on schedule."); activeAlarm.getAndSet(false); - publishAllOutstanding(); + publishAllWithoutInflight(); } }, delayThresholdMs, @@ -256,17 +308,61 @@ public void run() { * futures returned from {@code publish}. */ public void publishAllOutstanding() { + OutstandingBatch unorderedOutstandingBatch = null; + messagesBatchLock.lock(); + try { + for (MessagesBatch batch : messagesBatches.values()) { + if (!batch.isEmpty()) { + if (!batch.orderingKey.isEmpty()) { + // For messages with an ordering key, we need to publish with messagesBatchLock held in + // order to ensure another publish doesn't slip in and send a batch before these batches + // we already want to send. + publishOutstandingBatch(batch.popOutstandingBatch()); + } else { + unorderedOutstandingBatch = batch.popOutstandingBatch(); + } + } + } + messagesBatches.clear(); + } finally { + messagesBatchLock.unlock(); + } + if (unorderedOutstandingBatch != null) { + publishOutstandingBatch(unorderedOutstandingBatch); + } + } + + /** + * Publish any outstanding batches if non-empty and there are no other batches in flight. This + * method sends buffered messages, but does not wait for the send operations to complete. To wait + * for messages to send, call {@code get} on the futures returned from {@code publish}. + */ + private void publishAllWithoutInflight() { + OutstandingBatch unorderedOutstandingBatch = null; messagesBatchLock.lock(); - OutstandingBatch batchToSend; try { - if (messagesBatch.isEmpty()) { - return; + Iterator> it = messagesBatches.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + MessagesBatch batch = entry.getValue(); + String key = entry.getKey(); + if (batch.isEmpty()) { + it.remove(); + } else if (key.isEmpty()) { + // We will publish the batch with no ordering key outside messagesBatchLock. + unorderedOutstandingBatch = batch.popOutstandingBatch(); + it.remove(); + } else if (!sequentialExecutor.hasTasksInflight(key)) { + publishOutstandingBatch(batch.popOutstandingBatch()); + it.remove(); + } } - batchToSend = messagesBatch.popOutstandingBatch(); } finally { messagesBatchLock.unlock(); } - publishOutstandingBatch(batchToSend); + if (unorderedOutstandingBatch != null) { + publishOutstandingBatch(unorderedOutstandingBatch); + } } private ApiFuture publishCall(OutstandingBatch outstandingBatch) { @@ -280,12 +376,12 @@ private ApiFuture publishCall(OutstandingBatch outstandingBatch } private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { - ApiFutureCallback futureCallback = + final ApiFutureCallback futureCallback = new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { try { - if (result.getMessageIdsCount() != outstandingBatch.size()) { + if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) { outstandingBatch.onFailure( new IllegalStateException( String.format( @@ -311,7 +407,21 @@ public void onFailure(Throwable t) { } }; - ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor()); + ApiFuture future; + if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) { + future = publishCall(outstandingBatch); + } else { + // If ordering key is specified, publish the batch using the sequential executor. + future = + sequentialExecutor.submit( + outstandingBatch.orderingKey, + new Callable>() { + public ApiFuture call() { + return publishCall(outstandingBatch); + } + }); + } + ApiFutures.addCallback(future, futureCallback, directExecutor()); } private static final class OutstandingBatch { @@ -319,12 +429,15 @@ private static final class OutstandingBatch { final long creationTime; int attempt; int batchSizeBytes; + final String orderingKey; - OutstandingBatch(List outstandingPublishes, int batchSizeBytes) { + OutstandingBatch( + List outstandingPublishes, int batchSizeBytes, String orderingKey) { this.outstandingPublishes = outstandingPublishes; attempt = 1; creationTime = System.currentTimeMillis(); this.batchSizeBytes = batchSizeBytes; + this.orderingKey = orderingKey; } int size() { @@ -468,7 +581,7 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); - + static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -482,6 +595,8 @@ public static final class Builder { RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING; + private TransportChannelProvider channelProvider = TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); @@ -576,6 +691,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } + /** Sets the message ordering option. */ + // TODO: make this public when Ordering keys is live + @BetaApi + Builder setEnableMessageOrdering(boolean enableMessageOrdering) { + this.enableMessageOrdering = enableMessageOrdering; + return this; + } + /** Gives the ability to set a custom executor to be used by the library. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); @@ -601,15 +724,17 @@ public Publisher build() throws IOException { private static class MessagesBatch { private List messages; private int batchedBytes; + private String orderingKey; private final BatchingSettings batchingSettings; - public MessagesBatch(BatchingSettings batchingSettings) { + private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) { this.batchingSettings = batchingSettings; + this.orderingKey = orderingKey; reset(); } private OutstandingBatch popOutstandingBatch() { - OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes); + OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey); reset(); return batch; } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java new file mode 100644 index 000000000000..a172c1ecb36d --- /dev/null +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -0,0 +1,264 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.SettableApiFuture; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; + +interface CancellableRunnable extends Runnable { + void cancel(Throwable e); +} + +/** + * An executor service that runs the tasks with the same key sequentially. The tasks with the same + * key will be run only when its predecessor has been completed while tasks with different keys can + * be run in parallel. + */ +final class SequentialExecutorService { + + // This class is not directly usable. + private SequentialExecutorService() {} + + /** + * This Executor takes a serial stream of string keys and {@code Runnable} tasks, and runs the + * tasks with the same key sequentially. Tasks with the same key will be run only when its + * predecessor has been completed while tasks with different keys can be run in parallel. + */ + private abstract static class SequentialExecutor { + // Maps keys to tasks. + protected final Map> tasksByKey; + protected final Executor executor; + + private SequentialExecutor(Executor executor) { + this.executor = executor; + this.tasksByKey = new HashMap<>(); + } + + boolean hasTasksInflight(String key) { + synchronized (tasksByKey) { + return tasksByKey.containsKey(key); + } + } + + protected void execute(final String key, R task) { + synchronized (tasksByKey) { + Queue newTasks = tasksByKey.get(key); + // If this key is already being handled, add it to the queue and return. + if (newTasks != null) { + newTasks.add(task); + return; + } else { + newTasks = new LinkedList<>(); + newTasks.add(task); + tasksByKey.put(key, newTasks); + } + } + + callNextTaskAsync(key); + } + + protected void callNextTaskAsync(final String key) { + boolean executeTask = true; + synchronized (tasksByKey) { + Queue tasks = tasksByKey.get(key); + if (tasks != null && tasks.isEmpty()) { + // Only remove the Queue after all tasks were completed + tasksByKey.remove(key); + executeTask = false; + } + } + if (executeTask) { + executor.execute( + new Runnable() { + @Override + public void run() { + R task = null; + synchronized (tasksByKey) { + Queue tasks = tasksByKey.get(key); + if (tasks != null && !tasks.isEmpty()) { + task = tasks.poll(); + } + } + if (task != null) { + task.run(); + } + } + }); + } + } + } + + @BetaApi + static class AutoExecutor extends SequentialExecutor { + AutoExecutor(Executor executor) { + super(executor); + } + + /** Runs synchronous {@code Runnable} tasks sequentially. */ + void submit(final String key, final Runnable task) { + super.execute( + key, + new Runnable() { + @Override + public void run() { + try { + task.run(); + } finally { + callNextTaskAsync(key); + } + } + }); + } + } + + /** + * Runs asynchronous {@code Callable} tasks sequentially for the same key. If one of the tasks + * fails, other tasks with the same key that have not been executed will be cancelled. + */ + @BetaApi + static class CallbackExecutor extends SequentialExecutor { + static CancellationException CANCELLATION_EXCEPTION = + new CancellationException( + "Execution cancelled because executing previous runnable failed."); + + private final Set keysWithErrors = Collections.synchronizedSet(new HashSet()); + + CallbackExecutor(Executor executor) { + super(executor); + } + + /** + * Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks + * with the same key that have not been executed will be cancelled. + * + *

This method does the following in a chain: + * + *

    + *
  1. Creates an `ApiFuture` that can be used for tracking progress. + *
  2. Creates a `CancellableRunnable` out of the `Callable` + *
  3. Adds the `CancellableRunnable` to the task queue + *
  4. Once the task is ready to be run, it will execute the `Callable` + *
  5. When the `Callable` completes one of two things happens: + *
      + *
    1. On success: + *
        + *
      1. Complete the `ApiFuture` by setting the return value. + *
      2. Call the next task. + *
      + *
    2. On Failure: + *
        + *
      1. Fail the `ApiFuture` by setting the exception. + *
      2. Cancel all tasks in the queue. + *
      + *
    + *
+ * + * @param key The key for the task queue + * @param callable The thing to run + * @param The return type for the `Callable`'s `ApiFuture`. + * @return an `ApiFuture` for tracking. + */ + ApiFuture submit(final String key, final Callable> callable) { + // Step 1: create a future for the user + final SettableApiFuture future = SettableApiFuture.create(); + + if (keysWithErrors.contains(key)) { + future.setException(CANCELLATION_EXCEPTION); + return future; + } + + // Step 2: create the CancellableRunnable + // Step 3: add the task to queue via `execute` + CancellableRunnable task = + new CancellableRunnable() { + private boolean cancelled = false; + + @Override + public void run() { + // the task was cancelled + if (cancelled) { + return; + } + + try { + // Step 4: call the `Callable` + ApiFutureCallback callback = + new ApiFutureCallback() { + // Step 5.1: on success + @Override + public void onSuccess(T msg) { + future.set(msg); + callNextTaskAsync(key); + } + + // Step 5.2: on failure + @Override + public void onFailure(Throwable e) { + future.setException(e); + cancelQueuedTasks(key, CANCELLATION_EXCEPTION); + } + }; + ApiFutures.addCallback(callable.call(), callback, directExecutor()); + } catch (Exception e) { + cancel(e); + } + } + + @Override + public void cancel(Throwable e) { + this.cancelled = true; + future.setException(e); + } + }; + execute(key, task); + return future; + } + + void resumePublish(String key) { + keysWithErrors.remove(key); + } + + /** Cancels every task in the queue associated with {@code key}. */ + private void cancelQueuedTasks(final String key, Throwable e) { + keysWithErrors.add(key); + synchronized (tasksByKey) { + final Queue tasks = tasksByKey.get(key); + if (tasks != null) { + while (!tasks.isEmpty()) { + tasks.poll().cancel(e); + } + tasksByKey.remove(key); + } + } + } + } +} diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java index 396b5d05bd5f..620a09ac98bc 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud @@ -33,6 +34,8 @@ class FakePublisherServiceImpl extends PublisherImplBase { private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue publishResponses = new LinkedBlockingQueue<>(); + private final AtomicInteger nextMessageId = new AtomicInteger(1); + private boolean autoPublishResponse; /** Class used to save the state of a possible response. */ private static class Response { @@ -75,7 +78,15 @@ public void publish(PublishRequest request, StreamObserver resp requests.add(request); Response response; try { - response = publishResponses.take(); + if (autoPublishResponse) { + PublishResponse.Builder builder = PublishResponse.newBuilder(); + for (int i = 0; i < request.getMessagesCount(); i++) { + builder.addMessageIds(Integer.toString(nextMessageId.getAndIncrement())); + } + response = new Response(builder.build()); + } else { + response = publishResponses.take(); + } } catch (InterruptedException e) { throw new IllegalArgumentException(e); } @@ -87,6 +98,15 @@ public void publish(PublishRequest request, StreamObserver resp } } + /** + * If enabled, PublishResponse is generated with a unique message id automatically when publish() + * is called. + */ + public FakePublisherServiceImpl setAutoPublishResponse(boolean autoPublishResponse) { + this.autoPublishResponse = autoPublishResponse; + return this; + } + public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) { publishResponses.add(new Response(publishResponse)); return this; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index acbc82c95c41..f3c85220fc31 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -35,16 +35,19 @@ import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PubsubMessage; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -240,6 +243,275 @@ private ApiFuture sendTestMessage(Publisher publisher, String data) { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } + @Test + public void testBatchedMessagesWithOrderingKeyByNum() throws Exception { + // Limit the number of maximum elements in a single batch to 3. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(3L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 3. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // One of the batches reaches the limit. + ApiFuture publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA"); + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get())); + + // The other batch reaches the limit. + ApiFuture publishFuture6 = sendTestMessageWithOrderingKey(publisher, "m6", "OrderB"); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + assertTrue(Integer.parseInt(publishFuture4.get()) < Integer.parseInt(publishFuture6.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + publisher.shutdown(); + } + + @Test + public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { + // Limit the batching timeout to 100 seconds. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + + // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "m3", "OrderA"); + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "m4", "OrderB"); + + // Verify that none of them were published since the batching size is 10 and timeout has not + // been expired. + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + assertFalse(publishFuture3.isDone()); + assertFalse(publishFuture4.isDone()); + + // The timeout expires. + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + + // Verify that every message within the same batch has the same ordering key. + List requests = testPublisherServiceImpl.getCapturedRequests(); + for (PublishRequest request : requests) { + if (request.getMessagesCount() > 1) { + String orderingKey = request.getMessages(0).getOrderingKey(); + for (PubsubMessage message : request.getMessagesList()) { + assertEquals(message.getOrderingKey(), orderingKey); + } + } + } + publisher.shutdown(); + } + + @Test + public void testLargeMessagesDoNotReorderBatches() throws Exception { + // Set the maximum batching size to 20 bytes. + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(10L) + .setRequestByteThreshold(20L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableMessageOrdering(true) + .build(); + testPublisherServiceImpl.setAutoPublishResponse(true); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "m2", "OrderB"); + + assertFalse(publishFuture1.isDone()); + assertFalse(publishFuture2.isDone()); + + ApiFuture publishFuture3 = + sendTestMessageWithOrderingKey(publisher, "VeryLargeMessage", "OrderB"); + // Verify that messages with "OrderB" were delivered in order. + assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture3.get())); + + publisher.shutdown(); + } + + @Test + public void testOrderingKeyWhenDisabled_throwsException() throws Exception { + // Message ordering is disabled by default. + Publisher publisher = getTestPublisherBuilder().build(); + try { + ApiFuture publishFuture = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + fail("Should have thrown an IllegalStateException"); + } catch (IllegalStateException expected) { + // expected + } + publisher.shutdown(); + } + + @Test + public void testEnableMessageOrdering_overwritesMaxAttempts() throws Exception { + // Set maxAttempts to 1 and enableMessageOrdering to true at the same time. + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setRetrySettings( + Publisher.Builder.DEFAULT_RETRY_SETTINGS + .toBuilder() + .setTotalTimeout(Duration.ofSeconds(10)) + .setMaxAttempts(1) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Although maxAttempts is 1, the publisher will retry until it succeeds since + // enableMessageOrdering is true. + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + assertEquals("1", publishFuture1.get()); + + assertEquals(4, testPublisherServiceImpl.getCapturedRequests().size()); + publisher.shutdown(); + } + + @Test + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA which returns a failure. + *
  2. publish with key orderA again, which should fail immediately + *
  3. publish with key orderB, which should succeed + *
  4. resume publishing on key orderA + *
  5. publish with key orderA, which should now succeed + *
+ */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ZERO); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ZERO); + + try { + future1.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + try { + future2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + // Submit new requests with orderA that should fail. + ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit a new request with orderB, which should succeed + ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); + + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + publisher.shutdown(); + } + + private ApiFuture sendTestMessageWithOrderingKey( + Publisher publisher, String data, String orderingKey) { + return publisher.publish( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()); + } + @Test public void testErrorPropagation() throws Exception { Publisher publisher = diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java new file mode 100644 index 000000000000..3788bd3c04f8 --- /dev/null +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java @@ -0,0 +1,247 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public final class SequentialExecutorServiceTest { + private final ExecutorProvider executorProvider = + InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + + static class AsyncTaskCallable implements Callable> { + boolean isCalled = false; + SettableApiFuture result = SettableApiFuture.create(); + + @Override + public ApiFuture call() { + isCalled = true; + return result; + } + + public boolean isCalled() { + return isCalled; + } + + public void finishWithError(Throwable e) { + result.setException(e); + } + + public void finish() { + result.set("ok"); + } + } + + @Test + public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception { + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); + AsyncTaskCallable callable1 = new AsyncTaskCallable(); + AsyncTaskCallable callable2 = new AsyncTaskCallable(); + AsyncTaskCallable callable3 = new AsyncTaskCallable(); + + ApiFuture result1 = sequentialExecutorService.submit("key", callable1); + ApiFuture result2 = sequentialExecutorService.submit("key", callable2); + ApiFuture result3 = sequentialExecutorService.submit("key", callable3); + + Thread.sleep(1000); + assertFalse(callable2.isCalled()); + assertFalse(callable3.isCalled()); + callable1.finish(); + assertEquals("ok", result1.get()); + + assertFalse(callable3.isCalled()); + callable2.finish(); + assertEquals("ok", result2.get()); + + callable3.finish(); + assertEquals("ok", result3.get()); + } + + @Test + public void testExecutorRunsDifferentKeySimultaneously() throws Exception { + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); + AsyncTaskCallable callable1 = new AsyncTaskCallable(); + AsyncTaskCallable callable2 = new AsyncTaskCallable(); + AsyncTaskCallable callable3 = new AsyncTaskCallable(); + + // Submit three tasks (two tasks with "key", and one task with "key2"). + ApiFuture result1 = sequentialExecutorService.submit("key", callable1); + ApiFuture result2 = sequentialExecutorService.submit("key", callable2); + ApiFuture result3 = sequentialExecutorService.submit("key2", callable3); + + // The task associated with "key2" can be run in parallel with other tasks with "key". + callable3.finish(); + assertEquals("ok", result3.get()); + + // Sleep some time to give the test a chance to fail. Verify that the second task has not been + // executed while the main thread is slpeeing. + Thread.sleep(100); + assertFalse(callable2.isCalled()); + // Complete the first task. + callable1.finish(); + assertEquals("ok", result1.get()); + // Now, the second task can be executed. + callable2.finish(); + assertEquals("ok", result2.get()); + } + + @Test + public void testExecutorCancelsAllTasksWhenOneFailed() throws Exception { + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); + AsyncTaskCallable callable1 = new AsyncTaskCallable(); + AsyncTaskCallable callable2 = new AsyncTaskCallable(); + AsyncTaskCallable callable3 = new AsyncTaskCallable(); + + ApiFuture result1 = sequentialExecutorService.submit("key", callable1); + ApiFuture result2 = sequentialExecutorService.submit("key", callable2); + ApiFuture result3 = sequentialExecutorService.submit("key", callable3); + + Throwable failure = new Exception("failure"); + callable1.finishWithError(failure); + // The failed task throws an exception that contains the cause of the failure. + try { + result1.get(); + fail("Should have thrown an ExecutionException"); + } catch (ExecutionException e) { + assertEquals(failure, e.getCause()); + } + // Other tasks in the queue are expected to fail with a CancellationException. + for (ApiFuture result : ImmutableList.of(result2, result3)) { + try { + result.get(); + fail("Should have thrown an ExecutionException"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(CancellationException.class); + } + } + } + + /** + * A task that sleeps {@code taskDurationMillis} milliseconds. Appends its {@code taskId} to + * {@code startedTasksSequence} before sleeping and appends it to {@code completedTasksSequence} + * when sleeping is done. + */ + static class SleepingSyncTask implements Runnable { + private final int taskId; + private final long taskDurationMillis; + private final LinkedHashSet startedTasksSequence; + private final LinkedHashSet completedTasksSequence; + private final CountDownLatch remainingTasksCount; + + public SleepingSyncTask( + int taskId, + long taskDurationMillis, + LinkedHashSet startedTasksSequence, + LinkedHashSet completedTasksSequence, + CountDownLatch remainingTasksCount) { + this.taskId = taskId; + this.taskDurationMillis = taskDurationMillis; + this.startedTasksSequence = startedTasksSequence; + this.completedTasksSequence = completedTasksSequence; + this.remainingTasksCount = remainingTasksCount; + } + + @Override + public void run() { + if (taskId > 0) { + // Verify that the previous task has been completed. + assertTrue(startedTasksSequence.contains(taskId - 1)); + assertTrue(completedTasksSequence.contains(taskId - 1)); + } + startedTasksSequence.add(taskId); + try { + Thread.sleep(taskDurationMillis); + } catch (InterruptedException e) { + return; + } + completedTasksSequence.add(taskId); + remainingTasksCount.countDown(); + + // Verify that the next task has not been started yet. + assertFalse(startedTasksSequence.contains(taskId + 1)); + assertFalse(completedTasksSequence.contains(taskId + 1)); + } + } + + @Test + public void SequentialExecutorRunsTasksAutomatically() throws Exception { + int numKeys = 50; + int numTasks = 50; + SequentialExecutorService.AutoExecutor sequentialExecutor = + new SequentialExecutorService.AutoExecutor(executorProvider.getExecutor()); + CountDownLatch remainingTasksCount = new CountDownLatch(numKeys * numTasks); + // Maps keys to lists of started and completed tasks. + Map> startedTasks = new HashMap<>(); + Map> completedTasks = new HashMap<>(); + + for (int i = 0; i < numKeys; i++) { + String key = "key" + i; + LinkedHashSet startedTasksSequence = new LinkedHashSet<>(); + LinkedHashSet completedTasksSequence = new LinkedHashSet<>(); + startedTasks.put(key, completedTasksSequence); + completedTasks.put(key, completedTasksSequence); + for (int taskId = 0; taskId < numTasks; taskId++) { + SleepingSyncTask task = + new SleepingSyncTask( + taskId, 5, startedTasksSequence, completedTasksSequence, remainingTasksCount); + sequentialExecutor.submit(key, task); + } + } + + remainingTasksCount.await(); + + for (int i = 0; i < numKeys; i++) { + LinkedHashSet startedTasksSequence = startedTasks.get("key" + i); + LinkedHashSet completedTasksSequence = completedTasks.get("key" + i); + // Verify that the tasks have been started and completed in order. + int expectedTaskId = 0; + Iterator it1 = startedTasksSequence.iterator(); + Iterator it2 = completedTasksSequence.iterator(); + while (it1.hasNext() && it2.hasNext()) { + assertEquals(expectedTaskId, it1.next().intValue()); + assertEquals(expectedTaskId, it2.next().intValue()); + expectedTaskId++; + } + } + } +}