From 4ad1a3fc6e334b6b4efe2167dbe6976c5b5625a6 Mon Sep 17 00:00:00 2001 From: Rajanya Dhar <95830086+rajanya-google@users.noreply.github.com> Date: Tue, 10 May 2022 14:19:25 -0400 Subject: [PATCH] feat: Adding grpc compression support for publisher client (#1000) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adding gRPC compression support to the library * Minor comment fix * Formatting the code * Adding unit test for compression * Adding integration test for compression * Formatting * Refactoring integration tests to add support for overriding endpoint * Adding sample for publish with compression; Updating README * Adding integration test for compression sample * Adding parameter compressionBytesThreshold to Publisher; Adding logging support in the compression example * Addressing PR comments * Addressing checkstyle * Addressed PR comment * Addressing PR comment to put a Precondition for compression and its threshold * Addressing PR review * Removing logging from example * Adding logging properties * Making the publish call unified with context as per PR comments * Removing sample code as per tianzi@'s comments * Minor fixes * Adding gRPC compression support to the library * Minor comment fix * Formatting the code * Adding unit test for compression * Adding integration test for compression * Formatting * Refactoring integration tests to add support for overriding endpoint * Adding sample for publish with compression; Updating README * Adding integration test for compression sample * Adding parameter compressionBytesThreshold to Publisher; Adding logging support in the compression example * Addressing PR comments * Addressing checkstyle * Addressed PR comment * Addressing PR comment to put a Precondition for compression and its threshold * Addressing PR review * Removing logging from example * Adding logging properties * Making the publish call unified with context as per PR comments * Removing sample code as per tianzi@'s comments * Minor fixes * Fixing IT * Creating a class variable publishContext to remove the overhead of GrpcCallContext.createDefault() with every publish call * fixing lint format * Addressed PR comments * Removing test * build(deps): update dependency com.google.cloud:google-cloud-shared-config to v1.4.0 (#1105) [![WhiteSource Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [com.google.cloud:google-cloud-shared-config](https://togithub.com/googleapis/java-shared-config) | `1.3.3` -> `1.4.0` | [![age](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/age-slim)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/adoption-slim)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/compatibility-slim/1.3.3)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/confidence-slim/1.3.3)](https://docs.renovatebot.com/merge-confidence/) | --- ### Release Notes
googleapis/java-shared-config ### [`v1.4.0`](https://togithub.com/googleapis/java-shared-config/blob/HEAD/CHANGELOG.md#​140-httpsgithubcomgoogleapisjava-shared-configcomparev133v140-2022-04-28) [Compare Source](https://togithub.com/googleapis/java-shared-config/compare/v1.3.3...v1.4.0) ##### Features - **java:** remove native image module ([#​471](https://togithub.com/googleapis/java-shared-config/issues/471)) ([7fcba01](https://togithub.com/googleapis/java-shared-config/commit/7fcba016b3138d7beaa4e962853f9bc80f59438c)) ##### [1.3.3](https://togithub.com/googleapis/java-shared-config/compare/v1.3.2...v1.3.3) (2022-04-19) ##### Bug Fixes - **java:** remove protobuf feature from native profile ([#​461](https://togithub.com/googleapis/java-shared-config/issues/461)) ([ffd07cb](https://togithub.com/googleapis/java-shared-config/commit/ffd07cb18ee7d45d4daee1d9ea6f6d321fdca874)) ##### Dependencies - update dependency com.google.cloud:native-image-support to v0.12.11 ([#​459](https://togithub.com/googleapis/java-shared-config/issues/459)) ([d20008d](https://togithub.com/googleapis/java-shared-config/commit/d20008df15209708fdf9d06828b567778190f4d0)) - update dependency com.google.cloud:native-image-support to v0.13.1 ([#​465](https://togithub.com/googleapis/java-shared-config/issues/465)) ([b202064](https://togithub.com/googleapis/java-shared-config/commit/b2020648816feb4740ad70acedfed470d7da5bcf)) ##### [1.3.2](https://togithub.com/googleapis/java-shared-config/compare/v1.3.1...v1.3.2) (2022-03-28) ##### Dependencies - revert google-java-format to 1.7 ([#​453](https://togithub.com/googleapis/java-shared-config/issues/453)) ([cbc777f](https://togithub.com/googleapis/java-shared-config/commit/cbc777f3e9ab75edb6fa2e0268a7446ae4111725)) ##### [1.3.1](https://togithub.com/googleapis/java-shared-config/compare/v1.3.0...v1.3.1) (2022-03-25) ##### Dependencies - update dependency com.google.cloud:native-image-support to v0.12.10 ([#​443](https://togithub.com/googleapis/java-shared-config/issues/443)) ([5b39d5e](https://togithub.com/googleapis/java-shared-config/commit/5b39d5ee15121f052226ff873b6ab101e9c71de5)) - update dependency com.google.googlejavaformat:google-java-format to v1.15.0 ([#​426](https://togithub.com/googleapis/java-shared-config/issues/426)) ([4c3c4b6](https://togithub.com/googleapis/java-shared-config/commit/4c3c4b66129632181e6bc363a0ecccf4f5aac914)) - update dependency org.graalvm.buildtools:junit-platform-native to v0.9.11 ([#​448](https://togithub.com/googleapis/java-shared-config/issues/448)) ([f7f518e](https://togithub.com/googleapis/java-shared-config/commit/f7f518e87d1d9feb9ac54d7c099f97d8751ee3da)) - update dependency org.graalvm.buildtools:native-maven-plugin to v0.9.11 ([#​449](https://togithub.com/googleapis/java-shared-config/issues/449)) ([3e1c0b5](https://togithub.com/googleapis/java-shared-config/commit/3e1c0b5a1d2f4a0db88c06a0d683ed90cbc745e2))
--- ### Configuration 📅 **Schedule**: At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, click this checkbox. --- This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-pubsub). * fix: added exactly once delivery files to owlbot config (#1106) * chore(bazel): update version of Protobuf to v3.20.1 (#1079) - [ ] Regenerate this pull request now. PiperOrigin-RevId: 444328399 Source-Link: https://github.com/googleapis/googleapis/commit/c7ca416c2856aad6a3f7092924e56b8cf0cb2534 Source-Link: https://github.com/googleapis/googleapis-gen/commit/d61705453a62b3ecda78aa30c192840ebc5a8a90 Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiZDYxNzA1NDUzYTYyYjNlY2RhNzhhYTMwYzE5Mjg0MGViYzVhOGE5MCJ9 feat: AuditConfig for IAM v1 PiperOrigin-RevId: 439356405 Source-Link: https://github.com/googleapis/googleapis/commit/afa2ba156bd5c83ad8168030ab801a8ca84ac819 Source-Link: https://github.com/googleapis/googleapis-gen/commit/3e40c17e1510c95fab58fc2143ccb61cceca5989 Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiM2U0MGMxN2UxNTEwYzk1ZmFiNThmYzIxNDNjY2I2MWNjZWNhNTk4OSJ9 * chore(deps): upgrade gapic-generator-java to 2.7.0 and update gax-java to 2.16.0 (#1107) - [ ] Regenerate this pull request now. PiperOrigin-RevId: 446250659 Source-Link: https://github.com/googleapis/googleapis/commit/dc4ef314fecf1b00833e78288cf2eb4d2b165ad1 Source-Link: https://github.com/googleapis/googleapis-gen/commit/5fdda4ddfbab87998e77e4a453e0ff87986d2db8 Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiNWZkZGE0ZGRmYmFiODc5OThlNzdlNGE0NTNlMGZmODc5ODZkMmRiOCJ9 * build(deps): update dependency org.apache.maven.plugins:maven-project-info-reports-plugin to v3.3.0 (#1104) [![WhiteSource Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com) This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [org.apache.maven.plugins:maven-project-info-reports-plugin](https://maven.apache.org/plugins/) ([source](https://togithub.com/apache/maven-project-info-reports-plugin)) | `3.2.2` -> `3.3.0` | [![age](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/age-slim)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/adoption-slim)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/compatibility-slim/3.2.2)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/confidence-slim/3.2.2)](https://docs.renovatebot.com/merge-confidence/) | --- ### Configuration 📅 **Schedule**: At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, click this checkbox. --- This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-pubsub). * Formatting * Formatting Co-authored-by: WhiteSource Renovate Co-authored-by: Mike Micatka <31972785+mmicatka@users.noreply.github.com> Co-authored-by: gcf-owl-bot[bot] <78513119+gcf-owl-bot[bot]@users.noreply.github.com> --- .../com/google/cloud/pubsub/v1/Publisher.java | 43 +++++++++- .../google/cloud/pubsub/it/ITPubSubTest.java | 80 +++++++++++++++++-- .../cloud/pubsub/v1/PublisherImplTest.java | 25 ++++++ 3 files changed, 142 insertions(+), 6 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 39302d3f7..399d99658 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -35,6 +35,7 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; @@ -50,6 +51,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; +import io.grpc.CallOptions; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -89,6 +91,8 @@ public class Publisher implements PublisherInterface { private static final Logger logger = Logger.getLogger(Publisher.class.getName()); + private static final String GZIP_COMPRESSION = "gzip"; + private final String topicName; private final BatchingSettings batchingSettings; @@ -114,6 +118,12 @@ public class Publisher implements PublisherInterface { private MessageFlowController flowController = null; + private final boolean enableCompression; + private final long compressionBytesThreshold; + + private final GrpcCallContext publishContext; + private final GrpcCallContext publishContextWithCompression; + /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { return 1000L; @@ -140,6 +150,8 @@ private Publisher(Builder builder) throws IOException { this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; + this.enableCompression = builder.enableCompression; + this.compressionBytesThreshold = builder.compressionBytesThreshold; messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); @@ -191,6 +203,10 @@ private Publisher(Builder builder) throws IOException { backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); messagesWaiter = new Waiter(); + this.publishContext = GrpcCallContext.createDefault(); + this.publishContextWithCompression = + GrpcCallContext.createDefault() + .withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION)); } /** Topic which the publisher publishes to. */ @@ -431,13 +447,18 @@ private void publishAllWithoutInflightForKey(final String orderingKey) { } private ApiFuture publishCall(OutstandingBatch outstandingBatch) { + GrpcCallContext context = publishContext; + if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) { + context = publishContextWithCompression; + } return publisherStub .publishCallable() .futureCall( PublishRequest.newBuilder() .setTopic(topicName) .addAllMessages(outstandingBatch.getMessages()) - .build()); + .build(), + context); } private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { @@ -688,6 +709,8 @@ public static final class Builder { InstantiatingExecutorProvider.newBuilder() .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors()) .build(); + static final boolean DEFAULT_ENABLE_COMPRESSION = false; + static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L; String topicName; private String endpoint = PublisherStubSettings.getDefaultEndpoint(); @@ -717,6 +740,9 @@ public PubsubMessage apply(PubsubMessage input) { } }; + private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION; + private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD; + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } @@ -827,6 +853,21 @@ public Builder setEndpoint(String endpoint) { return this; } + /** Gives the ability to enable transport compression. */ + public Builder setEnableCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + return this; + } + + /** + * Sets the threshold (in bytes) above which messages are compressed for transport. Only takes + * effect if setEnableCompression(true) is also called." + */ + public Builder setCompressionBytesThreshold(long compressionBytesThreshold) { + this.compressionBytesThreshold = compressionBytesThreshold; + return this; + } + /** Returns the default BatchingSettings used by the client if settings are not provided. */ public static BatchingSettings getDefaultBatchingSettings() { return DEFAULT_BATCHING_SETTINGS; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index c72d52d3d..290b9927d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -40,11 +40,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.Timeout; public class ITPubSubTest { @@ -403,6 +399,70 @@ public void failed(Subscriber.State from, Throwable failure) { topicAdminClient.deleteTopic(topicName); } + @Test + public void testPublishSubscribeWithCompression() throws Exception { + TopicName topicName = + TopicName.newBuilder() + .setProject(projectId) + .setTopic(formatForTest("testing-compression-topic")) + .build(); + SubscriptionName subscriptionName = + SubscriptionName.of(projectId, formatForTest("testing-compression-subscription")); + + topicAdminClient.createTopic(topicName); + + subscriptionAdminClient.createSubscription( + getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10, false)); + + final BlockingQueue receiveQueue = new LinkedBlockingQueue<>(); + Subscriber subscriber = + Subscriber.newBuilder( + subscriptionName.toString(), + new MessageReceiver() { + @Override + public void receiveMessage( + final PubsubMessage message, final AckReplyConsumer consumer) { + receiveQueue.offer(MessageAndConsumer.create(message, consumer)); + } + }) + .build(); + subscriber.addListener( + new Subscriber.Listener() { + public void failed(Subscriber.State from, Throwable failure) { + receiveQueue.offer(failure); + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync(); + + Publisher publisher = Publisher.newBuilder(topicName).setEnableCompression(true).build(); + + String msg1 = generateMessage("msg1", 1000); + String msg2 = generateMessage("msg2", 1500); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg1)).build()) + .get(); + publisher + .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg2)).build()) + .get(); + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + + // Ack the first message. + MessageAndConsumer toAck1 = pollQueueMessageAndConsumer(receiveQueue); + toAck1.consumer().ack(); + + // Ack the second message. + MessageAndConsumer toAck2 = pollQueueMessageAndConsumer(receiveQueue); + toAck2.consumer().ack(); + + assertNotEquals(toAck1.message().getData(), toAck2.message().getData()); + + subscriber.stopAsync().awaitTerminated(); + subscriptionAdminClient.deleteSubscription(subscriptionName); + topicAdminClient.deleteTopic(topicName); + } + private MessageAndConsumer pollQueueMessageAndConsumer(BlockingQueue queue) throws InterruptedException { Object obj = pollQueue(queue); @@ -434,4 +494,14 @@ private Object pollQueue(BlockingQueue queue) throws InterruptedExceptio return obj; } + + /** Generates message of given bytes by repeatedly concatenating a token. */ + private String generateMessage(String token, int bytes) { + String result = ""; + int tokenBytes = token.length(); + for (int i = 0; i < Math.floor(bytes / tokenBytes) + 1; i++) { + result = result.concat(token); + } + return result; + } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index c41931de6..9985efc6b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -282,6 +282,31 @@ public void testPublishMixedSizeAndDuration() throws Exception { shutdownTestPublisher(publisher); } + @Test + public void testPublishWithCompression() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setEnableCompression(true) + .setCompressionBytesThreshold(100) + .build(); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); + ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "B"); + assertEquals("1", publishFuture1.get()); + assertEquals("2", publishFuture2.get()); + + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + shutdownTestPublisher(publisher); + } + private ApiFuture sendTestMessage(Publisher publisher, String data) { return publisher.publish( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build());