diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index ab2a3d3bec4..9998d6eb5d2 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -13,7 +13,7 @@ package model -// PolymorphicEvent describes an event can be in multiple states +// PolymorphicEvent describes an event can be in multiple states. type PolymorphicEvent struct { StartTs uint64 // Commit or resolved TS @@ -23,7 +23,16 @@ type PolymorphicEvent struct { Row *RowChangedEvent } -// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV +// NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent. +func NewEmptyPolymorphicEvent(ts uint64) *PolymorphicEvent { + return &PolymorphicEvent{ + CRTs: ts, + RawKV: &RawKVEntry{}, + Row: &RowChangedEvent{}, + } +} + +// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV. func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { if rawKV.OpType == OpTypeResolved { return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index e68e4453cce..eaa3740e505 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -267,6 +267,9 @@ type RowChangedEvent struct { // ApproximateDataSize is the approximate size of protobuf binary // representation of this event. ApproximateDataSize int64 `json:"-" msg:"-"` + + // SplitTxn marks this RowChangedEvent as the first line of a new txn. + SplitTxn bool `json:"-" msg:"-"` } // IsDelete returns true if the row is a delete event diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 343f4c8ed44..87766150120 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -40,7 +40,11 @@ type mockSink struct { // we are testing sinkNode by itself. type mockFlowController struct{} -func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { +func (c *mockFlowController) Consume( + msg *model.PolymorphicEvent, + size uint64, + blockCallBack func(bool) error, +) error { return nil } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index a503338f094..f9cb796bcdb 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -206,8 +206,10 @@ func (n *sorterNode) start( size := uint64(msg.Row.ApproximateBytes()) // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. // Otherwise the pipeline would deadlock. - err = n.flowController.Consume(commitTs, size, func() error { - if lastCRTs > lastSentResolvedTs { + err = n.flowController.Consume(msg, size, func(batch bool) error { + if batch { + log.Panic("cdc does not support the batch resolve mechanism at this time") + } else if lastCRTs > lastSentResolvedTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. lastSentResolvedTs = lastCRTs diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 8c015656d7a..556c08ee1b3 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -78,7 +78,7 @@ type tablePipelineImpl struct { // TODO find a better name or avoid using an interface // We use an interface here for ease in unit testing. type tableFlowController interface { - Consume(commitTs uint64, size uint64, blockCallBack func() error) error + Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error Release(resolvedTs uint64) Abort() GetConsumption() uint64 diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index c6099fa6557..1e97ed6390e 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ b/cdc/sink/flowcontrol/flow_control.go @@ -20,9 +20,16 @@ import ( "github.com/edwingeng/deque" "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" ) +const ( + maxRowsPerTxn = 1024 + maxSizePerTxn = 1024 * 1024 /* 1MB */ + batchSize = 100 +) + // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { memoryQuota *tableMemoryQuota @@ -31,13 +38,20 @@ type TableFlowController struct { sync.Mutex queue deque.Deque } + // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: + // 1. Different txns with same commitTs but different startTs + // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size + batchGroupCount uint lastCommitTs uint64 } -type commitTsSizeEntry struct { +type txnSizeEntry struct { + // txn id + startTs uint64 commitTs uint64 size uint64 + rowCount uint64 } // NewTableFlowController creates a new TableFlowController @@ -55,7 +69,12 @@ func NewTableFlowController(quota uint64) *TableFlowController { // Consume is called when an event has arrived for being processed by the sink. // It will handle transaction boundaries automatically, and will not block intra-transaction. -func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { +func (c *TableFlowController) Consume( + msg *model.PolymorphicEvent, + size uint64, + callBack func(batch bool) error, +) error { + commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) if commitTs < lastCommitTs { @@ -65,8 +84,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac } if commitTs > lastCommitTs { - atomic.StoreUint64(&c.lastCommitTs, commitTs) - err := c.memoryQuota.consumeWithBlocking(size, blockCallBack) + err := c.memoryQuota.consumeWithBlocking(size, callBack) if err != nil { return errors.Trace(err) } @@ -82,13 +100,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac } } - c.queueMu.Lock() - defer c.queueMu.Unlock() - c.queueMu.queue.PushBack(&commitTsSizeEntry{ - commitTs: commitTs, - size: size, - }) - + c.enqueueSingleMsg(msg, size) return nil } @@ -98,7 +110,7 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.queueMu.Lock() for c.queueMu.queue.Len() > 0 { - if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs { + if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs { nBytesToRelease += peeked.size c.queueMu.queue.PopFront() } else { @@ -110,6 +122,62 @@ func (c *TableFlowController) Release(resolvedTs uint64) { c.memoryQuota.release(nBytesToRelease) } +// Note that msgs received by enqueueSingleMsg must be sorted by commitTs_startTs order. +func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) { + commitTs := msg.CRTs + lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) + + c.queueMu.Lock() + defer c.queueMu.Unlock() + + var e deque.Elem + // 1. Processing a new txn with different commitTs. + if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs { + atomic.StoreUint64(&c.lastCommitTs, commitTs) + c.queueMu.queue.PushBack(&txnSizeEntry{ + startTs: msg.StartTs, + commitTs: commitTs, + size: size, + rowCount: 1, + }) + c.batchGroupCount = 1 + msg.Row.SplitTxn = true + return + } + + // Processing txns with the same commitTs. + txnEntry := e.(*txnSizeEntry) + if txnEntry.commitTs != lastCommitTs { + log.Panic("got wrong commitTs from deque, report a bug", + zap.Uint64("lastCommitTs", c.lastCommitTs), + zap.Uint64("commitTsInDeque", txnEntry.commitTs)) + } + + // 2. Append row to current txn entry. + if txnEntry.startTs == msg.Row.StartTs && + txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn { + txnEntry.size += size + txnEntry.rowCount++ + return + } + + // 3. Split the txn or handle a new txn with the same commitTs. + c.queueMu.queue.PushBack(&txnSizeEntry{ + startTs: msg.StartTs, + commitTs: commitTs, + size: size, + rowCount: 1, + }) + c.batchGroupCount++ + msg.Row.SplitTxn = true + + if c.batchGroupCount >= batchSize { + c.batchGroupCount = 0 + // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem + log.Debug("emit batch resolve event throw callback") + } +} + // Abort interrupts any ongoing Consume call func (c *TableFlowController) Abort() { c.memoryQuota.abort() diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 24f639fdf8a..6836299e4a4 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -21,11 +21,12 @@ import ( "testing" "time" + "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) -func dummyCallBack() error { +func dummyCallBack(_ bool) error { return nil } @@ -34,7 +35,7 @@ type mockCallBacker struct { injectedErr error } -func (c *mockCallBacker) cb() error { +func (c *mockCallBacker) cb(_ bool) error { c.timesCalled += 1 return c.injectedErr } @@ -173,7 +174,7 @@ func TestFlowControlBasic(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + mockedRowsCh := make(chan *txnSizeEntry, 1024) flowController := NewTableFlowController(2048) errg.Go(func() error { @@ -186,7 +187,7 @@ func TestFlowControlBasic(t *testing.T) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -202,7 +203,7 @@ func TestFlowControlBasic(t *testing.T) { defer close(eventCh) resolvedTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -227,7 +228,8 @@ func TestFlowControlBasic(t *testing.T) { resolvedTs = mockedRow.commitTs updatedResolvedTs = true } - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, dummyCallBack) require.Nil(t, err) select { case <-ctx.Done(): @@ -290,13 +292,13 @@ func TestFlowControlAbort(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 1000, callBacker.cb) + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 1000, callBacker.cb) require.Nil(t, err) require.Equal(t, 0, callBacker.timesCalled) - err = controller.Consume(2, 1000, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 1000, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) - err = controller.Consume(2, 10, callBacker.cb) + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 10, callBacker.cb) require.Regexp(t, ".*ErrFlowControllerAborted.*", err) require.Equal(t, 1, callBacker.timesCalled) }() @@ -314,7 +316,7 @@ func TestFlowControlCallBack(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + mockedRowsCh := make(chan *txnSizeEntry, 1024) flowController := NewTableFlowController(512) errg.Go(func() error { @@ -327,7 +329,7 @@ func TestFlowControlCallBack(t *testing.T) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -343,7 +345,7 @@ func TestFlowControlCallBack(t *testing.T) { defer close(eventCh) lastCRTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -355,16 +357,17 @@ func TestFlowControlCallBack(t *testing.T) { } atomic.AddUint64(&consumedBytes, mockedRow.size) - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, func() error { - select { - case <-ctx.Done(): - return ctx.Err() - case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, - }: - } - return nil - }) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(bool) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: lastCRTs, + }: + } + return nil + }) require.Nil(t, err) lastCRTs = mockedRow.commitTs @@ -426,7 +429,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 511, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { t.Error("unreachable") return nil }) @@ -443,7 +446,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { cancel() }() - err = controller.Consume(2, 511, func() error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { atomic.StoreInt32(&isBlocked, 1) <-ctx.Done() atomic.StoreInt32(&isBlocked, 0) @@ -468,12 +471,12 @@ func TestFlowControlCallBackError(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(1, 511, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { t.Error("unreachable") return nil }) require.Nil(t, err) - err = controller.Consume(2, 511, func() error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { <-ctx.Done() return ctx.Err() }) @@ -490,7 +493,7 @@ func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() controller := NewTableFlowController(1024) - err := controller.Consume(1, 2048, func() error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(bool) error { t.Error("unreachable") return nil }) @@ -501,7 +504,7 @@ func BenchmarkTableFlowController(B *testing.B) { ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() errg, ctx := errgroup.WithContext(ctx) - mockedRowsCh := make(chan *commitTsSizeEntry, 102400) + mockedRowsCh := make(chan *txnSizeEntry, 102400) flowController := NewTableFlowController(20 * 1024 * 1024) // 20M errg.Go(func() error { @@ -514,7 +517,7 @@ func BenchmarkTableFlowController(B *testing.B) { select { case <-ctx.Done(): return ctx.Err() - case mockedRowsCh <- &commitTsSizeEntry{ + case mockedRowsCh <- &txnSizeEntry{ commitTs: lastCommitTs, size: size, }: @@ -530,7 +533,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer close(eventCh) resolvedTs := uint64(0) for { - var mockedRow *commitTsSizeEntry + var mockedRow *txnSizeEntry select { case <-ctx.Done(): return ctx.Err() @@ -551,7 +554,8 @@ func BenchmarkTableFlowController(B *testing.B) { } resolvedTs = mockedRow.commitTs } - err := flowController.Consume(mockedRow.commitTs, mockedRow.size, dummyCallBack) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, dummyCallBack) if err != nil { B.Fatal(err) } diff --git a/cdc/sink/flowcontrol/table_memory_quota.go b/cdc/sink/flowcontrol/table_memory_quota.go index 7ca15e7857f..c563ba4f333 100644 --- a/cdc/sink/flowcontrol/table_memory_quota.go +++ b/cdc/sink/flowcontrol/table_memory_quota.go @@ -54,7 +54,9 @@ func newTableMemoryQuota(quota uint64) *tableMemoryQuota { // block until enough memory has been freed up by release. // blockCallBack will be called if the function will block. // Should be used with care to prevent deadlock. -func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func() error) error { +func (c *tableMemoryQuota) consumeWithBlocking( + nBytes uint64, blockCallBack func(bool) error, +) error { if nBytes >= c.quota { return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.quota) } @@ -62,7 +64,7 @@ func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func c.consumed.Lock() if c.consumed.bytes+nBytes >= c.quota { c.consumed.Unlock() - err := blockCallBack() + err := blockCallBack(false) if err != nil { return errors.Trace(err) } diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 84c334ba300..8fa1a2227b6 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -36,7 +36,7 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { } var txn *model.SingleTableTxn - if len(t.txns) == 0 || t.txns[len(t.txns)-1].StartTs < row.StartTs { + if len(t.txns) == 0 || row.SplitTxn || t.txns[len(t.txns)-1].StartTs < row.StartTs { txn = &model.SingleTableTxn{ StartTs: row.StartTs, CommitTs: row.CommitTs,