Skip to content

Commit

Permalink
Integrated a CLI interface to the main cockroach binary which current…
Browse files Browse the repository at this point in the history
…ly has two commands: init & start.

Summary:
A new cluster is init'd by specifying a single data directory as first
argument and a default zone configuration file as second arg. A new
RocksDB instance is created in the data directory (an error results if
one is already there), and a new cluster UUID is generated to uniquely
identify the cockroach cluster. The first range is created as a single
replica with ident: NodeID: 1, StoreID: 1, RangeID: 1. meta1/meta2
records are created for the first range, and node id sequence generator,
store id sequence generator, and store-local range id sequence generator
are all created.

The roach node is run using "cockroach start". Typically this will be
accompanied by --bootstrap_hosts in order to connect to gossip network,
and --data_dirs=dir1,dir2,...  The --data_dirs can either have a list
of paths or a list of "mem=100000,mem=200000,...", which creates in-mem
stores. Any stores which are as yet uninitialized are bootstrapped
using the cluster ID and new store IDs from the store id sequence
generator. Stores other than the one used to init a new cluster,
or all stores on a new node being added to an existing cluster will need
initialization.

Changed Replica specification to be a series of three integers: NodeID,
StoreID & RangeID. Node & store IDs are 32-bit; range ID is 64-bit.

Added convenience methods to kv.DB as well as storage.Engine for reading
and writing arbitrary go values via interface{}.

Test Plan: go test

Reviewers: petermattis, andybons, bdarnell

Reviewed By: bdarnell

Subscribers: hrsht, levonlloyd

Differential Revision: http://phabricator.andybons.com/D29
  • Loading branch information
