Skip to content

Commit

Permalink
use string val
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jun 28, 2022
1 parent f19cae6 commit 77a9a2d
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 60 deletions.
10 changes: 5 additions & 5 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,14 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
}

func (n *sinkNode) checkSplitTxn(e *model.PolymorphicEvent) {
if n.replicaConfig.Sink.TxnAtomicity == config.NoneTxnAtomicity {
ta := n.replicaConfig.Sink.TxnAtomicity
ta.Validate()
if ta.ShouldSplitTxn() {
return
}

if n.replicaConfig.Sink.TxnAtomicity != config.TableTxnAtomicity {
log.Panic("unsupported txn atomicity", zap.Any("replicaConfig", n.replicaConfig))
}

// Check that BatchResolved events and RowChangedEvent events with `SplitTxn==true`
// have not been received by sinkNode.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
log.Panic("batch mode resolved ts is not supported when sink.splitTxn is false",
zap.Any("event", e), zap.Any("replicaConfig", n.replicaConfig))
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/flowcontrol"
"github.com/pingcap/tiflow/pkg/actor"
"github.com/pingcap/tiflow/pkg/actor/message"
"github.com/pingcap/tiflow/pkg/config"
serverConfig "github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -283,7 +282,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

splitTxn := t.replicaConfig.Sink.TxnAtomicity == config.NoneTxnAtomicity
splitTxn := t.replicaConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(t.memoryQuota,
t.redoManager.Enabled(), splitTxn)
Expand Down
5 changes: 2 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ const (
}
],
"schema-registry": "",
"transaction-atomicity": 2
"transaction-atomicity": ""
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -204,8 +204,7 @@ const (
"b"
]
}
],
"transaction-atomicity": 2
]
},
"consistent": {
"level": "none",
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var defaultReplicaConfig = &ReplicaConfig{
Mounter: &MounterConfig{
WorkerNum: 16,
},
Sink: &SinkConfig{TxnAtomicity: TableTxnAtomicity},
Sink: &SinkConfig{},
Consistent: &ConsistentConfig{
Level: "none",
MaxLogSize: 64,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestReplicaConfigOutDated(t *testing.T) {
{Matcher: []string{"a.c"}, DispatcherRule: "r2"},
{Matcher: []string{"a.d"}, DispatcherRule: "r2"},
}
conf.Sink.TxnAtomicity = UnknowTxnAtomicity
conf.Sink.TxnAtomicity = unknowTxnAtomicity
require.Equal(t, conf, conf2)
}

Expand Down
71 changes: 46 additions & 25 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,36 @@ import (
const DefaultMaxMessageBytes = 10 * 1024 * 1024 // 10M

// AtomicityLevel represents the atomicity level of a changefeed.
type AtomicityLevel int
type AtomicityLevel string

const (
// UnknowTxnAtomicity is the default atomicity level, which is invalid.
UnknowTxnAtomicity AtomicityLevel = iota
// NoneTxnAtomicity means atomicity of transactions is not guaranteed
NoneTxnAtomicity
// TableTxnAtomicity means atomicity of single table transactions is guaranteed.
TableTxnAtomicity
// GlobalTxnAtomicity means atomicity of cross table transactions is guaranteed, which
// unknowTxnAtomicity is the default atomicity level, which is invalid and will
// be set to a valid value when initializing sink in processor.
unknowTxnAtomicity AtomicityLevel = ""

// noneTxnAtomicity means atomicity of transactions is not guaranteed
noneTxnAtomicity AtomicityLevel = "none"

// tableTxnAtomicity means atomicity of single table transactions is guaranteed.
tableTxnAtomicity AtomicityLevel = "table"

// globalTxnAtomicity means atomicity of cross table transactions is guaranteed, which
// is currently not supported by TiCDC.
GlobalTxnAtomicity
globalTxnAtomicity AtomicityLevel = "global"
)

// ShouldUseOldValue returns whether the sink should split txn.
func (l AtomicityLevel) ShouldSplitTxn() bool {
return l == noneTxnAtomicity
}

// Validate checks the AtomicityLevel is supported by TiCDC.
func (l AtomicityLevel) Validate() {
if l != noneTxnAtomicity && l != tableTxnAtomicity {
log.Panic(fmt.Sprintf("unsupported transaction atomicity: %s", l))
}
}

// ForceEnableOldValueProtocols specifies which protocols need to be forced to enable old value.
var ForceEnableOldValueProtocols = []string{
ProtocolCanal.String(),
Expand Down Expand Up @@ -116,28 +132,33 @@ func (s *SinkConfig) applyParameter(sinkURI *url.URL) error {
params := sinkURI.Query()

txnAtomicity := params.Get("transaction-atomicity")
switch txnAtomicity {
case "":
fallthrough
case "table":
s.TxnAtomicity = TableTxnAtomicity
case "none":
s.TxnAtomicity = NoneTxnAtomicity
switch AtomicityLevel(txnAtomicity) {
case unknowTxnAtomicity:
// Set default value according to scheme.
if isMqScheme(sinkURI.Scheme) {
s.TxnAtomicity = noneTxnAtomicity
} else {
s.TxnAtomicity = tableTxnAtomicity
}
case noneTxnAtomicity:
s.TxnAtomicity = noneTxnAtomicity
case tableTxnAtomicity:
// MqSink only support `noneTxnAtomicity`.
if isMqScheme(sinkURI.Scheme) {
log.Warn("The configuration of transaction-atomicity is incompatible with scheme",
zap.Any("txnAtomicity", s.TxnAtomicity),
zap.String("scheme", sinkURI.Scheme),
zap.String("protocol", s.Protocol))
s.TxnAtomicity = noneTxnAtomicity
} else {
s.TxnAtomicity = tableTxnAtomicity
}
default:
errMsg := fmt.Sprintf("%s level atomicity is not supported by %s scheme",
txnAtomicity, sinkURI.Scheme)
return cerror.ErrSinkURIInvalid.GenWithStackByArgs(errMsg)
}

// MqSink only support `NoneTxnAtomicity`.
if isMqScheme(sinkURI.Scheme) && s.TxnAtomicity != NoneTxnAtomicity {
log.Warn("The configuration of transaction-atomicity is incompatible with scheme",
zap.Any("txnAtomicity", s.TxnAtomicity),
zap.String("scheme", sinkURI.Scheme),
zap.String("protocol", s.Protocol))
s.TxnAtomicity = NoneTxnAtomicity
}

s.Protocol = params.Get(ProtocolKey)
// validate that protocol is compatible with the scheme
if isMqScheme(sinkURI.Scheme) {
Expand Down
46 changes: 23 additions & 23 deletions pkg/config/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,24 @@ func TestValidateOldValue(t *testing.T) {
func TestValidateApplyParameter(t *testing.T) {
t.Parallel()
testCases := []struct {
sinkURI string
expectedErr string
expectedSplitTxn AtomicityLevel
sinkURI string
expectedErr string
expectedLevel AtomicityLevel
}{
{
sinkURI: "mysql://normal:[email protected]:3306",
expectedErr: "",
expectedSplitTxn: TableTxnAtomicity,
sinkURI: "mysql://normal:[email protected]:3306",
expectedErr: "",
expectedLevel: tableTxnAtomicity,
},
{
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=table",
expectedErr: "",
expectedSplitTxn: TableTxnAtomicity,
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=table",
expectedErr: "",
expectedLevel: tableTxnAtomicity,
},
{
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "mysql://normal:[email protected]:3306?transaction-atomicity=global",
Expand All @@ -112,26 +112,26 @@ func TestValidateApplyParameter(t *testing.T) {
expectedErr: ".*protocol cannot be configured when using tidb scheme.*",
},
{
sinkURI: "blackhole://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
sinkURI: "blackhole://normal:[email protected]:3306?transaction-atomicity=none",
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=none" +
"&protocol=open-protocol",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "pulsar://127.0.0.1:9092?transaction-atomicity=table" +
"&protocol=open-protocol",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "kafka://127.0.0.1:9092?protocol=default",
expectedErr: "",
expectedSplitTxn: NoneTxnAtomicity,
sinkURI: "kafka://127.0.0.1:9092?protocol=default",
expectedErr: "",
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "kafka://127.0.0.1:9092?transaction-atomicity=table",
Expand All @@ -145,7 +145,7 @@ func TestValidateApplyParameter(t *testing.T) {
require.Nil(t, err)
if tc.expectedErr == "" {
require.Nil(t, cfg.validateAndAdjust(parsedSinkURI, true))
require.Equal(t, tc.expectedSplitTxn, cfg.TxnAtomicity)
require.Equal(t, tc.expectedLevel, cfg.TxnAtomicity)
} else {
require.Regexp(t, tc.expectedErr, cfg.validateAndAdjust(parsedSinkURI, true))
}
Expand Down

0 comments on commit 77a9a2d

Please sign in to comment.