diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go
index cc6758db0..995278cf7 100644
--- a/pkg/initializer/initializer.go
+++ b/pkg/initializer/initializer.go
@@ -15,15 +15,13 @@ package initializer
 
 import (
-	"context"
 	"fmt"
 	"os"
 	"path/filepath"
-	"strings"
 	"time"
 
-	"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
 	"github.com/gardener/etcd-backup-restore/pkg/initializer/validator"
+	"github.com/gardener/etcd-backup-restore/pkg/member"
 	"github.com/gardener/etcd-backup-restore/pkg/metrics"
 	"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
 	"github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer"
 
@@ -31,7 +29,6 @@ import (
 	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
-	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"go.uber.org/zap"
 )
 
@@ -46,39 +43,10 @@
 // - No snapshots are available, start etcd as a fresh installation.
 func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
 	start := time.Now()
-	if !e.IsMemberInCluster() {
-
-		//Add member as learner to cluster
-		memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
-		defer cancel()
-		memberURL := getMemberURL()
-		if memberURL == "" {
-			e.Logger.Warn("Could not fetch member URL")
-		}
-		for {
-			//Create etcd client
-			clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
-				Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
-				InsecureTransport: true,
-			})
-			cli, _ := clientFactory.NewCluster()
-			_, err2 := cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
-			if err2 != nil {
-				e.Logger.Warn("Error adding member as a learner: ", err2)
-			}
-			if err2 == nil || strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err2.Error()) {
-				break
-				//TODO: why not just return here?
-			}
-			e.Logger.Info("Could not as member as learner due to: ", err2)
-			e.Logger.Info("Trying again in 5 seconds")
-			timer := time.NewTimer(5 * time.Second)
-			<-timer.C
-			timer.Stop()
-		}
-
-		e.Logger.Info("Adding member to cluster as learner")
-
+	if !member.IsMemberInCluster(e.Logger) {
+		//Etcd cluster scale-up case
+		//return here as no restoration or validation needed
+		member.AddMemberAsLearner(e.Logger)
 		return nil
 	}
 
@@ -301,39 +269,3 @@ func (e *EtcdInitializer) removeDir(dirname string) error {
 //	return false
 // }
 
-
-func (r *EtcdInitializer) IsMemberInCluster() bool {
-	//Create etcd client
-	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
-		Endpoints:         []string{"http://etcd-main-peer.default.svc.cluster.local:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
-		InsecureTransport: true, //TODO: is it right to use insecure transport?
-	})
-	cli, _ := clientFactory.NewCluster()
-	r.Logger.Info("Etcd client created")
-
-	// List members in cluster
-	memListCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
-	etcdMemberList, err := cli.MemberList(memListCtx)
-	cancel()
-	if err != nil {
-		r.Logger.Warn("Could not list any etcd members", err)
-		return true
-	}
-
-	for _, y := range etcdMemberList.Members {
-		if y.Name == os.Getenv("POD_NAME") {
-			return true
-		}
-	}
-
-	cli.Close()
-
-	return false
-}
-
-func getMemberURL() string {
-	//end := strings.Split(os.Getenv("ETCD_ENDPOINT"), "//") //TODO: use ETCD_ENDPOINT env var passed by druid
-	memberURL := "http://" + os.Getenv("POD_NAME") + ".etcd-main-peer.default.svc:2380"
-	//memberURL := end[0] + "//" + os.Getenv("POD_NAME") + "." + end[1]
-	return memberURL
-}
diff --git a/pkg/member/member_add.go b/pkg/member/member_add.go
new file mode 100644
index 000000000..238fa273d
--- /dev/null
+++ b/pkg/member/member_add.go
@@ -0,0 +1,138 @@
+package member
+
+import (
+	"context"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
+	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
+	"github.com/sirupsen/logrus"
+	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+)
+
+func AddMemberAsLearner(logger *logrus.Logger) error {
+	//Add member as learner to cluster
+	// memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+	// defer cancel()
+	memberURL := getMemberURL()
+	if memberURL == "" {
+		logger.Warn("Could not fetch member URL")
+	}
+	for {
+		//Create etcd client
+		clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
+			Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
+			InsecureTransport: true,
+		})
+		cli, _ := clientFactory.NewCluster()
+		memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+		_, err2 := cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
+		cancel()
+		if err2 != nil {
+			logger.Warn("Error adding member as a learner: ", err2)
+		}
+		if err2 == nil || strings.Contains(err2.Error(), rpctypes.ErrGRPCPeerURLExist.Error()) {
+			break
+			//TODO: why not just return here?
+		}
+		logger.Info("Could not add member as learner due to: ", err2)
+		logger.Info("Trying again in 5 seconds")
+		cli.Close()
+		timer := time.NewTimer(5 * time.Second)
+		<-timer.C
+		timer.Stop()
+	}
+
+	logger.Info("Adding member to cluster as learner")
+
+	return nil
+}
+
+func IsMemberInCluster(logger *logrus.Logger) bool {
+	//Create etcd client
+	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
+		Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
+		InsecureTransport: true, //TODO: is it right to use insecure transport?
+	})
+	cli, _ := clientFactory.NewCluster()
+	logger.Info("Etcd client created")
+
+	// List members in cluster
+	memListCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
+	etcdMemberList, err := cli.MemberList(memListCtx)
+	cancel()
+	if err != nil {
+		logger.Warn("Could not list any etcd members", err)
+		return true
+	}
+
+	for _, y := range etcdMemberList.Members {
+		if y.Name == os.Getenv("POD_NAME") {
+			return true
+		}
+	}
+
+	cli.Close()
+
+	return false
+}
+
+func getMemberURL() string {
+	//end := strings.Split(os.Getenv("ETCD_ENDPOINT"), "//") //TODO: use ETCD_ENDPOINT env var passed by druid
+	memberURL := "http://" + os.Getenv("POD_NAME") + ".etcd-main-peer.default.svc:2380"
+	//memberURL := end[0] + "//" + os.Getenv("POD_NAME") + "." + end[1]
+	return memberURL
+}
+
+func PromoteMember(ctx context.Context, logger *logrus.Entry) {
+	for { //TODO: wise to have an infinite loop? Is there a risk of getting stuck unnecessarily here?
+		clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
+			Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")}, //TODO: Pass this via env var
+			InsecureTransport: true,
+		})
+		cli, _ := clientFactory.NewCluster()
+
+		//List all members in the etcd cluster
+		//Member URL will appear in the memberlist call response as soon as the member has been added to the cluster
+		//However, the name of the member will appear only if the member has started
+		ctx2, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
+		etcdList, memListErr := cli.MemberList(ctx2)
+		cancel()
+
+		if memListErr != nil {
+			logger.Info("error listing members: ", memListErr)
+			continue
+		}
+
+		//TODO: Simplify logic below
+		var promoted bool
+		promoted = false
+		for _, y := range etcdList.Members {
+			logger.Info("CHECK: ", y.Name, " : ", os.Getenv("POD_NAME"))
+			if y.Name == os.Getenv("POD_NAME") {
+				logger.Info("Promoting member ", y.Name)
+				ctx2, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
+				defer cancel()
+				//Member promote call will succeed only if member is in sync with leader, and will error out otherwise
+				_, err3 := cli.MemberPromote(ctx2, y.ID)
+				if err3 == nil || strings.Contains(err3.Error(), rpctypes.ErrGRPCMemberNotLearner.Error()) {
+					//Exit if member is successfully promoted or if member is not a learner
+					promoted = true
+					logger.Info("Member promoted ", y.Name, " : ", y.ID)
+					break
+				}
+				cancel()
+			}
+		}
+		if promoted {
+			break
+		}
+
+		//Timer here so that the member promote loop doesn't execute too frequently
+		timer := time.NewTimer(5 * time.Second)
+		<-timer.C
+		timer.Stop()
+	}
+}
diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go
index d6639972a..2d28cba49 100644
--- a/pkg/server/backuprestoreserver.go
+++ b/pkg/server/backuprestoreserver.go
@@ -19,8 +19,6 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"os"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -28,6 +26,7 @@ import (
 	"github.com/gardener/etcd-backup-restore/pkg/backoff"
 	"github.com/gardener/etcd-backup-restore/pkg/common"
 	"github.com/gardener/etcd-backup-restore/pkg/leaderelection"
+	"github.com/gardener/etcd-backup-restore/pkg/member"
 	"github.com/gardener/etcd-backup-restore/pkg/metrics"
 	brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
 
@@ -42,7 +41,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	cron "github.com/robfig/cron/v3"
 	"github.com/sirupsen/logrus"
-	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	"go.etcd.io/etcd/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"
 )
