Commit 55a3ec7

This is an automated cherry-pick of pingcap#49424
Signed-off-by: ti-chi-bot <[email protected]>
bufferflies authored and ti-chi-bot committed Jan 5, 2024
1 parent de2293d commit 55a3ec7
Showing 13 changed files with 191 additions and 68 deletions.
10 changes: 5 additions & 5 deletions pkg/executor/compiler.go
@@ -105,9 +105,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
 	})
 
 	if preparedObj != nil {
-		CountStmtNode(preparedObj.PreparedAst.Stmt, sessVars.InRestrictedSQL)
+		CountStmtNode(preparedObj.PreparedAst.Stmt, sessVars.InRestrictedSQL, stmtCtx.ResourceGroup)
 	} else {
-		CountStmtNode(stmtNode, sessVars.InRestrictedSQL)
+		CountStmtNode(stmtNode, sessVars.InRestrictedSQL, stmtCtx.ResourceGroup)
 	}
 	var lowerPriority bool
 	if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
@@ -187,7 +187,7 @@ func isPhysicalPlanNeedLowerPriority(p plannercore.PhysicalPlan) bool {
 }
 
 // CountStmtNode records the number of statements with the same type.
-func CountStmtNode(stmtNode ast.StmtNode, inRestrictedSQL bool) {
+func CountStmtNode(stmtNode ast.StmtNode, inRestrictedSQL bool, resourceGroup string) {
 	if inRestrictedSQL {
 		return
 	}
@@ -203,11 +203,11 @@ func CountStmtNode(stmtNode ast.StmtNode, inRestrictedSQL bool) {
 			}
 		case config.GetGlobalConfig().Status.RecordDBLabel:
 			for dbLabel := range dbLabels {
-				metrics.StmtNodeCounter.WithLabelValues(typeLabel, dbLabel).Inc()
+				metrics.StmtNodeCounter.WithLabelValues(typeLabel, dbLabel, resourceGroup).Inc()
 			}
 		}
 	} else {
-		metrics.StmtNodeCounter.WithLabelValues(typeLabel, "").Inc()
+		metrics.StmtNodeCounter.WithLabelValues(typeLabel, "", resourceGroup).Inc()
 	}
 }
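A quick illustration of the pattern this file adopts: the statement's resource group rides along as a third counter label. Below is a minimal, self-contained sketch of that labeling pattern; the metric name mirrors StmtNodeCounter, but the registration and the countStmt helper are invented for illustration, not TiDB's actual wiring.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// stmtCounter mirrors the shape StmtNodeCounter takes after this change:
// one child series per (type, db, resource_group) combination.
var stmtCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "tidb",
		Subsystem: "executor",
		Name:      "statement_total",
		Help:      "Counter of StmtNode.",
	}, []string{"type", "db", "resource_group"},
)

// countStmt is a simplified stand-in for CountStmtNode: internal (restricted)
// SQL is skipped, everything else is counted under the caller's resource group.
func countStmt(typeLabel, dbLabel, resourceGroup string, inRestrictedSQL bool) {
	if inRestrictedSQL {
		return
	}
	stmtCounter.WithLabelValues(typeLabel, dbLabel, resourceGroup).Inc()
}

func main() {
	prometheus.MustRegister(stmtCounter)
	countStmt("Select", "test", "rg1", false)
	countStmt("Insert", "test", "", false) // unset group shows up as an empty label
	fmt.Println("counted")
}
```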
2 changes: 1 addition & 1 deletion pkg/metrics/executor.go
@@ -58,7 +58,7 @@ func InitExecutorMetrics() {
 			Subsystem: "executor",
 			Name:      "statement_total",
 			Help:      "Counter of StmtNode.",
-		}, []string{LblType, LblDb})
+		}, []string{LblType, LblDb, LblResourceGroup})
 
 	DbStmtNodeCounter = NewCounterVec(
 		prometheus.CounterOpts{
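One caveat that follows from the new label arity: WithLabelValues is positional in client_golang, so once a third label is declared, every call site must pass exactly three values or the call panics, which is why the call sites in compiler.go change in lockstep. A small sketch of the two lookup variants, using standard client_golang behavior and a toy metric name:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

func main() {
	c := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "statement_total"},
		[]string{"type", "db", "resource_group"},
	)

	c.WithLabelValues("Select", "test", "default").Inc() // three labels declared, three values passed

	// Passing two values would panic with an "inconsistent cardinality" error:
	// c.WithLabelValues("Select", "test").Inc()

	// GetMetricWithLabelValues is the non-panicking variant.
	if _, err := c.GetMetricWithLabelValues("Select", "test"); err != nil {
		println("label arity mismatch:", err.Error())
	}
}
```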
10 changes: 5 additions & 5 deletions pkg/metrics/server.go
@@ -31,7 +31,7 @@ var (
 	QueryDurationHistogram *prometheus.HistogramVec
 	QueryTotalCounter      *prometheus.CounterVec
 	AffectedRowsCounter    *prometheus.CounterVec
-	ConnGauge              prometheus.Gauge
+	ConnGauge              *prometheus.GaugeVec
 	DisconnectionCounter   *prometheus.CounterVec
 	PreparedStmtGauge      prometheus.Gauge
 	ExecuteErrorCounter    *prometheus.CounterVec
@@ -97,7 +97,7 @@ func InitServerMetrics() {
 			Subsystem: "server",
 			Name:      "query_total",
 			Help:      "Counter of queries.",
-		}, []string{LblType, LblResult})
+		}, []string{LblType, LblResult, LblResourceGroup})
 
 	AffectedRowsCounter = NewCounterVec(
 		prometheus.CounterOpts{
@@ -107,13 +107,13 @@
 			Help:      "Counters of server affected rows.",
 		}, []string{LblSQLType})
 
-	ConnGauge = NewGauge(
+	ConnGauge = NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "tidb",
 			Subsystem: "server",
 			Name:      "connections",
 			Help:      "Number of connections.",
-		})
+		}, []string{LblResourceGroup})
 
 	DisconnectionCounter = NewCounterVec(
 		prometheus.CounterOpts{
@@ -136,7 +136,7 @@
 			Subsystem: "server",
 			Name:      "execute_error_total",
 			Help:      "Counter of execute errors.",
-		}, []string{LblType, LblDb})
+		}, []string{LblType, LblDb, LblResourceGroup})
 
 	CriticalErrorCounter = NewCounter(
 		prometheus.CounterOpts{
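ConnGauge changes type here, from a single prometheus.Gauge to a *prometheus.GaugeVec keyed by resource group, so every Set call now goes through WithLabelValues, and the old single-series value becomes the sum over groups. A sketch of the after state, assuming only the stock client_golang API (testutil is its test helper):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// After this patch there is one series per resource group instead of one series total.
	connGauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "tidb",
			Subsystem: "server",
			Name:      "connections",
			Help:      "Number of connections.",
		}, []string{"resource_group"},
	)

	connGauge.WithLabelValues("default").Set(3)
	connGauge.WithLabelValues("rg1").Set(2)

	// The old single-series view is recovered by summing,
	// e.g. in PromQL: sum(tidb_server_connections).
	fmt.Println(testutil.ToFloat64(connGauge.WithLabelValues("default"))) // 3
}
```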
2 changes: 1 addition & 1 deletion pkg/metrics/session.go
@@ -218,7 +218,7 @@ func InitSessionMetrics() {
 			Subsystem: "session",
 			Name:      "resource_group_query_total",
 			Help:      "Counter of the total number of queries for the resource group",
-		}, []string{LblName})
+		}, []string{LblName, LblResourceGroup})
 
 	FairLockingUsageCount = NewCounterVec(
 		prometheus.CounterOpts{
2 changes: 1 addition & 1 deletion pkg/metrics/telemetry.go
@@ -476,7 +476,7 @@ func GetNonTransactionalStmtCounter() NonTransactionalStmtCounter {
 
 // GetSavepointStmtCounter gets the savepoint statement executed counter.
 func GetSavepointStmtCounter() int64 {
-	return readCounter(StmtNodeCounter.With(prometheus.Labels{LblType: "Savepoint", LblDb: ""}))
+	return readCounter(StmtNodeCounter.WithLabelValues("Savepoint", "", ""))
 }
 
 // GetLazyPessimisticUniqueCheckSetCounter returns the counter of setting tidb_constraint_check_in_place_pessimistic to false.
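The telemetry reader swaps the named-label With(prometheus.Labels{...}) form for positional WithLabelValues, padding the new label with the empty string. Both forms address the same child series, as this sketch shows (toy metric, with readCounter replaced by testutil.ToFloat64 since readCounter is TiDB-internal):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	c := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "statement_total"},
		[]string{"type", "db", "resource_group"},
	)
	c.WithLabelValues("Savepoint", "", "").Add(2)

	// These two lookups resolve to the same child series.
	byName := c.With(prometheus.Labels{"type": "Savepoint", "db": "", "resource_group": ""})
	byPos := c.WithLabelValues("Savepoint", "", "")

	fmt.Println(testutil.ToFloat64(byName), testutil.ToFloat64(byPos)) // 2 2
}
```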
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
         "//pkg/config",
         "//pkg/domain",
         "//pkg/domain/infosync",
+        "//pkg/domain/resourcegroup",
         "//pkg/errno",
         "//pkg/executor",
         "//pkg/executor/mppcoordmanager",
58 changes: 46 additions & 12 deletions pkg/server/conn.go
@@ -58,6 +58,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/config"
 	"github.com/pingcap/tidb/pkg/domain/infosync"
+	"github.com/pingcap/tidb/pkg/domain/resourcegroup"
 	"github.com/pingcap/tidb/pkg/errno"
 	"github.com/pingcap/tidb/pkg/executor"
 	"github.com/pingcap/tidb/pkg/extension"
@@ -334,13 +335,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
 func (cc *clientConn) Close() error {
 	cc.server.rwlock.Lock()
 	delete(cc.server.clients, cc.connectionID)
-	connections := len(cc.server.clients)
+	resourceGroupName, count := "", 0
+	if ctx := cc.getCtx(); ctx != nil {
+		resourceGroupName = ctx.GetSessionVars().ResourceGroupName
+		count = cc.server.ConnNumByResourceGroup[resourceGroupName]
+		if count <= 1 {
+			delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
+		} else {
+			cc.server.ConnNumByResourceGroup[resourceGroupName]--
+		}
+	}
 	cc.server.rwlock.Unlock()
-	return closeConn(cc, connections)
+	return closeConn(cc, resourceGroupName, count)
 }
 
 // closeConn should be idempotent.
 // It will be called on the same `clientConn` more than once to avoid connection leak.
+<<<<<<< HEAD
 func closeConn(cc *clientConn, connections int) error {
 	metrics.ConnGauge.Set(float64(connections))
 	if cc.connectionID > 0 {
@@ -353,6 +364,16 @@ func closeConn(cc *clientConn, connections int) error {
 		// We need to expect connection might have already disconnected.
 		// This is because closeConn() might be called after a connection read-timeout.
 		logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
+=======
+func closeConn(cc *clientConn, resourceGroupName string, count int) error {
+	var err error
+	cc.closeOnce.Do(func() {
+		metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
+
+		if cc.connectionID > 0 {
+			cc.server.dom.ReleaseConnID(cc.connectionID)
+			cc.connectionID = 0
+>>>>>>> e80385270c9 (metrics: add connection and fail metrics by `resource group name` (#49424))
 	}
 }
 // Close statements and session
@@ -365,7 +386,14 @@ func closeConn(cc *clientConn, connections int) error {
 
 func (cc *clientConn) closeWithoutLock() error {
 	delete(cc.server.clients, cc.connectionID)
-	return closeConn(cc, len(cc.server.clients))
+	name := cc.getCtx().GetSessionVars().ResourceGroupName
+	count := cc.server.ConnNumByResourceGroup[name]
+	if count <= 1 {
+		delete(cc.server.ConnNumByResourceGroup, name)
+	} else {
+		cc.server.ConnNumByResourceGroup[name]--
+	}
+	return closeConn(cc, name, count-1)
 }
 
 // writeInitialHandshake sends server version, connection ID, server capability, collation, server status
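Close and closeWithoutLock above both maintain ConnNumByResourceGroup under the server lock, deleting a group's entry once its last connection goes away so stale groups do not linger in the map. A stripped-down sketch of that bookkeeping follows; the types and names are invented, and it assumes the count map shares the mutex that guards the client map, as in the patch:

```go
package main

import (
	"fmt"
	"sync"
)

// server keeps a per-resource-group connection count next to its client map,
// mirroring ConnNumByResourceGroup in this patch.
type server struct {
	mu           sync.Mutex
	connsByGroup map[string]int
}

func (s *server) onConnect(group string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.connsByGroup[group]++
	return s.connsByGroup[group]
}

// onClose decrements the group's count and removes the entry once it reaches
// zero, so idle groups do not accumulate stale map keys (or stale gauge
// series derived from them).
func (s *server) onClose(group string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	count := s.connsByGroup[group]
	if count <= 1 {
		delete(s.connsByGroup, group)
		return 0
	}
	s.connsByGroup[group]--
	return count - 1
}

func main() {
	s := &server{connsByGroup: map[string]int{}}
	s.onConnect("rg1")
	s.onConnect("rg1")
	fmt.Println(s.onClose("rg1")) // 1
	fmt.Println(s.onClose("rg1")) // 0; entry deleted
}
```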
@@ -1085,9 +1113,11 @@ func (cc *clientConn) Run(ctx context.Context) {
 		if ctx := cc.getCtx(); ctx != nil {
 			txnMode = ctx.GetSessionVars().GetReadableTxnMode()
 		}
-		for _, dbName := range session.GetDBNames(cc.getCtx().GetSessionVars()) {
-			metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err), dbName).Inc()
+		vars := cc.getCtx().GetSessionVars()
+		for _, dbName := range session.GetDBNames(vars) {
+			metrics.ExecuteErrorCounter.WithLabelValues(metrics.ExecuteErrorToLabel(err), dbName, vars.ResourceGroupName).Inc()
 		}
+
 		if storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err) {
 			logutil.Logger(ctx).Debug("Expected error for FOR UPDATE NOWAIT", zap.Error(err))
 		} else {
@@ -1136,20 +1166,25 @@ func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) {
 		return
 	}
 
+	vars := cc.getCtx().GetSessionVars()
+	resourceGroupName := vars.ResourceGroupName
 	var counter prometheus.Counter
-	if err != nil && int(cmd) < len(server_metrics.QueryTotalCountErr) {
-		counter = server_metrics.QueryTotalCountErr[cmd]
-	} else if err == nil && int(cmd) < len(server_metrics.QueryTotalCountOk) {
-		counter = server_metrics.QueryTotalCountOk[cmd]
+	if len(resourceGroupName) == 0 || resourceGroupName == resourcegroup.DefaultResourceGroupName {
+		if err != nil && int(cmd) < len(server_metrics.QueryTotalCountErr) {
+			counter = server_metrics.QueryTotalCountErr[cmd]
+		} else if err == nil && int(cmd) < len(server_metrics.QueryTotalCountOk) {
+			counter = server_metrics.QueryTotalCountOk[cmd]
+		}
+	}
 
 	if counter != nil {
 		counter.Inc()
 	} else {
 		label := strconv.Itoa(int(cmd))
 		if err != nil {
-			metrics.QueryTotalCounter.WithLabelValues(label, "Error").Inc()
+			metrics.QueryTotalCounter.WithLabelValues(label, "Error", resourceGroupName).Inc()
 		} else {
-			metrics.QueryTotalCounter.WithLabelValues(label, "OK").Inc()
+			metrics.QueryTotalCounter.WithLabelValues(label, "OK", resourceGroupName).Inc()
 		}
 	}
 
@@ -1175,7 +1210,6 @@ func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) {
 		server_metrics.AffectedRowsCounterUpdate.Add(float64(affectedRows))
 	}
 
-	vars := cc.getCtx().GetSessionVars()
 	for _, dbName := range session.GetDBNames(vars) {
 		metrics.QueryDurationHistogram.WithLabelValues(sqlType, dbName, vars.ResourceGroupName).Observe(cost.Seconds())
 	}
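addMetrics keeps the pre-resolved per-command counters as a fast path, but only while the session runs in the default (or empty) resource group; any other group falls back to a labeled lookup. Caching the child returned by WithLabelValues is what makes the fast path cheap, as in this sketch (toy metric and group names; the real code also caches the error-side counters):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

const defaultGroup = "default"

var queryTotal = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "query_total"},
	[]string{"type", "result", "resource_group"},
)

// fastPathOK is resolved once, so the per-query cost on the common path is a
// plain atomic add instead of a label-hashing map lookup.
var fastPathOK = queryTotal.WithLabelValues("Query", "OK", defaultGroup)

func record(group string, err error) {
	if (group == "" || group == defaultGroup) && err == nil {
		fastPathOK.Inc() // cached child, no label hashing
		return
	}
	result := "OK"
	if err != nil {
		result = "Error"
	}
	// Slow path: resolve the child per call for non-default groups.
	queryTotal.WithLabelValues("Query", result, group).Inc()
}

func main() {
	record("", nil)
	record("rg1", nil)
	record("rg1", fmt.Errorf("boom"))
	fmt.Println("recorded")
}
```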
74 changes: 74 additions & 0 deletions pkg/server/conn_test.go
@@ -1999,3 +1999,77 @@ func TestEmptyOrgName(t *testing.T) {
 
 	testDispatch(t, inputs, 0)
 }
+<<<<<<< HEAD
+=======
+
+func TestStats(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+
+	stats := &compressionStats{}
+
+	// No compression
+	vars := tk.Session().GetSessionVars()
+	m, err := stats.Stats(vars)
+	require.NoError(t, err)
+	require.Equal(t, "OFF", m["Compression"])
+	require.Equal(t, "", m["Compression_algorithm"])
+	require.Equal(t, 0, m["Compression_level"])
+
+	// zlib compression
+	vars.CompressionAlgorithm = mysql.CompressionZlib
+	m, err = stats.Stats(vars)
+	require.NoError(t, err)
+	require.Equal(t, "ON", m["Compression"])
+	require.Equal(t, "zlib", m["Compression_algorithm"])
+	require.Equal(t, mysql.ZlibCompressDefaultLevel, m["Compression_level"])
+
+	// zstd compression, with level 1
+	vars.CompressionAlgorithm = mysql.CompressionZstd
+	vars.CompressionLevel = 1
+	m, err = stats.Stats(vars)
+	require.NoError(t, err)
+	require.Equal(t, "ON", m["Compression"])
+	require.Equal(t, "zstd", m["Compression_algorithm"])
+	require.Equal(t, 1, m["Compression_level"])
+}
+
+func TestCloseConn(t *testing.T) {
+	var outBuffer bytes.Buffer
+
+	store, _ := testkit.CreateMockStoreAndDomain(t)
+	cfg := serverutil.NewTestConfig()
+	cfg.Port = 0
+	cfg.Status.StatusPort = 0
+	drv := NewTiDBDriver(store)
+	server, err := NewServer(cfg, drv)
+	require.NoError(t, err)
+
+	cc := &clientConn{
+		connectionID: 0,
+		salt: []byte{
+			0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A,
+			0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14,
+		},
+		server:     server,
+		pkt:        internal.NewPacketIOForTest(bufio.NewWriter(&outBuffer)),
+		collation:  mysql.DefaultCollationID,
+		peerHost:   "localhost",
+		alloc:      arena.NewAllocator(512),
+		chunkAlloc: chunk.NewAllocator(),
+		capability: mysql.ClientProtocol41,
+	}
+
+	var wg sync.WaitGroup
+	const numGoroutines = 10
+	wg.Add(numGoroutines)
+	for i := 0; i < numGoroutines; i++ {
+		go func() {
+			defer wg.Done()
+			err := closeConn(cc, "", 1)
+			require.NoError(t, err)
+		}()
+	}
+	wg.Wait()
+}
+>>>>>>> e80385270c9 (metrics: add connection and fail metrics by `resource group name` (#49424))
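TestCloseConn above hammers closeConn from ten goroutines to exercise the closeOnce guard. A sketch of the same idempotency idea with a gauge assertion attached; these are self-contained stand-ins rather than the server's types, and testutil is client_golang's test helper:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	connGauge := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "connections"},
		[]string{"resource_group"},
	)

	var once sync.Once
	closeConn := func(group string, count int) {
		// sync.Once keeps the close logic idempotent even when several
		// goroutines race to close the same connection.
		once.Do(func() {
			connGauge.WithLabelValues(group).Set(float64(count))
		})
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			closeConn("", 1)
		}()
	}
	wg.Wait()

	// The gauge was set exactly once, regardless of the race.
	fmt.Println(testutil.ToFloat64(connGauge.WithLabelValues(""))) // 1
}
```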
4 changes: 2 additions & 2 deletions pkg/server/internal/testserverclient/server_client.go
@@ -2253,7 +2253,7 @@ func (cli *TestServerClient) getMetrics(t *testing.T) []byte {
 
 func getStmtCnt(content string) (stmtCnt map[string]int) {
 	stmtCnt = make(map[string]int)
-	r := regexp.MustCompile("tidb_executor_statement_total{db=\"\",type=\"([A-Z|a-z|-]+)\"} (\\d+)")
+	r := regexp.MustCompile("tidb_executor_statement_total{db=\"\",resource_group=\".*\",type=\"([A-Z|a-z|-]+)\"} (\\d+)")
 	matchResult := r.FindAllStringSubmatch(content, -1)
 	for _, v := range matchResult {
 		cnt, _ := strconv.Atoi(v[2])
@@ -2264,7 +2264,7 @@ func getDBStmtCnt(content, dbName string) (stmtCnt map[string]int) {
 
 func getDBStmtCnt(content, dbName string) (stmtCnt map[string]int) {
 	stmtCnt = make(map[string]int)
-	r := regexp.MustCompile(fmt.Sprintf("tidb_executor_statement_total{db=\"%s\",type=\"([A-Z|a-z|-]+)\"} (\\d+)", dbName))
+	r := regexp.MustCompile(fmt.Sprintf("tidb_executor_statement_total{db=\"%s\",resource_group=\".*\",type=\"([A-Z|a-z|-]+)\"} (\\d+)", dbName))
 	matchResult := r.FindAllStringSubmatch(content, -1)
 	for _, v := range matchResult {
 		cnt, _ := strconv.Atoi(v[2])
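The test regexps gain resource_group=".*" because the Prometheus text exposition format renders labels in alphabetical order, which places resource_group between db and type. A sketch of that parsing against a fabricated exposition snippet; summing counts across groups is an assumption about intent here, not code copied from the test:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Prometheus text exposition sorts label names alphabetically,
	// so resource_group sits between db and type.
	content := `tidb_executor_statement_total{db="",resource_group="default",type="Select"} 12
tidb_executor_statement_total{db="",resource_group="rg1",type="Insert"} 3`

	r := regexp.MustCompile(`tidb_executor_statement_total{db="",resource_group=".*",type="([A-Z|a-z|-]+)"} (\d+)`)
	stmtCnt := make(map[string]int)
	for _, m := range r.FindAllStringSubmatch(content, -1) {
		cnt, _ := strconv.Atoi(m[2])
		stmtCnt[m[1]] += cnt // sum across resource groups
	}
	fmt.Println(stmtCnt) // map[Insert:3 Select:12]
}
```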
1 change: 1 addition & 0 deletions pkg/server/metrics/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/pkg/server/metrics",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/domain/resourcegroup",
         "//pkg/metrics",
         "//pkg/parser/mysql",
         "@com_github_prometheus_client_golang//prometheus",
