From 0559a75fc80317f79d39334b2f3a8ce2896e3d15 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 6 Jan 2017 10:53:26 +1100 Subject: [PATCH] make pubsub work with new gax update documentation links too --- google-cloud-core/pom.xml | 2 +- .../cloud/pubsub/MessagesProcessor.java | 3 - .../pubsub/PollingSubscriberConnection.java | 1 - .../java/com/google/cloud/pubsub/PubSub.java | 3 - .../com/google/cloud/pubsub/Publisher.java | 9 +-- .../com/google/cloud/pubsub/Subscriber.java | 62 +++++++++---------- .../com/google/cloud/pubsub/Subscription.java | 20 ++---- .../google/cloud/pubsub/SubscriptionInfo.java | 56 +++++++---------- .../cloud/pubsub/PublisherImplTest.java | 10 ++- 9 files changed, 73 insertions(+), 93 deletions(-) diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 190cbe8ec854..d395b23313f9 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -111,7 +111,7 @@ com.google.api gax - 0.0.27-SNAPSHOT + 0.0.27 io.grpc diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java index 855f2d3b5545..32e00bbb7322 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/MessagesProcessor.java @@ -17,19 +17,16 @@ package com.google.cloud.pubsub; import com.google.api.gax.bundling.FlowController; -import com.google.auth.Credentials; import com.google.cloud.Clock; import com.google.cloud.pubsub.Subscriber.MessageReceiver; import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import io.grpc.Status; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java index ec292451029e..e5328cce07ae 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PollingSubscriberConnection.java @@ -40,7 +40,6 @@ import io.grpc.Channel; import io.grpc.StatusRuntimeException; import io.grpc.auth.MoreCallCredentials; -import java.util.Iterator; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index ee1d6cfe0f56..9439e4665baf 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -16,14 +16,11 @@ package com.google.cloud.pubsub; -import com.google.auto.value.AutoValue; import com.google.cloud.AsyncPage; import com.google.cloud.GrpcServiceOptions.ExecutorFactory; import com.google.cloud.Page; import com.google.cloud.Policy; import com.google.cloud.Service; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import java.io.IOException; import java.util.List; import java.util.Map; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java index 8428cce91e90..9013b94bef43 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java @@ -143,8 +143,9 @@ public interface Publisher { * #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}). * *

If set to false, a publish call will fail with either {@link - * RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException}, as - * appropriate, when flow control limits are reached. + * RequestByteMaxOutstandingReachedException} or {@link + * ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are + * reached. */ boolean failOnFlowControlLimits(); @@ -250,8 +251,8 @@ public Builder setFlowControlSettings(FlowController.Settings flowControlSetting /** * Whether to fail publish when reaching any of the flow control limits, with either a {@link - * RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException} as - * appropriate. + * RequestByteMaxOutstandingReachedException} or {@link + * ElementCountMaxOutstandingReachedException} as appropriate. * *

If set to false, then publish operations will block the current thread until the * outstanding requests go under the limits. diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java index a193510f398a..ba5931b09248 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscriber.java @@ -37,9 +37,9 @@ * *

A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver * receiver} to which messages are going to be delivered as soon as they are received by the - * subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link - * AckReply#NACK nacked} at will as they get processed by the receiver. Nacking a - * messages implies a later redelivery of such message. + * subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK + * nacked} at will as they get processed by the receiver. Nacking a messages implies a later + * redelivery of such message. * *

The subscriber handles the ack management, by automatically extending the ack deadline while * the message is being processed, to then issue the ack or nack of such message when the processing @@ -54,38 +54,38 @@ * in memory before the receiver either ack or nack them. * * - *

If no credentials are provided, the {@link Publisher} will use application default - * credentials through {@link GoogleCredentials#getApplicationDefault}. + *

If no credentials are provided, the {@link Publisher} will use application default credentials + * through {@link GoogleCredentials#getApplicationDefault}. * *

For example, a {@link Subscriber} can be constructed and used to receive messages as follows: * - *

- *  MessageReceiver receiver =
- *      message -> {
- *        // ... process message ...
- *        return Futures.immediateFuture(AckReply.ACK);
- *      });
+ * 
{@code
+ * MessageReceiver receiver = new MessageReceiver() {
+ *   @Override
+ *   public ListenableFuture receiveMessage(PubsubMessage message) {
+ *     // ... process message ...
+ *     return Futures.immediateFuture(AckReply.ACK);
+ *   }
+ * }
  *
- *  Subscriber subscriber =
- *      Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
- *          .setMaxBundleAcks(100)
- *          .build();
+ * Subscriber subscriber =
+ *     Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
+ *         .setMaxBundleAcks(100)
+ *         .build();
  *
- *  subscriber.startAsync();
+ * subscriber.startAsync();
  *
- *  ... recommended, listen for fatal errors that break the subscriber streaming ...
- *  subscriber.addListener(
-        new Listener() {
-          @Override
-          public void failed(State from, Throwable failure) {
-            System.out.println("Subscriber faile with error: " + failure);
-          }
-        },
-        Executors.newSingleThreadExecutor());
+ * // ... recommended, listen for fatal errors that break the subscriber streaming ...
+ * subscriber.addListener(new Listener() {
+ *   @Override
+ *   public void failed(State from, Throwable failure) {
+ *     System.out.println("Subscriber faile with error: " + failure);
+ *   }
+ * }, Executors.newSingleThreadExecutor());
  *
- *  ... and when done with the subscriber ...
- *  subscriber.stopAsync();
- * 
+ * // ... and when done with the subscriber ... + * subscriber.stopAsync(); + * }
*/ public interface Subscriber extends Service { String PUBSUB_API_ADDRESS = "pubsub.googleapis.com"; @@ -129,13 +129,11 @@ public static enum AckReply { * *

When limits are enforced, no more messages will be dispatched to the {@link * MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window - * management, still some extra bytes could be kept at lower layers. + * management, still some extra bytes could be kept at lower layers. */ Optional getMaxOutstandingElementCount(); - /** - * Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. - */ + /** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */ Optional getMaxOutstandingRequestBytes(); /** Builder of {@link Subscriber Subscribers}. */ diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index f0f589410300..6f07c120c66c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -18,14 +18,10 @@ import static com.google.common.base.Preconditions.checkNotNull; -import com.google.cloud.GrpcServiceOptions; import com.google.cloud.Policy; -import com.google.cloud.pubsub.PubSub.PullOption; import com.google.common.base.Function; - import java.io.IOException; import java.io.ObjectInputStream; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.Future; @@ -41,18 +37,12 @@ * processed and the Pub/Sub system can delete it from the subscription; a non-success response * indicates that the Pub/Sub server should resend it (implicit "nack"). * - *

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. - * When messages are pulled with {@link PubSub#pull(String, int)} or - * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly - * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, - * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or - * {@link PubSub#ackAsync(String, String, String...)}. + *

In a pull subscription, the subscribing application must pull messages using {@link + * PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}. * - *

{@code Subscription} adds a layer of service-related functionality over - * {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} - * object with the most recent information use {@link #reload} or {@link #reloadAsync}. + *

{@code Subscription} adds a layer of service-related functionality over {@link + * SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} object with + * the most recent information use {@link #reload} or {@link #reloadAsync}. * * @see Pub/Sub Data Model * @see Subscriber Guide diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java index 5bbc4cd161bc..b7ddc5bea16f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/SubscriptionInfo.java @@ -20,10 +20,8 @@ import com.google.cloud.pubsub.spi.v1.SubscriberClient; import com.google.common.base.MoreObjects; - import java.io.Serializable; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a @@ -36,14 +34,8 @@ * processed and the Pub/Sub system can delete it from the subscription; a non-success response * indicates that the Pub/Sub server should resend it (implicit "nack"). * - *

In a pull subscription, the subscribing application must explicitly pull messages using one of - * {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or - * {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}. - * When messages are pulled with {@link PubSub#pull(String, int)} or - * {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly - * acknowledge them using one of {@link PubSub#ack(String, Iterable)}, - * {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or - * {@link PubSub#ackAsync(String, String, String...)}. + *

In a pull subscription, the subscribing application must pull messages using {@link + * PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}. * * @see Pub/Sub Data Model * @see Subscriber Guide @@ -140,10 +132,10 @@ public abstract static class Builder { * acknowledge the message. After message delivery but before the ack deadline expires and * before the message is acknowledged, it is an outstanding message and will not be delivered * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. - * This value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * as the initial value for the ack deadline, and {@link Subscriber} automatically renews + * unprocessed messages. For push delivery, this value is used to set the request timeout for + * the call to the push endpoint. This value must be between 10 and 600 seconds, if not + * specified, 10 seconds is used. */ @Deprecated public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds); @@ -153,10 +145,10 @@ public abstract static class Builder { * acknowledge the message. After message delivery but before the ack deadline expires and * before the message is acknowledged, it is an outstanding message and will not be delivered * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. - * This value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * as the initial value for the ack deadline. , and {@link Subscriber} automatically renews + * unprocessed messages. For push delivery, this value is used to set the request timeout for + * the call to the push endpoint. This value must be between 10 and 600 seconds, if not + * specified, 10 seconds is used. */ public abstract Builder setAckDeadLineSeconds(int ackDeadLineSeconds); @@ -333,13 +325,13 @@ public PushConfig getPushConfig() { /** * Returns the maximum time after a subscriber receives a message before the subscriber should - * acknowledge the message. After message delivery but before the ack deadline expires and - * before the message is acknowledged, it is an outstanding message and will not be delivered - * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. This - * value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * acknowledge the message. After message delivery but before the ack deadline expires and before + * the message is acknowledged, it is an outstanding message and will not be delivered again + * during that time (on a best-effort basis). For pull subscriptions, this value is used as the + * initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed + * messages. For push delivery, this value is used to set the request timeout for the call to the + * push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is + * used. */ @Deprecated public long ackDeadlineSeconds() { @@ -348,13 +340,13 @@ public long ackDeadlineSeconds() { /** * Returns the maximum time after a subscriber receives a message before the subscriber should - * acknowledge the message. After message delivery but before the ack deadline expires and - * before the message is acknowledged, it is an outstanding message and will not be delivered - * again during that time (on a best-effort basis). For pull subscriptions, this value is used - * as the initial value for the ack deadline. To override the ack deadline value for a given - * message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push - * delivery, this value is used to set the request timeout for the call to the push endpoint. This - * value must be between 10 and 600 seconds, if not specified, 10 seconds is used. + * acknowledge the message. After message delivery but before the ack deadline expires and before + * the message is acknowledged, it is an outstanding message and will not be delivered again + * during that time (on a best-effort basis). For pull subscriptions, this value is used as the + * initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed + * messages. For push delivery, this value is used to set the request timeout for the call to the + * push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is + * used. */ public long getAckDeadlineSeconds() { return ackDeadlineSeconds; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java index 74bf9e28f53b..bd906b33ca74 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PublisherImplTest.java @@ -410,7 +410,10 @@ public void testBuilderInvalidArguments() { } try { builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold((Long)null).build()); + Publisher.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setRequestByteThreshold((Long) null) + .build()); fail("Should have thrown an NullPointerException"); } catch (NullPointerException expected) { // Expected @@ -454,7 +457,10 @@ public void testBuilderInvalidArguments() { Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build()); try { builder.setBundlingSettings( - Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold((Long)null).build()); + Publisher.DEFAULT_BUNDLING_SETTINGS + .toBuilder() + .setElementCountThreshold((Long) null) + .build()); fail("Should have thrown an NullPointerException"); } catch (NullPointerException expected) { // Expected