Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial metrics to Churn server #11

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions churner/churner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ type churner struct {

privateKey *ecdsa.PrivateKey
logger common.Logger
metrics *Metrics
}

func NewChurner(
config *Config,
indexer thegraph.IndexedChainState,
transactor core.Transactor,
logger common.Logger,
metrics *Metrics,
) (*churner, error) {
privateKey, err := crypto.HexToECDSA(config.EthClientConfig.PrivateKeyString)
if err != nil {
Expand All @@ -70,6 +72,7 @@ func NewChurner(

privateKey: privateKey,
logger: logger,
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -207,12 +210,14 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
// verify the lowest stake against the registering operator's stake
// make sure that: lowestStake * churnBIPsOfOperatorStake < operatorToRegisterStake * bipMultiplier
if new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister)
return nil, errors.New("registering operator has less than churnBIPsOfOperatorStake")
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another case which fails to pass the stake check below

// verify the lowest stake against the total stake
// make sure that: lowestStake * bipMultiplier < totalStake * churnBIPsOfTotalStake
if new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) >= 0 {
c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn)
return nil, errors.New("operator to churn has less than churnBIPSOfTotalStake")
}

Expand Down
3 changes: 2 additions & 1 deletion churner/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestProcessChurnRequest(t *testing.T) {
PrivateKeyString: churnerPrivateKeyHex,
},
}
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
assert.NoError(t, err)
assert.NotNil(t, cn)

Expand Down
8 changes: 4 additions & 4 deletions churner/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"log"
"net"
Expand Down Expand Up @@ -81,14 +80,15 @@ func run(ctx *cli.Context) error {

querier := graphql.NewClient(config.GraphUrl, nil)
indexer := thegraph.NewIndexedChainState(cs, querier, logger)
metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)

cn, err := churner.NewChurner(config, indexer, tx, logger)
cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
}

churnerServer := churner.NewServer(config, cn, logger)
if err = churnerServer.Start(context.Background()); err != nil {
churnerServer := churner.NewServer(config, cn, logger, metrics)
if err = churnerServer.Start(config.MetricsConfig); err != nil {
log.Fatalln("failed to start churner server", err)
}

Expand Down
5 changes: 5 additions & 0 deletions churner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
EthClientConfig geth.EthClientConfig
LoggerConfig logging.Config
GraphUrl string
MetricsConfig MetricsConfig

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand All @@ -28,5 +29,9 @@ func NewConfig(ctx *cli.Context) *Config {
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name),
MetricsConfig: MetricsConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
},
}
}
16 changes: 16 additions & 0 deletions churner/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ var (
EnvVar: common.PrefixEnvVar(envPrefix, "PER_PUBLIC_KEY_RATE_LIMIT"),
Value: 24 * time.Hour,
}
EnableMetrics = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-metrics"),
Usage: "start metrics server",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"),
}
/* Optional Flags*/
MetricsHTTPPort = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"),
Usage: "the http port which the metrics prometheus server is listening",
Required: false,
Value: "9100",
EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -65,10 +79,12 @@ var requiredFlags = []cli.Flag{
GraphUrlFlag,
BlsOperatorStateRetrieverFlag,
EigenDAServiceManagerFlag,
EnableMetrics,
}

var optionalFlags = []cli.Flag{
PerPublicKeyRateLimit,
MetricsHTTPPort,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
110 changes: 110 additions & 0 deletions churner/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package churner

import (
"context"
"fmt"
"net/http"

"github.com/Layr-Labs/eigenda/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type FailReason string

const (
FailReasonRateLimitExceeded FailReason = "rate_limit_exceeded" // Rate limited: per operator rate limiting
FailReasonInsufficientStakeToRegister FailReason = "insufficient_stake_to_register" // Operator doesn't have enough stake to be registered
FailReasonInsufficientStakeToChurn FailReason = "insufficient_stake_to_churn" // Operator doesn't have enough stake to be churned
FailReasonQuorumIdOutOfRange FailReason = "quorum_id_out_of_range" // Quorum ID out of range: quorum is not in the range of [0, QuorumCount]
FailReasonPrevApprovalNotExpired FailReason = "prev_approval_not_expired" // Expiry: previous approval hasn't expired
FailReasonInvalidSignature FailReason = "invalid_signature" // Invalid signature: operator's signature is wong
FailReasonProcessChurnRequestFailed FailReason = "failed_process_churn_request" // Failed to process churn request
)

type MetricsConfig struct {
HTTPPort string
EnableMetrics bool
}

type Metrics struct {
registry *prometheus.Registry

NumRequests *prometheus.CounterVec
Latency *prometheus.SummaryVec

httpPort string
logger common.Logger
}

func NewMetrics(httpPort string, logger common.Logger) *Metrics {
namespace := "eigenda_churner"
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "requests",
Help: "the number of requests",
},
[]string{"status", "reason", "method"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need a "reason" to "success" request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want 2 METRICS a part?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One is sufficient, I am mostly looking at why NumFailedRequests is needed, since NumRequests already has a status label that can indicate if it's success or failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I overlooked the metric "NumFailedRequests" in the code, and it shouldn't be there.

),
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "latency_ms",
Help: "latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"method"},
),
registry: reg,
httpPort: httpPort,
logger: logger,
}
return metrics
}

