Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1396 to release-2.0 (#1425)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Feb 4, 2021
1 parent 2327f76 commit 2199abe
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 23 deletions.
4 changes: 2 additions & 2 deletions dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *Server) KeepAlive() {
})

{
err1 := ha.KeepAlive(s.ctx, s.etcdClient, s.cfg.Name, s.cfg.KeepAliveTTL)
err1 := ha.KeepAlive(s.kaCtx, s.etcdClient, s.cfg.Name, s.cfg.KeepAliveTTL)
log.L().Warn("keepalive with master goroutine paused", zap.Error(err1))
}

Expand All @@ -110,7 +110,7 @@ func (s *Server) KeepAlive() {
return // return if failed to stop the worker.
}
select {
case <-s.ctx.Done():
case <-s.kaCtx.Done():
log.L().Info("keepalive with master goroutine exited!")
return
case <-time.After(retryConnectSleepTime):
Expand Down
70 changes: 50 additions & 20 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ import (
)

var (
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinLocForSubTaskFunc = getMinLocForSubTask
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryGetSourceBoundConfig = 5
retryConnectSleepTime = time.Second
syncMasterEndpointsTime = 3 * time.Second
getMinLocForSubTaskFunc = getMinLocForSubTask
)

// Server accepts RPC requests
Expand All @@ -58,10 +59,14 @@ var (
type Server struct {
sync.Mutex
wg sync.WaitGroup
kaWg sync.WaitGroup
closed sync2.AtomicBool
ctx context.Context
cancel context.CancelFunc

kaCtx context.Context
kaCancel context.CancelFunc

cfg *Config

rootLis net.Listener
Expand Down Expand Up @@ -137,21 +142,19 @@ func (s *Server) Start() error {
s.wg.Done()
}()

s.wg.Add(1)
go func() {
defer s.wg.Done()
// TODO: handle fatal error from observeSourceBound
//nolint:errcheck
s.observeSourceBound(s.ctx, revBound)
}()
s.startKeepAlive()

s.wg.Add(1)
go func() {
go func(ctx context.Context) {
defer s.wg.Done()
// worker keepalive with master
// If worker loses connect from master, it would stop all task and try to connect master again.
s.KeepAlive()
}()
for {
err1 := s.observeSourceBound(ctx, revBound)
if err1 == nil {
return
}
s.restartKeepAlive()
}
}(s.ctx)

// create a cmux
m := cmux.New(s.rootLis)
Expand Down Expand Up @@ -190,6 +193,29 @@ func (s *Server) Start() error {
return terror.ErrWorkerStartService.Delegate(err)
}

// worker keepalive with master
// If worker loses connect from master, it would stop all task and try to connect master again.
func (s *Server) startKeepAlive() {
s.kaWg.Add(1)
s.kaCtx, s.kaCancel = context.WithCancel(s.ctx)
go s.doStartKeepAlive()
}

func (s *Server) doStartKeepAlive() {
defer s.kaWg.Done()
s.KeepAlive()
}

func (s *Server) stopKeepAlive() {
s.kaCancel()
s.kaWg.Wait()
}

func (s *Server) restartKeepAlive() {
s.stopKeepAlive()
s.startKeepAlive()
}

func (s *Server) syncMasterEndpoints(ctx context.Context) {
lastClientUrls := []string{}
clientURLs := []string{}
Expand Down Expand Up @@ -257,6 +283,10 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
bound, cfg, rev1, err1 := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err1 != nil {
log.L().Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
retryNum++
if retryNum > retryGetSourceBoundConfig && etcdutil.IsLimitedRetryableError(err1) {
return err1
}
break
}
rev = rev1
Expand All @@ -282,7 +312,6 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
}
}
retryNum++
}
} else {
if err != nil {
Expand Down Expand Up @@ -326,6 +355,7 @@ func (s *Server) doClose() {
// Close close the RPC server, this function can be called multiple times
func (s *Server) Close() {
s.doClose()
s.stopKeepAlive()
s.wg.Wait()
}

Expand Down
123 changes: 123 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/siddontang/go-mysql/mysql"
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -225,6 +226,128 @@ func (t *testServer) TestServer(c *C) {
t.testWorker(c)
}

func (t *testServer) TestHandleSourceBoundAfterError(c *C) {
var (
masterAddr = "127.0.0.1:8261"
keepAliveTTL = int64(1)
)
// start etcd server
etcdDir := c.MkDir()
ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr)
c.Assert(err, IsNil)
defer ETCD.Close()
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.KeepAliveTTL = keepAliveTTL

// new etcd client
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(cfg.Join),
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
})
c.Assert(err, IsNil)

// watch worker event(oneline or offline)
var (
wg sync.WaitGroup
startRev int64 = 1
)
workerEvCh := make(chan ha.WorkerEvent, 10)
workerErrCh := make(chan error, 10)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer func() {
close(workerEvCh)
close(workerErrCh)
wg.Done()
}()
ha.WatchWorkerEvent(ctx, etcdCli, startRev, workerEvCh, workerErrCh)
}()

