Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: A new port is added for prometheus collection, and the indicator init… #1826

Closed
wants to merge 10 commits into from
4 changes: 2 additions & 2 deletions collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

module github.com/pegasus-kv/collector
module github.com/limowang/incubator-pegasus/collector
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
module github.com/limowang/incubator-pegasus/collector
module github.com/apache/incubator-pegasus/collector


go 1.18

Expand Down Expand Up @@ -98,4 +98,4 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
)
658 changes: 658 additions & 0 deletions collector/go.sum

Large diffs are not rendered by default.

Binary file added collector/main
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add binaries to the codebase.

Binary file not shown.
14 changes: 10 additions & 4 deletions collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
"strings"
"syscall"

"github.com/pegasus-kv/collector/avail"
"github.com/pegasus-kv/collector/metrics"
"github.com/pegasus-kv/collector/webui"
// "github.com/pegasus-kv/collector/avail"
// "github.com/pegasus-kv/collector/metrics"
// "github.com/pegasus-kv/collector/webui"
"github.com/limowang/incubator-pegasus/collector/avail"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the comments and use the apache repository.

"github.com/limowang/incubator-pegasus/collector/metrics"
"github.com/limowang/incubator-pegasus/collector/webui"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gopkg.in/natefinch/lumberjack.v2"
Expand Down Expand Up @@ -81,7 +84,8 @@ func main() {
return
}

webui.StartWebServer()
metrics.InitMetrics()
//webui.StartWebServer()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just remove it if it's not needed.


tom := &tomb.Tomb{}
setupSignalHandler(func() {
Expand All @@ -97,5 +101,7 @@ func main() {
tom.Go(func() error {
return metrics.NewReplicaServerMetricCollector().Start(tom)
})

webui.StartWebServer()
<-tom.Dead() // gracefully wait until all goroutines dead
}
63 changes: 32 additions & 31 deletions collector/metrics/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,13 @@ func NewMetricCollector(
dataSource int,
detectInterval time.Duration,
detectTimeout time.Duration) MetricCollector {
DataSource = dataSource
GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
RoleByDataSource = make(map[int]string, 128)
TableNameByID = make(map[string]string, 128)
RoleByDataSource[0] = "meta_server"
RoleByDataSource[1] = "replica_server"
initMetrics()

return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, dataSource: dataSource}
}

type Collector struct {
detectInterval time.Duration
detectTimeout time.Duration
dataSource int
}

func (collector *Collector) Start(tom *tomb.Tomb) error {
Expand Down Expand Up @@ -123,19 +114,10 @@ func getReplicaAddrs() ([]string, error) {
return rserverAddrs, nil
}

// Register all metrics.
func initMetrics() {
var addrs []string
var err error
if DataSource == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
if err != nil {
log.Errorf("Get replica server address failed, err: %s", err)
return
}
}
//Get metrics with new labels

// Get all metrics of meta-server and replica-server by their addrs
func getAllMetricsByAddrs(addrs []string) {
for _, addr := range addrs {
data, err := getOneServerMetrics(addr)
if err != nil {
Expand Down Expand Up @@ -167,17 +149,15 @@ func initMetrics() {
Help: desc,
}, []string{"endpoint", "role", "level", "title"})
GaugeMetricsMap[name] = *gaugeMetric
case "Percentile":
if _, ok := SummaryMetricsMap[name]; ok {
case "Percentile": //这个需要改动不能用这个表示,用gauge来表示分位数 --level(p50,p99),title(task_name)来替代区分
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use English because this is a globally open-source project.

if _, ok := GaugeMetricsMap[name]; ok {
continue
}
summaryMetric := promauto.NewSummary(prometheus.SummaryOpts{
gaugeMetric := promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Help: desc,
Objectives: map[float64]float64{
0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001, 0.999: 0.0001},
})
SummaryMetricsMap[name] = summaryMetric
}, []string{"endpoint", "role", "level", "title"})
GaugeMetricsMap[name] = *gaugeMetric
case "Histogram":
default:
log.Errorf("Unsupport metric type %s", mtype)
Expand All @@ -187,6 +167,27 @@ func initMetrics() {
}
}

// Register all metrics.
func InitMetrics() {
GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 256)
CounterMetricsMap = make(map[string]prometheus.CounterVec, 256)
SummaryMetricsMap = make(map[string]prometheus.Summary, 256)
RoleByDataSource = make(map[int]string, 128)
TableNameByID = make(map[string]string, 256)
RoleByDataSource[0] = "meta_server"
RoleByDataSource[1] = "replica_server"

var addrs []string
addrs = viper.GetStringSlice("meta_servers")
replicAddrs, err := getReplicaAddrs()
if err != nil {
log.Errorf("Get raw metrics from %s failed, err: %s", replicAddrs, err)
return
}
addrs = append(addrs, replicAddrs...)
getAllMetricsByAddrs(addrs)
}

// Parse metric data and update metrics.
func processAllServerMetrics() {
var addrs []string
Expand Down
2 changes: 1 addition & 1 deletion collector/sink/falcon_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"net/http"
"time"

"github.com/pegasus-kv/collector/aggregate"
"github.com/limowang/incubator-pegasus/collector/aggregate"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
Expand Down
2 changes: 1 addition & 1 deletion collector/sink/prometheus_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package sink
import (
"sync"

"github.com/pegasus-kv/collector/aggregate"
"github.com/limowang/incubator-pegasus/collector/aggregate"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down
2 changes: 1 addition & 1 deletion collector/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package sink

import (
"github.com/pegasus-kv/collector/aggregate"
"github.com/limowang/incubator-pegasus/collector/aggregate"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
Expand Down
2 changes: 1 addition & 1 deletion collector/usage/usage_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/apache/incubator-pegasus/go-client/pegasus"
"github.com/pegasus-kv/collector/aggregate"
"github.com/limowang/incubator-pegasus/collector/aggregate"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gopkg.in/tomb.v2"
Expand Down
2 changes: 1 addition & 1 deletion collector/webui/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package webui

import (
"github.com/kataras/iris/v12"
"github.com/pegasus-kv/collector/aggregate"
"github.com/limowang/incubator-pegasus/collector/aggregate"
)

var indexPageClusterStats = []string{
Expand Down
18 changes: 17 additions & 1 deletion collector/webui/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package webui

import (
"context"
"net/http"
"time"

"github.com/kataras/iris/v12"
"github.com/limowang/incubator-pegasus/collector/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand Down Expand Up @@ -53,9 +56,22 @@ func StartWebServer() {
app.RegisterView(tmpl)

go func() {
err := app.Listen(":8080")
err := app.Listen(":8081")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to read from config file or command line rather than hard code.

The port should be user-defined to avoid port conflicts in user environments.

if err != nil {
return
}
}()

//Provide metrics for prometheus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//Provide metrics for prometheus
// Provide metrics for Prometheus.

registry := prometheus.NewRegistry()
for _, cV := range metrics.CounterMetricsMap {
registry.MustRegister(cV)
}
for _, gV := range metrics.GaugeMetricsMap {
registry.MustRegister(gV)
}

http.Handle("/metrics", promhttp.Handler())

_ = http.ListenAndServe(":8080", nil)
}
Loading