Skip to content

Commit

Permalink
Added member pkg for all etcd member manipulation functions
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronfern committed Jun 15, 2022
1 parent e848014 commit da446ef
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 200 deletions.
146 changes: 5 additions & 141 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,20 @@
package initializer

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/initializer/validator"
"github.com/gardener/etcd-backup-restore/pkg/member"
"github.com/gardener/etcd-backup-restore/pkg/metrics"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
"github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
)

Expand All @@ -46,39 +43,10 @@ import (
// - No snapshots are available, start etcd as a fresh installation.
func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
start := time.Now()
if !e.IsMemberInCluster() {

//Add member as learner to cluster
memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
memberURL := getMemberURL()
if memberURL == "" {
e.Logger.Warn("Could not fetch member URL")
}
for {
//Create etcd client
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
InsecureTransport: true,
})
cli, _ := clientFactory.NewCluster()
_, err2 := cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
if err2 != nil {
e.Logger.Warn("Error adding member as a learner: ", err2)
}
if err2 == nil || strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err2.Error()) {
break
//TODO: why not just return here?
}
e.Logger.Info("Could not as member as learner due to: ", err2)
e.Logger.Info("Trying again in 5 seconds")
timer := time.NewTimer(5 * time.Second)
<-timer.C
timer.Stop()
}

e.Logger.Info("Adding member to cluster as learner")

//Etcd cluster scale-up case
if !member.IsMemberInCluster(e.Logger) {
//return here as no restoration or validation needed
member.AddMemberAsLearner(e.Logger)
return nil
}

Expand Down Expand Up @@ -167,11 +135,6 @@ func (e *EtcdInitializer) restoreCorruptData() (bool, error) {
return e.restoreWithEmptySnapstore()
}

// if e.SingleMemberRestoreCase() {
// logger.Infof("Single member restore case")
// return e.restoreWithEmptySnapstore()
// }

tempRestoreOptions.BaseSnapshot = baseSnap
tempRestoreOptions.DeltaSnapList = deltaSnapList
tempRestoreOptions.Config.RestoreDataDir = fmt.Sprintf("%s.%s", tempRestoreOptions.Config.RestoreDataDir, "part")
Expand Down Expand Up @@ -238,102 +201,3 @@ func (e *EtcdInitializer) removeDir(dirname string) error {
}
return nil
}

// func (r *EtcdInitializer) SingleMemberRestoreCase() bool {
// r.Logger.Info("Starting single member restore case")
// clientSet, err := miscellaneous.GetKubernetesClientSetOrError()
// if err != nil {
// r.Logger.Errorf("failed to create clientset: %v", err)
// return false
// }
// r.Logger.Info("Single member restore case: k8s clientset created")

// //Fetch lease associated with member
// memberLease := &v1.Lease{}
// err = clientSet.Get(context.TODO(), controller_runtime_client.ObjectKey{
// Namespace: os.Getenv("POD_NAMESPACE"),
// Name: os.Getenv("POD_NAME"),
// }, memberLease)
// if err != nil {
// r.Logger.Errorf("error fetching lease: %v", err)
// }
// r.Logger.Info("Single member restore case: fetched lease")

// if memberLease.Spec.HolderIdentity != nil {
// r.Logger.Info("Single member restore case: lease holder ID is not nil")
// //Case of lease already existing
// //Assume case of single member restoration

// //Create etcd client
// clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
// Endpoints: []string{"http://etcd-main-peer.default.svc.cluster.local:2380"},
// InsecureTransport: true,
// })
// cli, _ := clientFactory.NewCluster()
// r.Logger.Info("Single member restore case: created etcd client")

// //remove self from cluster
// ctx3, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
// etcdList, err2 := cli.MemberList(ctx3)
// if err2 != nil {
// r.Logger.Warn("Could not list any etcd members")
// }
// cancel()
// r.Logger.Info("Single member restore case: etcd member list done")
// //mem := make(map[string]uint64)
// var peerURL []string
// for _, y := range etcdList.Members {
// if y.Name == os.Getenv("POD_NAME") {
// peerURL = y.PeerURLs
// cli.MemberRemove(context.TODO(), y.ID)
// r.Logger.Info("Single member restore case: member removed")
// break
// }
// }

// //Add self to the cluster
// ctx4, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
// cli.MemberAdd(ctx4, peerURL)
// cancel()
// r.Logger.Info("Single member restore case: member added")
// return true
// }

// return false
// }

func (r *EtcdInitializer) IsMemberInCluster() bool {
//Create etcd client
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc.cluster.local:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
InsecureTransport: true, //TODO: is it right to use insecure transport?
})
cli, _ := clientFactory.NewCluster()
r.Logger.Info("Etcd client created")

// List members in cluster
memListCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
etcdMemberList, err := cli.MemberList(memListCtx)
cancel()
if err != nil {
r.Logger.Warn("Could not list any etcd members", err)
return true
}

