From 92bd2f4deb3c5b96695385b28c80a8d9fd49d487 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 22 Jan 2021 18:28:42 -0500 Subject: [PATCH] [coordinator] Marshal return val of kv handler with jsonpb (#3116) * [coordinator] Marshal return val of kv handler with jsonpb * Feedback --- src/query/api/v1/handler/database/kvstore.go | 16 +++-- .../api/v1/handler/database/kvstore_test.go | 60 ++++++++++--------- 2 files changed, 42 insertions(+), 34 deletions(-) diff --git a/src/query/api/v1/handler/database/kvstore.go b/src/query/api/v1/handler/database/kvstore.go index 8b3c5a9b0e..55430eda67 100644 --- a/src/query/api/v1/handler/database/kvstore.go +++ b/src/query/api/v1/handler/database/kvstore.go @@ -21,6 +21,7 @@ package database import ( + "bytes" "encoding/json" "errors" "fmt" @@ -67,9 +68,9 @@ type KeyValueUpdateResult struct { // Key to update. Key string `json:"key"` // Old is the value before the update. - Old string `json:"old"` + Old json.RawMessage `json:"old"` // New is the value after the update. - New string `json:"new"` + New json.RawMessage `json:"new"` // Version of the key. Version int `json:"version"` } @@ -162,10 +163,15 @@ func (h *KeyValueStoreHandler) update( return nil, err } - if err := jsonpb.UnmarshalString(string([]byte(update.Value)), newProto); err != nil { + if err := jsonpb.UnmarshalString(string(update.Value), newProto); err != nil { return nil, err } + oldProtoMarshalled := bytes.NewBuffer(nil) + if err := (&jsonpb.Marshaler{}).Marshal(oldProtoMarshalled, oldProto); err != nil { + logger.Error("cannot unmarshal old kv proto", zap.Error(err), zap.String("key", update.Key)) + } + var version int if update.Commit { version, err = kvStore.Set(update.Key, newProto) @@ -176,8 +182,8 @@ func (h *KeyValueStoreHandler) update( result := KeyValueUpdateResult{ Key: update.Key, - Old: oldProto.String(), - New: newProto.String(), + Old: oldProtoMarshalled.Bytes(), + New: update.Value, Version: version, } diff --git a/src/query/api/v1/handler/database/kvstore_test.go b/src/query/api/v1/handler/database/kvstore_test.go index 4a6f047bab..7232815de2 100644 --- a/src/query/api/v1/handler/database/kvstore_test.go +++ b/src/query/api/v1/handler/database/kvstore_test.go @@ -24,6 +24,7 @@ import ( "encoding/json" "testing" + "github.com/gogo/protobuf/jsonpb" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -41,20 +42,17 @@ func TestUpdateQueryLimits(t *testing.T) { name string limits *kvpb.QueryLimits commit bool - expectedJSON string expectedError string }{ { - name: `nil`, - limits: nil, - commit: true, - expectedJSON: "", + name: `nil`, + limits: nil, + commit: true, }, { - name: `empty`, - limits: &kvpb.QueryLimits{}, - commit: true, - expectedJSON: "", + name: `empty`, + limits: &kvpb.QueryLimits{}, + commit: true, }, { name: `only block - commit`, @@ -65,8 +63,7 @@ func TestUpdateQueryLimits(t *testing.T) { ForceExceeded: true, }, }, - commit: true, - expectedJSON: `maxRecentlyQueriedSeriesBlocks: `, + commit: true, }, { name: `only block - no commit`, @@ -77,8 +74,7 @@ func TestUpdateQueryLimits(t *testing.T) { ForceExceeded: true, }, }, - commit: false, - expectedJSON: `maxRecentlyQueriedSeriesBlocks: `, + commit: false, }, { name: `all - commit`, @@ -100,8 +96,6 @@ func TestUpdateQueryLimits(t *testing.T) { }, }, commit: true, - // nolint: lll - expectedJSON: `maxRecentlyQueriedSeriesBlocks: maxRecentlyQueriedSeriesDiskBytesRead: maxRecentlyQueriedSeriesDiskRead: `, }, { name: `all - no commit`, @@ -123,8 +117,6 @@ func TestUpdateQueryLimits(t *testing.T) { }, }, commit: false, - // nolint: lll - expectedJSON: `maxRecentlyQueriedSeriesBlocks: maxRecentlyQueriedSeriesDiskBytesRead: maxRecentlyQueriedSeriesDiskRead: `, }, } @@ -150,24 +142,28 @@ func TestUpdateQueryLimits(t *testing.T) { r, err := handler.update(zap.NewNop(), storeMock, update) require.NoError(t, err) require.Equal(t, kvconfig.QueryLimits, r.Key) - require.Equal(t, "", r.Old) - require.Equal(t, test.expectedJSON, r.New) + require.Equal(t, json.RawMessage("{}"), r.Old) + require.Equal(t, json.RawMessage(limitJSON), r.New) require.Equal(t, 0, r.Version) // (B) test old value. - mockVal := kv.NewMockValue(ctrl) - storeMock.EXPECT().Get(kvconfig.QueryLimits).Return(mockVal, nil) - mockVal.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v *kvpb.QueryLimits) error { - v.MaxRecentlyQueriedSeriesBlocks = &kvpb.QueryLimit{ + oldLimits := &kvpb.QueryLimits{ + MaxRecentlyQueriedSeriesBlocks: &kvpb.QueryLimit{ Limit: 10, LookbackSeconds: 30, ForceExceeded: false, - } - v.MaxRecentlyQueriedSeriesDiskBytesRead = &kvpb.QueryLimit{ + }, + MaxRecentlyQueriedSeriesDiskRead: &kvpb.QueryLimit{ Limit: 100, LookbackSeconds: 300, ForceExceeded: false, - } + }, + } + mockVal := kv.NewMockValue(ctrl) + storeMock.EXPECT().Get(kvconfig.QueryLimits).Return(mockVal, nil) + mockVal.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v *kvpb.QueryLimits) error { + v.MaxRecentlyQueriedSeriesBlocks = oldLimits.MaxRecentlyQueriedSeriesBlocks + v.MaxRecentlyQueriedSeriesDiskRead = oldLimits.MaxRecentlyQueriedSeriesDiskRead return nil }) if test.commit { @@ -177,10 +173,16 @@ func TestUpdateQueryLimits(t *testing.T) { handler = &KeyValueStoreHandler{} r, err = handler.update(zap.NewNop(), storeMock, update) require.NoError(t, err) + + var oldResult kvpb.QueryLimits + err = jsonpb.UnmarshalString(string(r.Old), &oldResult) + require.NoError(t, err) + require.Equal(t, kvconfig.QueryLimits, r.Key) - // nolint: lll - require.Equal(t, `maxRecentlyQueriedSeriesBlocks: maxRecentlyQueriedSeriesDiskBytesRead: `, r.Old) - require.Equal(t, test.expectedJSON, r.New) + require.Equal(t, *oldLimits.MaxRecentlyQueriedSeriesBlocks, *oldResult.MaxRecentlyQueriedSeriesBlocks) + require.Nil(t, oldResult.MaxRecentlyQueriedSeriesDiskBytesRead) + require.Equal(t, *oldLimits.MaxRecentlyQueriedSeriesDiskRead, *oldResult.MaxRecentlyQueriedSeriesDiskRead) + require.Equal(t, json.RawMessage(limitJSON), r.New) require.Equal(t, 0, r.Version) } }