diff --git a/churner/churner.go b/churner/churner.go
index e1ebd0d93..09075e4af 100644
--- a/churner/churner.go
+++ b/churner/churner.go
@@ -50,6 +50,7 @@ type churner struct {
 
 	privateKey *ecdsa.PrivateKey
 	logger     common.Logger
+	metrics    *Metrics
 }
 
 func NewChurner(
@@ -57,6 +58,7 @@ func NewChurner(
 	indexer thegraph.IndexedChainState,
 	transactor core.Transactor,
 	logger common.Logger,
+	metrics *Metrics,
 ) (*churner, error) {
 	privateKey, err := crypto.HexToECDSA(config.EthClientConfig.PrivateKeyString)
 	if err != nil {
@@ -70,6 +72,7 @@
 
 		privateKey: privateKey,
 		logger:     logger,
+		metrics:    metrics,
 	}, nil
 }
@@ -207,12 +210,14 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
 	// verify the lowest stake against the registering operator's stake
 	// make sure that: lowestStake * churnBIPsOfOperatorStake < operatorToRegisterStake * bipMultiplier
 	if new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) >= 0 {
+		c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToRegister)
 		return nil, errors.New("registering operator has less than churnBIPsOfOperatorStake")
 	}
 
 	// verify the lowest stake against the total stake
 	// make sure that: lowestStake * bipMultiplier < totalStake * churnBIPsOfTotalStake
 	if new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) >= 0 {
+		c.metrics.IncrementFailedRequestNum("getOperatorsToChurn", FailReasonInsufficientStakeToChurn)
 		return nil, errors.New("operator to churn has less than churnBIPSOfTotalStake")
 	}
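The two instrumented checks in getOperatorsToChurn compare stakes in basis points by cross-multiplying big.Ints, which sidesteps integer division. The following self-contained sketch runs the same comparisons with illustrative numbers; the variable values here are assumptions, since the real stakes and BIPs thresholds come from the on-chain registry:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// Illustrative values only; on-chain config supplies the real ones.
	lowestStake := big.NewInt(100)                 // lowest-staked registered operator
	operatorToRegisterStake := big.NewInt(120)     // operator attempting to register
	totalStake := big.NewInt(20_000)               // total stake in the quorum
	churnBIPsOfOperatorStake := big.NewInt(11_000) // 110%: registrant needs more than 110% of the lowest stake
	churnBIPsOfTotalStake := big.NewInt(100)       // 1%: churn target must hold less than 1% of total stake
	bipMultiplier := big.NewInt(10_000)            // 100% expressed in basis points

	// lowestStake * churnBIPsOfOperatorStake < operatorToRegisterStake * bipMultiplier
	canRegister := new(big.Int).Mul(lowestStake, churnBIPsOfOperatorStake).Cmp(new(big.Int).Mul(operatorToRegisterStake, bipMultiplier)) < 0
	// lowestStake * bipMultiplier < totalStake * churnBIPsOfTotalStake
	canChurn := new(big.Int).Mul(lowestStake, bipMultiplier).Cmp(new(big.Int).Mul(totalStake, churnBIPsOfTotalStake)) < 0

	fmt.Println(canRegister, canChurn) // true true with these numbers: neither failure counter fires
}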
diff --git a/churner/churner_test.go b/churner/churner_test.go
index fbefe302e..906d7f31a 100644
--- a/churner/churner_test.go
+++ b/churner/churner_test.go
@@ -26,7 +26,8 @@ func TestProcessChurnRequest(t *testing.T) {
 			PrivateKeyString: churnerPrivateKeyHex,
 		},
 	}
-	cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger)
+	metrics := churner.NewMetrics("9001", logger)
+	cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
 	assert.NoError(t, err)
 	assert.NotNil(t, cn)
 
diff --git a/churner/cmd/main.go b/churner/cmd/main.go
index aa10f7f6f..1535632c8 100644
--- a/churner/cmd/main.go
+++ b/churner/cmd/main.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"context"
 	"fmt"
 	"log"
 	"net"
@@ -81,14 +80,15 @@ func run(ctx *cli.Context) error {
 
 	querier := graphql.NewClient(config.GraphUrl, nil)
 	indexer := thegraph.NewIndexedChainState(cs, querier, logger)
+	metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)
 
-	cn, err := churner.NewChurner(config, indexer, tx, logger)
+	cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
 	if err != nil {
 		log.Fatalln("cannot create churner", err)
 	}
 
-	churnerServer := churner.NewServer(config, cn, logger)
-	if err = churnerServer.Start(context.Background()); err != nil {
+	churnerServer := churner.NewServer(config, cn, logger, metrics)
+	if err = churnerServer.Start(config.MetricsConfig); err != nil {
 		log.Fatalln("failed to start churner server", err)
 	}
diff --git a/churner/config.go b/churner/config.go
index 061d80452..a0f6fb429 100644
--- a/churner/config.go
+++ b/churner/config.go
@@ -13,6 +13,7 @@ type Config struct {
 	EthClientConfig geth.EthClientConfig
 	LoggerConfig    logging.Config
 	GraphUrl        string
+	MetricsConfig   MetricsConfig
 
 	BLSOperatorStateRetrieverAddr string
 	EigenDAServiceManagerAddr     string
@@ -28,5 +29,9 @@ func NewConfig(ctx *cli.Context) *Config {
 		BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
 		EigenDAServiceManagerAddr:     ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
 		PerPublicKeyRateLimit:         ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name),
+		MetricsConfig: MetricsConfig{
+			HTTPPort:      ctx.GlobalString(flags.MetricsHTTPPort.Name),
+			EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name),
+		},
 	}
 }
diff --git a/churner/flags/flags.go b/churner/flags/flags.go
index 068ac8ea2..1c7d1e34d 100644
--- a/churner/flags/flags.go
+++ b/churner/flags/flags.go
@@ -57,6 +57,20 @@ var (
 		EnvVar:   common.PrefixEnvVar(envPrefix, "PER_PUBLIC_KEY_RATE_LIMIT"),
 		Value:    24 * time.Hour,
 	}
+	EnableMetrics = cli.BoolFlag{
+		Name:     common.PrefixFlag(FlagPrefix, "enable-metrics"),
+		Usage:    "start the metrics server",
+		Required: true,
+		EnvVar:   common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"),
+	}
+	/* Optional Flags */
+	MetricsHTTPPort = cli.StringFlag{
+		Name:     common.PrefixFlag(FlagPrefix, "metrics-http-port"),
+		Usage:    "the HTTP port on which the Prometheus metrics server listens",
+		Required: false,
+		Value:    "9100",
+		EnvVar:   common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"),
+	}
 )
 
 var requiredFlags = []cli.Flag{
@@ -65,10 +79,12 @@ var requiredFlags = []cli.Flag{
 	GraphUrlFlag,
 	BlsOperatorStateRetrieverFlag,
 	EigenDAServiceManagerFlag,
+	EnableMetrics,
 }
 
 var optionalFlags = []cli.Flag{
 	PerPublicKeyRateLimit,
+	MetricsHTTPPort,
 }
 
 // Flags contains the list of configuration options available to the binary.
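Taken together, the flag, config, and constructor changes reduce to the wiring below, shown as a condensed sketch that mirrors run() in cmd/main.go. The startChurner helper and the core/thegraph import path are assumptions, not code from this PR:

package main

import (
	"github.com/Layr-Labs/eigenda/churner"
	"github.com/Layr-Labs/eigenda/common"
	"github.com/Layr-Labs/eigenda/core"
	"github.com/Layr-Labs/eigenda/core/thegraph"
)

// startChurner condenses the new wiring: the CLI flags populate
// config.MetricsConfig, which feeds both the Metrics instance and Server.Start.
func startChurner(config *churner.Config, indexer thegraph.IndexedChainState, tx core.Transactor, logger common.Logger) error {
	metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger)

	cn, err := churner.NewChurner(config, indexer, tx, logger, metrics)
	if err != nil {
		return err
	}

	server := churner.NewServer(config, cn, logger, metrics)
	return server.Start(config.MetricsConfig) // serves /metrics only when EnableMetrics is set
}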
diff --git a/churner/metrics.go b/churner/metrics.go
new file mode 100644
index 000000000..f7b2119cd
--- /dev/null
+++ b/churner/metrics.go
@@ -0,0 +1,110 @@
+package churner
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+
+	"github.com/Layr-Labs/eigenda/common"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/collectors"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type FailReason string
+
+const (
+	FailReasonRateLimitExceeded           FailReason = "rate_limit_exceeded"            // Rate limited: per-operator rate limiting
+	FailReasonInsufficientStakeToRegister FailReason = "insufficient_stake_to_register" // Operator doesn't have enough stake to be registered
+	FailReasonInsufficientStakeToChurn    FailReason = "insufficient_stake_to_churn"    // Operator doesn't have enough stake to be churned
+	FailReasonQuorumIdOutOfRange          FailReason = "quorum_id_out_of_range"         // Quorum ID out of range: quorum is not in the range [0, QuorumCount-1]
+	FailReasonPrevApprovalNotExpired      FailReason = "prev_approval_not_expired"      // Expiry: previous approval hasn't expired
+	FailReasonInvalidSignature            FailReason = "invalid_signature"              // Invalid signature: operator's signature is wrong
+	FailReasonProcessChurnRequestFailed   FailReason = "failed_process_churn_request"   // Failed to process churn request
+)
+
+type MetricsConfig struct {
+	HTTPPort      string
+	EnableMetrics bool
+}
+
+type Metrics struct {
+	registry *prometheus.Registry
+
+	NumRequests *prometheus.CounterVec
+	Latency     *prometheus.SummaryVec
+
+	httpPort string
+	logger   common.Logger
+}
+
+func NewMetrics(httpPort string, logger common.Logger) *Metrics {
+	namespace := "eigenda_churner"
+	reg := prometheus.NewRegistry()
+	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
+	reg.MustRegister(collectors.NewGoCollector())
+
+	metrics := &Metrics{
+		NumRequests: promauto.With(reg).NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Name:      "requests",
+				Help:      "the number of requests",
+			},
+			[]string{"status", "reason", "method"},
+		),
+		Latency: promauto.With(reg).NewSummaryVec(
+			prometheus.SummaryOpts{
+				Namespace:  namespace,
+				Name:       "latency_ms",
+				Help:       "latency summary in milliseconds",
+				Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
+			},
+			[]string{"method"},
+		),
+		registry: reg,
+		httpPort: httpPort,
+		logger:   logger,
+	}
+	return metrics
+}
+
+// ObserveLatency observes the latency of the given method
+func (g *Metrics) ObserveLatency(method string, latencyMs float64) {
+	g.Latency.WithLabelValues(method).Observe(latencyMs)
+}
+
+// IncrementSuccessfulRequestNum increments the number of successful requests
+func (g *Metrics) IncrementSuccessfulRequestNum(method string) {
+	g.NumRequests.With(prometheus.Labels{
+		"status": "success",
+		"method": method,
+		"reason": "",
+	}).Inc()
+}
+
+// IncrementFailedRequestNum increments the number of failed requests
+func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) {
+	g.NumRequests.With(prometheus.Labels{
+		"status": "failed",
+		"reason": string(reason),
+		"method": method,
+	}).Inc()
+}
+
+// Start starts the metrics HTTP server
+func (g *Metrics) Start(ctx context.Context) {
+	g.logger.Info("Starting metrics server", "port", g.httpPort)
+	addr := fmt.Sprintf(":%s", g.httpPort)
+	go func() {
+		log := g.logger
+		mux := http.NewServeMux()
+		mux.Handle("/metrics", promhttp.HandlerFor(
+			g.registry,
+			promhttp.HandlerOpts{},
+		))
+		err := http.ListenAndServe(addr, mux)
+		log.Error("Prometheus server failed", "err", err)
+	}()
+}
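Because NumRequests is an exported field, the counters can be asserted directly in tests. A minimal sketch using prometheus/testutil follows; it is not part of this PR, and the test placement and nil logger are assumptions:

package churner_test

import (
	"testing"

	"github.com/Layr-Labs/eigenda/churner"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestFailedRequestCounter(t *testing.T) {
	// A nil logger is acceptable here because only Start uses it, and Start is never called.
	metrics := churner.NewMetrics("9001", nil)
	metrics.IncrementFailedRequestNum("Churn", churner.FailReasonRateLimitExceeded)

	// Label order follows the CounterVec declaration: status, reason, method.
	got := testutil.ToFloat64(metrics.NumRequests.WithLabelValues(
		"failed", string(churner.FailReasonRateLimitExceeded), "Churn"))
	if got != 1 {
		t.Fatalf("expected 1 failed request, got %v", got)
	}
}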
diff --git a/churner/server.go b/churner/server.go
index 5ec120b59..85f9b196e 100644
--- a/churner/server.go
+++ b/churner/server.go
@@ -8,6 +8,7 @@ import (
 	pb "github.com/Layr-Labs/eigenda/api/grpc/churner"
 	"github.com/Layr-Labs/eigenda/common"
 	"github.com/Layr-Labs/eigenda/core"
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 type Server struct {
@@ -19,13 +20,15 @@ type Server struct {
 	latestExpiry                int64
 	lastRequestTimeByOperatorID map[core.OperatorID]time.Time
 
-	logger common.Logger
+	logger  common.Logger
+	metrics *Metrics
 }
 
 func NewServer(
 	config *Config,
 	churner *churner,
 	logger common.Logger,
+	metrics *Metrics,
 ) *Server {
 	return &Server{
 		config:                      config,
@@ -33,20 +36,31 @@ func NewServer(
 		latestExpiry:                int64(0),
 		lastRequestTimeByOperatorID: make(map[core.OperatorID]time.Time),
 		logger:                      logger,
+		metrics:                     metrics,
 	}
 }
 
-func (s *Server) Start(ctx context.Context) error {
-	// TODO: Start Metrics
+func (s *Server) Start(metricsConfig MetricsConfig) error {
+	// start the metrics server if it is enabled by config
+	if metricsConfig.EnableMetrics {
+		httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort)
+		s.metrics.Start(context.Background())
+		s.logger.Info("Enabled metrics for Churner", "socket", httpSocket)
+	}
 	return nil
 }
 
 func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnReply, error) {
+	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
+		s.metrics.ObserveLatency("Churn", f*1000) // convert seconds to milliseconds
+	}))
+	defer timer.ObserveDuration()
 	s.logger.Info("Received request", "QuorumIds", req.GetQuorumIds())
 
 	now := time.Now()
 	// check that we are after the previous approval's expiry
 	if now.Unix() < s.latestExpiry {
+		s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
 		return nil, fmt.Errorf("previous approval not expired, retry in %d", s.latestExpiry-now.Unix())
 	}
@@ -58,6 +72,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
 		}
 
 		if quorumID >= int(s.churner.QuorumCount) {
+			s.metrics.IncrementFailedRequestNum("Churn", FailReasonQuorumIdOutOfRange)
 			return nil, fmt.Errorf("invalid request: the quorum_id must be in range [0, %d], but found %d", s.churner.QuorumCount-1, quorumID)
 		}
 	}
@@ -67,17 +82,20 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
 
 	operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request)
 	if err != nil {
+		s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
 		return nil, fmt.Errorf("failed to verify request signature: %w", err)
 	}
 
 	// check if the request should be rate limited
 	err = s.checkShouldBeRateLimited(now, *request)
 	if err != nil {
+		s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
 		return nil, fmt.Errorf("rate limiter error: %w", err)
 	}
 
 	response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
 	if err != nil {
+		s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
 		return nil, fmt.Errorf("failed to process churn request: %w", err)
 	}
@@ -86,6 +104,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
 
 	operatorsToChurn := convertToOperatorsToChurnGrpc(response.OperatorsToChurn)
 
+	s.metrics.IncrementSuccessfulRequestNum("Churn")
 	return &pb.ChurnReply{
 		SignatureWithSaltAndExpiry: &pb.SignatureWithSaltAndExpiry{
 			Signature: response.SignatureWithSaltAndExpiry.Signature,
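The Churn handler above relies on prometheus.NewTimer wrapping an ObserverFunc: ObserveDuration reports the elapsed time in seconds, hence the *1000 conversion into the millisecond-based summary. A standalone illustration of that pattern:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// ObserveDuration (deferred) hands the elapsed seconds to the ObserverFunc.
	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(seconds float64) {
		fmt.Printf("observed %.1f ms\n", seconds*1000) // convert seconds to milliseconds
	}))
	defer timer.ObserveDuration()

	time.Sleep(25 * time.Millisecond) // stand-in for the real request handling
}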
diff --git a/churner/server_test.go b/churner/server_test.go
index 7a5e53f26..de2be6f6a 100644
--- a/churner/server_test.go
+++ b/churner/server_test.go
@@ -152,12 +152,13 @@ func newTestServer(t *testing.T) *churner.Server {
 
 	setupMockTransactor()
 
-	cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger)
+	metrics := churner.NewMetrics("9001", logger)
+	cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics)
 	if err != nil {
 		log.Fatalln("cannot create churner", err)
 	}
 
-	return churner.NewServer(config, cn, logger)
+	return churner.NewServer(config, cn, logger, metrics)
 }
 
 func makeOperatorId(id int) dacore.OperatorID {
diff --git a/churner/tests/churner_test.go b/churner/tests/churner_test.go
index 7b6c0c82a..4c4925f55 100644
--- a/churner/tests/churner_test.go
+++ b/churner/tests/churner_test.go
@@ -186,8 +186,9 @@ func newTestServer(t *testing.T) *churner.Server {
 	)
 	assert.NoError(t, err)
 
-	cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger)
+	metrics := churner.NewMetrics("9001", logger)
+	cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics)
 	assert.NoError(t, err)
 
-	return churner.NewServer(config, cn, logger)
+	return churner.NewServer(config, cn, logger, metrics)
 }
diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go
index fbdcf21aa..c870c5fc1 100644
--- a/inabox/deploy/config.go
+++ b/inabox/deploy/config.go
@@ -149,6 +149,9 @@ func (env *Config) generateChurnerVars(ind int, graphUrl, logPath, grpcPort stri
 		CHURNER_STD_LOG_LEVEL:  "debug",
 		CHURNER_FILE_LOG_LEVEL: "trace",
 		CHURNER_LOG_PATH:       logPath,
+
+		CHURNER_ENABLE_METRICS:    "true",
+		CHURNER_METRICS_HTTP_PORT: "9095",
 	}
 
 	env.applyDefaults(&v, "CHURNER", "churner", ind)
diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go
index d19e73d97..b1301bbdc 100644
--- a/inabox/deploy/env_vars.go
+++ b/inabox/deploy/env_vars.go
@@ -355,6 +355,10 @@ type ChurnerVars struct {
 	CHURNER_LOG_PATH string
 
 	CHURNER_INDEXER_PULL_INTERVAL string
+
+	CHURNER_ENABLE_METRICS string
+
+	CHURNER_METRICS_HTTP_PORT string
 }
 
 func (vars ChurnerVars) getEnvMap() map[string]string {
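Once the churner runs with CHURNER_ENABLE_METRICS set, the endpoint can be smoke-tested with a plain HTTP GET. A sketch against the inabox port configured above (9095 here; the flag default is 9100):

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:9095/metrics")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	// Expect eigenda_churner_requests and eigenda_churner_latency_ms series
	// alongside the registered Go and process collectors.
	fmt.Printf("%s", body)
}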