@@ -165,56 +163,8 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
 	handler := b.startHTTPServer(etcdInitializer, b.config.SnapstoreConfig.Provider, b.config.EtcdConnectionConfig, nil)
 	defer handler.Stop()
 
-	//Promote self from a learner to a full member in the etcd cluster
-	for { //TODO: wise to have an infinite loop? Is there a risk of getting stuck unnecessarily here?
-		b.logger.Info("Promote loop")
-		clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
-			Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")}, //TODO: Pass this via env var
-			InsecureTransport: true,
-		})
-		cli, _ := clientFactory.NewCluster()
-
-		//List all members in the etcd cluster
-		//Member URL will appear in the memberlist call response as soon as the member has been added to the cluster
-		//However, the name of the member will appear only if the member has started
-		ctx2, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
-		etcdList, memListErr := cli.MemberList(ctx2)
-		cancel()
-
-		if memListErr != nil {
-			b.logger.Info("error listing members: ", memListErr)
-			continue
-		}
-
-		//TODO: Simplify logic below
-		var promoted bool
-		promoted = false
-		for _, y := range etcdList.Members {
-			b.logger.Info("CHECK: ", y.Name, " : ", os.Getenv("POD_NAME"))
-			if y.Name == os.Getenv("POD_NAME") {
-				b.logger.Info("Promoting member ", y.Name)
-				ctx2, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
-				defer cancel()
-				//Member promote call will succeed only if member is in sync with leader, and will error out otherwise
-				_, err3 := cli.MemberPromote(ctx2, y.ID)
-				if err3 == nil || strings.Contains(rpctypes.ErrGRPCMemberNotLearner.Error(), err3.Error()) {
-					//Exit if member is successfully promoted or if member is not a learner
-					promoted = true
-					b.logger.Info("Member promoted ", y.Name, " : ", y.ID)
-					break
-				}
-				cancel()
-			}
-		}
-		if promoted {
-			break
-		}
-
-		//Timer here so that the member promote loop doesn't execute too frequently
-		timer := time.NewTimer(5 * time.Second)
-		<-timer.C
-		timer.Stop()
-	}
+	//Promote member if it is a learner
+	member.PromoteMember(context.TODO(), b.logger)
 
 	leaderCallbacks := &brtypes.LeaderCallbacks{
 		OnStartedLeading: func(leCtx context.Context) {