Skip to content

Commit

Permalink
Merge pull request #301 from cockroachdb/pmattis/gossip-sim
Browse files Browse the repository at this point in the history
Minor code reorg.
  • Loading branch information
petermattis committed Feb 4, 2015
2 parents cc5e4ae + ccde7e6 commit 4b64c75
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 56 deletions.
16 changes: 9 additions & 7 deletions gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
//
// Author: Spencer Kimball ([email protected])

package gossip
package gossip_test

import (
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/gossip/simulation"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/util/hlc"
)
Expand All @@ -32,7 +34,7 @@ import (
// more cycles for a fully connected network on busy hardware. The clock
// should be advanced manually instead (this requires some changes to gossip).
func verifyConvergence(numNodes, maxCycles int, t *testing.T) {
network := NewSimulationNetwork(numNodes, "unix", DefaultTestGossipInterval)
network := simulation.NewNetwork(numNodes, "unix", simulation.DefaultTestGossipInterval)

if connectedCycle := network.RunUntilFullyConnected(); connectedCycle > maxCycles {
t.Errorf("expected a fully-connected network within %d cycles; took %d",
Expand All @@ -53,7 +55,7 @@ func TestConvergence(t *testing.T) {
// TestGossipInfoStore verifies operation of gossip instance infostore.
func TestGossipInfoStore(t *testing.T) {
rpcContext := rpc.NewContext(hlc.NewClock(hlc.UnixNano), rpc.LoadInsecureTLSConfig())
g := New(rpcContext)
g := gossip.New(rpcContext)
g.AddInfo("i", int64(1), time.Hour)
if val, err := g.GetInfo("i"); val.(int64) != int64(1) || err != nil {
t.Errorf("error fetching int64: %v", err)
Expand Down Expand Up @@ -81,10 +83,10 @@ func TestGossipInfoStore(t *testing.T) {
// gossip instance infostore.
func TestGossipGroupsInfoStore(t *testing.T) {
rpcContext := rpc.NewContext(hlc.NewClock(hlc.UnixNano), rpc.LoadInsecureTLSConfig())
g := New(rpcContext)
g := gossip.New(rpcContext)

// For int64.
g.RegisterGroup("i", 3, MinGroup)
g.RegisterGroup("i", 3, gossip.MinGroup)
for i := 0; i < 3; i++ {
g.AddInfo(fmt.Sprintf("i.%d", i), int64(i), time.Hour)
}
Expand All @@ -105,7 +107,7 @@ func TestGossipGroupsInfoStore(t *testing.T) {
}

// For float64.
g.RegisterGroup("f", 3, MinGroup)
g.RegisterGroup("f", 3, gossip.MinGroup)
for i := 0; i < 3; i++ {
g.AddInfo(fmt.Sprintf("f.%d", i), float64(i), time.Hour)
}
Expand All @@ -123,7 +125,7 @@ func TestGossipGroupsInfoStore(t *testing.T) {
}

// For string.
g.RegisterGroup("s", 3, MinGroup)
g.RegisterGroup("s", 3, gossip.MinGroup)
for i := 0; i < 3; i++ {
g.AddInfo(fmt.Sprintf("s.%d", i), fmt.Sprintf("%d", i), time.Hour)
}
Expand Down
15 changes: 11 additions & 4 deletions simulation/gossip.go → gossip/simulation/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
//
// Author: Spencer Kimball ([email protected])

// We mark this file as "build ignore" so that it won't be built as
// part of the simulation package, but can still be run using "go run
// gossip.go".

// +build ignore

/*
Package simulation provides tools meant to visualize or test aspects
of a Cockroach cluster on a single host.
Expand All @@ -30,7 +36,7 @@ simulation.
To run:
go run simulation/gossip.go -size=(small|medium|large|huge|ginormous)
go run gossip.go -size=(small|medium|large|huge|ginormous)
Log output includes instructions for displaying the graph output as a
series of images to visualize the evolution of the network.
Expand Down Expand Up @@ -71,6 +77,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/gossip/simulation"
"github.com/cockroachdb/cockroach/util/log"
)

Expand Down Expand Up @@ -140,7 +147,7 @@ func (em edgeMap) addEdge(addr string, e edge) {
// node5 -> node2
// node5 -> node3
// }
func outputDotFile(dotFN string, cycle int, network *gossip.SimulationNetwork, edgeSet map[string]edge) string {
func outputDotFile(dotFN string, cycle int, network *simulation.Network, edgeSet map[string]edge) string {
f, err := os.Create(dotFN)
if err != nil {
log.Fatalf("unable to create temp file: %s", err)
Expand Down Expand Up @@ -291,9 +298,9 @@ func main() {

edgeSet := make(map[string]edge)

n := gossip.NewSimulationNetwork(nodeCount, *networkType, gossipInterval)
n := simulation.NewNetwork(nodeCount, *networkType, gossipInterval)
n.SimulateNetwork(
func(cycle int, network *gossip.SimulationNetwork) bool {
func(cycle int, network *simulation.Network) bool {
if cycle == numCycles {
return false
}
Expand Down
86 changes: 44 additions & 42 deletions gossip/simulation.go → gossip/simulation/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,32 @@
//
// Author: Spencer Kimball ([email protected])

package gossip
package simulation

import (
"fmt"
"net"
"time"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
)

// SimulationNode represents a node used in a SimulationNetwork.
// It includes information about the node's gossip instance,
// network address, and underlying server.
type SimulationNode struct {
Gossip *Gossip
// Node represents a node used in a Network. It includes information
// about the node's gossip instance, network address, and underlying
// server.
type Node struct {
Gossip *gossip.Gossip
Addr net.Addr
Server *rpc.Server
}

// SimulationNetwork provides access to a test gossip network of nodes.
type SimulationNetwork struct {
Nodes []*SimulationNode
// Network provides access to a test gossip network of nodes.
type Network struct {
Nodes []*Node
Addrs []net.Addr
NetworkType string // "tcp" or "unix"
GossipInterval time.Duration // The length of a round of gossip
Expand All @@ -49,14 +50,14 @@ type SimulationNetwork struct {
// on time scale for testing gossip networks.
const DefaultTestGossipInterval = 10 * time.Millisecond

// NewSimulationNetwork creates nodeCount gossip nodes. The networkType should
// NewNetwork creates nodeCount gossip nodes. The networkType should
// be set to either "tcp" or "unix". The gossipInterval should be set
// to a compressed simulation timescale, though large enough to give
// the concurrent goroutines enough time to pass data back and forth
// in order to yield accurate estimates of how old data actually ends
// up being at the various nodes (e.g. DefaultTestGossipInterval).
func NewSimulationNetwork(nodeCount int, networkType string,
gossipInterval time.Duration) *SimulationNetwork {
func NewNetwork(nodeCount int, networkType string,
gossipInterval time.Duration) *Network {

tlsConfig := rpc.LoadInsecureTLSConfig()
clock := hlc.NewClock(hlc.UnixNano)
Expand All @@ -80,30 +81,30 @@ func NewSimulationNetwork(nodeCount int, networkType string,
bootstrap = addrs[:3]
}

nodes := make([]*SimulationNode, nodeCount)
nodes := make([]*Node, nodeCount)
for i := 0; i < nodeCount; i++ {
node := New(rpcContext)
node := gossip.New(rpcContext)
node.Name = fmt.Sprintf("Node%d", i)
node.SetBootstrap(bootstrap)
node.SetInterval(gossipInterval)
node.Start(servers[i])
// Node 0 gossips node count.
if i == 0 {
node.AddInfo(KeyNodeCount, int64(nodeCount), time.Hour)
node.AddInfo(gossip.KeyNodeCount, int64(nodeCount), time.Hour)
}
nodes[i] = &SimulationNode{Gossip: node, Addr: addrs[i], Server: servers[i]}
nodes[i] = &Node{Gossip: node, Addr: addrs[i], Server: servers[i]}
}

return &SimulationNetwork{
return &Network{
Nodes: nodes,
Addrs: addrs,
NetworkType: networkType,
GossipInterval: gossipInterval}
}

// GetNodeFromAddr returns the simulation node associated
// with provided network address, or nil if there is no such node.
func (n *SimulationNetwork) GetNodeFromAddr(addr string) (*SimulationNode, bool) {
// GetNodeFromAddr returns the simulation node associated with
// provided network address, or nil if there is no such node.
func (n *Network) GetNodeFromAddr(addr string) (*Node, bool) {
for i := 0; i < len(n.Nodes); i++ {
if n.Nodes[i].Addr.String() == addr {
return n.Nodes[i], true
Expand All @@ -112,30 +113,30 @@ func (n *SimulationNetwork) GetNodeFromAddr(addr string) (*SimulationNode, bool)
return nil, false
}

// SimulateNetwork runs a number of gossipInterval periods within the given
// SimulationNetwork. After each gossipInterval period, simCallback is invoked.
// When it returns false, the simulation ends. If it returns true, the
// simulation continues another cycle.
// SimulateNetwork runs a number of gossipInterval periods within the
// given Network. After each gossipInterval period, simCallback is
// invoked. When it returns false, the simulation ends. If it returns
// true, the simulation continues another cycle.
//
// Node0 gossips the node count as well as the gossip sentinel. The gossip
// bootstrap hosts are set to the first three nodes (or fewer if less than
// three are available).
// Node0 gossips the node count as well as the gossip sentinel. The
// gossip bootstrap hosts are set to the first three nodes (or fewer
// if less than three are available).
//
// At each cycle of the simulation, node 0 gossips the sentinel. If the
// simulation requires other nodes to gossip, this should be done via
// simCallback.
// At each cycle of the simulation, node 0 gossips the sentinel. If
// the simulation requires other nodes to gossip, this should be done
// via simCallback.
//
// The simulation callback receives a map of nodes, keyed by node address.
func (n *SimulationNetwork) SimulateNetwork(
simCallback func(cycle int, network *SimulationNetwork) bool) {
func (n *Network) SimulateNetwork(
simCallback func(cycle int, network *Network) bool) {
gossipTimeout := time.Tick(n.GossipInterval)
nodes := n.Nodes
var complete bool
for cycle := 0; !complete; cycle++ {
select {
case <-gossipTimeout:
// Node 0 gossips sentinel every cycle.
nodes[0].Gossip.AddInfo(KeySentinel, int64(cycle), time.Hour)
nodes[0].Gossip.AddInfo(gossip.KeySentinel, int64(cycle), time.Hour)
if !simCallback(cycle, n) {
complete = true
}
Expand All @@ -144,19 +145,19 @@ func (n *SimulationNetwork) SimulateNetwork(
}

// Stop all servers and gossip nodes.
func (n *SimulationNetwork) Stop() {
func (n *Network) Stop() {
for i := 0; i < len(n.Nodes); i++ {
n.Nodes[i].Server.Close()
n.Nodes[i].Gossip.Stop()
}
}

// RunUntilFullyConnected blocks until the gossip network has received gossip
// from every other node in the network. It returns the gossip cycle at which
// the network became fully connected.
func (n *SimulationNetwork) RunUntilFullyConnected() int {
// RunUntilFullyConnected blocks until the gossip network has received
// gossip from every other node in the network. It returns the gossip
// cycle at which the network became fully connected.
func (n *Network) RunUntilFullyConnected() int {
var connectedAtCycle int
n.SimulateNetwork(func(cycle int, network *SimulationNetwork) bool {
n.SimulateNetwork(func(cycle int, network *Network) bool {
nodes := network.Nodes
// Every node should gossip.
for i := 0; i < len(nodes); i++ {
Expand All @@ -171,9 +172,10 @@ func (n *SimulationNetwork) RunUntilFullyConnected() int {
return connectedAtCycle
}

// isNetworkConnected returns true if the network is fully connected with
// no partitions (i.e. every node knows every other node's network address).
func (n *SimulationNetwork) isNetworkConnected() bool {
// isNetworkConnected returns true if the network is fully connected
// with no partitions (i.e. every node knows every other node's
// network address).
func (n *Network) isNetworkConnected() bool {
for i := 0; i < len(n.Nodes); i++ {
for keyIdx := 0; keyIdx < len(n.Addrs); keyIdx++ {
_, err := n.Nodes[i].Gossip.GetInfo(n.Addrs[keyIdx].String())
Expand Down
7 changes: 4 additions & 3 deletions kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"time"

"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/gossip/simulation"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/storage/engine"
)

func TestGetFirstRangeDescriptor(t *testing.T) {
n := gossip.NewSimulationNetwork(3, "unix", gossip.DefaultTestGossipInterval)
n := simulation.NewNetwork(3, "unix", simulation.DefaultTestGossipInterval)
ds := NewDistSender(n.Nodes[0].Gossip)
if _, err := ds.getFirstRangeDescriptor(); err == nil {
t.Errorf("expected not to find first range descriptor")
Expand All @@ -45,7 +46,7 @@ func TestGetFirstRangeDescriptor(t *testing.T) {
n.Nodes[1].Gossip.AddInfo(
gossip.KeyFirstRangeDescriptor, *expectedDesc, time.Hour)
maxCycles := 10
n.SimulateNetwork(func(cycle int, network *gossip.SimulationNetwork) bool {
n.SimulateNetwork(func(cycle int, network *simulation.Network) bool {
desc, err := ds.getFirstRangeDescriptor()
if err != nil {
if cycle >= maxCycles {
Expand All @@ -65,7 +66,7 @@ func TestGetFirstRangeDescriptor(t *testing.T) {
}

func TestVerifyPermissions(t *testing.T) {
n := gossip.NewSimulationNetwork(1, "unix", gossip.DefaultTestGossipInterval)
n := simulation.NewNetwork(1, "unix", simulation.DefaultTestGossipInterval)
ds := NewDistSender(n.Nodes[0].Gossip)
config1 := &proto.PermConfig{
Read: []string{"read1", "readAll", "rw", "rwAll"},
Expand Down

0 comments on commit 4b64c75

Please sign in to comment.