Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: sanitize the behavior of runtime stats sampling / dumps for secondary tenants #96168

Merged
merged 3 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 97 additions & 82 deletions pkg/ccl/serverccl/tenant_vars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -41,87 +43,100 @@ func TestTenantVars(t *testing.T) {
})
defer testCluster.Stopper().Stop(ctx)

server := testCluster.Server(0 /* idx */)

tenant, _ := serverutils.StartTenant(t, server, base.TestTenantArgs{
TenantID: roachpb.MustMakeTenantID(10 /* id */),
srv := testCluster.Server(0 /* idx */)

testutils.RunTrueAndFalse(t, "shared-process", func(t *testing.T, sharedProcess bool) {
var tenant serverutils.TestTenantInterface
if !sharedProcess {
tenant, _ = serverutils.StartTenant(t, srv, base.TestTenantArgs{
TenantID: roachpb.MustMakeTenantID(10 /* id */),
})
} else {
var err error
tenant, _, err = srv.(*server.TestServer).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantName: roachpb.TenantName("test"),
TenantID: roachpb.MustMakeTenantID(20),
})
require.NoError(t, err)
}

startNowNanos := timeutil.Now().UnixNano()
url := tenant.AdminURL() + "/_status/load"
client := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
resp, err := client.Get(url)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode,
"invalid non-200 status code %v from tenant", resp.StatusCode)

var parser expfmt.TextParser
metrics, err := parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)

userCPU, found := metrics["sys_cpu_user_ns"]
require.True(t, found)
require.Len(t, userCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, userCPU.GetType())
cpuUserNanos := userCPU.Metric[0].GetGauge().GetValue()

sysCPU, found := metrics["sys_cpu_sys_ns"]
require.True(t, found)
require.Len(t, sysCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, sysCPU.GetType())
cpuSysNanos := sysCPU.Metric[0].GetGauge().GetValue()

now, found := metrics["sys_cpu_now_ns"]
require.True(t, found)
require.Len(t, now.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, now.GetType())
nowNanos := now.Metric[0].GetGauge().GetValue()

// The values are between zero and whatever User/Sys time is observed after the get.
require.Positive(t, cpuUserNanos)
require.Positive(t, cpuSysNanos)
require.Positive(t, nowNanos)
cpuTime := gosigar.ProcTime{}
require.NoError(t, cpuTime.Get(os.Getpid()))
require.LessOrEqual(t, cpuUserNanos, float64(cpuTime.User)*1e6)
require.LessOrEqual(t, cpuSysNanos, float64(cpuTime.Sys)*1e6)
require.GreaterOrEqual(t, nowNanos, float64(startNowNanos))
require.LessOrEqual(t, nowNanos, float64(timeutil.Now().UnixNano()))

resp, err = client.Get(url)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode,
"invalid non-200 status code %v from tenant", resp.StatusCode)

metrics, err = parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)

userCPU, found = metrics["sys_cpu_user_ns"]
require.True(t, found)
require.Len(t, userCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, userCPU.GetType())
cpuUserNanos2 := userCPU.Metric[0].GetGauge().GetValue()

sysCPU, found = metrics["sys_cpu_sys_ns"]
require.True(t, found)
require.True(t, found)
require.Len(t, sysCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, sysCPU.GetType())
cpuSysNanos2 := sysCPU.Metric[0].GetGauge().GetValue()

require.LessOrEqual(t, float64(cpuTime.User)*1e6, cpuUserNanos2)
require.LessOrEqual(t, float64(cpuTime.Sys)*1e6, cpuSysNanos2)

_, found = metrics["jobs_running_non_idle"]
require.True(t, found)
_, found = metrics["sql_query_count"]
require.True(t, found)
_, found = metrics["sql_conns"]
require.True(t, found)
})

startNowNanos := timeutil.Now().UnixNano()
url := tenant.AdminURL() + "/_status/load"
client := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
resp, err := client.Get(url)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode,
"invalid non-200 status code %v from tenant", resp.StatusCode)

