From aeec73120ff35eee689d70e683fd72a02ff5eb1e Mon Sep 17 00:00:00 2001 From: Soon-Ping Date: Wed, 14 Jun 2023 11:49:20 -0700 Subject: [PATCH] Added timeouts to contexts when calling etcd (#5392) Signed-off-by: Soon-Ping Phang --- CHANGELOG.md | 1 + pkg/ring/kv/etcd/etcd.go | 40 +++++++++++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69e49d7ed4..527c4cac37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ * [BUGFIX] Storage: Bucket index updater should ignore meta not found for partial blocks. #5343 * [BUGFIX] Ring: Add JOINING state to read operation. #5346 * [BUGFIX] Compactor: Partial block with only visit marker should be deleted even there is no deletion marker. #5342 +* [BUGFIX] KV: Etcd calls will no longer block indefinitely and will now time out after a total of DialTimeout multiplied by MaxRetries. #5392 ## 1.15.1 2023-04-26 diff --git a/pkg/ring/kv/etcd/etcd.go b/pkg/ring/kv/etcd/etcd.go index 591845fe88..5e0bf13fa5 100644 --- a/pkg/ring/kv/etcd/etcd.go +++ b/pkg/ring/kv/etcd/etcd.go @@ -124,8 +124,11 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou var revision int64 var lastErr error + opsCtx, cancel := c.opsContext(ctx) + defer cancel() + for i := 0; i < c.cfg.MaxRetries; i++ { - resp, err := c.cli.Get(ctx, key) + resp, err := c.cli.Get(opsCtx, key) if err != nil { level.Error(c.logger).Log("msg", "error getting key", "key", key, "err", err) lastErr = err @@ -165,7 +168,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou continue } - result, err := c.cli.Txn(ctx). + result, err := c.cli.Txn(opsCtx). If(clientv3.Compare(clientv3.Version(key), "=", revision)). Then(clientv3.OpPut(key, string(buf))). Commit() @@ -198,7 +201,12 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b // Ensure the context used by the Watch is always cancelled. 
watchCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + level.Debug(c.logger).Log("msg", "Finished watching key", "key", key) + }() + + level.Debug(c.logger).Log("msg", "Watching key", "key", key) outer: for backoff.Ongoing() { @@ -234,7 +242,12 @@ func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, int // Ensure the context used by the Watch is always cancelled. watchCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + cancel() + level.Debug(c.logger).Log("msg", "Finished watching prefix", "key", key) + }() + + level.Debug(c.logger).Log("msg", "Watching prefix", "key", key) outer: for backoff.Ongoing() { @@ -268,7 +281,10 @@ outer: // List implements kv.Client. func (c *Client) List(ctx context.Context, prefix string) ([]string, error) { - resp, err := c.cli.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + opsCtx, cancel := c.opsContext(ctx) + defer cancel() + + resp, err := c.cli.Get(opsCtx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { return nil, err } @@ -281,7 +297,10 @@ func (c *Client) List(ctx context.Context, prefix string) ([]string, error) { // Get implements kv.Client. func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { - resp, err := c.cli.Get(ctx, key) + opsCtx, cancel := c.opsContext(ctx) + defer cancel() + + resp, err := c.cli.Get(opsCtx, key) if err != nil { return nil, err } @@ -295,10 +314,17 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { // Delete implements kv.Client. 
func (c *Client) Delete(ctx context.Context, key string) error { - _, err := c.cli.Delete(ctx, key) + opsCtx, cancel := c.opsContext(ctx) + defer cancel() + + _, err := c.cli.Delete(opsCtx, key) return err } func (c *Client) LastUpdateTime(_ string) time.Time { return time.Now().UTC() } + +func (c *Client) opsContext(parent context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeout(parent, time.Duration(float64(c.cfg.DialTimeout)*float64(c.cfg.MaxRetries))) +}