Skip to content

Commit

Permalink
Embeds a StoreFinder into Store
Browse files Browse the repository at this point in the history
In response to Spencer's PR comments, rather than increasing
the length of storage/store by defining store finder methods on Store itself.
  • Loading branch information
embark committed Nov 26, 2014
1 parent adb6600 commit e142aa5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 41 deletions.
2 changes: 1 addition & 1 deletion storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// engine-backed range they describe. Information on suitability and
// availability of servers is gleaned from the gossip network.
type allocator struct {
storeFinder StoreFinder
storeFinder FindStoreFunc
rand rand.Rand
}

Expand Down
27 changes: 15 additions & 12 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,19 @@ func (s StoreDescriptor) Less(b util.Ordered) bool {
// A Store maintains a map of ranges by start key. A Store corresponds
// to one physical device.
type Store struct {
Ident proto.StoreIdent
clock *hlc.Clock
engine engine.Engine // The underlying key-value store
db *client.KV // Cockroach KV DB
allocator *allocator // Makes allocation decisions
gossip *gossip.Gossip // Configs and store capacities
raftIDAlloc *IDAllocator // Raft ID allocator
rangeIDAlloc *IDAllocator // Range ID allocator
configMu sync.Mutex // Limit config update processing
finderContext storeFinderContext // Context to find stores for allocation
raft raft
closer chan struct{}
*StoreFinder

Ident proto.StoreIdent
clock *hlc.Clock
engine engine.Engine // The underlying key-value store
db *client.KV // Cockroach KV DB
allocator *allocator // Makes allocation decisions
gossip *gossip.Gossip // Configs and store capacities
raftIDAlloc *IDAllocator // Raft ID allocator
rangeIDAlloc *IDAllocator // Range ID allocator
configMu sync.Mutex // Limit config update processing
raft raft
closer chan struct{}

mu sync.RWMutex // Protects variables below...
ranges map[int64]*Range // Map of ranges by range ID
Expand All @@ -186,6 +187,8 @@ type Store struct {
// NewStore returns a new instance of a store.
func NewStore(clock *hlc.Clock, eng engine.Engine, db *client.KV, gossip *gossip.Gossip) *Store {
s := &Store{
StoreFinder: &StoreFinder{gossip: gossip},

clock: clock,
engine: eng,
db: db,
Expand Down
37 changes: 20 additions & 17 deletions storage/store_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,30 @@ import (
"github.com/cockroachdb/cockroach/proto"
)

// StoreFinder finds the disks in a datacenter that have the requested
// FindStoreFunc finds the disks in a datacenter that have the requested
// attributes.
type StoreFinder func(proto.Attributes) ([]*StoreDescriptor, error)
type FindStoreFunc func(proto.Attributes) ([]*StoreDescriptor, error)

type stringSet map[string]struct{}

type storeFinderContext struct {
sync.Mutex
capacityKeys stringSet
// StoreFinder provides the data necessary to find stores with particular
// attributes.
type StoreFinder struct {
finderMu sync.Mutex
capacityKeys stringSet // Tracks gosisp keys used for capacity
gossip *gossip.Gossip
}

// capacityGossipUpdate is a gossip callback triggered whenever capacity
// information is gossiped. It just tracks keys used for capacity gossip.
func (s *Store) capacityGossipUpdate(key string, contentsChanged bool) {
s.finderContext.Lock()
defer s.finderContext.Unlock()
func (sf *StoreFinder) capacityGossipUpdate(key string, contentsChanged bool) {
sf.finderMu.Lock()
defer sf.finderMu.Unlock()

if s.finderContext.capacityKeys == nil {
s.finderContext.capacityKeys = stringSet{}
if sf.capacityKeys == nil {
sf.capacityKeys = stringSet{}
}
s.finderContext.capacityKeys[key] = struct{}{}
sf.capacityKeys[key] = struct{}{}
}

// findStores is the Store's implementation of a StoreFinder. It returns a list
Expand All @@ -58,16 +61,16 @@ func (s *Store) capacityGossipUpdate(key string, contentsChanged bool) {
// TODO(embark, spencer): consider using a reverse index map from Attr->stores,
// for efficiency. Ensure that entries in this map still have an opportunity
// to be garbage collected.
func (s *Store) findStores(required proto.Attributes) ([]*StoreDescriptor, error) {
s.finderContext.Lock()
defer s.finderContext.Unlock()
func (sf *StoreFinder) findStores(required proto.Attributes) ([]*StoreDescriptor, error) {
sf.finderMu.Lock()
defer sf.finderMu.Unlock()
var stores []*StoreDescriptor
for key := range s.finderContext.capacityKeys {
storeDesc, err := storeDescFromGossip(key, s.gossip)
for key := range sf.capacityKeys {
storeDesc, err := storeDescFromGossip(key, sf.gossip)
if err != nil {
// We can no longer retrieve this key from the gossip store,
// perhaps it expired.
delete(s.finderContext.capacityKeys, key)
delete(sf.capacityKeys, key)
} else if required.IsSubset(storeDesc.Attrs) {
stores = append(stores, storeDesc)
}
Expand Down
21 changes: 10 additions & 11 deletions storage/store_finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@ import (
)

func TestCapacityGossipUpdate(t *testing.T) {
s, _ := createTestStore(t)
defer s.Stop()
sf := StoreFinder{}
key := "testkey"

// Order and value of contentsChanged shouldn't matter.
s.capacityGossipUpdate(key, true)
s.capacityGossipUpdate(key, false)
sf.capacityGossipUpdate(key, true)
sf.capacityGossipUpdate(key, false)

expectedKeys := stringSet{key: struct{}{}}
s.finderContext.Lock()
actualKeys := s.finderContext.capacityKeys
s.finderContext.Unlock()
sf.finderMu.Lock()
actualKeys := sf.capacityKeys
sf.finderMu.Unlock()

if !reflect.DeepEqual(expectedKeys, actualKeys) {
t.Errorf("expected to fetch %+v, instead %+v", expectedKeys, actualKeys)
Expand Down Expand Up @@ -67,7 +66,7 @@ func TestStoreFinder(t *testing.T) {

// Explicitly add keys rather than registering a gossip callback to avoid
// waiting for the goroutine callback to finish.
s.finderContext.capacityKeys = stringSet{
s.capacityKeys = stringSet{
"k1": struct{}{},
"k2": struct{}{},
"k3": struct{}{},
Expand Down Expand Up @@ -100,7 +99,7 @@ func TestStoreFinderGarbageCollection(t *testing.T) {
s, _ := createTestStore(t)
defer s.Stop()

s.finderContext.capacityKeys = stringSet{
s.capacityKeys = stringSet{
"key0": struct{}{},
"key1": struct{}{},
}
Expand All @@ -114,8 +113,8 @@ func TestStoreFinderGarbageCollection(t *testing.T) {
t.Errorf("expected no stores found, instead %+v", stores)
}

if len(s.finderContext.capacityKeys) != 0 {
if len(s.capacityKeys) != 0 {
t.Errorf("expected keys to be cleared, instead are %+v",
s.finderContext.capacityKeys)
s.capacityKeys)
}
}

0 comments on commit e142aa5

Please sign in to comment.