Skip to content

Commit

Permalink
config(ticdc): integrity only works for kafka sink (#8853)
Browse files Browse the repository at this point in the history
close #8854
  • Loading branch information
3AceShowHand authored Apr 26, 2023
1 parent c34cf32 commit ed5a5f5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 18 deletions.
20 changes: 15 additions & 5 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package util
import (
"bytes"
"fmt"
"net/url"
"os"
"path/filepath"
"syscall"
Expand Down Expand Up @@ -182,8 +183,12 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
require.Equal(t, &config.MounterConfig{
WorkerNum: 16,
}, cfg.Mounter)
err = cfg.ValidateAndAdjust(nil)
require.Nil(t, err)

sinkURL, err := url.Parse("kafka://127.0.0.1:9092")
require.NoError(t, err)

err = cfg.ValidateAndAdjust(sinkURL)
require.NoError(t, err)
require.Equal(t, &config.SinkConfig{
EncoderConcurrency: 16,
DispatchRules: []*config.DispatchRule{
Expand All @@ -209,11 +214,16 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
func TestAndWriteStorageSinkTOML(t *testing.T) {
cfg := config.GetDefaultReplicaConfig()
err := StrictDecodeFile("changefeed_storage_sink.toml", "cdc", &cfg)
require.Nil(t, err)
require.NoError(t, err)

err = cfg.ValidateAndAdjust(nil)
require.Nil(t, err)
sinkURL, err := url.Parse("s3://127.0.0.1:9092")
require.NoError(t, err)

cfg.Sink.Protocol = config.ProtocolCanalJSON.String()
err = cfg.ValidateAndAdjust(sinkURL)
require.NoError(t, err)
require.Equal(t, &config.SinkConfig{
Protocol: config.ProtocolCanalJSON.String(),
EncoderConcurrency: 16,
Terminator: "\r\n",
DateSeparator: "day",
Expand Down
10 changes: 10 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/pkg/config/outdated"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/sink"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -222,6 +223,15 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error {
}

if c.Integrity != nil {
switch strings.ToLower(sinkURI.Scheme) {
case sink.KafkaScheme, sink.KafkaSSLScheme:
default:
if c.Integrity.Enabled() {
log.Warn("integrity checksum only support kafka sink now, disable integrity")
c.Integrity.IntegrityCheckLevel = IntegrityCheckLevelNone
}
}

if err := c.Integrity.Validate(); err != nil {
return err
}
Expand Down
40 changes: 27 additions & 13 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,25 @@ func TestReplicaConfigOutDated(t *testing.T) {
func TestReplicaConfigValidate(t *testing.T) {
t.Parallel()
conf := GetDefaultReplicaConfig()
require.Nil(t, conf.ValidateAndAdjust(nil))

sinkURL, err := url.Parse("blackhole://")
require.NoError(t, err)

require.Nil(t, conf.ValidateAndAdjust(sinkURL))

// Incorrect sink configuration.
conf = GetDefaultReplicaConfig()
conf.Sink.Protocol = "canal"
conf.EnableOldValue = false
require.Regexp(t, ".*canal protocol requires old value to be enabled.*",
conf.ValidateAndAdjust(nil))
conf.ValidateAndAdjust(sinkURL))

conf = GetDefaultReplicaConfig()
conf.Sink.DispatchRules = []*DispatchRule{
{Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"},
}
require.Regexp(t, ".*dispatcher and partition cannot be configured both.*",
conf.ValidateAndAdjust(nil))
conf.ValidateAndAdjust(sinkURL))

// Correct sink configuration.
conf = GetDefaultReplicaConfig()
Expand All @@ -129,8 +133,8 @@ func TestReplicaConfigValidate(t *testing.T) {
{Matcher: []string{"a.c"}, PartitionRule: "p1"},
{Matcher: []string{"a.d"}},
}
err := conf.ValidateAndAdjust(nil)
require.Nil(t, err)
err = conf.ValidateAndAdjust(sinkURL)
require.NoError(t, err)
rules := conf.Sink.DispatchRules
require.Equal(t, "d1", rules[0].PartitionRule)
require.Equal(t, "p1", rules[1].PartitionRule)
Expand All @@ -139,38 +143,48 @@ func TestReplicaConfigValidate(t *testing.T) {
// Test memory quota can be adjusted
conf = GetDefaultReplicaConfig()
conf.MemoryQuota = 0
err = conf.ValidateAndAdjust(nil)
err = conf.ValidateAndAdjust(sinkURL)
require.NoError(t, err)
require.Equal(t, uint64(DefaultChangefeedMemoryQuota), conf.MemoryQuota)

conf.MemoryQuota = uint64(1024)
err = conf.ValidateAndAdjust(nil)
err = conf.ValidateAndAdjust(sinkURL)
require.NoError(t, err)
require.Equal(t, uint64(1024), conf.MemoryQuota)
}

func TestValidateAndAdjust(t *testing.T) {
cfg := GetDefaultReplicaConfig()
require.False(t, cfg.EnableSyncPoint)
require.NoError(t, cfg.ValidateAndAdjust(nil))

sinkURL, err := url.Parse("blackhole://")
require.NoError(t, err)

require.NoError(t, cfg.ValidateAndAdjust(sinkURL))

cfg.EnableSyncPoint = true
require.NoError(t, cfg.ValidateAndAdjust(nil))
require.NoError(t, cfg.ValidateAndAdjust(sinkURL))

cfg.SyncPointInterval = time.Second * 29
require.Error(t, cfg.ValidateAndAdjust(nil))
require.Error(t, cfg.ValidateAndAdjust(sinkURL))

cfg.SyncPointInterval = time.Second * 30
cfg.SyncPointRetention = time.Minute * 10
require.Error(t, cfg.ValidateAndAdjust(nil))
require.Error(t, cfg.ValidateAndAdjust(sinkURL))

cfg.Sink.EncoderConcurrency = -1
require.Error(t, cfg.ValidateAndAdjust(nil))
require.Error(t, cfg.ValidateAndAdjust(sinkURL))

cfg = GetDefaultReplicaConfig()
cfg.Scheduler = nil
require.Nil(t, cfg.ValidateAndAdjust(nil))
require.Nil(t, cfg.ValidateAndAdjust(sinkURL))
require.False(t, cfg.Scheduler.EnableTableAcrossNodes)

// enable the checksum verification, but use blackhole sink
cfg = GetDefaultReplicaConfig()
cfg.Integrity.IntegrityCheckLevel = IntegrityCheckLevelCorrectness
require.NoError(t, cfg.ValidateAndAdjust(sinkURL))
require.Equal(t, IntegrityCheckLevelNone, cfg.Integrity.IntegrityCheckLevel)
}

func TestIsSinkCompatibleWithSpanReplication(t *testing.T) {
Expand Down

0 comments on commit ed5a5f5

Please sign in to comment.