Skip to content

Commit

Permalink
colflow: enforce stats collection before drain meta
Browse files Browse the repository at this point in the history
This commit introduces a dummy vectorized stats collector that is also
a metadata source that simply checks that `GetStats` has been called
before `DrainMeta` and returns an error as a metadata if not. This stats
invariants checker is now planned whenever `util.CrdbTestBuild` is true
(which is when `TAGS=crdb_test` was specified).

Release note: None
  • Loading branch information
yuzefovich committed Apr 9, 2021
1 parent 8c22cdb commit 237f8b8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 32 deletions.
40 changes: 40 additions & 0 deletions pkg/sql/colflow/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colflow/colrpc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -237,3 +239,41 @@ func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.Compone

return s
}

func maybeAddStatsInvariantChecker(op *colexecargs.OpWithMetaInfo) {
if util.CrdbTestBuild {
c := &statsInvariantChecker{}
op.StatsCollectors = append(op.StatsCollectors, c)
op.MetadataSources = append(op.MetadataSources, c)
}
}

// statsInvariantChecker is a dummy colexecop.VectorizedStatsCollector as well
// as colexecop.MetadataSource which asserts that GetStats is called before
// DrainMeta. It should only be used in the test environment.
type statsInvariantChecker struct {
colexecop.ZeroInputNode

statsRetrieved bool
}

var _ colexecop.VectorizedStatsCollector = &statsInvariantChecker{}
var _ colexecop.MetadataSource = &statsInvariantChecker{}

func (i *statsInvariantChecker) Init() {}

func (i *statsInvariantChecker) Next(context.Context) coldata.Batch {
return coldata.ZeroBatch
}

func (i *statsInvariantChecker) GetStats() *execinfrapb.ComponentStats {
i.statsRetrieved = true
return &execinfrapb.ComponentStats{}
}

