Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: [CRDB-2743] sql: create builltin generator crdb_internal.probe_range #83820

Merged
merged 1 commit into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3101,6 +3101,25 @@ table. Returns an error if validation fails.</p>
</span></td></tr></tbody>
</table>

### TUPLE{INT AS RANGE_ID, STRING AS ERROR, INT AS END_TO_END_LATENCY_MS, STRING AS VERBOSE_TRACE} functions

<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.probe_ranges"></a><code>crdb_internal.probe_ranges(timeout: <a href="interval.html">interval</a>, probe_type: unknown_enum) &rarr; tuple{int AS range_id, string AS error, int AS end_to_end_latency_ms, string AS verbose_trace}</code></td><td><span class="funcdesc"><p>Returns rows of range data based on the results received when using the prober.
Parameters
timeout: interval for the maximum time the user wishes the prober to probe a range.
probe_type: enum indicating which kind of probe the prober should conduct (options are read or write).
Example usage
number of failed write probes: select count(1) from crdb_internal.probe_ranges(INTERVAL ‘1000ms’, ‘write’) where error != ‘’;
50 slowest probes: select range_id, error, end_to_end_latency_ms from crdb_internal.probe_ranges(INTERVAL ‘1000ms’, true) order by end_to_end_latency_ms desc limit 50;
Notes
If a probe should fail, the latency will be set to MaxInt64 in order to naturally sort above other latencies.
Read probes are cheaper than write probes. If write probes have already ran, it’s not necessary to also run a read probe.
A write probe will effectively probe reads as well.</p>
</span></td></tr></tbody>
</table>

### UUID functions

<table>
Expand Down
21 changes: 10 additions & 11 deletions pkg/kv/kvprober/kvprober.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ type Metrics struct {
ProbePlanFailures *metric.Counter
}

