Skip to content

Commit

Permalink
changefeedccl: Fix Kafka v2 Sink GZIP Compression Level Issue
Browse files Browse the repository at this point in the history
Previously, the Kafka v2 sink could not properly handle negative
compression levels due to differences in the underlying compression
libraries used between the v1 and v2 sinks. The Kafka v1 sink
implementation, which relies on the Sarama library, uses the
klauspost/compress library that supports a compression level of -3.
However, our v2 sink has transitioned to using franz-go, which utilizes
the standard library's compression/gzip, and does not support the -3
level. In this update, the validation function now checks the GZIP
compression range between HuffmanOnly (-2) and BestCompression (9).

Fixes: #136492

Epic: none

Release note (bug fix): We have resolved an issue in the Kafka v2 sink
configuration within CockroachDB, where users were previously unable to
set negative GZIP compression levels. Now, users can configure the
CompressionLevel for the Kafka sink in the range of [-2, 9]. Please
update the user guide to include the new valid GZIP compression level
range of [-2, 9], where -2 enables Huffman encoding and -1 sets the
default compression.
  • Loading branch information
yaothao committed Dec 19, 2024
1 parent a833dc2 commit 490865c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
"@com_github_ibm_sarama//:sarama",
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_lib_pq//:pq",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package changefeedccl

import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/rcrowley/go-metrics"
"github.com/twmb/franz-go/pkg/kadm"
Expand Down Expand Up @@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error
case sarama.CompressionNone:
return nil
case sarama.CompressionGZIP:
if level < gzip.NoCompression || level > gzip.BestCompression {
if level < gzip.HuffmanOnly || level > gzip.BestCompression {
return errors.Errorf(`invalid gzip compression level: %d`, level)
}
case sarama.CompressionSnappy:
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,18 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "9",
expected: kgo.GzipCompression().WithLevel(9),
},
{
name: "gzip level -1",
codec: "GZIP",
level: "-1",
expected: kgo.GzipCompression().WithLevel(-1),
},
{
name: "gzip level -2",
codec: "GZIP",
level: "-2",
expected: kgo.GzipCompression().WithLevel(-2),
},
{
name: "snappy no level",
codec: "SNAPPY",
Expand Down Expand Up @@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "100",
shouldErr: true,
},
{
name: "invalid gzip level '-3'",
codec: "GZIP",
level: "-3",
shouldErr: true,
},
{
name: "invalid snappy level",
codec: "SNAPPY",
Expand Down

0 comments on commit 490865c

Please sign in to comment.