update metrics
ian-shim committed Nov 15, 2023
1 parent 8870629 commit 360c3c9
Showing 7 changed files with 44 additions and 47 deletions.
4 changes: 3 additions & 1 deletion core/aggregation.go
@@ -92,7 +92,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState
socket = op.Socket
}
if r.Err != nil {
a.Logger.Warn("Error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err)
a.Logger.Warn("[AggregateSignatures] error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err)
continue
}

@@ -110,6 +110,8 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState
continue
}

a.Logger.Info("[AggregateSignatures] received signature from operator", "operator", operatorIDHex, "socket", socket)

for ind, id := range quorumIDs {

// Get stake amounts for operator
8 changes: 4 additions & 4 deletions disperser/batcher/batcher.go
@@ -187,10 +187,9 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
// Append the error
result = multierror.Append(result, err)
}
b.Metrics.IncrementCompletedBlobCount(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
}

b.Metrics.UpdateFailedBatchAndBlobs(len(blobMetadatas))

// Return the error(s)
return result.ErrorOrNil()
}
@@ -318,8 +317,10 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {

if status == disperser.Confirmed {
_, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
b.Metrics.IncrementCompletedBlobCount(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed)
} else if status == disperser.InsufficientSignatures {
_, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo)
b.Metrics.IncrementCompletedBlobCount(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
} else {
updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String())
}
@@ -340,8 +341,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {

log.Trace("[batcher] Update confirmation info took", "duration", time.Since(stageTimer))
b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds()))

b.Metrics.UpdateCompletedBatchAndBlobs(batch.BlobMetadata, passed)
b.Metrics.IncrementBatchCount(len(batch.BlobMetadata))
return nil
}

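Note on the batcher change above: the removed UpdateFailedBatchAndBlobs/UpdateCompletedBatchAndBlobs calls are replaced by one IncrementCompletedBlobCount call per blob at the point its terminal status is known, plus a single IncrementBatchCount per batch. A minimal stand-in sketch of that flow (the types below are simplified placeholders, not the repo's own):

package main

import "fmt"

// blobStatus stands in for disperser.BlobStatus; metrics stands in for
// batcher.Metrics. Only the counting flow is shown, not the Prometheus wiring.
type blobStatus string

type metrics struct {
	blobCount  map[blobStatus]int
	blobBytes  map[blobStatus]int
	batches    int
	batchBlobs int
}

func (m *metrics) IncrementCompletedBlobCount(size int, status blobStatus) {
	m.blobCount[status]++
	m.blobBytes[status] += size
}

func (m *metrics) IncrementBatchCount(numBlobs int) {
	m.batches++
	m.batchBlobs += numBlobs
}

func main() {
	m := &metrics{blobCount: map[blobStatus]int{}, blobBytes: map[blobStatus]int{}}
	// One increment per blob, as each blob's terminal status is decided ...
	m.IncrementCompletedBlobCount(1024, "confirmed")
	m.IncrementCompletedBlobCount(2048, "failed")
	// ... and a single batch-level increment once the whole batch is handled.
	m.IncrementBatchCount(2)
	fmt.Println(m.blobCount, m.blobBytes, m.batches, m.batchBlobs)
}
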
6 changes: 4 additions & 2 deletions disperser/batcher/batcher_test.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/logging"
cmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/encoding"
@@ -63,14 +64,15 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob {

func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) {
// Common Components
logger := &cmock.Logger{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)

// Core Components
cst, err := coremock.NewChainDataMock(10)
assert.NoError(t, err)
cst.On("GetCurrentBlockNumber").Return(uint(10), nil)
asgn := &core.StdAssignmentCoordinator{}
agg := &core.StdSignatureAggregator{}
agg := core.NewStdSignatureAggregator(logger)
enc, err := makeTestEncoder()
assert.NoError(t, err)

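The test above now obtains the aggregator from a constructor that takes a logger instead of building the struct directly. A rough, self-contained sketch of what such a constructor could look like, with a stand-in Logger interface (the real signature lives in core/aggregation.go and may differ):

package main

import "fmt"

// Logger is a stand-in for common.Logger, reduced to the methods used in the
// aggregation diff above.
type Logger interface {
	Info(msg string, keysAndValues ...interface{})
	Warn(msg string, keysAndValues ...interface{})
}

type stdoutLogger struct{}

func (stdoutLogger) Info(msg string, kv ...interface{}) { fmt.Println("INFO:", msg, kv) }
func (stdoutLogger) Warn(msg string, kv ...interface{}) { fmt.Println("WARN:", msg, kv) }

// StdSignatureAggregator mirrors only the Logger field the aggregation code reads.
type StdSignatureAggregator struct {
	Logger Logger
}

// NewStdSignatureAggregator is a guess at the shape of the constructor the test calls.
func NewStdSignatureAggregator(logger Logger) *StdSignatureAggregator {
	return &StdSignatureAggregator{Logger: logger}
}

func main() {
	agg := NewStdSignatureAggregator(stdoutLogger{})
	agg.Logger.Info("[AggregateSignatures] received signature from operator", "operator", "0xabc", "socket", "1.2.3.4:32005")
}
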
1 change: 0 additions & 1 deletion disperser/batcher/encoded_blob_store.go
@@ -102,7 +102,6 @@ func (e *encodedBlobStore) PutEncodingResult(result *EncodingResult) error {
}
e.encoded[requestID] = result
delete(e.requested, requestID)
e.logger.Trace("[PutEncodingResult]", "referenceBlockNumber", result.ReferenceBlockNumber, "requestID", requestID, "encodedSize", e.encodedResultSize)

return nil
}
57 changes: 24 additions & 33 deletions disperser/batcher/metrics.go
@@ -52,7 +52,7 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
Name: "batches_total",
Help: "the number and size of total dispersal batch",
},
[]string{"state", "data"},
[]string{"data"},
),
BatchProcLatency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
@@ -85,43 +85,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
return metrics
}

func (g *Metrics) UpdateAttestation(signers, nonSigners int) {
g.Attestation.WithLabelValues("signers").Set(float64(signers))
g.Attestation.WithLabelValues("non_signers").Set(float64(nonSigners))
func (g *Metrics) UpdateAttestation(operatorCount, nonSignerCount int) {
g.Attestation.WithLabelValues("signers").Set(float64(operatorCount - nonSignerCount))
g.Attestation.WithLabelValues("non_signers").Set(float64(nonSignerCount))
}

// UpdateFailedBatchAndBlobs records a failed batch and the number of blobs within it; it only
// counts the number of blobs and batches
func (g *Metrics) UpdateFailedBatchAndBlobs(numBlob int) {
g.Blob.WithLabelValues("failed", "number").Add(float64(numBlob))
g.Batch.WithLabelValues("failed", "number").Inc()
}

// UpdateCompletedBatchAndBlobs updates metrics whenever a batch completes. It updates both the
// batch and blob counts and the size of the blob data. Moreover, it updates the
// time it takes to process the entire batch from "getting the blobs" to "marking as finished"
func (g *Metrics) UpdateCompletedBatchAndBlobs(blobsInBatch []*disperser.BlobMetadata, succeeded []bool) {
totalBlobSucceeded := 0
totalBlobFailed := 0
totalBlobSize := 0

for ind, metadata := range blobsInBatch {
if succeeded[ind] {
totalBlobSucceeded += 1
totalBlobSize += int(metadata.RequestMetadata.BlobSize)
} else {
totalBlobFailed += 1
}
// IncrementCompletedBlobCount increments the count and updates the size of processed blobs.
func (g *Metrics) IncrementCompletedBlobCount(size int, status disperser.BlobStatus) {
switch status {
case disperser.Confirmed:
g.Blob.WithLabelValues("confirmed", "number").Inc()
g.Blob.WithLabelValues("confirmed", "size").Add(float64(size))
case disperser.Failed:
g.Blob.WithLabelValues("failed", "number").Inc()
g.Blob.WithLabelValues("failed", "size").Add(float64(size))
case disperser.InsufficientSignatures:
g.Blob.WithLabelValues("insufficient_signature", "number").Inc()
g.Blob.WithLabelValues("insufficient_signature", "size").Add(float64(size))
default:
return
}

// Failed blob
g.UpdateFailedBatchAndBlobs(totalBlobFailed)
g.Blob.WithLabelValues("total", "number").Inc()
g.Blob.WithLabelValues("total", "size").Add(float64(size))
}

// Blob
g.Blob.WithLabelValues("completed", "number").Add(float64(totalBlobSucceeded))
g.Blob.WithLabelValues("completed", "size").Add(float64(totalBlobSize))
// Batch
g.Batch.WithLabelValues("completed", "number").Inc()
func (g *Metrics) IncrementBatchCount(size int) {
g.Batch.WithLabelValues("number").Inc()
g.Batch.WithLabelValues("size").Add(float64(size))
}

func (g *Metrics) ObserveLatency(stage string, latencyMs float64) {
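The rewritten helpers above assume the Blob collector is keyed by a ("state", "data") label pair, while Batch, after this change, is keyed by ("data") alone. A small self-contained sketch of that label scheme using the Prometheus client directly (metric names and label values here are illustrative, not copied verbatim from metrics.go):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Blob: counted per terminal state, with a "number" and a "size" series each.
	blob := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "blobs_total", Help: "number and size of dispersal blobs"},
		[]string{"state", "data"},
	)
	// Batch: after this commit, no per-state label; just "number" and "size".
	batch := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "batches_total", Help: "number and size of dispersal batches"},
		[]string{"data"},
	)
	reg.MustRegister(blob, batch)

	// One confirmed blob of 1024 bytes, mirrored into the "total" state ...
	blob.WithLabelValues("confirmed", "number").Inc()
	blob.WithLabelValues("confirmed", "size").Add(1024)
	blob.WithLabelValues("total", "number").Inc()
	blob.WithLabelValues("total", "size").Add(1024)
	// ... and one completed batch containing 3 blobs.
	batch.WithLabelValues("number").Inc()
	batch.WithLabelValues("size").Add(3)

	families, _ := reg.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName(), "series:", len(mf.GetMetric()))
	}
}
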
12 changes: 7 additions & 5 deletions disperser/dispatcher/dispatcher.go
@@ -89,12 +89,13 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()

request, err := GetStoreChunksRequest(blobs, header)
request, totalSize, err := GetStoreChunksRequest(blobs, header)
if err != nil {
return nil, err
}

opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 1024)
c.logger.Debug("sending chunks to operator", "operator", op.Socket, "size", totalSize)
reply, err := gc.StoreChunks(ctx, request, opt)

if err != nil {
@@ -106,23 +107,24 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
return sig, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, error) {

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, int, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := 0
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
if err != nil {
return nil, err
return nil, 0, err
}
totalSize += blob.BlobHeader.EncodedSizeAllQuorums()
}

request := &node.StoreChunksRequest{
BatchHeader: getBatchHeaderMessage(header),
Blobs: blobs,
}

return request, nil
return request, totalSize, nil
}

func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
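GetStoreChunksRequest now also returns the total encoded size of the blobs it packs, which sendChunks logs before the gRPC call. A stand-in sketch of that accumulation pattern (the blobHeader type and its EncodedSizeAllQuorums method are placeholders mimicking the role of core.BlobHeader, not its real definition):

package main

import "fmt"

// blobHeader is a placeholder for core.BlobHeader; EncodedSizeAllQuorums here
// simply multiplies chunk size by chunk count to play the same role as the
// method called in the diff above.
type blobHeader struct {
	chunkBytes int
	numChunks  int
}

func (h blobHeader) EncodedSizeAllQuorums() int { return h.chunkBytes * h.numChunks }

// buildRequest mirrors the shape of the new GetStoreChunksRequest: convert each
// blob and accumulate the total encoded size so the caller can log it.
func buildRequest(headers []blobHeader) (numBlobs int, totalSize int, err error) {
	for _, h := range headers {
		numBlobs++
		totalSize += h.EncodedSizeAllQuorums()
	}
	return numBlobs, totalSize, nil
}

func main() {
	headers := []blobHeader{{chunkBytes: 1024, numChunks: 8}, {chunkBytes: 2048, numChunks: 8}}
	n, size, _ := buildRequest(headers)
	fmt.Printf("blobs=%d totalSize=%d bytes\n", n, size) // blobs=2 totalSize=24576 bytes
}
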
3 changes: 2 additions & 1 deletion node/grpc/server_load_test.go
@@ -93,8 +93,9 @@ func TestStoreChunks(t *testing.T) {
numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0])
}
t.Logf("Batch numTotalChunks: %d", numTotalChunks)
req, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
assert.NoError(t, err)
assert.Equal(t, 50790400, totalSize)

timer := time.Now()
reply, err := server.StoreChunks(context.Background(), req)
