Skip to content

Commit

Permalink
use string val
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jun 29, 2022
1 parent 7e4ae2c commit 0531ad2
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 71 deletions.
1 change: 1 addition & 0 deletions cdc/api/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
oldInfo.Config.Sink.TxnAtomicity = "table"
newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
Expand Down
10 changes: 5 additions & 5 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,14 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
}

func (n *sinkNode) checkSplitTxn(e *model.PolymorphicEvent) {
if n.replicaConfig.Sink.TxnAtomicity == config.NoneTxnAtomicity {
ta := n.replicaConfig.Sink.TxnAtomicity
ta.Validate()
if ta.ShouldSplitTxn() {
return
}

if n.replicaConfig.Sink.TxnAtomicity != config.TableTxnAtomicity {
log.Panic("unsupported txn atomicity", zap.Any("replicaConfig", n.replicaConfig))
}

// Check that BatchResolved events and RowChangedEvent events with `SplitTxn==true`
// have not been received by sinkNode.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
log.Panic("batch mode resolved ts is not supported when sink.splitTxn is false",
zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig))
Expand Down
38 changes: 29 additions & 9 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,14 @@ func (s *mockCloseControlSink) Close(ctx context.Context) error {

func TestState(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-status"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})

Expand Down Expand Up @@ -270,11 +273,14 @@ func TestState(t *testing.T) {
// until the underlying sink is closed
func TestStopStatus(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test-state"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})

Expand Down Expand Up @@ -317,11 +323,14 @@ func TestStopStatus(t *testing.T) {

func TestManyTs(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})
state := TableStatePrepared
Expand Down Expand Up @@ -492,11 +501,14 @@ func TestManyTs(t *testing.T) {

func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})
state := TableStatePreparing
Expand All @@ -519,11 +531,14 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {

func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: config.GetDefaultReplicaConfig(),
Config: config,
},
})
state := TableStatePreparing
Expand Down Expand Up @@ -581,13 +596,15 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {

func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
config := config.GetDefaultReplicaConfig()
config.EnableOldValue = false
// tableTxnAtomicity
config.Sink.TxnAtomicity = "none"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: cfg,
Config: config,
},
})
state := TableStatePreparing
Expand Down Expand Up @@ -733,11 +750,14 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
config := config.GetDefaultReplicaConfig()
// tableTxnAtomicity
config.Sink.TxnAtomicity = "table"
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: cfg,
Config: config,
},
})
state := TableStatePreparing
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/flowcontrol"
"github.com/pingcap/tiflow/pkg/actor"
"github.com/pingcap/tiflow/pkg/actor/message"
"github.com/pingcap/tiflow/pkg/config"
serverConfig "github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -283,7 +282,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

splitTxn := t.replicaConfig.Sink.TxnAtomicity == config.NoneTxnAtomicity
splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(t.memoryQuota,
t.redoManager.Enabled(), splitTxn)
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/flowcontrol/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type txnSizeEntry struct {

// NewTableFlowController creates a new TableFlowController
func NewTableFlowController(quota uint64, redoLogEnabled bool, splitTxn bool) *TableFlowController {
log.Info("create table flow controller",
zap.Uint64("quota", quota),
zap.Bool("redoLogEnabled", redoLogEnabled),
zap.Bool("splitTxn", splitTxn))
maxSizePerTxn := uint64(defaultSizePerTxn)
if maxSizePerTxn > quota {
maxSizePerTxn = quota
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
{Matcher: []string{"test1.*", "test2.*"}, Columns: []string{"column1", "column2"}},
{Matcher: []string{"test3.*", "test4.*"}, Columns: []string{"!a", "column3"}},
},
Protocol: "open-protocol",
TxnAtomicity: config.TableTxnAtomicity,
Protocol: "open-protocol",
}, cfg.Sink)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ const (
}
],
"schema-registry": "",
"transaction-atomicity": 2
"transaction-atomicity": ""
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -204,8 +204,7 @@ const (
"b"
]
}
],
"transaction-atomicity": 2
]
},
"consistent": {
"level": "none",
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var defaultReplicaConfig = &ReplicaConfig{
Mounter: &MounterConfig{
WorkerNum: 16,
},
Sink: &SinkConfig{TxnAtomicity: TableTxnAtomicity},
Sink: &SinkConfig{},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
{Matcher: []string{"a.d"}, DispatcherRule: "r2"},
}
conf.Sink.TxnAtomicity = UnknowTxnAtomicity
conf.Sink.TxnAtomicity = unknowTxnAtomicity
require.Equal(t, conf, conf2)
}

