Skip to content

Commit

Permalink
rowexec: account for some additional memory used by stats collection
Browse files Browse the repository at this point in the history
Prior to this commit, we did not account for the memory used in the
sampleAggregator when we copy all samples into a new slice before
generating a histogram. This commit adds some additional memory
accounting for this overhead.

Informs cockroachdb#45812

Release note: None
  • Loading branch information
rytaft committed Apr 7, 2020
1 parent de1b706 commit 4ca9851
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions pkg/sql/rowexec/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ type sampleAggregator struct {

spec *execinfrapb.SampleAggregatorSpec
input execinfra.RowSource
memAcc mon.BoundAccount
inTypes []types.T
sr stats.SampleReservoir

// memAcc accounts for memory accumulated throughout the life of the
// sampleAggregator.
memAcc mon.BoundAccount

// tempMemAcc is used to account for memory that is allocated temporarily
// and released before the sampleAggregator is finished.
tempMemAcc mon.BoundAccount

tableID sqlbase.ID
sampledCols []sqlbase.ColumnID
sketches []sketchInfo
Expand Down Expand Up @@ -97,8 +104,9 @@ func newSampleAggregator(
s := &sampleAggregator{
spec: spec,
input: input,
memAcc: memMonitor.MakeBoundAccount(),
inTypes: input.OutputTypes(),
memAcc: memMonitor.MakeBoundAccount(),
tempMemAcc: memMonitor.MakeBoundAccount(),
tableID: spec.TableID,
sampledCols: spec.SampledColumnIDs,
sketches: make([]sketchInfo, len(spec.Sketches)),
Expand Down Expand Up @@ -161,6 +169,7 @@ func (s *sampleAggregator) Run(ctx context.Context) {
func (s *sampleAggregator) close() {
if s.InternalClose() {
s.memAcc.Close(s.Ctx)
s.tempMemAcc.Close(s.Ctx)
s.MemMonitor.Stop(s.Ctx)
}
}
Expand Down Expand Up @@ -322,7 +331,8 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
colIdx := int(si.spec.Columns[0])
typ := &s.inTypes[colIdx]

h, err := generateHistogram(
h, err := s.generateHistogram(
s.EvalCtx.Context,
s.EvalCtx,
s.sr.Get(),
colIdx,
Expand Down Expand Up @@ -368,6 +378,9 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
); err != nil {
return err
}

// Release any memory temporarily used for this statistic.
s.tempMemAcc.Clear(ctx)
}

return nil
Expand All @@ -382,7 +395,8 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
// generateHistogram returns a histogram (on a given column) from a set of
// samples.
// numRows is the total number of rows from which values were sampled.
func generateHistogram(
func (s *sampleAggregator) generateHistogram(
ctx context.Context,
evalCtx *tree.EvalContext,
samples []stats.SampledRow,
colIdx int,
Expand All @@ -391,15 +405,32 @@ func generateHistogram(
distinctCount int64,
maxBuckets int,
) (stats.HistogramData, error) {
var da sqlbase.DatumAlloc
// Account for the memory we'll use copying the samples into values.
if err := s.tempMemAcc.Grow(ctx, sizeOfDatum*int64(len(samples))); err != nil {
return stats.HistogramData{}, err
}
values := make(tree.Datums, 0, len(samples))
for _, s := range samples {
ed := &s.Row[colIdx]

var da sqlbase.DatumAlloc
for _, sample := range samples {
ed := &sample.Row[colIdx]
// Ignore NULLs (they are counted separately).
if !ed.IsNull() {
beforeSize := ed.Datum.Size()
if err := ed.EnsureDecoded(colType, &da); err != nil {
return stats.HistogramData{}, err
}
afterSize := ed.Datum.Size()

// Perform memory accounting. This memory is not added to the temporary
// account since it won't be released until the sampleAggregator is
// destroyed.
if afterSize > beforeSize {
if err := s.memAcc.Grow(ctx, int64(afterSize-beforeSize)); err != nil {
return stats.HistogramData{}, err
}
}

values = append(values, ed.Datum)
}
}
Expand Down

0 comments on commit 4ca9851

Please sign in to comment.