Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
137646: changefeedccl: Fix Kafka v2 Sink GZIP Compression Level Issue r=asg0451 a=yaothao

Previously, the Kafka v2 sink could not properly handle negative
compression levels due to differences in the underlying compression
libraries used between the v1 and v2 sinks. The Kafka v1 sink
implementation, which relies on the Sarama library, uses the
klauspost/compress library that supports a compression level of -3.
However, our v2 sink has transitioned to using franz-go, which utilizes
the standard library's compress/gzip, and does not support the -3
level. In this update, the validation function now checks the GZIP
compression range between HuffmanOnly (-2) and BestCompression (9).

Fixes: #136492

Epic: none

Release note (bug fix): We have resolved an issue in the Kafka v2 sink
configuration within CockroachDB, where users were previously unable to
set negative GZIP compression levels. Now, users can configure the
CompressionLevel for the Kafka sink in the range of [-2, 9]. Please
update the user guide to include the new valid GZIP compression level
range of [-2, 9], where -2 enables Huffman encoding and -1 sets the
default compression.

137842: testutils: add a helper to capture a side-eye snapshot on demand r=srosenberg a=asg0451

Add a helper to capture a side-eye snapshot on
demand. This can be used to investigate test
failures that you can reproduce locally but only
by running the test many times.

Epic: None

Release note: None


Co-authored-by: yaothao <[email protected]>
Co-authored-by: Miles Frankel <[email protected]>
  • Loading branch information
