From 2d3bb0457c30a1af17a22e4b40fd0888d0606113 Mon Sep 17 00:00:00 2001
From: Ian Shim
Date: Mon, 8 Jan 2024 20:26:00 -0800
Subject: [PATCH] update encoding streamer to consider blobs pending confirmation

---
 disperser/batcher/batcher.go            |   8 ++
 disperser/batcher/batcher_test.go       | 109 ++++++++++++++++++++++++
 disperser/batcher/encoded_blob_store.go |  35 ++++++--
 disperser/batcher/encoding_streamer.go  |  11 +++
 4 files changed, 158 insertions(+), 5 deletions(-)

diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go
index 0ec01c3c8..079c54f92 100644
--- a/disperser/batcher/batcher.go
+++ b/disperser/batcher/batcher.go
@@ -351,6 +351,7 @@ func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *Recei
 func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
 	var result *multierror.Error
 	for _, metadata := range blobMetadatas {
+		b.EncodingStreamer.RemoveEncodedBlob(metadata)
 		err := b.Queue.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob)
 		if err != nil {
 			b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err)
@@ -446,6 +447,13 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
 	if err != nil {
 		_ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch)
 		return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err)
+	} else {
+		for _, metadata := range batch.BlobMetadata {
+			err = b.EncodingStreamer.MarkBlobPendingConfirmation(metadata)
+			if err != nil {
+				log.Error("HandleSingleBatch: error marking blob as pending confirmation", "err", err)
+			}
+		}
 	}

 	return nil
diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go
index e38e436fe..5d335e94d 100644
--- a/disperser/batcher/batcher_test.go
+++ b/disperser/batcher/batcher_test.go
@@ -260,8 +260,15 @@ func TestBlobFailures(t *testing.T) {
 	metadatas, err := blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
 	assert.NoError(t, err)
 	assert.Len(t, metadatas, 1)
+	encodedResult, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
+	assert.Error(t, err)
+	assert.Nil(t, encodedResult)

 	// Test with receipt response with no block number
+	err = components.encodingStreamer.RequestEncoding(ctx, out)
+	assert.NoError(t, err)
+	err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
+	assert.NoError(t, err)
 	components.encodingStreamer.ReferenceBlockNumber = 10
 	err = batcher.HandleSingleBatch(ctx)
 	assert.NoError(t, err)
@@ -282,6 +289,10 @@
 	assert.Equal(t, uint(2), meta.NumRetries)

 	// Try again
+	err = components.encodingStreamer.RequestEncoding(ctx, out)
+	assert.NoError(t, err)
+	err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
+	assert.NoError(t, err)
 	components.encodingStreamer.ReferenceBlockNumber = 10
 	err = batcher.HandleSingleBatch(ctx)
 	assert.NoError(t, err)
@@ -303,6 +314,104 @@ func TestBlobFailures(t *testing.T) {
 	assert.Equal(t, uint(2), meta.NumRetries)
 }

+// TestBlobRetry tests that the blob that has been dispersed to DA nodes but is pending onchain confirmation isn't re-dispersed.
+func TestBlobRetry(t *testing.T) {
+	blob := makeTestBlob([]*core.SecurityParam{{
+		QuorumID:           0,
+		AdversaryThreshold: 80,
+		QuorumThreshold:    100,
+	}})
+
+	components, batcher := makeBatcher(t)
+	blobStore := components.blobStore
+	ctx := context.Background()
+	_, blobKey := queueBlob(t, ctx, &blob, blobStore)
+
+	// Start the batcher
+	out := make(chan bat.EncodingResultOrStatus)
+	err := components.encodingStreamer.RequestEncoding(ctx, out)
+	assert.NoError(t, err)
+	err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
+	assert.NoError(t, err)
+
+	encodedResult, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, encodedResult.Status, bat.PendingDispersal)
+
+	txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
+	components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil)
+	components.txnManager.On("ProcessTransaction").Return(nil)
+
+	err = batcher.HandleSingleBatch(ctx)
+	assert.NoError(t, err)
+
+	meta, err := blobStore.GetBlobMetadata(ctx, blobKey)
+	assert.NoError(t, err)
+	assert.Equal(t, disperser.Processing, meta.BlobStatus)
+	encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, encodedResult.Status, bat.PendingConfirmation)
+
+	err = components.encodingStreamer.RequestEncoding(ctx, out)
+	assert.NoError(t, err)
+	timer := time.NewTimer(1 * time.Second)
+	select {
+	case <-out:
+		t.Fatal("shouldn't have picked up any blobs to encode")
+	case <-timer.C:
+	}
+	batch, err := components.encodingStreamer.CreateBatch()
+	assert.ErrorContains(t, err, "no encoded results")
+	assert.Nil(t, batch)
+
+	// Shouldn't pick up any blobs to encode
+	components.encodingStreamer.ReferenceBlockNumber = 12
+	err = components.encodingStreamer.RequestEncoding(ctx, out)
+	assert.NoError(t, err)
+	timer = time.NewTimer(1 * time.Second)
+	select {
+	case <-out:
+		t.Fatal("shouldn't have picked up any blobs to encode")
+	case <-timer.C:
+	}
+
+	batch, err = components.encodingStreamer.CreateBatch()
+	assert.ErrorContains(t, err, "no encoded results")
+	assert.Nil(t, batch)
+	_, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
+	assert.NoError(t, err)
+
+	meta, err = blobStore.GetBlobMetadata(ctx, blobKey)
+	assert.NoError(t, err)
+	assert.Equal(t, disperser.Processing, meta.BlobStatus)
+
+	// Trigger a retry
+	confirmationErr := fmt.Errorf("error")
+	err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{
+		Receipt:  nil,
+		Err:      confirmationErr,
+		Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata,
+	})
+	assert.ErrorIs(t, err, confirmationErr)
+
+	components.encodingStreamer.ReferenceBlockNumber = 14
+	// Should pick up the blob to encode
+	err = components.encodingStreamer.RequestEncoding(ctx, out)
+	assert.NoError(t, err)
+	timer = time.NewTimer(1 * time.Second)
+	var res bat.EncodingResultOrStatus
+	select {
+	case res = <-out:
+	case <-timer.C:
+		t.Fatal("should have picked up the blob to encode")
+	}
+	err = components.encodingStreamer.ProcessEncodedBlobs(ctx, res)
+	assert.NoError(t, err)
+	encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
+	assert.NoError(t, err)
+	assert.Equal(t, encodedResult.Status, bat.PendingDispersal)
+}
+
 func TestRetryTxnReceipt(t *testing.T) {
 	var err error
 	blob := makeTestBlob([]*core.SecurityParam{{
diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go
index 48574878f..bdf9eb860 100644
--- a/disperser/batcher/encoded_blob_store.go
+++ b/disperser/batcher/encoded_blob_store.go
@@ -10,6 +10,12 @@ import (
 )

 type requestID string
+type status uint
+
+const (
+	PendingDispersal status = iota
+	PendingConfirmation
+)

 type encodedBlobStore struct {
 	mu sync.RWMutex
@@ -30,6 +36,7 @@ type EncodingResult struct {
 	Commitment  *core.BlobCommitments
 	Chunks      []*core.Chunk
 	Assignments map[core.OperatorID]core.Assignment
+	Status      status
 }

 // EncodingResultOrStatus is a wrapper for EncodingResult that also contains an error
@@ -66,7 +73,7 @@ func (e *encodedBlobStore) HasEncodingRequested(blobKey disperser.BlobKey, quoru
 	}

 	res, ok := e.encoded[requestID]
-	if ok && res.ReferenceBlockNumber == referenceBlockNumber {
+	if ok && (res.Status == PendingConfirmation || res.ReferenceBlockNumber == referenceBlockNumber) {
 		return true
 	}
 	return false
@@ -132,23 +139,28 @@ func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quoru
 	e.encodedResultSize -= getChunksSize(encodedResult)
 }

-// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results and deletes all the stale results
+// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results that are pending dispersal, and deletes all the stale results that are older than the given block number
 func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult {
 	e.mu.Lock()
 	defer e.mu.Unlock()
 	fetched := make([]*EncodingResult, 0)
 	staleCount := 0
+	pendingConfirmation := 0
 	for k, encodedResult := range e.encoded {
-		if encodedResult.ReferenceBlockNumber < blockNumber {
+		if encodedResult.Status == PendingConfirmation {
+			pendingConfirmation++
+		} else if encodedResult.ReferenceBlockNumber == blockNumber {
+			fetched = append(fetched, encodedResult)
+		} else if encodedResult.ReferenceBlockNumber < blockNumber {
 			// this is safe: https://go.dev/doc/effective_go#for
 			delete(e.encoded, k)
 			staleCount++
 			e.encodedResultSize -= getChunksSize(encodedResult)
 		} else {
-			fetched = append(fetched, encodedResult)
+			e.logger.Error("GetNewAndDeleteStaleEncodingResults: unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "blockNumber", blockNumber, "status", encodedResult.Status)
 		}
 	}
-	e.logger.Trace("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "blockNumber", blockNumber, "encodedSize", e.encodedResultSize)
+	e.logger.Trace("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "pendingConfirmation", pendingConfirmation, "blockNumber", blockNumber, "encodedSize", e.encodedResultSize)
 	return fetched
 }

@@ -161,6 +173,19 @@ func (e *encodedBlobStore) GetEncodedResultSize() (int, uint64) {
 	return len(e.encoded), e.encodedResultSize
 }

+func (e *encodedBlobStore) MarkEncodedResultPendingConfirmation(blobKey disperser.BlobKey, quorumID core.QuorumID) error {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	requestID := getRequestID(blobKey, quorumID)
+	if _, ok := e.encoded[requestID]; !ok {
+		return fmt.Errorf("MarkEncodedResultPendingConfirmation: no such key (%s) in encoded set", requestID)
+	}
+
+	e.encoded[requestID].Status = PendingConfirmation
+	return nil
+}
+
 func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID {
 	return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID))
 }
diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go
index 4ff48bf76..e5009f626 100644
--- a/disperser/batcher/encoding_streamer.go
+++ b/disperser/batcher/encoding_streamer.go
@@ -359,6 +359,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
 				Commitment:  commits,
 				Chunks:      chunks,
 				Assignments: res.Assignments,
+				Status:      PendingDispersal,
 			},
 			Err: nil,
 		}
@@ -545,6 +546,16 @@ func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
 	}
 }

+func (e *EncodingStreamer) MarkBlobPendingConfirmation(metadata *disperser.BlobMetadata) error {
+	for _, sp := range metadata.RequestMetadata.SecurityParams {
+		err := e.EncodedBlobstore.MarkEncodedResultPendingConfirmation(metadata.GetBlobKey(), sp.QuorumID)
+		if err != nil {
+			return fmt.Errorf("error marking blob pending confirmation: %w", err)
+		}
+	}
+	return nil
+}
+
 // getOperatorState returns the operator state for the blobs that have valid quorums
 func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*core.IndexedOperatorState, error) {
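
For reference, below is a minimal, self-contained sketch of the status lifecycle this patch introduces (PendingDispersal -> PendingConfirmation -> removal on a failed confirmation). The type and function names are simplified stand-ins for illustration only, not the actual disperser/batcher types; the real logic lives in GetNewAndDeleteStaleEncodingResults, MarkBlobPendingConfirmation, and RemoveEncodedBlob in the diff above.

// Illustrative sketch (not part of the patch): a simplified model of the new
// encoded-result status handling.
package main

import "fmt"

type status uint

const (
	pendingDispersal status = iota
	pendingConfirmation
)

type result struct {
	refBlockNumber uint
	status         status
}

type store struct {
	encoded map[string]*result
}

// getNewAndDeleteStale mirrors the filtering in GetNewAndDeleteStaleEncodingResults:
// results pending confirmation are skipped (not re-batched), fresh results are
// returned, and stale results are deleted.
func (s *store) getNewAndDeleteStale(blockNumber uint) []string {
	fetched := make([]string, 0)
	for k, r := range s.encoded {
		switch {
		case r.status == pendingConfirmation:
			// dispersed and awaiting onchain confirmation; leave it alone
		case r.refBlockNumber == blockNumber:
			fetched = append(fetched, k)
		case r.refBlockNumber < blockNumber:
			delete(s.encoded, k) // stale; safe to delete while ranging over the map
		}
	}
	return fetched
}

func main() {
	s := &store{encoded: map[string]*result{}}

	// 1. A blob is encoded at reference block 10 and is ready to be dispersed.
	s.encoded["blob0-quorum0"] = &result{refBlockNumber: 10, status: pendingDispersal}
	fmt.Println("batch at block 10:", s.getNewAndDeleteStale(10)) // [blob0-quorum0]

	// 2. The confirmBatch txn is sent; the result is marked pending confirmation,
	//    so later passes do not pick it up again (no double dispersal).
	s.encoded["blob0-quorum0"].status = pendingConfirmation
	fmt.Println("batch at block 12:", s.getNewAndDeleteStale(12)) // []

	// 3. Confirmation fails; the failure path removes the encoded result so the
	//    blob can be re-encoded and re-dispersed at a newer reference block.
	delete(s.encoded, "blob0-quorum0")
	s.encoded["blob0-quorum0"] = &result{refBlockNumber: 14, status: pendingDispersal}
	fmt.Println("batch at block 14:", s.getNewAndDeleteStale(14)) // [blob0-quorum0]
}

Keeping pending-confirmation results in the store, rather than deleting them, is also why HasEncodingRequested now returns true for them: the streamer will not re-encode a blob whose batch is still awaiting onchain confirmation, while the RemoveEncodedBlob call in handleFailure frees the blob for a fresh encode-and-disperse cycle if confirmation fails.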