Skip to content

Commit

Permalink
Merge #56906
Browse files Browse the repository at this point in the history
56906: sql: produce mock ContentionEvents and display contention time in EXPLAIN ANALYZE r=RaduBerinde,tbg a=asubiotto

Please take a look at individual commits for details

Release note: None

Closes #56612 

@tbg could you take a look at the first commit which defines a `ContentionEvent` protobuf? It's close to what's described in #55583 (minus some fields that can be added later).

Co-authored-by: Alfonso Subiotto Marques <[email protected]>
  • Loading branch information
craig[bot] and asubiotto committed Dec 1, 2020
2 parents ddee2d4 + 05a34d3 commit 5faa952
Show file tree
Hide file tree
Showing 28 changed files with 1,244 additions and 780 deletions.
1,523 changes: 886 additions & 637 deletions pkg/roachpb/api.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import "storage/enginepb/mvcc3.proto";
import "util/hlc/timestamp.proto";
import "util/tracing/tracingpb/recorded_span.proto";
import "gogoproto/gogo.proto";
import "google/protobuf/duration.proto";

// ReadConsistencyType specifies what type of consistency is observed
// during read operations.
Expand Down Expand Up @@ -2141,3 +2142,18 @@ service Internal {
// bootstrapped cluster, an appropriate error is returned.
rpc Join(JoinNodeRequest) returns (JoinNodeResponse) { }
}

