Skip to content

Commit

Permalink
tenant: add endpoint with instant metrics
Browse files Browse the repository at this point in the history
Previously the tenant process was serving various metrics on
`/_status/vars`. This endpoint has all the available metrics and these are
updated every 10 sec. Many of the metrics show a rate that is calculated
over the 10 sec interval. Some of the metrics are used by the cockroach
operator to monitor the CPU workload of the tenant process and use that
workload for automatic scaling. The 10 sec interval however is too long
and causes a slow scaling up. The reporting of high CPU utilization can
take up to 20 sec (to compute a delta). To resolve this, the PR adds a
new endpoint `/_status/load` that provides an instant reading of a
very small subset of the normal metrics - user and system CPU time for
now. By having these be instant, the client can retrieve in quick
succession, consecutive snapshots and compute a precise CPU utulization.
It also allows the client to control the interval between the two pulls
(as opposed to having it hard coded to 10 sec).

Release note: None
  • Loading branch information
darinpp committed Sep 30, 2021
1 parent aa7b70b commit 5b0bdb0
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
"server_sql_test.go",
"tenant_grpc_test.go",
"tenant_status_test.go",
"tenant_vars_test.go",
],
embed = [":serverccl"],
deps = [
Expand Down Expand Up @@ -62,7 +63,10 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_elastic_gosigar//:gosigar",
"@com_github_lib_pq//:pq",
"@com_github_prometheus_client_model//go",
"@com_github_prometheus_common//expfmt",
"@com_github_stretchr_testify//require",
"@org_golang_x_crypto//bcrypt",
],
Expand Down
110 changes: 110 additions & 0 deletions pkg/ccl/serverccl/tenant_vars_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package serverccl

import (
"context"
"crypto/tls"
"net/http"
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/elastic/gosigar"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/require"
)

func TestTenantVars(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

serverParams, _ := tests.CreateTestServerParams()
testCluster := serverutils.StartNewTestCluster(t, 1 /* numNodes */, base.TestClusterArgs{
ServerArgs: serverParams,
})
defer testCluster.Stopper().Stop(ctx)

server := testCluster.Server(0 /* idx */)

tenant, _ := serverutils.StartTenant(t, server, base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10 /* id */),
})

url := "https://" + tenant.HTTPAddr() + "/_status/load"
client := http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
resp, err := client.Get(url)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode,
"invalid non-200 status code %v from tenant", resp.StatusCode)

var parser expfmt.TextParser
metrics, err := parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)

userCPU, found := metrics["sys_cpu_user_ns"]
require.True(t, found)
require.Len(t, userCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, userCPU.GetType())
cpuUserNanos := userCPU.Metric[0].GetGauge().GetValue()

sysCPU, found := metrics["sys_cpu_sys_ns"]
require.True(t, found)
require.True(t, found)
require.Len(t, sysCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, sysCPU.GetType())
cpuSysNanos := sysCPU.Metric[0].GetGauge().GetValue()

// The values are between zero and whatever User/Sys time is observed after the get.
require.Positive(t, cpuUserNanos)
require.Positive(t, cpuSysNanos)
cpuTime := gosigar.ProcTime{}
require.NoError(t, cpuTime.Get(os.Getpid()))
require.LessOrEqual(t, cpuUserNanos, float64(cpuTime.User)*1e6)
require.LessOrEqual(t, cpuSysNanos, float64(cpuTime.Sys)*1e6)

resp, err = client.Get(url)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode,
"invalid non-200 status code %v from tenant", resp.StatusCode)

metrics, err = parser.TextToMetricFamilies(resp.Body)
require.NoError(t, err)

userCPU, found = metrics["sys_cpu_user_ns"]
require.True(t, found)
require.Len(t, userCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, userCPU.GetType())
cpuUserNanos2 := userCPU.Metric[0].GetGauge().GetValue()

sysCPU, found = metrics["sys_cpu_sys_ns"]
require.True(t, found)
require.True(t, found)
require.Len(t, sysCPU.GetMetric(), 1)
require.Equal(t, io_prometheus_client.MetricType_GAUGE, sysCPU.GetType())
cpuSysNanos2 := sysCPU.Metric[0].GetGauge().GetValue()

