Skip to content

Commit

Permalink
Add TopicId and SubscriptionId classes (#984)
Browse files Browse the repository at this point in the history
* Add TopicId class and tests. Use TopicId in SubscriptionInfo

* Add SubscriptionId class and tests. Use SubscriptionId in listSubscriptions(topic)

* Minor javadoc fixes

* Add identity for deleted topics
  • Loading branch information
mziccard committed May 10, 2016
1 parent 6bc34ff commit 9879a16
Show file tree
Hide file tree
Showing 13 changed files with 513 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@
* for (T value : page.values()) {
* // do something with value
* }
* page = page.nextPage().get();
* page = page.nextPageAsync().get();
* }}</pre>
*
* @param <T> the value type that the page holds
*/
public interface AsyncPage<T> extends Page<T> {

/**
* Returns a {@link Future} object for the next page.
* Returns a {@link Future} object for the next page. {@link Future#get()} returns {@code null} if
* the last page has been reached.
*/
Future<AsyncPage<T>> nextPageAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;

import java.io.Serializable;
Expand Down Expand Up @@ -47,7 +48,7 @@ private static class SyncNextPageFetcher<T> implements PageImpl.NextPageFetcher<

private static final long serialVersionUID = -4124568632363525351L;

private NextPageFetcher<T> asyncPageFetcher;
private final NextPageFetcher<T> asyncPageFetcher;

private SyncNextPageFetcher(NextPageFetcher<T> asyncPageFetcher) {
this.asyncPageFetcher = asyncPageFetcher;
Expand Down Expand Up @@ -75,7 +76,7 @@ public AsyncPageImpl(NextPageFetcher<T> asyncPageFetcher, String cursor, Iterabl
@Override
public Future<AsyncPage<T>> nextPageAsync() {
if (nextPageCursor() == null || asyncPageFetcher == null) {
return null;
return Futures.immediateCheckedFuture(null);
}
return asyncPageFetcher.nextPage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {

Future<AsyncPage<Subscription>> listSubscriptionsAsync(ListOption... options);

Page<Subscription> listSubscriptions(String topic, ListOption... options);
Page<SubscriptionId> listSubscriptions(String topic, ListOption... options);

Future<AsyncPage<Subscription>> listSubscriptionsAsync(String topic, ListOption... options);
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);

Iterator<ReceivedMessage> pull(String subscription, PullOption... options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,12 @@ public Future<AsyncPage<Subscription>> listSubscriptionsAsync(ListOption... opti
}

@Override
public Page<Subscription> listSubscriptions(String topic, ListOption... options) {
public Page<SubscriptionId> listSubscriptions(String topic, ListOption... options) {
return null;
}

@Override
public Future<AsyncPage<Subscription>> listSubscriptionsAsync(String topic,
public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
ListOption... options) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,20 @@ private Builder(Subscription subscription) {
}

@Override
public Builder topic(String name) {
delegate.topic(name);
public Builder topic(TopicId topic) {
delegate.topic(topic);
return this;
}

@Override
public Builder topic(String project, String topic) {
delegate.topic(project, topic);
return this;
}

@Override
public Builder topic(String topic) {
delegate.topic(topic);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsub;

import static com.google.cloud.pubsub.spi.v1.SubscriberApi.parseProjectFromSubscriptionName;
import static com.google.cloud.pubsub.spi.v1.SubscriberApi.parseSubscriptionFromSubscriptionName;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
import java.util.Objects;

/**
* Identity for a Google PubSub subscription. {@code SubscriptionId} objects are returned by the
* {@link PubSub#listSubscriptions(String, PubSub.ListOption...)} and
* {@link PubSub#listSubscriptionsAsync(String, PubSub.ListOption...)} methods as a topic may have
* subscriptions from different projects.
*/
public class SubscriptionId implements Serializable {

private static final long serialVersionUID = 6507142968866856283L;

private final String project;
private final String subscription;

SubscriptionId(String project, String subscription) {
this.project = checkNotNull(project);
this.subscription = checkNotNull(subscription);
}

/**
* Returns the name of the project where the subscription resides.
*/
public String project() {
return project;
}

/**
* Returns the name of the subscription.
*/
public String subscription() {
return subscription;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("project", project)
.add("subscription", subscription).toString();
}

@Override
public final int hashCode() {
return Objects.hash(project, subscription);
}

@Override
public final boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof SubscriptionId)) {
return false;
}
SubscriptionId other = (SubscriptionId) obj;
return Objects.equals(project, other.project)
&& Objects.equals(subscription, other.subscription);
}

static SubscriptionId fromPb(String pb) {
return new SubscriptionId(parseProjectFromSubscriptionName(pb),
parseSubscriptionFromSubscriptionName(pb));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.pubsub.spi.v1.PublisherApi;
import com.google.cloud.pubsub.spi.v1.SubscriberApi;
import com.google.common.base.MoreObjects;

Expand Down Expand Up @@ -53,7 +52,7 @@ public class SubscriptionInfo implements Serializable {
private static final long serialVersionUID = 1860057426574127128L;

private final String name;
private final String topic;
private final TopicId topic;
private final PushConfig pushConfig;
private final int ackDeadlineSeconds;

Expand All @@ -72,9 +71,22 @@ public abstract static class Builder {
public abstract Builder name(String name);

/**
* Sets the name of the topic the subscription refers to.
* Sets the topic the subscription refers to, given the topic name. The topic is assumed to
* reside in the {@link PubSubOptions#projectId()} project.
*/
public abstract Builder topic(String name);
public abstract Builder topic(String topic);

/**
* Sets the topic the subscription refers to, given the project and topic names.
*/
public abstract Builder topic(String project, String topic);

/**
* Sets the topic the subscription refers to, given the topic identity. If
* {@code topic.project()} is {@code null} the topic is assumed to reside in the
* {@link PubSubOptions#projectId()} project.
*/
public abstract Builder topic(TopicId topic);

/**
* Sets the push configuration for the subscription. If set, the subscription will be in
Expand Down Expand Up @@ -104,11 +116,11 @@ public abstract static class Builder {
static final class BuilderImpl extends Builder {

private String name;
private String topic;
private TopicId topic;
private PushConfig pushConfig;
private int ackDeadlineSeconds;

private BuilderImpl(String topic, String name) {
private BuilderImpl(TopicId topic, String name) {
this.topic = checkNotNull(topic);
this.name = checkNotNull(name);
}
Expand All @@ -126,8 +138,18 @@ public Builder name(String name) {
return this;
}

@Override
public Builder topic(String project, String topic) {
return topic(TopicId.of(checkNotNull(project), topic));
}

@Override
public Builder topic(String topic) {
return topic(TopicId.of(topic));
}

@Override
public Builder topic(TopicId topic) {
this.topic = checkNotNull(topic);
return this;
}
Expand Down Expand Up @@ -158,9 +180,12 @@ public SubscriptionInfo build() {
}

/**
* Returns the name of the topic this subscription refers to.
* Returns the identity of the topic this subscription refers to. If {@link TopicId#project()} is
* {@code null} the topic is assumed to reside in the {@link PubSubOptions#projectId()} project.
* After a topic is deleted, existing subscriptions to that topic are not deleted, but their topic
* field is set to {@link TopicId#deletedTopic()}.
*/
public String topic() {
public TopicId topic() {
return topic;
}

Expand Down Expand Up @@ -231,7 +256,7 @@ public String toString() {
com.google.pubsub.v1.Subscription toPb(String projectId) {
com.google.pubsub.v1.Subscription.Builder builder =
com.google.pubsub.v1.Subscription.newBuilder();
builder.setTopic(PublisherApi.formatTopicName(projectId, topic));
builder.setTopic(topic.toPb(projectId));
builder.setName(SubscriberApi.formatSubscriptionName(projectId, name));
builder.setAckDeadlineSeconds(ackDeadlineSeconds);
if (pushConfig != null) {
Expand All @@ -241,7 +266,7 @@ com.google.pubsub.v1.Subscription toPb(String projectId) {
}

static SubscriptionInfo fromPb(com.google.pubsub.v1.Subscription subscription) {
Builder builder = builder(PublisherApi.parseTopicFromTopicName(subscription.getTopic()),
Builder builder = builder(TopicId.fromPb(subscription.getTopic()),
SubscriberApi.parseSubscriptionFromSubscriptionName(subscription.getName()));
builder.ackDeadLineSeconds(subscription.getAckDeadlineSeconds());
// A subscription with an "empty" push config is a pull subscription
Expand All @@ -261,48 +286,100 @@ public Builder toBuilder() {

/**
* Creates a pull {@code SubscriptionInfo} object given the name of the topic and the name of the
* subscription.
* subscription. The topic is assumed to reside in the {@link PubSubOptions#projectId()} project.
*
* @param topic the name of the topic the subscription refers to
* @param name the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
* string {@code goog}
* string {@code goog}.
*/
public static SubscriptionInfo of(String topic, String name) {
return builder(topic, name).build();
}

/**
* Creates a pull {@code SubscriptionInfo} object given the identity of the topic and the name of
* the subscription. If {@code topic.project()} is {@code null} the topic is assumed to reside in
* the {@link PubSubOptions#projectId()} project.
*
* @param topic the identity of the topic the subscription refers to
* @param name the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
* string {@code goog}.
*/
public static SubscriptionInfo of(TopicId topic, String name) {
return builder(topic, name).build();
}

/**
* Creates a push {@code SubscriptionInfo} object given the name of the topic, the name of the
* subscription and the push endpoint.
* subscription and the push endpoint. The topic is assumed to reside in the
* {@link PubSubOptions#projectId()} project.
*
* @param topic the name of the topic the subscription refers to
* @param name the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
* string {@code goog}
* string {@code goog}.
* @param endpoint a URL locating the endpoint to which messages should be pushed. For example,
* an endpoint might use {@code https://example.com/push}.
*/
public static SubscriptionInfo of(String topic, String name, String endpoint) {
return builder(topic, name).pushConfig(PushConfig.of(endpoint)).build();
}

/**
* Creates a push {@code SubscriptionInfo} object given the identity of the topic, the name of the
* subscription and the push endpoint. If {@code topic.project()} is {@code null} the topic is
* assumed to reside in the {@link PubSubOptions#projectId()} project.
*
* @param topic the identity of the topic the subscription refers to
* @param name the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
* string {@code goog}.
* @param endpoint a URL locating the endpoint to which messages should be pushed. For example,
* an endpoint might use {@code https://example.com/push}.
*/
public static SubscriptionInfo of(TopicId topic, String name, String endpoint) {
return builder(topic, name).pushConfig(PushConfig.of(endpoint)).build();
}

/**
* Creates a builder for {@code SubscriptionInfo} objects given the name of the topic and the name
* of the subscription.
* of the subscription. The topic is assumed to reside in the {@link PubSubOptions#projectId()}
* project.
*
* @param topic the name of the topic the subscription refers to
* @param name the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
* string {@code goog}
* string {@code goog}.
*/
public static Builder builder(String topic, String name) {
return builder(TopicId.of(topic), name);
}

/**
* Creates a builder for {@code SubscriptionInfo} objects given the identity of the topic and the
* name of the subscription. If {@code topic.project()} is {@code null} the topic is assumed to
* reside in the {@link PubSubOptions#projectId()} project.
*
* @param topic the identity of the topic the subscription refers to
* @param name the name of the subscription. The name must start with a letter, and contain only
* letters ({@code [A-Za-z]}), numbers ({@code [0-9]}), dashes ({@code -}), underscores
* ({@code _}), periods ({@code .}), tildes ({@code ~}), plus ({@code +}) or percent signs
* ({@code %}). It must be between 3 and 255 characters in length and cannot begin with the
* string {@code goog}.
*/
public static Builder builder(TopicId topic, String name) {
return new BuilderImpl(topic, name);
}
}
Loading

0 comments on commit 9879a16

Please sign in to comment.