Skip to content

Commit

Permalink
Merge #93945
Browse files Browse the repository at this point in the history
93945: asim: rework gossip component r=alextalks a=kvoli

This series of commits reworks the gossip component in `asim` to more accurately reflect reality. See individual commit messages.

part of #83990 

Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed Jan 19, 2023
2 parents f27fa9d + b54947d commit f5cac1e
Show file tree
Hide file tree
Showing 33 changed files with 1,267 additions and 771 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/gossip:gossip_test",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state_test",
Expand Down Expand Up @@ -1157,6 +1158,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/apply:apply",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/gossip:gossip",
"//pkg/kv/kvserver/asim/gossip:gossip_test",
"//pkg/kv/kvserver/asim/op:op",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/queue:queue",
Expand Down Expand Up @@ -2547,6 +2550,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/apply:get_x_data",
"//pkg/kv/kvserver/asim:get_x_data",
"//pkg/kv/kvserver/asim/config:get_x_data",
"//pkg/kv/kvserver/asim/gossip:get_x_data",
"//pkg/kv/kvserver/asim/op:get_x_data",
"//pkg/kv/kvserver/asim/queue:get_x_data",
"//pkg/kv/kvserver/asim/state:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"storage_engine_client.go",
"store.go",
"store_create_replica.go",
"store_gossip.go",
"store_init.go",
"store_merge.go",
"store_raft.go",
Expand Down Expand Up @@ -312,6 +313,7 @@ go_test(
"split_queue_test.go",
"split_trigger_helper_test.go",
"stats_test.go",
"store_gossip_test.go",
"store_pool_test.go",
"store_raft_test.go",
"store_rebalancer_test.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/gossip",
"//pkg/kv/kvserver/asim/op",
"//pkg/kv/kvserver/asim/queue",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/asim/storerebalancer",
"//pkg/kv/kvserver/asim/workload",
"//pkg/roachpb",
"//pkg/util/encoding/csv",
"//pkg/util/log",
],
Expand Down
29 changes: 10 additions & 19 deletions pkg/kv/kvserver/asim/asim.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/queue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/storerebalancer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ type Simulator struct {

state state.State
changer state.Changer
exchange state.Exchange
gossip gossip.Gossip
shuffler func(n int, swap func(i, j int))

metrics *MetricsTracker
Expand All @@ -68,7 +68,6 @@ func NewSimulator(
interval, bgInterval time.Duration,
wgs []workload.Generator,
initialState state.State,
exchange state.Exchange,
changer state.Changer,
settings *config.SimulationSettings,
metrics *MetricsTracker,
Expand Down Expand Up @@ -138,7 +137,7 @@ func NewSimulator(
controllers: controllers,
srs: srs,
pacers: pacers,
exchange: exchange,
gossip: gossip.NewGossip(initialState, settings),
metrics: metrics,
shuffler: state.NewShuffler(settings.Seed),
}
Expand Down Expand Up @@ -174,17 +173,17 @@ func (s *Simulator) RunSim(ctx context.Context) {
break
}

// Update the store clocks with the current tick time.
s.tickStoreClocks(tick)

// Update the state with generated load.
s.tickWorkload(ctx, tick)

// Update pending state changes.
s.tickStateChanges(ctx, tick)

// Update each allocators view of the stores in the cluster.
s.tickStateExchange(tick)

// Update the store clocks with the current tick time.
s.tickStoreClocks(tick)
s.tickGossip(ctx, tick)

// Done with config and load updates, the state is ready for the
// allocators.
Expand Down Expand Up @@ -235,19 +234,11 @@ func (s *Simulator) tickStateChanges(ctx context.Context, tick time.Time) {
}
}

// tickStateExchange puts the current tick store descriptors into the state
// tickGossip puts the current tick store descriptors into the state
// exchange. It then updates the exchanged descriptors for each store's store
// pool.
func (s *Simulator) tickStateExchange(tick time.Time) {
if s.bgLastTick.Add(s.bgInterval).After(tick) {
return
}
storeDescriptors := s.state.StoreDescriptors()
s.exchange.Put(tick, storeDescriptors...)
for _, store := range s.state.Stores() {
storeID := store.StoreID()
s.state.UpdateStorePool(storeID, s.exchange.Get(tick, roachpb.StoreID(storeID)))
}
func (s *Simulator) tickGossip(ctx context.Context, tick time.Time) {
s.gossip.Tick(ctx, tick, s.state)
}

func (s *Simulator) tickStoreClocks(tick time.Time) {
Expand Down
31 changes: 10 additions & 21 deletions pkg/kv/kvserver/asim/asim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ func TestRunAllocatorSimulator(t *testing.T) {
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, 1, 10)
m := asim.NewMetricsTracker(os.Stdout)
exchange := state.NewFixedDelayExhange(start, settings.StateExchangeInterval, settings.StateExchangeDelay)
changer := state.NewReplicaChanger()
s := state.LoadConfig(state.ComplexConfig)

sim := asim.NewSimulator(start, end, interval, interval, rwg, s, exchange, changer, settings, m)
sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m)
sim.RunSim(ctx)
}

Expand All @@ -67,16 +66,6 @@ func testCreateWorkloadGenerator(start time.Time, stores int, keySpan int64) wor
)
}