var parser expfmt.TextParser
metrics, err := parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)

userCPU, found := metrics["sys_cpu_user_ns"]
require.True(t, found)
require.Len(t, userCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, userCPU.GetType())
cpuUserNanos := userCPU.Metric[0].GetGauge().GetValue()

sysCPU, found := metrics["sys_cpu_sys_ns"]
require.True(t, found)
require.Len(t, sysCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, sysCPU.GetType())
cpuSysNanos := sysCPU.Metric[0].GetGauge().GetValue()

now, found := metrics["sys_cpu_now_ns"]
require.True(t, found)
require.Len(t, now.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, now.GetType())
nowNanos := now.Metric[0].GetGauge().GetValue()

// The values are between zero and whatever User/Sys time is observed after the get.
require.Positive(t, cpuUserNanos)
require.Positive(t, cpuSysNanos)
require.Positive(t, nowNanos)
cpuTime := gosigar.ProcTime{}
require.NoError(t, cpuTime.Get(os.Getpid()))
require.LessOrEqual(t, cpuUserNanos, float64(cpuTime.User)*1e6)
require.LessOrEqual(t, cpuSysNanos, float64(cpuTime.Sys)*1e6)
require.GreaterOrEqual(t, nowNanos, float64(startNowNanos))
require.LessOrEqual(t, nowNanos, float64(timeutil.Now().UnixNano()))

resp, err = client.Get(url)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode,
"invalid non-200 status code %v from tenant", resp.StatusCode)

metrics, err = parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)

userCPU, found = metrics["sys_cpu_user_ns"]
require.True(t, found)
require.Len(t, userCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, userCPU.GetType())
cpuUserNanos2 := userCPU.Metric[0].GetGauge().GetValue()

sysCPU, found = metrics["sys_cpu_sys_ns"]
require.True(t, found)
require.True(t, found)
require.Len(t, sysCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, sysCPU.GetType())
cpuSysNanos2 := sysCPU.Metric[0].GetGauge().GetValue()

require.LessOrEqual(t, float64(cpuTime.User)*1e6, cpuUserNanos2)
require.LessOrEqual(t, float64(cpuTime.Sys)*1e6, cpuSysNanos2)

