diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 7ceecf5e690d..ba1ba3f2170c 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -185,6 +185,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test", "//pkg/kv/kvserver/allocator/storepool:storepool_test", "//pkg/kv/kvserver/apply:apply_test", + "//pkg/kv/kvserver/asim/gossip:gossip_test", "//pkg/kv/kvserver/asim/op:op_test", "//pkg/kv/kvserver/asim/queue:queue_test", "//pkg/kv/kvserver/asim/state:state_test", @@ -1157,6 +1158,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/apply:apply", "//pkg/kv/kvserver/apply:apply_test", "//pkg/kv/kvserver/asim/config:config", + "//pkg/kv/kvserver/asim/gossip:gossip", + "//pkg/kv/kvserver/asim/gossip:gossip_test", "//pkg/kv/kvserver/asim/op:op", "//pkg/kv/kvserver/asim/op:op_test", "//pkg/kv/kvserver/asim/queue:queue", @@ -2547,6 +2550,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/apply:get_x_data", "//pkg/kv/kvserver/asim:get_x_data", "//pkg/kv/kvserver/asim/config:get_x_data", + "//pkg/kv/kvserver/asim/gossip:get_x_data", "//pkg/kv/kvserver/asim/op:get_x_data", "//pkg/kv/kvserver/asim/queue:get_x_data", "//pkg/kv/kvserver/asim/state:get_x_data", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index bcfafec221b8..a3f71af914ce 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "storage_engine_client.go", "store.go", "store_create_replica.go", + "store_gossip.go", "store_init.go", "store_merge.go", "store_raft.go", @@ -312,6 +313,7 @@ go_test( "split_queue_test.go", "split_trigger_helper_test.go", "stats_test.go", + "store_gossip_test.go", "store_pool_test.go", "store_raft_test.go", "store_rebalancer_test.go", diff --git a/pkg/kv/kvserver/asim/BUILD.bazel b/pkg/kv/kvserver/asim/BUILD.bazel index c1e19e2d7f51..2555e556f239 100644 --- a/pkg/kv/kvserver/asim/BUILD.bazel +++ b/pkg/kv/kvserver/asim/BUILD.bazel @@ -12,12 +12,12 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/gossip", "//pkg/kv/kvserver/asim/op", "//pkg/kv/kvserver/asim/queue", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/storerebalancer", "//pkg/kv/kvserver/asim/workload", - "//pkg/roachpb", "//pkg/util/encoding/csv", "//pkg/util/log", ], diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index 03657eed2139..6166ef0fce7b 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -15,12 +15,12 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/queue" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/storerebalancer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -56,7 +56,7 @@ type Simulator struct { state state.State changer state.Changer - exchange state.Exchange + gossip gossip.Gossip shuffler func(n int, swap func(i, j int)) metrics *MetricsTracker @@ -68,7 +68,6 @@ func NewSimulator( interval, bgInterval time.Duration, wgs []workload.Generator, initialState state.State, - exchange state.Exchange, changer state.Changer, settings *config.SimulationSettings, metrics *MetricsTracker, @@ -138,7 +137,7 @@ func NewSimulator( controllers: controllers, srs: srs, pacers: pacers, - exchange: exchange, + gossip: gossip.NewGossip(initialState, settings), metrics: metrics, shuffler: state.NewShuffler(settings.Seed), } @@ -174,6 +173,9 @@ func (s *Simulator) RunSim(ctx context.Context) { break } + // Update the store clocks with the current tick time. + s.tickStoreClocks(tick) + // Update the state with generated load. s.tickWorkload(ctx, tick) @@ -181,10 +183,7 @@ func (s *Simulator) RunSim(ctx context.Context) { s.tickStateChanges(ctx, tick) // Update each allocators view of the stores in the cluster. - s.tickStateExchange(tick) - - // Update the store clocks with the current tick time. - s.tickStoreClocks(tick) + s.tickGossip(ctx, tick) // Done with config and load updates, the state is ready for the // allocators. @@ -235,19 +234,11 @@ func (s *Simulator) tickStateChanges(ctx context.Context, tick time.Time) { } } -// tickStateExchange puts the current tick store descriptors into the state +// tickGossip puts the current tick store descriptors into the state // exchange. It then updates the exchanged descriptors for each store's store // pool. -func (s *Simulator) tickStateExchange(tick time.Time) { - if s.bgLastTick.Add(s.bgInterval).After(tick) { - return - } - storeDescriptors := s.state.StoreDescriptors() - s.exchange.Put(tick, storeDescriptors...) - for _, store := range s.state.Stores() { - storeID := store.StoreID() - s.state.UpdateStorePool(storeID, s.exchange.Get(tick, roachpb.StoreID(storeID))) - } +func (s *Simulator) tickGossip(ctx context.Context, tick time.Time) { + s.gossip.Tick(ctx, tick, s.state) } func (s *Simulator) tickStoreClocks(tick time.Time) { diff --git a/pkg/kv/kvserver/asim/asim_test.go b/pkg/kv/kvserver/asim/asim_test.go index 319fad82cbc2..bdb9fe2f5c40 100644 --- a/pkg/kv/kvserver/asim/asim_test.go +++ b/pkg/kv/kvserver/asim/asim_test.go @@ -38,11 +38,10 @@ func TestRunAllocatorSimulator(t *testing.T) { rwg := make([]workload.Generator, 1) rwg[0] = testCreateWorkloadGenerator(start, 1, 10) m := asim.NewMetricsTracker(os.Stdout) - exchange := state.NewFixedDelayExhange(start, settings.StateExchangeInterval, settings.StateExchangeDelay) changer := state.NewReplicaChanger() s := state.LoadConfig(state.ComplexConfig) - sim := asim.NewSimulator(start, end, interval, interval, rwg, s, exchange, changer, settings, m) + sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m) sim.RunSim(ctx) } @@ -67,16 +66,6 @@ func testCreateWorkloadGenerator(start time.Time, stores int, keySpan int64) wor ) } -// testPreGossipStores populates the state exchange with the existing state. -// This is done at the time given, which should be before the test start time -// minus the gossip delay and interval. This alleviates a cold start, where the -// allocator for each store does not have information to make a decision for -// the ranges it holds leases for. -func testPreGossipStores(s state.State, exchange state.Exchange, at time.Time) { - storeDescriptors := s.StoreDescriptors() - exchange.Put(at, storeDescriptors...) -} - // TestAllocatorSimulatorSpeed tests that the simulation runs at a rate of at // least 1.67 simulated minutes per wall clock second (1:100) for a 32 node // cluster, with 32000 replicas. The workload is generating 16000 keys per @@ -96,7 +85,6 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { end := start.Add(5 * time.Minute) bgInterval := 10 * time.Second interval := 2 * time.Second - preGossipStart := start.Add(-settings.StateExchangeInterval - settings.StateExchangeDelay) stores := 32 replsPerRange := 3 @@ -112,7 +100,6 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { sample := func() int64 { rwg := make([]workload.Generator, 1) rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace)) - exchange := state.NewFixedDelayExhange(preGossipStart, settings.StateExchangeInterval, settings.StateExchangeDelay) changer := state.NewReplicaChanger() m := asim.NewMetricsTracker() // no output replicaDistribution := make([]float64, stores) @@ -129,8 +116,7 @@ func TestAllocatorSimulatorSpeed(t *testing.T) { } s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace) - testPreGossipStores(s, exchange, preGossipStart) - sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, exchange, changer, settings, m) + sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m) startTime := timeutil.Now() sim.RunSim(ctx) @@ -169,7 +155,6 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { end := start.Add(15 * time.Minute) bgInterval := 10 * time.Second interval := 2 * time.Second - preGossipStart := start.Add(-settings.StateExchangeInterval - settings.StateExchangeDelay) stores := 7 replsPerRange := 3 @@ -187,7 +172,6 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { for run := 0; run < runs; run++ { rwg := make([]workload.Generator, 1) rwg[0] = testCreateWorkloadGenerator(start, stores, int64(keyspace)) - exchange := state.NewFixedDelayExhange(preGossipStart, settings.StateExchangeInterval, settings.StateExchangeDelay) changer := state.NewReplicaChanger() m := asim.NewMetricsTracker() // no output replicaDistribution := make([]float64, stores) @@ -204,12 +188,17 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { } s := state.NewTestStateReplDistribution(replicaDistribution, ranges, replsPerRange, keyspace) - testPreGossipStores(s, exchange, preGossipStart) - sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, exchange, changer, settings, m) + sim := asim.NewSimulator(start, end, interval, bgInterval, rwg, s, changer, settings, m) ctx := context.Background() sim.RunSim(ctx) - descs := s.StoreDescriptors() + + storeRefs := s.Stores() + storeIDs := make([]state.StoreID, len(storeRefs)) + for i, store := range storeRefs { + storeIDs[i] = store.StoreID() + } + descs := s.StoreDescriptors(false /* cached */, storeIDs...) if run == 0 { refRun = descs diff --git a/pkg/kv/kvserver/asim/gossip/BUILD.bazel b/pkg/kv/kvserver/asim/gossip/BUILD.bazel new file mode 100644 index 000000000000..f007187eb6e6 --- /dev/null +++ b/pkg/kv/kvserver/asim/gossip/BUILD.bazel @@ -0,0 +1,39 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "gossip", + srcs = [ + "exchange.go", + "gossip.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver", + "//pkg/kv/kvserver/allocator/storepool", + "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/state", + "//pkg/roachpb", + "//pkg/util/protoutil", + ], +) + +go_test( + name = "gossip_test", + srcs = [ + "exchange_test.go", + "gossip_test.go", + ], + args = ["-test.timeout=295s"], + embed = [":gossip"], + deps = [ + "//pkg/kv/kvserver/allocator/storepool", + "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/state", + "//pkg/roachpb", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/asim/gossip/exchange.go b/pkg/kv/kvserver/asim/gossip/exchange.go new file mode 100644 index 000000000000..b2aa771c106d --- /dev/null +++ b/pkg/kv/kvserver/asim/gossip/exchange.go @@ -0,0 +1,64 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gossip + +import ( + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// exchangeInfo contains the information of a gossiped store descriptor. +type exchangeInfo struct { + created time.Time + desc roachpb.StoreDescriptor +} + +// fixedDelayExchange simulates a gossip exchange network with a symmetric +// fixed delay between all connected clients. +type fixedDelayExchange struct { + pending []exchangeInfo + settings *config.SimulationSettings +} + +// put adds the given descriptors at the current tick into the exchange +// network. +func (u *fixedDelayExchange) put(tick time.Time, descs ...roachpb.StoreDescriptor) { + for _, desc := range descs { + u.pending = append(u.pending, exchangeInfo{created: tick, desc: desc}) + } +} + +// updates returns back exchanged infos, wrapped as store details that have +// completed between the last tick update was called and the tick given. +func (u *fixedDelayExchange) updates(tick time.Time) []*storepool.StoreDetail { + sort.Slice(u.pending, func(i, j int) bool { return u.pending[i].created.Before(u.pending[j].created) }) + ready := []*storepool.StoreDetail{} + i := 0 + for ; i < len(u.pending) && !tick.Before(u.pending[i].created.Add(u.settings.StateExchangeDelay)); i++ { + ready = append(ready, makeStoreDetail(&u.pending[i].desc, u.pending[i].created)) + } + u.pending = u.pending[i:] + return ready +} + +// makeStoreDetail wraps a store descriptor into a storepool StoreDetail at the +// given tick. +func makeStoreDetail(desc *roachpb.StoreDescriptor, tick time.Time) *storepool.StoreDetail { + return &storepool.StoreDetail{ + Desc: desc, + LastUpdatedTime: tick, + LastAvailable: tick, + } +} diff --git a/pkg/kv/kvserver/asim/gossip/exchange_test.go b/pkg/kv/kvserver/asim/gossip/exchange_test.go new file mode 100644 index 000000000000..a52550197b1e --- /dev/null +++ b/pkg/kv/kvserver/asim/gossip/exchange_test.go @@ -0,0 +1,51 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gossip + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/stretchr/testify/require" +) + +func TestFixedDelayExchange(t *testing.T) { + makeStoresFn := func(stores []int32) []roachpb.StoreDescriptor { + descriptors := make([]roachpb.StoreDescriptor, len(stores)) + for i := range stores { + descriptors[i] = roachpb.StoreDescriptor{StoreID: roachpb.StoreID(stores[i])} + + } + return descriptors + } + + settings := config.DefaultSimulationSettings() + tick := state.TestingStartTime() + exchange := fixedDelayExchange{pending: []exchangeInfo{}, settings: settings} + + // There should be no updates initially. + require.Len(t, exchange.updates(tick), 0) + + // Put an update at the current tick. + exchange.put(tick, makeStoresFn([]int32{1, 2, 3})...) + require.Len(t, exchange.pending, 3) + + // There should be no updates until after the tick + state exchange delay. + halfTick := tick.Add(settings.StateExchangeDelay / 2) + require.Len(t, exchange.updates(halfTick), 0) + + // Update the tick to be >= tick + delay, there should be three updates. + tick = tick.Add(settings.StateExchangeDelay) + require.Len(t, exchange.updates(tick), 3) + require.Len(t, exchange.pending, 0) +} diff --git a/pkg/kv/kvserver/asim/gossip/gossip.go b/pkg/kv/kvserver/asim/gossip/gossip.go new file mode 100644 index 000000000000..28a3e6cfa2b6 --- /dev/null +++ b/pkg/kv/kvserver/asim/gossip/gossip.go @@ -0,0 +1,201 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gossip + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" +) + +// Gossip collects and updates the storepools of the cluster upon capacity +// changes and the gossip interval defined in the simulation settings. +type Gossip interface { + // Tick checks for completed gossip updates and triggers new gossip + // updates if needed. + Tick(context.Context, time.Time, state.State) +} + +// gossip is an implementation of the Gossip interface. It manages the +// dissemenation of gossip information to stores in the cluster. +type gossip struct { + storeGossip map[state.StoreID]*storeGossiper + settings *config.SimulationSettings + exchange *fixedDelayExchange +} + +var _ Gossip = &gossip{} + +// storeGossiper is the store-local gossip state that tracks information +// required to decide when to gossip and pending outbound gossip infos that +// have been triggered by the underlying kvserver.StoreGossip component. +type storeGossiper struct { + local *kvserver.StoreGossip + lastIntervalGossip time.Time + descriptorGetter func(cached bool) roachpb.StoreDescriptor + pendingOutbound *roachpb.StoreDescriptor +} + +func newStoreGossiper(descriptorGetter func(cached bool) roachpb.StoreDescriptor) *storeGossiper { + sg := &storeGossiper{ + lastIntervalGossip: time.Time{}, + descriptorGetter: descriptorGetter, + } + + desc := sg.descriptorGetter(false /* cached */) + knobs := kvserver.StoreGossipTestingKnobs{AsyncDisabled: true} + sg.local = kvserver.NewStoreGossip(sg, sg, knobs) + sg.local.Ident = roachpb.StoreIdent{StoreID: desc.StoreID, NodeID: desc.Node.NodeID} + + return sg +} + +var _ kvserver.InfoGossiper = &storeGossiper{} + +var _ kvserver.StoreDescriptorProvider = &storeGossiper{} + +// AddInfoProto adds or updates an info object in gossip. Returns an error +// if info couldn't be added. +func (sg *storeGossiper) AddInfoProto(key string, msg protoutil.Message, ttl time.Duration) error { + desc := msg.(*roachpb.StoreDescriptor) + // NB: We overwrite any pending outbound gossip infos. This behavior is + // valid as at each tick we will check this field before sending it off to + // the asim internal gossip exchange; any overwritten gossip infos would + // also have been overwritten in the same tick at the receiver i.e. last + // writer in the tick wins. + sg.pendingOutbound = desc + return nil +} + +// Descriptor returns a StoreDescriptor including current store +// capacity information. +func (sg *storeGossiper) Descriptor( + ctx context.Context, cached bool, +) (*roachpb.StoreDescriptor, error) { + desc := sg.descriptorGetter(cached) + if cached { + capacity := sg.local.CachedCapacity() + if capacity != (roachpb.StoreCapacity{}) { + desc.Capacity = capacity + } + } + return &desc, nil +} + +// NewGossip returns an implementation of the Gossip interface. +func NewGossip(s state.State, settings *config.SimulationSettings) *gossip { + g := &gossip{ + settings: settings, + storeGossip: map[state.StoreID]*storeGossiper{}, + exchange: &fixedDelayExchange{ + pending: []exchangeInfo{}, + settings: settings, + }, + } + for _, store := range s.Stores() { + g.addStoreToGossip(s, store.StoreID()) + } + s.RegisterCapacityChangeListener(g) + s.RegisterCapacityListener(g) + return g +} + +func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID) { + g.storeGossip[storeID] = newStoreGossiper(func(cached bool) roachpb.StoreDescriptor { + return s.StoreDescriptors(cached, storeID)[0] + }) +} + +// Tick checks for completed gossip updates and triggers new gossip +// updates if needed. +func (g *gossip) Tick(ctx context.Context, tick time.Time, s state.State) { + stores := s.Stores() + for _, store := range stores { + var sg *storeGossiper + var ok bool + // If the store gossip for this store doesn't yet exist, create it and + // add it to the map of store gossips. + if sg, ok = g.storeGossip[store.StoreID()]; !ok { + g.addStoreToGossip(s, store.StoreID()) + } + + // If the interval between the last time this store was gossiped for + // interval and this tick is not less than the gossip interval, then we + // shoud gossip. + // NB: In the real code this is controlled by a gossip + // ticker on the node that activates every 10 seconds. + if !tick.Before(sg.lastIntervalGossip.Add(g.settings.StateExchangeInterval)) { + sg.lastIntervalGossip = tick + _ = sg.local.GossipStore(ctx, false /* useCached */) + } + + // Put the pending gossip infos into the exchange. + if sg.pendingOutbound != nil { + desc := *sg.pendingOutbound + g.exchange.put(tick, desc) + // Clear the pending gossip infos for this store. + sg.pendingOutbound = nil + } + } + // Update with any recently complete gossip infos. + g.maybeUpdateState(tick, s) +} + +// CapacityChangeNotify notifies that a capacity change event has occurred +// for the store with ID StoreID. +func (g *gossip) CapacityChangeNotify(cce kvserver.CapacityChangeEvent, storeID state.StoreID) { + if sg, ok := g.storeGossip[storeID]; ok { + sg.local.MaybeGossipOnCapacityChange(context.Background(), cce) + } else { + panic( + fmt.Sprintf("capacity change event but no found store in store gossip with ID %d", + storeID, + )) + } +} + +// NewCapacityNotify notifies that a new capacity event has occurred for +// the store with ID StoreID. +func (g *gossip) NewCapacityNotify(capacity roachpb.StoreCapacity, storeID state.StoreID) { + if sg, ok := g.storeGossip[storeID]; ok { + sg.local.UpdateCachedCapacity(capacity) + } else { + panic( + fmt.Sprintf("new capacity event but no found store in store gossip with ID %d", + storeID, + )) + } +} + +func (g *gossip) maybeUpdateState(tick time.Time, s state.State) { + // NB: The updates function gives back all store descriptors which have + // completed exchange. We apply the update to every stores state uniformly, + // i.e. fixed delay. + updates := g.exchange.updates(tick) + if len(updates) == 0 { + return + } + + updateMap := map[roachpb.StoreID]*storepool.StoreDetail{} + for _, update := range updates { + updateMap[update.Desc.StoreID] = update + } + for _, store := range s.Stores() { + s.UpdateStorePool(store.StoreID(), updateMap) + } +} diff --git a/pkg/kv/kvserver/asim/gossip/gossip_test.go b/pkg/kv/kvserver/asim/gossip/gossip_test.go new file mode 100644 index 000000000000..cf8598e44504 --- /dev/null +++ b/pkg/kv/kvserver/asim/gossip/gossip_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package gossip + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/stretchr/testify/require" +) + +func TestGossip(t *testing.T) { + settings := config.DefaultSimulationSettings() + + tick := state.TestingStartTime() + s := state.NewTestStateEvenDistribution(3, 100, 3, 1000) + details := map[state.StoreID]*map[roachpb.StoreID]*storepool.StoreDetail{} + + for _, store := range s.Stores() { + // Cast the storepool to a concrete storepool type in order to mutate + // it directly for testing. + sp := s.StorePool(store.StoreID()).(*storepool.StorePool) + details[store.StoreID()] = &sp.DetailsMu.StoreDetails + } + + assertStorePool := func(f func(prev, cur *map[roachpb.StoreID]*storepool.StoreDetail)) { + var prev *map[roachpb.StoreID]*storepool.StoreDetail + for _, cur := range details { + if prev != nil { + f(prev, cur) + } + prev = cur + } + } + + assertEmptyFn := func(prev, cur *map[roachpb.StoreID]*storepool.StoreDetail) { + require.Len(t, *prev, 0) + require.Len(t, *cur, 0) + } + + assertSameFn := func(prev, cur *map[roachpb.StoreID]*storepool.StoreDetail) { + require.Equal(t, *prev, *cur) + } + + gossip := NewGossip(s, settings) + ctx := context.Background() + + // The initial storepool state should be empty for each store. + assertStorePool(assertEmptyFn) + + gossip.Tick(ctx, tick, s) + // The storepool state is still empty, after ticking, since the + // delay is 1 second. + assertStorePool(assertEmptyFn) + // The last interval gossip time for each store should be the current tick. + for _, sg := range gossip.storeGossip { + require.Equal(t, tick, sg.lastIntervalGossip) + } + + // The exchange component should contain three store descriptors, one for + // each store. + require.Len(t, gossip.exchange.pending, 3) + + // Add the delay interval and then assert that the storepools for each + // store are populated. + tick = tick.Add(settings.StateExchangeDelay) + gossip.Tick(ctx, tick, s) + + // The exchange component should now be empty, clearing the previous + // gossiped descriptors. + require.Len(t, gossip.exchange.pending, 0) + assertStorePool(assertSameFn) + + // Update the usage info leases for s1 and s2, so that it exceeds the delta + // required to trigger a gossip update. We do this by transferring every + // lease to s2. + for _, rng := range s.Ranges() { + s.TransferLease(rng.RangeID(), 2) + } + gossip.Tick(ctx, tick, s) + // There should be just store 1 and 2 pending gossip updates in the exchanger. + require.Len(t, gossip.exchange.pending, 2) + // Increment the tick and check that the updated lease count information + // reached each storepool. + tick = tick.Add(settings.StateExchangeDelay) + gossip.Tick(ctx, tick, s) + require.Len(t, gossip.exchange.pending, 0) + + // NB: If all the storepools are identical, we only need to check one + // stores to ensure it matches expectation. + assertStorePool(assertSameFn) + // Assert that the lease counts are as expected after transferring all of + // the leases to s2. + require.Equal(t, int32(0), (*details[1])[1].Desc.Capacity.LeaseCount) + require.Equal(t, int32(100), (*details[1])[2].Desc.Capacity.LeaseCount) + require.Equal(t, int32(0), (*details[1])[3].Desc.Capacity.LeaseCount) +} diff --git a/pkg/kv/kvserver/asim/metrics_tracker_test.go b/pkg/kv/kvserver/asim/metrics_tracker_test.go index 224e93e202e0..f10428528b7c 100644 --- a/pkg/kv/kvserver/asim/metrics_tracker_test.go +++ b/pkg/kv/kvserver/asim/metrics_tracker_test.go @@ -113,12 +113,10 @@ func Example_workload() { rwg[0] = testCreateWorkloadGenerator(start, 10, 10000) m := asim.NewMetricsTracker(os.Stdout) - exchange := state.NewFixedDelayExhange(start, settings.StateExchangeInterval, settings.StateExchangeDelay) changer := state.NewReplicaChanger() s := state.LoadConfig(state.ComplexConfig) - testPreGossipStores(s, exchange, start) - sim := asim.NewSimulator(start, end, interval, interval, rwg, s, exchange, changer, settings, m) + sim := asim.NewSimulator(start, end, interval, interval, rwg, s, changer, settings, m) sim.RunSim(ctx) // WIP: non deterministic // Output: diff --git a/pkg/kv/kvserver/asim/op/BUILD.bazel b/pkg/kv/kvserver/asim/op/BUILD.bazel index 968930fb2965..0467780be8cb 100644 --- a/pkg/kv/kvserver/asim/op/BUILD.bazel +++ b/pkg/kv/kvserver/asim/op/BUILD.bazel @@ -33,6 +33,7 @@ go_test( "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/gossip", "//pkg/kv/kvserver/asim/state", "//pkg/roachpb", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvserver/asim/op/controller_test.go b/pkg/kv/kvserver/asim/op/controller_test.go index 2e3200c73bb4..d264fbd63123 100644 --- a/pkg/kv/kvserver/asim/op/controller_test.go +++ b/pkg/kv/kvserver/asim/op/controller_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" @@ -143,6 +144,8 @@ func TestRelocateRangeOp(t *testing.T) { settings := config.DefaultSimulationSettings() settings.ReplicaAddRate = 1 settings.ReplicaChangeBaseDelay = 5 * time.Second + settings.StateExchangeInterval = 1 * time.Second + settings.StateExchangeDelay = 0 type testRelocationArgs struct { voters []state.StoreID @@ -290,19 +293,8 @@ func TestRelocateRangeOp(t *testing.T) { } } - exchange := state.NewFixedDelayExhange( - start, - time.Second, - time.Second*0, /* no state update delay */ - ) - - // Update the storepool for informing allocator decisions. - storeDescriptors := s.StoreDescriptors() - exchange.Put(state.OffsetTick(start, 0), storeDescriptors...) - for _, store := range s.Stores() { - storeID := store.StoreID() - s.UpdateStorePool(storeID, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(storeID))) - } + gossip := gossip.NewGossip(s, settings) + gossip.Tick(ctx, start, s) results := map[int64]map[state.RangeID]rangeState{} pending := []DispatchedTicket{} diff --git a/pkg/kv/kvserver/asim/queue/BUILD.bazel b/pkg/kv/kvserver/asim/queue/BUILD.bazel index 1ab01d83f4a9..2f146bc58f0f 100644 --- a/pkg/kv/kvserver/asim/queue/BUILD.bazel +++ b/pkg/kv/kvserver/asim/queue/BUILD.bazel @@ -29,6 +29,7 @@ go_test( embed = [":queue"], deps = [ "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/gossip", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/workload", "//pkg/roachpb", diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go index 3485ca9a9225..3d1f10018014 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue_test.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" @@ -68,7 +69,12 @@ func TestReplicateQueue(t *testing.T) { getReplCounts := func(s state.State) map[int]int { storeView := make(map[int]int) - for _, desc := range s.StoreDescriptors() { + stores := s.Stores() + storeIDs := make([]state.StoreID, len(stores)) + for i, store := range stores { + storeIDs[i] = store.StoreID() + } + for _, desc := range s.StoreDescriptors(false /* cached */, storeIDs...) { storeView[int(desc.StoreID)] = int(desc.Capacity.RangeCount) } return storeView @@ -136,12 +142,8 @@ func TestReplicateQueue(t *testing.T) { results := make(map[int64]map[int]int) // Initialize the store pool information. - exchange := state.NewFixedDelayExhange( - start, - testSettings.StateExchangeInterval, - time.Second*0, /* no state update delay */ - ) - exchange.Put(start, s.StoreDescriptors()...) + gossip := gossip.NewGossip(s, testSettings) + gossip.Tick(ctx, start, s) nextRepl := 0 repls := s.Replicas(store.StoreID()) @@ -153,13 +155,7 @@ func TestReplicateQueue(t *testing.T) { // Update the store's view of the cluster, we update all stores // but only care about s1's view. - exchange.Put(state.OffsetTick(start, tick), s.StoreDescriptors()...) - - // Update s1's view of the cluster. - s.UpdateStorePool( - store.StoreID(), - exchange.Get(state.OffsetTick(start, tick), store.Descriptor().StoreID), - ) + gossip.Tick(ctx, state.OffsetTick(start, tick), s) // Tick the replicate queue, popping a queued replicas and // considering rebalance. diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 4de663f39177..031415c2256a 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -6,18 +6,19 @@ go_library( srcs = [ "change.go", "config_loader.go", - "exchange.go", "helpers.go", "impl.go", "load.go", "split_decider.go", "state.go", + "state_listener.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state", visibility = ["//visibility:public"], deps = [ "//pkg/config/zonepb", "//pkg/gossip", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/storepool", @@ -44,7 +45,6 @@ go_test( srcs = [ "change_test.go", "config_loader_test.go", - "exchange_test.go", "split_decider_test.go", "state_test.go", ], diff --git a/pkg/kv/kvserver/asim/state/exchange.go b/pkg/kv/kvserver/asim/state/exchange.go deleted file mode 100644 index 4157f62d4bbf..000000000000 --- a/pkg/kv/kvserver/asim/state/exchange.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package state - -import ( - "fmt" - "time" - - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" - "github.com/cockroachdb/cockroach/pkg/roachpb" -) - -// Exchange controls the dissemination of a store's state, to every other -// store in a simulation. The contract requires that: -// (1) Single value per tick: Multiple puts at tick t, with desc d should -// -// provide an identical d to all other puts for d. -// -// (2) Calls to Put are monotonic w.r.t tick value passed in. -type Exchange interface { - // Put inserts store state(s) at tick t. This state will be visible to - // other stores at potentially different times t' >= t. - Put(tick time.Time, descs ...roachpb.StoreDescriptor) - - // Get retrieves a store's view of the state exchange at tick t. - // The view returned may not be the same for each store at t. - Get(tick time.Time, StoreID roachpb.StoreID) map[roachpb.StoreID]*storepool.StoreDetail -} - -// FixedDelayExchange implements state exchange. It groups puts at an interval, -// selecting the last put for each descriptor within an interval. Updates -// propagate at a fixed number of intervals, such that a get at time t, will -// retrieve the largest tick t' less than t - delay. -// -// TODO(kvoli,lidorcarmel): Implement gossip exchange that provides a -// configuration similar to pkg/gossip. -type FixedDelayExchange struct { - end time.Time - interval time.Duration - delay time.Duration - intervalMap map[int64]*map[roachpb.StoreID]*storepool.StoreDetail -} - -// NewFixedDelayExhange returns a state delay exchange with fixed delay. -func NewFixedDelayExhange(start time.Time, interval, delay time.Duration) *FixedDelayExchange { - return &FixedDelayExchange{ - end: start.Add(interval), - interval: interval, - delay: delay, - intervalMap: make(map[int64]*map[roachpb.StoreID]*storepool.StoreDetail), - } -} - -// Put inserts store state(s) at tick t. This state will be visible to -// all other stores at a fixed delay. -func (u *FixedDelayExchange) Put(tick time.Time, descs ...roachpb.StoreDescriptor) { - prevInterval, ok := u.intervalMap[u.end.Unix()] - if !ok { - tmpInterval := make(map[roachpb.StoreID]*storepool.StoreDetail) - prevInterval = &tmpInterval - } - - u.maybeUpdateInterval(tick) - curInterval, ok := u.intervalMap[u.end.Unix()] - if !ok { - // Copy the previous store map, so that older exchanged configurations - // appear in the latest tick, when queried. This could be updated to a - // linked list or btree if copy performance affects runtime - // significantly. - prevIntervalCopy := make(map[roachpb.StoreID]*storepool.StoreDetail) - for _, storeDetail := range *prevInterval { - storeDescCopy := *storeDetail.Desc - storeDetailCopy := *storeDetail - storeDetailCopy.Desc = &storeDescCopy - prevIntervalCopy[storeDetail.Desc.StoreID] = &storeDetailCopy - } - u.intervalMap[u.end.Unix()] = &prevIntervalCopy - curInterval = u.intervalMap[u.end.Unix()] - } - - for _, d := range descs { - desc := d - nextStoreDetail := storepool.StoreDetail{} - nextStoreDetail.Desc = &desc - nextStoreDetail.LastAvailable = tick - nextStoreDetail.LastUpdatedTime = tick - nextStoreDetail.Desc.Node = desc.Node - - (*curInterval)[desc.StoreID] = &nextStoreDetail - } -} - -// Get retrieves a store's view of the state exchange at tick t. In this -// simple implementation of state exchange, all stores have a symmetrical -// view of the others state. -func (u *FixedDelayExchange) Get( - tick time.Time, storeID roachpb.StoreID, -) map[roachpb.StoreID]*storepool.StoreDetail { - delayedEnd := tickUpperBound(tick.Add(-u.delay-u.interval), u.end, u.interval) - if state, ok := u.intervalMap[delayedEnd.Unix()]; ok { - return *state - } - return make(map[roachpb.StoreID]*storepool.StoreDetail) -} - -// maybeUpdateInterval compares the tick given to the current interval's end -// time. When tick < intervalEnd, do nothing. When tick >= intervalEnd, update -// intervalEnd to be the smallest time that contains tick, by adding interval -// repeatedly to it. -func (u *FixedDelayExchange) maybeUpdateInterval(tick time.Time) { - if tick.Add(u.interval).Before(u.end) { - panic(fmt.Sprintf( - "Only monotonic calls to fixed delay exchange are allowed for puts. "+ - "Lowest acceptable tick is greater than given (%d > %d)", - u.end.Add(-u.interval).UTC().Second(), tick.UTC().Second()), - ) - } - // Tick is in the current period, no need to update the end interval. - if tick.Before(u.end) { - return - } - u.end = tickUpperBound(tick, u.end, u.interval) -} - -// tickUpperBound returns the smallest interval end time, given the current end -// time and interval duration, that is greater than tick. -func tickUpperBound(tick, end time.Time, interval time.Duration) time.Time { - delta := tick.Sub(end) - intervalDelta := delta / interval - if delta%interval == 0 || !tick.Before(end) { - intervalDelta++ - } - return end.Add(intervalDelta * interval) -} diff --git a/pkg/kv/kvserver/asim/state/exchange_test.go b/pkg/kv/kvserver/asim/state/exchange_test.go deleted file mode 100644 index c80cc6a63b48..000000000000 --- a/pkg/kv/kvserver/asim/state/exchange_test.go +++ /dev/null @@ -1,167 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package state - -import ( - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/stretchr/testify/require" -) - -// TestTickUpperBound asserts that finding the smallest end time a tick is -// bounded by, given the current end time and the interval behaves correctly. -func TestTickUpperBound(t *testing.T) { - start := TestingStartTime() - - // The cases that can occur are a combination of the remainder of (tick - - // end) / interval, and whether tick < end. The possible cases are outlined - // in the descriptions below. - testCases := []struct { - desc string - tick time.Time - end time.Time - interval time.Duration - expected time.Time - }{ - { - desc: "tick = end and (tick - end) % interval = 0", - tick: start.Add(time.Second * 10), - end: start.Add(time.Second * 10), - interval: time.Second * 10, - expected: start.Add(time.Second * 20), - }, - { - desc: "tick > end and (tick - end) % interval > 0", - tick: start.Add(time.Second * 15), - end: start, - interval: time.Second * 10, - expected: start.Add(time.Second * 20), - }, - { - desc: "tick > end and (tick - end) % interval = 0", - tick: start.Add(time.Second * 10), - end: start, - interval: time.Second * 10, - expected: start.Add(time.Second * 20), - }, - { - desc: "tick < end and (tick - end) % interval = 0", - tick: start.Add(time.Second * 5), - end: start.Add(time.Second * 25), - interval: time.Second * 10, - expected: start.Add(time.Second * 15), - }, - { - desc: "tick < end and (tick - end) % interval < 0", - tick: start.Add(time.Second * 0), - end: start.Add(time.Second * 15), - interval: time.Second * 10, - expected: start.Add(time.Second * 5), - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - require.Equal(t, tc.expected, tickUpperBound(tc.tick, tc.end, tc.interval)) - }) - } -} - -// TestFixedDelayExchange asserts that: -// Put inserts store descriptors at tick t. These store descriptors will become -// visible to other stores at the same time. -func TestFixedDelayExchange(t *testing.T) { - start := TestingStartTime() - interval := time.Second * 10 - delay := time.Second * 2 - - makeStoreDescriptors := func(stores []int32) []roachpb.StoreDescriptor { - descriptors := make([]roachpb.StoreDescriptor, len(stores)) - for i := range stores { - descriptors[i] = roachpb.StoreDescriptor{StoreID: roachpb.StoreID(stores[i])} - - } - return descriptors - } - - testCases := []struct { - desc string - stores []int - putOrder []int64 - puts map[int64][]int32 - gets []int64 - expected map[int64]map[int32]int64 - }{ - { - desc: "all stores put", - putOrder: []int64{1, 15}, - puts: map[int64][]int32{ - 1: {1, 2, 3}, - 15: {1, 2, 3}, - }, - expected: map[int64]map[int32]int64{ - 11: {}, - 12: {1: 1, 2: 1, 3: 1}, - 21: {1: 1, 2: 1, 3: 1}, - 22: {1: 15, 2: 15, 3: 15}, - }, - }, - { - desc: "some stores put", - putOrder: []int64{1, 15}, - puts: map[int64][]int32{ - 1: {1, 3}, - 15: {1, 2}, - }, - expected: map[int64]map[int32]int64{ - 11: {}, - 12: {1: 1, 3: 1}, - 21: {1: 1, 3: 1}, - 22: {1: 15, 2: 15, 3: 1}, - }, - }, - { - desc: "no stores put", - puts: map[int64][]int32{}, - putOrder: []int64{}, - expected: map[int64]map[int32]int64{ - 11: {}, - 12: {}, - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - stateExchange := NewFixedDelayExhange(start, interval, delay) - // Put the store updates for each tick. - for _, tick := range tc.putOrder { - stateExchange.Put(OffsetTick(start, tick), makeStoreDescriptors(tc.puts[tick])...) - } - // Collect the results for the stores, here we expect each store to - // view a symmetrical map of state. - results := make(map[int64]map[int32]int64) - for tick, storeMap := range tc.expected { - results[tick] = make(map[int32]int64) - for store := range storeMap { - storeDetailMap := stateExchange.Get(OffsetTick(start, tick), roachpb.StoreID(store)) - for store, storeDetail := range storeDetailMap { - results[tick][int32(store)] = ReverseOffsetTick(start, storeDetail.LastAvailable) - } - } - } - require.Equal(t, tc.expected, results) - - }) - } -} diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index c879741898c2..d306afc8965a 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -17,6 +17,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" @@ -32,15 +33,17 @@ import ( ) type state struct { - nodes map[NodeID]*node - stores map[StoreID]*store - load map[RangeID]ReplicaLoad - loadsplits map[StoreID]LoadSplitter - ranges *rmap - clusterinfo ClusterInfo - usageInfo *ClusterUsageInfo - clock *ManualSimClock - settings *config.SimulationSettings + nodes map[NodeID]*node + stores map[StoreID]*store + load map[RangeID]ReplicaLoad + loadsplits map[StoreID]LoadSplitter + capacityChangeListeners []CapacityChangeListener + newCapacityListeners []NewCapacityListener + ranges *rmap + clusterinfo ClusterInfo + usageInfo *ClusterUsageInfo + clock *ManualSimClock + settings *config.SimulationSettings // Unique ID generators for Nodes and Stores. These are incremented // pre-assignment. So that IDs start from 1. @@ -170,20 +173,27 @@ func (s *state) Stores() []Store { return stores } -// StoreDescriptors returns the descriptors for all stores that exist in -// this state. -func (s *state) StoreDescriptors() []roachpb.StoreDescriptor { +// StoreDescriptors returns the descriptors for the StoreIDs given. If the +// first flag is false, then the capacity is generated from scratch, +// otherwise the last calculated capacity values are used for each store. +func (s *state) StoreDescriptors(cached bool, storeIDs ...StoreID) []roachpb.StoreDescriptor { storeDescriptors := []roachpb.StoreDescriptor{} - for _, store := range s.Stores() { - s.updateStoreCapacity(store.StoreID()) - storeDescriptors = append(storeDescriptors, store.Descriptor()) + for _, storeID := range storeIDs { + if store, ok := s.Store(storeID); ok { + if !cached { + s.updateStoreCapacity(storeID) + } + storeDescriptors = append(storeDescriptors, store.Descriptor()) + } } return storeDescriptors } func (s *state) updateStoreCapacity(storeID StoreID) { if store, ok := s.stores[storeID]; ok { - store.desc.Capacity = Capacity(s, storeID) + capacity := Capacity(s, storeID) + store.desc.Capacity = capacity + s.publishNewCapacityEvent(capacity, storeID) } } @@ -363,6 +373,7 @@ func (s *state) addReplica(rangeID RangeID, storeID StoreID) (*replica, bool) { store.replicas[rangeID] = replica.replicaID rng.replicas[storeID] = replica + s.publishCapacityChangeEvent(kvserver.RangeAddEvent, storeID) // This is the first replica to be added for this range. Make it the // leaseholder as a placeholder. The caller can update the lease, however @@ -437,6 +448,8 @@ func (s *state) removeReplica(rangeID RangeID, storeID StoreID) bool { delete(store.replicas, rangeID) delete(rng.replicas, storeID) + s.publishCapacityChangeEvent(kvserver.RangeRemoveEvent, storeID) + return true } @@ -604,6 +617,7 @@ func (s *state) setLeaseholder(rangeID RangeID, storeID StoreID) { rng.replicas[storeID].holdsLease = true replicaID := s.stores[storeID].replicas[rangeID] rng.leaseholder = replicaID + s.publishCapacityChangeEvent(kvserver.LeaseAddEvent, storeID) } func (s *state) removeLeaseholder(rangeID RangeID, storeID StoreID) { @@ -611,6 +625,7 @@ func (s *state) removeLeaseholder(rangeID RangeID, storeID StoreID) { if repl, ok := rng.replicas[storeID]; ok { if repl.holdsLease { repl.holdsLease = false + s.publishCapacityChangeEvent(kvserver.LeaseRemoveEvent, storeID) return } } @@ -721,7 +736,12 @@ func (s *state) TickClock(tick time.Time) { func (s *state) UpdateStorePool( storeID StoreID, storeDescriptors map[roachpb.StoreID]*storepool.StoreDetail, ) { - s.stores[storeID].storepool.DetailsMu.StoreDetails = storeDescriptors + for gossipStoreID, detail := range storeDescriptors { + copiedDetail := *detail + copiedDesc := *detail.Desc + copiedDetail.Desc = &copiedDesc + s.stores[storeID].storepool.DetailsMu.StoreDetails[gossipStoreID] = &copiedDetail + } } // NextReplicasFn returns a function, that when called will return the current @@ -843,6 +863,32 @@ func (s *state) RaftStatus(rangeID RangeID, storeID StoreID) *raft.Status { return status } +// RegisterCapacityChangeListener registers a listener which will be called +// on events where there is a capacity change (lease or replica) in the +// cluster state. +func (s *state) RegisterCapacityChangeListener(listener CapacityChangeListener) { + s.capacityChangeListeners = append(s.capacityChangeListeners, listener) +} + +func (s *state) publishCapacityChangeEvent(cce kvserver.CapacityChangeEvent, storeID StoreID) { + for _, listener := range s.capacityChangeListeners { + listener.CapacityChangeNotify(cce, storeID) + } +} + +// RegisterCapacityListener registers a listener which will be called when +// a new store capacity has been generated from scratch, for a specific +// store. +func (s *state) RegisterCapacityListener(listener NewCapacityListener) { + s.newCapacityListeners = append(s.newCapacityListeners, listener) +} + +func (s *state) publishNewCapacityEvent(capacity roachpb.StoreCapacity, storeID StoreID) { + for _, listener := range s.newCapacityListeners { + listener.NewCapacityNotify(capacity, storeID) + } +} + // node is an implementation of the Node interface. type node struct { nodeID NodeID diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index d8af55e83453..55b7912d1116 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -54,8 +54,10 @@ type State interface { // runtime in profiling. We should investigate optimizing it, by way of // incremental descriptor computation, when replicas, leases or load is // changed. - // StoreDescriptors returns the descriptors for the StoreIDs given. - StoreDescriptors() []roachpb.StoreDescriptor + // StoreDescriptors returns the descriptors for the StoreIDs given. If the + // first flag is false, then the capacity is generated from scratch, + // otherwise the last calculated capacity values are used for each store. + StoreDescriptors(bool, ...StoreID) []roachpb.StoreDescriptor // Nodes returns all nodes that exist in this state. Nodes() []Node // RangeFor returns the range containing Key in [StartKey, EndKey). This @@ -159,6 +161,14 @@ type State interface { // RaftStatus returns the current raft status for the replica of the Range // with ID RangeID, on the store with ID StoreID. RaftStatus(RangeID, StoreID) *raft.Status + // RegisterCapacityChangeListener registers a listener which will be + // notified on events where there is a lease or replica addition or + // removal, for a specific store. + RegisterCapacityChangeListener(CapacityChangeListener) + // RegisterCapacityListener registers a listener which will be called when + // a new store capacity has been generated from scratch, for a specific + // store. + RegisterCapacityListener(NewCapacityListener) } // Node is a container for stores and is part of a cluster. diff --git a/pkg/kv/kvserver/asim/state/state_listener.go b/pkg/kv/kvserver/asim/state/state_listener.go new file mode 100644 index 000000000000..d1de123b4fae --- /dev/null +++ b/pkg/kv/kvserver/asim/state/state_listener.go @@ -0,0 +1,30 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package state + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// CapacityChangeListener listens for notification of capacity change events. +type CapacityChangeListener interface { + // CapacityChangeNotify notifies that a capacity change event has occurred + // for the store with ID StoreID. + CapacityChangeNotify(kvserver.CapacityChangeEvent, StoreID) +} + +// NewCapacityListener listens for notification of new store capacity events. +type NewCapacityListener interface { + // NewCapacityNotify notifies that a new capacity event has occurred for + // the store with ID StoreID. + NewCapacityNotify(roachpb.StoreCapacity, StoreID) +} diff --git a/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel b/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel index 6f8589dcd478..76129418a80a 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel +++ b/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel @@ -38,9 +38,9 @@ go_test( "//pkg/kv/kvserver", "//pkg/kv/kvserver/allocator/load", "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/gossip", "//pkg/kv/kvserver/asim/op", "//pkg/kv/kvserver/asim/state", - "//pkg/roachpb", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go index d997dfb98eb7..cdfc697335df 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer_test.go @@ -16,15 +16,20 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) func testingGetStoreQPS(s state.State) map[state.StoreID]float64 { ret := map[state.StoreID]float64{} - for _, desc := range s.StoreDescriptors() { + stores := s.Stores() + storeIDs := make([]state.StoreID, len(stores)) + for i, store := range stores { + storeIDs[i] = store.StoreID() + } + for _, desc := range s.StoreDescriptors(false /* cached */, storeIDs...) { ret[state.StoreID(desc.StoreID)] = desc.Capacity.QueriesPerSecond } return ret @@ -35,6 +40,7 @@ func TestStoreRebalancer(t *testing.T) { testingStore := state.StoreID(1) testSettings := config.DefaultSimulationSettings() testSettings.ReplicaChangeBaseDelay = 5 * time.Second + testSettings.StateExchangeDelay = 0 // NB: We trigger lease rebalancing in this test, where the end result // should be a perfectly balanced QPS of 500 per store. We only simulate @@ -179,15 +185,8 @@ func TestStoreRebalancer(t *testing.T) { ctx := context.Background() s := tc.s - exchange := state.NewFixedDelayExhange( - start, - time.Second, - time.Second*0, /* no state update delay */ - ) - - // Update the storepool for informing allocator decisions. - exchange.Put(state.OffsetTick(start, 0), s.StoreDescriptors()...) - s.UpdateStorePool(testingStore, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(testingStore))) + gossip := gossip.NewGossip(s, testSettings) + gossip.Tick(ctx, start, s) allocator := s.MakeAllocator(testingStore) storePool := s.StorePool(testingStore) @@ -202,9 +201,7 @@ func TestStoreRebalancer(t *testing.T) { s.TickClock(state.OffsetTick(start, tick)) changer.Tick(state.OffsetTick(start, tick), s) controller.Tick(ctx, state.OffsetTick(start, tick), s) - exchange.Put(state.OffsetTick(start, 0), s.StoreDescriptors()...) - s.UpdateStorePool(testingStore, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(testingStore))) - + gossip.Tick(ctx, state.OffsetTick(start, tick), s) src.Tick(ctx, state.OffsetTick(start, tick), s) resultsPhase = append(resultsPhase, src.rebalancerState.phase) storeQPS := testingGetStoreQPS(s) @@ -224,6 +221,8 @@ func TestStoreRebalancerBalances(t *testing.T) { testSettings := config.DefaultSimulationSettings() testSettings.ReplicaAddRate = 1 testSettings.ReplicaChangeBaseDelay = 1 * time.Second + testSettings.StateExchangeInterval = 1 * time.Second + testSettings.StateExchangeDelay = 0 distributeQPS := func(s state.State, qpsCounts map[state.StoreID]float64) { dist := make([]float64, len(qpsCounts)) @@ -288,15 +287,10 @@ func TestStoreRebalancerBalances(t *testing.T) { s.TransferLease(5, 3) distributeQPS(s, tc.qpsCounts) - exchange := state.NewFixedDelayExhange( - start, - time.Second, - time.Second*0, /* no state update delay */ - ) + gossip := gossip.NewGossip(s, testSettings) // Update the storepool for informing allocator decisions. - exchange.Put(state.OffsetTick(start, 0), s.StoreDescriptors()...) - s.UpdateStorePool(testingStore, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(testingStore))) + gossip.Tick(ctx, start, s) allocator := s.MakeAllocator(testingStore) storePool := s.StorePool(testingStore) @@ -310,8 +304,7 @@ func TestStoreRebalancerBalances(t *testing.T) { s.TickClock(state.OffsetTick(start, tick)) changer.Tick(state.OffsetTick(start, tick), s) controller.Tick(ctx, state.OffsetTick(start, tick), s) - exchange.Put(state.OffsetTick(start, 0), s.StoreDescriptors()...) - s.UpdateStorePool(testingStore, exchange.Get(state.OffsetTick(start, 1), roachpb.StoreID(testingStore))) + gossip.Tick(ctx, state.OffsetTick(start, tick), s) src.Tick(ctx, state.OffsetTick(start, tick), s) results = append(results, testingGetStoreQPS(s)) diff --git a/pkg/kv/kvserver/client_replica_raft_overload_test.go b/pkg/kv/kvserver/client_replica_raft_overload_test.go index 678b567deb80..3e9b8e401e1e 100644 --- a/pkg/kv/kvserver/client_replica_raft_overload_test.go +++ b/pkg/kv/kvserver/client_replica_raft_overload_test.go @@ -44,17 +44,20 @@ func TestReplicaRaftOverload(t *testing.T) { on.Store(false) var args base.TestClusterArgs args.ReplicationMode = base.ReplicationManual - args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{StoreGossipIntercept: func(descriptor *roachpb.StoreDescriptor) { - if !on.Load().(bool) || descriptor.StoreID != 3 { - return - } - descriptor.Capacity.IOThreshold = admissionpb.IOThreshold{ - L0NumSubLevels: 1000000, - L0NumSubLevelsThreshold: 1, - L0NumFiles: 1000000, - L0NumFilesThreshold: 1, - } - }} + args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ + GossipTestingKnobs: kvserver.StoreGossipTestingKnobs{ + StoreGossipIntercept: func(descriptor *roachpb.StoreDescriptor) { + if !on.Load().(bool) || descriptor.StoreID != 3 { + return + } + descriptor.Capacity.IOThreshold = admissionpb.IOThreshold{ + L0NumSubLevels: 1000000, + L0NumSubLevelsThreshold: 1, + L0NumFiles: 1000000, + L0NumFilesThreshold: 1, + } + }, + }} tc := testcluster.StartTestCluster(t, 3, args) defer tc.Stopper().Stop(ctx) diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 6119ba97a40d..4b26145245ca 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -2312,14 +2312,16 @@ func TestStoreRangeGossipOnSplits(t *testing.T) { serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - DisableMergeQueue: true, - DisableSplitQueue: true, - DisableScanner: true, - GossipWhenCapacityDeltaExceedsFraction: 0.5, // 50% for testing - // We can't properly test how frequently changes in the number of ranges - // trigger the store to gossip its capacities if we have to worry about - // changes in the number of leases also triggering store gossip. - DisableLeaseCapacityGossip: true, + DisableMergeQueue: true, + DisableSplitQueue: true, + DisableScanner: true, + GossipTestingKnobs: kvserver.StoreGossipTestingKnobs{ + OverrideGossipWhenCapacityDeltaExceedsFraction: 0.5, // 50% for testing + // We can't properly test how frequently changes in the number of ranges + // trigger the store to gossip its capacities if we have to worry about + // changes in the number of leases also triggering store gossip. + DisableLeaseCapacityGossip: true, + }, }, }, }) diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index bf59882acbaf..dfe25104af8d 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -398,9 +398,9 @@ func (r *Replica) leasePostApplyLocked( currentOwner := newLease.OwnedBy(r.store.StoreID()) if leaseChangingHands && (prevOwner || currentOwner) { if currentOwner { - r.store.maybeGossipOnCapacityChange(ctx, leaseAddEvent) + r.store.storeGossip.MaybeGossipOnCapacityChange(ctx, LeaseAddEvent) } else if prevOwner { - r.store.maybeGossipOnCapacityChange(ctx, leaseRemoveEvent) + r.store.storeGossip.MaybeGossipOnCapacityChange(ctx, LeaseRemoveEvent) } if r.loadStats != nil { r.loadStats.Reset() diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index f3907716bd8f..d9dd58fafc62 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -105,19 +105,6 @@ const ( // maxes out RaftMaxInflightMsgs, we want the receiving replica to still have // some buffer for other messages, primarily heartbeats. replicaQueueExtraSize = 10 - - // GossipWhenCapacityDeltaExceedsFraction specifies the fraction from the - // last gossiped store capacity values which need be exceeded before the - // store will gossip immediately without waiting for the periodic gossip - // interval. - defaultGossipWhenCapacityDeltaExceedsFraction = 0.05 - - // systemDataGossipInterval is the interval at which range lease - // holders verify that the most recent system data is gossiped. - // This ensures that system data is always eventually gossiped, even - // if a range lease holder experiences a failure causing a missed - // gossip update. - systemDataGossipInterval = 1 * time.Minute ) var storeSchedulerConcurrency = envutil.EnvOrDefaultInt( @@ -764,19 +751,7 @@ type Store struct { sstSnapshotStorage SSTSnapshotStorage protectedtsReader spanconfig.ProtectedTSReader ctSender *sidetransport.Sender - - // gossipRangeCountdown and leaseRangeCountdown are countdowns of - // changes to range and leaseholder counts, after which the store - // descriptor will be re-gossiped earlier than the normal periodic - // gossip interval. Updated atomically. - gossipRangeCountdown int32 - gossipLeaseCountdown int32 - // gossipQueriesPerSecondVal and gossipWritesPerSecond serve similar - // purposes, but simply record the most recently gossiped value so that we - // can tell if a newly measured value differs by enough to justify - // re-gossiping the store. - gossipQueriesPerSecondVal syncutil.AtomicFloat64 - gossipWritesPerSecondVal syncutil.AtomicFloat64 + storeGossip *StoreGossip coalescedMu struct { syncutil.Mutex @@ -961,13 +936,6 @@ type Store struct { // reactively, i.e. will refresh on each tick loop iteration only. ioThresholds *ioThresholds - // cachedCapacity caches information on store capacity to prevent - // expensive recomputations in case leases or replicas are rapidly - // rebalancing. - cachedCapacity struct { - syncutil.Mutex - roachpb.StoreCapacity - } ioThreshold struct { syncutil.Mutex t *admissionpb.IOThreshold // never nil @@ -1010,8 +978,8 @@ type StoreConfig struct { DefaultSpanConfig roachpb.SpanConfig Settings *cluster.Settings Clock *hlc.Clock - DB *kv.DB Gossip *gossip.Gossip + DB *kv.DB NodeLiveness *liveness.NodeLiveness StorePool *storepool.StorePool Transport *RaftTransport @@ -1176,10 +1144,6 @@ func (sc *StoreConfig) SetDefaults() { if sc.RaftEntryCacheSize == 0 { sc.RaftEntryCacheSize = defaultRaftEntryCacheSize } - - if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 { - sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction - } } // GetStoreConfig exposes the config used for this store. @@ -1254,6 +1218,7 @@ func NewStore( s.metrics.registry.AddMetricStruct(s.allocator.Metrics.LoadBasedLeaseTransferMetrics) s.metrics.registry.AddMetricStruct(s.allocator.Metrics.LoadBasedReplicaRebalanceMetrics) } + s.replRankings = NewReplicaRankings() s.replRankingsByTenant = NewReplicaRankingsMap() @@ -1390,6 +1355,8 @@ func NewStore( updateSystemConfigUpdateQueueLimits) if s.cfg.Gossip != nil { + s.storeGossip = NewStoreGossip(cfg.Gossip, s, cfg.TestingKnobs.GossipTestingKnobs) + // Add range scanner and configure with queues. s.scanner = newReplicaScanner( s.cfg.AmbientCtx, s.cfg.Clock, cfg.ScanInterval, @@ -1943,6 +1910,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Gossip is only ever nil while bootstrapping a cluster and // in unittests. if s.cfg.Gossip != nil { + s.storeGossip.stopper = stopper + s.storeGossip.Ident = *s.Ident // Start a single goroutine in charge of periodically gossiping the // sentinel and first range metadata if we have a first range. @@ -2026,93 +1995,6 @@ func (s *Store) WaitForInit() { s.initComplete.Wait() } -var errPeriodicGossipsDisabled = errors.New("periodic gossip is disabled") - -// startGossip runs an infinite loop in a goroutine which regularly checks -// whether the store has a first range or config replica and asks those ranges -// to gossip accordingly. -func (s *Store) startGossip() { - wakeReplica := func(ctx context.Context, repl *Replica) error { - // Acquire the range lease, which in turn triggers system data gossip - // functions (e.g. MaybeGossipSystemConfig or MaybeGossipNodeLiveness). - _, pErr := repl.getLeaseForGossip(ctx) - return pErr.GoError() - } - gossipFns := []struct { - key roachpb.Key - fn func(context.Context, *Replica) error - description redact.SafeString - interval time.Duration - }{ - { - key: roachpb.KeyMin, - fn: func(ctx context.Context, repl *Replica) error { - // The first range is gossiped by all replicas, not just the lease - // holder, so wakeReplica is not used here. - return repl.maybeGossipFirstRange(ctx).GoError() - }, - description: "first range descriptor", - interval: s.cfg.SentinelGossipTTL() / 2, - }, - { - key: keys.NodeLivenessSpan.Key, - fn: wakeReplica, - description: "node liveness", - interval: systemDataGossipInterval, - }, - } - - cannotGossipEvery := log.Every(time.Minute) - cannotGossipEvery.ShouldLog() // only log next time after waiting out the delay - - // Periodic updates run in a goroutine and signal a WaitGroup upon completion - // of their first iteration. - s.initComplete.Add(len(gossipFns)) - for _, gossipFn := range gossipFns { - gossipFn := gossipFn // per-iteration copy - bgCtx := s.AnnotateCtx(context.Background()) - if err := s.stopper.RunAsyncTask(bgCtx, "store-gossip", func(ctx context.Context) { - ticker := time.NewTicker(gossipFn.interval) - defer ticker.Stop() - for first := true; ; { - // Retry in a backoff loop until gossipFn succeeds. The gossipFn might - // temporarily fail (e.g. because node liveness hasn't initialized yet - // making it impossible to get an epoch-based range lease), in which - // case we want to retry quickly. - retryOptions := base.DefaultRetryOptions() - retryOptions.Closer = s.stopper.ShouldQuiesce() - for r := retry.Start(retryOptions); r.Next(); { - if repl := s.LookupReplica(roachpb.RKey(gossipFn.key)); repl != nil { - annotatedCtx := repl.AnnotateCtx(ctx) - if err := gossipFn.fn(annotatedCtx, repl); err != nil { - if cannotGossipEvery.ShouldLog() { - log.Infof(annotatedCtx, "could not gossip %s: %v", gossipFn.description, err) - } - if !errors.Is(err, errPeriodicGossipsDisabled) { - continue - } - } - } - break - } - if first { - first = false - s.initComplete.Done() - } - select { - case <-ticker.C: - case <-s.stopper.ShouldQuiesce(): - return - } - } - }); err != nil { - s.initComplete.Done() - } - } -} - -var errSysCfgUnavailable = errors.New("system config not available in gossip") - // GetConfReader exposes access to a configuration reader. func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, error) { if s.cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues { @@ -2271,58 +2153,6 @@ func (s *Store) removeReplicaWithRangefeed(rangeID roachpb.RangeID) { s.rangefeedReplicas.Unlock() } -// systemGossipUpdate is a callback for gossip updates to -// the system config which affect range split boundaries. -func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { - if !s.cfg.SpanConfigsDisabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { - return // nothing to do - } - - ctx := s.AnnotateCtx(context.Background()) - s.computeInitialMetrics.Do(func() { - // Metrics depend in part on the system config. Compute them as soon as we - // get the first system config, then periodically in the background - // (managed by the Node). - if err := s.ComputeMetrics(ctx); err != nil { - log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err) - } - log.Event(ctx, "computed initial metrics") - }) - - // We'll want to offer all replicas to the split and merge queues. Be a little - // careful about not spawning too many individual goroutines. - shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1) - - // For every range, update its zone config and check if it needs to - // be split or merged. - now := s.cfg.Clock.NowAsClockTimestamp() - newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { - key := repl.Desc().StartKey - conf, err := sysCfg.GetSpanConfigForKey(ctx, key) - if err != nil { - if log.V(1) { - log.Infof(context.TODO(), "failed to get span config for key %s", key) - } - conf = s.cfg.DefaultSpanConfig - } - - if s.cfg.SpanConfigsDisabled || - !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { - repl.SetSpanConfig(conf) - } - - if shouldQueue { - s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { - h.MaybeAdd(ctx, repl, now) - }) - s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { - h.MaybeAdd(ctx, repl, now) - }) - } - return true // more - }) -} - // onSpanConfigUpdate is the callback invoked whenever this store learns of a // span config update. func (s *Store) onSpanConfigUpdate(ctx context.Context, updated roachpb.Span) { @@ -2428,16 +2258,9 @@ func (s *Store) applyAllFromSpanConfigStore(ctx context.Context) { }) } -func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached bool) { - if err := s.stopper.RunAsyncTask( - ctx, fmt.Sprintf("storage.Store: gossip on %s", reason), - func(ctx context.Context) { - if err := s.GossipStore(ctx, useCached); err != nil { - log.Warningf(ctx, "error gossiping on %s: %+v", reason, err) - } - }); err != nil { - log.Warningf(ctx, "unable to gossip on %s: %+v", reason, err) - } +// GossipStore broadcasts the store on the gossip network. +func (s *Store) GossipStore(ctx context.Context, useCached bool) error { + return s.storeGossip.GossipStore(ctx, useCached) } // UpdateIOThreshold updates the IOThreshold reported in the StoreDescriptor. @@ -2447,120 +2270,6 @@ func (s *Store) UpdateIOThreshold(ioThreshold *admissionpb.IOThreshold) { s.ioThreshold.t = ioThreshold } -// GossipStore broadcasts the store on the gossip network. -func (s *Store) GossipStore(ctx context.Context, useCached bool) error { - // Temporarily indicate that we're gossiping the store capacity to avoid - // recursively triggering a gossip of the store capacity. - syncutil.StoreFloat64(&s.gossipQueriesPerSecondVal, -1) - syncutil.StoreFloat64(&s.gossipWritesPerSecondVal, -1) - - storeDesc, err := s.Descriptor(ctx, useCached) - if err != nil { - return errors.Wrapf(err, "problem getting store descriptor for store %+v", s.Ident) - } - - // Set countdown target for re-gossiping capacity to be large enough that - // it would only occur when there has been significant changes. We - // currently gossip every 10 seconds, meaning that unless significant - // redistribution occurs we do not wish to gossip again to avoid wasting - // bandwidth and racing with local storepool estimations. - // TODO(kvoli): Reconsider what triggers gossip here and possibly limit to - // only significant workload changes (load), rather than lease or range - // count. Previoulsy, this was not as much as an issue as the gossip - // interval was 60 seconds, such that gossiping semi-frequently on changes - // was required. - rangeCountdown := float64(storeDesc.Capacity.RangeCount) * s.cfg.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction - atomic.StoreInt32(&s.gossipRangeCountdown, int32(math.Ceil(math.Max(rangeCountdown, 10)))) - leaseCountdown := float64(storeDesc.Capacity.LeaseCount) * s.cfg.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction - atomic.StoreInt32(&s.gossipLeaseCountdown, int32(math.Ceil(math.Max(leaseCountdown, 10)))) - syncutil.StoreFloat64(&s.gossipQueriesPerSecondVal, storeDesc.Capacity.QueriesPerSecond) - syncutil.StoreFloat64(&s.gossipWritesPerSecondVal, storeDesc.Capacity.WritesPerSecond) - - // Unique gossip key per store. - gossipStoreKey := gossip.MakeStoreDescKey(storeDesc.StoreID) - // Gossip store descriptor. - if fn := s.cfg.TestingKnobs.StoreGossipIntercept; fn != nil { - // Give the interceptor a chance to see and/or mutate the descriptor we're about - // to gossip. - fn(storeDesc) - } - return s.cfg.Gossip.AddInfoProto(gossipStoreKey, storeDesc, gossip.StoreTTL) -} - -type capacityChangeEvent int - -const ( - rangeAddEvent capacityChangeEvent = iota - rangeRemoveEvent - leaseAddEvent - leaseRemoveEvent -) - -// maybeGossipOnCapacityChange decrements the countdown on range -// and leaseholder counts. If it reaches 0, then we trigger an -// immediate gossip of this store's descriptor, to include updated -// capacity information. -func (s *Store) maybeGossipOnCapacityChange(ctx context.Context, cce capacityChangeEvent) { - if s.cfg.TestingKnobs.DisableLeaseCapacityGossip && (cce == leaseAddEvent || cce == leaseRemoveEvent) { - return - } - - // Incrementally adjust stats to keep them up to date even if the - // capacity is gossiped, but isn't due yet to be recomputed from scratch. - s.cachedCapacity.Lock() - switch cce { - case rangeAddEvent: - s.cachedCapacity.RangeCount++ - case rangeRemoveEvent: - s.cachedCapacity.RangeCount-- - case leaseAddEvent: - s.cachedCapacity.LeaseCount++ - case leaseRemoveEvent: - s.cachedCapacity.LeaseCount-- - } - s.cachedCapacity.Unlock() - - if ((cce == rangeAddEvent || cce == rangeRemoveEvent) && atomic.AddInt32(&s.gossipRangeCountdown, -1) == 0) || - ((cce == leaseAddEvent || cce == leaseRemoveEvent) && atomic.AddInt32(&s.gossipLeaseCountdown, -1) == 0) { - // Reset countdowns to avoid unnecessary gossiping. - atomic.StoreInt32(&s.gossipRangeCountdown, 0) - atomic.StoreInt32(&s.gossipLeaseCountdown, 0) - s.asyncGossipStore(ctx, "capacity change", true /* useCached */) - } -} - -// recordNewPerSecondStats takes recently calculated values for the number of -// queries and key writes the store is handling and decides whether either has -// changed enough to justify re-gossiping the store's capacity. -func (s *Store) recordNewPerSecondStats(newQPS, newWPS float64) { - oldQPS := syncutil.LoadFloat64(&s.gossipQueriesPerSecondVal) - oldWPS := syncutil.LoadFloat64(&s.gossipWritesPerSecondVal) - if oldQPS == -1 || oldWPS == -1 { - // Gossiping of store capacity is already ongoing. - return - } - - const minAbsoluteChange = 100 - updateForQPS := (newQPS < oldQPS*.5 || newQPS > oldQPS*1.5) && math.Abs(newQPS-oldQPS) > minAbsoluteChange - updateForWPS := (newWPS < oldWPS*.5 || newWPS > oldWPS*1.5) && math.Abs(newWPS-oldWPS) > minAbsoluteChange - - if !updateForQPS && !updateForWPS { - return - } - - var message string - if updateForQPS && updateForWPS { - message = "queries-per-second and writes-per-second change" - } else if updateForQPS { - message = "queries-per-second change" - } else { - message = "writes-per-second change" - } - // TODO(a-robinson): Use the provided values to avoid having to recalculate - // them in GossipStore. - s.asyncGossipStore(context.TODO(), message, false /* useCached */) -} - // VisitReplicasOption optionally modifies store.VisitReplicas. type VisitReplicasOption func(*storeReplicaVisitor) @@ -2821,9 +2530,7 @@ func (s *Store) Properties() roachpb.StoreProperties { // internal statistics about its replicas. func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapacity, error) { if useCached { - s.cachedCapacity.Lock() - capacity := s.cachedCapacity.StoreCapacity - s.cachedCapacity.Unlock() + capacity := s.storeGossip.CachedCapacity() if capacity != (roachpb.StoreCapacity{}) { return capacity, nil } @@ -2887,13 +2594,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa } capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica) capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica) - s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond) + s.storeGossip.RecordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond) s.replRankings.Update(rankingsAccumulator) s.replRankingsByTenant.Update(rankingsByTenantAccumulator) - s.cachedCapacity.Lock() - s.cachedCapacity.StoreCapacity = capacity - s.cachedCapacity.Unlock() + s.storeGossip.UpdateCachedCapacity(capacity) return capacity, nil } @@ -3124,7 +2829,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.AverageReadBytesPerSecond.Update(averageReadBytesPerSecond) s.metrics.AverageWriteBytesPerSecond.Update(averageWriteBytesPerSecond) s.metrics.AverageCPUNanosPerSecond.Update(averageCPUNanosPerSecond) - s.recordNewPerSecondStats(averageQueriesPerSecond, averageWritesPerSecond) + s.storeGossip.RecordNewPerSecondStats(averageQueriesPerSecond, averageWritesPerSecond) s.metrics.RangeCount.Update(rangeCount) s.metrics.UnavailableRangeCount.Update(unavailableRangeCount) diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index c8131c2e5f10..8d0162ca673c 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -439,7 +439,7 @@ func (s *Store) maybeMarkReplicaInitializedLockedReplLocked( // Add the range to metrics and maybe gossip on capacity change. s.metrics.ReplicaCount.Inc(1) - s.maybeGossipOnCapacityChange(ctx, rangeAddEvent) + s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeAddEvent) return nil } diff --git a/pkg/kv/kvserver/store_gossip.go b/pkg/kv/kvserver/store_gossip.go new file mode 100644 index 000000000000..4bec1a7aff4a --- /dev/null +++ b/pkg/kv/kvserver/store_gossip.go @@ -0,0 +1,508 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +const ( + // defaultGossipWhenCapacityDeltaExceedsFraction specifies the fraction from the + // last gossiped store capacity values which need be exceeded before the + // store will gossip immediately without waiting for the periodic gossip + // interval. + defaultGossipWhenCapacityDeltaExceedsFraction = 0.05 + + // gossipWhenLeaseCountDeltaExceeds specifies the absolute change from the + // last gossiped store capacity lease count which needs to be exceeded + // before the store will gossip immediately without waiting for the periodic + // gossip interval. + gossipWhenLeaseCountDeltaExceeds = 5 + + // gossipWhenRangeCountDeltaExceeds specifies the absolute change from the + // last gossiped store capacity range count which needs to be exceeded + // before the store will gossip immediately without waiting for the + // periodic gossip interval. + gossipWhenRangeCountDeltaExceeds = 5 + + // gossipWhenLoadDeltaExceedsFraction specifies the fraction from the last + // gossiped store capacity load which needs to be exceeded before the store + // will gossip immediately without waiting for the periodic gossip interval. + gossipWhenLoadDeltaExceedsFraction = 0.5 + + // gossipMinAbsoluteDelta is the minimum delta in load that is required to + // trigger gossip. This stops frequent re-gossiping when load values + // fluctuate but are insignificant in absolute terms. + gossipMinAbsoluteDelta = 100 + + // systemDataGossipInterval is the interval at which range lease + // holders verify that the most recent system data is gossiped. + // This ensures that system data is always eventually gossiped, even + // if a range lease holder experiences a failure causing a missed + // gossip update. + systemDataGossipInterval = 1 * time.Minute +) + +var errPeriodicGossipsDisabled = errors.New("periodic gossip is disabled") + +// startGossip runs an infinite loop in a goroutine which regularly checks +// whether the store has a first range or config replica and asks those ranges +// to gossip accordingly. +// +// TODO(kvoli): Refactor this function to sit on the StoreGossip struct, +// rather than on the store. +func (s *Store) startGossip() { + wakeReplica := func(ctx context.Context, repl *Replica) error { + // Acquire the range lease, which in turn triggers system data gossip + // functions (e.g. MaybeGossipSystemConfig or MaybeGossipNodeLiveness). + _, pErr := repl.getLeaseForGossip(ctx) + return pErr.GoError() + } + gossipFns := []struct { + key roachpb.Key + fn func(context.Context, *Replica) error + description redact.SafeString + interval time.Duration + }{ + { + key: roachpb.KeyMin, + fn: func(ctx context.Context, repl *Replica) error { + // The first range is gossiped by all replicas, not just the lease + // holder, so wakeReplica is not used here. + return repl.maybeGossipFirstRange(ctx).GoError() + }, + description: "first range descriptor", + interval: s.cfg.SentinelGossipTTL() / 2, + }, + { + key: keys.NodeLivenessSpan.Key, + fn: wakeReplica, + description: "node liveness", + interval: systemDataGossipInterval, + }, + } + + cannotGossipEvery := log.Every(time.Minute) + cannotGossipEvery.ShouldLog() // only log next time after waiting out the delay + + // Periodic updates run in a goroutine and signal a WaitGroup upon completion + // of their first iteration. + s.initComplete.Add(len(gossipFns)) + for _, gossipFn := range gossipFns { + gossipFn := gossipFn // per-iteration copy + bgCtx := s.AnnotateCtx(context.Background()) + if err := s.stopper.RunAsyncTask(bgCtx, "store-gossip", func(ctx context.Context) { + ticker := time.NewTicker(gossipFn.interval) + defer ticker.Stop() + for first := true; ; { + // Retry in a backoff loop until gossipFn succeeds. The gossipFn might + // temporarily fail (e.g. because node liveness hasn't initialized yet + // making it impossible to get an epoch-based range lease), in which + // case we want to retry quickly. + retryOptions := base.DefaultRetryOptions() + retryOptions.Closer = s.stopper.ShouldQuiesce() + for r := retry.Start(retryOptions); r.Next(); { + if repl := s.LookupReplica(roachpb.RKey(gossipFn.key)); repl != nil { + annotatedCtx := repl.AnnotateCtx(ctx) + if err := gossipFn.fn(annotatedCtx, repl); err != nil { + if cannotGossipEvery.ShouldLog() { + log.Infof(annotatedCtx, "could not gossip %s: %v", gossipFn.description, err) + } + if !errors.Is(err, errPeriodicGossipsDisabled) { + continue + } + } + } + break + } + if first { + first = false + s.initComplete.Done() + } + select { + case <-ticker.C: + case <-s.stopper.ShouldQuiesce(): + return + } + } + }); err != nil { + s.initComplete.Done() + } + } +} + +var errSysCfgUnavailable = errors.New("system config not available in gossip") + +// systemGossipUpdate is a callback for gossip updates to +// the system config which affect range split boundaries. +// +// TODO(kvoli): Refactor this function to sit on the store gossip struct, +// rather than on the store. +func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { + if !s.cfg.SpanConfigsDisabled && spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + return // nothing to do + } + + ctx := s.AnnotateCtx(context.Background()) + s.computeInitialMetrics.Do(func() { + // Metrics depend in part on the system config. Compute them as soon as we + // get the first system config, then periodically in the background + // (managed by the Node). + if err := s.ComputeMetrics(ctx); err != nil { + log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err) + } + log.Event(ctx, "computed initial metrics") + }) + + // We'll want to offer all replicas to the split and merge queues. Be a little + // careful about not spawning too many individual goroutines. + shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1) + + // For every range, update its zone config and check if it needs to + // be split or merged. + now := s.cfg.Clock.NowAsClockTimestamp() + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + key := repl.Desc().StartKey + conf, err := sysCfg.GetSpanConfigForKey(ctx, key) + if err != nil { + if log.V(1) { + log.Infof(context.TODO(), "failed to get span config for key %s", key) + } + conf = s.cfg.DefaultSpanConfig + } + + if s.cfg.SpanConfigsDisabled || + !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { + repl.SetSpanConfig(conf) + } + + if shouldQueue { + s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) + } + return true // more + }) +} + +// cachedCapacity caches information on store capacity to prevent +// expensive recomputations in case leases or replicas are rapidly +// rebalancing. +type cachedCapacity struct { + syncutil.Mutex + cached, lastGossiped roachpb.StoreCapacity +} + +// StoreGossip is responsible for gossiping the store descriptor. It maintains +// state for cached information to gosip and countdown to gossip more +// frequently on updates. +type StoreGossip struct { + // Ident is the identity of the store this store gossip is associated with. + // This field is set after initialization, at store Start(). + Ident roachpb.StoreIdent + stopper *stop.Stopper + knobs StoreGossipTestingKnobs + // cachedCapacity caches information on store capacity to prevent + // expensive recomputations in case leases or replicas are rapidly + // rebalancing. + cachedCapacity *cachedCapacity + // gossipOngoing indicates whether there is currently a triggered gossip, + // to avoid recursively re-triggering gossip. + gossipOngoing syncutil.AtomicBool + // gossiper is used for adding information to gossip. + gossiper InfoGossiper + // descriptorGetter is used for getting an up to date or cached store + // descriptor to gossip. + descriptorGetter StoreDescriptorProvider +} + +// StoreGossipTestingKnobs defines the testing knobs specific to StoreGossip. +type StoreGossipTestingKnobs struct { + // OverrideGossipWhenCapacityDeltaExceedsFraction specifies the fraction + // from the last gossiped store capacity values which need be exceeded + // before the store will gossip immediately without waiting for the + // periodic gossip interval. This is ignored unless set to a value > 0. + OverrideGossipWhenCapacityDeltaExceedsFraction float64 + // This method, if set, gets to see (and mutate, if desired) any local + // StoreDescriptor before it is being sent out on the Gossip network. + StoreGossipIntercept func(descriptor *roachpb.StoreDescriptor) + // DisableLeaseCapacityGossip disables the ability of a changing number of + // leases to trigger the store to gossip its capacity. With this enabled, + // only changes in the number of replicas can cause the store to gossip its + // capacity. + DisableLeaseCapacityGossip bool + // AsyncDisabled indicates that asyncGossipStore should not be treated as + // async. + AsyncDisabled bool +} + +// NewStoreGossip returns a new StoreGosip which may be used for gossiping the +// store descriptor: both proactively, calling Gossip() and reacively on +// capacity/load changes. +func NewStoreGossip( + gossiper InfoGossiper, descGetter StoreDescriptorProvider, testingKnobs StoreGossipTestingKnobs, +) *StoreGossip { + return &StoreGossip{ + cachedCapacity: &cachedCapacity{}, + gossiper: gossiper, + descriptorGetter: descGetter, + knobs: testingKnobs, + } +} + +// CachedCapacity returns the current cached capacity. +func (s *StoreGossip) CachedCapacity() roachpb.StoreCapacity { + s.cachedCapacity.Lock() + defer s.cachedCapacity.Unlock() + + return s.cachedCapacity.cached +} + +// UpdateCachedCapacity updates the cached capacity to be equal to the capacity +// given. +func (s *StoreGossip) UpdateCachedCapacity(capacity roachpb.StoreCapacity) { + s.cachedCapacity.Lock() + defer s.cachedCapacity.Unlock() + + s.cachedCapacity.cached = capacity +} + +// StoreDescriptorProvider provides a method to access the store descriptor. +type StoreDescriptorProvider interface { + // Descriptor returns a StoreDescriptor including current store + // capacity information. + Descriptor(ctx context.Context, cached bool) (*roachpb.StoreDescriptor, error) +} + +var _ StoreDescriptorProvider = &Store{} + +// InfoGossiper provides a method to add a message to gossip. +type InfoGossiper interface { + // AddInfoProto adds or updates an info object in gossip. Returns an error + // if info couldn't be added. + AddInfoProto(key string, msg protoutil.Message, ttl time.Duration) error +} + +var _ InfoGossiper = &gossip.Gossip{} + +// asyncGossipStore asynchronously gossips the store descriptor, for a given +// reason. A cached descriptor is used if specified, otherwise the store +// descriptor is updated and capacities recalculated. +func (s *StoreGossip) asyncGossipStore(ctx context.Context, reason string, useCached bool) { + gossipFn := func(ctx context.Context) { + log.VEventf(ctx, 2, "gossiping on %s", reason) + if err := s.GossipStore(ctx, useCached); err != nil { + log.Warningf(ctx, "error gossiping on %s: %+v", reason, err) + } + } + + // If async is disabled, then gossip immediately rather than running the + // gossipFn in a task. + if s.knobs.AsyncDisabled { + gossipFn(ctx) + return + } + + if err := s.stopper.RunAsyncTask( + ctx, fmt.Sprintf("storage.Store: gossip on %s", reason), gossipFn, + ); err != nil { + log.Warningf(ctx, "unable to gossip on %s: %+v", reason, err) + } +} + +// GossipStore broadcasts the store on the gossip network. +func (s *StoreGossip) GossipStore(ctx context.Context, useCached bool) error { + // Temporarily indicate that we're gossiping the store capacity to avoid + // recursively triggering a gossip of the store capacity. This doesn't + // block direct calls to GossipStore, rather capacity triggered gossip + // outlined in the methods below. + s.gossipOngoing.Set(true) + defer s.gossipOngoing.Set(false) + + storeDesc, err := s.descriptorGetter.Descriptor(ctx, useCached) + if err != nil { + return errors.Wrapf(err, "problem getting store descriptor for store %+v", s.Ident) + } + + // Set countdown target for re-gossiping capacity to be large enough that + // it would only occur when there has been significant changes. We + // currently gossip every 10 seconds, meaning that unless significant + // redistribution occurs we do not wish to gossip again to avoid wasting + // bandwidth and racing with local storepool estimations. + // TODO(kvoli): Reconsider what triggers gossip here and possibly limit to + // only significant workload changes (load), rather than lease or range + // count. Previoulsy, this was not as much as an issue as the gossip + // interval was 60 seconds, such that gossiping semi-frequently on changes + // was required. + s.cachedCapacity.Lock() + s.cachedCapacity.lastGossiped = storeDesc.Capacity + s.cachedCapacity.Unlock() + + // Unique gossip key per store. + gossipStoreKey := gossip.MakeStoreDescKey(storeDesc.StoreID) + // Gossip store descriptor. + if fn := s.knobs.StoreGossipIntercept; fn != nil { + // Give the interceptor a chance to see and/or mutate the descriptor we're about + // to gossip. + fn(storeDesc) + } + + return s.gossiper.AddInfoProto(gossipStoreKey, storeDesc, gossip.StoreTTL) +} + +// CapacityChangeEvent represents a change in a store's capacity for either +// leases or replicas. +type CapacityChangeEvent int + +const ( + RangeAddEvent CapacityChangeEvent = iota + RangeRemoveEvent + LeaseAddEvent + LeaseRemoveEvent +) + +// maybeGossipOnCapacityChange decrements the countdown on range +// and leaseholder counts. If it reaches 0, then we trigger an +// immediate gossip of this store's descriptor, to include updated +// capacity information. +func (s *StoreGossip) MaybeGossipOnCapacityChange(ctx context.Context, cce CapacityChangeEvent) { + if s.knobs.DisableLeaseCapacityGossip && (cce == LeaseAddEvent || cce == LeaseRemoveEvent) { + return + } + + // Incrementally adjust stats to keep them up to date even if the + // capacity is gossiped, but isn't due yet to be recomputed from scratch. + s.cachedCapacity.Lock() + switch cce { + case RangeAddEvent: + s.cachedCapacity.cached.RangeCount++ + case RangeRemoveEvent: + s.cachedCapacity.cached.RangeCount-- + case LeaseAddEvent: + s.cachedCapacity.cached.LeaseCount++ + case LeaseRemoveEvent: + s.cachedCapacity.cached.LeaseCount-- + } + s.cachedCapacity.Unlock() + + if shouldGossip, reason := s.shouldGossipOnCapacityDelta(); shouldGossip { + s.asyncGossipStore(context.TODO(), reason, true /* useCached */) + } +} + +// recordNewPerSecondStats takes recently calculated values for the number of +// queries and key writes the store is handling and decides whether either has +// changed enough to justify re-gossiping the store's capacity. +func (s *StoreGossip) RecordNewPerSecondStats(newQPS, newWPS float64) { + // Overwrite stats to keep them up to date even if the capacity is + // gossiped, but isn't due yet to be recomputed from scratch. + s.cachedCapacity.Lock() + s.cachedCapacity.cached.QueriesPerSecond = newQPS + s.cachedCapacity.cached.WritesPerSecond = newWPS + s.cachedCapacity.Unlock() + + if shouldGossip, reason := s.shouldGossipOnCapacityDelta(); shouldGossip { + // TODO(a-robinson): Use the provided values to avoid having to recalculate + // them in GossipStore. + s.asyncGossipStore(context.TODO(), reason, false /* useCached */) + } +} + +// shouldGossipOnCapacityDelta determines whether the difference between the +// last gossiped store capacity and the currently cached capacity is large +// enough that gossiping immediately is required to avoid poor allocation +// decisions by stores in the cluster. The difference must be large enough in +// both absolute and relative terms in order to trigger gossip. +func (s *StoreGossip) shouldGossipOnCapacityDelta() (should bool, reason string) { + // If there is an ongoing gossip attempt, then there is no need to regossip + // immediately as we will already be gossiping an up to date (cached) capacity. + if s.gossipOngoing.Get() { + return + } + + gossipWhenCapacityDeltaExceedsFraction := defaultGossipWhenCapacityDeltaExceedsFraction + if overrideCapacityDeltaFraction := s.knobs.OverrideGossipWhenCapacityDeltaExceedsFraction; overrideCapacityDeltaFraction > 0 { + gossipWhenCapacityDeltaExceedsFraction = overrideCapacityDeltaFraction + } + + s.cachedCapacity.Lock() + updateForQPS, deltaQPS := deltaExceedsThreshold( + s.cachedCapacity.lastGossiped.QueriesPerSecond, s.cachedCapacity.cached.QueriesPerSecond, + gossipMinAbsoluteDelta, gossipWhenLoadDeltaExceedsFraction) + updateForWPS, deltaWPS := deltaExceedsThreshold( + s.cachedCapacity.lastGossiped.WritesPerSecond, s.cachedCapacity.cached.WritesPerSecond, + gossipMinAbsoluteDelta, gossipWhenLoadDeltaExceedsFraction) + + updateForRangeCount, deltaRangeCount := deltaExceedsThreshold( + float64(s.cachedCapacity.lastGossiped.RangeCount), float64(s.cachedCapacity.cached.RangeCount), + gossipWhenRangeCountDeltaExceeds, gossipWhenCapacityDeltaExceedsFraction) + + updateForLeaseCount, deltaLeaseCount := deltaExceedsThreshold( + float64(s.cachedCapacity.lastGossiped.LeaseCount), float64(s.cachedCapacity.cached.LeaseCount), + gossipWhenLeaseCountDeltaExceeds, gossipWhenCapacityDeltaExceedsFraction) + s.cachedCapacity.Unlock() + + if updateForQPS { + reason += fmt.Sprintf("queries-per-second(%.1f) ", deltaQPS) + } + if updateForWPS { + reason += fmt.Sprintf("writes-per-second(%.1f) ", deltaWPS) + } + if updateForRangeCount { + reason += fmt.Sprintf("range-count(%.1f) ", deltaRangeCount) + } + if updateForLeaseCount { + reason += fmt.Sprintf("lease-count(%.1f) ", deltaLeaseCount) + } + if reason != "" { + should = true + reason += "change" + } + return should, reason +} + +// shouldGossipOnLoadDelta takes in old gossiped load value and a new one, +// returning true if the delta exceeds the threshold to gossip. +func deltaExceedsThreshold( + old, cur, requiredMinDelta, requiredDeltaFraction float64, +) (exceeds bool, delta float64) { + delta = cur - old + deltaAbsolute := math.Abs(cur - old) + deltaFraction := 10e9 + // If the old value was not zero, then calculate the fractional delta. + // Otherwise it is undefined and we defer to the absolute check by setting + // it to a high number. + if old != 0 { + deltaFraction = deltaAbsolute / old + } + exceeds = deltaAbsolute >= requiredMinDelta && deltaFraction >= requiredDeltaFraction + return exceeds, delta +} diff --git a/pkg/kv/kvserver/store_gossip_test.go b/pkg/kv/kvserver/store_gossip_test.go new file mode 100644 index 000000000000..23cc3666c9f5 --- /dev/null +++ b/pkg/kv/kvserver/store_gossip_test.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestStoreGossipDeltaTrigger asserts that the delta between the last gosipped +// capacity and the current cached capacity will trigger gossip depending on +// the change. +func TestStoreGossipDeltaTrigger(t *testing.T) { + defer leaktest.AfterTest(t)() + testCases := []struct { + desc string + cached, lastGossiped roachpb.StoreCapacity + expectedReason string + expectedShould bool + }{ + { + desc: "no delta (empty): shouldn't gossip", + lastGossiped: roachpb.StoreCapacity{}, + cached: roachpb.StoreCapacity{}, + expectedReason: "", + expectedShould: false, + }, + { + desc: "no delta: shouldn't gossip", + lastGossiped: roachpb.StoreCapacity{QueriesPerSecond: 1000, WritesPerSecond: 1000, RangeCount: 1000, LeaseCount: 1000}, + cached: roachpb.StoreCapacity{QueriesPerSecond: 1000, WritesPerSecond: 1000, RangeCount: 1000, LeaseCount: 1000}, + expectedReason: "", + expectedShould: false, + }, + { + desc: "delta less than abs: shouldn't gossip", + lastGossiped: roachpb.StoreCapacity{QueriesPerSecond: 100, WritesPerSecond: 100, RangeCount: 100, LeaseCount: 100}, + cached: roachpb.StoreCapacity{QueriesPerSecond: 199, WritesPerSecond: 199, RangeCount: 104, LeaseCount: 104}, + expectedReason: "", + expectedShould: false, + }, + { + desc: "should gossip on qps delta (>50%)", + lastGossiped: roachpb.StoreCapacity{QueriesPerSecond: 100, WritesPerSecond: 100, RangeCount: 100, LeaseCount: 100}, + cached: roachpb.StoreCapacity{QueriesPerSecond: 200, WritesPerSecond: 199, RangeCount: 96, LeaseCount: 96}, + expectedReason: "queries-per-second(100.0) change", + expectedShould: true, + }, + { + desc: "should gossip on all delta", + lastGossiped: roachpb.StoreCapacity{QueriesPerSecond: 100, WritesPerSecond: 100, RangeCount: 10, LeaseCount: 10}, + cached: roachpb.StoreCapacity{QueriesPerSecond: 200, WritesPerSecond: 0, RangeCount: 15, LeaseCount: 5}, + expectedReason: "queries-per-second(100.0) writes-per-second(-100.0) range-count(5.0) lease-count(-5.0) change", + expectedShould: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + cfg := &StoreConfig{} + cfg.SetDefaults() + sg := NewStoreGossip(nil, nil, cfg.TestingKnobs.GossipTestingKnobs) + sg.cachedCapacity.cached = tc.cached + sg.cachedCapacity.lastGossiped = tc.lastGossiped + + should, reason := sg.shouldGossipOnCapacityDelta() + require.Equal(t, tc.expectedReason, reason) + require.Equal(t, tc.expectedShould, should) + }) + } +} diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index 4ec79ec8920e..e0cf6515a599 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -222,7 +222,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( return nil }() - s.maybeGossipOnCapacityChange(ctx, rangeRemoveEvent) + s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeRemoveEvent) s.scanner.RemoveReplica(rep) return ph, nil } diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index c61824bee315..d787397674b0 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -364,7 +364,7 @@ func (s *Store) SplitRange( } // Add the range to metrics and maybe gossip on capacity change. s.metrics.ReplicaCount.Inc(1) - s.maybeGossipOnCapacityChange(ctx, rangeAddEvent) + s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeAddEvent) } return nil diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 3835ee8a694f..ab998562d9af 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -43,6 +43,7 @@ type StoreTestingKnobs struct { TenantRateKnobs tenantrate.TestingKnobs EngineKnobs []storage.ConfigOption AllocatorKnobs *allocator.TestingKnobs + GossipTestingKnobs StoreGossipTestingKnobs // TestingRequestFilter is called before evaluating each request on a // replica. The filter is run before the request acquires latches, so @@ -252,11 +253,6 @@ type StoreTestingKnobs struct { // SkipMinSizeCheck, if set, makes the store creation process skip the check // for a minimum size. SkipMinSizeCheck bool - // DisableLeaseCapacityGossip disables the ability of a changing number of - // leases to trigger the store to gossip its capacity. With this enabled, - // only changes in the number of replicas can cause the store to gossip its - // capacity. - DisableLeaseCapacityGossip bool // SystemLogsGCPeriod is used to override the period of GC of system logs. SystemLogsGCPeriod time.Duration // SystemLogsGCGCDone is used to notify when system logs GC is done. @@ -383,10 +379,6 @@ type StoreTestingKnobs struct { // bootstrapping ranges. This is used for testing the migration // infrastructure. InitialReplicaVersionOverride *roachpb.Version - // GossipWhenCapacityDeltaExceedsFraction specifies the fraction from the last - // gossiped store capacity values which need be exceeded before the store will - // gossip immediately without waiting for the periodic gossip interval. - GossipWhenCapacityDeltaExceedsFraction float64 // TimeSeriesDataStore is an interface used by the store's time series // maintenance queue to dispatch individual maintenance tasks. TimeSeriesDataStore TimeSeriesDataStore @@ -432,10 +424,6 @@ type StoreTestingKnobs struct { // send snapshot semaphore. AfterSendSnapshotThrottle func() - // This method, if set, gets to see (and mutate, if desired) any local - // StoreDescriptor before it is being sent out on the Gossip network. - StoreGossipIntercept func(descriptor *roachpb.StoreDescriptor) - // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`. EnqueueReplicaInterceptor func(queueName string, replica *Replica)