// testPreGossipStores populates the state exchange with the existing state.
// This is done at the time given, which should be before the test start time
// minus the gossip delay and interval. This alleviates a cold start, where the
// allocator for each store does not have information to make a decision for
// the ranges it holds leases for.
func testPreGossipStores(s state.State, exchange state.Exchange, at time.Time) {
storeDescriptors := s.StoreDescriptors()
exchange.Put(at, storeDescriptors...)
}

// TestAllocatorSimulatorSpeed tests that the simulation runs at a rate of at
// least 1.67 simulated minutes per wall clock second (1:100) for a 32 node
// cluster, with 32000 replicas. The workload is generating 16000 keys per
Expand All @@ -96,7 +85,6 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {
end := start.Add(5 * time.Minute)
bgInterval := 10 * time.Second
interval := 2 * time.Second
preGossipStart := start.Add(-settings.StateExchangeInterval - settings.StateExchangeDelay)

stores := 32
replsPerRange := 3
Expand All @@ -112,7 +100,6 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {
sample := func() int64 {
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace))
exchange := state.NewFixedDelayExhange(preGossipStart, settings.StateExchangeInterval, settings.StateExchangeDelay)
changer := state.NewReplicaChanger()
m := asim.NewMetricsTracker() // no output
replicaDistribution := make([]float64, stores)
Expand All @@ -129,8 +116,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) {
}

s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace)
testPreGossipStores(s, exchange, preGossipStart)
sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, exchange, changer, settings, m)
sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m)

startTime := timeutil.Now()
sim.RunSim(ctx)
Expand Down Expand Up @@ -169,7 +155,6 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) {
end := start.Add(15 * time.Minute)
bgInterval := 10 * time.Second
interval := 2 * time.Second
preGossipStart := start.Add(-settings.StateExchangeInterval - settings.StateExchangeDelay)

stores := 7
replsPerRange := 3
Expand All @@ -187,7 +172,6 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) {
for run := 0; run < runs; run++ {
rwg := make([]workload.Generator, 1)
rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace))
exchange := state.NewFixedDelayExhange(preGossipStart, settings.StateExchangeInterval, settings.StateExchangeDelay)
changer := state.NewReplicaChanger()
m := asim.NewMetricsTracker() // no output
replicaDistribution := make([]float64, stores)
Expand All @@ -204,12 +188,17 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) {
}

s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace)
testPreGossipStores(s, exchange, preGossipStart)
sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, exchange, changer, settings, m)
sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m)

ctx := context.Background()
sim.RunSim(ctx)
descs := s.StoreDescriptors()

