From 490865cecb282825d17d8a37dad585408bd227d9 Mon Sep 17 00:00:00 2001 From: yaothao Date: Tue, 17 Dec 2024 18:47:10 -0500 Subject: [PATCH] changefeedccl: Fix Kafka v2 Sink GZIP Compression Level Issue Previously, the Kafka v2 sink could not properly handle negative compression levels due to differences in the underlying compression libraries used between the v1 and v2 sinks. The Kafka v1 sink implementation, which relies on the Sarama library, uses the klauspost/compress library that supports a compression level of -3. However, our v2 sink has transitioned to using franz-go, which utilizes the standard library's compress/gzip, and does not support the -3 level. In this update, the validation function now checks the GZIP compression range between HuffmanOnly (-2) and BestCompression (9). Fixes: #136492 Epic: none Release note (bug fix): We have resolved an issue in the Kafka v2 sink configuration within CockroachDB, where users were previously unable to set negative GZIP compression levels. Now, users can configure the CompressionLevel for the Kafka sink in the range of [-2, 9]. Please update the user guide to include the new valid GZIP compression level range of [-2, 9], where -2 enables Huffman-only encoding and -1 sets the default compression. 
--- pkg/ccl/changefeedccl/BUILD.bazel | 1 - pkg/ccl/changefeedccl/sink_kafka_v2.go | 4 ++-- pkg/ccl/changefeedccl/sink_kafka_v2_test.go | 18 ++++++++++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 7e24cfc038eb..2efd6886edbd 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -174,7 +174,6 @@ go_library( "@com_github_gogo_protobuf//types", "@com_github_google_btree//:btree", "@com_github_ibm_sarama//:sarama", - "@com_github_klauspost_compress//gzip", "@com_github_klauspost_compress//zstd", "@com_github_klauspost_pgzip//:pgzip", "@com_github_lib_pq//:pq", diff --git a/pkg/ccl/changefeedccl/sink_kafka_v2.go b/pkg/ccl/changefeedccl/sink_kafka_v2.go index 31ae991fccd1..2bf3053a5e22 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_v2.go +++ b/pkg/ccl/changefeedccl/sink_kafka_v2.go @@ -6,6 +6,7 @@ package changefeedccl import ( + "compress/gzip" "context" "crypto/tls" "crypto/x509" @@ -28,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" "github.com/rcrowley/go-metrics" "github.com/twmb/franz-go/pkg/kadm" @@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error case sarama.CompressionNone: return nil case sarama.CompressionGZIP: - if level < gzip.NoCompression || level > gzip.BestCompression { + if level < gzip.HuffmanOnly || level > gzip.BestCompression { return errors.Errorf(`invalid gzip compression level: %d`, level) } case sarama.CompressionSnappy: diff --git a/pkg/ccl/changefeedccl/sink_kafka_v2_test.go b/pkg/ccl/changefeedccl/sink_kafka_v2_test.go index 4730dfe49e2d..d52460a8418e 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_v2_test.go +++ b/pkg/ccl/changefeedccl/sink_kafka_v2_test.go @@ -448,6 +448,18 @@ func 
TestKafkaSinkClientV2_CompressionOpts(t *testing.T) { level: "9", expected: kgo.GzipCompression().WithLevel(9), }, + { + name: "gzip level -1", + codec: "GZIP", + level: "-1", + expected: kgo.GzipCompression().WithLevel(-1), + }, + { + name: "gzip level -2", + codec: "GZIP", + level: "-2", + expected: kgo.GzipCompression().WithLevel(-2), + }, { name: "snappy no level", codec: "SNAPPY", @@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) { level: "100", shouldErr: true, }, + { + name: "invalid gzip level '-3'", + codec: "GZIP", + level: "-3", + shouldErr: true, + }, { name: "invalid snappy level", codec: "SNAPPY",