Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6038
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jul 1, 2022
1 parent 41ab14c commit 57377f2
Show file tree
Hide file tree
Showing 18 changed files with 831 additions and 41 deletions.
5 changes: 5 additions & 0 deletions cdc/api/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
<<<<<<< HEAD:cdc/api/validator_test.go
newInfo, err = verifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
=======
oldInfo.Config.Sink.TxnAtomicity = "table"
newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038)):cdc/api/validator/validator_test.go
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
require.Nil(t, newInfo)
Expand Down
48 changes: 48 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipeline

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -79,11 +80,27 @@ type sinkNode struct {

flowController tableFlowController

<<<<<<< HEAD
replicaConfig *config.ReplicaConfig
isTableActorMode bool
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
=======
replicaConfig *config.ReplicaConfig
splitTxn bool
}

func newSinkNode(
tableID model.TableID, sink sink.Sink,
startTs model.Ts, targetTs model.Ts,
flowController tableFlowController,
redoManager redo.LogManager,
state *TableState,
changefeed model.ChangeFeedID,
splitTxn bool,
) *sinkNode {
>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038))
sn := &sinkNode{
tableID: tableID,
sink: sink,
Expand All @@ -93,6 +110,11 @@ func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, target
barrierTs: startTs,

flowController: flowController,
<<<<<<< HEAD
=======
redoManager: redoManager,
splitTxn: splitTxn,
>>>>>>> 0b7969dee (sink(ticdc): add transaction-atomicity parameter to SinkURI (#6038))
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
return sn
Expand Down Expand Up @@ -293,6 +315,10 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
switch msg.Tp {
case pmessage.MessageTypePolymorphicEvent:
event := msg.PolymorphicEvent
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
Expand Down Expand Up @@ -346,3 +372,25 @@ func (n *sinkNode) releaseResource(ctx context.Context) error {
n.flowController.Abort()
return n.sink.Close(ctx)
}

// Verify that TxnAtomicity compatibility with BatchResolved event and RowChangedEvent
// with `SplitTxn==true`.
func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error {
if n.splitTxn {
return nil
}

// Fail-fast check, this situation should never happen normally when split transactions
// are not supported.
if e.Resolved != nil && e.Resolved.IsBatchMode() {
msg := fmt.Sprintf("batch mode resolved ts is not supported "+
"when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}

if e.Row != nil && e.Row.SplitTxn {
msg := fmt.Sprintf("should not split txn when sink.splitTxn is %+v", n.splitTxn)
return cerror.ErrSinkInvalidConfig.GenWithStackByArgs(msg)
}
return nil
}
Loading

0 comments on commit 57377f2

Please sign in to comment.