Skip to content

Commit

Permalink
*(ticdc): split old update kv entry after restarting changefeed (#10919
Browse files Browse the repository at this point in the history
…) (#11029)

close #10918
  • Loading branch information
ti-chi-bot authored May 27, 2024
1 parent f8a1dbc commit 192fc75
Show file tree
Hide file tree
Showing 28 changed files with 1,093 additions and 92 deletions.
5 changes: 5 additions & 0 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type RawKVEntry struct {
RegionID uint64 `msg:"region_id"`
}

// IsUpdate checks if the event is an update event.
func (v *RawKVEntry) IsUpdate() bool {
return v.OpType == OpTypePut && v.OldValue != nil && v.Value != nil
}

func (v *RawKVEntry) String() string {
// TODO: redact values.
return fmt.Sprintf(
Expand Down
23 changes: 11 additions & 12 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,18 +772,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, there is no need to split a single unique key changed update event, this
// is also to keep the backward compatibility, the same behavior as before.
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) dontSplitUpdateEvent(scheme string) bool {
if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(scheme) {
return true
}
return false
return sink.IsMySQLCompatibleScheme(scheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
Expand Down Expand Up @@ -813,8 +812,8 @@ func trySplitAndSortUpdateEvent(

// This indicates that it is an update event. if the pk or uk is updated,
// we need to split it into two events (delete and insert).
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if e.IsUpdate() && ShouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := SplitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -831,10 +830,10 @@ func trySplitAndSortUpdateEvent(
return rowChangedEvents, nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
Expand All @@ -856,8 +855,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
// SplitUpdateEvent splits an update event into a delete and an insert event.
func SplitUpdateEvent(
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
Expand Down
53 changes: 53 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -563,3 +564,55 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result))
}

var ukUpdatedEvent = &RowChangedEvent{
PreColumns: []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: HandleKeyFlag | UniqueKeyFlag,
Value: "col2-value",
},
},

Columns: []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: HandleKeyFlag | UniqueKeyFlag,
Value: "col2-value-updated",
},
},
}

func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
txn := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
21 changes: 20 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -445,6 +447,18 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
// pullerSafeMode means to split all update kv entries whose commitTS
// is older then the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
Expand Down Expand Up @@ -684,9 +698,14 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, p.changefeed.Info.Config.BDRMode)
sortEngine, p.changefeed.Info.Config.BDRMode,
pullerSafeModeAtStart)
p.sourceManager.name = "SourceManager"
p.sourceManager.spawn(stdCtx)

Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))

// For duplicate entry error, we fast fail to restart changefeed.
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (suite *redoLogWorkerSuite) createWorker(
) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
&entry.MockMountGroup{}, sortEngine, false)
&entry.MockMountGroup{}, sortEngine, false, false)
go func() { _ = sm.Run(ctx) }()

// To avoid refund or release panics.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (suite *tableSinkWorkerSuite) createWorker(
) (*sinkWorker, engine.SortEngine) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
&entry.MockMountGroup{}, sortEngine, false)
&entry.MockMountGroup{}, sortEngine, false, false)
go func() { sm.Run(ctx) }()

// To avoid refund or release panics.
Expand Down
59 changes: 58 additions & 1 deletion cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -25,8 +26,12 @@ import (
pullerwrapper "github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/puller"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -53,13 +58,18 @@ type SourceManager struct {
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

safeModeAtStart bool
startTs model.Ts

// pullerWrapperCreator is used to create a puller wrapper.
// Only used for testing.
pullerWrapperCreator func(changefeed model.ChangeFeedID,
span tablepb.Span,
tableName string,
startTs model.Ts,
bdrMode bool,
shouldSplitKVEntry pullerwrapper.ShouldSplitKVEntry,
splitUpdateKVEntry pullerwrapper.SplitUpdateKVEntry,
) pullerwrapper.Wrapper
}

Expand All @@ -70,6 +80,7 @@ func New(
mg entry.MounterGroup,
engine engine.SortEngine,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
return &SourceManager{
ready: make(chan struct{}),
Expand All @@ -79,6 +90,7 @@ func New(
engine: engine,
errChan: make(chan error, 16),
bdrMode: bdrMode,
safeModeAtStart: safeModeAtStart,
pullerWrapperCreator: pullerwrapper.NewPullerWrapper,
}
}
Expand All @@ -103,11 +115,31 @@ func NewForTest(
}
}

func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool {
return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs
}

func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}

// AddTable adds a table to the source manager. Start puller and register table to the engine.
func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) {
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(span, startTs)
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
}
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry)
p.Start(m.ctx, m.up, m.engine, m.errChan)
m.pullers.Store(span, p)
}
Expand Down Expand Up @@ -159,6 +191,11 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats

// Run implements util.Runnable.
func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
startTs, err := getCurrentTs(ctx, m.up.PDClient)
if err != nil {
return err
}
m.startTs = startTs
m.ctx = ctx
close(m.ready)
select {
Expand Down Expand Up @@ -208,3 +245,23 @@ func (m *SourceManager) Close() {
func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) {
m.engine.Add(span, events...)
}

func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
backoffBaseDelayInMs := int64(100)
totalRetryDuration := 10 * time.Second
var replicateTs model.Ts
err := retry.Do(ctx, func() error {
phy, logic, err := pdClient.GetTS(ctx)
if err != nil {
return errors.Trace(err)
}
replicateTs = oracle.ComposeTS(phy, logic)
return nil
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithTotalRetryDuratoin(totalRetryDuration),
retry.WithIsRetryableErr(cerrors.IsRetryableError))
if err != nil {
return model.Ts(0), errors.Trace(err)
}
return replicateTs, nil
}
2 changes: 2 additions & 0 deletions cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func NewPullerWrapperForTest(
tableName string,
startTs model.Ts,
bdrMode bool,
shouldSplitKVEntry ShouldSplitKVEntry,
splitUpdateKVEntry SplitUpdateKVEntry,
) Wrapper {
return &dummyPullerWrapper{}
}
Expand Down
Loading

0 comments on commit 192fc75

Please sign in to comment.