Skip to content

Commit

Permalink
sql: make aggregate builtins share the same memory account
Browse files Browse the repository at this point in the history
Release justification: fixes for high-priority or high-severity
bugs in existing functionality (we could hit memory limit due to
accounting long before the available RAM is actually used up).

We recently fixed a couple of leaks in the memory accounting of aggregate
builtins. The fix followed the pattern other similar aggregates already
used - instantiating a separate memory account for each aggregate
struct. However, when a single aggregate, say `anyNotNull`, wants to
store a single datum, say a `DInt` of size 8 bytes, growing its own
memory account will actually make a reservation of
`mon.DefaultPoolAllocation = 10240` bytes even though we will only be
using 8 bytes. This can result in "memory starvation" for OLAP-y
queries because such overestimation in the accounting makes us likely
to hit the `max-sql-memory` limit long before actual usage approaches it.

This commit fixes this problem by making all aggregates that aggregate
a single datum (these are all aggregate builtins that perform memory
accounting except for `arrayAgg`, which works with multiple datums)
share the same memory account (when non-nil), which is plumbed via
`tree.EvalContext` (this is unfortunate but, as always, seems like a
necessary evil). That account is instantiated by `rowexec.aggregator`
and `rowexec.windower` processors. This is also safe from a
concurrency point of view because each processor runs in a single
goroutine, so there are never concurrent calls to grow/shrink this
shared memory account.

If for some reason the field for the shared memory account is nil in
the eval context, then we fall back to the old behavior of instantiating
a memory account for each aggregate builtin struct. A helper struct was
created (that is now embedded by all aggregate builtins in question)
that unifies the memory accounting.

Release note: None (a follow-up to a recent PR).
  • Loading branch information
