83720: admission,kvserver: broadcast per-store IO overload status r=kvoli a=tbg

The plan for cockroachdb#79215 is to have stores not send to followers that are
experiencing IO overload, unless this is necessary for quorum.

This commit teaches admission control to relay the current status of the
IO overload signals to the Store, which in turn gossips it. We add a
type `IOThreshold` that encapsulates the input signals to I/O admission
control to present them as a unified score, which hopefully leads to
less ad-hoc use of the underlying signals.
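
For illustration, a rough sketch of how such a unified score could be computed — the field names and scoring rule here are assumptions for the sketch, not the exact `admissionpb.IOThreshold` definition added by this commit:

```go
// Illustrative sketch only; field names and scoring are assumptions, not the
// exact admissionpb.IOThreshold definition.
package main

import "fmt"

// IOThreshold bundles the I/O admission control input signals so callers can
// consume one score instead of the raw signals.
type IOThreshold struct {
	L0NumFiles              int64 // current number of L0 files
	L0NumFilesThreshold     int64 // admission control's L0 file count threshold
	L0NumSubLevels          int64 // current number of L0 sublevels
	L0NumSubLevelsThreshold int64 // admission control's L0 sublevel threshold
}

// Score normalizes each signal against its threshold and returns the worst
// one; a score >= 1 indicates IO overload in this sketch.
func (t IOThreshold) Score() (float64, bool) {
	files := float64(t.L0NumFiles) / float64(t.L0NumFilesThreshold)
	sublevels := float64(t.L0NumSubLevels) / float64(t.L0NumSubLevelsThreshold)
	score := files
	if sublevels > score {
		score = sublevels
	}
	return score, score >= 1
}

func main() {
	t := IOThreshold{
		L0NumFiles: 400, L0NumFilesThreshold: 1000,
		L0NumSubLevels: 30, L0NumSubLevelsThreshold: 20,
	}
	score, overloaded := t.Score()
	fmt.Printf("score=%.2f overloaded=%t\n", score, overloaded) // score=1.50 overloaded=true
}
```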

Together, this paves the way for using the signal to inform distribution
decisions (cockroachdb#83490) and (bluntly) shape raft traffic (cockroachdb#79215).

Touches cockroachdb#79215.
Touches cockroachdb#77604.
Touches cockroachdb#82611.
Touches cockroachdb#83490.

Release note: None

83732: sql: move NullableArgs function property to overload level r=chengxiong-ruan a=chengxiong-ruan

Currently we only have builtin functions, and all overloads of the same
function name share one set of function properties. However, we're going
to support user-defined functions, whose properties can vary depending on
how users define them, so any property that's relevant to UDFs needs to
move to the overload level. Currently, `NullableArgs` is the only property
that matters.
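
A minimal sketch of the resulting call pattern (simplified stand-ins for the `tree` types, not the real definitions): null handling is now decided by the resolved overload rather than by properties shared across all overloads of a name, which is the pattern the `pkg/sql/colexec/builtin_funcs.go` diff below switches to.

```go
// Simplified stand-ins for tree.Overload and tree.FuncExpr; illustrative only.
package main

import "fmt"

// Overload carries per-overload metadata; NullableArgs now lives here instead
// of on the function-wide properties.
type Overload struct {
	NullableArgs bool
}

// FuncExpr is a resolved function call.
type FuncExpr struct {
	overload *Overload
}

func (f *FuncExpr) ResolvedOverload() *Overload { return f.overload }

// eval mirrors the caller-side change: consult the overload, not the shared
// function properties, to decide whether NULL arguments short-circuit.
func eval(f *FuncExpr, hasNulls bool) string {
	if hasNulls && !f.ResolvedOverload().NullableArgs {
		return "NULL" // this overload cannot handle NULL arguments
	}
	return "invoke overload"
}

func main() {
	strict := &FuncExpr{overload: &Overload{NullableArgs: false}}
	lenient := &FuncExpr{overload: &Overload{NullableArgs: true}}
	fmt.Println(eval(strict, true))  // NULL
	fmt.Println(eval(lenient, true)) // invoke overload
}
```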

Release note: None.

83938: sql: add identifiers to sampled query r=THardy98 a=THardy98

Partially addresses: cockroachdb#71328

This change introduces identifiers into the sampled query log, namely:
- Database name
- Session ID
- Transaction ID
- Statement ID

Adding the transaction ID incurs an additional lock access, but the difference in performance is negligible (a minimal sketch of this lock access follows the results below). Results after running `kv95` on a 3-node GCE cluster using roachprod:
```
setup:

./workload init kv --splits 1000 --read-percent 95
./workload run kv --read-percent 95 --concurrency 64 --sequential --duration 30m
```

```
master:

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)__total
 1800.0s        0       29725479        16514.2      3.4      2.8      8.9     16.8    104.9  read

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)__total
 1800.0s        0        1564158          869.0      8.5      8.1     15.7     26.2    100.7  write

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)__result
 1800.0s        0       31289637        17383.1      3.7      2.9     10.0     17.8    104.9
```

```
enrich_telemetry_add_identifiers

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)__total
 1800.0s        0       29635045        16463.9      3.4      2.8      8.9     16.8    117.4  read

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)__total
 1800.0s        0        1561022          867.2      8.5      7.9     15.7     26.2    113.2  write

_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)__result
 1800.0s        0       31196067        17331.1      3.7      2.9     10.0     17.8    117.4
```
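
A minimal, self-contained sketch of that extra lock access — the types below are simplified stand-ins, not the real CockroachDB `Txn` or event structs:

```go
// Minimal sketch of the extra lock access described above. Txn and
// SampledQuery are simplified stand-ins for the real CockroachDB types.
package main

import (
	"fmt"
	"sync"
)

// Txn models a transaction handle whose ID is read under a mutex.
type Txn struct {
	mu struct {
		sync.Mutex
		id string
	}
}

// ID returns the transaction ID under the mutex; this is the additional lock
// acquisition that the benchmark above shows to be negligible.
func (t *Txn) ID() string {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.mu.id
}

// SampledQuery mirrors the new identifier fields added to the telemetry event.
type SampledQuery struct {
	Database      string
	SessionID     string
	StatementID   string
	TransactionID string
}

func main() {
	txn := &Txn{}
	txn.mu.id = "7c0ae32f-..." // illustrative value

	ev := SampledQuery{
		Database:      "defaultdb",
		SessionID:     "16f2a0bd54c30001", // illustrative value
		StatementID:   "16f2a0bd54c30002", // illustrative value
		TransactionID: txn.ID(),
	}
	fmt.Printf("%+v\n", ev)
}
```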


Release note (sql change): The sampled query telemetry log now includes the
session, transaction, and statement IDs, as well as the database name of the query.

83953: reducesql: reduce index STORING columns r=mgartner a=mgartner

The `reduce` tool can now reduce `STORING` columns in indexes and unique
constraints.
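
For context, a minimal sketch of the removal pattern these passes share, visible throughout the `reducesql.go` diff below — `removeNth` stands in for transforms like `removeIndexStoringCols`:

```go
// Minimal sketch of the xfi-indexed removal pattern used by the reduce passes:
// each transform is handed a candidate index xfi, removes that element if it
// is in range, and returns the number of candidates so the reducer knows how
// many variants to try.
package main

import "fmt"

// removeNth drops the element at index xfi if it exists and returns the
// updated slice along with the original element count.
func removeNth(storing []string, xfi int) ([]string, int) {
	n := len(storing)
	if xfi < n {
		storing = append(storing[:xfi], storing[xfi+1:]...)
	}
	return storing, n
}

func main() {
	storing := []string{"a", "b", "c"}
	storing, n := removeNth(storing, 1)
	fmt.Println(storing, n) // [a c] 3
}
```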

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
Co-authored-by: Thomas Hardy <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
5 people committed Jul 7, 2022
5 parents bce2267 + aece96b + 7e6d300 + eaeb52f + 8be4bb7 commit e67e47f
Showing 38 changed files with 787 additions and 533 deletions.
4 changes: 4 additions & 0 deletions docs/generated/eventlog.md
@@ -2459,6 +2459,10 @@ contains common SQL event/execution details.
| `CostEstimate` | Cost of the query as estimated by the optimizer. | no |
| `Distribution` | The distribution of the DistSQL query plan (local, full, or partial). | no |
| `PlanGist` | The query's plan gist bytes as a base64 encoded string. | no |
| `SessionID` | SessionID is the ID of the session that initiated the query. | no |
| `Database` | Name of the database that initiated the query. | no |
| `StatementID` | Statement ID of the query. | no |
| `TransactionID` | Transaction ID of the query. | no |


#### Common fields
1 change: 1 addition & 0 deletions docs/generated/http/BUILD.bazel
@@ -32,6 +32,7 @@ genrule(
"//pkg/ts/tspb:tspb_proto",
"//pkg/util/duration:duration_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/admission/admissionpb:admissionpb_proto",
"//pkg/util/log/logpb:logpb_proto",
"//pkg/util/metric:metric_proto",
"//pkg/util/timeutil/pgdate:pgdate_proto",
49 changes: 33 additions & 16 deletions pkg/cmd/reduce/reduce/reducesql/reducesql.go
@@ -48,6 +48,7 @@ var SQLPasses = []reduce.Pass{
removeCreateNullDefs,
removeIndexCols,
removeIndexPredicate,
removeIndexStoringCols,
removeWindowPartitions,
removeDBSchema,
removeFroms,
@@ -496,31 +497,31 @@ var (
switch node := node.(type) {
case *tree.Delete:
n := len(node.OrderBy)
if xfi < len(node.OrderBy) {
if xfi < n {
node.OrderBy = append(node.OrderBy[:xfi], node.OrderBy[xfi+1:]...)
}
return n
case *tree.FuncExpr:
n := len(node.OrderBy)
if xfi < len(node.OrderBy) {
if xfi < n {
node.OrderBy = append(node.OrderBy[:xfi], node.OrderBy[xfi+1:]...)
}
return n
case *tree.Select:
n := len(node.OrderBy)
if xfi < len(node.OrderBy) {
if xfi < n {
node.OrderBy = append(node.OrderBy[:xfi], node.OrderBy[xfi+1:]...)
}
return n
case *tree.Update:
n := len(node.OrderBy)
if xfi < len(node.OrderBy) {
if xfi < n {
node.OrderBy = append(node.OrderBy[:xfi], node.OrderBy[xfi+1:]...)
}
return n
case *tree.WindowDef:
n := len(node.OrderBy)
if xfi < len(node.OrderBy) {
if xfi < n {
node.OrderBy = append(node.OrderBy[:xfi], node.OrderBy[xfi+1:]...)
}
return n
@@ -544,7 +545,7 @@ var (
switch node := node.(type) {
case *tree.SelectClause:
n := len(node.GroupBy)
if xfi < len(node.GroupBy) {
if xfi < n {
node.GroupBy = append(node.GroupBy[:xfi], node.GroupBy[xfi+1:]...)
}
return n
@@ -568,7 +569,7 @@ var (
switch node := node.(type) {
case *tree.SelectClause:
n := len(node.Exprs)
if xfi < len(node.Exprs) {
if xfi < n {
node.Exprs = append(node.Exprs[:xfi], node.Exprs[xfi+1:]...)
}
return n
@@ -590,7 +591,7 @@ var (
break
}
n := len(clause.Exprs)
if xfi < len(clause.Exprs) {
if xfi < n {
node.Name.Cols = append(node.Name.Cols[:xfi], node.Name.Cols[xfi+1:]...)
clause.Exprs = append(clause.Exprs[:xfi], clause.Exprs[xfi+1:]...)
}
@@ -688,7 +689,7 @@ var (
break
}
n := len(clause.Exprs)
if xfi < len(clause.Exprs) {
if xfi < n {
node.As.Cols = append(node.As.Cols[:xfi], node.As.Cols[xfi+1:]...)
clause.Exprs = append(clause.Exprs[:xfi], clause.Exprs[xfi+1:]...)
}
@@ -747,7 +748,7 @@ var (
switch node := node.(type) {
case *tree.CreateTable:
n := len(node.Defs)
if xfi < len(node.Defs) {
if xfi < n {
node.Defs = append(node.Defs[:xfi], node.Defs[xfi+1:]...)
}
return n
@@ -785,7 +786,7 @@ var (
removeIndexCols = walkSQL("remove INDEX cols", func(xfi int, node interface{}) int {
removeCol := func(idx *tree.IndexTableDef) int {
n := len(idx.Columns)
if xfi < len(idx.Columns) {
if xfi < n {
idx.Columns = append(idx.Columns[:xfi], idx.Columns[xfi+1:]...)
}
return n
@@ -811,11 +812,27 @@ var (
}
return 0
})
removeIndexStoringCols = walkSQL("remove INDEX STORING cols", func(xfi int, node interface{}) int {
removeStoringCol := func(idx *tree.IndexTableDef) int {
n := len(idx.Storing)
if xfi < n {
idx.Storing = append(idx.Storing[:xfi], idx.Storing[xfi+1:]...)
}
return n
}
switch node := node.(type) {
case *tree.IndexTableDef:
return removeStoringCol(node)
case *tree.UniqueConstraintTableDef:
return removeStoringCol(&node.IndexTableDef)
}
return 0
})
removeWindowPartitions = walkSQL("remove WINDOW partitions", func(xfi int, node interface{}) int {
switch node := node.(type) {
case *tree.WindowDef:
n := len(node.Partitions)
if xfi < len(node.Partitions) {
if xfi < n {
node.Partitions = append(node.Partitions[:xfi], node.Partitions[xfi+1:]...)
}
return n
@@ -826,7 +843,7 @@ var (
switch node := node.(type) {
case *tree.ValuesClause:
n := len(node.Rows)
if xfi < len(node.Rows) {
if xfi < n {
node.Rows = append(node.Rows[:xfi], node.Rows[xfi+1:]...)
}
return n
@@ -837,7 +854,7 @@ var (
switch node := node.(type) {
case *tree.With:
n := len(node.CTEList)
if xfi < len(node.CTEList) {
if xfi < n {
node.CTEList = append(node.CTEList[:xfi], node.CTEList[xfi+1:]...)
}
return n
@@ -861,7 +878,7 @@ var (
switch node := node.(type) {
case *tree.SelectClause:
n := len(node.From.Tables)
if xfi < len(node.From.Tables) {
if xfi < n {
node.From.Tables = append(node.From.Tables[:xfi], node.From.Tables[xfi+1:]...)
}
return n
@@ -1024,7 +1041,7 @@ var (
return 1, nil
case *tree.With:
n := len(node.CTEList)
if xfi < len(node.CTEList) {
if xfi < n {
return n, node.CTEList[xfi].Stmt
}
return n, nil
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -54,6 +54,7 @@ PROTOBUF_SRCS = [
"//pkg/testutils/grpcutils:grpcutils_go_proto",
"//pkg/ts/catalog:catalog_go_proto",
"//pkg/ts/tspb:tspb_go_proto",
"//pkg/util/admission/admissionpb:admissionpb_go_proto",
"//pkg/util/duration:duration_go_proto",
"//pkg/util/hlc:hlc_go_proto",
"//pkg/util/log/eventpb:eventpb_go_proto",
19 changes: 18 additions & 1 deletion pkg/kv/kvserver/store.go
@@ -938,6 +938,10 @@ type Store struct {
syncutil.Mutex
roachpb.StoreCapacity
}
ioThreshold struct {
syncutil.Mutex
t *admissionpb.IOThreshold // never nil
}

counts struct {
// Number of placeholders removed due to error. Not a good fit for meaningful
@@ -1170,6 +1174,7 @@ func NewStore(
metrics: newStoreMetrics(cfg.HistogramWindowInterval),
ctSender: cfg.ClosedTimestampSender,
}
s.ioThreshold.t = &admissionpb.IOThreshold{}
if cfg.RPCContext != nil {
s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs)
} else {
@@ -2510,6 +2515,13 @@ func (s *Store) asyncGossipStore(ctx context.Context, reason string, useCached b
}
}

// UpdateIOThreshold updates the IOThreshold reported in the StoreDescriptor.
func (s *Store) UpdateIOThreshold(ioThreshold *admissionpb.IOThreshold) {
s.ioThreshold.Lock()
defer s.ioThreshold.Unlock()
s.ioThreshold.t = ioThreshold
}

// GossipStore broadcasts the store on the gossip network.
func (s *Store) GossipStore(ctx context.Context, useCached bool) error {
// Temporarily indicate that we're gossiping the store capacity to avoid
@@ -2943,7 +2955,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa

capacity, err := s.engine.Capacity()
if err != nil {
return capacity, err
return roachpb.StoreCapacity{}, err
}

now := s.cfg.Clock.NowAsClockTimestamp()
@@ -2994,6 +3006,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
capacity.QueriesPerSecond = totalQueriesPerSecond
capacity.WritesPerSecond = totalWritesPerSecond
capacity.L0Sublevels = l0SublevelsMax
{
s.ioThreshold.Lock()
capacity.IOThreshold = *s.ioThreshold.t
s.ioThreshold.Unlock()
}
capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica)
capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica)
s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond)
2 changes: 2 additions & 0 deletions pkg/roachpb/BUILD.bazel
@@ -143,6 +143,7 @@ proto_library(
"//pkg/settings:settings_proto",
"//pkg/storage/enginepb:enginepb_proto",
"//pkg/util:util_proto",
"//pkg/util/admission/admissionpb:admissionpb_proto",
"//pkg/util/hlc:hlc_proto",
"//pkg/util/tracing/tracingpb:tracingpb_proto",
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
@@ -164,6 +165,7 @@ go_proto_library(
"//pkg/settings",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/hlc",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//errorspb",
4 changes: 2 additions & 2 deletions pkg/roachpb/metadata.go
@@ -577,11 +577,11 @@ func (sc StoreCapacity) String() string {
func (sc StoreCapacity) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk (capacity=%s, available=%s, used=%s, logicalBytes=%s), "+
"ranges=%d, leases=%d, queries=%.2f, writes=%.2f, "+
"l0Sublevels=%d, bytesPerReplica={%s}, writesPerReplica={%s}",
"l0Sublevels=%d, ioThreshold={%v} bytesPerReplica={%s}, writesPerReplica={%s}",
humanizeutil.IBytes(sc.Capacity), humanizeutil.IBytes(sc.Available),
humanizeutil.IBytes(sc.Used), humanizeutil.IBytes(sc.LogicalBytes),
sc.RangeCount, sc.LeaseCount, sc.QueriesPerSecond, sc.WritesPerSecond,
sc.L0Sublevels, sc.BytesPerReplica, sc.WritesPerReplica)
sc.L0Sublevels, sc.IOThreshold, sc.BytesPerReplica, sc.WritesPerReplica)
}

// FractionUsed computes the fraction of storage capacity that is in use.
2 changes: 2 additions & 0 deletions pkg/roachpb/metadata.proto
@@ -14,6 +14,7 @@ option go_package = "roachpb";

import "util/unresolved_addr.proto";
import "util/hlc/timestamp.proto";
import "util/admission/admissionpb/io_threshold.proto";
import "gogoproto/gogo.proto";

// Attributes specifies a list of arbitrary strings describing
@@ -335,6 +336,7 @@ message StoreCapacity {
// instances where overlapping node-binary versions within a cluster result
// in this this field missing.
optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false];
optional cockroach.util.admission.admissionpb.IOThreshold io_threshold = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IOThreshold" ];
// bytes_per_replica and writes_per_replica contain percentiles for the
// number of bytes and writes-per-second to each replica in the store.
// This information can be used for rebalancing decisions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -219,6 +219,7 @@ go_library(
"//pkg/upgrade/upgrademanager",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/contextutil",
"//pkg/util/envutil",
11 changes: 11 additions & 0 deletions pkg/server/node.go
@@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -769,6 +770,16 @@ func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
})
}

// UpdateIOThreshold relays the supplied IOThreshold to the same method on the
// designated Store.
func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOThreshold) {
s, err := n.stores.GetStore(id)
if err != nil {
log.Errorf(n.AnnotateCtx(context.Background()), "%v", err)
}
s.UpdateIOThreshold(threshold)
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
var metrics []admission.StoreMetrics
2 changes: 1 addition & 1 deletion pkg/server/server.go
@@ -1448,7 +1448,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// existing stores shouldn’t be able to acquire leases yet. Although, below
// Raft commands like log application and snapshot application may be able
// to bypass admission control.
s.storeGrantCoords.SetPebbleMetricsProvider(ctx, s.node)
s.storeGrantCoords.SetPebbleMetricsProvider(ctx, s.node, s.node)

// Once all stores are initialized, check if offline storage recovery
// was done prior to start and record any actions appropriately.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/builtin_funcs.go
@@ -70,7 +70,7 @@ func (b *defaultBuiltinFuncOperator) Next() coldata.Batch {
err error
)
// Some functions cannot handle null arguments.
if hasNulls && !b.funcExpr.CanHandleNulls() {
if hasNulls && !b.funcExpr.ResolvedOverload().NullableArgs {
res = tree.DNull
} else {
res, err = b.funcExpr.ResolvedOverload().
4 changes: 4 additions & 0 deletions pkg/sql/exec_log.go
@@ -390,6 +390,10 @@ func (p *planner) maybeLogStatementInternal(
CostEstimate: p.curPlan.instrumentation.costEstimate,
Distribution: p.curPlan.instrumentation.distribution.String(),
PlanGist: p.curPlan.instrumentation.planGist.String(),
SessionID: p.extendedEvalCtx.SessionID.String(),
Database: p.CurrentDatabase(),
StatementID: p.stmt.QueryID.String(),
TransactionID: p.txn.ID().String(),
}})
} else {
telemetryMetrics.incSkippedQueryCount()
8 changes: 4 additions & 4 deletions pkg/sql/execinfra/execagg/base.go
@@ -38,7 +38,7 @@ func GetAggregateInfo(
return builtins.NewAnyNotNullAggregate, inputTypes[0], nil
}

props, builtins := builtins.GetBuiltinProperties(strings.ToLower(fn.String()))
_, builtins := builtins.GetBuiltinProperties(strings.ToLower(fn.String()))
for _, b := range builtins {
typs := b.Types.Types()
if len(typs) != len(inputTypes) {
@@ -47,7 +47,7 @@
match := true
for i, t := range typs {
if !inputTypes[i].Equivalent(t) {
if props.NullableArgs && inputTypes[i].IsAmbiguous() {
if b.NullableArgs && inputTypes[i].IsAmbiguous() {
continue
}
match = false
@@ -131,7 +131,7 @@ func GetWindowFunctionInfo(
"function is neither an aggregate nor a window function",
)
}
props, builtins := builtins.GetBuiltinProperties(strings.ToLower(funcStr))
_, builtins := builtins.GetBuiltinProperties(strings.ToLower(funcStr))
for _, b := range builtins {
typs := b.Types.Types()
if len(typs) != len(inputTypes) {
@@ -140,7 +140,7 @@
match := true
for i, t := range typs {
if !inputTypes[i].Equivalent(t) {
if props.NullableArgs && inputTypes[i].IsAmbiguous() {
if b.NullableArgs && inputTypes[i].IsAmbiguous() {
continue
}
match = false
2 changes: 1 addition & 1 deletion pkg/sql/opt/memo/constraint_builder.go
@@ -434,7 +434,7 @@ func (cb *constraintsBuilder) buildConstraintForTupleInequality(
func (cb *constraintsBuilder) buildFunctionConstraints(
f *FunctionExpr,
) (_ *constraint.Set, tight bool) {
if f.Properties.NullableArgs {
if f.FunctionPrivate.Overload.NullableArgs {
return unconstrained, false
}
