Skip to content

Commit

Permalink
Merge pull request cockroachdb#20590 from petermattis/pmattis/benchma…
Browse files Browse the repository at this point in the history
…rk-bound-account-grow

util/mon: reserve extra bytes for allocation in BytesAccount
  • Loading branch information
petermattis authored Dec 12, 2017
2 parents 4deb6b7 + e350223 commit 729388e
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 233 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/builtin_mem_usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ func TestBuiltinsAccountForMemory(t *testing.T) {
evalCtx := tree.NewTestingEvalContext()
defer evalCtx.Stop(context.Background())
defer evalCtx.ActiveMemAcc.Close(context.Background())
previouslyAllocated := evalCtx.Mon.GetCurrentAllocationForTesting()
previouslyAllocated := evalCtx.ActiveMemAcc.Used()
_, err := test.builtin.Fn(evalCtx, test.args)
if err != nil {
t.Fatal(err)
}
deltaAllocated := evalCtx.Mon.GetCurrentAllocationForTesting() - previouslyAllocated
deltaAllocated := evalCtx.ActiveMemAcc.Used() - previouslyAllocated
if deltaAllocated != test.expectedAllocation {
t.Errorf("Expected to allocate %d, actually allocated %d", test.expectedAllocation, deltaAllocated)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (n *distinctNode) Next(params runParams) (bool, error) {
n.run.suffixMemAcc.Clear(ctx)
n.run.suffixSeen = make(map[string]struct{})
}
if err := n.run.prefixMemAcc.ResizeItem(
if err := n.run.prefixMemAcc.Resize(
ctx, int64(len(n.run.prefixSeen)), int64(len(prefix))); err != nil {
return false, err
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,13 +485,9 @@ func (e *Executor) Prepare(
prepared := &PreparedStatement{
TypeHints: placeholderHints,
portalNames: make(map[string]struct{}),
memAcc: session.sessionMon.MakeBoundAccount(),
}

// We need a memory account available in order to prepare a statement, since we
// might need to allocate memory for constant-folded values in the process of
// planning it.
prepared.constantAcc = session.mon.MakeBoundAccount()

if stmt.AST == nil {
return prepared, nil
}
Expand Down Expand Up @@ -529,7 +525,7 @@ func (e *Executor) Prepare(
planner := session.newPlanner(e, txn)
planner.semaCtx.Placeholders.SetTypeHints(placeholderHints)
planner.evalCtx.PrepareOnly = true
planner.evalCtx.ActiveMemAcc = &prepared.constantAcc
planner.evalCtx.ActiveMemAcc = &prepared.memAcc

if protoTS != nil {
planner.asOfSystemTime = true
Expand All @@ -554,7 +550,12 @@ func (e *Executor) Prepare(
if plan == nil {
return prepared, nil
}
defer plan.Close(session.Ctx())
defer func() {
plan.Close(session.Ctx())
// NB: if we start caching the plan, we'll want to keep around the memory
// account used for the plan, rather than clearing it.
prepared.memAcc.Clear(session.Ctx())
}()
prepared.Columns = planColumns(plan)
for _, c := range prepared.Columns {
if err := checkResultType(c.Typ); err != nil {
Expand Down
35 changes: 16 additions & 19 deletions pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,11 @@ type PreparedStatement struct {

ProtocolMeta interface{} // a field for protocol implementations to hang metadata off of.

memAcc WrappableMemoryAccount
// constantAcc handles the allocation of various constant-folded values which
// are generated while planning the statement.
constantAcc mon.BoundAccount
memAcc mon.BoundAccount
}

func (p *PreparedStatement) close(ctx context.Context, s *Session) {
p.memAcc.Wsession(s).Close(ctx)
p.constantAcc.Close(ctx)
func (p *PreparedStatement) close(ctx context.Context) {
p.memAcc.Close(ctx)
}

// PreparedStatements is a mapping of PreparedStatement names to their
Expand Down Expand Up @@ -135,12 +131,12 @@ func (ps PreparedStatements) New(
// statement name. When we start storing the prepared query plan
// during prepare, this should be tallied up to the monitor as well.
sz := int64(uintptr(len(name)+len(stmtStr)) + unsafe.Sizeof(*pStmt))
if err := pStmt.memAcc.Wsession(ps.session).OpenAndInit(ps.session.Ctx(), sz); err != nil {
if err := pStmt.memAcc.Grow(ps.session.Ctx(), sz); err != nil {
return nil, err
}

if prevStmt, ok := ps.Get(name); ok {
prevStmt.close(ps.session.Ctx(), ps.session)
prevStmt.close(ps.session.Ctx())
}

pStmt.Str = stmtStr
Expand All @@ -156,11 +152,11 @@ func (ps PreparedStatements) Delete(ctx context.Context, name string) bool {
for portalName := range stmt.portalNames {
if portal, ok := ps.session.PreparedPortals.Get(name); ok {
delete(ps.session.PreparedPortals.portals, portalName)
portal.memAcc.Wsession(ps.session).Close(ctx)
portal.memAcc.Close(ctx)
}
}
}
stmt.close(ctx, ps.session)
stmt.close(ctx)
delete(ps.stmts, name)
return true
}
Expand All @@ -170,10 +166,10 @@ func (ps PreparedStatements) Delete(ctx context.Context, name string) bool {
// closeAll de-registers all statements and portals from the monitor.
func (ps PreparedStatements) closeAll(ctx context.Context, s *Session) {
for _, stmt := range ps.stmts {
stmt.close(ctx, s)
stmt.close(ctx)
}
for _, portal := range s.PreparedPortals.portals {
portal.memAcc.Wsession(s).Close(ctx)
portal.memAcc.Close(ctx)
}
}

Expand Down Expand Up @@ -202,7 +198,7 @@ type PreparedPortal struct {

ProtocolMeta interface{} // a field for protocol implementations to hang metadata off of.

memAcc WrappableMemoryAccount
memAcc mon.BoundAccount
}

// PreparedPortals is a mapping of PreparedPortal names to their corresponding
Expand Down Expand Up @@ -237,18 +233,19 @@ func (pp PreparedPortals) New(
ctx context.Context, name string, stmt *PreparedStatement, qargs tree.QueryArguments,
) (*PreparedPortal, error) {
portal := &PreparedPortal{
Stmt: stmt,
Qargs: qargs,
Stmt: stmt,
Qargs: qargs,
memAcc: pp.session.mon.MakeBoundAccount(),
}
sz := int64(uintptr(len(name)) + unsafe.Sizeof(*portal))
if err := portal.memAcc.Wsession(pp.session).OpenAndInit(ctx, sz); err != nil {
if err := portal.memAcc.Grow(ctx, sz); err != nil {
return nil, err
}

stmt.portalNames[name] = struct{}{}

if prevPortal, ok := pp.Get(name); ok {
prevPortal.memAcc.Wsession(pp.session).Close(ctx)
prevPortal.memAcc.Close(ctx)
}

pp.portals[name] = portal
Expand All @@ -260,7 +257,7 @@ func (pp PreparedPortals) New(
func (pp PreparedPortals) Delete(ctx context.Context, name string) bool {
if portal, ok := pp.Get(name); ok {
delete(portal.Stmt.portalNames, name)
portal.memAcc.Wsession(pp.session).Close(ctx)
portal.memAcc.Close(ctx)
delete(pp.portals, name)
return true
}
Expand Down
34 changes: 0 additions & 34 deletions pkg/sql/session_mem_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,10 @@ package sql
import (
"math"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// WrappableMemoryAccount encapsulates a MemoryAccount to
// give it the Wsession() method below.
type WrappableMemoryAccount struct {
acc mon.BytesAccount
}

// Wsession captures the current session monitor pointer so it can be provided
// transparently to the other Account APIs below.
func (w *WrappableMemoryAccount) Wsession(s *Session) WrappedMemoryAccount {
return WrappedMemoryAccount{
acc: &w.acc,
mon: &s.sessionMon,
}
}

// WrappedMemoryAccount is the transient structure that carries
// the extra argument to the MemoryAccount APIs.
type WrappedMemoryAccount struct {
acc *mon.BytesAccount
mon *mon.BytesMonitor
}

// OpenAndInit interfaces between Session and mon.MemoryMonitor.
func (w WrappedMemoryAccount) OpenAndInit(ctx context.Context, initialAllocation int64) error {
return w.mon.OpenAndInitAccount(ctx, w.acc, initialAllocation)
}

// Close interfaces between Session and mon.MemoryMonitor.
func (w WrappedMemoryAccount) Close(ctx context.Context) {
w.mon.CloseAccount(ctx, w.acc)
}

// noteworthyMemoryUsageBytes is the minimum size tracked by a
// transaction or session monitor before the monitor starts explicitly
// logging overall usage growth in the log.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sqlbase/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (c *RowContainer) Replace(ctx context.Context, i int, newRow tree.Datums) e
row := c.At(i)
oldSz := c.rowSize(row)
if newSz != oldSz {
if err := c.memAcc.ResizeItem(ctx, oldSz, newSz); err != nil {
if err := c.memAcc.Resize(ctx, oldSz, newSz); err != nil {
return err
}
}
Expand All @@ -341,5 +341,5 @@ func (c *RowContainer) Replace(ctx context.Context, i int, newRow tree.Datums) e

// MemUsage returns the current accounted memory usage.
func (c *RowContainer) MemUsage() int64 {
return c.memAcc.CurrentlyAllocated()
return c.memAcc.Used()
}
Loading

0 comments on commit 729388e

Please sign in to comment.