From 61522754f39f1b7eb3ef977040ee614e5d6807b7 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 26 Apr 2022 23:27:11 +0000 Subject: [PATCH] asim: replica load adapter This patch introduces an adapter for the simulator state of load on stores and replicas, into the datastructures supported by the allocator. Replica load may be experimented with in the future and this patch defines an interface for interacting with recording and summarizing replica load. Store load is partially an aggregation of replica load and therefore relies on the replica load interface to calculate capacity. Release note: None --- pkg/kv/kvserver/asim/BUILD.bazel | 2 + pkg/kv/kvserver/asim/asim.go | 66 ++------------------------ pkg/kv/kvserver/asim/asim_test.go | 18 +++---- pkg/kv/kvserver/asim/load.go | 79 +++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 71 deletions(-) create mode 100644 pkg/kv/kvserver/asim/load.go diff --git a/pkg/kv/kvserver/asim/BUILD.bazel b/pkg/kv/kvserver/asim/BUILD.bazel index 4fe3bf679b6e..e39de0dee531 100644 --- a/pkg/kv/kvserver/asim/BUILD.bazel +++ b/pkg/kv/kvserver/asim/BUILD.bazel @@ -5,11 +5,13 @@ go_library( srcs = [ "asim.go", "config_loader.go", + "load.go", "workload.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/roachpb", "//pkg/util/timeutil", diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index 2fa931288dba..b96b55e3c015 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -105,30 +105,6 @@ func (rm *RangeMap) GetRange(key string) *Range { return rng } -// ReplicaLoad is the sum of all key accesses and size of bytes, both written -// and read. -// TODO(kvoli): In the non-simulated code, replica_stats currently maintains -// this structure, which is rated. This datastructure needs to be adapated by -// the user to be rated over time. In the future we should introduce a better -// general pupose stucture that enables rating. -type ReplicaLoad struct { - WriteKeys int64 - WriteBytes int64 - ReadKeys int64 - ReadBytes int64 -} - -// applyReplicaLoad applies a load event onto a replica. -func (r *Replica) applyReplicaLoad(le LoadEvent) { - if le.isWrite { - r.ReplicaLoad.WriteBytes += le.size - r.ReplicaLoad.WriteKeys++ - } else { - r.ReplicaLoad.ReadBytes += le.size - r.ReplicaLoad.ReadKeys++ - } -} - // Replica represents a replica of a range. type Replica struct { spanConf *roachpb.SpanConfig @@ -221,7 +197,7 @@ func (s *State) AddReplica(r *Range, node int) int { spanConf: &roachpb.SpanConfig{}, rangeDesc: r.Desc, replDesc: &desc, - ReplicaLoad: ReplicaLoad{}, + ReplicaLoad: &ReplicaLoadCounter{}, } store := nodeImpl.Stores[0] @@ -247,38 +223,6 @@ func (s *State) AddReplica(r *Range, node int) int { return int(desc.ReplicaID) } -// StoreLoad represents the current load of the store. -type StoreLoad struct { - WriteKeys int64 - WriteBytes int64 - ReadKeys int64 - ReadBytes int64 - RangeCount int64 - LeaseCount int64 -} - -// GetStoreLoad returns the current store load. The values in Write(Read) -// Bytes(Keys) represent the aggregation across all leaseholder replicas on -// this store. These values are not rated, instead they are counters. -// TODO(kvoli): We should separate out recording of workload against -// replicas and stores into a separate file. The internal capture datastructure -// is less important. Then define an interface that transforms the recorded -// load into store descriptors and range usage info. -func (s *Store) GetStoreLoad() StoreLoad { - storeLoad := StoreLoad{} - for _, repl := range s.Replicas { - if repl.leaseHolder { - storeLoad.LeaseCount++ - } - storeLoad.RangeCount++ - storeLoad.ReadKeys += repl.ReplicaLoad.ReadKeys - storeLoad.WriteKeys += repl.ReplicaLoad.WriteKeys - storeLoad.WriteBytes += repl.ReplicaLoad.WriteBytes - storeLoad.ReadBytes += repl.ReplicaLoad.ReadBytes - } - return storeLoad -} - // ApplyAllocatorAction updates the state with allocator ops such as // moving/adding/removing replicas. func (s *State) ApplyAllocatorAction( @@ -292,14 +236,12 @@ func (s *State) ApplyAllocatorAction( // written and therefore reads never fail. func (s *State) ApplyLoad(ctx context.Context, le LoadEvent) { r := s.Ranges.GetRange(fmt.Sprintf("%d", le.Key)) - - // Apply the load event to the leaseholder replica of the range this key is contained in. - // - //NB: Reads may occur in practice across follower replicas, however these + // NB: Reads may occur in practice across follower replicas, however these // statistics are not currently considered in allocation decisions. // Initially we ignore workload on followers. // TODO(kvoli): Apply write/read load to followers. - r.Leaseholder.applyReplicaLoad(le) + leaseHolder := r.Leaseholder + leaseHolder.ReplicaLoad.ApplyLoad(le) } func shouldRun(time.Time) bool { diff --git a/pkg/kv/kvserver/asim/asim_test.go b/pkg/kv/kvserver/asim/asim_test.go index 58fb2e003e23..0a70fa7b308b 100644 --- a/pkg/kv/kvserver/asim/asim_test.go +++ b/pkg/kv/kvserver/asim/asim_test.go @@ -116,16 +116,16 @@ func TestWorkloadApply(t *testing.T) { // Assert that the leaseholder replica load correctly matches the number of // requests made. - require.Equal(t, int64(100), r1.Leaseholder.ReplicaLoad.ReadKeys) - require.Equal(t, int64(1000), r2.Leaseholder.ReplicaLoad.ReadKeys) - require.Equal(t, int64(10000), r3.Leaseholder.ReplicaLoad.ReadKeys) + require.Equal(t, float64(100), r1.Leaseholder.ReplicaLoad.Load().QueriesPerSecond) + require.Equal(t, float64(1000), r2.Leaseholder.ReplicaLoad.Load().QueriesPerSecond) + require.Equal(t, float64(10000), r3.Leaseholder.ReplicaLoad.Load().QueriesPerSecond) - expectedLoad := asim.StoreLoad{ReadKeys: 100, LeaseCount: 1, RangeCount: 1} + expectedLoad := roachpb.StoreCapacity{QueriesPerSecond: 100, LeaseCount: 1, RangeCount: 1} // Assert that the store load is also updated upon request GetStoreLoad. - require.Equal(t, expectedLoad, s.Nodes[n1].Stores[0].GetStoreLoad()) - expectedLoad.ReadKeys *= 10 - require.Equal(t, expectedLoad, s.Nodes[n2].Stores[0].GetStoreLoad()) - expectedLoad.ReadKeys *= 10 - require.Equal(t, expectedLoad, s.Nodes[n3].Stores[0].GetStoreLoad()) + require.Equal(t, expectedLoad, s.Nodes[n1].Stores[0].Capacity()) + expectedLoad.QueriesPerSecond *= 10 + require.Equal(t, expectedLoad, s.Nodes[n2].Stores[0].Capacity()) + expectedLoad.QueriesPerSecond *= 10 + require.Equal(t, expectedLoad, s.Nodes[n3].Stores[0].Capacity()) } diff --git a/pkg/kv/kvserver/asim/load.go b/pkg/kv/kvserver/asim/load.go new file mode 100644 index 000000000000..e02befc073ab --- /dev/null +++ b/pkg/kv/kvserver/asim/load.go @@ -0,0 +1,79 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package asim + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// ReplicaLoad defines the methods a datastructure is required to perform in +// order to record and report load events. +type ReplicaLoad interface { + // ApplyLoad applies a load event to a replica. + ApplyLoad(le LoadEvent) + // Load translates the recorded load events into usage information of the + // replica. + Load() allocator.RangeUsageInfo +} + +// ReplicaLoadCounter is the sum of all key accesses and size of bytes, both written +// and read. +// TODO(kvoli): In the non-simulated code, replica_stats currently maintains +// this structure, which is rated. This datastructure needs to be adapated by +// the user to be rated over time. In the future we should introduce a better +// general pupose stucture that enables rating. +type ReplicaLoadCounter struct { + WriteKeys int64 + WriteBytes int64 + ReadKeys int64 + ReadBytes int64 +} + +// ApplyLoad applies a load event onto a replica load counter. +func (rl *ReplicaLoadCounter) ApplyLoad(le LoadEvent) { + if le.isWrite { + rl.WriteBytes += le.size + rl.WriteKeys++ + } else { + rl.ReadBytes += le.size + rl.ReadKeys++ + } +} + +// Load translates the recorded key accesses and size into range usage +// information. +func (rl *ReplicaLoadCounter) Load() allocator.RangeUsageInfo { + return allocator.RangeUsageInfo{ + LogicalBytes: rl.WriteBytes, + QueriesPerSecond: float64(rl.WriteKeys + rl.ReadKeys), + WritesPerSecond: float64(rl.WriteKeys), + } +} + +// Capacity returns the current store capacity. It aggregates the load from each +// replica within the store. +func (s *Store) Capacity() roachpb.StoreCapacity { + // TODO(kvoli,lidorcarmel): Store capacity will need to be populated with + // the following missing fields: capacity, available, used, l0sublevels, + // bytesperreplica, writesperreplica. + capacity := roachpb.StoreCapacity{} + for _, repl := range s.Replicas { + if repl.leaseHolder { + capacity.LeaseCount++ + } + capacity.RangeCount++ + capacity.QueriesPerSecond += repl.ReplicaLoad.Load().QueriesPerSecond + capacity.WritesPerSecond += repl.ReplicaLoad.Load().WritesPerSecond + capacity.LogicalBytes += repl.ReplicaLoad.Load().LogicalBytes + } + return capacity +}