*(ticdc): split old update kv entry after restarting changefeed (#10919) #11029

Merged
Changes from all commits
5 changes: 5 additions & 0 deletions cdc/model/kv.go
@@ -93,6 +93,11 @@ type RawKVEntry struct {
RegionID uint64 `msg:"region_id"`
}

// IsUpdate checks if the event is an update event.
func (v *RawKVEntry) IsUpdate() bool {
return v.OpType == OpTypePut && v.OldValue != nil && v.Value != nil
}

func (v *RawKVEntry) String() string {
// TODO: redact values.
return fmt.Sprintf(
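For reference, a minimal sketch (not part of the diff) of what the new `IsUpdate` helper reports for typical entries; the byte values are invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	// A Put that carries both the previous and the new value is an update.
	update := &model.RawKVEntry{
		OpType:   model.OpTypePut,
		Value:    []byte("v2"),
		OldValue: []byte("v1"),
	}
	// A Put with no old value is a plain insert, so IsUpdate is false.
	insert := &model.RawKVEntry{
		OpType: model.OpTypePut,
		Value:  []byte("v1"),
	}
	fmt.Println(update.IsUpdate()) // true
	fmt.Println(insert.IsUpdate()) // false
}
```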
23 changes: 11 additions & 12 deletions cdc/model/sink.go
@@ -772,18 +772,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {

// Whether to split a single update event into delete and insert events?
//
// For the MySQL Sink, there is no need to split a single unique key changed update event, this
// is also to keep the backward compatibility, the same behavior as before.
// For the MySQL Sink, we don't split any update event.
// This may cause errors like "duplicate entry" when sinking to the downstream.
// Such an error will cause the changefeed to restart,
// and then the related update rows will be split into delete and insert events at the puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) dontSplitUpdateEvent(scheme string) bool {
if len(t.Rows) < 2 && sink.IsMySQLCompatibleScheme(scheme) {
return true
}
return false
return sink.IsMySQLCompatibleScheme(scheme)
}

// trySplitAndSortUpdateEvent tries to split update events if a unique key is updated
@@ -813,8 +812,8 @@ func trySplitAndSortUpdateEvent(

// This indicates that it is an update event. If the pk or uk is updated,
// we need to split it into two events (delete and insert).
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if e.IsUpdate() && ShouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := SplitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}
@@ -831,10 +830,10 @@
return rowChangedEvents, nil
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If it is modified, we need to use SplitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
@@ -856,8 +855,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
// SplitUpdateEvent splits an update event into a delete and an insert event.
func SplitUpdateEvent(
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
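For readers skimming the diff, a hedged sketch of what the now-exported `SplitUpdateEvent` boils down to; the real function does more bookkeeping (flags, shallow-copy fix-ups) than shown here:

```go
package sketch

import "github.com/pingcap/tiflow/cdc/model"

// splitSketch is a simplified mirror of model.SplitUpdateEvent: the delete
// event keeps only the previous row image (PreColumns), and the insert
// event keeps only the new row image (Columns).
func splitSketch(update *model.RowChangedEvent) (*model.RowChangedEvent, *model.RowChangedEvent) {
	deleteEvent := *update
	deleteEvent.Columns = nil // the delete event carries only the old values

	insertEvent := *update
	insertEvent.PreColumns = nil // the insert event carries only the new values

	return &deleteEvent, &insertEvent
}
```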
53 changes: 53 additions & 0 deletions cdc/model/sink_test.go
@@ -20,6 +20,7 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -563,3 +564,55 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(result))
}

var ukUpdatedEvent = &RowChangedEvent{
PreColumns: []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: HandleKeyFlag | UniqueKeyFlag,
Value: "col2-value",
},
},

Columns: []*Column{
{
Name: "col1",
Flag: BinaryFlag,
Value: "col1-value",
},
{
Name: "col2",
Flag: HandleKeyFlag | UniqueKeyFlag,
Value: "col2-value-updated",
},
},
}

func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
txn := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn2.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
21 changes: 20 additions & 1 deletion cdc/processor/processor.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
@@ -42,6 +43,7 @@
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
@@ -445,6 +447,18 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart returns true if the scheme is MySQL-compatible.
// pullerSafeMode means to split all update kv entries whose commitTs
// is older than the start time of this changefeed.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts,
@@ -684,9 +698,14 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, p.changefeed.Info.Config.BDRMode)
sortEngine, p.changefeed.Info.Config.BDRMode,
pullerSafeModeAtStart)
p.sourceManager.name = "SourceManager"
p.sourceManager.spawn(stdCtx)

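To make the new check concrete, a small standalone sketch; the URIs are made-up examples, and `mysqlCompatible` is a stand-in for the unexported `needPullerSafeModeAtStart`:

```go
package main

import (
	"fmt"
	"net/url"

	"github.com/pingcap/tiflow/pkg/sink"
)

// mysqlCompatible reproduces the check in needPullerSafeModeAtStart:
// parse the sink URI and test whether its scheme is MySQL-compatible.
func mysqlCompatible(sinkURIStr string) (bool, error) {
	sinkURI, err := url.Parse(sinkURIStr)
	if err != nil {
		return false, err
	}
	return sink.IsMySQLCompatibleScheme(sink.GetScheme(sinkURI)), nil
}

func main() {
	for _, uri := range []string{
		"mysql://root@127.0.0.1:3306/", // true: split old updates at the puller
		"kafka://127.0.0.1:9092/topic", // false: updates are split at the sink instead
	} {
		ok, err := mysqlCompatible(uri)
		fmt.Println(uri, ok, err)
	}
}
```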
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/manager.go
@@ -297,6 +297,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error) {
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))

// For a duplicate entry error, fail fast so that the changefeed restarts.
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
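A hedged sketch of the control flow this enables; `runOnce` and `isRetryable` are hypothetical stand-ins, not names from this PR:

```go
package sketch

import (
	"context"

	"github.com/pingcap/errors"
	cerror "github.com/pingcap/tiflow/pkg/errors"
)

// runOnce is a hypothetical stand-in for one attempt of the sink manager's
// internal run loop.
func runOnce(ctx context.Context) error { return nil }

// runWithFastFail sketches the decision added above: a duplicate-entry error
// is returned immediately, failing the changefeed so that it restarts and the
// puller-side split can take effect; other retryable errors are retried.
func runWithFastFail(ctx context.Context, isRetryable func(error) bool) error {
	for {
		err := runOnce(ctx)
		if err == nil || ctx.Err() != nil {
			return err
		}
		if cerror.IsDupEntryError(err) {
			return errors.Trace(err) // fast fail -> changefeed restart
		}
		if !isRetryable(err) {
			return errors.Trace(err)
		}
		// Retryable: re-establish internal resources and try again.
	}
}
```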
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker_test.go
@@ -62,7 +62,7 @@ func (suite *redoLogWorkerSuite) createWorker(
) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
&entry.MockMountGroup{}, sortEngine, false)
&entry.MockMountGroup{}, sortEngine, false, false)
go func() { _ = sm.Run(ctx) }()

// To avoid refund or release panics.
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -129,7 +129,7 @@ func (suite *tableSinkWorkerSuite) createWorker(
) (*sinkWorker, engine.SortEngine) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
&entry.MockMountGroup{}, sortEngine, false)
&entry.MockMountGroup{}, sortEngine, false, false)
go func() { sm.Run(ctx) }()

// To avoid refund or release panics.
59 changes: 58 additions & 1 deletion cdc/processor/sourcemanager/manager.go
@@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
@@ -25,8 +26,12 @@
pullerwrapper "github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/puller"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

@@ -53,13 +58,18 @@ type SourceManager struct {
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

safeModeAtStart bool
startTs model.Ts

// pullerWrapperCreator is used to create a puller wrapper.
// Only used for testing.
pullerWrapperCreator func(changefeed model.ChangeFeedID,
span tablepb.Span,
tableName string,
startTs model.Ts,
bdrMode bool,
shouldSplitKVEntry pullerwrapper.ShouldSplitKVEntry,
splitUpdateKVEntry pullerwrapper.SplitUpdateKVEntry,
) pullerwrapper.Wrapper
}

@@ -70,6 +80,7 @@ func New(
mg entry.MounterGroup,
engine engine.SortEngine,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
return &SourceManager{
ready: make(chan struct{}),
@@ -79,6 +90,7 @@ func New(
engine: engine,
errChan: make(chan error, 16),
bdrMode: bdrMode,
safeModeAtStart: safeModeAtStart,
pullerWrapperCreator: pullerwrapper.NewPullerWrapper,
}
}
@@ -103,11 +115,31 @@ func NewForTest(
}
}

func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool {
return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs
}

func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) {
if raw == nil {
return nil, nil, errors.New("nil event cannot be split")
}
deleteKVEntry := *raw
deleteKVEntry.Value = nil

insertKVEntry := *raw
insertKVEntry.OldValue = nil

return &deleteKVEntry, &insertKVEntry, nil
}

// AddTable adds a table to the source manager. Start puller and register table to the engine.
func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) {
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(span, startTs)
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
}
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry)
p.Start(m.ctx, m.up, m.engine, m.errChan)
m.pullers.Store(span, p)
}
@@ -159,6 +191,11 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats

// Run implements util.Runnable.
func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
startTs, err := getCurrentTs(ctx, m.up.PDClient)
if err != nil {
return err
}
m.startTs = startTs
m.ctx = ctx
close(m.ready)
select {
@@ -208,3 +245,23 @@ func (m *SourceManager) Close() {
func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) {
m.engine.Add(span, events...)
}

func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
backoffBaseDelayInMs := int64(100)
totalRetryDuration := 10 * time.Second
var replicateTs model.Ts
err := retry.Do(ctx, func() error {
phy, logic, err := pdClient.GetTS(ctx)
if err != nil {
return errors.Trace(err)
}
replicateTs = oracle.ComposeTS(phy, logic)
return nil
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithTotalRetryDuratoin(totalRetryDuration),
retry.WithIsRetryableErr(cerrors.IsRetryableError))
if err != nil {
return model.Ts(0), errors.Trace(err)
}
return replicateTs, nil
}
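Putting the manager pieces together, a hedged sketch of how the two hooks cooperate inside a puller wrapper; `handleRawKVEntry` is a hypothetical helper, not code from this PR:

```go
package sketch

import (
	"github.com/pingcap/tiflow/cdc/model"
	pullerwrapper "github.com/pingcap/tiflow/cdc/processor/sourcemanager/puller"
)

// handleRawKVEntry shows the puller-side decision: an old update entry
// (committed before the changefeed restarted) becomes a delete entry
// followed by an insert entry; everything else passes through unchanged.
func handleRawKVEntry(
	raw *model.RawKVEntry,
	shouldSplit pullerwrapper.ShouldSplitKVEntry,
	split pullerwrapper.SplitUpdateKVEntry,
) ([]*model.RawKVEntry, error) {
	if shouldSplit(raw) {
		deleteEntry, insertEntry, err := split(raw)
		if err != nil {
			return nil, err
		}
		// The delete is emitted before the insert so that replaying the
		// pair is equivalent to the original update, even if the unique
		// key changed.
		return []*model.RawKVEntry{deleteEntry, insertEntry}, nil
	}
	return []*model.RawKVEntry{raw}, nil
}
```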
2 changes: 2 additions & 0 deletions cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go
@@ -34,6 +34,8 @@ func NewPullerWrapperForTest(
tableName string,
startTs model.Ts,
bdrMode bool,
shouldSplitKVEntry ShouldSplitKVEntry,
splitUpdateKVEntry SplitUpdateKVEntry,
) Wrapper {
return &dummyPullerWrapper{}
}