diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 4d91a2dd699..363ac513d1b 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -101,7 +101,7 @@ func NewMounter(schemaStorage SchemaStorage, // this method could block indefinitely if the DDL puller is lagging. func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.PolymorphicEvent) error { m.metricTotalRows.Inc() - if pEvent.RawKV.OpType == model.OpTypeResolved { + if pEvent.IsResolved() { return nil } start := time.Now() diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index ab2a3d3bec4..ce507b3961f 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -13,17 +13,19 @@ package model -// PolymorphicEvent describes an event can be in multiple states +// PolymorphicEvent describes an event can be in multiple states. type PolymorphicEvent struct { StartTs uint64 // Commit or resolved TS CRTs uint64 + // Identify whether the resolved event is in batch mode. + Batch bool RawKV *RawKVEntry Row *RowChangedEvent } -// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV +// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV. func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { if rawKV.OpType == OpTypeResolved { return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs) @@ -35,7 +37,7 @@ func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { } } -// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts +// NewResolvedPolymorphicEvent creates a new PolymorphicEvent with the resolved ts. func NewResolvedPolymorphicEvent(regionID uint64, resolvedTs uint64) *PolymorphicEvent { return &PolymorphicEvent{ CRTs: resolvedTs, @@ -49,13 +51,23 @@ func (e *PolymorphicEvent) RegionID() uint64 { return e.RawKV.RegionID } +// IsResolved returns true if the event is resolved. +func (e *PolymorphicEvent) IsResolved() bool { + return e.RawKV != nil && e.RawKV.OpType == OpTypeResolved +} + +// IsBatchResolved returns true if the event is batch resolved event. +func (e *PolymorphicEvent) IsBatchResolved() bool { + return e.IsResolved() && e.Batch +} + // ComparePolymorphicEvents compares two events by CRTs, Resolved, StartTs, Delete/Put order. // It returns true if and only if i should precede j. func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { if i.CRTs == j.CRTs { - if i.RawKV.OpType == OpTypeResolved { + if i.IsResolved() { return false - } else if j.RawKV.OpType == OpTypeResolved { + } else if j.IsResolved() { return true } @@ -71,3 +83,19 @@ func ComparePolymorphicEvents(i, j *PolymorphicEvent) bool { } return i.CRTs < j.CRTs } + +// ResolvedTs is the resolved timestamp of sink module. +type ResolvedTs struct { + Ts uint64 + Batch bool +} + +// NewResolvedTs creates a new ResolvedTs. +func NewResolvedTs(t uint64) ResolvedTs { + return ResolvedTs{Ts: t, Batch: false} +} + +// NewBatchResolvedTs creates a ResolvedTs with a given batch type. +func NewBatchResolvedTs(t uint64, b bool) ResolvedTs { + return ResolvedTs{Ts: t, Batch: b} +} diff --git a/cdc/processor/pipeline/cyclic_mark.go b/cdc/processor/pipeline/cyclic_mark.go index 5df9e5fa9c8..010cf3b01b9 100644 --- a/cdc/processor/pipeline/cyclic_mark.go +++ b/cdc/processor/pipeline/cyclic_mark.go @@ -95,7 +95,7 @@ func (n *cyclicMarkNode) TryHandleDataMessage( case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent n.flush(ctx, event.CRTs) - if event.RawKV.OpType == model.OpTypeResolved { + if event.IsResolved() { ctx.SendToNextNode(msg) return true, nil } diff --git a/cdc/processor/pipeline/cyclic_mark_test.go b/cdc/processor/pipeline/cyclic_mark_test.go index 84d1f4ad09e..c5552ab9f72 100644 --- a/cdc/processor/pipeline/cyclic_mark_test.go +++ b/cdc/processor/pipeline/cyclic_mark_test.go @@ -171,7 +171,7 @@ func TestCyclicMarkNode(t *testing.T) { go func() { defer wg.Done() for row := range outputCh { - if row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved { + if row.PolymorphicEvent.IsResolved() { continue } output = append(output, row.PolymorphicEvent.Row) @@ -213,7 +213,7 @@ func TestCyclicMarkNode(t *testing.T) { require.Nil(t, err) output := []*model.RowChangedEvent{} putToOutput := func(row *pmessage.Message) { - if row == nil || row.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved { + if row == nil || row.PolymorphicEvent.IsResolved() { return } output = append(output, row.PolymorphicEvent.Row) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index e87bf4c9859..80b735caabb 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -71,7 +71,8 @@ type sinkNode struct { status TableStatus tableID model.TableID - resolvedTs model.Ts + // atomic oprations for model.ResolvedTs + resolvedTs atomic.Value checkpointTs model.Ts targetTs model.Ts barrierTs model.Ts @@ -83,23 +84,24 @@ type sinkNode struct { } func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { - return &sinkNode{ + sn := &sinkNode{ tableID: tableID, sink: sink, status: TableStatusInitializing, targetTs: targetTs, - resolvedTs: startTs, checkpointTs: startTs, barrierTs: startTs, flowController: flowController, } + sn.resolvedTs.Store(model.NewResolvedTs(startTs)) + return sn } -func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) } -func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } -func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } -func (n *sinkNode) Status() TableStatus { return n.status.Load() } +func (n *sinkNode) ResolvedTs() model.ResolvedTs { return n.resolvedTs.Load().(model.ResolvedTs) } +func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } +func (n *sinkNode) BarrierTs() model.Ts { return atomic.LoadUint64(&n.barrierTs) } +func (n *sinkNode) Status() TableStatus { return n.status.Load() } func (n *sinkNode) Init(ctx pipeline.NodeContext) error { n.replicaConfig = ctx.ChangefeedVars().Info.Config @@ -129,7 +131,7 @@ func (n *sinkNode) stop(ctx context.Context) (err error) { // flushSink emits all rows in rowBuffer to the backend sink and flushes // the backend sink. -func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err error) { +func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (err error) { defer func() { if err != nil { n.status.Store(TableStatusStopped) @@ -141,16 +143,16 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro }() currentBarrierTs := atomic.LoadUint64(&n.barrierTs) currentCheckpointTs := atomic.LoadUint64(&n.checkpointTs) - if resolvedTs > currentBarrierTs { - resolvedTs = currentBarrierTs + if resolved.Ts > currentBarrierTs { + resolved.Ts = currentBarrierTs } - if resolvedTs > n.targetTs { - resolvedTs = n.targetTs + if resolved.Ts > n.targetTs { + resolved.Ts = n.targetTs } - if resolvedTs <= currentCheckpointTs { + if resolved.Ts <= currentCheckpointTs { return nil } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs) + checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved) if err != nil { return errors.Trace(err) } @@ -291,24 +293,26 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo switch msg.Tp { case pmessage.MessageTypePolymorphicEvent: event := msg.PolymorphicEvent - if event.RawKV.OpType == model.OpTypeResolved { + if event.IsResolved() { if n.status.Load() == TableStatusInitializing { n.status.Store(TableStatusRunning) } failpoint.Inject("ProcessorSyncResolvedError", func() { failpoint.Return(false, errors.New("processor sync resolved injected error")) }) - if err := n.flushSink(ctx, msg.PolymorphicEvent.CRTs); err != nil { + + resolved := model.NewBatchResolvedTs(event.CRTs, event.Batch) + if err := n.flushSink(ctx, resolved); err != nil { return false, errors.Trace(err) } - atomic.StoreUint64(&n.resolvedTs, msg.PolymorphicEvent.CRTs) + n.resolvedTs.Store(resolved) return true, nil } if err := n.emitRowToSink(ctx, event); err != nil { return false, errors.Trace(err) } case pmessage.MessageTypeTick: - if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil { + if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { return false, errors.Trace(err) } case pmessage.MessageTypeCommand: @@ -327,7 +331,7 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo func (n *sinkNode) updateBarrierTs(ctx context.Context, ts model.Ts) error { atomic.StoreUint64(&n.barrierTs, ts) - if err := n.flushSink(ctx, atomic.LoadUint64(&n.resolvedTs)); err != nil { + if err := n.flushSink(ctx, n.ResolvedTs()); err != nil { return errors.Trace(err) } return nil diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 343f4c8ed44..5247290c41e 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -72,12 +72,14 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { +func (s *mockSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent - }{resolvedTs: resolvedTs}) - return resolvedTs, nil + }{resolvedTs: resolved.Ts}) + return resolved.Ts, nil } func (s *mockSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error { @@ -416,7 +418,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 1}, }) sink.Reset() - require.Equal(t, uint64(2), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) require.Equal(t, uint64(1), node.CheckpointTs()) require.Nil(t, node.Receive( @@ -429,7 +431,7 @@ func TestManyTs(t *testing.T) { {resolvedTs: 2}, }) sink.Reset() - require.Equal(t, uint64(2), node.ResolvedTs()) + require.Equal(t, model.NewResolvedTs(uint64(2)), node.ResolvedTs()) require.Equal(t, uint64(2), node.CheckpointTs()) } @@ -642,11 +644,13 @@ type flushSink struct { // fall back var fallBackResolvedTs = uint64(10) -func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { - if resolvedTs == fallBackResolvedTs { +func (s *flushSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + if resolved.Ts == fallBackResolvedTs { return 0, nil } - return resolvedTs, nil + return resolved.Ts, nil } // TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always @@ -670,12 +674,12 @@ func TestFlushSinkReleaseFlowController(t *testing.T) { require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil))) sNode.barrierTs = 10 - err := sNode.flushSink(context.Background(), uint64(8)) + err := sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(8))) require.Nil(t, err) require.Equal(t, uint64(8), sNode.checkpointTs) require.Equal(t, 1, flowController.releaseCounter) // resolvedTs will fall back in this call - err = sNode.flushSink(context.Background(), uint64(10)) + err = sNode.flushSink(context.Background(), model.NewResolvedTs(uint64(10))) require.Nil(t, err) require.Equal(t, uint64(8), sNode.checkpointTs) require.Equal(t, 2, flowController.releaseCounter) diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 8c015656d7a..819c797f53f 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -91,7 +91,7 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs() + return t.sinkNode.ResolvedTs().Ts } return t.sorterNode.ResolvedTs() } diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index d6a1bb35f9f..6c28c9e6215 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -429,7 +429,7 @@ func (t *tableActor) ResolvedTs() model.Ts { // another replication barrier for consistent replication instead of reusing // the global resolved-ts. if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) { - return t.sinkNode.ResolvedTs() + return t.sinkNode.ResolvedTs().Ts } return t.sortNode.ResolvedTs() } diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go index 92f25cef6a9..0f80a4e9b6f 100644 --- a/cdc/processor/pipeline/table_actor_test.go +++ b/cdc/processor/pipeline/table_actor_test.go @@ -91,7 +91,7 @@ func TestTableActorInterface(t *testing.T) { require.Equal(t, model.Ts(5), tbl.ResolvedTs()) tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual) - atomic.StoreUint64(&sink.resolvedTs, 6) + sink.resolvedTs.Store(model.NewResolvedTs(6)) require.Equal(t, model.Ts(6), tbl.ResolvedTs()) } @@ -184,15 +184,18 @@ func TestPollStoppedActor(t *testing.T) { func TestPollTickMessage(t *testing.T) { startTime := time.Now().Add(-sinkFlushInterval) + + sn := &sinkNode{ + status: TableStatusInitializing, + sink: &mockSink{}, + flowController: &mockFlowController{}, + checkpointTs: 10, + targetTs: 11, + } + sn.resolvedTs.Store(model.NewResolvedTs(10)) + tbl := tableActor{ - sinkNode: &sinkNode{ - status: TableStatusInitializing, - sink: &mockSink{}, - flowController: &mockFlowController{}, - resolvedTs: 10, - checkpointTs: 10, - targetTs: 11, - }, + sinkNode: sn, lastFlushSinkTime: time.Now().Add(-2 * sinkFlushInterval), cancel: func() {}, reportErr: func(err error) {}, @@ -235,13 +238,15 @@ func TestPollStopMessage(t *testing.T) { } func TestPollBarrierTsMessage(t *testing.T) { + sn := &sinkNode{ + targetTs: 10, + checkpointTs: 5, + barrierTs: 8, + } + sn.resolvedTs.Store(model.NewResolvedTs(5)) + tbl := tableActor{ - sinkNode: &sinkNode{ - targetTs: 10, - checkpointTs: 5, - resolvedTs: 5, - barrierTs: 8, - }, + sinkNode: sn, sortNode: &sorterNode{ barrierTs: 8, }, diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 23054c304ef..e00394daee4 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -37,6 +37,8 @@ type blackHoleSink struct { lastAccumulated uint64 } +var _ Sink = (*blackHoleSink)(nil) + func (b *blackHoleSink) AddTable(tableID model.TableID) error { return nil } @@ -51,8 +53,10 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model return nil } -func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { - log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) +func (b *blackHoleSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolved.Ts)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency accumulated := atomic.LoadUint64(&b.accumulated) @@ -61,7 +65,7 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.Table return int(batchSize), nil }) b.statistics.PrintStatus(ctx) - return resolvedTs, err + return resolved.Ts, err } func (b *blackHoleSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error { diff --git a/cdc/sink/mq/mq.go b/cdc/sink/mq/mq.go index 7e42dab6325..1d472836454 100644 --- a/cdc/sink/mq/mq.go +++ b/cdc/sink/mq/mq.go @@ -43,8 +43,8 @@ import ( ) type resolvedTsEvent struct { - tableID model.TableID - resolvedTs model.Ts + tableID model.TableID + resolved model.ResolvedTs } const ( @@ -181,21 +181,23 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha // that the data before the resolvedTs has been // successfully written downstream. // FlushRowChangedEvents is thread-safe. -func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { +func (k *mqSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, +) (uint64, error) { var checkpointTs uint64 v, ok := k.tableCheckpointTsMap.Load(tableID) if ok { checkpointTs = v.(uint64) } - if resolvedTs <= checkpointTs { + if resolved.Ts <= checkpointTs { return checkpointTs, nil } select { case <-ctx.Done(): return 0, ctx.Err() case k.resolvedBuffer <- resolvedTsEvent{ - tableID: tableID, - resolvedTs: resolvedTs, + tableID: tableID, + resolved: model.NewResolvedTs(resolved.Ts), }: } k.statistics.PrintStatus(ctx) @@ -209,21 +211,21 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error { case <-ctx.Done(): return errors.Trace(ctx.Err()) case msg := <-k.resolvedBuffer: - resolvedTs := msg.resolvedTs - err := k.flushTsToWorker(ctx, resolvedTs) + resolved := msg.resolved + err := k.flushTsToWorker(ctx, resolved) if err != nil { return errors.Trace(err) } // Since CDC does not guarantee exactly once semantic, it won't cause any problem // here even if the table was moved or removed. // ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134 - k.tableCheckpointTsMap.Store(msg.tableID, resolvedTs) + k.tableCheckpointTsMap.Store(msg.tableID, resolved.Ts) } } } -func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error { - if err := k.flushWorker.addEvent(ctx, mqEvent{resolvedTs: resolvedTs}); err != nil { +func (k *mqSink) flushTsToWorker(ctx context.Context, resolved model.ResolvedTs) error { + if err := k.flushWorker.addEvent(ctx, mqEvent{resolved: resolved}); err != nil { if errors.Cause(err) != context.Canceled { log.Warn("failed to flush TS to worker", zap.Error(err)) } else { diff --git a/cdc/sink/mq/mq_flush_worker.go b/cdc/sink/mq/mq_flush_worker.go index bf96cd5ab01..3c082ce0a64 100644 --- a/cdc/sink/mq/mq_flush_worker.go +++ b/cdc/sink/mq/mq_flush_worker.go @@ -43,9 +43,9 @@ type topicPartitionKey struct { // It carries the partition information of the message, // and it is also used as resolved ts messaging. type mqEvent struct { - key topicPartitionKey - row *model.RowChangedEvent - resolvedTs model.Ts + key topicPartitionKey + row *model.RowChangedEvent + resolved model.ResolvedTs } // flushWorker is responsible for sending messages to the Kafka producer on a batch basis. @@ -98,7 +98,7 @@ func (w *flushWorker) batch( case msg := <-w.msgChan: // When the resolved ts is received, // we need to write the previous data to the producer as soon as possible. - if msg.resolvedTs != 0 { + if msg.resolved.Ts != 0 { w.needSyncFlush = true return index, nil } @@ -116,7 +116,7 @@ func (w *flushWorker) batch( case <-ctx.Done(): return index, ctx.Err() case msg := <-w.msgChan: - if msg.resolvedTs != 0 { + if msg.resolved.Ts != 0 { w.needSyncFlush = true return index, nil } diff --git a/cdc/sink/mq/mq_flush_worker_test.go b/cdc/sink/mq/mq_flush_worker_test.go index a2a410beb76..fa17a90eb9e 100644 --- a/cdc/sink/mq/mq_flush_worker_test.go +++ b/cdc/sink/mq/mq_flush_worker_test.go @@ -117,7 +117,7 @@ func TestBatch(t *testing.T) { name: "Normal batching", events: []mqEvent{ { - resolvedTs: 0, + resolved: model.NewResolvedTs(0), }, { row: &model.RowChangedEvent{ @@ -142,7 +142,7 @@ func TestBatch(t *testing.T) { name: "No row change events", events: []mqEvent{ { - resolvedTs: 1, + resolved: model.NewResolvedTs(1), }, }, expectedN: 0, @@ -159,7 +159,7 @@ func TestBatch(t *testing.T) { key: key, }, { - resolvedTs: 1, + resolved: model.NewResolvedTs(1), }, { row: &model.RowChangedEvent{ @@ -387,7 +387,7 @@ func TestFlush(t *testing.T) { key: key1, }, { - resolvedTs: 1, + resolved: model.NewResolvedTs(1), }, } @@ -464,15 +464,15 @@ func TestProducerError(t *testing.T) { }, }) require.NoError(t, err) - err = worker.addEvent(ctx, mqEvent{resolvedTs: 100}) + err = worker.addEvent(ctx, mqEvent{resolved: model.NewResolvedTs(100)}) require.NoError(t, err) wg.Wait() - err = worker.addEvent(ctx, mqEvent{resolvedTs: 200}) + err = worker.addEvent(ctx, mqEvent{resolved: model.NewResolvedTs(200)}) require.Error(t, err) require.Regexp(t, ".*fake.*", err.Error()) - err = worker.addEvent(ctx, mqEvent{resolvedTs: 300}) + err = worker.addEvent(ctx, mqEvent{resolved: model.NewResolvedTs(300)}) require.Error(t, err) require.Regexp(t, ".*ErrMQWorkerClosed.*", err.Error()) } diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go index d07428c31e3..e01e6c53daf 100644 --- a/cdc/sink/mq/mq_test.go +++ b/cdc/sink/mq/mq_test.go @@ -93,11 +93,11 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, row) c.Assert(err, check.IsNil) - checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120)) + checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(120))) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110)) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(uint64(110))) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) @@ -324,20 +324,24 @@ func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { // mock kafka broker processes 1 row resolvedTs event leader.Returns(prodSuccess) - checkpointTs1, err := sink.FlushRowChangedEvents(ctx, tableID1, row1.CommitTs) + checkpointTs1, err := sink.FlushRowChangedEvents(ctx, + tableID1, model.NewResolvedTs(row1.CommitTs)) c.Assert(err, check.IsNil) c.Assert(checkpointTs1, check.Equals, row1.CommitTs) - checkpointTs2, err := sink.FlushRowChangedEvents(ctx, tableID2, row2.CommitTs) + checkpointTs2, err := sink.FlushRowChangedEvents(ctx, + tableID2, model.NewResolvedTs(row2.CommitTs)) c.Assert(err, check.IsNil) c.Assert(checkpointTs2, check.Equals, row2.CommitTs) - checkpointTs3, err := sink.FlushRowChangedEvents(ctx, tableID3, row3.CommitTs) + checkpointTs3, err := sink.FlushRowChangedEvents(ctx, + tableID3, model.NewResolvedTs(row3.CommitTs)) c.Assert(err, check.IsNil) c.Assert(checkpointTs3, check.Equals, row3.CommitTs) // flush older resolved ts - checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, uint64(110)) + checkpointTsOld, err := sink.FlushRowChangedEvents(ctx, tableID1, + model.NewResolvedTs(uint64(110))) c.Assert(err, check.IsNil) c.Assert(checkpointTsOld, check.Equals, row1.CommitTs) diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index 56401dd1688..c4ce12e0e20 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -230,10 +230,12 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row // FlushRowChangedEvents will flush all received events, // we do not write data downstream until we receive resolvedTs. // Concurrency Note: FlushRowChangedEvents is thread-safe. -func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { - v, ok := s.tableMaxResolvedTs.Load(tableID) - if !ok || v.(uint64) < resolvedTs { - s.tableMaxResolvedTs.Store(tableID, resolvedTs) +func (s *mysqlSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + v, ok := s.getTableResolvedTs(tableID) + if !ok || v.Ts < resolved.Ts { + s.tableMaxResolvedTs.Store(tableID, resolved) } s.resolvedNotifier.Notify() @@ -264,7 +266,8 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify. flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs) if len(resolvedTxnsMap) == 0 { s.tableMaxResolvedTs.Range(func(key, value interface{}) bool { - s.tableCheckpointTs.Store(key, value) + resolved := value.(model.ResolvedTs) + s.tableCheckpointTs.Store(key, resolved.Ts) return true }) continue @@ -278,8 +281,8 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify. } s.dispatchAndExecTxns(ctx, resolvedTxnsMap) - for tableID, resolvedTs := range flushedResolvedTsMap { - s.tableCheckpointTs.Store(tableID, resolvedTs) + for tableID, resolved := range flushedResolvedTsMap { + s.tableCheckpointTs.Store(tableID, resolved.Ts) } } } @@ -513,10 +516,10 @@ func (s *mysqlSink) cleanTableResource(tableID model.TableID) { // otherwise when the table is dispatched back again, // it may read the old values. // See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382. - if resolvedTs, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded { + if resolved, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded { log.Info("clean up table max resolved ts in MySQL sink", zap.Int64("tableID", tableID), - zap.Uint64("resolvedTs", resolvedTs.(uint64))) + zap.Uint64("resolvedTs", resolved.(model.ResolvedTs).Ts)) } if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded { log.Info("clean up table checkpoint ts in MySQL sink", @@ -546,27 +549,26 @@ func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) erro case <-ctx.Done(): return errors.Trace(ctx.Err()) case <-ticker.C: - maxResolvedTs, ok := s.tableMaxResolvedTs.Load(tableID) + maxResolved, ok := s.getTableResolvedTs(tableID) log.Warn("Barrier doesn't return in time, may be stuck", zap.Int64("tableID", tableID), zap.Bool("hasResolvedTs", ok), - zap.Any("resolvedTs", maxResolvedTs), + zap.Any("resolvedTs", maxResolved.Ts), zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID))) default: - v, ok := s.tableMaxResolvedTs.Load(tableID) + maxResolved, ok := s.getTableResolvedTs(tableID) if !ok { log.Info("No table resolvedTs is found", zap.Int64("tableID", tableID)) return nil } - maxResolvedTs := v.(uint64) - if s.getTableCheckpointTs(tableID) >= maxResolvedTs { + if s.getTableCheckpointTs(tableID) >= maxResolved.Ts { return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs) + checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolved) if err != nil { return err } - if checkpointTs >= maxResolvedTs { + if checkpointTs >= maxResolved.Ts { return nil } // short sleep to avoid cpu spin @@ -583,6 +585,15 @@ func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 { return uint64(0) } +func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs, bool) { + v, ok := s.tableMaxResolvedTs.Load(tableID) + var resolved model.ResolvedTs + if ok { + resolved = v.(model.ResolvedTs) + } + return resolved, ok +} + func logDMLTxnErr(err error) error { if isRetryableDMLError(err) { log.Warn("execute DMLs with error, retry later", zap.Error(err)) diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 2ebc196f14c..5357ed1e9f4 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1248,7 +1248,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) { // retry to make sure event is flushed err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, 1, uint64(2)) + ts, err := sink.FlushRowChangedEvents(ctx, 1, model.NewResolvedTs(uint64(2))) require.Nil(t, err) if ts < uint64(2) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) @@ -1259,7 +1259,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) { require.Nil(t, err) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(4)) + ts, err := sink.FlushRowChangedEvents(ctx, 2, model.NewResolvedTs(uint64(4))) require.Nil(t, err) if ts < uint64(4) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) @@ -1784,7 +1784,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { model.DefaultChangeFeedID(changefeed), sinkURI, f, rc, map[string]string{}) require.Nil(t, err) - checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), 1) + checkpoint, err := sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(1)) require.Nil(t, err) require.True(t, checkpoint <= 1) rows := []*model.RowChangedEvent{ @@ -1803,7 +1803,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { } err = sink.EmitRowChangedEvents(ctx, rows...) require.Nil(t, err) - checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), 6) + checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(1), model.NewResolvedTs(6)) require.True(t, checkpoint <= 6) require.Nil(t, err) require.True(t, sink.getTableCheckpointTs(model.TableID(1)) <= 6) @@ -1823,7 +1823,7 @@ func TestMySQLSinkFlushResolvedTs(t *testing.T) { } err = sink.EmitRowChangedEvents(ctx, rows...) require.Nil(t, err) - checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), 5) + checkpoint, err = sink.FlushRowChangedEvents(ctx, model.TableID(2), model.NewResolvedTs(5)) require.True(t, checkpoint <= 5) require.Nil(t, err) require.True(t, sink.getTableCheckpointTs(model.TableID(2)) <= 5) @@ -1897,7 +1897,7 @@ func TestCleanTableResource(t *testing.T) { Table: &model.TableName{TableID: tblID, Schema: "test", Table: "t1"}, })) s.tableCheckpointTs.Store(tblID, uint64(1)) - s.tableMaxResolvedTs.Store(tblID, uint64(2)) + s.tableMaxResolvedTs.Store(tblID, model.NewResolvedTs(uint64(2))) _, ok := s.txnCache.unresolvedTxns[tblID] require.True(t, ok) require.Nil(t, s.AddTable(tblID)) diff --git a/cdc/sink/mysql/simple_mysql_tester.go b/cdc/sink/mysql/simple_mysql_tester.go index ff3df320f50..ed68b1f12b5 100644 --- a/cdc/sink/mysql/simple_mysql_tester.go +++ b/cdc/sink/mysql/simple_mysql_tester.go @@ -177,12 +177,14 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` -func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) { +func (s *simpleMySQLSink) FlushRowChangedEvents( + ctx context.Context, _ model.TableID, resolved model.ResolvedTs, +) (uint64, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) for _, row := range s.rowsBuffer { - if row.CommitTs <= resolvedTs { + if row.CommitTs <= resolved.Ts { err := s.executeRowChangedEvents(ctx, row) if err != nil { return 0, err @@ -192,7 +194,7 @@ func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, _ model.Tab } } s.rowsBuffer = newBuffer - return resolvedTs, nil + return resolved.Ts, nil } // EmitCheckpointTs sends CheckpointTs to Sink diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 2aa02ec299b..dc7834c59a7 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -112,7 +112,7 @@ func (c *unresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha // The returned map contains many txns grouped by tableID. for each table, the each commitTs of txn in txns slice is strictly increasing func (c *unresolvedTxnCache) Resolved( resolvedTsMap *sync.Map, -) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) { +) (map[model.TableID]model.ResolvedTs, map[model.TableID][]*model.SingleTableTxn) { c.unresolvedTxnsMu.Lock() defer c.unresolvedTxnsMu.Unlock() if len(c.unresolvedTxns) == 0 { @@ -124,17 +124,19 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { +) (flushedResolvedTsMap map[model.TableID]model.ResolvedTs, + resolvedRowsMap map[model.TableID][]*model.SingleTableTxn, +) { resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns)) - flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns)) + flushedResolvedTsMap = make(map[model.TableID]model.ResolvedTs, len(unresolvedTxns)) for tableID, txns := range unresolvedTxns { v, ok := resolvedTsMap.Load(tableID) if !ok { continue } - resolvedTs := v.(uint64) + resolved := v.(model.ResolvedTs) i := sort.Search(len(txns), func(i int) bool { - return txns[i].commitTs > resolvedTs + return txns[i].commitTs > resolved.Ts }) if i == 0 { continue @@ -156,7 +158,7 @@ func splitResolvedTxn( resolvedTxns = append(resolvedTxns, txns.txns...) } resolvedRowsMap[tableID] = resolvedTxns - flushedResolvedTsMap[tableID] = resolvedTs + flushedResolvedTsMap[tableID] = resolved } return } diff --git a/cdc/sink/mysql/txn_cache_test.go b/cdc/sink/mysql/txn_cache_test.go index 0656b93b3dd..103d5ac9c4f 100644 --- a/cdc/sink/mysql/txn_cache_test.go +++ b/cdc/sink/mysql/txn_cache_test.go @@ -262,19 +262,19 @@ func TestSplitResolvedTxn(test *testing.T) { cache.Append(nil, t.input...) resolvedTsMap := sync.Map{} for tableID, ts := range t.resolvedTsMap { - resolvedTsMap.Store(tableID, ts) + resolvedTsMap.Store(tableID, model.NewResolvedTs(ts)) } - _, resolved := cache.Resolved(&resolvedTsMap) - for tableID, txns := range resolved { + _, resolvedTxn := cache.Resolved(&resolvedTsMap) + for tableID, txns := range resolvedTxn { sort.Slice(txns, func(i, j int) bool { if txns[i].CommitTs != txns[j].CommitTs { return txns[i].CommitTs < txns[j].CommitTs } return txns[i].StartTs < txns[j].StartTs }) - resolved[tableID] = txns + resolvedTxn[tableID] = txns } - require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected)) + require.Equal(test, t.expected, resolvedTxn, cmp.Diff(resolvedTxn, t.expected)) } } } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 4c0081570e6..e88a760ec1f 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -53,12 +53,16 @@ type Sink interface { EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // FlushRowChangedEvents flushes each row which of commitTs less than or - // equal to `resolvedTs` into downstream. - // TiCDC guarantees that all the Events whose commitTs is less than or - // equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` + // equal to `resolved.Ts` into downstream. + // With `resolved.Batch == false`, TiCDC guarantees that all the Events whose commitTs + // is less than or equal to `resolved.Ts` are sent to Sink through `EmitRowChangedEvents`. + // With `resolved.Batch == true`, TiCDC guarantees that all events whose commitTs + // is less than 'resolved.Ts' are sent to Sink. // // FlushRowChangedEvents is thread-safe. - FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) + FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, + ) (uint64, error) // EmitCheckpointTs sends CheckpointTs to Sink. // TiCDC guarantees that all Events **in the cluster** which of commitTs diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index 34f4950a5c7..ee3bdc2a548 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -71,7 +71,10 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs // is required to be no more than global resolvedTs, table barrierTs and table // redo log watermarkTs. -func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { +func (t *tableSink) FlushRowChangedEvents( + ctx context.Context, tableID model.TableID, resolved model.ResolvedTs, +) (uint64, error) { + resolvedTs := resolved.Ts if tableID != t.tableID { log.Panic("inconsistent table sink", zap.Int64("tableID", tableID), zap.Int64("sinkTableID", t.tableID)) @@ -80,7 +83,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab return t.buffer[i].CommitTs > resolvedTs }) if i == 0 { - return t.flushResolvedTs(ctx, resolvedTs) + return t.flushResolvedTs(ctx, resolved) } resolvedRows := t.buffer[:i] t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) @@ -89,19 +92,21 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab if err != nil { return 0, errors.Trace(err) } - return t.flushResolvedTs(ctx, resolvedTs) + return t.flushResolvedTs(ctx, resolved) } -func (t *tableSink) flushResolvedTs(ctx context.Context, resolvedTs uint64) (uint64, error) { - redoTs, err := t.flushRedoLogs(ctx, resolvedTs) +func (t *tableSink) flushResolvedTs( + ctx context.Context, resolved model.ResolvedTs, +) (uint64, error) { + redoTs, err := t.flushRedoLogs(ctx, resolved.Ts) if err != nil { return 0, errors.Trace(err) } - if redoTs < resolvedTs { - resolvedTs = redoTs + if redoTs < resolved.Ts { + resolved.Ts = redoTs } - checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolvedTs) + checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved) if err != nil { return 0, errors.Trace(err) } diff --git a/cdc/sorter/leveldb/writer.go b/cdc/sorter/leveldb/writer.go index e9a959bab02..1e3251113b5 100644 --- a/cdc/sorter/leveldb/writer.go +++ b/cdc/sorter/leveldb/writer.go @@ -17,7 +17,6 @@ import ( "context" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sorter/encoding" "github.com/pingcap/tiflow/cdc/sorter/leveldb/message" "github.com/pingcap/tiflow/pkg/actor" @@ -57,7 +56,7 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task] } ev := msgs[i].Value.InputEvent - if ev.RawKV.OpType == model.OpTypeResolved { + if ev.IsResolved() { if w.maxResolvedTs < ev.CRTs { w.maxResolvedTs = ev.CRTs } diff --git a/cdc/sorter/memory/entry_sorter.go b/cdc/sorter/memory/entry_sorter.go index 233497f3a89..02c6f884248 100644 --- a/cdc/sorter/memory/entry_sorter.go +++ b/cdc/sorter/memory/entry_sorter.go @@ -141,7 +141,7 @@ func (es *EntrySorter) AddEntry(_ context.Context, entry *model.PolymorphicEvent } es.lock.Lock() defer es.lock.Unlock() - if entry.RawKV.OpType == model.OpTypeResolved { + if entry.IsResolved() { es.resolvedTsGroup = append(es.resolvedTsGroup, entry.CRTs) es.resolvedNotifier.Notify() } else { diff --git a/cdc/sorter/memory/entry_sorter_test.go b/cdc/sorter/memory/entry_sorter_test.go index 90b95f99e8a..d88ec9b473d 100644 --- a/cdc/sorter/memory/entry_sorter_test.go +++ b/cdc/sorter/memory/entry_sorter_test.go @@ -275,7 +275,7 @@ func TestEntrySorterRandomly(t *testing.T) { } lastTs = entry.CRTs lastOpType = entry.RawKV.OpType - if entry.RawKV.OpType == model.OpTypeResolved { + if entry.IsResolved() { resolvedTs = entry.CRTs } if resolvedTs == maxTs { @@ -509,7 +509,7 @@ func BenchmarkSorter(b *testing.B) { }() var resolvedTs uint64 for entry := range es.Output() { - if entry.RawKV.OpType == model.OpTypeResolved { + if entry.IsResolved() { resolvedTs = entry.CRTs } if resolvedTs == maxTs { diff --git a/cdc/sorter/unified/heap_sorter.go b/cdc/sorter/unified/heap_sorter.go index 16d78dfeec2..30c4930472c 100644 --- a/cdc/sorter/unified/heap_sorter.go +++ b/cdc/sorter/unified/heap_sorter.go @@ -118,7 +118,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { // Since when a table is mostly idle or near-idle, most flushes would contain one ResolvedEvent alone, // this optimization will greatly improve performance when (1) total number of table is large, // and (2) most tables do not have many events. - if h.heap.Len() == 1 && h.heap[0].entry.RawKV.OpType == model.OpTypeResolved { + if h.heap.Len() == 1 && h.heap[0].entry.IsResolved() { h.heap.Pop() } @@ -303,7 +303,7 @@ func (h *heapSorter) init(ctx context.Context, onError func(err error)) { poolHandle := heapSorterPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { event := eventI.(*model.PolymorphicEvent) heap.Push(&h.heap, &sortItem{entry: event}) - isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved + isResolvedEvent := event.RawKV != nil && event.IsResolved() if isResolvedEvent { if event.RawKV.CRTs < state.maxResolved { diff --git a/cdc/sorter/unified/merger.go b/cdc/sorter/unified/merger.go index 238bf41a1a3..efd0a08d43a 100644 --- a/cdc/sorter/unified/merger.go +++ b/cdc/sorter/unified/merger.go @@ -352,7 +352,7 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch continue } - if event.CRTs > minResolvedTs || (event.CRTs == minResolvedTs && event.RawKV.OpType == model.OpTypeResolved) { + if event.CRTs > minResolvedTs || (event.CRTs == minResolvedTs && event.IsResolved()) { // we have processed all events from this task that need to be processed in this merge if event.CRTs > minResolvedTs || event.RawKV.OpType != model.OpTypeResolved { pendingSet.Store(task, event) diff --git a/cdc/sorter/unified/unified_sorter.go b/cdc/sorter/unified/unified_sorter.go index 70f04cf2118..0287f158675 100644 --- a/cdc/sorter/unified/unified_sorter.go +++ b/cdc/sorter/unified/unified_sorter.go @@ -183,7 +183,7 @@ func (s *Sorter) Run(ctx context.Context) error { case <-subctx.Done(): return subctx.Err() case event := <-s.inputCh: - if event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved { + if event.RawKV != nil && event.IsResolved() { // broadcast resolved events for _, sorter := range heapSorters { select { diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 11e9cb179db..9fbe12b782a 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -790,7 +790,8 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolve flushedResolvedTs := true sink.tablesMap.Range(func(key, value interface{}) bool { tableID := key.(int64) - checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, resolvedTs) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, + tableID, model.NewResolvedTs(resolvedTs)) if err != nil { return false } diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index b20b326e452..f13bf1c9fd5 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -165,7 +165,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { } for tableID, tableLastResolvedTs := range tableResolvedTsMap { - _, err = s.FlushRowChangedEvents(ctx, tableID, tableLastResolvedTs) + _, err = s.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(tableLastResolvedTs)) if err != nil { return err } @@ -177,7 +177,7 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { } for tableID := range tableResolvedTsMap { - _, err = s.FlushRowChangedEvents(ctx, tableID, resolvedTs) + _, err = s.FlushRowChangedEvents(ctx, tableID, model.NewResolvedTs(resolvedTs)) if err != nil { return err }