Skip to content

Commit

Permalink
***WIP*** First pass at range allocation. Lays down a basic algorithm…
Browse files Browse the repository at this point in the history
… for selecting a set of peers to send a new range to. MISSING 1) A lot of the config data structures are straw men right now 2) The actual data in the gossip 3) Most Tests

Test Plan: go test ./...

Reviewers: spencerkimball

Reviewed By: spencerkimball

Differential Revision: http://phabricator.andybons.com/D25
  • Loading branch information
Levon Lloyd committed Apr 17, 2014
1 parent f0af605 commit 8afb7eb
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 18 deletions.
85 changes: 73 additions & 12 deletions storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,88 @@
package storage

import (
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/util"
"math/rand"
)

// An allocator makes allocation decisions based on a zone
// ZoneConfigFinder looks up a ZoneConfig for a range that starts with
// a given key.
type ZoneConfigFinder func(string) (ZoneConfig, error)

// AvailableDiskFinder finds the disks in a datacenter with the most available capacity.
type AvailableDiskFinder func(string) ([]AvailableDiskConfig, error)

// allocator makes allocation decisions based on a zone
// configuration, existing range metadata and available servers &
// disks. Configuration settings and range metadata information is
// stored directly in the engine-backed range they describe.
// Information on suitability and availability of servers is
// gleaned from the gossip network.
type allocator struct {
gossip *gossip.Gossip
diskFinder AvailableDiskFinder
zcFinder ZoneConfigFinder
rand rand.Rand
}

var maxCapacityPrefix = "max-free-capacity-"

