
worker, ha: limit the number of retries for errors that are only retryable a limited number of times #1396

Merged · 9 commits · Feb 4, 2021
4 changes: 2 additions & 2 deletions dm/worker/join.go
@@ -97,7 +97,7 @@ func (s *Server) KeepAlive() {
})

{
err1 := ha.KeepAlive(s.ctx, s.etcdClient, s.cfg.Name, s.cfg.KeepAliveTTL)
err1 := ha.KeepAlive(s.kaCtx, s.etcdClient, s.cfg.Name, s.cfg.KeepAliveTTL)
log.L().Warn("keepalive with master goroutine paused", zap.Error(err1))
}

@@ -110,7 +110,7 @@ func (s *Server) KeepAlive() {
return // return if failed to stop the worker.
}
select {
case <-s.ctx.Done():
Review comment (Contributor):
Suggest using another variable to save s.ctx here to avoid a data race.

case <-s.kaCtx.Done():
log.L().Info("keepalive with master goroutine exited!")
return
case <-time.After(retryConnectSleepTime):
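The concern in the review comment above is that the keepalive goroutine reads a context field (s.ctx, and later s.kaCtx) that another goroutine may reassign when the keepalive is restarted. Below is a minimal, self-contained sketch of the suggested pattern, with illustrative names that are not DM's actual types: copy the context into a local variable (or pass it as a goroutine argument) before the goroutine starts, so the goroutine never reads the shared field again.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type server struct {
	mu       sync.Mutex
	kaCtx    context.Context
	kaCancel context.CancelFunc
	kaWg     sync.WaitGroup
}

func (s *server) startKeepAlive(parent context.Context) {
	s.mu.Lock()
	s.kaCtx, s.kaCancel = context.WithCancel(parent)
	ctx := s.kaCtx // local copy; the goroutine below only ever reads this copy
	s.mu.Unlock()

	s.kaWg.Add(1)
	go func(ctx context.Context) {
		defer s.kaWg.Done()
		<-ctx.Done() // stand-in for the real keepalive loop with the master
		fmt.Println("keepalive goroutine exited")
	}(ctx)
}

func (s *server) stopKeepAlive() {
	s.mu.Lock()
	cancel := s.kaCancel
	s.mu.Unlock()
	cancel()
	s.kaWg.Wait()
}

func main() {
	s := &server{}
	s.startKeepAlive(context.Background())
	time.Sleep(100 * time.Millisecond)
	s.stopKeepAlive() // a restart would simply call stopKeepAlive and then startKeepAlive again
}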
70 changes: 50 additions & 20 deletions dm/worker/server.go
@@ -43,13 +43,14 @@ import (
)

var (
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinLocForSubTaskFunc = getMinLocForSubTask
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryGetSourceBoundConfig = 5
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinLocForSubTaskFunc = getMinLocForSubTask
)

// Server accepts RPC requests
@@ -58,10 +59,14 @@ var (
type Server struct {
sync.Mutex
wg sync.WaitGroup
kaWg sync.WaitGroup
closed sync2.AtomicBool
ctx context.Context
cancel context.CancelFunc

kaCtx context.Context
kaCancel context.CancelFunc

cfg *Config

rootLis net.Listener
@@ -137,21 +142,19 @@ func (s *Server) Start() error {
s.wg.Done()
}()

s.wg.Add(1)
go func() {
defer s.wg.Done()
// TODO: handle fatal error from observeSourceBound
//nolint:errcheck
s.observeSourceBound(s.ctx, revBound)
}()
s.startKeepAlive()

s.wg.Add(1)
go func() {
go func(ctx context.Context) {
defer s.wg.Done()
// worker keepalive with master
// If the worker loses connection to the master, it stops all tasks and tries to connect to the master again.
s.KeepAlive()
}()
for {
err1 := s.observeSourceBound(ctx, revBound)
if err1 == nil {
return
}
s.restartKeepAlive()
}
}(s.ctx)

// create a cmux
m := cmux.New(s.rootLis)
@@ -190,6 +193,29 @@ func (s *Server) Start() error {
return terror.ErrWorkerStartService.Delegate(err)
}

// startKeepAlive starts the keepalive with the master.
// If the worker loses connection to the master, it stops all tasks and tries to connect to the master again.
func (s *Server) startKeepAlive() {
s.kaWg.Add(1)
s.kaCtx, s.kaCancel = context.WithCancel(s.ctx)
go s.doStartKeepAlive()
}

func (s *Server) doStartKeepAlive() {
defer s.kaWg.Done()
s.KeepAlive()
}

func (s *Server) stopKeepAlive() {
s.kaCancel()
s.kaWg.Wait()
}

func (s *Server) restartKeepAlive() {
s.stopKeepAlive()
s.startKeepAlive()
}

func (s *Server) syncMasterEndpoints(ctx context.Context) {
lastClientUrls := []string{}
clientURLs := []string{}
@@ -257,6 +283,10 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
bound, cfg, rev1, err1 := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err1 != nil {
log.L().Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
retryNum++
if retryNum > retryGetSourceBoundConfig && etcdutil.IsLimitedRetryableError(err1) {
return err1
}
break
}
rev = rev1
@@ -282,7 +312,6 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
}
}
retryNum++
}
} else {
if err != nil {
@@ -326,6 +355,7 @@ func (s *Server) doClose() {
// Close closes the RPC server; this function can be called multiple times
func (s *Server) Close() {
s.doClose()
s.stopKeepAlive()
s.wg.Wait()
}

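Taking the Start() and observeSourceBound hunks above together, the intended flow appears to be: observeSourceBound counts consecutive failures of GetSourceBoundConfig and, once the count exceeds retryGetSourceBoundConfig and the error is classified as limited-retryable, returns the error instead of retrying forever; Start() then calls restartKeepAlive and re-enters observeSourceBound. A standalone, simplified sketch of that shape (the helper names are illustrative, not DM's API):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errLimited stands in for a limited-retryable etcd error such as
// context.DeadlineExceeded or ErrNoSpace.
var errLimited = errors.New("limited retryable error")

// observeLoop retries on every failure, but once the retry budget is spent and
// the error is limited-retryable, it surfaces the error to the caller, which
// would then restart the keepalive session and call observeLoop again.
func observeLoop(ctx context.Context, getBound func() error, maxRetries int) error {
	retryNum := 0
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		if err := getBound(); err != nil {
			retryNum++
			if retryNum > maxRetries && errors.Is(err, errLimited) {
				return err // give up; the caller calls restartKeepAlive()
			}
			time.Sleep(10 * time.Millisecond) // back off before the next attempt
			continue
		}
		return nil // bound obtained; the real code would keep watching from here
	}
}

func main() {
	attempts := 0
	err := observeLoop(context.Background(), func() error {
		attempts++
		return errLimited
	}, 5)
	fmt.Println(attempts, err) // 6 failed attempts, then the error is returned
}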
123 changes: 123 additions & 0 deletions dm/worker/server_test.go
@@ -23,6 +23,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/mysql"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
@@ -225,6 +226,128 @@ func (t *testServer) TestServer(c *C) {
t.testWorker(c)
}

func (t *testServer) TestHandleSourceBoundAfterError(c *C) {
var (
masterAddr = "127.0.0.1:8261"
keepAliveTTL = int64(1)
)
// start etcd server
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.KeepAliveTTL = keepAliveTTL

// new etcd client
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(cfg.Join),
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
})
c.Assert(err, IsNil)

// watch worker events (online or offline)
var (
wg sync.WaitGroup
startRev int64 = 1
)
workerEvCh := make(chan ha.WorkerEvent, 10)
workerErrCh := make(chan error, 10)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer func() {
close(workerEvCh)
close(workerErrCh)
wg.Done()
}()
ha.WatchWorkerEvent(ctx, etcdCli, startRev, workerEvCh, workerErrCh)
}()

// start worker server
s := NewServer(cfg)
defer s.Close()
go func() {
err1 := s.Start()
c.Assert(err1, IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return !s.closed.Get()
}), IsTrue)

// check if the worker is online
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
select {
case ev := <-workerEvCh:
if !ev.IsDeleted {
return true
}
default:
}
return false
}), IsTrue)

// enable failpoint
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg", `return(true)`), IsNil)
sourceCfg := loadSourceConfigWithoutPassword(c)
sourceCfg.EnableRelay = false
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
sourceBound := ha.NewSourceBound(sourceCfg.SourceID, s.cfg.Name)
_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)

// keep checking until the worker goes offline
c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool {
select {
case ev := <-workerEvCh:
if ev.IsDeleted {
return true
}
default:
}
return false
}), IsTrue)

// check if the worker comes back online after the keepalive is restarted
c.Assert(utils.WaitSomething(5, time.Duration(s.cfg.KeepAliveTTL)*time.Second, func() bool {
select {
case ev := <-workerEvCh:
if !ev.IsDeleted {
return true
}
default:
}
return false
}), IsTrue)

// stop watching and disable failpoint
cancel()
wg.Wait()
c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg"), IsNil)

_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
s.Mutex.Lock()
defer s.Mutex.Unlock()
return s.worker != nil
}), IsTrue)

_, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
s.Mutex.Lock()
defer s.Mutex.Unlock()
return s.worker == nil
}), IsTrue)
}

func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
var (
masterAddr = "127.0.0.1:8261"
12 changes: 11 additions & 1 deletion pkg/etcdutil/etcdutil.go
@@ -148,7 +148,17 @@ func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsTh
// IsRetryableError checks whether the error is retryable for etcd, so the operation can be rebuilt and tried again
func IsRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace:
case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
}
}

// IsLimitedRetryableError checks whether the error is retryable for etcd, but should only be retried a limited number of times
func IsLimitedRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
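The two predicates above split etcd errors into classes: errors IsRetryableError accepts can be retried indefinitely, while the subset IsLimitedRetryableError accepts (ErrNoSpace, context.DeadlineExceeded) should only consume a bounded retry budget, which is how observeSourceBound uses retryGetSourceBoundConfig. A hedged, caller-side sketch of that distinction (the wrapper itself is illustrative, not part of DM):

package example

import (
	"time"

	"github.com/pingcap/dm/pkg/etcdutil"
)

// retryEtcdOp is a hypothetical helper: limited-retryable errors consume a
// bounded budget, other retryable errors are retried indefinitely, and
// everything else is returned immediately.
func retryEtcdOp(op func() error, maxLimitedRetries int) error {
	limitedRetries := 0
	for {
		err := op()
		if err == nil {
			return nil
		}
		switch {
		case etcdutil.IsLimitedRetryableError(err):
			limitedRetries++
			if limitedRetries > maxLimitedRetries {
				return err // budget exhausted, surface the error
			}
		case etcdutil.IsRetryableError(err):
			// e.g. ErrCompacted or ErrNoLeader: keep retrying without a limit
		default:
			return err // not a retryable error at all
		}
		time.Sleep(100 * time.Millisecond) // simple fixed backoff for the sketch
	}
}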
4 changes: 4 additions & 0 deletions pkg/ha/bound.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"time"

"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
@@ -119,6 +120,9 @@ func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound
resp *clientv3.GetResponse
err error
)
failpoint.Inject("FailToGetSourceCfg", func() {
failpoint.Return(sbm, 0, context.DeadlineExceeded)
})
if worker != "" {
resp, err = cli.Get(ctx, common.UpstreamBoundWorkerKeyAdapter.Encode(worker))
} else {
4 changes: 4 additions & 0 deletions pkg/ha/source.go
@@ -16,6 +16,7 @@ package ha
import (
"context"

"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/common"
@@ -50,6 +51,9 @@ func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]co
resp *clientv3.GetResponse
err error
)
failpoint.Inject("FailToGetSourceCfg", func() {
failpoint.Return(scm, 0, context.DeadlineExceeded)
})
if source != "" {
resp, err = cli.Get(ctx, common.UpstreamConfigKeyAdapter.Encode(source), clientv3.WithRev(rev))
} else {
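The failpoint.Inject hooks added above make GetSourceBound and GetSourceCfg return context.DeadlineExceeded, which is exactly an error IsLimitedRetryableError matches, so the test can drive the bounded-retry path without a real etcd outage. A small hedged sketch of how such a failpoint is toggled (the path string comes from the Inject calls above; the wrapper function is illustrative):

package example

import (
	"github.com/pingcap/failpoint"
)

// withFailingSourceCfg runs fn while the FailToGetSourceCfg failpoint is
// active, then disables it again; the path must match the failpoint.Inject
// calls in pkg/ha.
func withFailingSourceCfg(fn func()) error {
	if err := failpoint.Enable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg", `return(true)`); err != nil {
		return err
	}
	defer func() {
		// restore normal behaviour once the error path has been exercised
		_ = failpoint.Disable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg")
	}()
	// While enabled, ha.GetSourceBound / ha.GetSourceCfg return
	// context.DeadlineExceeded, so observeSourceBound eventually gives up and
	// the worker restarts its keepalive session.
	fn()
	return nil
}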