Skip to content

Commit

Permalink
[coordinator] Marshal return val of kv handler with jsonpb (#3116)
Browse files Browse the repository at this point in the history
* [coordinator] Marshal return val of kv handler with jsonpb

* Feedback
  • Loading branch information
wesleyk authored Jan 22, 2021
1 parent c74143f commit 92bd2f4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 34 deletions.
16 changes: 11 additions & 5 deletions src/query/api/v1/handler/database/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package database

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -67,9 +68,9 @@ type KeyValueUpdateResult struct {
// Key to update.
Key string `json:"key"`
// Old is the value before the update.
Old string `json:"old"`
Old json.RawMessage `json:"old"`
// New is the value after the update.
New string `json:"new"`
New json.RawMessage `json:"new"`
// Version of the key.
Version int `json:"version"`
}
Expand Down Expand Up @@ -162,10 +163,15 @@ func (h *KeyValueStoreHandler) update(
return nil, err
}

if err := jsonpb.UnmarshalString(string([]byte(update.Value)), newProto); err != nil {
if err := jsonpb.UnmarshalString(string(update.Value), newProto); err != nil {
return nil, err
}

oldProtoMarshalled := bytes.NewBuffer(nil)
if err := (&jsonpb.Marshaler{}).Marshal(oldProtoMarshalled, oldProto); err != nil {
logger.Error("cannot unmarshal old kv proto", zap.Error(err), zap.String("key", update.Key))
}

var version int
if update.Commit {
version, err = kvStore.Set(update.Key, newProto)
Expand All @@ -176,8 +182,8 @@ func (h *KeyValueStoreHandler) update(

result := KeyValueUpdateResult{
Key: update.Key,
Old: oldProto.String(),
New: newProto.String(),
Old: oldProtoMarshalled.Bytes(),
New: update.Value,
Version: version,
}

Expand Down
60 changes: 31 additions & 29 deletions src/query/api/v1/handler/database/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand All @@ -41,20 +42,17 @@ func TestUpdateQueryLimits(t *testing.T) {
name string
limits *kvpb.QueryLimits
commit bool
expectedJSON string
expectedError string
}{
{
name: `nil`,
limits: nil,
commit: true,
expectedJSON: "",
name: `nil`,
limits: nil,
commit: true,
},
{
name: `empty`,
limits: &kvpb.QueryLimits{},
commit: true,
expectedJSON: "",
name: `empty`,
limits: &kvpb.QueryLimits{},
commit: true,
},
{
name: `only block - commit`,
Expand All @@ -65,8 +63,7 @@ func TestUpdateQueryLimits(t *testing.T) {
ForceExceeded: true,
},
},
commit: true,
expectedJSON: `maxRecentlyQueriedSeriesBlocks:<limit:1 lookbackSeconds:15 forceExceeded:true > `,
commit: true,
},
{
name: `only block - no commit`,
Expand All @@ -77,8 +74,7 @@ func TestUpdateQueryLimits(t *testing.T) {
ForceExceeded: true,
},
},
commit: false,
expectedJSON: `maxRecentlyQueriedSeriesBlocks:<limit:1 lookbackSeconds:15 forceExceeded:true > `,
commit: false,
},
{
name: `all - commit`,
Expand All @@ -100,8 +96,6 @@ func TestUpdateQueryLimits(t *testing.T) {
},
},
commit: true,
// nolint: lll
expectedJSON: `maxRecentlyQueriedSeriesBlocks:<limit:1 lookbackSeconds:15 forceExceeded:true > maxRecentlyQueriedSeriesDiskBytesRead:<limit:1 lookbackSeconds:15 forceExceeded:true > maxRecentlyQueriedSeriesDiskRead:<limit:1 lookbackSeconds:15 forceExceeded:true > `,
},
{
name: `all - no commit`,
Expand All @@ -123,8 +117,6 @@ func TestUpdateQueryLimits(t *testing.T) {
},
},
commit: false,
// nolint: lll
expectedJSON: `maxRecentlyQueriedSeriesBlocks:<limit:1 lookbackSeconds:15 forceExceeded:true > maxRecentlyQueriedSeriesDiskBytesRead:<limit:1 lookbackSeconds:15 forceExceeded:true > maxRecentlyQueriedSeriesDiskRead:<limit:1 lookbackSeconds:15 forceExceeded:true > `,
},
}

Expand All @@ -150,24 +142,28 @@ func TestUpdateQueryLimits(t *testing.T) {
r, err := handler.update(zap.NewNop(), storeMock, update)
require.NoError(t, err)
require.Equal(t, kvconfig.QueryLimits, r.Key)
require.Equal(t, "", r.Old)
require.Equal(t, test.expectedJSON, r.New)
require.Equal(t, json.RawMessage("{}"), r.Old)
require.Equal(t, json.RawMessage(limitJSON), r.New)
require.Equal(t, 0, r.Version)

// (B) test old value.
mockVal := kv.NewMockValue(ctrl)
storeMock.EXPECT().Get(kvconfig.QueryLimits).Return(mockVal, nil)
mockVal.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v *kvpb.QueryLimits) error {
v.MaxRecentlyQueriedSeriesBlocks = &kvpb.QueryLimit{
oldLimits := &kvpb.QueryLimits{
MaxRecentlyQueriedSeriesBlocks: &kvpb.QueryLimit{
Limit: 10,
LookbackSeconds: 30,
ForceExceeded: false,
}
v.MaxRecentlyQueriedSeriesDiskBytesRead = &kvpb.QueryLimit{
},
MaxRecentlyQueriedSeriesDiskRead: &kvpb.QueryLimit{
Limit: 100,
LookbackSeconds: 300,
ForceExceeded: false,
}
},
}
mockVal := kv.NewMockValue(ctrl)
storeMock.EXPECT().Get(kvconfig.QueryLimits).Return(mockVal, nil)
mockVal.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v *kvpb.QueryLimits) error {
v.MaxRecentlyQueriedSeriesBlocks = oldLimits.MaxRecentlyQueriedSeriesBlocks
v.MaxRecentlyQueriedSeriesDiskRead = oldLimits.MaxRecentlyQueriedSeriesDiskRead
return nil
})
if test.commit {
Expand All @@ -177,10 +173,16 @@ func TestUpdateQueryLimits(t *testing.T) {
handler = &KeyValueStoreHandler{}
r, err = handler.update(zap.NewNop(), storeMock, update)
require.NoError(t, err)

var oldResult kvpb.QueryLimits
err = jsonpb.UnmarshalString(string(r.Old), &oldResult)
require.NoError(t, err)

require.Equal(t, kvconfig.QueryLimits, r.Key)
// nolint: lll
require.Equal(t, `maxRecentlyQueriedSeriesBlocks:<limit:10 lookbackSeconds:30 > maxRecentlyQueriedSeriesDiskBytesRead:<limit:100 lookbackSeconds:300 > `, r.Old)
require.Equal(t, test.expectedJSON, r.New)
require.Equal(t, *oldLimits.MaxRecentlyQueriedSeriesBlocks, *oldResult.MaxRecentlyQueriedSeriesBlocks)
require.Nil(t, oldResult.MaxRecentlyQueriedSeriesDiskBytesRead)
require.Equal(t, *oldLimits.MaxRecentlyQueriedSeriesDiskRead, *oldResult.MaxRecentlyQueriedSeriesDiskRead)
require.Equal(t, json.RawMessage(limitJSON), r.New)
require.Equal(t, 0, r.Version)
}
}

0 comments on commit 92bd2f4

Please sign in to comment.