storeRefs := s.Stores()
storeIDs := make([]state.StoreID, len(storeRefs))
for i, store := range storeRefs {
storeIDs[i] = store.StoreID()
}
descs := s.StoreDescriptors(false /* cached */, storeIDs...)

if run == 0 {
refRun = descs
Expand Down
39 changes: 39 additions & 0 deletions pkg/kv/kvserver/asim/gossip/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "gossip",
srcs = [
"exchange.go",
"gossip.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/state",
"//pkg/roachpb",
"//pkg/util/protoutil",
],
)

go_test(
name = "gossip_test",
srcs = [
"exchange_test.go",
"gossip_test.go",
],
args = ["-test.timeout=295s"],
embed = [":gossip"],
deps = [
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/state",
"//pkg/roachpb",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
64 changes: 64 additions & 0 deletions pkg/kv/kvserver/asim/gossip/exchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package gossip

import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// exchangeInfo contains the information of a gossiped store descriptor.
type exchangeInfo struct {
created time.Time
desc roachpb.StoreDescriptor
}

// fixedDelayExchange simulates a gossip exchange network with a symmetric
// fixed delay between all connected clients.
type fixedDelayExchange struct {
pending []exchangeInfo
settings *config.SimulationSettings
}

// put adds the given descriptors at the current tick into the exchange
// network.
func (u *fixedDelayExchange) put(tick time.Time, descs ...roachpb.StoreDescriptor) {
for _, desc := range descs {
u.pending = append(u.pending, exchangeInfo{created: tick, desc: desc})
}
}

// updates returns back exchanged infos, wrapped as store details that have
// completed between the last tick update was called and the tick given.
func (u *fixedDelayExchange) updates(tick time.Time) []*storepool.StoreDetail {
sort.Slice(u.pending, func(i, j int) bool { return u.pending[i].created.Before(u.pending[j].created) })
ready := []*storepool.StoreDetail{}
i := 0
for ; i < len(u.pending) && !tick.Before(u.pending[i].created.Add(u.settings.StateExchangeDelay)); i++ {
ready = append(ready, makeStoreDetail(&u.pending[i].desc, u.pending[i].created))
}
u.pending = u.pending[i:]
return ready
}

// makeStoreDetail wraps a store descriptor into a storepool StoreDetail at the
// given tick.
func makeStoreDetail(desc *roachpb.StoreDescriptor, tick time.Time) *storepool.StoreDetail {
return &storepool.StoreDetail{
Desc: desc,
LastUpdatedTime: tick,
LastAvailable: tick,
}
}
51 changes: 51 additions & 0 deletions pkg/kv/kvserver/asim/gossip/exchange_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package gossip

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/stretchr/testify/require"
)

func TestFixedDelayExchange(t *testing.T) {
makeStoresFn := func(stores []int32) []roachpb.StoreDescriptor {
descriptors := make([]roachpb.StoreDescriptor, len(stores))
for i := range stores {
descriptors[i] = roachpb.StoreDescriptor{StoreID: roachpb.StoreID(stores[i])}

}
return descriptors
}

settings := config.DefaultSimulationSettings()
tick := state.TestingStartTime()
exchange := fixedDelayExchange{pending: []exchangeInfo{}, settings: settings}

// There should be no updates initially.
require.Len(t, exchange.updates(tick), 0)

// Put an update at the current tick.
exchange.put(tick, makeStoresFn([]int32{1, 2, 3})...)
require.Len(t, exchange.pending, 3)

// There should be no updates until after the tick + state exchange delay.
halfTick := tick.Add(settings.StateExchangeDelay / 2)
require.Len(t, exchange.updates(halfTick), 0)

// Update the tick to be >= tick + delay, there should be three updates.
tick = tick.Add(settings.StateExchangeDelay)
require.Len(t, exchange.updates(tick), 3)
require.Len(t, exchange.pending, 0)
}
Loading

0 comments on commit f5cac1e

Please sign in to comment.