From dc25d95f9179d6973d1f9ab2dfdb7a3b692f8f07 Mon Sep 17 00:00:00 2001
From: Daniel Mancia <21249320+dmanc@users.noreply.github.com>
Date: Wed, 20 Nov 2024 19:14:59 +0000
Subject: [PATCH 1/5] Add additional queue metrics

---
 disperser/encoder/metrics.go | 23 +++++++++++++++++++++++
 disperser/encoder/server.go  |  8 +++++++-
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/disperser/encoder/metrics.go b/disperser/encoder/metrics.go
index 11ba438b54..a52eb8af06 100644
--- a/disperser/encoder/metrics.go
+++ b/disperser/encoder/metrics.go
@@ -27,6 +27,8 @@ type Metrics struct {
 	BlobSizeTotal *prometheus.CounterVec
 	Latency       *prometheus.SummaryVec
 	BlobQueue     *prometheus.GaugeVec
+	QueueCapacity    prometheus.Gauge
+	QueueUtilization prometheus.Gauge
 }
 
 func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
@@ -71,6 +73,20 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
 			},
 			[]string{"size_bucket"},
 		),
+		QueueCapacity: promauto.With(reg).NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: "eigenda_encoder",
+				Name:      "request_pool_capacity",
+				Help:      "The maximum capacity of the request pool",
+			},
+		),
+		QueueUtilization: promauto.With(reg).NewGauge(
+			prometheus.GaugeOpts{
+				Namespace: "eigenda_encoder",
+				Name:      "request_pool_utilization",
+				Help:      "Current utilization of request pool (total across all buckets)",
+			},
+		),
 	}
 }
 
@@ -107,9 +123,16 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) {
 }
 
 func (m *Metrics) ObserveQueue(queueStats map[string]int) {
+	total := 0
 	for bucket, num := range queueStats {
 		m.BlobQueue.With(prometheus.Labels{"size_bucket": bucket}).Set(float64(num))
+		total += num
 	}
+	m.QueueUtilization.Set(float64(total))
+}
+
+func (m *Metrics) SetQueueCapacity(capacity int) {
+	m.QueueCapacity.Set(float64(capacity))
 }
 
 func (m *Metrics) Start(ctx context.Context) {
diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go
index 18a7ad43ec..78a31d0adb 100644
--- a/disperser/encoder/server.go
+++ b/disperser/encoder/server.go
@@ -40,6 +40,9 @@ type blobRequest struct {
 }
 
 func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer {
+	// Set initial queue capacity metric
+	metrics.SetQueueCapacity(config.RequestPoolSize)
+
 	return &EncoderServer{
 		config:  config,
 		logger:  logger.With("component", "EncoderServer"),
@@ -91,10 +94,12 @@ func (s *EncoderServer) Close() {
 func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
 	startTime := time.Now()
 	blobSize := len(req.GetData())
+	sizeBucket := common.BlobSizeBucket(blobSize)
+
 	select {
 	case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
 		s.queueLock.Lock()
-		s.queueStats[common.BlobSizeBucket(blobSize)]++
+		s.queueStats[sizeBucket]++
 		s.metrics.ObserveQueue(s.queueStats)
 		s.queueLock.Unlock()
 	default:
@@ -102,6 +107,7 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques
 		s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
 		return nil, errors.New("too many requests")
 	}
+
 	s.runningRequests <- struct{}{}
 	defer s.popRequest()
 

From cea9cca74bf19c25a87b6be50ec76440ca8b981b Mon Sep 17 00:00:00 2001
From: Daniel Mancia <21249320+dmanc@users.noreply.github.com>
Date: Thu, 21 Nov 2024 21:04:07 +0000
Subject: [PATCH 2/5] Add grpc metrics

---
 disperser/cmd/encoder/main.go | 10 ++++++++--
 disperser/encoder/metrics.go  | 12 ++++++++++--
 disperser/encoder/server.go   | 14 ++++++++++++--
 3 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/disperser/cmd/encoder/main.go b/disperser/cmd/encoder/main.go
index d546291da2..9c90d6bf7e 100644
--- a/disperser/cmd/encoder/main.go
+++ b/disperser/cmd/encoder/main.go
@@ -10,8 +10,10 @@ import (
 	"github.com/Layr-Labs/eigenda/common/aws/s3"
 	"github.com/Layr-Labs/eigenda/disperser/cmd/encoder/flags"
 	blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
+	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
 	"github.com/Layr-Labs/eigenda/disperser/encoder"
 	"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/Layr-Labs/eigenda/relay/chunkstore"
 	"github.com/urfave/cli"
 )
@@ -56,11 +58,15 @@ func RunEncoderServer(ctx *cli.Context) error {
 		return err
 	}
 
-	metrics := encoder.NewMetrics(config.MetricsConfig.HTTPPort, logger)
+	reg := prometheus.NewRegistry()
+	metrics := encoder.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger)
+	grpcMetrics := grpcprom.NewServerMetrics()
 	if config.MetricsConfig.EnableMetrics {
 		httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort)
 		metrics.Start(context.Background())
 		logger.Info("Enabled metrics for Encoder", "socket", httpSocket)
+
+		reg.MustRegister(grpcMetrics)
 	}
 
 	if config.EncoderVersion == V2 {
@@ -100,7 +106,7 @@ func RunEncoderServer(ctx *cli.Context) error {
 		return fmt.Errorf("failed to create encoder: %w", err)
 	}
 
-	server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics)
+	server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics, grpcMetrics)
 
 	return server.Start()
 }
diff --git a/disperser/encoder/metrics.go b/disperser/encoder/metrics.go
index a52eb8af06..79d6e98db4 100644
--- a/disperser/encoder/metrics.go
+++ b/disperser/encoder/metrics.go
@@ -29,10 +29,10 @@ type Metrics struct {
 	BlobQueue        *prometheus.GaugeVec
 	QueueCapacity    prometheus.Gauge
 	QueueUtilization prometheus.Gauge
+	IncomingRequestRate *prometheus.GaugeVec
 }
 
-func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
-	reg := prometheus.NewRegistry()
+func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics {
 	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
 	reg.MustRegister(collectors.NewGoCollector())
 
@@ -87,6 +87,14 @@ func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger
 				Help:      "Current utilization of request pool (total across all buckets)",
 			},
 		),
+		IncomingRequestRate: promauto.With(reg).NewGaugeVec(
+			prometheus.GaugeOpts{
+				Namespace: "eigenda_encoder",
+				Name:      "incoming_request_rate",
+				Help:      "The rate of incoming requests per second, categorized by size bucket",
+			},
+			[]string{"size_bucket"},
+		),
 	}
 }
 
diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go
index 78a31d0adb..4eb0c39f61 100644
--- a/disperser/encoder/server.go
+++ b/disperser/encoder/server.go
@@ -12,6 +12,7 @@ import (
 	"github.com/Layr-Labs/eigenda/common/healthcheck"
 	"github.com/Layr-Labs/eigenda/disperser"
 	pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder"
+	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
 	"github.com/Layr-Labs/eigenda/disperser/common"
 	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/Layr-Labs/eigensdk-go/logging"
@@ -26,6 +27,7 @@ type EncoderServer struct {
 	logger  logging.Logger
 	prover  encoding.Prover
 	metrics *Metrics
+	grpcMetrics *grpcprom.ServerMetrics
 	close   func()
 
 	runningRequests chan struct{}
@@ -39,7 +41,7 @@ type blobRequest struct {
 	blobSizeByte int
 }
 
-func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer {
+func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics, grpcMetrics *grpcprom.ServerMetrics) *EncoderServer {
 	// Set initial queue capacity metric
 	metrics.SetQueueCapacity(config.RequestPoolSize)
 
@@ -48,6 +50,7 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin
 		logger:  logger.With("component", "EncoderServer"),
 		prover:  prover,
 		metrics: metrics,
+		grpcMetrics: grpcMetrics,
 		runningRequests: make(chan struct{}, config.MaxConcurrentRequests),
 		requestPool:     make(chan blobRequest, config.RequestPoolSize),
 
@@ -64,9 +67,14 @@ func (s *EncoderServer) Start() error {
 	}
 
 	opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
-	gs := grpc.NewServer(opt)
+	gs := grpc.NewServer(opt,
+		grpc.UnaryInterceptor(
+			s.grpcMetrics.UnaryServerInterceptor(),
+		),
+	)
 	reflection.Register(gs)
 	pb.RegisterEncoderServer(gs, s)
+	s.grpcMetrics.InitializeMetrics(gs)
 
 	// Register Server for Health Checks
 	name := pb.Encoder_ServiceDesc.ServiceName
@@ -96,6 +104,8 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques
 	blobSize := len(req.GetData())
 	sizeBucket := common.BlobSizeBucket(blobSize)
 
+
+
 	select {
 	case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
 		s.queueLock.Lock()

From 14cc7ca60f49017746a8f5cbb138b2ac7d5da384 Mon Sep 17 00:00:00 2001
From: Daniel Mancia <21249320+dmanc@users.noreply.github.com>
Date: Thu, 21 Nov 2024 13:08:23 -0800
Subject: [PATCH 3/5] fix

---
 disperser/encoder/metrics.go        |  9 ---------
 disperser/encoder/server_test.go    | 15 ++++++++-------
 disperser/encoder/server_v2_test.go |  3 ++-
 3 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/disperser/encoder/metrics.go b/disperser/encoder/metrics.go
index 79d6e98db4..d4c4d88680 100644
--- a/disperser/encoder/metrics.go
+++ b/disperser/encoder/metrics.go
@@ -29,7 +29,6 @@ type Metrics struct {
 	BlobQueue        *prometheus.GaugeVec
 	QueueCapacity    prometheus.Gauge
 	QueueUtilization prometheus.Gauge
-	IncomingRequestRate *prometheus.GaugeVec
 }
 
 func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics {
@@ -87,14 +86,6 @@ func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger
 				Help:      "Current utilization of request pool (total across all buckets)",
 			},
 		),
-		IncomingRequestRate: promauto.With(reg).NewGaugeVec(
-			prometheus.GaugeOpts{
-				Namespace: "eigenda_encoder",
-				Name:      "incoming_request_rate",
-				Help:      "The rate of incoming requests per second, categorized by size bucket",
-			},
-			[]string{"size_bucket"},
-		),
 	}
 }
 
diff --git a/disperser/encoder/server_test.go b/disperser/encoder/server_test.go
index 9aa12a7282..0b2ba4da23 100644
--- a/disperser/encoder/server_test.go
+++ b/disperser/encoder/server_test.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/consensys/gnark-crypto/ecc/bn254"
 	"github.com/consensys/gnark-crypto/ecc/bn254/fp"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 
@@ -109,8 +110,8 @@ func getTestData() (core.Blob, encoding.EncodingParams) {
 }
 
 func newEncoderTestServer(t *testing.T) *EncoderServer {
-	metrics := NewMetrics("9000", logger)
-	return NewEncoderServer(testServerConfig, logger, testProver, metrics)
+	metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
+	return NewEncoderServer(testServerConfig, logger, testProver, metrics, nil)
 }
 
 func TestEncodeBlob(t *testing.T) {
@@ -179,7 +180,7 @@ func TestThrottling(t *testing.T) {
 
 	lengthCommitment = lengthProof
 
-	metrics := NewMetrics("9000", logger)
+	metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
 	concurrentRequests := 2
 	requestPoolSize := 4
 	encoder := &encmock.MockEncoder{
@@ -202,7 +203,7 @@ func TestThrottling(t *testing.T) {
 		MaxConcurrentRequests: concurrentRequests,
 		RequestPoolSize:       requestPoolSize,
 	}
-	s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics)
+	s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics, nil)
 
 	testBlobData, testEncodingParams := getTestData()
 	testEncodingParamsProto := &pb.EncodingParams{
@@ -254,8 +255,8 @@ func TestThrottling(t *testing.T) {
 func TestEncoderPointsLoading(t *testing.T) {
 	// encoder 1 only loads 1500 points
 	prover1, config1 := makeTestProver(1500)
-	metrics := NewMetrics("9000", logger)
-	server1 := NewEncoderServer(config1, logger, prover1, metrics)
+	metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
+	server1 := NewEncoderServer(config1, logger, prover1, metrics, nil)
 
 	testBlobData, testEncodingParams := getTestData()
 
@@ -299,7 +300,7 @@ func TestEncoderPointsLoading(t *testing.T) {
 
 	// encoder 2 only loads 2900 points
 	encoder2, config2 := makeTestProver(2900)
-	server2 := NewEncoderServer(config2, logger, encoder2, metrics)
+	server2 := NewEncoderServer(config2, logger, encoder2, metrics, nil)
 
 	reply2, err := server2.EncodeBlob(context.Background(), encodeBlobRequestProto)
 	assert.NoError(t, err)
diff --git a/disperser/encoder/server_v2_test.go b/disperser/encoder/server_v2_test.go
index 26c7110f7e..57a850b2c0 100644
--- a/disperser/encoder/server_v2_test.go
+++ b/disperser/encoder/server_v2_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
 	"github.com/Layr-Labs/eigenda/encoding/utils/codec"
 	"github.com/Layr-Labs/eigenda/relay/chunkstore"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/exp/rand"
@@ -196,7 +197,7 @@ func createTestComponents(t *testing.T) *testComponents {
 	t.Helper()
 	prover, err := makeTestProver(300000)
 	require.NoError(t, err, "Failed to create prover")
-	metrics := encoder.NewMetrics("9000", logger)
+	metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger)
 	s3Client := mock.NewS3Client()
 	dynamoDBClient := &mock.MockDynamoDBClient{}
 	blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger)

From 9579c029f3a39e28377b0acb921700e1d2cb95c3 Mon Sep 17 00:00:00 2001
From: Daniel Mancia <21249320+dmanc@users.noreply.github.com>
Date: Thu, 21 Nov 2024 13:11:56 -0800
Subject: [PATCH 4/5] fix integration test

---
 test/integration_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/integration_test.go b/test/integration_test.go
index 72caeb8fb0..50ca62f7a0 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -173,12 +173,12 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
 	}
 
 	p0, _ := mustMakeTestComponents()
-	metrics := encoder.NewMetrics("9000", logger)
+	metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger)
 	grpcEncoder := encoder.NewEncoderServer(encoder.ServerConfig{
 		GrpcPort:              encoderPort,
 		MaxConcurrentRequests: 16,
 		RequestPoolSize:       32,
-	}, logger, p0, metrics)
+	}, logger, p0, metrics, nil)
 
 	encoderClient, err := encoder.NewEncoderClient(batcherConfig.EncoderSocket, 10*time.Second)
 	if err != nil {

From 7a467ee3e57cf3be991f2b6c8fcee9683e1b5c06 Mon Sep 17 00:00:00 2001
From: Daniel Mancia <21249320+dmanc@users.noreply.github.com>
Date: Thu, 21 Nov 2024 13:33:34 -0800
Subject: [PATCH 5/5] fix integration test

---
 test/integration_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/integration_test.go b/test/integration_test.go
index 50ca62f7a0..aebb546f4a 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -178,7 +178,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
 		GrpcPort:              encoderPort,
 		MaxConcurrentRequests: 16,
 		RequestPoolSize:       32,
-	}, logger, p0, metrics, nil)
+	}, logger, p0, metrics, grpcprom.NewServerMetrics())
 
 	encoderClient, err := encoder.NewEncoderClient(batcherConfig.EncoderSocket, 10*time.Second)
 	if err != nil {
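
Reviewer note, not part of the patch series: the pattern these commits converge on is a single shared Prometheus registry that carries both the encoder's own gauges and the gRPC server metrics, with the grpcprom unary interceptor installed on the server and InitializeMetrics called after service registration. The sketch below is a minimal, self-contained illustration of that wiring under stated assumptions, not EigenDA code: the standalone main, the promhttp /metrics handler, and the listener port are placeholders for the example (":9000" only mirrors the metrics HTTP port used in the tests).

package main

import (
	"net"
	"net/http"

	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"google.golang.org/grpc"
)

func main() {
	// One shared registry, as injected into encoder.NewMetrics in the patches.
	reg := prometheus.NewRegistry()

	// gRPC server metrics registered on the same registry (patch 2 does this in main.go).
	grpcMetrics := grpcprom.NewServerMetrics()
	reg.MustRegister(grpcMetrics)

	// gRPC server with the Prometheus unary interceptor, mirroring EncoderServer.Start().
	gs := grpc.NewServer(
		grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
	)
	// pb.RegisterEncoderServer(gs, srv) would be called here in the real server,
	// before initializing the per-method metrics.
	grpcMetrics.InitializeMetrics(gs)

	// Expose the combined registry over HTTP, as the encoder's metrics server does.
	go func() {
		_ = http.ListenAndServe(":9000", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	}()

	lis, err := net.Listen("tcp", ":34000") // placeholder gRPC port
	if err != nil {
		panic(err)
	}
	_ = gs.Serve(lis)
}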