Skip to content

Commit

Permalink
[!] refactor etcd checker with etcd/client/v3.Watch(), closes #196 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub authored Jan 19, 2024
1 parent f56cab7 commit f2a8850
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 70 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ This is a list of all available configuration items:
`etcd-user` | `VIP_ETCD_USER` | no | patroni | A username that is allowed to look at the `trigger-key` in an etcd DCS. Optional when using `dcs-type=etcd` .
`etcd-password` | `VIP_ETCD_PASSWORD` | no | snakeoil | The password for `etcd-user`. Optional when using `dcs-type=etcd` . Requires that `etcd-user` is also set.
`consul-token` | `VIP_CONSUL_TOKEN` | no | snakeoil | A token that can be used with the consul-API for authentication. Optional when using `dcs-type=consul` .
`interval` | `VIP_INTERVAL` | no | 1000 | The time vip-manager main loop sleeps before checking for changes. Measured in ms. Defaults to `1000`.
`interval` | `VIP_INTERVAL` | no | 1000 | The time vip-manager main loop sleeps before checking for changes. Measured in ms. Defaults to `1000`. Doesn't affect etcd checker since v2.3.0.
`retry-after` | `VIP_RETRY_AFTER` | no | 250 | The time to wait before retrying interactions with components outside of vip-manager. Measured in ms. Defaults to `250`.
`retry-num` | `VIP_RETRY_NUM` | no | 3 | The number of times interactions with components outside of vip-manager are retried. Defaults to `3`.
`etcd-ca-file` | `VIP_ETCD_CA_FILE` | no | /etc/etcd/ca.cert.pem | A certificate authority file that can be used to verify the certificate provided by etcd endpoints. Make sure to change `dcs-endpoints` to reflect that `https` is used.
Expand Down
119 changes: 52 additions & 67 deletions checker/etcd_leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,35 @@ import (
"time"

"github.com/cybertec-postgresql/vip-manager/vipconfig"
client "go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
)

// EtcdLeaderChecker is used to check state of the leader key in Etcd
type EtcdLeaderChecker struct {
key string
nodename string
kapi client.KV
*vipconfig.Config
*clientv3.Client
}

// naming this eConf to avoid conflict with conf in other checker files
var eConf *vipconfig.Config
// NewEtcdLeaderChecker builds an EtcdLeaderChecker from conf.
//
// The TLS settings are derived via getTransport, then an etcd v3 client is
// created for conf.Endpoints using conf.EtcdUser / conf.EtcdPassword and
// 5-second keep-alive settings.
//
// On any failure a nil checker is returned together with the error, so a
// caller can never end up holding a checker that wraps an unusable client.
func NewEtcdLeaderChecker(conf *vipconfig.Config) (*EtcdLeaderChecker, error) {
	tlsConfig, err := getTransport(conf)
	if err != nil {
		return nil, err
	}
	cfg := clientv3.Config{
		Endpoints:            conf.Endpoints,
		TLS:                  tlsConfig,
		DialKeepAliveTimeout: 5 * time.Second,
		DialKeepAliveTime:    5 * time.Second,
		Username:             conf.EtcdUser,
		Password:             conf.EtcdPassword,
	}
	c, err := clientv3.New(cfg)
	if err != nil {
		// Do not hand back a half-initialised checker: its embedded client
		// would not be usable and later method calls on it could panic.
		return nil, err
	}
	return &EtcdLeaderChecker{Config: conf, Client: c}, nil
}