// allocate returns a suitable Replica for the range and zone. If none
// are available / suitable, returns an error.
//
// TODO(spencer): currently this just returns a random device from
// amongst the available servers on the gossip network; need to add
// zone config and replica metadata.
func (a *allocator) allocate() (*Replica, error) {
// TODO(spencer): choose at random from gossip network's available Disks.
return nil, util.Errorf("unimplemented")
// are available / suitable, returns an error. It looks up the zone config for
// the block to determine where it needs to send data, then uses the gossip
// network to pick a random set of nodes in each data center based on the
// available capacity of the node.
// TODO(levon) Handle Racks/Power Units.
// TODO(levon) throw error if not enough suitable replicas can be found

func (a *allocator) allocate(start string, existingReplicas []Replica) ([]Replica, error) {
usedHosts := make(map[string]bool)
for _, replica := range existingReplicas {
usedHosts[replica.Addr] = true
}
// Find the Zone Config that applies to this range.
zoneConfig, err := a.zcFinder(start)
if err != nil {
return nil, err
}
result := make([]Replica, 0, 1)
for dc, diskTypes := range zoneConfig.Replicas {
// For each replica to be placed in this data center.
for _, diskType := range diskTypes {
// Randomly pick a node weighted by capacity.
candidates := make([]AvailableDiskConfig, len(zoneConfig.Replicas))
var candidateCapacityTotal float64
disks, err := a.diskFinder(dc)
if err != nil {
return nil, err
}
for _, c := range disks {
if c.DiskType == diskType && !usedHosts[c.Node.Address] {
candidates = append(candidates, c)
candidateCapacityTotal += c.DiskCapacity.PercentAvail()
}
}

var capacitySeen float64
targetCapacity := rand.Float64()

// Walk through candidates in random order, stopping when
// we've passed the capacity target.
for _, c := range candidates {
capacitySeen += (c.DiskCapacity.PercentAvail() / candidateCapacityTotal)
if capacitySeen >= targetCapacity {
replica := Replica{
Addr: c.Node.Address,
Disk: c.Disk,
}
result = append(result, replica)
usedHosts[c.Node.Address] = true
break
}
}
}
}
return result, nil
}

/*func findZoneConfig(key string) (ZoneConfig, error) {
}*/
63 changes: 63 additions & 0 deletions storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package storage

import (
"github.com/cockroachdb/cockroach/gossip"
yaml "gopkg.in/yaml.v1"
)

// Replica describes a replica location by address (host:port), disk
// (device name) and range start key.
type Replica struct {
Expand All @@ -25,6 +30,64 @@ type Replica struct {
RangeKey []byte // Range start key.
}

// DiskType is the type of a disk that a Store is storing data on.
type DiskType uint32

const (
// SSD = Solid State Disk
SSD DiskType = iota
// HDD = Spinning disk
HDD
)

// DiskCapacity contains capacity information for a storage device.
type DiskCapacity struct {
Capacity uint64
Available uint64
}

// NodeConfig holds configuration about the node.
type NodeConfig struct {
Address string
DataCenter string
PDU string
Rack string
}

// AvailableDiskConfig holds information about a disk that is available in a RoachNode.
type AvailableDiskConfig struct {
Node NodeConfig
Disk string
DiskCapacity DiskCapacity
DiskType DiskType
}

// ZoneConfig holds configuration that is needed for a range of KV pairs.
type ZoneConfig struct {
// Replicas is a map from datacenter name to a slice of disk types.
Replicas map[string]([]DiskType) `yaml:"replicas,omitempty"`
RangeMinBytes uint64 `yaml:"range_min_bytes,omitempty"`
RangeMaxBytes uint64 `yaml:"range_max_bytes,omitempty"`
}

// ParseZoneConfig parses a YAML serialized ZoneConfig.
func ParseZoneConfig(in []byte) (*ZoneConfig, error) {
z := &ZoneConfig{}
err := yaml.Unmarshal(in, z)
return z, err
}

// ToYAML serializes a ZoneConfig as YAML.
func (z *ZoneConfig) ToYAML() ([]byte, error) {
return yaml.Marshal(z)
}

// Less compares two AvailableDiskConfigs based on percentage of disk available.
func (a AvailableDiskConfig) Less(b gossip.Ordered) bool {
return a.DiskCapacity.PercentAvail() < b.(AvailableDiskConfig).DiskCapacity.PercentAvail()
}

// PercentAvail computes the percentage of disk space that is available.
func (d DiskCapacity) PercentAvail() float64 {
return float64(d.Available) / float64(d.Capacity)
}
44 changes: 44 additions & 0 deletions storage/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
// Author: Levon Lloyd ([email protected])

package storage

import (
"log"
"reflect"
"testing"
)

var testConfig = ZoneConfig{
Replicas: map[string][]DiskType{
"a": []DiskType{1, 2},
"b": []DiskType{1, 2},
},
}

func TestZoneConfigRoundTrip(t *testing.T) {
yaml, err := testConfig.ToYAML()
if err != nil {
log.Fatalf("failed converting to yaml: %v", err)
}
parsedZoneConfig, err := ParseZoneConfig(yaml)
if err != nil {
log.Fatalf("failed parsing config: %v", err)
}
if !reflect.DeepEqual(testConfig, *parsedZoneConfig) {
log.Fatalf("yaml round trip configs differ.\nOriginal: %+v\nParse: %+v\n", testConfig, parsedZoneConfig)
}
}
54 changes: 53 additions & 1 deletion storage/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package storage
import (
"flag"
"strings"
"time"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/rpc"
Expand All @@ -44,23 +45,33 @@ var (
// full capacity.
cacheSize = flag.Int64("cache_size", defaultCacheSize, "total size in bytes for "+
"data stored in all caches, shared evenly if there are multiple storage devices")

// storageGossipInterval is the period at which storage is checked and re-added to gossip
storageGossipInterval = flag.Duration(
"storage_gossip_interval", time.Minute,
"approximate interval (time.Duration) for checking local disk capacity and adding to gossip")
)

// Node holds the set of stores which this roach node serves traffic for.
type Node struct {
gossip *gossip.Gossip
storeMap map[string]*store
config NodeConfig
closer chan struct{}
}

// NewNode returns a new instance of Node, interpreting command line
// flags to intialize the appropriate store or set of
// stores. Registers the storage instance for the RPC service "Node".
func NewNode(rpcServer *rpc.Server, gossip *gossip.Gossip) *Node {
// TODO(levon): Read node configuration and set it here.
n := &Node{
gossip: gossip,
storeMap: make(map[string]*store),
closer: make(chan struct{}, 1),
}
allocator := &allocator{gossip: gossip}
// TODO(levon): real args here
allocator := &allocator{}
rpcServer.RegisterName("Node", n)

if *dataDirs == "" {
Expand All @@ -80,6 +91,37 @@ func NewNode(rpcServer *rpc.Server, gossip *gossip.Gossip) *Node {
return n
}

// startGossip starts a goroutine that periodically checks local disk
// capacity and gossips it.
func (n *Node) startGossip() {
ticker := time.NewTicker(*storageGossipInterval)
go func() {
for {
select {
case <-ticker.C:
n.gossipCapacities()
case <-n.closer:
ticker.Stop()
return
}
}
}()
}

// gossipCapacities calls capacity on each store and adds it to the gossip
func (n *Node) gossipCapacities() {
gossipTopic := maxCapacityPrefix + n.config.DataCenter
for _, store := range n.storeMap {
capacity, err := store.engine.capacity()
if err != nil {
glog.Warningf("Problem getting capacity: %v", err)
continue
}

n.gossip.AddInfo(gossipTopic, &AvailableDiskConfig{DiskCapacity: *capacity, Node: n.config}, time.Duration(2)*(*storageGossipInterval))
}
}

// getRange looks up the store by Replica.Disk and then queries it for
// the range specified by Replica.Range.
func (n *Node) getRange(r *Replica) (*Range, error) {
Expand All @@ -94,6 +136,16 @@ func (n *Node) getRange(r *Replica) (*Range, error) {
return rng, nil
}

// Start starts the node
func (n *Node) Start() {
n.startGossip()
}

// Stop cleanly stops the node
func (n *Node) Stop() {
close(n.closer)
}

// All methods to satisfy the Node RPC service fetch the range
// based on the Replica target provided in the argument header.
// Commands are broken down into read-only and read-write and
Expand Down
12 changes: 7 additions & 5 deletions storage/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ func (r *RocksDB) capacity() (*DiskCapacity, error) {
if r.name == "" {
// TODO(spencer): set name.
}
// TODO(spencer): set values in DiskCapacity struct.
// capacity = fs.Bsize * fs.Blocks
// available = fs.Bsize * fs.Bavail
// percentAvail = float64(available) / float64(capacity)
capacity := &DiskCapacity{}

cap := uint64(fs.Bsize) * fs.Blocks
avail := uint64(fs.Bsize) * fs.Bavail
capacity := &DiskCapacity{
Capacity: cap,
Available: avail,
}
return capacity, nil
}

0 comments on commit 8afb7eb

Please sign in to comment.