// ContentionEvent is a message that will be attached to BatchResponses to
// describe a conflict with another transaction during replica evaluation.
//
// This message is currently not emitted. It only exists so that SQL Execution
// has a protobuf to work with when building the higher-level infrastructure
// around contention observability while the work to emit these events is
// ongoing.
message ContentionEvent {
// Key is the key on which this transaction and the other transaction
// conflicted.
bytes key = 1 [(gogoproto.casttype) = "Key"];
// Txn is the other transaction involved in the conflict.
Transaction txn = 2 [(gogoproto.nullable) = false];
// Duration is the amount of time spent contending against the other
// transaction.
google.protobuf.Duration duration = 3 [(gogoproto.nullable) = false,
(gogoproto.stdduration) = true];
}
6 changes: 3 additions & 3 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@ func (r opResult) createAndWrapRowSource(
if err != nil {
return nil, err
}
if ioReader, ok := proc.(execinfra.IOReader); ok {
r.IOReader = ioReader
if kvReader, ok := proc.(execinfra.KVReader); ok {
r.KVReader = kvReader
}
var (
rs execinfra.RowSource
Expand Down Expand Up @@ -709,7 +709,7 @@ func NewColOperator(
return r, err
}
result.Op = scanOp
result.IOReader = scanOp
result.KVReader = scanOp
result.MetadataSources = append(result.MetadataSources, scanOp)
result.Releasables = append(result.Releasables, scanOp)
// colBatchScan is wrapped with a cancel checker below, so we need to
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type NewColOperatorArgs struct {
// values of NewColOperator call.
type NewColOperatorResult struct {
Op colexecbase.Operator
IOReader execinfra.IOReader
KVReader execinfra.KVReader
ColumnTypes []*types.T
MetadataSources []execinfrapb.MetadataSource
// ToClose is a slice of components that need to be Closed.
Expand Down
40 changes: 20 additions & 20 deletions pkg/sql/colexec/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ func (bic *batchInfoCollector) getElapsedTime() time.Duration {

// NewVectorizedStatsCollector creates a VectorizedStatsCollector which wraps
// 'op' that corresponds to a component with either ProcessorID or StreamID 'id'
// (with 'idTagKey' distinguishing between the two). 'ioReader' is a component
// (either an operator or a wrapped processor) that performs IO reads that is
// (with 'idTagKey' distinguishing between the two). 'kvReader' is a component
// (either an operator or a wrapped processor) that performs KV reads that is
// present in the chain of operators rooted at 'op'.
//
// If omitNumTuples is set, the Output.NumTuples stat will not be set. This is
// used for operators that wrap row processors which already emit the same stat.
func NewVectorizedStatsCollector(
op colexecbase.Operator,
ioReader execinfra.IOReader,
kvReader execinfra.KVReader,
id int32,
idTagKey string,
omitNumTuples bool,
Expand All @@ -143,7 +143,7 @@ func NewVectorizedStatsCollector(
batchInfoCollector: makeBatchInfoCollector(op, id, inputWatch, inputStatsCollectors),
idTagKey: idTagKey,
omitNumTuples: omitNumTuples,
ioReader: ioReader,
kvReader: kvReader,
memMonitors: memMonitors,
diskMonitors: diskMonitors,
}
Expand All @@ -158,10 +158,9 @@ type vectorizedStatsCollectorImpl struct {
idTagKey string

omitNumTuples bool

ioReader execinfra.IOReader
memMonitors []*mon.BytesMonitor
diskMonitors []*mon.BytesMonitor
kvReader execinfra.KVReader
memMonitors []*mon.BytesMonitor
diskMonitors []*mon.BytesMonitor
}

// finish returns the collected stats.
Expand All @@ -177,29 +176,30 @@ func (vsc *vectorizedStatsCollectorImpl) finish() *execinfrapb.ComponentStats {
s.Exec.MaxAllocatedDisk.Add(diskMon.MaximumBytes())
}

// Depending on ioReader, the accumulated time spent by the wrapped operator
// Depending on kvReader, the accumulated time spent by the wrapped operator
// inside Next() is reported as either execution time or KV time.
ioTime := false
if vsc.ioReader != nil {
ioTime = true
if _, isProcessor := vsc.ioReader.(execinfra.Processor); isProcessor {
// We have a wrapped processor that performs IO reads. Most likely
kvTime := false
if vsc.kvReader != nil {
kvTime = true
if _, isProcessor := vsc.kvReader.(execinfra.Processor); isProcessor {
// We have a wrapped processor that performs KV reads. Most likely
// it is a rowexec.joinReader, so we want to display "execution
// time" and not "IO time". In the less likely case that it is a
// time" and not "KV time". In the less likely case that it is a
// wrapped rowexec.tableReader showing "execution time" is also
// acceptable.
ioTime = false
kvTime = false
}
}

if ioTime {
if kvTime {
s.KV.KVTime.Set(time)
// Note that ioTime is true only for ColBatchScans, and this is the
// Note that kvTime is true only for ColBatchScans, and this is the
// only case when we want to add the number of rows read (because the
// wrapped joinReaders and tableReaders will add that statistic
// themselves).
s.KV.TuplesRead.Set(uint64(vsc.ioReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.ioReader.GetBytesRead()))
s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead()))
s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime())
} else {
s.Exec.ExecTime.Set(time)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestNumBatches(t *testing.T) {
nBatches := 10
noop := NewNoop(makeFiniteChunksSourceWithBatchSize(nBatches, coldata.BatchSize()))
vsc := NewVectorizedStatsCollector(
noop, nil /* ioReader */, 0 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
noop, nil /* kvReader */, 0 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
timeutil.NewStopWatch(), nil /* memMonitors */, nil, /* diskMonitors */
nil, /* inputStatsCollectors */
)
Expand All @@ -57,7 +57,7 @@ func TestNumTuples(t *testing.T) {
for _, batchSize := range []int{1, 16, 1024} {
noop := NewNoop(makeFiniteChunksSourceWithBatchSize(nBatches, batchSize))
vsc := NewVectorizedStatsCollector(
noop, nil /* ioReader */, 0 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
noop, nil /* kvReader */, 0 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
timeutil.NewStopWatch(), nil /* memMonitors */, nil, /* diskMonitors */
nil, /* inputStatsCollectors */
)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestVectorizedStatsCollector(t *testing.T) {
timeSource: timeSource,
}
leftInput := NewVectorizedStatsCollector(
leftSource, nil /* ioReader */, 0 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
leftSource, nil /* kvReader */, 0 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
timeutil.NewTestStopWatch(timeSource.Now), nil /* memMonitors */, nil, /* diskMonitors */
nil, /* inputStatsCollectors */
)
Expand All @@ -99,7 +99,7 @@ func TestVectorizedStatsCollector(t *testing.T) {
timeSource: timeSource,
}
rightInput := NewVectorizedStatsCollector(
rightSource, nil /* ioReader */, 1 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
rightSource, nil /* kvReader */, 1 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
timeutil.NewTestStopWatch(timeSource.Now), nil /* memMonitors */, nil, /* diskMonitors */
nil, /* inputStatsCollectors */
)
Expand All @@ -120,7 +120,7 @@ func TestVectorizedStatsCollector(t *testing.T) {
}

mjStatsCollector := NewVectorizedStatsCollector(
timeAdvancingMergeJoiner, nil /* ioReader */, 2 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
timeAdvancingMergeJoiner, nil /* kvReader */, 2 /* id */, execinfrapb.ProcessorIDTagKey, false, /* omitNumTuples */
mjInputWatch, nil /* memMonitors */, nil, /* diskMonitors */
[]ChildStatsCollector{leftInput.(ChildStatsCollector), rightInput.(ChildStatsCollector)},
)
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ type cFetcher struct {
// are required to produce an MVCC timestamp system column.
mvccDecodeStrategy row.MVCCDecodingStrategy

// testingGenerateMockContentionEvents specifies whether the underlying
// KVFetcher generates mock contention events. See
// KVFetcher.TestingEnableMockContentionEventGeneration.
// TODO(asubiotto): Remove once KV layer produces real contention events.
testingGenerateMockContentionEvents bool

// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher

Expand Down Expand Up @@ -615,6 +621,9 @@ func (rf *cFetcher) StartScan(
return err
}
rf.fetcher = f
if rf.testingGenerateMockContentionEvents {
rf.fetcher.TestingEnableMockContentionEventGeneration()
}
rf.machine.lastRowPrefix = nil
rf.machine.state[0] = stateInitFetch
return nil
Expand Down
20 changes: 17 additions & 3 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package colfetcher
import (
"context"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -62,7 +63,7 @@ type ColBatchScan struct {
ResultTypes []*types.T
}

var _ execinfra.IOReader = &ColBatchScan{}
var _ execinfra.KVReader = &ColBatchScan{}
var _ execinfra.Releasable = &ColBatchScan{}

// Init initializes a ColBatchScan.
Expand Down Expand Up @@ -119,16 +120,25 @@ func (s *ColBatchScan) DrainMeta(ctx context.Context) []execinfrapb.ProducerMeta
return trailingMeta
}

// GetBytesRead is part of the execinfra.IOReader interface.
// GetBytesRead is part of the execinfra.KVReader interface.
func (s *ColBatchScan) GetBytesRead() int64 {
return s.rf.fetcher.GetBytesRead()
}

// GetRowsRead is part of the execinfra.IOReader interface.
// GetRowsRead is part of the execinfra.KVReader interface.
func (s *ColBatchScan) GetRowsRead() int64 {
return s.rowsRead
}

// GetCumulativeContentionTime is part of the execinfra.KVReader interface.
// It returns the sum of the durations of all contention events reported by
// the underlying fetcher.
func (s *ColBatchScan) GetCumulativeContentionTime() time.Duration {
	events := s.rf.fetcher.GetContentionEvents()
	var cumulative time.Duration
	for i := range events {
		cumulative += events[i].Duration
	}
	return cumulative
}

var colBatchScanPool = sync.Pool{
New: func() interface{} {
return &ColBatchScan{}
Expand Down Expand Up @@ -197,6 +207,10 @@ func NewColBatchScan(
return nil, err
}

if flowCtx.Cfg.TestingKnobs.GenerateMockContentionEvents {
fetcher.testingGenerateMockContentionEvents = true
}

s := colBatchScanPool.Get().(*ColBatchScan)
spans := s.spans[:0]
for i := range spec.Spans {
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// must have already been wrapped).
func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase(
op colexecbase.Operator,
ioReader execinfra.IOReader,
kvReader execinfra.KVReader,
inputs []colexecbase.Operator,
id int32,
idTagKey string,
Expand All @@ -368,7 +368,7 @@ func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase(
inputStatsCollectors[i] = sc
}
vsc := colexec.NewVectorizedStatsCollector(
op, ioReader, id, idTagKey, omitNumTuples, inputWatch,
op, kvReader, id, idTagKey, omitNumTuples, inputWatch,
memMonitors, diskMonitors, inputStatsCollectors,
)
s.vectorizedStatsCollectorsQueue = append(s.vectorizedStatsCollectorsQueue, vsc)
Expand Down Expand Up @@ -774,7 +774,7 @@ func (s *vectorizedFlowCreator) setupRouter(
// information (e.g. output stall time).
var err error
localOp, err = s.wrapWithVectorizedStatsCollectorBase(
op, nil /* ioReader */, nil, /* inputs */
op, nil /* kvReader */, nil, /* inputs */
int32(stream.StreamID), execinfrapb.StreamIDTagKey, false /* omitNumTuples */, mons,
)
if err != nil {
Expand Down Expand Up @@ -913,7 +913,7 @@ func (s *vectorizedFlowCreator) setupInput(
// this stats collector to display stats.
var err error
op, err = s.wrapWithVectorizedStatsCollectorBase(
op, nil /* ioReader */, statsInputsAsOps, -1, /* id */
op, nil /* kvReader */, statsInputsAsOps, -1, /* id */
"" /* idTagKey */, false /* omitNumTuples */, nil, /* monitors */
)
if err != nil {
Expand Down Expand Up @@ -1155,7 +1155,7 @@ func (s *vectorizedFlowCreator) setupFlow(
// wrapped processor already emits the same stat.
_, isColumnarizer := originalOp.(*colexec.Columnarizer)
op, err = s.wrapWithVectorizedStatsCollectorBase(
op, result.IOReader, inputs, pspec.ProcessorID,
op, result.KVReader, inputs, pspec.ProcessorID,
execinfrapb.ProcessorIDTagKey, isColumnarizer, result.OpMonitors,
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/kv",
"//pkg/roachpb",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/lease",
"//pkg/sql/colflow",
Expand Down
20 changes: 20 additions & 0 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
Expand Down Expand Up @@ -365,6 +366,18 @@ func (ds *ServerImpl) setupFlow(
return ctx, f, nil
}

// testingGenerateMockContentionEvents is a testing-only boolean cluster
// setting, registered under "sql.testing.mock_contention.enabled", that causes
// mock contention events to be produced. NewFlowContext reads it and, when it
// is set, turns on the GenerateMockContentionEvents testing knob. Refer to
// KVFetcher.TestingEnableMockContentionEventGeneration for a more in-depth
// description of these contention events. This setting is currently used to
// test SQL Execution contention observability.
// TODO(asubiotto): Remove once KV layer produces real contention events.
var testingGenerateMockContentionEvents = settings.RegisterBoolSetting(
"sql.testing.mock_contention.enabled",
"whether the KV layer should generate mock contention events",
false,
)

// NewFlowContext creates a new FlowCtx that can be used during execution of
// a flow.
func (ds *ServerImpl) NewFlowContext(
Expand All @@ -385,6 +398,13 @@ func (ds *ServerImpl) NewFlowContext(
Local: localState.IsLocal,
}

if testingGenerateMockContentionEvents.Get(&flowCtx.Cfg.Settings.SV) {
// Note: If a race occurs with this line, this setting is being improperly
// used. Mock contention events should be generated either using the cluster
// setting or programmatically using the testing knob, but not both.
ds.ServerConfig.TestingKnobs.GenerateMockContentionEvents = true
}

if localState.IsLocal && localState.Collection != nil {
// If we were passed a descs.Collection to use, then take it. In this case,
// the caller will handle releasing the used descriptors, so we don't need
Expand Down
13 changes: 9 additions & 4 deletions pkg/sql/execinfra/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package execinfra

import "time"

// OpNode is an interface to operator-like structures with children.
type OpNode interface {
// ChildCount returns the number of children (inputs) of the operator.
Expand All @@ -19,10 +21,13 @@ type OpNode interface {
Child(nth int, verbose bool) OpNode
}

// IOReader is an operator that performs IO reads.
type IOReader interface {
// GetBytesRead returns the number of bytes read from disk by this operator.
// KVReader is an operator that performs KV reads.
type KVReader interface {
// GetBytesRead returns the number of bytes read from KV by this operator.
GetBytesRead() int64
// GetRowsRead returns the number of rows read from disk by this operator.
// GetRowsRead returns the number of rows read from KV by this operator.
GetRowsRead() int64
// GetCumulativeContentionTime returns the amount of time KV reads spent
// contending.
GetCumulativeContentionTime() time.Duration
}
7 changes: 7 additions & 0 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ type TestingKnobs struct {
// stall time and bytes sent. It replaces them with a zero value.
DeterministicStats bool

// GenerateMockContentionEvents causes any KV fetcher used in the flow to
// generate mock contention events. See
// KVFetcher.TestingEnableMockContentionEventGeneration for more details. This
// testing knob can also be enabled via the
// sql.testing.mock_contention.enabled cluster setting.
// TODO(asubiotto): Remove once KV layer produces real contention events.
GenerateMockContentionEvents bool

// CheckVectorizedFlowIsClosedCorrectly checks that all components in a flow
// were closed explicitly in flow.Cleanup.
CheckVectorizedFlowIsClosedCorrectly bool
Expand Down
Loading

0 comments on commit 5faa952

Please sign in to comment.