Skip to content

Commit

Permalink
Defragment etcd datastore before clearing alarms
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Mar 28, 2022
1 parent 4ca1f94 commit 98036ce
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
4 changes: 3 additions & 1 deletion pkg/cluster/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,9 @@ func (c *Cluster) reconcileEtcd(ctx context.Context) error {
if err := e.SetControlConfig(reconcileCtx, c.config); err != nil {
return err
}
e.StartEmbeddedTemporary(reconcileCtx)
if err := e.StartEmbeddedTemporary(reconcileCtx); err != nil {
return err
}

for {
if err := e.Test(reconcileCtx); err != nil && !errors.Is(err, etcd.ErrNotMember) {
Expand Down
58 changes: 39 additions & 19 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/clientv3"
etcd "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/snapshot"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
Expand Down Expand Up @@ -160,9 +161,10 @@ func (e *ETCD) SetControlConfig(ctx context.Context, config *config.Control) err
return e.setName(false)
}

// Test ensures that the local node is a voting member of the target cluster.
// Test ensures that the local node is a voting member of the target cluster,
// and that the datastore is defragmented and not in maintenance mode due to alarms.
// If it is still a learner or not a part of the cluster, an error is raised.
// If it has any alarms that cannot be disarmed, an error is raised.
// If it cannot be defragmented or has any alarms that cannot be disarmed, an error is raised.
func (e *ETCD) Test(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()
Expand All @@ -177,10 +179,24 @@ func (e *ETCD) Test(ctx context.Context) error {
return errors.New("this server has not yet been promoted from learner to voting member")
}

if err := e.defragment(ctx); err != nil {
return errors.Wrap(err, "failed to defragment etcd database")
}

if err := e.clearAlarms(ctx); err != nil {
return errors.Wrap(err, "failed to report and disarm etcd alarms")
}

// refresh status to see if any errors remain after clearing alarms
status, err = e.client.Status(ctx, endpoints[0])
if err != nil {
return err
}

if len(status.Errors) > 0 {
return fmt.Errorf("etcd cluster errors: %s", strings.Join(status.Errors, ", "))
}

members, err := e.client.MemberList(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -713,13 +729,10 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
etcdDataDir := DBDir(e.config)
tmpDataDir := etcdDataDir + "-tmp"

os.RemoveAll(tmpDataDir)
if err := os.Mkdir(tmpDataDir, 0700); err != nil {
return err
}

defer func() {
go func() {
<-ctx.Done()
if err := os.RemoveAll(tmpDataDir); err != nil {
logrus.Warnf("Failed to remove etcd temp dir: %v", err)
}
Expand Down Expand Up @@ -749,7 +762,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context) error {
ElectionTimeout: 5000,
Name: e.name,
LogOutputs: []string{"stderr"},
}, nil)
}, append(e.config.ExtraAPIArgs, "--max-snapshots=0", "--max-wals=0"))
}

func addPort(address string, offset int) (string, error) {
Expand Down Expand Up @@ -950,27 +963,34 @@ func (e *ETCD) clearAlarms(ctx context.Context) error {
if err != nil {
return fmt.Errorf("etcd alarm list failed: %v", err)
}
if len(alarmList.Alarms) == 0 {
return nil
}

var hasAlarm bool
for _, alarm := range alarmList.Alarms {
if alarmList.Header.MemberId != alarm.MemberID {
continue
}
logrus.Warnf("Alarm on etcd server: %s", alarm.Alarm)
hasAlarm = true
logrus.Warnf("Alarm on etcd member %d: %s", alarm.MemberID, alarm.Alarm)
}
if hasAlarm {
if _, err := e.client.AlarmDisarm(ctx, &etcd.AlarmMember{}); err != nil {

if len(alarmList.Alarms) > 0 {
if _, err := e.client.AlarmDisarm(ctx, &clientv3.AlarmMember{}); err != nil {
return fmt.Errorf("etcd alarm disarm failed: %v", err)
}
logrus.Infof("Alarms disarmed on etcd server")
}
return nil
}

func (e *ETCD) defragment(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, testTimeout)
defer cancel()

if e.client == nil {
return errors.New("etcd client was nil")
}

logrus.Infof("Defragmenting etcd database")
endpoints := getEndpoints(e.config.Runtime)
_, err := e.client.Defragment(ctx, endpoints[0])
return err
}

// clientURLs returns a list of all non-learner etcd cluster member client access URLs.
// The list is retrieved from the remote server that is being joined.
func ClientURLs(ctx context.Context, clientAccessInfo *clientaccess.Info, selfIP string) ([]string, Members, error) {
Expand Down

0 comments on commit 98036ce

Please sign in to comment.