Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: sql: add prepared_statements_cache_size setting #99254

Merged
merged 1 commit into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 147 additions & 18 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ func (s *Server) newConnExecutor(
mon.MemoryResource,
memMetrics.SessionPreparedCurBytesCount,
memMetrics.SessionPreparedMaxBytesHist,
-1 /* increment */, noteworthyMemoryUsageBytes, s.cfg.Settings,
1024 /* increment */, noteworthyMemoryUsageBytes, s.cfg.Settings,
)
// The txn monitor is started in txnState.resetForNewSQLTxn().
txnMon := mon.NewMonitor(
Expand Down Expand Up @@ -1046,12 +1046,14 @@ func (s *Server) newConnExecutor(
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now())

ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]PreparedPortal),
prepStmts: make(map[string]*PreparedStatement),
prepStmtsLRU: make(map[string]struct{ prev, next string }),
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]PreparedPortal),
prepStmts: make(map[string]*PreparedStatement),
prepStmtsLRU: make(map[string]struct{ prev, next string }),
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount()
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(sdMutIterator.sds)
Expand Down Expand Up @@ -1646,27 +1648,127 @@ type prepStmtNamespace struct {
// prepStmts contains the prepared statements currently available on the
// session.
prepStmts map[string]*PreparedStatement
// prepStmtsLRU is a circular doubly-linked list containing the prepared
// statement names ordered by most recent access (needed to determine
// evictions when prepared_statements_cache_size is set). There is a special
// entry for the empty string which is both the head and tail of the
// list. (Consequently, if it exists, the actual prepared statement for the
// empty string does not have an entry in this list and cannot be evicted.)
prepStmtsLRU map[string]struct{ prev, next string }
// prepStmtsLRUAlloc is the total amount of memory allocated for prepared
// statements in prepStmtsLRU. This will sometimes be less than
// ex.sessionPreparedMon.AllocBytes() because refcounting causes us to hold
// onto more PreparedStatements than are currently in the LRU list.
prepStmtsLRUAlloc int64
// portals contains the portals currently available on the session. Note
// that PreparedPortal.accountForCopy needs to be called if a copy of a
// PreparedPortal is retained.
portals map[string]PreparedPortal
}

