From 360c3c95d8196487f7a53824b9dcad2a9bc1bac1 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Tue, 14 Nov 2023 18:41:32 -0800 Subject: [PATCH] update metrics --- core/aggregation.go | 4 +- disperser/batcher/batcher.go | 8 ++-- disperser/batcher/batcher_test.go | 6 ++- disperser/batcher/encoded_blob_store.go | 1 - disperser/batcher/metrics.go | 57 +++++++++++-------------- disperser/dispatcher/dispatcher.go | 12 +++--- node/grpc/server_load_test.go | 3 +- 7 files changed, 44 insertions(+), 47 deletions(-) diff --git a/core/aggregation.go b/core/aggregation.go index ea38558ec9..54c6f25032 100644 --- a/core/aggregation.go +++ b/core/aggregation.go @@ -92,7 +92,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState socket = op.Socket } if r.Err != nil { - a.Logger.Warn("Error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err) + a.Logger.Warn("[AggregateSignatures] error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err) continue } @@ -110,6 +110,8 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState continue } + a.Logger.Info("[AggregateSignatures] received signature from operator", "operator", operatorIDHex, "socket", socket) + for ind, id := range quorumIDs { // Get stake amounts for operator diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index d23104cd36..f386c03967 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -187,10 +187,9 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser. // Append the error result = multierror.Append(result, err) } + b.Metrics.IncrementCompletedBlobCount(int(metadata.RequestMetadata.BlobSize), disperser.Failed) } - b.Metrics.UpdateFailedBatchAndBlobs(len(blobMetadatas)) - // Return the error(s) return result.ErrorOrNil() } @@ -318,8 +317,10 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { if status == disperser.Confirmed { _, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo) + b.Metrics.IncrementCompletedBlobCount(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed) } else if status == disperser.InsufficientSignatures { _, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo) + b.Metrics.IncrementCompletedBlobCount(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures) } else { updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String()) } @@ -340,8 +341,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { log.Trace("[batcher] Update confirmation info took", "duration", time.Since(stageTimer)) b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds())) - - b.Metrics.UpdateCompletedBatchAndBlobs(batch.BlobMetadata, passed) + b.Metrics.IncrementBatchCount(len(batch.BlobMetadata)) return nil } diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index a5a64fd8f1..54911a4e4b 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/logging" cmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/encoding" @@ -63,14 +64,15 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob { func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) { // Common Components - logger := &cmock.Logger{} + logger, err := logging.GetLogger(logging.DefaultCLIConfig()) + assert.NoError(t, err) // Core Components cst, err := coremock.NewChainDataMock(10) assert.NoError(t, err) cst.On("GetCurrentBlockNumber").Return(uint(10), nil) asgn := &core.StdAssignmentCoordinator{} - agg := &core.StdSignatureAggregator{} + agg := core.NewStdSignatureAggregator(logger) enc, err := makeTestEncoder() assert.NoError(t, err) diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index c410c97ff5..02973f9ffc 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -102,7 +102,6 @@ func (e *encodedBlobStore) PutEncodingResult(result *EncodingResult) error { } e.encoded[requestID] = result delete(e.requested, requestID) - e.logger.Trace("[PutEncodingResult]", "referenceBlockNumber", result.ReferenceBlockNumber, "requestID", requestID, "encodedSize", e.encodedResultSize) return nil } diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go index 3d9ce64b24..66792081cd 100644 --- a/disperser/batcher/metrics.go +++ b/disperser/batcher/metrics.go @@ -52,7 +52,7 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics { Name: "batches_total", Help: "the number and size of total dispersal batch", }, - []string{"state", "data"}, + []string{"data"}, ), BatchProcLatency: promauto.With(reg).NewSummaryVec( prometheus.SummaryOpts{ @@ -85,43 +85,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics { return metrics } -func (g *Metrics) UpdateAttestation(signers, nonSigners int) { - g.Attestation.WithLabelValues("signers").Set(float64(signers)) - g.Attestation.WithLabelValues("non_signers").Set(float64(nonSigners)) +func (g *Metrics) UpdateAttestation(operatorCount, nonSignerCount int) { + g.Attestation.WithLabelValues("signers").Set(float64(operatorCount - nonSignerCount)) + g.Attestation.WithLabelValues("non_signers").Set(float64(nonSignerCount)) } -// UpdateFailedBatchAndBlobs updates failed a batch and number of blob within it, it only -// counts the number of blob and batches -func (g *Metrics) UpdateFailedBatchAndBlobs(numBlob int) { - g.Blob.WithLabelValues("failed", "number").Add(float64(numBlob)) - g.Batch.WithLabelValues("failed", "number").Inc() -} - -// UpdateCompletedBatchAndBlobs updates whenever there is a completed batch. it updates both the -// number for batch and blob, and it updates size of data blob. Moreover, it updates the -// time it takes to process the entire batch from "getting the blobs" to "marking as finished" -func (g *Metrics) UpdateCompletedBatchAndBlobs(blobsInBatch []*disperser.BlobMetadata, succeeded []bool) { - totalBlobSucceeded := 0 - totalBlobFailed := 0 - totalBlobSize := 0 - - for ind, metadata := range blobsInBatch { - if succeeded[ind] { - totalBlobSucceeded += 1 - totalBlobSize += int(metadata.RequestMetadata.BlobSize) - } else { - totalBlobFailed += 1 - } +// IncrementCompletedBlobCount increments the number and updates size of processed blobs. +func (g *Metrics) IncrementCompletedBlobCount(size int, status disperser.BlobStatus) { + switch status { + case disperser.Confirmed: + g.Blob.WithLabelValues("confirmed", "number").Inc() + g.Blob.WithLabelValues("confirmed", "size").Add(float64(size)) + case disperser.Failed: + g.Blob.WithLabelValues("failed", "number").Inc() + g.Blob.WithLabelValues("failed", "size").Add(float64(size)) + case disperser.InsufficientSignatures: + g.Blob.WithLabelValues("insufficient_signature", "number").Inc() + g.Blob.WithLabelValues("insufficient_signature", "size").Add(float64(size)) + default: + return } - // Failed blob - g.UpdateFailedBatchAndBlobs(totalBlobFailed) + g.Blob.WithLabelValues("total", "number").Inc() + g.Blob.WithLabelValues("total", "size").Add(float64(size)) +} - // Blob - g.Blob.WithLabelValues("completed", "number").Add(float64(totalBlobSucceeded)) - g.Blob.WithLabelValues("completed", "size").Add(float64(totalBlobSize)) - // Batch - g.Batch.WithLabelValues("completed", "number").Inc() +func (g *Metrics) IncrementBatchCount(size int) { + g.Batch.WithLabelValues("number").Inc() + g.Batch.WithLabelValues("size").Add(float64(size)) } func (g *Metrics) ObserveLatency(stage string, latencyMs float64) { diff --git a/disperser/dispatcher/dispatcher.go b/disperser/dispatcher/dispatcher.go index 9c5fa95117..963abc1c04 100644 --- a/disperser/dispatcher/dispatcher.go +++ b/disperser/dispatcher/dispatcher.go @@ -89,12 +89,13 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() - request, err := GetStoreChunksRequest(blobs, header) + request, totalSize, err := GetStoreChunksRequest(blobs, header) if err != nil { return nil, err } opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 1024) + c.logger.Debug("sending chunks to operator", "operator", op.Socket, "size", totalSize) reply, err := gc.StoreChunks(ctx, request, opt) if err != nil { @@ -106,15 +107,16 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, return sig, nil } -func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, error) { - +func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, int, error) { blobs := make([]*node.Blob, len(blobMessages)) + totalSize := 0 for i, blob := range blobMessages { var err error blobs[i], err = getBlobMessage(blob) if err != nil { - return nil, err + return nil, 0, err } + totalSize += blob.BlobHeader.EncodedSizeAllQuorums() } request := &node.StoreChunksRequest{ @@ -122,7 +124,7 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchH Blobs: blobs, } - return request, nil + return request, totalSize, nil } func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { diff --git a/node/grpc/server_load_test.go b/node/grpc/server_load_test.go index 663a714b06..6f123a572f 100644 --- a/node/grpc/server_load_test.go +++ b/node/grpc/server_load_test.go @@ -93,8 +93,9 @@ func TestStoreChunks(t *testing.T) { numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0]) } t.Logf("Batch numTotalChunks: %d", numTotalChunks) - req, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader) + req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader) assert.NoError(t, err) + assert.Equal(t, 50790400, totalSize) timer := time.Now() reply, err := server.StoreChunks(context.Background(), req)