Skip to content

Commit

Permalink
add test for Publisher and Subscriber snippets
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad committed Feb 24, 2017
1 parent d7267f5 commit d84ae75
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.google.cloud.examples.pubsub;
package com.google.cloud.examples.pubsub.snippets;

import com.google.api.gax.core.RpcFuture;
import com.google.api.gax.core.RpcFutureCallback;
Expand All @@ -35,7 +35,7 @@ public PublisherSnippets(Publisher publisher) {
*/
// [TARGET publish(PubsubMessage)]
// [VARIABLE "my_message"]
public void publish(String message) {
public RpcFuture<String> publish(String message) {
// [START publish]
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
Expand All @@ -50,6 +50,7 @@ public void onFailure(Throwable t) {
}
});
// [END publish]
return messageIdFuture;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SubscriberSnippets(
* Example of receiving a specific number of messages.
*/
// [TARGET startAsync()]
public void startAsync() throws Exception {
public void startAndWait() throws Exception {
// [START startAsync]
Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
subscriber.addListener(new Subscriber.SubscriberListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.core.RpcFutureCallback;
import com.google.api.gax.core.SettableRpcFuture;
import com.google.cloud.Identity;
import com.google.cloud.Page;
import com.google.cloud.Policy;
Expand All @@ -33,14 +35,25 @@
import com.google.cloud.pubsub.deprecated.SubscriptionInfo;
import com.google.cloud.pubsub.deprecated.Topic;
import com.google.cloud.pubsub.deprecated.TopicInfo;
import com.google.cloud.pubsub.spi.v1.Publisher;
import com.google.cloud.pubsub.spi.v1.PublisherClient;
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down Expand Up @@ -264,4 +277,79 @@ public void testTopicPolicyAsync() throws ExecutionException, InterruptedExcepti
topic.delete();
subscription.delete();
}

@Test
public void testPublisherSubscriber() throws Exception {
TopicName topicName =
TopicName.create(pubsub.getOptions().getProjectId(), formatForTest("test-topic"));
SubscriptionName subscriptionName =
SubscriptionName.create(
pubsub.getOptions().getProjectId(), formatForTest("test-subscription"));
try (PublisherClient publisherClient = PublisherClient.create();
SubscriberClient subscriberClient = SubscriberClient.create()) {
publisherClient.createTopic(topicName);
subscriberClient.createSubscription(
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);

testPublisherSubscriberHelper(topicName, subscriptionName);

subscriberClient.deleteSubscription(subscriptionName);
publisherClient.deleteTopic(topicName);
}
}

private void testPublisherSubscriberHelper(
TopicName topicName, SubscriptionName subscriptionName) throws Exception {
String messageToPublish = "my-message";

Publisher publisher = null;
try {
publisher = Publisher.newBuilder(topicName).build();
PublisherSnippets snippets = new PublisherSnippets(publisher);
final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
snippets
.publish(messageToPublish)
.addCallback(
new RpcFutureCallback<String>() {
public void onSuccess(String messageId) {
done.set(null);
}

public void onFailure(Throwable t) {
done.setException(t);
}
});
done.get();
} finally {
if (publisher != null) {
publisher.shutdown();
}
}

final BlockingQueue<PubsubMessage> queue = new ArrayBlockingQueue<>(1);
final SettableRpcFuture<Void> done = new SettableRpcFuture<>();
final SettableRpcFuture<PubsubMessage> received = new SettableRpcFuture<>();
SubscriberSnippets snippets =
new SubscriberSnippets(
subscriptionName,
new MessageReceiverSnippets(queue).messageReceiver(),
done,
MoreExecutors.directExecutor());
new Thread(new Runnable() {
@Override
public void run() {
try {
received.set(queue.poll(10, TimeUnit.MINUTES));
} catch (InterruptedException e) {
received.set(null);
}
done.set(null); // signal the subscriber to clean up
}
}).start();
snippets.startAndWait(); // blocks until done is set

PubsubMessage message = received.get();
assertNotNull(message);
assertEquals(message.getData().toStringUtf8(), messageToPublish);
}
}

0 comments on commit d84ae75

Please sign in to comment.