Skip to content

Commit

Permalink
admission,kvserver: broadcast per-store IO overload status
Browse files Browse the repository at this point in the history
The plan for cockroachdb#79215 is to have stores not send to followers that are
experiencing IO overload, unless this is necessary for quorum.

This commit teaches admission control to relay the current status of the
IO overload signals to the Store, which in turn gossips it. We add a
type `IOThreshold` that encapsulates the input signals to I/O admission
control to present them as a unified score, which hopefully leads to
less ad-hoc use of the underlying signals.

Together, this paves the way for using the signal to inform distribution
decisions (cockroachdb#83490) and (bluntly) shape raft traffic (cockroachdb#79215).

Touches cockroachdb#79215.
Touches cockroachdb#77604.
Touches cockroachdb#82611.
Touches cockroachdb#83490.

Release note: None
  • Loading branch information
tbg committed Jul 6, 2022
1 parent 7cfa5d4 commit aece96b
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 55 deletions.
1 change: 1 addition & 0 deletions docs/generated/http/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ genrule(
"//pkg/ts/tspb:tspb_proto",
"//pkg/util/duration:duration_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/admission/admissionpb:admissionpb_proto",
"//pkg/util/log/logpb:logpb_proto",
"//pkg/util/metric:metric_proto",
"//pkg/util/timeutil/pgdate:pgdate_proto",
Expand Down
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ PROTOBUF_SRCS = [
"//pkg/testutils/grpcutils:grpcutils_go_proto",
"//pkg/ts/catalog:catalog_go_proto",
"//pkg/ts/tspb:tspb_go_proto",
"//pkg/util/admission/admissionpb:admissionpb_go_proto",
"//pkg/util/duration:duration_go_proto",
"//pkg/util/hlc:hlc_go_proto",
"//pkg/util/log/eventpb:eventpb_go_proto",
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,10 @@ type Store struct {
syncutil.Mutex
roachpb.StoreCapacity
}
ioThreshold struct {
syncutil.Mutex
t *admissionpb.IOThreshold // never nil
}

counts struct {
// Number of placeholders removed due to error. Not a good fit for meaningful
Expand Down Expand Up @@ -1170,6 +1174,7 @@ func NewStore(
metrics: newStoreMetrics(cfg.HistogramWindowInterval),
ctSender: cfg.ClosedTimestampSender,
}
s.ioThreshold.t = &admissionpb.IOThreshold{}
if cfg.RPCContext != nil {
s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs)
} else {
Expand Down Expand Up @@ -2510,6 +2515,13 @@ func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached b
}
}

// UpdateIOThreshold updates the IOThreshold reported in the StoreDescriptor.
func (s *Store) UpdateIOThreshold(ioThreshold *admissionpb.IOThreshold) {
s.ioThreshold.Lock()
defer s.ioThreshold.Unlock()
s.ioThreshold.t = ioThreshold
}

// GossipStore broadcasts the store on the gossip network.
func (s *Store) GossipStore(ctx context.Context, useCached bool) error {
// Temporarily indicate that we're gossiping the store capacity to avoid
Expand Down Expand Up @@ -2943,7 +2955,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa

capacity, err := s.engine.Capacity()
if err != nil {
return capacity, err
return roachpb.StoreCapacity{}, err
}

now := s.cfg.Clock.NowAsClockTimestamp()
Expand Down Expand Up @@ -2994,6 +3006,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
capacity.QueriesPerSecond = totalQueriesPerSecond
capacity.WritesPerSecond = totalWritesPerSecond
capacity.L0Sublevels = l0SublevelsMax
{
s.ioThreshold.Lock()
capacity.IOThreshold = *s.ioThreshold.t
s.ioThreshold.Unlock()
}
capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica)
capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica)
s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond)
Expand Down
2 changes: 2 additions & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ proto_library(
"//pkg/settings:settings_proto",
"//pkg/storage/enginepb:enginepb_proto",
"//pkg/util:util_proto",
"//pkg/util/admission/admissionpb:admissionpb_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/tracing/tracingpb:tracingpb_proto",
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
Expand All @@ -164,6 +165,7 @@ go_proto_library(
"//pkg/settings",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//errorspb",
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,11 @@ func (sc StoreCapacity) String() string {
func (sc StoreCapacity) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk (capacity=%s, available=%s, used=%s, logicalBytes=%s), "+
"ranges=%d, leases=%d, queries=%.2f, writes=%.2f, "+
"l0Sublevels=%d, bytesPerReplica={%s}, writesPerReplica={%s}",
"l0Sublevels=%d, ioThreshold={%v} bytesPerReplica={%s}, writesPerReplica={%s}",
humanizeutil.IBytes(sc.Capacity), humanizeutil.IBytes(sc.Available),
humanizeutil.IBytes(sc.Used), humanizeutil.IBytes(sc.LogicalBytes),
sc.RangeCount, sc.LeaseCount, sc.QueriesPerSecond, sc.WritesPerSecond,
sc.L0Sublevels, sc.BytesPerReplica, sc.WritesPerReplica)
sc.L0Sublevels, sc.IOThreshold, sc.BytesPerReplica, sc.WritesPerReplica)
}