// HasActivePortals returns true if there are portals in the session.
func (ns prepStmtNamespace) HasActivePortals() bool {
func (ns *prepStmtNamespace) HasActivePortals() bool {
return len(ns.portals) > 0
}

// HasPortal returns true if there exists a given named portal in the session.
func (ns prepStmtNamespace) HasPortal(s string) bool {
func (ns *prepStmtNamespace) HasPortal(s string) bool {
_, ok := ns.portals[s]
return ok
}

const prepStmtsLRUHead = ""
const prepStmtsLRUTail = ""

// addLRUEntry adds a new prepared statement name to the LRU list. It is an
// error to re-add an existing name to the LRU list.
func (ns *prepStmtNamespace) addLRUEntry(name string, alloc int64) {
if name == prepStmtsLRUHead {
return
}
if _, ok := ns.prepStmtsLRU[name]; ok {
// Assert that we're not re-adding an existing name to the LRU list.
panic(errors.AssertionFailedf(
"prepStmtsLRU unexpected existing entry (%s): %v", name, ns.prepStmtsLRU,
))
}
var this struct{ prev, next string }
this.prev = prepStmtsLRUHead
// Note: must do this serially in case head and next are the same entry.
head := ns.prepStmtsLRU[this.prev]
this.next = head.next
head.next = name
ns.prepStmtsLRU[prepStmtsLRUHead] = head
next, ok := ns.prepStmtsLRU[this.next]
if !ok || next.prev != prepStmtsLRUHead {
// Assert that the chain isn't broken before we modify it.
panic(errors.AssertionFailedf(
"prepStmtsLRU head entry not correct (%s): %v", this.next, ns.prepStmtsLRU,
))
}
next.prev = name
ns.prepStmtsLRU[this.next] = next
ns.prepStmtsLRU[name] = this
ns.prepStmtsLRUAlloc += alloc
}

// delLRUEntry removes a prepared statement name from the LRU list. (It is not an
// error to remove a non-existent prepared statement.)
func (ns *prepStmtNamespace) delLRUEntry(name string, alloc int64) {
if name == prepStmtsLRUHead {
return
}
this, ok := ns.prepStmtsLRU[name]
if !ok {
// Not an error to remove a non-existent prepared statement.
return
}
// Note: must do this serially in case prev and next are the same entry.
prev, ok := ns.prepStmtsLRU[this.prev]
if !ok || prev.next != name {
// Assert that the chain isn't broken before we modify it.
panic(errors.AssertionFailedf(
"prepStmtsLRU prev entry not correct (%s): %v", this.prev, ns.prepStmtsLRU,
))
}
prev.next = this.next
ns.prepStmtsLRU[this.prev] = prev
next, ok := ns.prepStmtsLRU[this.next]
if !ok || next.prev != name {
// Assert that the chain isn't broken before we modify it.
panic(errors.AssertionFailedf(
"prepStmtsLRU next entry not correct (%s): %v", this.next, ns.prepStmtsLRU,
))
}
next.prev = this.prev
ns.prepStmtsLRU[this.next] = next
delete(ns.prepStmtsLRU, name)
ns.prepStmtsLRUAlloc -= alloc
}

// touchLRUEntry moves an existing prepared statement to the front of the LRU
// list.
func (ns *prepStmtNamespace) touchLRUEntry(name string) {
if name == prepStmtsLRUHead {
return
}
if ns.prepStmtsLRU[prepStmtsLRUHead].next == name {
// Already at the front of the list.
return
}
ns.delLRUEntry(name, 0)
ns.addLRUEntry(name, 0)
}

// MigratablePreparedStatements returns a mapping of all prepared statements.
func (ns prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement {
func (ns *prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement {
ret := make([]sessiondatapb.MigratableSession_PreparedStatement, 0, len(ns.prepStmts))
for name, stmt := range ns.prepStmts {

// Serialize prepared statements from least-recently used to most-recently
// used, so that we build the LRU list correctly when deserializing.
for e, ok := ns.prepStmtsLRU[prepStmtsLRUTail]; ok && e.prev != prepStmtsLRUHead; e, ok = ns.prepStmtsLRU[e.prev] {
name := e.prev
stmt := ns.prepStmts[name]
ret = append(
ret,
sessiondatapb.MigratableSession_PreparedStatement{
Expand All @@ -1676,15 +1778,32 @@ func (ns prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.Migra
},
)
}
// Finally, serialize the anonymous prepared statement (if it exists).
if stmt, ok := ns.prepStmts[""]; ok {
ret = append(
ret,
sessiondatapb.MigratableSession_PreparedStatement{
Name: "",
PlaceholderTypeHints: stmt.InferredTypes,
SQL: stmt.SQL,
},
)
}

return ret
}

func (ns prepStmtNamespace) String() string {
func (ns *prepStmtNamespace) String() string {
var sb strings.Builder
sb.WriteString("Prep stmts: ")
for name := range ns.prepStmts {
sb.WriteString(name + " ")
// Put the anonymous prepared statement first (if it exists).
if _, ok := ns.prepStmts[""]; ok {
sb.WriteString("\"\" ")
}
for e, ok := ns.prepStmtsLRU[prepStmtsLRUHead]; ok && e.next != prepStmtsLRUTail; e, ok = ns.prepStmtsLRU[e.next] {
sb.WriteString(e.next + " ")
}
fmt.Fprintf(&sb, "LRU alloc: %d ", ns.prepStmtsLRUAlloc)
sb.WriteString("Portals: ")
for name := range ns.portals {
sb.WriteString(name + " ")
Expand All @@ -1697,7 +1816,7 @@ func (ns *prepStmtNamespace) resetToEmpty(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
// No errors could occur since we're releasing the resources.
_ = ns.resetTo(ctx, prepStmtNamespace{}, prepStmtsNamespaceMemAcc)
_ = ns.resetTo(ctx, &prepStmtNamespace{}, prepStmtsNamespaceMemAcc)
}

// resetTo resets a namespace to equate another one (`to`). All the receiver's
Expand All @@ -1708,12 +1827,15 @@ func (ns *prepStmtNamespace) resetToEmpty(
// It can only return an error if we've reached the memory limit and had to make
// a copy of portals.
func (ns *prepStmtNamespace) resetTo(
ctx context.Context, to prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount,
ctx context.Context, to *prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) error {
for name, p := range ns.prepStmts {
p.decRef(ctx)
delete(ns.prepStmts, name)
}
for name := range ns.prepStmtsLRU {
delete(ns.prepStmtsLRU, name)
}
for name, p := range ns.portals {
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
Expand All @@ -1723,6 +1845,10 @@ func (ns *prepStmtNamespace) resetTo(
ps.incRef(ctx)
ns.prepStmts[name] = ps
}
for name, entry := range to.prepStmtsLRU {
ns.prepStmtsLRU[name] = entry
}
ns.prepStmtsLRUAlloc = to.prepStmtsLRUAlloc
for name, p := range to.portals {
if err := p.accountForCopy(ctx, prepStmtsNamespaceMemAcc, name); err != nil {
return err
Expand Down Expand Up @@ -2933,15 +3059,15 @@ func (ex *connExecutor) generateID() clusterunique.ID {
// prepStmtsNamespaceAtTxnRewindPos that's not part of prepStmtsNamespace.
func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) error {
return ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
ctx, &ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// commitPrepStmtNamespace deallocates everything in prepStmtsNamespace that's
// not part of prepStmtsNamespaceAtTxnRewindPos.
func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) error {
return ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
ctx, &ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

Expand Down Expand Up @@ -4020,14 +4146,17 @@ func (ps connExPrepStmtsAccessor) List() map[string]*PreparedStatement {
}

// Get is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) Get(name string) (*PreparedStatement, bool) {
func (ps connExPrepStmtsAccessor) Get(name string, touchLRU bool) (*PreparedStatement, bool) {
s, ok := ps.ex.extraTxnState.prepStmtsNamespace.prepStmts[name]
if ok && touchLRU {
ps.ex.extraTxnState.prepStmtsNamespace.touchLRUEntry(name)
}
return s, ok
}

// Delete is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) Delete(ctx context.Context, name string) bool {
_, ok := ps.Get(name)
_, ok := ps.Get(name, false /* touchLRU */)
if !ok {
return false
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,10 @@ func (ex *connExecutor) execStmtInOpenState(
name := e.Name.String()
ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[name]
if !ok {
err := pgerror.Newf(
pgcode.InvalidSQLStatementName,
"prepared statement %q does not exist", name,
)
return makeErrEvent(err)
return makeErrEvent(newPreparedStmtDNEError(ex.sessionData(), name))
}
ex.extraTxnState.prepStmtsNamespace.touchLRUEntry(name)

var err error
pinfo, err = ex.planner.fillInPlaceholders(ctx, ps, name, e.Params)
if err != nil {
Expand Down Expand Up @@ -2087,7 +2085,7 @@ func (ex *connExecutor) sessionStateBase64() (tree.Datum, error) {
// we look at CurState() directly.
_, isNoTxn := ex.machine.CurState().(stateNoTxn)
state, err := serializeSessionState(
!isNoTxn, ex.extraTxnState.prepStmtsNamespace, ex.sessionData(),
!isNoTxn, &ex.extraTxnState.prepStmtsNamespace, ex.sessionData(),
ex.server.cfg,
)
if err != nil {
Expand Down
Loading