diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Option.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Option.java new file mode 100644 index 000000000000..5359d1797f55 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Option.java @@ -0,0 +1,77 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; + +import java.io.Serializable; +import java.util.Objects; + +/** + * Base class for Pub/Sub operation options. + */ +abstract class Option implements Serializable { + + private static final long serialVersionUID = 4956295408130172192L; + + private final OptionType optionType; + private final Object value; + + interface OptionType { + + String name(); + } + + Option(OptionType optionType, Object value) { + this.optionType = checkNotNull(optionType); + this.value = value; + } + + @SuppressWarnings("unchecked") + T optionType() { + return (T) optionType; + } + + Object value() { + return value; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Option)) { + return false; + } + Option other = (Option) obj; + return Objects.equals(optionType, other.optionType) + && Objects.equals(value, other.value); + } + + @Override + public int hashCode() { + return Objects.hash(optionType, value); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("name", optionType.name()) + .add("value", value) + .toString(); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 3410d3f54a3e..6fde6f4425df 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -20,9 +20,9 @@ import com.google.cloud.Page; import com.google.cloud.Service; -import java.io.Serializable; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -33,65 +33,79 @@ */ public interface PubSub extends Service { - final class ListOption implements Serializable { + /** + * Class for specifying options for listing topics and subscriptions. + */ + final class ListOption extends Option { private static final long serialVersionUID = 6517442127283383124L; - private final Option option; - private final Object value; + enum OptionType implements Option.OptionType { + PAGE_SIZE, PAGE_TOKEN; - enum Option { - PAGE_SIZE, PAGE_TOKEN - } + @SuppressWarnings("unchecked") + T get(Map options) { + return (T) options.get(this); + } - private ListOption(Option option, Object value) { - this.option = option; - this.value = value; - } + String getString(Map options) { + return get(options); + } - Option option() { - return option; + Integer getInteger(Map options) { + return get(options); + } } - Object value() { - return value; + private ListOption(OptionType option, Object value) { + super(option, value); } + /** + * Returns an option to specify the maximum number of resources returned per page. + */ public static ListOption pageSize(int pageSize) { - return new ListOption(Option.PAGE_SIZE, pageSize); + return new ListOption(OptionType.PAGE_SIZE, pageSize); } + /** + * Returns an option to specify the page token from which to start listing resources. + */ public static ListOption pageToken(String pageToken) { - return new ListOption(Option.PAGE_TOKEN, pageToken); + return new ListOption(OptionType.PAGE_TOKEN, pageToken); } } - final class PullOption implements Serializable { - - private static final long serialVersionUID = -5220474819637439937L; + /** + * Class for specifying options for pulling messages. + */ + final class PullOption extends Option { - private final Option option; - private final Object value; + private static final long serialVersionUID = 4792164134340316582L; - enum Option { - MAX_MESSAGES - } + enum OptionType implements Option.OptionType { + MAX_CONCURRENT_CALLBACKS; - private PullOption(Option option, Object value) { - this.option = option; - this.value = value; - } + @SuppressWarnings("unchecked") + T get(Map options) { + return (T) options.get(this); + } - Option option() { - return option; + Integer getInteger(Map options) { + return get(options); + } } - Object value() { - return value; + private PullOption(Option.OptionType option, Object value) { + super(option, value); } - public static PullOption maxMessages(int maxMessages) { - return new PullOption(Option.MAX_MESSAGES, maxMessages); + /** + * Returns an option to specify the maximum number of messages that can be executed + * concurrently at any time. + */ + public static PullOption maxConcurrentCallbacks(int maxConcurrency) { + return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency); } } @@ -108,38 +122,6 @@ interface MessageProcessor { */ interface MessageConsumer extends AutoCloseable { - final class PullOption implements Serializable { - - private static final long serialVersionUID = 4792164134340316582L; - - private final Option option; - private final Object value; - - enum Option { - MAX_CONCURRENT_CALLBACKS - } - - private PullOption(Option option, Object value) { - this.option = option; - this.value = value; - } - - Option option() { - return option; - } - - Object value() { - return value; - } - - public static PullOption maxConcurrentCallbacks(int maxConcurrency) { - return new PullOption(Option.MAX_CONCURRENT_CALLBACKS, maxConcurrency); - } - } - - void start(MessageConsumer.PullOption... options); - - void stop(); } Topic create(TopicInfo topic); @@ -198,11 +180,11 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) { Future> listSubscriptionsAsync(String topic, ListOption... options); - Iterator pull(String subscription, PullOption... options); + Iterator pull(String subscription, int maxMessages); - Future> pullAsync(String subscription, PullOption... options); + Future> pullAsync(String subscription, int maxMessages); - MessageConsumer pullAsync(String subscription, MessageProcessor callback); + MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options); void ack(String subscription, String ackId, String... ackIds); diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 17a99842259f..19b2e5a35fec 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -196,13 +196,13 @@ public Future> listSubscriptionsAsync(String topic, } @Override - public Iterator pull(String subscription, PullOption... options) { + public Iterator pull(String subscription, int maxMessages) { // this should set return_immediately to true return null; } @Override - public Future> pullAsync(String subscription, PullOption... options) { + public Future> pullAsync(String subscription, int maxMessages) { // though this method can set return_immediately to false (as future can be canceled) I // suggest to keep it false so sync could delegate to asyc and use the same options // this method also should use the VTKIT thread-pool to renew ack deadline for non consumed @@ -211,7 +211,8 @@ public Future> pullAsync(String subscription, PullOpti } @Override - public MessageConsumer pullAsync(String subscription, MessageProcessor callback) { + public MessageConsumer pullAsync(String subscription, MessageProcessor callback, + PullOption... options) { // this method should use the VTKIT thread-pool (maybe getting it should be part of the spi) return null; } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java index 11efb5971aa2..7dd203a73348 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java @@ -151,16 +151,16 @@ public Future replacePushConfigAsync(PushConfig pushConfig) { return pubsub.replacePushConfigAsync(name(), pushConfig); } - public Iterator pull(PullOption... options) { - return pubsub.pull(name(), options); + public Iterator pull(int maxMessages) { + return pubsub.pull(name(), maxMessages); } - public Future> pullAsync(PullOption... options) { - return pubsub.pullAsync(name(), options); + public Future> pullAsync(int maxMessages) { + return pubsub.pullAsync(name(), maxMessages); } - public MessageConsumer pullAsync(MessageProcessor callback) { - return pubsub.pullAsync(name(), callback); + public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) { + return pubsub.pullAsync(name(), callback, options); } private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/OptionTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/OptionTest.java new file mode 100644 index 000000000000..119e64e24c3a --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/OptionTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; + +import com.google.cloud.pubsub.Option.OptionType; +import com.google.cloud.pubsub.PubSub.ListOption; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class OptionTest { + + private static final OptionType OPTION_TYPE = ListOption.OptionType.PAGE_SIZE; + private static final OptionType ANOTHER_OPTION_TYPE = ListOption.OptionType.PAGE_TOKEN; + private static final String VALUE = "some value"; + private static final String OTHER_VALUE = "another value"; + private static final Option OPTION = new Option(OPTION_TYPE, VALUE) {}; + private static final Option OPTION_EQUALS = new Option(OPTION_TYPE, VALUE) {}; + private static final Option OPTION_NOT_EQUALS1 = new Option(ANOTHER_OPTION_TYPE, OTHER_VALUE) {}; + private static final Option OPTION_NOT_EQUALS2 = new Option(ANOTHER_OPTION_TYPE, VALUE) {}; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testEquals() { + assertEquals(OPTION, OPTION_EQUALS); + assertNotEquals(OPTION, OPTION_NOT_EQUALS1); + assertNotEquals(OPTION, OPTION_NOT_EQUALS2); + } + + @Test + public void testHashCode() { + assertEquals(OPTION.hashCode(), OPTION_EQUALS.hashCode()); + } + + @Test + public void testConstructor() { + assertEquals(OPTION_TYPE, OPTION.optionType()); + assertEquals(VALUE, OPTION.value()); + Option option = new Option(OPTION_TYPE, null) {}; + assertEquals(OPTION_TYPE, option.optionType()); + assertNull(option.value()); + thrown.expect(NullPointerException.class); + new Option(null, VALUE) {}; + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java new file mode 100644 index 000000000000..620e737111f7 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2016 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.pubsub.PubSub.ListOption; +import com.google.cloud.pubsub.PubSub.PullOption; + +import org.junit.Test; + +public class PubSubTest { + + private static final int PAGE_SIZE = 42; + private static final String PAGE_TOKEN = "page token"; + private static final int MAX_CONCURRENT_CALLBACKS = 42; + + @Test + public void testListOption() { + // page token + ListOption listOption = ListOption.pageToken(PAGE_TOKEN); + assertEquals(PAGE_TOKEN, listOption.value()); + assertEquals(ListOption.OptionType.PAGE_TOKEN, listOption.optionType()); + // page size + listOption = ListOption.pageSize(PAGE_SIZE); + assertEquals(PAGE_SIZE, listOption.value()); + assertEquals(ListOption.OptionType.PAGE_SIZE, listOption.optionType()); + } + + @Test + public void testPullOptions() { + PullOption pullOption = PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS); + assertEquals(MAX_CONCURRENT_CALLBACKS, pullOption.value()); + assertEquals(PullOption.OptionType.MAX_CONCURRENT_CALLBACKS, pullOption.optionType()); + } +}