Skip to content

Commit

Permalink
Add additional queue metrics (Layr-Labs#919)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Nov 22, 2024
1 parent 0f63078 commit a6e08a0
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 17 deletions.
10 changes: 8 additions & 2 deletions disperser/cmd/encoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/disperser/cmd/encoder/flags"
blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/Layr-Labs/eigenda/disperser/encoder"
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
"github.com/prometheus/client_golang/prometheus"
"github.com/Layr-Labs/eigenda/relay/chunkstore"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -56,11 +58,15 @@ func RunEncoderServer(ctx *cli.Context) error {
return err
}

metrics := encoder.NewMetrics(config.MetricsConfig.HTTPPort, logger)
reg := prometheus.NewRegistry()
metrics := encoder.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger)
grpcMetrics := grpcprom.NewServerMetrics()
if config.MetricsConfig.EnableMetrics {
httpSocket := fmt.Sprintf(":%s", config.MetricsConfig.HTTPPort)
metrics.Start(context.Background())
logger.Info("Enabled metrics for Encoder", "socket", httpSocket)

reg.MustRegister(grpcMetrics)
}

if config.EncoderVersion == V2 {
Expand Down Expand Up @@ -100,7 +106,7 @@ func RunEncoderServer(ctx *cli.Context) error {
return fmt.Errorf("failed to create encoder: %w", err)
}

server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics)
server := encoder.NewEncoderServer(*config.ServerConfig, logger, prover, metrics, grpcMetrics)

return server.Start()

Expand Down
26 changes: 24 additions & 2 deletions disperser/encoder/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ type Metrics struct {
BlobSizeTotal *prometheus.CounterVec
Latency *prometheus.SummaryVec
BlobQueue *prometheus.GaugeVec
QueueCapacity prometheus.Gauge
QueueUtilization prometheus.Gauge
}

func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
reg := prometheus.NewRegistry()
func NewMetrics(reg *prometheus.Registry, httpPort string, logger logging.Logger) *Metrics {
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

Expand Down Expand Up @@ -71,6 +72,20 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
},
[]string{"size_bucket"},
),
QueueCapacity: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: "eigenda_encoder",
Name: "request_pool_capacity",
Help: "The maximum capacity of the request pool",
},
),
QueueUtilization: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: "eigenda_encoder",
Name: "request_pool_utilization",
Help: "Current utilization of request pool (total across all buckets)",
},
),
}
}

Expand Down Expand Up @@ -107,9 +122,16 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) {
}

// ObserveQueue records the current depth of every request-pool size bucket
// on the BlobQueue gauge vector, and publishes the sum across all buckets on
// the QueueUtilization gauge. The caller is expected to hold the queue lock
// (EncodeBlob invokes this under s.queueLock).
func (m *Metrics) ObserveQueue(queueStats map[string]int) {
	sum := 0
	for sizeBucket, depth := range queueStats {
		sum += depth
		m.BlobQueue.With(prometheus.Labels{"size_bucket": sizeBucket}).Set(float64(depth))
	}
	m.QueueUtilization.Set(float64(sum))
}

// SetQueueCapacity publishes the configured maximum size of the request pool
// on the QueueCapacity gauge. It is called once from NewEncoderServer with
// config.RequestPoolSize, so the gauge is static for the process lifetime.
func (m *Metrics) SetQueueCapacity(capacity int) {
	m.QueueCapacity.Set(float64(capacity))
}

