From 4d2e1bf61ce69bf42958e88efea203d3dfab80a0 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 21 Jun 2022 19:16:36 +0800 Subject: [PATCH] sink(ticdc): store checkpointTs by flushedResolvedTsMap in mysql sink (#5418) (#5437) close pingcap/tiflow#5107 --- cdc/sink/common/common.go | 30 +-- cdc/sink/common/common_test.go | 358 +++++++++++++++++++++------------ cdc/sink/mysql.go | 35 ++-- cdc/sink/mysql_test.go | 5 +- 4 files changed, 268 insertions(+), 160 deletions(-) diff --git a/cdc/sink/common/common.go b/cdc/sink/common/common.go index 702f7134bba..8541ca74aec 100644 --- a/cdc/sink/common/common.go +++ b/cdc/sink/common/common.go @@ -111,31 +111,39 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) { c.unresolvedTxnsMu.Lock() defer c.unresolvedTxnsMu.Unlock() - if len(c.unresolvedTxns) == 0 { - return nil, nil - } return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns) } func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { +) (checkpointTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { + var ( + ok bool + txnsLength int + txns []*txnsWithTheSameCommitTs + resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs + ) + + checkpointTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) + resolvedTsMap.Range(func(k, v interface{}) bool { + tableID := k.(model.TableID) + resolvedTs := v.(model.Ts) + checkpointTsMap[tableID] = resolvedTs + return true + }) + resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) - for tableID, txns := range unresolvedTxns { - v, ok := resolvedTsMap.Load(tableID) - if !ok { + for tableID, resolvedTs := range checkpointTsMap { + if txns, ok = unresolvedTxns[tableID]; !ok { continue } - resolvedTs := v.(uint64) i := sort.Search(len(txns), func(i int) bool { return txns[i].commitTs > resolvedTs }) if i == 0 { continue } - var resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs if i == len(txns) { resolvedTxnsWithTheSameCommitTs = txns delete(unresolvedTxns, tableID) @@ -143,7 +151,6 @@ func splitResolvedTxn( resolvedTxnsWithTheSameCommitTs = txns[:i] unresolvedTxns[tableID] = txns[i:] } - var txnsLength int for _, txns := range resolvedTxnsWithTheSameCommitTs { txnsLength += len(txns.txns) } @@ -154,7 +161,6 @@ func splitResolvedTxn( } } resolvedRowsMap[tableID] = resolvedTxns - flushedResolvedTsMap[tableID] = resolvedTs } return } diff --git a/cdc/sink/common/common_test.go b/cdc/sink/common/common_test.go index 615fbc8c3c7..0de69467eb8 100644 --- a/cdc/sink/common/common_test.go +++ b/cdc/sink/common/common_test.go @@ -20,142 +20,247 @@ import ( "github.com/google/go-cmp/cmp" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/stretchr/testify/require" ) func TestSplitResolvedTxn(test *testing.T) { - defer testleak.AfterTestT(test)() - + test.Parallel() testCases := [][]struct { input []*model.RowChangedEvent resolvedTsMap map[model.TableID]uint64 expected map[model.TableID][]*model.SingleTableTxn - }{{{ // Testing basic transaction collocation, no txns with the same commitTs - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(6), - 2: uint64(6), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - }}}, - }, - }, { - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(13), - 2: uint64(13), - 3: uint64(13), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - }}, {Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 11, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 12, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, - }}}, - 3: {{Table: &model.TableName{TableID: 3}, StartTs: 1, CommitTs: 7, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, - }}, {Table: &model.TableName{TableID: 3}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, - }}}, - }, - }}, {{ // Testing the short circuit path - input: []*model.RowChangedEvent{}, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(13), - 2: uint64(13), - 3: uint64(13), + }{ + { // Testing basic transaction collocation, no txns with the same commitTs + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + 3: uint64(6), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: {{ + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 5, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + }, + }}, + 2: {{ + Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + }, + }}, + }, + }, { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + 4: uint64(6), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: { + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + }, + }, + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 11, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, + }, + }, + }, + 2: { + { + Table: &model.TableName{TableID: 2}, + StartTs: 1, CommitTs: 12, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}}, + }, + }, + }, + 3: { + { + Table: &model.TableName{TableID: 3}, + StartTs: 1, + CommitTs: 7, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 3}}, + }, + }, + { + Table: &model.TableName{TableID: 3}, + StartTs: 1, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}}, + }, + }, + }, + }, + }, }, - expected: nil, - }, { - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}}, + { // Testing the short circuit path + { + input: []*model.RowChangedEvent{}, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + }, + expected: map[model.TableID][]*model.SingleTableTxn{}, + }, + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + 3: uint64(13), + }, + expected: map[model.TableID][]*model.SingleTableTxn{}, + }, }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(6), - 2: uint64(6), + { // Testing the txns with the same commitTs + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(6), + 2: uint64(6), + 3: uint64(13), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: { + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 5, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, + }, + }, + }, + 2: { + { + Table: &model.TableName{TableID: 2}, + StartTs: 1, + CommitTs: 6, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + }, + }, { + Table: &model.TableName{TableID: 2}, + StartTs: 2, + CommitTs: 6, + Rows: []*model.RowChangedEvent{ + {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, + }, + }, + }, + }, + }, + { + input: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, + }, + resolvedTsMap: map[model.TableID]uint64{ + 1: uint64(13), + 2: uint64(13), + 3: uint64(13), + }, + expected: map[model.TableID][]*model.SingleTableTxn{ + 1: { + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + }, + }, + { + Table: &model.TableName{TableID: 1}, + StartTs: 2, + CommitTs: 8, + Rows: []*model.RowChangedEvent{ + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, + }, + }, + { + Table: &model.TableName{TableID: 1}, + StartTs: 1, + CommitTs: 9, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, + }, + }, + }, + 2: { + { + Table: &model.TableName{TableID: 2}, + StartTs: 1, + CommitTs: 7, + Rows: []*model.RowChangedEvent{ + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + }, + }, { + Table: &model.TableName{TableID: 2}, + StartTs: 2, + CommitTs: 7, + Rows: []*model.RowChangedEvent{ + {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, + }, + }, + }, + }, + }, }, - expected: map[model.TableID][]*model.SingleTableTxn{}, - }}, {{ // Testing the txns with the same commitTs - input: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(6), - 2: uint64(6), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 6, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - }}, {Table: &model.TableName{TableID: 2}, StartTs: 2, CommitTs: 6, Rows: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 6, Table: &model.TableName{TableID: 2}}, - }}}, - }, - }, { - input: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, - }, - resolvedTsMap: map[model.TableID]uint64{ - 1: uint64(13), - 2: uint64(13), - }, - expected: map[model.TableID][]*model.SingleTableTxn{ - 1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - }}, {Table: &model.TableName{TableID: 1}, StartTs: 2, CommitTs: 8, Rows: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - {StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}}, - }}, {Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 9, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}}, - }}}, - 2: {{Table: &model.TableName{TableID: 2}, StartTs: 1, CommitTs: 7, Rows: []*model.RowChangedEvent{ - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - {StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - }}, {Table: &model.TableName{TableID: 2}, StartTs: 2, CommitTs: 7, Rows: []*model.RowChangedEvent{ - {StartTs: 2, CommitTs: 7, Table: &model.TableName{TableID: 2}}, - }}}, - }, - }}} + } for _, tc := range testCases { cache := NewUnresolvedTxnCache() for _, t := range tc { @@ -164,7 +269,7 @@ func TestSplitResolvedTxn(test *testing.T) { for tableID, ts := range t.resolvedTsMap { resolvedTsMap.Store(tableID, ts) } - _, resolved := cache.Resolved(&resolvedTsMap) + checkpointTsMap, resolved := cache.Resolved(&resolvedTsMap) for tableID, txns := range resolved { sort.Slice(txns, func(i, j int) bool { if txns[i].CommitTs != txns[j].CommitTs { @@ -175,6 +280,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolved[tableID] = txns } require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected)) + require.Equal(test, t.resolvedTsMap, checkpointTsMap) } } } diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 304c5434706..5bb03a88dc9 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -64,7 +64,7 @@ type mysqlSink struct { tableMaxResolvedTs sync.Map execWaitNotifier *notify.Notifier - resolvedNotifier *notify.Notifier + resolvedCh chan struct{} errCh chan error flushSyncWg sync.WaitGroup @@ -193,18 +193,14 @@ func newMySQLSink( } sink.execWaitNotifier = new(notify.Notifier) - sink.resolvedNotifier = new(notify.Notifier) + sink.resolvedCh = make(chan struct{}, 1) err = sink.createSinkWorkers(ctx) if err != nil { return nil, err } - receiver, err := sink.resolvedNotifier.NewReceiver(50 * time.Millisecond) - if err != nil { - return nil, err - } - go sink.flushRowChangedEvents(ctx, receiver) + go sink.flushRowChangedEvents(ctx) return sink, nil } @@ -222,12 +218,15 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab if !ok || v.(uint64) < resolvedTs { s.tableMaxResolvedTs.Store(tableID, resolvedTs) } - s.resolvedNotifier.Notify() // check and throw error select { + case <-ctx.Done(): + return 0, ctx.Err() case err := <-s.errCh: return 0, err + case s.resolvedCh <- struct{}{}: + // Notify `flushRowChangedEvents` to asynchronously write data. default: } @@ -236,7 +235,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab return checkpointTs, nil } -func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.Receiver) { +func (s *mysqlSink) flushRowChangedEvents(ctx context.Context) { defer func() { for _, worker := range s.workers { worker.closedCh <- struct{}{} @@ -246,16 +245,9 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify. select { case <-ctx.Done(): return - case <-receiver.C: - } - flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) - if len(resolvedTxnsMap) == 0 { - s.tableMaxResolvedTs.Range(func(key, value interface{}) bool { - s.tableCheckpointTs.Store(key, value) - return true - }) - continue + case <-s.resolvedCh: } + checkpointTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) if s.cyclic != nil { // Filter rows if it is origin from downstream. @@ -264,8 +256,10 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify. s.statistics.SubRowsCount(skippedRowCount) } - s.dispatchAndExecTxns(ctx, resolvedTxnsMap) - for tableID, resolvedTs := range flushedResolvedTsMap { + if len(resolvedTxnsMap) != 0 { + s.dispatchAndExecTxns(ctx, resolvedTxnsMap) + } + for tableID, resolvedTs := range checkpointTsMap { s.tableCheckpointTs.Store(tableID, resolvedTs) } } @@ -478,7 +472,6 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { func (s *mysqlSink) Close(ctx context.Context) error { s.execWaitNotifier.Close() - s.resolvedNotifier.Close() err := s.db.Close() s.cancel() return cerror.WrapError(cerror.ErrMySQLConnectionError, err) diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 509a3451268..ee1e61bf0f6 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -1083,6 +1083,9 @@ func TestNewMySQLSink(t *testing.T) { require.Nil(t, err) err = sink.Close(ctx) require.Nil(t, err) + // Test idempotency of `Close` interface + err = sink.Close(ctx) + require.Nil(t, err) } func TestMySQLSinkClose(t *testing.T) { @@ -1232,7 +1235,7 @@ func TestCleanTableResource(t *testing.T) { require.Nil(t, s.Init(tblID)) m := &sync.Map{} m.Store(tblID, uint64(10)) - ret, _ := s.txnCache.Resolved(m) + _, ret := s.txnCache.Resolved(m) require.True(t, len(ret) == 0) _, ok := s.tableCheckpointTs.Load(tblID) require.False(t, ok)