server, metrics: remove the connection count on server, only use the metrics (#51996) #53056

Merged

Changes from 2 commits
7 changes: 7 additions & 0 deletions pkg/executor/simple.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
@@ -2823,6 +2824,7 @@ func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
 }
 
 func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
+    originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
     if s.Name.L != "" {
         if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
             return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
@@ -2831,6 +2833,11 @@ func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) er
     } else {
         e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
     }
+    newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
+    if originalResourceGroup != newResourceGroup {
+        metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
+        metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
+    }
     return nil
 }

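The hunk above is the whole of the resource-group bookkeeping for SET RESOURCE GROUP: decrement the gauge of the group the session is leaving, increment the gauge of the group it joins. A minimal standalone sketch of that gauge-transfer pattern, using the Prometheus Go client directly — the gauge name, label, and helper below are illustrative stand-ins, not TiDB's metrics package:

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// connGauge mirrors the shape of metrics.ConnGauge: one child gauge per resource group.
var connGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "server_connections"},
    []string{"resource_group"},
)

// switchResourceGroup transfers one connection between per-group gauges.
func switchResourceGroup(oldGroup, newGroup string) {
    if oldGroup == newGroup {
        return // mirrors the originalResourceGroup != newResourceGroup guard above
    }
    connGauge.WithLabelValues(oldGroup).Dec()
    connGauge.WithLabelValues(newGroup).Inc()
}

func main() {
    connGauge.WithLabelValues("default").Inc() // a connection is established under `default`
    switchResourceGroup("default", "test")     // SET RESOURCE GROUP test
    fmt.Println("one connection moved from default to test")
}

The sum over all groups is invariant under the transfer, which is what lets this PR drop the server-side count map entirely.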
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tidb_summary.json
@@ -163,7 +163,7 @@
"expr": "tidb_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"legendFormat": "{{instance}} {{resource_group}}",
"refId": "A"
},
{
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tidb_summary.jsonnet
@@ -111,7 +111,7 @@ local connectionP = graphPanel.new(
 .addTarget(
   prometheus.target(
     'tidb_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
-    legendFormat='{{instance}}',
+    legendFormat='{{instance}} {{resource_group}}',
   )
 )
 .addTarget(
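With the resource_group label added to the legend, the Connection Count panel draws one series per (instance, resource group) pair, so a session that switches groups shows up as a transfer between series rather than a change in a single per-instance line.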
36 changes: 14 additions & 22 deletions pkg/server/conn.go
@@ -337,28 +337,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
 }
 
 func (cc *clientConn) Close() error {
+    // Be careful, this function should be re-entrant. It might be called more than once for a single connection.
+    // Any logic which is not idempotent should be in closeConn() and wrapped with `cc.closeOnce.Do`, like decreasing
+    // metrics, releasing resources, etc.
+    //
+    // TODO: avoid calling this function multiple times. It's not intuitive that a connection can be closed multiple
+    // times.
     cc.server.rwlock.Lock()
     delete(cc.server.clients, cc.connectionID)
-    resourceGroupName, count := "", 0
-    if ctx := cc.getCtx(); ctx != nil {
-        resourceGroupName = ctx.GetSessionVars().ResourceGroupName
-        count = cc.server.ConnNumByResourceGroup[resourceGroupName]
-        if count <= 1 {
-            delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
-        } else {
-            cc.server.ConnNumByResourceGroup[resourceGroupName]--
-        }
-    }
     cc.server.rwlock.Unlock()
-    return closeConn(cc, resourceGroupName, count)
+    return closeConn(cc)
 }

 // closeConn is idempotent and thread-safe.
 // It will be called on the same `clientConn` more than once to avoid connection leak.
-func closeConn(cc *clientConn, resourceGroupName string, connections int) error {
+func closeConn(cc *clientConn) error {
     var err error
     cc.closeOnce.Do(func() {
-        metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(connections))
         if cc.connectionID > 0 {
             cc.server.dom.ReleaseConnID(cc.connectionID)
             cc.connectionID = 0
@@ -372,8 +367,12 @@ func closeConn(cc *clientConn, resourceGroupName string, connections int) error
            }
         }
         // Close statements and session
-        // This will release advisory locks, row locks, etc.
+        // First, it decreases the count of connections in the resource group and updates the corresponding gauge.
+        // Then it closes the statements and session, which releases advisory locks, row locks, etc.
         if ctx := cc.getCtx(); ctx != nil {
+            resourceGroupName := ctx.GetSessionVars().ResourceGroupName
+            metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()
+
             err = ctx.Close()
         }
     })
@@ -382,14 +381,7 @@ func closeConn(cc *clientConn, resourceGroupName string, connections int) error

 func (cc *clientConn) closeWithoutLock() error {
     delete(cc.server.clients, cc.connectionID)
-    name := cc.getCtx().GetSessionVars().ResourceGroupName
-    count := cc.server.ConnNumByResourceGroup[name]
-    if count <= 1 {
-        delete(cc.server.ConnNumByResourceGroup, name)
-    } else {
-        cc.server.ConnNumByResourceGroup[name]--
-    }
-    return closeConn(cc, name, count-1)
+    return closeConn(cc)
 }
 
 // writeInitialHandshake sends server version, connection ID, server capability, collation, server status
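The comments above lean on a sync.Once discipline: every non-idempotent effect of closing, including the gauge decrement, lives inside cc.closeOnce.Do, so a connection closed from several paths (or concurrently, as TestCloseConn below exercises) is counted down exactly once. A minimal sketch of that idempotent-close pattern, with a simplified stand-in for clientConn:

package main

import (
    "fmt"
    "sync"

    "github.com/prometheus/client_golang/prometheus"
)

var connGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "server_connections"},
    []string{"resource_group"},
)

// conn stands in for clientConn: closeOnce guards all non-idempotent cleanup.
type conn struct {
    closeOnce     sync.Once
    resourceGroup string
}

// close may be called any number of times from any number of goroutines;
// the gauge is decremented exactly once.
func (c *conn) close() {
    c.closeOnce.Do(func() {
        connGauge.WithLabelValues(c.resourceGroup).Dec()
        // ...release statements, session, locks, etc. here...
    })
}

func main() {
    connGauge.WithLabelValues("default").Inc()
    c := &conn{resourceGroup: "default"}

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ { // concurrent double-close, as in TestCloseConn
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.close()
        }()
    }
    wg.Wait()
    fmt.Println("closed 10 times, gauge decremented once")
}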
2 changes: 1 addition & 1 deletion pkg/server/conn_test.go
@@ -2036,7 +2036,7 @@ func TestCloseConn(t *testing.T) {
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
err := closeConn(cc, "default", 1)
err := closeConn(cc)
require.NoError(t, err)
}()
}
2 changes: 2 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"//pkg/config",
"//pkg/errno",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/testkit",
@@ -18,6 +19,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
],
67 changes: 67 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
@@ -16,6 +16,7 @@ package testserverclient

 import (
     "bytes"
+    "context"
     "database/sql"
     "encoding/json"
     "fmt"
@@ -38,11 +39,13 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util/versioninfo"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
@@ -2532,4 +2535,68 @@ func (cli *TestServerClient) RunTestStmtCountLimit(t *testing.T) {
     })
 }
 
+func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
+    readConnCount := func(resourceGroupName string) float64 {
+        metric, err := metrics.ConnGauge.GetMetricWith(map[string]string{
+            metrics.LblResourceGroup: resourceGroupName,
+        })
+        require.NoError(t, err)
+        output := &dto.Metric{}
+        metric.Write(output)
+
+        return output.GetGauge().GetValue()
+    }
+
+    resourceGroupConnCountReached := func(t *testing.T, resourceGroupName string, expected float64) {
+        require.Eventually(t, func() bool {
+            return readConnCount(resourceGroupName) == expected
+        }, 5*time.Second, 100*time.Millisecond)
+    }
+
+    cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
+        ctx := context.Background()
+        dbt.GetDB().SetMaxIdleConns(0)
+
+        // start 100 connections
+        conns := make([]*sql.Conn, 100)
+        for i := 0; i < 100; i++ {
+            conn, err := dbt.GetDB().Conn(ctx)
+            require.NoError(t, err)
+            conns[i] = conn
+        }
+        resourceGroupConnCountReached(t, "default", 100.0)
+
+        // close 50 connections
+        for i := 0; i < 50; i++ {
+            err := conns[i].Close()
+            require.NoError(t, err)
+        }
+        resourceGroupConnCountReached(t, "default", 50.0)
+
+        // close another 25 connections
+        for i := 50; i < 75; i++ {
+            err := conns[i].Close()
+            require.NoError(t, err)
+        }
+        resourceGroupConnCountReached(t, "default", 25.0)
+
+        // move the remaining 25 connections from the `default` resource group to `test`
+        dbt.MustExec("create resource group test RU_PER_SEC = 1000;")
+        for i := 75; i < 100; i++ {
+            _, err := conns[i].ExecContext(ctx, "set resource group test")
+            require.NoError(t, err)
+        }
+        resourceGroupConnCountReached(t, "default", 0.0)
+        resourceGroupConnCountReached(t, "test", 25.0)
+
+        // close the last 25 connections
+        for i := 75; i < 100; i++ {
+            err := conns[i].Close()
+            require.NoError(t, err)
+        }
+        resourceGroupConnCountReached(t, "default", 0.0)
+        resourceGroupConnCountReached(t, "test", 0.0)
+    })
+}
+
 //revive:enable:exported
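In-process metric assertions like readConnCount above follow the standard Prometheus client pattern: GaugeVec.GetMetricWith resolves the child gauge for a label set, and Write snapshots its current value into a dto.Metric for inspection. The require.Eventually wrapper absorbs the small delay between a client-side Close and the server's deferred gauge decrement.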
38 changes: 12 additions & 26 deletions pkg/server/server.go
@@ -119,9 +119,8 @@ type Server struct {
     socket            net.Listener
     concurrentLimiter *TokenLimiter
 
-    rwlock                 sync.RWMutex
-    clients                map[uint64]*clientConn
-    ConnNumByResourceGroup map[string]int
+    rwlock  sync.RWMutex
+    clients map[uint64]*clientConn
 
     capability uint32
     dom        *domain.Domain
@@ -239,15 +238,14 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
 // NewServer creates a new Server.
 func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
     s := &Server{
-        cfg:                    cfg,
-        driver:                 driver,
-        concurrentLimiter:      NewTokenLimiter(cfg.TokenLimit),
-        clients:                make(map[uint64]*clientConn),
-        ConnNumByResourceGroup: make(map[string]int),
-        internalSessions:       make(map[any]struct{}, 100),
-        health:                 uatomic.NewBool(false),
-        inShutdownMode:         uatomic.NewBool(false),
-        printMDLLogTime:        time.Now(),
+        cfg:               cfg,
+        driver:            driver,
+        concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
+        clients:           make(map[uint64]*clientConn),
+        internalSessions:  make(map[any]struct{}, 100),
+        health:            uatomic.NewBool(false),
+        inShutdownMode:    uatomic.NewBool(false),
+        printMDLLogTime:   time.Now(),
     }
     s.capability = defaultCapability
     setTxnScope()
@@ -633,27 +631,15 @@ func (s *Server) Close() {
 func (s *Server) registerConn(conn *clientConn) bool {
     s.rwlock.Lock()
     defer s.rwlock.Unlock()
-    connections := make(map[string]int, 0)
-    for _, conn := range s.clients {
-        resourceGroup := conn.getCtx().GetSessionVars().ResourceGroupName
-        connections[resourceGroup]++
-    }
 
     logger := logutil.BgLogger()
     if s.inShutdownMode.Load() {
         logger.Info("close connection directly when shutting down")
-        for resourceGroupName, count := range s.ConnNumByResourceGroup {
-            metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
-        }
-        terror.Log(closeConn(conn, "", 0))
+        terror.Log(closeConn(conn))
         return false
     }
     s.clients[conn.connectionID] = conn
-    s.ConnNumByResourceGroup[conn.getCtx().GetSessionVars().ResourceGroupName]++
-
-    for name, count := range s.ConnNumByResourceGroup {
-        metrics.ConnGauge.WithLabelValues(name).Set(float64(count))
-    }
+    metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
     return true
 }

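registerConn is the Inc side of the same invariant: with ConnNumByResourceGroup gone, the gauge is no longer recomputed with Set on every registration — the single increment is the only bookkeeping. A compressed, self-contained sketch of that shape; the server type, field names, and gauge are simplified stand-ins, not TiDB's:

package main

import (
    "fmt"
    "sync"

    "github.com/prometheus/client_golang/prometheus"
)

var connGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "server_connections"},
    []string{"resource_group"},
)

type conn struct{ resourceGroup string }

type server struct {
    mu           sync.Mutex
    shuttingDown bool
    clients      map[uint64]*conn
}

// register either rejects the connection during shutdown or records it and
// bumps the per-group gauge; no separate per-group count map is maintained.
func (s *server) register(id uint64, c *conn) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.shuttingDown {
        return false // caller closes the connection; its gauge was never incremented
    }
    s.clients[id] = c
    connGauge.WithLabelValues(c.resourceGroup).Inc()
    return true
}

func main() {
    s := &server{clients: make(map[uint64]*conn)}
    fmt.Println(s.register(1, &conn{resourceGroup: "default"})) // true
}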
5 changes: 5 additions & 0 deletions pkg/server/tests/tidb_test.go
@@ -3134,3 +3134,8 @@ func TestProxyProtocolWithIpNoFallbackable(t *testing.T) {
     require.NotNil(t, err)
     db.Close()
 }
+
+func TestConnectionCount(t *testing.T) {
+    ts := servertestkit.CreateTidbTestSuite(t)
+    ts.RunTestConnectionCount(t)
+}