Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: make aggregate builtins share the same memory account #46700

Merged
merged 1 commit into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ NULL /1 {5} 5
query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) SELECT kv.k, avg(kw.k) FROM kv JOIN kw ON kv.k=kw.k GROUP BY kv.k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmW1P40YQx9_3U6z2Fegc7F3bPFg6KdyVVrkGhxKQSk8RMvE2WHHsdO3wIMR3rxxTAQnev529QwHxjjj5eTyT2d-Mwh3N_o2pR_sH3YOvJ2QmY_Lbce-QfD_466i73_HJvr_fPfv7gGz82umf9P_sbpKHj46vtsYGCa5GG-PrrfFmiY2vyLdexyfja9Lz5x8hn0nxPvn9uHd6RL6czS8OqEGTNBR-MBEZ9b5TRg3KqUFtalCHGtSlA4NOZToUWZbK4iN3c6AT3lDPMmiUTGd5cXlg0GEqBfXuaB7lsaAePQkuYnEsglBI06IGDUUeRPE8zFRGk0DetsdX1KD9aZBkHmmZReDeLPdIu3iMiyAfXoqMpLN8WlwsruWzabxwKROxGObRVZTfesTasopIWR7EMcmjifCIldHBvUFLpEzg_we8uCWXQXb5_NHajA7uB_N7jAT12L2xWsbbFRlfP2ZssvXMmVfm_HifVIZCinDxPp-KwLU-9UL5DoUciW9plAhpsoWOicU_-Uabfdr8LKPRZfnnY-mMNternrgRw1kepUlVBR-rYzfpiP3RSIpRkKfSZO5S0Q3aKwv08PXv-2fnfu_k3D_tdjfarEiwf3q40ebFX197p_7Jw98_NlODyvQ6I1IE4QP3vJcMOgluyERMUnlLZlnxuMwif0RftLrM0esydbG5pVfs_unheacot128OhZJKOS80Uibm2371ZrNrVGkWfJSmV6skJ-20qnJFzpxMRV3ORVXP5XtZ6mw-iZleHaYvGXaa2hStmrOOzWmx5rmzCtzfsXpwd7G9AAd8XR6bP-46bGq7X9ujXSmR-MuA9ODvcfpwevrh9dQrt0ynTXUD1s1590ayl3TnHllzq-oXP42lAs64qlydz6UW63cxl0GlMvfo3Lt-vqxayjXaZnuGuqHrZrzXg3lrmnOvDLnV1Su_TaUCzriqXJ3P5RbrdzGXQaUa79H5Tr19ePUUK7bWkP5sFUzdrFwWyYjQRISRtL8Usg1zJ5XZv-K6nXehnpBbzxV796HeqvV27jLgHqd96he8L-QY5FN0yQTCz9Tv3xnq6igCEeirHiWzuRQHMl0OA9TvuzNufmvP6HI8vJdXr7oJOVbxQPWh3d1YGZr0VqxOYjNGtSMN4N3deCFmjWltWJzEJsv0tZT-jlsLcK2MrSj_rbUMLO0aFeH5lqxOYjtKCsOYFcJ823197WtIxU1DA62GkZSAbRWbCSVHR2pqGFwsNUwkgqgtWIjqewqu3RP3aR7OlJRw0gqgAZnU00jqQAaxGZLk7OJVViTydmUBqcT0EgNCNeLDjeOpfHZaOVQ02juq2m4dABcLzoyBFuaoc_6lTlqR7ClIdpEEoBGlkA4OmxqHHkC4Si6eoNAtNYGAWh0VPV2CITrRYei0FojAI2Oqt4igXC96FAU6l2CgWWCaW0TgIai0NsnAA5FobdRcK2NgmttFIAGRxXQSBQI14uORMG1NgpAg6MKaCQKhOtFhz9kqDcKDjYKrrVRABqJAuHosOltFAhH0ZttFIP7X_4LAAD__1ZuMMc=
https://cockroachdb.github.io/distsqlplan/decode.html#eJzsmW1P40YQx9_3U6z2Fegc7F3bPFg6KdyVVrkGhxKQSk8RMvE2WHHsdO3wIMR3rxxTAQnev529QwHxjjj5eTyT2d-Mwh3N_o2pR_sH3YOvJ2QmY_Lbce-QfD_466i73_HJvr_fPfv7gGz82umf9P_sbpKHj46vtsYGCa5GG-PrrfFmiY2vyLdexyfja9Lz5x8hn0nxPvn9uHd6RL6czS8OqEGTNBR-MBEZ9b5TRg3KqUFtalCHGtSlA4NOZToUWZbK4iN3c6AT3lDPMmiUTGd5cXlg0GEqBfXuaB7lsaAePQkuYnEsglBI06IGDUUeRPE8zFRGk0DetsdX1KD9aZBkHmmZReDeLPdIu3iMiyAfXoqMpLN8WlwsruWzabxwKROxGObRVZTfesTasopIWR7EMcmjifCIldHBvUFLpEzg_we8uCWXQXb5_NHajA7uB_N7jAT12L2xWsbbFRlfP2ZssvXMmVfm_HifVIZCinDxPp-KwLU-9UL5DoUciW9plAhpsoWOicU_-Uabfdr8LKPRZfnnY-mMNternrgRw1kepUlVBR-rYzfpiP3RSIpRkKfSZO5S0Q3aKwv08PXv-2fnfu_k3D_tdjfarEiwf3q40ebFX197p_7Jw98_NlODyvQ6I1IE4QP3vJcMOgluyERMUnlLZlnxuNwif0RftLrM0esydbG5pVfs_unheacot128OhZJKOS80Uibm2371ZrNrVGkWfJSmV6skJ-20qnJFzpxMRV3ORVXP5XtZ6mw-iZleHaYvGXaa2hStmrOOzWmx5rmzCtzfsXpwd7G9AAd8XR6bP-46bGq7X9ujXSmR-MuA9ODvcfpwevrh9dQrt0ynTXUD1s1590ayl3TnHllzq-oXP42lAs64qlydz6UW63cxl0GlMvfo3Lt-vqxayjXaZnuGuqHrZrzXg3lrmnOvDLnV1Su_TaUCzriqXJ3P5RbrdzGXQaUa79H5Tr19ePUUK7bWkP5sFUzdrFwWyYjQRISRtL8Usg1zJ5XZv-K6nXehnpBbzxV796HeqvV27jLgHqd96he8L-QY5FN0yQTCz9Tv3xnq6igCEeirHiWzuRQHMl0OA9TvuzNufmvP6HI8vJdXr7oJOVbxQPWh3d1YGZr0VqxOYjNGtSMN4N3deCFmjWltWJzEJsv0tZT-jlsLcK2MrSj_rbUMLO0aFeH5lqxOYjtKCsOYFcJ823197WtIxU1DA62GkZSAbRWbCSVHR2pqGFwsNUwkgqgtWIjqewqu3RP3aR7OlJRw0gqgAZnU00jqQAaxGZLk7OJVViTydmUBqcT0EgNCNeLDjeOpfHZaOVQ02juq2m4dABcLzoyBFuaoc_6lTlqR7ClIdpEEoBGlkA4OmxqHHkC4Si6eoNAtNYGAWh0VPV2CITrRYei0FojAI2Oqt4igXC96FAU6l2CgWWCaW0TgIai0NsnAA5FobdRcK2NgmttFIAGRxXQSBQI14uORMG1NgpAg6MKaCQKhOtFhz9kqDcKDjYKrrVRABqJAuHosOltFAhH0ZttFIP7X_4LAAD__wDDMMw=

