From d7fa4315228d7078c889ba97155f720147f69a88 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Fri, 26 Nov 2021 14:07:51 +0800 Subject: [PATCH 1/2] cdc/sink: Refine sink interface (#3613) --- cdc/owner/async_sink_test.go | 2 +- cdc/processor/pipeline/sink.go | 10 ++++++---- cdc/processor/pipeline/sink_test.go | 20 ++++++++++---------- cdc/processor/pipeline/table.go | 2 +- cdc/sink/black_hole.go | 4 ++-- cdc/sink/buffer_sink.go | 5 +++-- cdc/sink/cdclog/file.go | 4 ++-- cdc/sink/cdclog/s3.go | 4 ++-- cdc/sink/manager.go | 6 +++--- cdc/sink/manager_test.go | 26 +++++++++++++------------- cdc/sink/mq.go | 4 ++-- cdc/sink/mq_test.go | 10 ++++++---- cdc/sink/mysql.go | 6 +++--- cdc/sink/mysql_test.go | 6 +++--- cdc/sink/simple_mysql_tester.go | 4 ++-- cdc/sink/sink.go | 4 ++-- cdc/sink/table_sink.go | 8 ++++---- cmd/kafka-consumer/main.go | 3 ++- pkg/applier/redo.go | 7 ++++--- 19 files changed, 71 insertions(+), 64 deletions(-) diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index 3c7fc11a11e..529eea01704 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -65,7 +65,7 @@ func (m *mockSink) Close(ctx context.Context) error { return nil } -func (m *mockSink) Barrier(ctx context.Context) error { +func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index a53e0c86c57..4d642824b66 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) { } type sinkNode struct { - sink sink.Sink - status TableStatus + sink sink.Sink + status TableStatus + tableID model.TableID resolvedTs model.Ts checkpointTs model.Ts @@ -78,8 +79,9 @@ type sinkNode struct { flowController tableFlowController } -func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { +func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { return &sinkNode{ + tableID: tableID, sink: sink, status: TableStatusInitializing, targetTs: targetTs, @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err if err := n.emitRow2Sink(ctx); err != nil { return errors.Trace(err) } - checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs) + checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs) if err != nil { return errors.Trace(err) } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 12ba3bb1fa5..2b334ca6323 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *mockSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error { return nil } -func (s *mockSink) Barrier(ctx context.Context) error { +func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) { }) // test stop at targetTs - node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(10)) // test the stop at ts command - node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(2)) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts - node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) + node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) { }) closeCh := make(chan interface{}, 1) - node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) + node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx, @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // empty row, no Columns and PreColumns. @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) { }, }) sink := &mockSink{} - node := newSinkNode(sink, 0, 10, &mockFlowController{}) + node := newSinkNode(1, sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil) // nil row. diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index be4ba66f5c1..357e2cf7ea5 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -202,7 +202,7 @@ func NewTablePipeline(ctx cdcContext.Context, p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize) sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter) - sinkNode := newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController) + sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController) p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", sorterNode) diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 3eca14a0119..2f730bf381f 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -46,7 +46,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model return nil } -func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs)) err := b.statistics.RecordBatchExecution(func() (int, error) { // TODO: add some random replication latency @@ -79,6 +79,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error { return nil } -func (b *blackHoleSink) Barrier(ctx context.Context) error { +func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/sink/buffer_sink.go b/cdc/sink/buffer_sink.go index d8d70456ca7..de7d7dc3953 100644 --- a/cdc/sink/buffer_sink.go +++ b/cdc/sink/buffer_sink.go @@ -109,7 +109,8 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) { b.bufferMu.Unlock() start := time.Now() - checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, resolvedTs) + // todo: use real table ID + checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { if errors.Cause(err) != context.Canceled { errCh <- err @@ -146,7 +147,7 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro return nil } -func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { select { case <-ctx.Done(): return atomic.LoadUint64(&b.checkpointTs), ctx.Err() diff --git a/cdc/sink/cdclog/file.go b/cdc/sink/cdclog/file.go index c6794b3c6e2..6913b015a0b 100644 --- a/cdc/sink/cdclog/file.go +++ b/cdc/sink/cdclog/file.go @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC return f.emitRowChangedEvents(ctx, newTableStream, rows...) } -func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs)) return f.flushRowChangedEvents(ctx, resolvedTs) } @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error { return nil } -func (f *fileSink) Barrier(ctx context.Context) error { +func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in file sink has flushed // all buffered events forcedlly. return nil diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index f76d6c23946..53db1e3fb6d 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error { return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data)) } -func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { // we should flush all events before resolvedTs, there are two kind of flush policy // 1. flush row events to a s3 chunk: if the event size is not enough, // TODO: when cdc crashed, we should repair these chunks to a complete file @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error { return nil } -func (s *s3Sink) Barrier(ctx context.Context) error { +func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed // all buffered events forcedlly. return nil diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 80bf0c265ea..ed80e41703d 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -106,7 +106,7 @@ func (m *Manager) getMinEmittedTs() model.Ts { return minTs } -func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { +func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (model.Ts, error) { // NOTICE: Because all table sinks will try to flush backend sink, // which will cause a lot of lock contention and blocking in high concurrency cases. // So here we use flushing as a lightweight lock to improve the lock competition problem. @@ -119,7 +119,7 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { atomic.StoreInt64(&m.flushing, 0) }() minEmittedTs := m.getMinEmittedTs() - checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs) + checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs) if err != nil { return m.getCheckpointTs(), errors.Trace(err) } @@ -142,7 +142,7 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e return ctx.Err() case <-callback: } - return m.backendSink.Barrier(ctx) + return m.backendSink.Barrier(ctx, tableID) } func (m *Manager) getCheckpointTs() uint64 { diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 29ea4bd83d2..a485d9e7a75 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -55,7 +55,7 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { c.rowsMu.Lock() defer c.rowsMu.Unlock() var newRows []*model.RowChangedEvent @@ -83,7 +83,7 @@ func (c *checkSink) Close(ctx context.Context) error { return nil } -func (c *checkSink) Barrier(ctx context.Context) error { +func (c *checkSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -118,7 +118,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { for j := 1; j < rowNum; j++ { if rand.Intn(10) == 0 { resolvedTs := lastResolvedTs + uint64(rand.Intn(j-int(lastResolvedTs))) - _, err := tableSink.FlushRowChangedEvents(ctx, resolvedTs) + _, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), resolvedTs) c.Assert(err, check.IsNil) lastResolvedTs = resolvedTs } else { @@ -129,7 +129,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) { c.Assert(err, check.IsNil) } } - _, err := tableSink.FlushRowChangedEvents(ctx, uint64(rowNum)) + _, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), uint64(rowNum)) c.Assert(err, check.IsNil) }() } @@ -180,7 +180,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) { }) c.Assert(err, check.IsNil) } - _, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + _, err := sink.FlushRowChangedEvents(ctx, sink.(*tableSink).tableID, resolvedTs) if err != nil { c.Assert(errors.Cause(err), check.Equals, context.Canceled) } @@ -244,7 +244,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) { CommitTs: uint64(110), }) c.Assert(err, check.IsNil) - _, err = tableSink.FlushRowChangedEvents(ctx, 110) + _, err = tableSink.FlushRowChangedEvents(ctx, tableID, 110) c.Assert(err, check.IsNil) err = manager.destroyTableSink(ctx, tableID) c.Assert(err, check.IsNil) @@ -295,11 +295,11 @@ func BenchmarkManagerFlushing(b *testing.B) { // All tables are flushed concurrently, except table 0. for i := 1; i < goroutineNum; i++ { i := i - tableSink := tableSinks[i] + tblSink := tableSinks[i] go func() { for j := 1; j < rowNum; j++ { if j%2 == 0 { - _, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j)) + _, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(j)) if err != nil { b.Error(err) } @@ -310,9 +310,9 @@ func BenchmarkManagerFlushing(b *testing.B) { b.ResetTimer() // Table 0 flush. - tableSink := tableSinks[0] + tblSink := tableSinks[0] for i := 0; i < b.N; i++ { - _, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum)) + _, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(rowNum)) if err != nil { b.Error(err) } @@ -345,7 +345,7 @@ func (e *errorSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error panic("unreachable") } -func (e *errorSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (e *errorSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { return 0, errors.New("error in flush row changed events") } @@ -357,7 +357,7 @@ func (e *errorSink) Close(ctx context.Context) error { return nil } -func (e *errorSink) Barrier(ctx context.Context) error { +func (e *errorSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } @@ -374,7 +374,7 @@ func (s *managerSuite) TestManagerError(c *check.C) { Table: &model.TableName{TableID: 1}, }) c.Assert(err, check.IsNil) - _, err = sink.FlushRowChangedEvents(ctx, 2) + _, err = sink.FlushRowChangedEvents(ctx, 1, 2) c.Assert(err, check.IsNil) err = <-errCh c.Assert(err.Error(), check.Equals, "error in emit row changed events") diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 5684767beb7..0dc4a96dfad 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -155,7 +155,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha return nil } -func (k *mqSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { if resolvedTs <= k.checkpointTs { return k.checkpointTs, nil } @@ -260,7 +260,7 @@ func (k *mqSink) Close(ctx context.Context) error { return errors.Trace(err) } -func (k *mqSink) Barrier(cxt context.Context) error { +func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error { // Barrier does nothing because FlushRowChangedEvents in mq sink has flushed // all buffered events by force. return nil diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go index 029700c26c4..a77cc3e3c22 100644 --- a/cdc/sink/mq_test.go +++ b/cdc/sink/mq_test.go @@ -80,10 +80,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { // mock kafka broker processes 1 row changed event leader.Returns(prodSuccess) + tableID := model.TableID(1) row := &model.RowChangedEvent{ Table: &model.TableName{ - Schema: "test", - Table: "t1", + Schema: "test", + Table: "t1", + TableID: tableID, }, StartTs: 100, CommitTs: 120, @@ -91,11 +93,11 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { } err = sink.EmitRowChangedEvents(ctx, row) c.Assert(err, check.IsNil) - checkpointTs, err := sink.FlushRowChangedEvents(ctx, uint64(120)) + checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120)) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) // flush older resolved ts - checkpointTs, err = sink.FlushRowChangedEvents(ctx, uint64(110)) + checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110)) c.Assert(err, check.IsNil) c.Assert(checkpointTs, check.Equals, uint64(120)) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 4b2e609ae07..3b4d8478b7a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -207,7 +207,7 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row // FlushRowChangedEvents will flush all received events, we don't allow mysql // sink to receive events before resolving -func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { if atomic.LoadUint64(&s.maxResolvedTs) < resolvedTs { atomic.StoreUint64(&s.maxResolvedTs, resolvedTs) } @@ -478,7 +478,7 @@ func (s *mysqlSink) Close(ctx context.Context) error { return cerror.WrapError(cerror.ErrMySQLConnectionError, err) } -func (s *mysqlSink) Barrier(ctx context.Context) error { +func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error { warnDuration := 3 * time.Minute ticker := time.NewTicker(warnDuration) defer ticker.Stop() @@ -495,7 +495,7 @@ func (s *mysqlSink) Barrier(ctx context.Context) error { if s.checkpointTs() >= maxResolvedTs { return nil } - checkpointTs, err := s.FlushRowChangedEvents(ctx, maxResolvedTs) + checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs) if err != nil { return err } diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 5588074f5b6..c7328070eef 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -678,7 +678,7 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { c.Assert(err, check.IsNil) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, uint64(2)) + ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(2)) c.Assert(err, check.IsNil) if ts < uint64(2) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 2) @@ -689,7 +689,7 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { c.Assert(err, check.IsNil) err = retry.Do(context.Background(), func() error { - ts, err := sink.FlushRowChangedEvents(ctx, uint64(4)) + ts, err := sink.FlushRowChangedEvents(ctx, 2, uint64(4)) c.Assert(err, check.IsNil) if ts < uint64(4) { return errors.Errorf("checkpoint ts %d less than resolved ts %d", ts, 4) @@ -698,7 +698,7 @@ func (s MySQLSinkSuite) TestNewMySQLSinkExecDML(c *check.C) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError)) c.Assert(err, check.IsNil) - err = sink.Barrier(ctx) + err = sink.Barrier(ctx, 2) c.Assert(err, check.IsNil) err = sink.Close(ctx) diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index c6abc68a736..fd435344b5c 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -186,7 +186,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all of Event which of commitTs less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` -func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (s *simpleMySQLSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { s.rowsBufferLock.Lock() defer s.rowsBufferLock.Unlock() newBuffer := make([]*model.RowChangedEvent, 0, len(s.rowsBuffer)) @@ -216,7 +216,7 @@ func (s *simpleMySQLSink) Close(ctx context.Context) error { return s.db.Close() } -func (s *simpleMySQLSink) Barrier(ctx context.Context) error { +func (s *simpleMySQLSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 3d28595a707..6b89e334167 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -45,7 +45,7 @@ type Sink interface { // FlushRowChangedEvents flushes each row which of commitTs less than or equal to `resolvedTs` into downstream. // TiCDC guarantees that all the Events whose commitTs is less than or equal to `resolvedTs` are sent to Sink through `EmitRowChangedEvents` - FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) + FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) // EmitCheckpointTs sends CheckpointTs to Sink // TiCDC guarantees that all Events **in the cluster** which of commitTs less than or equal `checkpointTs` are sent to downstream successfully. @@ -56,7 +56,7 @@ type Sink interface { // Barrier is a synchronous function to wait all events to be flushed in underlying sink // Note once Barrier is called, the resolved ts won't be pushed until the Barrier call returns. - Barrier(ctx context.Context) error + Barrier(ctx context.Context, tableID model.TableID) error } var sinkIniterMap = make(map[string]sinkInitFunc) diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go index 098566b6499..2cb30150ae9 100644 --- a/cdc/sink/table_sink.go +++ b/cdc/sink/table_sink.go @@ -54,7 +54,7 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error // FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs // is required to be no more than global resolvedTs, table barrierTs and table // redo log watermarkTs. -func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { i := sort.Search(len(t.buffer), func(i int) bool { return t.buffer[i].CommitTs > resolvedTs }) @@ -64,7 +64,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 if err != nil { return ckpt, err } - return t.manager.flushBackendSink(ctx) + return t.manager.flushBackendSink(ctx, tableID) } resolvedRows := t.buffer[:i] t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) @@ -78,7 +78,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 if err != nil { return ckpt, err } - return t.manager.flushBackendSink(ctx) + return t.manager.flushBackendSink(ctx, tableID) } func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { @@ -116,6 +116,6 @@ func (t *tableSink) Close(ctx context.Context) error { } // Barrier is not used in table sink -func (t *tableSink) Barrier(ctx context.Context) error { +func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { return nil } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 566063d6406..2ab0af53c34 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -593,7 +593,8 @@ func syncFlushRowChangedEvents(ctx context.Context, sink sink.Sink, resolvedTs u return ctx.Err() default: } - checkpointTs, err := sink.FlushRowChangedEvents(ctx, resolvedTs) + // todo: use real table id + checkpointTs, err := sink.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { return err } diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go index a4421250846..14868cfcf07 100644 --- a/pkg/applier/redo.go +++ b/pkg/applier/redo.go @@ -151,7 +151,8 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { lastSafeResolvedTs, lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs } } - _, err = s.FlushRowChangedEvents(ctx, lastSafeResolvedTs) + // todo: use real table ID + _, err = s.FlushRowChangedEvents(ctx, 0, lastSafeResolvedTs) if err != nil { return err } @@ -160,11 +161,11 @@ func (ra *RedoApplier) consumeLogs(ctx context.Context) error { if err != nil { return err } - _, err = s.FlushRowChangedEvents(ctx, resolvedTs) + _, err = s.FlushRowChangedEvents(ctx, 0, resolvedTs) if err != nil { return err } - err = s.Barrier(ctx) + err = s.Barrier(ctx, 0) if err != nil { return err } From 4c30fdf6701edcf8994cb659391e24fd5ccf86ab Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 26 Nov 2021 14:29:50 +0800 Subject: [PATCH 2/2] *(dm): support start/stop relay cmd without worker name (#3226) --- dm/_utils/terror_gen/errors_release.txt | 4 + dm/dm/ctl/master/start_stop_relay.go | 11 +- dm/dm/master/scheduler/scheduler.go | 99 ++++++++++++- dm/dm/master/scheduler/scheduler_test.go | 140 ++++++++++++++++++ dm/dm/master/server.go | 20 ++- dm/dm/worker/server.go | 11 +- dm/dm/worker/source_worker.go | 27 +++- dm/dm/worker/source_worker_test.go | 8 +- dm/errors.toml | 24 +++ dm/pkg/terror/error_list.go | 8 + dm/pkg/utils/util.go | 9 ++ dm/tests/adjust_gtid/run.sh | 4 - dm/tests/all_mode/run.sh | 4 - .../dmctl_basic/check_list/start_relay.sh | 26 +++- dm/tests/dmctl_basic/check_list/stop_relay.sh | 24 ++- dm/tests/dmctl_basic/run.sh | 11 +- dm/tests/duplicate_event/conf/source2.yaml | 2 +- dm/tests/full_mode/run.sh | 4 - dm/tests/gtid/run.sh | 3 - dm/tests/ha/conf/source2.yaml | 2 +- dm/tests/incremental_mode/conf/source1.yaml | 2 +- dm/tests/incremental_mode/conf/source2.yaml | 4 +- dm/tests/initial_unit/run.sh | 4 - dm/tests/lightning_mode/run.sh | 4 - dm/tests/online_ddl/run.sh | 8 - dm/tests/only_dml/run.sh | 8 - dm/tests/relay_interrupt/run.sh | 5 - dm/tests/tiup/lib.sh | 9 +- dm/tests/tracker_ignored_ddl/run.sh | 8 + tools/check/go.sum | 2 + 30 files changed, 403 insertions(+), 92 deletions(-) diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index c81497876da..f629222737e 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -526,6 +526,10 @@ ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "M ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update relay-log related parts for now" ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]" ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source." +ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`." +ErrSchedulerStopRelayOnSpecified,[code=46029:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now, Workaround: Please specify worker names for `stop-relay`." +ErrSchedulerStartRelayOnBound,[code=46030:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now, Workaround: Please stop relay by `stop-relay` without worker name first." +ErrSchedulerStopRelayOnBound,[code=46031:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now, Workaround: Please use `stop-relay` without worker name." ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection." ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line." ErrCtlLoadTLSCfg,[code=48003:class=dmctl:scope=internal:level=high], "Message: can not load tls config, Workaround: Please ensure that the tls certificate is accessible on the node currently running dmctl." diff --git a/dm/dm/ctl/master/start_stop_relay.go b/dm/dm/ctl/master/start_stop_relay.go index 8cd101fce8b..21925af46c5 100644 --- a/dm/dm/ctl/master/start_stop_relay.go +++ b/dm/dm/ctl/master/start_stop_relay.go @@ -58,17 +58,14 @@ func startStopRelay(cmd *cobra.Command, op pb.RelayOpV2) error { return err } - if len(cmd.Flags().Args()) == 0 { + if len(cmd.Flags().Args()) == 0 && len(sources) == 0 { + // all args empty cmd.SetOut(os.Stdout) - if len(sources) == 0 { - // all args empty - common.PrintCmdUsage(cmd) - } else { - common.PrintLinesf("must specify at least one worker") - } + common.PrintCmdUsage(cmd) return errors.New("please check output to see error") } + // TODO: support multiple sources and all sources if len(sources) != 1 { common.PrintLinesf("must specify one source (`-s` / `--source`)") return errors.New("please check output to see error") diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go index 3d2a7bca6fc..21de5b1737c 100644 --- a/dm/dm/master/scheduler/scheduler.go +++ b/dm/dm/master/scheduler/scheduler.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/ha" "github.com/pingcap/ticdc/dm/pkg/log" "github.com/pingcap/ticdc/dm/pkg/terror" + "github.com/pingcap/ticdc/dm/pkg/utils" ) // Scheduler schedules tasks for DM-worker instances, including: @@ -59,6 +60,25 @@ import ( // - remove source request from user. // TODO: try to handle the return `err` of etcd operations, // because may put into etcd, but the response to the etcd client interrupted. +// Relay scheduling: +// - scheduled by source +// DM-worker will enable relay according to its bound source, in current implementation, it will read `enable-relay` +// of source config and decide whether to enable relay. +// turn on `enable-relay`: +// - use `enable-relay: true` when create source +// - `start-relay -s source` to dynamically change `enable-relay` +// turn off `enable-relay`: +// - use `enable-relay: false` when create source +// - `stop-relay -s source` to dynamically change `enable-relay` +// - found conflict schedule type with (source, worker) when scheduler bootstrap +// - scheduled by (source, worker) +// DM-worker will check if relay is assigned to it no matter it's bound or not. In current implementation, it will +// read UpstreamRelayWorkerKeyAdapter in etcd. +// add UpstreamRelayWorkerKeyAdapter: +// - use `start-relay -s source -w worker` +// remove UpstreamRelayWorkerKeyAdapter: +// - use `stop-relay -s source -w worker` +// - remove worker by `offline-member` type Scheduler struct { mu sync.RWMutex @@ -1052,10 +1072,37 @@ func (s *Scheduler) StartRelay(source string, workers []string) error { } // 1. precheck - if _, ok := s.sourceCfgs[source]; !ok { + sourceCfg, ok := s.sourceCfgs[source] + if !ok { return terror.ErrSchedulerSourceCfgNotExist.Generate(source) } startedWorkers := s.relayWorkers[source] + + // quick path for `start-relay` without worker name + if len(workers) == 0 { + if len(startedWorkers) != 0 { + return terror.ErrSchedulerStartRelayOnSpecified.Generate(utils.SetToSlice(startedWorkers)) + } + // update enable-relay in source config + sourceCfg.EnableRelay = true + _, err := ha.PutSourceCfg(s.etcdCli, sourceCfg) + if err != nil { + return err + } + s.sourceCfgs[source] = sourceCfg + // notify bound worker + w, ok2 := s.bounds[source] + if !ok2 { + return nil + } + stage := ha.NewRelayStage(pb.Stage_Running, source) + _, err = ha.PutRelayStageSourceBound(s.etcdCli, stage, w.Bound()) + return err + } else if sourceCfg.EnableRelay { + // error when `enable-relay` and `start-relay` with worker name + return terror.ErrSchedulerStartRelayOnBound.Generate() + } + if startedWorkers == nil { startedWorkers = map[string]struct{}{} s.relayWorkers[source] = startedWorkers @@ -1139,9 +1186,37 @@ func (s *Scheduler) StopRelay(source string, workers []string) error { } // 1. precheck - if _, ok := s.sourceCfgs[source]; !ok { + sourceCfg, ok := s.sourceCfgs[source] + if !ok { return terror.ErrSchedulerSourceCfgNotExist.Generate(source) } + + // quick path for `stop-relay` without worker name + if len(workers) == 0 { + startedWorker := s.relayWorkers[source] + if len(startedWorker) != 0 { + return terror.ErrSchedulerStopRelayOnSpecified.Generate(utils.SetToSlice(startedWorker)) + } + // update enable-relay in source config + sourceCfg.EnableRelay = false + _, err := ha.PutSourceCfg(s.etcdCli, sourceCfg) + if err != nil { + return err + } + s.sourceCfgs[source] = sourceCfg + // notify bound worker + w, ok2 := s.bounds[source] + if !ok2 { + return nil + } + // TODO: remove orphan relay stage + _, err = ha.PutSourceBound(s.etcdCli, w.Bound()) + return err + } else if sourceCfg.EnableRelay { + // error when `enable-relay` and `stop-relay` with worker name + return terror.ErrSchedulerStopRelayOnBound.Generate() + } + var ( notExistWorkers []string unmatchedWorkers, unmatchedSources []string @@ -1469,11 +1544,31 @@ func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error { } // recoverRelayConfigs recovers history relay configs for each worker from etcd. +// This function also removes conflicting relay schedule types, which means if a source has both `enable-relay` and +// (source, worker) relay config, we remove the latter. +// should be called after recoverSources. func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error { relayWorkers, _, err := ha.GetAllRelayConfig(cli) if err != nil { return err } + + for source, workers := range relayWorkers { + sourceCfg, ok := s.sourceCfgs[source] + if !ok { + s.logger.Warn("found a not existing source by relay config", zap.String("source", source)) + continue + } + if sourceCfg.EnableRelay { + // current etcd max-txn-op is 2048 + _, err2 := ha.DeleteRelayConfig(cli, utils.SetToSlice(workers)...) + if err2 != nil { + return err2 + } + delete(relayWorkers, source) + } + } + s.relayWorkers = relayWorkers return nil } diff --git a/dm/dm/master/scheduler/scheduler_test.go b/dm/dm/master/scheduler/scheduler_test.go index ec2d5c116de..3e393c224fc 100644 --- a/dm/dm/master/scheduler/scheduler_test.go +++ b/dm/dm/master/scheduler/scheduler_test.go @@ -1353,6 +1353,82 @@ func (t *testScheduler) TestStartStopRelay(c *C) { c.Assert(bound, IsFalse) } +func (t *testScheduler) TestRelayWithWithoutWorker(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + ) + + worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}} + worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}} + + // step 1: start an empty scheduler + s.started = true + s.etcdCli = etcdTestCli + s.workers[workerName1] = worker1 + s.workers[workerName2] = worker2 + s.sourceCfgs[sourceID1] = &config.SourceConfig{} + + worker1.ToFree() + c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil) + worker2.ToFree() + + // step 2: check when enable-relay = false, can start/stop relay without worker name + c.Assert(s.StartRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsTrue) + + c.Assert(s.StartRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsTrue) + + c.Assert(s.StopRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse) + + c.Assert(s.StopRelay(sourceID1, []string{}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse) + + // step 3: check when enable-relay = false, can start/stop relay with worker name + c.Assert(s.StartRelay(sourceID1, []string{workerName1, workerName2}), IsNil) + c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse) + c.Assert(worker1.Stage(), Equals, WorkerBound) + c.Assert(worker2.Stage(), Equals, WorkerRelay) + + c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil) + c.Assert(worker1.Stage(), Equals, WorkerBound) + c.Assert(worker2.Stage(), Equals, WorkerRelay) + + c.Assert(s.StopRelay(sourceID1, []string{workerName2}), IsNil) + c.Assert(worker1.Stage(), Equals, WorkerBound) + c.Assert(worker2.Stage(), Equals, WorkerFree) + + // step 4: check when enable-relay = true, can't start/stop relay with worker name + c.Assert(s.StartRelay(sourceID1, []string{}), IsNil) + + err := s.StartRelay(sourceID1, []string{workerName1}) + c.Assert(terror.ErrSchedulerStartRelayOnBound.Equal(err), IsTrue) + err = s.StartRelay(sourceID1, []string{workerName2}) + c.Assert(terror.ErrSchedulerStartRelayOnBound.Equal(err), IsTrue) + + err = s.StopRelay(sourceID1, []string{workerName1}) + c.Assert(terror.ErrSchedulerStopRelayOnBound.Equal(err), IsTrue) + err = s.StopRelay(sourceID1, []string{workerName2}) + c.Assert(terror.ErrSchedulerStopRelayOnBound.Equal(err), IsTrue) + + c.Assert(s.StopRelay(sourceID1, []string{}), IsNil) + + // step5. check when started relay with workerName, can't turn on enable-relay + c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil) + + err = s.StartRelay(sourceID1, []string{}) + c.Assert(terror.ErrSchedulerStartRelayOnSpecified.Equal(err), IsTrue) + err = s.StopRelay(sourceID1, []string{}) + c.Assert(terror.ErrSchedulerStopRelayOnSpecified.Equal(err), IsTrue) +} + func checkAllWorkersClosed(c *C, s *Scheduler, closed bool) { for _, worker := range s.workers { cli, ok := worker.cli.(*workerrpc.GRPCClient) @@ -1711,3 +1787,67 @@ func (t *testScheduler) TestWorkerHasDiffRelayAndBound(c *C) { _, ok = s.unbounds[sourceID1] c.Assert(ok, IsTrue) } + +func (t *testScheduler) TestUpgradeCauseConflictRelayType(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + keepAlive = int64(3) + ) + + workerInfo1 := ha.WorkerInfo{Name: workerName1} + workerInfo2 := ha.WorkerInfo{Name: workerName2} + bound := ha.SourceBound{ + Source: sourceID1, + Worker: workerName1, + } + + sourceCfg, err := config.LoadFromFile("../source.yaml") + c.Assert(err, IsNil) + sourceCfg.Checker.BackoffMax = config.Duration{Duration: 5 * time.Second} + + // prepare etcd data + s.etcdCli = etcdTestCli + sourceCfg.EnableRelay = true + sourceCfg.SourceID = sourceID1 + _, err = ha.PutSourceCfg(etcdTestCli, sourceCfg) + c.Assert(err, IsNil) + _, err = ha.PutRelayConfig(etcdTestCli, sourceID1, workerName1) + c.Assert(err, IsNil) + _, err = ha.PutRelayConfig(etcdTestCli, sourceID1, workerName2) + c.Assert(err, IsNil) + _, err = ha.PutWorkerInfo(etcdTestCli, workerInfo1) + c.Assert(err, IsNil) + _, err = ha.PutWorkerInfo(etcdTestCli, workerInfo2) + c.Assert(err, IsNil) + _, err = ha.PutSourceBound(etcdTestCli, bound) + c.Assert(err, IsNil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + //nolint:errcheck + go ha.KeepAlive(ctx, etcdTestCli, workerName1, keepAlive) + //nolint:errcheck + go ha.KeepAlive(ctx, etcdTestCli, workerName2, keepAlive) + + // bootstrap + c.Assert(s.recoverSources(etcdTestCli), IsNil) + c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil) + _, err = s.recoverWorkersBounds(etcdTestCli) + c.Assert(err, IsNil) + + // check when the relay config is conflicting with source config, relay config should be deleted + c.Assert(s.relayWorkers[sourceID1], HasLen, 0) + result, _, err := ha.GetAllRelayConfig(etcdTestCli) + c.Assert(err, IsNil) + c.Assert(result, HasLen, 0) + + worker := s.workers[workerName1] + c.Assert(worker.Stage(), Equals, WorkerBound) + c.Assert(worker.RelaySourceID(), HasLen, 0) + c.Assert(s.workers[workerName2].Stage(), Equals, WorkerFree) +} diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index abc312d60a6..3d8d7516ffd 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -875,10 +875,28 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR var wg sync.WaitGroup for _, source := range req.Sources { - workers, err := s.scheduler.GetRelayWorkers(source) + var ( + workers []*scheduler.Worker + workerNameSet = make(map[string]struct{}) + err error + ) + + workers, err = s.scheduler.GetRelayWorkers(source) if err != nil { return nil, err } + // returned workers is not duplicated + for _, w := range workers { + workerNameSet[w.BaseInfo().Name] = struct{}{} + } + // subtask workers may have been found in relay workers + taskWorker := s.scheduler.GetWorkerBySource(source) + if taskWorker != nil { + if _, ok := workerNameSet[taskWorker.BaseInfo().Name]; !ok { + workers = append(workers, taskWorker) + } + } + if len(workers) == 0 { setWorkerResp(errorCommonWorkerResponse(fmt.Sprintf("relay worker for source %s not found, please `start-relay` first", source), source, "")) continue diff --git a/dm/dm/worker/server.go b/dm/dm/worker/server.go index 0b4f0237506..69853b0d890 100644 --- a/dm/dm/worker/server.go +++ b/dm/dm/worker/server.go @@ -360,7 +360,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { // we check if observeSourceBound has started a worker // TODO: add a test for this situation if !w.relayEnabled.Load() { - if err2 := w.EnableRelay(); err2 != nil { + if err2 := w.EnableRelay(false); err2 != nil { return err2 } } @@ -679,12 +679,15 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b } if sourceCfg.EnableRelay { - w.startedRelayBySourceCfg = true - if err2 := w.EnableRelay(); err2 != nil { + log.L().Info("will start relay by `enable-relay` in source config") + if err2 := w.EnableRelay(true); err2 != nil { log.L().Error("found a `enable-relay: true` source, but failed to enable relay for DM worker", zap.Error(err2)) return err2 } + } else if w.startedRelayBySourceCfg { + log.L().Info("will disable relay by `enable-relay: false` in source config") + w.DisableRelay() } if err2 := w.EnableHandleSubtasks(); err2 != nil { @@ -748,7 +751,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro // because no re-assigned mechanism exists for keepalived DM-worker yet. return err2 } - if err2 = w.EnableRelay(); err2 != nil { + if err2 = w.EnableRelay(false); err2 != nil { s.setSourceStatus(sourceCfg.SourceID, err2, false) return err2 } diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index 67f1a51eac9..1417353892f 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -267,7 +267,19 @@ func (w *SourceWorker) updateSourceStatus(ctx context.Context) error { } // EnableRelay enables the functionality of start/watch/handle relay. -func (w *SourceWorker) EnableRelay() (err error) { +// According to relay schedule of DM-master, a source worker will enable relay in two scenarios: its bound source has +// `enable-relay: true` in config, or it has a UpstreamRelayWorkerKeyAdapter etcd KV. +// The paths to EnableRelay are: +// - source config `enable-relay: true`, which is checked in enableHandleSubtasks +// - when DM-worker Server.Start +// - when DM-worker Server watches a SourceBound change, which is to turn a free source worker to bound or notify a +// bound worker that source config has changed +// - when DM-worker Server fails watching and recovers from a snapshot +// - UpstreamRelayWorkerKeyAdapter +// - when DM-worker Server.Start +// - when DM-worker Server watches a UpstreamRelayWorkerKeyAdapter change +// - when DM-worker Server fails watching and recovers from a snapshot +func (w *SourceWorker) EnableRelay(startBySourceCfg bool) (err error) { w.l.Info("enter EnableRelay") w.Lock() defer w.Unlock() @@ -276,6 +288,8 @@ func (w *SourceWorker) EnableRelay() (err error) { return nil } + w.startedRelayBySourceCfg = startBySourceCfg + var sourceCfg *config.SourceConfig failpoint.Inject("MockGetSourceCfgFromETCD", func(_ failpoint.Value) { failpoint.Goto("bypass") @@ -364,10 +378,21 @@ func (w *SourceWorker) EnableRelay() (err error) { } // DisableRelay disables the functionality of start/watch/handle relay. +// a source worker will disable relay by the reason of EnableRelay is no longer valid. +// The paths to DisableRelay are: +// - source config `enable-relay: true` no longer valid +// - when DM-worker Server watches a SourceBound change, which is to notify that source config has changed, and the +// worker has started relay by that bound +// - when the source worker is unbound and has started relay by that bound +// - UpstreamRelayWorkerKeyAdapter no longer valid +// - when DM-worker Server watches a UpstreamRelayWorkerKeyAdapter change +// - when DM-worker Server fails watching and recovers from a snapshot func (w *SourceWorker) DisableRelay() { w.l.Info("enter DisableRelay") w.Lock() defer w.Unlock() + + w.startedRelayBySourceCfg = false if !w.relayEnabled.CAS(true, false) { w.l.Warn("already disabled relay") return diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go index 5934abc5291..e68ef583c36 100644 --- a/dm/dm/worker/source_worker_test.go +++ b/dm/dm/worker/source_worker_test.go @@ -81,7 +81,7 @@ func (t *testServer) testWorker(c *C) { }() w, err := NewSourceWorker(cfg, etcdCli, "") c.Assert(err, IsNil) - c.Assert(w.EnableRelay(), ErrorMatches, "init error") + c.Assert(w.EnableRelay(false), ErrorMatches, "init error") NewRelayHolder = NewDummyRelayHolder w, err = NewSourceWorker(cfg, etcdCli, "") @@ -187,7 +187,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { w, err2 := s.getOrStartWorker(sourceConfig, true) c.Assert(err2, IsNil) // we set sourceConfig.EnableRelay = true above - c.Assert(w.EnableRelay(), IsNil) + c.Assert(w.EnableRelay(false), IsNil) c.Assert(w.EnableHandleSubtasks(), IsNil) return true }), IsTrue) @@ -363,7 +363,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) { func (t *testWorkerFunctionalities) testEnableRelay(c *C, w *SourceWorker, etcdCli *clientv3.Client, sourceCfg *config.SourceConfig, cfg *Config) { - c.Assert(w.EnableRelay(), IsNil) + c.Assert(w.EnableRelay(false), IsNil) c.Assert(w.relayEnabled.Load(), IsTrue) c.Assert(w.relayHolder.Stage(), Equals, pb.Stage_New) @@ -588,7 +588,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { defer cancel() defer w.Close() go func() { - c.Assert(w.EnableRelay(), IsNil) + c.Assert(w.EnableRelay(false), IsNil) w.Start() }() c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool { diff --git a/dm/errors.toml b/dm/errors.toml index 9fd7af01ece..0386727b985 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -3166,6 +3166,30 @@ description = "" workaround = "If you intend to bind the source with worker, you can stop-relay for current source." tags = ["internal", "medium"] +[error.DM-scheduler-46028] +message = "the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now" +description = "" +workaround = "Please stop all relay workers first, or specify worker name for `start-relay`." +tags = ["internal", "low"] + +[error.DM-scheduler-46029] +message = "the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now" +description = "" +workaround = "Please specify worker names for `stop-relay`." +tags = ["internal", "low"] + +[error.DM-scheduler-46030] +message = "the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now" +description = "" +workaround = "Please stop relay by `stop-relay` without worker name first." +tags = ["internal", "low"] + +[error.DM-scheduler-46031] +message = "the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now" +description = "" +workaround = "Please use `stop-relay` without worker name." +tags = ["internal", "low"] + [error.DM-dmctl-48001] message = "can not create grpc connection" description = "" diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index 347192f5719..b1dc05a4437 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -643,6 +643,10 @@ const ( codeSchedulerSourceCfgUpdate codeSchedulerWrongWorkerInput codeSchedulerCantTransferToRelayWorker + codeSchedulerStartRelayOnSpecified + codeSchedulerStopRelayOnSpecified + codeSchedulerStartRelayOnBound + codeSchedulerStopRelayOnBound ) // dmctl error code. @@ -1285,6 +1289,10 @@ var ( ErrSchedulerSourceCfgUpdate = New(codeSchedulerSourceCfgUpdate, ClassScheduler, ScopeInternal, LevelLow, "source can only update relay-log related parts for now", "") ErrSchedulerWrongWorkerInput = New(codeSchedulerWrongWorkerInput, ClassScheduler, ScopeInternal, LevelMedium, "require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]", "") ErrSchedulerBoundDiffWithStartedRelay = New(codeSchedulerCantTransferToRelayWorker, ClassScheduler, ScopeInternal, LevelMedium, "require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s]", "If you intend to bind the source with worker, you can stop-relay for current source.") + ErrSchedulerStartRelayOnSpecified = New(codeSchedulerStartRelayOnSpecified, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now", "Please stop all relay workers first, or specify worker name for `start-relay`.") + ErrSchedulerStopRelayOnSpecified = New(codeSchedulerStopRelayOnSpecified, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now", "Please specify worker names for `stop-relay`.") + ErrSchedulerStartRelayOnBound = New(codeSchedulerStartRelayOnBound, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now", "Please stop relay by `stop-relay` without worker name first.") + ErrSchedulerStopRelayOnBound = New(codeSchedulerStopRelayOnBound, ClassScheduler, ScopeInternal, LevelLow, "the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now", "Please use `stop-relay` without worker name.") // dmctl. ErrCtlGRPCCreateConn = New(codeCtlGRPCCreateConn, ClassDMCtl, ScopeInternal, LevelHigh, "can not create grpc connection", "Please check your network connection.") diff --git a/dm/pkg/utils/util.go b/dm/pkg/utils/util.go index d6975f668a0..2f9f072edd6 100644 --- a/dm/pkg/utils/util.go +++ b/dm/pkg/utils/util.go @@ -264,6 +264,15 @@ func proxyFields() []zap.Field { return fields } +// SetToSlice converts a map of struct{} value to a slice to pretty print. +func SetToSlice(set map[string]struct{}) []string { + slice := make([]string, 0, len(set)) + for key := range set { + slice = append(slice, key) + } + return slice +} + func NewStoppedTimer() *time.Timer { // stopped timer should be Reset with correct duration, so use 0 here t := time.NewTimer(0) diff --git a/dm/tests/adjust_gtid/run.sh b/dm/tests/adjust_gtid/run.sh index a06dc92fa70..1cca51fe0e3 100755 --- a/dm/tests/adjust_gtid/run.sh +++ b/dm/tests/adjust_gtid/run.sh @@ -70,10 +70,6 @@ function run() { # make sure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/dm/tests/all_mode/run.sh b/dm/tests/all_mode/run.sh index 0d127e841a3..ce27dce043c 100755 --- a/dm/tests/all_mode/run.sh +++ b/dm/tests/all_mode/run.sh @@ -362,10 +362,6 @@ function run() { # make sure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/dm/tests/dmctl_basic/check_list/start_relay.sh b/dm/tests/dmctl_basic/check_list/start_relay.sh index 5af13ffb787..323f2a79fc6 100644 --- a/dm/tests/dmctl_basic/check_list/start_relay.sh +++ b/dm/tests/dmctl_basic/check_list/start_relay.sh @@ -12,12 +12,6 @@ function start_relay_wrong_arg() { "must specify one source (\`-s\` \/ \`--source\`)" 1 } -function start_relay_without_worker() { - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1" \ - "must specify at least one worker" 1 -} - function start_relay_success() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-relay -s $SOURCE_ID1 worker1" \ @@ -32,12 +26,30 @@ function start_relay_success() { "\"worker\": \"worker2\"" 1 } -function start_relay_fail() { +function start_relay_without_worker_name_success() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 +} + +function start_relay_diff_worker_fail() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-relay -s $SOURCE_ID1 worker2" \ "these workers \[worker2\] have bound for another sources \[$SOURCE_ID2\] respectively" 1 } +function start_relay_with_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "can't \`start-relay\` with worker name now" 1 +} + +function start_relay_without_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1" \ + "can't \`start-relay\` without worker name now" 1 +} + function start_relay_on_offline_worker() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-relay -s $SOURCE_ID2 worker2" \ diff --git a/dm/tests/dmctl_basic/check_list/stop_relay.sh b/dm/tests/dmctl_basic/check_list/stop_relay.sh index b5e6904c93d..765efa2eccf 100644 --- a/dm/tests/dmctl_basic/check_list/stop_relay.sh +++ b/dm/tests/dmctl_basic/check_list/stop_relay.sh @@ -12,12 +12,6 @@ function stop_relay_wrong_arg() { "must specify one source (\`-s\` \/ \`--source\`)" 1 } -function stop_relay_without_worker() { - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "stop-relay -s $SOURCE_ID1" \ - "must specify at least one worker" 1 -} - function stop_relay_success() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-relay -s $SOURCE_ID1 worker1" \ @@ -27,12 +21,30 @@ function stop_relay_success() { "\"result\": true" 2 } +function stop_relay_with_worker_name_success() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 +} + function stop_relay_fail() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-relay -s $SOURCE_ID1 worker2" \ "these workers \[worker2\] have started relay for another sources \[$SOURCE_ID2\] respectively" 1 } +function stop_relay_with_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1 worker1" \ + "can't \`stop-relay\` with worker name now" 1 +} + +function stop_relay_without_worker_name_fail() { + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "can't \`stop-relay\` without worker name now" 1 +} + function stop_relay_on_offline_worker() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-relay -s $SOURCE_ID2 worker2" \ diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index 02af24df688..30796edcb9e 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -76,12 +76,10 @@ function usage_and_arg_test() { echo "start_relay_empty_arg" start_relay_empty_arg start_relay_wrong_arg - start_relay_without_worker echo "stop_relay_empty_arg" stop_relay_empty_arg stop_relay_wrong_arg - stop_relay_without_worker } function recover_max_binlog_size() { @@ -267,7 +265,9 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT start_relay_success - start_relay_fail + start_relay_diff_worker_fail + start_relay_without_worker_name_fail + stop_relay_without_worker_name_fail echo "pause_relay_success" pause_relay_success @@ -337,6 +337,11 @@ function run() { # update_task_success_single_worker $TASK_CONF $SOURCE_ID1 # update_task_success $TASK_CONF + start_relay_without_worker_name_success + start_relay_with_worker_name_fail + stop_relay_with_worker_name_fail + stop_relay_with_worker_name_success + start_relay_success run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 diff --git a/dm/tests/duplicate_event/conf/source2.yaml b/dm/tests/duplicate_event/conf/source2.yaml index 538e8942a40..621e82bfce5 100644 --- a/dm/tests/duplicate_event/conf/source2.yaml +++ b/dm/tests/duplicate_event/conf/source2.yaml @@ -2,7 +2,7 @@ source-id: mysql-replica-02 server-id: 123456 flavor: 'mysql' enable-gtid: true -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/dm/tests/full_mode/run.sh b/dm/tests/full_mode/run.sh index 87e4f5a4b2b..aac7afef52e 100755 --- a/dm/tests/full_mode/run.sh +++ b/dm/tests/full_mode/run.sh @@ -37,10 +37,6 @@ function fail_acquire_global_lock() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 2 - cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml sed -i '/heartbeat-report-interval/i\ignore-checking-items: ["dump_privilege"]' $WORK_DIR/dm-task.yaml run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/dm/tests/gtid/run.sh b/dm/tests/gtid/run.sh index 44824a6878b..adca64f0320 100755 --- a/dm/tests/gtid/run.sh +++ b/dm/tests/gtid/run.sh @@ -24,9 +24,6 @@ function run() { run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT diff --git a/dm/tests/ha/conf/source2.yaml b/dm/tests/ha/conf/source2.yaml index d6f08468313..fb1985ca354 100644 --- a/dm/tests/ha/conf/source2.yaml +++ b/dm/tests/ha/conf/source2.yaml @@ -1,7 +1,7 @@ source-id: mysql-replica-02 flavor: '' enable-gtid: false -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/dm/tests/incremental_mode/conf/source1.yaml b/dm/tests/incremental_mode/conf/source1.yaml index 406e7ae2547..679a2f4db7c 100644 --- a/dm/tests/incremental_mode/conf/source1.yaml +++ b/dm/tests/incremental_mode/conf/source1.yaml @@ -3,7 +3,7 @@ flavor: 'mysql' enable-gtid: true relay-binlog-name: '' relay-binlog-gtid: '' -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root diff --git a/dm/tests/incremental_mode/conf/source2.yaml b/dm/tests/incremental_mode/conf/source2.yaml index e21304db8e3..21294f2c27c 100644 --- a/dm/tests/incremental_mode/conf/source2.yaml +++ b/dm/tests/incremental_mode/conf/source2.yaml @@ -3,11 +3,11 @@ flavor: '' enable-gtid: false relay-binlog-name: '' relay-binlog-gtid: '' -enable-relay: true +enable-relay: false from: host: 127.0.0.1 user: root password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3307 checker: - check-enable: false \ No newline at end of file + check-enable: false diff --git a/dm/tests/initial_unit/run.sh b/dm/tests/initial_unit/run.sh index 3f479d08958..fbbae7de5ad 100644 --- a/dm/tests/initial_unit/run.sh +++ b/dm/tests/initial_unit/run.sh @@ -43,10 +43,6 @@ function run() { sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - echo "start task and query status, the sync unit will initial failed" task_conf="$cur/conf/dm-task.yaml" run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index c5a7ad43ae7..b2c9263d266 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -37,10 +37,6 @@ function run() { # make sure source1 is bound to worker1 dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 diff --git a/dm/tests/online_ddl/run.sh b/dm/tests/online_ddl/run.sh index e487f91538d..cca736349ae 100755 --- a/dm/tests/online_ddl/run.sh +++ b/dm/tests/online_ddl/run.sh @@ -40,10 +40,6 @@ function run() { dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 2 - # imitate a DM task is started during the running of online DDL tool # *_ignore will be skipped by block-allow-list run_sql_source1 "create table online_ddl.gho_ignore (c int); create table online_ddl._gho_ignore_gho (c int);" @@ -116,10 +112,6 @@ function run() { run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker3" \ - "\"result\": true" 2 - echo "wait and check task running" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ diff --git a/dm/tests/only_dml/run.sh b/dm/tests/only_dml/run.sh index fb0c192ac0e..2de2f17ee00 100755 --- a/dm/tests/only_dml/run.sh +++ b/dm/tests/only_dml/run.sh @@ -69,14 +69,6 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - # start relay - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 2 - # check dm-workers metrics unit: relay file index must be 1. check_metric $WORKER1_PORT "dm_relay_binlog_file" 3 0 2 check_metric $WORKER2_PORT "dm_relay_binlog_file" 3 0 2 diff --git a/dm/tests/relay_interrupt/run.sh b/dm/tests/relay_interrupt/run.sh index 8763e01920a..7f6e97dcd5a 100644 --- a/dm/tests/relay_interrupt/run.sh +++ b/dm/tests/relay_interrupt/run.sh @@ -47,11 +47,6 @@ function run() { sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 1 \ - "ERROR 1152" 1 - echo "query status, relay log failed" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ diff --git a/dm/tests/tiup/lib.sh b/dm/tests/tiup/lib.sh index 318b8be2724..648df2791f2 100755 --- a/dm/tests/tiup/lib.sh +++ b/dm/tests/tiup/lib.sh @@ -265,14 +265,7 @@ function run_dmctl_with_retry() { function ensure_start_relay() { # manually enable relay for source1 after v2.0.2 if [[ "$PRE_VER" != "v2.0.0" ]] && [[ "$PRE_VER" != "v2.0.1" ]]; then - dmctl_log="get-worker.txt" # always use CUR_VER, because we might use tiup mirror in previous steps. - tiup dmctl:$CUR_VER --master-addr=master1:8261 operate-source show -s mysql-replica-01 >$dmctl_log 2>&1 - worker=$(grep "worker" $dmctl_log | awk -F'"' '{ print $4 }') - if [[ "$PRE_VER" == "v2.0.2" ]] || [[ "$PRE_VER" == "v2.0.3" ]] || [[ "$PRE_VER" == "v2.0.4" ]] || [[ "$PRE_VER" == "v2.0.5" ]] || [[ "$PRE_VER" == "v2.0.6" ]] || [[ "$PRE_VER" == "v2.0.7" ]]; then - run_dmctl_with_retry $CUR_VER "start-relay -s mysql-replica-01 $worker" "\"result\": true" 1 - else - run_dmctl_with_retry $CUR_VER "start-relay -s mysql-replica-01 $worker" "\"result\": true" 2 - fi + run_dmctl_with_retry $CUR_VER "start-relay -s mysql-replica-01" "\"result\": true" 2 fi } diff --git a/dm/tests/tracker_ignored_ddl/run.sh b/dm/tests/tracker_ignored_ddl/run.sh index 57368dd23c1..670eadd4337 100644 --- a/dm/tests/tracker_ignored_ddl/run.sh +++ b/dm/tests/tracker_ignored_ddl/run.sh @@ -47,6 +47,14 @@ function run() { run_sql_tidb "select count(1) from $TEST_NAME.t1;" check_contains "count(1): 2" echo "increment2 check success" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 1 } cleanup_data $TEST_NAME diff --git a/tools/check/go.sum b/tools/check/go.sum index 5e57e018ef0..3a9006fdcc7 100644 --- a/tools/check/go.sum +++ b/tools/check/go.sum @@ -662,6 +662,7 @@ github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuM github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3 h1:8l9lu9RjWkI/VeqrP+Fn3tvZNPu5GYP0rYLLN5Q46go= github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd h1:I8IeI8MNiZVKnwuXhcIIzz6pREcOSbq18Q31KYIzFVM= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd/go.mod h1:IVF+ijPSMZVtx2oIqxAg7ur6EyixtTYfOHwpfmlhqI4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgcC4zIxODThtZNPirFr42+A= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -736,6 +737,7 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg github.com/securego/gosec/v2 v2.8.1 h1:Tyy/nsH39TYCOkqf5HAgRE+7B5D8sHDwPdXRgFWokh8= github.com/securego/gosec/v2 v2.8.1/go.mod h1:pUmsq6+VyFEElJMUX+QB3p3LWNHXg1R3xh2ssVJPs8Q= github.com/securego/gosec/v2 v2.9.1/go.mod h1:oDcDLcatOJxkCGaCaq8lua1jTnYf6Sou4wdiJ1n4iHc= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c h1:W65qqJCIOVP4jpqPQ0YvHYKwcMEMVWIzWC5iNQQfBTU= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs=