From da446ef35327c3c54e6de80b974005d5eb6024f1 Mon Sep 17 00:00:00 2001 From: aaronfern Date: Mon, 13 Jun 2022 22:19:53 +0530 Subject: [PATCH] Added member pkg for all etcd member manipulation functions --- pkg/initializer/initializer.go | 146 +--------------------------- pkg/member/member_add.go | 155 ++++++++++++++++++++++++++++++ pkg/server/backuprestoreserver.go | 56 +---------- pkg/server/httpAPI.go | 13 +-- 4 files changed, 170 insertions(+), 200 deletions(-) create mode 100644 pkg/member/member_add.go diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index cc6758db0..42c0eefd5 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -15,15 +15,13 @@ package initializer import ( - "context" "fmt" "os" "path/filepath" - "strings" "time" - "github.com/gardener/etcd-backup-restore/pkg/etcdutil" "github.com/gardener/etcd-backup-restore/pkg/initializer/validator" + "github.com/gardener/etcd-backup-restore/pkg/member" "github.com/gardener/etcd-backup-restore/pkg/metrics" "github.com/gardener/etcd-backup-restore/pkg/miscellaneous" "github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer" @@ -31,7 +29,6 @@ import ( brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" ) @@ -46,39 +43,10 @@ import ( // - No snapshots are available, start etcd as a fresh installation. 
func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error { start := time.Now() - if !e.IsMemberInCluster() { - - //Add member as learner to cluster - memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - defer cancel() - memberURL := getMemberURL() - if memberURL == "" { - e.Logger.Warn("Could not fetch member URL") - } - for { - //Create etcd client - clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{ - Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid - InsecureTransport: true, - }) - cli, _ := clientFactory.NewCluster() - _, err2 := cli.MemberAddAsLearner(memAddCtx, []string{memberURL}) - if err2 != nil { - e.Logger.Warn("Error adding member as a learner: ", err2) - } - if err2 == nil || strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err2.Error()) { - break - //TODO: why not just return here? - } - e.Logger.Info("Could not as member as learner due to: ", err2) - e.Logger.Info("Trying again in 5 seconds") - timer := time.NewTimer(5 * time.Second) - <-timer.C - timer.Stop() - } - - e.Logger.Info("Adding member to cluster as learner") - + //Etcd cluster scale-up case + if !member.IsMemberInCluster(e.Logger) { + //return here as no restoration or validation needed + member.AddMemberAsLearner(e.Logger) return nil } @@ -167,11 +135,6 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) { return e.restoreWithEmptySnapstore() } - // if e.SingleMemberRestoreCase() { - // logger.Infof("Single member restore case") - // return e.restoreWithEmptySnapstore() - // } - tempRestoreOptions.BaseSnapshot = baseSnap tempRestoreOptions.DeltaSnapList = deltaSnapList tempRestoreOptions.Config.RestoreDataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.RestoreDataDir, "part") @@ -238,102 +201,3 @@ func (e *EtcdInitializer) removeDir(dirname string) error { } return nil } - -// func (r *EtcdInitializer) SingleMemberRestoreCase() bool { 
-// r.Logger.Info("Starting single member restore case") -// clientSet, err := miscellaneous.GetKubernetesClientSetOrError() -// if err != nil { -// r.Logger.Errorf("failed to create clientset: %v", err) -// return false -// } -// r.Logger.Info("Single member restore case: k8s clientset created") - -// //Fetch lease associated with member -// memberLease := &v1.Lease{} -// err = clientSet.Get(context.TODO(), controller_runtime_client.ObjectKey{ -// Namespace: os.Getenv("POD_NAMESPACE"), -// Name: os.Getenv("POD_NAME"), -// }, memberLease) -// if err != nil { -// r.Logger.Errorf("error fetching lease: %v", err) -// } -// r.Logger.Info("Single member restore case: fetched lease") - -// if memberLease.Spec.HolderIdentity != nil { -// r.Logger.Info("Single member restore case: lease holder ID is not nil") -// //Case of lease already existing -// //Assume case of single member restoration - -// //Create etcd client -// clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{ -// Endpoints: []string{"http://etcd-main-peer.default.svc.cluster.local:2380"}, -// InsecureTransport: true, -// }) -// cli, _ := clientFactory.NewCluster() -// r.Logger.Info("Single member restore case: created etcd client") - -// //remove self from cluster -// ctx3, cancel := context.WithTimeout(context.TODO(), 5*time.Second) -// etcdList, err2 := cli.MemberList(ctx3) -// if err2 != nil { -// r.Logger.Warn("Could not list any etcd members") -// } -// cancel() -// r.Logger.Info("Single member restore case: etcd member list done") -// //mem := make(map[string]uint64) -// var peerURL []string -// for _, y := range etcdList.Members { -// if y.Name == os.Getenv("POD_NAME") { -// peerURL = y.PeerURLs -// cli.MemberRemove(context.TODO(), y.ID) -// r.Logger.Info("Single member restore case: member removed") -// break -// } -// } - -// //Add self to the cluster -// ctx4, cancel := context.WithTimeout(context.TODO(), 5*time.Second) -// cli.MemberAdd(ctx4, peerURL) -// cancel() -// 
r.Logger.Info("Single member restore case: member added") -// return true -// } - -// return false -// } - -func (r *EtcdInitializer) IsMemberInCluster() bool { - //Create etcd client - clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{ - Endpoints: []string{"http://etcd-main-peer.default.svc.cluster.local:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid - InsecureTransport: true, //TODO: is it right to use insecure transport? - }) - cli, _ := clientFactory.NewCluster() - r.Logger.Info("Etcd client created") - - // List members in cluster - memListCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - etcdMemberList, err := cli.MemberList(memListCtx) - cancel() - if err != nil { - r.Logger.Warn("Could not list any etcd members", err) - return true - } - - for _, y := range etcdMemberList.Members { - if y.Name == os.Getenv("POD_NAME") { - return true - } - } - - cli.Close() - - return false -} - -func getMemberURL() string { - //end := strings.Split(os.Getenv("ETCD_ENDPOINT"), "//") //TODO: use ETCD_ENDPOINT env var passed by druid - memberURL := "http://" + os.Getenv("POD_NAME") + ".etcd-main-peer.default.svc:2380" - //memberURL := end[0] + "//" + os.Getenv("POD_NAME") + "." 
+ end[1]
-	return memberURL
-}
diff --git a/pkg/member/member_add.go b/pkg/member/member_add.go
new file mode 100644
index 000000000..eefbb1c3b
--- /dev/null
+++ b/pkg/member/member_add.go
@@ -0,0 +1,155 @@
+package member
+
+import (
+	"context"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
+	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
+	"github.com/sirupsen/logrus"
+	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+)
+
+const (
+	// RetryPeriod is the period after which an operation is retried
+	RetryPeriod time.Duration = 5 * time.Second
+)
+
+// AddMemberAsLearner adds this pod as a learner member to the etcd cluster,
+// retrying every RetryPeriod until the member is added or already present.
+func AddMemberAsLearner(logger *logrus.Logger) error {
+	//Add member as learner to cluster
+	memberURL := getMemberURL()
+	if memberURL == "" {
+		logger.Warn("Could not fetch member URL")
+	}
+	for {
+		//Create etcd client
+		//TODO: use ETCD_ENDPOINT env var passed by druid and use secure transport
+		clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
+			Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
+			InsecureTransport: true,
+		})
+
+		memAddCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
+		cli, _ := clientFactory.NewCluster()
+		_, err := cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
+		cancel()
+		cli.Close()
+
+		if err != nil {
+			logger.Warn("Error adding member as a learner: ", err)
+		}
+		if err == nil {
+			logger.Info("Added member to cluster as a learner")
+			break //TODO: why not just return here?
+		}
+		// NOTE(review): Contains arguments were reversed (haystack/needle swapped) and this
+		// check was also ORed into the success branch above, making this branch dead code.
+		if strings.Contains(err.Error(), rpctypes.ErrGRPCPeerURLExist.Error()) {
+			logger.Info("Member already part of etcd cluster")
+			break
+		}
+
+		logger.Info("Could not add member as learner due to: ", err)
+		logger.Info("Trying again in 5 seconds... ")
+		timer := time.NewTimer(RetryPeriod)
+		<-timer.C
+		timer.Stop()
+	}
+
+	return nil
+}
+
+// IsMemberInCluster checks whether this pod (POD_NAME) is already listed as a
+// member of the etcd cluster. On a member-list error it conservatively
+// returns true so that the caller does not attempt a duplicate add.
+func IsMemberInCluster(logger *logrus.Logger) bool {
+	//Create etcd client
+	// TODO: use ETCD_ENDPOINT env var passed by druid and use secure transport
+	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
+		Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
+		InsecureTransport: true, //TODO: is it right to use insecure transport?
+	})
+
+	// TODO: should use a retry mechanism here
+	cli, _ := clientFactory.NewCluster()
+	defer cli.Close()
+	logger.Info("Etcd client created")
+
+	// List members in cluster
+	memListCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
+	etcdMemberList, err := cli.MemberList(memListCtx)
+	cancel()
+	if err != nil {
+		logger.Warn("Could not list any etcd members", err)
+		return true
+	}
+
+	for _, y := range etcdMemberList.Members {
+		if y.Name == os.Getenv("POD_NAME") {
+			return true
+		}
+	}
+
+	return false
+}
+
+func getMemberURL() string {
+	//end := strings.Split(os.Getenv("ETCD_ENDPOINT"), "//") //TODO: use ETCD_ENDPOINT env var passed by druid
+	memberURL := "http://" + os.Getenv("POD_NAME") + ".etcd-main-peer.default.svc:2380"
+	//memberURL := end[0] + "//" + os.Getenv("POD_NAME") + "."
+ end[1]
+	return memberURL
+}
+
+// PromoteMember promotes this pod from a learner to a full voting member,
+// retrying every RetryPeriod until the learner has caught up with the leader.
+func PromoteMember(ctx context.Context, logger *logrus.Entry) {
+	for {
+		// TODO: use ETCD_ENDPOINT env var passed by druid and use secure transport
+		clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
+			Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")},
+			InsecureTransport: true,
+		})
+		cli, _ := clientFactory.NewCluster()
+
+		//List all members in the etcd cluster
+		//Member URL will appear in the memberlist call response as soon as the member has been added to the cluster as a learner
+		//However, the name of the member will appear only if the member has started running
+		memListCtx, memListCtxcancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
+		etcdList, memListErr := cli.MemberList(memListCtx)
+		memListCtxcancel()
+
+		if memListErr != nil {
+			logger.Info("error listing members: ", memListErr)
+			cli.Close()
+			continue
+		}
+
+		//TODO: Simplify logic below
+		var promoted bool
+		promoted = false
+		for _, y := range etcdList.Members {
+			if y.Name == os.Getenv("POD_NAME") {
+				logger.Info("Promoting member ", y.Name)
+				memPromoteCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
+				//Member promote call will succeed only if member is in sync with leader, and will error out otherwise
+				//NOTE(review): cancel() was previously called BEFORE MemberPromote, so the call
+				//always ran with an already-cancelled context and promotion could never succeed.
+				_, memPromoteErr := cli.MemberPromote(memPromoteCtx, y.ID)
+				cancel()
+				if memPromoteErr == nil {
+					//Exit if member is successfully promoted
+					promoted = true
+					logger.Info("Member promoted ", y.Name, " : ", y.ID)
+					break
+				}
+				//NOTE(review): Contains arguments were reversed and this branch was dead code
+				//because the condition above already ORed in the same (reversed) check.
+				if strings.Contains(memPromoteErr.Error(), rpctypes.ErrGRPCMemberNotLearner.Error()) {
+					//Exit if member is not a learner, i.e. already part of the cluster
+					promoted = true
+					logger.Info("Member ", y.Name, " : ", y.ID, " already part of etcd cluster")
+				}
+			}
+		}
+		cli.Close() //NOTE(review): previously leaked on the promoted exit path
+		if promoted {
+			break
+		}
+
+		//Timer here so that the member promote loop doesn't execute too frequently
+		logger.Info("Member still catching up logs from leader. Retrying promotion...")
+		timer := time.NewTimer(RetryPeriod)
+		<-timer.C
+		timer.Stop()
+	}
+}
diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go
index d6639972a..be690b2bd 100644
--- a/pkg/server/backuprestoreserver.go
+++ b/pkg/server/backuprestoreserver.go
@@ -19,8 +19,6 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"os"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -28,6 +26,7 @@ import (
 	"github.com/gardener/etcd-backup-restore/pkg/backoff"
 	"github.com/gardener/etcd-backup-restore/pkg/common"
 	"github.com/gardener/etcd-backup-restore/pkg/leaderelection"
+	"github.com/gardener/etcd-backup-restore/pkg/member"
 	"github.com/gardener/etcd-backup-restore/pkg/metrics"
 	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
@@ -42,7 +41,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	cron "github.com/robfig/cron/v3"
 	"github.com/sirupsen/logrus"
-	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"go.etcd.io/etcd/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
 )
