From 07ef00948600e467c959fc18703b88ae6a0a75b7 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Fri, 22 Mar 2024 17:47:15 +0800 Subject: [PATCH] server, metrics: remove the connection count on server, only use the metrics (#51996) close pingcap/tidb#51889 --- pkg/executor/simple.go | 7 ++ pkg/metrics/grafana/tidb_summary.json | 6 +- pkg/metrics/grafana/tidb_summary.jsonnet | 14 +++- pkg/server/conn.go | 37 ++++------- pkg/server/conn_test.go | 2 +- .../internal/testserverclient/BUILD.bazel | 2 + .../testserverclient/server_client.go | 66 +++++++++++++++++++ pkg/server/server.go | 38 ++++------- pkg/server/tests/commontest/BUILD.bazel | 2 +- pkg/server/tests/commontest/tidb_test.go | 5 ++ 10 files changed, 124 insertions(+), 55 deletions(-) diff --git a/pkg/executor/simple.go b/pkg/executor/simple.go index ec240bbd5a9fb..a34bafa6f935d 100644 --- a/pkg/executor/simple.go +++ b/pkg/executor/simple.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/model" @@ -2850,6 +2851,7 @@ func (e *SimpleExec) executeAdminUnsetBDRRole() error { } func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error { + originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName if s.Name.L != "" { if _, ok := e.is.ResourceGroupByName(s.Name); !ok { return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O) @@ -2858,6 +2860,11 @@ func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) er } else { e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName } + newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName + if originalResourceGroup != newResourceGroup { + metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec() + metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc() + } return nil } diff --git a/pkg/metrics/grafana/tidb_summary.json b/pkg/metrics/grafana/tidb_summary.json index 70008e79e8ea7..cc45a8dfbc670 100644 --- a/pkg/metrics/grafana/tidb_summary.json +++ b/pkg/metrics/grafana/tidb_summary.json @@ -163,7 +163,7 @@ "expr": "tidb_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}", "format": "time_series", "intervalFactor": 2, - "legendFormat": "{{instance}}", + "legendFormat": "{{instance}} {{resource_group}}", "refId": "A" }, { @@ -265,7 +265,7 @@ "intervalFactor": 2, "legendFormat": "quota-{{instance}}", "refId": "B" - } + } ], "thresholds": [ ], "timeFrom": null, @@ -365,7 +365,7 @@ "intervalFactor": 2, "legendFormat": "quota-{{instance}}", "refId": "C" - } + } ], "thresholds": [ ], "timeFrom": null, diff --git a/pkg/metrics/grafana/tidb_summary.jsonnet b/pkg/metrics/grafana/tidb_summary.jsonnet index 75b555dfd7f77..3048cd54056e4 100644 --- a/pkg/metrics/grafana/tidb_summary.jsonnet +++ b/pkg/metrics/grafana/tidb_summary.jsonnet @@ -111,7 +111,7 @@ local connectionP = graphPanel.new( .addTarget( prometheus.target( 'tidb_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}', - legendFormat='{{instance}}', + legendFormat='{{instance}} {{resource_group}}', ) ) .addTarget( @@ -133,6 +133,12 @@ local cpuP = graphPanel.new( 'rate(process_cpu_seconds_total{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}[1m])', 
     legendFormat='{{instance}}',
   )
+)
+.addTarget(
+  prometheus.target(
+    'tidb_server_maxprocs{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
+    legendFormat='quota-{{instance}}',
+  )
 );
 
 local memP = graphPanel.new(
@@ -153,6 +159,12 @@ local memP = graphPanel.new(
     'go_memory_classes_heap_objects_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"} + go_memory_classes_heap_unused_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}',
     legendFormat='HeapInuse-{{instance}}',
   )
+)
+.addTarget(
+  prometheus.target(
+    'tidb_server_memory_quota_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}',
+    legendFormat='quota-{{instance}}',
+  )
 );
 
 // Query Summary
diff --git a/pkg/server/conn.go b/pkg/server/conn.go
index 7988dac574198..8f4a386943e31 100644
--- a/pkg/server/conn.go
+++ b/pkg/server/conn.go
@@ -361,29 +361,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
 }
 
 func (cc *clientConn) Close() error {
+	// Be careful: this function should be re-entrant. It might be called more than once for a single connection.
+	// Any logic which is not idempotent should be in closeConn() and wrapped with `cc.closeOnce.Do`, like decreasing
+	// metrics, releasing resources, etc.
+	//
+	// TODO: avoid calling this function multiple times. It's not intuitive that a connection can be closed multiple
+	// times.
 	cc.server.rwlock.Lock()
 	delete(cc.server.clients, cc.connectionID)
-	resourceGroupName, count := "", 0
-	if ctx := cc.getCtx(); ctx != nil {
-		resourceGroupName = ctx.GetSessionVars().ResourceGroupName
-		count = cc.server.ConnNumByResourceGroup[resourceGroupName]
-		if count <= 1 {
-			delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
-		} else {
-			cc.server.ConnNumByResourceGroup[resourceGroupName]--
-		}
-	}
 	cc.server.rwlock.Unlock()
-	return closeConn(cc, resourceGroupName, count)
+	return closeConn(cc)
 }
 
 // closeConn is idempotent and thread-safe.
 // It will be called on the same `clientConn` more than once to avoid connection leak.
-func closeConn(cc *clientConn, resourceGroupName string, count int) error {
+func closeConn(cc *clientConn) error {
 	var err error
 	cc.closeOnce.Do(func() {
-		metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
-
 		if cc.connectionID > 0 {
 			cc.server.dom.ReleaseConnID(cc.connectionID)
 			cc.connectionID = 0
@@ -397,8 +391,12 @@ func closeConn(cc *clientConn, resourceGroupName string, count int) error {
 			}
 		}
 		// Close statements and session
-		// This will release advisory locks, row locks, etc.
+		// At first, it'll decrease the count of connections in the resource group and update the corresponding gauge.
+		// Then it'll close the statements and session, which release advisory locks, row locks, etc.
if ctx := cc.getCtx(); ctx != nil { + resourceGroupName := ctx.GetSessionVars().ResourceGroupName + metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec() + err = ctx.Close() } }) @@ -407,14 +405,7 @@ func closeConn(cc *clientConn, resourceGroupName string, count int) error { func (cc *clientConn) closeWithoutLock() error { delete(cc.server.clients, cc.connectionID) - name := cc.getCtx().GetSessionVars().ResourceGroupName - count := cc.server.ConnNumByResourceGroup[name] - if count <= 1 { - delete(cc.server.ConnNumByResourceGroup, name) - } else { - cc.server.ConnNumByResourceGroup[name]-- - } - return closeConn(cc, name, count-1) + return closeConn(cc) } // writeInitialHandshake sends server version, connection ID, server capability, collation, server status diff --git a/pkg/server/conn_test.go b/pkg/server/conn_test.go index 25161e4536260..ee280b768dc4a 100644 --- a/pkg/server/conn_test.go +++ b/pkg/server/conn_test.go @@ -2079,7 +2079,7 @@ func TestCloseConn(t *testing.T) { for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() - err := closeConn(cc, "", 1) + err := closeConn(cc) require.NoError(t, err) }() } diff --git a/pkg/server/internal/testserverclient/BUILD.bazel b/pkg/server/internal/testserverclient/BUILD.bazel index 1f751d9f38580..4625be01ab8f3 100644 --- a/pkg/server/internal/testserverclient/BUILD.bazel +++ b/pkg/server/internal/testserverclient/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/errno", "//pkg/kv", + "//pkg/metrics", "//pkg/parser/mysql", "//pkg/server", "//pkg/testkit", @@ -17,6 +18,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", + "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//require", "@org_uber_go_zap//:zap", ], diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index 5bceea2dc3326..f3e205eb37912 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -39,11 +39,13 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/metrics" tmysql "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/server" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testenv" "github.com/pingcap/tidb/pkg/util/versioninfo" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -2661,4 +2663,68 @@ func (cli *TestServerClient) RunTestSQLModeIsLoadedBeforeQuery(t *testing.T) { }) } +func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) { + readConnCount := func(resourceGroupName string) float64 { + metric, err := metrics.ConnGauge.GetMetricWith(map[string]string{ + metrics.LblResourceGroup: resourceGroupName, + }) + require.NoError(t, err) + output := &dto.Metric{} + metric.Write(output) + + return output.GetGauge().GetValue() + } + + resourceGroupConnCountReached := func(t *testing.T, resourceGroupName string, expected float64) { + require.Eventually(t, func() bool { + return readConnCount(resourceGroupName) == expected + }, 5*time.Second, 100*time.Millisecond) + } + + cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) { + ctx := context.Background() + dbt.GetDB().SetMaxIdleConns(0) + + // start 100 connections + conns := make([]*sql.Conn, 100) + for i := 0; i < 100; i++ { + conn, err := dbt.GetDB().Conn(ctx) + 
require.NoError(t, err) + conns[i] = conn + } + resourceGroupConnCountReached(t, "default", 100.0) + + // close 50 connections + for i := 0; i < 50; i++ { + err := conns[i].Close() + require.NoError(t, err) + } + resourceGroupConnCountReached(t, "default", 50.0) + + // close 25 connections + for i := 50; i < 75; i++ { + err := conns[i].Close() + require.NoError(t, err) + } + resourceGroupConnCountReached(t, "default", 25.0) + + // change the following 25 connections from `default` resource group to `test` + dbt.MustExec("create resource group test RU_PER_SEC = 1000;") + for i := 75; i < 100; i++ { + _, err := conns[i].ExecContext(ctx, "set resource group test") + require.NoError(t, err) + } + resourceGroupConnCountReached(t, "default", 0.0) + resourceGroupConnCountReached(t, "test", 25.0) + + // close 25 connections + for i := 75; i < 100; i++ { + err := conns[i].Close() + require.NoError(t, err) + } + resourceGroupConnCountReached(t, "default", 0.0) + resourceGroupConnCountReached(t, "test", 0.0) + }) +} + //revive:enable:exported diff --git a/pkg/server/server.go b/pkg/server/server.go index 1283fa7850bca..394573bb58b4f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -120,9 +120,8 @@ type Server struct { socket net.Listener concurrentLimiter *TokenLimiter - rwlock sync.RWMutex - clients map[uint64]*clientConn - ConnNumByResourceGroup map[string]int + rwlock sync.RWMutex + clients map[uint64]*clientConn capability uint32 dom *domain.Domain @@ -240,15 +239,14 @@ func (s *Server) newConn(conn net.Conn) *clientConn { // NewServer creates a new Server. func NewServer(cfg *config.Config, driver IDriver) (*Server, error) { s := &Server{ - cfg: cfg, - driver: driver, - concurrentLimiter: NewTokenLimiter(cfg.TokenLimit), - clients: make(map[uint64]*clientConn), - ConnNumByResourceGroup: make(map[string]int), - internalSessions: make(map[any]struct{}, 100), - health: uatomic.NewBool(false), - inShutdownMode: uatomic.NewBool(false), - printMDLLogTime: time.Now(), + cfg: cfg, + driver: driver, + concurrentLimiter: NewTokenLimiter(cfg.TokenLimit), + clients: make(map[uint64]*clientConn), + internalSessions: make(map[any]struct{}, 100), + health: uatomic.NewBool(false), + inShutdownMode: uatomic.NewBool(false), + printMDLLogTime: time.Now(), } s.capability = defaultCapability setTxnScope() @@ -634,27 +632,15 @@ func (s *Server) Close() { func (s *Server) registerConn(conn *clientConn) bool { s.rwlock.Lock() defer s.rwlock.Unlock() - connections := make(map[string]int, 0) - for _, conn := range s.clients { - resourceGroup := conn.getCtx().GetSessionVars().ResourceGroupName - connections[resourceGroup]++ - } logger := logutil.BgLogger() if s.inShutdownMode.Load() { logger.Info("close connection directly when shutting down") - for resourceGroupName, count := range s.ConnNumByResourceGroup { - metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count)) - } - terror.Log(closeConn(conn, "", 0)) + terror.Log(closeConn(conn)) return false } s.clients[conn.connectionID] = conn - s.ConnNumByResourceGroup[conn.getCtx().GetSessionVars().ResourceGroupName]++ - - for name, count := range s.ConnNumByResourceGroup { - metrics.ConnGauge.WithLabelValues(name).Set(float64(count)) - } + metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc() return true } diff --git a/pkg/server/tests/commontest/BUILD.bazel b/pkg/server/tests/commontest/BUILD.bazel index bc0005fae051b..d2b4d8ed82490 100644 --- a/pkg/server/tests/commontest/BUILD.bazel +++ 
b/pkg/server/tests/commontest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "tidb_test.go", ], flaky = True, - shard_count = 48, + shard_count = 49, deps = [ "//pkg/config", "//pkg/ddl/util", diff --git a/pkg/server/tests/commontest/tidb_test.go b/pkg/server/tests/commontest/tidb_test.go index 617274d6d39dc..5e8b0f431b3c3 100644 --- a/pkg/server/tests/commontest/tidb_test.go +++ b/pkg/server/tests/commontest/tidb_test.go @@ -3077,3 +3077,8 @@ func TestSQLModeIsLoadedBeforeQuery(t *testing.T) { ts := servertestkit.CreateTidbTestSuite(t) ts.RunTestSQLModeIsLoadedBeforeQuery(t) } + +func TestConnectionCount(t *testing.T) { + ts := servertestkit.CreateTidbTestSuite(t) + ts.RunTestConnectionCount(t) +}
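
Note (not part of the patch): a minimal, self-contained sketch of the gauge pattern this change adopts, assuming the `tidb_server_connections` GaugeVec keyed by a `resource_group` label as shown in the dashboard diff above. The `conn`, `register`, `closeConn`, and `setResourceGroup` names here are illustrative, not TiDB identifiers: the count is incremented when a connection registers, decremented exactly once on close (guarded by `sync.Once`, mirroring `cc.closeOnce`), and moved between labels when a session switches its resource group, so no server-side `ConnNumByResourceGroup` map is needed.

```go
package main

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// connGauge counts open connections per resource group, analogous to metrics.ConnGauge.
var connGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "tidb_server_connections"},
	[]string{"resource_group"},
)

type conn struct {
	group     string
	closeOnce sync.Once
}

// register increments the gauge for the connection's current resource group.
func register(c *conn) {
	connGauge.WithLabelValues(c.group).Inc()
}

// closeConn decrements the gauge at most once, even if called repeatedly.
func closeConn(c *conn) {
	c.closeOnce.Do(func() {
		connGauge.WithLabelValues(c.group).Dec()
	})
}

// setResourceGroup moves the connection from its old label to the new one,
// mirroring the Dec/Inc pair added to executeSetResourceGroupName.
func setResourceGroup(c *conn, newGroup string) {
	if c.group == newGroup {
		return
	}
	connGauge.WithLabelValues(c.group).Dec()
	connGauge.WithLabelValues(newGroup).Inc()
	c.group = newGroup
}

func main() {
	prometheus.MustRegister(connGauge)
	c := &conn{group: "default"}
	register(c)
	setResourceGroup(c, "test")
	closeConn(c) // a second closeConn(c) is a no-op thanks to closeOnce
}
```

Because every transition is an Inc/Dec on the gauge itself, the per-group counts stay correct without the server holding a separate map, which is the design choice the patch makes.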