Skip to content

Commit

Permalink
resource_manager: implement the metrics info collecting (#5855)
Browse files Browse the repository at this point in the history
ref #5854

Collect the resource group consumption info and use Prometheus to observe it.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
JmPotato and ti-chi-bot authored Jan 18, 2023
1 parent b2a0532 commit 082fc6a
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 6 deletions.
15 changes: 11 additions & 4 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,18 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
targetPeriodMs := request.GetTargetRequestPeriodMs()
resps := &rmpb.TokenBucketsResponse{}
for _, req := range request.Requests {
rg := s.manager.GetMutableResourceGroup(req.ResourceGroupName)
resourceGroupName := req.GetResourceGroupName()
// Get the resource group from manager to acquire token buckets.
rg := s.manager.GetMutableResourceGroup(resourceGroupName)
if rg == nil {
log.Warn("resource group not found", zap.String("resource-group", req.ResourceGroupName))
log.Warn("resource group not found", zap.String("resource-group", resourceGroupName))
continue
}
// Send the consumption to update the metrics.
s.manager.consumptionDispatcher <- struct {
resourceGroupName string
*rmpb.Consumption
}{resourceGroupName, req.GetConsumptionSinceLastRequest()}
now := time.Now()
resp := &rmpb.TokenBucketResponse{
ResourceGroupName: rg.Name,
Expand All @@ -167,10 +174,10 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
}
case rmpb.GroupMode_RawMode:
log.Warn("not supports the resource type", zap.String("resource-group", req.ResourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)]))
log.Warn("not supports the resource type", zap.String("resource-group", resourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)]))
continue
}
log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName))
log.Debug("finish token request from", zap.String("resource group", resourceGroupName))
resps.Responses = append(resps.Responses, resp)
}
stream.Send(resps)
Expand Down
66 changes: 66 additions & 0 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"context"
"sort"
"sync"

Expand All @@ -27,20 +28,33 @@ import (
"go.uber.org/zap"
)

const defaultConsumptionChanSize = 1024

// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
groups map[string]*ResourceGroup
storage func() storage.Storage
// consumptionChan is used to send the consumption
// info to the background metrics flusher.
consumptionDispatcher chan struct {
resourceGroupName string
*rmpb.Consumption
}
}

// NewManager returns a new Manager.
func NewManager(srv *server.Server) *Manager {
m := &Manager{
groups: make(map[string]*ResourceGroup),
storage: srv.GetStorage,
consumptionDispatcher: make(chan struct {
resourceGroupName string
*rmpb.Consumption
}, defaultConsumptionChanSize),
}
srv.AddStartCallback(m.Init)
go m.backgroundMetricsFlush(srv.Context())
return m
}

Expand Down Expand Up @@ -145,3 +159,55 @@ func (m *Manager) GetResourceGroupList() []*ResourceGroup {
})
return res
}

// Receive the consumption and flush it to the metrics.
func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case consumptionInfo := <-m.consumptionDispatcher:
consumption := consumptionInfo.Consumption
if consumption == nil {
continue
}
var (
name = consumptionInfo.resourceGroupName
rruMetrics = readRequestUnitCost.WithLabelValues(name)
wruMetrics = writeRequestUnitCost.WithLabelValues(name)
readByteMetrics = readByteCost.WithLabelValues(name)
writeByteMetrics = writeByteCost.WithLabelValues(name)
kvCPUMetrics = kvCPUCost.WithLabelValues(name)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name)
readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
)
// RU info.
if consumption.RRU != 0 {
rruMetrics.Observe(consumption.RRU)
}
if consumption.WRU != 0 {
wruMetrics.Observe(consumption.WRU)
}
// Byte info.
if consumption.ReadBytes != 0 {
readByteMetrics.Observe(consumption.ReadBytes)
}
if consumption.WriteBytes != 0 {
writeByteMetrics.Observe(consumption.WriteBytes)
}
// CPU time info.
if consumption.SqlLayerCpuTimeMs != 0 {
sqlCPUMetrics.Observe(consumption.SqlLayerCpuTimeMs)
kvCPUMetrics.Observe(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs)
}
// RPC count info.
if consumption.KvReadRpcCount != 0 {
readRequestCountMetrics.Add(consumption.KvReadRpcCount)
}
if consumption.KvWriteRpcCount != 0 {
writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
}
}
}
}
87 changes: 85 additions & 2 deletions pkg/mcs/resource_manager/server/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 TiKV Project Authors.
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,4 +14,87 @@

package server

// TODO: add metrics
import (
"github.com/prometheus/client_golang/prometheus"
)

const (
namespace = "resource_manager"
ruSubsystem = "resource_unit"
resourceSubsystem = "resource"
resourceGroupNameLabel = "name"
typeLabel = "type"
readTypeLabel = "read"
writeTypeLabel = "write"
)

var (
// RU cost metrics.
readRequestUnitCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "read_request_unit",
Help: "Bucketed histogram of the read request unit cost for all resource groups.",
Buckets: prometheus.ExponentialBuckets(1, 10, 5), // 1 ~ 100000
}, []string{resourceGroupNameLabel})
writeRequestUnitCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: ruSubsystem,
Name: "write_request_unit",
Help: "Bucketed histogram of the write request unit cost for all resource groups.",
Buckets: prometheus.ExponentialBuckets(3, 10, 5), // 3 ~ 300000
}, []string{resourceGroupNameLabel})

// Resource cost metrics.
readByteCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: resourceSubsystem,
Name: "read_byte",
Help: "Bucketed histogram of the read byte cost for all resource groups.",
Buckets: prometheus.ExponentialBuckets(1, 8, 12),
}, []string{resourceGroupNameLabel})
writeByteCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: resourceSubsystem,
Name: "write_byte",
Help: "Bucketed histogram of the write byte cost for all resource groups.",
Buckets: prometheus.ExponentialBuckets(1, 8, 12),
}, []string{resourceGroupNameLabel})
kvCPUCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: resourceSubsystem,
Name: "kv_cpu_time_ms",
Help: "Bucketed histogram of the KV CPU time cost in milliseconds for all resource groups.",
Buckets: prometheus.ExponentialBuckets(1, 10, 3), // 1 ~ 1000
}, []string{resourceGroupNameLabel})
sqlCPUCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: resourceSubsystem,
Name: "sql_cpu_time_ms",
Help: "Bucketed histogram of the SQL CPU time cost in milliseconds for all resource groups.",
Buckets: prometheus.ExponentialBuckets(1, 10, 3), // 1 ~ 1000
}, []string{resourceGroupNameLabel})
requestCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: resourceSubsystem,
Name: "request_count",
Help: "The number of read/write requests for all resource groups.",
}, []string{resourceGroupNameLabel, typeLabel})
)

func init() {
prometheus.MustRegister(readRequestUnitCost)
prometheus.MustRegister(writeRequestUnitCost)
prometheus.MustRegister(readByteCost)
prometheus.MustRegister(writeByteCost)
prometheus.MustRegister(kvCPUCost)
prometheus.MustRegister(sqlCPUCost)
prometheus.MustRegister(requestCount)
}

0 comments on commit 082fc6a

Please sign in to comment.