Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo, processor(ticdc): set flushed resolvedTs when start table (#9281) #9307

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ func (p *processor) checkReadyForMessages() bool {
// 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
// 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
func (p *processor) AddTable(
ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
ctx context.Context, tableID model.TableID, checkpoint tablepb.Checkpoint, isPrepare bool,
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
}

startTs := checkpoint.CheckpointTs
if startTs == 0 {
log.Panic("table start ts must not be 0",
zap.String("captureID", p.captureInfo.ID),
Expand Down Expand Up @@ -167,6 +168,11 @@ func (p *processor) AddTable(
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
if p.pullBasedSinking {
if p.redoDMLMgr.Enabled() {
// ResolvedTs is stored in external storage when redo log is enabled, so we need to
// start table with ResolvedTs in redoDMLManager.
p.redoDMLMgr.StartTable(tableID, checkpoint.ResolvedTs)
}
if err := p.sinkManager.StartTable(tableID, startTs); err != nil {
return false, errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
tester.MustApplyPatches()

// table-1: `preparing` -> `prepared` -> `replicating`
ok, err := p.AddTable(ctx, 1, 20, true)
ok, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 20}, true)
require.NoError(t, err)
require.True(t, ok)

Expand All @@ -305,12 +305,12 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, done)
require.Equal(t, tablepb.TableStatePrepared, table1.State())

ok, err = p.AddTable(ctx, 1, 30, true)
ok, err = p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 30}, true)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, model.Ts(0), table1.sinkStartTs)

ok, err = p.AddTable(ctx, 1, 30, false)
ok, err = p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 30}, false)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, model.Ts(30), table1.sinkStartTs)
Expand Down Expand Up @@ -405,10 +405,10 @@ func TestProcessorClose(t *testing.T) {
tester.MustApplyPatches()

// add tables
done, err := p.AddTable(ctx, model.TableID(1), 20, false)
done, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 20}, false)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTable(ctx, model.TableID(2), 30, false)
done, err = p.AddTable(ctx, 2, tablepb.Checkpoint{CheckpointTs: 30}, false)
require.Nil(t, err)
require.True(t, done)

Expand Down Expand Up @@ -440,10 +440,10 @@ func TestProcessorClose(t *testing.T) {
tester.MustApplyPatches()

// add tables
done, err = p.AddTable(ctx, model.TableID(1), 20, false)
done, err = p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 20}, false)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTable(ctx, model.TableID(2), 30, false)
done, err = p.AddTable(ctx, 2, tablepb.Checkpoint{CheckpointTs: 30}, false)
require.Nil(t, err)
require.True(t, done)
err = p.Tick(ctx)
Expand Down Expand Up @@ -474,10 +474,10 @@ func TestPositionDeleted(t *testing.T) {
p, tester := initProcessor4Test(ctx, t, &liveness)
var err error
// add table
done, err := p.AddTable(ctx, model.TableID(1), 30, false)
done, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 30}, false)
require.Nil(t, err)
require.True(t, done)
done, err = p.AddTable(ctx, model.TableID(2), 40, false)
done, err = p.AddTable(ctx, 2, tablepb.Checkpoint{CheckpointTs: 40}, false)
require.Nil(t, err)
require.True(t, done)
// init tick
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestUpdateBarrierTs(t *testing.T) {
})
p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10

