From 44892b2b004464a1caf3b12c85138db8e4f9951f Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Tue, 29 Oct 2024 18:02:58 +0800 Subject: [PATCH] dm: add retry for dm-worker to join dm-master (#11701) close pingcap/tiflow#4287 --- dm/worker/join.go | 81 +++++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 37 deletions(-) diff --git a/dm/worker/join.go b/dm/worker/join.go index 7b39bb79463..476e0077233 100644 --- a/dm/worker/join.go +++ b/dm/worker/join.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc/credentials" ) +const retryTimes = 5 + // GetJoinURLs gets the endpoints from the join address. func GetJoinURLs(addrs string) []string { // TODO: handle pm1=xxxx:1234,pm2=xxxx:1234,pm3=xxxx:1234 @@ -63,46 +65,51 @@ func (s *Server) JoinMaster(endpoints []string) error { } var errorStr string - for _, endpoint := range endpoints { - ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second) - //nolint:staticcheck - conn, err := grpc.DialContext( - ctx1, - utils.UnwrapScheme(endpoint), - grpc.WithBlock(), - grpcTLS, - grpc.WithBackoffMaxDelay(3*time.Second), - ) - cancel1() - if err != nil { - if conn != nil { - conn.Close() + // retry to connect master + for i := 0; i < retryTimes; i++ { + for _, endpoint := range endpoints { + ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second) + //nolint:staticcheck + conn, err := grpc.DialContext( + ctx1, + utils.UnwrapScheme(endpoint), + grpc.WithBlock(), + grpcTLS, + grpc.WithBackoffMaxDelay(3*time.Second), + ) + cancel1() + if err != nil { + if conn != nil { + conn.Close() + } + log.L().Error("fail to dial dm-master", zap.String("endpoint", endpoint), zap.Error(err)) + errorStr = err.Error() + continue + } + client := pb.NewMasterClient(conn) + ctx1, cancel1 = context.WithTimeout(ctx, 3*time.Second) + resp, err := client.RegisterWorker(ctx1, req) + cancel1() + conn.Close() + if err != nil { + log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.Error(err)) + errorStr = err.Error() + continue + } + if !resp.GetResult() { + log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.String("error", resp.Msg)) + errorStr = resp.Msg + continue } - log.L().Error("fail to dial dm-master", zap.String("endpoint", endpoint), zap.Error(err)) - errorStr = err.Error() - continue - } - client := pb.NewMasterClient(conn) - ctx1, cancel1 = context.WithTimeout(ctx, 3*time.Second) - resp, err := client.RegisterWorker(ctx1, req) - cancel1() - conn.Close() - if err != nil { - log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.Error(err)) - errorStr = err.Error() - continue - } - if !resp.GetResult() { - log.L().Error("fail to register worker", zap.String("endpoint", endpoint), zap.String("error", resp.Msg)) - errorStr = resp.Msg - continue - } - // worker do calls decrypt, but the password is decrypted already, - // but in case we need it later, init it. - encrypt.InitCipher(resp.GetSecretKey()) + // worker do calls decrypt, but the password is decrypted already, + // but in case we need it later, init it. + encrypt.InitCipher(resp.GetSecretKey()) - return nil + return nil + } + log.L().Warn("retry to connect master", zap.Int("retry", i+1), zap.Int("total", retryTimes)) + time.Sleep(retryConnectSleepTime) } return terror.ErrWorkerFailConnectMaster.Generate(endpoints, errorStr) }