From 8da7e8bebadd97e187ca24d8fa286cd71afb23b8 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 1 Jul 2022 19:01:49 +0800 Subject: [PATCH] sink(ticdc): remove force consume when redo is disabled (#5712) close pingcap/tiflow#1683, close pingcap/tiflow#5453 --- cdc/model/mounter.go | 66 +++-- cdc/model/mounter_test.go | 37 +++ cdc/processor/pipeline/sink.go | 60 ++-- cdc/processor/pipeline/sink_test.go | 24 +- cdc/processor/pipeline/sorter.go | 53 ++-- cdc/processor/pipeline/table.go | 13 +- cdc/processor/pipeline/table_actor.go | 33 ++- cdc/processor/pipeline/table_actor_test.go | 15 +- cdc/processor/processor.go | 2 + cdc/sink/black_hole.go | 4 +- cdc/sink/flowcontrol/flow_control.go | 119 +++++--- cdc/sink/flowcontrol/flow_control_test.go | 309 +++++++++++++++++++-- cdc/sink/flowcontrol/table_memory_quota.go | 6 +- cdc/sink/mq/mq.go | 32 ++- cdc/sink/mq/mq_test.go | 6 +- cdc/sink/mysql/mysql.go | 34 +-- cdc/sink/mysql/mysql_test.go | 22 +- cdc/sink/mysql/simple_mysql_tester.go | 6 +- cdc/sink/mysql/txn_cache.go | 12 +- cdc/sink/mysql/txn_cache_test.go | 4 +- cdc/sink/sink.go | 2 +- cdc/sink/table_sink.go | 39 ++- cmd/kafka-consumer/main.go | 8 +- 23 files changed, 645 insertions(+), 261 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 38f38bc2cc4..24f6e29f813 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -13,13 +13,18 @@ package model +import ( + "math" + + "github.com/pingcap/log" + "go.uber.org/zap" +) + // PolymorphicEvent describes an event can be in multiple states. type PolymorphicEvent struct { - StartTs uint64 - // Commit or resolved TS - CRTs uint64 - // Identify whether the resolved event is in batch mode. - Mode ResolvedMode + StartTs uint64 + CRTs uint64 + Resolved *ResolvedTs RawKV *RawKVEntry Row *RowChangedEvent @@ -66,11 +71,6 @@ func (e *PolymorphicEvent) IsResolved() bool { return e.RawKV.OpType == OpTypeResolved } -// IsBatchResolved returns true if the event is batch resolved event. -func (e *PolymorphicEvent) IsBatchResolved() bool { - return e.IsResolved() && e.Mode == BatchResolvedMode -} - // ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order. // It returns true if and only if i should precede j. func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { @@ -108,16 +108,48 @@ const ( // ResolvedTs is the resolved timestamp of sink module. type ResolvedTs struct { - Ts uint64 - Mode ResolvedMode + Mode ResolvedMode + Ts uint64 + BatchID uint64 } -// NewResolvedTs creates a new ResolvedTs. +// NewResolvedTs creates a normal ResolvedTs. func NewResolvedTs(t uint64) ResolvedTs { - return ResolvedTs{Ts: t, Mode: NormalResolvedMode} + return ResolvedTs{Ts: t, Mode: NormalResolvedMode, BatchID: math.MaxUint64} +} + +// IsBatchMode returns true if the resolved ts is BatchResolvedMode. +func (r ResolvedTs) IsBatchMode() bool { + return r.Mode == BatchResolvedMode +} + +// ResolvedMark returns a timestamp `ts` based on the r.mode, which marks that all events +// whose commitTs is less than or equal to `ts` are sent to Sink. +func (r ResolvedTs) ResolvedMark() uint64 { + switch r.Mode { + case NormalResolvedMode: + // with NormalResolvedMode, cdc guarantees all events whose commitTs is + // less than or equal to `resolved.Ts` are sent to Sink. + return r.Ts + case BatchResolvedMode: + // with BatchResolvedMode, cdc guarantees all events whose commitTs is + // less than `resolved.Ts` are sent to Sink. 
+ return r.Ts - 1 + default: + log.Error("unknown resolved mode", zap.Any("resolved", r)) + return 0 + } +} + +// EqualOrGreater judge whether the resolved ts is equal or greater than the given ts. +func (r ResolvedTs) EqualOrGreater(r1 ResolvedTs) bool { + if r.Ts == r1.Ts { + return r.BatchID >= r1.BatchID + } + return r.Ts > r1.Ts } -// NewResolvedTsWithMode creates a ResolvedTs with a given batch type. -func NewResolvedTsWithMode(t uint64, m ResolvedMode) ResolvedTs { - return ResolvedTs{Ts: t, Mode: m} +// Less judge whether the resolved ts is less than the given ts. +func (r ResolvedTs) Less(r1 ResolvedTs) bool { + return !r.EqualOrGreater(r1) } diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go index 3e7ec14f152..c692b3fc3b7 100644 --- a/cdc/model/mounter_test.go +++ b/cdc/model/mounter_test.go @@ -14,6 +14,7 @@ package model import ( + "math/rand" "testing" "github.com/stretchr/testify/require" @@ -44,3 +45,39 @@ func TestPolymorphicEvent(t *testing.T) { require.Equal(t, resolved.CRTs, polyEvent.CRTs) require.Equal(t, uint64(0), polyEvent.StartTs) } + +func TestResolvedTs(t *testing.T) { + t.Parallel() + + invalidResolvedTs := ResolvedTs{Mode: -1, Ts: 1} + require.Equal(t, uint64(0), invalidResolvedTs.ResolvedMark()) + + ts := rand.Uint64() + batchID := rand.Uint64() + normalResolvedTs := NewResolvedTs(ts) + batchResolvedTs1 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID} + require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs1)) + require.False(t, batchResolvedTs1.EqualOrGreater(normalResolvedTs)) + require.False(t, normalResolvedTs.Less(batchResolvedTs1)) + require.True(t, batchResolvedTs1.Less(normalResolvedTs)) + + batchResolvedTs2 := ResolvedTs{Mode: BatchResolvedMode, Ts: ts, BatchID: batchID + 1} + require.True(t, normalResolvedTs.EqualOrGreater(batchResolvedTs2)) + require.True(t, batchResolvedTs2.EqualOrGreater(batchResolvedTs1)) + require.True(t, batchResolvedTs2.Less(normalResolvedTs)) + require.True(t, batchResolvedTs1.Less(batchResolvedTs2)) + + largerResolvedTs := NewResolvedTs(ts + rand.Uint64()%10) + require.True(t, largerResolvedTs.EqualOrGreater(normalResolvedTs)) + largerBatchResolvedTs := ResolvedTs{ + Mode: BatchResolvedMode, + Ts: ts + rand.Uint64()%10, + BatchID: batchID, + } + require.True(t, largerBatchResolvedTs.EqualOrGreater(normalResolvedTs)) + + smallerResolvedTs := NewResolvedTs(0) + require.True(t, normalResolvedTs.EqualOrGreater(smallerResolvedTs)) + smallerBatchResolvedTs := ResolvedTs{Mode: BatchResolvedMode, Ts: 0, BatchID: batchID} + require.True(t, batchResolvedTs1.EqualOrGreater(smallerBatchResolvedTs)) +} diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 37ecd559762..e6ec1903662 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -73,7 +73,7 @@ type sinkNode struct { // atomic oprations for model.ResolvedTs resolvedTs atomic.Value - checkpointTs model.Ts + checkpointTs atomic.Value targetTs model.Ts barrierTs model.Ts @@ -85,23 +85,31 @@ type sinkNode struct { func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { sn := &sinkNode{ - tableID: tableID, - sink: sink, - status: TableStatusInitializing, - targetTs: targetTs, - checkpointTs: startTs, - barrierTs: startTs, + tableID: tableID, + sink: sink, + status: TableStatusInitializing, + targetTs: targetTs, + barrierTs: startTs, flowController: flowController, } 
sn.resolvedTs.Store(model.NewResolvedTs(startTs)) + sn.checkpointTs.Store(model.NewResolvedTs(startTs)) return sn } -func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) } -func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } -func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sinkNode) Status() TableStatus { return n.status.Load() } +func (n *sinkNode) ResolvedTs() model.Ts { return n.getResolvedTs().ResolvedMark() } +func (n *sinkNode) CheckpointTs() model.Ts { return n.getCheckpointTs().ResolvedMark() } +func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } +func (n *sinkNode) Status() TableStatus { return n.status.Load() } + +func (n *sinkNode) getResolvedTs() model.ResolvedTs { + return n.resolvedTs.Load().(model.ResolvedTs) +} + +func (n *sinkNode) getCheckpointTs() model.ResolvedTs { + return n.checkpointTs.Load().(model.ResolvedTs) +} func (n *sinkNode) Init(ctx pipeline.NodeContext) error { n.replicaConfig = ctx.ChangefeedVars().Info.Config @@ -137,22 +145,22 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er n.status.Store(TableStatusStopped) return } - if atomic.LoadUint64(&n.checkpointTs) >= n.targetTs { + if n.CheckpointTs() >= n.targetTs { err = n.stop(ctx) } }() currentBarrierTs := atomic.LoadUint64(&n.barrierTs) - currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs) + currentCheckpointTs := n.getCheckpointTs() if resolved.Ts > currentBarrierTs { - resolved.Ts = currentBarrierTs + resolved = model.NewResolvedTs(currentBarrierTs) } if resolved.Ts > n.targetTs { - resolved.Ts = n.targetTs + resolved = model.NewResolvedTs(n.targetTs) } - if resolved.Ts <= currentCheckpointTs { + if currentCheckpointTs.EqualOrGreater(resolved) { return nil } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved) + checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved) if err != nil { return errors.Trace(err) } @@ -160,16 +168,16 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er // we must call flowController.Release immediately after we call // FlushRowChangedEvents to prevent deadlock cause by checkpointTs // fall back - n.flowController.Release(checkpointTs) + n.flowController.Release(checkpoint) // the checkpointTs may fall back in some situation such as: // 1. This table is newly added to the processor // 2. 
There is one table in the processor that has a smaller // checkpointTs than this one - if checkpointTs <= currentCheckpointTs { + if currentCheckpointTs.EqualOrGreater(checkpoint) { return nil } - atomic.StoreUint64(&n.checkpointTs, checkpointTs) + n.checkpointTs.Store(checkpoint) return nil } @@ -301,7 +309,13 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo failpoint.Return(false, errors.New("processor sync resolved injected error")) }) - resolved := model.NewResolvedTsWithMode(event.CRTs, event.Mode) + var resolved model.ResolvedTs + if event.Resolved != nil { + resolved = *(event.Resolved) + } else { + resolved = model.NewResolvedTs(event.CRTs) + } + if err := n.flushSink(ctx, resolved); err != nil { return false, errors.Trace(err) } @@ -312,7 +326,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo return false, errors.Trace(err) } case pmessage.MessageTypeTick: - if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { + if err := n.flushSink(ctx, n.getResolvedTs()); err != nil { return false, errors.Trace(err) } case pmessage.MessageTypeCommand: @@ -331,7 +345,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error { atomic.StoreUint64(&n.barrierTs, ts) - if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { + if err := n.flushSink(ctx, n.getResolvedTs()); err != nil { return errors.Trace(err) } return nil diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 0c8f5ced58d..0497e2b49c7 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -43,12 +43,12 @@ type mockFlowController struct{} func (c *mockFlowController) Consume( msg *model.PolymorphicEvent, size uint64, - blockCallBack func(bool) error, + blockCallBack func(uint64) error, ) error { return nil } -func (c *mockFlowController) Release(resolvedTs uint64) { +func (c *mockFlowController) Release(resolved model.ResolvedTs) { } func (c *mockFlowController) Abort() { @@ -78,12 +78,12 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error func (s *mockSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent }{resolvedTs: resolved.Ts}) - return resolved.Ts, nil + return resolved, nil } func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error { @@ -422,7 +422,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 1}, }) sink.Reset() - require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs()) require.Equal(t, uint64(1), node.CheckpointTs()) require.Nil(t, node.Receive( @@ -435,7 +435,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 2}, }) sink.Reset() - require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.getResolvedTs()) require.Equal(t, uint64(2), node.CheckpointTs()) } @@ -636,7 +636,7 @@ type flushFlowController struct { releaseCounter int } -func (c *flushFlowController) Release(resolvedTs uint64) { +func (c *flushFlowController) Release(resolved model.ResolvedTs) { c.releaseCounter++ } @@ -650,11 +650,11 @@ var fallBackResolvedTs = uint64(10) func (s *flushSink) FlushRowChangedEvents( ctx 
context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { if resolved.Ts == fallBackResolvedTs { - return 0, nil + return model.NewResolvedTs(0), nil } - return resolved.Ts, nil + return resolved, nil } // TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always @@ -680,11 +680,11 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8))) require.Nil(t, err) - require.Equal(t, uint64(8), sNode.checkpointTs) + require.Equal(t, uint64(8), sNode.CheckpointTs()) require.Equal(t, 1, flowController.releaseCounter) // resolvedTs will fall back in this call err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10))) require.Nil(t, err) - require.Equal(t, uint64(8), sNode.checkpointTs) + require.Equal(t, uint64(8), sNode.CheckpointTs()) require.Equal(t, 2, flowController.releaseCounter) } diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index f9cb796bcdb..37e2bf41dce 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -161,6 +161,20 @@ func (n *sorterNode) start( metricsTicker := time.NewTicker(flushMemoryMetricsDuration) defer metricsTicker.Stop() + resolvedTsInterpolateFunc := func(commitTs uint64) { + // checks the condition: cur_event_commit_ts > prev_event_commit_ts > last_resolved_ts + // If this is true, it implies that (1) the last transaction has finished, and we are + // processing the first event in a new transaction, (2) a resolved-ts is safe to be + // sent, but it has not yet. This means that we can interpolate prev_event_commit_ts + // as a resolved-ts, improving the frequency at which the sink flushes. + if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { + lastSentResolvedTs = lastCRTs + lastSendResolvedTsTime = time.Now() + } + msg := model.NewResolvedPolymorphicEvent(0, lastSentResolvedTs) + ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) + } + for { // We must call `sorter.Output` before receiving resolved events. // Skip calling `sorter.Output` and caching output channel may fail @@ -188,34 +202,33 @@ func (n *sorterNode) start( commitTs := msg.CRTs // We interpolate a resolved-ts if none has been sent for some time. if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval { - // checks the condition: cur_event_commit_ts > prev_event_commit_ts > last_resolved_ts - // If this is true, it implies that (1) the last transaction has finished, and we are processing - // the first event in a new transaction, (2) a resolved-ts is safe to be sent, but it has not yet. - // This means that we can interpolate prev_event_commit_ts as a resolved-ts, improving the frequency - // at which the sink flushes. - if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { - lastSentResolvedTs = lastCRTs - lastSendResolvedTsTime = time.Now() - msg := model.NewResolvedPolymorphicEvent(0, lastCRTs) - ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) - } + resolvedTsInterpolateFunc(commitTs) } // We calculate memory consumption by RowChangedEvent size. // It's much larger than RawKVEntry. size := uint64(msg.Row.ApproximateBytes()) - // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. - // Otherwise the pipeline would deadlock. 
- err = n.flowController.Consume(msg, size, func(batch bool) error { - if batch { - log.Panic("cdc does not support the batch resolve mechanism at this time") - } else if lastCRTs > lastSentResolvedTs { + // NOTE when redo log enabled, we allow the quota to be exceeded if blocking + // means interrupting a transaction. Otherwise the pipeline would deadlock. + err = n.flowController.Consume(msg, size, func(batchID uint64) error { + if commitTs > lastCRTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. - lastSentResolvedTs = lastCRTs - lastSendResolvedTsTime = time.Now() + resolvedTsInterpolateFunc(commitTs) + } else if commitTs == lastCRTs { + // send batch resolve event msg := model.NewResolvedPolymorphicEvent(0, lastCRTs) + msg.Resolved = &model.ResolvedTs{ + Ts: commitTs, + Mode: model.BatchResolvedMode, + BatchID: batchID, + } ctx.SendToNextNode(pmessage.PolymorphicEventMessage(msg)) + } else { + log.Panic("flow control blocked, report a bug", + zap.Uint64("commitTs", commitTs), + zap.Uint64("lastCommitTs", lastCRTs), + zap.Uint64("lastSentResolvedTs", lastSentResolvedTs)) } return nil }) @@ -229,7 +242,7 @@ func (n *sorterNode) start( } return nil } - lastCRTs = commitTs + lastCRTs = msg.CRTs } else { // handle OpTypeResolved if msg.CRTs < lastSentResolvedTs { diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 0c540f0a0a9..3de4d307256 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -79,8 +79,12 @@ type tablePipelineImpl struct { // TODO find a better name or avoid using an interface // We use an interface here for ease in unit testing. type tableFlowController interface { - Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error - Release(resolvedTs uint64) + Consume( + msg *model.PolymorphicEvent, + size uint64, + blockCallBack func(batchID uint64) error, + ) error + Release(resolved model.ResolvedTs) Abort() GetConsumption() uint64 } @@ -92,7 +96,7 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. 
if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs().Ts + return t.sinkNode.ResolvedTs() } return t.sorterNode.ResolvedTs() } @@ -180,6 +184,7 @@ func NewTablePipeline(ctx cdcContext.Context, sink sink.Sink, targetTs model.Ts, upstream *upstream.Upstream, + redoLogEnabled bool, ) TablePipeline { ctx, cancel := cdcContext.WithCancel(ctx) changefeed := ctx.ChangefeedVars().ID @@ -199,7 +204,7 @@ func NewTablePipeline(ctx cdcContext.Context, zap.String("tableName", tableName), zap.Int64("tableID", tableID), zap.Uint64("quota", perTableMemoryQuota)) - flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota) + flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota, redoLogEnabled) config := ctx.ChangefeedVars().Info.Config cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled() runnerSize := defaultRunnersSize diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 1944199f655..e633329d075 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -57,7 +57,8 @@ type tableActor struct { // backend mounter mounter entry.Mounter // backend tableSink - tableSink sink.Sink + tableSink sink.Sink + redoLogEnabled bool pullerNode *pullerNode sortNode *sorterNode @@ -103,6 +104,7 @@ func NewTableActor(cdcCtx cdcContext.Context, tableName string, replicaInfo *model.TableReplicaInfo, sink sink.Sink, + redoLogEnabled bool, targetTs model.Ts, ) (TablePipeline, error) { config := cdcCtx.ChangefeedVars().Info.Config @@ -124,18 +126,19 @@ func NewTableActor(cdcCtx cdcContext.Context, wg: wg, cancel: cancel, - tableID: tableID, - markTableID: replicaInfo.MarkTableID, - tableName: tableName, - cyclicEnabled: cyclicEnabled, - memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota, - upStream: upStream, - mounter: mounter, - replicaInfo: replicaInfo, - replicaConfig: config, - tableSink: sink, - targetTs: targetTs, - started: false, + tableID: tableID, + markTableID: replicaInfo.MarkTableID, + tableName: tableName, + cyclicEnabled: cyclicEnabled, + memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota, + upStream: upStream, + mounter: mounter, + replicaInfo: replicaInfo, + replicaConfig: config, + tableSink: sink, + redoLogEnabled: redoLogEnabled, + targetTs: targetTs, + started: false, changefeedID: changefeedVars.ID, changefeedVars: changefeedVars, @@ -279,7 +282,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error { zap.String("tableName", t.tableName), zap.Uint64("quota", t.memoryQuota)) - flowController := flowcontrol.NewTableFlowController(t.memoryQuota) + flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled) sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController, t.mounter, t.replicaConfig, @@ -431,7 +434,7 @@ func (t *tableActor) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. 
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs().Ts + return t.sinkNode.ResolvedTs() } return t.sortNode.ResolvedTs() } diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 0f80a4e9b6f..d5bc42366f0 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -16,7 +16,6 @@ package pipeline import ( "context" "sync" - "sync/atomic" "testing" "time" @@ -86,7 +85,7 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, TableStatusStopped, tbl.Status()) require.Equal(t, uint64(1), tbl.Workload().Workload) - atomic.StoreUint64(&sink.checkpointTs, 3) + sink.checkpointTs.Store(model.NewResolvedTs(3)) require.Equal(t, model.Ts(3), tbl.CheckpointTs()) require.Equal(t, model.Ts(5), tbl.ResolvedTs()) @@ -189,10 +188,10 @@ func TestPollTickMessage(t *testing.T) { status: TableStatusInitializing, sink: &mockSink{}, flowController: &mockFlowController{}, - checkpointTs: 10, targetTs: 11, } sn.resolvedTs.Store(model.NewResolvedTs(10)) + sn.checkpointTs.Store(model.NewResolvedTs(10)) tbl := tableActor{ sinkNode: sn, @@ -239,11 +238,11 @@ func TestPollStopMessage(t *testing.T) { func TestPollBarrierTsMessage(t *testing.T) { sn := &sinkNode{ - targetTs: 10, - checkpointTs: 5, - barrierTs: 8, + targetTs: 10, + barrierTs: 8, } sn.resolvedTs.Store(model.NewResolvedTs(5)) + sn.checkpointTs.Store(model.NewResolvedTs(5)) tbl := tableActor{ sinkNode: sn, @@ -359,7 +358,7 @@ func TestNewTableActor(t *testing.T) { &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, - }, &mockSink{}, 10) + }, &mockSink{}, false, 10) require.NotNil(t, tbl) require.Nil(t, err) require.NotPanics(t, func() { @@ -375,7 +374,7 @@ func TestNewTableActor(t *testing.T) { &model.TableReplicaInfo{ StartTs: 0, MarkTableID: 1, - }, &mockSink{}, 10) + }, &mockSink{}, false, 10) require.Nil(t, tbl) require.NotNil(t, err) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 5d3ab1768d0..a13dcd88bb6 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -813,6 +813,7 @@ func (p *processor) createTablePipelineImpl( tableName, replicaInfo, s, + p.redoManager.Enabled(), p.changefeed.Info.GetTargetTs()) if err != nil { return nil, errors.Trace(err) @@ -827,6 +828,7 @@ func (p *processor) createTablePipelineImpl( s, p.changefeed.Info.GetTargetTs(), p.upStream, + p.redoManager.Enabled(), ) } diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index e00394daee4..51481ca8d27 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -55,7 +55,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model func (b *blackHoleSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency @@ -65,7 +65,7 @@ func (b *blackHoleSink) FlushRowChangedEvents( return int(batchSize), nil }) b.statistics.PrintStatus(ctx) - return resolved.Ts, err + return resolved, err } func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error { diff --git a/cdc/sink/flowcontrol/flow_control.go b/cdc/sink/flowcontrol/flow_control.go index 1e97ed6390e..597c2b17cc0 100644 --- a/cdc/sink/flowcontrol/flow_control.go +++ 
b/cdc/sink/flowcontrol/flow_control.go @@ -25,14 +25,16 @@ import ( ) const ( - maxRowsPerTxn = 1024 - maxSizePerTxn = 1024 * 1024 /* 1MB */ - batchSize = 100 + defaultRowsPerTxn = 1024 + defaultSizePerTxn = 1024 * 1024 /* 1MB */ + defaultBatchSize = 100 ) // TableFlowController provides a convenient interface to control the memory consumption of a per table event stream type TableFlowController struct { - memoryQuota *tableMemoryQuota + memoryQuota *tableMemoryQuota + redoLogEnabled bool + lastCommitTs uint64 queueMu struct { sync.Mutex @@ -41,29 +43,43 @@ type TableFlowController struct { // batchGroupCount is the number of txnSizeEntries with same commitTs, which could be: // 1. Different txns with same commitTs but different startTs // 2. TxnSizeEntry split from the same txns which exceeds max rows or max size - batchGroupCount uint + batchGroupCount uint64 + batchID uint64 - lastCommitTs uint64 + batchSize uint64 + maxRowsPerTxn uint64 + maxSizePerTxn uint64 } type txnSizeEntry struct { // txn id startTs uint64 commitTs uint64 + size uint64 rowCount uint64 + batchID uint64 } // NewTableFlowController creates a new TableFlowController -func NewTableFlowController(quota uint64) *TableFlowController { +func NewTableFlowController(quota uint64, redoLogEnabled bool) *TableFlowController { + maxSizePerTxn := uint64(defaultSizePerTxn) + if maxSizePerTxn > quota && !redoLogEnabled { + maxSizePerTxn = quota + } + return &TableFlowController{ - memoryQuota: newTableMemoryQuota(quota), + memoryQuota: newTableMemoryQuota(quota), + redoLogEnabled: redoLogEnabled, queueMu: struct { sync.Mutex queue deque.Deque }{ queue: deque.NewDeque(), }, + batchSize: defaultBatchSize, + maxRowsPerTxn: defaultRowsPerTxn, + maxSizePerTxn: maxSizePerTxn, } } @@ -72,10 +88,19 @@ func NewTableFlowController(quota uint64) *TableFlowController { func (c *TableFlowController) Consume( msg *model.PolymorphicEvent, size uint64, - callBack func(batch bool) error, + callBack func(batchID uint64) error, ) error { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) + blockingCallBack := func() error { + err := callBack(c.batchID) + + if commitTs == lastCommitTs { + c.batchID++ + c.resetBatch(lastCommitTs, commitTs) + } + return err + } if commitTs < lastCommitTs { log.Panic("commitTs regressed, report a bug", @@ -83,34 +108,33 @@ func (c *TableFlowController) Consume( zap.Uint64("lastCommitTs", c.lastCommitTs)) } - if commitTs > lastCommitTs { - err := c.memoryQuota.consumeWithBlocking(size, callBack) - if err != nil { + if commitTs == lastCommitTs && c.redoLogEnabled { + // Here commitTs == lastCommitTs, which means we are not crossing transaction + // boundaries, and redo log currently does not support split transactions, hence + // we use `forceConsume` to avoid deadlock. + // TODO: fix this after we figure out how to make redo log support split txn. + if err := c.memoryQuota.forceConsume(size); err != nil { return errors.Trace(err) } } else { - // Here commitTs == lastCommitTs, which means that we are not crossing - // a transaction boundary. In this situation, we use `forceConsume` because - // blocking the event stream mid-transaction is highly likely to cause - // a deadlock. - // TODO fix this in the future, after we figure out how to elegantly support large txns. 
- err := c.memoryQuota.forceConsume(size) - if err != nil { + if err := c.memoryQuota.consumeWithBlocking(size, blockingCallBack); err != nil { return errors.Trace(err) } } - c.enqueueSingleMsg(msg, size) + c.enqueueSingleMsg(msg, size, blockingCallBack) return nil } -// Release is called when all events committed before resolvedTs has been freed from memory. -func (c *TableFlowController) Release(resolvedTs uint64) { +// Release releases the memory quota based on the given resolved timestamp. +func (c *TableFlowController) Release(resolved model.ResolvedTs) { var nBytesToRelease uint64 c.queueMu.Lock() for c.queueMu.queue.Len() > 0 { - if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs { + peeked := c.queueMu.queue.Front().(*txnSizeEntry) + if peeked.commitTs < resolved.Ts || + (peeked.commitTs == resolved.Ts && peeked.batchID <= resolved.BatchID) { nBytesToRelease += peeked.size c.queueMu.queue.PopFront() } else { @@ -123,59 +147,68 @@ func (c *TableFlowController) Release(resolvedTs uint64) { } // Note that msgs received by enqueueSingleMsg must be sorted by commitTs_startTs order. -func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) { +func (c *TableFlowController) enqueueSingleMsg( + msg *model.PolymorphicEvent, size uint64, callback func() error, +) { commitTs := msg.CRTs lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) c.queueMu.Lock() defer c.queueMu.Unlock() - var e deque.Elem + e := c.queueMu.queue.Back() // 1. Processing a new txn with different commitTs. - if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs { + if e == nil || lastCommitTs < commitTs { atomic.StoreUint64(&c.lastCommitTs, commitTs) - c.queueMu.queue.PushBack(&txnSizeEntry{ - startTs: msg.StartTs, - commitTs: commitTs, - size: size, - rowCount: 1, - }) - c.batchGroupCount = 1 - msg.Row.SplitTxn = true + c.resetBatch(lastCommitTs, commitTs) + c.addEntry(msg, size) return } // Processing txns with the same commitTs. txnEntry := e.(*txnSizeEntry) if txnEntry.commitTs != lastCommitTs { - log.Panic("got wrong commitTs from deque, report a bug", + log.Panic("got wrong commitTs from deque in flow control, report a bug", zap.Uint64("lastCommitTs", c.lastCommitTs), zap.Uint64("commitTsInDeque", txnEntry.commitTs)) } // 2. Append row to current txn entry. - if txnEntry.startTs == msg.Row.StartTs && - txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn { + if txnEntry.batchID == c.batchID && txnEntry.startTs == msg.Row.StartTs && + txnEntry.rowCount < c.maxRowsPerTxn && txnEntry.size < c.maxSizePerTxn { txnEntry.size += size txnEntry.rowCount++ return } // 3. Split the txn or handle a new txn with the same commitTs. + if c.batchGroupCount >= c.batchSize { + _ = callback() + } + c.addEntry(msg, size) +} + +// addEntry should be called only if c.queueMu is locked. +func (c *TableFlowController) addEntry(msg *model.PolymorphicEvent, size uint64) { + c.batchGroupCount++ c.queueMu.queue.PushBack(&txnSizeEntry{ startTs: msg.StartTs, - commitTs: commitTs, + commitTs: msg.CRTs, size: size, rowCount: 1, + batchID: c.batchID, }) - c.batchGroupCount++ msg.Row.SplitTxn = true +} - if c.batchGroupCount >= batchSize { - c.batchGroupCount = 0 - // TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem - log.Debug("emit batch resolve event throw callback") +// resetBatch reset batchID and batchGroupCount if handling a new txn, Otherwise, +// just reset batchGroupCount. 
+func (c *TableFlowController) resetBatch(lastCommitTs, commitTs uint64) { + if lastCommitTs < commitTs { + // At least one batch for each txn. + c.batchID = 1 } + c.batchGroupCount = 0 } // Abort interrupts any ongoing Consume call diff --git a/cdc/sink/flowcontrol/flow_control_test.go b/cdc/sink/flowcontrol/flow_control_test.go index 6836299e4a4..7c2cefac86a 100644 --- a/cdc/sink/flowcontrol/flow_control_test.go +++ b/cdc/sink/flowcontrol/flow_control_test.go @@ -21,12 +21,18 @@ import ( "testing" "time" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/stretchr/testify/require" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) -func dummyCallBack(_ bool) error { +func dummyCallBack() error { + return nil +} + +func dummyCallBackWithBatch(batchID uint64) error { return nil } @@ -35,7 +41,7 @@ type mockCallBacker struct { injectedErr error } -func (c *mockCallBacker) cb(_ bool) error { +func (c *mockCallBacker) cb(_ uint64) error { c.timesCalled += 1 return c.injectedErr } @@ -163,11 +169,11 @@ func TestMemoryQuotaReleaseZero(t *testing.T) { } type mockedEvent struct { - resolvedTs uint64 - size uint64 + resolved model.ResolvedTs + size uint64 } -func TestFlowControlBasic(t *testing.T) { +func TestFlowControlWithForceConsume(t *testing.T) { t.Parallel() var consumedBytes uint64 @@ -175,7 +181,7 @@ func TestFlowControlBasic(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(2048) + flowController := NewTableFlowController(2048, true) errg.Go(func() error { lastCommitTs := uint64(1) @@ -222,14 +228,14 @@ func TestFlowControlBasic(t *testing.T) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } resolvedTs = mockedRow.commitTs updatedResolvedTs = true } err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), - mockedRow.size, dummyCallBack) + mockedRow.size, dummyCallBackWithBatch) require.Nil(t, err) select { case <-ctx.Done(): @@ -248,7 +254,7 @@ func TestFlowControlBasic(t *testing.T) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } @@ -271,7 +277,250 @@ func TestFlowControlBasic(t *testing.T) { if event.size != 0 { atomic.AddUint64(&consumedBytes, -event.size) } else { - flowController.Release(event.resolvedTs) + flowController.Release(event.resolved) + } + } + + return nil + }) + + require.Nil(t, errg.Wait()) + require.Equal(t, uint64(0), atomic.LoadUint64(&consumedBytes)) + require.Equal(t, uint64(0), flowController.GetConsumption()) +} + +func TestFlowControlWithBatchAndForceConsume(t *testing.T) { + t.Parallel() + + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Hour*10) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *txnSizeEntry, 1024) + flowController := NewTableFlowController(512, true) + maxBatch := uint64(3) + + // simulate a big txn + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := uint64(0); i <= maxBatch*defaultRowsPerTxn*defaultBatchSize; i++ { + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &txnSizeEntry{ + commitTs: lastCommitTs, + size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer 
close(eventCh) + lastCRTs := uint64(0) + maxBatchID := uint64(0) + for { + var mockedRow *txnSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.size) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(batchID uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: batchID, + }, + }: + } + log.Debug("", zap.Any("batchID", batchID)) + maxBatchID = batchID + return nil + }) + require.Nil(t, err) + lastCRTs = mockedRow.commitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.size, + }: + } + } + require.Less(t, uint64(0), flowController.GetConsumption()) + require.Equal(t, maxBatch, maxBatchID) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: maxBatchID + 1, + }, + }: + } + time.Sleep(time.Millisecond * 500) + require.Equal(t, uint64(0), flowController.GetConsumption()) + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolved) + } + } + + return nil + }) + + require.Nil(t, errg.Wait()) + require.Equal(t, uint64(0), atomic.LoadUint64(&consumedBytes)) +} + +func TestFlowControlWithoutForceConsume(t *testing.T) { + t.Parallel() + + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Hour*10) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *txnSizeEntry, 1024) + flowController := NewTableFlowController(512, false) + maxBatch := uint64(3) + + // simulate a big txn + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := uint64(0); i < maxBatch*defaultRowsPerTxn*defaultBatchSize; i++ { + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &txnSizeEntry{ + commitTs: lastCommitTs, + size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + lastCRTs := uint64(0) + maxBatchID := uint64(0) + for { + var mockedRow *txnSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.size) + err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), + mockedRow.size, func(batchID uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: batchID, + }, + }: + } + log.Debug("", zap.Any("batchID", batchID)) + maxBatchID = batchID + return nil + }) + require.Nil(t, err) + lastCRTs = mockedRow.commitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.size, + }: + } + } + require.Less(t, uint64(0), flowController.GetConsumption()) + require.LessOrEqual(t, maxBatch, maxBatchID) + select { + case <-ctx.Done(): + 
return ctx.Err() + case eventCh <- &mockedEvent{ + resolved: model.ResolvedTs{ + Mode: model.BatchResolvedMode, + Ts: lastCRTs, + BatchID: maxBatchID + 1, + }, + }: + } + time.Sleep(time.Millisecond * 500) + require.Equal(t, uint64(0), flowController.GetConsumption()) + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolved) } } @@ -286,7 +535,7 @@ func TestFlowControlAbort(t *testing.T) { t.Parallel() callBacker := &mockCallBacker{} - controller := NewTableFlowController(1024) + controller := NewTableFlowController(1024, false) var wg sync.WaitGroup wg.Add(1) go func() { @@ -317,7 +566,7 @@ func TestFlowControlCallBack(t *testing.T) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 1024) - flowController := NewTableFlowController(512) + flowController := NewTableFlowController(512, false) errg.Go(func() error { lastCommitTs := uint64(1) @@ -358,12 +607,12 @@ func TestFlowControlCallBack(t *testing.T) { atomic.AddUint64(&consumedBytes, mockedRow.size) err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), - mockedRow.size, func(bool) error { + mockedRow.size, func(uint64) error { select { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, + resolved: model.NewResolvedTs(lastCRTs), }: } return nil @@ -383,7 +632,7 @@ func TestFlowControlCallBack(t *testing.T) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: lastCRTs, + resolved: model.NewResolvedTs(lastCRTs), }: } @@ -406,7 +655,7 @@ func TestFlowControlCallBack(t *testing.T) { if event.size != 0 { atomic.AddUint64(&consumedBytes, -event.size) } else { - flowController.Release(event.resolvedTs) + flowController.Release(event.resolved) } } @@ -421,7 +670,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512) + controller := NewTableFlowController(512, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -429,7 +678,7 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { go func() { defer wg.Done() - err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(uint64) error { t.Error("unreachable") return nil }) @@ -442,11 +691,11 @@ func TestFlowControlCallBackNotBlockingRelease(t *testing.T) { <-time.After(time.Second * 1) // makes sure that this test case is valid require.Equal(t, int32(1), atomic.LoadInt32(&isBlocked)) - controller.Release(1) + controller.Release(model.NewResolvedTs(1)) cancel() }() - err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(uint64) error { atomic.StoreInt32(&isBlocked, 1) <-ctx.Done() atomic.StoreInt32(&isBlocked, 0) @@ -463,7 +712,7 @@ func TestFlowControlCallBackError(t *testing.T) { t.Parallel() var wg sync.WaitGroup - controller := NewTableFlowController(512) + controller := NewTableFlowController(512, false) wg.Add(1) ctx, cancel := context.WithCancel(context.TODO()) @@ -471,12 +720,12 @@ func TestFlowControlCallBackError(t *testing.T) { go func() { defer wg.Done() - err := 
controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(bool) error { + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 511, func(uint64) error { t.Error("unreachable") return nil }) require.Nil(t, err) - err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(bool) error { + err = controller.Consume(model.NewEmptyPolymorphicEvent(2), 511, func(uint64) error { <-ctx.Done() return ctx.Err() }) @@ -492,8 +741,8 @@ func TestFlowControlCallBackError(t *testing.T) { func TestFlowControlConsumeLargerThanQuota(t *testing.T) { t.Parallel() - controller := NewTableFlowController(1024) - err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(bool) error { + controller := NewTableFlowController(1024, false) + err := controller.Consume(model.NewEmptyPolymorphicEvent(1), 2048, func(uint64) error { t.Error("unreachable") return nil }) @@ -505,7 +754,7 @@ func BenchmarkTableFlowController(B *testing.B) { defer cancel() errg, ctx := errgroup.WithContext(ctx) mockedRowsCh := make(chan *txnSizeEntry, 102400) - flowController := NewTableFlowController(20 * 1024 * 1024) // 20M + flowController := NewTableFlowController(20*1024*1024, false) // 20M errg.Go(func() error { lastCommitTs := uint64(1) @@ -549,13 +798,13 @@ func BenchmarkTableFlowController(B *testing.B) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } resolvedTs = mockedRow.commitTs } err := flowController.Consume(model.NewEmptyPolymorphicEvent(mockedRow.commitTs), - mockedRow.size, dummyCallBack) + mockedRow.size, dummyCallBackWithBatch) if err != nil { B.Fatal(err) } @@ -571,7 +820,7 @@ func BenchmarkTableFlowController(B *testing.B) { case <-ctx.Done(): return ctx.Err() case eventCh <- &mockedEvent{ - resolvedTs: resolvedTs, + resolved: model.NewResolvedTs(resolvedTs), }: } @@ -592,7 +841,7 @@ func BenchmarkTableFlowController(B *testing.B) { } if event.size == 0 { - flowController.Release(event.resolvedTs) + flowController.Release(event.resolved) } } diff --git a/cdc/sink/flowcontrol/table_memory_quota.go b/cdc/sink/flowcontrol/table_memory_quota.go index c563ba4f333..7ca15e7857f 100644 --- a/cdc/sink/flowcontrol/table_memory_quota.go +++ b/cdc/sink/flowcontrol/table_memory_quota.go @@ -54,9 +54,7 @@ func newTableMemoryQuota(quota uint64) *tableMemoryQuota { // block until enough memory has been freed up by release. // blockCallBack will be called if the function will block. // Should be used with care to prevent deadlock. -func (c *tableMemoryQuota) consumeWithBlocking( - nBytes uint64, blockCallBack func(bool) error, -) error { +func (c *tableMemoryQuota) consumeWithBlocking(nBytes uint64, blockCallBack func() error) error { if nBytes >= c.quota { return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.quota) } @@ -64,7 +62,7 @@ func (c *tableMemoryQuota) consumeWithBlocking( c.consumed.Lock() if c.consumed.bytes+nBytes >= c.quota { c.consumed.Unlock() - err := blockCallBack(false) + err := blockCallBack() if err != nil { return errors.Trace(err) } diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index fec9b47865e..f63bd1c9189 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -125,10 +125,10 @@ func (k *mqSink) AddTable(tableID model.TableID) error { // otherwise when the table is dispatched back again, // it may read the old values. // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. 
- if checkpointTs, loaded := k.tableCheckpointTsMap.LoadAndDelete(tableID); loaded { + if checkpoint, loaded := k.tableCheckpointTsMap.LoadAndDelete(tableID); loaded { log.Info("clean up table checkpoint ts in MQ sink", zap.Int64("tableID", tableID), - zap.Uint64("checkpointTs", checkpointTs.(uint64))) + zap.Uint64("checkpointTs", checkpoint.(model.ResolvedTs).Ts)) } return nil @@ -174,25 +174,21 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha // FlushRowChangedEvents is thread-safe. func (k *mqSink) FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, -) (uint64, error) { - var checkpointTs uint64 - v, ok := k.tableCheckpointTsMap.Load(tableID) - if ok { - checkpointTs = v.(uint64) - } - if resolved.Ts <= checkpointTs { - return checkpointTs, nil +) (model.ResolvedTs, error) { + checkpoint := k.getTableCheckpointTs(tableID) + if checkpoint.EqualOrGreater(resolved) { + return checkpoint, nil } select { case <-ctx.Done(): - return 0, ctx.Err() + return model.NewResolvedTs(0), ctx.Err() case k.resolvedBuffer.In() <- resolvedTsEvent{ tableID: tableID, - resolved: model.NewResolvedTs(resolved.Ts), + resolved: resolved, }: } k.statistics.PrintStatus(ctx) - return checkpointTs, nil + return checkpoint, nil } // bgFlushTs flush resolvedTs to workers and flush the mqProducer @@ -217,7 +213,7 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error { // Since CDC does not guarantee exactly once semantic, it won't cause any problem // here even if the table was moved or removed. // ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134 - k.tableCheckpointTsMap.Store(msg.tableID, resolved.Ts) + k.tableCheckpointTsMap.Store(msg.tableID, resolved) } } } @@ -360,6 +356,14 @@ func (k *mqSink) RemoveTable(cxt context.Context, tableID model.TableID) error { return nil } +func (k *mqSink) getTableCheckpointTs(tableID model.TableID) model.ResolvedTs { + v, ok := k.tableCheckpointTsMap.Load(tableID) + if ok { + return v.(model.ResolvedTs) + } + return model.NewResolvedTs(0) +} + func (k *mqSink) run(ctx context.Context) error { wg, ctx := errgroup.WithContext(ctx) wg.Go(func() error { diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index 76211767f89..d1d757665c0 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -56,7 +56,7 @@ func waitCheckpointTs(t *testing.T, s *mqSink, tableID int64, target uint64) uin var checkpointTs uint64 err := retry.Do(context.Background(), func() error { if v, ok := s.tableCheckpointTsMap.Load(tableID); ok { - checkpointTs = v.(uint64) + checkpointTs = v.(model.ResolvedTs).Ts } if checkpointTs >= target { return nil @@ -119,9 +119,9 @@ func TestKafkaSink(t *testing.T) { checkpointTs := waitCheckpointTs(t, sink, tableID, uint64(120)) require.Equal(t, uint64(120), checkpointTs) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) + checkpoint, err := sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) require.Nil(t, err) - require.Equal(t, uint64(120), checkpointTs) + require.Equal(t, uint64(120), checkpoint.Ts) // mock kafka broker processes 1 checkpoint ts event err = sink.EmitCheckpointTs(ctx, uint64(120), []model.TableName{{ diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 4f4cbeac7f8..19d1e41bb99 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -230,20 +230,20 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, 
rows ...*model.Row // Concurrency Note: FlushRowChangedEvents is thread-safe. func (s *mysqlSink) FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { if err := s.error.Load(); err != nil { - return 0, err + return model.NewResolvedTs(0), err } v, ok := s.getTableResolvedTs(tableID) - if !ok || v.Ts < resolved.Ts { + if !ok || v.Less(resolved) { s.tableMaxResolvedTs.Store(tableID, resolved) } // check and throw error select { case <-ctx.Done(): - return 0, ctx.Err() + return model.NewResolvedTs(0), ctx.Err() case s.resolvedCh <- struct{}{}: // Notify `flushRowChangedEvents` to asynchronously write data. default: @@ -291,8 +291,8 @@ outer: continue outer } } - for tableID, resolvedTs := range checkpointTsMap { - s.tableCheckpointTs.Store(tableID, resolvedTs) + for tableID, resolved := range checkpointTsMap { + s.tableCheckpointTs.Store(tableID, resolved) } } } @@ -531,10 +531,10 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { zap.Int64("tableID", tableID), zap.Uint64("resolvedTs", resolved.(model.ResolvedTs).Ts)) } - if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { + if checkpoint, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { log.Info("clean up table checkpoint ts in MySQL sink", zap.Int64("tableID", tableID), - zap.Uint64("checkpointTs", checkpointTs.(uint64))) + zap.Uint64("checkpointTs", checkpoint.(model.ResolvedTs).Ts)) } // try to remove table txn cache s.txnCache.RemoveTableTxn(tableID) @@ -559,11 +559,11 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro return errors.Trace(ctx.Err()) case <-ticker.C: maxResolved, ok := s.getTableResolvedTs(tableID) - log.Warn("Barrier doesn't return in time, may be stuck", + log.Warn("RemoveTable doesn't return in time, may be stuck", zap.Int64("tableID", tableID), zap.Bool("hasResolvedTs", ok), zap.Any("resolvedTs", maxResolved.Ts), - zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID))) + zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID).Ts)) default: if err := s.error.Load(); err != nil { return err @@ -573,15 +573,15 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro log.Info("No table resolvedTs is found", zap.Int64("tableID", tableID)) return nil } - tableCkpt := s.getTableCheckpointTs(tableID) - if tableCkpt >= maxResolved.Ts { + checkpoint := s.getTableCheckpointTs(tableID) + if checkpoint.EqualOrGreater(maxResolved) { return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolved) + checkpoint, err := s.FlushRowChangedEvents(ctx, tableID, maxResolved) if err != nil { return err } - if checkpointTs >= maxResolved.Ts { + if checkpoint.Ts >= maxResolved.Ts { return nil } // short sleep to avoid cpu spin @@ -590,12 +590,12 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro } } -func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 { +func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) model.ResolvedTs { v, ok := s.tableCheckpointTs.Load(tableID) if ok { - return v.(uint64) + return v.(model.ResolvedTs) } - return uint64(0) + return model.NewResolvedTs(0) } func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs, bool) { diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index e09718e9fed..199f524270e 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go 
@@ -1251,8 +1251,8 @@ func TestNewMySQLSinkExecDML(t *testing.T) { err = retry.Do(context.Background(), func() error { ts, err := sink.FlushRowChangedEvents(ctx, 1, model.NewResolvedTs(uint64(2))) require.Nil(t, err) - if ts < uint64(2) { - return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) + if ts.Ts < uint64(2) { + return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts.Ts, 2) } return nil }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) @@ -1262,8 +1262,8 @@ func TestNewMySQLSinkExecDML(t *testing.T) { err = retry.Do(context.Background(), func() error { ts, err := sink.FlushRowChangedEvents(ctx, 2, model.NewResolvedTs(uint64(4))) require.Nil(t, err) - if ts < uint64(4) { - return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) + if ts.Ts < uint64(4) { + return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts.Ts, 4) } return nil }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) @@ -1790,7 +1790,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { require.Nil(t, err) checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(1)) require.Nil(t, err) - require.True(t, checkpoint <= 1) + require.True(t, checkpoint.Ts <= 1) rows := []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t1", TableID: 1}, @@ -1808,9 +1808,9 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { err = sink.EmitRowChangedEvents(ctx, rows...) require.Nil(t, err) checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(6)) - require.True(t, checkpoint <= 6) + require.True(t, checkpoint.Ts <= 6) require.Nil(t, err) - require.True(t, sink.getTableCheckpointTs(model.TableID(1)) <= 6) + require.True(t, sink.getTableCheckpointTs(model.TableID(1)).Ts <= 6) rows = []*model.RowChangedEvent{ { Table: &model.TableName{Schema: "s1", Table: "t2", TableID: 2}, @@ -1828,9 +1828,9 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { err = sink.EmitRowChangedEvents(ctx, rows...) 
require.Nil(t, err) checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(5)) - require.True(t, checkpoint <= 5) + require.True(t, checkpoint.Ts <= 5) require.Nil(t, err) - require.True(t, sink.getTableCheckpointTs(model.TableID(2)) <= 5) + require.True(t, sink.getTableCheckpointTs(model.TableID(2)).Ts <= 5) _ = sink.Close(ctx) _, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(6)) require.Nil(t, err) @@ -1906,7 +1906,7 @@ func TestCleanTableResource(t *testing.T) { require.Nil(t, s.EmitRowChangedEvents(ctx, &model.RowChangedEvent{ Table: &model.TableName{TableID: tblID, Schema: "test", Table: "t1"}, })) - s.tableCheckpointTs.Store(tblID, uint64(1)) + s.tableCheckpointTs.Store(tblID, model.NewResolvedTs(uint64(1))) s.tableMaxResolvedTs.Store(tblID, model.NewResolvedTs(uint64(2))) _, ok := s.txnCache.unresolvedTxns[tblID] require.True(t, ok) @@ -2094,7 +2094,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) { if err != nil { break } - require.Less(t, ts, uint64(2)) + require.Less(t, ts.ResolvedMark(), uint64(2)) i++ } diff --git a/cdc/sink/mysql/simple_mysql_tester.go b/cdc/sink/mysql/simple_mysql_tester.go index ed68b1f12b5..9abad52a26e 100644 --- a/cdc/sink/mysql/simple_mysql_tester.go +++ b/cdc/sink/mysql/simple_mysql_tester.go @@ -179,7 +179,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` func (s *simpleMySQLSink) FlushRowChangedEvents( ctx context.Context, _ model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) @@ -187,14 +187,14 @@ func (s *simpleMySQLSink) FlushRowChangedEvents( if row.CommitTs <= resolved.Ts { err := s.executeRowChangedEvents(ctx, row) if err != nil { - return 0, err + return model.NewResolvedTs(0), err } } else { newBuffer = append(newBuffer, row) } } s.rowsBuffer = newBuffer - return resolved.Ts, nil + return resolved, nil } // EmitCheckpointTs sends CheckpointTs to Sink diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 440e8bd96c0..f7cb0dd9470 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -112,7 +112,7 @@ func (c *unresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha // The returned map contains many txns grouped by tableID. 
for each table, the each commitTs of txn in txns slice is strictly increasing func (c *unresolvedTxnCache) Resolved( resolvedTsMap *sync.Map, -) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) { +) (map[model.TableID]model.ResolvedTs, map[model.TableID][]*model.SingleTableTxn) { c.unresolvedTxnsMu.Lock() defer c.unresolvedTxnsMu.Unlock() @@ -121,7 +121,7 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (checkpointTsMap map[model.TableID]uint64, +) (checkpointTsMap map[model.TableID]model.ResolvedTs, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn, ) { var ( @@ -131,21 +131,21 @@ func splitResolvedTxn( resolvedTxnsWithTheSameCommitTs []*txnsWithTheSameCommitTs ) - checkpointTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) + checkpointTsMap = make(map[model.TableID]model.ResolvedTs, len(unresolvedTxns)) resolvedTsMap.Range(func(k, v any) bool { tableID := k.(model.TableID) resolved := v.(model.ResolvedTs) - checkpointTsMap[tableID] = resolved.Ts + checkpointTsMap[tableID] = resolved return true }) resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - for tableID, resolvedTs := range checkpointTsMap { + for tableID, resolved := range checkpointTsMap { if txns, ok = unresolvedTxns[tableID]; !ok { continue } i := sort.Search(len(txns), func(i int) bool { - return txns[i].commitTs > resolvedTs + return txns[i].commitTs > resolved.Ts }) if i != 0 { if i == len(txns) { diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index d55bd25cf6c..f56d2a63785 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -266,7 +266,9 @@ func TestSplitResolvedTxn(test *testing.T) { for _, t := range tc { cache.Append(nil, t.input...) resolvedTsMap := sync.Map{} + expectedCheckpointTsMap := make(map[model.TableID]model.ResolvedTs) for tableID, ts := range t.resolvedTsMap { + expectedCheckpointTsMap[tableID] = model.NewResolvedTs(ts) resolvedTsMap.Store(tableID, model.NewResolvedTs(ts)) } checkpointTsMap, resolvedTxn := cache.Resolved(&resolvedTsMap) @@ -280,7 +282,7 @@ func TestSplitResolvedTxn(test *testing.T) { resolvedTxn[tableID] = txns } require.Equal(test, t.expected, resolvedTxn, cmp.Diff(resolvedTxn, t.expected)) - require.Equal(test, t.resolvedTsMap, checkpointTsMap) + require.Equal(test, expectedCheckpointTsMap, checkpointTsMap) } } } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 7477e3fb6dd..4838e351cd8 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -62,7 +62,7 @@ type Sink interface { // FlushRowChangedEvents is thread-safe. FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, - ) (uint64, error) + ) (model.ResolvedTs, error) // EmitCheckpointTs sends CheckpointTs to Sink. // TiCDC guarantees that all Events **in the cluster** which of commitTs diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index ee3bdc2a548..a3307fc6abf 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -73,7 +73,7 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // redo log watermarkTs. 
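Aside, not part of the patch: after this interface change, every Sink implementation returns a model.ResolvedTs checkpoint rather than a uint64. A minimal sketch of a conforming implementation follows, assuming a hypothetical echoSink type (this is not the real blackhole sink).

package sketch

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
)

// echoSink is a hypothetical, minimal implementation of the flush half of the
// Sink interface, used only to illustrate the new signature.
type echoSink struct{}

// FlushRowChangedEvents reports everything up to resolved as already flushed.
func (s *echoSink) FlushRowChangedEvents(
	_ context.Context, _ model.TableID, resolved model.ResolvedTs,
) (model.ResolvedTs, error) {
	// A sink that has not caught up would instead return its own checkpoint,
	// e.g. model.NewResolvedTs(lastFlushedTs).
	return resolved, nil
}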
func (t *tableSink) FlushRowChangedEvents( ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, -) (uint64, error) { +) (model.ResolvedTs, error) { resolvedTs := resolved.Ts if tableID != t.tableID { log.Panic("inconsistent table sink", @@ -90,42 +90,35 @@ func (t *tableSink) FlushRowChangedEvents( err := t.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) if err != nil { - return 0, errors.Trace(err) + return model.NewResolvedTs(0), errors.Trace(err) } return t.flushResolvedTs(ctx, resolved) } func (t *tableSink) flushResolvedTs( ctx context.Context, resolved model.ResolvedTs, -) (uint64, error) { - redoTs, err := t.flushRedoLogs(ctx, resolved.Ts) - if err != nil { - return 0, errors.Trace(err) - } - if redoTs < resolved.Ts { - resolved.Ts = redoTs +) (model.ResolvedTs, error) { + if t.redoManager.Enabled() { + if resolved.IsBatchMode() { + return model.NewResolvedTs(0), nil + } + err := t.redoManager.FlushLog(ctx, t.tableID, resolved.Ts) + if err != nil { + return model.NewResolvedTs(0), errors.Trace(err) + } + redoTs := t.redoManager.GetMinResolvedTs() + if redoTs < resolved.Ts { + resolved.Ts = redoTs + } } checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved) if err != nil { - return 0, errors.Trace(err) + return model.NewResolvedTs(0), errors.Trace(err) } return checkpointTs, nil } -// flushRedoLogs flush redo logs and returns redo log resolved ts which means -// all events before the ts have been persisted to redo log storage. -func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { - if t.redoManager.Enabled() { - err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs) - if err != nil { - return 0, err - } - return t.redoManager.GetMinResolvedTs(), nil - } - return resolvedTs, nil -} - func (t *tableSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error { // the table sink doesn't receive the checkpoint event return nil diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index e33f54e5906..9baaa681592 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -784,18 +784,18 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolve } // tables are flushed var ( - err error - checkpointTs uint64 + err error + checkpoint model.ResolvedTs ) flushedResolvedTs := true sink.tablesMap.Range(func(key, value interface{}) bool { tableID := key.(int64) - checkpointTs, err = sink.FlushRowChangedEvents(ctx, + checkpoint, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(resolvedTs)) if err != nil { return false } - if checkpointTs < resolvedTs { + if checkpoint.Ts < resolvedTs { flushedResolvedTs = false } return true
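Aside, not part of the patch: the new flushResolvedTs gates on the redo manager in two ways, skipping batch-mode resolved ts entirely and clamping the forwarded ts to the redo log's minimum resolved ts. A standalone sketch of that rule follows, using a hypothetical clampToRedo helper that takes redoTs as a parameter instead of calling redoManager.FlushLog and GetMinResolvedTs.

package sketch

import "github.com/pingcap/tiflow/cdc/model"

// clampToRedo returns the resolved ts that may safely be handed to the backend
// sink, and false when nothing should be flushed in this round.
func clampToRedo(resolved model.ResolvedTs, redoEnabled bool, redoTs uint64) (model.ResolvedTs, bool) {
	if !redoEnabled {
		return resolved, true
	}
	if resolved.IsBatchMode() {
		// With redo enabled, batch-mode resolved ts are not forwarded.
		return model.NewResolvedTs(0), false
	}
	if redoTs < resolved.Ts {
		// Never report more progress than the redo log has persisted.
		resolved.Ts = redoTs
	}
	return resolved, true
}

The syncFlushRowChangedEvents loop in cmd/kafka-consumer above applies the same idea from the caller side: it keeps re-flushing while the returned checkpoint.Ts is still below the target resolved ts.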