# This query verifies stats collection for the hashJoiner, distinct and sorter.
query T
Expand Down
30 changes: 25 additions & 5 deletions pkg/sql/rowexec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type aggregatorBase struct {
datumAlloc sqlbase.DatumAlloc
rowAlloc sqlbase.EncDatumRowAlloc

bucketsAcc mon.BoundAccount
bucketsAcc mon.BoundAccount
aggFuncsAcc mon.BoundAccount

// isScalar can only be set if there are no groupCols, and it means that we
// will generate a result row even if there are no input rows. Used for
Expand Down Expand Up @@ -108,6 +109,7 @@ func (ag *aggregatorBase) init(
ag.row = make(sqlbase.EncDatumRow, len(spec.Aggregations))
ag.bucketsAcc = memMonitor.MakeBoundAccount()
ag.arena = stringarena.Make(&ag.bucketsAcc)
ag.aggFuncsAcc = memMonitor.MakeBoundAccount()

// Loop over the select expressions and extract any aggregate functions --
// non-aggregation functions are replaced with parser.NewIdentAggregate,
Expand Down Expand Up @@ -336,6 +338,10 @@ func newAggregator(
return nil, err
}

// A new tree.EvalCtx was created during initializing aggregatorBase above
// and will be used only by this aggregator, so it is ok to update EvalCtx
// directly.
ag.EvalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc
return ag, nil
}

Expand Down Expand Up @@ -365,6 +371,10 @@ func newOrderedAggregator(
return nil, err
}

// A new tree.EvalCtx was created during initializing aggregatorBase above
// and will be used only by this aggregator, so it is ok to update EvalCtx
// directly.
ag.EvalCtx.SingleDatumAggMemAccount = &ag.aggFuncsAcc
return ag, nil
}

