Skip to content

Commit

Permalink
cherry pick pingcap#1405 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
lance6716 authored and ti-srebot committed Feb 4, 2021
1 parent 2327f76 commit 51ff168
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 14 deletions.
19 changes: 16 additions & 3 deletions dm/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/failpoint"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
Expand All @@ -33,8 +34,18 @@ import (
// SampleConfigFile is sample config file of dm-worker
// later we can read it from dm/worker/dm-worker.toml
// and assign it to SampleConfigFile while we build dm-worker
var SampleConfigFile string
var defaultKeepAliveTTL = int64(10)
var (
SampleConfigFile string
defaultKeepAliveTTL = int64(60) // 1 minute
defaultRelayKeepAliveTTL = int64(60 * 30) // 30 minutes
)

func init() {
failpoint.Inject("defaultKeepAliveTTL", func(val failpoint.Value) {
i := val.(int)
defaultKeepAliveTTL = int64(i)
})
}

// NewConfig creates a new base config for worker.
func NewConfig() *Config {
Expand All @@ -55,6 +66,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: dm-master cluster's "${master-addr}")`)
fs.StringVar(&cfg.Name, "name", "", "human-readable name for DM-worker member")
fs.Int64Var(&cfg.KeepAliveTTL, "keepalive-ttl", defaultKeepAliveTTL, "dm-worker's TTL for keepalive with etcd (in seconds)")
fs.Int64Var(&cfg.RelayKeepAliveTTL, "relay-keepalive-ttl", defaultRelayKeepAliveTTL, "dm-worker's TTL for keepalive with etcd when handle relay enabled sources (in seconds)")

fs.StringVar(&cfg.SSLCA, "ssl-ca", "", "path of file that contains list of trusted SSL CAs for connection")
fs.StringVar(&cfg.SSLCert, "ssl-cert", "", "path of file that contains X509 certificate in PEM format for connection")
Expand All @@ -80,7 +92,8 @@ type Config struct {

ConfigFile string `toml:"config-file" json:"config-file"`
// TODO: in the future dm-workers should share a same ttl from dm-master
KeepAliveTTL int64 `toml:"keepalive-ttl" json:"keepalive-ttl"`
KeepAliveTTL int64 `toml:"keepalive-ttl" json:"keepalive-ttl"`
RelayKeepAliveTTL int64 `toml:"relay-keepalive-ttl" json:"relay-keepalive-ttl"`

// tls config
config.Security
Expand Down
12 changes: 12 additions & 0 deletions dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,15 @@ func (s *Server) KeepAlive() {
}
}
}

// UpdateKeepAliveTTL updates keepalive key with new lease TTL in place, to avoid watcher observe a DELETE event
// this function should not be concurrently called
func (s *Server) UpdateKeepAliveTTL(newTTL int64) {
if ha.CurrentKeepAliveTTL == newTTL {
log.L().Info("not changing keepalive TTL, skip", zap.Int64("ttl", newTTL))
return
}
ha.CurrentKeepAliveTTL = newTTL
ha.KeepAliveUpdateCh <- newTTL
log.L().Debug("received update keepalive TTL request, should be updated soon", zap.Int64("new ttl", newTTL))
}
2 changes: 2 additions & 0 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ func (s *Server) stopWorker(sourceID string) error {
s.Unlock()
return terror.ErrWorkerSourceNotMatch
}
s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
s.setWorker(nil, false)
s.Unlock()
w.Close()
Expand Down Expand Up @@ -588,6 +589,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {
return err
}
startRelay = !relayStage.IsDeleted && relayStage.Expect == pb.Stage_Running
s.UpdateKeepAliveTTL(s.cfg.RelayKeepAliveTTL)
}
go func() {
w.Start(startRelay)
Expand Down
4 changes: 3 additions & 1 deletion dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (t *testServer) TestServer(c *C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

NewRelayHolder = NewDummyRelayHolder
NewSubTask = func(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *SubTask {
Expand Down Expand Up @@ -238,6 +239,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
cfg := NewConfig()
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

s := NewServer(cfg)
etcdCli, err := clientv3.New(clientv3.Config{
Expand Down Expand Up @@ -375,7 +377,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
func (t *testServer) testRetryConnectMaster(c *C, s *Server, ETCD *embed.Etcd, dir string, hostName string) *embed.Etcd {
ETCD.Close()
time.Sleep(6 * time.Second)
// When worker server fail to keepalive with etcd, sever should close its worker
// When worker server fail to keepalive with etcd, server should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "http://"+hostName)
Expand Down
2 changes: 2 additions & 0 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.Join = masterAddr
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(cfg.Join),
Expand Down Expand Up @@ -344,6 +345,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil)
cfg.Join = masterAddr
cfg.KeepAliveTTL = keepAliveTTL
cfg.RelayKeepAliveTTL = keepAliveTTL

etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: GetJoinURLs(cfg.Join),
Expand Down
73 changes: 65 additions & 8 deletions pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ha
import (
"context"
"encoding/json"
"sync/atomic"
"time"

"go.etcd.io/etcd/clientv3"
Expand All @@ -27,6 +28,13 @@ import (
"github.com/pingcap/dm/pkg/log"
)

var (
// CurrentKeepAliveTTL may be assigned to KeepAliveTTL or RelayKeepAliveTTL
CurrentKeepAliveTTL int64
// KeepAliveUpdateCh is used to notify keepalive TTL changing, in order to let watcher not see a DELETE of old key
KeepAliveUpdateCh = make(chan int64, 10)
)

// WorkerEvent represents the PUT/DELETE keepalive event of DM-worker.
type WorkerEvent struct {
WorkerName string `json:"worker-name"` // the worker name of the worker.
Expand Down Expand Up @@ -72,12 +80,13 @@ func workerEventFromKey(key string) (WorkerEvent, error) {
// this key will be kept in etcd until the worker is blocked or failed
// k/v: workerName -> join time.
func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, keepAliveTTL int64) error {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
lease, err := cli.Grant(cliCtx, keepAliveTTL)
if err != nil {
return err
// TTL in KeepAliveUpdateCh has higher priority
for len(KeepAliveUpdateCh) > 0 {
keepAliveTTL = <-KeepAliveUpdateCh
}
// though in regular routine there's no concurrent KeepAlive, we need to handle tests
atomic.StoreInt64(&CurrentKeepAliveTTL, keepAliveTTL)

k := common.WorkerKeepAliveKeyAdapter.Encode(workerName)
workerEventJSON, err := WorkerEvent{
WorkerName: workerName,
Expand All @@ -86,19 +95,40 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, kee
if err != nil {
return err
}
_, err = cli.Put(cliCtx, k, workerEventJSON, clientv3.WithLease(lease.ID))

grantAndPutKV := func(k, v string, ttl int64) (clientv3.LeaseID, error) {
cliCtx, cancel := context.WithTimeout(ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
lease, err := cli.Grant(cliCtx, ttl)
if err != nil {
return 0, err
}
_, err = cli.Put(cliCtx, k, v, clientv3.WithLease(lease.ID))
if err != nil {
return 0, err
}
return lease.ID, nil
}

leaseID, err := grantAndPutKV(k, workerEventJSON, keepAliveTTL)
if err != nil {
return err
}

// once we put the key successfully, we should revoke lease before we quit keepalive normally
defer func() {
_, err2 := revokeLease(cli, lease.ID)
_, err2 := revokeLease(cli, leaseID)
if err2 != nil {
log.L().Warn("fail to revoke lease", zap.Error(err))
}
}()

ch, err := cli.KeepAlive(ctx, lease.ID)
keepAliveCtx, keepAliveCancel := context.WithCancel(ctx)
defer func() {
keepAliveCancel()
}()

ch, err := cli.KeepAlive(keepAliveCtx, leaseID)
if err != nil {
return err
}
Expand All @@ -107,11 +137,38 @@ func KeepAlive(ctx context.Context, cli *clientv3.Client, workerName string, kee
case _, ok := <-ch:
if !ok {
log.L().Info("keep alive channel is closed")
keepAliveCancel() // make go vet happy
return nil
}
case <-ctx.Done():
log.L().Info("ctx is canceled, keepalive will exit now")
keepAliveCancel() // make go vet happy
return nil
case newTTL := <-KeepAliveUpdateCh:
// create a new lease with new TTL, and overwrite original KV
oldLeaseID := leaseID
leaseID, err = grantAndPutKV(k, workerEventJSON, newTTL)
if err != nil {
keepAliveCancel() // make go vet happy
return err
}

oldCancel := keepAliveCancel
keepAliveCtx, keepAliveCancel = context.WithCancel(ctx)
ch, err = cli.KeepAlive(keepAliveCtx, leaseID)
if err != nil {
log.L().Error("meet error when change keepalive TTL", zap.Error(err))
keepAliveCancel() // make go vet happy
return err
}
log.L().Info("dynamically changed keepalive TTL to", zap.Int64("ttl in seconds", newTTL))

// after new keepalive is succeed, we cancel the old keepalive
oldCancel()
_, err2 := revokeLease(cli, oldLeaseID)
if err2 != nil {
log.L().Warn("fail to revoke lease", zap.Error(err))
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion tests/dmctl_basic/conf/get_worker1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ join = "http://127.0.0.1:8261"
worker-addr = "0.0.0.0:8262"
advertise-addr = "127.0.0.1:8262"
config-file = "/tmp/dm_test/dmctl_basic/worker1/dm-worker.toml"
keepalive-ttl = 10
keepalive-ttl = 60
relay-keepalive-ttl = 1800
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
3 changes: 2 additions & 1 deletion tests/dmctl_basic/conf/get_worker2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ join = "http://127.0.0.1:8261"
worker-addr = "0.0.0.0:8263"
advertise-addr = "127.0.0.1:8263"
config-file = "/tmp/dm_test/dmctl_basic/worker2/dm-worker.toml"
keepalive-ttl = 10
keepalive-ttl = 60
relay-keepalive-ttl = 1800
ssl-ca = ""
ssl-cert = ""
ssl-key = ""
27 changes: 27 additions & 0 deletions tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@ function run() {
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/defaultKeepAliveTTL=return(1)"

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# test keepalive is changed by failpoint, so after 1 second DM master will know not alive
killall -9 dm-worker.test
sleep 3
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"\"stage\": \"offline\"" 2
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
Expand All @@ -29,6 +43,19 @@ function run() {
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# relay should be started after source bounded
sleep 1
# and now default keepalive TTL is 30 minutes
killall -9 dm-worker.test
sleep 3
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member" \
"\"stage\": \"bound\"" 2
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# start a task in `full` mode
echo "start task in full mode"
cat $cur/conf/dm-task.yaml > $WORK_DIR/dm-task.yaml
Expand Down

0 comments on commit 51ff168

Please sign in to comment.