Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adding grpc compression support #1000

Merged
merged 54 commits into from
May 10, 2022
Merged
Changes from 1 commit
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
47e4d21
Adding gRPC compression support to the library
rajanya-google Feb 3, 2022
7190fca
Minor comment fix
rajanya-google Feb 3, 2022
8d05ba2
Formatting the code
rajanya-google Feb 3, 2022
13a071d
Adding unit test for compression
rajanya-google Feb 7, 2022
d7eb3dc
Adding integration test for compression
rajanya-google Feb 8, 2022
49896ac
Formatting
rajanya-google Feb 8, 2022
f9bea54
Refactoring integration tests to add support for overriding endpoint
rajanya-google Feb 8, 2022
be4f7b9
Adding sample for publish with compression; Updating README
rajanya-google Feb 9, 2022
525a738
Adding integration test for compression sample
rajanya-google Feb 9, 2022
b6d51d0
Adding parameter compressionBytesThreshold to Publisher; Adding loggi…
rajanya-google Feb 16, 2022
e0819e5
Addressing PR comments
rajanya-google Mar 2, 2022
6412cf2
Addressing checkstyle
rajanya-google Mar 2, 2022
014fe2a
Addressed PR comment
rajanya-google Mar 2, 2022
4bbe64a
Addressing PR comment to put a Precondition for compression and its t…
rajanya-google Mar 10, 2022
3a8c5f7
Addressing PR review
rajanya-google Mar 11, 2022
01c7e3f
Removing logging from example
rajanya-google Mar 11, 2022
22dee27
Adding logging properties
rajanya-google Mar 11, 2022
7832599
Making the publish call unified with context as per PR comments
rajanya-google Mar 15, 2022
9f3fcc0
Removing sample code as per tianzi@'s comments
rajanya-google Mar 21, 2022
9ae5944
Minor fixes
rajanya-google Apr 11, 2022
2fa8849
Adding gRPC compression support to the library
rajanya-google Feb 3, 2022
ab3fc37
Minor comment fix
rajanya-google Feb 3, 2022
d71a052
Formatting the code
rajanya-google Feb 3, 2022
99686ea
Adding unit test for compression
rajanya-google Feb 7, 2022
d8816ca
Adding integration test for compression
rajanya-google Feb 8, 2022
cfb85a0
Formatting
rajanya-google Feb 8, 2022
1d89073
Refactoring integration tests to add support for overriding endpoint
rajanya-google Feb 8, 2022
ed13022
Adding sample for publish with compression; Updating README
rajanya-google Feb 9, 2022
a133b99
Adding integration test for compression sample
rajanya-google Feb 9, 2022
9087f22
Adding parameter compressionBytesThreshold to Publisher; Adding loggi…
rajanya-google Feb 16, 2022
509193c
Addressing PR comments
rajanya-google Mar 2, 2022
061f679
Addressing checkstyle
rajanya-google Mar 2, 2022
86a8327
Addressed PR comment
rajanya-google Mar 2, 2022
1abc9f8
Addressing PR comment to put a Precondition for compression and its t…
rajanya-google Mar 10, 2022
68484e5
Addressing PR review
rajanya-google Mar 11, 2022
fc2378e
Removing logging from example
rajanya-google Mar 11, 2022
7c34ef6
Adding logging properties
rajanya-google Mar 11, 2022
92c2824
Making the publish call unified with context as per PR comments
rajanya-google Mar 15, 2022
265350a
Removing sample code as per tianzi@'s comments
rajanya-google Mar 21, 2022
2e9a738
Minor fixes
rajanya-google Apr 11, 2022
47e725f
Merge remote-tracking branch 'origin/grpc-compression' into grpc-comp…
rajanya-google Apr 28, 2022
cef13ea
Fixing IT
rajanya-google Apr 28, 2022
ed73695
Creating a class variable publishContext to remove the overhead of Gr…
rajanya-google May 5, 2022
db27b12
fixing lint format
rajanya-google May 5, 2022
4d44679
Addressed PR comments
rajanya-google May 6, 2022
b1a2d37
Removing test
rajanya-google May 6, 2022
a26562e
build(deps): update dependency com.google.cloud:google-cloud-shared-c…
renovate-bot Apr 29, 2022
d913048
fix: added exactly once delivery files to owlbot config (#1106)
mmicatka May 2, 2022
5c9778a
chore(bazel): update version of Protobuf to v3.20.1 (#1079)
gcf-owl-bot[bot] May 2, 2022
66d356f
chore(deps): upgrade gapic-generator-java to 2.7.0 and update gax-jav…
gcf-owl-bot[bot] May 5, 2022
53ebb0b
build(deps): update dependency org.apache.maven.plugins:maven-project…
renovate-bot May 5, 2022
1fb553c
Formatting
rajanya-google Feb 8, 2022
f83bcc7
Formatting
rajanya-google May 9, 2022
3903653
Merge remote-tracking branch 'upstream/main' into grpc-compression
rajanya-google May 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressed PR comments
rajanya-google committed May 6, 2022
commit 4d44679a109b7262b26b6b39c157e9c414fd70be
Original file line number Diff line number Diff line change
@@ -119,10 +119,10 @@ public class Publisher implements PublisherInterface {
private MessageFlowController flowController = null;

private final boolean enableCompression;
private final boolean enableCompressionBytesThreshold;
private final long compressionBytesThreshold;

private GrpcCallContext publishContext;
private final GrpcCallContext publishContext;
private final GrpcCallContext publishContextWithCompression;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
@@ -151,7 +151,6 @@ private Publisher(Builder builder) throws IOException {
this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;
this.enableCompression = builder.enableCompression;
this.enableCompressionBytesThreshold = builder.enableCompressionBytesThreshold;
this.compressionBytesThreshold = builder.compressionBytesThreshold;

messagesBatches = new HashMap<>();
@@ -205,6 +204,9 @@ private Publisher(Builder builder) throws IOException {
shutdown = new AtomicBoolean(false);
messagesWaiter = new Waiter();
this.publishContext = GrpcCallContext.createDefault();
this.publishContextWithCompression =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
}

/** Topic which the publisher publishes to. */
@@ -256,12 +258,6 @@ public ApiFuture<String> publish(PubsubMessage message) {
+ "Publisher client. Please create a Publisher client with "
+ "setEnableMessageOrdering(true) in the builder.");

Preconditions.checkState(
!enableCompressionBytesThreshold || enableCompression,
"Cannot publish a message with compression bytes threshold when compression is not enabled "
+ "in the Publisher client. Please create a Publisher client with "
+ "setEnableCompression(true) in the builder.");

final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));

@@ -451,9 +447,9 @@ private void publishAllWithoutInflightForKey(final String orderingKey) {
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
GrpcCallContext context = publishContext;
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
mmicatka marked this conversation as resolved.
Show resolved Hide resolved
publishContext =
publishContext.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
context = publishContextWithCompression;
}
return publisherStub
.publishCallable()
@@ -462,7 +458,7 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
.setTopic(topicName)
.addAllMessages(outstandingBatch.getMessages())
.build(),
publishContext);
context);
}

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
@@ -745,7 +741,6 @@ public PubsubMessage apply(PubsubMessage input) {
};

private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
private boolean enableCompressionBytesThreshold = DEFAULT_ENABLE_COMPRESSION;
private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD;

private Builder(String topic) {
@@ -869,7 +864,6 @@ public Builder setEnableCompression(boolean enableCompression) {
* effect if setEnableCompression(true) is also called."
*/
public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
this.enableCompressionBytesThreshold = true;
this.compressionBytesThreshold = compressionBytesThreshold;
mmicatka marked this conversation as resolved.
Show resolved Hide resolved
return this;
}