From cff5c58e9ad1a4eb64d3d72ec996100249fa3228 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Fri, 7 Feb 2020 10:52:17 -0500 Subject: [PATCH 1/2] [kv] Include key name in watch errors --- src/cluster/kv/util/runtime/value.go | 3 ++- src/x/watch/options.go | 17 ++++++++++++++ src/x/watch/value.go | 35 +++++++++++++++++++++------- src/x/watch/value_test.go | 16 +++++++++---- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/src/cluster/kv/util/runtime/value.go b/src/cluster/kv/util/runtime/value.go index 185e29b32a..44b93a4678 100644 --- a/src/cluster/kv/util/runtime/value.go +++ b/src/cluster/kv/util/runtime/value.go @@ -81,7 +81,8 @@ func (v *value) initValue() { SetInitWatchTimeout(v.opts.InitWatchTimeout()). SetNewUpdatableFn(v.newUpdatableFn). SetGetUpdateFn(v.getUpdateFn). - SetProcessFn(v.updateFn) + SetProcessFn(v.updateFn). + SetKey(v.key) v.Value = watch.NewValue(valueOpts) } diff --git a/src/x/watch/options.go b/src/x/watch/options.go index ea8d760571..c88525b1fa 100644 --- a/src/x/watch/options.go +++ b/src/x/watch/options.go @@ -61,6 +61,12 @@ type Options interface { // ProcessFn returns the process function. ProcessFn() ProcessFn + + // Key returns the key for the watch. + Key() string + + // SetKey sets the key for the watch. + SetKey(key string) Options } type options struct { @@ -69,6 +75,7 @@ type options struct { newUpdatableFn NewUpdatableFn getUpdateFn GetUpdateFn processFn ProcessFn + key string } // NewOptions creates a new set of options. @@ -128,3 +135,13 @@ func (o *options) SetProcessFn(value ProcessFn) Options { func (o *options) ProcessFn() ProcessFn { return o.processFn } + +func (o *options) Key() string { + return o.key +} + +func (o *options) SetKey(key string) Options { + opts := *o + opts.key = key + return &opts +} diff --git a/src/x/watch/value.go b/src/x/watch/value.go index e5c02493c5..e4d62ac982 100644 --- a/src/x/watch/value.go +++ b/src/x/watch/value.go @@ -71,6 +71,7 @@ type value struct { getUpdateFn GetUpdateFn processFn ProcessFn processWithLockFn processWithLockFn + key string updatable Updatable status valueStatus @@ -100,7 +101,10 @@ func (v *value) Watch() error { } updatable, err := v.newUpdatableFn() if err != nil { - return CreateWatchError{innerError: err} + return CreateWatchError{ + innerError: err, + key: v.opts.Key(), + } } v.status = valueWatching v.updatable = updatable @@ -113,16 +117,25 @@ func (v *value) Watch() error { select { case <-v.updatable.C(): case <-time.After(v.opts.InitWatchTimeout()): - return InitValueError{innerError: errInitWatchTimeout} + return InitValueError{ + innerError: errInitWatchTimeout, + key: v.opts.Key(), + } } update, err := v.getUpdateFn(v.updatable) if err != nil { - return InitValueError{innerError: err} + return InitValueError{ + innerError: err, + key: v.opts.Key(), + } } if err = v.processWithLockFn(update); err != nil { - return InitValueError{innerError: err} + return InitValueError{ + innerError: err, + key: v.opts.Key(), + } } return nil } @@ -152,12 +165,16 @@ func (v *value) watchUpdates(updatable Updatable) { } update, err := v.getUpdateFn(updatable) if err != nil { - v.log.Error("error getting update", zap.Error(err)) + v.log.Error("error getting update", + zap.String("key", v.opts.Key()), + zap.Error(err)) v.Unlock() continue } if err = v.processWithLockFn(update); err != nil { - v.log.Error("error updating update", zap.Error(err)) + v.log.Error("error updating update", + zap.String("key", v.opts.Key()), + zap.Error(err)) } v.Unlock() } @@ -173,17 +190,19 @@ func (v *value) processWithLock(update interface{}) error { // CreateWatchError is returned when encountering an error creating a watch. type CreateWatchError struct { innerError error + key string } func (e CreateWatchError) Error() string { - return fmt.Sprintf("create watch error:%v", e.innerError) + return fmt.Sprintf("create watch error (key='%s'): %v", e.key, e.innerError) } // InitValueError is returned when encountering an error when initializing a value. type InitValueError struct { innerError error + key string } func (e InitValueError) Error() string { - return fmt.Sprintf("initializing value error:%v", e.innerError) + return fmt.Sprintf("initializing value error (key='%s'): %v", e.key, e.innerError) } diff --git a/src/x/watch/value_test.go b/src/x/watch/value_test.go index 292455b058..6e1766e63e 100644 --- a/src/x/watch/value_test.go +++ b/src/x/watch/value_test.go @@ -42,10 +42,14 @@ func TestValueWatchCreateWatchError(t *testing.T) { updatableFn := func() (Updatable, error) { return nil, errWatch } - rv := NewValue(testValueOptions().SetNewUpdatableFn(updatableFn)).(*value) + rv := NewValue( + testValueOptions(). + SetNewUpdatableFn(updatableFn). + SetKey("foobar"), + ).(*value) err := rv.Watch() - require.Equal(t, CreateWatchError{innerError: errWatch}, err) + require.Equal(t, CreateWatchError{innerError: errWatch, key: "foobar"}, err) require.Equal(t, valueNotWatching, rv.status) rv.Unwatch() @@ -55,7 +59,7 @@ func TestValueWatchCreateWatchError(t *testing.T) { func TestValueWatchWatchTimeout(t *testing.T) { _, rv := testWatchableAndValue() err := rv.Watch() - require.Equal(t, InitValueError{innerError: errInitWatchTimeout}, err) + require.Equal(t, InitValueError{innerError: errInitWatchTimeout, key: "foobar"}, err) require.Equal(t, valueWatching, rv.status) rv.Unwatch() @@ -68,7 +72,7 @@ func TestValueWatchUpdateError(t *testing.T) { require.NoError(t, wa.Update(1)) rv.processWithLockFn = func(interface{}) error { return errUpdate } - require.Equal(t, InitValueError{innerError: errUpdate}, rv.Watch()) + require.Equal(t, InitValueError{innerError: errUpdate, key: "foobar"}, rv.Watch()) require.Equal(t, valueWatching, rv.status) rv.Unwatch() @@ -231,7 +235,9 @@ func testWatchableAndValue() (Watchable, *value) { SetNewUpdatableFn(testUpdatableFn(wa)). SetGetUpdateFn(func(updatable Updatable) (interface{}, error) { return updatable.(Watch).Get(), nil - }) + }). + SetKey("foobar") + return wa, NewValue(opts).(*value) } From 8a78b856cbf6e8459b4d9ed4a440b7dbd6b67b13 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Mon, 10 Feb 2020 14:26:27 -0500 Subject: [PATCH 2/2] fix err msg --- src/x/watch/value.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/x/watch/value.go b/src/x/watch/value.go index e4d62ac982..b080dc21bf 100644 --- a/src/x/watch/value.go +++ b/src/x/watch/value.go @@ -172,7 +172,7 @@ func (v *value) watchUpdates(updatable Updatable) { continue } if err = v.processWithLockFn(update); err != nil { - v.log.Error("error updating update", + v.log.Error("error applying update", zap.String("key", v.opts.Key()), zap.Error(err)) }