// ObserveLatency observes the latency of a stage in 'stage
func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
g.Latency.WithLabelValues(method).Observe(latencyMs)
}

// IncrementSuccessfulRequestNum increments the number of successful requests
func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
g.NumRequests.With(prometheus.Labels{
"status": "success",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as "reason", if this is always "success", why it needs to be a label?

And how do you tell the total requests that the churner received easily if there are two metrics (one for success and one for failed)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?
We can filter, isn't it?

eigenda_churner_ requests({status ="success"})

eigenda_churner_ requests({status ="failed", reason="XPTO"})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then there is no need to have NumFailedRequests

"method": method,
"reason": "",
}).Inc()
}

// IncrementFailedRequestNum increments the number of failed requests
func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
g.NumRequests.With(prometheus.Labels{
"status": "failed",
"reason": string(reason),
"method": method,
}).Inc()
}

// Start starts the metrics server
func (g *Metrics) Start(ctx context.Context) {
g.logger.Info("Starting metrics server at ", "port", g.httpPort)
addr := fmt.Sprintf(":%s", g.httpPort)
go func() {
log := g.logger
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(
g.registry,
promhttp.HandlerOpts{},
))
err := http.ListenAndServe(addr, mux)
log.Error("Prometheus server failed", "err", err)
}()
}
25 changes: 22 additions & 3 deletions churner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
pb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/prometheus/client_golang/prometheus"
)

type Server struct {
Expand All @@ -19,34 +20,47 @@ type Server struct {
latestExpiry int64
lastRequestTimeByOperatorID map[core.OperatorID]time.Time

logger common.Logger
logger common.Logger
metrics *Metrics
}

func NewServer(
config *Config,
churner *churner,
logger common.Logger,
metrics *Metrics,
) *Server {
return &Server{
config: config,
churner: churner,
latestExpiry: int64(0),
lastRequestTimeByOperatorID: make(map[core.OperatorID]time.Time),
logger: logger,
metrics: metrics,
}
}

func (s *Server) Start(ctx context.Context) error {
// TODO: Start Metrics
func (s *Server) Start(metricsConfig MetricsConfig) error {
// Enable Metrics Block
if metricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort)
s.metrics.Start(context.Background())
s.logger.Info("Enabled metrics for Churner", "socket", httpSocket)
}
return nil
}

func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()
s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds())

now := time.Now()
// check that we are after the previous approval's expiry
if now.Unix() < s.latestExpiry {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
return nil, fmt.Errorf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix())
}

Expand All @@ -58,6 +72,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
}

if quorumID >= int(s.churner.QuorumCount) {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonQuorumIdOutOfRange)
return nil, fmt.Errorf("Invalid request: the quorum_id must be in range [0, %d], but found %d", s.churner.QuorumCount-1, quorumID)
}
}
Expand All @@ -67,17 +82,20 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
return nil, fmt.Errorf("failed to verify request signature: %w", err)
}

// check if the request should be rate limited
err = s.checkShouldBeRateLimited(now, *request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
return nil, fmt.Errorf("rate limiter error: %w", err)
}

response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
if err != nil {
s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
return nil, fmt.Errorf("failed to process churn request: %w", err)
}

Expand All @@ -86,6 +104,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl

operatorsToChurn := convertToOperatorsToChurnGrpc(response.OperatorsToChurn)

s.metrics.IncrementSuccessfulRequestNum("Churn")
return &pb.ChurnReply{
SignatureWithSaltAndExpiry: &pb.SignatureWithSaltAndExpiry{
Signature: response.SignatureWithSaltAndExpiry.Signature,
Expand Down
5 changes: 3 additions & 2 deletions churner/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,13 @@ func newTestServer(t *testing.T) *churner.Server {

setupMockTransactor()

cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
if err != nil {
log.Fatalln("cannot create churner", err)
}

return churner.NewServer(config, cn, logger)
return churner.NewServer(config, cn, logger, metrics)
}

func makeOperatorId(id int) dacore.OperatorID {
Expand Down
5 changes: 3 additions & 2 deletions churner/tests/churner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func newTestServer(t *testing.T) *churner.Server {
)
assert.NoError(t, err)

cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger)
metrics := churner.NewMetrics("9001", logger)
cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics)
assert.NoError(t, err)

return churner.NewServer(config, cn, logger)
return churner.NewServer(config, cn, logger, metrics)
}
3 changes: 3 additions & 0 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (env *Config) generateChurnerVars(ind int, graphUrl, logPath, grpcPort stri
CHURNER_STD_LOG_LEVEL: "debug",
CHURNER_FILE_LOG_LEVEL: "trace",
CHURNER_LOG_PATH: logPath,

CHURNER_ENABLE_METRICS: "true",
CHURNER_METRICS_HTTP_PORT: "9095",
}

env.applyDefaults(&v, "CHURNER", "churner", ind)
Expand Down
4 changes: 4 additions & 0 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ type ChurnerVars struct {
CHURNER_LOG_PATH string

CHURNER_INDEXER_PULL_INTERVAL string

CHURNER_ENABLE_METRICS string

CHURNER_METRICS_HTTP_PORT string
}

func (vars ChurnerVars) getEnvMap() map[string]string {
Expand Down