Skip to content

Commit

Permalink
*(dm): support start/stop relay cmd without worker name (#3226)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Nov 26, 2021
1 parent d7fa431 commit 4c30fdf
Show file tree
Hide file tree
Showing 30 changed files with 403 additions and 92 deletions.
4 changes: 4 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,10 @@ ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "M
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update relay-log related parts for now"
ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]"
ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source."
ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`."
ErrSchedulerStopRelayOnSpecified,[code=46029:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `stop-relay` without worker name now, Workaround: Please specify worker names for `stop-relay`."
ErrSchedulerStartRelayOnBound,[code=46030:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now, Workaround: Please stop relay by `stop-relay` without worker name first."
ErrSchedulerStopRelayOnBound,[code=46031:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now, Workaround: Please use `stop-relay` without worker name."
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrCtlLoadTLSCfg,[code=48003:class=dmctl:scope=internal:level=high], "Message: can not load tls config, Workaround: Please ensure that the tls certificate is accessible on the node currently running dmctl."
Expand Down
11 changes: 4 additions & 7 deletions dm/dm/ctl/master/start_stop_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,14 @@ func startStopRelay(cmd *cobra.Command, op pb.RelayOpV2) error {
return err
}

if len(cmd.Flags().Args()) == 0 {
if len(cmd.Flags().Args()) == 0 && len(sources) == 0 {
// all args empty
cmd.SetOut(os.Stdout)
if len(sources) == 0 {
// all args empty
common.PrintCmdUsage(cmd)
} else {
common.PrintLinesf("must specify at least one worker")
}
common.PrintCmdUsage(cmd)
return errors.New("please check output to see error")
}

// TODO: support multiple sources and all sources
if len(sources) != 1 {
common.PrintLinesf("must specify one source (`-s` / `--source`)")
return errors.New("please check output to see error")
Expand Down
99 changes: 97 additions & 2 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/ticdc/dm/pkg/ha"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/terror"
"github.com/pingcap/ticdc/dm/pkg/utils"
)

// Scheduler schedules tasks for DM-worker instances, including:
Expand Down Expand Up @@ -59,6 +60,25 @@ import (
// - remove source request from user.
// TODO: try to handle the return `err` of etcd operations,
// because may put into etcd, but the response to the etcd client interrupted.
// Relay scheduling:
// - scheduled by source
// DM-worker will enable relay according to its bound source, in current implementation, it will read `enable-relay`
// of source config and decide whether to enable relay.
// turn on `enable-relay`:
// - use `enable-relay: true` when create source
// - `start-relay -s source` to dynamically change `enable-relay`
// turn off `enable-relay`:
// - use `enable-relay: false` when create source
// - `stop-relay -s source` to dynamically change `enable-relay`
// - found conflict schedule type with (source, worker) when scheduler bootstrap
// - scheduled by (source, worker)
// DM-worker will check if relay is assigned to it no matter it's bound or not. In current implementation, it will
// read UpstreamRelayWorkerKeyAdapter in etcd.
// add UpstreamRelayWorkerKeyAdapter:
// - use `start-relay -s source -w worker`
// remove UpstreamRelayWorkerKeyAdapter:
// - use `stop-relay -s source -w worker`
// - remove worker by `offline-member`
type Scheduler struct {
mu sync.RWMutex

Expand Down Expand Up @@ -1052,10 +1072,37 @@ func (s *Scheduler) StartRelay(source string, workers []string) error {
}

// 1. precheck
if _, ok := s.sourceCfgs[source]; !ok {
sourceCfg, ok := s.sourceCfgs[source]
if !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(source)
}
startedWorkers := s.relayWorkers[source]

// quick path for `start-relay` without worker name
if len(workers) == 0 {
if len(startedWorkers) != 0 {
return terror.ErrSchedulerStartRelayOnSpecified.Generate(utils.SetToSlice(startedWorkers))
}
// update enable-relay in source config
sourceCfg.EnableRelay = true
_, err := ha.PutSourceCfg(s.etcdCli, sourceCfg)
if err != nil {
return err
}
s.sourceCfgs[source] = sourceCfg
// notify bound worker
w, ok2 := s.bounds[source]
if !ok2 {
return nil
}
stage := ha.NewRelayStage(pb.Stage_Running, source)
_, err = ha.PutRelayStageSourceBound(s.etcdCli, stage, w.Bound())
return err
} else if sourceCfg.EnableRelay {
// error when `enable-relay` and `start-relay` with worker name
return terror.ErrSchedulerStartRelayOnBound.Generate()
}

if startedWorkers == nil {
startedWorkers = map[string]struct{}{}
s.relayWorkers[source] = startedWorkers
Expand Down Expand Up @@ -1139,9 +1186,37 @@ func (s *Scheduler) StopRelay(source string, workers []string) error {
}

// 1. precheck
if _, ok := s.sourceCfgs[source]; !ok {
sourceCfg, ok := s.sourceCfgs[source]
if !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(source)
}

// quick path for `stop-relay` without worker name
if len(workers) == 0 {
startedWorker := s.relayWorkers[source]
if len(startedWorker) != 0 {
return terror.ErrSchedulerStopRelayOnSpecified.Generate(utils.SetToSlice(startedWorker))
}
// update enable-relay in source config
sourceCfg.EnableRelay = false
_, err := ha.PutSourceCfg(s.etcdCli, sourceCfg)
if err != nil {
return err
}
s.sourceCfgs[source] = sourceCfg
// notify bound worker
w, ok2 := s.bounds[source]
if !ok2 {
return nil
}
// TODO: remove orphan relay stage
_, err = ha.PutSourceBound(s.etcdCli, w.Bound())
return err
} else if sourceCfg.EnableRelay {
// error when `enable-relay` and `stop-relay` with worker name
return terror.ErrSchedulerStopRelayOnBound.Generate()
}

var (
notExistWorkers []string
unmatchedWorkers, unmatchedSources []string
Expand Down Expand Up @@ -1469,11 +1544,31 @@ func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error {
}

// recoverRelayConfigs recovers history relay configs for each worker from etcd.
// This function also removes conflicting relay schedule types, which means if a source has both `enable-relay` and
// (source, worker) relay config, we remove the latter.
// should be called after recoverSources.
func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error {
relayWorkers, _, err := ha.GetAllRelayConfig(cli)
if err != nil {
return err
}

for source, workers := range relayWorkers {
sourceCfg, ok := s.sourceCfgs[source]
if !ok {
s.logger.Warn("found a not existing source by relay config", zap.String("source", source))
continue
}
if sourceCfg.EnableRelay {
// current etcd max-txn-op is 2048
_, err2 := ha.DeleteRelayConfig(cli, utils.SetToSlice(workers)...)
if err2 != nil {
return err2
}
delete(relayWorkers, source)
}
}

s.relayWorkers = relayWorkers
return nil
}
Expand Down
140 changes: 140 additions & 0 deletions dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,82 @@ func (t *testScheduler) TestStartStopRelay(c *C) {
c.Assert(bound, IsFalse)
}

func (t *testScheduler) TestRelayWithWithoutWorker(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
workerName1 = "dm-worker-1"
workerName2 = "dm-worker-2"
)

worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}}
worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}}

// step 1: start an empty scheduler
s.started = true
s.etcdCli = etcdTestCli
s.workers[workerName1] = worker1
s.workers[workerName2] = worker2
s.sourceCfgs[sourceID1] = &config.SourceConfig{}

worker1.ToFree()
c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
worker2.ToFree()

// step 2: check when enable-relay = false, can start/stop relay without worker name
c.Assert(s.StartRelay(sourceID1, []string{}), IsNil)
c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsTrue)

c.Assert(s.StartRelay(sourceID1, []string{}), IsNil)
c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsTrue)

c.Assert(s.StopRelay(sourceID1, []string{}), IsNil)
c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse)

c.Assert(s.StopRelay(sourceID1, []string{}), IsNil)
c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse)

// step 3: check when enable-relay = false, can start/stop relay with worker name
c.Assert(s.StartRelay(sourceID1, []string{workerName1, workerName2}), IsNil)
c.Assert(s.sourceCfgs[sourceID1].EnableRelay, IsFalse)
c.Assert(worker1.Stage(), Equals, WorkerBound)
c.Assert(worker2.Stage(), Equals, WorkerRelay)

c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)
c.Assert(worker1.Stage(), Equals, WorkerBound)
c.Assert(worker2.Stage(), Equals, WorkerRelay)

c.Assert(s.StopRelay(sourceID1, []string{workerName2}), IsNil)
c.Assert(worker1.Stage(), Equals, WorkerBound)
c.Assert(worker2.Stage(), Equals, WorkerFree)

// step 4: check when enable-relay = true, can't start/stop relay with worker name
c.Assert(s.StartRelay(sourceID1, []string{}), IsNil)

err := s.StartRelay(sourceID1, []string{workerName1})
c.Assert(terror.ErrSchedulerStartRelayOnBound.Equal(err), IsTrue)
err = s.StartRelay(sourceID1, []string{workerName2})
c.Assert(terror.ErrSchedulerStartRelayOnBound.Equal(err), IsTrue)

err = s.StopRelay(sourceID1, []string{workerName1})
c.Assert(terror.ErrSchedulerStopRelayOnBound.Equal(err), IsTrue)
err = s.StopRelay(sourceID1, []string{workerName2})
c.Assert(terror.ErrSchedulerStopRelayOnBound.Equal(err), IsTrue)

c.Assert(s.StopRelay(sourceID1, []string{}), IsNil)

// step5. check when started relay with workerName, can't turn on enable-relay
c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil)

err = s.StartRelay(sourceID1, []string{})
c.Assert(terror.ErrSchedulerStartRelayOnSpecified.Equal(err), IsTrue)
err = s.StopRelay(sourceID1, []string{})
c.Assert(terror.ErrSchedulerStopRelayOnSpecified.Equal(err), IsTrue)
}

func checkAllWorkersClosed(c *C, s *Scheduler, closed bool) {
for _, worker := range s.workers {
cli, ok := worker.cli.(*workerrpc.GRPCClient)
Expand Down Expand Up @@ -1711,3 +1787,67 @@ func (t *testScheduler) TestWorkerHasDiffRelayAndBound(c *C) {
_, ok = s.unbounds[sourceID1]
c.Assert(ok, IsTrue)
}

func (t *testScheduler) TestUpgradeCauseConflictRelayType(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
workerName1 = "dm-worker-1"
workerName2 = "dm-worker-2"
keepAlive = int64(3)
)

workerInfo1 := ha.WorkerInfo{Name: workerName1}
workerInfo2 := ha.WorkerInfo{Name: workerName2}
bound := ha.SourceBound{
Source: sourceID1,
Worker: workerName1,
}

sourceCfg, err := config.LoadFromFile("../source.yaml")
c.Assert(err, IsNil)
sourceCfg.Checker.BackoffMax = config.Duration{Duration: 5 * time.Second}

// prepare etcd data
s.etcdCli = etcdTestCli
sourceCfg.EnableRelay = true
sourceCfg.SourceID = sourceID1
_, err = ha.PutSourceCfg(etcdTestCli, sourceCfg)
c.Assert(err, IsNil)
_, err = ha.PutRelayConfig(etcdTestCli, sourceID1, workerName1)
c.Assert(err, IsNil)
_, err = ha.PutRelayConfig(etcdTestCli, sourceID1, workerName2)
c.Assert(err, IsNil)
_, err = ha.PutWorkerInfo(etcdTestCli, workerInfo1)
c.Assert(err, IsNil)
_, err = ha.PutWorkerInfo(etcdTestCli, workerInfo2)
c.Assert(err, IsNil)
_, err = ha.PutSourceBound(etcdTestCli, bound)
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//nolint:errcheck
go ha.KeepAlive(ctx, etcdTestCli, workerName1, keepAlive)
//nolint:errcheck
go ha.KeepAlive(ctx, etcdTestCli, workerName2, keepAlive)

// bootstrap
c.Assert(s.recoverSources(etcdTestCli), IsNil)
c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil)
_, err = s.recoverWorkersBounds(etcdTestCli)
c.Assert(err, IsNil)

// check when the relay config is conflicting with source config, relay config should be deleted
c.Assert(s.relayWorkers[sourceID1], HasLen, 0)
result, _, err := ha.GetAllRelayConfig(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(result, HasLen, 0)

worker := s.workers[workerName1]
c.Assert(worker.Stage(), Equals, WorkerBound)
c.Assert(worker.RelaySourceID(), HasLen, 0)
c.Assert(s.workers[workerName2].Stage(), Equals, WorkerFree)
}
20 changes: 19 additions & 1 deletion dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,10 +875,28 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR

var wg sync.WaitGroup
for _, source := range req.Sources {
workers, err := s.scheduler.GetRelayWorkers(source)
var (
workers []*scheduler.Worker
workerNameSet = make(map[string]struct{})
err error
)

workers, err = s.scheduler.GetRelayWorkers(source)
if err != nil {
return nil, err
}
// returned workers is not duplicated
for _, w := range workers {
workerNameSet[w.BaseInfo().Name] = struct{}{}
}
// subtask workers may have been found in relay workers
taskWorker := s.scheduler.GetWorkerBySource(source)
if taskWorker != nil {
if _, ok := workerNameSet[taskWorker.BaseInfo().Name]; !ok {
workers = append(workers, taskWorker)
}
}

if len(workers) == 0 {
setWorkerResp(errorCommonWorkerResponse(fmt.Sprintf("relay worker for source %s not found, please `start-relay` first", source), source, ""))
continue
Expand Down
11 changes: 7 additions & 4 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
// we check if observeSourceBound has started a worker
// TODO: add a test for this situation
if !w.relayEnabled.Load() {
if err2 := w.EnableRelay(); err2 != nil {
if err2 := w.EnableRelay(false); err2 != nil {
return err2
}
}
Expand Down Expand Up @@ -679,12 +679,15 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b
}

if sourceCfg.EnableRelay {
w.startedRelayBySourceCfg = true
if err2 := w.EnableRelay(); err2 != nil {
log.L().Info("will start relay by `enable-relay` in source config")
if err2 := w.EnableRelay(true); err2 != nil {
log.L().Error("found a `enable-relay: true` source, but failed to enable relay for DM worker",
zap.Error(err2))
return err2
}
} else if w.startedRelayBySourceCfg {
log.L().Info("will disable relay by `enable-relay: false` in source config")
w.DisableRelay()
}

if err2 := w.EnableHandleSubtasks(); err2 != nil {
Expand Down Expand Up @@ -748,7 +751,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro
// because no re-assigned mechanism exists for keepalived DM-worker yet.
return err2
}
if err2 = w.EnableRelay(); err2 != nil {
if err2 = w.EnableRelay(false); err2 != nil {
s.setSourceStatus(sourceCfg.SourceID, err2, false)
return err2
}
Expand Down
Loading

0 comments on commit 4c30fdf

Please sign in to comment.