Skip to content

Commit

Permalink
Merge pull request #187 from embark/embark/rebalancing
Browse files Browse the repository at this point in the history
Simple StoreFinder and new capacity gossiping
  • Loading branch information
embark committed Nov 26, 2014
2 parents e2c0bff + e142aa5 commit b856d2c
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 18 deletions.
2 changes: 1 addition & 1 deletion gossip/infostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (is *infoStore) addInfo(i *info) error {
if existingInfo, ok := is.Infos[i.Key]; ok {
if i.Timestamp < existingInfo.Timestamp ||
(i.Timestamp == existingInfo.Timestamp && i.Hops >= existingInfo.Hops) {
return util.Errorf("info %+v older than current group info %+v", i, existingInfo)
return util.Errorf("info %+v older than current info %+v", i, existingInfo)
}
contentsChanged = !reflect.DeepEqual(existingInfo.Val, i.Val)
} else {
Expand Down
5 changes: 2 additions & 3 deletions gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ const (
KeyConfigZone = "zones"

// KeyMaxAvailCapacityPrefix is the key prefix for gossiping available
// store capacity. The suffix is composed of:
// <datacenter>-<hex node ID>-<hex store ID>. The value is a
// storage.StoreDescriptor struct.
// store capacity. The suffix is composed of: <node ID>-<store ID>.
// The value is a storage.StoreDescriptor struct.
KeyMaxAvailCapacityPrefix = "max-avail-capacity-"

// KeyNodeCount is the count of gossip nodes in the network. The
Expand Down
4 changes: 2 additions & 2 deletions proto/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"strings"
)

// IsSubset returns whether attributes list b is a subset of
// attributes list a.
// IsSubset returns whether attributes list a is a subset of
// attributes list b.
func (a Attributes) IsSubset(b Attributes) bool {
m := map[string]struct{}{}
for _, s := range b.Attrs {
Expand Down
7 changes: 3 additions & 4 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,10 @@ func (n *Node) gossipCapacities() {
log.Warningf("problem getting store descriptor for store %+v: %v", s.Ident, err)
return nil
}
gossipPrefix := gossip.KeyMaxAvailCapacityPrefix + storeDesc.CombinedAttrs().SortedString()
keyMaxCapacity := gossipPrefix + strconv.FormatInt(int64(storeDesc.Node.NodeID), 10) + "-" +
// Unique gossip key per store.
keyMaxCapacity := gossip.KeyMaxAvailCapacityPrefix +
strconv.FormatInt(int64(storeDesc.Node.NodeID), 10) + "-" +
strconv.FormatInt(int64(storeDesc.StoreID), 10)
// Register gossip group.
n.gossip.RegisterGroup(gossipPrefix, gossipGroupLimit, gossip.MaxGroup)
// Gossip store descriptor.
n.gossip.AddInfo(keyMaxCapacity, *storeDesc, ttlCapacityGossip)
return nil
Expand Down
5 changes: 1 addition & 4 deletions storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@ import (
"github.com/cockroachdb/cockroach/util"
)

// StoreFinder finds the disks in a datacenter with the most available capacity.
type StoreFinder func(proto.Attributes) ([]*StoreDescriptor, error)

// allocator makes allocation decisions based on a zone configuration,
// existing range metadata and available stores. Configuration
// settings and range metadata information is stored directly in the
// engine-backed range they describe. Information on suitability and
// availability of servers is gleaned from the gossip network.
type allocator struct {
storeFinder StoreFinder
storeFinder FindStoreFunc
rand rand.Rand
}

Expand Down
5 changes: 2 additions & 3 deletions storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ var multiDCConfig = proto.ZoneConfig{
func filterStores(a proto.Attributes, stores []*StoreDescriptor) ([]*StoreDescriptor, error) {
var filtered []*StoreDescriptor
for _, s := range stores {
b := s.Attrs.Attrs
b = append(b, s.Node.Attrs.Attrs...)
if a.IsSubset(proto.Attributes{Attrs: b}) {
sAttrs := s.CombinedAttrs()
if a.IsSubset(*sAttrs) {
filtered = append(filtered, s)
}
}
Expand Down
11 changes: 10 additions & 1 deletion storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ func (s StoreDescriptor) Less(b util.Ordered) bool {
// A Store maintains a map of ranges by start key. A Store corresponds
// to one physical device.
type Store struct {
*StoreFinder

Ident proto.StoreIdent
clock *hlc.Clock
engine engine.Engine // The underlying key-value store
Expand All @@ -184,7 +186,9 @@ type Store struct {

// NewStore returns a new instance of a store.
func NewStore(clock *hlc.Clock, eng engine.Engine, db *client.KV, gossip *gossip.Gossip) *Store {
return &Store{
s := &Store{
StoreFinder: &StoreFinder{gossip: gossip},

clock: clock,
engine: eng,
db: db,
Expand All @@ -195,6 +199,8 @@ func NewStore(clock *hlc.Clock, eng engine.Engine, db *client.KV, gossip *gossip
ranges: map[int64]*Range{},
rangesByRaftID: map[int64]*Range{},
}
s.allocator.storeFinder = s.findStores
return s
}

// Stop calls Range.Stop() on all active ranges.
Expand Down Expand Up @@ -284,6 +290,9 @@ func (s *Store) Start() error {
if s.gossip != nil {
s.gossip.RegisterCallback(gossip.KeyConfigAccounting, s.configGossipUpdate)
s.gossip.RegisterCallback(gossip.KeyConfigZone, s.configGossipUpdate)
// Callback triggers on capacity gossip from all stores.
capacityRegex := fmt.Sprintf("%s.*", gossip.KeyMaxAvailCapacityPrefix)
s.gossip.RegisterCallback(capacityRegex, s.capacityGossipUpdate)
}

return nil
Expand Down
95 changes: 95 additions & 0 deletions storage/store_finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Kathy Spradlin ([email protected])

package storage

import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/proto"
)

// FindStoreFunc finds the disks in a datacenter that have the requested
// attributes.
type FindStoreFunc func(proto.Attributes) ([]*StoreDescriptor, error)

type stringSet map[string]struct{}

// StoreFinder provides the data necessary to find stores with particular
// attributes.
type StoreFinder struct {
finderMu sync.Mutex
capacityKeys stringSet // Tracks gosisp keys used for capacity
gossip *gossip.Gossip
}

// capacityGossipUpdate is a gossip callback triggered whenever capacity
// information is gossiped. It just tracks keys used for capacity gossip.
func (sf *StoreFinder) capacityGossipUpdate(key string, contentsChanged bool) {
sf.finderMu.Lock()
defer sf.finderMu.Unlock()

if sf.capacityKeys == nil {
sf.capacityKeys = stringSet{}
}
sf.capacityKeys[key] = struct{}{}
}

// findStores is the Store's implementation of a StoreFinder. It returns a list
// of stores with attributes that are a superset of the required attributes. It
// never returns an error.
//
// If it cannot retrieve a StoreDescriptor from the Store's gossip, it garbage
// collects the failed key.
//
// TODO(embark, spencer): consider using a reverse index map from Attr->stores,
// for efficiency. Ensure that entries in this map still have an opportunity
// to be garbage collected.
func (sf *StoreFinder) findStores(required proto.Attributes) ([]*StoreDescriptor, error) {
sf.finderMu.Lock()
defer sf.finderMu.Unlock()
var stores []*StoreDescriptor
for key := range sf.capacityKeys {
storeDesc, err := storeDescFromGossip(key, sf.gossip)
if err != nil {
// We can no longer retrieve this key from the gossip store,
// perhaps it expired.
delete(sf.capacityKeys, key)
} else if required.IsSubset(storeDesc.Attrs) {
stores = append(stores, storeDesc)
}
}
return stores, nil
}

// storeDescFromGossip retrieves a StoreDescriptor from the specified capacity
// gossip key. Returns an error if the gossip doesn't exist or is not
// a StoreDescriptor.
func storeDescFromGossip(key string, g *gossip.Gossip) (*StoreDescriptor, error) {
info, err := g.GetInfo(key)

if err != nil {
return nil, err
}
storeDesc, ok := info.(StoreDescriptor)
if !ok {
return nil, fmt.Errorf("gossiped info is not a StoreDescriptor: %+v", info)
}
return &storeDesc, nil
}
120 changes: 120 additions & 0 deletions storage/store_finder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Kathy Spradlin ([email protected])

package storage

import (
"reflect"
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/proto"
)

func TestCapacityGossipUpdate(t *testing.T) {
sf := StoreFinder{}
key := "testkey"

// Order and value of contentsChanged shouldn't matter.
sf.capacityGossipUpdate(key, true)
sf.capacityGossipUpdate(key, false)

expectedKeys := stringSet{key: struct{}{}}
sf.finderMu.Lock()
actualKeys := sf.capacityKeys
sf.finderMu.Unlock()

if !reflect.DeepEqual(expectedKeys, actualKeys) {
t.Errorf("expected to fetch %+v, instead %+v", expectedKeys, actualKeys)
}
}

func TestStoreFinder(t *testing.T) {
s, _ := createTestStore(t)
defer s.Stop()
required := []string{"ssd", "dc"}
// Nothing yet.
if stores, _ := s.findStores(proto.Attributes{Attrs: required}); stores != nil {
t.Errorf("expected no stores, instead %+v", stores)
}

matchingStore := StoreDescriptor{
Attrs: proto.Attributes{Attrs: required},
}
supersetStore := StoreDescriptor{
Attrs: proto.Attributes{Attrs: append(required, "db")},
}
unmatchingStore := StoreDescriptor{
Attrs: proto.Attributes{Attrs: []string{"ssd", "otherdc"}},
}
emptyStore := StoreDescriptor{Attrs: proto.Attributes{}}

// Explicitly add keys rather than registering a gossip callback to avoid
// waiting for the goroutine callback to finish.
s.capacityKeys = stringSet{
"k1": struct{}{},
"k2": struct{}{},
"k3": struct{}{},
"k4": struct{}{},
}
s.gossip.AddInfo("k1", matchingStore, time.Hour)
s.gossip.AddInfo("k2", supersetStore, time.Hour)
s.gossip.AddInfo("k3", unmatchingStore, time.Hour)
s.gossip.AddInfo("k4", emptyStore, time.Hour)

expected := []string{matchingStore.Attrs.SortedString(), supersetStore.Attrs.SortedString()}
stores, err := s.findStores(proto.Attributes{Attrs: required})
if err != nil {
t.Errorf("expected no err, got %s", err)
}
var actual []string
for _, store := range stores {
actual = append(actual, store.Attrs.SortedString())
}
sort.Strings(expected)
sort.Strings(actual)
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %+v Attrs, instead %+v", expected, actual)
}
}

// TestStoreFinderGarbageCollection ensures removal of capacity gossip keys in
// the map, if their gossip does not exist when we try to retrieve them.
func TestStoreFinderGarbageCollection(t *testing.T) {
s, _ := createTestStore(t)
defer s.Stop()

s.capacityKeys = stringSet{
"key0": struct{}{},
"key1": struct{}{},
}
required := []string{}

// No gossip added for either key, so they should be removed.
stores, err := s.findStores(proto.Attributes{Attrs: required})
if err != nil {
t.Errorf("unexpected error retrieving stores %s", err)
} else if len(stores) != 0 {
t.Errorf("expected no stores found, instead %+v", stores)
}

if len(s.capacityKeys) != 0 {
t.Errorf("expected keys to be cleared, instead are %+v",
s.capacityKeys)
}
}

0 comments on commit b856d2c

Please sign in to comment.