diff --git a/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java b/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java index 454eeeeae7f..3969aad4ad2 100644 --- a/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java +++ b/pubsub/cloud-client/src/main/java/com/example/pubsub/PublisherExample.java @@ -18,15 +18,17 @@ // [START pubsub_quickstart_publisher] import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.gax.rpc.ApiException; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + public class PublisherExample { // use the default project id @@ -41,6 +43,8 @@ public static void main(String... args) throws Exception { int messageCount = Integer.parseInt(args[1]); ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId); Publisher publisher = null; + List> futures = new ArrayList<>(); + try { // Create a publisher instance with default settings bound to the topic publisher = Publisher.newBuilder(topicName).build(); @@ -54,31 +58,18 @@ public static void main(String... args) throws Exception { .setData(data) .build(); - //schedule a message to be published, messages are automatically batched + // Schedule a message to be published. Messages are automatically batched. ApiFuture future = publisher.publish(pubsubMessage); - - // add an asynchronous callback to handle success / failure - ApiFutures.addCallback(future, new ApiFutureCallback() { - - @Override - public void onFailure(Throwable throwable) { - if (throwable instanceof ApiException) { - ApiException apiException = ((ApiException) throwable); - // details on the API exception - System.out.println(apiException.getStatusCode().getCode()); - System.out.println(apiException.isRetryable()); - } - System.out.println("Error publishing message : " + message); - } - - @Override - public void onSuccess(String messageId) { - // Once published, returns server-assigned message ids (unique within the topic) - System.out.println(messageId); - } - }); + futures.add(future); } } finally { + // Wait on any pending requests + List messageIds = ApiFutures.allAsList(futures).get(); + + for (String messageId : messageIds) { + System.out.println(messageId); + } + if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown();