Skip to content

Commit

Permalink
Merge pull request cockroachdb#84089 from RichardJCai/backport22.1-83678
Browse files Browse the repository at this point in the history
release-22.1: sql: Use one BytesMonitor for all IE created by IEFactory
  • Loading branch information
RichardJCai authored Jul 18, 2022
2 parents 6b0ec79 + 3c7d6d9 commit 7b257ec
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 96 deletions.
7 changes: 3 additions & 4 deletions pkg/ccl/serverccl/role_authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,10 @@ func TestVerifyPassword(t *testing.T) {
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

mon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.ClusterSettings())
mon.Start(ctx, s.(*server.TestServer).Server.PGServer().SQLServer.GetBytesMonitor(), mon.MakeBoundAccount())
ie := sql.MakeInternalExecutor(
context.Background(),
s.(*server.TestServer).Server.PGServer().SQLServer,
sql.MemoryMetrics{},
s.ExecutorConfig().(sql.ExecutorConfig).Settings,
s.(*server.TestServer).Server.PGServer().SQLServer, sql.MemoryMetrics{}, mon,
)

ts := s.(*server.TestServer)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/backend_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestBackendDialTLS(t *testing.T) {
defer sql.Stopper().Stop(ctx)

conn, err := BackendDial(startupMsg, sql.ServingSQLAddr(), tlsConfig)

require.NoError(t, err)
require.NotNil(t, conn)
})
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ go_test(
"servemode_test.go",
"server_http_test.go",
"server_import_ts_test.go",
"server_internal_executor_factory_test.go",
"server_systemlog_gc_test.go",
"server_test.go",
"settings_cache_test.go",
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,10 @@ func (s *Server) PreStart(ctx context.Context) error {
// Initialize the external storage builders configuration params now that the
// engines have been created. The object can be used to create ExternalStorage
// objects hereafter.
fileTableInternalExecutor := sql.MakeInternalExecutor(ctx, s.PGServer().SQLServer, sql.MemoryMetrics{}, s.st)
ieMon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.ClusterSettings())
ieMon.Start(ctx, s.PGServer().SQLServer.GetBytesMonitor(), mon.BoundAccount{})
s.stopper.AddCloser(stop.CloserFn(func() { ieMon.Stop(ctx) }))
fileTableInternalExecutor := sql.MakeInternalExecutor(s.PGServer().SQLServer, sql.MemoryMetrics{}, ieMon)
s.externalStorageBuilder.init(
ctx,
s.cfg.ExternalIODirConfig,
Expand Down
45 changes: 45 additions & 0 deletions pkg/server/server_internal_executor_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestInternalExecutorClearsMonitorMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

mon := s.(*TestServer).sqlServer.internalExecutorFactoryMemMonitor
ief := s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory
sessionData := sql.NewFakeSessionData(&s.ClusterSettings().SV)
ie := ief(ctx, sessionData)
rows, err := ie.QueryIteratorEx(ctx, "test", nil, sessiondata.NodeUserSessionDataOverride, `SELECT 1`)
require.NoError(t, err)
require.Greater(t, mon.AllocBytes(), int64(0))
err = rows.Close()
require.NoError(t, err)
s.Stopper().Stop(ctx)
require.Equal(t, mon.AllocBytes(), int64(0))
}
99 changes: 59 additions & 40 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ type SQLServer struct {
// connection management tools learning this status from health checks.
// This is set to true when the server has started accepting client conns.
isReady syncutil.AtomicBool

// internalExecutorFactoryMemMonitor is the memory monitor corresponding to the
// InternalExecutorFactory singleton. It only gets closed when
// Server is closed. Every InternalExecutor created via the factory
// uses this memory monitor.
internalExecutorFactoryMemMonitor *mon.BytesMonitor
}

// sqlServerOptionalKVArgs are the arguments supplied to newSQLServer which are
Expand Down Expand Up @@ -872,19 +878,31 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
distSQLServer.ServerConfig.SQLStatsController = pgServer.SQLServer.GetSQLStatsController()
distSQLServer.ServerConfig.IndexUsageStatsController = pgServer.SQLServer.GetIndexUsageStatsController()

// We use one BytesMonitor for all InternalExecutor's created by the
// ieFactory.
// Note that ieFactoryMonitor does not have to be closed, the parent
// monitor comes from server. ieFactoryMonitor is a singleton attached
// to server, if server is closed, we don't have to worry about
// returning the memory allocated to ieFactoryMonitor since the
// parent monitor is being closed anyway.
ieFactoryMonitor := mon.NewMonitor(
"internal executor factory",
mon.MemoryResource,
internalMemMetrics.CurBytesCount,
internalMemMetrics.MaxBytesHist,
-1, /* use default increment */
math.MaxInt64, /* noteworthy */
cfg.Settings,
)
ieFactoryMonitor.Start(ctx, pgServer.SQLServer.GetBytesMonitor(), mon.BoundAccount{})
// Now that we have a pgwire.Server (which has a sql.Server), we can close a
// circular dependency between the rowexec.Server and sql.Server and set
// SessionBoundInternalExecutorFactory. The same applies for setting a
// SessionBoundInternalExecutor on the job registry.
ieFactory := func(
ctx context.Context, sessionData *sessiondata.SessionData,
) sqlutil.InternalExecutor {
ie := sql.MakeInternalExecutor(
ctx,
pgServer.SQLServer,
internalMemMetrics,
cfg.Settings,
)
ie := sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor)
ie.SetSessionData(sessionData)
return &ie
}
Expand Down Expand Up @@ -914,9 +932,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
for _, m := range pgServer.Metrics() {
cfg.registry.AddMetricStruct(m)
}
*cfg.circularInternalExecutor = sql.MakeInternalExecutor(
ctx, pgServer.SQLServer, internalMemMetrics, cfg.Settings,
)
*cfg.circularInternalExecutor = sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor)
execCfg.InternalExecutor = cfg.circularInternalExecutor
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
Expand Down Expand Up @@ -1051,35 +1067,36 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
sqlIDContainer: cfg.nodeIDContainer,
pgServer: pgServer,
distSQLServer: distSQLServer,
execCfg: execCfg,
internalExecutor: cfg.circularInternalExecutor,
leaseMgr: leaseMgr,
blobService: blobService,
tracingService: tracingService,
tenantConnect: cfg.tenantConnect,
sessionRegistry: cfg.sessionRegistry,
jobRegistry: jobRegistry,
statsRefresher: statsRefresher,
temporaryObjectCleaner: temporaryObjectCleaner,
internalMemMetrics: internalMemMetrics,
sqlMemMetrics: sqlMemMetrics,
stmtDiagnosticsRegistry: stmtDiagnosticsRegistry,
sqlLivenessProvider: cfg.sqlLivenessProvider,
sqlInstanceProvider: cfg.sqlInstanceProvider,
metricsRegistry: cfg.registry,
diagnosticsReporter: reporter,
spanconfigMgr: spanConfig.manager,
spanconfigSQLTranslatorFactory: spanConfig.sqlTranslatorFactory,
spanconfigSQLWatcher: spanConfig.sqlWatcher,
settingsWatcher: settingsWatcher,
systemConfigWatcher: cfg.systemConfigWatcher,
isMeta1Leaseholder: cfg.isMeta1Leaseholder,
cfg: cfg.BaseConfig,
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
sqlIDContainer: cfg.nodeIDContainer,
pgServer: pgServer,
distSQLServer: distSQLServer,
execCfg: execCfg,
internalExecutor: cfg.circularInternalExecutor,
leaseMgr: leaseMgr,
blobService: blobService,
tracingService: tracingService,
tenantConnect: cfg.tenantConnect,
sessionRegistry: cfg.sessionRegistry,
jobRegistry: jobRegistry,
statsRefresher: statsRefresher,
temporaryObjectCleaner: temporaryObjectCleaner,
internalMemMetrics: internalMemMetrics,
sqlMemMetrics: sqlMemMetrics,
stmtDiagnosticsRegistry: stmtDiagnosticsRegistry,
sqlLivenessProvider: cfg.sqlLivenessProvider,
sqlInstanceProvider: cfg.sqlInstanceProvider,
metricsRegistry: cfg.registry,
diagnosticsReporter: reporter,
spanconfigMgr: spanConfig.manager,
spanconfigSQLTranslatorFactory: spanConfig.sqlTranslatorFactory,
spanconfigSQLWatcher: spanConfig.sqlWatcher,
settingsWatcher: settingsWatcher,
systemConfigWatcher: cfg.systemConfigWatcher,
isMeta1Leaseholder: cfg.isMeta1Leaseholder,
cfg: cfg.BaseConfig,
internalExecutorFactoryMemMonitor: ieFactoryMonitor,
}, nil
}