@@ -165,56 +163,8 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
 	handler := b.startHTTPServer(etcdInitializer, b.config.SnapstoreConfig.Provider, b.config.EtcdConnectionConfig, nil)
 	defer handler.Stop()
-	//Promote self from a learner to a full member in the etcd cluster
-	for { //TODO: wise to have an infinite loop? Is there a risk of getting stuck unnecessarily here?
- b.logger.Info("Promote loop") - clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{ - Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")}, //TODO: Pass this via env var - InsecureTransport: true, - }) - cli, _ := clientFactory.NewCluster() - - //List all members in the etcd cluster - //Member URL will appear in the memberlist call response as soon as the member has been added to the cluster - //However, the name of the member will appear only if the member has started - ctx2, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - etcdList, memListErr := cli.MemberList(ctx2) - cancel() - - if memListErr != nil { - b.logger.Info("error listing members: ", memListErr) - continue - } - - //TODO: Simplify logic below - var promoted bool - promoted = false - for _, y := range etcdList.Members { - b.logger.Info("CHECK: ", y.Name, " : ", os.Getenv("POD_NAME")) - if y.Name == os.Getenv("POD_NAME") { - b.logger.Info("Promoting member ", y.Name) - ctx2, cancel := context.WithTimeout(context.TODO(), 15*time.Second) - defer cancel() - //Member promote call will succeed only if member is in sync with leader, and will error out otherwise - _, err3 := cli.MemberPromote(ctx2, y.ID) - if err3 == nil || strings.Contains(rpctypes.ErrGRPCMemberNotLearner.Error(), err3.Error()) { - //Exit if member is successfully promoted or if member is not a learner - promoted = true - b.logger.Info("Member promoted ", y.Name, " : ", y.ID) - break - } - cancel() - } - } - if promoted { - break - } - - //Timer here so that the member promote loop doesn't execute too frequently - timer := time.NewTimer(5 * time.Second) - <-timer.C - timer.Stop() - } + // Promotes member if it is a learner + member.PromoteMember(context.TODO(), b.logger) leaderCallbacks := &brtypes.LeaderCallbacks{ OnStartedLeading: func(leCtx context.Context) { diff --git a/pkg/server/httpAPI.go b/pkg/server/httpAPI.go index c5374dde7..981b9d61e 100644 --- 
a/pkg/server/httpAPI.go +++ b/pkg/server/httpAPI.go @@ -29,7 +29,6 @@ import ( "strings" "sync" "sync/atomic" - "time" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" "github.com/gardener/etcd-backup-restore/pkg/initializer" @@ -414,7 +413,7 @@ func (h *HTTPHandler) serveConfig(rw http.ResponseWriter, req *http.Request) { } // fetch pod name from env - ns := os.Getenv("POD_NAMESPACE") + podNS := os.Getenv("POD_NAMESPACE") podName := os.Getenv("POD_NAME") config["name"] = podName @@ -446,11 +445,12 @@ func (h *HTTPHandler) serveConfig(rw http.ResponseWriter, req *http.Request) { } curSts := &appsv1.StatefulSet{} errSts := clientSet.Get(context.TODO(), client.ObjectKey{ - Namespace: ns, - Name: "etcd-main", //TODO: derive sts name from pod name + Namespace: podNS, + Name: podName[:strings.LastIndex(podName, "-")], }, curSts) if errSts != nil { h.Logger.Warn("error fetching etcd sts ", errSts) + return } config["initial-cluster-state"] = "new" @@ -458,6 +458,7 @@ func (h *HTTPHandler) serveConfig(rw http.ResponseWriter, req *http.Request) { if *curSts.Spec.Replicas > 1 && *curSts.Spec.Replicas > curSts.Status.UpdatedReplicas { config["initial-cluster-state"] = "existing" } + // TODO Remove below prints fmt.Println("*curSts.Spec.Replicas is ", *curSts.Spec.Replicas) fmt.Println("curSts.Status.UpdatedReplicas is ", curSts.Status.UpdatedReplicas) @@ -471,7 +472,7 @@ func (h *HTTPHandler) serveConfig(rw http.ResponseWriter, req *http.Request) { if err != nil { h.Logger.Warnf("Error with NewCluster() : %v", err) } - ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout) defer cancel() memList, err := cli.MemberList(ctx) noOfMembers := 0 @@ -497,7 +498,7 @@ func (h *HTTPHandler) serveConfig(rw http.ResponseWriter, req *http.Request) { cluster = cluster + y + "," } } - + //remove trailing `,` config["initial-cluster"] = cluster[:len(cluster)-1] }