diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 735ae462192c..6c830553f308 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -2177,6 +2177,32 @@ func TestIntegration_DetectProjectID(t *testing.T) { } } +func TestIntegration_PublishCompression(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t) + defer client.Close() + + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + + topic.PublishSettings.EnableCompression = true + topic.PublishSettings.CompressionBytesThreshold = 50 + + const messageSizeBytes = 1000 + + msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))} + res := topic.Publish(ctx, msg) + + _, err = res.Get(ctx) + if err != nil { + t.Errorf("publish result got err: %v", err) + } +} + // createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error. func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) { var topic *Topic diff --git a/pubsub/topic.go b/pubsub/topic.go index b85b4b1c342a..b953cb4d1bcb 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -36,6 +36,7 @@ import ( "google.golang.org/api/support/bundler" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -117,6 +118,17 @@ type PublishSettings struct { // FlowControlSettings defines publisher flow control settings. FlowControlSettings FlowControlSettings + + // EnableCompression enables transport compression for Publish operations + EnableCompression bool + + // CompressionBytesThreshold defines the threshold (in bytes) above which messages + // are compressed for transport. Only takes effect if EnableCompression is true. + CompressionBytesThreshold int +} + +func (ps *PublishSettings) shouldCompress(batchSize int) bool { + return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -134,6 +146,10 @@ var DefaultPublishSettings = PublishSettings{ MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlIgnore, }, + // Publisher compression defaults matches Java's defaults + // https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718 + EnableCompression: false, + CompressionBytesThreshold: 240, } // CreateTopic creates a new topic. @@ -875,6 +891,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) } pbMsgs := make([]*pb.PubsubMessage, len(bms)) var orderingKey string + batchSize := 0 for i, bm := range bms { orderingKey = bm.msg.OrderingKey pbMsgs[i] = &pb.PubsubMessage{ @@ -882,6 +899,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } + batchSize = batchSize + proto.Size(pbMsgs[i]) bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -897,11 +915,17 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) opt.Resolve(&settings) } r := &publishRetryer{defaultRetryer: settings.Retry()} + gaxOpts := []gax.CallOption{ + gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), + gax.WithRetry(func() gax.Retryer { return r }), + } + if t.PublishSettings.shouldCompress(batchSize) { + gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) + } res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ Topic: t.name, Messages: pbMsgs, - }, gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), - gax.WithRetry(func() gax.Retryer { return r })) + }, gaxOpts...) } end := time.Now() if err != nil { diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index eb53bef5f370..974e0c487d65 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -747,3 +747,26 @@ func TestPublishOrderingNotEnabled(t *testing.T) { t.Errorf("got %v, want errTopicOrderingNotEnabled", err) } } + +func TestPublishCompression(t *testing.T) { + ctx := context.Background() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "topic-compression") + defer topic.Stop() + + topic.PublishSettings.EnableCompression = true + topic.PublishSettings.CompressionBytesThreshold = 50 + + const messageSizeBytes = 1000 + + msg := &Message{Data: bytes.Repeat([]byte{'A'}, int(messageSizeBytes))} + res := topic.Publish(ctx, msg) + + _, err := res.Get(ctx) + if err != nil { + t.Errorf("publish result got err: %v", err) + } +}