for _, y := range etcdMemberList.Members {
if y.Name == os.Getenv("POD_NAME") {
return true
}
}

cli.Close()

return false
}

func getMemberURL() string {
//end := strings.Split(os.Getenv("ETCD_ENDPOINT"), "//") //TODO: use ETCD_ENDPOINT env var passed by druid
memberURL := "http://" + os.Getenv("POD_NAME") + ".etcd-main-peer.default.svc:2380"
//memberURL := end[0] + "//" + os.Getenv("POD_NAME") + "." + end[1]
return memberURL
}
155 changes: 155 additions & 0 deletions pkg/member/member_add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package member

import (
"context"
"os"
"strings"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

const (
// RetryPeriod is the peroid after which an operation is retried
RetryPeriod time.Duration = 5 * time.Second
)

func AddMemberAsLearner(logger *logrus.Logger) error {
//Add member as learner to cluster
memberURL := getMemberURL()
if memberURL == "" {
logger.Warn("Could not fetch member URL")
}
for {
//Create etcd client
//TODO: use ETCD_ENDPOINT env var passed by druid and use secure transport
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
InsecureTransport: true,
})

memAddCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
cli, _ := clientFactory.NewCluster()
_, err := cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
cancel()
cli.Close()

if err != nil {
logger.Warn("Error adding member as a learner: ", err)
}
if err == nil || strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err.Error()) {
logger.Info("Added member to cluster as a learner")
break //TODO: why not just return here?
}
if strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err.Error()) {
logger.Info("Member already part of etcd cluster")
break
}

logger.Info("Could not as member as learner due to: ", err)
logger.Info("Trying again in 5 seconds... ")
timer := time.NewTimer(RetryPeriod)
<-timer.C
timer.Stop()
}

return nil
}

func IsMemberInCluster(logger *logrus.Logger) bool {
//Create etcd client
// TODO: use ETCD_ENDPOINT env var passed by druid and use secure transport
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
InsecureTransport: true, //TODO: is it right to use insecure transport?
})

// TODO: should use a retry mechanism here
cli, _ := clientFactory.NewCluster()
defer cli.Close()
logger.Info("Etcd client created")

// List members in cluster
memListCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
etcdMemberList, err := cli.MemberList(memListCtx)
defer cancel()
if err != nil {
logger.Warn("Could not list any etcd members", err)
return true
}

for _, y := range etcdMemberList.Members {
if y.Name == os.Getenv("POD_NAME") {
return true
}
}

return false
}

func getMemberURL() string {
//end := strings.Split(os.Getenv("ETCD_ENDPOINT"), "//") //TODO: use ETCD_ENDPOINT env var passed by druid
memberURL := "http://" + os.Getenv("POD_NAME") + ".etcd-main-peer.default.svc:2380"
//memberURL := end[0] + "//" + os.Getenv("POD_NAME") + "." + end[1]
return memberURL
}

func PromoteMember(ctx context.Context, logger *logrus.Entry) {
for {
// TODO: use ETCD_ENDPOINT env var passed by druid and use secure transport
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")},
InsecureTransport: true,
})
cli, _ := clientFactory.NewCluster()

//List all members in the etcd cluster
//Member URL will appear in the memberlist call response as soon as the member has been added to the cluster as a learner
//However, the name of the member will appear only if the member has started running
memListCtx, memListCtxcancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
etcdList, memListErr := cli.MemberList(memListCtx)
memListCtxcancel()

if memListErr != nil {
logger.Info("error listing members: ", memListErr)
cli.Close()
continue
}

//TODO: Simplify logic below
var promoted bool
promoted = false
for _, y := range etcdList.Members {
if y.Name == os.Getenv("POD_NAME") {
logger.Info("Promoting member ", y.Name)
memPromoteCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
cancel()
//Member promote call will succeed only if member is in sync with leader, and will error out otherwise
_, memPromoteErr := cli.MemberPromote(memPromoteCtx, y.ID)
if memPromoteErr == nil || strings.Contains(rpctypes.ErrGRPCMemberNotLearner.Error(), memPromoteErr.Error()) {
//Exit if member is successfully promoted or if member is not a learner
promoted = true
logger.Info("Member promoted ", y.Name, " : ", y.ID)
break
}
if strings.Contains(rpctypes.ErrGRPCMemberNotLearner.Error(), memPromoteErr.Error()) {
//Exit if member is already part of the cluster
promoted = true
logger.Info("Mmeber ", y.Name, " : ", y.ID, " already part of etcd cluster")
}
}
}
if promoted {
break
}

//Timer here so that the member promote loop doesn't execute too frequently
logger.Info("Member still catching up logs from leader. Retrying promotion...")
timer := time.NewTimer(RetryPeriod)
<-timer.C
timer.Stop()
}
}
Loading

0 comments on commit da446ef

Please sign in to comment.