// proberOps is an interface that the prober will use to run ops against some
// proberOpsI is an interface that the prober will use to run ops against some
// system. This interface exists so that ops can be mocked for tests.
type proberOps interface {
type proberOpsI interface {
Read(key interface{}) func(context.Context, *kv.Txn) error
Write(key interface{}) func(context.Context, *kv.Txn) error
}
Expand All @@ -157,12 +157,11 @@ type proberTxn interface {
TxnRootKV(context.Context, func(context.Context, *kv.Txn) error) error
}

// proberOpsImpl is used to probe the kv layer.
type proberOpsImpl struct {
}
// ProberOps collects the methods used to probe the KV layer.
type ProberOps struct{}

// We attempt to commit a txn that reads some data at the key.
func (p *proberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) error {
func (p *ProberOps) Read(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
_, err := txn.Get(ctx, key)
return err
Expand All @@ -176,7 +175,7 @@ func (p *proberOpsImpl) Read(key interface{}) func(context.Context, *kv.Txn) err
// there is no need to clean up data at the key post range split / merge.
// Note that MVCC tombstones may be left by the probe, but this is okay, as
// GC will clean it up.
func (p *proberOpsImpl) Write(key interface{}) func(context.Context, *kv.Txn) error {
func (p *ProberOps) Write(key interface{}) func(context.Context, *kv.Txn) error {
return func(ctx context.Context, txn *kv.Txn) error {
if err := txn.Put(ctx, key, putValue); err != nil {
return err
Expand Down Expand Up @@ -272,10 +271,10 @@ func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) readProbe(ctx context.Context, db *kv.DB, pl planner) {
p.readProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
p.readProbeImpl(ctx, &ProberOps{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
func (p *Prober) readProbeImpl(ctx context.Context, ops proberOpsI, txns proberTxn, pl planner) {
if !readEnabled.Get(&p.settings.SV) {
return
}
Expand Down Expand Up @@ -330,10 +329,10 @@ func (p *Prober) readProbeImpl(ctx context.Context, ops proberOps, txns proberTx

// Doesn't return an error. Instead increments error type specific metrics.
func (p *Prober) writeProbe(ctx context.Context, db *kv.DB, pl planner) {
p.writeProbeImpl(ctx, &proberOpsImpl{}, &proberTxnImpl{db: p.db}, pl)
p.writeProbeImpl(ctx, &ProberOps{}, &proberTxnImpl{db: p.db}, pl)
}

func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOps, txns proberTxn, pl planner) {
func (p *Prober) writeProbeImpl(ctx context.Context, ops proberOpsI, txns proberTxn, pl planner) {
if !writeEnabled.Get(&p.settings.SV) {
return
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/generator_probe_ranges
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# LogicTest: local fakedist

query ITIT colnames
SELECT * FROM crdb_internal.probe_ranges(INTERVAL '1000ms', 'read') WHERE range_id < 0
----
range_id error end_to_end_latency_ms verbose_trace

query I
SELECT count(1) FROM crdb_internal.probe_ranges(INTERVAL '1000ms', 'read') WHERE error != ''
----
0

# Test that the trace has a string matching `proposing command` to verify trace events
# from the kvserver write path are received.
query I
SELECT count(1) FROM crdb_internal.probe_ranges(INTERVAL '1000ms', 'write') WHERE range_id = 1 AND verbose_trace LIKE '%proposing command%'
----
1
4 changes: 4 additions & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"all_builtins.go",
"builtins.go",
"generator_builtins.go",
"generator_probe_ranges.go",
"geo_builtins.go",
"math_builtins.go",
"notice.go",
Expand Down Expand Up @@ -37,6 +38,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvprober",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security",
Expand All @@ -48,6 +50,7 @@ go_library(
"//pkg/sql/catalog/catconstants",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/colexecerror",
"//pkg/sql/lex",
"//pkg/sql/lexbase",
Expand Down Expand Up @@ -76,6 +79,7 @@ go_library(
"//pkg/util",
"//pkg/util/arith",
"//pkg/util/bitarray",
"//pkg/util/contextutil",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/errorutil",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/all_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func init() {
initPGBuiltins()
initMathBuiltins()
initReplicationBuiltins()
initProbeRangesBuiltins()

AllBuiltinNames = make([]string, 0, len(builtins))
AllAggregateBuiltinNames = make([]string, 0, len(aggregates))
Expand Down
233 changes: 233 additions & 0 deletions pkg/sql/sem/builtins/generator_probe_ranges.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package builtins

import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvprober"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

func initProbeRangesBuiltins() {
// Add all windows to the Builtins map after a few sanity checks.
for k, v := range probeRangesGenerators {
if _, exists := builtins[k]; exists {
panic("duplicate builtin: " + k)
}

if v.props.Class != tree.GeneratorClass {
panic(errors.AssertionFailedf("generator functions should be marked with the tree.GeneratorClass "+
"function class, found %v", v))
}

builtins[k] = v
}
}

var probeRangesGenerators = map[string]builtinDefinition{
"crdb_internal.probe_ranges": makeBuiltin(
tree.FunctionProperties{
Class: tree.GeneratorClass,
},
makeGeneratorOverload(
tree.ArgTypes{
{Name: "timeout", Typ: types.Interval},
{Name: "probe_type", Typ: makeEnum()},
},
probeRangeGeneratorType,
makeProbeRangeGenerator,
`Returns rows of range data based on the results received when using the prober.
Parameters
timeout: interval for the maximum time the user wishes the prober to probe a range.
probe_type: enum indicating which kind of probe the prober should conduct (options are read or write).
Example usage
number of failed write probes: select count(1) from crdb_internal.probe_ranges(INTERVAL '1000ms', 'write') where error != '';
50 slowest probes: select range_id, error, end_to_end_latency_ms from crdb_internal.probe_ranges(INTERVAL '1000ms', true) order by end_to_end_latency_ms desc limit 50;
Notes
If a probe should fail, the latency will be set to MaxInt64 in order to naturally sort above other latencies.
Read probes are cheaper than write probes. If write probes have already ran, it's not necessary to also run a read probe.
A write probe will effectively probe reads as well.
`,
tree.VolatilityVolatile,
),
),
}

func makeEnum() *types.T {
enumMembers := []string{"read", "write"}
enumType := types.MakeEnum(typedesc.TypeIDToOID(500), typedesc.TypeIDToOID(100500))
enumType.TypeMeta = types.UserDefinedTypeMetadata{
EnumData: &types.EnumMetadata{
LogicalRepresentations: enumMembers,
PhysicalRepresentations: [][]byte{
{0x52, 0x45, 0x41, 0x44},
{0x57, 0x52, 0x49, 0x54, 0x45},
},
IsMemberReadOnly: make([]bool, len(enumMembers)),
},
}
return enumType
}

// probeRangeTypesGenerator supports the execution of
// crdb_internal.probe_range(timeout, type).

var probeRangeGeneratorLabels = []string{"range_id", "error", "end_to_end_latency_ms", "verbose_trace"}

var probeRangeGeneratorType = types.MakeLabeledTuple(
[]*types.T{types.Int, types.String, types.Int, types.String},
probeRangeGeneratorLabels,
)

type probeRangeRow struct {
rangeID int64
error string
latency time.Duration
verboseTrace string
}

type probeRangeGenerator struct {
db *kv.DB
timeout time.Duration
isWrite bool
tracer *tracing.Tracer
// The below are updated during calls to Next() throughout the lifecycle of
// probeRangeGenerator.
ops kvprober.ProberOps
curr probeRangeRow
ranges []kv.KeyValue
}

func makeProbeRangeGenerator(ctx *tree.EvalContext, args tree.Datums) (tree.ValueGenerator, error) {
// The user must be an admin to use this builtin.
isAdmin, err := ctx.SessionAccessor.HasAdminRole(ctx.Context)
if err != nil {
return nil, err
}
if !isAdmin {
return nil, pgerror.Newf(
pgcode.InsufficientPrivilege,
"only users with the admin role are allowed to use crdb_internal.probe_range",
)
}
// Handle args passed in.
timeout := time.Duration(tree.MustBeDInterval(args[0]).Duration.Nanos())
isWrite := args[1].(*tree.DEnum).LogicalRep
ranges, err := kvclient.ScanMetaKVs(ctx.Context, ctx.Txn, roachpb.Span{
Key: keys.MinKey,
EndKey: keys.MaxKey,
})
if err != nil {
return nil, err
}
return &probeRangeGenerator{
db: ctx.DB,
timeout: timeout,
isWrite: isWrite == "write",
tracer: ctx.Tracer,
ranges: ranges,
}, nil
}

// ResolvedType implements the tree.ValueGenerator interface.
func (p *probeRangeGenerator) ResolvedType() *types.T {
return probeRangeGeneratorType
}

// Start implements the tree.ValueGenerator interface.
func (p *probeRangeGenerator) Start(_ context.Context, _ *kv.Txn) error {
return nil
}

// Next implements the tree.ValueGenerator interface.
func (p *probeRangeGenerator) Next(ctx context.Context) (bool, error) {
if len(p.ranges) == 0 {
return false, nil
}
rawKV := p.ranges[0]
p.ranges = p.ranges[1:]
p.curr = probeRangeRow{}

var opName string
if p.isWrite {
opName = "write probe"
} else {
opName = "read probe"
}
ctx, sp := tracing.EnsureChildSpan(
ctx, p.tracer, opName,
tracing.WithForceRealSpan(),
)
sp.SetRecordingType(tracing.RecordingVerbose)
defer func() {
p.curr.verboseTrace = sp.FinishAndGetConfiguredRecording().String()
}()

tBegin := timeutil.Now()
err := contextutil.RunWithTimeout(ctx, opName, p.timeout, func(ctx context.Context) error {
var desc roachpb.RangeDescriptor
if err := rawKV.ValueProto(&desc); err != nil {
// NB: on error, p.curr.rangeID == 0.
return err
}
p.curr.rangeID = int64(desc.RangeID)

op := p.ops.Read
if p.isWrite {
op = p.ops.Write
}

key := desc.StartKey.AsRawKey()
if desc.RangeID == 1 {
// The first range starts at KeyMin, but the replicated keyspace starts only at keys.LocalMax,
// so there is a special case here.
key = keys.LocalMax
}
// NB: intentionally using a separate txn per probe to avoid undesirable cross-probe effects.
return p.db.Txn(ctx, op(key))
})

p.curr.latency = timeutil.Since(tBegin)
if err != nil {
p.curr.error = err.Error()
p.curr.latency = math.MaxInt64
}

return true, nil
}

// Values implements the tree.ValueGenerator interface.
func (p *probeRangeGenerator) Values() (tree.Datums, error) {
return tree.Datums{
tree.NewDInt(tree.DInt(p.curr.rangeID)),
tree.NewDString(p.curr.error),
tree.NewDInt(tree.DInt(p.curr.latency.Milliseconds())),
tree.NewDString(p.curr.verboseTrace),
}, nil
}

// Close implements the tree.ValueGenerator interface.
func (p *probeRangeGenerator) Close(_ context.Context) {}