Expand All @@ -389,7 +399,6 @@ func (ag *aggregatorBase) start(ctx context.Context, procName string) context.Co
func (ag *hashAggregator) close() {
if ag.InternalClose() {
log.VEventf(ag.Ctx, 2, "exiting aggregator")
ag.bucketsAcc.Close(ag.Ctx)
// If we have started emitting rows, bucketsIter will represent which
// buckets are still open, since buckets are closed once their results are
// emitted.
Expand All @@ -402,17 +411,28 @@ func (ag *hashAggregator) close() {
ag.buckets[bucket].close(ag.Ctx)
}
}
// Note that we should be closing accounts only after closing all the
// buckets since the latter might be releasing some precisely tracked
// memory, and if we were to close the accounts first, there would be
// no memory to release for the buckets.
ag.bucketsAcc.Close(ag.Ctx)
ag.aggFuncsAcc.Close(ag.Ctx)
ag.MemMonitor.Stop(ag.Ctx)
}
}

func (ag *orderedAggregator) close() {
if ag.InternalClose() {
log.VEventf(ag.Ctx, 2, "exiting aggregator")
ag.bucketsAcc.Close(ag.Ctx)
if ag.bucket != nil {
ag.bucket.close(ag.Ctx)
}
// Note that we should be closing accounts only after closing the
// bucket since the latter might be releasing some precisely tracked
// memory, and if we were to close the accounts first, there would be
// no memory to release for the bucket.
ag.bucketsAcc.Close(ag.Ctx)
ag.aggFuncsAcc.Close(ag.Ctx)
ag.MemMonitor.Stop(ag.Ctx)
}
}
Expand All @@ -423,7 +443,7 @@ func (ag *orderedAggregator) close() {
func (ag *aggregatorBase) matchLastOrdGroupCols(row sqlbase.EncDatumRow) (bool, error) {
for _, colIdx := range ag.orderedGroupCols {
res, err := ag.lastOrdGroupCols[colIdx].Compare(
&ag.inputTypes[colIdx], &ag.datumAlloc, ag.FlowCtx.EvalCtx, &row[colIdx],
&ag.inputTypes[colIdx], &ag.datumAlloc, ag.EvalCtx, &row[colIdx],
)
if res != 0 || err != nil {
return false, err
Expand Down Expand Up @@ -937,7 +957,7 @@ func (ag *aggregatorBase) createAggregateFuncs() (aggregateFuncs, error) {
}
bucket := make(aggregateFuncs, len(ag.funcs))
for i, f := range ag.funcs {
agg := f.create(ag.FlowCtx.EvalCtx, f.arguments)
agg := f.create(ag.EvalCtx, f.arguments)
if err := ag.bucketsAcc.Grow(ag.Ctx, agg.Size()); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/rowexec/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func newWindower(
}

w.acc = w.MemMonitor.MakeBoundAccount()
// If we have aggregate builtins that aggregate a single datum, we want
// them to reuse the same shared memory account with the windower.
evalCtx.SingleDatumAggMemAccount = &w.acc

if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
w.input = newInputStatCollector(w.input)
Expand Down
Loading