diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 2624d7db9c5..3db6205fd08 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -121,12 +121,13 @@ func (p *processor) checkReadyForMessages() bool {
 // 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
 // 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
 func (p *processor) AddTable(
-	ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
+	ctx context.Context, tableID model.TableID, checkpoint tablepb.Checkpoint, isPrepare bool,
 ) (bool, error) {
 	if !p.checkReadyForMessages() {
 		return false, nil
 	}
 
+	startTs := checkpoint.CheckpointTs
 	if startTs == 0 {
 		log.Panic("table start ts must not be 0",
 			zap.String("captureID", p.captureInfo.ID),
@@ -167,6 +168,11 @@ func (p *processor) AddTable(
 	// be stopped on original capture already, it's safe to start replicating data now.
 	if !isPrepare {
 		if p.pullBasedSinking {
+			if p.redoDMLMgr.Enabled() {
+				// ResolvedTs is stored in external storage when redo log is enabled, so we need to
+				// start the table with the ResolvedTs in redoDMLManager.
+				p.redoDMLMgr.StartTable(tableID, checkpoint.ResolvedTs)
+			}
 			if err := p.sinkManager.StartTable(tableID, startTs); err != nil {
 				return false, errors.Trace(err)
 			}
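The processor change above threads the full `tablepb.Checkpoint` (both `CheckpointTs` and `ResolvedTs`) into `AddTable` instead of a bare start ts, and seeds the redo manager with `ResolvedTs` before the sink starts from `CheckpointTs`. A minimal, self-contained sketch of that ordering; the `Checkpoint` struct and `startTable` function here are illustrative stand-ins, not the real tiflow API:

```go
package main

import "fmt"

// Checkpoint mirrors the shape of tablepb.Checkpoint as used in this patch:
// CheckpointTs is where the sink resumes; ResolvedTs is how far redo logs
// were already persisted to external storage (ResolvedTs >= CheckpointTs).
type Checkpoint struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

// startTable illustrates the ordering in AddTable: when redo is enabled,
// the redo manager is seeded with ResolvedTs *before* the sink starts from
// CheckpointTs, so the redo resolved ts does not fall back to the checkpoint
// after a table is moved between captures.
func startTable(cp Checkpoint, redoEnabled bool) {
	if redoEnabled {
		fmt.Printf("redo: start from ResolvedTs=%d\n", cp.ResolvedTs)
	}
	fmt.Printf("sink: start from CheckpointTs=%d\n", cp.CheckpointTs)
}

func main() {
	// Redo logs were flushed up to ts=45, but the global checkpoint is 30.
	startTable(Checkpoint{CheckpointTs: 30, ResolvedTs: 45}, true)
}
```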
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index 99549554386..7a7d288e225 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -279,7 +279,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
 	tester.MustApplyPatches()
 
 	// table-1: `preparing` -> `prepared` -> `replicating`
-	ok, err := p.AddTable(ctx, 1, 20, true)
+	ok, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 20}, true)
 	require.NoError(t, err)
 	require.True(t, ok)
 
@@ -305,12 +305,12 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
 	require.True(t, done)
 	require.Equal(t, tablepb.TableStatePrepared, table1.State())
 
-	ok, err = p.AddTable(ctx, 1, 30, true)
+	ok, err = p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 30}, true)
 	require.NoError(t, err)
 	require.True(t, ok)
 	require.Equal(t, model.Ts(0), table1.sinkStartTs)
 
-	ok, err = p.AddTable(ctx, 1, 30, false)
+	ok, err = p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 30}, false)
 	require.NoError(t, err)
 	require.True(t, ok)
 	require.Equal(t, model.Ts(30), table1.sinkStartTs)
@@ -405,10 +405,10 @@ func TestProcessorClose(t *testing.T) {
 	tester.MustApplyPatches()
 
 	// add tables
-	done, err := p.AddTable(ctx, model.TableID(1), 20, false)
+	done, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 20}, false)
 	require.Nil(t, err)
 	require.True(t, done)
-	done, err = p.AddTable(ctx, model.TableID(2), 30, false)
+	done, err = p.AddTable(ctx, 2, tablepb.Checkpoint{CheckpointTs: 30}, false)
 	require.Nil(t, err)
 	require.True(t, done)
 
@@ -440,10 +440,10 @@ func TestProcessorClose(t *testing.T) {
 	tester.MustApplyPatches()
 
 	// add tables
-	done, err = p.AddTable(ctx, model.TableID(1), 20, false)
+	done, err = p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 20}, false)
 	require.Nil(t, err)
 	require.True(t, done)
-	done, err = p.AddTable(ctx, model.TableID(2), 30, false)
+	done, err = p.AddTable(ctx, 2, tablepb.Checkpoint{CheckpointTs: 30}, false)
 	require.Nil(t, err)
 	require.True(t, done)
 	err = p.Tick(ctx)
@@ -474,10 +474,10 @@ func TestPositionDeleted(t *testing.T) {
 	p, tester := initProcessor4Test(ctx, t, &liveness)
 	var err error
 	// add table
-	done, err := p.AddTable(ctx, model.TableID(1), 30, false)
+	done, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 30}, false)
 	require.Nil(t, err)
 	require.True(t, done)
-	done, err = p.AddTable(ctx, model.TableID(2), 40, false)
+	done, err = p.AddTable(ctx, 2, tablepb.Checkpoint{CheckpointTs: 40}, false)
 	require.Nil(t, err)
 	require.True(t, done)
 	// init tick
@@ -572,7 +572,7 @@ func TestUpdateBarrierTs(t *testing.T) {
 	})
 	p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10
 
-	done, err := p.AddTable(ctx, model.TableID(1), 5, false)
+	done, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 5}, false)
 	require.True(t, done)
 	require.Nil(t, err)
 	err = p.Tick(ctx)
diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go
index e06bb04814e..7abea626afb 100644
--- a/cdc/redo/manager.go
+++ b/cdc/redo/manager.go
@@ -98,6 +98,7 @@ func (m *ddlManager) GetResolvedTs() model.Ts {
 type DMLManager interface {
 	redoManager
 	AddTable(tableID model.TableID, startTs uint64)
+	StartTable(tableID model.TableID, startTs uint64)
 	RemoveTable(tableID model.TableID)
 	UpdateResolvedTs(ctx context.Context, tableID model.TableID, resolvedTs uint64) error
 	GetResolvedTs(tableID model.TableID) model.Ts
@@ -166,10 +167,6 @@ func (s *statefulRts) getUnflushed() model.Ts {
 	return atomic.LoadUint64(&s.unflushed)
 }
 
-func (s *statefulRts) setFlushed(flushed model.Ts) {
-	atomic.StoreUint64(&s.flushed, flushed)
-}
-
 func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) {
 	for {
 		old := atomic.LoadUint64(&s.unflushed)
@@ -183,6 +180,19 @@ func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) {
 	return true
 }
 
+func (s *statefulRts) checkAndSetFlushed(flushed model.Ts) (changed bool) {
+	for {
+		old := atomic.LoadUint64(&s.flushed)
+		if old > flushed {
+			return false
+		}
+		if atomic.CompareAndSwapUint64(&s.flushed, old, flushed) {
+			break
+		}
+	}
+	return true
+}
+
 // logManager manages redo log writer, buffers un-persistent redo logs, calculates
 // redo log resolved ts. It implements DDLManager and DMLManager interface.
 type logManager struct {
@@ -287,6 +297,18 @@ func (m *logManager) emitRedoEvents(
 	})
 }
 
+// StartTable starts a table, which means the table is ready to emit redo events.
+// Note that this function should only be called once when adding a new table to the processor.
+func (m *logManager) StartTable(tableID model.TableID, resolvedTs uint64) {
+	// advance unflushed resolved ts
+	m.onResolvedTsMsg(tableID, resolvedTs)
+
+	// advance flushed resolved ts
+	if value, loaded := m.rtsMap.Load(tableID); loaded {
+		value.(*statefulRts).checkAndSetFlushed(resolvedTs)
+	}
+}
+
 // UpdateResolvedTs asynchronously updates resolved ts of a single table.
 func (m *logManager) UpdateResolvedTs(
 	ctx context.Context,
@@ -351,7 +373,13 @@ func (m *logManager) prepareForFlush() (tableRtsMap map[model.TableID]model.Ts)
 func (m *logManager) postFlush(tableRtsMap map[model.TableID]model.Ts) {
 	for tableID, flushed := range tableRtsMap {
 		if value, loaded := m.rtsMap.Load(tableID); loaded {
-			value.(*statefulRts).setFlushed(flushed)
+			changed := value.(*statefulRts).checkAndSetFlushed(flushed)
+			if !changed {
+				log.Debug("flush redo with regressed resolved ts",
+					zap.Int64("tableID", tableID),
+					zap.Uint64("flushed", flushed),
+					zap.Uint64("current", value.(*statefulRts).getFlushed()))
+			}
 		}
 	}
 }
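`checkAndSetFlushed` replaces the unconditional `setFlushed` so the flushed resolved ts can only move forward, even under concurrent writers. A standalone sketch of the same CAS pattern; the `monotonicTs` type and its method names are illustrative, not the tiflow implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// monotonicTs sketches the statefulRts idea above: a timestamp that can
// only advance, updated in a CAS loop so no writer can make it regress.
type monotonicTs struct {
	v uint64
}

// checkAndSet mirrors checkAndSetFlushed: it returns false (leaving the
// value untouched) when the proposed ts is behind the current one.
func (m *monotonicTs) checkAndSet(ts uint64) bool {
	for {
		old := atomic.LoadUint64(&m.v)
		if old > ts {
			return false // regression attempt, reject
		}
		if atomic.CompareAndSwapUint64(&m.v, old, ts) {
			return true
		}
		// lost a race with another writer; reload and retry
	}
}

func main() {
	var ts monotonicTs
	fmt.Println(ts.checkAndSet(40)) // true: 0 -> 40
	fmt.Println(ts.checkAndSet(30)) // false: 40 -> 30 would regress
	fmt.Println(ts.checkAndSet(45)) // true: 40 -> 45
}
```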
diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go
index 588a64b7bda..231f6db99e8 100644
--- a/cdc/redo/meta_manager.go
+++ b/cdc/redo/meta_manager.go
@@ -364,8 +364,8 @@ func (m *metaManager) prepareForFlushMeta() (bool, common.LogMeta) {
 }
 
 func (m *metaManager) postFlushMeta(meta common.LogMeta) {
-	m.metaResolvedTs.setFlushed(meta.ResolvedTs)
-	m.metaCheckpointTs.setFlushed(meta.CheckpointTs)
+	m.metaResolvedTs.checkAndSetFlushed(meta.ResolvedTs)
+	m.metaCheckpointTs.checkAndSetFlushed(meta.CheckpointTs)
 }
 
 func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go
index 222166d0ecb..91e893973dd 100644
--- a/cdc/scheduler/internal/table_executor.go
+++ b/cdc/scheduler/internal/table_executor.go
@@ -26,11 +26,11 @@ import (
 // to adapt the current Processor implementation to it.
 // TODO find a way to make the semantics easier to understand.
 type TableExecutor interface {
-	// AddTable add a new table with `startTs`
+	// AddTable adds a new table with `Checkpoint.CheckpointTs`
 	// if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol.
 	// if `isPrepare` is false, the 2nd phase.
 	AddTable(
-		ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
+		ctx context.Context, tableID model.TableID, checkpoint tablepb.Checkpoint, isPrepare bool,
 	) (done bool, err error)
 
 	// IsAddTableFinished make sure the requested table is in the proper status
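With the interface change, every `TableExecutor` implementation now receives the checkpoint in both scheduling phases. A hypothetical toy executor showing the two calls the scheduler makes (prepare first, then start replication); `toyExecutor` and its behavior are invented for illustration and are not the processor implementation:

```go
package main

import (
	"context"
	"fmt"
)

// Checkpoint stands in for tablepb.Checkpoint in this sketch.
type Checkpoint struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

// toyExecutor is a hypothetical TableExecutor showing the two-phase
// protocol: first AddTable with isPrepare=true (allocate resources, no
// replication), then again with isPrepare=false to start replicating.
type toyExecutor struct {
	prepared map[int64]bool
}

func (e *toyExecutor) AddTable(
	ctx context.Context, tableID int64, cp Checkpoint, isPrepare bool,
) (bool, error) {
	if isPrepare {
		e.prepared[tableID] = true
		fmt.Printf("table %d prepared at checkpoint %+v\n", tableID, cp)
		return true, nil
	}
	if !e.prepared[tableID] {
		return false, fmt.Errorf("table %d not prepared", tableID)
	}
	fmt.Printf("table %d replicating from ts %d\n", tableID, cp.CheckpointTs)
	return true, nil
}

func main() {
	e := &toyExecutor{prepared: map[int64]bool{}}
	cp := Checkpoint{CheckpointTs: 30, ResolvedTs: 45}
	_, _ = e.AddTable(context.Background(), 1, cp, true)  // phase 1
	_, _ = e.AddTable(context.Background(), 1, cp, false) // phase 2
}
```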
diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go
index 59671905be5..7a925bf1458 100644
--- a/cdc/scheduler/internal/v3/agent/agent.go
+++ b/cdc/scheduler/internal/v3/agent/agent.go
@@ -293,12 +293,12 @@ const (
 )
 
 type dispatchTableTask struct {
-	TableID   model.TableID
-	StartTs   model.Ts
-	IsRemove  bool
-	IsPrepare bool
-	Epoch     schedulepb.ProcessorEpoch
-	status    dispatchTableTaskStatus
+	TableID    model.TableID
+	Checkpoint tablepb.Checkpoint
+	IsRemove   bool
+	IsPrepare  bool
+	Epoch      schedulepb.ProcessorEpoch
+	status     dispatchTableTaskStatus
 }
 
 func (a *agent) handleMessageDispatchTableRequest(
@@ -326,12 +326,12 @@ func (a *agent) handleMessageDispatchTableRequest(
 	case *schedulepb.DispatchTableRequest_AddTable:
 		tableID := req.AddTable.GetTableID()
 		task = &dispatchTableTask{
-			TableID:   tableID,
-			StartTs:   req.AddTable.GetCheckpoint().CheckpointTs,
-			IsRemove:  false,
-			IsPrepare: req.AddTable.GetIsSecondary(),
-			Epoch:     epoch,
-			status:    dispatchTableTaskReceived,
+			TableID:    tableID,
+			Checkpoint: req.AddTable.GetCheckpoint(),
+			IsRemove:   false,
+			IsPrepare:  req.AddTable.GetIsSecondary(),
+			Epoch:      epoch,
+			status:     dispatchTableTaskReceived,
 		}
 		table = a.tableM.addTable(tableID)
 	case *schedulepb.DispatchTableRequest_RemoveTable:
diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go
index 6d961bc32bd..4f950d95975 100644
--- a/cdc/scheduler/internal/v3/agent/agent_test.go
+++ b/cdc/scheduler/internal/v3/agent/agent_test.go
@@ -931,11 +931,11 @@ func newMockTableExecutor() *MockTableExecutor {
 
 // AddTable adds a table to the executor.
 func (e *MockTableExecutor) AddTable(
-	ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
+	ctx context.Context, tableID model.TableID, checkpoint tablepb.Checkpoint, isPrepare bool,
 ) (bool, error) {
 	log.Info("AddTable",
 		zap.Int64("tableID", tableID),
-		zap.Any("startTs", startTs),
+		zap.Any("startTs", checkpoint),
 		zap.Bool("isPrepare", isPrepare))
 
 	state, ok := e.tables[tableID]
@@ -954,7 +954,7 @@ func (e *MockTableExecutor) AddTable(
 			delete(e.tables, tableID)
 		}
 	}
-	args := e.Called(ctx, tableID, startTs, isPrepare)
+	args := e.Called(ctx, tableID, checkpoint, isPrepare)
 	if args.Bool(0) {
 		e.tables[tableID] = tablepb.TableStatePreparing
 	}
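On the testify side, expectations that previously matched a `model.Ts` now have to match a `tablepb.Checkpoint`. A sketch of what a caller of `MockTableExecutor` might look like after this change; the test name is invented, and the `Return(true, nil)` arity assumes the mock reads `args.Bool(0)`/`args.Error(1)`, which this diff does not show:

```go
package agent

import (
	"context"
	"testing"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/processor/tablepb"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// Hypothetical test: the third matched argument is now a tablepb.Checkpoint
// value instead of a bare model.Ts.
func TestMockAddTableTakesCheckpoint(t *testing.T) {
	e := newMockTableExecutor()
	// Return(true, nil) assumes the mock returns args.Bool(0), args.Error(1).
	e.On("AddTable", mock.Anything, model.TableID(1),
		tablepb.Checkpoint{CheckpointTs: 20}, true).Return(true, nil)

	ok, err := e.AddTable(context.Background(), 1,
		tablepb.Checkpoint{CheckpointTs: 20}, true)
	require.NoError(t, err)
	require.True(t, ok)
}
```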
diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go
index 8cea19c220b..927d7c41028 100644
--- a/cdc/scheduler/internal/v3/agent/table.go
+++ b/cdc/scheduler/internal/v3/agent/table.go
@@ -174,7 +174,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess
 	for changed {
 		switch state {
 		case tablepb.TableStateAbsent:
-			done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, t.task.IsPrepare)
+			done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.Checkpoint, t.task.IsPrepare)
 			if err != nil || !done {
 				log.Warn("schedulerv3: agent add table failed",
 					zap.String("namespace", t.changefeedID.Namespace),
@@ -205,7 +205,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess
 	}
 
 	if t.task.status == dispatchTableTaskReceived {
-		done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, false)
+		done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.Checkpoint, false)
 		if err != nil || !done {
 			log.Warn("schedulerv3: agent add table failed",
 				zap.String("namespace", t.changefeedID.Namespace),
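Putting it together, this is the regression scenario the patch guards against when a table moves between captures. A toy replay with invented numbers; nothing here is the real tiflow code path:

```go
package main

import "fmt"

// Why StartTable seeds the redo manager with checkpoint.ResolvedTs.
func main() {
	redoResolvedTs := uint64(45) // persisted in external storage by capture A
	checkpointTs := uint64(30)   // changefeed checkpoint when the table moves

	// Without the fix: capture B initialises the table's redo resolved ts
	// from the checkpoint, so the reported value regresses 45 -> 30.
	withoutFix := checkpointTs

	// With the fix: StartTable(tableID, checkpoint.ResolvedTs) keeps the
	// flushed resolved ts at 45, and checkAndSetFlushed rejects any smaller
	// value written by a late flush.
	withFix := redoResolvedTs

	fmt.Printf("without fix: %d (regressed)\nwith fix:    %d\n", withoutFix, withFix)
}
```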