Expand Down
71 changes: 46 additions & 25 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,36 @@ import (
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

// AtomicityLevel represents the atomicity level of a changefeed.
type AtomicityLevel int
type AtomicityLevel string

const (
// UnknowTxnAtomicity is the default atomicity level, which is invalid.
UnknowTxnAtomicity AtomicityLevel = iota
// NoneTxnAtomicity means atomicity of transactions is not guaranteed
NoneTxnAtomicity
// TableTxnAtomicity means atomicity of single table transactions is guaranteed.
TableTxnAtomicity
// GlobalTxnAtomicity means atomicity of cross table transactions is guaranteed, which
// unknowTxnAtomicity is the default atomicity level, which is invalid and will
// be set to a valid value when initializing sink in processor.
unknowTxnAtomicity AtomicityLevel = ""

// noneTxnAtomicity means atomicity of transactions is not guaranteed
noneTxnAtomicity AtomicityLevel = "none"

// tableTxnAtomicity means atomicity of single table transactions is guaranteed.
tableTxnAtomicity AtomicityLevel = "table"

// globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which
// is currently not supported by TiCDC.
GlobalTxnAtomicity
// globalTxnAtomicity AtomicityLevel = "global"
)

// ShouldSplitTxn returns whether the sink should split txn.
func (l AtomicityLevel) ShouldSplitTxn() bool {
return l == noneTxnAtomicity
}

// Validate checks the AtomicityLevel is supported by TiCDC.
func (l AtomicityLevel) Validate() {
if l != noneTxnAtomicity && l != tableTxnAtomicity {
log.Panic(fmt.Sprintf("unsupported transaction atomicity: %s", l))
}
}

// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var ForceEnableOldValueProtocols = []string{
ProtocolCanal.String(),
Expand Down Expand Up @@ -116,28 +132,33 @@ func (s *SinkConfig) applyParameter(sinkURI *url.URL) error {
params := sinkURI.Query()

txnAtomicity := params.Get("transaction-atomicity")
switch txnAtomicity {
case "":
fallthrough
case "table":
s.TxnAtomicity = TableTxnAtomicity
case "none":
s.TxnAtomicity = NoneTxnAtomicity
switch AtomicityLevel(txnAtomicity) {
case unknowTxnAtomicity:
// Set default value according to scheme.
if isMqScheme(sinkURI.Scheme) {
s.TxnAtomicity = noneTxnAtomicity
} else {
s.TxnAtomicity = tableTxnAtomicity
}
case noneTxnAtomicity:
s.TxnAtomicity = noneTxnAtomicity
case tableTxnAtomicity:
// MqSink only support `noneTxnAtomicity`.
if isMqScheme(sinkURI.Scheme) {
log.Warn("The configuration of transaction-atomicity is incompatible with scheme",
zap.Any("txnAtomicity", s.TxnAtomicity),
zap.String("scheme", sinkURI.Scheme),
zap.String("protocol", s.Protocol))
s.TxnAtomicity = noneTxnAtomicity
} else {
s.TxnAtomicity = tableTxnAtomicity
}
default:
errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme",
txnAtomicity, sinkURI.Scheme)
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg)
}

// MqSink only support `NoneTxnAtomicity`.
if isMqScheme(sinkURI.Scheme) && s.TxnAtomicity != NoneTxnAtomicity {
log.Warn("The configuration of transaction-atomicity is incompatible with scheme",
zap.Any("txnAtomicity", s.TxnAtomicity),
zap.String("scheme", sinkURI.Scheme),
zap.String("protocol", s.Protocol))
s.TxnAtomicity = NoneTxnAtomicity
}

s.Protocol = params.Get(ProtocolKey)
// validate that protocol is compatible with the scheme
if isMqScheme(sinkURI.Scheme) {
Expand Down
46 changes: 23 additions & 23 deletions pkg/config/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,24 @@ func TestValidateOldValue(t *testing.T) {
func TestValidateApplyParameter(t *testing.T) {
t.Parallel()
testCases := []struct {
sinkURI string
expectedErr string
expectedSplitTxn AtomicityLevel
sinkURI string
expectedErr string
expectedLevel AtomicityLevel
}{
{
sinkURI: "mysql://normal:[email protected]:3306",
expectedErr: "",
expectedSplitTxn: TableTxnAtomicity,
sinkURI: "mysql://normal:[email protected]:3306",
expectedErr: "",
expectedLevel: tableTxnAtomicity,
},
{
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=table",
expectedErr: "",
expectedSplitTxn: TableTxnAtomicity,
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=table",
expectedErr: "",
expectedLevel: tableTxnAtomicity,
},
{
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=global",
Expand All @@ -112,26 +112,26 @@ func TestValidateApplyParameter(t *testing.T) {
expectedErr: ".*protocol cannot be configured when using tidb scheme.*",
},
{
sinkURI: "blackhole://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
sinkURI: "blackhole://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" +
"&protocol=open-protocol",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" +
"&protocol=open-protocol",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "kafka://127.0.0.1:9092?protocol=default",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
sinkURI: "kafka://127.0.0.1:9092?protocol=default",
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table",
Expand All @@ -145,7 +145,7 @@ func TestValidateApplyParameter(t *testing.T) {
require.Nil(t, err)
if tc.expectedErr == "" {
require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true))
require.Equal(t, tc.expectedSplitTxn, cfg.TxnAtomicity)
require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity)
} else {
require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true))
}
Expand Down

0 comments on commit 0531ad2

Please sign in to comment.