Skip to content

Commit

Permalink
feat: Allow REST transport for PubSub Java client.
Browse files Browse the repository at this point in the history
This leverages capabilities added in googleapis#1162
  • Loading branch information
sanjivr committed Jun 13, 2024
1 parent 94c55d0 commit d870839
Showing 1 changed file with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.httpjson.HttpJsonCallContext;
import com.google.api.gax.httpjson.InstantiatingHttpJsonChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.HttpJsonPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -120,9 +124,10 @@ public class Publisher implements PublisherInterface {

private final boolean enableCompression;
private final long compressionBytesThreshold;
private final boolean enableRESTJsonTransport;

private final GrpcCallContext publishContext;
private final GrpcCallContext publishContextWithCompression;
private final ApiCallContext publishContext;
private final ApiCallContext publishContextWithCompression;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
Expand Down Expand Up @@ -152,6 +157,8 @@ private Publisher(Builder builder) throws IOException {
this.messageTransform = builder.messageTransform;
this.enableCompression = builder.enableCompression;
this.compressionBytesThreshold = builder.compressionBytesThreshold;
this.enableRESTJsonTransport =
builder.channelProvider instanceof InstantiatingHttpJsonChannelProvider;

messagesBatches = new HashMap<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -199,15 +206,24 @@ private Publisher(Builder builder) throws IOException {
StatusCode.Code.UNAVAILABLE)
.setRetrySettings(retrySettingsBuilder.build())
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
this.publisherStub =
this.enableRESTJsonTransport
? HttpJsonPublisherStub.create(stubSettings.build())
: GrpcPublisherStub.create(stubSettings.build());
backgroundResourceList.add(publisherStub);
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
messagesWaiter = new Waiter();
this.publishContext = GrpcCallContext.createDefault();
this.publishContext =
this.enableRESTJsonTransport
? HttpJsonCallContext.createDefault()
: GrpcCallContext.createDefault();
this.publishContextWithCompression =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
this.enableRESTJsonTransport
? this.publishContext
: // TODO
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
}

/** Topic which the publisher publishes to. */
Expand Down Expand Up @@ -448,7 +464,7 @@ private void publishAllWithoutInflightForKey(final String orderingKey) {
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
GrpcCallContext context = publishContext;
ApiCallContext context = publishContext;
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
context = publishContextWithCompression;
}
Expand Down

0 comments on commit d870839

Please sign in to comment.