func (m *Metrics) Start(ctx context.Context) {
Expand Down
22 changes: 19 additions & 3 deletions disperser/encoder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/disperser"
pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand All @@ -26,6 +27,7 @@ type EncoderServer struct {
logger logging.Logger
prover encoding.Prover
metrics *Metrics
grpcMetrics *grpcprom.ServerMetrics
close func()

runningRequests chan struct{}
Expand All @@ -39,12 +41,16 @@ type blobRequest struct {
blobSizeByte int
}

func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics) *EncoderServer {
func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encoding.Prover, metrics *Metrics, grpcMetrics *grpcprom.ServerMetrics) *EncoderServer {
// Set initial queue capacity metric
metrics.SetQueueCapacity(config.RequestPoolSize)

return &EncoderServer{
config: config,
logger: logger.With("component", "EncoderServer"),
prover: prover,
metrics: metrics,
grpcMetrics: grpcMetrics,

runningRequests: make(chan struct{}, config.MaxConcurrentRequests),
requestPool: make(chan blobRequest, config.RequestPoolSize),
Expand All @@ -61,9 +67,14 @@ func (s *EncoderServer) Start() error {
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)
gs := grpc.NewServer(opt,
grpc.UnaryInterceptor(
s.grpcMetrics.UnaryServerInterceptor(),
),
)
reflection.Register(gs)
pb.RegisterEncoderServer(gs, s)
s.grpcMetrics.InitializeMetrics(gs)

// Register Server for Health Checks
name := pb.Encoder_ServiceDesc.ServiceName
Expand Down Expand Up @@ -91,17 +102,22 @@ func (s *EncoderServer) Close() {
func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobRequest) (*pb.EncodeBlobReply, error) {
startTime := time.Now()
blobSize := len(req.GetData())
sizeBucket := common.BlobSizeBucket(blobSize)



select {
case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
s.queueLock.Lock()
s.queueStats[common.BlobSizeBucket(blobSize)]++
s.queueStats[sizeBucket]++
s.metrics.ObserveQueue(s.queueStats)
s.queueLock.Unlock()
default:
s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData()))
s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
return nil, errors.New("too many requests")
}

s.runningRequests <- struct{}{}
defer s.popRequest()

Expand Down
15 changes: 8 additions & 7 deletions disperser/encoder/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -109,8 +110,8 @@ func getTestData() (core.Blob, encoding.EncodingParams) {
}

func newEncoderTestServer(t *testing.T) *EncoderServer {
metrics := NewMetrics("9000", logger)
return NewEncoderServer(testServerConfig, logger, testProver, metrics)
metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
return NewEncoderServer(testServerConfig, logger, testProver, metrics, nil)
}

func TestEncodeBlob(t *testing.T) {
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestThrottling(t *testing.T) {

lengthCommitment = lengthProof

metrics := NewMetrics("9000", logger)
metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
concurrentRequests := 2
requestPoolSize := 4
encoder := &encmock.MockEncoder{
Expand All @@ -202,7 +203,7 @@ func TestThrottling(t *testing.T) {
MaxConcurrentRequests: concurrentRequests,
RequestPoolSize: requestPoolSize,
}
s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics)
s := NewEncoderServer(encoderServerConfig, logger, encoder, metrics, nil)
testBlobData, testEncodingParams := getTestData()

testEncodingParamsProto := &pb.EncodingParams{
Expand Down Expand Up @@ -254,8 +255,8 @@ func TestThrottling(t *testing.T) {
func TestEncoderPointsLoading(t *testing.T) {
// encoder 1 only loads 1500 points
prover1, config1 := makeTestProver(1500)
metrics := NewMetrics("9000", logger)
server1 := NewEncoderServer(config1, logger, prover1, metrics)
metrics := NewMetrics(prometheus.NewRegistry(), "9000", logger)
server1 := NewEncoderServer(config1, logger, prover1, metrics, nil)

testBlobData, testEncodingParams := getTestData()

Expand Down Expand Up @@ -299,7 +300,7 @@ func TestEncoderPointsLoading(t *testing.T) {

// encoder 2 only loads 2900 points
encoder2, config2 := makeTestProver(2900)
server2 := NewEncoderServer(config2, logger, encoder2, metrics)
server2 := NewEncoderServer(config2, logger, encoder2, metrics, nil)

reply2, err := server2.EncodeBlob(context.Background(), encodeBlobRequestProto)
assert.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion disperser/encoder/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigenda/relay/chunkstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -196,7 +197,7 @@ func createTestComponents(t *testing.T) *testComponents {
t.Helper()
prover, err := makeTestProver(300000)
require.NoError(t, err, "Failed to create prover")
metrics := encoder.NewMetrics("9000", logger)
metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger)
s3Client := mock.NewS3Client()
dynamoDBClient := &mock.MockDynamoDBClient{}
blobStore := blobstore.NewBlobStore(s3BucketName, s3Client, logger)
Expand Down
4 changes: 2 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
}

p0, _ := mustMakeTestComponents()
metrics := encoder.NewMetrics("9000", logger)
metrics := encoder.NewMetrics(prometheus.NewRegistry(), "9000", logger)
grpcEncoder := encoder.NewEncoderServer(encoder.ServerConfig{
GrpcPort: encoderPort,
MaxConcurrentRequests: 16,
RequestPoolSize: 32,
}, logger, p0, metrics)
}, logger, p0, metrics, grpcprom.NewServerMetrics())

encoderClient, err := encoder.NewEncoderClient(batcherConfig.EncoderSocket, 10*time.Second)
if err != nil {
Expand Down

0 comments on commit a6e08a0

Please sign in to comment.