Skip to content

Commit

Permalink
Handle the concurrency in the accounting of batch storing and validation
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Jan 18, 2024
1 parent 9d4be86 commit 6ba09fb
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,12 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
err error

// The keys that are stored to database for a single batch.
// Undefined if the err is not nil or err is ErrBatchAlreadyExist.
// Defined only if the batch not already exists and gets stored to database successfully.
keys *[][]byte

// Latency (in ms) to store the batch.
// Defined only if the batch not already exists and gets stored to database successfully.
latency float64
}
storeChan := make(chan storeResult)
go func(n *Node) {
Expand All @@ -287,16 +291,14 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
// If batch already exists, we don't store it again, but we should not
// error out in such case.
if err == ErrBatchAlreadyExist {
storeChan <- storeResult{err: nil, keys: nil}
storeChan <- storeResult{err: nil, keys: nil, latency: 0}
} else {
storeChan <- storeResult{err: fmt.Errorf("failed to store batch: %w", err), keys: nil}
storeChan <- storeResult{err: fmt.Errorf("failed to store batch: %w", err), keys: nil, latency: 0}
}
return
}
n.Metrics.AcceptBatches("stored", batchSize)
n.Metrics.ObserveLatency("StoreChunks", "stored", float64(time.Since(start).Milliseconds()))
n.Logger.Debug("Store batch took", "duration:", time.Since(start))
storeChan <- storeResult{err: nil, keys: keys}
storeChan <- storeResult{err: nil, keys: keys, latency: float64(time.Since(start).Milliseconds())}
}(n)

// Validate batch.
Expand All @@ -322,6 +324,10 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
if result.err != nil {
return nil, err
}
if result.keys != nil {
n.Metrics.AcceptBatches("stored", batchSize)
n.Metrics.ObserveLatency("StoreChunks", "stored", result.latency)
}

// Sign batch header hash if all validation checks pass and data items are written to database.
stageTimer = time.Now()
Expand Down

0 comments on commit 6ba09fb

Please sign in to comment.