Skip to content

Commit

Permalink
Added member pkg for all etcd member manipulation functions
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronfern committed Jun 13, 2022
1 parent e848014 commit 3b8c033
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 126 deletions.
78 changes: 5 additions & 73 deletions pkg/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,20 @@
package initializer

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/initializer/validator"
"github.com/gardener/etcd-backup-restore/pkg/member"
"github.com/gardener/etcd-backup-restore/pkg/metrics"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
"github.com/gardener/etcd-backup-restore/pkg/snapshot/restorer"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
)

Expand All @@ -46,39 +43,10 @@ import (
// - No snapshots are available, start etcd as a fresh installation.
func (e *EtcdInitializer) Initialize(mode validator.Mode, failBelowRevision int64) error {
start := time.Now()
if !e.IsMemberInCluster() {

//Add member as learner to cluster
memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
memberURL := getMemberURL()
if memberURL == "" {
e.Logger.Warn("Could not fetch member URL")
}
for {
//Create etcd client
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
InsecureTransport: true,
})
cli, _ := clientFactory.NewCluster()
_, err2 := cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
if err2 != nil {
e.Logger.Warn("Error adding member as a learner: ", err2)
}
if err2 == nil || strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err2.Error()) {
break
//TODO: why not just return here?
}
e.Logger.Info("Could not as member as learner due to: ", err2)
e.Logger.Info("Trying again in 5 seconds")
timer := time.NewTimer(5 * time.Second)
<-timer.C
timer.Stop()
}

e.Logger.Info("Adding member to cluster as learner")

if !member.IsMemberInCluster(e.Logger) {
//Etcd cluster scale-up case
//return here as no restoration or validation needed
member.AddMemberAsLearner(e.Logger)
return nil
}

Expand Down Expand Up @@ -301,39 +269,3 @@ func (e *EtcdInitializer) removeDir(dirname string) error {

// return false
// }

// IsMemberInCluster reports whether a member whose name equals the POD_NAME
// environment variable is listed by the etcd cluster.
//
// It returns true when the member is found, and also when the etcd client
// cannot be created or the member list cannot be fetched: in those cases the
// membership is unknown and the function errs on the side of "already a member".
// It returns false only when the member list was fetched successfully and no
// member carries this pod's name.
func (e *EtcdInitializer) IsMemberInCluster() bool {
	//Create etcd client
	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
		Endpoints:         []string{"http://etcd-main-peer.default.svc.cluster.local:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
		InsecureTransport: true,                                                             //TODO: is it right to use insecure transport?
	})
	cli, err := clientFactory.NewCluster()
	if err != nil {
		// Without a client the membership cannot be determined; treat it the
		// same way as a failed member list call.
		e.Logger.Warn("Could not create etcd client: ", err)
		return true
	}
	// Release the client on every return path, not only when the member is absent.
	defer cli.Close()
	e.Logger.Info("Etcd client created")

	// List members in cluster
	memListCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	etcdMemberList, err := cli.MemberList(memListCtx)
	cancel()
	if err != nil {
		e.Logger.Warn("Could not list any etcd members: ", err)
		return true
	}

	podName := os.Getenv("POD_NAME")
	for _, m := range etcdMemberList.Members {
		if m.Name == podName {
			return true
		}
	}

	return false
}

// getMemberURL returns the peer URL of this pod, assembled from the POD_NAME
// environment variable and the hard-coded etcd-main peer service address.
// If POD_NAME is unset the host part simply starts with a dot.
func getMemberURL() string {
	//TODO: derive scheme and service host from the ETCD_ENDPOINT env var passed by druid
	const (
		scheme      = "http://"
		peerService = ".etcd-main-peer.default.svc:2380"
	)
	podName := os.Getenv("POD_NAME")
	return scheme + podName + peerService
}
138 changes: 138 additions & 0 deletions pkg/member/member_add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package member