Spencer Kimball committed Apr 17, 2014
1 parent 8afb7eb commit a97fee5
Show file tree
Hide file tree
Showing 26 changed files with 1,268 additions and 573 deletions.
14 changes: 7 additions & 7 deletions gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ func init() {

// client is a client-side RPC connection to a gossip peer node.
type client struct {
addr net.Addr // Peer node network address
rpcClient *rpc.Client // RPC client
forwardAddr net.Addr // Set if disconnected with an alternate addr
lastFresh int64 // Last wall time client received fresh info
err error // Set if client experienced an error
closer chan interface{} // Client shutdown channel
addr net.Addr // Peer node network address
rpcClient *rpc.Client // RPC client
forwardAddr net.Addr // Set if disconnected with an alternate addr
lastFresh int64 // Last wall time client received fresh info
err error // Set if client experienced an error
closer chan struct{} // Client shutdown channel
}

// newClient creates and returns a client struct.
func newClient(addr net.Addr) *client {
return &client{
addr: addr,
closer: make(chan interface{}, 1),
closer: make(chan struct{}, 1),
}
}

Expand Down
22 changes: 11 additions & 11 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ var (
)

const (
// SentinelGossip is gossiped info which must not expire or node
// considers itself partitioned and will retry with bootstrap
// hosts.
SentinelGossip = "meta0"
// NodeCountGossip is the count of gossip nodes in the network.
NodeCountGossip = "nodeCount"
// MaxPeers is the maximum number of connected gossip peers.
MaxPeers = 10
// defaultNodeCount is the default number of nodes in the gossip
Expand All @@ -106,6 +100,8 @@ const (
// entre to the gossip network.
type Gossip struct {
Name string // Optional node name
Connected chan struct{} // Closed upon initial connection
hasConnected bool // Set first time network is connected
*server // Embedded gossip RPC server
bootstraps *addrSet // Bootstrap host addresses
outgoing *addrSet // Set of outgoing client addresses
Expand All @@ -119,6 +115,7 @@ type Gossip struct {
// Cockroach RPC server to initialize the gossip service endpoint.
func New(rpcServer *rpc.Server) *Gossip {
g := &Gossip{
Connected: make(chan struct{}, 1),
server: newServer(rpcServer, *gossipInterval),
bootstraps: newAddrSet(MaxPeers),
outgoing: newAddrSet(MaxPeers),
Expand Down Expand Up @@ -186,7 +183,7 @@ func (g *Gossip) GetGroupInfos(prefix string) ([]interface{}, error) {
// RegisterGroup registers a new group with info store. Returns an
// error if the group was already registered.
func (g *Gossip) RegisterGroup(prefix string, limit int, typeOf GroupType) error {
g.mu.Lock()
g.mu.Lock() // Copyright 2014 The Cockroach Authors.
defer g.mu.Unlock()
return g.is.registerGroup(newGroup(prefix, limit, typeOf))
}
Expand Down Expand Up @@ -249,7 +246,7 @@ func (g *Gossip) Stop() <-chan error {
func (g *Gossip) maxToleratedHops() uint32 {
// Get info directly as we have mutex held here.
var nodeCount = int64(defaultNodeCount)
if info := g.is.getInfo(NodeCountGossip); info != nil {
if info := g.is.getInfo(KeyNodeCount); info != nil {
nodeCount = info.Val.(int64)
}
return uint32(math.Ceil(math.Log(float64(nodeCount))/math.Log(float64(MaxPeers))))*2 + 1
Expand Down Expand Up @@ -319,7 +316,7 @@ func (g *Gossip) bootstrap() {
if avail.len() > 0 {
// Check whether or not we need bootstrap.
haveClients := g.outgoing.len() > 0
haveSentinel := g.is.getInfo(SentinelGossip) != nil
haveSentinel := g.is.getInfo(KeySentinel) != nil
if !haveClients || !haveSentinel {
// Select a bootstrap address at random and start client.
addr := avail.selectRandom()
Expand Down Expand Up @@ -389,9 +386,12 @@ func (g *Gossip) manage() {
if g.outgoing.len() == 0 && g.filterExtant(g.bootstraps).len() > 0 {
glog.Infof("no outgoing hosts; signaling bootstrap")
g.stalled.Signal()
} else if g.is.getInfo(SentinelGossip) == nil {
glog.Infof("missing sentinel gossip %s; assuming partition and reconnecting", SentinelGossip)
} else if g.is.getInfo(KeySentinel) == nil {
glog.Warningf("missing sentinel gossip %s; assuming partition and reconnecting", KeySentinel)
g.stalled.Signal()
} else if !g.hasConnected {
g.hasConnected = true
close(g.Connected)
}

// The exit condition.
Expand Down
14 changes: 11 additions & 3 deletions gossip/infostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ func newInfoStore(nodeAddr net.Addr) *infoStore {
func (is *infoStore) newInfo(key string, val interface{}, ttl time.Duration) *info {
is.seqGen++
now := monotonicUnixNano()
ttlStamp := now + int64(ttl)
if ttl == 0*time.Second {
ttlStamp = math.MaxInt64
}
return &info{
Key: key,
Val: val,
Timestamp: now,
TTLStamp: now + int64(ttl),
TTLStamp: ttlStamp,
NodeAddr: is.NodeAddr,
peerAddr: is.NodeAddr,
seq: is.seqGen,
Expand Down Expand Up @@ -135,8 +139,12 @@ func (is *infoStore) getGroupInfos(prefix string) infoArray {
//
// REQUIRES: group.prefix is not already in the info store's groups map.
func (is *infoStore) registerGroup(g *group) error {
if _, ok := is.Groups[g.Prefix]; ok {
return util.Errorf("group %q already in group map", g.Prefix)
if g2, ok := is.Groups[g.Prefix]; ok {
if g.Prefix != g2.Prefix || g.Limit != g2.Limit || g.TypeOf != g2.TypeOf {
return util.Errorf("group %q already in group map with different settings %v vs. %v",
g.Prefix, g, g2)
}
return nil
}
is.Groups[g.Prefix] = g
return nil
Expand Down
23 changes: 20 additions & 3 deletions gossip/infostore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package gossip

import (
"fmt"
"math"
"testing"
"time"
)
Expand Down Expand Up @@ -52,9 +53,25 @@ func TestRegisterGroup(t *testing.T) {
t.Error("shouldn't belong to a group")
}

// Try to register a group that's already been registered.
if is.registerGroup(groupA) == nil {
t.Error("should not be able to register group A twice")
// Try to register a group that's already been registered; will
// succeed if identical.
if is.registerGroup(groupA) != nil {
t.Error("should be able to register group A twice")
}
// Now change the group type and try again.
groupAAlt := newGroup("a", 1, MaxGroup)
if is.registerGroup(groupAAlt) == nil {
t.Error("should not be able to register group A again with different properties")
}
}

// TestZeroDuration verifies that specifying a zero duration sets
// TTLStamp to max int64.
func TestZeroDuration(t *testing.T) {
is := newInfoStore(emptyAddr)
info := is.newInfo("a", float64(1), 0*time.Second)
if info.TTLStamp != math.MaxInt64 {
t.Errorf("expected zero duration to get max TTLStamp: %d", info.TTLStamp)
}
}

Expand Down
53 changes: 53 additions & 0 deletions gossip/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Spencer Kimball ([email protected])

package gossip

// Constants for gossip keys.
const (
// KeyClusterID is the unique UUID for this Cockroach cluster.
// The value is a string UUID for the cluster.
KeyClusterID = "cluster-id"

// KeyMaxCapacityPrefix is the key prefix for gossiping available
// store capacity. The suffix is composed of:
// <datacenter>-<hex node ID>-<hex store ID>. The value is a
// storage.StoreAttributes struct.
KeyMaxAvailCapacityPrefix = "max-avail-capacity-"

// KeyNodeCount is the count of gossip nodes in the network. The
// value is an int64 containing the count of nodes in the cluster.
// TODO(spencer): should remove this and instead just count the
// number of node ids being gossiped.
KeyNodeCount = "node-count"

// KeyNodeIDPrefix is the key prefix for gossiping node id
// addresses. The actual key is suffixed with the hexadecimal
// representation of the node id and the value is the host:port
// string address of the node. E.g. node-1bfa: fwd56.sjcb1:24001
KeyNodeIDPrefix = "node-"

// SentinelKey is a key for gossip which must not expire or else the
// node considers itself partitioned and will retry with bootstrap hosts.
KeySentinel = KeyClusterID

// KeyFirstRangeMetadata is the metadata for the "first" range. The
// "first" range contains the meta1 key range, the first level of
// the bi-level key addressing scheme. The value is a slice of
// storage.Replica structs.
KeyFirstRangeMetadata = "first-range"
)
4 changes: 2 additions & 2 deletions gossip/simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func SimulateNetwork(nodeCount int, network string, gossipInterval time.Duration
node.SetInterval(gossipInterval)
// Node 0 gossips node count.
if i == 0 {
node.AddInfo(NodeCountGossip, int64(nodeCount), time.Hour)
node.AddInfo(KeyNodeCount, int64(nodeCount), time.Hour)
}
node.Start()
nodes[addrs[i].String()] = node
Expand All @@ -137,7 +137,7 @@ func SimulateNetwork(nodeCount int, network string, gossipInterval time.Duration
select {
case <-gossipTimeout:
// Node 0 gossips sentinel every cycle.
nodes[addrs[0].String()].AddInfo(SentinelGossip, int64(cycle), time.Hour)
nodes[addrs[0].String()].AddInfo(KeySentinel, int64(cycle), time.Hour)
if !simCallback(cycle, nodes) {
complete = true
}
Expand Down
40 changes: 40 additions & 0 deletions kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package kv

import (
"bytes"
"encoding/gob"
"reflect"
"time"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/rpc"
Expand All @@ -42,6 +45,43 @@ type DB interface {
EnqueueMessage(args *storage.EnqueueMessageRequest) <-chan *storage.EnqueueMessageResponse
}

// GetI fetches the value at the specified key and deserializes it
// into "value". Returns true on success or false if the key was not
// found. The timestamp of the write is returned as the second return
// value. The first result parameter is "ok", true if a value was
// found for the requested key; false otherwise. An error is returned
// on error fetching from underlying storage or deserializing value.
func GetI(db DB, key storage.Key, value interface{}) (bool, int64, error) {
gr := <-db.Get(&storage.GetRequest{Key: key})
if gr.Error != nil {
return false, 0, gr.Error
}
if len(gr.Value.Bytes) == 0 {
return false, 0, nil
}
if err := gob.NewDecoder(bytes.NewBuffer(gr.Value.Bytes)).Decode(value); err != nil {
return true, gr.Value.Timestamp, err
}
return true, gr.Value.Timestamp, nil
}

// PutI sets the given key to the serialized byte string of the value
// provided. Uses current time and default expiration.
func PutI(db DB, key storage.Key, value interface{}) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(value); err != nil {
return err
}
pr := <-db.Put(&storage.PutRequest{
Key: key,
Value: storage.Value{
Bytes: buf.Bytes(),
Timestamp: time.Now().UnixNano(),
},
})
return pr.Error
}

// A DistDB provides methods to access Cockroach's monolithic,
// distributed key value store. Each method invocation triggers a
// lookup or lookups to find replica metadata for implicated key
Expand Down
22 changes: 1 addition & 21 deletions kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@
package kv

import (
"net"
"net/http"
"net/http/httptest"
"sync"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/storage"
"github.com/golang/glog"
)

var (
Expand All @@ -35,33 +29,19 @@ var (
)

type kvTestServer struct {
rpc *rpc.Server
gossip *gossip.Gossip
node *storage.Node
db DB
rest *RESTServer
httpServer *httptest.Server
}

func startServer() *kvTestServer {
once.Do(func() {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
glog.Fatal(err)
}
server = &kvTestServer{
rpc: rpc.NewServer(addr),
}
server.gossip = gossip.New(server.rpc)
server.gossip.SetBootstrap([]net.Addr{server.rpc.Addr})
server.node = storage.NewNode(server.rpc, server.gossip)
server = &kvTestServer{}
server.db = NewLocalDB()
server.rest = NewRESTServer(server.db)
server.httpServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
server.rest.HandleAction(w, r)
}))
server.gossip.Start()
go server.rpc.ListenAndServe() // blocks, so launch in a goroutine
})
return server
}
Loading

0 comments on commit a97fee5

Please sign in to comment.