Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metrics] Export batch failure reasons #108

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (b *Batcher) Start(ctx context.Context) error {
return nil
}

func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata) error {
func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
var result *multierror.Error
for _, metadata := range blobMetadatas {
err := b.Queue.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob)
Expand All @@ -185,6 +185,7 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
}
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
}
b.Metrics.UpdateBatchError(reason, len(blobMetadatas))

// Return the error(s)
return result.ErrorOrNil()
Expand Down Expand Up @@ -214,7 +215,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
log.Trace("[batcher] Getting batch header hash...")
headerHash, err := batch.BatchHeader.GetBatchHeaderHash()
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata)
_ = b.handleFailure(ctx, batch.BlobMetadata, FailBatchHeaderHash)
return fmt.Errorf("HandleSingleBatch: error getting batch header hash: %w", err)
}

Expand All @@ -230,7 +231,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
stageTimer = time.Now()
aggSig, err := b.Aggregator.AggregateSignatures(batch.BatchMetadata.State, quorumIDs, headerHash, update)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata)
_ = b.handleFailure(ctx, batch.BlobMetadata, FailAggregateSignatures)
return fmt.Errorf("HandleSingleBatch: error aggregating signatures: %w", err)
}
log.Trace("[batcher] AggregateSignatures took", "duration", time.Since(stageTimer))
Expand All @@ -240,7 +241,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
passed, numPassed := getBlobQuorumPassStatus(aggSig.QuorumResults, batch.BlobHeaders)
// TODO(mooselumph): Determine whether to confirm the batch based on the number of successes
if numPassed == 0 {
_ = b.handleFailure(ctx, batch.BlobMetadata)
_ = b.handleFailure(ctx, batch.BlobMetadata, FailNoSignatures)
return fmt.Errorf("HandleSingleBatch: no blobs received sufficient signatures")
}

Expand All @@ -249,7 +250,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
stageTimer = time.Now()
txnReceipt, err := b.Confirmer.ConfirmBatch(ctx, batch.BatchHeader, aggSig.QuorumResults, aggSig)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata)
_ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("HandleSingleBatch: error confirming batch: %w", err)
}
log.Trace("[batcher] ConfirmBatch took", "duration", time.Since(stageTimer))
Expand All @@ -259,7 +260,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {

batchID, err := b.getBatchID(ctx, txnReceipt)
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata)
_ = b.handleFailure(ctx, batch.BlobMetadata, FailGetBatchID)
return fmt.Errorf("HandleSingleBatch: error fetching batch ID: %w", err)
}

Expand Down Expand Up @@ -335,7 +336,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
}

if len(blobsToRetry) > 0 {
_ = b.handleFailure(ctx, blobsToRetry)
_ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo)
if len(blobsToRetry) == len(batch.BlobMetadata) {
return fmt.Errorf("HandleSingleBatch: failed to update blob confirmed metadata for all blobs in batch: %w", updateConfirmationInfoErr)
}
Expand Down
24 changes: 24 additions & 0 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// FailReason names the step of batch handling that failed. Its string value
// is used as the label value when a batch error is recorded in metrics.
type FailReason string

// Failure reasons passed to handleFailure from HandleSingleBatch.
const (
// FailBatchHeaderHash is reported when getting the batch header hash returns an error.
FailBatchHeaderHash FailReason = "batch_header_hash"
// FailAggregateSignatures is reported when aggregating signatures returns an error.
FailAggregateSignatures FailReason = "aggregate_signatures"
// FailNoSignatures is reported when no blob in the batch passed its quorum check.
FailNoSignatures FailReason = "no_signatures"
// FailConfirmBatch is reported when confirming the batch returns an error.
FailConfirmBatch FailReason = "confirm_batch"
// FailGetBatchID is reported when fetching the batch ID from the transaction receipt returns an error.
FailGetBatchID FailReason = "get_batch_id"
// FailUpdateConfirmationInfo is reported for the blobs that are queued for retry
// after the confirmation-info update step.
FailUpdateConfirmationInfo FailReason = "update_confirmation_info"
)

type MetricsConfig struct {
HTTPPort string
EnableMetrics bool
Expand All @@ -32,6 +43,7 @@ type Metrics struct {
BatchProcLatency *prometheus.SummaryVec
GasUsed prometheus.Gauge
Attestation *prometheus.GaugeVec
BatchError *prometheus.CounterVec

httpPort string
logger common.Logger
Expand Down Expand Up @@ -96,6 +108,14 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
},
[]string{"type"},
),
BatchError: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "batch_error",
Help: "number of batch errors",
},
[]string{"type"},
),
registry: reg,
httpPort: httpPort,
logger: logger,
Expand Down Expand Up @@ -133,6 +153,10 @@ func (g *Metrics) IncrementBatchCount(size int64) {
g.Batch.WithLabelValues("size").Add(float64(size))
}

// UpdateBatchError adds numBlobs to the BatchError counter under the label
// value given by errType.
func (g *Metrics) UpdateBatchError(errType FailReason, numBlobs int) {
	label := string(errType)
	counter := g.BatchError.WithLabelValues(label)
	counter.Add(float64(numBlobs))
}

// ObserveLatency records latencyMs as one observation in the BatchProcLatency
// summary under the label value given by stage.
func (g *Metrics) ObserveLatency(stage string, latencyMs float64) {
	summary := g.BatchProcLatency.WithLabelValues(stage)
	summary.Observe(latencyMs)
}
Expand Down
Loading