Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding bundling support for PublisherApi.publish #776

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

package com.google.gcloud.pubsub.spi.v1;

import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableInfo;
import com.google.api.gax.grpc.BundlerFactory;
import com.google.api.gax.protobuf.PathTemplate;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.DeleteTopicRequest;
Expand Down Expand Up @@ -65,9 +66,25 @@
*/
@javax.annotation.Generated("by GAPIC")
public class PublisherApi implements AutoCloseable {
// ========
// Members
// ========

private final ManagedChannel channel;
private final List<AutoCloseable> closeables = new ArrayList<>();

private final ApiCallable<Topic, Topic> createTopicCallable;
private final ApiCallable<PublishRequest, PublishResponse> publishCallable;
private final ApiCallable<GetTopicRequest, Topic> getTopicCallable;
private final ApiCallable<ListTopicsRequest, ListTopicsResponse> listTopicsCallable;
private final ApiCallable<ListTopicsRequest, Iterable<Topic>> listTopicsIterableCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse>
listTopicSubscriptionsCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, Iterable<String>>
listTopicSubscriptionsIterableCallable;
private final ApiCallable<DeleteTopicRequest, Empty> deleteTopicCallable;

public static class ResourceNames {
private ResourceNames() {}

// =======================
// ResourceNames Constants
Expand All @@ -93,6 +110,8 @@ private ResourceNames() {}
private static final PathTemplate TOPIC_PATH_TEMPLATE =
PathTemplate.create("projects/{project}/topics/{topic}");

private ResourceNames() {}

// ==============================
// Resource Name Helper Functions
// ==============================
Expand Down Expand Up @@ -153,24 +172,6 @@ public static final String parseTopicFromTopicPath(String topicPath) {
}
}

// ========
// Members
// ========

private final ManagedChannel channel;
private final List<AutoCloseable> closeables = new ArrayList<>();

private final ApiCallable<Topic, Topic> createTopicCallable;
private final ApiCallable<PublishRequest, PublishResponse> publishCallable;
private final ApiCallable<GetTopicRequest, Topic> getTopicCallable;
private final ApiCallable<ListTopicsRequest, ListTopicsResponse> listTopicsCallable;
private final ApiCallable<ListTopicsRequest, Iterable<Topic>> listTopicsIterableCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse>
listTopicSubscriptionsCallable;
private final ApiCallable<ListTopicSubscriptionsRequest, Iterable<String>>
listTopicSubscriptionsIterableCallable;
private final ApiCallable<DeleteTopicRequest, Empty> deleteTopicCallable;

// ===============
// Factory Methods
// ===============
Expand All @@ -186,8 +187,9 @@ public static PublisherApi create() throws IOException {
}

/**
* Constructs an instance of PublisherApi, using the given settings. The channels are created based
* on the settings passed in, or defaults for any settings that are not set.
* Constructs an instance of PublisherApi, using the given settings.
* The channels are created based on the settings passed in, or defaults for any
* settings that are not set.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -197,8 +199,9 @@ public static PublisherApi create(PublisherSettings settings) throws IOException
}

/**
* Constructs an instance of PublisherApi, using the given settings. This is protected so that it
* easy to make a subclass, but otherwise, the static factory methods should be preferred.
* Constructs an instance of PublisherApi, using the given settings.
* This is protected so that it easy to make a subclass, but otherwise, the static
* factory methods should be preferred.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -207,7 +210,14 @@ protected PublisherApi(PublisherSettings settings) throws IOException {
this.channel = settings.getChannel();

this.createTopicCallable = settings.createTopicMethod().build(settings);
this.publishCallable = settings.publishMethod().build(settings);
BundlableApiCallableInfo<PublishRequest, PublishResponse> bundlablePublish =
settings.publishMethod().buildBundlable(settings);
this.publishCallable = bundlablePublish.getApiCallable();
BundlerFactory<PublishRequest, PublishResponse> publishBundlerFactory =
bundlablePublish.getBundlerFactory();
if (publishBundlerFactory != null) {
this.closeables.add(publishBundlerFactory);
}
this.getTopicCallable = settings.getTopicMethod().build(settings);
this.listTopicsCallable = settings.listTopicsMethod().build(settings);
this.listTopicsIterableCallable = settings.listTopicsMethod().buildPageStreaming(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import com.google.api.gax.core.RetryParams;
import com.google.api.gax.grpc.ApiCallSettings;
import com.google.api.gax.grpc.ApiCallable.ApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.BundlableApiCallableBuilder;
import com.google.api.gax.grpc.ApiCallable.PageStreamingApiCallableBuilder;
import com.google.api.gax.grpc.PageDescriptor;
import com.google.api.gax.grpc.BundlingDescriptor;
import com.google.api.gax.grpc.PageStreamingDescriptor;
import com.google.api.gax.grpc.RequestIssuer;
import com.google.api.gax.grpc.ServiceApiSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -56,8 +59,12 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Manually-added imports: add custom (non-generated) imports after this point.

Expand Down Expand Up @@ -132,9 +139,11 @@ public class PublisherSettings extends ServiceApiSettings {
RETRY_PARAM_DEFINITIONS = definitions.build();
}

private final MethodBuilders methods;

private static class MethodBuilders {
private final ApiCallableBuilder<Topic, Topic> createTopicMethod;
private final ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
private final BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod;
private final ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod;
private final PageStreamingApiCallableBuilder<ListTopicsRequest, ListTopicsResponse, Topic>
listTopicsMethod;
Expand All @@ -149,7 +158,8 @@ public MethodBuilders() {
createTopicMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("idempotent"));
createTopicMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));

publishMethod = new ApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH);
publishMethod =
new BundlableApiCallableBuilder<>(PublisherGrpc.METHOD_PUBLISH, PUBLISH_BUNDLING_DESC);
publishMethod.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"));
publishMethod.setRetryParams(RETRY_PARAM_DEFINITIONS.get("default"));

Expand Down Expand Up @@ -187,8 +197,6 @@ public MethodBuilders() {
}
}

private final MethodBuilders methods;

// ===============
// Factory Methods
// ===============
Expand All @@ -211,8 +219,9 @@ public static PublisherSettings create() {
}

/**
* Constructs an instance of PublisherSettings with default settings. This is protected so that it
* easy to make a subclass, but otherwise, the static factory methods should be preferred.
* Constructs an instance of PublisherSettings with default settings. This is protected
* so that it easy to make a subclass, but otherwise, the static factory methods should be
* preferred.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -223,7 +232,7 @@ protected PublisherSettings(MethodBuilders methods) {
}

/**
* Returns the ApiCallableBuilder for the API method createTopic.
* Returns the builder for the API method createTopic.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -233,17 +242,17 @@ public ApiCallableBuilder<Topic, Topic> createTopicMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method publish.
* Returns the builder for the API method publish.
*
* <!-- manual edit -->
* <!-- end manual edit -->
*/
public ApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
public BundlableApiCallableBuilder<PublishRequest, PublishResponse> publishMethod() {
return methods.publishMethod;
}

/**
* Returns the ApiCallableBuilder for the API method getTopic.
* Returns the builder for the API method getTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -253,7 +262,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listTopics.
* Returns the builder for the API method listTopics.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -264,7 +273,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the PageStreamingApiCallableBuilder for the API method listTopicSubscriptions.
* Returns the builder for the API method listTopicSubscriptions.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -276,7 +285,7 @@ public ApiCallableBuilder<GetTopicRequest, Topic> getTopicMethod() {
}

/**
* Returns the ApiCallableBuilder for the API method deleteTopic.
* Returns the builder for the API method deleteTopic.
*
* <!-- manual edit -->
* <!-- end manual edit -->
Expand All @@ -285,9 +294,9 @@ public ApiCallableBuilder<DeleteTopicRequest, Empty> deleteTopicMethod() {
return methods.deleteTopicMethod;
}

private static PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
private static PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>
LIST_TOPICS_PAGE_STR_DESC =
new PageDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
new PageStreamingDescriptor<ListTopicsRequest, ListTopicsResponse, Topic>() {
@Override
public Object emptyToken() {
return "";
Expand All @@ -309,10 +318,10 @@ public Iterable<Topic> extractResources(ListTopicsResponse payload) {
}
};

private static PageDescriptor<
private static PageStreamingDescriptor<
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>
LIST_TOPIC_SUBSCRIPTIONS_PAGE_STR_DESC =
new PageDescriptor<
new PageStreamingDescriptor<
ListTopicSubscriptionsRequest, ListTopicSubscriptionsResponse, String>() {
@Override
public Object emptyToken() {
Expand All @@ -337,4 +346,66 @@ public Iterable<String> extractResources(ListTopicSubscriptionsResponse payload)
return payload.getSubscriptionsList();
}
};

private static BundlingDescriptor<PublishRequest, PublishResponse> PUBLISH_BUNDLING_DESC =
new BundlingDescriptor<PublishRequest, PublishResponse>() {
@Override
public String getBundlePartitionKey(PublishRequest request) {
return request.getTopic();
}

@Override
public PublishRequest mergeRequests(Collection<PublishRequest> requests) {
PublishRequest firstRequest = requests.iterator().next();

List<PubsubMessage> elements = new ArrayList<>();
for (PublishRequest request : requests) {
elements.addAll(request.getMessagesList());
}

PublishRequest bundleRequest =
PublishRequest.newBuilder()
.setTopic(firstRequest.getTopic())
.addAllMessages(elements)
.build();
return bundleRequest;
}

@Override
public void splitResponse(
PublishResponse bundleResponse,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
int bundleMessageIndex = 0;
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
List<String> subresponseElements = new ArrayList<>();
int subresponseCount = responder.getRequest().getMessagesCount();
for (int i = 0; i < subresponseCount; i++) {

This comment was marked as spam.

This comment was marked as spam.

subresponseElements.add(bundleResponse.getMessageIds(bundleMessageIndex));
bundleMessageIndex += 1;
}
PublishResponse response =
PublishResponse.newBuilder().addAllMessageIds(subresponseElements).build();
responder.setResponse(response);
}
}

@Override
public void splitException(
Throwable throwable,
Collection<? extends RequestIssuer<PublishRequest, PublishResponse>> bundle) {
for (RequestIssuer<PublishRequest, PublishResponse> responder : bundle) {
responder.setException(throwable);
}
}

@Override
public long countElements(PublishRequest request) {
return request.getMessagesCount();
}

@Override
public long countBytes(PublishRequest request) {
return request.getSerializedSize();
}
};
}
Loading