diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index 6da2ed12439..8ecc4fef084 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -56,7 +56,11 @@ func NewManager4Test( liveness *model.Liveness, changefeedEpoch uint64, ) *processor { +<<<<<<< HEAD return newProcessor4Test(t, state, captureInfo, createTablePipeline, m.liveness) +======= + return newProcessor4Test(t, state, captureInfo, m.liveness, cfg, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) } return m } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 8289c0579f1..dd35806873c 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -120,13 +120,19 @@ func (p *processor) checkReadyForMessages() bool { // 1. `Create Table`, a new table dispatched to the processor, `isPrepare` should be false // 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true. // 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false +<<<<<<< HEAD func (p *processor) AddTable( ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, +======= +func (p *processor) AddTableSpan( + ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool, +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) ) (bool, error) { if !p.checkReadyForMessages() { return false, nil } + startTs := checkpoint.CheckpointTs if startTs == 0 { log.Panic("table start ts must not be 0", zap.String("captureID", p.captureInfo.ID), @@ -166,12 +172,22 @@ func (p *processor) AddTable( // table is `prepared`, and a `isPrepare = false` request indicate that old table should // be stopped on original capture already, it's safe to start replicating data now. if !isPrepare { +<<<<<<< HEAD if p.pullBasedSinking { if err := p.sinkManager.StartTable(tableID, startTs); err != nil { return false, errors.Trace(err) } } else { p.tables[tableID].Start(startTs) +======= + if p.redo.r.Enabled() { + // ResolvedTs is store in external storage when redo log is enabled, so we need to + // start table with ResolvedTs in redoDMLManager. + p.redo.r.StartTable(span, checkpoint.ResolvedTs) + } + if err := p.sinkManager.r.StartTable(span, startTs); err != nil { + return false, errors.Trace(err) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) } } return true, nil diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 99549554386..fd2686522bd 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -34,6 +34,11 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" +<<<<<<< HEAD +======= + redoPkg "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/spanz" +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" ) @@ -47,6 +52,11 @@ func newProcessor4Test( captureInfo *model.CaptureInfo, createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepb.TablePipeline, error), liveness *model.Liveness, +<<<<<<< HEAD +======= + cfg *config.SchedulerConfig, + enableRedo bool, +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) ) *processor { up := upstream.NewUpstream4Test(nil) p := newProcessor( @@ -54,8 +64,49 @@ func newProcessor4Test( captureInfo, model.ChangeFeedID4Test("processor-test", "processor-test"), up, liveness, 0) p.lazyInit = func(ctx cdcContext.Context) error { +<<<<<<< HEAD p.agent = &mockAgent{executor: p} p.sinkV1 = mocksink.NewNormalMockSink() +======= + if p.initialized { + return nil + } + + if !enableRedo { + p.redo.r = redo.NewDisabledDMLManager() + } else { + tmpDir := t.TempDir() + redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) + dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{ + Level: string(redoPkg.ConsistentLevelEventual), + MaxLogSize: redoPkg.DefaultMaxLogSize, + FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, + Storage: "file://" + redoDir, + UseFileBackend: false, + }) + require.NoError(t, err) + p.redo.r = dmlMgr + } + p.redo.name = "RedoManager" + p.redo.changefeedID = changefeedID + p.redo.spawn(ctx) + + p.agent = &mockAgent{executor: p, liveness: liveness} + p.sinkManager.r, p.sourceManager.r, _ = sinkmanager.NewManagerWithMemEngine( + t, changefeedID, state.Info, p.redo.r) + p.sinkManager.name = "SinkManager" + p.sinkManager.changefeedID = changefeedID + p.sinkManager.spawn(ctx) + p.sourceManager.name = "SourceManager" + p.sourceManager.changefeedID = changefeedID + p.sourceManager.spawn(ctx) + + // NOTICE: we have to bind the sourceManager to the sinkManager + // otherwise the sinkManager will not receive the resolvedTs. + p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs) + + p.initialized = true +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) return nil } p.redoDMLMgr = redo.NewDisabledDMLManager() @@ -65,7 +116,7 @@ func newProcessor4Test( } func initProcessor4Test( - ctx cdcContext.Context, t *testing.T, liveness *model.Liveness, + ctx cdcContext.Context, t *testing.T, liveness *model.Liveness, enableRedo bool, ) (*processor, *orchestrator.ReactorStateTester) { changefeedInfo := ` { @@ -105,7 +156,12 @@ func initProcessor4Test( changefeed := orchestrator.NewChangefeedReactorState( etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID) captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"} +<<<<<<< HEAD p := newProcessor4Test(t, changefeed, captureInfo, newMockTablePipeline, liveness) +======= + cfg := config.NewDefaultSchedulerConfig() + p := newProcessor4Test(t, changefeed, captureInfo, liveness, cfg, enableRedo) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) captureID := ctx.GlobalVars().CaptureInfo.ID changefeedID := ctx.ChangefeedVars().ID @@ -259,7 +315,7 @@ func (a *mockAgent) Close() error { func TestTableExecutorAddingTableIndirectly(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) var err error // init tick @@ -279,7 +335,12 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { tester.MustApplyPatches() // table-1: `preparing` -> `prepared` -> `replicating` +<<<<<<< HEAD ok, err := p.AddTable(ctx, 1, 20, true) +======= + span := spanz.TableIDToComparableSpan(1) + ok, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 20}, true) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.NoError(t, err) require.True(t, ok) @@ -305,12 +366,21 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.True(t, done) require.Equal(t, tablepb.TableStatePrepared, table1.State()) +<<<<<<< HEAD ok, err = p.AddTable(ctx, 1, 30, true) +======= + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, true) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(0), table1.sinkStartTs) +<<<<<<< HEAD ok, err = p.AddTable(ctx, 1, 30, false) +======= + // Start to replicate table-1. + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(30), table1.sinkStartTs) @@ -330,11 +400,113 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, p.agent) } +func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) { + ctx := cdcContext.NewBackendContext4Test(true) + liveness := model.LivenessCaptureAlive + p, tester := initProcessor4Test(ctx, t, &liveness, true) + + // init tick + err := p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + status.CheckpointTs = 20 + return status, true, nil + }) + tester.MustApplyPatches() + + // no operation + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + // table-1: `preparing` -> `prepared` -> `replicating` + span := spanz.TableIDToComparableSpan(1) + ok, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 20}, true) + require.NoError(t, err) + require.True(t, ok) + p.sinkManager.r.UpdateBarrierTs(20, nil) + stats := p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(20), stats.CheckpointTs) + require.Equal(t, model.Ts(20), stats.ResolvedTs) + require.Equal(t, model.Ts(20), stats.BarrierTs) + require.Len(t, p.sinkManager.r.GetAllCurrentTableSpans(), 1) + require.Equal(t, 1, p.sinkManager.r.GetAllCurrentTableSpansCount()) + + done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true) + require.False(t, done) + state, ok := p.sinkManager.r.GetTableState(span) + require.True(t, ok) + require.Equal(t, tablepb.TableStatePreparing, state) + + // Push the resolved ts, mock that sorterNode receive first resolved event. + p.sourceManager.r.Add( + span, + []*model.PolymorphicEvent{{ + CRTs: 101, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypeResolved, + CRTs: 101, + }, + }}..., + ) + + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + done = p.IsAddTableSpanFinished(span, true) + require.True(t, done) + state, ok = p.sinkManager.r.GetTableState(span) + require.True(t, ok) + require.Equal(t, tablepb.TableStatePrepared, state) + + // ignore duplicate add request + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, true) + require.NoError(t, err) + require.True(t, ok) + stats = p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(20), stats.CheckpointTs) + require.Equal(t, model.Ts(20), stats.ResolvedTs) + require.Equal(t, model.Ts(20), stats.BarrierTs) + + p.sinkManager.r.UpdateBarrierTs(50, nil) + stats = p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(20), stats.ResolvedTs) + require.Equal(t, model.Ts(50), stats.BarrierTs) + + // Start to replicate table-1. + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30, ResolvedTs: 60}, false) + require.NoError(t, err) + require.True(t, ok) + + stats = p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(60), stats.ResolvedTs) + require.Equal(t, model.Ts(50), stats.BarrierTs) + + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + // table-1: `prepared` -> `replicating` + state, ok = p.sinkManager.r.GetTableState(span) + require.True(t, ok) + require.Equal(t, tablepb.TableStateReplicating, state) + + err = p.Close() + require.Nil(t, err) + require.Nil(t, p.agent) +} + func TestProcessorError(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive +<<<<<<< HEAD p, tester := initProcessor4Test(ctx, t, &liveness) var err error +======= + p, tester := initProcessor4Test(ctx, t, &liveness, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) // init tick err = p.Tick(ctx) require.Nil(t, err) @@ -354,7 +526,7 @@ func TestProcessorError(t *testing.T) { }, }) - p, tester = initProcessor4Test(ctx, t, &liveness) + p, tester = initProcessor4Test(ctx, t, &liveness, false) // init tick err = p.Tick(ctx) require.Nil(t, err) @@ -373,7 +545,7 @@ func TestProcessorError(t *testing.T) { func TestProcessorExit(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) var err error // init tick err = p.Tick(ctx) @@ -397,18 +569,29 @@ func TestProcessorExit(t *testing.T) { func TestProcessorClose(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive +<<<<<<< HEAD p, tester := initProcessor4Test(ctx, t, &liveness) var err error +======= + p, tester := initProcessor4Test(ctx, t, &liveness, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) // init tick err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() // add tables +<<<<<<< HEAD done, err := p.AddTable(ctx, model.TableID(1), 20, false) require.Nil(t, err) require.True(t, done) done, err = p.AddTable(ctx, model.TableID(2), 30, false) +======= + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false) + require.Nil(t, err) + require.True(t, done) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.Nil(t, err) require.True(t, done) @@ -433,17 +616,24 @@ func TestProcessorClose(t *testing.T) { require.True(t, p.tables[1].(*mockTablePipeline).canceled) require.True(t, p.tables[2].(*mockTablePipeline).canceled) - p, tester = initProcessor4Test(ctx, t, &liveness) + p, tester = initProcessor4Test(ctx, t, &liveness, false) // init tick err = p.Tick(ctx) require.Nil(t, err) tester.MustApplyPatches() // add tables +<<<<<<< HEAD done, err = p.AddTable(ctx, model.TableID(1), 20, false) require.Nil(t, err) require.True(t, done) done, err = p.AddTable(ctx, model.TableID(2), 30, false) +======= + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false) + require.Nil(t, err) + require.True(t, done) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.Nil(t, err) require.True(t, done) err = p.Tick(ctx) @@ -471,10 +661,16 @@ func TestProcessorClose(t *testing.T) { func TestPositionDeleted(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive +<<<<<<< HEAD p, tester := initProcessor4Test(ctx, t, &liveness) var err error // add table done, err := p.AddTable(ctx, model.TableID(1), 30, false) +======= + p, tester := initProcessor4Test(ctx, t, &liveness, false) + // init tick + err := p.Tick(ctx) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.Nil(t, err) require.True(t, done) done, err = p.AddTable(ctx, model.TableID(2), 40, false) @@ -485,7 +681,17 @@ func TestPositionDeleted(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() +<<<<<<< HEAD require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) +======= + // add table + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false) + require.Nil(t, err) + require.True(t, done) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 40}, false) + require.Nil(t, err) + require.True(t, done) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) // some others delete the task position p.changefeed.PatchTaskPosition(p.captureInfo.ID, @@ -505,7 +711,7 @@ func TestPositionDeleted(t *testing.T) { func TestSchemaGC(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) var err error // init tick @@ -564,7 +770,7 @@ func TestIgnorableError(t *testing.T) { func TestUpdateBarrierTs(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { status.CheckpointTs = 5 status.ResolvedTs = 10 @@ -572,7 +778,23 @@ func TestUpdateBarrierTs(t *testing.T) { }) p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10 +<<<<<<< HEAD done, err := p.AddTable(ctx, model.TableID(1), 5, false) +======= + // init tick + err := p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID) + + // Do a no operation tick to lazy init the processor. + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + span := spanz.TableIDToComparableSpan(1) + done, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 5}, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) require.True(t, done) require.Nil(t, err) err = p.Tick(ctx) @@ -604,6 +826,7 @@ func TestUpdateBarrierTs(t *testing.T) { func TestProcessorLiveness(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive +<<<<<<< HEAD p, tester := initProcessor4Test(ctx, t, &liveness) p.lazyInit = func(ctx cdcContext.Context) error { // Mock the newAgent procedure in p.lazyInitImpl, @@ -611,6 +834,9 @@ func TestProcessorLiveness(t *testing.T) { p.agent = &mockAgent{executor: p, liveness: p.liveness} return nil } +======= + p, tester := initProcessor4Test(ctx, t, &liveness, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) // First tick for creating position. err := p.Tick(ctx) @@ -629,4 +855,40 @@ func TestProcessorLiveness(t *testing.T) { // Force set liveness to alive. *p.agent.(*mockAgent).liveness = model.LivenessCaptureAlive require.Equal(t, model.LivenessCaptureAlive, p.liveness.Load()) +<<<<<<< HEAD +======= + + require.Nil(t, p.Close()) + tester.MustApplyPatches() +} + +func TestProcessorDostNotStuckInInit(t *testing.T) { + _ = failpoint. + Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkManagerRunError", + "1*return(true)") + defer func() { + _ = failpoint. + Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkManagerRunError") + }() + + ctx := cdcContext.NewBackendContext4Test(true) + liveness := model.LivenessCaptureAlive + p, tester := initProcessor4Test(ctx, t, &liveness, false) + + // First tick for creating position. + err := p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + // Second tick for init. + err = p.Tick(ctx) + require.Nil(t, err) + + // TODO(qupeng): third tick for handle a warning. + err = p.Tick(ctx) + require.Nil(t, err) + + require.Nil(t, p.Close()) + tester.MustApplyPatches() +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) } diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 6d073634b26..7c001b1b4ad 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -945,6 +945,18 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats { resolvedTs = tableSink.getReceivedSorterResolvedTs() } +<<<<<<< HEAD +======= + if resolvedTs < checkpointTs.ResolvedMark() { + log.Error("sinkManager: resolved ts should not less than checkpoint ts", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Stringer("span", &span), + zap.Uint64("resolvedTs", resolvedTs), + zap.Any("checkpointTs", checkpointTs), + zap.Uint64("barrierTs", tableSink.barrierTs.Load())) + } +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) return TableStats{ CheckpointTs: checkpointTs.ResolvedMark(), ResolvedTs: resolvedTs, diff --git a/cdc/processor/sinkmanager/manager_test_helper.go b/cdc/processor/sinkmanager/manager_test_helper.go new file mode 100644 index 00000000000..7726dbd5b25 --- /dev/null +++ b/cdc/processor/sinkmanager/manager_test_helper.go @@ -0,0 +1,95 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sinkmanager + +import ( + "context" + "math" + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" + "github.com/pingcap/tiflow/cdc/redo" + "github.com/pingcap/tiflow/pkg/upstream" + pd "github.com/tikv/pd/client" +) + +// MockPD only for test. +type MockPD struct { + pd.Client + ts int64 +} + +// GetTS implements the PD interface. +func (p *MockPD) GetTS(_ context.Context) (int64, int64, error) { + if p.ts != 0 { + return p.ts, p.ts, nil + } + return math.MaxInt64, math.MaxInt64, nil +} + +// nolint:revive +// In test it is ok move the ctx to the second parameter. +func CreateManagerWithMemEngine( + t *testing.T, + ctx context.Context, + changefeedID model.ChangeFeedID, + changefeedInfo *model.ChangeFeedInfo, + errChan chan error, +) (*SinkManager, *sourcemanager.SourceManager, engine.SortEngine) { + handleError := func(err error) { + if err != nil && errors.Cause(err) != context.Canceled { + select { + case errChan <- err: + case <-ctx.Done(): + } + } + } + + sortEngine := memory.New(context.Background()) + up := upstream.NewUpstream4Test(&MockPD{}) + mg := &entry.MockMountGroup{} + schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64} + + sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false) + go func() { handleError(sourceManager.Run(ctx)) }() + sourceManager.WaitForReady(ctx) + + sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, nil, sourceManager) + go func() { handleError(sinkManager.Run(ctx)) }() + sinkManager.WaitForReady(ctx) + + return sinkManager, sourceManager, sortEngine +} + +// nolint:revive +// In test it is ok move the ctx to the second parameter. +func NewManagerWithMemEngine( + t *testing.T, + changefeedID model.ChangeFeedID, + changefeedInfo *model.ChangeFeedInfo, + redoMgr redo.DMLManager, +) (*SinkManager, *sourcemanager.SourceManager, engine.SortEngine) { + sortEngine := memory.New(context.Background()) + up := upstream.NewUpstream4Test(&MockPD{}) + mg := &entry.MockMountGroup{} + schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64} + sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false) + sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, redoMgr, sourceManager) + return sinkManager, sourceManager, sortEngine +} diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index e06bb04814e..7394a1e52fc 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -166,10 +166,6 @@ func (s *statefulRts) getUnflushed() model.Ts { return atomic.LoadUint64(&s.unflushed) } -func (s *statefulRts) setFlushed(flushed model.Ts) { - atomic.StoreUint64(&s.flushed, flushed) -} - func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) { for { old := atomic.LoadUint64(&s.unflushed) @@ -183,6 +179,19 @@ func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) { return true } +func (s *statefulRts) checkAndSetFlushed(flushed model.Ts) (changed bool) { + for { + old := atomic.LoadUint64(&s.flushed) + if old > flushed { + return false + } + if atomic.CompareAndSwapUint64(&s.flushed, old, flushed) { + break + } + } + return true +} + // logManager manages redo log writer, buffers un-persistent redo logs, calculates // redo log resolved ts. It implements DDLManager and DMLManager interface. type logManager struct { @@ -287,6 +296,21 @@ func (m *logManager) emitRedoEvents( }) } +<<<<<<< HEAD +======= +// StartTable starts a table, which means the table is ready to emit redo events. +// Note that this function should only be called once when adding a new table to processor. +func (m *logManager) StartTable(span tablepb.Span, resolvedTs uint64) { + // advance unflushed resolved ts + m.onResolvedTsMsg(span, resolvedTs) + + // advance flushed resolved ts + if value, loaded := m.rtsMap.Load(span); loaded { + value.(*statefulRts).checkAndSetFlushed(resolvedTs) + } +} + +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) // UpdateResolvedTs asynchronously updates resolved ts of a single table. func (m *logManager) UpdateResolvedTs( ctx context.Context, @@ -348,10 +372,23 @@ func (m *logManager) prepareForFlush() (tableRtsMap map[model.TableID]model.Ts) return tableRtsMap } +<<<<<<< HEAD func (m *logManager) postFlush(tableRtsMap map[model.TableID]model.Ts) { for tableID, flushed := range tableRtsMap { if value, loaded := m.rtsMap.Load(tableID); loaded { value.(*statefulRts).setFlushed(flushed) +======= +func (m *logManager) postFlush(tableRtsMap *spanz.HashMap[model.Ts]) { + tableRtsMap.Range(func(span tablepb.Span, flushed uint64) bool { + if value, loaded := m.rtsMap.Load(span); loaded { + changed := value.(*statefulRts).checkAndSetFlushed(flushed) + if !changed { + log.Debug("flush redo with regressed resolved ts", + zap.Stringer("span", &span), + zap.Uint64("flushed", flushed), + zap.Uint64("current", value.(*statefulRts).getFlushed())) + } +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) } } } diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 588a64b7bda..231f6db99e8 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -364,8 +364,8 @@ func (m *metaManager) prepareForFlushMeta() (bool, common.LogMeta) { } func (m *metaManager) postFlushMeta(meta common.LogMeta) { - m.metaResolvedTs.setFlushed(meta.ResolvedTs) - m.metaCheckpointTs.setFlushed(meta.CheckpointTs) + m.metaResolvedTs.checkAndSetFlushed(meta.ResolvedTs) + m.metaCheckpointTs.checkAndSetFlushed(meta.CheckpointTs) } func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 222166d0ecb..82474b0d419 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -26,11 +26,19 @@ import ( // to adapt the current Processor implementation to it. // TODO find a way to make the semantics easier to understand. type TableExecutor interface { +<<<<<<< HEAD // AddTable add a new table with `startTs` // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol. // if `isPrepare` is false, the 2nd phase. AddTable( ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, +======= + // AddTableSpan add a new table span with `Checkpoint.CheckpointTs` + // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol. + // if `isPrepare` is false, the 2nd phase. + AddTableSpan( + ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool, +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) ) (done bool, err error) // IsAddTableFinished make sure the requested table is in the proper status diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 59671905be5..5b3aff4f2db 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -293,12 +293,21 @@ const ( ) type dispatchTableTask struct { +<<<<<<< HEAD TableID model.TableID StartTs model.Ts IsRemove bool IsPrepare bool Epoch schedulepb.ProcessorEpoch status dispatchTableTaskStatus +======= + Span tablepb.Span + Checkpoint tablepb.Checkpoint + IsRemove bool + IsPrepare bool + Epoch schedulepb.ProcessorEpoch + status dispatchTableTaskStatus +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) } func (a *agent) handleMessageDispatchTableRequest( @@ -326,12 +335,21 @@ func (a *agent) handleMessageDispatchTableRequest( case *schedulepb.DispatchTableRequest_AddTable: tableID := req.AddTable.GetTableID() task = &dispatchTableTask{ +<<<<<<< HEAD TableID: tableID, StartTs: req.AddTable.GetCheckpoint().CheckpointTs, IsRemove: false, IsPrepare: req.AddTable.GetIsSecondary(), Epoch: epoch, status: dispatchTableTaskReceived, +======= + Span: span, + Checkpoint: req.AddTable.GetCheckpoint(), + IsRemove: false, + IsPrepare: req.AddTable.GetIsSecondary(), + Epoch: epoch, + status: dispatchTableTaskReceived, +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) } table = a.tableM.addTable(tableID) case *schedulepb.DispatchTableRequest_RemoveTable: diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 6d961bc32bd..7360c296044 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -929,12 +929,22 @@ func newMockTableExecutor() *MockTableExecutor { } } +<<<<<<< HEAD // AddTable adds a table to the executor. func (e *MockTableExecutor) AddTable( ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, ) (bool, error) { log.Info("AddTable", zap.Int64("tableID", tableID), +======= +// AddTableSpan adds a table span to the executor. +func (e *MockTableExecutor) AddTableSpan( + ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool, +) (bool, error) { + startTs := checkpoint.CheckpointTs + log.Info("AddTableSpan", + zap.String("span", span.String()), +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) zap.Any("startTs", startTs), zap.Bool("isPrepare", isPrepare)) diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index 8cea19c220b..ee23f114f37 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -168,13 +168,22 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { return nil } +<<<<<<< HEAD func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Message, err error) { state, _ := t.getAndUpdateTableState() +======= +func (t *tableSpan) handleAddTableTask(ctx context.Context) (result *schedulepb.Message, err error) { + state, _ := t.getAndUpdateTableSpanState() +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) changed := true for changed { switch state { case tablepb.TableStateAbsent: +<<<<<<< HEAD done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, t.task.IsPrepare) +======= + done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.Checkpoint, t.task.IsPrepare) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), @@ -205,7 +214,11 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess } if t.task.status == dispatchTableTaskReceived { +<<<<<<< HEAD done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, false) +======= + done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.Checkpoint, false) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), @@ -280,7 +293,11 @@ func (t *table) injectDispatchTableTask(task *dispatchTableTask) { zap.Any("ignoredTask", task)) } +<<<<<<< HEAD func (t *table) poll(ctx context.Context) (*schedulepb.Message, error) { +======= +func (t *tableSpan) poll(ctx context.Context) (*schedulepb.Message, error) { +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) if t.task == nil { return nil, nil } @@ -307,10 +324,19 @@ func newTableManager( } } +<<<<<<< HEAD func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) for tableID, table := range tm.tables { message, err := table.poll(ctx) +======= +func (tm *tableSpanManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { + result := make([]*schedulepb.Message, 0) + var err error + toBeDropped := []tablepb.Span{} + tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool { + message, err1 := table.poll(ctx) +>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281)) if err != nil { return result, errors.Trace(err) }