From 365a0f7d295d89a162ee1ebda9b092c2255ad4fc Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 28 Feb 2017 11:34:33 +1100 Subject: [PATCH] add test for Publisher and Subscriber snippets (#1658) --- .../pubsub/snippets/PublisherSnippets.java | 5 +- .../pubsub/snippets/SubscriberSnippets.java | 2 +- .../pubsub/snippets/ITPubSubSnippets.java | 88 +++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java index 7b8d5acf8fc8..28cb71f615ea 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.google.cloud.examples.pubsub; +package com.google.cloud.examples.pubsub.snippets; import com.google.api.gax.core.RpcFuture; import com.google.api.gax.core.RpcFutureCallback; @@ -35,7 +35,7 @@ public PublisherSnippets(Publisher publisher) { */ // [TARGET publish(PubsubMessage)] // [VARIABLE "my_message"] - public void publish(String message) { + public RpcFuture publish(String message) { // [START publish] ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); @@ -50,6 +50,7 @@ public void onFailure(Throwable t) { } }); // [END publish] + return messageIdFuture; } /** diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java index 13bc5a6c6cdd..8ca753fc266e 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java @@ -50,7 +50,7 @@ public SubscriberSnippets( * Example of receiving a specific number of messages. */ // [TARGET startAsync()] - public void startAsync() throws Exception { + public void startAndWait() throws Exception { // [START startAsync] Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build(); subscriber.addListener(new Subscriber.SubscriberListener() { diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java index 1d69d94aaa18..1bf5a2bed2f9 100644 --- a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java +++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.api.gax.core.RpcFutureCallback; +import com.google.api.gax.core.SettableRpcFuture; import com.google.cloud.Identity; import com.google.cloud.Page; import com.google.cloud.Policy; @@ -33,14 +35,25 @@ import com.google.cloud.pubsub.deprecated.SubscriptionInfo; import com.google.cloud.pubsub.deprecated.Topic; import com.google.cloud.pubsub.deprecated.TopicInfo; +import com.google.cloud.pubsub.spi.v1.Publisher; +import com.google.cloud.pubsub.spi.v1.PublisherClient; +import com.google.cloud.pubsub.spi.v1.SubscriberClient; import com.google.common.collect.Iterators; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -264,4 +277,79 @@ public void testTopicPolicyAsync() throws ExecutionException, InterruptedExcepti topic.delete(); subscription.delete(); } + + @Test + public void testPublisherSubscriber() throws Exception { + TopicName topicName = + TopicName.create(pubsub.getOptions().getProjectId(), formatForTest("test-topic")); + SubscriptionName subscriptionName = + SubscriptionName.create( + pubsub.getOptions().getProjectId(), formatForTest("test-subscription")); + try (PublisherClient publisherClient = PublisherClient.create(); + SubscriberClient subscriberClient = SubscriberClient.create()) { + publisherClient.createTopic(topicName); + subscriberClient.createSubscription( + subscriptionName, topicName, PushConfig.getDefaultInstance(), 0); + + testPublisherSubscriberHelper(topicName, subscriptionName); + + subscriberClient.deleteSubscription(subscriptionName); + publisherClient.deleteTopic(topicName); + } + } + + private void testPublisherSubscriberHelper( + TopicName topicName, SubscriptionName subscriptionName) throws Exception { + String messageToPublish = "my-message"; + + Publisher publisher = null; + try { + publisher = Publisher.newBuilder(topicName).build(); + PublisherSnippets snippets = new PublisherSnippets(publisher); + final SettableRpcFuture done = new SettableRpcFuture<>(); + snippets + .publish(messageToPublish) + .addCallback( + new RpcFutureCallback() { + public void onSuccess(String messageId) { + done.set(null); + } + + public void onFailure(Throwable t) { + done.setException(t); + } + }); + done.get(); + } finally { + if (publisher != null) { + publisher.shutdown(); + } + } + + final BlockingQueue queue = new ArrayBlockingQueue<>(1); + final SettableRpcFuture done = new SettableRpcFuture<>(); + final SettableRpcFuture received = new SettableRpcFuture<>(); + SubscriberSnippets snippets = + new SubscriberSnippets( + subscriptionName, + new MessageReceiverSnippets(queue).messageReceiver(), + done, + MoreExecutors.directExecutor()); + new Thread(new Runnable() { + @Override + public void run() { + try { + received.set(queue.poll(10, TimeUnit.MINUTES)); + } catch (InterruptedException e) { + received.set(null); + } + done.set(null); // signal the subscriber to clean up + } + }).start(); + snippets.startAndWait(); // blocks until done is set + + PubsubMessage message = received.get(); + assertNotNull(message); + assertEquals(message.getData().toStringUtf8(), messageToPublish); + } }