Skip to content

Commit

Permalink
Add base class for operation options, javadoc and tests (#996)
Browse files Browse the repository at this point in the history
* Add base class for operation options, javadoc and tests

* Refactor PullOption
- Make maxMessages a method parameter rather than an optional option
- Move MessageConsumer.PullOption to PubSub
- Remove MessageConsumer.start/stop methods in favor of close()
  • Loading branch information
mziccard committed May 11, 2016
1 parent 9879a16 commit cd848c0
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
import java.util.Objects;

/**
* Base class for Pub/Sub operation options.
*/
abstract class Option implements Serializable {

private static final long serialVersionUID = 4956295408130172192L;

private final OptionType optionType;
private final Object value;

interface OptionType {

String name();
}

Option(OptionType optionType, Object value) {
this.optionType = checkNotNull(optionType);
this.value = value;
}

@SuppressWarnings("unchecked")
<T extends OptionType> T optionType() {
return (T) optionType;
}

Object value() {
return value;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof Option)) {
return false;
}
Option other = (Option) obj;
return Objects.equals(optionType, other.optionType)
&& Objects.equals(value, other.value);
}

@Override
public int hashCode() {
return Objects.hash(optionType, value);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("name", optionType.name())
.add("value", value)
.toString();
}
}
122 changes: 52 additions & 70 deletions gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.google.cloud.Page;
import com.google.cloud.Service;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand All @@ -33,65 +33,79 @@
*/
public interface PubSub extends Service<PubSubOptions> {

final class ListOption implements Serializable {
/**
* Class for specifying options for listing topics and subscriptions.
*/
final class ListOption extends Option {

private static final long serialVersionUID = 6517442127283383124L;

private final Option option;
private final Object value;
enum OptionType implements Option.OptionType {
PAGE_SIZE, PAGE_TOKEN;

enum Option {
PAGE_SIZE, PAGE_TOKEN
}
@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

private ListOption(Option option, Object value) {
this.option = option;
this.value = value;
}
String getString(Map<Option.OptionType, ?> options) {
return get(options);
}

Option option() {
return option;
Integer getInteger(Map<Option.OptionType, ?> options) {
return get(options);
}
}

Object value() {
return value;
private ListOption(OptionType option, Object value) {
super(option, value);
}

/**
* Returns an option to specify the maximum number of resources returned per page.
*/
public static ListOption pageSize(int pageSize) {
return new ListOption(Option.PAGE_SIZE, pageSize);
return new ListOption(OptionType.PAGE_SIZE, pageSize);
}

/**
* Returns an option to specify the page token from which to start listing resources.
*/
public static ListOption pageToken(String pageToken) {
return new ListOption(Option.PAGE_TOKEN, pageToken);
return new ListOption(OptionType.PAGE_TOKEN, pageToken);
}
}

final class PullOption implements Serializable {

private static final long serialVersionUID = -5220474819637439937L;
/**
* Class for specifying options for pulling messages.
*/
final class PullOption extends Option {

private final Option option;
private final Object value;
private static final long serialVersionUID = 4792164134340316582L;

enum Option {
MAX_MESSAGES
}
enum OptionType implements Option.OptionType {
MAX_CONCURRENT_CALLBACKS;

private PullOption(Option option, Object value) {
this.option = option;
this.value = value;
}
@SuppressWarnings("unchecked")
<T> T get(Map<Option.OptionType, ?> options) {
return (T) options.get(this);
}

Option option() {
return option;
Integer getInteger(Map<Option.OptionType, ?> options) {
return get(options);
}
}

Object value() {
return value;
private PullOption(Option.OptionType option, Object value) {
super(option, value);
}

public static PullOption maxMessages(int maxMessages) {
return new PullOption(Option.MAX_MESSAGES, maxMessages);
/**
* Returns an option to specify the maximum number of messages that can be executed
* concurrently at any time.
*/
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

Expand All @@ -108,38 +122,6 @@ interface MessageProcessor {
*/
interface MessageConsumer extends AutoCloseable {

final class PullOption implements Serializable {

private static final long serialVersionUID = 4792164134340316582L;

private final Option option;
private final Object value;

enum Option {
MAX_CONCURRENT_CALLBACKS
}

private PullOption(Option option, Object value) {
this.option = option;
this.value = value;
}

Option option() {
return option;
}

Object value() {
return value;
}

public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
return new PullOption(Option.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
}
}

void start(MessageConsumer.PullOption... options);

void stop();
}

Topic create(TopicInfo topic);
Expand Down Expand Up @@ -198,11 +180,11 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {

Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

Iterator<ReceivedMessage> pull(String subscription, PullOption... options);
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);

Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options);
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);

MessageConsumer pullAsync(String subscription, MessageProcessor callback);
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);

void ack(String subscription, String ackId, String... ackIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, PullOption... options) {
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
// this should set return_immediately to true
return null;
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options) {
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
// though this method can set return_immediately to false (as future can be canceled) I
// suggest to keep it false so sync could delegate to asyc and use the same options
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
Expand All @@ -211,7 +211,8 @@ public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOpti
}

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback) {
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
// this method should use the VTKIT thread-pool (maybe getting it should be part of the spi)
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,16 @@ public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
return pubsub.replacePushConfigAsync(name(), pushConfig);
}

public Iterator<ReceivedMessage> pull(PullOption... options) {
return pubsub.pull(name(), options);
public Iterator<ReceivedMessage> pull(int maxMessages) {
return pubsub.pull(name(), maxMessages);
}

public Future<Iterator<ReceivedMessage>> pullAsync(PullOption... options) {
return pubsub.pullAsync(name(), options);
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
return pubsub.pullAsync(name(), maxMessages);
}

public MessageConsumer pullAsync(MessageProcessor callback) {
return pubsub.pullAsync(name(), callback);
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
return pubsub.pullAsync(name(), callback, options);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;

import com.google.cloud.pubsub.Option.OptionType;
import com.google.cloud.pubsub.PubSub.ListOption;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class OptionTest {

private static final OptionType OPTION_TYPE = ListOption.OptionType.PAGE_SIZE;
private static final OptionType ANOTHER_OPTION_TYPE = ListOption.OptionType.PAGE_TOKEN;
private static final String VALUE = "some value";
private static final String OTHER_VALUE = "another value";
private static final Option OPTION = new Option(OPTION_TYPE, VALUE) {};
private static final Option OPTION_EQUALS = new Option(OPTION_TYPE, VALUE) {};
private static final Option OPTION_NOT_EQUALS1 = new Option(ANOTHER_OPTION_TYPE, OTHER_VALUE) {};
private static final Option OPTION_NOT_EQUALS2 = new Option(ANOTHER_OPTION_TYPE, VALUE) {};

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void testEquals() {
assertEquals(OPTION, OPTION_EQUALS);
assertNotEquals(OPTION, OPTION_NOT_EQUALS1);
assertNotEquals(OPTION, OPTION_NOT_EQUALS2);
}

@Test
public void testHashCode() {
assertEquals(OPTION.hashCode(), OPTION_EQUALS.hashCode());
}

@Test
public void testConstructor() {
assertEquals(OPTION_TYPE, OPTION.optionType());
assertEquals(VALUE, OPTION.value());
Option option = new Option(OPTION_TYPE, null) {};
assertEquals(OPTION_TYPE, option.optionType());
assertNull(option.value());
thrown.expect(NullPointerException.class);
new Option(null, VALUE) {};
}
}
Loading

0 comments on commit cd848c0

Please sign in to comment.