Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

testutil/promrated: update promrated querying #2844

Merged
merged 7 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 18 additions & 38 deletions testutil/promrated/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,13 @@ import (
"github.com/obolnetwork/charon/app/promauto"
)

var (
validatorLabels = []string{"pubkey_full", "cluster_name", "cluster_hash", "cluster_network"}
networkLabels = []string{"cluster_network"}

uptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_uptime",
Help: "Uptime of a validation key.",
}, validatorLabels)

correctness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_correctness",
Help: "Average correctness of a validation key.",
}, validatorLabels)

inclusionDelay = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_inclusion_delay",
Help: "Average inclusion delay of a validation key.",
}, validatorLabels)

attester = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_attester_effectiveness",
Help: "Attester effectiveness of a validation key.",
}, validatorLabels)

proposer = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_proposer_effectiveness",
Help: "Proposer effectiveness of a validation key.",
}, validatorLabels)
const (
clusterNetworkLabel = "cluster_network"
nodeOperatorLabel = "node_operator"
)

effectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "validator_effectiveness",
Help: "Effectiveness of a validation key.",
}, validatorLabels)
var (
networkLabels = []string{clusterNetworkLabel, nodeOperatorLabel}

networkUptime = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Expand All @@ -72,6 +40,18 @@ var (
Help: "Effectiveness of the network.",
}, networkLabels)

networkProposerEffectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "network_proposer_effectiveness",
Help: "Proposer Effectiveness of the network.",
}, networkLabels)

networkAttesterEffectiveness = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "promrated",
Name: "network_attester_effectiveness",
Help: "Attester Effectiveness of the network.",
}, networkLabels)

ratedErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "promrated",
Name: "api_error_total",
Expand Down
80 changes: 0 additions & 80 deletions testutil/promrated/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,15 @@
package promrated

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/z"
)

const (
promQuery = "group by (cluster_name, cluster_hash, cluster_network, pubkey_full) (core_scheduler_validator_balance_gwei)"
)

type validator struct {
PubKey string `json:"pubkey_full"`
ClusterName string `json:"cluster_name"`
ClusterHash string `json:"cluster_hash"`
ClusterNetwork string `json:"cluster_network"`
}

// serveMonitoring creates a liveness endpoint and serves metrics to prometheus.
func serveMonitoring(addr string, registry *prometheus.Registry) error {
mux := http.NewServeMux()
Expand All @@ -50,69 +33,6 @@ func serveMonitoring(addr string, registry *prometheus.Registry) error {
return errors.Wrap(server.ListenAndServe(), "failed to serve prometheus metrics")
}

// getValidators queries prometheus and returns a list of validators with associated cluster and pubkey.
func getValidators(ctx context.Context, promEndpoint string, promAuth string) ([]validator, error) {
client := new(http.Client)

url, err := url.ParseRequestURI(promEndpoint)
if err != nil {
return nil, errors.Wrap(err, "parse prometheus endpoint")
}

query := url.Query()
query.Add("query", promQuery)
url.RawQuery = query.Encode()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "new prometheus request")
}

req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", promAuth))

res, err := client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "requesting prom metrics")
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Wrap(err, "reading body")
}

if res.StatusCode/100 != 2 {
return nil, errors.New("not ok http response", z.Str("body", string(body)))
}

return parseValidators(body)
}

// parseValidators reads prometheus response and returns a list of validators.
func parseValidators(body []byte) ([]validator, error) {
var result struct {
Data struct {
Result []struct {
Labels validator `json:"metric"`
} `json:"result"`
} `json:"data"`
}

if err := json.Unmarshal(body, &result); err != nil {
return nil, errors.Wrap(err, "deserializing json")
}

var validators []validator
for _, datum := range result.Data.Result {
if datum.Labels.ClusterName == "" || datum.Labels.ClusterNetwork == "" || datum.Labels.PubKey == "" {
continue
}
validators = append(validators, datum.Labels)
}

return validators, nil
}