func getTransport(conf *vipconfig.Config) (*tls.Config, error) {
var caCertPool *x509.CertPool

// create valid CertPool only if the ca certificate file exists
if conf.EtcdCAFile != "" {
caCert, err := os.ReadFile(conf.EtcdCAFile)
Expand All @@ -36,9 +49,7 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
}

var certificates []tls.Certificate

// create valid []Certificate only if the client cert and key files exists
if conf.EtcdCertFile != "" && conf.EtcdKeyFile != "" {
cert, err := tls.LoadX509KeyPair(conf.EtcdCertFile, conf.EtcdKeyFile)
Expand All @@ -48,83 +59,57 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {

certificates = []tls.Certificate{cert}
}

tlsClientConfig := new(tls.Config)

if caCertPool != nil {
tlsClientConfig.RootCAs = caCertPool
if certificates != nil {
tlsClientConfig.Certificates = certificates
}
}

return tlsClientConfig, nil
}

// NewEtcdLeaderChecker returns a new instance
func NewEtcdLeaderChecker(con *vipconfig.Config) (*EtcdLeaderChecker, error) {
eConf = con
e := &EtcdLeaderChecker{key: eConf.Key, nodename: eConf.Nodename}

tlsConfig, err := getTransport(eConf)
// init gets the current leader from etcd
func (elc *EtcdLeaderChecker) init(ctx context.Context, out chan<- bool) {
resp, err := elc.Get(ctx, elc.Key)
if err != nil {
return nil, err
}

cfg := client.Config{
Endpoints: eConf.Endpoints,
TLS: tlsConfig,
DialKeepAliveTimeout: 5 * time.Second,
DialKeepAliveTime: 5 * time.Second,
Username: eConf.EtcdUser,
Password: eConf.EtcdPassword,
log.Printf("etcd error: %s", err)
out <- false
return
}
c, err := client.New(cfg)
if err != nil {
return nil, err
for _, kv := range resp.Kvs {
log.Printf("Current Leader from DCS: %s", kv.Value)
out <- string(kv.Value) == elc.Nodename
}
e.kapi = c.KV
return e, nil
}

// GetChangeNotificationStream checks the status in the loop
func (e *EtcdLeaderChecker) GetChangeNotificationStream(ctx context.Context, out chan<- bool) error {
var state bool
var alreadyConnected = false
checkLoop:
// watch monitors the leader change from etcd
func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error {
watchChan := elc.Watch(ctx, elc.Key)
log.Println("set WATCH on " + elc.Key)
for {
resp, err := e.kapi.Get(ctx, e.key)

if err != nil {
if ctx.Err() != nil {
break checkLoop
}
log.Printf("etcd error: %s", err)
out <- false
time.Sleep(time.Duration(eConf.Interval) * time.Millisecond)
continue
}

if !alreadyConnected {
log.Printf("etcd checker started up, found key %s", e.key)
alreadyConnected = true
}

for _, kv := range resp.Kvs {
if eConf.Verbose {
log.Println("Leader from DCS:", string(kv.Value))
}
state = string(kv.Value) == e.nodename
}

select {
case <-ctx.Done():
break checkLoop
case out <- state:
time.Sleep(time.Duration(eConf.Interval) * time.Millisecond)
continue
return ctx.Err()
case watchResp := <-watchChan:
if err := watchResp.Err(); err != nil {
log.Printf("etcd watcher returned error: %s", err)
out <- false
continue
}
for _, event := range watchResp.Events {
out <- string(event.Kv.Value) == elc.Nodename
log.Printf("Current Leader from DCS: %s", event.Kv.Value)
}
}
}
}

return ctx.Err()
// GetChangeNotificationStream publishes on out whether this node is the
// leader according to etcd.
//
// The current value of the key is fetched once in a background goroutine
// (init) while the calling goroutine blocks in watch, which runs under a
// child context that is cancelled when this method returns. The client is
// closed on the way out and the error produced by watch is returned.
func (elc *EtcdLeaderChecker) GetChangeNotificationStream(ctx context.Context, out chan<- bool) error {
	// Deferred calls run in reverse order: the watch context is cancelled
	// first, then the client is closed.
	defer elc.Close()
	go elc.init(ctx, out)

	watchCtx, stop := context.WithCancel(ctx)
	defer stop()

	return elc.watch(watchCtx, out)
}
4 changes: 2 additions & 2 deletions test/behaviour_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trap cleanup EXIT

# run etcd with podman/docker maybe?
# podman rm etcd || true
# podman run -d --name etcd -p 2379:2379 -e "ETCD_ENABLE_V2=true" -e "ALLOW_NONE_AUTHENTICATION=yes" bitnami/etcd
# podman run -d --name etcd -p 2379:2379 -e "ALLOW_NONE_AUTHENTICATION=yes" bitnami/etcd

# run etcd locally maybe?
etcd &
Expand All @@ -67,7 +67,7 @@ echo $! > .ncatPid
etcdctl del service/pgcluster/leader || true

touch .failed
./vip-manager --interface $dev --ip $vip --netmask 32 --trigger-key service/pgcluster/leader --trigger-value $HOSTNAME & #2>&1 &
./vip-manager --interval 3000 --interface $dev --ip $vip --netmask 32 --trigger-key service/pgcluster/leader --trigger-value $HOSTNAME & #2>&1 &
echo $! > .vipPid
sleep 2

Expand Down

0 comments on commit f2a8850

Please sign in to comment.