diff --git a/pkg/executor/simple.go b/pkg/executor/simple.go
index 19e52a9f3ae16..e228515af9046 100644
--- a/pkg/executor/simple.go
+++ b/pkg/executor/simple.go
@@ -41,6 +41,7 @@ import (
 	"github.com/pingcap/tidb/pkg/expression"
 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/metrics"
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/auth"
 	"github.com/pingcap/tidb/pkg/parser/model"
@@ -2823,6 +2824,7 @@ func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
 }
 
 func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
+	originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
 	if s.Name.L != "" {
 		if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
 			return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
 		}
@@ -2831,6 +2833,11 @@ func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) er
 	} else {
 		e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
 	}
+	newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
+	if originalResourceGroup != newResourceGroup {
+		metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
+		metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
+	}
 	return nil
 }
diff --git a/pkg/metrics/grafana/tidb_summary.json b/pkg/metrics/grafana/tidb_summary.json
index aa2611e772aab..085eec4916af8 100644
--- a/pkg/metrics/grafana/tidb_summary.json
+++ b/pkg/metrics/grafana/tidb_summary.json
@@ -163,7 +163,7 @@
           "expr": "tidb_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
           "format": "time_series",
           "intervalFactor": 2,
-          "legendFormat": "{{instance}}",
+          "legendFormat": "{{instance}} {{resource_group}}",
           "refId": "A"
         },
         {
diff --git a/pkg/metrics/grafana/tidb_summary.jsonnet b/pkg/metrics/grafana/tidb_summary.jsonnet
index e11d6587e0429..2d3e6c1d5fc38 100644
--- a/pkg/metrics/grafana/tidb_summary.jsonnet
+++ b/pkg/metrics/grafana/tidb_summary.jsonnet
@@ -111,7 +111,7 @@ local connectionP = graphPanel.new(
 .addTarget(
   prometheus.target(
     'tidb_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
-    legendFormat='{{instance}}',
+    legendFormat='{{instance}} {{resource_group}}',
   )
 )
 .addTarget(
diff --git a/pkg/server/conn.go b/pkg/server/conn.go
index e3fb01e3d03d1..3f0f3c0fc7b2e 100644
--- a/pkg/server/conn.go
+++ b/pkg/server/conn.go
@@ -337,28 +337,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
 }
 
 func (cc *clientConn) Close() error {
+	// Be careful: this function should be re-entrant. It might be called more than once for a single connection.
+	// Any logic that is not idempotent should live in closeConn() and be wrapped with `cc.closeOnce.Do`, such as
+	// decreasing metrics, releasing resources, etc.
+	//
+	// TODO: avoid calling this function multiple times. It's not intuitive that a connection can be closed multiple
+	// times.
 	cc.server.rwlock.Lock()
 	delete(cc.server.clients, cc.connectionID)
-	resourceGroupName, count := "", 0
-	if ctx := cc.getCtx(); ctx != nil {
-		resourceGroupName = ctx.GetSessionVars().ResourceGroupName
-		count = cc.server.ConnNumByResourceGroup[resourceGroupName]
-		if count <= 1 {
-			delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
-		} else {
-			cc.server.ConnNumByResourceGroup[resourceGroupName]--
-		}
-	}
 	cc.server.rwlock.Unlock()
-	return closeConn(cc, resourceGroupName, count)
+	return closeConn(cc)
 }
 
 // closeConn is idempotent and thread-safe.
 // It will be called on the same `clientConn` more than once to avoid connection leak.
-func closeConn(cc *clientConn, resourceGroupName string, connections int) error {
+func closeConn(cc *clientConn) error {
 	var err error
 	cc.closeOnce.Do(func() {
-		metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(connections))
 		if cc.connectionID > 0 {
 			cc.server.dom.ReleaseConnID(cc.connectionID)
 			cc.connectionID = 0
@@ -372,8 +367,12 @@ func closeConn(cc *clientConn, resourceGroupName string, connections int) error
 			}
 		}
 		// Close statements and session
-		// This will release advisory locks, row locks, etc.
+		// First, it decreases the connection count of the resource group and updates the corresponding gauge.
+		// Then it closes the statements and session, which releases advisory locks, row locks, etc.
 		if ctx := cc.getCtx(); ctx != nil {
+			resourceGroupName := ctx.GetSessionVars().ResourceGroupName
+			metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()
+
 			err = ctx.Close()
 		}
 	})
@@ -382,14 +381,7 @@ func closeConn(cc *clientConn, resourceGroupName string, connections int) error
 
 func (cc *clientConn) closeWithoutLock() error {
 	delete(cc.server.clients, cc.connectionID)
-	name := cc.getCtx().GetSessionVars().ResourceGroupName
-	count := cc.server.ConnNumByResourceGroup[name]
-	if count <= 1 {
-		delete(cc.server.ConnNumByResourceGroup, name)
-	} else {
-		cc.server.ConnNumByResourceGroup[name]--
-	}
-	return closeConn(cc, name, count-1)
+	return closeConn(cc)
 }
 
 // writeInitialHandshake sends server version, connection ID, server capability, collation, server status
diff --git a/pkg/server/conn_test.go b/pkg/server/conn_test.go
index c6f4de555083f..8aa40dfcf52b6 100644
--- a/pkg/server/conn_test.go
+++ b/pkg/server/conn_test.go
@@ -2036,7 +2036,7 @@ func TestCloseConn(t *testing.T) {
 	for i := 0; i < numGoroutines; i++ {
 		go func() {
 			defer wg.Done()
-			err := closeConn(cc, "default", 1)
+			err := closeConn(cc)
 			require.NoError(t, err)
 		}()
 	}
diff --git a/pkg/server/internal/testserverclient/BUILD.bazel b/pkg/server/internal/testserverclient/BUILD.bazel
index dd38198adffb2..3af81cf81a0ee 100644
--- a/pkg/server/internal/testserverclient/BUILD.bazel
+++ b/pkg/server/internal/testserverclient/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
         "//pkg/config",
         "//pkg/errno",
         "//pkg/kv",
+        "//pkg/metrics",
         "//pkg/parser/mysql",
         "//pkg/server",
         "//pkg/testkit",
@@ -18,6 +19,7 @@ go_library(
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_failpoint//:failpoint",
        "@com_github_pingcap_log//:log",
+       "@com_github_prometheus_client_model//go",
        "@com_github_stretchr_testify//require",
        "@org_golang_x_text//encoding/simplifiedchinese",
        "@org_uber_go_zap//:zap",
diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go
index 97ab06d8af9ed..8435bc2ad3779 100644
--- a/pkg/server/internal/testserverclient/server_client.go
+++ b/pkg/server/internal/testserverclient/server_client.go
@@ -39,11 +39,13 @@ import (
 	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/errno"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/metrics"
 	tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
 	"github.com/pingcap/tidb/pkg/server"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/testenv"
 	"github.com/pingcap/tidb/pkg/util/versioninfo"
+	dto "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 	"golang.org/x/text/encoding/simplifiedchinese"
@@ -2594,4 +2596,68 @@ func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) {
 	})
 }
 
+func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
+	readConnCount := func(resourceGroupName string) float64 {
+		metric, err := metrics.ConnGauge.GetMetricWith(map[string]string{
+			metrics.LblResourceGroup: resourceGroupName,
+		})
+		require.NoError(t, err)
+		output := &dto.Metric{}
+		metric.Write(output)
+
+		return output.GetGauge().GetValue()
+	}
+
+	resourceGroupConnCountReached := func(t *testing.T, resourceGroupName string, expected float64) {
+		require.Eventually(t, func() bool {
+			return readConnCount(resourceGroupName) == expected
+		}, 5*time.Second, 100*time.Millisecond)
+	}
+
+	cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
+		ctx := context.Background()
+		dbt.GetDB().SetMaxIdleConns(0)
+
+		// start 100 connections
+		conns := make([]*sql.Conn, 100)
+		for i := 0; i < 100; i++ {
+			conn, err := dbt.GetDB().Conn(ctx)
+			require.NoError(t, err)
+			conns[i] = conn
+		}
+		resourceGroupConnCountReached(t, "default", 100.0)
+
+		// close 50 connections
+		for i := 0; i < 50; i++ {
+			err := conns[i].Close()
+			require.NoError(t, err)
+		}
+		resourceGroupConnCountReached(t, "default", 50.0)
+
+		// close 25 more connections
+		for i := 50; i < 75; i++ {
+			err := conns[i].Close()
+			require.NoError(t, err)
+		}
+		resourceGroupConnCountReached(t, "default", 25.0)
+
+		// switch the remaining 25 connections from the `default` resource group to `test`
+		dbt.MustExec("create resource group test RU_PER_SEC = 1000;")
+		for i := 75; i < 100; i++ {
+			_, err := conns[i].ExecContext(ctx, "set resource group test")
+			require.NoError(t, err)
+		}
+		resourceGroupConnCountReached(t, "default", 0.0)
+		resourceGroupConnCountReached(t, "test", 25.0)
+
+		// close the remaining 25 connections
+		for i := 75; i < 100; i++ {
+			err := conns[i].Close()
+			require.NoError(t, err)
+		}
+		resourceGroupConnCountReached(t, "default", 0.0)
+		resourceGroupConnCountReached(t, "test", 0.0)
+	})
+}
+
 //revive:enable:exported
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 55df85982c6bd..df1ce623f3638 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -119,9 +119,8 @@ type Server struct {
 	socket            net.Listener
 	concurrentLimiter *TokenLimiter
 
-	rwlock                 sync.RWMutex
-	clients                map[uint64]*clientConn
-	ConnNumByResourceGroup map[string]int
+	rwlock  sync.RWMutex
+	clients map[uint64]*clientConn
 
 	capability uint32
 	dom        *domain.Domain
@@ -239,15 +238,14 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
 
 // NewServer creates a new Server.
 func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
 	s := &Server{
-		cfg:                    cfg,
-		driver:                 driver,
-		concurrentLimiter:      NewTokenLimiter(cfg.TokenLimit),
-		clients:                make(map[uint64]*clientConn),
-		ConnNumByResourceGroup: make(map[string]int),
-		internalSessions:       make(map[any]struct{}, 100),
-		health:                 uatomic.NewBool(false),
-		inShutdownMode:         uatomic.NewBool(false),
-		printMDLLogTime:        time.Now(),
+		cfg:               cfg,
+		driver:            driver,
+		concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
+		clients:           make(map[uint64]*clientConn),
+		internalSessions:  make(map[any]struct{}, 100),
+		health:            uatomic.NewBool(false),
+		inShutdownMode:    uatomic.NewBool(false),
+		printMDLLogTime:   time.Now(),
 	}
 	s.capability = defaultCapability
 	setTxnScope()
@@ -633,27 +631,15 @@ func (s *Server) Close() {
 
 func (s *Server) registerConn(conn *clientConn) bool {
 	s.rwlock.Lock()
 	defer s.rwlock.Unlock()
-	connections := make(map[string]int, 0)
-	for _, conn := range s.clients {
-		resourceGroup := conn.getCtx().GetSessionVars().ResourceGroupName
-		connections[resourceGroup]++
-	}
 	logger := logutil.BgLogger()
 	if s.inShutdownMode.Load() {
 		logger.Info("close connection directly when shutting down")
-		for resourceGroupName, count := range s.ConnNumByResourceGroup {
-			metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
-		}
-		terror.Log(closeConn(conn, "", 0))
+		terror.Log(closeConn(conn))
 		return false
 	}
 	s.clients[conn.connectionID] = conn
-	s.ConnNumByResourceGroup[conn.getCtx().GetSessionVars().ResourceGroupName]++
-
-	for name, count := range s.ConnNumByResourceGroup {
-		metrics.ConnGauge.WithLabelValues(name).Set(float64(count))
-	}
+	metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
 
 	return true
 }
diff --git a/pkg/server/tests/tidb_test.go b/pkg/server/tests/tidb_test.go
index 46394611bc65e..fd8d3a8d66689 100644
--- a/pkg/server/tests/tidb_test.go
+++ b/pkg/server/tests/tidb_test.go
@@ -3139,3 +3139,8 @@ func TestTypeAndCharsetOfSendLongData(t *testing.T) {
 	ts := servertestkit.CreateTidbTestSuite(t)
 	ts.RunTestTypeAndCharsetOfSendLongData(t)
 }
+
+func TestConnectionCount(t *testing.T) {
+	ts := servertestkit.CreateTidbTestSuite(t)
+	ts.RunTestConnectionCount(t)
+}
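
Reviewer note: this patch replaces the old `ConnNumByResourceGroup` map (recounted under `Server.rwlock` and published with `Set`) with direct `Inc`/`Dec` calls on the labeled gauge, with the decrement guarded by `cc.closeOnce` so a repeated close cannot double-decrement. The sketch below is a minimal, self-contained illustration of that pattern and of the `GetMetricWith`/`dto.Metric` read used by `RunTestConnectionCount`. It is not TiDB code: the metric name, help text, and label key are illustrative assumptions, and registration with a Prometheus registry is omitted for brevity.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// connGauge mirrors the shape of metrics.ConnGauge: a GaugeVec keyed by
// resource group.
var connGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "tidb_server_connections", // illustrative name
		Help: "Connection count per resource group.",
	},
	[]string{"resource_group"}, // illustrative label key
)

// readConnCount reads one labeled child the same way the test helper does:
// fetch the child with GetMetricWith, then serialize it into a dto.Metric.
func readConnCount(group string) (float64, error) {
	metric, err := connGauge.GetMetricWith(prometheus.Labels{"resource_group": group})
	if err != nil {
		return 0, err
	}
	out := &dto.Metric{}
	if err := metric.Write(out); err != nil {
		return 0, err
	}
	return out.GetGauge().GetValue(), nil
}

func main() {
	// registerConn: a new connection increments its group's gauge once.
	connGauge.WithLabelValues("default").Inc()

	// SET RESOURCE GROUP test: decrement the old group and increment the new
	// one, as executeSetResourceGroupName now does when the group changes.
	connGauge.WithLabelValues("default").Dec()
	connGauge.WithLabelValues("test").Inc()

	// closeConn: decrement the current group exactly once. In the real code
	// this sits inside cc.closeOnce.Do, so repeated Close calls are harmless.
	connGauge.WithLabelValues("test").Dec()

	for _, g := range []string{"default", "test"} {
		v, _ := readConnCount(g)
		fmt.Printf("%s: %v\n", g, v) // default: 0, test: 0
	}
}
```

The design point worth noting in review: per-event `Inc`/`Dec` makes each gauge transition atomic and removes the need to recompute per-group totals while holding `Server.rwlock`; the trade-off is that any unmatched decrement would skew the gauge permanently, which is why every decrement is funneled through the `closeOnce` path or the group-switch path.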