Skip to content

Commit

Permalink
Add an in memory implementation of cluster/client.Client (#2219)
Browse files Browse the repository at this point in the history
What this PR does / why we need it:

This adds a small testing utility to back a cluster/client.Client with an in-memory kv.Store. We have the cluster/kv/mem implementation of kv.Store, but there's currently no way to back a cluster/client.Client with it. Construction of the kv.Store is tightly coupled with etcd in the current class; having a separate class seemed like a reasonable way to do this given the current code, but we could consider a more dependency injection style for that--let me know if you want to hash that out.
  • Loading branch information
andrewmains12 authored Mar 20, 2020
1 parent da7ced3 commit 6852564
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 0 deletions.
142 changes: 142 additions & 0 deletions src/cluster/mem/mem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package memcluster

import (
"errors"
"sync"

"github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/kv/mem"
"github.com/m3db/m3/src/cluster/services"
)

const (
_kvPrefix = "_kv"
)

var (
// assert the interface matches.
_ client.Client = (*Client)(nil)
)

// Client provides a cluster/client.Client backed by kv/mem transaction store,
// which stores data in memory instead of in etcd.
type Client struct {
mu sync.Mutex
serviceOpts kv.OverrideOptions
cache map[cacheKey]kv.TxnStore
}

// New instantiates a client which defaults its stores to the given zone/env/namespace.
func New(serviceOpts kv.OverrideOptions) *Client {
return &Client{
serviceOpts: serviceOpts,
cache: make(map[cacheKey]kv.TxnStore),
}
}

// Services constructs a gateway to all cluster services, backed by a mem store.
func (c *Client) Services(opts services.OverrideOptions) (services.Services, error) {
if opts == nil {
opts = services.NewOverrideOptions()
}

errUnsupported := errors.New("currently unsupported for inMemoryClusterClient")

kvGen := func(zone string) (kv.Store, error) {
return c.Store(kv.NewOverrideOptions().SetZone(zone))
}

heartbeatGen := func(sid services.ServiceID) (services.HeartbeatService, error) {
return nil, errUnsupported
}

leaderGen := func(sid services.ServiceID, opts services.ElectionOptions) (services.LeaderService, error) {
return nil, errUnsupported
}

return services.NewServices(
services.NewOptions().
SetKVGen(kvGen).
SetHeartbeatGen(heartbeatGen).
SetLeaderGen(leaderGen).
SetNamespaceOptions(opts.NamespaceOptions()),
)
}

// KV returns/constructs a mem backed kv.Store for the default zone/env/namespace.
func (c *Client) KV() (kv.Store, error) {
return c.TxnStore(kv.NewOverrideOptions())
}

// Txn returns/constructs a mem backed kv.TxnStore for the default zone/env/namespace.
func (c *Client) Txn() (kv.TxnStore, error) {
return c.TxnStore(kv.NewOverrideOptions())
}

// Store returns/constructs a mem backed kv.Store for the given env/zone/namespace.
func (c *Client) Store(opts kv.OverrideOptions) (kv.Store, error) {
return c.TxnStore(opts)
}

// TxnStore returns/constructs a mem backed kv.TxnStore for the given env/zone/namespace.
func (c *Client) TxnStore(opts kv.OverrideOptions) (kv.TxnStore, error) {
c.mu.Lock()
defer c.mu.Unlock()

opts = mergeOpts(c.serviceOpts, opts)
key := cacheKey{
Env: opts.Environment(),
Zone: opts.Zone(),
Namespace: opts.Namespace(),
}
if s, ok := c.cache[key]; ok {
return s, nil
}

store := mem.NewStore()
c.cache[key] = store
return store, nil
}

type cacheKey struct {
Env string
Zone string
Namespace string
}

func mergeOpts(defaults kv.OverrideOptions, opts kv.OverrideOptions) kv.OverrideOptions {
if opts.Zone() == "" {
opts = opts.SetZone(defaults.Zone())
}

if opts.Environment() == "" {
opts = opts.SetEnvironment(defaults.Environment())
}

if opts.Namespace() == "" {
opts = opts.SetNamespace(_kvPrefix)
}

return opts
}
94 changes: 94 additions & 0 deletions src/cluster/mem/mem_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package memcluster

import (
"testing"

"github.com/m3db/m3/src/cluster/kv"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/cluster/services"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReusesStores(t *testing.T) {
key := "my_key"

c := New(kv.NewOverrideOptions())
store, err := c.TxnStore(kv.NewOverrideOptions())
require.NoError(t, err)
version, err := store.Set(key, &dummyProtoMessage{"my_value"})
require.NoError(t, err)

// retrieve the same store
sameStore, err := c.TxnStore(kv.NewOverrideOptions())
require.NoError(t, err)

v, err := sameStore.Get(key)
require.NoError(t, err)
assert.Equal(t, version, v.Version())

// other store doesn't have the value.
otherZone, err := c.TxnStore(kv.NewOverrideOptions().SetZone("other"))
require.NoError(t, err)
_, err = otherZone.Get(key)
assert.EqualError(t, err, "key not found")
}

func TestServices_Placement(t *testing.T) {
c := New(kv.NewOverrideOptions())
svcs, err := c.Services(services.NewOverrideOptions())
require.NoError(t, err)

placementSvc, err := svcs.PlacementService(services.NewServiceID().SetName("test_svc"), placement.NewOptions())
require.NoError(t, err)

p := placement.NewPlacement().SetInstances([]placement.Instance{
placement.NewInstance().SetHostname("host").SetEndpoint("127.0.0.1"),
})

p, err = placementSvc.Set(p)
require.NoError(t, err)

retrieved, err := placementSvc.Placement()
require.NoError(t, err)

// n.b.: placements are hard to compare directly since they're interfaces and contain more pointers than
// they ought, and it's not worth writing the method here.
assert.Equal(t, p.Version(), retrieved.Version())
}

// dummyProtoMessage implements proto.Message and exists solely as a thing
// to pass to a kv.Store.
type dummyProtoMessage struct {
Val string
}

func (d *dummyProtoMessage) Reset() {
}

func (d *dummyProtoMessage) String() string {
return d.Val
}

func (d *dummyProtoMessage) ProtoMessage() {
}

0 comments on commit 6852564

Please sign in to comment.