func (i *statsInvariantChecker) DrainMeta(context.Context) []execinfrapb.ProducerMetadata {
if !i.statsRetrieved {
return []execinfrapb.ProducerMetadata{{Err: errors.New("GetStats wasn't called before DrainMeta")}}
}
return nil
}
60 changes: 28 additions & 32 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// created wrapper with those corresponding to operators in inputs (the latter
// must have already been wrapped).
func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase(
op colexecop.Operator,
op *colexecargs.OpWithMetaInfo,
kvReader colexecop.KVReader,
columnarizer colexecop.VectorizedStatsCollector,
inputs []colexecargs.OpWithMetaInfo,
component execinfrapb.ComponentID,
monitors []*mon.BytesMonitor,
) (colexecop.VectorizedStatsCollector, error) {
) error {
inputWatch := timeutil.NewStopWatch()
var memMonitors, diskMonitors []*mon.BytesMonitor
for _, m := range monitors {
Expand All @@ -335,26 +335,33 @@ func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase(
for i, input := range inputs {
sc, ok := input.Root.(childStatsCollector)
if !ok {
return nil, errors.New("unexpectedly an input is not collecting stats")
return errors.New("unexpectedly an input is not collecting stats")
}
inputStatsCollectors[i] = sc
}
return newVectorizedStatsCollector(
op, kvReader, columnarizer, component, inputWatch,
vsc := newVectorizedStatsCollector(
op.Root, kvReader, columnarizer, component, inputWatch,
memMonitors, diskMonitors, inputStatsCollectors,
), nil
)
op.Root = vsc
op.StatsCollectors = append(op.StatsCollectors, vsc)
maybeAddStatsInvariantChecker(op)
return nil
}

// wrapWithNetworkVectorizedStatsCollector creates a new
// colexec.NetworkVectorizedStatsCollector that wraps op.
// colexecop.VectorizedStatsCollector that wraps op.
func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector(
op colexecop.Operator,
op *colexecargs.OpWithMetaInfo,
inbox *colrpc.Inbox,
component execinfrapb.ComponentID,
latency time.Duration,
) colexecop.VectorizedStatsCollector {
) {
inputWatch := timeutil.NewStopWatch()
return newNetworkVectorizedStatsCollector(op, component, inputWatch, inbox, latency)
nvsc := newNetworkVectorizedStatsCollector(op.Root, component, inputWatch, inbox, latency)
op.Root = nvsc
op.StatsCollectors = []colexecop.VectorizedStatsCollector{nvsc}
maybeAddStatsInvariantChecker(op)
}

// makeGetStatsFnForOutbox creates a function that will retrieve all execution
Expand Down Expand Up @@ -768,15 +775,12 @@ func (s *vectorizedFlowCreator) setupRouter(
// Wrap local outputs with vectorized stats collectors when recording
// stats. This is mostly for compatibility but will provide some useful
// information (e.g. output stall time).
vsc, err := s.wrapWithVectorizedStatsCollectorBase(
op, nil /* kvReader */, nil /* columnarizer */, nil, /* inputs */
flowCtx.StreamComponentID(stream.StreamID), mons,
)
if err != nil {
if err := s.wrapWithVectorizedStatsCollectorBase(
&opWithMetaInfo, nil /* kvReader */, nil, /* columnarizer */
nil /* inputs */, flowCtx.StreamComponentID(stream.StreamID), mons,
); err != nil {
return err
}
opWithMetaInfo.Root = vsc
opWithMetaInfo.StatsCollectors = []colexecop.VectorizedStatsCollector{vsc}
}
s.streamIDToInputOp[stream.StreamID] = opWithMetaInfo
}
Expand Down Expand Up @@ -860,9 +864,7 @@ func (s *vectorizedFlowCreator) setupInput(
compID := execinfrapb.StreamComponentID(
base.SQLInstanceID(inputStream.OriginNodeID), flowCtx.ID, inputStream.StreamID,
)
vsc := s.wrapWithNetworkVectorizedStatsCollector(op, inbox, compID, latency)
opWithMetaInfo.Root = vsc
opWithMetaInfo.StatsCollectors = []colexecop.VectorizedStatsCollector{vsc}
s.wrapWithNetworkVectorizedStatsCollector(&opWithMetaInfo, inbox, compID, latency)
}
inputStreamOps = append(inputStreamOps, opWithMetaInfo)
default:
Expand Down Expand Up @@ -920,15 +922,12 @@ func (s *vectorizedFlowCreator) setupInput(
}
// TODO(asubiotto): Once we have IDs for synchronizers, plumb them into
// this stats collector to display stats.
vsc, err := s.wrapWithVectorizedStatsCollectorBase(
opWithMetaInfo.Root, nil /* kvReader */, nil, /* columnarizer */
if err := s.wrapWithVectorizedStatsCollectorBase(
&opWithMetaInfo, nil /* kvReader */, nil, /* columnarizer */
statsInputsAsOps, execinfrapb.ComponentID{}, nil, /* monitors */
)
if err != nil {
); err != nil {
return colexecargs.OpWithMetaInfo{}, err
}
opWithMetaInfo.Root = vsc
opWithMetaInfo.StatsCollectors = []colexecop.VectorizedStatsCollector{vsc}
}
}
return opWithMetaInfo, nil
Expand Down Expand Up @@ -1113,15 +1112,12 @@ func (s *vectorizedFlowCreator) setupFlow(
}

if s.recordingStats {
vsc, err := s.wrapWithVectorizedStatsCollectorBase(
result.Root, result.KVReader, result.Columnarizer, inputs,
if err := s.wrapWithVectorizedStatsCollectorBase(
&result.OpWithMetaInfo, result.KVReader, result.Columnarizer, inputs,
flowCtx.ProcessorComponentID(pspec.ProcessorID), result.OpMonitors,
)
if err != nil {
); err != nil {
return
}
result.Root = vsc
result.StatsCollectors = append(result.StatsCollectors, vsc)
}

if err = s.setupOutput(
Expand Down

0 comments on commit 237f8b8

Please sign in to comment.