Skip to content

Commit

Permalink
Added timeouts to contexts when calling etcd (#5392)
Browse files Browse the repository at this point in the history
Signed-off-by: Soon-Ping Phang <[email protected]>
  • Loading branch information
soonping-amzn authored Jun 14, 2023
1 parent 3fc1bf0 commit aeec731
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* [BUGFIX] Storage: Bucket index updater should ignore meta not found for partial blocks. #5343
* [BUGFIX] Ring: Add JOINING state to read operation. #5346
* [BUGFIX] Compactor: Partial block with only visit marker should be deleted even there is no deletion marker. #5342
* [BUGFIX] KV: Etcd calls will no longer block indefinitely and will now time out after the DialTimeout period. #5392

## 1.15.1 2023-04-26

Expand Down
40 changes: 33 additions & 7 deletions pkg/ring/kv/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
var revision int64
var lastErr error

opsCtx, cancel := c.opsContext(ctx)
defer cancel()

for i := 0; i < c.cfg.MaxRetries; i++ {
resp, err := c.cli.Get(ctx, key)
resp, err := c.cli.Get(opsCtx, key)
if err != nil {
level.Error(c.logger).Log("msg", "error getting key", "key", key, "err", err)
lastErr = err
Expand Down Expand Up @@ -165,7 +168,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
continue
}

result, err := c.cli.Txn(ctx).
result, err := c.cli.Txn(opsCtx).
If(clientv3.Compare(clientv3.Version(key), "=", revision)).
Then(clientv3.OpPut(key, string(buf))).
Commit()
Expand Down Expand Up @@ -198,7 +201,12 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b

// Ensure the context used by the Watch is always cancelled.
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
level.Debug(c.logger).Log("msg", "Finished watching key", "key", key)
}()

level.Debug(c.logger).Log("msg", "Watching key", "key", key)

outer:
for backoff.Ongoing() {
Expand Down Expand Up @@ -234,7 +242,12 @@ func (c *Client) WatchPrefix(ctx context.Context, key string, f func(string, int

// Ensure the context used by the Watch is always cancelled.
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
cancel()
level.Debug(c.logger).Log("msg", "Finished watching prefix", "key", key)
}()

level.Debug(c.logger).Log("msg", "Watching prefix", "key", key)

outer:
for backoff.Ongoing() {
Expand Down Expand Up @@ -268,7 +281,10 @@ outer:

// List implements kv.Client.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
resp, err := c.cli.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
opsCtx, cancel := c.opsContext(ctx)
defer cancel()

resp, err := c.cli.Get(opsCtx, prefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, err
}
Expand All @@ -281,7 +297,10 @@ func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {

// Get implements kv.Client.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
resp, err := c.cli.Get(ctx, key)
opsCtx, cancel := c.opsContext(ctx)
defer cancel()

resp, err := c.cli.Get(opsCtx, key)
if err != nil {
return nil, err
}
Expand All @@ -295,10 +314,17 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {

// Delete implements kv.Client.
func (c *Client) Delete(ctx context.Context, key string) error {
_, err := c.cli.Delete(ctx, key)
opsCtx, cancel := c.opsContext(ctx)
defer cancel()

_, err := c.cli.Delete(opsCtx, key)
return err
}

func (c *Client) LastUpdateTime(_ string) time.Time {
return time.Now().UTC()
}

func (c *Client) opsContext(parent context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeout(parent, time.Duration(float64(c.cfg.DialTimeout)*float64(c.cfg.MaxRetries)))
}

0 comments on commit aeec731

Please sign in to comment.