// start worker server
s := NewServer(cfg)
defer s.Close()
go func() {
err1 := s.Start()
c.Assert(err1, IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return !s.closed.Get()
}), IsTrue)

// check if the worker is online
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
select {
case ev := <-workerEvCh:
if !ev.IsDeleted {
return true
}
default:
}
return false
}), IsTrue)

// enable failpont
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg", `return(true)`), IsNil)
sourceCfg := loadSourceConfigWithoutPassword(c)
sourceCfg.EnableRelay = false
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
sourceBound := ha.NewSourceBound(sourceCfg.SourceID, s.cfg.Name)
_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)

// do check until worker offline
c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool {
select {
case ev := <-workerEvCh:
if ev.IsDeleted {
return true
}
default:
}
return false
}), IsTrue)

// check if the worker is online
c.Assert(utils.WaitSomething(5, time.Duration(s.cfg.KeepAliveTTL)*time.Second, func() bool {
select {
case ev := <-workerEvCh:
if !ev.IsDeleted {
return true
}
default:
}
return false
}), IsTrue)

// stop watching and disable failpoint
cancel()
wg.Wait()
c.Assert(failpoint.Disable("github.com/pingcap/dm/pkg/ha/FailToGetSourceCfg"), IsNil)

_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
s.Mutex.Lock()
defer s.Mutex.Unlock()
return s.worker != nil
}), IsTrue)

_, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
s.Mutex.Lock()
defer s.Mutex.Unlock()
return s.worker == nil
}), IsTrue)
}

func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
var (
masterAddr = "127.0.0.1:8261"
Expand Down
12 changes: 11 additions & 1 deletion pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,17 @@ func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsTh
// IsRetryableError check whether error is retryable error for etcd to build again
func IsRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace:
case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
}
}

// IsLimitedRetryableError check whether error is retryable error for etcd to build again in a limited number of times
func IsLimitedRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
Expand Down
4 changes: 4 additions & 0 deletions pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"time"

"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -119,6 +120,9 @@ func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound
resp *clientv3.GetResponse
err error
)
failpoint.Inject("FailToGetSourceCfg", func() {
failpoint.Return(sbm, 0, context.DeadlineExceeded)
})
if worker != "" {
resp, err = cli.Get(ctx, common.UpstreamBoundWorkerKeyAdapter.Encode(worker))
} else {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ha/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ha
import (
"context"

"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/common"
Expand Down Expand Up @@ -50,6 +51,9 @@ func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]co
resp *clientv3.GetResponse
err error
)
failpoint.Inject("FailToGetSourceCfg", func() {
failpoint.Return(scm, 0, context.DeadlineExceeded)
})
if source != "" {
resp, err = cli.Get(ctx, common.UpstreamConfigKeyAdapter.Encode(source), clientv3.WithRev(rev))
} else {
Expand Down

0 comments on commit 2199abe

Please sign in to comment.