diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 82394aeff6c4..ef8a0a0721df 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -965,7 +965,7 @@ func (s *Server) newConnExecutor( mon.MemoryResource, memMetrics.SessionPreparedCurBytesCount, memMetrics.SessionPreparedMaxBytesHist, - -1 /* increment */, noteworthyMemoryUsageBytes, s.cfg.Settings, + 1024 /* increment */, noteworthyMemoryUsageBytes, s.cfg.Settings, ) // The txn monitor is started in txnState.resetForNewSQLTxn(). txnMon := mon.NewMonitor( @@ -1046,12 +1046,14 @@ func (s *Server) newConnExecutor( ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now()) ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{ - prepStmts: make(map[string]*PreparedStatement), - portals: make(map[string]PreparedPortal), + prepStmts: make(map[string]*PreparedStatement), + prepStmtsLRU: make(map[string]struct{ prev, next string }), + portals: make(map[string]PreparedPortal), } ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos = prepStmtNamespace{ - prepStmts: make(map[string]*PreparedStatement), - portals: make(map[string]PreparedPortal), + prepStmts: make(map[string]*PreparedStatement), + prepStmtsLRU: make(map[string]struct{ prev, next string }), + portals: make(map[string]PreparedPortal), } ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount() dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(sdMutIterator.sds) @@ -1646,6 +1648,18 @@ type prepStmtNamespace struct { // prepStmts contains the prepared statements currently available on the // session. prepStmts map[string]*PreparedStatement + // prepStmtsLRU is a circular doubly-linked list containing the prepared + // statement names ordered by most recent access (needed to determine + // evictions when prepared_statements_cache_size is set). There is a special + // entry for the empty string which is both the head and tail of the + // list. (Consequently, if it exists, the actual prepared statement for the + // empty string does not have an entry in this list and cannot be evicted.) + prepStmtsLRU map[string]struct{ prev, next string } + // prepStmtsLRUAlloc is the total amount of memory allocated for prepared + // statements in prepStmtsLRU. This will sometimes be less than + // ex.sessionPreparedMon.AllocBytes() because refcounting causes us to hold + // onto more PreparedStatements than are currently in the LRU list. + prepStmtsLRUAlloc int64 // portals contains the portals currently available on the session. Note // that PreparedPortal.accountForCopy needs to be called if a copy of a // PreparedPortal is retained. @@ -1653,20 +1667,108 @@ type prepStmtNamespace struct { } // HasActivePortals returns true if there are portals in the session. -func (ns prepStmtNamespace) HasActivePortals() bool { +func (ns *prepStmtNamespace) HasActivePortals() bool { return len(ns.portals) > 0 } // HasPortal returns true if there exists a given named portal in the session. -func (ns prepStmtNamespace) HasPortal(s string) bool { +func (ns *prepStmtNamespace) HasPortal(s string) bool { _, ok := ns.portals[s] return ok } +const prepStmtsLRUHead = "" +const prepStmtsLRUTail = "" + +// addLRUEntry adds a new prepared statement name to the LRU list. It is an +// error to re-add an existing name to the LRU list. +func (ns *prepStmtNamespace) addLRUEntry(name string, alloc int64) { + if name == prepStmtsLRUHead { + return + } + if _, ok := ns.prepStmtsLRU[name]; ok { + // Assert that we're not re-adding an existing name to the LRU list. + panic(errors.AssertionFailedf( + "prepStmtsLRU unexpected existing entry (%s): %v", name, ns.prepStmtsLRU, + )) + } + var this struct{ prev, next string } + this.prev = prepStmtsLRUHead + // Note: must do this serially in case head and next are the same entry. + head := ns.prepStmtsLRU[this.prev] + this.next = head.next + head.next = name + ns.prepStmtsLRU[prepStmtsLRUHead] = head + next, ok := ns.prepStmtsLRU[this.next] + if !ok || next.prev != prepStmtsLRUHead { + // Assert that the chain isn't broken before we modify it. + panic(errors.AssertionFailedf( + "prepStmtsLRU head entry not correct (%s): %v", this.next, ns.prepStmtsLRU, + )) + } + next.prev = name + ns.prepStmtsLRU[this.next] = next + ns.prepStmtsLRU[name] = this + ns.prepStmtsLRUAlloc += alloc +} + +// delLRUEntry removes a prepared statement name from the LRU list. (It is not an +// error to remove a non-existent prepared statement.) +func (ns *prepStmtNamespace) delLRUEntry(name string, alloc int64) { + if name == prepStmtsLRUHead { + return + } + this, ok := ns.prepStmtsLRU[name] + if !ok { + // Not an error to remove a non-existent prepared statement. + return + } + // Note: must do this serially in case prev and next are the same entry. + prev, ok := ns.prepStmtsLRU[this.prev] + if !ok || prev.next != name { + // Assert that the chain isn't broken before we modify it. + panic(errors.AssertionFailedf( + "prepStmtsLRU prev entry not correct (%s): %v", this.prev, ns.prepStmtsLRU, + )) + } + prev.next = this.next + ns.prepStmtsLRU[this.prev] = prev + next, ok := ns.prepStmtsLRU[this.next] + if !ok || next.prev != name { + // Assert that the chain isn't broken before we modify it. + panic(errors.AssertionFailedf( + "prepStmtsLRU next entry not correct (%s): %v", this.next, ns.prepStmtsLRU, + )) + } + next.prev = this.prev + ns.prepStmtsLRU[this.next] = next + delete(ns.prepStmtsLRU, name) + ns.prepStmtsLRUAlloc -= alloc +} + +// touchLRUEntry moves an existing prepared statement to the front of the LRU +// list. +func (ns *prepStmtNamespace) touchLRUEntry(name string) { + if name == prepStmtsLRUHead { + return + } + if ns.prepStmtsLRU[prepStmtsLRUHead].next == name { + // Already at the front of the list. + return + } + ns.delLRUEntry(name, 0) + ns.addLRUEntry(name, 0) +} + // MigratablePreparedStatements returns a mapping of all prepared statements. -func (ns prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement { +func (ns *prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.MigratableSession_PreparedStatement { ret := make([]sessiondatapb.MigratableSession_PreparedStatement, 0, len(ns.prepStmts)) - for name, stmt := range ns.prepStmts { + + // Serialize prepared statements from least-recently used to most-recently + // used, so that we build the LRU list correctly when deserializing. + for e, ok := ns.prepStmtsLRU[prepStmtsLRUTail]; ok && e.prev != prepStmtsLRUHead; e, ok = ns.prepStmtsLRU[e.prev] { + name := e.prev + stmt := ns.prepStmts[name] ret = append( ret, sessiondatapb.MigratableSession_PreparedStatement{ @@ -1676,15 +1778,32 @@ func (ns prepStmtNamespace) MigratablePreparedStatements() []sessiondatapb.Migra }, ) } + // Finally, serialize the anonymous prepared statement (if it exists). + if stmt, ok := ns.prepStmts[""]; ok { + ret = append( + ret, + sessiondatapb.MigratableSession_PreparedStatement{ + Name: "", + PlaceholderTypeHints: stmt.InferredTypes, + SQL: stmt.SQL, + }, + ) + } + return ret } -func (ns prepStmtNamespace) String() string { +func (ns *prepStmtNamespace) String() string { var sb strings.Builder sb.WriteString("Prep stmts: ") - for name := range ns.prepStmts { - sb.WriteString(name + " ") + // Put the anonymous prepared statement first (if it exists). + if _, ok := ns.prepStmts[""]; ok { + sb.WriteString("\"\" ") } + for e, ok := ns.prepStmtsLRU[prepStmtsLRUHead]; ok && e.next != prepStmtsLRUTail; e, ok = ns.prepStmtsLRU[e.next] { + sb.WriteString(e.next + " ") + } + fmt.Fprintf(&sb, "LRU alloc: %d ", ns.prepStmtsLRUAlloc) sb.WriteString("Portals: ") for name := range ns.portals { sb.WriteString(name + " ") @@ -1697,7 +1816,7 @@ func (ns *prepStmtNamespace) resetToEmpty( ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, ) { // No errors could occur since we're releasing the resources. - _ = ns.resetTo(ctx, prepStmtNamespace{}, prepStmtsNamespaceMemAcc) + _ = ns.resetTo(ctx, &prepStmtNamespace{}, prepStmtsNamespaceMemAcc) } // resetTo resets a namespace to equate another one (`to`). All the receiver's @@ -1708,12 +1827,15 @@ func (ns *prepStmtNamespace) resetToEmpty( // It can only return an error if we've reached the memory limit and had to make // a copy of portals. func (ns *prepStmtNamespace) resetTo( - ctx context.Context, to prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount, + ctx context.Context, to *prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount, ) error { for name, p := range ns.prepStmts { p.decRef(ctx) delete(ns.prepStmts, name) } + for name := range ns.prepStmtsLRU { + delete(ns.prepStmtsLRU, name) + } for name, p := range ns.portals { p.close(ctx, prepStmtsNamespaceMemAcc, name) delete(ns.portals, name) @@ -1723,6 +1845,10 @@ func (ns *prepStmtNamespace) resetTo( ps.incRef(ctx) ns.prepStmts[name] = ps } + for name, entry := range to.prepStmtsLRU { + ns.prepStmtsLRU[name] = entry + } + ns.prepStmtsLRUAlloc = to.prepStmtsLRUAlloc for name, p := range to.portals { if err := p.accountForCopy(ctx, prepStmtsNamespaceMemAcc, name); err != nil { return err @@ -2933,7 +3059,7 @@ func (ex *connExecutor) generateID() clusterunique.ID { // prepStmtsNamespaceAtTxnRewindPos that's not part of prepStmtsNamespace. func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) error { return ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo( - ctx, ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc, + ctx, &ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc, ) } @@ -2941,7 +3067,7 @@ func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) error { // not part of prepStmtsNamespaceAtTxnRewindPos. func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) error { return ex.extraTxnState.prepStmtsNamespace.resetTo( - ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc, + ctx, &ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc, ) } @@ -4020,14 +4146,17 @@ func (ps connExPrepStmtsAccessor) List() map[string]*PreparedStatement { } // Get is part of the preparedStatementsAccessor interface. -func (ps connExPrepStmtsAccessor) Get(name string) (*PreparedStatement, bool) { +func (ps connExPrepStmtsAccessor) Get(name string, touchLRU bool) (*PreparedStatement, bool) { s, ok := ps.ex.extraTxnState.prepStmtsNamespace.prepStmts[name] + if ok && touchLRU { + ps.ex.extraTxnState.prepStmtsNamespace.touchLRUEntry(name) + } return s, ok } // Delete is part of the preparedStatementsAccessor interface. func (ps connExPrepStmtsAccessor) Delete(ctx context.Context, name string) bool { - _, ok := ps.Get(name) + _, ok := ps.Get(name, false /* touchLRU */) if !ok { return false } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 724c13b02d01..190c185df192 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -453,12 +453,10 @@ func (ex *connExecutor) execStmtInOpenState( name := e.Name.String() ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[name] if !ok { - err := pgerror.Newf( - pgcode.InvalidSQLStatementName, - "prepared statement %q does not exist", name, - ) - return makeErrEvent(err) + return makeErrEvent(newPreparedStmtDNEError(ex.sessionData(), name)) } + ex.extraTxnState.prepStmtsNamespace.touchLRUEntry(name) + var err error pinfo, err = ex.planner.fillInPlaceholders(ctx, ps, name, e.Params) if err != nil { @@ -2087,7 +2085,7 @@ func (ex *connExecutor) sessionStateBase64() (tree.Datum, error) { // we look at CurState() directly. _, isNoTxn := ex.machine.CurState().(stateNoTxn) state, err := serializeSessionState( - !isNoTxn, ex.extraTxnState.prepStmtsNamespace, ex.sessionData(), + !isNoTxn, &ex.extraTxnState.prepStmtsNamespace, ex.sessionData(), ex.server.cfg, ) if err != nil { diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index cb3603adec46..ded9cfad88c5 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -19,9 +19,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/fsm" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -127,6 +129,25 @@ func (ex *connExecutor) addPreparedStmt( return nil, err } ex.extraTxnState.prepStmtsNamespace.prepStmts[name] = prepared + ex.extraTxnState.prepStmtsNamespace.addLRUEntry(name, prepared.memAcc.Allocated()) + + // Check if we're over prepared_statements_cache_size. + cacheSize := ex.sessionData().PreparedStatementsCacheSize + if cacheSize != 0 { + lru := ex.extraTxnState.prepStmtsNamespace.prepStmtsLRU + // While we're over the cache size, deallocate the LRU prepared statement. + for tail := lru[prepStmtsLRUTail]; tail.prev != prepStmtsLRUHead && tail.prev != name; tail = lru[prepStmtsLRUTail] { + if ex.extraTxnState.prepStmtsNamespace.prepStmtsLRUAlloc <= cacheSize { + break + } + log.VEventf( + ctx, 1, + "prepared statements are using more than prepared_statements_cache_size (%s), "+ + "automatically deallocating %s", string(humanizeutil.IBytes(cacheSize)), tail.prev, + ) + ex.deletePreparedStmt(ctx, tail.prev) + } + } // Remember the inferred placeholder types so they can be reported on // Describe. First, try to preserve the hints sent by the client. @@ -309,10 +330,9 @@ func (ex *connExecutor) execBind( ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[bindCmd.PreparedStatementName] if !ok { - return retErr(pgerror.Newf( - pgcode.InvalidSQLStatementName, - "unknown prepared statement %q", bindCmd.PreparedStatementName)) + return retErr(newPreparedStmtDNEError(ex.sessionData(), bindCmd.PreparedStatementName)) } + ex.extraTxnState.prepStmtsNamespace.touchLRUEntry(bindCmd.PreparedStatementName) // We need to make sure type resolution happens within a transaction. // Otherwise, for user-defined types we won't take the correct leases and @@ -527,8 +547,10 @@ func (ex *connExecutor) deletePreparedStmt(ctx context.Context, name string) { if !ok { return } + alloc := ps.memAcc.Allocated() ps.decRef(ctx) delete(ex.extraTxnState.prepStmtsNamespace.prepStmts, name) + ex.extraTxnState.prepStmtsNamespace.delLRUEntry(name, alloc) } func (ex *connExecutor) deletePortal(ctx context.Context, name string) { @@ -579,10 +601,10 @@ func (ex *connExecutor) execDescribe( case pgwirebase.PrepareStatement: ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[string(descCmd.Name)] if !ok { - return retErr(pgerror.Newf( - pgcode.InvalidSQLStatementName, - "unknown prepared statement %q", descCmd.Name)) + return retErr(newPreparedStmtDNEError(ex.sessionData(), string(descCmd.Name))) } + // Not currently counting this as an LRU touch on prepStmtsLRU for + // prepared_statements_cache_size (but maybe we should?). ast := ps.AST if execute, ok := ast.(*tree.Execute); ok { @@ -591,9 +613,7 @@ func (ex *connExecutor) execDescribe( // return the wrong information for describe. innerPs, found := ex.extraTxnState.prepStmtsNamespace.prepStmts[string(execute.Name)] if !found { - return retErr(pgerror.Newf( - pgcode.InvalidSQLStatementName, - "unknown prepared statement %q", descCmd.Name)) + return retErr(newPreparedStmtDNEError(ex.sessionData(), string(execute.Name))) } ast = innerPs.AST } @@ -659,3 +679,19 @@ func (ex *connExecutor) isAllowedInAbortedTxn(ast tree.Statement) bool { return false } } + +// newPreparedStmtDNEError creates an InvalidSQLStatementName error for when a +// prepared statement does not exist. +func newPreparedStmtDNEError(sd *sessiondata.SessionData, name string) error { + err := pgerror.Newf( + pgcode.InvalidSQLStatementName, "prepared statement %q does not exist", name, + ) + cacheSize := sd.PreparedStatementsCacheSize + if cacheSize != 0 { + err = errors.WithHintf( + err, "note that prepared_statements_cache_size is set to %s", + string(humanizeutil.IBytes(cacheSize)), + ) + } + return err +} diff --git a/pkg/sql/deallocate.go b/pkg/sql/deallocate.go index e8f0a5a11d03..a2f7f9a98d60 100644 --- a/pkg/sql/deallocate.go +++ b/pkg/sql/deallocate.go @@ -13,8 +13,6 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" ) @@ -25,8 +23,7 @@ func (p *planner) Deallocate(ctx context.Context, s *tree.Deallocate) (planNode, p.preparedStatements.DeleteAll(ctx) } else { if found := p.preparedStatements.Delete(ctx, string(s.Name)); !found { - return nil, pgerror.Newf(pgcode.InvalidSQLStatementName, - "prepared statement %q does not exist", s.Name) + return nil, newPreparedStmtDNEError(p.SessionData(), string(s.Name)) } } return newZeroNode(nil /* columns */), nil diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 5022b9422ac1..fe4fd1cbed3d 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3500,6 +3500,10 @@ func (m *sessionDataMutator) SetDefaultTextSearchConfig(val string) { m.data.DefaultTextSearchConfig = val } +func (m *sessionDataMutator) SetPreparedStatementsCacheSize(val int64) { + m.data.PreparedStatementsCacheSize = val +} + func (m *sessionDataMutator) SetStreamerEnabled(val bool) { m.data.StreamerEnabled = val } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 9fb5c60507eb..34a3b2820331 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -5281,6 +5281,7 @@ parallelize_multi_key_lookup_joins_enabled off password_encryption scram-sha-256 pg_trgm.similarity_threshold 0.3 prefer_lookup_joins_for_fks off +prepared_statements_cache_size 0 B propagate_input_ordering off reorder_joins_limit 8 require_explicit_primary_keys off diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index e7b3f99b3578..b5f7840e7ea4 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2773,6 +2773,7 @@ parallelize_multi_key_lookup_joins_enabled off NULL password_encryption scram-sha-256 NULL NULL NULL string pg_trgm.similarity_threshold 0.3 NULL NULL NULL string prefer_lookup_joins_for_fks off NULL NULL NULL string +prepared_statements_cache_size 0 B NULL NULL NULL string propagate_input_ordering off NULL NULL NULL string reorder_joins_limit 8 NULL NULL NULL string require_explicit_primary_keys off NULL NULL NULL string @@ -2925,6 +2926,7 @@ parallelize_multi_key_lookup_joins_enabled off NULL password_encryption scram-sha-256 NULL user NULL scram-sha-256 scram-sha-256 pg_trgm.similarity_threshold 0.3 NULL user NULL 0.3 0.3 prefer_lookup_joins_for_fks off NULL user NULL off off +prepared_statements_cache_size 0 B NULL user NULL 0 B 0 B propagate_input_ordering off NULL user NULL off off reorder_joins_limit 8 NULL user NULL 8 8 require_explicit_primary_keys off NULL user NULL off off @@ -3076,6 +3078,7 @@ parallelize_multi_key_lookup_joins_enabled NULL NULL NULL password_encryption NULL NULL NULL NULL NULL pg_trgm.similarity_threshold NULL NULL NULL NULL NULL prefer_lookup_joins_for_fks NULL NULL NULL NULL NULL +prepared_statements_cache_size NULL NULL NULL NULL NULL propagate_input_ordering NULL NULL NULL NULL NULL reorder_joins_limit NULL NULL NULL NULL NULL require_explicit_primary_keys NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/prepare b/pkg/sql/logictest/testdata/logic_test/prepare index d1c1ddba213f..950663dcddc3 100644 --- a/pkg/sql/logictest/testdata/logic_test/prepare +++ b/pkg/sql/logictest/testdata/logic_test/prepare @@ -1482,3 +1482,190 @@ query ITT EXECUTE args_deduce_type_1(1,10,100); ---- 1 10 100 + +# Test that prepared_statements_cache_size functions correctly. +subtest prepared_statements_cache_size + +statement ok +DEALLOCATE ALL + +# With a very small cache, all prepared statements except the most recent one +# should be automatically deallocated. +statement ok +SET prepared_statements_cache_size = '1 KiB' + +statement ok +PREPARE pscs01 AS SELECT $1::bool, 1 + +statement ok +PREPARE pscs02 AS SELECT $1::float, 2 + +statement ok +PREPARE pscs03 AS SELECT $1::decimal, 3 + +statement ok +PREPARE pscs04 AS SELECT $1::string, 4 + +statement ok +PREPARE pscs05 AS SELECT $1::json, 5 + +statement ok +PREPARE pscs06 AS SELECT $1::int, 6 + +query T +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs06 + +query II +EXECUTE pscs06(6) +---- +6 6 + +statement error prepared statement \"pscs05\" does not exist +EXECUTE pscs05(5) + +statement error prepared statement \"pscs04\" does not exist +EXECUTE pscs04(4) + +statement error prepared statement \"pscs03\" does not exist +EXECUTE pscs03(3) + +statement error prepared statement \"pscs02\" does not exist +EXECUTE pscs02(2) + +statement error prepared statement \"pscs01\" does not exist +EXECUTE pscs01(1) + +# We should automatically deallocate old prepared statements as the cache grows. +statement ok +SET prepared_statements_cache_size = '20 KiB' + +statement ok +PREPARE pscs07 AS SELECT $1::date, 7 + +statement ok +PREPARE pscs08 AS SELECT $1::timestamp, 8 + +statement ok +PREPARE pscs09 AS SELECT $1::bool, 9 + +statement ok +PREPARE pscs10 AS SELECT $1::bytes, 10 + +statement ok +PREPARE pscs11 AS SELECT $1::smallint, 11 + +statement ok +PREPARE pscs12 AS SELECT $1::time, 12 + +statement ok +PREPARE pscs13 AS SELECT $1::bigint, 13 + +query T +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs08 +pscs09 +pscs10 +pscs11 +pscs12 +pscs13 + +statement ok +DEALLOCATE pscs10 + +# Now we should have room for one more. +statement ok +PREPARE pscs14 AS SELECT $1::int, 14 + +query T +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs08 +pscs09 +pscs11 +pscs12 +pscs13 +pscs14 + +# Executing a prepared statement should move it to the front of the LRU list. +query II +EXECUTE pscs11(11) +---- +11 11 + +statement ok +PREPARE pscs15 AS SELECT $1::timetz, 15 + +statement ok +PREPARE pscs16 AS SELECT $1::float, 16 + +statement ok +PREPARE pscs17 AS SELECT $1::interval, 17 + +query T +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs11 +pscs13 +pscs14 +pscs15 +pscs16 +pscs17 + +# Retrying a transaction should rewind the LRU list after each retry even if +# some prepared statements were evicted. (We use a sequence to break out of the +# retry loop.) + +statement ok +CREATE SEQUENCE s + +statement ok +CREATE TABLE prep_stmts (which INT, name STRING) + +statement ok +BEGIN; +INSERT INTO prep_stmts SELECT 1, name FROM pg_catalog.pg_prepared_statements; +PREPARE pscs18 AS SELECT $1::inet, 18; +EXECUTE pscs14(14); +PREPARE pscs19 AS SELECT $1::string, 19; +INSERT INTO prep_stmts SELECT 2, name FROM pg_catalog.pg_prepared_statements; +SELECT IF(nextval('s') <= 3, crdb_internal.force_retry('1 hour'), 0); +COMMIT + +# Validate that the transaction was actually tried multiple times. +query I +SELECT currval('s') +---- +4 + +# Validate that the LRU list was correct before and after the PREPAREs, even +# after multiple retries. +query IT +SELECT which, name FROM prep_stmts ORDER BY which, name +---- +1 pscs11 +1 pscs13 +1 pscs14 +1 pscs15 +1 pscs16 +1 pscs17 +2 pscs14 +2 pscs15 +2 pscs16 +2 pscs17 +2 pscs18 +2 pscs19 + +statement ok +DROP TABLE prep_stmts + +statement ok +DROP SEQUENCE s + +statement ok +DEALLOCATE ALL + +statement ok +RESET prepared_statements_cache_size diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index b71883d963d8..fdbfc92f2c39 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -125,6 +125,7 @@ parallelize_multi_key_lookup_joins_enabled off password_encryption scram-sha-256 pg_trgm.similarity_threshold 0.3 prefer_lookup_joins_for_fks off +prepared_statements_cache_size 0 B propagate_input_ordering off reorder_joins_limit 8 require_explicit_primary_keys off diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index 10198aaabce4..81dec259c213 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -90,7 +90,7 @@ func (p *planner) prepareUsingOptimizer(ctx context.Context) (planFlags, error) // we need to set the expected output columns to the output columns of the // prepared statement that the user is trying to execute. name := string(t.Name) - prepared, ok := p.preparedStatements.Get(name) + prepared, ok := p.preparedStatements.Get(name, true /* touchLRU */) if !ok { // We're trying to prepare an EXECUTE of a statement that doesn't exist. // Let's just give up at this point. diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go index 7a0c67cc051e..2f3834bda37c 100644 --- a/pkg/sql/prepared_stmt.go +++ b/pkg/sql/prepared_stmt.go @@ -108,9 +108,10 @@ type preparedStatementsAccessor interface { // List returns all prepared statements as a map keyed by name. // The map itself is a copy of the prepared statements. List() map[string]*PreparedStatement - // Get returns the prepared statement with the given name. The returned bool - // is false if a statement with the given name doesn't exist. - Get(name string) (*PreparedStatement, bool) + // Get returns the prepared statement with the given name. If touchLRU is + // true, this counts as an access for LRU bookkeeping. The returned bool is + // false if a statement with the given name doesn't exist. + Get(name string, touchLRU bool) (*PreparedStatement, bool) // Delete removes the PreparedStatement with the provided name from the // collection. If a portal exists for that statement, it is also removed. // The method returns true if statement with that name was found and removed, diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index db0f19cc4d79..15e46dce7262 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -359,6 +359,12 @@ message LocalOnlySessionData { // of a role membership which the transaction relied on has successfully been // committed and acknowledged to the user. bool allow_role_memberships_to_change_during_transaction = 96; + // PreparedStatementsCacheSize, when not equal to 0, causes the LRU prepared + // statements in a session to be automatically deallocated when total prepared + // statement memory usage for that session is more than the cache size. + // Execution of these deallocated prepared statements will fail until they are + // prepared again. + int64 prepared_statements_cache_size = 97; // StreamerEnabled controls whether the Streamer API can be used. bool streamer_enabled = 98; diff --git a/pkg/sql/testdata/session_migration/prepared_statements b/pkg/sql/testdata/session_migration/prepared_statements index 10e6d015b11e..c9e13821b93b 100644 --- a/pkg/sql/testdata/session_migration/prepared_statements +++ b/pkg/sql/testdata/session_migration/prepared_statements @@ -113,3 +113,71 @@ exec SELECT crdb_internal.deserialize_session( decode('$x', 'hex') ) ---- ERROR: crdb_internal.deserialize_session(): prepared statement "s2" already exists (SQLSTATE 42P05) + +# Test if the LRU list of prepared statements is preserved during migration. + +exec +DEALLOCATE ALL +---- + +exec +SET prepared_statements_cache_size = '10 KiB' +---- + +wire_prepare pscs1 +SELECT 101 +---- + +wire_prepare pscs2 +SELECT 102 +---- + +wire_prepare pscs3 +SELECT 103 +---- + +wire_prepare pscs4 +SELECT 104 +---- + +# Move pscs2 to the front of the LRU list. +wire_exec pscs2 +---- + +query +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs2 +pscs3 +pscs4 + +let $z +SELECT encode(crdb_internal.serialize_session(), 'hex') +---- + +exec +DEALLOCATE ALL +---- + +exec +SELECT crdb_internal.deserialize_session( decode('$z', 'hex') ) +---- + +query +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs2 +pscs3 +pscs4 + +# Check that pscs2 is still at the front of the LRU list. +wire_prepare pscs5 +SELECT 105 +---- + +query +SELECT name FROM pg_catalog.pg_prepared_statements ORDER BY name +---- +pscs2 +pscs4 +pscs5 diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 06351725f54b..8c10a9a85ead 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -2585,6 +2585,24 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `prepared_statements_cache_size`: { + Set: func(_ context.Context, m sessionDataMutator, s string) error { + limit, err := humanizeutil.ParseBytes(s) + if err != nil { + return err + } + m.SetPreparedStatementsCacheSize(limit) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return string(humanizeutil.IBytes(evalCtx.SessionData().PreparedStatementsCacheSize)), nil + }, + GlobalDefault: func(_ *settings.Values) string { + return string(humanizeutil.IBytes(0)) + }, + }, + // CockroachDB extension. `streamer_enabled`: { GetStringVal: makePostgresBoolGetStringValFn(`streamer_enabled`), diff --git a/pkg/util/mon/bytes_usage.go b/pkg/util/mon/bytes_usage.go index a3a81ccda9e8..d8f6944a7dd7 100644 --- a/pkg/util/mon/bytes_usage.go +++ b/pkg/util/mon/bytes_usage.go @@ -696,7 +696,7 @@ func (b *BoundAccount) Monitor() *BytesMonitor { return b.mon } -func (b *BoundAccount) allocated() int64 { +func (b *BoundAccount) Allocated() int64 { if b == nil { return 0 } @@ -802,7 +802,7 @@ func (b *BoundAccount) Close(ctx context.Context) { // monitor -- "bytes out of the aether". This needs not be closed. return } - if a := b.allocated(); a > 0 { + if a := b.Allocated(); a > 0 { b.mon.releaseBytes(ctx, a) } } @@ -1018,7 +1018,7 @@ func (mm *BytesMonitor) roundSize(sz int64) int64 { func (mm *BytesMonitor) releaseBudget(ctx context.Context) { // NB: mm.mu need not be locked here, as this is only called from StopMonitor(). if log.V(2) { - log.Infof(ctx, "%s: releasing %d bytes to the pool", mm.name, mm.mu.curBudget.allocated()) + log.Infof(ctx, "%s: releasing %d bytes to the pool", mm.name, mm.mu.curBudget.Allocated()) } mm.mu.curBudget.Clear(ctx) } diff --git a/pkg/util/mon/bytes_usage_test.go b/pkg/util/mon/bytes_usage_test.go index 3d3e4acd4531..be70039a6505 100644 --- a/pkg/util/mon/bytes_usage_test.go +++ b/pkg/util/mon/bytes_usage_test.go @@ -70,7 +70,7 @@ func TestMemoryAllocations(t *testing.T) { t.Errorf("account %d went negative: %d", accI, accs[accI].used) fail = true } - sum += accs[accI].allocated() + sum += accs[accI].Allocated() } if m.mu.curAllocated < 0 { t.Errorf("monitor current count went negative: %d", m.mu.curAllocated) @@ -84,7 +84,7 @@ func TestMemoryAllocations(t *testing.T) { t.Errorf("monitor current budget went negative: %d", m.mu.curBudget.used) fail = true } - avail := m.mu.curBudget.allocated() + m.reserved.used + avail := m.mu.curBudget.Allocated() + m.reserved.used if sum > avail { t.Errorf("total account sum %d greater than total monitor budget %d", sum, avail) fail = true @@ -93,7 +93,7 @@ func TestMemoryAllocations(t *testing.T) { t.Errorf("pool cur %d exceeds max %d", pool.mu.curAllocated, pool.reserved.used) fail = true } - if m.mu.curBudget.allocated() != pool.mu.curAllocated { + if m.mu.curBudget.Allocated() != pool.mu.curAllocated { t.Errorf("monitor budget %d different from pool cur %d", m.mu.curBudget.used, pool.mu.curAllocated) fail = true } @@ -287,7 +287,7 @@ func TestNilBoundAccount(t *testing.T) { var ba *BoundAccount _ = ba.Used() _ = ba.Monitor() - _ = ba.allocated() + _ = ba.Allocated() ba.Empty(ctx) ba.Clear(ctx) ba.Close(ctx)