-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
stats.go
361 lines (319 loc) · 12.6 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colflow
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colflow/colrpc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
// childStatsCollector gives access to the stopwatches of a
// colexecop.VectorizedStatsCollector's childStatsCollectors.
type childStatsCollector interface {
getElapsedTime() time.Duration
}
// batchInfoCollector is a helper used by collector implementations.
//
// It wraps a colexecop.Operator (inside of the colexecop.OneInputNode) and
// keeps track of how much time was spent while calling Next on the underlying
// Operator and how many batches and tuples were returned.
type batchInfoCollector struct {
colexecop.OneInputNode
colexecop.NonExplainable
componentID execinfrapb.ComponentID
mu struct {
// We need a mutex because finishAndGetStats() and Next() might be
// called from different goroutines.
syncutil.Mutex
initialized bool
numBatches, numTuples uint64
}
// ctx is used only by the init() adapter.
ctx context.Context
// batch is the last batch returned by the wrapped operator.
batch coldata.Batch
// stopwatch keeps track of the amount of time the wrapped operator spent
// doing work. Note that this will include all of the time that the operator's
// inputs spent doing work - this will be corrected when stats are reported
// in finishAndGetStats().
stopwatch *timeutil.StopWatch
// childStatsCollectors contains the stats collectors for all of the inputs
// to the wrapped operator.
childStatsCollectors []childStatsCollector
}
var _ colexecop.Operator = &batchInfoCollector{}
func makeBatchInfoCollector(
op colexecop.Operator,
id execinfrapb.ComponentID,
inputWatch *timeutil.StopWatch,
childStatsCollectors []childStatsCollector,
) batchInfoCollector {
if inputWatch == nil {
colexecerror.InternalError(errors.AssertionFailedf("input watch is nil"))
}
return batchInfoCollector{
OneInputNode: colexecop.NewOneInputNode(op),
componentID: id,
stopwatch: inputWatch,
childStatsCollectors: childStatsCollectors,
}
}
func (bic *batchInfoCollector) init() {
bic.Input.Init(bic.ctx)
}
// Init is part of the colexecop.Operator interface.
func (bic *batchInfoCollector) Init(ctx context.Context) {
bic.ctx = ctx
bic.stopwatch.Start()
// Wrap the call to Init() with a panic catcher in order to get the correct
// execution time (e.g. in the statement bundle).
err := colexecerror.CatchVectorizedRuntimeError(bic.init)
bic.stopwatch.Stop()
if err != nil {
colexecerror.InternalError(err)
}
// Unset the context so that it's not used outside of the init() function.
bic.ctx = nil
bic.mu.Lock()
// If we got here, then Init above succeeded, so the wrapped operator has
// been properly initialized.
bic.mu.initialized = true
bic.mu.Unlock()
}
func (bic *batchInfoCollector) next() {
bic.batch = bic.Input.Next()
}
// Next is part of the colexecop.Operator interface.
func (bic *batchInfoCollector) Next() coldata.Batch {
bic.stopwatch.Start()
// Wrap the call to Next() with a panic catcher in order to get the correct
// execution time (e.g. in the statement bundle).
err := colexecerror.CatchVectorizedRuntimeError(bic.next)
bic.stopwatch.Stop()
if err != nil {
colexecerror.InternalError(err)
}
if bic.batch.Length() > 0 {
bic.mu.Lock()
bic.mu.numBatches++
bic.mu.numTuples += uint64(bic.batch.Length())
bic.mu.Unlock()
}
return bic.batch
}
// finishAndGetStats calculates the final execution statistics for the wrapped
// operator. ok indicates whether the stats collection was successful.
func (bic *batchInfoCollector) finishAndGetStats() (
numBatches, numTuples uint64,
time time.Duration,
ok bool,
) {
tm := bic.stopwatch.Elapsed()
// Subtract the time spent in each of the child stats collectors, to produce
// the amount of time that the wrapped operator spent doing work itself, not
// including time spent waiting on its inputs.
for _, statsCollectors := range bic.childStatsCollectors {
tm -= statsCollectors.getElapsedTime()
}
if buildutil.CrdbTestBuild {
if tm < 0 {
colexecerror.InternalError(errors.AssertionFailedf("unexpectedly execution time is negative"))
}
}
bic.mu.Lock()
defer bic.mu.Unlock()
return bic.mu.numBatches, bic.mu.numTuples, tm, bic.mu.initialized
}
// getElapsedTime implements the childStatsCollector interface.
func (bic *batchInfoCollector) getElapsedTime() time.Duration {
return bic.stopwatch.Elapsed()
}
// newVectorizedStatsCollector creates a colexecop.VectorizedStatsCollector
// which wraps 'op' that corresponds to a component with either ProcessorID or
// StreamID 'id' (with 'idTagKey' distinguishing between the two). 'kvReader' is
// a component (either an operator or a wrapped processor) that performs KV
// reads that is present in the chain of operators rooted at 'op'.
func newVectorizedStatsCollector(
op colexecop.Operator,
kvReader colexecop.KVReader,
columnarizer colexecop.VectorizedStatsCollector,
id execinfrapb.ComponentID,
inputWatch *timeutil.StopWatch,
memMonitors []*mon.BytesMonitor,
diskMonitors []*mon.BytesMonitor,
inputStatsCollectors []childStatsCollector,
) colexecop.VectorizedStatsCollector {
// TODO(cathymw): Refactor to have specialized stats collectors for
// memory/disk stats and IO operators.
return &vectorizedStatsCollectorImpl{
batchInfoCollector: makeBatchInfoCollector(op, id, inputWatch, inputStatsCollectors),
kvReader: kvReader,
columnarizer: columnarizer,
memMonitors: memMonitors,
diskMonitors: diskMonitors,
}
}
// vectorizedStatsCollectorImpl is the implementation behind
// newVectorizedStatsCollector.
type vectorizedStatsCollectorImpl struct {
batchInfoCollector
kvReader colexecop.KVReader
columnarizer colexecop.VectorizedStatsCollector
memMonitors []*mon.BytesMonitor
diskMonitors []*mon.BytesMonitor
}
// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats {
numBatches, numTuples, time, ok := vsc.batchInfoCollector.finishAndGetStats()
if !ok {
// The stats collection wasn't successful for some reason, so we will
// return an empty object (since nil is not allowed by the contract of
// GetStats).
//
// Such scenario can occur, for example, if the operator wrapped by the
// batchInfoCollector wasn't properly initialized, yet the stats
// retrieval is attempted. In many places we assume that if an operator
// is interacted with in any way, it must have been successfully
// initialized. Having such a check in the vectorizedStatsCollectorImpl
// makes sure that we never violate those assumptions.
return &execinfrapb.ComponentStats{}
}
var s *execinfrapb.ComponentStats
if vsc.columnarizer != nil {
s = vsc.columnarizer.GetStats()
} else {
// There was no root columnarizer, so create a new stats object.
s = &execinfrapb.ComponentStats{Component: vsc.componentID}
}
for _, memMon := range vsc.memMonitors {
s.Exec.MaxAllocatedMem.Add(memMon.MaximumBytes())
}
for _, diskMon := range vsc.diskMonitors {
s.Exec.MaxAllocatedDisk.Add(diskMon.MaximumBytes())
}
if vsc.kvReader != nil {
// Note that kvReader is non-nil only for ColBatchScans, and this is the
// only case when we want to add the number of rows read, bytes read,
// and the contention time (because the wrapped row-execution KV reading
// processors - joinReaders, tableReaders, zigzagJoiners, and
// invertedJoiners - will add these statistics themselves). Similarly,
// for those wrapped processors it is ok to show the time as "execution
// time" since "KV time" would only make sense for tableReaders, and
// they are less likely to be wrapped than others.
s.KV.KVTime.Set(time)
s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead()))
s.KV.BatchRequestsIssued.Set(uint64(vsc.kvReader.GetBatchRequestsIssued()))
s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime())
s.KV.UsedStreamer = vsc.kvReader.UsedStreamer()
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
s.Exec.ConsumedRU.Set(scanStats.ConsumedRU)
} else {
s.Exec.ExecTime.Set(time)
}
s.Output.NumBatches.Set(numBatches)
s.Output.NumTuples.Set(numTuples)
return s
}
// newNetworkVectorizedStatsCollector creates a new
// colexecop.VectorizedStatsCollector for streams. In addition to the base stats,
// newNetworkVectorizedStatsCollector collects the network latency for a stream.
func newNetworkVectorizedStatsCollector(
op colexecop.Operator,
id execinfrapb.ComponentID,
inputWatch *timeutil.StopWatch,
inbox *colrpc.Inbox,
latency time.Duration,
) colexecop.VectorizedStatsCollector {
return &networkVectorizedStatsCollectorImpl{
batchInfoCollector: makeBatchInfoCollector(op, id, inputWatch, nil /* childStatsCollectors */),
inbox: inbox,
latency: latency,
}
}
// networkVectorizedStatsCollectorImpl is the implementation behind
// newNetworkVectorizedStatsCollector.
type networkVectorizedStatsCollectorImpl struct {
batchInfoCollector
inbox *colrpc.Inbox
latency time.Duration
}
// GetStats is part of the colexecop.VectorizedStatsCollector interface.
func (nvsc *networkVectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats {
numBatches, numTuples, time, ok := nvsc.batchInfoCollector.finishAndGetStats()
if !ok {
// The stats collection wasn't successful for some reason, so we will
// return an empty object (since nil is not allowed by the contract of
// GetStats).
//
// Such scenario can occur, for example, if the operator wrapped by the
// batchInfoCollector wasn't properly initialized, yet the stats
// retrieval is attempted. In many places we assume that if an operator
// is interacted with in any way, it must have been successfully
// initialized. Having such a check in the vectorizedStatsCollectorImpl
// makes sure that we never violate those assumptions.
return &execinfrapb.ComponentStats{}
}
s := &execinfrapb.ComponentStats{Component: nvsc.componentID}
s.NetRx.Latency.Set(nvsc.latency)
s.NetRx.WaitTime.Set(time)
s.NetRx.DeserializationTime.Set(nvsc.inbox.GetDeserializationTime())
s.NetRx.TuplesReceived.Set(uint64(nvsc.inbox.GetRowsRead()))
s.NetRx.BytesReceived.Set(uint64(nvsc.inbox.GetBytesRead()))
s.NetRx.MessagesReceived.Set(uint64(nvsc.inbox.GetNumMessages()))
s.Output.NumBatches.Set(numBatches)
s.Output.NumTuples.Set(numTuples)
return s
}
// maybeAddStatsInvariantChecker will add a statsInvariantChecker to both
// StatsCollectors and MetadataSources of op if crdb_test build tag is
// specified. See comment on statsInvariantChecker for the kind of invariants
// checked.
func maybeAddStatsInvariantChecker(op *colexecargs.OpWithMetaInfo) {
if buildutil.CrdbTestBuild {
c := &statsInvariantChecker{}
op.StatsCollectors = append(op.StatsCollectors, c)
op.MetadataSources = append(op.MetadataSources, c)
}
}
// statsInvariantChecker is a dummy colexecop.VectorizedStatsCollector as well
// as colexecop.MetadataSource which asserts that GetStats is called before
// DrainMeta. It should only be used in the test environment.
type statsInvariantChecker struct {
colexecop.ZeroInputNode
statsRetrieved bool
}
var _ colexecop.VectorizedStatsCollector = &statsInvariantChecker{}
var _ colexecop.MetadataSource = &statsInvariantChecker{}
func (i *statsInvariantChecker) Init(context.Context) {}
func (i *statsInvariantChecker) Next() coldata.Batch {
return coldata.ZeroBatch
}
func (i *statsInvariantChecker) GetStats() *execinfrapb.ComponentStats {
i.statsRetrieved = true
return &execinfrapb.ComponentStats{}
}
func (i *statsInvariantChecker) DrainMeta() []execinfrapb.ProducerMetadata {
if !i.statsRetrieved {
return []execinfrapb.ProducerMetadata{{Err: errors.New("GetStats wasn't called before DrainMeta")}}
}
return nil
}