Skip to content

Commit

Permalink
make both the initMetrics function (some other functions are the same…
Browse files Browse the repository at this point in the history
…) and dataSource as members of Collector
  • Loading branch information
limowang committed Jan 18, 2024
1 parent c295f2b commit 0513273
Showing 1 changed file with 29 additions and 28 deletions.
57 changes: 29 additions & 28 deletions collector/metrics/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec
var CounterMetricsMap map[string]prometheus.CounterVec
var SummaryMetricsMap map[string]prometheus.Summary

// RoleByDataSource 0 meta server, 1 replica server.
var RoleByDataSource map[int]string

var TableNameByID map[string]string

type MetricCollector interface {
Expand All @@ -69,19 +66,23 @@ func NewMetricCollector(
GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
RoleByDataSource = make(map[int]string, 128)
TableNameByID = make(map[string]string, 128)
RoleByDataSource[0] = "meta"
RoleByDataSource[1] = "replica"
initMetrics(dataSource)

return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, dataSource: dataSource}
var role string
if dataSource == 0 {
role = "meta"
} else {
role = "replica"
}
var collector = Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, role: role}
collector.initMetrics()
return &collector
}

type Collector struct {
dataSource int
detectInterval time.Duration
detectTimeout time.Duration
role string
}

func (collector *Collector) Start(tom *tomb.Tomb) error {
Expand All @@ -92,7 +93,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error {
return nil
case <-ticker.C:
updateClusterTableInfo()
processAllServerMetrics(collector.dataSource)
collector.processAllServerMetrics()
}
}
}
Expand Down Expand Up @@ -123,10 +124,10 @@ func getReplicaAddrs() ([]string, error) {
}

// Register all metrics.
func initMetrics(dataSource int) {
func (collector *Collector) initMetrics() {
var addrs []string
var err error
if dataSource == MetaServer {
if collector.role == "meta" {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
Expand Down Expand Up @@ -187,10 +188,10 @@ func initMetrics(dataSource int) {
}

// Parse metric data and update metrics.
func processAllServerMetrics(dataSource int) {
func (collector *Collector) processAllServerMetrics() {
var addrs []string
var err error
if dataSource == MetaServer {
if collector.role == "meta" {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
Expand Down Expand Up @@ -224,7 +225,7 @@ func processAllServerMetrics(dataSource int) {
tableID, &metricsByTableID)
collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
&metricsByServerTableID)
updateServerLevelTableMetrics(addr, metricsByServerTableID, dataSource)
collector.updateServerLevelTableMetrics(addr, metricsByServerTableID)
case "server":
mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
metricsOfCluster)
Expand All @@ -236,13 +237,13 @@ func processAllServerMetrics(dataSource int) {
}
}

updateClusterLevelTableMetrics(metricsByTableID, dataSource)
updateServerLevelServerMetrics(metricsByAddr, dataSource)
updateClusterLevelMetrics(metricsOfCluster, dataSource)
collector.updateClusterLevelTableMetrics(metricsByTableID)
collector.updateServerLevelServerMetrics(metricsByAddr)
collector.updateClusterLevelMetrics(metricsOfCluster)
}

// Update table metrics. They belong to a specified server.
func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics, dataSource int) {
func (collector *Collector) updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
for tableID, metrics := range metricsByServerTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
Expand All @@ -251,29 +252,29 @@ func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[strin
tableName = name
}
for _, metric := range metrics {
updateMetric(metric, addr, "server", tableName, dataSource)
collector.updateMetric(metric, addr, "server", tableName)
}
}
}

// Update server metrics. They belong to a specified server.
func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics, dataSource int) {
func (collector *Collector) updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
for addr, metrics := range metricsByAddr {
for _, metric := range metrics {
updateMetric(metric, addr, "server", "server", dataSource)
collector.updateMetric(metric, addr, "server", "server")
}
}
}

// Update cluster level metrics. They belong to a cluster.
func updateClusterLevelMetrics(metricsOfCluster []Metric, dataSource int) {
func (collector *Collector) updateClusterLevelMetrics(metricsOfCluster []Metric) {
for _, metric := range metricsOfCluster {
updateMetric(metric, "cluster", "server", metric.name, dataSource)
collector.updateMetric(metric, "cluster", "server", metric.name)
}
}

// Update table metrics. They belong to a cluster.
func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics, dataSource int) {
func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
for tableID, metrics := range metricsByTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
Expand All @@ -282,13 +283,13 @@ func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics, dataSou
tableName = name
}
for _, metric := range metrics {
updateMetric(metric, "cluster", "table", tableName, dataSource)
collector.updateMetric(metric, "cluster", "table", tableName)
}
}
}

func updateMetric(metric Metric, endpoint string, level string, title string, dataSource int) {
role := RoleByDataSource[dataSource]
func (collector *Collector) updateMetric(metric Metric, endpoint string, level string, title string) {
role := collector.role
switch metric.mtype {
case "Counter":
if counter, ok := CounterMetricsMap[metric.name]; ok {
Expand Down

0 comments on commit 0513273

Please sign in to comment.