diff --git a/gcloud-java-gax/pom.xml b/gcloud-java-gax/pom.xml index bbfd8c6007e5..5710df3d5bfe 100644 --- a/gcloud-java-gax/pom.xml +++ b/gcloud-java-gax/pom.xml @@ -63,7 +63,7 @@ add-source - generated/src/main + generated/src/main/java diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java index edaa0885d46f..2e563a4413d0 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ApiCallable.java @@ -62,8 +62,6 @@ public abstract class ApiCallable { // TODO(wrwg): Support interceptors and method/call option configurations. - // TODO(wrwg): gather more feedback whether the overload with java.util.Concurrent hurts that - // much that we want to rename this into ClientCallable or such. // Subclass Contract // ================= @@ -390,20 +388,4 @@ public ApiCallable retrying() { return new PageStreamingCallable(this, pageDescriptor); } - /** - * Returns a callable which behaves the same as {@link #pageStreaming(PageDescriptor)}, with - * the page descriptor attempted to derive from the callable descriptor. - * - * @throws IllegalArgumentException if a page descriptor is not derivable. - */ - public ApiCallable - pageStreaming(Class resourceType) { - PageDescriptor pageDescriptor = - getDescriptor() != null ? getDescriptor().getPageDescriptor(resourceType) : null; - if (pageDescriptor == null) { - throw new IllegalArgumentException(String.format( - "cannot derive page descriptor for '%s'", this)); - } - return pageStreaming(pageDescriptor); - } } diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java index 485d4794d917..eb3ca2b7a9d9 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/grpc/ServiceApiSettings.java @@ -35,12 +35,15 @@ import io.grpc.ManagedChannel; +/** + * A settings class to configure a service api class. + */ public class ServiceApiSettings { private boolean isIdempotentRetrying; private Credentials credentials; - private String servicePath; + private String serviceAddress; private int port; private ManagedChannel channel; @@ -48,7 +51,7 @@ public class ServiceApiSettings { public ServiceApiSettings() { isIdempotentRetrying = true; credentials = null; - servicePath = null; + serviceAddress = null; port = 0; } @@ -69,7 +72,8 @@ public boolean getIsIdempotentRetrying() { /** * Sets the credentials to use in order to call the service. The default is to acquire - * the credentials using GoogleCredentials.getApplicationDefault(). + * the credentials using GoogleCredentials.getApplicationDefault(). These credentials + * will not be used if the channel is set. */ public ServiceApiSettings setCredentials(Credentials credentials) { this.credentials = credentials; @@ -81,19 +85,19 @@ public Credentials getCredentials() { } /** - * The path used to reach the service. + * The path used to reach the service. This value will not be used if the channel is set. */ - public ServiceApiSettings setServicePath(String servicePath) { - this.servicePath = servicePath; + public ServiceApiSettings setServiceAddress(String serviceAddress) { + this.serviceAddress = serviceAddress; return this; } - public String getServicePath() { - return servicePath; + public String getServiceAddress() { + return serviceAddress; } /** - * The port used to reach the service. + * The port used to reach the service. This value will not be used if the channel is set. */ public ServiceApiSettings setPort(int port) { this.port = port; @@ -105,8 +109,10 @@ public int getPort() { } /** - * An instance of ManagedChannel; shutdown will be called on this channel when - * the instance of LoggingServiceApi is shut down. + * The channel used to send requests to the service. Whichever service api class that + * this instance of ServiceApiSettings is passed to will call shutdown() on this + * channel. This injection mechanism is intended for use by unit tests to override + * the channel that would be created by default for real calls to the service. */ public ServiceApiSettings setChannel(ManagedChannel channel) { this.channel = channel; diff --git a/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java b/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java index 6673726f761e..3327db0c0211 100644 --- a/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java +++ b/gcloud-java-gax/src/main/java/io/gapi/gax/internal/ApiUtils.java @@ -31,12 +31,7 @@ package io.gapi.gax.internal; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Executors; - +import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.Lists; @@ -48,6 +43,11 @@ import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; + public class ApiUtils { // TODO(wgg): make this configurable @@ -63,17 +63,26 @@ public static ApiCallable prepareIdem } /** - * Creates a channel for the given path, address and port. + * Acquires application-default credentials, applying the given scopes if the + * credentials require scopes. + */ + public static Credentials credentialsWithScopes(String scopes[]) throws IOException { + List scopeList = Arrays.asList(scopes); + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + if (credentials.createScopedRequired()) { + credentials = credentials.createScoped(scopeList); + } + return credentials; + } + + /** + * Creates a channel for the given address, port, and credentials. */ - public static ManagedChannel createChannel(String address, int port, Collection scopes) + public static ManagedChannel createChannel(String address, int port, Credentials credentials) throws IOException { List interceptors = Lists.newArrayList(); //TODO: MIGRATION interceptors.add(ChannelFactory.authorityInterceptor(address)); - GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); - if (credentials.createScopedRequired()) { - credentials = credentials.createScoped(scopes); - } interceptors.add(new ClientAuthInterceptor(credentials, Executors.newFixedThreadPool(AUTH_THREADS))); @@ -84,23 +93,31 @@ public static ManagedChannel createChannel(String address, int port, Collection< .build(); } - public static ServiceApiSettings settingsWithChannels(ServiceApiSettings settings, - String defaultServicePath, int defaultServicePort, String scopes[]) throws IOException { + /** + * Creates a new instance of ServiceApiSettings with all fields populated, using + * the given defaults if the corresponding values are not set on ServiceApiSettings. + */ + public static ServiceApiSettings populateSettings(ServiceApiSettings settings, + String defaultServiceAddress, int defaultServicePort, String scopes[]) throws IOException { ManagedChannel channel = settings.getChannel(); if (channel == null) { - String servicePath = defaultServicePath; - if (settings.getServicePath() != null) { - servicePath = settings.getServicePath(); + String servicePath = settings.getServiceAddress(); + if (servicePath == null) { + servicePath = defaultServiceAddress; + } + + int port = settings.getPort(); + if (port == 0) { + port = defaultServicePort; } - int port = defaultServicePort; - if (settings.getPort() != 0) { - port = settings.getPort(); + Credentials credentials = settings.getCredentials(); + if (credentials == null) { + credentials = credentialsWithScopes(scopes); } - List scopeList = Arrays.asList(scopes); - channel = ApiUtils.createChannel(servicePath, port, scopeList); + channel = ApiUtils.createChannel(servicePath, port, credentials); } return new ServiceApiSettings() diff --git a/gcloud-java-pubsub/pom.xml b/gcloud-java-pubsub/pom.xml index 3bcdd9d68f12..998adeee4827 100644 --- a/gcloud-java-pubsub/pom.xml +++ b/gcloud-java-pubsub/pom.xml @@ -32,6 +32,12 @@ auto-value 1.1 + + junit + junit + 4.12 + test + @@ -45,7 +51,7 @@ add-source - generated/src/main + generated/src/main/java diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java new file mode 100644 index 000000000000..54572afa2e6d --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/PublisherApi.java @@ -0,0 +1,522 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +/* + * EDITING INSTRUCTIONS + * This file was generated from the file + * https://github.com/google/googleapis/blob/master/google/pubsub/v1/pubsub.proto + * and updates to that file get reflected here through a refresh process. + * For the short term, the refresh process will only be runnable by Google engineers. + * Manual additions are allowed because the refresh process performs + * a 3-way merge in order to preserve those manual additions. In order to not + * break the refresh process, only certain types of modifications are + * allowed. + * + * Allowed modifications - currently these are the only types allowed: + * 1. New methods (these should be added to the end of the class) + * 2. New imports + * + * Happy editing! + */ +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.gapi.gax.grpc.ApiCallable; +import io.gapi.gax.grpc.PageDescriptor; +import io.gapi.gax.grpc.ServiceApiSettings; +import io.gapi.gax.internal.ApiUtils; +import io.gapi.gax.protobuf.PathTemplate; +import io.grpc.ManagedChannel; + +import java.io.IOException; +import java.util.List; + +// Manually-added imports: add custom (non-generated) imports after this point. + + + +// AUTO-GENERATED DOCUMENTATION AND SERVICE - see instructions at the top of the file for editing. +/** + * The service that an application uses to manipulate topics, and to send + * messages to a topic. + */ +@javax.annotation.Generated("by the veneer generator") +public class PublisherApi implements AutoCloseable { + + // ========= + // Constants + // ========= + + /** + * The default address of the service. + */ + public static final String SERVICE_ADDRESS = "pubsub-experimental.googleapis.com"; + + /** + * The default port of the service. + */ + public static final int DEFAULT_SERVICE_PORT = 443; + + + private static final ApiCallable + CREATE_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_CREATE_TOPIC); + private static final ApiCallable + PUBLISH = ApiCallable.create(PublisherGrpc.METHOD_PUBLISH); + private static final ApiCallable + GET_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_GET_TOPIC); + private static final ApiCallable + LIST_TOPICS = ApiCallable.create(PublisherGrpc.METHOD_LIST_TOPICS); + private static final ApiCallable + LIST_TOPIC_SUBSCRIPTIONS = ApiCallable.create(PublisherGrpc.METHOD_LIST_TOPIC_SUBSCRIPTIONS); + private static final ApiCallable + DELETE_TOPIC = ApiCallable.create(PublisherGrpc.METHOD_DELETE_TOPIC); + + private static PageDescriptor LIST_TOPICS_PAGE_DESC = + new PageDescriptor() { + @Override + public Object emptyToken() { + return ""; + } + @Override + public ListTopicsRequest injectToken( + ListTopicsRequest payload, Object token) { + return ListTopicsRequest + .newBuilder(payload) + .setPageToken((String) token) + .build(); + } + @Override + public Object extractNextToken(ListTopicsResponse payload) { + return payload.getNextPageToken(); + } + @Override + public Iterable extractResources(ListTopicsResponse payload) { + return payload.getTopicsList(); + } + }; + + private static PageDescriptor LIST_TOPIC_SUBSCRIPTIONS_PAGE_DESC = + new PageDescriptor() { + @Override + public Object emptyToken() { + return ""; + } + @Override + public ListTopicSubscriptionsRequest injectToken( + ListTopicSubscriptionsRequest payload, Object token) { + return ListTopicSubscriptionsRequest + .newBuilder(payload) + .setPageToken((String) token) + .build(); + } + @Override + public Object extractNextToken(ListTopicSubscriptionsResponse payload) { + return payload.getNextPageToken(); + } + @Override + public Iterable extractResources(ListTopicSubscriptionsResponse payload) { + return payload.getSubscriptionsList(); + } + }; + + private static String ALL_SCOPES[] = { + "https://www.googleapis.com/auth/pubsub" + }; + + /** + * A PathTemplate representing the fully-qualified path to represent + * a project resource. + */ + private static final PathTemplate PROJECT_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}"); + + /** + * A PathTemplate representing the fully-qualified path to represent + * a topic resource. + */ + private static final PathTemplate TOPIC_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}/topics/{topic}"); + + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final ServiceApiSettings settings; + + // =============== + // Factory Methods + // =============== + + /** + * Constructs an instance of PublisherApi with default settings. + */ + public static PublisherApi create() throws IOException { + return create(new ServiceApiSettings()); + } + + /** + * Constructs an instance of PublisherApi, using the given settings. The channels are created based + * on the settings passed in, or defaults for any settings that are not set. + */ + public static PublisherApi create(ServiceApiSettings settings) throws IOException { + return new PublisherApi(settings); + } + + /** + * Constructs an instance of PublisherApi, using the given settings. This is protected so that it + * easy to make a subclass, but otherwise, the static factory methods should be preferred. + */ + protected PublisherApi(ServiceApiSettings settings) throws IOException { + ServiceApiSettings internalSettings = ApiUtils.populateSettings(settings, + SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); + this.settings = internalSettings; + this.channel = internalSettings.getChannel(); + } + + // ============================== + // Resource Name Helper Functions + // ============================== + + /** + * Creates a string containing the fully-qualified path to represent + * a project resource. + */ + public static final String createProjectPath(String project) { + return PROJECT_PATH_TEMPLATE.instantiate( + "project", project); + } + + /** + * Creates a string containing the fully-qualified path to represent + * a topic resource. + */ + public static final String createTopicPath(String project, String topic) { + return TOPIC_PATH_TEMPLATE.instantiate( + "project", project,"topic", topic); + } + + /** + * Extracts the project from the given fully-qualified path which + * represents a project resource. + */ + public static final String extractProjectFromProjectPath(String projectPath) { + return PROJECT_PATH_TEMPLATE.parse(projectPath).get("project"); + } + + /** + * Extracts the project from the given fully-qualified path which + * represents a topic resource. + */ + public static final String extractProjectFromTopicPath(String topicPath) { + return TOPIC_PATH_TEMPLATE.parse(topicPath).get("project"); + } + + /** + * Extracts the topic from the given fully-qualified path which + * represents a topic resource. + */ + public static final String extractTopicFromTopicPath(String topicPath) { + return TOPIC_PATH_TEMPLATE.parse(topicPath).get("topic"); + } + + + // ============= + // Service Calls + // ============= + + // ----- createTopic ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates the given topic with the given name. + * + * @param name The name of the topic. It must have the format + * `"projects/{project}/topics/{topic}"`. `{topic}` must start with a letter, + * and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), + * underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent + * signs (`%`). It must be between 3 and 255 characters in length, and it + * must not start with `"goog"`. + */ + public Topic createTopic(String name) { + Topic request = + Topic.newBuilder() + .setName(name) + .build(); + + return createTopic(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates the given topic with the given name. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Topic createTopic(Topic request) { + return createTopicCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates the given topic with the given name. + */ + public ApiCallable createTopicCallable() { + return ApiUtils.prepareIdempotentCallable(CREATE_TOPIC, settings).bind(channel); + } + + // ----- publish ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + * not exist. The message payload must not be empty; it must contain either a + * non-empty data field, or at least one attribute. + * + * @param topic The messages in the request will be published on this topic. + * @param messages The messages to publish. + */ + public PublishResponse publish(String topic, List messages) { + PublishRequest request = + PublishRequest.newBuilder() + .setTopic(topic) + .addAllMessages(messages) + .build(); + + return publish(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + * not exist. The message payload must not be empty; it must contain either a + * non-empty data field, or at least one attribute. + * + * @param request The request object containing all of the parameters for the API call. + */ + public PublishResponse publish(PublishRequest request) { + return publishCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Adds one or more messages to the topic. Returns NOT_FOUND if the topic does + * not exist. The message payload must not be empty; it must contain either a + * non-empty data field, or at least one attribute. + */ + public ApiCallable publishCallable() { + return PUBLISH.bind(channel); + } + + // ----- getTopic ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration of a topic. + * + * @param topic The name of the topic to get. + */ + public Topic getTopic(String topic) { + GetTopicRequest request = + GetTopicRequest.newBuilder() + .setTopic(topic) + .build(); + + return getTopic(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration of a topic. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Topic getTopic(GetTopicRequest request) { + return getTopicCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration of a topic. + */ + public ApiCallable getTopicCallable() { + return ApiUtils.prepareIdempotentCallable(GET_TOPIC, settings).bind(channel); + } + + // ----- listTopics ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + */ + public Iterable listTopics(String project) { + ListTopicsRequest request = + ListTopicsRequest.newBuilder() + .setProject(project) + .build(); + return listTopics(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Iterable listTopics(ListTopicsRequest request) { + return listTopicsStreamingCallable() + .iterableResponseStreamCall(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + */ + public ApiCallable listTopicsStreamingCallable() { + return listTopicsCallable().pageStreaming(LIST_TOPICS_PAGE_DESC); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching topics. + */ + public ApiCallable listTopicsCallable() { + return ApiUtils.prepareIdempotentCallable(LIST_TOPICS, settings).bind(channel); + } + + // ----- listTopicSubscriptions ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + */ + public Iterable listTopicSubscriptions(String topic) { + ListTopicSubscriptionsRequest request = + ListTopicSubscriptionsRequest.newBuilder() + .setTopic(topic) + .build(); + return listTopicSubscriptions(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Iterable listTopicSubscriptions(ListTopicSubscriptionsRequest request) { + return listTopicSubscriptionsStreamingCallable() + .iterableResponseStreamCall(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + */ + public ApiCallable listTopicSubscriptionsStreamingCallable() { + return listTopicSubscriptionsCallable().pageStreaming(LIST_TOPIC_SUBSCRIPTIONS_PAGE_DESC); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists the name of the subscriptions for this topic. + */ + public ApiCallable listTopicSubscriptionsCallable() { + return ApiUtils.prepareIdempotentCallable(LIST_TOPIC_SUBSCRIPTIONS, settings).bind(channel); + } + + // ----- deleteTopic ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes the topic with the given name. Returns NOT_FOUND if the topic does + * not exist. After a topic is deleted, a new topic may be created with the + * same name; this is an entirely new topic with none of the old + * configuration or subscriptions. Existing subscriptions to this topic are + * not deleted, but their `topic` field is set to `_deleted-topic_`. + * + * @param topic Name of the topic to delete. + */ + public void deleteTopic(String topic) { + DeleteTopicRequest request = + DeleteTopicRequest.newBuilder() + .setTopic(topic) + .build(); + + deleteTopic(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes the topic with the given name. Returns NOT_FOUND if the topic does + * not exist. After a topic is deleted, a new topic may be created with the + * same name; this is an entirely new topic with none of the old + * configuration or subscriptions. Existing subscriptions to this topic are + * not deleted, but their `topic` field is set to `_deleted-topic_`. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void deleteTopic(DeleteTopicRequest request) { + deleteTopicCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes the topic with the given name. Returns NOT_FOUND if the topic does + * not exist. After a topic is deleted, a new topic may be created with the + * same name; this is an entirely new topic with none of the old + * configuration or subscriptions. Existing subscriptions to this topic are + * not deleted, but their `topic` field is set to `_deleted-topic_`. + */ + public ApiCallable deleteTopicCallable() { + return ApiUtils.prepareIdempotentCallable(DELETE_TOPIC, settings).bind(channel); + } + + + // ======== + // Cleanup + // ======== + + /** + * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately + * cancelled. + */ + @Override + public void close() { + // Manually-added shutdown code + + // Auto-generated shutdown code + channel.shutdown(); + + // Manually-added shutdown code + } + + + // ======== + // Manually-added methods: add custom (non-generated) methods after this point. + // ======== + +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java new file mode 100644 index 000000000000..ff320597ddf7 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/SubscriberApi.java @@ -0,0 +1,681 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +/* + * EDITING INSTRUCTIONS + * This file was generated from the file + * https://github.com/google/googleapis/blob/master/google/pubsub/v1/pubsub.proto + * and updates to that file get reflected here through a refresh process. + * For the short term, the refresh process will only be runnable by Google engineers. + * Manual additions are allowed because the refresh process performs + * a 3-way merge in order to preserve those manual additions. In order to not + * break the refresh process, only certain types of modifications are + * allowed. + * + * Allowed modifications - currently these are the only types allowed: + * 1. New methods (these should be added to the end of the class) + * 2. New imports + * + * Happy editing! + */ +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeleteSubscriptionRequest; +import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.ListSubscriptionsRequest; +import com.google.pubsub.v1.ListSubscriptionsResponse; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; +import com.google.pubsub.v1.ModifyPushConfigRequest; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.SubscriberGrpc; +import com.google.pubsub.v1.Subscription; + +import io.gapi.gax.grpc.ApiCallable; +import io.gapi.gax.grpc.PageDescriptor; +import io.gapi.gax.grpc.ServiceApiSettings; +import io.gapi.gax.internal.ApiUtils; +import io.gapi.gax.protobuf.PathTemplate; +import io.grpc.ManagedChannel; + +import java.io.IOException; +import java.util.List; + +// Manually-added imports: add custom (non-generated) imports after this point. + + + +// AUTO-GENERATED DOCUMENTATION AND SERVICE - see instructions at the top of the file for editing. +/** + * The service that an application uses to manipulate subscriptions and to + * consume messages from a subscription via the Pull method. + */ +@javax.annotation.Generated("by the veneer generator") +public class SubscriberApi implements AutoCloseable { + + // ========= + // Constants + // ========= + + /** + * The default address of the service. + */ + public static final String SERVICE_ADDRESS = "pubsub-experimental.googleapis.com"; + + /** + * The default port of the service. + */ + public static final int DEFAULT_SERVICE_PORT = 443; + + + private static final ApiCallable + CREATE_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_CREATE_SUBSCRIPTION); + private static final ApiCallable + GET_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_GET_SUBSCRIPTION); + private static final ApiCallable + LIST_SUBSCRIPTIONS = ApiCallable.create(SubscriberGrpc.METHOD_LIST_SUBSCRIPTIONS); + private static final ApiCallable + DELETE_SUBSCRIPTION = ApiCallable.create(SubscriberGrpc.METHOD_DELETE_SUBSCRIPTION); + private static final ApiCallable + MODIFY_ACK_DEADLINE = ApiCallable.create(SubscriberGrpc.METHOD_MODIFY_ACK_DEADLINE); + private static final ApiCallable + ACKNOWLEDGE = ApiCallable.create(SubscriberGrpc.METHOD_ACKNOWLEDGE); + private static final ApiCallable + PULL = ApiCallable.create(SubscriberGrpc.METHOD_PULL); + private static final ApiCallable + MODIFY_PUSH_CONFIG = ApiCallable.create(SubscriberGrpc.METHOD_MODIFY_PUSH_CONFIG); + + private static PageDescriptor LIST_SUBSCRIPTIONS_PAGE_DESC = + new PageDescriptor() { + @Override + public Object emptyToken() { + return ""; + } + @Override + public ListSubscriptionsRequest injectToken( + ListSubscriptionsRequest payload, Object token) { + return ListSubscriptionsRequest + .newBuilder(payload) + .setPageToken((String) token) + .build(); + } + @Override + public Object extractNextToken(ListSubscriptionsResponse payload) { + return payload.getNextPageToken(); + } + @Override + public Iterable extractResources(ListSubscriptionsResponse payload) { + return payload.getSubscriptionsList(); + } + }; + + private static String ALL_SCOPES[] = { + "https://www.googleapis.com/auth/pubsub" + }; + + /** + * A PathTemplate representing the fully-qualified path to represent + * a project resource. + */ + private static final PathTemplate PROJECT_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}"); + + /** + * A PathTemplate representing the fully-qualified path to represent + * a subscription resource. + */ + private static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = + PathTemplate.create("/projects/{project}/subscriptions/{subscription}"); + + // ======== + // Members + // ======== + + private final ManagedChannel channel; + private final ServiceApiSettings settings; + + // =============== + // Factory Methods + // =============== + + /** + * Constructs an instance of SubscriberApi with default settings. + */ + public static SubscriberApi create() throws IOException { + return create(new ServiceApiSettings()); + } + + /** + * Constructs an instance of SubscriberApi, using the given settings. The channels are created based + * on the settings passed in, or defaults for any settings that are not set. + */ + public static SubscriberApi create(ServiceApiSettings settings) throws IOException { + return new SubscriberApi(settings); + } + + /** + * Constructs an instance of SubscriberApi, using the given settings. This is protected so that it + * easy to make a subclass, but otherwise, the static factory methods should be preferred. + */ + protected SubscriberApi(ServiceApiSettings settings) throws IOException { + ServiceApiSettings internalSettings = ApiUtils.populateSettings(settings, + SERVICE_ADDRESS, DEFAULT_SERVICE_PORT, ALL_SCOPES); + this.settings = internalSettings; + this.channel = internalSettings.getChannel(); + } + + // ============================== + // Resource Name Helper Functions + // ============================== + + /** + * Creates a string containing the fully-qualified path to represent + * a project resource. + */ + public static final String createProjectPath(String project) { + return PROJECT_PATH_TEMPLATE.instantiate( + "project", project); + } + + /** + * Creates a string containing the fully-qualified path to represent + * a subscription resource. + */ + public static final String createSubscriptionPath(String project, String subscription) { + return SUBSCRIPTION_PATH_TEMPLATE.instantiate( + "project", project,"subscription", subscription); + } + + /** + * Extracts the project from the given fully-qualified path which + * represents a project resource. + */ + public static final String extractProjectFromProjectPath(String projectPath) { + return PROJECT_PATH_TEMPLATE.parse(projectPath).get("project"); + } + + /** + * Extracts the project from the given fully-qualified path which + * represents a subscription resource. + */ + public static final String extractProjectFromSubscriptionPath(String subscriptionPath) { + return SUBSCRIPTION_PATH_TEMPLATE.parse(subscriptionPath).get("project"); + } + + /** + * Extracts the subscription from the given fully-qualified path which + * represents a subscription resource. + */ + public static final String extractSubscriptionFromSubscriptionPath(String subscriptionPath) { + return SUBSCRIPTION_PATH_TEMPLATE.parse(subscriptionPath).get("subscription"); + } + + + // ============= + // Service Calls + // ============= + + // ----- createSubscription ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates a subscription to a given topic for a given subscriber. + * If the subscription already exists, returns ALREADY_EXISTS. + * If the corresponding topic doesn't exist, returns NOT_FOUND. + * + * If the name is not provided in the request, the server will assign a random + * name for this subscription on the same project as the topic. + * + * @param name The name of the subscription. It must have the format + * `"projects/{project}/subscriptions/{subscription}"`. `{subscription}` must + * start with a letter, and contain only letters (`[A-Za-z]`), numbers + * (`[0-9]`), dashes (`-`), underscores (`_`), periods (`.`), tildes (`~`), + * plus (`+`) or percent signs (`%`). It must be between 3 and 255 characters + * in length, and it must not start with `"goog"`. + * @param topic The name of the topic from which this subscription is receiving messages. + * The value of this field will be `_deleted-topic_` if the topic has been + * deleted. + * @param pushConfig If push delivery is used with this subscription, this field is + * used to configure it. An empty pushConfig signifies that the subscriber + * will pull and ack messages using API methods. + * @param ackDeadlineSeconds This value is the maximum time after a subscriber receives a message + * before the subscriber should acknowledge the message. After message + * delivery but before the ack deadline expires and before the message is + * acknowledged, it is an outstanding message and will not be delivered + * again during that time (on a best-effort basis). + * + * For pull delivery this value is used as the initial value for the ack + * deadline. To override this value for a given message, call + * ModifyAckDeadline with the corresponding ack_id. + * + * For push delivery, this value is also used to set the request timeout for + * the call to the push endpoint. + * + * If the subscriber never acknowledges the message, the Pub/Sub + * system will eventually redeliver the message. + * + * If this parameter is not set, the default value of 10 seconds is used. + */ + public Subscription createSubscription(String name, String topic, PushConfig pushConfig, int ackDeadlineSeconds) { + Subscription request = + Subscription.newBuilder() + .setName(name) + .setTopic(topic) + .setPushConfig(pushConfig) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + + return createSubscription(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates a subscription to a given topic for a given subscriber. + * If the subscription already exists, returns ALREADY_EXISTS. + * If the corresponding topic doesn't exist, returns NOT_FOUND. + * + * If the name is not provided in the request, the server will assign a random + * name for this subscription on the same project as the topic. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Subscription createSubscription(Subscription request) { + return createSubscriptionCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Creates a subscription to a given topic for a given subscriber. + * If the subscription already exists, returns ALREADY_EXISTS. + * If the corresponding topic doesn't exist, returns NOT_FOUND. + * + * If the name is not provided in the request, the server will assign a random + * name for this subscription on the same project as the topic. + */ + public ApiCallable createSubscriptionCallable() { + return ApiUtils.prepareIdempotentCallable(CREATE_SUBSCRIPTION, settings).bind(channel); + } + + // ----- getSubscription ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration details of a subscription. + * + * @param subscription The name of the subscription to get. + */ + public Subscription getSubscription(String subscription) { + GetSubscriptionRequest request = + GetSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .build(); + + return getSubscription(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration details of a subscription. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Subscription getSubscription(GetSubscriptionRequest request) { + return getSubscriptionCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Gets the configuration details of a subscription. + */ + public ApiCallable getSubscriptionCallable() { + return ApiUtils.prepareIdempotentCallable(GET_SUBSCRIPTION, settings).bind(channel); + } + + // ----- listSubscriptions ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + */ + public Iterable listSubscriptions(String project) { + ListSubscriptionsRequest request = + ListSubscriptionsRequest.newBuilder() + .setProject(project) + .build(); + return listSubscriptions(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + * + * @param request The request object containing all of the parameters for the API call. + */ + public Iterable listSubscriptions(ListSubscriptionsRequest request) { + return listSubscriptionsStreamingCallable() + .iterableResponseStreamCall(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + */ + public ApiCallable listSubscriptionsStreamingCallable() { + return listSubscriptionsCallable().pageStreaming(LIST_SUBSCRIPTIONS_PAGE_DESC); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Lists matching subscriptions. + */ + public ApiCallable listSubscriptionsCallable() { + return ApiUtils.prepareIdempotentCallable(LIST_SUBSCRIPTIONS, settings).bind(channel); + } + + // ----- deleteSubscription ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes an existing subscription. All pending messages in the subscription + * are immediately dropped. Calls to Pull after deletion will return + * NOT_FOUND. After a subscription is deleted, a new one may be created with + * the same name, but the new one has no association with the old + * subscription, or its topic unless the same topic is specified. + * + * @param subscription The subscription to delete. + */ + public void deleteSubscription(String subscription) { + DeleteSubscriptionRequest request = + DeleteSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .build(); + + deleteSubscription(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes an existing subscription. All pending messages in the subscription + * are immediately dropped. Calls to Pull after deletion will return + * NOT_FOUND. After a subscription is deleted, a new one may be created with + * the same name, but the new one has no association with the old + * subscription, or its topic unless the same topic is specified. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void deleteSubscription(DeleteSubscriptionRequest request) { + deleteSubscriptionCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Deletes an existing subscription. All pending messages in the subscription + * are immediately dropped. Calls to Pull after deletion will return + * NOT_FOUND. After a subscription is deleted, a new one may be created with + * the same name, but the new one has no association with the old + * subscription, or its topic unless the same topic is specified. + */ + public ApiCallable deleteSubscriptionCallable() { + return ApiUtils.prepareIdempotentCallable(DELETE_SUBSCRIPTION, settings).bind(channel); + } + + // ----- modifyAckDeadline ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, + * or to make the message available for redelivery if the processing was + * interrupted. + * + * @param subscription The name of the subscription. + * @param ackIds List of acknowledgment IDs. + * @param ackDeadlineSeconds The new ack deadline with respect to the time this request was sent to the + * Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack + * deadline will expire 10 seconds after the ModifyAckDeadline call was made. + * Specifying zero may immediately make the message available for another pull + * request. + */ + public void modifyAckDeadline(String subscription, List ackIds, int ackDeadlineSeconds) { + ModifyAckDeadlineRequest request = + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIds) + .setAckDeadlineSeconds(ackDeadlineSeconds) + .build(); + + modifyAckDeadline(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, + * or to make the message available for redelivery if the processing was + * interrupted. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void modifyAckDeadline(ModifyAckDeadlineRequest request) { + modifyAckDeadlineCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the ack deadline for a specific message. This method is useful to + * indicate that more time is needed to process a message by the subscriber, + * or to make the message available for redelivery if the processing was + * interrupted. + */ + public ApiCallable modifyAckDeadlineCallable() { + return MODIFY_ACK_DEADLINE.bind(channel); + } + + // ----- acknowledge ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Acknowledges the messages associated with the ack tokens in the + * AcknowledgeRequest. The Pub/Sub system can remove the relevant messages + * from the subscription. + * + * Acknowledging a message whose ack deadline has expired may succeed, + * but such a message may be redelivered later. Acknowledging a message more + * than once will not result in an error. + * + * @param subscription The subscription whose message is being acknowledged. + * @param ackIds The acknowledgment ID for the messages being acknowledged that was returned + * by the Pub/Sub system in the Pull response. Must not be empty. + */ + public void acknowledge(String subscription, List ackIds) { + AcknowledgeRequest request = + AcknowledgeRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(ackIds) + .build(); + + acknowledge(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Acknowledges the messages associated with the ack tokens in the + * AcknowledgeRequest. The Pub/Sub system can remove the relevant messages + * from the subscription. + * + * Acknowledging a message whose ack deadline has expired may succeed, + * but such a message may be redelivered later. Acknowledging a message more + * than once will not result in an error. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void acknowledge(AcknowledgeRequest request) { + acknowledgeCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Acknowledges the messages associated with the ack tokens in the + * AcknowledgeRequest. The Pub/Sub system can remove the relevant messages + * from the subscription. + * + * Acknowledging a message whose ack deadline has expired may succeed, + * but such a message may be redelivered later. Acknowledging a message more + * than once will not result in an error. + */ + public ApiCallable acknowledgeCallable() { + return ACKNOWLEDGE.bind(channel); + } + + // ----- pull ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Pulls messages from the server. Returns an empty list if there are no + * messages available in the backlog. The server may return UNAVAILABLE if + * there are too many concurrent pull requests pending for the given + * subscription. + * + * @param subscription The subscription from which messages should be pulled. + * @param returnImmediately If this is specified as true the system will respond immediately even if + * it is not able to return a message in the Pull response. Otherwise the + * system is allowed to wait until at least one message is available rather + * than returning no messages. The client may cancel the request if it does + * not wish to wait any longer for the response. + * @param maxMessages The maximum number of messages returned for this request. The Pub/Sub + * system may return fewer than the number specified. + */ + public PullResponse pull(String subscription, boolean returnImmediately, int maxMessages) { + PullRequest request = + PullRequest.newBuilder() + .setSubscription(subscription) + .setReturnImmediately(returnImmediately) + .setMaxMessages(maxMessages) + .build(); + + return pull(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Pulls messages from the server. Returns an empty list if there are no + * messages available in the backlog. The server may return UNAVAILABLE if + * there are too many concurrent pull requests pending for the given + * subscription. + * + * @param request The request object containing all of the parameters for the API call. + */ + public PullResponse pull(PullRequest request) { + return pullCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Pulls messages from the server. Returns an empty list if there are no + * messages available in the backlog. The server may return UNAVAILABLE if + * there are too many concurrent pull requests pending for the given + * subscription. + */ + public ApiCallable pullCallable() { + return PULL.bind(channel); + } + + // ----- modifyPushConfig ----- + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the PushConfig for a specified subscription. + * + * This may be used to change a push subscription to a pull one (signified + * by an empty PushConfig) or vice versa, or change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for + * delivery continuously through the call regardless of changes to the + * PushConfig. + * + * @param subscription The name of the subscription. + * @param pushConfig The push configuration for future deliveries. + * + * An empty pushConfig indicates that the Pub/Sub system should + * stop pushing messages from the given subscription and allow + * messages to be pulled and acknowledged - effectively pausing + * the subscription if Pull is not called. + */ + public void modifyPushConfig(String subscription, PushConfig pushConfig) { + ModifyPushConfigRequest request = + ModifyPushConfigRequest.newBuilder() + .setSubscription(subscription) + .setPushConfig(pushConfig) + .build(); + + modifyPushConfig(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the PushConfig for a specified subscription. + * + * This may be used to change a push subscription to a pull one (signified + * by an empty PushConfig) or vice versa, or change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for + * delivery continuously through the call regardless of changes to the + * PushConfig. + * + * @param request The request object containing all of the parameters for the API call. + */ + public void modifyPushConfig(ModifyPushConfigRequest request) { + modifyPushConfigCallable().call(request); + } + + // AUTO-GENERATED DOCUMENTATION AND METHOD - see instructions at the top of the file for editing. + /** + * Modifies the PushConfig for a specified subscription. + * + * This may be used to change a push subscription to a pull one (signified + * by an empty PushConfig) or vice versa, or change the endpoint URL and other + * attributes of a push subscription. Messages will accumulate for + * delivery continuously through the call regardless of changes to the + * PushConfig. + */ + public ApiCallable modifyPushConfigCallable() { + return MODIFY_PUSH_CONFIG.bind(channel); + } + + + // ======== + // Cleanup + // ======== + + /** + * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately + * cancelled. + */ + @Override + public void close() { + // Manually-added shutdown code + + // Auto-generated shutdown code + channel.shutdown(); + + // Manually-added shutdown code + } + + + // ======== + // Manually-added methods: add custom (non-generated) methods after this point. + // ======== + +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java new file mode 100644 index 000000000000..6ec1c008f6d0 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/spi/testing/LocalPublisherImpl.java @@ -0,0 +1,120 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.gcloud.pubsub.spi.testing; + +import com.google.gcloud.pubsub.spi.PublisherApi; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc.Publisher; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.grpc.stub.StreamObserver; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LocalPublisherImpl implements Publisher { + + private Map> topics = new HashMap<>(); + + @Override + public void createTopic(Topic request, StreamObserver responseObserver) { + topics.put(request.getName(), new ArrayList()); + + Topic response = Topic.newBuilder().setName(request.getName()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void publish(PublishRequest request, StreamObserver responseObserver) { + List topicMessages = topics.get(request.getTopic()); + List ids = new ArrayList<>(); + int index = 0; + for (PubsubMessage msg : request.getMessagesList()) { + topicMessages.add(msg); + ids.add(new Integer(index).toString()); + } + responseObserver.onNext(PublishResponse.newBuilder().addAllMessageIds(ids).build()); + responseObserver.onCompleted(); + } + + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + if (topics.get(request.getTopic()) == null) { + throw new IllegalArgumentException("topic doesn't exist: " + request.getTopic()); + } + Topic response = Topic.newBuilder().setName(request.getTopic()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void listTopics(ListTopicsRequest request, StreamObserver responseObserver) { + List responseTopics = new ArrayList<>(); + for (String topicName : topics.keySet()) { + String projectOfTopic = PublisherApi.extractProjectFromTopicPath(topicName); + String projectOfRequest = PublisherApi.extractProjectFromProjectPath(request.getProject()); + if (projectOfTopic.equals(projectOfRequest)) { + Topic topicObj = Topic.newBuilder().setName(topicName).build(); + responseTopics.add(topicObj); + } + } + Collections.sort(responseTopics, new Comparator() { + @Override public int compare(Topic o1, Topic o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + ListTopicsResponse.Builder response = ListTopicsResponse.newBuilder(); + response.setNextPageToken(""); + response.addAllTopics(responseTopics); + responseObserver.onNext(response.build()); + responseObserver.onCompleted(); + } + + @Override + public void listTopicSubscriptions(ListTopicSubscriptionsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(ListTopicSubscriptionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void deleteTopic(DeleteTopicRequest request, StreamObserver responseObserver) { + topics.remove(request.getTopic()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + public Map> getTopics() { + return topics; + } + + public void reset() { + topics = new HashMap<>(); + } +} diff --git a/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java new file mode 100644 index 000000000000..b9c17e0f0831 --- /dev/null +++ b/gcloud-java-pubsub/src/main/java/com/google/gcloud/pubsub/testing/LocalPubsubHelper.java @@ -0,0 +1,85 @@ +package com.google.gcloud.pubsub.testing; + +import com.google.gcloud.pubsub.spi.testing.LocalPublisherImpl; +import com.google.pubsub.v1.PublisherGrpc; + +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; + +import java.io.IOException; +import java.net.SocketAddress; + +/** + * A class that runs an in-memory Publisher instance for use in tests. + */ +public class LocalPubsubHelper { + private static int FLOW_CONTROL_WINDOW = 65 * 1024; + + private final SocketAddress address; + private final Server server; + private final LocalPublisherImpl publisherImpl; + + /** + * Constructs a new LocalPubsubHelper. The method start() must + * be called before it is used. + */ + public LocalPubsubHelper(String addressString) { + address = new LocalAddress(addressString); + publisherImpl = new LocalPublisherImpl(); + NettyServerBuilder builder = NettyServerBuilder + .forAddress(address) + .flowControlWindow(FLOW_CONTROL_WINDOW) + .channelType(LocalServerChannel.class); + builder.addService(PublisherGrpc.bindService(publisherImpl)); + server = builder.build(); + } + + /** + * Starts the in-memory service. + */ + public void start() { + try { + server.start(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Resets the state of the in-memory service. + */ + public void reset() { + publisherImpl.reset(); + } + + /** + * Returns the internal in-memory service. + */ + public LocalPublisherImpl getPublisherImpl() { + return publisherImpl; + } + + /** + * Creates a channel for making requests to the in-memory service. + */ + public ManagedChannel createChannel() { + return NettyChannelBuilder + .forAddress(address) + .negotiationType(NegotiationType.PLAINTEXT) + .channelType(LocalChannel.class) + .build(); + } + + /** + * Shuts down the in-memory service. + */ + public void shutdownNow() { + server.shutdownNow(); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java new file mode 100644 index 000000000000..c6cd637dd980 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/LocalPublisherImpl.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.gcloud.pubsub.spi; + +import com.google.protobuf.Empty; +import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsRequest; +import com.google.pubsub.v1.ListTopicSubscriptionsResponse; +import com.google.pubsub.v1.ListTopicsRequest; +import com.google.pubsub.v1.ListTopicsResponse; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc.Publisher; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.grpc.stub.StreamObserver; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LocalPublisherImpl implements Publisher { + + private Map> topics = new HashMap<>(); + + @Override + public void createTopic(Topic request, StreamObserver responseObserver) { + topics.put(request.getName(), new ArrayList()); + + Topic response = Topic.newBuilder().setName(request.getName()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void publish(PublishRequest request, StreamObserver responseObserver) { + List topicMessages = topics.get(request.getTopic()); + List ids = new ArrayList<>(); + int index = 0; + for (PubsubMessage msg : request.getMessagesList()) { + topicMessages.add(msg); + ids.add(new Integer(index).toString()); + } + + responseObserver.onNext(PublishResponse.newBuilder().addAllMessageIds(ids).build()); + responseObserver.onCompleted(); + } + + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + if (topics.get(request.getTopic()) == null) { + throw new IllegalArgumentException("topic doesn't exist: " + request.getTopic()); + } + + Topic response = Topic.newBuilder().setName(request.getTopic()).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + + @Override + public void listTopics(ListTopicsRequest request, StreamObserver responseObserver) { + List responseTopics = new ArrayList<>(); + for (String topicName : topics.keySet()) { + String projectOfTopic = PublisherApi.TOPIC_PATH_TEMPLATE.parse(topicName).get("project"); + String projectOfRequest = PublisherApi.PROJECT_PATH_TEMPLATE.parse(request.getProject()).get("project"); + if (projectOfTopic.equals(projectOfRequest)) { + Topic topicObj = Topic.newBuilder().setName(topicName).build(); + responseTopics.add(topicObj); + } + } + Collections.sort(responseTopics, new Comparator() { + @Override public int compare(Topic o1, Topic o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + ListTopicsResponse.Builder response = ListTopicsResponse.newBuilder(); + response.setNextPageToken(""); + response.addAllTopics(responseTopics); + responseObserver.onNext(response.build()); + responseObserver.onCompleted(); + } + + @Override + public void listTopicSubscriptions(ListTopicSubscriptionsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(ListTopicSubscriptionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void deleteTopic(DeleteTopicRequest request, StreamObserver responseObserver) { + topics.remove(request.getTopic()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + public Map> getTopics() { + return topics; + } + + public void reset() { + topics = new HashMap<>(); + } +} diff --git a/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java new file mode 100644 index 000000000000..38e337890aa1 --- /dev/null +++ b/gcloud-java-pubsub/src/test/java/com/google/gcloud/pubsub/spi/PublisherApiTest.java @@ -0,0 +1,123 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.gcloud.pubsub.spi; + +import com.google.gcloud.pubsub.testing.LocalPubsubHelper; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.gapi.gax.grpc.ServiceApiSettings; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class PublisherApiTest { + private static LocalPubsubHelper pubsubHelper; + private PublisherApi publisherApi; + + @BeforeClass + public static void startStaticServer() { + pubsubHelper = new LocalPubsubHelper("in-process-1"); + pubsubHelper.start(); + } + + @AfterClass + public static void stopServer() { + pubsubHelper.shutdownNow(); + } + + @Before + public void setUp() throws Exception { + pubsubHelper.reset(); + ServiceApiSettings settings = new ServiceApiSettings(); + settings.setChannel(pubsubHelper.createChannel()); + publisherApi = PublisherApi.create(settings); + } + + @After + public void tearDown() throws Exception { + if (publisherApi != null) { + publisherApi.close(); + } + pubsubHelper.reset(); + } + + @Test + public void testCreateTopic() throws Exception { + String topicName = PublisherApi.createTopicPath("my-project", "my-topic"); + Topic result = publisherApi.createTopic(topicName); + Assert.assertEquals(topicName, result.getName()); + Assert.assertEquals(1, pubsubHelper.getPublisherImpl().getTopics().size()); + Assert.assertNotNull(pubsubHelper.getPublisherImpl().getTopics().get(topicName)); + } + + @Test + public void testPublish() throws Exception { + String topicName = PublisherApi.createTopicPath("my-project", "publish-topic"); + publisherApi.createTopic(topicName); + PubsubMessage msg = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("pubsub-message")) + .build(); + publisherApi.publish(topicName, Collections.singletonList(msg)); + List publishedMessages = + pubsubHelper.getPublisherImpl().getTopics().get(topicName); + Assert.assertEquals(1, publishedMessages.size()); + Assert.assertEquals("pubsub-message", publishedMessages.get(0).getData().toStringUtf8()); + } + + @Test + public void testGetTopic() throws Exception { + String topicName = PublisherApi.createTopicPath("my-project", "fun-topic"); + publisherApi.createTopic(topicName); + Topic result = publisherApi.getTopic(topicName); + Assert.assertNotNull(result); + Assert.assertEquals(topicName, result.getName()); + } + + @Test + public void testListTopics() throws Exception { + String project1 = PublisherApi.createProjectPath("project.1"); + String topicName1 = PublisherApi.createTopicPath("project.1", "topic.1"); + String topicName2 = PublisherApi.createTopicPath("project.1", "topic.2"); + String topicName3 = PublisherApi.createTopicPath("project.2", "topic.3"); + publisherApi.createTopic(topicName1); + publisherApi.createTopic(topicName2); + publisherApi.createTopic(topicName3); + List topics = new ArrayList<>(); + for (Topic topic : publisherApi.listTopics(project1)) { + topics.add(topic); + } + Assert.assertEquals(2, topics.size()); + Assert.assertEquals(topicName1, topics.get(0).getName()); + Assert.assertEquals(topicName2, topics.get(1).getName()); + } + + @Test + public void testDeleteTopic() throws Exception { + String topicName = PublisherApi.createTopicPath("my-project", "fun-topic"); + publisherApi.createTopic(topicName); + publisherApi.deleteTopic(topicName); + Assert.assertEquals(0, pubsubHelper.getPublisherImpl().getTopics().size()); + } +} diff --git a/pom.xml b/pom.xml index e35a620247d1..44bdccc273a4 100644 --- a/pom.xml +++ b/pom.xml @@ -67,8 +67,10 @@ gcloud-java-core + gcloud-java-gax gcloud-java-datastore gcloud-java-storage + gcloud-java-pubsub gcloud-java gcloud-java-examples