diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index e7046c1be..26547529e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -36,6 +36,8 @@ import com.google.api.gax.rpc.StreamController; import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor; import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; @@ -86,6 +88,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private Duration inititalStreamAckDeadline; + private final Map> streamMetadata; + private final SubscriberStub subscriberStub; private final int channelAffinity; private final String subscription; @@ -134,6 +138,9 @@ private StreamingSubscriberConnection(Builder builder) { inititalStreamAckDeadline = builder.maxDurationPerAckExtension; } + streamMetadata = + ImmutableMap.of("x-goog-request-params", ImmutableList.of("subscription=" + subscription)); + subscriberStub = builder.subscriberStub; channelAffinity = builder.channelAffinity; @@ -273,7 +280,9 @@ private void initialize() { .streamingPullCallable() .splitCall( responseObserver, - GrpcCallContext.createDefault().withChannelAffinity(channelAffinity)); + GrpcCallContext.createDefault() + .withChannelAffinity(channelAffinity) + .withExtraHeaders(streamMetadata)); logger.log(Level.FINER, "Initializing stream to subscription {0}", subscription); // We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt