This repository has been archived by the owner on Feb 27, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 773
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: yunfeiyangbuaa <[email protected]>
- Loading branch information
1 parent
9033f63
commit 58b8838
Showing
7 changed files
with
367 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
package ha | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/dragonflyoss/Dragonfly/supernode/config" | ||
|
||
"github.com/sirupsen/logrus" | ||
"go.etcd.io/etcd/clientv3" | ||
) | ||
|
||
//EtcdMgr is the struct to manager etcd. | ||
type EtcdMgr struct { | ||
config clientv3.Config | ||
client *clientv3.Client | ||
leaseTTL int64 | ||
leaseKeepAliveRsp <-chan *clientv3.LeaseKeepAliveResponse | ||
hostIP string | ||
leaseResp *clientv3.LeaseGrantResponse | ||
} | ||
|
||
const ( | ||
//ActiveSupernodeOFF means there is no active supernode. | ||
ActiveSupernodeOFF = "" | ||
//ActiveSupernodeChange means active supernode change to standby supernode because of unhealthy. | ||
ActiveSupernodeChange = 0 | ||
//ActiveSupernodeKeep means active supernode is health. | ||
ActiveSupernodeKeep = 1 | ||
//etcdTimeOut is the etcd client's timeout second | ||
etcdTimeOut = 10 * time.Second | ||
//activeKeyPreFIx is the keyPrefix of active supernode | ||
activeKeyPreFIx = "/supernode/active/" | ||
//standbyKeyPreFix is the keyPrefix of active supernode | ||
//standbyKeyPreFix = "/supernode/standby/" | ||
) | ||
|
||
//NewEtcdMgr produce a etcdmgr object. | ||
func NewEtcdMgr(cfg *config.Config) (*EtcdMgr, error) { | ||
config := clientv3.Config{ | ||
Endpoints: cfg.HAConfig, | ||
DialTimeout: etcdTimeOut, | ||
} | ||
// build connection to etcd. | ||
client, err := clientv3.New(config) | ||
return &EtcdMgr{ | ||
hostIP: cfg.AdvertiseIP, | ||
config: config, | ||
client: client, | ||
}, err | ||
} | ||
|
||
//WatchActiveChange is the progress to watch the etcd,if the value of key /lock/active changes,supernode will be notified. | ||
func (etcd *EtcdMgr) WatchActiveChange(ctx context.Context, messageChannel chan string) { | ||
var watchStartRevision int64 | ||
watcher := clientv3.NewWatcher(etcd.client) | ||
watchChan := watcher.Watch(ctx, activeKeyPreFIx, clientv3.WithRev(watchStartRevision)) | ||
for watchResp := range watchChan { | ||
for _, event := range watchResp.Events { | ||
switch event.Type { | ||
case ActiveSupernodeChange: | ||
messageChannel <- string(event.Kv.Value) | ||
case ActiveSupernodeKeep: | ||
messageChannel <- ActiveSupernodeOFF | ||
default: | ||
logrus.Warnf("failed to get watch active supernode,unexpected response: %d", int(event.Type)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
//ObtainActiveInfo obtain the active supernode's information from etcd. | ||
func (etcd *EtcdMgr) ObtainActiveInfo(ctx context.Context, key string) (string, error) { | ||
var value string | ||
kv := clientv3.NewKV(etcd.client) | ||
getRes, err := kv.Get(ctx, key) | ||
if err != nil { | ||
logrus.Errorf("failed to get the active supernode's(key: %s) info: %v", key, err) | ||
} | ||
for _, v := range getRes.Kvs { | ||
value = string(v.Value) | ||
} | ||
return value, err | ||
} | ||
|
||
//ActiveResureItsStatus keep look on the lease's renew response. | ||
func (etcd *EtcdMgr) ActiveResureItsStatus() { | ||
for { | ||
select { | ||
case keepResp := <-etcd.leaseKeepAliveRsp: | ||
if keepResp == nil { | ||
logrus.Info("failed to renew the etcd lease") | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
//TryBeActive try to change the supernode's status from standby to active. | ||
func (etcd *EtcdMgr) TryBeActive(ctx context.Context) (bool, string, error) { | ||
var ( | ||
err error | ||
leaseResp *clientv3.LeaseGrantResponse | ||
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse | ||
txnResp *clientv3.TxnResponse | ||
) | ||
kv := clientv3.NewKV(etcd.client) | ||
//make a lease to obtain a lock | ||
lease := clientv3.NewLease(etcd.client) | ||
if leaseResp, err = lease.Grant(ctx, etcd.leaseTTL); err != nil { | ||
logrus.Errorf("failed to create a etcd lease: %v", err) | ||
} | ||
if keepRespChan, err = lease.KeepAlive(ctx, leaseResp.ID); err != nil { | ||
logrus.Errorf("failed to create etcd.leaseKeepAliveRsp: %v", err) | ||
} | ||
etcd.leaseKeepAliveRsp = keepRespChan | ||
etcd.leaseResp = leaseResp | ||
//if the lock is available,get the lock. | ||
//else read the lock | ||
txn := kv.Txn(ctx) | ||
txn.If(clientv3.Compare(clientv3.CreateRevision(activeKeyPreFIx), "=", 0)). | ||
Then(clientv3.OpPut(activeKeyPreFIx, etcd.hostIP, clientv3.WithLease(leaseResp.ID))). | ||
Else(clientv3.OpGet(activeKeyPreFIx)) | ||
if txnResp, err = txn.Commit(); err != nil { | ||
logrus.Errorf("failed to commit a etcd transaction: %v", err) | ||
} | ||
if !txnResp.Succeeded { | ||
_, err = lease.Revoke(ctx, leaseResp.ID) | ||
return false, string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value), err | ||
} | ||
return true, etcd.hostIP, nil | ||
} | ||
|
||
//ActiveKillItself cancels the renew of lease. | ||
func (etcd *EtcdMgr) ActiveKillItself(ctx context.Context) bool { | ||
if _, err := etcd.client.Revoke(ctx, etcd.leaseResp.ID); err != nil { | ||
logrus.Errorf("failed to cancel a etcd lease: %v", err) | ||
return false | ||
} | ||
logrus.Info("success to cancel a etcd lease") | ||
return true | ||
} | ||
|
||
//Close close the tool used to implement supernode ha. | ||
func (etcd *EtcdMgr) Close() error { | ||
var err error | ||
if err = etcd.client.Close(); err != nil { | ||
return err | ||
} | ||
logrus.Info("success to close a etcd client") | ||
return nil | ||
} |
Oops, something went wrong.