Skip to content

Commit

Permalink
Merge #47106
Browse files Browse the repository at this point in the history
47106: rowexec: account for some additional memory used by stats collection r=rytaft a=rytaft

Prior to this commit, we did not account for the memory used in the
`sampleAggregator` when we copy all samples into a new slice before
generating a histogram. This commit adds some additional memory
accounting for this overhead.

Informs #45812

Release note: None

Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
craig[bot] and rytaft committed Apr 7, 2020
2 parents c257a8e + 11fff37 commit 17b1ba4
Showing 1 changed file with 38 additions and 7 deletions.
45 changes: 38 additions & 7 deletions pkg/sql/rowexec/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ type sampleAggregator struct {

spec *execinfrapb.SampleAggregatorSpec
input execinfra.RowSource
memAcc mon.BoundAccount
inTypes []types.T
sr stats.SampleReservoir

// memAcc accounts for memory accumulated throughout the life of the
// sampleAggregator.
memAcc mon.BoundAccount

// tempMemAcc is used to account for memory that is allocated temporarily
// and released before the sampleAggregator is finished.
tempMemAcc mon.BoundAccount

tableID sqlbase.ID
sampledCols []sqlbase.ColumnID
sketches []sketchInfo
Expand Down Expand Up @@ -97,8 +104,9 @@ func newSampleAggregator(
s := &sampleAggregator{
spec: spec,
input: input,
memAcc: memMonitor.MakeBoundAccount(),
inTypes: input.OutputTypes(),
memAcc: memMonitor.MakeBoundAccount(),
tempMemAcc: memMonitor.MakeBoundAccount(),
tableID: spec.TableID,
sampledCols: spec.SampledColumnIDs,
sketches: make([]sketchInfo, len(spec.Sketches)),
Expand Down Expand Up @@ -161,6 +169,7 @@ func (s *sampleAggregator) Run(ctx context.Context) {
func (s *sampleAggregator) close() {
if s.InternalClose() {
s.memAcc.Close(s.Ctx)
s.tempMemAcc.Close(s.Ctx)
s.MemMonitor.Stop(s.Ctx)
}
}
Expand Down Expand Up @@ -322,7 +331,8 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
colIdx := int(si.spec.Columns[0])
typ := &s.inTypes[colIdx]

h, err := generateHistogram(
h, err := s.generateHistogram(
ctx,
s.EvalCtx,
s.sr.Get(),
colIdx,
Expand Down Expand Up @@ -368,6 +378,9 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
); err != nil {
return err
}

// Release any memory temporarily used for this statistic.
s.tempMemAcc.Clear(ctx)
}

return nil
Expand All @@ -382,7 +395,8 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
// generateHistogram returns a histogram (on a given column) from a set of
// samples.
// numRows is the total number of rows from which values were sampled.
func generateHistogram(
func (s *sampleAggregator) generateHistogram(
ctx context.Context,
evalCtx *tree.EvalContext,
samples []stats.SampledRow,
colIdx int,
Expand All @@ -391,15 +405,32 @@ func generateHistogram(
distinctCount int64,
maxBuckets int,
) (stats.HistogramData, error) {
var da sqlbase.DatumAlloc
// Account for the memory we'll use copying the samples into values.
if err := s.tempMemAcc.Grow(ctx, sizeOfDatum*int64(len(samples))); err != nil {
return stats.HistogramData{}, err
}
values := make(tree.Datums, 0, len(samples))
for _, s := range samples {
ed := &s.Row[colIdx]

var da sqlbase.DatumAlloc
for _, sample := range samples {
ed := &sample.Row[colIdx]
// Ignore NULLs (they are counted separately).
if !ed.IsNull() {
beforeSize := ed.Datum.Size()
if err := ed.EnsureDecoded(colType, &da); err != nil {
return stats.HistogramData{}, err
}
afterSize := ed.Datum.Size()

// Perform memory accounting. This memory is not added to the temporary
// account since it won't be released until the sampleAggregator is
// destroyed.
if afterSize > beforeSize {
if err := s.memAcc.Grow(ctx, int64(afterSize-beforeSize)); err != nil {
return stats.HistogramData{}, err
}
}

values = append(values, ed.Datum)
}
}
Expand Down

0 comments on commit 17b1ba4

Please sign in to comment.