From 237f8b852b0fda84d8d06606e233afe995c53c28 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 19 Mar 2021 12:04:20 -0700 Subject: [PATCH] colflow: enforce stats collection before drain meta This commit introduces a dummy vectorized stats collector that is also a metadata source that simply checks that `GetStats` has been called before `DrainMeta` and returns an error as a metadata if not. This stats invariants checker is now planned whenever `util.CrdbTestBuild` is true (which is when `TAGS=crdb_test` was specified). Release note: None --- pkg/sql/colflow/stats.go | 40 ++++++++++++++++++++ pkg/sql/colflow/vectorized_flow.go | 60 ++++++++++++++---------------- 2 files changed, 68 insertions(+), 32 deletions(-) diff --git a/pkg/sql/colflow/stats.go b/pkg/sql/colflow/stats.go index 8410a0006c6f..8243bfdda244 100644 --- a/pkg/sql/colflow/stats.go +++ b/pkg/sql/colflow/stats.go @@ -15,10 +15,12 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colflow/colrpc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -237,3 +239,41 @@ func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.Compone return s } + +func maybeAddStatsInvariantChecker(op *colexecargs.OpWithMetaInfo) { + if util.CrdbTestBuild { + c := &statsInvariantChecker{} + op.StatsCollectors = append(op.StatsCollectors, c) + op.MetadataSources = append(op.MetadataSources, c) + } +} + +// statsInvariantChecker is a dummy colexecop.VectorizedStatsCollector as well +// as colexecop.MetadataSource which asserts that GetStats is called before +// DrainMeta. It should only be used in the test environment. +type statsInvariantChecker struct { + colexecop.ZeroInputNode + + statsRetrieved bool +} + +var _ colexecop.VectorizedStatsCollector = &statsInvariantChecker{} +var _ colexecop.MetadataSource = &statsInvariantChecker{} + +func (i *statsInvariantChecker) Init() {} + +func (i *statsInvariantChecker) Next(context.Context) coldata.Batch { + return coldata.ZeroBatch +} + +func (i *statsInvariantChecker) GetStats() *execinfrapb.ComponentStats { + i.statsRetrieved = true + return &execinfrapb.ComponentStats{} +} + +func (i *statsInvariantChecker) DrainMeta(context.Context) []execinfrapb.ProducerMetadata { + if !i.statsRetrieved { + return []execinfrapb.ProducerMetadata{{Err: errors.New("GetStats wasn't called before DrainMeta")}} + } + return nil +} diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 40bed5571bdb..940a56336f1b 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -315,13 +315,13 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) { // created wrapper with those corresponding to operators in inputs (the latter // must have already been wrapped). func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase( - op colexecop.Operator, + op *colexecargs.OpWithMetaInfo, kvReader colexecop.KVReader, columnarizer colexecop.VectorizedStatsCollector, inputs []colexecargs.OpWithMetaInfo, component execinfrapb.ComponentID, monitors []*mon.BytesMonitor, -) (colexecop.VectorizedStatsCollector, error) { +) error { inputWatch := timeutil.NewStopWatch() var memMonitors, diskMonitors []*mon.BytesMonitor for _, m := range monitors { @@ -335,26 +335,33 @@ func (s *vectorizedFlowCreator) wrapWithVectorizedStatsCollectorBase( for i, input := range inputs { sc, ok := input.Root.(childStatsCollector) if !ok { - return nil, errors.New("unexpectedly an input is not collecting stats") + return errors.New("unexpectedly an input is not collecting stats") } inputStatsCollectors[i] = sc } - return newVectorizedStatsCollector( - op, kvReader, columnarizer, component, inputWatch, + vsc := newVectorizedStatsCollector( + op.Root, kvReader, columnarizer, component, inputWatch, memMonitors, diskMonitors, inputStatsCollectors, - ), nil + ) + op.Root = vsc + op.StatsCollectors = append(op.StatsCollectors, vsc) + maybeAddStatsInvariantChecker(op) + return nil } // wrapWithNetworkVectorizedStatsCollector creates a new -// colexec.NetworkVectorizedStatsCollector that wraps op. +// colexecop.VectorizedStatsCollector that wraps op. func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector( - op colexecop.Operator, + op *colexecargs.OpWithMetaInfo, inbox *colrpc.Inbox, component execinfrapb.ComponentID, latency time.Duration, -) colexecop.VectorizedStatsCollector { +) { inputWatch := timeutil.NewStopWatch() - return newNetworkVectorizedStatsCollector(op, component, inputWatch, inbox, latency) + nvsc := newNetworkVectorizedStatsCollector(op.Root, component, inputWatch, inbox, latency) + op.Root = nvsc + op.StatsCollectors = []colexecop.VectorizedStatsCollector{nvsc} + maybeAddStatsInvariantChecker(op) } // makeGetStatsFnForOutbox creates a function that will retrieve all execution @@ -768,15 +775,12 @@ func (s *vectorizedFlowCreator) setupRouter( // Wrap local outputs with vectorized stats collectors when recording // stats. This is mostly for compatibility but will provide some useful // information (e.g. output stall time). - vsc, err := s.wrapWithVectorizedStatsCollectorBase( - op, nil /* kvReader */, nil /* columnarizer */, nil, /* inputs */ - flowCtx.StreamComponentID(stream.StreamID), mons, - ) - if err != nil { + if err := s.wrapWithVectorizedStatsCollectorBase( + &opWithMetaInfo, nil /* kvReader */, nil, /* columnarizer */ + nil /* inputs */, flowCtx.StreamComponentID(stream.StreamID), mons, + ); err != nil { return err } - opWithMetaInfo.Root = vsc - opWithMetaInfo.StatsCollectors = []colexecop.VectorizedStatsCollector{vsc} } s.streamIDToInputOp[stream.StreamID] = opWithMetaInfo } @@ -860,9 +864,7 @@ func (s *vectorizedFlowCreator) setupInput( compID := execinfrapb.StreamComponentID( base.SQLInstanceID(inputStream.OriginNodeID), flowCtx.ID, inputStream.StreamID, ) - vsc := s.wrapWithNetworkVectorizedStatsCollector(op, inbox, compID, latency) - opWithMetaInfo.Root = vsc - opWithMetaInfo.StatsCollectors = []colexecop.VectorizedStatsCollector{vsc} + s.wrapWithNetworkVectorizedStatsCollector(&opWithMetaInfo, inbox, compID, latency) } inputStreamOps = append(inputStreamOps, opWithMetaInfo) default: @@ -920,15 +922,12 @@ func (s *vectorizedFlowCreator) setupInput( } // TODO(asubiotto): Once we have IDs for synchronizers, plumb them into // this stats collector to display stats. - vsc, err := s.wrapWithVectorizedStatsCollectorBase( - opWithMetaInfo.Root, nil /* kvReader */, nil, /* columnarizer */ + if err := s.wrapWithVectorizedStatsCollectorBase( + &opWithMetaInfo, nil /* kvReader */, nil, /* columnarizer */ statsInputsAsOps, execinfrapb.ComponentID{}, nil, /* monitors */ - ) - if err != nil { + ); err != nil { return colexecargs.OpWithMetaInfo{}, err } - opWithMetaInfo.Root = vsc - opWithMetaInfo.StatsCollectors = []colexecop.VectorizedStatsCollector{vsc} } } return opWithMetaInfo, nil @@ -1113,15 +1112,12 @@ func (s *vectorizedFlowCreator) setupFlow( } if s.recordingStats { - vsc, err := s.wrapWithVectorizedStatsCollectorBase( - result.Root, result.KVReader, result.Columnarizer, inputs, + if err := s.wrapWithVectorizedStatsCollectorBase( + &result.OpWithMetaInfo, result.KVReader, result.Columnarizer, inputs, flowCtx.ProcessorComponentID(pspec.ProcessorID), result.OpMonitors, - ) - if err != nil { + ); err != nil { return } - result.Root = vsc - result.StatsCollectors = append(result.StatsCollectors, vsc) } if err = s.setupOutput(