From 0531ad2389cef449c026e74de1aee6af99c1b792 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 28 Jun 2022 17:06:22 +0800 Subject: [PATCH] use string val --- cdc/api/validator/validator_test.go | 1 + cdc/processor/pipeline/sink.go | 10 ++-- cdc/processor/pipeline/sink_test.go | 38 ++++++++++---- cdc/processor/pipeline/table_actor.go | 3 +- cdc/sink/flowcontrol/flow_control.go | 4 ++ pkg/cmd/util/helper_test.go | 3 +- pkg/config/config_test_data.go | 5 +- pkg/config/replica_config.go | 2 +- pkg/config/replica_config_test.go | 2 +- pkg/config/sink.go | 71 +++++++++++++++++---------- pkg/config/sink_test.go | 46 ++++++++--------- 11 files changed, 114 insertions(+), 71 deletions(-) diff --git a/cdc/api/validator/validator_test.go b/cdc/api/validator/validator_test.go index 8e863265569..d8c811121be 100644 --- a/cdc/api/validator/validator_test.go +++ b/cdc/api/validator/validator_test.go @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { // test no change error changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"} oldInfo.SinkURI = "blackhole://" + oldInfo.Config.Sink.TxnAtomicity = "table" newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) require.NotNil(t, err) require.Regexp(t, ".*changefeed config is the same with the old one.*", err) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index e1d1ff913ce..b8eb72fd677 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -364,14 +364,14 @@ func (n *sinkNode) releaseResource(ctx context.Context) error { } func (n *sinkNode) checkSplitTxn(e *model.PolymorphicEvent) { - if n.replicaConfig.Sink.TxnAtomicity == config.NoneTxnAtomicity { + ta := n.replicaConfig.Sink.TxnAtomicity + ta.Validate() + if ta.ShouldSplitTxn() { return } - if n.replicaConfig.Sink.TxnAtomicity != config.TableTxnAtomicity { - log.Panic("unsupported txn atomicity", zap.Any("replicaConfig", n.replicaConfig)) - } - + // Check that BatchResolved events and RowChangedEvent events with `SplitTxn==true` + // have not been received by sinkNode. if e.Resolved != nil && e.Resolved.IsBatchMode() { log.Panic("batch mode resolved ts is not supported when sink.splitTxn is false", zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig)) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index cf27ed57f06..7f6705f9f1f 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -127,11 +127,14 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error { func TestState(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + // tableTxnAtomicity + config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-status"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) @@ -270,11 +273,14 @@ func TestState(t *testing.T) { // until the underlying sink is closed func TestStopStatus(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + // tableTxnAtomicity + config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test-state"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) @@ -317,11 +323,14 @@ func TestStopStatus(t *testing.T) { func TestManyTs(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + // tableTxnAtomicity + config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePrepared @@ -492,11 +501,14 @@ func TestManyTs(t *testing.T) { func TestIgnoreEmptyRowChangeEvent(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + // tableTxnAtomicity + config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePreparing @@ -519,11 +531,14 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) { func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + config := config.GetDefaultReplicaConfig() + // tableTxnAtomicity + config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: config.GetDefaultReplicaConfig(), + Config: config, }, }) state := TableStatePreparing @@ -581,13 +596,15 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) { func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) - cfg := config.GetDefaultReplicaConfig() - cfg.EnableOldValue = false + config := config.GetDefaultReplicaConfig() + config.EnableOldValue = false + // tableTxnAtomicity + config.Sink.TxnAtomicity = "none" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: cfg, + Config: config, }, }) state := TableStatePreparing @@ -733,11 +750,14 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) cfg := config.GetDefaultReplicaConfig() cfg.EnableOldValue = false + config := config.GetDefaultReplicaConfig() + // tableTxnAtomicity + config.Sink.TxnAtomicity = "table" ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: model.DefaultChangeFeedID("changefeed-id-test"), Info: &model.ChangeFeedInfo{ StartTs: oracle.GoTimeToTS(time.Now()), - Config: cfg, + Config: config, }, }) state := TableStatePreparing diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index d46d3cb4e1b..cac35705527 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tiflow/cdc/sink/flowcontrol" "github.com/pingcap/tiflow/pkg/actor" "github.com/pingcap/tiflow/pkg/actor/message" - "github.com/pingcap/tiflow/pkg/config" serverConfig "github.com/pingcap/tiflow/pkg/config" cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -283,7 +282,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { zap.String("tableName", t.tableName), zap.Uint64("quota", t.memoryQuota)) - splitTxn := t.replicaConfig.Sink.TxnAtomicity == config.NoneTxnAtomicity + splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn() flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled(), splitTxn) diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 778a335c485..039d33418f1 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -66,6 +66,10 @@ type txnSizeEntry struct { // NewTableFlowController creates a new TableFlowController func NewTableFlowController(quota uint64, redoLogEnabled bool, splitTxn bool) *TableFlowController { + log.Info("create table flow controller", + zap.Uint64("quota", quota), + zap.Bool("redoLogEnabled", redoLogEnabled), + zap.Bool("splitTxn", splitTxn)) maxSizePerTxn := uint64(defaultSizePerTxn) if maxSizePerTxn > quota { maxSizePerTxn = quota diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 65efd653437..946bb6be6f9 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -191,8 +191,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { {Matcher: []string{"test1.*", "test2.*"}, Columns: []string{"column1", "column2"}}, {Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}}, }, - Protocol: "open-protocol", - TxnAtomicity: config.TableTxnAtomicity, + Protocol: "open-protocol", }, cfg.Sink) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 1094be16247..53cd9ce99a6 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -167,7 +167,7 @@ const ( } ], "schema-registry": "", - "transaction-atomicity": 2 + "transaction-atomicity": "" }, "consistent": { "level": "none", @@ -204,8 +204,7 @@ const ( "b" ] } - ], - "transaction-atomicity": 2 + ] }, "consistent": { "level": "none", diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index de3e5b5fc3a..5d9100c90f9 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -36,7 +36,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Mounter: &MounterConfig{ WorkerNum: 16, }, - Sink: &SinkConfig{TxnAtomicity: TableTxnAtomicity}, + Sink: &SinkConfig{}, Consistent: &ConsistentConfig{ Level: "none", MaxLogSize: 64, diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 4769d5be8c1..07ed1b36a21 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -81,7 +81,7 @@ func TestReplicaConfigOutDated(t *testing.T) { {Matcher: []string{"a.c"}, DispatcherRule: "r2"}, {Matcher: []string{"a.d"}, DispatcherRule: "r2"}, } - conf.Sink.TxnAtomicity = UnknowTxnAtomicity + conf.Sink.TxnAtomicity = unknowTxnAtomicity require.Equal(t, conf, conf2) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 02f7b20328d..1e847bf4918 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -27,20 +27,36 @@ import ( const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M // AtomicityLevel represents the atomicity level of a changefeed. -type AtomicityLevel int +type AtomicityLevel string const ( - // UnknowTxnAtomicity is the default atomicity level, which is invalid. - UnknowTxnAtomicity AtomicityLevel = iota - // NoneTxnAtomicity means atomicity of transactions is not guaranteed - NoneTxnAtomicity - // TableTxnAtomicity means atomicity of single table transactions is guaranteed. - TableTxnAtomicity - // GlobalTxnAtomicity means atomicity of cross table transactions is guaranteed, which + // unknowTxnAtomicity is the default atomicity level, which is invalid and will + // be set to a valid value when initializing sink in processor. + unknowTxnAtomicity AtomicityLevel = "" + + // noneTxnAtomicity means atomicity of transactions is not guaranteed + noneTxnAtomicity AtomicityLevel = "none" + + // tableTxnAtomicity means atomicity of single table transactions is guaranteed. + tableTxnAtomicity AtomicityLevel = "table" + + // globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which // is currently not supported by TiCDC. - GlobalTxnAtomicity + // globalTxnAtomicity AtomicityLevel = "global" ) +// ShouldSplitTxn returns whether the sink should split txn. +func (l AtomicityLevel) ShouldSplitTxn() bool { + return l == noneTxnAtomicity +} + +// Validate checks the AtomicityLevel is supported by TiCDC. +func (l AtomicityLevel) Validate() { + if l != noneTxnAtomicity && l != tableTxnAtomicity { + log.Panic(fmt.Sprintf("unsupported transaction atomicity: %s", l)) + } +} + // ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value. var ForceEnableOldValueProtocols = []string{ ProtocolCanal.String(), @@ -116,28 +132,33 @@ func (s *SinkConfig) applyParameter(sinkURI *url.URL) error { params := sinkURI.Query() txnAtomicity := params.Get("transaction-atomicity") - switch txnAtomicity { - case "": - fallthrough - case "table": - s.TxnAtomicity = TableTxnAtomicity - case "none": - s.TxnAtomicity = NoneTxnAtomicity + switch AtomicityLevel(txnAtomicity) { + case unknowTxnAtomicity: + // Set default value according to scheme. + if isMqScheme(sinkURI.Scheme) { + s.TxnAtomicity = noneTxnAtomicity + } else { + s.TxnAtomicity = tableTxnAtomicity + } + case noneTxnAtomicity: + s.TxnAtomicity = noneTxnAtomicity + case tableTxnAtomicity: + // MqSink only support `noneTxnAtomicity`. + if isMqScheme(sinkURI.Scheme) { + log.Warn("The configuration of transaction-atomicity is incompatible with scheme", + zap.Any("txnAtomicity", s.TxnAtomicity), + zap.String("scheme", sinkURI.Scheme), + zap.String("protocol", s.Protocol)) + s.TxnAtomicity = noneTxnAtomicity + } else { + s.TxnAtomicity = tableTxnAtomicity + } default: errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme", txnAtomicity, sinkURI.Scheme) return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg) } - // MqSink only support `NoneTxnAtomicity`. - if isMqScheme(sinkURI.Scheme) && s.TxnAtomicity != NoneTxnAtomicity { - log.Warn("The configuration of transaction-atomicity is incompatible with scheme", - zap.Any("txnAtomicity", s.TxnAtomicity), - zap.String("scheme", sinkURI.Scheme), - zap.String("protocol", s.Protocol)) - s.TxnAtomicity = NoneTxnAtomicity - } - s.Protocol = params.Get(ProtocolKey) // validate that protocol is compatible with the scheme if isMqScheme(sinkURI.Scheme) { diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index 2c8e75b0aa5..94378a25095 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -84,24 +84,24 @@ func TestValidateOldValue(t *testing.T) { func TestValidateApplyParameter(t *testing.T) { t.Parallel() testCases := []struct { - sinkURI string - expectedErr string - expectedSplitTxn AtomicityLevel + sinkURI string + expectedErr string + expectedLevel AtomicityLevel }{ { - sinkURI: "mysql://normal:123456@127.0.0.1:3306", - expectedErr: "", - expectedSplitTxn: TableTxnAtomicity, + sinkURI: "mysql://normal:123456@127.0.0.1:3306", + expectedErr: "", + expectedLevel: tableTxnAtomicity, }, { - sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", - expectedErr: "", - expectedSplitTxn: TableTxnAtomicity, + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", + expectedErr: "", + expectedLevel: tableTxnAtomicity, }, { - sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none", - expectedErr: "", - expectedSplitTxn: NoneTxnAtomicity, + sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, }, { sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=global", @@ -112,26 +112,26 @@ func TestValidateApplyParameter(t *testing.T) { expectedErr: ".*protocol cannot be configured when using tidb scheme.*", }, { - sinkURI: "blackhole://normal:123456@127.0.0.1:3306?transaction-atomicity=none", - expectedErr: "", - expectedSplitTxn: NoneTxnAtomicity, + sinkURI: "blackhole://normal:123456@127.0.0.1:3306?transaction-atomicity=none", + expectedErr: "", + expectedLevel: noneTxnAtomicity, }, { sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" + "&protocol=open-protocol", - expectedErr: "", - expectedSplitTxn: NoneTxnAtomicity, + expectedErr: "", + expectedLevel: noneTxnAtomicity, }, { sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" + "&protocol=open-protocol", - expectedErr: "", - expectedSplitTxn: NoneTxnAtomicity, + expectedErr: "", + expectedLevel: noneTxnAtomicity, }, { - sinkURI: "kafka://127.0.0.1:9092?protocol=default", - expectedErr: "", - expectedSplitTxn: NoneTxnAtomicity, + sinkURI: "kafka://127.0.0.1:9092?protocol=default", + expectedErr: "", + expectedLevel: noneTxnAtomicity, }, { sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table", @@ -145,7 +145,7 @@ func TestValidateApplyParameter(t *testing.T) { require.Nil(t, err) if tc.expectedErr == "" { require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true)) - require.Equal(t, tc.expectedSplitTxn, cfg.TxnAtomicity) + require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity) } else { require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true)) }