Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adjust kafka v2 sink to support negative GZIP compression level
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#136492
Release note (bug fix): fixed a bug introduced in kafka v2 sink.
Previously, the Kafka v2 sink configuration did not accept negative
values for the compression level setting, despite -1 and -2 having
special meanings within the system.

Action required: Please update the corresponding documentation
to reflect the correct compression level with range [-2,9]
yaothao committed Dec 18, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent f333456 commit 109be1e
Showing 3 changed files with 20 additions and 3 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -174,7 +174,6 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
"@com_github_ibm_sarama//:sarama",
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_lib_pq//:pq",
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka_v2.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
package changefeedccl

import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
@@ -28,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/rcrowley/go-metrics"
"github.com/twmb/franz-go/pkg/kadm"
@@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error
case sarama.CompressionNone:
return nil
case sarama.CompressionGZIP:
if level < gzip.NoCompression || level > gzip.BestCompression {
if level < gzip.HuffmanOnly || level > gzip.BestCompression {
return errors.Errorf(`invalid gzip compression level: %d`, level)
}
case sarama.CompressionSnappy:
18 changes: 18 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_v2_test.go
Original file line number Diff line number Diff line change
@@ -448,6 +448,18 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "9",
expected: kgo.GzipCompression().WithLevel(9),
},
{
name: "gzip level -1",
codec: "GZIP",
level: "-1",
expected: kgo.GzipCompression().WithLevel(-1),
},
{
name: "gzip level -2",
codec: "GZIP",
level: "-2",
expected: kgo.GzipCompression().WithLevel(-2),
},
{
name: "snappy no level",
codec: "SNAPPY",
@@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "100",
shouldErr: true,
},
{
name: "invalid gzip level '-3'",
codec: "GZIP",
level: "-3",
shouldErr: true,
},
{
name: "invalid snappy level",
codec: "SNAPPY",

0 comments on commit 109be1e

Please sign in to comment.