Skip to content

Commit

Permalink
server, metrics: remove the connection count on server, only use the …
Browse files Browse the repository at this point in the history
…metrics (#51996) (#53056)

close #51889
  • Loading branch information
ti-chi-bot authored May 23, 2024
1 parent 1cc3f9b commit abf0783
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 51 deletions.
7 changes: 7 additions & 0 deletions pkg/executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -2823,6 +2824,7 @@ func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
}

func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if s.Name.L != "" {
if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
Expand All @@ -2831,6 +2833,11 @@ func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) er
} else {
e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
}
newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if originalResourceGroup != newResourceGroup {
metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tidb_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
"expr": "tidb_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"legendFormat": "{{instance}} {{resource_group}}",
"refId": "A"
},
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tidb_summary.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ local connectionP = graphPanel.new(
.addTarget(
prometheus.target(
'tidb_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
legendFormat='{{instance}}',
legendFormat='{{instance}} {{resource_group}}',
)
)
.addTarget(
Expand Down
36 changes: 14 additions & 22 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,28 +337,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
}

func (cc *clientConn) Close() error {
// Be careful, this function should be re-entrant. It might be called more than once for a single connection.
// Any logic which is not idempotent should be in closeConn() and wrapped with `cc.closeOnce.Do`, like decreasing
// metrics, releasing resources, etc.
//
// TODO: avoid calling this function multiple times. It's not intuitive that a connection can be closed multiple
// times.
cc.server.rwlock.Lock()
delete(cc.server.clients, cc.connectionID)
resourceGroupName, count := "", 0
if ctx := cc.getCtx(); ctx != nil {
resourceGroupName = ctx.GetSessionVars().ResourceGroupName
count = cc.server.ConnNumByResourceGroup[resourceGroupName]
if count <= 1 {
delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
} else {
cc.server.ConnNumByResourceGroup[resourceGroupName]--
}
}
cc.server.rwlock.Unlock()
return closeConn(cc, resourceGroupName, count)
return closeConn(cc)
}

// closeConn is idempotent and thread-safe.
// It will be called on the same `clientConn` more than once to avoid connection leak.
func closeConn(cc *clientConn, resourceGroupName string, connections int) error {
func closeConn(cc *clientConn) error {
var err error
cc.closeOnce.Do(func() {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(connections))
if cc.connectionID > 0 {
cc.server.dom.ReleaseConnID(cc.connectionID)
cc.connectionID = 0
Expand All @@ -372,8 +367,12 @@ func closeConn(cc *clientConn, resourceGroupName string, connections int) error
}
}
// Close statements and session
// This will release advisory locks, row locks, etc.
// At first, it'll decrease the count of connections in the resource group and update the corresponding gauge.
// Then it'll close the statements and session, which release advisory locks, row locks, etc.
if ctx := cc.getCtx(); ctx != nil {
resourceGroupName := ctx.GetSessionVars().ResourceGroupName
metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()

err = ctx.Close()
}
})
Expand All @@ -382,14 +381,7 @@ func closeConn(cc *clientConn, resourceGroupName string, connections int) error

func (cc *clientConn) closeWithoutLock() error {
delete(cc.server.clients, cc.connectionID)
name := cc.getCtx().GetSessionVars().ResourceGroupName
count := cc.server.ConnNumByResourceGroup[name]
if count <= 1 {
delete(cc.server.ConnNumByResourceGroup, name)
} else {
cc.server.ConnNumByResourceGroup[name]--
}
return closeConn(cc, name, count-1)
return closeConn(cc)
}

// writeInitialHandshake sends server version, connection ID, server capability, collation, server status
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2036,7 +2036,7 @@ func TestCloseConn(t *testing.T) {
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
err := closeConn(cc, "default", 1)
err := closeConn(cc)
require.NoError(t, err)
}()
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//pkg/config",
"//pkg/errno",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/testkit",
Expand All @@ -18,6 +19,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@org_golang_x_text//encoding/simplifiedchinese",
"@org_uber_go_zap//:zap",
Expand Down
66 changes: 66 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util/versioninfo"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/text/encoding/simplifiedchinese"
Expand Down Expand Up @@ -2594,4 +2596,68 @@ func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) {
})
}

