From f2a885022edc40aba93bcc9323a196760406ddf5 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 19 Jan 2024 08:25:21 +0100 Subject: [PATCH] [!] refactor etcd checker with `etcd/client/v3.Watch()`, closes #196 (#199) --- README.md | 2 +- checker/etcd_leader_checker.go | 119 ++++++++++++++------------------- test/behaviour_test.sh | 4 +- 3 files changed, 55 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 913516f..b463d6b 100644 --- a/README.md +++ b/README.md @@ -125,7 +125,7 @@ This is a list of all avaiable configuration items: `etcd-user` | `VIP_ETCD_USER` | no | patroni | A username that is allowed to look at the `trigger-key` in an etcd DCS. Optional when using `dcs-type=etcd` . `etcd-password` | `VIP_ETCD_PASSWORD` | no | snakeoil | The password for `etcd-user`. Optional when using `dcs-type=etcd` . Requires that `etcd-user` is also set. `consul-token` | `VIP_CONSUL_TOKEN` | no | snakeoil | A token that can be used with the consul-API for authentication. Optional when using `dcs-type=consul` . -`interval` | `VIP_INTERVAL` | no | 1000 | The time vip-manager main loop sleeps before checking for changes. Measured in ms. Defaults to `1000`. +`interval` | `VIP_INTERVAL` | no | 1000 | The time vip-manager main loop sleeps before checking for changes. Measured in ms. Defaults to `1000`. Doesn't affect etcd checker since v2.3.0. `retry-after` | `VIP_RETRY_AFTER` | no | 250 | The time to wait before retrying interactions with components outside of vip-manager. Measured in ms. Defaults to `250`. `retry-num` | `VIP_RETRY_NUM` | no | 3 | The number of times interactions with components outside of vip-manager are retried. Defaults to `3`. `etcd-ca-file` | `VIP_ETCD_CA_FILE` | no | /etc/etcd/ca.cert.pem | A certificate authority file that can be used to verify the certificate provided by etcd endpoints. Make sure to change `dcs-endpoints` to reflect that `https` is used. diff --git a/checker/etcd_leader_checker.go b/checker/etcd_leader_checker.go index f7d0125..2593c85 100644 --- a/checker/etcd_leader_checker.go +++ b/checker/etcd_leader_checker.go @@ -10,22 +10,35 @@ import ( "time" "github.com/cybertec-postgresql/vip-manager/vipconfig" - client "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" ) // EtcdLeaderChecker is used to check state of the leader key in Etcd type EtcdLeaderChecker struct { - key string - nodename string - kapi client.KV + *vipconfig.Config + *clientv3.Client } -// naming this c_conf to avoid conflict with conf in etcd_leader_checker.go -var eConf *vipconfig.Config +// NewEtcdLeaderChecker returns a new instance +func NewEtcdLeaderChecker(conf *vipconfig.Config) (*EtcdLeaderChecker, error) { + tlsConfig, err := getTransport(conf) + if err != nil { + return nil, err + } + cfg := clientv3.Config{ + Endpoints: conf.Endpoints, + TLS: tlsConfig, + DialKeepAliveTimeout: 5 * time.Second, + DialKeepAliveTime: 5 * time.Second, + Username: conf.EtcdUser, + Password: conf.EtcdPassword, + } + c, err := clientv3.New(cfg) + return &EtcdLeaderChecker{conf, c}, err +} func getTransport(conf *vipconfig.Config) (*tls.Config, error) { var caCertPool *x509.CertPool - // create valid CertPool only if the ca certificate file exists if conf.EtcdCAFile != "" { caCert, err := os.ReadFile(conf.EtcdCAFile) @@ -36,9 +49,7 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) { caCertPool = x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) } - var certificates []tls.Certificate - // create valid []Certificate only if the client cert and key files exists if conf.EtcdCertFile != "" && conf.EtcdKeyFile != "" { cert, err := tls.LoadX509KeyPair(conf.EtcdCertFile, conf.EtcdKeyFile) @@ -48,83 +59,57 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) { certificates = []tls.Certificate{cert} } - tlsClientConfig := new(tls.Config) - if caCertPool != nil { tlsClientConfig.RootCAs = caCertPool if certificates != nil { tlsClientConfig.Certificates = certificates } } - return tlsClientConfig, nil } -// NewEtcdLeaderChecker returns a new instance -func NewEtcdLeaderChecker(con *vipconfig.Config) (*EtcdLeaderChecker, error) { - eConf = con - e := &EtcdLeaderChecker{key: eConf.Key, nodename: eConf.Nodename} - - tlsConfig, err := getTransport(eConf) +// init gets the current leader from etcd +func (elc *EtcdLeaderChecker) init(ctx context.Context, out chan<- bool) { + resp, err := elc.Get(ctx, elc.Key) if err != nil { - return nil, err - } - - cfg := client.Config{ - Endpoints: eConf.Endpoints, - TLS: tlsConfig, - DialKeepAliveTimeout: 5 * time.Second, - DialKeepAliveTime: 5 * time.Second, - Username: eConf.EtcdUser, - Password: eConf.EtcdPassword, + log.Printf("etcd error: %s", err) + out <- false + return } - c, err := client.New(cfg) - if err != nil { - return nil, err + for _, kv := range resp.Kvs { + log.Printf("Current Leader from DCS: %s", kv.Value) + out <- string(kv.Value) == elc.Nodename } - e.kapi = c.KV - return e, nil } -// GetChangeNotificationStream checks the status in the loop -func (e *EtcdLeaderChecker) GetChangeNotificationStream(ctx context.Context, out chan<- bool) error { - var state bool - var alreadyConnected = false -checkLoop: +// watch monitors the leader change from etcd +func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error { + watchChan := elc.Watch(ctx, elc.Key) + log.Println("set WATCH on " + elc.Key) for { - resp, err := e.kapi.Get(ctx, e.key) - - if err != nil { - if ctx.Err() != nil { - break checkLoop - } - log.Printf("etcd error: %s", err) - out <- false - time.Sleep(time.Duration(eConf.Interval) * time.Millisecond) - continue - } - - if !alreadyConnected { - log.Printf("etcd checker started up, found key %s", e.key) - alreadyConnected = true - } - - for _, kv := range resp.Kvs { - if eConf.Verbose { - log.Println("Leader from DCS:", string(kv.Value)) - } - state = string(kv.Value) == e.nodename - } - select { case <-ctx.Done(): - break checkLoop - case out <- state: - time.Sleep(time.Duration(eConf.Interval) * time.Millisecond) - continue + return ctx.Err() + case watchResp := <-watchChan: + if err := watchResp.Err(); err != nil { + log.Printf("etcd watcher returned error: %s", err) + out <- false + continue + } + for _, event := range watchResp.Events { + out <- string(event.Kv.Value) == elc.Nodename + log.Printf("Current Leader from DCS: %s", event.Kv.Value) + } } } +} - return ctx.Err() +// GetChangeNotificationStream monitors the leader in etcd +func (elc *EtcdLeaderChecker) GetChangeNotificationStream(ctx context.Context, out chan<- bool) error { + defer elc.Close() + go elc.init(ctx, out) + wctx, cancel := context.WithCancel(ctx) + defer cancel() + return elc.watch(wctx, out) } diff --git a/test/behaviour_test.sh b/test/behaviour_test.sh index 6cdc19c..5bdd7b3 100755 --- a/test/behaviour_test.sh +++ b/test/behaviour_test.sh @@ -53,7 +53,7 @@ trap cleanup EXIT # run etcd with podman/docker maybe? # podman rm etcd || true -# podman run -d --name etcd -p 2379:2379 -e "ETCD_ENABLE_V2=true" -e "ALLOW_NONE_AUTHENTICATION=yes" bitnami/etcd +# podman run -d --name etcd -p 2379:2379 -e "ALLOW_NONE_AUTHENTICATION=yes" bitnami/etcd # run etcd locally maybe? etcd & @@ -67,7 +67,7 @@ echo $! > .ncatPid etcdctl del service/pgcluster/leader || true touch .failed -./vip-manager --interface $dev --ip $vip --netmask 32 --trigger-key service/pgcluster/leader --trigger-value $HOSTNAME & #2>&1 & +./vip-manager --interval 3000 --interface $dev --ip $vip --netmask 32 --trigger-key service/pgcluster/leader --trigger-value $HOSTNAME & #2>&1 & echo $! > .vipPid sleep 2