yuzefovich committed Mar 31, 2020
1 parent 0e9f07a commit fdbb7f7
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ NULL /1 {5} 5
query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) SELECT kv.k, avg(kw.k) FROM kv JOIN kw ON kv.k=kw.k GROUP BY kv.k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmW1P40YQx9_3U6z2Fegc7F3bPFg6KdyVVrkGhxKQSk8RMvE2WHHsdO3wIMR3rxxTAQnev529QwHxjjj5eTyT2d-Mwh3N_o2pR_sH3YOvJ2QmY_Lbce-QfD_466i73_HJvr_fPfv7gGz82umf9P_sbpKHj46vtsYGCa5GG-PrrfFmiY2vyLdexyfja9Lz5x8hn0nxPvn9uHd6RL6czS8OqEGTNBR-MBEZ9b5TRg3KqUFtalCHGtSlA4NOZToUWZbK4iN3c6AT3lDPMmiUTGd5cXlg0GEqBfXuaB7lsaAePQkuYnEsglBI06IGDUUeRPE8zFRGk0DetsdX1KD9aZBkHmmZReDeLPdIu3iMiyAfXoqMpLN8WlwsruWzabxwKROxGObRVZTfesTasopIWR7EMcmjifCIldHBvUFLpEzg_we8uCWXQXb5_NHajA7uB_N7jAT12L2xWsbbFRlfP2ZssvXMmVfm_HifVIZCinDxPp-KwLU-9UL5DoUciW9plAhpsoWOicU_-Uabfdr8LKPRZfnnY-mMNternrgRw1kepUlVBR-rYzfpiP3RSIpRkKfSZO5S0Q3aKwv08PXv-2fnfu_k3D_tdjfarEiwf3q40ebFX197p_7Jw98_NlODyvQ6I1IE4QP3vJcMOgluyERMUnlLZlnxuMwif0RftLrM0esydbG5pVfs_unheacot128OhZJKOS80Uibm2371ZrNrVGkWfJSmV6skJ-20qnJFzpxMRV3ORVXP5XtZ6mw-iZleHaYvGXaa2hStmrOOzWmx5rmzCtzfsXpwd7G9AAd8XR6bP-46bGq7X9ujXSmR-MuA9ODvcfpwevrh9dQrt0ynTXUD1s1590ayl3TnHllzq-oXP42lAs64qlydz6UW63cxl0GlMvfo3Lt-vqxayjXaZnuGuqHrZrzXg3lrmnOvDLnV1Su_TaUCzriqXJ3P5RbrdzGXQaUa79H5Tr19ePUUK7bWkP5sFUzdrFwWyYjQRISRtL8Usg1zJ5XZv-K6nXehnpBbzxV796HeqvV27jLgHqd96he8L-QY5FN0yQTCz9Tv3xnq6igCEeirHiWzuRQHMl0OA9TvuzNufmvP6HI8vJdXr7oJOVbxQPWh3d1YGZr0VqxOYjNGtSMN4N3deCFmjWltWJzEJsv0tZT-jlsLcK2MrSj_rbUMLO0aFeH5lqxOYjtKCsOYFcJ823197WtIxU1DA62GkZSAbRWbCSVHR2pqGFwsNUwkgqgtWIjqewqu3RP3aR7OlJRw0gqgAZnU00jqQAaxGZLk7OJVViTydmUBqcT0EgNCNeLDjeOpfHZaOVQ02juq2m4dABcLzoyBFuaoc_6lTlqR7ClIdpEEoBGlkA4OmxqHHkC4Si6eoNAtNYGAWh0VPV2CITrRYei0FojAI2Oqt4igXC96FAU6l2CgWWCaW0TgIai0NsnAA5FobdRcK2NgmttFIAGRxXQSBQI14uORMG1NgpAg6MKaCQKhOtFhz9kqDcKDjYKrrVRABqJAuHosOltFAhH0ZttFIP7X_4LAAD__1ZuMMc=
https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmW1P40YQx9_3U6z2Fegc7F3bPFg6KdyVVrkGhxKQSk8RMvE2WHHsdO3wIMR3rxxTAQnev529QwHxjjj5eTyT2d-Mwh3N_o2pR_sH3YOvJ2QmY_Lbce-QfD_466i73_HJvr_fPfv7gGz82umf9P_sbpKHj46vtsYGCa5GG-PrrfFmiY2vyLdexyfja9Lz5x8hn0nxPvn9uHd6RL6czS8OqEGTNBR-MBEZ9b5TRg3KqUFtalCHGtSlA4NOZToUWZbK4iN3c6AT3lDPMmiUTGd5cXlg0GEqBfXuaB7lsaAePQkuYnEsglBI06IGDUUeRPE8zFRGk0DetsdX1KD9aZBkHmmZReDeLPdIu3iMiyAfXoqMpLN8WlwsruWzabxwKROxGObRVZTfesTasopIWR7EMcmjifCIldHBvUFLpEzg_we8uCWXQXb5_NHajA7uB_N7jAT12L2xWsbbFRlfP2ZssvXMmVfm_HifVIZCinDxPp-KwLU-9UL5DoUciW9plAhpsoWOicU_-Uabfdr8LKPRZfnnY-mMNternrgRw1kepUlVBR-rYzfpiP3RSIpRkKfSZO5S0Q3aKwv08PXv-2fnfu_k3D_tdjfarEiwf3q40ebFX197p_7Jw98_NlODyvQ6I1IE4QP3vJcMOgluyERMUnlLZlnxuNwif0RftLrM0esydbG5pVfs_unheacot128OhZJKOS80Uibm2371ZrNrVGkWfJSmV6skJ-20qnJFzpxMRV3ORVXP5XtZ6mw-iZleHaYvGXaa2hStmrOOzWmx5rmzCtzfsXpwd7G9AAd8XR6bP-46bGq7X9ujXSmR-MuA9ODvcfpwevrh9dQrt0ynTXUD1s1590ayl3TnHllzq-oXP42lAs64qlydz6UW63cxl0GlMvfo3Lt-vqxayjXaZnuGuqHrZrzXg3lrmnOvDLnV1Su_TaUCzriqXJ3P5RbrdzGXQaUa79H5Tr19ePUUK7bWkP5sFUzdrFwWyYjQRISRtL8Usg1zJ5XZv-K6nXehnpBbzxV796HeqvV27jLgHqd96he8L-QY5FN0yQTCz9Tv3xnq6igCEeirHiWzuRQHMl0OA9TvuzNufmvP6HI8vJdXr7oJOVbxQPWh3d1YGZr0VqxOYjNGtSMN4N3deCFmjWltWJzEJsv0tZT-jlsLcK2MrSj_rbUMLO0aFeH5lqxOYjtKCsOYFcJ823197WtIxU1DA62GkZSAbRWbCSVHR2pqGFwsNUwkgqgtWIjqewqu3RP3aR7OlJRw0gqgAZnU00jqQAaxGZLk7OJVViTydmUBqcT0EgNCNeLDjeOpfHZaOVQ02juq2m4dABcLzoyBFuaoc_6lTlqR7ClIdpEEoBGlkA4OmxqHHkC4Si6eoNAtNYGAWh0VPV2CITrRYei0FojAI2Oqt4igXC96FAU6l2CgWWCaW0TgIai0NsnAA5FobdRcK2NgmttFIAGRxXQSBQI14uORMG1NgpAg6MKaCQKhOtFhz9kqDcKDjYKrrVRABqJAuHosOltFAhH0ZttFIP7X_4LAAD__wDDMMw=