import (
"context"
"os"
"strings"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// AddMemberAsLearner adds this pod's peer URL to the etcd cluster as a learner.
//
// It retries every 5 seconds, without an upper bound, until the add call either
// succeeds or fails with the "peer URL already exists" error, which is treated
// as success. The returned error is currently always nil.
func AddMemberAsLearner(logger *logrus.Logger) error {
	memberURL := getMemberURL()
	if memberURL == "" {
		logger.Warn("Could not fetch member URL")
	}

	for !addLearnerOnce(logger, memberURL) {
		logger.Info("Trying again in 5 seconds")
		time.Sleep(5 * time.Second)
	}

	logger.Info("Adding member to cluster as learner")

	return nil
}

// addLearnerOnce makes a single attempt to add memberURL as a learner and
// reports whether the retry loop may stop. The etcd client and the request
// context are released before it returns, whatever the outcome.
func addLearnerOnce(logger *logrus.Logger, memberURL string) bool {
	//Create etcd client
	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
		Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
		InsecureTransport: true,
	})
	cli, err := clientFactory.NewCluster()
	if err != nil {
		// Do not touch cli when creation failed; let the caller retry.
		logger.Warn("Error creating etcd client: ", err)
		return false
	}
	defer cli.Close()

	memAddCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	defer cancel()

	_, err = cli.MemberAddAsLearner(memAddCtx, []string{memberURL})
	if err == nil {
		return true
	}
	logger.Warn("Error adding member as a learner: ", err)

	// The sentinel's text is searched for the returned error's text, not the
	// other way round. NOTE(review): presumably the client returns only the
	// description part of the gRPC error - confirm before changing the order.
	return strings.Contains(rpctypes.ErrGRPCPeerURLExist.Error(), err.Error())
}

// IsMemberInCluster reports whether a member whose name equals the POD_NAME
// environment variable is listed by the etcd cluster.
//
// It returns true when the member is found, and also when the etcd client
// cannot be created or the member list cannot be fetched: membership is then
// unknown and the function errs on the side of "already a member". It returns
// false only when the list was fetched and no member carries this pod's name.
func IsMemberInCluster(logger *logrus.Logger) bool {
	//Create etcd client
	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
		Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //TODO: use ETCD_ENDPOINT env var passed by druid
		InsecureTransport: true,                                               //TODO: is it right to use insecure transport?
	})
	cli, err := clientFactory.NewCluster()
	if err != nil {
		// Without a client the membership cannot be determined; treat it the
		// same way as a failed member list call.
		logger.Warn("Could not create etcd client: ", err)
		return true
	}
	// Release the client on every return path, not only when the member is absent.
	defer cli.Close()
	logger.Info("Etcd client created")

	// List members in cluster
	memListCtx, cancel := context.WithTimeout(context.TODO(), brtypes.DefaultEtcdConnectionTimeout)
	etcdMemberList, err := cli.MemberList(memListCtx)
	cancel()
	if err != nil {
		logger.Warn("Could not list any etcd members: ", err)
		return true
	}

	podName := os.Getenv("POD_NAME")
	for _, m := range etcdMemberList.Members {
		if m.Name == podName {
			return true
		}
	}

	return false
}

// getMemberURL returns the peer URL of this pod: the POD_NAME environment
// variable is used as the host prefix of the hard-coded etcd-main peer service.
// An unset POD_NAME yields a URL whose host begins with a dot.
func getMemberURL() string {
	//TODO: build the URL from the ETCD_ENDPOINT env var passed by druid instead of constants
	const (
		urlPrefix = "http://"
		urlSuffix = ".etcd-main-peer.default.svc:2380"
	)
	return urlPrefix + os.Getenv("POD_NAME") + urlSuffix
}

// PromoteMember promotes the etcd member whose name equals the POD_NAME
// environment variable from learner to full member.
//
// It makes one attempt, then waits 5 seconds and tries again, until the
// promote call succeeds, the member turns out not to be a learner, or ctx is
// done. A nil ctx is treated as context.TODO(), i.e. the loop is unbounded.
func PromoteMember(ctx context.Context, logger *logrus.Entry) {
	if ctx == nil {
		// Earlier versions never read ctx, so keep tolerating a nil value.
		ctx = context.TODO()
	}

	for !promoteOnce(ctx, logger) {
		//Wait here so that the member promote loop doesn't execute too frequently
		select {
		case <-ctx.Done():
			logger.Info("Stopping member promotion: ", ctx.Err())
			return
		case <-time.After(5 * time.Second):
		}
	}
}

