From a97fee5d0146ee6f1facf628ee3d6d03d8f7789d Mon Sep 17 00:00:00 2001 From: Spencer Kimball Date: Thu, 17 Apr 2014 14:34:49 -0400 Subject: [PATCH] Integrated a CLI interface to the main cockroach binary which currently has two commands: init & start. Summary: A new cluster is init'd by specifying a single data directory as first argument and a default zone configuration file as second arg. A new RocksDB instance is created in the data directory (an error results if one is already there), and a new cluster UUID is generated to uniquely identify the cockroach cluster. The first range is created as a single replica with ident: NodeID: 1, StoreID: 1, RangeID: 1. meta1/meta2 records are created for the first range, and node id sequence generator, store id sequence generator, and store-local range id sequence generator are all created. The roach node is run using "cockroach start". Typically this will be accompanied by --bootstrap_hosts in order to connect to gossip network, and --data_dirs=dir1,dir2,... The --data_dirs can either have a list of paths or a list of "mem=100000,mem=200000,...", which creates in-mem stores. Any stores which are as yet uninitialized are bootstrapped using the cluster ID and new store IDs from the store id sequence generator. Stores other than the one used to init a new cluster, or all stores on a new node being added to an existing cluster will need initialization. Changed Replica specification to be a series of three integers: NodeID, StoreID & RangeID. Node & store IDs are 32-bit; range ID is 64-bit. Added convenience methods to kv.DB as well as storage.Engine for reading and writing arbitrary go values via interface{}. Test Plan: go test Reviewers: petermattis, andybons, bdarnell Reviewed By: bdarnell Subscribers: hrsht, levonlloyd Differential Revision: http://phabricator.andybons.com/D29 --- gossip/client.go | 14 +- gossip/gossip.go | 22 +- gossip/infostore.go | 14 +- gossip/infostore_test.go | 23 +- gossip/keys.go | 53 +++++ gossip/simulation.go | 4 +- kv/db.go | 40 ++++ kv/kv_test.go | 22 +- kv/local_db.go | 40 ++-- main.go | 14 +- server/init.go | 63 ++++++ server/node.go | 457 +++++++++++++++++++++++++++++++++++++++ server/server.go | 126 ++++++++++- server/server_test.go | 16 +- simulation/gossip.go | 4 +- storage/allocator.go | 87 ++++---- storage/config.go | 59 ++--- storage/config_test.go | 1 + storage/engine.go | 84 ++++++- storage/in_mem.go | 43 ++-- storage/keys.go | 41 ++++ storage/messages.go | 6 +- storage/node.go | 261 ---------------------- storage/range.go | 174 ++++++++------- storage/rocksdb.go | 38 ++-- storage/store.go | 135 ++++++++++-- 26 files changed, 1268 insertions(+), 573 deletions(-) create mode 100644 gossip/keys.go create mode 100644 server/init.go create mode 100644 server/node.go create mode 100644 storage/keys.go delete mode 100644 storage/node.go diff --git a/gossip/client.go b/gossip/client.go index 60b30c23107d..cdadd38aad7f 100644 --- a/gossip/client.go +++ b/gossip/client.go @@ -46,19 +46,19 @@ func init() { // client is a client-side RPC connection to a gossip peer node. type client struct { - addr net.Addr // Peer node network address - rpcClient *rpc.Client // RPC client - forwardAddr net.Addr // Set if disconnected with an alternate addr - lastFresh int64 // Last wall time client received fresh info - err error // Set if client experienced an error - closer chan interface{} // Client shutdown channel + addr net.Addr // Peer node network address + rpcClient *rpc.Client // RPC client + forwardAddr net.Addr // Set if disconnected with an alternate addr + lastFresh int64 // Last wall time client received fresh info + err error // Set if client experienced an error + closer chan struct{} // Client shutdown channel } // newClient creates and returns a client struct. func newClient(addr net.Addr) *client { return &client{ addr: addr, - closer: make(chan interface{}, 1), + closer: make(chan struct{}, 1), } } diff --git a/gossip/gossip.go b/gossip/gossip.go index 9dd898cc168a..74a3c93f73fa 100644 --- a/gossip/gossip.go +++ b/gossip/gossip.go @@ -78,12 +78,6 @@ var ( ) const ( - // SentinelGossip is gossiped info which must not expire or node - // considers itself partitioned and will retry with bootstrap - // hosts. - SentinelGossip = "meta0" - // NodeCountGossip is the count of gossip nodes in the network. - NodeCountGossip = "nodeCount" // MaxPeers is the maximum number of connected gossip peers. MaxPeers = 10 // defaultNodeCount is the default number of nodes in the gossip @@ -106,6 +100,8 @@ const ( // entre to the gossip network. type Gossip struct { Name string // Optional node name + Connected chan struct{} // Closed upon initial connection + hasConnected bool // Set first time network is connected *server // Embedded gossip RPC server bootstraps *addrSet // Bootstrap host addresses outgoing *addrSet // Set of outgoing client addresses @@ -119,6 +115,7 @@ type Gossip struct { // Cockroach RPC server to initialize the gossip service endpoint. func New(rpcServer *rpc.Server) *Gossip { g := &Gossip{ + Connected: make(chan struct{}, 1), server: newServer(rpcServer, *gossipInterval), bootstraps: newAddrSet(MaxPeers), outgoing: newAddrSet(MaxPeers), @@ -186,7 +183,7 @@ func (g *Gossip) GetGroupInfos(prefix string) ([]interface{}, error) { // RegisterGroup registers a new group with info store. Returns an // error if the group was already registered. func (g *Gossip) RegisterGroup(prefix string, limit int, typeOf GroupType) error { - g.mu.Lock() + g.mu.Lock() // Copyright 2014 The Cockroach Authors. defer g.mu.Unlock() return g.is.registerGroup(newGroup(prefix, limit, typeOf)) } @@ -249,7 +246,7 @@ func (g *Gossip) Stop() <-chan error { func (g *Gossip) maxToleratedHops() uint32 { // Get info directly as we have mutex held here. var nodeCount = int64(defaultNodeCount) - if info := g.is.getInfo(NodeCountGossip); info != nil { + if info := g.is.getInfo(KeyNodeCount); info != nil { nodeCount = info.Val.(int64) } return uint32(math.Ceil(math.Log(float64(nodeCount))/math.Log(float64(MaxPeers))))*2 + 1 @@ -319,7 +316,7 @@ func (g *Gossip) bootstrap() { if avail.len() > 0 { // Check whether or not we need bootstrap. haveClients := g.outgoing.len() > 0 - haveSentinel := g.is.getInfo(SentinelGossip) != nil + haveSentinel := g.is.getInfo(KeySentinel) != nil if !haveClients || !haveSentinel { // Select a bootstrap address at random and start client. addr := avail.selectRandom() @@ -389,9 +386,12 @@ func (g *Gossip) manage() { if g.outgoing.len() == 0 && g.filterExtant(g.bootstraps).len() > 0 { glog.Infof("no outgoing hosts; signaling bootstrap") g.stalled.Signal() - } else if g.is.getInfo(SentinelGossip) == nil { - glog.Infof("missing sentinel gossip %s; assuming partition and reconnecting", SentinelGossip) + } else if g.is.getInfo(KeySentinel) == nil { + glog.Warningf("missing sentinel gossip %s; assuming partition and reconnecting", KeySentinel) g.stalled.Signal() + } else if !g.hasConnected { + g.hasConnected = true + close(g.Connected) } // The exit condition. diff --git a/gossip/infostore.go b/gossip/infostore.go index d5ffc40d99b1..581644483d28 100644 --- a/gossip/infostore.go +++ b/gossip/infostore.go @@ -92,11 +92,15 @@ func newInfoStore(nodeAddr net.Addr) *infoStore { func (is *infoStore) newInfo(key string, val interface{}, ttl time.Duration) *info { is.seqGen++ now := monotonicUnixNano() + ttlStamp := now + int64(ttl) + if ttl == 0*time.Second { + ttlStamp = math.MaxInt64 + } return &info{ Key: key, Val: val, Timestamp: now, - TTLStamp: now + int64(ttl), + TTLStamp: ttlStamp, NodeAddr: is.NodeAddr, peerAddr: is.NodeAddr, seq: is.seqGen, @@ -135,8 +139,12 @@ func (is *infoStore) getGroupInfos(prefix string) infoArray { // // REQUIRES: group.prefix is not already in the info store's groups map. func (is *infoStore) registerGroup(g *group) error { - if _, ok := is.Groups[g.Prefix]; ok { - return util.Errorf("group %q already in group map", g.Prefix) + if g2, ok := is.Groups[g.Prefix]; ok { + if g.Prefix != g2.Prefix || g.Limit != g2.Limit || g.TypeOf != g2.TypeOf { + return util.Errorf("group %q already in group map with different settings %v vs. %v", + g.Prefix, g, g2) + } + return nil } is.Groups[g.Prefix] = g return nil diff --git a/gossip/infostore_test.go b/gossip/infostore_test.go index d9c2759dd15c..5b931799d428 100644 --- a/gossip/infostore_test.go +++ b/gossip/infostore_test.go @@ -19,6 +19,7 @@ package gossip import ( "fmt" + "math" "testing" "time" ) @@ -52,9 +53,25 @@ func TestRegisterGroup(t *testing.T) { t.Error("shouldn't belong to a group") } - // Try to register a group that's already been registered. - if is.registerGroup(groupA) == nil { - t.Error("should not be able to register group A twice") + // Try to register a group that's already been registered; will + // succeed if identical. + if is.registerGroup(groupA) != nil { + t.Error("should be able to register group A twice") + } + // Now change the group type and try again. + groupAAlt := newGroup("a", 1, MaxGroup) + if is.registerGroup(groupAAlt) == nil { + t.Error("should not be able to register group A again with different properties") + } +} + +// TestZeroDuration verifies that specifying a zero duration sets +// TTLStamp to max int64. +func TestZeroDuration(t *testing.T) { + is := newInfoStore(emptyAddr) + info := is.newInfo("a", float64(1), 0*time.Second) + if info.TTLStamp != math.MaxInt64 { + t.Errorf("expected zero duration to get max TTLStamp: %d", info.TTLStamp) } } diff --git a/gossip/keys.go b/gossip/keys.go new file mode 100644 index 000000000000..83d5c7dd31a2 --- /dev/null +++ b/gossip/keys.go @@ -0,0 +1,53 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Spencer Kimball (spencer.kimball@gmail.com) + +package gossip + +// Constants for gossip keys. +const ( + // KeyClusterID is the unique UUID for this Cockroach cluster. + // The value is a string UUID for the cluster. + KeyClusterID = "cluster-id" + + // KeyMaxCapacityPrefix is the key prefix for gossiping available + // store capacity. The suffix is composed of: + // --. The value is a + // storage.StoreAttributes struct. + KeyMaxAvailCapacityPrefix = "max-avail-capacity-" + + // KeyNodeCount is the count of gossip nodes in the network. The + // value is an int64 containing the count of nodes in the cluster. + // TODO(spencer): should remove this and instead just count the + // number of node ids being gossiped. + KeyNodeCount = "node-count" + + // KeyNodeIDPrefix is the key prefix for gossiping node id + // addresses. The actual key is suffixed with the hexadecimal + // representation of the node id and the value is the host:port + // string address of the node. E.g. node-1bfa: fwd56.sjcb1:24001 + KeyNodeIDPrefix = "node-" + + // SentinelKey is a key for gossip which must not expire or else the + // node considers itself partitioned and will retry with bootstrap hosts. + KeySentinel = KeyClusterID + + // KeyFirstRangeMetadata is the metadata for the "first" range. The + // "first" range contains the meta1 key range, the first level of + // the bi-level key addressing scheme. The value is a slice of + // storage.Replica structs. + KeyFirstRangeMetadata = "first-range" +) diff --git a/gossip/simulation.go b/gossip/simulation.go index 3758bd85f100..883502752b44 100644 --- a/gossip/simulation.go +++ b/gossip/simulation.go @@ -125,7 +125,7 @@ func SimulateNetwork(nodeCount int, network string, gossipInterval time.Duration node.SetInterval(gossipInterval) // Node 0 gossips node count. if i == 0 { - node.AddInfo(NodeCountGossip, int64(nodeCount), time.Hour) + node.AddInfo(KeyNodeCount, int64(nodeCount), time.Hour) } node.Start() nodes[addrs[i].String()] = node @@ -137,7 +137,7 @@ func SimulateNetwork(nodeCount int, network string, gossipInterval time.Duration select { case <-gossipTimeout: // Node 0 gossips sentinel every cycle. - nodes[addrs[0].String()].AddInfo(SentinelGossip, int64(cycle), time.Hour) + nodes[addrs[0].String()].AddInfo(KeySentinel, int64(cycle), time.Hour) if !simCallback(cycle, nodes) { complete = true } diff --git a/kv/db.go b/kv/db.go index 9fc855b9a7db..872cb4b11af9 100644 --- a/kv/db.go +++ b/kv/db.go @@ -18,7 +18,10 @@ package kv import ( + "bytes" + "encoding/gob" "reflect" + "time" "github.com/cockroachdb/cockroach/gossip" "github.com/cockroachdb/cockroach/rpc" @@ -42,6 +45,43 @@ type DB interface { EnqueueMessage(args *storage.EnqueueMessageRequest) <-chan *storage.EnqueueMessageResponse } +// GetI fetches the value at the specified key and deserializes it +// into "value". Returns true on success or false if the key was not +// found. The timestamp of the write is returned as the second return +// value. The first result parameter is "ok", true if a value was +// found for the requested key; false otherwise. An error is returned +// on error fetching from underlying storage or deserializing value. +func GetI(db DB, key storage.Key, value interface{}) (bool, int64, error) { + gr := <-db.Get(&storage.GetRequest{Key: key}) + if gr.Error != nil { + return false, 0, gr.Error + } + if len(gr.Value.Bytes) == 0 { + return false, 0, nil + } + if err := gob.NewDecoder(bytes.NewBuffer(gr.Value.Bytes)).Decode(value); err != nil { + return true, gr.Value.Timestamp, err + } + return true, gr.Value.Timestamp, nil +} + +// PutI sets the given key to the serialized byte string of the value +// provided. Uses current time and default expiration. +func PutI(db DB, key storage.Key, value interface{}) error { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(value); err != nil { + return err + } + pr := <-db.Put(&storage.PutRequest{ + Key: key, + Value: storage.Value{ + Bytes: buf.Bytes(), + Timestamp: time.Now().UnixNano(), + }, + }) + return pr.Error +} + // A DistDB provides methods to access Cockroach's monolithic, // distributed key value store. Each method invocation triggers a // lookup or lookups to find replica metadata for implicated key diff --git a/kv/kv_test.go b/kv/kv_test.go index 33b9d223243c..07d3cffc6772 100644 --- a/kv/kv_test.go +++ b/kv/kv_test.go @@ -18,15 +18,9 @@ package kv import ( - "net" "net/http" "net/http/httptest" "sync" - - "github.com/cockroachdb/cockroach/gossip" - "github.com/cockroachdb/cockroach/rpc" - "github.com/cockroachdb/cockroach/storage" - "github.com/golang/glog" ) var ( @@ -35,9 +29,6 @@ var ( ) type kvTestServer struct { - rpc *rpc.Server - gossip *gossip.Gossip - node *storage.Node db DB rest *RESTServer httpServer *httptest.Server @@ -45,23 +36,12 @@ type kvTestServer struct { func startServer() *kvTestServer { once.Do(func() { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - glog.Fatal(err) - } - server = &kvTestServer{ - rpc: rpc.NewServer(addr), - } - server.gossip = gossip.New(server.rpc) - server.gossip.SetBootstrap([]net.Addr{server.rpc.Addr}) - server.node = storage.NewNode(server.rpc, server.gossip) + server = &kvTestServer{} server.db = NewLocalDB() server.rest = NewRESTServer(server.db) server.httpServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server.rest.HandleAction(w, r) })) - server.gossip.Start() - go server.rpc.ListenAndServe() // blocks, so launch in a goroutine }) return server } diff --git a/kv/local_db.go b/kv/local_db.go index 5221bb8dbb58..798610abbe11 100644 --- a/kv/local_db.go +++ b/kv/local_db.go @@ -21,7 +21,6 @@ import ( "reflect" "github.com/cockroachdb/cockroach/storage" - "github.com/golang/glog" ) // A LocalDB provides methods to access only a local, in-memory key @@ -35,11 +34,12 @@ type LocalDB struct { // unittests. func NewLocalDB() DB { ldb := &LocalDB{} - rng, err := storage.NewRange([]byte{}, storage.NewInMem(1<<30), nil) - if err != nil { - glog.Fatal(err) + meta := storage.RangeMetadata{ + RangeID: 1, + StartKey: storage.Key(""), + EndKey: storage.Key("\xff\xff"), } - ldb.rng = rng + ldb.rng = storage.NewRange(meta, storage.NewInMem(1<<30), nil, nil) return ldb } @@ -49,86 +49,82 @@ func NewLocalDB() DB { func (db *LocalDB) invokeMethod(method string, args, reply interface{}) interface{} { chanVal := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, reflect.TypeOf(reply)), 1) replyVal := reflect.ValueOf(reply) - results := reflect.ValueOf(db.rng).MethodByName(method).Call([]reflect.Value{ + reflect.ValueOf(db.rng).MethodByName(method).Call([]reflect.Value{ reflect.ValueOf(args), replyVal, }) - errVal := results[0] - if errVal != reflect.Zero(errVal.Type()) { - reflect.Indirect(replyVal).FieldByName("Error").Set(errVal) - } chanVal.Send(replyVal) return chanVal.Interface() } -// Contains . +// Contains passes through to local range. func (db *LocalDB) Contains(args *storage.ContainsRequest) <-chan *storage.ContainsResponse { return db.invokeMethod("Contains", args, &storage.ContainsResponse{}).(chan *storage.ContainsResponse) } -// Get . +// Get passes through to local range. func (db *LocalDB) Get(args *storage.GetRequest) <-chan *storage.GetResponse { return db.invokeMethod("Get", args, &storage.GetResponse{}).(chan *storage.GetResponse) } -// Put . +// Put passes through to local range. func (db *LocalDB) Put(args *storage.PutRequest) <-chan *storage.PutResponse { return db.invokeMethod("Put", args, &storage.PutResponse{}).(chan *storage.PutResponse) } -// Increment . +// Increment passes through to local range. func (db *LocalDB) Increment(args *storage.IncrementRequest) <-chan *storage.IncrementResponse { return db.invokeMethod("Increment", args, &storage.IncrementResponse{}).(chan *storage.IncrementResponse) } -// Delete . +// Delete passes through to local range. func (db *LocalDB) Delete(args *storage.DeleteRequest) <-chan *storage.DeleteResponse { return db.invokeMethod("Delete", args, &storage.DeleteResponse{}).(chan *storage.DeleteResponse) } -// DeleteRange . +// DeleteRange passes through to local range. func (db *LocalDB) DeleteRange(args *storage.DeleteRangeRequest) <-chan *storage.DeleteRangeResponse { return db.invokeMethod("DeleteRange", args, &storage.DeleteRangeResponse{}).(chan *storage.DeleteRangeResponse) } -// Scan . +// Scan passes through to local range. func (db *LocalDB) Scan(args *storage.ScanRequest) <-chan *storage.ScanResponse { return db.invokeMethod("Scan", args, &storage.ScanResponse{}).(chan *storage.ScanResponse) } -// EndTransaction . +// EndTransaction passes through to local range. func (db *LocalDB) EndTransaction(args *storage.EndTransactionRequest) <-chan *storage.EndTransactionResponse { return db.invokeMethod("EndTransaction", args, &storage.EndTransactionResponse{}).(chan *storage.EndTransactionResponse) } -// AccumulateTS . +// AccumulateTS passes through to local range. func (db *LocalDB) AccumulateTS(args *storage.AccumulateTSRequest) <-chan *storage.AccumulateTSResponse { return db.invokeMethod("AccumulateTS", args, &storage.AccumulateTSResponse{}).(chan *storage.AccumulateTSResponse) } -// ReapQueue . +// ReapQueue passes through to local range. func (db *LocalDB) ReapQueue(args *storage.ReapQueueRequest) <-chan *storage.ReapQueueResponse { return db.invokeMethod("ReapQueue", args, &storage.ReapQueueResponse{}).(chan *storage.ReapQueueResponse) } -// EnqueueUpdate . +// EnqueueUpdate passes through to local range. func (db *LocalDB) EnqueueUpdate(args *storage.EnqueueUpdateRequest) <-chan *storage.EnqueueUpdateResponse { return db.invokeMethod("EnqueueUpdate", args, &storage.EnqueueUpdateResponse{}).(chan *storage.EnqueueUpdateResponse) } -// EnqueueMessage . +// EnqueueMessage passes through to local range. func (db *LocalDB) EnqueueMessage(args *storage.EnqueueMessageRequest) <-chan *storage.EnqueueMessageResponse { return db.invokeMethod("EnqueueMessage", args, &storage.EnqueueMessageResponse{}).(chan *storage.EnqueueMessageResponse) diff --git a/main.go b/main.go index 4fffd45ef630..1ed8e6909a19 100644 --- a/main.go +++ b/main.go @@ -16,15 +16,23 @@ package main import ( - "flag" + "os" + commander "code.google.com/p/go-commander" "github.com/cockroachdb/cockroach/server" "github.com/golang/glog" ) func main() { - flag.Parse() - if err := server.ListenAndServe(); err != nil { + c := commander.Commander{ + Name: "cockroach", + Commands: []*commander.Command{ + server.CmdStart, + server.CmdInit, + }, + } + + if err := c.Run(os.Args[1:]); err != nil { glog.Fatal(err) } } diff --git a/server/init.go b/server/init.go new file mode 100644 index 000000000000..ffd608de037f --- /dev/null +++ b/server/init.go @@ -0,0 +1,63 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Spencer Kimball (spencer.kimball@gmail.com) + +package server + +import ( + commander "code.google.com/p/go-commander" + "github.com/cockroachdb/cockroach/storage" + "github.com/golang/glog" +) + +var CmdInit = &commander.Command{ + UsageLine: "init ", + Short: "init new Cockroach cluster", + Long: ` +Initialize a new Cockroach cluster on this node. The cluster is started +with only a single replica, whose data is stored in the directory specified +by the first argument . + +The provided zone configuration (specified by second argument +) is installed as the default. In the +likely event that the default zone config provides for more than a +single replica, the first range will move to increase its replication +to the correct level upon start. + +To start the cluster after initialization, run "cockroach start". +`, + Run: runInit} + +// runInit. +func runInit(cmd *commander.Command, args []string) { + if len(args) != 2 { + cmd.Usage() + return + } + // Specifying the disk type as HDD may be incorrect, but doesn't + // matter for this bootstrap step. + engine, err := storage.NewRocksDB(storage.HDD, args[0]) + if err != nil { + glog.Fatal(err) + } + clusterID, err := BootstrapCluster(engine) + if err != nil { + glog.Fatal(err) + } + // TODO(spencer): install the default zone config. + glog.Infof("Cockroach cluster %s has been initialized", clusterID) + glog.Infof(`To start the cluster, run "cockroach start"`) +} diff --git a/server/node.go b/server/node.go new file mode 100644 index 000000000000..89550cb6439c --- /dev/null +++ b/server/node.go @@ -0,0 +1,457 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Spencer Kimball (spencer.kimball@gmail.com) + +package server + +import ( + "bytes" + "container/list" + "encoding/gob" + "strconv" + "time" + + "code.google.com/p/go-uuid/uuid" + "github.com/cockroachdb/cockroach/gossip" + "github.com/cockroachdb/cockroach/kv" + "github.com/cockroachdb/cockroach/rpc" + "github.com/cockroachdb/cockroach/storage" + "github.com/cockroachdb/cockroach/util" + "github.com/golang/glog" +) + +const ( + // gossipGroupLimit is the size limit for gossip groups with storage + // topics. + gossipGroupLimit = 100 + // gossipInterval is the interval for gossiping storage-related info. + gossipInterval = 1 * time.Minute + // ttlCapacityGossip is time-to-live for capacity-related info. + ttlCapacityGossip = 2 * time.Minute + // ttlClusterIDGossip is time-to-live for cluster ID + ttlClusterIDGossip = 0 * time.Second + // ttlNodeIDGossip is time-to-live for node ID -> address. + ttlNodeIDGossip = 0 * time.Second +) + +// Node manages a map of stores (by store ID) for which it serves traffic. +type Node struct { + ClusterID string // UUID for Cockroach cluster + Attributes storage.NodeAttributes // Node ID, network/physical topology + gossip *gossip.Gossip // Nodes gossip cluster ID, node ID -> host:port + kvDB kv.DB // Used to access global id generators + storeMap map[int32]*storage.Store // Map from StoreID to Store + closer chan struct{} + + maxAvailPrefix string // Prefix for max avail capacity gossip topic +} + +// BootstrapCluster generates a random UUID to uniquely identify a new +// cluster. Returns the cluster ID on success. +func BootstrapCluster(engine storage.Engine) (string, error) { + sIdent := storage.StoreIdent{ + ClusterID: uuid.New(), + NodeID: 1, + StoreID: 1, + } + s := storage.NewStore(engine, nil) + if err := s.Init(nil); err != nil { + return "", err + } + + // Verify the store isn't already part of a cluster. + if s.Ident.ClusterID != "" { + return "", util.Errorf("storage engine already belongs to a cluster (%s)", s.Ident.ClusterID) + } + if err := s.Bootstrap(sIdent); err != nil { + return "", err + } + + // Create first range. + rng, err := s.CreateRange(storage.Key(""), storage.Key("\xff\xff")) + if err != nil { + return "", err + } + if rng.Meta.RangeID != 1 { + return "", util.Errorf("expected range id of 1, got %d", rng.Meta.RangeID) + } + + // Initialize meta1 and meta2 range addressing records. + replicas := []storage.Replica{storage.Replica{NodeID: 1, StoreID: 1, RangeID: 1}} + var reply storage.PutResponse + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + enc.Encode(replicas) + args := &storage.PutRequest{ + Key: storage.MakeKey(storage.KeyMeta1Prefix, rng.Meta.EndKey), + Value: storage.Value{ + Bytes: buf.Bytes(), + Timestamp: time.Now().UnixNano(), + }, + } + if rng.Put(args, &reply); reply.Error != nil { + return "", reply.Error + } + args.Key = storage.MakeKey(storage.KeyMeta2Prefix, rng.Meta.EndKey) + if rng.Put(args, &reply); reply.Error != nil { + return "", reply.Error + } + + return sIdent.ClusterID, nil +} + +// NewNode returns a new instance of Node, interpreting command line +// flags to initialize the appropriate Store or set of +// Stores. Registers the storage instance for the RPC service "Node". +func NewNode(rpcServer *rpc.Server, kvDB kv.DB, gossip *gossip.Gossip) *Node { + n := &Node{ + Attributes: storage.NodeAttributes{ + Address: rpcServer.Addr, + }, + gossip: gossip, + kvDB: kvDB, + storeMap: make(map[int32]*storage.Store), + closer: make(chan struct{}, 1), + } + rpcServer.RegisterName("Node", n) + return n +} + +// start starts the node by initializing network/physical topology +// attributes gleaned from the environment and initializing stores +// for each specified engine. Launches periodic store gossipping +// in a goroutine. +func (n *Node) start(engines []storage.Engine) error { + if err := n.initAttributes(); err != nil { + return err + } + if err := n.initStoreMap(engines); err != nil { + return err + } + go n.startGossip() + return nil +} + +// stop cleanly stops the node +func (n *Node) stop() { + close(n.closer) +} + +// initAttributes initializes the physical/network topology attributes +// if possible. Datacenter, PDU & Rack values are taken from environment +// variables or command line flags. +func (n *Node) initAttributes() error { + // TODO(spencer,levon): extract these topology values. + return nil +} + +// initStoreMap initializes the Stores map from id to Store. Stores are +// added to the storeMap if the Store is already bootstrapped. A +// bootstrapped Store has a valid ident with cluster, node and Store +// IDs set. If the Store doesn't yet have a valid ident, it's added to +// the bootstraps list for initialization once the cluster and node +// IDs have been determined. +func (n *Node) initStoreMap(engines []storage.Engine) error { + bootstraps := list.New() + + for _, engine := range engines { + s := storage.NewStore(engine, n.gossip) + if err := s.Init(n.gossip); err != nil { + return err + } + // If Stores have been bootstrapped, their ident will be + // non-empty. Add these to Store map; otherwise, add to + // bootstraps list. + if s.Ident.ClusterID != "" { + if s.Ident.StoreID == 0 { + return util.Error("cluster id set for node ident but missing store id") + } + capacity, err := s.Capacity() + if err != nil { + return err + } + glog.Infof("Initialized store %s: %s", s.Ident, capacity) + n.storeMap[s.Ident.StoreID] = s + } else { + bootstraps.PushBack(s) + } + } + if err := n.validateStores(); err != nil { + return err + } + + // Bootstrap any uninitialized stores asynchronously. We may have to + // wait until we've successfully joined the gossip network in order + // to initialize if this node is not yet aware of the cluster it's + // joining. + if bootstraps.Len() > 0 { + go n.bootstrapStores(bootstraps) + } + + return nil +} + +// validateStores iterates over all stores, verifying they agree on +// cluster ID and node ID. The node's ident is initialized based on +// the agreed-upon cluster and node IDs. +func (n *Node) validateStores() error { + for _, s := range n.storeMap { + if s.Ident.ClusterID == "" || s.Ident.NodeID == 0 { + return util.Error("unidentified store in store map: %s", s.Ident) + } + if n.ClusterID == "" { + n.ClusterID = s.Ident.ClusterID + n.Attributes.NodeID = s.Ident.NodeID + } else if n.ClusterID != s.Ident.ClusterID { + return util.Errorf("store ident %s cluster ID doesn't match node ident %s", s.Ident, n.ClusterID) + } else if n.Attributes.NodeID != s.Ident.NodeID { + return util.Errorf("store ident %s node ID doesn't match node ident %s", s.Ident, n.Attributes.NodeID) + } + } + return nil +} + +// bootstrapStores bootstraps uninitialized stores once the cluster +// and node IDs have been established for this node. Store IDs are +// allocated via a sequence id generator stored at a system key per +// node. This method may block if the cluster ID is not yet known +// and should be invoked via goroutine. +func (n *Node) bootstrapStores(bootstraps *list.List) { + // Wait for gossip network if we don't have a cluster ID. + if n.ClusterID == "" { + // Connect to network and read cluster ID. + <-n.gossip.Connected + val, err := n.gossip.GetInfo(gossip.KeyClusterID) + if err != nil || val == nil { + glog.Fatalf("unable to ascertain cluster ID from gossip network: %v", err) + } + n.ClusterID = val.(string) + + // Allocate a new node ID. + ir := <-n.kvDB.Increment(&storage.IncrementRequest{ + Key: storage.KeyNodeIDGenerator, + Increment: 1, + }) + if ir.Error != nil { + glog.Fatalf("unable to allocate node ID: %v", ir.Error) + } + n.Attributes.NodeID = int32(ir.NewValue) + } + + // Bootstrap all waiting stores by allocating a new store id for + // each and invoking store.Bootstrap() to persist. + inc := int64(bootstraps.Len()) + ir := <-n.kvDB.Increment(&storage.IncrementRequest{ + // The Key is a concatenation of StoreIDGeneratorPrefix and this node's ID. + Key: storage.MakeKey(storage.KeyStoreIDGeneratorPrefix, + []byte(strconv.Itoa(int(n.Attributes.NodeID)))), + Increment: inc, + }) + if ir.Error != nil { + glog.Fatalf("unable to allocate %d store IDs: %v", inc, ir.Error) + } + sIdent := storage.StoreIdent{ + ClusterID: n.ClusterID, + NodeID: n.Attributes.NodeID, + StoreID: int32(ir.NewValue - inc + 1), + } + for e := bootstraps.Front(); e != nil; e = e.Next() { + s := e.Value.(*storage.Store) + s.Bootstrap(sIdent) + n.storeMap[s.Ident.StoreID] = s + sIdent.StoreID++ + } +} + +// startGossip loops on a periodic ticker to gossip node-related +// information. Loops until the node is closed and should be +// invoked via goroutin. +func (n *Node) startGossip() { + // Register gossip groups. + n.maxAvailPrefix = gossip.KeyMaxAvailCapacityPrefix + n.Attributes.Datacenter + n.gossip.RegisterGroup(n.maxAvailPrefix, gossipGroupLimit, gossip.MaxGroup) + + // Gossip cluster ID if not yet on network. Multiple nodes may race + // to gossip, but there's no harm in it, as there's no definitive + // source. + if _, err := n.gossip.GetInfo(gossip.KeyClusterID); err != nil { + n.gossip.AddInfo(gossip.KeyClusterID, n.ClusterID, ttlClusterIDGossip) + } + + // Always gossip node ID at startup. + nodeIDKey := gossip.KeyNodeIDPrefix + strconv.FormatInt(int64(n.Attributes.NodeID), 16) + n.gossip.AddInfo(nodeIDKey, n.Attributes.Address, ttlNodeIDGossip) + + ticker := time.NewTicker(gossipInterval) + for { + select { + case <-ticker.C: + n.gossipCapacities() + case <-n.closer: + ticker.Stop() + return + } + } +} + +// gossipCapacities calls capacity on each store and adds it to the +// gossip network. +func (n *Node) gossipCapacities() { + for _, store := range n.storeMap { + capacity, err := store.Capacity() + if err != nil { + glog.Warningf("Problem getting capacity: %v", err) + continue + } + + keyMaxCapacity := n.maxAvailPrefix + strconv.FormatInt(int64(n.Attributes.NodeID), 16) + "-" + + strconv.FormatInt(int64(store.Ident.StoreID), 16) + storeAttr := storage.StoreAttributes{ + StoreID: store.Ident.StoreID, + Attributes: n.Attributes, + Capacity: capacity, + } + n.gossip.AddInfo(keyMaxCapacity, storeAttr, ttlCapacityGossip) + } +} + +// getRange looks up the store by Replica.StoreID and then queries it for +// the range specified by Replica.RangeID. +func (n *Node) getRange(r *storage.Replica) (*storage.Range, error) { + store, ok := n.storeMap[r.StoreID] + if !ok { + return nil, util.Errorf("store %d not found", r.StoreID) + } + rng, err := store.GetRange(r.RangeID) + if err != nil { + return nil, err + } + return rng, nil +} + +// All methods to satisfy the Node RPC service fetch the range +// based on the Replica target provided in the argument header. +// Commands are broken down into read-only and read-write and +// sent along to the range via either Range.readOnlyCmd() or +// Range.readWriteCmd(). + +// Contains. +func (n *Node) Contains(args *storage.ContainsRequest, reply *storage.ContainsResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return rng.ReadOnlyCmd("Contains", args, reply) +} + +// Get. +func (n *Node) Get(args *storage.GetRequest, reply *storage.GetResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return rng.ReadOnlyCmd("Get", args, reply) +} + +// Put. +func (n *Node) Put(args *storage.PutRequest, reply *storage.PutResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("Put", args, reply) +} + +// Increment. +func (n *Node) Increment(args *storage.IncrementRequest, reply *storage.IncrementResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("Increment", args, reply) +} + +// Delete. +func (n *Node) Delete(args *storage.DeleteRequest, reply *storage.DeleteResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("Delete", args, reply) +} + +// DeleteRange. +func (n *Node) DeleteRange(args *storage.DeleteRangeRequest, reply *storage.DeleteRangeResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("DeleteRange", args, reply) +} + +// Scan. +func (n *Node) Scan(args *storage.ScanRequest, reply *storage.ScanResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return rng.ReadOnlyCmd("Scan", args, reply) +} + +// EndTransaction. +func (n *Node) EndTransaction(args *storage.EndTransactionRequest, reply *storage.EndTransactionResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("EndTransaction", args, reply) +} + +// AccumulateTS. +func (n *Node) AccumulateTS(args *storage.AccumulateTSRequest, reply *storage.AccumulateTSResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("AccumulateTS", args, reply) +} + +// ReapQueue. +func (n *Node) ReapQueue(args *storage.ReapQueueRequest, reply *storage.ReapQueueResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("ReapQueue", args, reply) +} + +// EnqueueUpdate. +func (n *Node) EnqueueUpdate(args *storage.EnqueueUpdateRequest, reply *storage.EnqueueUpdateResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("EnqueueUpdate", args, reply) +} + +// EnqueueMessage. +func (n *Node) EnqueueMessage(args *storage.EnqueueMessageRequest, reply *storage.EnqueueMessageResponse) error { + rng, err := n.getRange(&args.Replica) + if err != nil { + return err + } + return <-rng.ReadWriteCmd("EnqueueMessage", args, reply) +} diff --git a/server/server.go b/server/server.go index 636058f49985..d43ce9c1fb0b 100644 --- a/server/server.go +++ b/server/server.go @@ -26,43 +26,131 @@ import ( "io" "net" "net/http" + "regexp" + "strconv" "strings" + commander "code.google.com/p/go-commander" "github.com/cockroachdb/cockroach/gossip" "github.com/cockroachdb/cockroach/kv" "github.com/cockroachdb/cockroach/rpc" "github.com/cockroachdb/cockroach/storage" "github.com/cockroachdb/cockroach/structured" + "github.com/cockroachdb/cockroach/util" "github.com/golang/glog" ) var ( httpAddr = flag.String("http_addr", "localhost:8080", "TCP network address to bind to for HTTP traffic") rpcAddr = flag.String("rpc_addr", "localhost:8081", "TCP network address to bind to for RPC traffic") + + // dataDirs is specified to enable durable storage via + // RocksDB-backed key-value stores. Memory-backed key value stores + // may be optionally specified via a comma-separated list of integer + // sizes. + dataDirs = flag.String("data_dirs", "", "specify a comma-separated list of disk "+ + "type and path or integer size in bytes. For solid state disks, ssd=; "+ + "for spinning disks, hdd=; for in-memory, mem=. E.g. "+ + "--data_dirs=hdd=/mnt/hda1,ssd=/mnt/ssd01,ssd=/mnt/ssd02,mem=") + + // Regular expression for capturing data directory specifications. + dataDirRE = regexp.MustCompile(`^(mem)=([\d]*)|(ssd|hdd)=(.*)$`) ) +var CmdStart = &commander.Command{ + UsageLine: "start", + Short: "start node by joining the gossip network", + Long: fmt.Sprintf(` +Start Cockroach node by joining the gossip network and exporting key +ranges stored on physical device(s). The gossip network is joined by +contacting one or more well-known hosts specified by the +--gossip_bootstrap command line flag. Every node should be run with +the same list of bootstrap hosts to guarantee a connected network. An +alternate approach is to use a single host for --gossip_bootstrap and +round-robin DNS. + +Each node exports data from one or more physical devices. These +devices are specified via the --data_dirs command line flag. This is a +comma-separated list of paths to storage directories. Although the +paths should be specified to correspond uniquely to physical devices, +this requirement isn't strictly enforced. + +A node exports an HTTP API with the following endpoints: + + Health check: http://%s/healthz + Remote shutdown: http://%s/quitquitquit + Key-value REST: http://%s%s + Structured Schema REST: http://%s%s +`, *httpAddr, *httpAddr, *httpAddr, kv.KVKeyPrefix, *httpAddr, structured.StructuredKeyPrefix), + Run: runStart, +} + type server struct { mux *http.ServeMux rpc *rpc.Server gossip *gossip.Gossip - node *storage.Node kvDB kv.DB kvREST *kv.RESTServer + node *Node structuredDB *structured.DB structuredREST *structured.RESTServer } -// ListenAndServe starts an HTTP server at --http_addr and an RPC server +// runStart starts an HTTP server at --http_addr and an RPC server // at --rpc_addr. This method won't return unless the server is shutdown // or a non-temporary error occurs on the HTTP server connection. -func ListenAndServe() error { +func runStart(cmd *commander.Command, args []string) { + glog.Info("starting cockroach cluster") s, err := newServer() if err != nil { - return err + glog.Fatal(err) } - err = s.start() + err = s.start(nil /* init engines from --data_dirs */) s.stop() - return err + if err != nil { + glog.Fatal(err) + } +} + +// initEngines interprets the --data_dirs command line flag to +// initialize a slice of storage.Engine objects. +func initEngines() ([]storage.Engine, error) { + engines := make([]storage.Engine, 0, 1) + for _, dir := range strings.Split(*dataDirs, ",") { + // Error if regexp doesn't match. + matches := dataDirRE.FindStringSubmatch(dir) + if matches == nil || len(matches) != 3 { + return nil, util.Errorf("invalid data directory %q", dir) + } + + var engine storage.Engine + var err error + if matches[1] == "mem" { + size, err := strconv.ParseInt(matches[2], 10, 64) + if err != nil { + glog.Warningf("unable to init in-memory storage %q; skipping...will not serve data", dir) + } + engine = storage.NewInMem(size) + } else { + var typ storage.DiskType + switch matches[2] { + case "hdd": + typ = storage.HDD + case "ssd": + typ = storage.SSD + default: + return nil, util.Errorf("unhandled disk type %q", matches[1]) + } + engine, err = storage.NewRocksDB(typ, matches[2]) + if err != nil { + glog.Warningf("unable to init rocksdb with data dir %q; skipping...will not serve data", matches[2]) + continue + } + } + engines = append(engines, engine) + } + + return engines, nil } func newServer() (*server, error) { @@ -76,21 +164,34 @@ func newServer() (*server, error) { } s.gossip = gossip.New(s.rpc) - s.node = storage.NewNode(s.rpc, s.gossip) s.kvDB = kv.NewDB(s.gossip) s.kvREST = kv.NewRESTServer(s.kvDB) + s.node = NewNode(s.rpc, s.kvDB, s.gossip) s.structuredDB = structured.NewDB(s.kvDB) s.structuredREST = structured.NewRESTServer(s.structuredDB) return s, nil } -func (s *server) start() error { - glog.Infoln("Starting RPC server at", *rpcAddr) +func (s *server) start(engines []storage.Engine) error { go s.rpc.ListenAndServe() // blocks, so launch in a goroutine + glog.Infoln("Started RPC server at", *rpcAddr) - glog.Infoln("Starting gossip instance") s.gossip.Start() + glog.Infoln("Started gossip instance") + + // Init the engines specified via command line flags if not supplied. + if engines == nil { + var err error + engines, err = initEngines() + if err != nil { + return err + } + } + if err := s.node.start(engines); err != nil { + return err + } + glog.Infoln("Initialized %d storage engines", len(engines)) s.initHTTP() return http.ListenAndServe(*httpAddr, s) @@ -98,12 +199,15 @@ func (s *server) start() error { func (s *server) initHTTP() { glog.Infoln("Starting HTTP server at", *httpAddr) - s.mux.HandleFunc("/healthz", s.handleHealthz) + s.mux.HandleFunc("/_admin/healthz", s.handleHealthz) s.mux.HandleFunc(kv.KVKeyPrefix, s.kvREST.HandleAction) s.mux.HandleFunc(structured.StructuredKeyPrefix, s.structuredREST.HandleAction) } func (s *server) stop() { + // TODO(spencer): the http server should exit; this functionality is + // slated for go 1.3. + s.node.stop() s.gossip.Stop() s.rpc.Close() } diff --git a/server/server_test.go b/server/server_test.go index 7a3f40ebfd13..e07680b0cb4a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -26,6 +26,7 @@ import ( "sync" "testing" + "github.com/cockroachdb/cockroach/storage" "github.com/golang/glog" ) @@ -41,9 +42,12 @@ func startServer() *server { if err != nil { glog.Fatal(err) } + engines := []storage.Engine{storage.NewInMem(1 << 20)} + BootstrapCluster(engines[0]) s.gossip.SetBootstrap([]net.Addr{s.rpc.Addr}) - //s.node.InitCluster() - go s.start() // TODO(spencer): should shutdown server. + go func() { + glog.Fatal(s.start(engines)) // TODO(spencer): should shutdown server. + }() glog.Infof("Test server listening on http: %s, rpc: %s", *httpAddr, *rpcAddr) }) return s @@ -53,12 +57,12 @@ func resetTestData() { // TODO(spencer): remove all data files once rocksdb is hooked up. } -// TestHealthz verifies that /healthz does, in fact, return "ok" +// TestHealthz verifies that /_admin/healthz does, in fact, return "ok" // as expected. func TestHealthz(t *testing.T) { startServer() defer resetTestData() - url := "http://" + *httpAddr + "/healthz" + url := "http://" + *httpAddr + "/_admin/healthz" resp, err := http.Get(url) if err != nil { t.Fatalf("error requesting healthz at %s: %s", url, err) @@ -74,7 +78,7 @@ func TestHealthz(t *testing.T) { } } -// TestGzip hits the /healthz endpoint while explicitly disabling +// TestGzip hits the /_admin/healthz endpoint while explicitly disabling // decompression on a custom client's Transport and setting it // conditionally via the request's Accept-Encoding headers. func TestGzip(t *testing.T) { @@ -86,7 +90,7 @@ func TestGzip(t *testing.T) { DisableCompression: true, }, } - req, err := http.NewRequest("GET", "http://"+*httpAddr+"/healthz", nil) + req, err := http.NewRequest("GET", "http://"+*httpAddr+"/_admin/healthz", nil) if err != nil { t.Fatalf("could not create request: %s", err) } diff --git a/simulation/gossip.go b/simulation/gossip.go index 9229fc26069f..eb8980e3fb48 100644 --- a/simulation/gossip.go +++ b/simulation/gossip.go @@ -192,8 +192,8 @@ func outputDotFile(dotFN string, cycle int, nodes map[string]*gossip.Gossip, edg } var sentinelAge int64 - if val, err := node.GetInfo(gossip.SentinelGossip); err != nil { - glog.Infof("error getting info for sentinel gossip key %q: %s", gossip.SentinelGossip, err) + if val, err := node.GetInfo(gossip.KeySentinel); err != nil { + glog.Infof("error getting info for sentinel gossip key %q: %s", gossip.KeySentinel, err) } else { sentinelAge = int64(cycle) - val.(int64) } diff --git a/storage/allocator.go b/storage/allocator.go index 7697275faacf..a83aa53ef262 100644 --- a/storage/allocator.go +++ b/storage/allocator.go @@ -13,7 +13,7 @@ // permissions and limitations under the License. See the AUTHORS file // for names of contributors. // -// Author: Spencer Kimball (spencer.kimball@gmail.com) +// Author: Levon Lloyd (levon.lloyd@gmail.com) package storage @@ -21,83 +21,72 @@ import ( "math/rand" ) -// ZoneConfigFinder looks up a ZoneConfig for a range that starts with -// a given key. -type ZoneConfigFinder func(string) (ZoneConfig, error) +// StoreFinder finds the disks in a datacenter with the most available capacity. +type StoreFinder func(string) ([]StoreAttributes, error) -// AvailableDiskFinder finds the disks in a datacenter with the most available capacity. -type AvailableDiskFinder func(string) ([]AvailableDiskConfig, error) - -// allocator makes allocation decisions based on a zone -// configuration, existing range metadata and available servers & -// disks. Configuration settings and range metadata information is -// stored directly in the engine-backed range they describe. -// Information on suitability and availability of servers is -// gleaned from the gossip network. +// allocator makes allocation decisions based on a zone configuration, +// existing range metadata and available stores. Configuration +// settings and range metadata information is stored directly in the +// engine-backed range they describe. Information on suitability and +// availability of servers is gleaned from the gossip network. type allocator struct { - diskFinder AvailableDiskFinder - zcFinder ZoneConfigFinder - rand rand.Rand + storeFinder StoreFinder + rand rand.Rand } -var maxCapacityPrefix = "max-free-capacity-" - // allocate returns a suitable Replica for the range and zone. If none -// are available / suitable, returns an error. It looks up the zone config for -// the block to determine where it needs to send data, then uses the gossip -// network to pick a random set of nodes in each data center based on the -// available capacity of the node. -// TODO(levon) Handle Racks/Power Units. -// TODO(levon) throw error if not enough suitable replicas can be found - -func (a *allocator) allocate(start string, existingReplicas []Replica) ([]Replica, error) { - usedHosts := make(map[string]bool) +// are available / suitable, returns an error. It looks up the zone +// config for the block to determine where it needs to send data, then +// uses the gossip network to pick a random set of nodes in each data +// center based on the available capacity of the node. +// TODO(levon): Handle Racks/Power Units. +// TODO(levon): throw error if not enough suitable replicas can be found. +// TODO(spencer): only need the delta between existing replicas and zone config's specifications. +func (a *allocator) allocate(config *ZoneConfig, existingReplicas []Replica) ([]Replica, error) { + usedHosts := make(map[int32]struct{}) for _, replica := range existingReplicas { - usedHosts[replica.Addr] = true - } - // Find the Zone Config that applies to this range. - zoneConfig, err := a.zcFinder(start) - if err != nil { - return nil, err + usedHosts[replica.NodeID] = struct{}{} } - result := make([]Replica, 0, 1) - for dc, diskTypes := range zoneConfig.Replicas { + var results []Replica = nil + for dc, diskTypes := range config.Replicas { // For each replica to be placed in this data center. for _, diskType := range diskTypes { // Randomly pick a node weighted by capacity. - candidates := make([]AvailableDiskConfig, len(zoneConfig.Replicas)) - var candidateCapacityTotal float64 - disks, err := a.diskFinder(dc) + var candidates []StoreAttributes = nil + var capacityTotal float64 + stores, err := a.storeFinder(dc) if err != nil { return nil, err } - for _, c := range disks { - if c.DiskType == diskType && !usedHosts[c.Node.Address] { - candidates = append(candidates, c) - candidateCapacityTotal += c.DiskCapacity.PercentAvail() + for _, s := range stores { + _, alreadyUsed := usedHosts[s.Attributes.NodeID] + if s.Capacity.DiskType == diskType && !alreadyUsed { + candidates = append(candidates, s) + capacityTotal += s.Capacity.PercentAvail() } } var capacitySeen float64 - targetCapacity := rand.Float64() + targetCapacity := rand.Float64() * capacityTotal // Walk through candidates in random order, stopping when // we've passed the capacity target. for _, c := range candidates { - capacitySeen += (c.DiskCapacity.PercentAvail() / candidateCapacityTotal) + capacitySeen += c.Capacity.PercentAvail() if capacitySeen >= targetCapacity { replica := Replica{ - Addr: c.Node.Address, - Disk: c.Disk, + NodeID: c.Attributes.NodeID, + StoreID: c.StoreID, + // RangeID is filled in later, when range is created. } - result = append(result, replica) - usedHosts[c.Node.Address] = true + results = append(results, replica) + usedHosts[c.Attributes.NodeID] = struct{}{} break } } } } - return result, nil + return results, nil } /*func findZoneConfig(key string) (ZoneConfig, error) { diff --git a/storage/config.go b/storage/config.go index 73dc3af615b0..5ae4f226c002 100644 --- a/storage/config.go +++ b/storage/config.go @@ -18,16 +18,19 @@ package storage import ( + "net" + "github.com/cockroachdb/cockroach/gossip" yaml "gopkg.in/yaml.v1" ) -// Replica describes a replica location by address (host:port), disk -// (device name) and range start key. +// Replica describes a replica location by node ID (corresponds to a +// host:port via lookup on gossip network), store ID (corresponds to +// a physical device, unique per node) and range ID. type Replica struct { - Addr string // host:port. - Disk string // e.g. ssd1. - RangeKey []byte // Range start key. + NodeID int32 + StoreID int32 + RangeID int64 } // DiskType is the type of a disk that a Store is storing data on. @@ -38,36 +41,40 @@ const ( SSD DiskType = iota // HDD = Spinning disk HDD + // MEM = DRAM + MEM ) -// DiskCapacity contains capacity information for a storage device. -type DiskCapacity struct { - Capacity uint64 - Available uint64 +// StoreCapacity contains capacity information for a storage device. +type StoreCapacity struct { + Capacity int64 + Available int64 + DiskType DiskType } -// NodeConfig holds configuration about the node. -type NodeConfig struct { - Address string - DataCenter string +// NodeAttributes holds details on node physical/network topology. +type NodeAttributes struct { + NodeID int32 + Address net.Addr + Datacenter string PDU string Rack string } -// AvailableDiskConfig holds information about a disk that is available in a RoachNode. -type AvailableDiskConfig struct { - Node NodeConfig - Disk string - DiskCapacity DiskCapacity - DiskType DiskType +// StoreAttributes holds store information including physical/network +// topology via NodeAttributes and disk type & capacity data. +type StoreAttributes struct { + StoreID int32 + Attributes NodeAttributes + Capacity StoreCapacity } // ZoneConfig holds configuration that is needed for a range of KV pairs. type ZoneConfig struct { // Replicas is a map from datacenter name to a slice of disk types. Replicas map[string]([]DiskType) `yaml:"replicas,omitempty"` - RangeMinBytes uint64 `yaml:"range_min_bytes,omitempty"` - RangeMaxBytes uint64 `yaml:"range_max_bytes,omitempty"` + RangeMinBytes int64 `yaml:"range_min_bytes,omitempty"` + RangeMaxBytes int64 `yaml:"range_max_bytes,omitempty"` } // ParseZoneConfig parses a YAML serialized ZoneConfig. @@ -82,12 +89,12 @@ func (z *ZoneConfig) ToYAML() ([]byte, error) { return yaml.Marshal(z) } -// Less compares two AvailableDiskConfigs based on percentage of disk available. -func (a AvailableDiskConfig) Less(b gossip.Ordered) bool { - return a.DiskCapacity.PercentAvail() < b.(AvailableDiskConfig).DiskCapacity.PercentAvail() +// Less compares two StoreAttributess based on percentage of disk available. +func (a StoreAttributes) Less(b gossip.Ordered) bool { + return a.Capacity.PercentAvail() < b.(StoreAttributes).Capacity.PercentAvail() } // PercentAvail computes the percentage of disk space that is available. -func (d DiskCapacity) PercentAvail() float64 { - return float64(d.Available) / float64(d.Capacity) +func (sc StoreCapacity) PercentAvail() float64 { + return float64(sc.Available) / float64(sc.Capacity) } diff --git a/storage/config_test.go b/storage/config_test.go index 956fe500f0ca..c065b719cd67 100644 --- a/storage/config_test.go +++ b/storage/config_test.go @@ -12,6 +12,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. See the AUTHORS file // for names of contributors. +// // Author: Levon Lloyd (levon.lloyd@gmail.com) package storage diff --git a/storage/engine.go b/storage/engine.go index 06614f495bc6..5e694e8d49a0 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -17,15 +17,89 @@ package storage +import ( + "bytes" + "encoding/binary" + "encoding/gob" + "time" + + "github.com/cockroachdb/cockroach/util" +) + // Engine is the interface that wraps the core operations of a // key/value store. type Engine interface { - // Put sets the given key to the value provided. + // put sets the given key to the value provided. put(key Key, value Value) error - // Get returns the value for the given key, nil otherwise. + // get returns the value for the given key, nil otherwise. get(key Key) (Value, error) - // Delete removes the item from the db with the given key. + // delete removes the item from the db with the given key. del(key Key) error - // Capacity returns capacity details for the engine's available storage. - capacity() (*DiskCapacity, error) + // capacity returns capacity details for the engine's available storage. + capacity() (StoreCapacity, error) +} + +// putI sets the given key to the gob-serialized byte string of the +// value provided. Used internally. Uses current time and default +// expiration. +func putI(engine Engine, key Key, value interface{}) error { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(value); err != nil { + return err + } + return engine.put(key, Value{ + Bytes: buf.Bytes(), + Timestamp: time.Now().UnixNano(), + }) +} + +// getI fetches the specified key and gob-deserializes it into +// "value". Returns true on success or false if the key was not +// found. The timestamp of the write is returned as the second return +// value. +func getI(engine Engine, key Key, value interface{}) (bool, int64, error) { + val, err := engine.get(key) + if err != nil { + return false, 0, err + } + if len(val.Bytes) == 0 { + return false, 0, nil + } + if value != nil { + if err = gob.NewDecoder(bytes.NewBuffer(val.Bytes)).Decode(value); err != nil { + return true, val.Timestamp, err + } + } + return true, val.Timestamp, nil +} + +// increment fetches the varint encoded int64 value specified by key +// and adds "inc" to it then re-encodes as varint and puts the new +// value to key using the timestamp "ts". The newly incremented value +// is returned. +func increment(engine Engine, key Key, inc int64, ts int64) (int64, error) { + // First retrieve existing value. + val, err := engine.get(key) + if err != nil { + return 0, err + } + var int64Val int64 + // If the value exists, attempt to decode it as a varint. + if len(val.Bytes) != 0 { + var numBytes int + int64Val, numBytes = binary.Varint(val.Bytes) + if numBytes == 0 { + return 0, util.Errorf("key %q cannot be incremented; not varint-encoded", key) + } else if numBytes < 0 { + return 0, util.Errorf("key %q cannot be incremented; integer overflow", key) + } + } + int64Val += inc + encoded := make([]byte, binary.MaxVarintLen64) + numBytes := binary.PutVarint(encoded, int64Val) + encoded = encoded[:numBytes] + if err = engine.put(key, Value{Bytes: encoded, Timestamp: ts}); err != nil { + return 0, err + } + return int64Val, nil } diff --git a/storage/in_mem.go b/storage/in_mem.go index b15e9eaf94ae..4e3b06e7dead 100644 --- a/storage/in_mem.go +++ b/storage/in_mem.go @@ -22,40 +22,40 @@ import "sync" // InMem a simple, in-memory key-value store. type InMem struct { sync.RWMutex - cacheSize int64 - data map[string]string + maxSize int64 + data map[string]string } // NewInMem allocates and returns a new InMem object. -func NewInMem(cacheSize int64) *InMem { +func NewInMem(maxSize int64) *InMem { return &InMem{ - cacheSize: cacheSize, - data: make(map[string]string), + maxSize: maxSize, + data: make(map[string]string), } } // put sets the given key to the value provided. -func (b *InMem) put(key Key, value Value) error { - b.Lock() - defer b.Unlock() - b.data[string(key)] = string(value.Bytes) +func (in *InMem) put(key Key, value Value) error { + in.Lock() + defer in.Unlock() + in.data[string(key)] = string(value.Bytes) return nil } // get returns the value for the given key, nil otherwise. -func (b *InMem) get(key Key) (Value, error) { - b.RLock() - defer b.RUnlock() +func (in *InMem) get(key Key) (Value, error) { + in.RLock() + defer in.RUnlock() return Value{ - Bytes: []byte(b.data[string(key)]), + Bytes: []byte(in.data[string(key)]), }, nil } // del removes the item from the db with the given key. -func (b *InMem) del(key Key) error { - b.Lock() - defer b.Unlock() - delete(b.data, string(key)) +func (in *InMem) del(key Key) error { + in.Lock() + defer in.Unlock() + delete(in.data, string(key)) return nil } @@ -63,7 +63,10 @@ func (b *InMem) del(key Key) error { // computed size of cached keys and values. The actual free space may // not be entirely accurate due to object storage costs and other // internal glue. -func (r *InMem) capacity() (*DiskCapacity, error) { - capacity := &DiskCapacity{} - return capacity, nil +func (in *InMem) capacity() (StoreCapacity, error) { + return StoreCapacity{ + Capacity: in.maxSize, + Available: in.maxSize, // TODO(spencer): fix this. + DiskType: MEM, + }, nil } diff --git a/storage/keys.go b/storage/keys.go new file mode 100644 index 000000000000..76d1d11dac58 --- /dev/null +++ b/storage/keys.go @@ -0,0 +1,41 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. +// +// Author: Spencer Kimball (spencer.kimball@gmail.com) + +package storage + +import ( + "bytes" +) + +// Constants for system-reserved keys in the KV map. +var ( + // KeyMeta1Prefix is the first level of key addressing. The value is a + // slice of Replica structs. + KeyMeta1Prefix = Key("\x00\x00meta1") + // KeyMeta2Prefix is the second level of key addressing. The value is a + // slice of Replica structs. + KeyMeta2Prefix = Key("\x00\x00meta2") + // KeyNodeIDGenerator contains a sequence generator for node IDs. + KeyNodeIDGenerator = Key("\x00node-id-generator") + // KeyStoreIDGeneratorPrefix specifies key prefixes for sequence + // generators, one per node, for store IDs. + KeyStoreIDGeneratorPrefix = Key("\x00store-id-generator-") +) + +func MakeKey(prefix, suffix Key) Key { + return Key(bytes.Join([][]byte{prefix, suffix}, []byte{})) +} diff --git a/storage/messages.go b/storage/messages.go index a469c81f6c8c..e51176a58a69 100644 --- a/storage/messages.go +++ b/storage/messages.go @@ -121,11 +121,11 @@ type IncrementRequest struct { } // An IncrementResponse is the return value from the Increment -// method. It specifies the new value after increment. If the value -// could not be decoded as specified, Error will be set. +// method. The new value after increment is specified in NewValue. If +// the value could not be decoded as specified, Error will be set. type IncrementResponse struct { ResponseHeader - NewValue Value // NewValue is varint64 encoded + NewValue int64 } // A DeleteRequest is arguments to the Delete() method. diff --git a/storage/node.go b/storage/node.go deleted file mode 100644 index 63531b0be76d..000000000000 --- a/storage/node.go +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright 2014 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. See the AUTHORS file -// for names of contributors. -// -// Author: Spencer Kimball (spencer.kimball@gmail.com) - -package storage - -import ( - "flag" - "strings" - "time" - - "github.com/cockroachdb/cockroach/gossip" - "github.com/cockroachdb/cockroach/rpc" - "github.com/cockroachdb/cockroach/util" - "github.com/golang/glog" -) - -const ( - // defaultCacheSize is the default value for the cacheSize command line flag. - defaultCacheSize = 1 << 30 // GB -) - -var ( - // dataDirs is specified to enable durable storage via - // RocksDB-backed key-value stores. - dataDirs = flag.String("data_dirs", "", "specify a comma-separated list of paths, "+ - "one per physical storage device; if empty, node will serve out of memory") - // cacheSize is the amount of memory in bytes to use for caching data. - // The value is split evenly between the stores if there are more than one. - // If the node only hosts a single in-memory store, then cacheSize is the - // maximum size in bytes the store is allowed to grow to before it reaches - // full capacity. - cacheSize = flag.Int64("cache_size", defaultCacheSize, "total size in bytes for "+ - "data stored in all caches, shared evenly if there are multiple storage devices") - - // storageGossipInterval is the period at which storage is checked and re-added to gossip - storageGossipInterval = flag.Duration( - "storage_gossip_interval", time.Minute, - "approximate interval (time.Duration) for checking local disk capacity and adding to gossip") -) - -// Node holds the set of stores which this roach node serves traffic for. -type Node struct { - gossip *gossip.Gossip - storeMap map[string]*store - config NodeConfig - closer chan struct{} -} - -// NewNode returns a new instance of Node, interpreting command line -// flags to intialize the appropriate store or set of -// stores. Registers the storage instance for the RPC service "Node". -func NewNode(rpcServer *rpc.Server, gossip *gossip.Gossip) *Node { - // TODO(levon): Read node configuration and set it here. - n := &Node{ - gossip: gossip, - storeMap: make(map[string]*store), - closer: make(chan struct{}, 1), - } - // TODO(levon): real args here - allocator := &allocator{} - rpcServer.RegisterName("Node", n) - - if *dataDirs == "" { - n.storeMap["in-mem"] = newStore(NewInMem(*cacheSize), allocator) - } else { - for _, dir := range strings.Split(*dataDirs, ",") { - rocksdb, err := NewRocksDB(dir) - if err != nil { - glog.Infof("unable to stat data directory %s; skipping...will not serve data", dir) - continue - } - n.storeMap[rocksdb.name] = newStore(rocksdb, allocator) - } - // TODO(spencer): set cache sizes on successfully created stores. - glog.Fatal("rocksdb stores unsupported") - } - return n -} - -// startGossip starts a goroutine that periodically checks local disk -// capacity and gossips it. -func (n *Node) startGossip() { - ticker := time.NewTicker(*storageGossipInterval) - go func() { - for { - select { - case <-ticker.C: - n.gossipCapacities() - case <-n.closer: - ticker.Stop() - return - } - } - }() -} - -// gossipCapacities calls capacity on each store and adds it to the gossip -func (n *Node) gossipCapacities() { - gossipTopic := maxCapacityPrefix + n.config.DataCenter - for _, store := range n.storeMap { - capacity, err := store.engine.capacity() - if err != nil { - glog.Warningf("Problem getting capacity: %v", err) - continue - } - - n.gossip.AddInfo(gossipTopic, &AvailableDiskConfig{DiskCapacity: *capacity, Node: n.config}, time.Duration(2)*(*storageGossipInterval)) - } -} - -// getRange looks up the store by Replica.Disk and then queries it for -// the range specified by Replica.Range. -func (n *Node) getRange(r *Replica) (*Range, error) { - store, ok := n.storeMap[r.Disk] - if !ok { - return nil, util.Errorf("disk %s not found", r.Disk) - } - rng, err := store.getRange(r) - if err != nil { - return nil, err - } - return rng, nil -} - -// Start starts the node -func (n *Node) Start() { - n.startGossip() -} - -// Stop cleanly stops the node -func (n *Node) Stop() { - close(n.closer) -} - -// All methods to satisfy the Node RPC service fetch the range -// based on the Replica target provided in the argument header. -// Commands are broken down into read-only and read-write and -// sent along to the range via either Range.readOnlyCmd() or -// Range.readWriteCmd(). - -// Contains . -func (n *Node) Contains(args *ContainsRequest, reply *ContainsResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return rng.readOnlyCmd("Contains", args, reply) -} - -// Get . -func (n *Node) Get(args *GetRequest, reply *GetResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return rng.readOnlyCmd("Get", args, reply) -} - -// Put . -func (n *Node) Put(args *PutRequest, reply *PutResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("Put", args, reply) -} - -// Increment . -func (n *Node) Increment(args *IncrementRequest, reply *IncrementResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("Increment", args, reply) -} - -// Delete . -func (n *Node) Delete(args *DeleteRequest, reply *DeleteResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("Delete", args, reply) -} - -// DeleteRange . -func (n *Node) DeleteRange(args *DeleteRangeRequest, reply *DeleteRangeResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("DeleteRange", args, reply) -} - -// Scan . -func (n *Node) Scan(args *ScanRequest, reply *ScanResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return rng.readOnlyCmd("Scan", args, reply) -} - -// EndTransaction . -func (n *Node) EndTransaction(args *EndTransactionRequest, reply *EndTransactionResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("EndTransaction", args, reply) -} - -// AccumulateTS . -func (n *Node) AccumulateTS(args *AccumulateTSRequest, reply *AccumulateTSResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("AccumulateTS", args, reply) -} - -// ReapQueue . -func (n *Node) ReapQueue(args *ReapQueueRequest, reply *ReapQueueResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("ReapQueue", args, reply) -} - -// EnqueueUpdate . -func (n *Node) EnqueueUpdate(args *EnqueueUpdateRequest, reply *EnqueueUpdateResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("EnqueueUpdate", args, reply) -} - -// EnqueueMessage . -func (n *Node) EnqueueMessage(args *EnqueueMessageRequest, reply *EnqueueMessageResponse) error { - rng, err := n.getRange(&args.Replica) - if err != nil { - return err - } - return <-rng.readWriteCmd("EnqueueMessage", args, reply) -} diff --git a/storage/range.go b/storage/range.go index 8a8aa1f42211..085e96120130 100644 --- a/storage/range.go +++ b/storage/range.go @@ -20,12 +20,23 @@ package storage import ( "bytes" "container/list" - "encoding/binary" "sync" + "time" + "github.com/cockroachdb/cockroach/gossip" "github.com/cockroachdb/cockroach/util" + "github.com/golang/glog" ) +// A RangeMetadata holds information about the range, including +// range ID and start and end keys, and replicas slice. +type RangeMetadata struct { + RangeID int64 + StartKey Key + EndKey Key + Replicas []Replica +} + // A Range is a contiguous keyspace with writes managed via an // instance of the Raft consensus algorithm. Many ranges may exist // in a store and they are unlikely to be contiguous. Ranges are @@ -33,29 +44,29 @@ import ( // integrity by replacing failed replicas, splitting and merging // as appropriate. type Range struct { - rangeKey Key // The start key for this range. - engine Engine // The underlying key-value store. - allocator *allocator // Makes allocation decisions. - mu sync.Mutex // Protects the pending list. - pending *list.List // Not-yet-proposed log entries. + Meta RangeMetadata + engine Engine // The underlying key-value store + allocator *allocator // Makes allocation decisions + gossip *gossip.Gossip // Range may gossip based on contents + mu sync.Mutex // Protects the pending list + pending *list.List // Not-yet-proposed log entries // TODO(andybons): raft instance goes here. } -// NewRange initializes the range starting at key. The underlying -// engine is queried for range metadata. Returns an error if the range -// doesn't exist. -func NewRange(rangeKey Key, engine Engine, allocator *allocator) (*Range, error) { - rng := &Range{ - rangeKey: rangeKey, +// NewRange initializes the range starting at key. +func NewRange(meta RangeMetadata, engine Engine, allocator *allocator, gossip *gossip.Gossip) *Range { + r := &Range{ + Meta: meta, engine: engine, allocator: allocator, + gossip: gossip, pending: list.New(), } - // TODO(spencer): query underlying engine for metadata. - return rng, nil + r.maybeGossip() + return r } -// readOnlyCmd executes a read-only command against the store. If this +// ReadOnlyCmd executes a read-only command against the store. If this // server has executed a raft command or heartbeat at a timestamp // greater than the read timestamp, we can satisfy the read locally // without further ado. Otherwise, we must contact ceil(N/2) raft @@ -68,14 +79,14 @@ func NewRange(rangeKey Key, engine Engine, allocator *allocator) (*Range, error) // is updated on each participant. If this replica has stale info for // the key, an error is returned to the client to retry at the replica // with newer information. -func (r *Range) readOnlyCmd(method string, args, reply interface{}) error { +func (r *Range) ReadOnlyCmd(method string, args, reply interface{}) error { if r == nil { return util.Errorf("invalid node specification") } return nil } -// readWriteCmd executes a read-write command against the store. If +// ReadWriteCmd executes a read-write command against the store. If // this node is the raft leader, it proposes the write to the other // raft participants. Otherwise, the write is forwarded via a // FollowerPropose RPC to the leader and this replica waits for an ACK @@ -84,9 +95,9 @@ func (r *Range) readOnlyCmd(method string, args, reply interface{}) error { // // Commands which mutate the store must be proposed as part of the // raft consensus write protocol. Only after committed can the command -// be executed. To facilitate this, readWriteCmd returns a channel +// be executed. To facilitate this, ReadWriteCmd returns a channel // which is signaled upon completion. -func (r *Range) readWriteCmd(method string, args, reply interface{}) <-chan error { +func (r *Range) ReadWriteCmd(method string, args, reply interface{}) <-chan error { if r == nil { c := make(chan error, 1) c <- util.Errorf("invalid node specification") @@ -106,165 +117,164 @@ func (r *Range) readWriteCmd(method string, args, reply interface{}) <-chan erro return logEntry.done } +// maybeGossip gossips in the event that this range has something +// interesting to share and it's the leader of its consensus +// group. For example, the range containing the start of the key space +// gossips its Replicas configuration to allow key addressing. This +// method should be reinvoked whenever something may have changed +// necessitating fresh gossip. +func (r *Range) maybeGossip() { + // Certain test cases have no gossip; ignore if so. + if r.gossip == nil { + return + } + // TODO(spencer): only do this if we're the leader of the consensus group. + if bytes.Equal(r.Meta.StartKey, Key("")) { + if err := r.gossip.AddInfo(gossip.KeyFirstRangeMetadata, r.Meta.Replicas, 1*time.Hour); err != nil { + glog.Warningf("failed to gossip first range metadata: %v", err) + } + } +} + // executeCmd switches over the method and multiplexes to execute the // appropriate storage API command. func (r *Range) executeCmd(method string, args, reply interface{}) error { switch method { case "Contains": - return r.Contains(args.(*ContainsRequest), reply.(*ContainsResponse)) + r.Contains(args.(*ContainsRequest), reply.(*ContainsResponse)) case "Get": - return r.Get(args.(*GetRequest), reply.(*GetResponse)) + r.Get(args.(*GetRequest), reply.(*GetResponse)) case "Put": - return r.Put(args.(*PutRequest), reply.(*PutResponse)) + r.Put(args.(*PutRequest), reply.(*PutResponse)) case "Increment": - return r.Increment(args.(*IncrementRequest), reply.(*IncrementResponse)) + r.Increment(args.(*IncrementRequest), reply.(*IncrementResponse)) case "Delete": - return r.Delete(args.(*DeleteRequest), reply.(*DeleteResponse)) + r.Delete(args.(*DeleteRequest), reply.(*DeleteResponse)) case "DeleteRange": - return r.DeleteRange(args.(*DeleteRangeRequest), reply.(*DeleteRangeResponse)) + r.DeleteRange(args.(*DeleteRangeRequest), reply.(*DeleteRangeResponse)) case "Scan": - return r.Scan(args.(*ScanRequest), reply.(*ScanResponse)) + r.Scan(args.(*ScanRequest), reply.(*ScanResponse)) case "EndTransaction": - return r.EndTransaction(args.(*EndTransactionRequest), reply.(*EndTransactionResponse)) + r.EndTransaction(args.(*EndTransactionRequest), reply.(*EndTransactionResponse)) case "AccumulateTS": - return r.AccumulateTS(args.(*AccumulateTSRequest), reply.(*AccumulateTSResponse)) + r.AccumulateTS(args.(*AccumulateTSRequest), reply.(*AccumulateTSResponse)) case "ReapQueue": - return r.ReapQueue(args.(*ReapQueueRequest), reply.(*ReapQueueResponse)) + r.ReapQueue(args.(*ReapQueueRequest), reply.(*ReapQueueResponse)) case "EnqueueUpdate": - return r.EnqueueUpdate(args.(*EnqueueUpdateRequest), reply.(*EnqueueUpdateResponse)) + r.EnqueueUpdate(args.(*EnqueueUpdateRequest), reply.(*EnqueueUpdateResponse)) case "EnqueueMessage": - return r.EnqueueMessage(args.(*EnqueueMessageRequest), reply.(*EnqueueMessageResponse)) + r.EnqueueMessage(args.(*EnqueueMessageRequest), reply.(*EnqueueMessageResponse)) default: return util.Errorf("unrecognized command type: %s", method) } + return nil } // Contains verifies the existence of a key in the key value store. -func (r *Range) Contains(args *ContainsRequest, reply *ContainsResponse) error { +func (r *Range) Contains(args *ContainsRequest, reply *ContainsResponse) { val, err := r.engine.get(args.Key) if err != nil { reply.Error = err + return } if val.Bytes != nil { reply.Exists = true } - return nil } // Get returns the value for a specified key. -func (r *Range) Get(args *GetRequest, reply *GetResponse) error { +func (r *Range) Get(args *GetRequest, reply *GetResponse) { val, err := r.engine.get(args.Key) if err != nil { reply.Error = err + return } else if val.Bytes == nil { reply.Error = util.Errorf("key %q not found", args.Key) + return } else { reply.Value.Bytes = val.Bytes } - return nil } // Put sets the value for a specified key. Conditional puts are supported. -func (r *Range) Put(args *PutRequest, reply *PutResponse) error { +func (r *Range) Put(args *PutRequest, reply *PutResponse) { // Handle conditional put. if args.ExpValue != nil { // Handle check for non-existence of key. val, err := r.engine.get(args.Key) if err != nil { reply.Error = err - return nil + return } if args.ExpValue.Bytes == nil && val.Bytes != nil { reply.Error = util.Errorf("key %q already exists", args.Key) - return nil + return } else if args.ExpValue != nil { // Handle check for existence when there is no key. if val.Bytes == nil { reply.Error = util.Errorf("key %q does not exist", args.Key) - return nil + return } else if !bytes.Equal(args.ExpValue.Bytes, val.Bytes) { reply.ActualValue.Bytes = val.Bytes reply.Error = util.Errorf("key %q does not match existing", args.Key) - return nil + return } } } if err := r.engine.put(args.Key, args.Value); err != nil { reply.Error = err + return } - return nil } // Increment increments the value (interpreted as varint64 encoded) and // returns the newly incremented value (encoded as varint64). If no // value exists for the key, zero is incremented. -func (r *Range) Increment(args *IncrementRequest, reply *IncrementResponse) error { - // First retrieve existing value. - val, err := r.engine.get(args.Key) +func (r *Range) Increment(args *IncrementRequest, reply *IncrementResponse) { + newVal, err := increment(r.engine, args.Key, args.Increment, args.Timestamp) if err != nil { reply.Error = err - return nil - } - var int64Val int64 - // If the value exists, attempt to decode it as a varint. - if val.Bytes != nil { - var numBytes int - int64Val, numBytes = binary.Varint(val.Bytes) - if numBytes == 0 { - reply.Error = util.Errorf("key %q cannot be incremented; not varint-encoded", args.Key) - return nil - } else if numBytes < 0 { - reply.Error = util.Errorf("key %q cannot be incremented; integer overflow", args.Key) - return nil - } + return } - int64Val += args.Increment - encoded := make([]byte, binary.MaxVarintLen64) - numBytes := binary.PutVarint(encoded, int64Val) - encoded = encoded[:numBytes] - if err = r.engine.put(args.Key, Value{Bytes: encoded, Timestamp: args.Timestamp}); err != nil { - reply.Error = err - } - return nil + reply.NewValue = newVal } // Delete deletes the key and value specified by key. -func (r *Range) Delete(args *DeleteRequest, reply *DeleteResponse) error { +func (r *Range) Delete(args *DeleteRequest, reply *DeleteResponse) { if err := r.engine.del(args.Key); err != nil { reply.Error = err } - return nil } // DeleteRange deletes the range of key/value pairs specified by // start and end keys. -func (r *Range) DeleteRange(args *DeleteRangeRequest, reply *DeleteRangeResponse) error { - return util.Error("unimplemented") +func (r *Range) DeleteRange(args *DeleteRangeRequest, reply *DeleteRangeResponse) { + reply.Error = util.Error("unimplemented") } // Scan scans the key range specified by start key through end key up // to some maximum number of results. The last key of the iteration is // returned with the reply. -func (r *Range) Scan(args *ScanRequest, reply *ScanResponse) error { - return util.Error("unimplemented") +func (r *Range) Scan(args *ScanRequest, reply *ScanResponse) { + reply.Error = util.Error("unimplemented") } // EndTransaction either commits or aborts (rolls back) an extant // transaction according to the args.Commit parameter. -func (r *Range) EndTransaction(args *EndTransactionRequest, reply *EndTransactionResponse) error { - return util.Error("unimplemented") +func (r *Range) EndTransaction(args *EndTransactionRequest, reply *EndTransactionResponse) { + reply.Error = util.Error("unimplemented") } // AccumulateTS is used internally to aggregate statistics over key // ranges throughout the distributed cluster. -func (r *Range) AccumulateTS(args *AccumulateTSRequest, reply *AccumulateTSResponse) error { - return util.Error("unimplemented") +func (r *Range) AccumulateTS(args *AccumulateTSRequest, reply *AccumulateTSResponse) { + reply.Error = util.Error("unimplemented") } // ReapQueue destructively queries messages from a delivery inbox // queue. This method must be called from within a transaction. -func (r *Range) ReapQueue(args *ReapQueueRequest, reply *ReapQueueResponse) error { - return util.Error("unimplemented") +func (r *Range) ReapQueue(args *ReapQueueRequest, reply *ReapQueueResponse) { + reply.Error = util.Error("unimplemented") } // EnqueueUpdate sidelines an update for asynchronous execution. @@ -272,12 +282,12 @@ func (r *Range) ReapQueue(args *ReapQueueRequest, reply *ReapQueueResponse) erro // are also built using update queues. Crucially, the enqueue happens // as part of the caller's transaction, so is guaranteed to be // executed if the transaction succeeded. -func (r *Range) EnqueueUpdate(args *EnqueueUpdateRequest, reply *EnqueueUpdateResponse) error { - return util.Error("unimplemented") +func (r *Range) EnqueueUpdate(args *EnqueueUpdateRequest, reply *EnqueueUpdateResponse) { + reply.Error = util.Error("unimplemented") } // EnqueueMessage enqueues a message (Value) for delivery to a // recipient inbox. -func (r *Range) EnqueueMessage(args *EnqueueMessageRequest, reply *EnqueueMessageResponse) error { - return util.Error("unimplemented") +func (r *Range) EnqueueMessage(args *EnqueueMessageRequest, reply *EnqueueMessageResponse) { + reply.Error = util.Error("unimplemented") } diff --git a/storage/rocksdb.go b/storage/rocksdb.go index 888b80a0035f..084933db42e1 100644 --- a/storage/rocksdb.go +++ b/storage/rocksdb.go @@ -18,20 +18,34 @@ package storage import ( + "flag" "syscall" "github.com/golang/glog" ) +const ( + // defaultCacheSize is the default value for the cacheSize command line flag. + defaultCacheSize = 1 << 30 // GB +) + +var ( + // cacheSize is the amount of memory in bytes to use for caching data. + // The value is split evenly between the stores if there are more than one. + cacheSize = flag.Int64("cache_size", defaultCacheSize, "total size in bytes for "+ + "caches, shared evenly if there are multiple storage devices") +) + // RocksDB is a wrapper around a RocksDB database instance. type RocksDB struct { - dir string // The data directory - name string // The device name + typ DiskType // HDD or SSD + dir string // The data directory } // NewRocksDB allocates and returns a new InMem object. -func NewRocksDB(dir string) (*RocksDB, error) { +func NewRocksDB(typ DiskType, dir string) (*RocksDB, error) { r := &RocksDB{ + typ: typ, dir: dir, } if _, err := r.capacity(); err != nil { @@ -57,21 +71,15 @@ func (r *RocksDB) del(key Key) error { // capacity queries the underlying file system for disk capacity // information. -func (r *RocksDB) capacity() (*DiskCapacity, error) { +func (r *RocksDB) capacity() (StoreCapacity, error) { var fs syscall.Statfs_t + var capacity StoreCapacity if err := syscall.Statfs(r.dir, &fs); err != nil { - return nil, err + return capacity, err } glog.Infof("stat filesystem: %v", fs) - if r.name == "" { - // TODO(spencer): set name. - } - - cap := uint64(fs.Bsize) * fs.Blocks - avail := uint64(fs.Bsize) * fs.Bavail - capacity := &DiskCapacity{ - Capacity: cap, - Available: avail, - } + capacity.Capacity = int64(fs.Bsize) * int64(fs.Blocks) + capacity.Available = int64(fs.Bsize) * int64(fs.Bavail) + capacity.DiskType = r.typ return capacity, nil } diff --git a/storage/store.go b/storage/store.go index 5c1a2ea67532..b918523c9f04 100644 --- a/storage/store.go +++ b/storage/store.go @@ -17,38 +17,131 @@ package storage -import "sync" +import ( + "strconv" + "sync" + "time" -// A store implements the key-value interface, but coordinates -// writes between a raft consensus group. -type store struct { - engine Engine // The underlying key-value store. - allocator *allocator // Makes allocation decisions. - mu sync.Mutex // Protects the ranges map. - ranges map[string]*Range // Map of ranges by range start key. + "github.com/cockroachdb/cockroach/gossip" + "github.com/cockroachdb/cockroach/util" +) + +// Constants for store-reserved keys. These keys are prefixed with +// three null characters so that they precede all global keys in the +// store's map. Data at these keys is local to this store and is not +// replicated via raft nor is it available via access to the global +// key-value store. +var ( + // keyStoreIdent store immutable identifier for this store, created + // when store is first bootstrapped. + keyStoreIdent = Key("\x00\x00\x00store-ident") + // keyRangeIDGenerator is a range ID generator sequence. Range IDs + // must be unique per node ID. + keyRangeIDGenerator = Key("\x00\x00\x00range-id-generator") + // keyRangeMetadataPrefix is the prefix for keys storing range metadata. + // The value is a struct of type RangeMetadata. + keyRangeMetadataPrefix = Key("\x00\x00\x00range-") +) + +// rangeKey creates a range key as the concatenation of the +// rangeMetadataKeyPrefix and hexadecimal-formatted range ID. +func rangeKey(rangeID int64) Key { + return MakeKey(keyRangeMetadataPrefix, Key(strconv.FormatInt(rangeID, 16))) +} + +// A StoreIdent uniquely identifies a store in the cluster. The +// StoreIdent is written to the underlying storage engine at a +// store-reserved system key (keyStoreIdent). +type StoreIdent struct { + ClusterID string + NodeID int32 + StoreID int32 } -// newStore returns a new instance of a store. -func newStore(engine Engine, allocator *allocator) *store { - return &store{ +// A Store maintains a map of ranges by start key. A Store corresponds +// to one physical device. +type Store struct { + Ident StoreIdent + engine Engine // The underlying key-value store + allocator *allocator // Makes allocation decisions + gossip *gossip.Gossip // Passed to new ranges + mu sync.Mutex // Protects the ranges map + ranges map[int64]*Range // Map of ranges by range ID +} + +// NewStore returns a new instance of a store. +func NewStore(engine Engine, gossip *gossip.Gossip) *Store { + return &Store{ engine: engine, - allocator: allocator, - ranges: make(map[string]*Range), + allocator: &allocator{}, + gossip: gossip, + ranges: make(map[int64]*Range), } } -// getRange fetches a range by looking at Replica.StartKey. The range is -// fetched quickly if it's already been loaded and is in the ranges -// map; otherwise, an instance of the range is instantiated from the -// underlying store. -func (s *store) getRange(r *Replica) (*Range, error) { - if rng, ok := s.ranges[string(r.RangeKey)]; ok { +// Init reads the StoreIdent from the underlying engine. +func (s *Store) Init(gossip *gossip.Gossip) error { + _, _, err := getI(s.engine, keyStoreIdent, &s.Ident) + if err != nil { + return err + } + + // TODO(spencer): scan through all range metadata and instantiate + // ranges. Right now we just get range id hardcoded as 1. + var meta RangeMetadata + _, _, err = getI(s.engine, rangeKey(1), &meta) + if err != nil { + return err + } + rng := NewRange(meta, s.engine, s.allocator, gossip) + s.ranges[meta.RangeID] = rng + + return nil +} + +// Bootstrap writes a new store ident to the underlying engine. +func (s *Store) Bootstrap(ident StoreIdent) error { + s.Ident = ident + return putI(s.engine, keyStoreIdent, s.Ident) +} + +// GetRange fetches a range by ID. The range is fetched quickly if +// it's already been loaded and is in the ranges map; otherwise, an +// instance of the range is instantiated from the underlying store. +func (s *Store) GetRange(rangeID int64) (*Range, error) { + if rng, ok := s.ranges[rangeID]; ok { return rng, nil } - rng, err := NewRange(r.RangeKey, s.engine, s.allocator) + return nil, util.Errorf("range %d not found on store", rangeID) +} + +// CreateRange allocates a new range ID and stores range metadata. +// On success, returns the new range. +func (s *Store) CreateRange(startKey, endKey Key) (*Range, error) { + rangeID, err := increment(s.engine, keyRangeIDGenerator, 1, time.Now().UnixNano()) + if err != nil { + return nil, err + } + if ok, _, _ := getI(s.engine, rangeKey(rangeID), nil); ok { + return nil, util.Error("newly allocated range id already in use") + } + // RangeMetadata is stored local to this store only. It is neither + // replicated via raft nor available via the global kv store. + meta := RangeMetadata{ + RangeID: rangeID, + StartKey: startKey, + EndKey: endKey, + } + err = putI(s.engine, rangeKey(rangeID), meta) if err != nil { return nil, err } - s.ranges[string(r.RangeKey)] = rng + rng := NewRange(meta, s.engine, s.allocator, s.gossip) + s.ranges[rangeID] = rng return rng, nil } + +// Capacity returns the capacity of the underlying storage engine. +func (s *Store) Capacity() (StoreCapacity, error) { + return s.engine.capacity() +}