cdctest: add mvcc_timestamp, diff, sink_config options to nemesis testing

This work adds to the random changefeed settings we pass into changefeeds
in nemesis testing. This commit adds support for changefeeds without
the diff option specified, for the mvcc_timestamp option, and for the
kafka and pubsub sink configs.

See also: cockroachdb#134119

Epic: CRDB-42866

Release note: None
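
For illustration (an example composed for this description, not taken from
the commit): a kafka nemesis run can now end up creating a changefeed along
these lines, with the boolean options and sink config fields drawn at random
per run:

CREATE CHANGEFEED FOR foo WITH updated, resolved, diff, mvcc_timestamp, format=json, kafka_sink_config='{"Flush":{"Messages":3,"Frequency":"500ms"},"Version":"2.7.2"}'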
aerfrei committed Jan 21, 2025
1 parent 7d9b214 commit f78327a
Showing 4 changed files with 513 additions and 102 deletions.
230 changes: 206 additions & 24 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -9,61 +9,227 @@ import (
"bytes"
"context"
gosql "database/sql"
"encoding/json"
"fmt"
"math/rand"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/internal/sqlsmith"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

type ChangefeedOption struct {
	Format           string
	PubsubSinkConfig string
	BooleanOptions   map[string]bool
	KafkaSinkConfig  string
}

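// SinkConfig mirrors the JSON shape of a changefeed sink config: Flush and
// Retry marshal as nested objects, while Base holds kafka-specific top-level
// keys (e.g. Version, Compression).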
type SinkConfig struct {
	Flush map[string]any
	Retry map[string]any
	Base  map[string]any
}

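// OptionString marshals the non-empty sections of the config into the JSON
// string used as the kafka_sink_config or pubsub_sink_config option value.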
func (sk SinkConfig) OptionString() (string, error) {
	nonEmptyConfig := make(map[string]any)

	if len(sk.Flush) > 0 {
		nonEmptyConfig["Flush"] = sk.Flush
	}
	if len(sk.Retry) > 0 {
		nonEmptyConfig["Retry"] = sk.Retry
	}
	for k, v := range sk.Base {
		nonEmptyConfig[k] = v
	}

	jsonData, err := json.Marshal(nonEmptyConfig)
	if err != nil {
		return "", err
	}

	return string(jsonData), nil
}

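// newFlushConfig returns a randomized Flush section. Messages and Bytes are
// only valid alongside a non-zero Frequency, so Frequency is forced to a
// non-zero interval whenever either of them is set.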
func newFlushConfig(isKafka bool) map[string]any {
	flush := make(map[string]any)

	nonZeroInterval := "500ms"
	if rand.Intn(2) < 1 {
		// Setting either Messages or Bytes with a non-zero value without setting
		// Frequency is an invalid configuration. We set Frequency to a non-zero
		// interval here but can reset it later.
		flush["Messages"] = rand.Intn(10) + 1
		flush["Frequency"] = nonZeroInterval
	}
	if rand.Intn(2) < 1 {
		flush["Bytes"] = rand.Intn(1000) + 1
		flush["Frequency"] = nonZeroInterval
	}
	if rand.Intn(2) < 1 {
		intervals := []string{"100ms", "500ms", "1s", "5s"}
		interval := intervals[rand.Intn(len(intervals))]
		flush["Frequency"] = interval
	}

	if isKafka && rand.Intn(2) < 1 {
		flush["MaxMessages"] = rand.Intn(10000) + 1
	}

	return flush
}

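// newRetryConfig returns a randomized Retry section: Max is either "inf" or a
// small positive integer, and Backoff is drawn from a fixed set of intervals.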
func newRetryConfig() map[string]any {
	retry := make(map[string]any)
	if rand.Intn(2) < 1 {
		if rand.Intn(2) < 1 {
			retry["Max"] = "inf"
		} else {
			retry["Max"] = rand.Intn(5) + 1
		}
	}
	if rand.Intn(2) < 1 {
		intervals := []string{"100ms", "500ms", "1s", "5s"}
		interval := intervals[rand.Intn(len(intervals))]
		retry["Backoff"] = interval
	}
	return retry
}

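// newKafkaBaseConfig returns randomized top-level kafka settings, keeping the
// compression codec and level consistent with the chosen kafka Version.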
func newKafkaBaseConfig() map[string]any {
	base := make(map[string]any)
	if rand.Intn(2) < 1 {
		clientIds := []string{"ABCabc123._-", "FooBar", "2002-02-02.1_1"}
		clientId := clientIds[rand.Intn(len(clientIds))]
		base["ClientID"] = clientId
	}
	if rand.Intn(2) < 1 {
		versions := []string{"2.7.2", "0.8.2.0"}
		version := versions[rand.Intn(len(versions))]
		base["Version"] = version
	}
	if rand.Intn(2) < 1 {
		compressions := []string{"NONE", "GZIP", "SNAPPY"}
		// lz4 compression requires Version >= V0_10_0_0.
		if base["Version"] != "0.8.2.0" {
			compressions = append(compressions, "LZ4")
		}
		// zstd compression requires Version >= V2_1_0_0.
		if base["Version"] == "2.7.2" {
			compressions = append(compressions, "ZSTD")
		}
		compression := compressions[rand.Intn(len(compressions))]
		base["Compression"] = compression
		if compression == "GZIP" {
			// GZIP compression levels are the integers -1 (default) through 9.
			level := rand.Intn(11) - 1
			base["CompressionLevel"] = level
		}
		if base["Version"] == "2.7.2" && compression == "ZSTD" {
			level := rand.Intn(4) + 1
			base["CompressionLevel"] = level
		}
		if base["Version"] != "0.8.2.0" && compression == "LZ4" {
			levels := []int{0, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072}
			level := levels[rand.Intn(len(levels))]
			base["CompressionLevel"] = level
		}
	}
	if rand.Intn(2) < 1 {
		levels := []string{"ONE", "NONE", "ALL"}
		level := levels[rand.Intn(len(levels))]
		base["RequiredAcks"] = level
	}
	return base
}

func newSinkConfig(isKafka bool) SinkConfig {
	if isKafka {
		return SinkConfig{
			Flush: newFlushConfig(isKafka),
			Base:  newKafkaBaseConfig(),
		}
	}

	return SinkConfig{
		Flush: newFlushConfig(isKafka),
		Retry: newRetryConfig(),
	}
}

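// newChangefeedOption returns a random set of changefeed options appropriate
// for the sink implied by testName.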
func newChangefeedOption(testName string) ChangefeedOption {
	isCloudstorage := strings.Contains(testName, "cloudstorage")
	isWebhook := strings.Contains(testName, "webhook")
	isPubsub := strings.Contains(testName, "pubsub")
	isKafka := strings.Contains(testName, "kafka")

	cfo := ChangefeedOption{}

	cfo.BooleanOptions = make(map[string]bool)

	booleanOptionEligibility := map[string]bool{
		changefeedbase.OptFullTableName: true,
		// Because key_in_value is on by default for cloudstorage and webhook sinks,
		// the key in the value is extracted and removed from the test feed
		// messages (see extractKeyFromJSONValue function).
		// TODO: (#138749) enable testing key_in_value for cloudstorage
		// and webhook sinks.
		changefeedbase.OptKeyInValue:     !isCloudstorage && !isWebhook,
		changefeedbase.OptDiff:           true,
		changefeedbase.OptMVCCTimestamps: true,
	}

	// Each eligible option is independently enabled with probability 1/2.
	for option, eligible := range booleanOptionEligibility {
		cfo.BooleanOptions[option] = eligible && rand.Intn(2) < 1
	}

	if isPubsub {
		sinkConfigOptionString, err := newSinkConfig(isKafka).OptionString()
		if err == nil {
			cfo.PubsubSinkConfig = sinkConfigOptionString
		}
	}

	if isKafka {
		sinkConfigOptionString, err := newSinkConfig(isKafka).OptionString()
		if err == nil {
			cfo.KafkaSinkConfig = sinkConfigOptionString
		}
	}

	if isCloudstorage && rand.Intn(2) < 1 {
		cfo.Format = "parquet"
	} else {
		cfo.Format = "json"
	}

	return cfo
}

func (cfo ChangefeedOption) OptionString() string {
	var options []string
	for option, value := range cfo.BooleanOptions {
		if value {
			options = append(options, option)
		}
	}
	if cfo.Format != "" {
		option := fmt.Sprintf("format=%s", cfo.Format)
		options = append(options, option)
	}
	if cfo.PubsubSinkConfig != "" {
		option := fmt.Sprintf("pubsub_sink_config='%s'", cfo.PubsubSinkConfig)
		options = append(options, option)
	}
	if cfo.KafkaSinkConfig != "" {
		option := fmt.Sprintf("kafka_sink_config='%s'", cfo.KafkaSinkConfig)
		options = append(options, option)
	}
	return fmt.Sprintf("WITH updated, resolved, %s", strings.Join(options, ","))
}

type NemesesOption struct {
@@ -250,7 +416,7 @@ func RunNemesis(

	cfo := newChangefeedOption(testName)
	changefeedStatement := fmt.Sprintf(
		`CREATE CHANGEFEED FOR foo %s`,
		cfo.OptionString(),
	)
	log.Infof(ctx, "Using changefeed options: %s", changefeedStatement)
@@ -270,14 +436,17 @@
		return nil, err
	}

	baV, err := NewBeforeAfterValidator(db, `foo`, cfo.BooleanOptions[changefeedbase.OptDiff])
	if err != nil {
		return nil, err
	}

	tV := NewTopicValidator(`foo`, cfo.BooleanOptions[changefeedbase.OptFullTableName])

	validators := Validators{
		NewOrderValidator(`foo`),
		baV,
		tV,
	}

	if nOp.EnableFpValidator {
@@ -288,6 +457,19 @@
		validators = append(validators, fprintV)
	}

	if cfo.BooleanOptions[changefeedbase.OptKeyInValue] {
		kivV, err := NewKeyInValueValidator(db, `foo`)
		if err != nil {
			return nil, err
		}
		validators = append(validators, kivV)
	}

	if cfo.BooleanOptions[changefeedbase.OptMVCCTimestamps] {
		mvccV := NewMvccTimestampValidator()
		validators = append(validators, mvccV)
	}

	ns.v = NewCountValidator(validators)

	// Initialize the actual row count, overwriting what the initialization loop did. That