_, found = metrics["jobs_running_non_idle"]
require.True(t, found)
_, found = metrics["sql_query_count"]
require.True(t, found)
_, found = metrics["sql_conns"]
require.True(t, found)
}
15 changes: 13 additions & 2 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,26 @@ type BaseConfig struct {
// ReadWithinUncertaintyIntervalError.
MaxOffset MaxOffsetType

// DisableRuntimeStatsMonitor prevents this server from starting the
// async task that collects runtime stats and triggers
// heap/goroutine dumps under high load.
DisableRuntimeStatsMonitor bool

// RuntimeStatSampler, if non-nil, will be used as source for
// run-time metrics instead of constructing a fresh one.
RuntimeStatSampler *status.RuntimeStatSampler

// GoroutineDumpDirName is the directory name for goroutine dumps using
// goroutinedumper.
// goroutinedumper. Only used if DisableRuntimeStatsMonitor is false.
GoroutineDumpDirName string

// HeapProfileDirName is the directory name for heap profiles using
// heapprofiler. If empty, no heap profiles will be collected.
// heapprofiler. If empty, no heap profiles will be collected. Only
// used if DisableRuntimeStatsMonitor is false.
HeapProfileDirName string

// CPUProfileDirName is the directory name for CPU profile dumps.
// Only used if DisableRuntimeStatsMonitor is false.
CPUProfileDirName string

// InflightTraceDirName is the directory name for job traces.
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

runtimeSampler := status.NewRuntimeStatSampler(ctx, clock)
registry.AddMetricStruct(runtimeSampler)
// Save a reference to this sampler for use by additional servers
// started via the server controller.
cfg.RuntimeStatSampler = runtimeSampler

registry.AddMetric(base.LicenseTTL)

Expand Down
26 changes: 21 additions & 5 deletions pkg/server/server_controller_new_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func makeSharedProcessTenantServerConfig(
storeSpec := candidateSpec
if !storeSpec.InMemory {
storeDir := filepath.Join(storeSpec.Path, "tenant-"+tenantID.String())
if err := os.MkdirAll(storeDir, 0700); err != nil {
if err := os.MkdirAll(storeDir, 0755); err != nil {
return baseCfg, sqlCfg, err
}
stopper.AddCloser(stop.CloserFn(func() {
Expand Down Expand Up @@ -296,13 +296,29 @@ func makeSharedProcessTenantServerConfig(
baseCfg.SSLCertsDir = kvServerCfg.BaseConfig.SSLCertsDir
baseCfg.SSLCAKey = kvServerCfg.BaseConfig.SSLCAKey

// TODO(knz): startSampleEnvironment() should not be part of startTenantInternal. For now,
// disable the mechanism manually.
// See: https://github.com/cockroachdb/cockroach/issues/84589
// Don't let this SQL server take its own background heap/goroutine/CPU profile dumps.
// The system tenant's SQL server is doing this job.
baseCfg.DisableRuntimeStatsMonitor = true
baseCfg.GoroutineDumpDirName = ""
baseCfg.HeapProfileDirName = ""
baseCfg.CPUProfileDirName = ""
baseCfg.InflightTraceDirName = ""

// Expose the process-wide runtime metrics to the tenant's metric
// collector. Since they are process-wide, all tenants can see them.
baseCfg.RuntimeStatSampler = kvServerCfg.BaseConfig.RuntimeStatSampler

// If job trace dumps were enabled for the top-level server, enable
// them for this tenant server too. However, in contrast to temporary
// files, we don't want them to be deleted when the tenant server
// shuts down, so we store them in a per-tenant subdirectory
// ("tenant-<ID>") of the main trace dump directory.
if kvServerCfg.BaseConfig.InflightTraceDirName != "" {
traceDir := filepath.Join(kvServerCfg.BaseConfig.InflightTraceDirName, "tenant-"+tenantID.String())
if err := os.MkdirAll(traceDir, 0755); err != nil {
return baseCfg, sqlCfg, err
}
baseCfg.InflightTraceDirName = traceDir
}

// TODO(knz): Define a meaningful storage config for each tenant,
// see: https://github.com/cockroachdb/cockroach/issues/84588.
Expand Down
29 changes: 18 additions & 11 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,16 +569,18 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
s.sqlServer.cfg.SQLAdvertiseAddr,
)

// Begin recording runtime statistics.
if err := startSampleEnvironment(workersCtx,
s.ClusterSettings(),
s.stopper,
s.sqlServer.cfg.GoroutineDumpDirName,
s.sqlServer.cfg.HeapProfileDirName,
s.runtime,
s.tenantStatus.sessionRegistry,
); err != nil {
return err
if !s.sqlServer.cfg.DisableRuntimeStatsMonitor {
// Begin recording runtime statistics.
if err := startSampleEnvironment(workersCtx,
s.ClusterSettings(),
s.stopper,
s.sqlServer.cfg.GoroutineDumpDirName,
s.sqlServer.cfg.HeapProfileDirName,
s.runtime,
s.tenantStatus.sessionRegistry,
); err != nil {
return err
}
}

// Export statistics to graphite, if enabled by configuration.
Expand Down Expand Up @@ -949,7 +951,12 @@ func makeTenantSQLServerArgs(
parentRecorder.AddTenantRecorder(recorder)
}

runtime := status.NewRuntimeStatSampler(startupCtx, clock)
var runtime *status.RuntimeStatSampler
if baseCfg.RuntimeStatSampler != nil {
runtime = baseCfg.RuntimeStatSampler
} else {
runtime = status.NewRuntimeStatSampler(startupCtx, clock)
}
registry.AddMetricStruct(runtime)

// NB: The init method will be called in (*SQLServerWrapper).PreStart().
Expand Down