Skip to content

Commit

Permalink
make code compile after moving FlowController to gax
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad committed Jan 5, 2017
1 parent a9319fa commit e575ba7
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 469 deletions.
2 changes: 1 addition & 1 deletion google-cloud-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.25</version>
<version>0.0.27-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.Publisher.CloudPubsubFlowControlException;
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -287,7 +287,7 @@ public void run() {
}
try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (CloudPubsubFlowControlException unexpectedException) {
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsub.StatusUtil.isRetryable;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,49 +140,6 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) {
}
}

@AutoValue
public abstract static class FlowControlSettings {
static FlowControlSettings DEFAULT =
newBuilder()
.setMaxOutstandingBytes(Optional.<Integer>absent())
.setMaxOutstandingMessages(Optional.<Integer>absent())
.build();

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
abstract Optional<Integer> getMaxOutstandingMessages();

/** Maximum number of outstanding bytes to keep in memory before enforcing flow control. */
abstract Optional<Integer> getMaxOutstandingBytes();

Builder toBuilder() {
return new AutoValue_PubSub_FlowControlSettings.Builder(this);
}

static Builder newBuilder() {
return new AutoValue_PubSub_FlowControlSettings.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
abstract Builder setMaxOutstandingMessages(Optional<Integer> value);

abstract Builder setMaxOutstandingBytes(Optional<Integer> value);

abstract FlowControlSettings autoBuild();

FlowControlSettings build() {
FlowControlSettings settings = autoBuild();
Preconditions.checkArgument(
settings.getMaxOutstandingMessages().or(1) > 0,
"maxOutstandingMessages limit is disabled by default, but if set it must be set to a value greater than 0.");
Preconditions.checkArgument(
settings.getMaxOutstandingBytes().or(1) > 0,
"maxOutstandingBytes limit is disabled by default, but if set it must be set to a value greater than 0.");
return settings;
}
}
}

/**
* Creates a new topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.api.gax.grpc.BundlingSettings;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
Expand Down Expand Up @@ -86,8 +87,8 @@ public interface Publisher {
int MAX_BUNDLE_BYTES = 10 * 1000 * 1000; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)

// Meaningful defaults.
int DEFAULT_MAX_BUNDLE_MESSAGES = 100;
int DEFAULT_MAX_BUNDLE_BYTES = 1000; // 1 kB
long DEFAULT_MAX_BUNDLE_MESSAGES = 100L;
long DEFAULT_MAX_BUNDLE_BYTES = 1000L; // 1 kB
Duration DEFAULT_MAX_BUNDLE_DURATION = new Duration(1); // 1ms
Duration DEFAULT_REQUEST_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Duration MIN_SEND_BUNDLE_DURATION = new Duration(10 * 1000); // 10 seconds
Expand Down Expand Up @@ -129,20 +130,20 @@ public interface Publisher {
* Maximum number of outstanding (i.e. pending to publish) messages before limits are enforced.
* See {@link #failOnFlowControlLimits()}.
*/
Optional<Integer> getMaxOutstandingMessages();
Optional<Integer> getMaxOutstandingElementCount();

/**
* Maximum number of outstanding (i.e. pending to publish) bytes before limits are enforced. See
* {@link #failOnFlowControlLimits()}.
*/
Optional<Integer> getMaxOutstandingBytes();
Optional<Integer> getMaxOutstandingRequestBytes();

