Skip to content

Commit

Permalink
make pubsub work with new gax
Browse files Browse the repository at this point in the history
update documentation links too
  • Loading branch information
pongad committed Jan 5, 2017
1 parent e575ba7 commit 0559a75
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 93 deletions.
2 changes: 1 addition & 1 deletion google-cloud-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.27-SNAPSHOT</version>
<version>0.0.27</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@
package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@

package com.google.cloud.pubsub;

import com.google.auto.value.AutoValue;
import com.google.cloud.AsyncPage;
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.cloud.Page;
import com.google.cloud.Policy;
import com.google.cloud.Service;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ public interface Publisher {
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
*
* <p>If set to false, a publish call will fail with either {@link
* RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException}, as
* appropriate, when flow control limits are reached.
* RequestByteMaxOutstandingReachedException} or {@link
* ElementCountMaxOutstandingReachedException}, as appropriate, when flow control limits are
* reached.
*/
boolean failOnFlowControlLimits();

Expand Down Expand Up @@ -250,8 +251,8 @@ public Builder setFlowControlSettings(FlowController.Settings flowControlSetting

/**
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
* RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException} as
* appropriate.
* RequestByteMaxOutstandingReachedException} or {@link
* ElementCountMaxOutstandingReachedException} as appropriate.
*
* <p>If set to false, then publish operations will block the current thread until the
* outstanding requests go under the limits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
*
* <p>A {@link Subscriber} allows you to provide an implementation of a {@link MessageReceiver
* receiver} to which messages are going to be delivered as soon as they are received by the
* subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link
* AckReply#NACK nacked} at will as they get processed by the receiver. Nacking a
* messages implies a later redelivery of such message.
* subscriber. The delivered messages then can be {@link AckReply#ACK acked} or {@link AckReply#NACK
* nacked} at will as they get processed by the receiver. Nacking a messages implies a later
* redelivery of such message.
*
* <p>The subscriber handles the ack management, by automatically extending the ack deadline while
* the message is being processed, to then issue the ack or nack of such message when the processing
Expand All @@ -54,38 +54,38 @@
* in memory before the receiver either ack or nack them.
* </ul>
*
* <p>If no credentials are provided, the {@link Publisher} will use application default
* credentials through {@link GoogleCredentials#getApplicationDefault}.
* <p>If no credentials are provided, the {@link Publisher} will use application default credentials
* through {@link GoogleCredentials#getApplicationDefault}.
*
* <p>For example, a {@link Subscriber} can be constructed and used to receive messages as follows:
*
* <pre>
* MessageReceiver receiver =
* message -> {
* // ... process message ...
* return Futures.immediateFuture(AckReply.ACK);
* });
* <pre>{@code
* MessageReceiver receiver = new MessageReceiver() {
* @Override
* public ListenableFuture<AckReply> receiveMessage(PubsubMessage message) {
* // ... process message ...
* return Futures.immediateFuture(AckReply.ACK);
* }
* }
*
* Subscriber subscriber =
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
* .setMaxBundleAcks(100)
* .build();
* Subscriber subscriber =
* Subscriber.Builder.newBuilder(MY_SUBSCRIPTION, receiver)
* .setMaxBundleAcks(100)
* .build();
*
* subscriber.startAsync();
* subscriber.startAsync();
*
* ... recommended, listen for fatal errors that break the subscriber streaming ...
* subscriber.addListener(
new Listener() {
@Override
public void failed(State from, Throwable failure) {
System.out.println("Subscriber faile with error: " + failure);
}
},
Executors.newSingleThreadExecutor());
* // ... recommended, listen for fatal errors that break the subscriber streaming ...
* subscriber.addListener(new Listener() {
* @Override
* public void failed(State from, Throwable failure) {
* System.out.println("Subscriber faile with error: " + failure);
* }
* }, Executors.newSingleThreadExecutor());
*
* ... and when done with the subscriber ...
* subscriber.stopAsync();
* </pre>
* // ... and when done with the subscriber ...
* subscriber.stopAsync();
* }</pre>
*/
public interface Subscriber extends Service {
String PUBSUB_API_ADDRESS = "pubsub.googleapis.com";
Expand Down Expand Up @@ -129,13 +129,11 @@ public static enum AckReply {
*
* <p><b>When limits are enforced, no more messages will be dispatched to the {@link
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
* management, still some extra bytes could be kept at lower layers.
* management, still some extra bytes could be kept at lower layers.</b>
*/
Optional<Integer> getMaxOutstandingElementCount();

/**
* Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced.
*/
/** Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced. */
Optional<Integer> getMaxOutstandingRequestBytes();

/** Builder of {@link Subscriber Subscribers}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.GrpcServiceOptions;
import com.google.cloud.Policy;
import com.google.cloud.pubsub.PubSub.PullOption;
import com.google.common.base.Function;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Future;
Expand All @@ -41,18 +37,12 @@
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
* <p>In a pull subscription, the subscribing application must pull messages using {@link
* PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}.
*
* <p>{@code Subscription} adds a layer of service-related functionality over
* {@link SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription}
* object with the most recent information use {@link #reload} or {@link #reloadAsync}.
* <p>{@code Subscription} adds a layer of service-related functionality over {@link
* SubscriptionInfo}. Objects of this class are immutable. To get a {@code Subscription} object with
* the most recent information use {@link #reload} or {@link #reloadAsync}.
*
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@

import com.google.cloud.pubsub.spi.v1.SubscriberClient;
import com.google.common.base.MoreObjects;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* A Google Cloud Pub/Sub subscription. A subscription represents the stream of messages from a
Expand All @@ -36,14 +34,8 @@
* processed and the Pub/Sub system can delete it from the subscription; a non-success response
* indicates that the Pub/Sub server should resend it (implicit "nack").
*
* <p>In a pull subscription, the subscribing application must explicitly pull messages using one of
* {@link PubSub#pull(String, int)}, {@link PubSub#pullAsync(String, int)} or
* {@link PubSub#pullAsync(String, PubSub.MessageProcessor callback, PubSub.PullOption...)}.
* When messages are pulled with {@link PubSub#pull(String, int)} or
* {@link PubSub#pullAsync(String, int)} the subscribing application must also explicitly
* acknowledge them using one of {@link PubSub#ack(String, Iterable)},
* {@link PubSub#ack(String, String, String...)}, {@link PubSub#ackAsync(String, Iterable)} or
* {@link PubSub#ackAsync(String, String, String...)}.
* <p>In a pull subscription, the subscribing application must pull messages using {@link
* PubSub#getSubscriber(SubscriptionInfo, Subscriber.MessageReceiver)}.
*
* @see <a href="https://cloud.google.com/pubsub/overview#data_model">Pub/Sub Data Model</a>
* @see <a href="https://cloud.google.com/pubsub/subscriber">Subscriber Guide</a>
Expand Down Expand Up @@ -140,10 +132,10 @@ public abstract static class Builder {
* acknowledge the message. After message delivery but before the ack deadline expires and
* before the message is acknowledged, it is an outstanding message and will not be delivered
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
* as the initial value for the ack deadline. To override the ack deadline value for a given
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
* delivery, this value is used to set the request timeout for the call to the push endpoint.
* This value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
* as the initial value for the ack deadline, and {@link Subscriber} automatically renews
* unprocessed messages. For push delivery, this value is used to set the request timeout for
* the call to the push endpoint. This value must be between 10 and 600 seconds, if not
* specified, 10 seconds is used.
*/
@Deprecated
public abstract Builder ackDeadLineSeconds(int ackDeadLineSeconds);
Expand All @@ -153,10 +145,10 @@ public abstract static class Builder {
* acknowledge the message. After message delivery but before the ack deadline expires and
* before the message is acknowledged, it is an outstanding message and will not be delivered
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
* as the initial value for the ack deadline. To override the ack deadline value for a given
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
* delivery, this value is used to set the request timeout for the call to the push endpoint.
* This value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
* as the initial value for the ack deadline. , and {@link Subscriber} automatically renews
* unprocessed messages. For push delivery, this value is used to set the request timeout for
* the call to the push endpoint. This value must be between 10 and 600 seconds, if not
* specified, 10 seconds is used.
*/
public abstract Builder setAckDeadLineSeconds(int ackDeadLineSeconds);

Expand Down Expand Up @@ -333,13 +325,13 @@ public PushConfig getPushConfig() {

/**
* Returns the maximum time after a subscriber receives a message before the subscriber should
* acknowledge the message. After message delivery but before the ack deadline expires and
* before the message is acknowledged, it is an outstanding message and will not be delivered
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
* as the initial value for the ack deadline. To override the ack deadline value for a given
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
* delivery, this value is used to set the request timeout for the call to the push endpoint. This
* value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
* acknowledge the message. After message delivery but before the ack deadline expires and before
* the message is acknowledged, it is an outstanding message and will not be delivered again
* during that time (on a best-effort basis). For pull subscriptions, this value is used as the
* initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed
* messages. For push delivery, this value is used to set the request timeout for the call to the
* push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is
* used.
*/
@Deprecated
public long ackDeadlineSeconds() {
Expand All @@ -348,13 +340,13 @@ public long ackDeadlineSeconds() {

/**
* Returns the maximum time after a subscriber receives a message before the subscriber should
* acknowledge the message. After message delivery but before the ack deadline expires and
* before the message is acknowledged, it is an outstanding message and will not be delivered
* again during that time (on a best-effort basis). For pull subscriptions, this value is used
* as the initial value for the ack deadline. To override the ack deadline value for a given
* message, use {@link PubSub#modifyAckDeadline(String, int, TimeUnit, Iterable)}. For push
* delivery, this value is used to set the request timeout for the call to the push endpoint. This
* value must be between 10 and 600 seconds, if not specified, 10 seconds is used.
* acknowledge the message. After message delivery but before the ack deadline expires and before
* the message is acknowledged, it is an outstanding message and will not be delivered again
* during that time (on a best-effort basis). For pull subscriptions, this value is used as the
* initial value for the ack deadline, and {@link Subscriber} automatically renews unprocessed
* messages. For push delivery, this value is used to set the request timeout for the call to the
* push endpoint. This value must be between 10 and 600 seconds, if not specified, 10 seconds is
* used.
*/
public long getAckDeadlineSeconds() {
return ackDeadlineSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,10 @@ public void testBuilderInvalidArguments() {
}
try {
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setRequestByteThreshold((Long)null).build());
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setRequestByteThreshold((Long) null)
.build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
// Expected
Expand Down Expand Up @@ -454,7 +457,10 @@ public void testBuilderInvalidArguments() {
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold(1).build());
try {
builder.setBundlingSettings(
Publisher.DEFAULT_BUNDLING_SETTINGS.toBuilder().setElementCountThreshold((Long)null).build());
Publisher.DEFAULT_BUNDLING_SETTINGS
.toBuilder()
.setElementCountThreshold((Long) null)
.build());
fail("Should have thrown an NullPointerException");
} catch (NullPointerException expected) {
// Expected
Expand Down

0 comments on commit 0559a75

Please sign in to comment.