diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 800d26c41ce9..166aad008c99 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -16,9 +16,9 @@ package com.google.cloud.pubsub; -import static com.google.api.client.util.Preconditions.checkArgument; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; import com.google.cloud.AsyncPage; @@ -27,6 +27,7 @@ import com.google.cloud.Page; import com.google.cloud.PageImpl; import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; import com.google.cloud.pubsub.spi.v1.PublisherApi; import com.google.cloud.pubsub.spi.v1.SubscriberApi; import com.google.common.annotations.VisibleForTesting; @@ -476,15 +477,24 @@ public Future> pullAsync(final String subscription, in .setMaxMessages(maxMessages) .setReturnImmediately(true) .build(); - Future response = rpc.pull(request); - return lazyTransform(response, new Function>() { + PullFuture future = rpc.pull(request); + future.addCallback(new PubSubRpc.PullCallback() { @Override - public Iterator apply(PullResponse pullResponse) { - // Add all received messages to the automatic ack deadline renewer - List ackIds = Lists.transform(pullResponse.getReceivedMessagesList(), + public void success(PullResponse response) { + List ackIds = Lists.transform(response.getReceivedMessagesList(), MESSAGE_TO_ACK_ID_FUNCTION); ackDeadlineRenewer.add(subscription, ackIds); - return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(), + } + + @Override + public void failure(Throwable error) { + // ignore + } + }); + return lazyTransform(future, new Function>() { + @Override + public Iterator apply(PullResponse response) { + return Iterators.transform(response.getReceivedMessagesList().iterator(), new Function() { @Override public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) { diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index 4ac3837088f8..b0f9b76e3920 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -31,6 +31,8 @@ import com.google.cloud.pubsub.spi.v1.SubscriberSettings; import com.google.common.base.Function; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ForwardingListenableFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; @@ -89,6 +91,30 @@ protected ExecutorFactory executorFactory() { } } + private static final class PullFutureImpl + extends ForwardingListenableFuture.SimpleForwardingListenableFuture + implements PullFuture { + + PullFutureImpl(ListenableFuture delegate) { + super(delegate); + } + + @Override + public void addCallback(final PullCallback callback) { + Futures.addCallback(delegate(), new FutureCallback() { + @Override + public void onSuccess(PullResponse result) { + callback.success(result); + } + + @Override + public void onFailure(Throwable error) { + callback.failure(error); + } + }); + } + } + public DefaultPubSubRpc(PubSubOptions options) throws IOException { executorFactory = new InternalPubSubOptions(options).executorFactory(); executor = executorFactory.get(); @@ -136,13 +162,13 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) { return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder); } - private static Future translate(ListenableFuture from, final boolean idempotent, - int... returnNullOn) { + private static ListenableFuture translate(ListenableFuture from, + final boolean idempotent, int... returnNullOn) { final Set returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length); for (int value : returnNullOn) { returnNullOnSet.add(value); } - return Futures.catching(from, ApiException.class, new Function() { + return Futures.catching(from, ApiException.class, new Function() { @Override public V apply(ApiException exception) { if (returnNullOnSet.contains(exception.getStatusCode().value())) { @@ -224,8 +250,8 @@ public Future acknowledge(AcknowledgeRequest request) { } @Override - public Future pull(PullRequest request) { - return translate(subscriberApi.pullCallable().futureCall(request), false); + public PullFuture pull(PullRequest request) { + return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false)); } @Override diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java index 6d738cd554c4..c9bd77f46262 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/PubSubRpc.java @@ -41,6 +41,44 @@ public interface PubSubRpc extends AutoCloseable { + /** + * A callback that can be registered to {@link PullFuture} objects. Objects of this class allow + * to asynchronously react to the success or failure of a pull RPC. + */ + interface PullCallback { + + /** + * This method is invoked with the result of a {@link PullFuture} when it was successful. + * + * @param response the pull response + */ + void success(PullResponse response); + + /** + * This method is invoked when the {@link PullFuture} failed or was cancelled. + * + * @param error the execption that caused the {@link PullFuture} to fail + */ + void failure(Throwable error); + } + + /** + * A {@link Future} implementation for pull RPCs. This class also allows users to register + * callbacks via {@link #addCallback(PullCallback)}. + */ + interface PullFuture extends Future { + + /** + * Registers a callback to be run on the given executor. The listener will run when the pull + * future completed its computation or, if the computation is already complete, immediately. + * There is no guaranteed ordering of execution of callbacks. + * + *

Registered callbacks are run using the same thread that run the RPC call. Only lightweight + * callbacks should be registered via this method. + */ + void addCallback(final PullCallback callback); + } + // in all cases root cause of ExecutionException is PubSubException Future create(Topic topic); @@ -66,7 +104,7 @@ public interface PubSubRpc extends AutoCloseable { Future acknowledge(AcknowledgeRequest request); - Future pull(PullRequest request); + PullFuture pull(PullRequest request); Future modify(ModifyPushConfigRequest request); } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 8eb03a7de073..2f5a71195df0 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -22,10 +22,10 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import com.google.api.client.util.Lists; import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.junit.Ignore; diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 7a9fee0d999d..b310c72fe850 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -23,12 +23,15 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.cloud.RetryParams; import com.google.cloud.pubsub.PubSub.ListOption; import com.google.cloud.pubsub.spi.PubSubRpc; +import com.google.cloud.pubsub.spi.PubSubRpc.PullCallback; +import com.google.cloud.pubsub.spi.PubSubRpc.PullFuture; import com.google.cloud.pubsub.spi.PubSubRpcFactory; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; @@ -55,6 +58,7 @@ import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; +import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.After; import org.junit.Before; @@ -62,6 +66,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; @@ -1229,7 +1234,7 @@ public void testListTopicSubscriptionsAsyncWithOptions() } @Test - public void testPullMessages() { + public void testPullMessages() throws ExecutionException, InterruptedException { pubsub = new PubSubImpl(options, renewerMock); PullRequest request = PullRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1243,10 +1248,16 @@ public void testPullMessages() { .addReceivedMessages(MESSAGE_PB1) .addReceivedMessages(MESSAGE_PB2) .build(); - EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response)); + Capture callback = Capture.newInstance(); + PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class); + futureMock.addCallback(EasyMock.capture(callback)); + EasyMock.expectLastCall(); + EasyMock.expect(futureMock.get()).andReturn(response); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock); renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2")); - EasyMock.replay(pubsubRpcMock, renewerMock); + EasyMock.replay(pubsubRpcMock, renewerMock, futureMock); Iterator messageIterator = pubsub.pull(SUBSCRIPTION, 42); + callback.getValue().success(response); EasyMock.reset(renewerMock); for (ReceivedMessage message : messageList) { renewerMock.remove(SUBSCRIPTION, message.ackId()); @@ -1256,6 +1267,7 @@ public void testPullMessages() { while (messageIterator.hasNext()) { messageIterator.next(); } + EasyMock.verify(futureMock); } @Test @@ -1273,10 +1285,16 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept .addReceivedMessages(MESSAGE_PB1) .addReceivedMessages(MESSAGE_PB2) .build(); - EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response)); + Capture callback = Capture.newInstance(); + PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class); + futureMock.addCallback(EasyMock.capture(callback)); + EasyMock.expectLastCall(); + EasyMock.expect(futureMock.get()).andReturn(response); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock); renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2")); - EasyMock.replay(pubsubRpcMock, renewerMock); + EasyMock.replay(pubsubRpcMock, renewerMock, futureMock); Iterator messageIterator = pubsub.pullAsync(SUBSCRIPTION, 42).get(); + callback.getValue().success(response); EasyMock.reset(renewerMock); for (ReceivedMessage message : messageList) { renewerMock.remove(SUBSCRIPTION, message.ackId()); @@ -1286,6 +1304,55 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept while (messageIterator.hasNext()) { messageIterator.next(); } + EasyMock.verify(futureMock); + } + + @Test + public void testPullMessagesError() throws ExecutionException, InterruptedException { + pubsub = new PubSubImpl(options, renewerMock); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(true) + .build(); + PubSubException exception = new PubSubException(new IOException(), false); + PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class); + futureMock.addCallback(EasyMock.anyObject(PullCallback.class)); + EasyMock.expectLastCall(); + EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception)); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock); + EasyMock.replay(pubsubRpcMock, renewerMock, futureMock); + try { + pubsub.pull(SUBSCRIPTION, 42); + fail("Expected PubSubException"); + } catch (PubSubException ex) { + assertSame(exception, ex); + } + EasyMock.verify(futureMock); + } + + @Test + public void testPullMessagesAsyncError() throws ExecutionException, InterruptedException { + pubsub = new PubSubImpl(options, renewerMock); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(true) + .build(); + PubSubException exception = new PubSubException(new IOException(), false); + PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class); + futureMock.addCallback(EasyMock.anyObject(PullCallback.class)); + EasyMock.expectLastCall(); + EasyMock.expect(futureMock.get()).andThrow(new ExecutionException(exception)); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(futureMock); + EasyMock.replay(pubsubRpcMock, renewerMock, futureMock); + try { + pubsub.pullAsync(SUBSCRIPTION, 42).get(); + fail("Expected ExecutionException"); + } catch (ExecutionException ex) { + assertSame(exception, ex.getCause()); + } + EasyMock.verify(futureMock); } @Test