/**
* Whether to block publish calls when reaching flow control limits (see {@link
* #getMaxOutstandingBytes()} & {@link #getMaxOutstandingMessages()}).
* #getMaxOutstandingRequestBytes()} & {@link #getMaxOutstandingElementCount()}).
*
* <p>If set to false, a publish call will fail with either {@link
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException}, as
* RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException}, as
* appropriate, when flow control limits are reached.
*/
boolean failOnFlowControlLimits();
Expand All @@ -167,7 +168,7 @@ final class Builder {
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;

// Client-side flow control options
PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT;
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;
boolean failOnFlowControlLimits = false;

// Send bundle deadline
Expand Down Expand Up @@ -242,14 +243,14 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {
// Flow control options

/** Sets the flow control settings. */
public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) {
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
}

/**
* Whether to fail publish when reaching any of the flow control limits, with either a {@link
* MaxOutstandingBytesReachedException} or {@link MaxOutstandingMessagesReachedException} as
* RequestByteMaxOutstandingReachedException} or {@link ElementCountMaxOutstandingReachedException} as
* appropriate.
*
* <p>If set to false, then publish operations will block the current thread until the
Expand Down Expand Up @@ -285,51 +286,4 @@ public Publisher build() throws IOException {
return new PublisherImpl(this);
}
}

/** Base exception that signals a flow control state. */
abstract class CloudPubsubFlowControlException extends Exception {}

/**
* Returned as a future exception when client-side flow control is enforced based on the maximum
* number of outstanding in-memory messages.
*/
final class MaxOutstandingMessagesReachedException extends CloudPubsubFlowControlException {
private final int currentMaxMessages;

public MaxOutstandingMessagesReachedException(int currentMaxMessages) {
this.currentMaxMessages = currentMaxMessages;
}

public int getCurrentMaxBundleMessages() {
return currentMaxMessages;
}

@Override
public String toString() {
return String.format(
"The maximum number of bundle messages: %d have been reached.", currentMaxMessages);
}
}

/**
* Returned as a future exception when client-side flow control is enforced based on the maximum
* number of unacknowledged in-memory bytes.
*/
final class MaxOutstandingBytesReachedException extends CloudPubsubFlowControlException {
private final int currentMaxBytes;

public MaxOutstandingBytesReachedException(int currentMaxBytes) {
this.currentMaxBytes = currentMaxBytes;
}

public int getCurrentMaxBundleBytes() {
return currentMaxBytes;
}

@Override
public String toString() {
return String.format(
"The maximum number of bundle bytes: %d have been reached.", currentMaxBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,7 +69,7 @@ final class PublisherImpl implements Publisher {
private final Duration maxBundleDuration;
private final boolean hasBundlingBytes;

private final PubSub.FlowControlSettings flowControlSettings;
private final FlowController.Settings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final Lock messagesBundleLock;
Expand Down Expand Up @@ -162,13 +163,13 @@ public long getMaxBundleMessages() {
}

@Override
public Optional<Integer> getMaxOutstandingMessages() {
return flowControlSettings.getMaxOutstandingMessages();
public Optional<Integer> getMaxOutstandingElementCount() {
return flowControlSettings.getMaxOutstandingElementCount();
}

@Override
public Optional<Integer> getMaxOutstandingBytes() {
return flowControlSettings.getMaxOutstandingBytes();
public Optional<Integer> getMaxOutstandingRequestBytes() {
return flowControlSettings.getMaxOutstandingRequestBytes();
}

@Override
Expand All @@ -178,12 +179,12 @@ public boolean failOnFlowControlLimits() {

/** Whether flow control kicks in on a per outstanding messages basis. */
boolean isPerMessageEnforced() {
return getMaxOutstandingMessages().isPresent();
return getMaxOutstandingElementCount().isPresent();
}

/** Whether flow control kicks in on a per outstanding bytes basis. */
boolean isPerBytesEnforced() {
return getMaxOutstandingBytes().isPresent();
return getMaxOutstandingRequestBytes().isPresent();
}

@Override
Expand All @@ -200,7 +201,7 @@ public ListenableFuture<String> publish(PubsubMessage message) {
final int messageSize = message.getSerializedSize();
try {
flowController.reserve(1, messageSize);
} catch (CloudPubsubFlowControlException e) {
} catch (FlowController.FlowControlException e) {
return Futures.immediateFailedFuture(e);
}
OutstandingBundle bundleToSend = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.pubsub.StatusUtil.isRetryable;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.MessagesProcessor.AcksProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub;

import com.google.api.gax.bundling.FlowController;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Clock;
Expand Down Expand Up @@ -130,12 +131,12 @@ public static enum AckReply {
* MessageReceiver} but due to the gRPC and HTTP/2 buffering and congestion control window
* management, still some extra bytes could be kept at lower layers.
*/
Optional<Integer> getMaxOutstandingMessages();
Optional<Integer> getMaxOutstandingElementCount();

/**
* Maximum number of outstanding (i.e. pending to process) bytes before limits are enforced.
*/
Optional<Integer> getMaxOutstandingBytes();
Optional<Integer> getMaxOutstandingRequestBytes();

/** Builder of {@link Subscriber Subscribers}. */
final class Builder {
Expand All @@ -148,7 +149,7 @@ final class Builder {

Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;

PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT;
FlowController.Settings flowControlSettings = FlowController.Settings.DEFAULT;

Optional<ScheduledExecutorService> executor = Optional.absent();
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Expand Down Expand Up @@ -198,7 +199,7 @@ public Builder setChannelBuilder(
}

/** Sets the flow control settings. */
public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) {
public Builder setFlowControlSettings(FlowController.Settings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
}
Expand Down
Loading

0 comments on commit e575ba7

Please sign in to comment.