Expand Down Expand Up @@ -1192,8 +1209,10 @@ func (s *SQLServer) preStart(
s.leaseMgr.RefreshLeases(ctx, stopper, s.execCfg.DB)
s.leaseMgr.PeriodicallyRefreshSomeLeases(ctx)

migrationsExecutor := sql.MakeInternalExecutor(
ctx, s.pgServer.SQLServer, s.internalMemMetrics, s.execCfg.Settings)
ieMon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.execCfg.Settings)
ieMon.Start(ctx, s.pgServer.SQLServer.GetBytesMonitor(), mon.BoundAccount{})
s.stopper.AddCloser(stop.CloserFn(func() { ieMon.Stop(ctx) }))
migrationsExecutor := sql.MakeInternalExecutor(s.pgServer.SQLServer, s.internalMemMetrics, ieMon)
migrationsExecutor.SetSessionData(
&sessiondata.SessionData{
LocalOnlySessionData: sessiondatapb.LocalOnlySessionData{
Expand Down
33 changes: 22 additions & 11 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,17 +386,20 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
telemetryLoggingMetrics.Knobs = cfg.TelemetryLoggingTestingKnobs
s.TelemetryLoggingMetrics = telemetryLoggingMetrics

sqlStatsInternalExecutor := MakeInternalExecutor(context.Background(), s, MemoryMetrics{}, cfg.Settings)
sqlStatsInternalExecutorMonitor := MakeInternalExecutorMemMonitor(MemoryMetrics{}, s.GetExecutorConfig().Settings)
sqlStatsInternalExecutorMonitor.Start(context.Background(), s.GetBytesMonitor(), mon.BoundAccount{})
sqlStatsInternalExecutor := MakeInternalExecutor(s, MemoryMetrics{}, sqlStatsInternalExecutorMonitor)
persistedSQLStats := persistedsqlstats.New(&persistedsqlstats.Config{
Settings: s.cfg.Settings,
InternalExecutor: &sqlStatsInternalExecutor,
KvDB: cfg.DB,
SQLIDContainer: cfg.NodeID,
JobRegistry: s.cfg.JobRegistry,
Knobs: cfg.SQLStatsTestingKnobs,
FlushCounter: serverMetrics.StatsMetrics.SQLStatsFlushStarted,
FailureCounter: serverMetrics.StatsMetrics.SQLStatsFlushFailure,
FlushDuration: serverMetrics.StatsMetrics.SQLStatsFlushDuration,
Settings: s.cfg.Settings,
InternalExecutor: &sqlStatsInternalExecutor,
InternalExecutorMonitor: sqlStatsInternalExecutorMonitor,
KvDB: cfg.DB,
SQLIDContainer: cfg.NodeID,
JobRegistry: s.cfg.JobRegistry,
Knobs: cfg.SQLStatsTestingKnobs,
FlushCounter: serverMetrics.StatsMetrics.SQLStatsFlushStarted,
FailureCounter: serverMetrics.StatsMetrics.SQLStatsFlushFailure,
FlushDuration: serverMetrics.StatsMetrics.SQLStatsFlushDuration,
}, memSQLStats)

s.sqlStats = persistedSQLStats
Expand Down Expand Up @@ -628,6 +631,11 @@ func (s *Server) GetExecutorConfig() *ExecutorConfig {
return s.cfg
}

// GetBytesMonitor returns this server's BytesMonitor.
func (s *Server) GetBytesMonitor() *mon.BytesMonitor {
return s.pool
}

// SetupConn creates a connExecutor for the client connection.
//
// When this method returns there are no resources allocated yet that
Expand Down Expand Up @@ -1096,7 +1104,10 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
}

if ex.hasCreatedTemporarySchema && !ex.server.cfg.TestingKnobs.DisableTempObjectsCleanupOnSessionExit {
ie := MakeInternalExecutor(ctx, ex.server, MemoryMetrics{}, ex.server.cfg.Settings)
ieMon := MakeInternalExecutorMemMonitor(MemoryMetrics{}, ex.server.cfg.Settings)
ieMon.Start(ctx, ex.server.GetBytesMonitor(), mon.BoundAccount{})
defer ieMon.Stop(ctx)
ie := MakeInternalExecutor(ex.server, MemoryMetrics{}, ieMon)
err := cleanupSessionTempObjects(
ctx,
ex.server.cfg.Settings,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (ep *DummyEvalPlanner) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
override sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.InternalRows, error) {
Expand Down
22 changes: 14 additions & 8 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,21 @@ func (ie *InternalExecutor) WithSyntheticDescriptors(

// MakeInternalExecutor creates an InternalExecutor.
func MakeInternalExecutor(
ctx context.Context, s *Server, memMetrics MemoryMetrics, settings *cluster.Settings,
s *Server, memMetrics MemoryMetrics, monitor *mon.BytesMonitor,
) InternalExecutor {
monitor := mon.NewMonitor(
return InternalExecutor{
s: s,
mon: monitor,
memMetrics: memMetrics,
}
}

// MakeInternalExecutorMemMonitor creates and starts memory monitor for an
// InternalExecutor.
func MakeInternalExecutorMemMonitor(
memMetrics MemoryMetrics, settings *cluster.Settings,
) *mon.BytesMonitor {
return mon.NewMonitor(
"internal SQL executor",
mon.MemoryResource,
memMetrics.CurBytesCount,
Expand All @@ -107,12 +119,6 @@ func MakeInternalExecutor(
math.MaxInt64, /* noteworthy */
settings,
)
monitor.Start(ctx, s.pool, mon.BoundAccount{})
return InternalExecutor{
s: s,
mon: monitor,
memMetrics: memMetrics,
}
}

// SetSessionData binds the session variables that will be used by queries
Expand Down
29 changes: 12 additions & 17 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,10 @@ func TestInternalFullTableScan(t *testing.T) {
"pq: query `SELECT * FROM t` contains a full table/index scan which is explicitly disallowed",
err.Error())

mon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.ClusterSettings())
mon.Start(ctx, s.(*server.TestServer).Server.PGServer().SQLServer.GetBytesMonitor(), mon.MakeBoundAccount())
ie := sql.MakeInternalExecutor(
ctx,
s.(*server.TestServer).Server.PGServer().SQLServer,
sql.MemoryMetrics{},
s.ExecutorConfig().(sql.ExecutorConfig).Settings,
s.(*server.TestServer).Server.PGServer().SQLServer, sql.MemoryMetrics{}, mon,
)
ie.SetSessionData(
&sessiondata.SessionData{
Expand Down Expand Up @@ -189,13 +188,11 @@ func TestInternalStmtFingerprintLimit(t *testing.T) {
_, err = db.Exec("SET CLUSTER SETTING sql.metrics.max_mem_stmt_fingerprints = 0;")
require.NoError(t, err)

mon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.ClusterSettings())
mon.Start(ctx, s.(*server.TestServer).Server.PGServer().SQLServer.GetBytesMonitor(), mon.MakeBoundAccount())
ie := sql.MakeInternalExecutor(
ctx,
s.(*server.TestServer).Server.PGServer().SQLServer,
sql.MemoryMetrics{},
s.ExecutorConfig().(sql.ExecutorConfig).Settings,
s.(*server.TestServer).Server.PGServer().SQLServer, sql.MemoryMetrics{}, mon,
)

_, err = ie.Exec(ctx, "stmt-exceeds-fingerprint-limit", nil, "SELECT 1")
require.NoError(t, err)
}
Expand Down Expand Up @@ -322,11 +319,10 @@ func TestSessionBoundInternalExecutor(t *testing.T) {
}

expDB := "foo"
mon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.ClusterSettings())
mon.Start(ctx, s.(*server.TestServer).Server.PGServer().SQLServer.GetBytesMonitor(), mon.MakeBoundAccount())
ie := sql.MakeInternalExecutor(
ctx,
s.(*server.TestServer).Server.PGServer().SQLServer,
sql.MemoryMetrics{},
s.ExecutorConfig().(sql.ExecutorConfig).Settings,
s.(*server.TestServer).Server.PGServer().SQLServer, sql.MemoryMetrics{}, mon,
)
ie.SetSessionData(
&sessiondata.SessionData{
Expand Down Expand Up @@ -390,11 +386,10 @@ func TestInternalExecAppNameInitialization(t *testing.T) {
s, _, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

mon := sql.MakeInternalExecutorMemMonitor(sql.MemoryMetrics{}, s.ClusterSettings())
mon.Start(context.Background(), s.(*server.TestServer).Server.PGServer().SQLServer.GetBytesMonitor(), mon.MakeBoundAccount())
ie := sql.MakeInternalExecutor(
context.Background(),
s.(*server.TestServer).Server.PGServer().SQLServer,
sql.MemoryMetrics{},
s.ExecutorConfig().(sql.ExecutorConfig).Settings,
s.(*server.TestServer).Server.PGServer().SQLServer, sql.MemoryMetrics{}, mon,
)
ie.SetSessionData(
&sessiondata.SessionData{
Expand Down
Loading

0 comments on commit 7b257ec

Please sign in to comment.