func writeResponse(w http.ResponseWriter, status int, msg string) {
w.WriteHeader(status)
_, _ = w.Write([]byte(msg))
Expand Down
65 changes: 0 additions & 65 deletions testutil/promrated/prometheus_internal_test.go

This file was deleted.

81 changes: 28 additions & 53 deletions testutil/promrated/promrated.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
type Config struct {
RatedEndpoint string
RatedAuth string
PromEndpoint string
PromAuth string
MonitoringAddr string
Networks []string
NodeOperators []string
}

// Run blocks running the promrated program until the context is canceled or a fatal error occurs.
func Run(ctx context.Context, config Config) error {
log.Info(ctx, "Promrated started",
z.Str("rated_endpoint", redactURL(config.RatedEndpoint)),
z.Str("prom_auth", config.PromAuth),

Check warning on line 32 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L32

Added line #L32 was not covered by tests
z.Str("monitoring_addr", config.MonitoringAddr),
)

Expand All @@ -42,7 +43,8 @@
serverErr <- serveMonitoring(config.MonitoringAddr, promRegistry)
}()

ticker := time.NewTicker(12 * time.Hour)
// Metrics are produced daily so can preserve Rated CUs
ticker := time.NewTicker(24 * time.Hour)

Check warning on line 47 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L47

Added line #L47 was not covered by tests
LukeHackett12 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

onStartup := make(chan struct{}, 1)
Expand All @@ -65,45 +67,10 @@

// report the validator effectiveness metrics for prometheus.
func reportMetrics(ctx context.Context, config Config) {
validators, err := getValidators(ctx, config.PromEndpoint, config.PromAuth)
if err != nil {
log.Error(ctx, "Failed fetching validators from prometheus", err)
return
}

for _, validator := range validators {
log.Info(ctx, "Fetched validator from prometheus",
z.Str("pubkey", validator.PubKey),
z.Str("cluster_name", validator.ClusterName),
z.Str("cluster_network", validator.ClusterNetwork),
)

if contains(config.Networks, validator.ClusterNetwork) {
stats, err := getValidatorStatistics(ctx, config.RatedEndpoint, config.RatedAuth, validator)
if err != nil {
log.Error(ctx, "Getting validator statistics", err, z.Str("pubkey", validator.PubKey))
continue
}

clusterLabels := prometheus.Labels{
"pubkey_full": validator.PubKey,
"cluster_name": validator.ClusterName,
"cluster_hash": validator.ClusterHash,
"cluster_network": validator.ClusterNetwork,
}

uptime.With(clusterLabels).Set(stats.Uptime)
correctness.With(clusterLabels).Set(stats.AvgCorrectness)
inclusionDelay.With(clusterLabels).Set(stats.AvgInclusionDelay)
attester.With(clusterLabels).Set(stats.AttesterEffectiveness)
proposer.With(clusterLabels).Set(stats.ProposerEffectiveness)
effectiveness.With(clusterLabels).Set(stats.ValidatorEffectiveness)
}
}

for _, network := range config.Networks {
networkLabels := prometheus.Labels{
"cluster_network": network,
clusterNetworkLabel: network,
nodeOperatorLabel: "all",

Check warning on line 73 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L72-L73

Added lines #L72 - L73 were not covered by tests
}

stats, err := getNetworkStatistics(ctx, config.RatedEndpoint, config.RatedAuth, network)
Expand All @@ -112,24 +79,32 @@
continue
}

networkUptime.With(networkLabels).Set(stats.AvgUptime)
networkCorrectness.With(networkLabels).Set(stats.AvgCorrectness)
networkInclusionDelay.With(networkLabels).Set(stats.AvgInclusionDelay)
networkEffectiveness.With(networkLabels).Set(stats.ValidatorEffectiveness)
}
}
setMetrics(networkLabels, stats)

for _, nodeOperator := range config.NodeOperators {
nodeOperatorLabels := prometheus.Labels{
clusterNetworkLabel: network,
nodeOperatorLabel: nodeOperator,
}

Check warning on line 88 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L82-L88

Added lines #L82 - L88 were not covered by tests

// contains checks if array contains a string s.
func contains(arr []string, s string) bool {
result := false
for _, x := range arr {
if x == s {
result = true
break
stats, err = getNodeOperatorStatistics(ctx, config.RatedEndpoint, config.RatedAuth, nodeOperator, network)
if err != nil {
log.Error(ctx, "Getting node operator statistics", err, z.Str("network", network), z.Str("node_operator", nodeOperator))
continue

Check warning on line 93 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L90-L93

Added lines #L90 - L93 were not covered by tests
}

setMetrics(nodeOperatorLabels, stats)

Check warning on line 96 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L96

Added line #L96 was not covered by tests
}
}
}

return result
func setMetrics(labels prometheus.Labels, stats networkEffectivenessData) {
networkUptime.With(labels).Set(stats.AvgUptime)
networkCorrectness.With(labels).Set(stats.AvgCorrectness)
networkInclusionDelay.With(labels).Set(stats.AvgInclusionDelay)
networkEffectiveness.With(labels).Set(stats.AvgValidatorEffectiveness)
networkProposerEffectiveness.With(labels).Set(stats.AvgProposerEffectiveness)
networkAttesterEffectiveness.With(labels).Set(stats.AvgAttesterEffectiveness)

Check warning on line 107 in testutil/promrated/promrated.go

View check run for this annotation

Codecov / codecov/patch

testutil/promrated/promrated.go#L101-L107

Added lines #L101 - L107 were not covered by tests
}

// redactURL returns a redacted version of the given URL.
Expand Down
4 changes: 2 additions & 2 deletions testutil/promrated/promrated/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func newRootCmd(runFunc func(context.Context, promrated.Config) error) *cobra.Co
func bindPromratedFlag(flags *pflag.FlagSet, config *promrated.Config) {
flags.StringVar(&config.RatedEndpoint, "rated-endpoint", "https://api.rated.network", "Rated API endpoint to poll for validator metrics.")
flags.StringVar(&config.RatedAuth, "rated-auth-token", "token", "[REQUIRED] Token for Rated API.")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:9100", "Listening address (ip and port) for the prometheus monitoring http server.")
flags.StringVar(&config.PromEndpoint, "prom-endpoint", "https://vm.monitoring.gcp.obol.tech/query", "Endpoint for VMetrics Prometheus API.")
flags.StringVar(&config.MonitoringAddr, "monitoring-address", "127.0.0.1:9200", "Listening address (ip and port) for the prometheus monitoring http server.")
flags.StringVar(&config.PromAuth, "prom-auth-token", "token", "[REQUIRED] Token for VMetrics Promtetheus API.")
flags.StringSliceVar(&config.Networks, "networks", nil, "Comma separated list of one or networks to monitor.")
flags.StringSliceVar(&config.NodeOperators, "node-operators", nil, "Comma separated list of one or node operators to monitor.")
}
Loading
Loading