3 people committed Jan 2, 2025
3 parents 75e6cfd + 490865c + cfb926b commit bb4bfae
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 11 deletions.
16 changes: 13 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2405,10 +2405,10 @@ def go_deps():
name = "com_github_dataexmachina_dev_side_eye_go",
build_file_proto_mode = "disable_global",
importpath = "github.com/DataExMachina-dev/side-eye-go",
sha256 = "8702e7d34a166207ca2329d9780681edfb18ef6a5a9120d35fe33526d418bc4f",
strip_prefix = "github.com/DataExMachina-dev/side-eye-go@v0.0.0-20240528211710-5eb9c7a69e1d",
sha256 = "ea5676af6abb1965ba70c24a9090932c9e45a002411e9c86298f60f60f16a10a",
strip_prefix = "github.com/DataExMachina-dev/side-eye-go@v0.0.0-20250102012104-645b45402adf",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataExMachina-dev/side-eye-go/com_github_dataexmachina_dev_side_eye_go-v0.0.0-20240528211710-5eb9c7a69e1d.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataExMachina-dev/side-eye-go/com_github_dataexmachina_dev_side_eye_go-v0.0.0-20250102012104-645b45402adf.zip",
],
)
go_repository(
Expand Down Expand Up @@ -6505,6 +6505,16 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/c2goasm/com_github_minio_c2goasm-v0.0.0-20190812172519-36a3d3bbc4f3.zip",
],
)
go_repository(
name = "com_github_minio_highwayhash",
build_file_proto_mode = "disable_global",
importpath = "github.com/minio/highwayhash",
sha256 = "3ab23da1595a6b8543edf3de80e31afacfba2b1bc9e9f4cf60c6f54ce3f66fa9",
strip_prefix = "github.com/minio/highwayhash@v1.0.2",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/highwayhash/com_github_minio_highwayhash-v1.0.2.zip",
],
)
go_repository(
name = "com_github_minio_md5_simd",
build_file_proto_mode = "disable_global",
Expand Down
3 changes: 2 additions & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataDog/datadog-api-client-go/v2/com_github_datadog_datadog_api_client_go_v2-v2.15.0.zip": "1b719dab747449f279830dbb1a5920ec45ad041ea13ffde2ef7dc949c52a59f1",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataDog/datadog-go/com_github_datadog_datadog_go-v3.2.0+incompatible.zip": "ede4a024d3c106b2f57ca04d7bfc7610e0c83f4d8a3bace2cf87b42fd5cf66cd",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataDog/zstd/com_github_datadog_zstd-v1.5.6-0.20230824185856-869dae002e5e.zip": "e4924158bd1abf765a016d2c728fc367b32d20b86a268ef25743ba404c55e097",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataExMachina-dev/side-eye-go/com_github_dataexmachina_dev_side_eye_go-v0.0.0-20240528211710-5eb9c7a69e1d.zip": "8702e7d34a166207ca2329d9780681edfb18ef6a5a9120d35fe33526d418bc4f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/DataExMachina-dev/side-eye-go/com_github_dataexmachina_dev_side_eye_go-v0.0.0-20250102012104-645b45402adf.zip": "ea5676af6abb1965ba70c24a9090932c9e45a002411e9c86298f60f60f16a10a",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GeertJohan/go.incremental/com_github_geertjohan_go_incremental-v1.0.0.zip": "ce46b3b717f8d2927046bcfb99c6f490b1b547a681e6b23240ac2c2292a891e8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GeertJohan/go.rice/com_github_geertjohan_go_rice-v1.0.0.zip": "2fc48b9422bf356c18ed3fe32ec52f6a8b87ac168f83d2eed249afaebcc3eeb8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GoogleCloudPlatform/cloudsql-proxy/com_github_googlecloudplatform_cloudsql_proxy-v0.0.0-20190129172621-c8b1d7a94ddf.zip": "d18ff41309efc943c71d5c8faa5b1dd792700a79fa4f61508c5e50f17fc9ca6f",
Expand Down Expand Up @@ -795,6 +795,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/mileusna/useragent/com_github_mileusna_useragent-v0.0.0-20190129205925-3e331f0949a5.zip": "169eabdbd206177d55bcf544ec99437d5e10cea4104f8d542aa16515202e584f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/asm2plan9s/com_github_minio_asm2plan9s-v0.0.0-20200509001527-cdd76441f9d8.zip": "39a2e28284764fd5423247d7469875046d0c8c4c2773333abf1c544197e9d946",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/c2goasm/com_github_minio_c2goasm-v0.0.0-20190812172519-36a3d3bbc4f3.zip": "04367ddf0fc5cd0f293e2c4f1acefb131b572539d88b5804d92efc905eb718b5",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/highwayhash/com_github_minio_highwayhash-v1.0.2.zip": "3ab23da1595a6b8543edf3de80e31afacfba2b1bc9e9f4cf60c6f54ce3f66fa9",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/md5-simd/com_github_minio_md5_simd-v1.1.2.zip": "f829d35a6e6897db415af8888c4b074d1a253aee0e8fb7054b4d95477a81c3d6",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/minio-go/com_github_minio_minio_go-v0.0.0-20190131015406-c8a261de75c1.zip": "329d7e50f7e20014fa563aa8ff7a789106660e4b6fed87b2ca17fe3387cecb86",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/minio/minio-go/v7/com_github_minio_minio_go_v7-v7.0.21.zip": "826952231f5c7622b7c2d4b5180a4e12cf3379bd3842e1ef4934dfe115786218",
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ require (
github.com/BurntSushi/toml v1.2.1
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/DataDog/datadog-api-client-go/v2 v2.15.0
github.com/DataExMachina-dev/side-eye-go v0.0.0-20240528211710-5eb9c7a69e1d
github.com/DataExMachina-dev/side-eye-go v0.0.0-20250102012104-645b45402adf
github.com/IBM/sarama v1.43.1
github.com/Masterminds/semver/v3 v3.1.1
github.com/MichaelTJones/walk v0.0.0-20161122175330-4748e29d5718
Expand Down Expand Up @@ -387,6 +387,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/minio-go/v7 v7.0.21 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/uf
github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/DataExMachina-dev/side-eye-go v0.0.0-20240528211710-5eb9c7a69e1d h1:0NRhNOBaRnkXED8mftbOSCNGAf8MZhv4zu840hIUpIc=
github.com/DataExMachina-dev/side-eye-go v0.0.0-20240528211710-5eb9c7a69e1d/go.mod h1:FukCpc3od3BzYgxUtTWm3iB4ALtc4UknLNMQ0rq+V3A=
github.com/DataExMachina-dev/side-eye-go v0.0.0-20250102012104-645b45402adf h1:OrqueGhFhbxPYRBuBtgCx+ZDad4jFWz6obCRvdL+o98=
github.com/DataExMachina-dev/side-eye-go v0.0.0-20250102012104-645b45402adf/go.mod h1:xvmHxkByFl/CbWOoF3oBnlBUx5TtYPtYA9mhAcPHloM=
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190129172621-c8b1d7a94ddf/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
Expand Down Expand Up @@ -1742,6 +1742,8 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
Expand Down Expand Up @@ -2714,6 +2716,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190124100055-b90733256f2e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
"@com_github_ibm_sarama//:sarama",
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_lib_pq//:pq",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package changefeedccl

import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/rcrowley/go-metrics"
"github.com/twmb/franz-go/pkg/kadm"
Expand Down Expand Up @@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error
case sarama.CompressionNone:
return nil
case sarama.CompressionGZIP:
if level < gzip.NoCompression || level > gzip.BestCompression {
if level < gzip.HuffmanOnly || level > gzip.BestCompression {
return errors.Errorf(`invalid gzip compression level: %d`, level)
}
case sarama.CompressionSnappy:
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,18 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "9",
expected: kgo.GzipCompression().WithLevel(9),
},
{
name: "gzip level -1",
codec: "GZIP",
level: "-1",
expected: kgo.GzipCompression().WithLevel(-1),
},
{
name: "gzip level -2",
codec: "GZIP",
level: "-2",
expected: kgo.GzipCompression().WithLevel(-2),
},
{
name: "snappy no level",
codec: "SNAPPY",
Expand Down Expand Up @@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "100",
shouldErr: true,
},
{
name: "invalid gzip level '-3'",
codec: "GZIP",
level: "-3",
shouldErr: true,
},
{
name: "invalid snappy level",
codec: "SNAPPY",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ func (r *testRunner) serveHTTP(wr http.ResponseWriter, req *http.Request) {
}
sideEyeEnv := w.Cluster().sideEyeEnvName()
if sideEyeEnv != "" {
clusterBuilder.WriteString(fmt.Sprintf(" (<a href='%s'>Side-Eye</a>)", sideeyeclient.SnapshotsURL(sideEyeEnv)))
clusterBuilder.WriteString(fmt.Sprintf(" (<a href='%s'>Side-Eye</a>)", sideeyeclient.RecordingsURL(sideEyeEnv)))
}
}
t := w.Test()
Expand Down
3 changes: 3 additions & 0 deletions pkg/testutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"keys.go",
"net.go",
"pprof.go",
"sideeye.go",
"soon.go",
"sort.go",
"subtest.go",
Expand All @@ -31,6 +32,8 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_dataexmachina_dev_side_eye_go//sideeye",
"@com_github_stretchr_testify//require",
],
)

Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ func TestLint(t *testing.T) {
":!acceptance/test_acceptance.go", // For COCKROACH_RUN_ACCEPTANCE
":!compose/compare/compare/compare_test.go", // For COCKROACH_RUN_COMPOSE_COMPARE
":!compose/compose_test.go", // For COCKROACH_RUN_COMPOSE
":!testutils/sideeye.go", // For SIDE_EYE_API_TOKEN
},
},
} {
Expand Down
67 changes: 67 additions & 0 deletions pkg/testutils/sideeye.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package testutils

import (
"context"
"fmt"
"os"
"time"

"github.com/DataExMachina-dev/side-eye-go/sideeye"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

type testingT interface {
require.TestingT
TestFatalerLogger
}

// CaptureSideEyeSnapshot captures a Side-Eye snapshot of the current process
// if the SIDE_EYE_TOKEN env var is set, and logs the snapshot's URL on
// success. Snapshots are captured with a 90s timeout layered on top of ctx.
//
// This is a best-effort debugging aid: a missing token, a stripped binary, or
// any other capture error is reported through t.Logf and the function simply
// returns; it does not fail the calling test.
//
// The snapshot is named "<user>@<host>: <test name>". The user comes from the
// USER env var, the host from os.Hostname ("unknown host" if that lookup
// fails), and the test name from t.Name() when t implements
// TestNamedFatalerLogger ("unknown test" otherwise).
//
// NOTE(review): the token is only checked for presence here and is not passed
// to the sideeye package explicitly; presumably the library reads it from the
// environment itself -- confirm.
func CaptureSideEyeSnapshot(ctx context.Context, t testingT) {
	t.Helper()

	if os.Getenv("SIDE_EYE_TOKEN") == "" {
		t.Logf("not capturing Side-Eye snapshot; SIDE_EYE_TOKEN env var not set. You can find it in slack or confluence " +
			"or on your profile page in the Side-Eye app. If using ./dev, make sure you pass it in the environment: " +
			"`./dev test mytest -- --test_env SIDE_EYE_TOKEN=xxx --strip=never`")
		return
	}

	username := os.Getenv("USER")
	hostname, err := os.Hostname()
	if err != nil {
		// The hostname is only used to label the snapshot. A diagnostic helper
		// should not abort the very test it is helping to debug, so degrade to
		// a placeholder instead of failing.
		t.Logf("could not determine hostname for Side-Eye snapshot name: %s", err)
		hostname = "unknown host"
	}

	testName := "unknown test"
	if named, ok := t.(TestNamedFatalerLogger); ok {
		testName = named.Name()
	}
	name := fmt.Sprintf("%s@%s: %s", username, hostname, testName)

	// Bound the capture so a hung snapshot cannot stall the test indefinitely;
	// the cause makes the timeout recognizable in the logged error.
	snapshotCtx, cancel := context.WithTimeoutCause(
		ctx, 90*time.Second, errors.New("timed out waiting for Side-Eye snapshot"),
	)
	defer cancel()

	snapshotURL, err := sideeye.CaptureSelfSnapshot(snapshotCtx, name, sideeye.WithEnvironment("unit tests"))
	if err != nil {
		// A stripped binary is the most common, user-fixable failure, so give
		// actionable guidance before logging the underlying error.
		if errors.As(err, &sideeye.BinaryStrippedError{}) {
			t.Logf("failed to capture Side-Eye snapshot because the binary is stripped of debug info; " +
				"if running with `go test` instead of bazel, use `go test -o test.out` " +
				"for creating a non-stripped binary. If running inside bazel, " +
				"add `build --strip=never` to your .bazelrc.user file, or pass `--strip=never` to " +
				"bazel test, or with `dev`: `./dev test mytest -- --strip=never`")
		}
		t.Logf("failed to capture Side-Eye snapshot: %s", err)
		return
	}
	t.Logf("captured Side-Eye snapshot: %s", snapshotURL)
}

0 comments on commit bb4bfae

Please sign in to comment.