require.LessOrEqual(t, float64(cpuTime.User)*1e6, cpuUserNanos2)
require.LessOrEqual(t, float64(cpuTime.Sys)*1e6, cpuSysNanos2)
}
3 changes: 3 additions & 0 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ const (
// statusVars exposes prometheus metrics for monitoring consumption.
statusVars = statusPrefix + "vars"

// loadStatusVars exposes prometheus metrics for instant monitoring of CPU load.
loadStatusVars = statusPrefix + "load"

// raftStateDormant is used when there is no known raft state.
raftStateDormant = "StateDormant"

Expand Down
18 changes: 8 additions & 10 deletions pkg/server/status/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ func (rsr *RuntimeStatSampler) SampleEnvironment(
if err := mem.Get(pid); err != nil {
log.Ops.Errorf(ctx, "unable to get mem usage: %v", err)
}
cpuTime := gosigar.ProcTime{}
if err := cpuTime.Get(pid); err != nil {
userTimeMillis, sysTimeMillis, err := GetCPUTime(ctx)
if err != nil {
log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err)
}
cgroupCPU, _ := cgroups.GetCgroupCPU()
Expand Down Expand Up @@ -507,8 +507,8 @@ func (rsr *RuntimeStatSampler) SampleEnvironment(
now := rsr.clock.PhysicalNow()
dur := float64(now - rsr.last.now)
// cpuTime.{User,Sys} are in milliseconds, convert to nanoseconds.
utime := int64(cpuTime.User) * 1e6
stime := int64(cpuTime.Sys) * 1e6
utime := userTimeMillis * 1e6
stime := sysTimeMillis * 1e6
urate := float64(utime-rsr.last.utime) / dur
srate := float64(stime-rsr.last.stime) / dur
combinedNormalizedPerc := (srate + urate) / cpuShare
Expand Down Expand Up @@ -690,14 +690,12 @@ func subtractNetworkCounters(from *net.IOCountersStat, sub net.IOCountersStat) {
from.PacketsSent -= sub.PacketsSent
}

// GetUserCPUSeconds returns the cumulative User CPU time for this process, in
// seconds.
func GetUserCPUSeconds(ctx context.Context) float64 {
// GetCPUTime returns the cumulative user/system time (in ms) since the process start.
func GetCPUTime(ctx context.Context) (userTimeMillis, sysTimeMillis int64, err error) {
pid := os.Getpid()
cpuTime := gosigar.ProcTime{}
if err := cpuTime.Get(pid); err != nil {
log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err)
return 0, 0, err
}
// cpuTime.User is in milliseconds; convert to seconds.
return float64(cpuTime.User) * 1e-3
return int64(cpuTime.User), int64(cpuTime.Sys), nil
}
41 changes: 40 additions & 1 deletion pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func StartTenant(
})
f := varsHandler{metricSource: args.recorder, st: args.Settings}.handleVars
mux.Handle(statusVars, http.HandlerFunc(f))
ff := loadVarsHandler(ctx, args.runtime)
mux.Handle(loadStatusVars, http.HandlerFunc(ff))

tlsConnManager := netutil.MakeServer(
args.stopper,
Expand Down Expand Up @@ -277,8 +279,12 @@ func StartTenant(
log.SetTenantIDs(args.TenantID.String(), int32(s.SQLInstanceID()))

externalUsageFn := func(ctx context.Context) multitenant.ExternalUsage {
userTimeMillis, _, err := status.GetCPUTime(ctx)
if err != nil {
log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err)
}
return multitenant.ExternalUsage{
CPUSecs: status.GetUserCPUSeconds(ctx),
CPUSecs: float64(userTimeMillis) * 1e-3,
PGWireBytes: s.pgServer.BytesInAndOut(),
}
}
Expand Down Expand Up @@ -308,6 +314,39 @@ func StartTenant(
return s, pgLAddr, httpLAddr, nil
}

// Construct a handler responsible for serving the instant values of selected
// load metrics. These include user and system CPU time currently.
func loadVarsHandler(
ctx context.Context, rsr *status.RuntimeStatSampler,
) func(http.ResponseWriter, *http.Request) {
cpuUserNanos := metric.NewGauge(rsr.CPUUserNS.GetMetadata())
cpuSysNanos := metric.NewGauge(rsr.CPUSysNS.GetMetadata())
registry := metric.NewRegistry()
registry.AddMetric(cpuUserNanos)
registry.AddMetric(cpuSysNanos)

return func(w http.ResponseWriter, r *http.Request) {
userTimeMillis, sysTimeMillis, err := status.GetCPUTime(ctx)
if err != nil {
// Just log but don't return an error to match the _status/vars metrics handler.
log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err)
}
// cpuTime.{User,Sys} are in milliseconds, convert to nanoseconds.
utime := userTimeMillis * 1e6
stime := sysTimeMillis * 1e6
cpuUserNanos.Update(utime)
cpuSysNanos.Update(stime)

exporter := metric.MakePrometheusExporter()
exporter.ScrapeRegistry(registry, true)
if err := exporter.PrintAsText(w); err != nil {
log.Errorf(r.Context(), "%v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}

func makeTenantSQLServerArgs(
stopper *stop.Stopper, kvClusterName string, baseCfg BaseConfig, sqlCfg SQLConfig,
) (sqlServerArgs, error) {
Expand Down

0 comments on commit 5b0bdb0

Please sign in to comment.