Skip to content

Commit

Permalink
feat: Adding grpc compression support for publisher client (#1000)
Browse files Browse the repository at this point in the history
* Adding gRPC compression support to the library

* Minor comment fix

* Formatting the code

* Adding unit test for compression

* Adding integration test for compression

* Formatting

* Refactoring integration tests to add support for overriding endpoint

* Adding sample for publish with compression; Updating README

* Adding integration test for compression sample

* Adding parameter compressionBytesThreshold to Publisher; Adding logging support in the compression example

* Addressing PR comments

* Addressing checkstyle

* Addressed PR comment

* Addressing PR comment to put a Precondition for compression and its threshold

* Addressing PR review

* Removing logging from example

* Adding logging properties

* Making the publish call unified with context as per PR comments

* Removing sample code as per tianzi@'s comments

* Minor fixes

* Adding gRPC compression support to the library

* Minor comment fix

* Formatting the code

* Adding unit test for compression

* Adding integration test for compression

* Formatting

* Refactoring integration tests to add support for overriding endpoint

* Adding sample for publish with compression; Updating README

* Adding integration test for compression sample

* Adding parameter compressionBytesThreshold to Publisher; Adding logging support in the compression example

* Addressing PR comments

* Addressing checkstyle

* Addressed PR comment

* Addressing PR comment to put a Precondition for compression and its threshold

* Addressing PR review

* Removing logging from example

* Adding logging properties

* Making the publish call unified with context as per PR comments

* Removing sample code as per tianzi@'s comments

* Minor fixes

* Fixing IT

* Creating a class variable publishContext to remove the overhead of GrpcCallContext.createDefault() with every publish call

* fixing lint format

* Addressed PR comments

* Removing test

* build(deps): update dependency com.google.cloud:google-cloud-shared-config to v1.4.0 (#1105)

[![WhiteSource Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com)

This PR contains the following updates:

| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [com.google.cloud:google-cloud-shared-config](https://togithub.com/googleapis/java-shared-config) | `1.3.3` -> `1.4.0` | [![age](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/age-slim)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/adoption-slim)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/compatibility-slim/1.3.3)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://badges.renovateapi.com/packages/maven/com.google.cloud:google-cloud-shared-config/1.4.0/confidence-slim/1.3.3)](https://docs.renovatebot.com/merge-confidence/) |

---

### Release Notes

<details>
<summary>googleapis/java-shared-config</summary>

### [`v1.4.0`](https://togithub.com/googleapis/java-shared-config/blob/HEAD/CHANGELOG.md#&#8203;140-httpsgithubcomgoogleapisjava-shared-configcomparev133v140-2022-04-28)

[Compare Source](https://togithub.com/googleapis/java-shared-config/compare/v1.3.3...v1.4.0)

##### Features

-   **java:** remove native image module ([#&#8203;471](https://togithub.com/googleapis/java-shared-config/issues/471)) ([7fcba01](https://togithub.com/googleapis/java-shared-config/commit/7fcba016b3138d7beaa4e962853f9bc80f59438c))

##### [1.3.3](https://togithub.com/googleapis/java-shared-config/compare/v1.3.2...v1.3.3) (2022-04-19)

##### Bug Fixes

-   **java:** remove protobuf feature from native profile ([#&#8203;461](https://togithub.com/googleapis/java-shared-config/issues/461)) ([ffd07cb](https://togithub.com/googleapis/java-shared-config/commit/ffd07cb18ee7d45d4daee1d9ea6f6d321fdca874))

##### Dependencies

-   update dependency com.google.cloud:native-image-support to v0.12.11 ([#&#8203;459](https://togithub.com/googleapis/java-shared-config/issues/459)) ([d20008d](https://togithub.com/googleapis/java-shared-config/commit/d20008df15209708fdf9d06828b567778190f4d0))
-   update dependency com.google.cloud:native-image-support to v0.13.1 ([#&#8203;465](https://togithub.com/googleapis/java-shared-config/issues/465)) ([b202064](https://togithub.com/googleapis/java-shared-config/commit/b2020648816feb4740ad70acedfed470d7da5bcf))

##### [1.3.2](https://togithub.com/googleapis/java-shared-config/compare/v1.3.1...v1.3.2) (2022-03-28)

##### Dependencies

-   revert google-java-format to 1.7 ([#&#8203;453](https://togithub.com/googleapis/java-shared-config/issues/453)) ([cbc777f](https://togithub.com/googleapis/java-shared-config/commit/cbc777f3e9ab75edb6fa2e0268a7446ae4111725))

##### [1.3.1](https://togithub.com/googleapis/java-shared-config/compare/v1.3.0...v1.3.1) (2022-03-25)

##### Dependencies

-   update dependency com.google.cloud:native-image-support to v0.12.10 ([#&#8203;443](https://togithub.com/googleapis/java-shared-config/issues/443)) ([5b39d5e](https://togithub.com/googleapis/java-shared-config/commit/5b39d5ee15121f052226ff873b6ab101e9c71de5))
-   update dependency com.google.googlejavaformat:google-java-format to v1.15.0 ([#&#8203;426](https://togithub.com/googleapis/java-shared-config/issues/426)) ([4c3c4b6](https://togithub.com/googleapis/java-shared-config/commit/4c3c4b66129632181e6bc363a0ecccf4f5aac914))
-   update dependency org.graalvm.buildtools:junit-platform-native to v0.9.11 ([#&#8203;448](https://togithub.com/googleapis/java-shared-config/issues/448)) ([f7f518e](https://togithub.com/googleapis/java-shared-config/commit/f7f518e87d1d9feb9ac54d7c099f97d8751ee3da))
-   update dependency org.graalvm.buildtools:native-maven-plugin to v0.9.11 ([#&#8203;449](https://togithub.com/googleapis/java-shared-config/issues/449)) ([3e1c0b5](https://togithub.com/googleapis/java-shared-config/commit/3e1c0b5a1d2f4a0db88c06a0d683ed90cbc745e2))

</details>

---

### Configuration

📅 **Schedule**: At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox.

---

This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-pubsub).

* fix: added exactly once delivery files to owlbot config (#1106)

* chore(bazel): update version of Protobuf to v3.20.1 (#1079)

- [ ] Regenerate this pull request now.

PiperOrigin-RevId: 444328399

Source-Link: googleapis/googleapis@c7ca416

Source-Link: googleapis/googleapis-gen@d617054
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiZDYxNzA1NDUzYTYyYjNlY2RhNzhhYTMwYzE5Mjg0MGViYzVhOGE5MCJ9

feat: AuditConfig for IAM v1

PiperOrigin-RevId: 439356405

Source-Link: googleapis/googleapis@afa2ba1

Source-Link: googleapis/googleapis-gen@3e40c17
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiM2U0MGMxN2UxNTEwYzk1ZmFiNThmYzIxNDNjY2I2MWNjZWNhNTk4OSJ9

* chore(deps): upgrade gapic-generator-java to 2.7.0 and update gax-java to 2.16.0 (#1107)

- [ ] Regenerate this pull request now.

PiperOrigin-RevId: 446250659

Source-Link: googleapis/googleapis@dc4ef31

Source-Link: googleapis/googleapis-gen@5fdda4d
Copy-Tag: eyJwIjoiLmdpdGh1Yi8uT3dsQm90LnlhbWwiLCJoIjoiNWZkZGE0ZGRmYmFiODc5OThlNzdlNGE0NTNlMGZmODc5ODZkMmRiOCJ9

* build(deps): update dependency org.apache.maven.plugins:maven-project-info-reports-plugin to v3.3.0 (#1104)

[![WhiteSource Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com)

This PR contains the following updates:

| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [org.apache.maven.plugins:maven-project-info-reports-plugin](https://maven.apache.org/plugins/) ([source](https://togithub.com/apache/maven-project-info-reports-plugin)) | `3.2.2` -> `3.3.0` | [![age](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/age-slim)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/adoption-slim)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/compatibility-slim/3.2.2)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://badges.renovateapi.com/packages/maven/org.apache.maven.plugins:maven-project-info-reports-plugin/3.3.0/confidence-slim/3.2.2)](https://docs.renovatebot.com/merge-confidence/) |

---

### Configuration

📅 **Schedule**: At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox.

---

This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#github/googleapis/java-pubsub).

* Formatting

* Formatting

Co-authored-by: WhiteSource Renovate <[email protected]>
Co-authored-by: Mike Micatka <[email protected]>
Co-authored-by: gcf-owl-bot[bot] <78513119+gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored May 10, 2022
1 parent dc1670c commit 4ad1a3f
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
Expand All @@ -50,6 +51,7 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.CallOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -89,6 +91,8 @@
public class Publisher implements PublisherInterface {
private static final Logger logger = Logger.getLogger(Publisher.class.getName());

private static final String GZIP_COMPRESSION = "gzip";

private final String topicName;

private final BatchingSettings batchingSettings;
Expand All @@ -114,6 +118,12 @@ public class Publisher implements PublisherInterface {

private MessageFlowController flowController = null;

private final boolean enableCompression;
private final long compressionBytesThreshold;

private final GrpcCallContext publishContext;
private final GrpcCallContext publishContextWithCompression;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
return 1000L;
Expand All @@ -140,6 +150,8 @@ private Publisher(Builder builder) throws IOException {

this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;
this.enableCompression = builder.enableCompression;
this.compressionBytesThreshold = builder.compressionBytesThreshold;

messagesBatches = new HashMap<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -191,6 +203,10 @@ private Publisher(Builder builder) throws IOException {
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
messagesWaiter = new Waiter();
this.publishContext = GrpcCallContext.createDefault();
this.publishContextWithCompression =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
}

/** Topic which the publisher publishes to. */
Expand Down Expand Up @@ -431,13 +447,18 @@ private void publishAllWithoutInflightForKey(final String orderingKey) {
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
GrpcCallContext context = publishContext;
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
context = publishContextWithCompression;
}
return publisherStub
.publishCallable()
.futureCall(
PublishRequest.newBuilder()
.setTopic(topicName)
.addAllMessages(outstandingBatch.getMessages())
.build());
.build(),
context);
}

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
Expand Down Expand Up @@ -688,6 +709,8 @@ public static final class Builder {
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
.build();
static final boolean DEFAULT_ENABLE_COMPRESSION = false;
static final long DEFAULT_COMPRESSION_BYTES_THRESHOLD = 240L;

String topicName;
private String endpoint = PublisherStubSettings.getDefaultEndpoint();
Expand Down Expand Up @@ -717,6 +740,9 @@ public PubsubMessage apply(PubsubMessage input) {
}
};

private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD;

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
Expand Down Expand Up @@ -827,6 +853,21 @@ public Builder setEndpoint(String endpoint) {
return this;
}

/** Gives the ability to enable transport compression. */
public Builder setEnableCompression(boolean enableCompression) {
this.enableCompression = enableCompression;
return this;
}

/**
* Sets the threshold (in bytes) above which messages are compressed for transport. Only takes
* effect if setEnableCompression(true) is also called."
*/
public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
this.compressionBytesThreshold = compressionBytesThreshold;
return this;
}

/** Returns the default BatchingSettings used by the client if settings are not provided. */
public static BatchingSettings getDefaultBatchingSettings() {
return DEFAULT_BATCHING_SETTINGS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.Timeout;

public class ITPubSubTest {
Expand Down Expand Up @@ -403,6 +399,70 @@ public void failed(Subscriber.State from, Throwable failure) {
topicAdminClient.deleteTopic(topicName);
}

@Test
public void testPublishSubscribeWithCompression() throws Exception {
TopicName topicName =
TopicName.newBuilder()
.setProject(projectId)
.setTopic(formatForTest("testing-compression-topic"))
.build();
SubscriptionName subscriptionName =
SubscriptionName.of(projectId, formatForTest("testing-compression-subscription"));

topicAdminClient.createTopic(topicName);

subscriptionAdminClient.createSubscription(
getSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), 10, false));

final BlockingQueue<Object> receiveQueue = new LinkedBlockingQueue<>();
Subscriber subscriber =
Subscriber.newBuilder(
subscriptionName.toString(),
new MessageReceiver() {
@Override
public void receiveMessage(
final PubsubMessage message, final AckReplyConsumer consumer) {
receiveQueue.offer(MessageAndConsumer.create(message, consumer));
}
})
.build();
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
receiveQueue.offer(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync();

Publisher publisher = Publisher.newBuilder(topicName).setEnableCompression(true).build();

String msg1 = generateMessage("msg1", 1000);
String msg2 = generateMessage("msg2", 1500);
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg1)).build())
.get();
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(msg2)).build())
.get();
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

// Ack the first message.
MessageAndConsumer toAck1 = pollQueueMessageAndConsumer(receiveQueue);
toAck1.consumer().ack();

// Ack the second message.
MessageAndConsumer toAck2 = pollQueueMessageAndConsumer(receiveQueue);
toAck2.consumer().ack();

assertNotEquals(toAck1.message().getData(), toAck2.message().getData());

subscriber.stopAsync().awaitTerminated();
subscriptionAdminClient.deleteSubscription(subscriptionName);
topicAdminClient.deleteTopic(topicName);
}

private MessageAndConsumer pollQueueMessageAndConsumer(BlockingQueue<Object> queue)
throws InterruptedException {
Object obj = pollQueue(queue);
Expand Down Expand Up @@ -434,4 +494,14 @@ private Object pollQueue(BlockingQueue<Object> queue) throws InterruptedExceptio

return obj;
}

/** Generates message of given bytes by repeatedly concatenating a token. */
private String generateMessage(String token, int bytes) {
String result = "";
int tokenBytes = token.length();
for (int i = 0; i < Math.floor(bytes / tokenBytes) + 1; i++) {
result = result.concat(token);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,31 @@ public void testPublishMixedSizeAndDuration() throws Exception {
shutdownTestPublisher(publisher);
}

@Test
public void testPublishWithCompression() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(100))
.build())
.setEnableCompression(true)
.setCompressionBytesThreshold(100)
.build();

testPublisherServiceImpl.addPublishResponse(
PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"));
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
ApiFuture<String> publishFuture2 = sendTestMessage(publisher, "B");
assertEquals("1", publishFuture1.get());
assertEquals("2", publishFuture2.get());

fakeExecutor.advanceTime(Duration.ofSeconds(100));
shutdownTestPublisher(publisher);
}

private ApiFuture<String> sendTestMessage(Publisher publisher, String data) {
return publisher.publish(
PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build());
Expand Down

0 comments on commit 4ad1a3f

Please sign in to comment.