Skip to content

Commit

Permalink
create FlowControlSettings
Browse files Browse the repository at this point in the history
FlowController uses it instead of accepting Optionals.
  • Loading branch information
pongad committed Jan 5, 2017
1 parent 6a4bc9d commit a9319fa
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ class FlowController {
private final Optional<Integer> maxOutstandingMessages;
private final Optional<Integer> maxOutstandingBytes;

FlowController(
Optional<Integer> maxOutstandingMessages,
Optional<Integer> maxOutstandingBytes,
boolean failOnFlowControlLimits) {
this.maxOutstandingMessages = Preconditions.checkNotNull(maxOutstandingMessages);
this.maxOutstandingBytes = Preconditions.checkNotNull(maxOutstandingBytes);
FlowController(PubSub.FlowControlSettings settings, boolean failOnFlowControlLimits) {
this.maxOutstandingMessages = settings.getMaxOutstandingMessages();
this.maxOutstandingBytes = settings.getMaxOutstandingBytes();
outstandingMessageCount =
maxOutstandingMessages.isPresent() ? new Semaphore(maxOutstandingMessages.get()) : null;
outstandingByteCount =
Expand All @@ -47,7 +44,7 @@ class FlowController {

void reserve(int messages, int bytes) throws CloudPubsubFlowControlException {
Preconditions.checkArgument(messages > 0);

if (outstandingMessageCount != null) {
if (!failOnLimits) {
outstandingMessageCount.acquireUninterruptibly(messages);
Expand All @@ -70,7 +67,7 @@ void reserve(int messages, int bytes) throws CloudPubsubFlowControlException {

void release(int messages, int bytes) {
Preconditions.checkArgument(messages > 0);

if (outstandingMessageCount != null) {
outstandingMessageCount.release(messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package com.google.cloud.pubsub;

import com.google.auto.value.AutoValue;
import com.google.cloud.AsyncPage;
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.cloud.Page;
import com.google.cloud.Policy;
import com.google.cloud.Service;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -137,6 +140,49 @@ public static PullOption executorFactory(ExecutorFactory executorFactory) {
}
}

@AutoValue
public abstract static class FlowControlSettings {
static FlowControlSettings DEFAULT =
newBuilder()
.setMaxOutstandingBytes(Optional.<Integer>absent())
.setMaxOutstandingMessages(Optional.<Integer>absent())
.build();

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
abstract Optional<Integer> getMaxOutstandingMessages();

/** Maximum number of outstanding bytes to keep in memory before enforcing flow control. */
abstract Optional<Integer> getMaxOutstandingBytes();

Builder toBuilder() {
return new AutoValue_PubSub_FlowControlSettings.Builder(this);
}

static Builder newBuilder() {
return new AutoValue_PubSub_FlowControlSettings.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
abstract Builder setMaxOutstandingMessages(Optional<Integer> value);

abstract Builder setMaxOutstandingBytes(Optional<Integer> value);

abstract FlowControlSettings autoBuild();

FlowControlSettings build() {
FlowControlSettings settings = autoBuild();
Preconditions.checkArgument(
settings.getMaxOutstandingMessages().or(1) > 0,
"maxOutstandingMessages limit is disabled by default, but if set it must be set to a value greater than 0.");
Preconditions.checkArgument(
settings.getMaxOutstandingBytes().or(1) > 0,
"maxOutstandingBytes limit is disabled by default, but if set it must be set to a value greater than 0.");
return settings;
}
}
}

/**
* Creates a new topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,24 @@ final class Builder {
String topic;

// Bundling options
BundlingSettings bundlingSettings;
BundlingSettings bundlingSettings = DEFAULT_BUNDLING_SETTINGS;

// Client-side flow control options
Optional<Integer> maxOutstandingMessages;
Optional<Integer> maxOutstandingBytes;
boolean failOnFlowControlLimits;
PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT;
boolean failOnFlowControlLimits = false;

// Send bundle deadline
Duration sendBundleDeadline;
Duration sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;

// RPC options
Duration requestTimeout;
Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;

// Channels and credentials
Optional<Credentials> userCredentials;
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
Optional<Credentials> userCredentials = Optional.absent();
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Optional.absent();

Optional<ScheduledExecutorService> executor;
Optional<ScheduledExecutorService> executor = Optional.absent();

/** Constructs a new {@link Builder} using the given topic. */
public static Builder newBuilder(String topic) {
Expand All @@ -190,19 +190,6 @@ public static Builder newBuilder(String topic) {

Builder(String topic) {
this.topic = Preconditions.checkNotNull(topic);
setDefaults();
}

private void setDefaults() {
userCredentials = Optional.absent();
channelBuilder = Optional.absent();
maxOutstandingMessages = Optional.absent();
maxOutstandingBytes = Optional.absent();
bundlingSettings = DEFAULT_BUNDLING_SETTINGS;
requestTimeout = DEFAULT_REQUEST_TIMEOUT;
sendBundleDeadline = MIN_SEND_BUNDLE_DURATION;
failOnFlowControlLimits = false;
executor = Optional.absent();
}

/**
Expand Down Expand Up @@ -254,17 +241,9 @@ public Builder setBundlingSettings(BundlingSettings bundlingSettings) {

// Flow control options

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
public Builder setMaxOutstandingMessages(int messages) {
Preconditions.checkArgument(messages > 0);
maxOutstandingMessages = Optional.of(messages);
return this;
}

/** Maximum number of outstanding messages to keep in memory before enforcing flow control. */
public Builder setMaxOutstandingBytes(int bytes) {
Preconditions.checkArgument(bytes > 0);
maxOutstandingBytes = Optional.of(bytes);
/** Sets the flow control settings. */
public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ final class PublisherImpl implements Publisher {
private final Duration maxBundleDuration;
private final boolean hasBundlingBytes;

private final Optional<Integer> maxOutstandingMessages;
private final Optional<Integer> maxOutstandingBytes;
private final PubSub.FlowControlSettings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final Lock messagesBundleLock;
Expand Down Expand Up @@ -98,11 +97,9 @@ final class PublisherImpl implements Publisher {
maxBundleDuration = builder.bundlingSettings.getDelayThreshold();
hasBundlingBytes = maxBundleBytes > 0;

maxOutstandingMessages = builder.maxOutstandingMessages;
maxOutstandingBytes = builder.maxOutstandingBytes;
flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController =
new FlowController(maxOutstandingMessages, maxOutstandingBytes, failOnFlowControlLimits);
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);

sendBundleDeadline = builder.sendBundleDeadline;

Expand Down Expand Up @@ -166,12 +163,12 @@ public long getMaxBundleMessages() {

@Override
public Optional<Integer> getMaxOutstandingMessages() {
return maxOutstandingMessages;
return flowControlSettings.getMaxOutstandingMessages();
}

@Override
public Optional<Integer> getMaxOutstandingBytes() {
return maxOutstandingBytes;
return flowControlSettings.getMaxOutstandingBytes();
}

@Override
Expand All @@ -181,12 +178,12 @@ public boolean failOnFlowControlLimits() {

/** Whether flow control kicks in on a per outstanding messages basis. */
boolean isPerMessageEnforced() {
return maxOutstandingMessages.isPresent();
return getMaxOutstandingMessages().isPresent();
}

/** Whether flow control kicks in on a per outstanding bytes basis. */
boolean isPerBytesEnforced() {
return maxOutstandingBytes.isPresent();
return getMaxOutstandingBytes().isPresent();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,17 @@ final class Builder {
private static final Duration DEFAULT_ACK_EXPIRATION_PADDING = Duration.millis(500);

String subscription;
Optional<Credentials> credentials;
Optional<Credentials> credentials = Optional.absent();
MessageReceiver receiver;

Duration ackExpirationPadding;
Duration ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;

Optional<Integer> maxOutstandingMessages;
Optional<Integer> maxOutstandingBytes;
PubSub.FlowControlSettings flowControlSettings = PubSub.FlowControlSettings.DEFAULT;

Optional<ScheduledExecutorService> executor;
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
Optional<Clock> clock;
Optional<ScheduledExecutorService> executor = Optional.absent();
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Optional.absent();
Optional<Clock> clock = Optional.absent();

/**
* Constructs a new {@link Builder}.
Expand All @@ -170,21 +170,10 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
}

Builder(String subscription, MessageReceiver receiver) {
setDefaults();
this.subscription = subscription;
this.receiver = receiver;
}

private void setDefaults() {
credentials = Optional.absent();
channelBuilder = Optional.absent();
ackExpirationPadding = DEFAULT_ACK_EXPIRATION_PADDING;
maxOutstandingBytes = Optional.absent();
maxOutstandingMessages = Optional.absent();
executor = Optional.absent();
clock = Optional.absent();
}

/**
* Credentials to authenticate with.
*
Expand All @@ -208,33 +197,9 @@ public Builder setChannelBuilder(
return this;
}

/**
* Sets the maximum number of outstanding messages; messages delivered to the {@link
* MessageReceiver} that have not been acknowledged or rejected.
*
* @param maxOutstandingMessages must be greater than 0
*/
public Builder setMaxOutstandingMessages(int maxOutstandingMessages) {
Preconditions.checkArgument(
maxOutstandingMessages > 0,
"maxOutstandingMessages limit is disabled by default, but if set it must be set to a "
+ "value greater to 0.");
this.maxOutstandingMessages = Optional.of(maxOutstandingMessages);
return this;
}

/**
* Sets the maximum number of outstanding bytes; bytes delivered to the {@link MessageReceiver}
* that have not been acknowledged or rejected.
*
* @param maxOutstandingBytes must be greater than 0
*/
public Builder setMaxOutstandingBytes(int maxOutstandingBytes) {
Preconditions.checkArgument(
maxOutstandingBytes > 0,
"maxOutstandingBytes limit is disabled by default, but if set it must be set to a value "
+ "greater than 0.");
this.maxOutstandingBytes = Optional.of(maxOutstandingBytes);
/** Sets the flow control settings. */
public Builder setFlowControlSettings(PubSub.FlowControlSettings flowControlSettings) {
this.flowControlSettings = Preconditions.checkNotNull(flowControlSettings);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class SubscriberImpl extends AbstractService implements Subscriber {
private static final Logger logger = LoggerFactory.getLogger(SubscriberImpl.class);

private final String subscription;
private final Optional<Integer> maxOutstandingBytes;
private final Optional<Integer> maxOutstandingMessages;
private final PubSub.FlowControlSettings flowControlSettings;
private final Duration ackExpirationPadding;
private final ScheduledExecutorService executor;
private final Distribution ackLatencyDistribution =
Expand All @@ -78,8 +77,7 @@ class SubscriberImpl extends AbstractService implements Subscriber {

public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
receiver = builder.receiver;
maxOutstandingBytes = builder.maxOutstandingBytes;
maxOutstandingMessages = builder.maxOutstandingMessages;
flowControlSettings = builder.flowControlSettings;
subscription = builder.subscription;
ackExpirationPadding = builder.ackExpirationPadding;
streamAckDeadlineSeconds =
Expand All @@ -88,8 +86,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock();

flowController =
new FlowController(builder.maxOutstandingBytes, builder.maxOutstandingBytes, false);
flowController = new FlowController(builder.flowControlSettings, false);

numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
executor =
Expand Down Expand Up @@ -316,11 +313,11 @@ public Duration getAckExpirationPadding() {

@Override
public Optional<Integer> getMaxOutstandingMessages() {
return maxOutstandingMessages;
return flowControlSettings.getMaxOutstandingMessages();
}

@Override
public Optional<Integer> getMaxOutstandingBytes() {
return maxOutstandingBytes;
return flowControlSettings.getMaxOutstandingBytes();
}
}
Loading

0 comments on commit a9319fa

Please sign in to comment.