# This query verifies stats collection for the hashJoiner, distinct and sorter.
query T
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/rowexec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type aggregatorBase struct {
datumAlloc sqlbase.DatumAlloc
rowAlloc sqlbase.EncDatumRowAlloc

bucketsAcc mon.BoundAccount
bucketsAcc mon.BoundAccount
aggFuncsAcc mon.BoundAccount

// isScalar can only be set if there are no groupCols, and it means that we
// will generate a result row even if there are no input rows. Used for
Expand Down Expand Up @@ -108,6 +109,7 @@ func (ag *aggregatorBase) init(
ag.row = make(sqlbase.EncDatumRow, len(spec.Aggregations))
ag.bucketsAcc = memMonitor.MakeBoundAccount()
ag.arena = stringarena.Make(&ag.bucketsAcc)
ag.aggFuncsAcc = memMonitor.MakeBoundAccount()

// Loop over the select expressions and extract any aggregate functions --
// non-aggregation functions are replaced with parser.NewIdentAggregate,
Expand Down Expand Up @@ -336,6 +338,10 @@ func newAggregator(
return nil, err
}

// A new tree.EvalCtx was created during initializing aggregatorBase above
// and will be used only by this aggregator, so it is ok to update EvalCtx
// directly.
ag.EvalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc
return ag, nil
}

Expand Down Expand Up @@ -365,6 +371,10 @@ func newOrderedAggregator(
return nil, err
}

// A new tree.EvalCtx was created during initializing aggregatorBase above
// and will be used only by this aggregator, so it is ok to update EvalCtx
// directly.
ag.EvalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc
return ag, nil
}

Expand All @@ -390,6 +400,7 @@ func (ag *hashAggregator) close() {
if ag.InternalClose() {
log.VEventf(ag.Ctx, 2, "exiting aggregator")
ag.bucketsAcc.Close(ag.Ctx)
ag.aggFuncsAcc.Close(ag.Ctx)
// If we have started emitting rows, bucketsIter will represent which
// buckets are still open, since buckets are closed once their results are
// emitted.
Expand All @@ -410,6 +421,7 @@ func (ag *orderedAggregator) close() {
if ag.InternalClose() {
log.VEventf(ag.Ctx, 2, "exiting aggregator")
ag.bucketsAcc.Close(ag.Ctx)
ag.aggFuncsAcc.Close(ag.Ctx)
if ag.bucket != nil {
ag.bucket.close(ag.Ctx)
}
Expand All @@ -423,7 +435,7 @@ func (ag *orderedAggregator) close() {
func (ag *aggregatorBase) matchLastOrdGroupCols(row sqlbase.EncDatumRow) (bool, error) {
for _, colIdx := range ag.orderedGroupCols {
res, err := ag.lastOrdGroupCols[colIdx].Compare(
&ag.inputTypes[colIdx], &ag.datumAlloc, ag.FlowCtx.EvalCtx, &row[colIdx],
&ag.inputTypes[colIdx], &ag.datumAlloc, ag.EvalCtx, &row[colIdx],
)
if res != 0 || err != nil {
return false, err
Expand Down Expand Up @@ -937,7 +949,7 @@ func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) {
}
bucket := make(aggregateFuncs, len(ag.funcs))
for i, f := range ag.funcs {
agg := f.create(ag.FlowCtx.EvalCtx, f.arguments)
agg := f.create(ag.EvalCtx, f.arguments)
if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/rowexec/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func newWindower(
}

w.acc = w.MemMonitor.MakeBoundAccount()
// If we have aggregate builtins that aggregate a single datum, we want
// them to reuse the same shared memory account with the windower.
evalCtx.SingleDatumAggMemAccount = &w.acc

if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
w.input = newInputStatCollector(w.input)
Expand Down
Loading

0 comments on commit fdbb7f7

Please sign in to comment.