// RunTestConnectionCount checks that metrics.ConnGauge, labelled by resource
// group, follows connections as they are opened, closed, and switched from one
// resource group to another with `set resource group`.
func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
	// readConnCount returns the current gauge value for resourceGroupName.
	// It reports failures through its error result rather than through
	// require.*, because it is invoked from the condition passed to
	// require.Eventually, which does not run on the test goroutine and
	// therefore must not call t.FailNow.
	readConnCount := func(resourceGroupName string) (float64, error) {
		metric, err := metrics.ConnGauge.GetMetricWith(map[string]string{
			metrics.LblResourceGroup: resourceGroupName,
		})
		if err != nil {
			return 0, err
		}
		output := &dto.Metric{}
		if err := metric.Write(output); err != nil {
			return 0, err
		}
		return output.GetGauge().GetValue(), nil
	}

	// resourceGroupConnCountReached polls the gauge until it equals expected,
	// failing the test if that does not happen within five seconds.
	resourceGroupConnCountReached := func(t *testing.T, resourceGroupName string, expected float64) {
		require.Eventually(t, func() bool {
			got, err := readConnCount(resourceGroupName)
			if err != nil {
				// Log only; a persistent error surfaces as an Eventually timeout.
				t.Logf("read connection gauge for resource group %q: %v", resourceGroupName, err)
				return false
			}
			return got == expected
		}, 5*time.Second, 100*time.Millisecond)
	}

	cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
		ctx := context.Background()
		// Keep no idle connections in the pool, so closing a *sql.Conn is not
		// masked by the pool retaining the underlying connection.
		dbt.GetDB().SetMaxIdleConns(0)

		// start 100 connections
		conns := make([]*sql.Conn, 100)
		for i := 0; i < 100; i++ {
			conn, err := dbt.GetDB().Conn(ctx)
			require.NoError(t, err)
			conns[i] = conn
		}
		resourceGroupConnCountReached(t, "default", 100.0)

		// close 50 connections
		for i := 0; i < 50; i++ {
			err := conns[i].Close()
			require.NoError(t, err)
		}
		resourceGroupConnCountReached(t, "default", 50.0)

		// close 25 connections
		for i := 50; i < 75; i++ {
			err := conns[i].Close()
			require.NoError(t, err)
		}
		resourceGroupConnCountReached(t, "default", 25.0)

		// change the following 25 connections from `default` resource group to `test`
		dbt.MustExec("create resource group test RU_PER_SEC = 1000;")
		for i := 75; i < 100; i++ {
			_, err := conns[i].ExecContext(ctx, "set resource group test")
			require.NoError(t, err)
		}
		resourceGroupConnCountReached(t, "default", 0.0)
		resourceGroupConnCountReached(t, "test", 25.0)

		// close 25 connections
		for i := 75; i < 100; i++ {
			err := conns[i].Close()
			require.NoError(t, err)
		}
		resourceGroupConnCountReached(t, "default", 0.0)
		resourceGroupConnCountReached(t, "test", 0.0)
	})
}

//revive:enable:exported
38 changes: 12 additions & 26 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ type Server struct {
socket net.Listener
concurrentLimiter *TokenLimiter

rwlock sync.RWMutex
clients map[uint64]*clientConn
ConnNumByResourceGroup map[string]int
rwlock sync.RWMutex
clients map[uint64]*clientConn

capability uint32
dom *domain.Domain
Expand Down Expand Up @@ -239,15 +238,14 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
// NewServer creates a new Server.
func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
s := &Server{
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
ConnNumByResourceGroup: make(map[string]int),
internalSessions: make(map[any]struct{}, 100),
health: uatomic.NewBool(false),
inShutdownMode: uatomic.NewBool(false),
printMDLLogTime: time.Now(),
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
internalSessions: make(map[any]struct{}, 100),
health: uatomic.NewBool(false),
inShutdownMode: uatomic.NewBool(false),
printMDLLogTime: time.Now(),
}
s.capability = defaultCapability
setTxnScope()
Expand Down Expand Up @@ -633,27 +631,15 @@ func (s *Server) Close() {
func (s *Server) registerConn(conn *clientConn) bool {
s.rwlock.Lock()
defer s.rwlock.Unlock()
connections := make(map[string]int, 0)
for _, conn := range s.clients {
resourceGroup := conn.getCtx().GetSessionVars().ResourceGroupName
connections[resourceGroup]++
}

logger := logutil.BgLogger()
if s.inShutdownMode.Load() {
logger.Info("close connection directly when shutting down")
for resourceGroupName, count := range s.ConnNumByResourceGroup {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
}
terror.Log(closeConn(conn, "", 0))
terror.Log(closeConn(conn))
return false
}
s.clients[conn.connectionID] = conn
s.ConnNumByResourceGroup[conn.getCtx().GetSessionVars().ResourceGroupName]++

for name, count := range s.ConnNumByResourceGroup {
metrics.ConnGauge.WithLabelValues(name).Set(float64(count))
}
metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
return true
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/server/tests/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3139,3 +3139,8 @@ func TestTypeAndCharsetOfSendLongData(t *testing.T) {
ts := servertestkit.CreateTidbTestSuite(t)
ts.RunTestTypeAndCharsetOfSendLongData(t)
}

// TestConnectionCount creates a TiDB test suite and runs the shared
// connection-count scenario (RunTestConnectionCount) against it.
func TestConnectionCount(t *testing.T) {
	servertestkit.CreateTidbTestSuite(t).RunTestConnectionCount(t)
}

0 comments on commit abf0783

Please sign in to comment.