Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
dmctl: support and sync mutiple endpoints (#1333) (#1349)
Browse files Browse the repository at this point in the history
Co-authored-by: GMHDBJD <[email protected]>
Co-authored-by: lance6716 <[email protected]>
Co-authored-by: gmhdbjd <[email protected]>
  • Loading branch information
4 people authored Dec 24, 2020
1 parent c8a2a89 commit 3dec335
Show file tree
Hide file tree
Showing 23 changed files with 370 additions and 150 deletions.
5 changes: 5 additions & 0 deletions cmd/dm-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -195,6 +196,10 @@ func interactionMode() {
}
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go common.SyncMasterEndpoints(ctx)

loop()

fmt.Println("dmctl exit")
Expand Down
17 changes: 14 additions & 3 deletions dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"net"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand All @@ -39,6 +40,11 @@ const (
Master = "master"
// Worker specifies member worker type
Worker = "worker"

dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
syncMasterEndpointsTime = 3 * time.Second
)

// NewConfig creates a new base config for dmctl.
Expand Down Expand Up @@ -139,7 +145,7 @@ func (c *Config) Parse(arguments []string) (finish bool, err error) {
}

if c.MasterAddr == "" {
return false, flag.ErrHelp
return false, errors.Errorf("--master-addr not provided, use --help to see help messages")
}

return false, errors.Trace(c.adjust())
Expand Down Expand Up @@ -180,6 +186,11 @@ func (c *Config) adjust() error {

// validate host:port format address
func validateAddr(addr string) error {
_, _, err := net.SplitHostPort(addr)
return errors.Trace(err)
endpoints := strings.Split(addr, ",")
for _, endpoint := range endpoints {
if _, _, err := net.SplitHostPort(utils.UnwrapScheme(endpoint)); err != nil {
return errors.Trace(err)
}
}
return nil
}
16 changes: 11 additions & 5 deletions dm/ctl/common/operate_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ import (
func OperateRelay(op pb.RelayOp, workers []string) (*pb.OperateWorkerRelayResponse, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := MasterClient()
return cli.OperateWorkerRelayTask(ctx, &pb.OperateWorkerRelayRequest{
Op: op,
Sources: workers,
})
resp := &pb.OperateWorkerRelayResponse{}
err := SendRequest(
ctx,
"OperateWorkerRelayTask",
&pb.OperateWorkerRelayRequest{
Op: op,
Sources: workers,
},
&resp,
)
return resp, err
}
19 changes: 13 additions & 6 deletions dm/ctl/common/operate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ import (
func OperateTask(op pb.TaskOp, name string, sources []string) (*pb.OperateTaskResponse, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := MasterClient()
return cli.OperateTask(ctx, &pb.OperateTaskRequest{
Op: op,
Name: name,
Sources: sources,
})

resp := &pb.OperateTaskResponse{}
err := SendRequest(
ctx,
"OperateTask",
&pb.OperateTaskRequest{
Op: op,
Name: name,
Sources: sources,
},
&resp,
)
return resp, err
}
135 changes: 124 additions & 11 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
package common

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"strings"
"sync"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"go.etcd.io/etcd/clientv3"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
Expand All @@ -33,13 +37,83 @@ import (
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
masterClient pb.MasterClient
globalConfig = &Config{}
ctlClient = &CtlClient{}
)

// CtlClient used to get master client for dmctl
type CtlClient struct {
mu sync.RWMutex
tls *toolutils.TLS
etcdClient *clientv3.Client
conn *grpc.ClientConn
masterClient pb.MasterClient
}

func (c *CtlClient) updateMasterClient() error {
var (
err error
conn *grpc.ClientConn
)

c.mu.Lock()
defer c.mu.Unlock()

if c.conn != nil {
c.conn.Close()
}

endpoints := c.etcdClient.Endpoints()
for _, endpoint := range endpoints {
//nolint:staticcheck
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err == nil {
c.conn = conn
c.masterClient = pb.NewMasterClient(conn)
return nil
}
}
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", strings.Join(endpoints, ","))
}

func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
c.mu.RLock()
defer c.mu.RUnlock()

params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)}
results := reflect.ValueOf(c.masterClient).MethodByName(reqName).Call(params)

reflect.ValueOf(respPointer).Elem().Set(results[0])
errInterface := results[1].Interface()
// nil can't pass type conversion, so we handle it separately
if errInterface == nil {
return nil
}
return errInterface.(error)
}

// SendRequest send request to master
func SendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
err := ctlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil || status.Code(err) != codes.Unavailable {
return err
}

// update master client
err = ctlClient.updateMasterClient()
if err != nil {
return err
}

// sendRequest again
return ctlClient.sendRequest(ctx, reqName, req, respPointer)
}

// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
Expand All @@ -53,25 +127,31 @@ func InitClient(addr string, securityCfg config.Security) error {
return terror.ErrCtlInvalidTLSCfg.Delegate(err)
}

//nolint:staticcheck
conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
endpoints := strings.Split(addr, ",")
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
TLS: tls.TLSConfig(),
})
if err != nil {
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", addr)
return err
}
masterClient = pb.NewMasterClient(conn)
return nil

ctlClient = &CtlClient{
tls: tls,
etcdClient: etcdClient,
}

return ctlClient.updateMasterClient()
}

// GlobalConfig returns global dmctl config
func GlobalConfig() *Config {
return globalConfig
}

// MasterClient returns dm-master client
func MasterClient() pb.MasterClient {
return masterClient
}

// PrintLines adds a wrap to support `\n` within `chzyer/readline`
func PrintLines(format string, a ...interface{}) {
fmt.Println(fmt.Sprintf(format, a...))
Expand Down Expand Up @@ -234,3 +314,36 @@ func PrintCmdUsage(cmd *cobra.Command) {
fmt.Println("can't output command's usage:", err)
}
}

// SyncMasterEndpoints sync masters' endpoints
func SyncMasterEndpoints(ctx context.Context) {
lastClientUrls := []string{}
clientURLs := []string{}
updateF := func() {
clientURLs = clientURLs[:0]
resp, err := ctlClient.etcdClient.MemberList(ctx)
if err != nil {
return
}

for _, m := range resp.Members {
clientURLs = append(clientURLs, m.GetClientURLs()...)
}
if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) {
return
}
ctlClient.etcdClient.SetEndpoints(clientURLs...)
lastClientUrls = make([]string, len(clientURLs))
copy(lastClientUrls, clientURLs)
}

for {
updateF()

select {
case <-ctx.Done():
return
case <-time.After(syncMasterEndpointsTime):
}
}
}
14 changes: 10 additions & 4 deletions dm/ctl/master/check_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,16 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) (err error) {
defer cancel()

// start task
cli := common.MasterClient()
resp, err := cli.CheckTask(ctx, &pb.CheckTaskRequest{
Task: string(content),
})
resp := &pb.CheckTaskResponse{}
err = common.SendRequest(
ctx,
"CheckTask",
&pb.CheckTaskRequest{
Task: string(content),
},
&resp,
)

if err != nil {
return
}
Expand Down
15 changes: 10 additions & 5 deletions dm/ctl/master/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,19 @@ func getCfgFunc(cmd *cobra.Command, _ []string) (err error) {
return
}

cli := common.MasterClient()
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
defer cancel()

resp, err := cli.GetCfg(ctx, &pb.GetCfgRequest{
Type: tp,
Name: cfgName,
})
resp := &pb.GetCfgResponse{}
err = common.SendRequest(
ctx,
"GetCfg",
&pb.GetCfgRequest{
Type: tp,
Name: cfgName,
},
&resp,
)
if err != nil {
common.PrintLines("can not get %s config of %s", cfgType, cfgName)
return
Expand Down
24 changes: 15 additions & 9 deletions dm/ctl/master/handle_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,21 @@ func handleErrorFunc(cmd *cobra.Command, _ []string) (err error) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := common.MasterClient()

resp, err := cli.HandleError(ctx, &pb.HandleErrorRequest{
Op: op,
Task: taskName,
BinlogPos: binlogPos,
Sqls: sqls,
Sources: sources,
})

resp := &pb.HandleErrorResponse{}
err = common.SendRequest(
ctx,
"HandleError",
&pb.HandleErrorRequest{
Op: op,
Task: taskName,
BinlogPos: binlogPos,
Sqls: sqls,
Sources: sources,
},
&resp,
)

if err != nil {
return
}
Expand Down
18 changes: 11 additions & 7 deletions dm/ctl/master/list_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ func listMemberFunc(cmd *cobra.Command, _ []string) (err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cli := common.MasterClient()
resp, err := cli.ListMember(ctx, &pb.ListMemberRequest{
Leader: leader,
Master: master,
Worker: worker,
Names: listMemberFlags.names,
})
resp := &pb.ListMemberResponse{}
err = common.SendRequest(ctx,
"ListMember",
&pb.ListMemberRequest{
Leader: leader,
Master: master,
Worker: worker,
Names: listMemberFlags.names,
},
&resp,
)

if err != nil {
return
Expand Down
Loading

0 comments on commit 3dec335

Please sign in to comment.