// FractionUsed computes the fraction of storage capacity that is in use.
Expand Down
2 changes: 2 additions & 0 deletions pkg/roachpb/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ option go_package = "roachpb";

import "util/unresolved_addr.proto";
import "util/hlc/timestamp.proto";
import "util/admission/admissionpb/io_threshold.proto";
import "gogoproto/gogo.proto";

// Attributes specifies a list of arbitrary strings describing
Expand Down Expand Up @@ -335,6 +336,7 @@ message StoreCapacity {
// instances where overlapping node-binary versions within a cluster result
// in this this field missing.
optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false];
optional cockroach.util.admission.admissionpb.IOThreshold io_threshold = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IOThreshold" ];
// bytes_per_replica and writes_per_replica contain percentiles for the
// number of bytes and writes-per-second to each replica in the store.
// This information can be used for rebalancing decisions.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ go_library(
"//pkg/upgrade/upgrademanager",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/contextutil",
"//pkg/util/envutil",
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -769,6 +770,16 @@ func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
})
}

// UpdateIOThreshold relays the supplied IOThreshold to the same method on the
// designated Store.
func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOThreshold) {
s, err := n.stores.GetStore(id)
if err != nil {
log.Errorf(n.AnnotateCtx(context.Background()), "%v", err)
}
s.UpdateIOThreshold(threshold)
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
var metrics []admission.StoreMetrics
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// existing stores shouldn’t be able to acquire leases yet. Although, below
// Raft commands like log application and snapshot application may be able
// to bypass admission control.
s.storeGrantCoords.SetPebbleMetricsProvider(ctx, s.node)
s.storeGrantCoords.SetPebbleMetricsProvider(ctx, s.node, s.node)

// Once all stores are initialized, check if offline storage recovery
// was done prior to start and record any actions appropriately.
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/admission/admissionpb/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "admissionpb",
srcs = [
"admissionpb.go",
"doc.go",
"io_threshold.go",
],
embed = [":admissionpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb",
visibility = ["//visibility:public"],
deps = [
"@com_github_cockroachdb_redact//:redact",
"@com_github_cockroachdb_redact//interfaces",
],
)

proto_library(
name = "admissionpb_proto",
srcs = ["io_threshold.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
)

go_proto_library(
name = "admissionpb_go_proto",
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb",
proto = ":admissionpb_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
)
51 changes: 51 additions & 0 deletions pkg/util/admission/admissionpb/io_threshold.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package admissionpb

import (
"math"

"github.com/cockroachdb/redact"
"github.com/cockroachdb/redact/interfaces"
)

// Score returns, as the second return value, whether IO admission control is
// considering the Store overloaded. The first return value is a 1-normalized
// float (i.e. 1.0 is the threshold at which the second value flips to true).
//
// The zero value returns (0, false). Use of the nil pointer is not allowed.
func (iot IOThreshold) Score() (float64, bool) {
if iot == (IOThreshold{}) {
return 0, false
}
f := math.Max(
float64(iot.L0NumFiles)/float64(iot.L0NumFilesThreshold),
float64(iot.L0NumSubLevels)/float64(iot.L0NumSubLevelsThreshold),
)
return f, f > 1.0
}

// SafeFormat implements redact.SafeFormatter.
func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
if iot == (IOThreshold{}) {
s.Printf("N/A")
}
sc, overload := iot.Score()
s.Printf("%.3f", redact.SafeFloat(sc))
if overload {
s.Printf("[overload]")
}
}

func (iot IOThreshold) String() string {
return redact.StringWithoutMarkers(iot)
}
26 changes: 26 additions & 0 deletions pkg/util/admission/admissionpb/io_threshold.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

syntax = "proto3";
package cockroach.util.admission.admissionpb;
option go_package = "admissionpb";

import "gogoproto/gogo.proto";

// IOThreshold wraps the raw signals that IO admission control utilizes to determine
// when to introduce queueing.
message IOThreshold {
option (gogoproto.goproto_stringer) = false;

int64 l0_num_sub_levels = 1;
int64 l0_num_sub_levels_threshold = 2;
int64 l0_num_files = 3;
int64 l0_num_files_threshold = 4;
}
Loading

0 comments on commit aece96b

Please sign in to comment.