Skip to content

Commit

Permalink
asim: replica load adapter
Browse files Browse the repository at this point in the history
This patch introduces an adapter for the simulator state of load on
stores and replicas, into the datastructures supported by the allocator.

Replica load may be experimented with in the future and this patch
defines an interface for interacting with recording and summarizing
replica load.

Store load is partially an aggregation of replica load and
therefore relies on the replica load interface to calculate capacity.

Release note: None
  • Loading branch information
kvoli committed May 17, 2022
1 parent f15df2d commit 6152275
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 71 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/asim/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ go_library(
srcs = [
"asim.go",
"config_loader.go",
"load.go",
"workload.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/roachpb",
"//pkg/util/timeutil",
Expand Down
66 changes: 4 additions & 62 deletions pkg/kv/kvserver/asim/asim.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,6 @@ func (rm *RangeMap) GetRange(key string) *Range {
return rng
}

// ReplicaLoad is the sum of all key accesses and size of bytes, both written
// and read.
// TODO(kvoli): In the non-simulated code, replica_stats currently maintains
// this structure, which is rated. This datastructure needs to be adapated by
// the user to be rated over time. In the future we should introduce a better
// general pupose stucture that enables rating.
type ReplicaLoad struct {
WriteKeys int64
WriteBytes int64
ReadKeys int64
ReadBytes int64
}

// applyReplicaLoad applies a load event onto a replica.
func (r *Replica) applyReplicaLoad(le LoadEvent) {
if le.isWrite {
r.ReplicaLoad.WriteBytes += le.size
r.ReplicaLoad.WriteKeys++
} else {
r.ReplicaLoad.ReadBytes += le.size
r.ReplicaLoad.ReadKeys++
}
}

// Replica represents a replica of a range.
type Replica struct {
spanConf *roachpb.SpanConfig
Expand Down Expand Up @@ -221,7 +197,7 @@ func (s *State) AddReplica(r *Range, node int) int {
spanConf: &roachpb.SpanConfig{},
rangeDesc: r.Desc,
replDesc: &desc,
ReplicaLoad: ReplicaLoad{},
ReplicaLoad: &ReplicaLoadCounter{},
}

store := nodeImpl.Stores[0]
Expand All @@ -247,38 +223,6 @@ func (s *State) AddReplica(r *Range, node int) int {
return int(desc.ReplicaID)
}

// StoreLoad represents the current load of the store.
type StoreLoad struct {
WriteKeys int64
WriteBytes int64
ReadKeys int64
ReadBytes int64
RangeCount int64
LeaseCount int64
}

// GetStoreLoad returns the current store load. The values in Write(Read)
// Bytes(Keys) represent the aggregation across all leaseholder replicas on
// this store. These values are not rated, instead they are counters.
// TODO(kvoli): We should separate out recording of workload against
// replicas and stores into a separate file. The internal capture datastructure
// is less important. Then define an interface that transforms the recorded
// load into store descriptors and range usage info.
func (s *Store) GetStoreLoad() StoreLoad {
storeLoad := StoreLoad{}
for _, repl := range s.Replicas {
if repl.leaseHolder {
storeLoad.LeaseCount++
}
storeLoad.RangeCount++
storeLoad.ReadKeys += repl.ReplicaLoad.ReadKeys
storeLoad.WriteKeys += repl.ReplicaLoad.WriteKeys
storeLoad.WriteBytes += repl.ReplicaLoad.WriteBytes
storeLoad.ReadBytes += repl.ReplicaLoad.ReadBytes
}
return storeLoad
}

// ApplyAllocatorAction updates the state with allocator ops such as
// moving/adding/removing replicas.
func (s *State) ApplyAllocatorAction(
Expand All @@ -292,14 +236,12 @@ func (s *State) ApplyAllocatorAction(
// written and therefore reads never fail.
func (s *State) ApplyLoad(ctx context.Context, le LoadEvent) {
r := s.Ranges.GetRange(fmt.Sprintf("%d", le.Key))

// Apply the load event to the leaseholder replica of the range this key is contained in.
//
//NB: Reads may occur in practice across follower replicas, however these
// NB: Reads may occur in practice across follower replicas, however these
// statistics are not currently considered in allocation decisions.
// Initially we ignore workload on followers.
// TODO(kvoli): Apply write/read load to followers.
r.Leaseholder.applyReplicaLoad(le)
leaseHolder := r.Leaseholder
leaseHolder.ReplicaLoad.ApplyLoad(le)
}

func shouldRun(time.Time) bool {
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/asim/asim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,16 @@ func TestWorkloadApply(t *testing.T) {

// Assert that the leaseholder replica load correctly matches the number of
// requests made.
require.Equal(t, int64(100), r1.Leaseholder.ReplicaLoad.ReadKeys)
require.Equal(t, int64(1000), r2.Leaseholder.ReplicaLoad.ReadKeys)
require.Equal(t, int64(10000), r3.Leaseholder.ReplicaLoad.ReadKeys)
require.Equal(t, float64(100), r1.Leaseholder.ReplicaLoad.Load().QueriesPerSecond)
require.Equal(t, float64(1000), r2.Leaseholder.ReplicaLoad.Load().QueriesPerSecond)
require.Equal(t, float64(10000), r3.Leaseholder.ReplicaLoad.Load().QueriesPerSecond)

expectedLoad := asim.StoreLoad{ReadKeys: 100, LeaseCount: 1, RangeCount: 1}
expectedLoad := roachpb.StoreCapacity{QueriesPerSecond: 100, LeaseCount: 1, RangeCount: 1}

// Assert that the store load is also updated upon request GetStoreLoad.
require.Equal(t, expectedLoad, s.Nodes[n1].Stores[0].GetStoreLoad())
expectedLoad.ReadKeys *= 10
require.Equal(t, expectedLoad, s.Nodes[n2].Stores[0].GetStoreLoad())
expectedLoad.ReadKeys *= 10
require.Equal(t, expectedLoad, s.Nodes[n3].Stores[0].GetStoreLoad())
require.Equal(t, expectedLoad, s.Nodes[n1].Stores[0].Capacity())
expectedLoad.QueriesPerSecond *= 10
require.Equal(t, expectedLoad, s.Nodes[n2].Stores[0].Capacity())
expectedLoad.QueriesPerSecond *= 10
require.Equal(t, expectedLoad, s.Nodes[n3].Stores[0].Capacity())
}
79 changes: 79 additions & 0 deletions pkg/kv/kvserver/asim/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package asim

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ReplicaLoad defines the methods a datastructure is required to perform in
// order to record and report load events.
type ReplicaLoad interface {
// ApplyLoad applies a load event to a replica.
ApplyLoad(le LoadEvent)
// Load translates the recorded load events into usage information of the
// replica.
Load() allocator.RangeUsageInfo
}

// ReplicaLoadCounter is the sum of all key accesses and size of bytes, both written
// and read.
// TODO(kvoli): In the non-simulated code, replica_stats currently maintains
// this structure, which is rated. This datastructure needs to be adapated by
// the user to be rated over time. In the future we should introduce a better
// general pupose stucture that enables rating.
type ReplicaLoadCounter struct {
WriteKeys int64
WriteBytes int64
ReadKeys int64
ReadBytes int64
}

// ApplyLoad applies a load event onto a replica load counter.
func (rl *ReplicaLoadCounter) ApplyLoad(le LoadEvent) {
if le.isWrite {
rl.WriteBytes += le.size
rl.WriteKeys++
} else {
rl.ReadBytes += le.size
rl.ReadKeys++
}
}

// Load translates the recorded key accesses and size into range usage
// information.
func (rl *ReplicaLoadCounter) Load() allocator.RangeUsageInfo {
return allocator.RangeUsageInfo{
LogicalBytes: rl.WriteBytes,
QueriesPerSecond: float64(rl.WriteKeys + rl.ReadKeys),
WritesPerSecond: float64(rl.WriteKeys),
}
}

// Capacity returns the current store capacity. It aggregates the load from each
// replica within the store.
func (s *Store) Capacity() roachpb.StoreCapacity {
// TODO(kvoli,lidorcarmel): Store capacity will need to be populated with
// the following missing fields: capacity, available, used, l0sublevels,
// bytesperreplica, writesperreplica.
capacity := roachpb.StoreCapacity{}
for _, repl := range s.Replicas {
if repl.leaseHolder {
capacity.LeaseCount++
}
capacity.RangeCount++
capacity.QueriesPerSecond += repl.ReplicaLoad.Load().QueriesPerSecond
capacity.WritesPerSecond += repl.ReplicaLoad.Load().WritesPerSecond
capacity.LogicalBytes += repl.ReplicaLoad.Load().LogicalBytes
}
return capacity
}

0 comments on commit 6152275

Please sign in to comment.