diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 39302d3f7..399d99658 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -35,6 +35,7 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; @@ -50,6 +51,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; +import io.grpc.CallOptions; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -89,6 +91,8 @@ public class Publisher implements PublisherInterface { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); + private static final String GZIP_COMPRESSION = "gzip"; + private final String topicName; private final BatchingSettings batchingSettings; @@ -114,6 +118,12 @@ public class Publisher implements PublisherInterface { private MessageFlowController flowController = null; + private final boolean enableCompression; + private final long compressionBytesThreshold; + + private final GrpcCallContext publishContext; + private final GrpcCallContext publishContextWithCompression; + /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { return 1000L; @@ -140,6 +150,8 @@ private Publisher(Builder builder) throws IOException { this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; + this.enableCompression = builder.enableCompression; + this.compressionBytesThreshold = builder.compressionBytesThreshold; messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); @@ -191,6 +203,10 @@ private Publisher(Builder builder) throws IOException { backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); messagesWaiter = new Waiter(); + this.publishContext = GrpcCallContext.createDefault(); + this.publishContextWithCompression = + GrpcCallContext.createDefault() + .withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION)); } /** Topic which the publisher publishes to. */ @@ -431,13 +447,18 @@ private void publishAllWithoutInflightForKey(final String orderingKey) { } private ApiFuture publishCall(OutstandingBatch outstandingBatch) { + GrpcCallContext context = publishContext; + if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) { + context = publishContextWithCompression; + } return publisherStub .publishCallable() .futureCall( PublishRequest.newBuilder() .setTopic(topicName) .addAllMessages(outstandingBatch.getMessages()) - .build()); + .build(), + context); } private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { @@ -688,6 +709,8 @@ public static final class Builder { InstantiatingExecutorProvider.newBuilder() .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors()) .build(); + static final boolean DEFAULT_ENABLE_COMPRESSION = false; + static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L; String topicName; private String endpoint = PublisherStubSettings.getDefaultEndpoint(); @@ -717,6 +740,9 @@ public PubsubMessage apply(PubsubMessage input) { } }; + private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION; + private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD; + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } @@ -827,6 +853,21 @@ public Builder setEndpoint(String endpoint) { return this; } + /** Gives the ability to enable transport compression. */ + public Builder setEnableCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + return this; + } + + /** + * Sets the threshold (in bytes) above which messages are compressed for transport. Only takes + * effect if setEnableCompression(true) is also called." + */ + public Builder setCompressionBytesThreshold(long compressionBytesThreshold) { + this.compressionBytesThreshold = compressionBytesThreshold; + return this; + } + /** Returns the default BatchingSettings used by the client if settings are not provided. */ public static BatchingSettings getDefaultBatchingSettings() { return DEFAULT_BATCHING_SETTINGS; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index c72d52d3d..290b9927d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -40,11 +40,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.Timeout; public class ITPubSubTest { @@ -403,6 +399,70 @@ public void failed(Subscriber.State from, Throwable failure) { topicAdminClient.deleteTopic(topicName); } + @Test + public void testPublishSubscribeWithCompression() throws Exception { + TopicName topicName = + TopicName.newBuilder() + .setProject(projectId) + .setTopic(formatForTest("testing-compression-topic")) + .build(); + SubscriptionName subscriptionName = + SubscriptionName.of(projectId, formatForTest("testing-compression-subscription")); + + topicAdminClient.createTopic(topicName); + + subscriptionAdminClient.createSubscription( + getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10, false)); + + final BlockingQueue receiveQueue = new LinkedBlockingQueue<>(); + Subscriber subscriber = + Subscriber.newBuilder( + subscriptionName.toString(), + new MessageReceiver() { + @Override + public void receiveMessage( + final PubsubMessage message, final AckReplyConsumer consumer) { + receiveQueue.offer(MessageAndConsumer.create(message, consumer)); + } + }) + .build(); + subscriber.addListener( + new Subscriber.Listener() { + public void failed(Subscriber.State from, Throwable failure) { + receiveQueue.offer(failure); + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync(); + + Publisher publisher = Publisher.newBuilder(topicName).setEnableCompression(true).build(); + + String msg1 = generateMessage("msg1", 1000); + String msg2 = generateMessage("msg2", 1500); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg1)).build()) + .get(); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg2)).build()) + .get(); + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + + // Ack the first message. + MessageAndConsumer toAck1 = pollQueueMessageAndConsumer(receiveQueue); + toAck1.consumer().ack(); + + // Ack the second message. + MessageAndConsumer toAck2 = pollQueueMessageAndConsumer(receiveQueue); + toAck2.consumer().ack(); + + assertNotEquals(toAck1.message().getData(), toAck2.message().getData()); + + subscriber.stopAsync().awaitTerminated(); + subscriptionAdminClient.deleteSubscription(subscriptionName); + topicAdminClient.deleteTopic(topicName); + } + private MessageAndConsumer pollQueueMessageAndConsumer(BlockingQueue queue) throws InterruptedException { Object obj = pollQueue(queue); @@ -434,4 +494,14 @@ private Object pollQueue(BlockingQueue queue) throws InterruptedExceptio return obj; } + + /** Generates message of given bytes by repeatedly concatenating a token. */ + private String generateMessage(String token, int bytes) { + String result = ""; + int tokenBytes = token.length(); + for (int i = 0; i < Math.floor(bytes / tokenBytes) + 1; i++) { + result = result.concat(token); + } + return result; + } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index c41931de6..9985efc6b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -282,6 +282,31 @@ public void testPublishMixedSizeAndDuration() throws Exception { shutdownTestPublisher(publisher); } + @Test + public void testPublishWithCompression() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableCompression(true) + .setCompressionBytesThreshold(100) + .build(); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); + ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "B"); + assertEquals("1", publishFuture1.get()); + assertEquals("2", publishFuture2.get()); + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); + } + private ApiFuture sendTestMessage(Publisher publisher, String data) { return publisher.publish( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build());