From 10fe0bdde98d985c065d7fefe6f9e9fb633aa1a5 Mon Sep 17 00:00:00 2001 From: zeminzhou <50103576+zeminzhou@users.noreply.github.com> Date: Thu, 4 Feb 2021 14:22:55 +0800 Subject: [PATCH] worker, ha: For errors that can only be attempted a limited number of times, the number of attempts is limited (#1396) --- dm/worker/join.go | 4 +- dm/worker/server.go | 70 +++++++++++++++------- dm/worker/server_test.go | 123 +++++++++++++++++++++++++++++++++++++++ pkg/etcdutil/etcdutil.go | 12 +++- pkg/ha/bound.go | 4 ++ pkg/ha/source.go | 4 ++ 6 files changed, 194 insertions(+), 23 deletions(-) diff --git a/dm/worker/join.go b/dm/worker/join.go index 5ad1b7d226..974902a71c 100644 --- a/dm/worker/join.go +++ b/dm/worker/join.go @@ -97,7 +97,7 @@ func (s *Server) KeepAlive() { }) { - err1 := ha.KeepAlive(s.ctx, s.etcdClient, s.cfg.Name, s.cfg.KeepAliveTTL) + err1 := ha.KeepAlive(s.kaCtx, s.etcdClient, s.cfg.Name, s.cfg.KeepAliveTTL) log.L().Warn("keepalive with master goroutine paused", zap.Error(err1)) } @@ -110,7 +110,7 @@ func (s *Server) KeepAlive() { return // return if failed to stop the worker. } select { - case <-s.ctx.Done(): + case <-s.kaCtx.Done(): log.L().Info("keepalive with master goroutine exited!") return case <-time.After(retryConnectSleepTime): diff --git a/dm/worker/server.go b/dm/worker/server.go index a317767d47..b8a7bbb42d 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -43,13 +43,14 @@ import ( ) var ( - cmuxReadTimeout = 10 * time.Second - dialTimeout = 3 * time.Second - keepaliveTimeout = 3 * time.Second - keepaliveTime = 3 * time.Second - retryConnectSleepTime = time.Second - syncMasterEndpointsTime = 3 * time.Second - getMinLocForSubTaskFunc = getMinLocForSubTask + cmuxReadTimeout = 10 * time.Second + dialTimeout = 3 * time.Second + keepaliveTimeout = 3 * time.Second + keepaliveTime = 3 * time.Second + retryGetSourceBoundConfig = 5 + retryConnectSleepTime = time.Second + syncMasterEndpointsTime = 3 * time.Second + getMinLocForSubTaskFunc = getMinLocForSubTask ) // Server accepts RPC requests @@ -58,10 +59,14 @@ var ( type Server struct { sync.Mutex wg sync.WaitGroup + kaWg sync.WaitGroup closed sync2.AtomicBool ctx context.Context cancel context.CancelFunc + kaCtx context.Context + kaCancel context.CancelFunc + cfg *Config rootLis net.Listener @@ -137,21 +142,19 @@ func (s *Server) Start() error { s.wg.Done() }() - s.wg.Add(1) - go func() { - defer s.wg.Done() - // TODO: handle fatal error from observeSourceBound - //nolint:errcheck - s.observeSourceBound(s.ctx, revBound) - }() + s.startKeepAlive() s.wg.Add(1) - go func() { + go func(ctx context.Context) { defer s.wg.Done() - // worker keepalive with master - // If worker loses connect from master, it would stop all task and try to connect master again. - s.KeepAlive() - }() + for { + err1 := s.observeSourceBound(ctx, revBound) + if err1 == nil { + return + } + s.restartKeepAlive() + } + }(s.ctx) // create a cmux m := cmux.New(s.rootLis) @@ -190,6 +193,29 @@ func (s *Server) Start() error { return terror.ErrWorkerStartService.Delegate(err) } +// worker keepalive with master +// If worker loses connect from master, it would stop all task and try to connect master again. +func (s *Server) startKeepAlive() { + s.kaWg.Add(1) + s.kaCtx, s.kaCancel = context.WithCancel(s.ctx) + go s.doStartKeepAlive() +} + +func (s *Server) doStartKeepAlive() { + defer s.kaWg.Done() + s.KeepAlive() +} + +func (s *Server) stopKeepAlive() { + s.kaCancel() + s.kaWg.Wait() +} + +func (s *Server) restartKeepAlive() { + s.stopKeepAlive() + s.startKeepAlive() +} + func (s *Server) syncMasterEndpoints(ctx context.Context) { lastClientUrls := []string{} clientURLs := []string{} @@ -257,6 +283,10 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { bound, cfg, rev1, err1 := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name) if err1 != nil { log.L().Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum)) + retryNum++ + if retryNum > retryGetSourceBoundConfig && etcdutil.IsLimitedRetryableError(err1) { + return err1 + } break } rev = rev1 @@ -282,7 +312,6 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } } } - retryNum++ } } else { if err != nil { @@ -326,6 +355,7 @@ func (s *Server) doClose() { // Close close the RPC server, this function can be called multiple times func (s *Server) Close() { s.doClose() + s.stopKeepAlive() s.wg.Wait() } diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index a5505a9faa..91c520f959 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -23,6 +23,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/siddontang/go-mysql/mysql" "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/clientv3" @@ -225,6 +226,128 @@ func (t *testServer) TestServer(c *C) { t.testWorker(c) } +func (t *testServer) TestHandleSourceBoundAfterError(c *C) { + var ( + masterAddr = "127.0.0.1:8261" + keepAliveTTL = int64(1) + ) + // start etcd server + etcdDir := c.MkDir() + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) + c.Assert(err, IsNil) + defer ETCD.Close() + cfg := NewConfig() + c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil) + cfg.KeepAliveTTL = keepAliveTTL + + // new etcd client + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: GetJoinURLs(cfg.Join), + DialTimeout: dialTimeout, + DialKeepAliveTime: keepaliveTime, + DialKeepAliveTimeout: keepaliveTimeout, + }) + c.Assert(err, IsNil) + + // watch worker event(oneline or offline) + var ( + wg sync.WaitGroup + startRev int64 = 1 + ) + workerEvCh := make(chan ha.WorkerEvent, 10) + workerErrCh := make(chan error, 10) + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + defer func() { + close(workerEvCh) + close(workerErrCh) + wg.Done() + }() + ha.WatchWorkerEvent(ctx, etcdCli, startRev, workerEvCh, workerErrCh) + }() + + // start worker server + s := NewServer(cfg) + defer s.Close() + go func() { + err1 := s.Start() + c.Assert(err1, IsNil) + }() + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return !s.closed.Get() + }), IsTrue) + + // check if the worker is online + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + select { + case ev := <-workerEvCh: + if !ev.IsDeleted { + return true + } + default: + } + return false + }), IsTrue) + + // enable failpont + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg", `return(true)`), IsNil) + sourceCfg := loadSourceConfigWithoutPassword(c) + sourceCfg.EnableRelay = false + _, err = ha.PutSourceCfg(etcdCli, sourceCfg) + c.Assert(err, IsNil) + sourceBound := ha.NewSourceBound(sourceCfg.SourceID, s.cfg.Name) + _, err = ha.PutSourceBound(etcdCli, sourceBound) + c.Assert(err, IsNil) + + // do check until worker offline + c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool { + select { + case ev := <-workerEvCh: + if ev.IsDeleted { + return true + } + default: + } + return false + }), IsTrue) + + // check if the worker is online + c.Assert(utils.WaitSomething(5, time.Duration(s.cfg.KeepAliveTTL)*time.Second, func() bool { + select { + case ev := <-workerEvCh: + if !ev.IsDeleted { + return true + } + default: + } + return false + }), IsTrue) + + // stop watching and disable failpoint + cancel() + wg.Wait() + c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg"), IsNil) + + _, err = ha.PutSourceBound(etcdCli, sourceBound) + c.Assert(err, IsNil) + _, err = ha.PutSourceCfg(etcdCli, sourceCfg) + c.Assert(err, IsNil) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + s.Mutex.Lock() + defer s.Mutex.Unlock() + return s.worker != nil + }), IsTrue) + + _, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name) + c.Assert(err, IsNil) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + s.Mutex.Lock() + defer s.Mutex.Unlock() + return s.worker == nil + }), IsTrue) +} + func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { var ( masterAddr = "127.0.0.1:8261" diff --git a/pkg/etcdutil/etcdutil.go b/pkg/etcdutil/etcdutil.go index 89ea16c3ea..48bc1eff7d 100644 --- a/pkg/etcdutil/etcdutil.go +++ b/pkg/etcdutil/etcdutil.go @@ -148,7 +148,17 @@ func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsTh // IsRetryableError check whether error is retryable error for etcd to build again func IsRetryableError(err error) bool { switch errors.Cause(err) { - case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace: + case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace, context.DeadlineExceeded: + return true + default: + return false + } +} + +// IsLimitedRetryableError check whether error is retryable error for etcd to build again in a limited number of times +func IsLimitedRetryableError(err error) bool { + switch errors.Cause(err) { + case v3rpc.ErrNoSpace, context.DeadlineExceeded: return true default: return false diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index e6be3e3369..39ce801b62 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -18,6 +18,7 @@ import ( "encoding/json" "time" + "github.com/pingcap/failpoint" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -119,6 +120,9 @@ func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound resp *clientv3.GetResponse err error ) + failpoint.Inject("FailToGetSourceCfg", func() { + failpoint.Return(sbm, 0, context.DeadlineExceeded) + }) if worker != "" { resp, err = cli.Get(ctx, common.UpstreamBoundWorkerKeyAdapter.Encode(worker)) } else { diff --git a/pkg/ha/source.go b/pkg/ha/source.go index 12a3dedbb3..afa31d8f1d 100644 --- a/pkg/ha/source.go +++ b/pkg/ha/source.go @@ -16,6 +16,7 @@ package ha import ( "context" + "github.com/pingcap/failpoint" "go.etcd.io/etcd/clientv3" "github.com/pingcap/dm/dm/common" @@ -50,6 +51,9 @@ func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]co resp *clientv3.GetResponse err error ) + failpoint.Inject("FailToGetSourceCfg", func() { + failpoint.Return(scm, 0, context.DeadlineExceeded) + }) if source != "" { resp, err = cli.Get(ctx, common.UpstreamConfigKeyAdapter.Encode(source), clientv3.WithRev(rev)) } else {