done, err := p.AddTable(ctx, model.TableID(1), 5, false)
done, err := p.AddTable(ctx, 1, tablepb.Checkpoint{CheckpointTs: 5}, false)
require.True(t, done)
require.Nil(t, err)
err = p.Tick(ctx)
Expand Down
38 changes: 33 additions & 5 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (m *ddlManager) GetResolvedTs() model.Ts {
type DMLManager interface {
redoManager
AddTable(tableID model.TableID, startTs uint64)
StartTable(tableID model.TableID, startTs uint64)
RemoveTable(tableID model.TableID)
UpdateResolvedTs(ctx context.Context, tableID model.TableID, resolvedTs uint64) error
GetResolvedTs(tableID model.TableID) model.Ts
Expand Down Expand Up @@ -166,10 +167,6 @@ func (s *statefulRts) getUnflushed() model.Ts {
return atomic.LoadUint64(&s.unflushed)
}

// setFlushed unconditionally overwrites the flushed watermark with the given
// ts. It performs no monotonicity check, so callers must guarantee the
// watermark never regresses; prefer checkAndSetFlushed when that guarantee
// cannot be made by the caller.
func (s *statefulRts) setFlushed(flushed model.Ts) {
	atomic.StoreUint64(&s.flushed, flushed)
}

func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) {
for {
old := atomic.LoadUint64(&s.unflushed)
Expand All @@ -183,6 +180,19 @@ func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) {
return true
}

// checkAndSetFlushed advances the flushed watermark to `flushed` unless the
// stored value is already ahead of it. It reports whether the store happened:
// false means a newer ts had already been recorded and nothing was written.
func (s *statefulRts) checkAndSetFlushed(flushed model.Ts) (changed bool) {
	for {
		current := atomic.LoadUint64(&s.flushed)
		if current > flushed {
			// A more recent ts is already flushed; keep it.
			return false
		}
		if atomic.CompareAndSwapUint64(&s.flushed, current, flushed) {
			return true
		}
		// Lost a race with a concurrent writer; reload and retry.
	}
}

// logManager manages redo log writer, buffers un-persistent redo logs, calculates
// redo log resolved ts. It implements DDLManager and DMLManager interface.
type logManager struct {
Expand Down Expand Up @@ -287,6 +297,18 @@ func (m *logManager) emitRedoEvents(
})
}

// StartTable starts a table, which means the table is ready to emit redo events.
// Note that this function should only be called once when adding a new table to
// the processor.
func (m *logManager) StartTable(tableID model.TableID, resolvedTs uint64) {
	// Advance the unflushed resolved ts first, as if the ts had just been
	// observed from the table pipeline.
	m.onResolvedTsMsg(tableID, resolvedTs)

	// Then advance the flushed resolved ts: when redo is enabled the resolved
	// ts was already persisted in external storage before this table started.
	value, loaded := m.rtsMap.Load(tableID)
	if !loaded {
		return
	}
	value.(*statefulRts).checkAndSetFlushed(resolvedTs)
}

// UpdateResolvedTs asynchronously updates resolved ts of a single table.
func (m *logManager) UpdateResolvedTs(
ctx context.Context,
Expand Down Expand Up @@ -351,7 +373,13 @@ func (m *logManager) prepareForFlush() (tableRtsMap map[model.TableID]model.Ts)
// postFlush is invoked after redo logs have been persisted successfully. It
// advances the flushed watermark of every table in tableRtsMap, never moving
// a watermark backwards.
//
// NOTE: the source span interleaved the removed `setFlushed` call with its
// replacement (diff artifact); only the monotonic checkAndSetFlushed path is
// kept here, since setFlushed was deleted in this change.
func (m *logManager) postFlush(tableRtsMap map[model.TableID]model.Ts) {
	for tableID, flushed := range tableRtsMap {
		value, loaded := m.rtsMap.Load(tableID)
		if !loaded {
			// The table was removed concurrently; nothing to record.
			continue
		}
		rts := value.(*statefulRts)
		if changed := rts.checkAndSetFlushed(flushed); !changed {
			// The stored flushed ts is already ahead (e.g. it was restored
			// from external storage when the table started); just log it.
			log.Debug("flush redo with regressed resolved ts",
				zap.Int64("tableID", tableID),
				zap.Uint64("flushed", flushed),
				zap.Uint64("current", rts.getFlushed()))
		}
	}
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/redo/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ func (m *metaManager) prepareForFlushMeta() (bool, common.LogMeta) {
}

// postFlushMeta records that the given log meta has been persisted by
// advancing the flushed watermarks of both the resolved ts and the checkpoint
// ts. checkAndSetFlushed guarantees neither watermark moves backwards.
//
// NOTE: the source span contained both the removed `setFlushed` calls and
// their replacements (diff artifact); only the post-change calls are kept.
func (m *metaManager) postFlushMeta(meta common.LogMeta) {
	m.metaResolvedTs.checkAndSetFlushed(meta.ResolvedTs)
	m.metaCheckpointTs.checkAndSetFlushed(meta.CheckpointTs)
}

func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
Expand Down
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/table_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
// to adapt the current Processor implementation to it.
// TODO find a way to make the semantics easier to understand.
type TableExecutor interface {
// AddTable add a new table with `startTs`
// AddTable add a new table with `Checkpoint.CheckpointTs`
// if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol.
// if `isPrepare` is false, the 2nd phase.
AddTable(
ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
ctx context.Context, tableID model.TableID, checkpoint tablepb.Checkpoint, isPrepare bool,
) (done bool, err error)

// IsAddTableFinished make sure the requested table is in the proper status
Expand Down
24 changes: 12 additions & 12 deletions cdc/scheduler/internal/v3/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,12 @@ const (
)

// dispatchTableTask describes an add-table or remove-table request that the
// agent dispatches to the table executor.
//
// NOTE: the source span showed both the old field set (StartTs) and the new
// one (Checkpoint) interleaved by the diff rendering — duplicate fields would
// not compile; only the post-change field set is kept.
type dispatchTableTask struct {
	TableID model.TableID
	// Checkpoint carries both the checkpoint ts and the resolved ts the
	// table should start from (the resolved ts matters when redo is enabled).
	Checkpoint tablepb.Checkpoint
	// IsRemove is true for a remove-table request.
	IsRemove bool
	// IsPrepare is true during the prepare phase of 2-phase scheduling.
	IsPrepare bool
	Epoch     schedulepb.ProcessorEpoch
	status    dispatchTableTaskStatus
}

func (a *agent) handleMessageDispatchTableRequest(
Expand Down Expand Up @@ -326,12 +326,12 @@ func (a *agent) handleMessageDispatchTableRequest(
case *schedulepb.DispatchTableRequest_AddTable:
tableID := req.AddTable.GetTableID()
task = &dispatchTableTask{
TableID: tableID,
StartTs: req.AddTable.GetCheckpoint().CheckpointTs,
IsRemove: false,
IsPrepare: req.AddTable.GetIsSecondary(),
Epoch: epoch,
status: dispatchTableTaskReceived,
TableID: tableID,
Checkpoint: req.AddTable.GetCheckpoint(),
IsRemove: false,
IsPrepare: req.AddTable.GetIsSecondary(),
Epoch: epoch,
status: dispatchTableTaskReceived,
}
table = a.tableM.addTable(tableID)
case *schedulepb.DispatchTableRequest_RemoveTable:
Expand Down
6 changes: 3 additions & 3 deletions cdc/scheduler/internal/v3/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,11 +931,11 @@ func newMockTableExecutor() *MockTableExecutor {

// AddTable adds a table to the executor.
func (e *MockTableExecutor) AddTable(
ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
ctx context.Context, tableID model.TableID, checkpoint tablepb.Checkpoint, isPrepare bool,
) (bool, error) {
log.Info("AddTable",
zap.Int64("tableID", tableID),
zap.Any("startTs", startTs),
zap.Any("startTs", checkpoint),
zap.Bool("isPrepare", isPrepare))

state, ok := e.tables[tableID]
Expand All @@ -954,7 +954,7 @@ func (e *MockTableExecutor) AddTable(
delete(e.tables, tableID)
}
}
args := e.Called(ctx, tableID, startTs, isPrepare)
args := e.Called(ctx, tableID, checkpoint, isPrepare)
if args.Bool(0) {
e.tables[tableID] = tablepb.TableStatePreparing
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess
for changed {
switch state {
case tablepb.TableStateAbsent:
done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, t.task.IsPrepare)
done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.Checkpoint, t.task.IsPrepare)
if err != nil || !done {
log.Warn("schedulerv3: agent add table failed",
zap.String("namespace", t.changefeedID.Namespace),
Expand Down Expand Up @@ -205,7 +205,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess
}

if t.task.status == dispatchTableTaskReceived {
done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, false)
done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.Checkpoint, false)
if err != nil || !done {
log.Warn("schedulerv3: agent add table failed",
zap.String("namespace", t.changefeedID.Namespace),
Expand Down