// promoteOnce makes a single promotion attempt and reports whether the retry
// loop may stop. The etcd client and all request contexts are released before
// it returns, so repeated attempts do not accumulate resources.
func promoteOnce(ctx context.Context, logger *logrus.Entry) bool {
	clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
		Endpoints:         []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")}, //TODO: Pass this via env var
		InsecureTransport: true,
	})
	cli, err := clientFactory.NewCluster()
	if err != nil {
		logger.Info("error creating etcd client: ", err)
		return false
	}
	defer cli.Close()

	//List all members in the etcd cluster
	//Member URL will appear in the memberlist call response as soon as the member has been added to the cluster
	//However, the name of the member will appear only if the member has started
	listCtx, cancelList := context.WithTimeout(ctx, brtypes.DefaultEtcdConnectionTimeout)
	etcdList, err := cli.MemberList(listCtx)
	cancelList()
	if err != nil {
		logger.Info("error listing members: ", err)
		return false
	}

	podName := os.Getenv("POD_NAME")
	for _, m := range etcdList.Members {
		logger.Info("CHECK: ", m.Name, " : ", podName)
		if m.Name != podName {
			continue
		}

		logger.Info("Promoting member ", m.Name)
		promoteCtx, cancelPromote := context.WithTimeout(ctx, 15*time.Second)
		//Member promote call will succeed only if member is in sync with leader, and will error out otherwise
		_, err = cli.MemberPromote(promoteCtx, m.ID)
		cancelPromote()
		// The sentinel's text is searched for the returned error's text, not the
		// other way round. NOTE(review): presumably the client returns only the
		// description part of the gRPC error - confirm before changing the order.
		if err == nil || strings.Contains(rpctypes.ErrGRPCMemberNotLearner.Error(), err.Error()) {
			//Exit if member is successfully promoted or if member is not a learner
			logger.Info("Member promoted ", m.Name, " : ", m.ID)
			return true
		}
		logger.Info("error promoting member: ", err)
	}

	return false
}
56 changes: 3 additions & 53 deletions pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ import (
"fmt"
"net"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gardener/etcd-backup-restore/pkg/backoff"
"github.com/gardener/etcd-backup-restore/pkg/common"
"github.com/gardener/etcd-backup-restore/pkg/leaderelection"
"github.com/gardener/etcd-backup-restore/pkg/member"
"github.com/gardener/etcd-backup-restore/pkg/metrics"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"

Expand All @@ -42,7 +41,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
cron "github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
)
Expand Down Expand Up @@ -165,56 +163,8 @@ func (b *BackupRestoreServer) runServer(ctx context.Context, restoreOpts *brtype
handler := b.startHTTPServer(etcdInitializer, b.config.SnapstoreConfig.Provider, b.config.EtcdConnectionConfig, nil)
defer handler.Stop()

//Promote self from a learner to a full member in the etcd cluster
for { //TODO: wise to have an infinite loop? Is there a risk of getting stuck unnecessarily here?
b.logger.Info("Promote loop")
clientFactory := etcdutil.NewFactory(brtypes.EtcdConnectionConfig{
Endpoints: []string{"http://etcd-main-peer.default.svc:2380"}, //[]string{os.Getenv("ETCD_ENDPOINT")}, //TODO: Pass this via env var
InsecureTransport: true,
})
cli, _ := clientFactory.NewCluster()

//List all members in the etcd cluster
//Member URL will appear in the memberlist call response as soon as the member has been added to the cluster
//However, the name of the member will appear only if the member has started
ctx2, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
etcdList, memListErr := cli.MemberList(ctx2)
cancel()

if memListErr != nil {
b.logger.Info("error listing members: ", memListErr)
continue
}

//TODO: Simplify logic below
var promoted bool
promoted = false
for _, y := range etcdList.Members {
b.logger.Info("CHECK: ", y.Name, " : ", os.Getenv("POD_NAME"))
if y.Name == os.Getenv("POD_NAME") {
b.logger.Info("Promoting member ", y.Name)
ctx2, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel()
//Member promote call will succeed only if member is in sync with leader, and will error out otherwise
_, err3 := cli.MemberPromote(ctx2, y.ID)
if err3 == nil || strings.Contains(rpctypes.ErrGRPCMemberNotLearner.Error(), err3.Error()) {
//Exit if member is successfully promoted or if member is not a learner
promoted = true
b.logger.Info("Member promoted ", y.Name, " : ", y.ID)
break
}
cancel()
}
}
if promoted {
break
}

//Timer here so that the member promote loop doesn't execute too frequently
timer := time.NewTimer(5 * time.Second)
<-timer.C
timer.Stop()
}
//Promote member if it is a learner
member.PromoteMember(context.TODO(), b.logger)

leaderCallbacks := &brtypes.LeaderCallbacks{
OnStartedLeading: func(leCtx context.Context) {
Expand Down

0 comments on commit 3b8c033

Please sign in to comment.