Skip to content

Commit

Permalink
Update PublisherExample.java (#1097)
Browse files Browse the repository at this point in the history
* Update PublisherExample.java
  • Loading branch information
chenyumic authored and jabubake committed Jun 29, 2018
1 parent 1d07106 commit 86603c2
Showing 1 changed file with 15 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
// [START pubsub_quickstart_publisher]

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PublisherExample {

// use the default project id
Expand All @@ -41,6 +43,8 @@ public static void main(String... args) throws Exception {
int messageCount = Integer.parseInt(args[1]);
ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();

try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).build();
Expand All @@ -54,31 +58,18 @@ public static void main(String... args) throws Exception {
.setData(data)
.build();

//schedule a message to be published, messages are automatically batched
// Schedule a message to be published. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);

// add an asynchronous callback to handle success / failure
ApiFutures.addCallback(future, new ApiFutureCallback<String>() {

@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + message);
}

@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println(messageId);
}
});
futures.add(future);
}
} finally {
// Wait on any pending requests
List<String> messageIds = ApiFutures.allAsList(futures).get();

for (String messageId : messageIds) {
System.out.println(messageId);
}

if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
Expand Down

0 comments on commit 86603c2

Please sign in to comment.