[encodingstreamer] Consider blobs pending confirmation #169

Merged
merged 1 commit on Jan 11, 2024
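This change adds a per-result status to the encoded blob store. A result starts as PendingDispersal when encoded; once the batch's confirmBatch transaction has been submitted, HandleSingleBatch marks it PendingConfirmation so the encoding streamer neither re-encodes nor re-batches it while the onchain receipt is pending; if confirmation fails, handleFailure removes it from the store so the blob can be re-encoded and retried. A minimal standalone sketch of that consume logic, using simplified stand-in types rather than the repo's actual ones:

package main

import "fmt"

// Simplified stand-ins for the types in encoded_blob_store.go (illustrative only).
type status uint

const (
	PendingDispersal status = iota
	PendingConfirmation
)

type encodingResult struct {
	referenceBlockNumber uint
	status               status
}

type store map[string]*encodingResult

// consume mirrors GetNewAndDeleteStaleEncodingResults: results awaiting onchain
// confirmation are skipped (not re-dispersed), results encoded at the current
// reference block are fetched, and older ones are deleted so they can be
// re-encoded at the new block.
func (s store) consume(blockNumber uint) []string {
	fetched := []string{}
	for k, r := range s {
		switch {
		case r.status == PendingConfirmation:
			// confirmBatch txn in flight: leave the result alone
		case r.referenceBlockNumber == blockNumber:
			fetched = append(fetched, k)
		case r.referenceBlockNumber < blockNumber:
			delete(s, k) // stale; deleting during range is safe in Go
		}
	}
	return fetched
}

func main() {
	s := store{
		"blobA-0": {referenceBlockNumber: 10, status: PendingDispersal},   // fresh: dispersed
		"blobB-0": {referenceBlockNumber: 8, status: PendingConfirmation}, // awaiting receipt: untouched
		"blobC-0": {referenceBlockNumber: 8, status: PendingDispersal},    // stale: deleted
	}
	fmt.Println(s.consume(10)) // [blobA-0]
	fmt.Println(len(s))        // 2
}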
8 changes: 8 additions & 0 deletions disperser/batcher/batcher.go
@@ -351,6 +351,7 @@ func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *Recei
func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.BlobMetadata, reason FailReason) error {
var result *multierror.Error
for _, metadata := range blobMetadatas {
b.EncodingStreamer.RemoveEncodedBlob(metadata)
err := b.Queue.HandleBlobFailure(ctx, metadata, b.MaxNumRetriesPerBlob)
if err != nil {
b.logger.Error("HandleSingleBatch: error handling blob failure", "err", err)
@@ -446,6 +447,13 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {
if err != nil {
_ = b.handleFailure(ctx, batch.BlobMetadata, FailConfirmBatch)
return fmt.Errorf("HandleSingleBatch: error sending confirmBatch transaction: %w", err)
} else {
for _, metadata := range batch.BlobMetadata {
err = b.EncodingStreamer.MarkBlobPendingConfirmation(metadata)
if err != nil {
log.Error("HandleSingleBatch: error marking blob as pending confirmation", "err", err)
}
}
}

return nil
109 changes: 109 additions & 0 deletions disperser/batcher/batcher_test.go
@@ -260,8 +260,15 @@ func TestBlobFailures(t *testing.T) {
metadatas, err := blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, metadatas, 1)
encodedResult, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.Error(t, err)
assert.Nil(t, encodedResult)

// Test with a receipt response that has no block number
err = components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
assert.NoError(t, err)
components.encodingStreamer.ReferenceBlockNumber = 10
err = batcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
@@ -282,6 +289,10 @@ func TestBlobFailures(t *testing.T) {
assert.Equal(t, uint(2), meta.NumRetries)

// Try again
err = components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
assert.NoError(t, err)
components.encodingStreamer.ReferenceBlockNumber = 10
err = batcher.HandleSingleBatch(ctx)
assert.NoError(t, err)
@@ -303,6 +314,104 @@ func TestBlobFailures(t *testing.T) {
assert.Equal(t, uint(2), meta.NumRetries)
}

// TestBlobRetry tests that a blob that has been dispersed to DA nodes but is pending onchain confirmation isn't re-dispersed.
func TestBlobRetry(t *testing.T) {
blob := makeTestBlob([]*core.SecurityParam{{
QuorumID: 0,
AdversaryThreshold: 80,
QuorumThreshold: 100,
}})

components, batcher := makeBatcher(t)
blobStore := components.blobStore
ctx := context.Background()
_, blobKey := queueBlob(t, ctx, &blob, blobStore)

// Start the batcher
out := make(chan bat.EncodingResultOrStatus)
err := components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
err = components.encodingStreamer.ProcessEncodedBlobs(ctx, <-out)
assert.NoError(t, err)

encodedResult, err := components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)
assert.Equal(t, encodedResult.Status, bat.PendingDispersal)

txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
components.transactor.On("BuildConfirmBatchTxn").Return(txn, nil)
components.txnManager.On("ProcessTransaction").Return(nil)

err = batcher.HandleSingleBatch(ctx)
assert.NoError(t, err)

meta, err := blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, meta.BlobStatus)
encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)
assert.Equal(t, encodedResult.Status, bat.PendingConfirmation)

err = components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
timer := time.NewTimer(1 * time.Second)
select {
case <-out:
t.Fatal("shouldn't have picked up any blobs to encode")
case <-timer.C:
}
batch, err := components.encodingStreamer.CreateBatch()
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)

// Shouldn't pick up any blobs to encode
components.encodingStreamer.ReferenceBlockNumber = 12
err = components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
timer = time.NewTimer(1 * time.Second)
select {
case <-out:
t.Fatal("shouldn't have picked up any blobs to encode")
case <-timer.C:
}

batch, err = components.encodingStreamer.CreateBatch()
assert.ErrorContains(t, err, "no encoded results")
assert.Nil(t, batch)
_, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)

meta, err = blobStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, meta.BlobStatus)

// Trigger a retry
confirmationErr := fmt.Errorf("error")
err = batcher.ProcessConfirmedBatch(ctx, &bat.ReceiptOrErr{
Receipt: nil,
Err: confirmationErr,
Metadata: components.txnManager.Requests[len(components.txnManager.Requests)-1].Metadata,
})
assert.ErrorIs(t, err, confirmationErr)

components.encodingStreamer.ReferenceBlockNumber = 14
// Should pick up the blob to encode
err = components.encodingStreamer.RequestEncoding(ctx, out)
assert.NoError(t, err)
timer = time.NewTimer(1 * time.Second)
var res bat.EncodingResultOrStatus
select {
case res = <-out:
case <-timer.C:
t.Fatal("should have picked up the blob to encode")
}
err = components.encodingStreamer.ProcessEncodedBlobs(ctx, res)
assert.NoError(t, err)
encodedResult, err = components.encodingStreamer.EncodedBlobstore.GetEncodingResult(blobKey, 0)
assert.NoError(t, err)
assert.Equal(t, encodedResult.Status, bat.PendingDispersal)
}

func TestRetryTxnReceipt(t *testing.T) {
var err error
blob := makeTestBlob([]*core.SecurityParam{{
35 changes: 30 additions & 5 deletions disperser/batcher/encoded_blob_store.go
@@ -10,6 +10,12 @@ import (
)

type requestID string
type status uint

const (
PendingDispersal status = iota
PendingConfirmation
)

type encodedBlobStore struct {
mu sync.RWMutex
Expand All @@ -30,6 +36,7 @@ type EncodingResult struct {
Commitment *core.BlobCommitments
Chunks []*core.Chunk
Assignments map[core.OperatorID]core.Assignment
Status status
}

// EncodingResultOrStatus is a wrapper for EncodingResult that also contains an error
@@ -66,7 +73,7 @@ func (e *encodedBlobStore) HasEncodingRequested(blobKey disperser.BlobKey, quoru
}

res, ok := e.encoded[requestID]
if ok && res.ReferenceBlockNumber == referenceBlockNumber {
if ok && (res.Status == PendingConfirmation || res.ReferenceBlockNumber == referenceBlockNumber) {
return true
}
return false
@@ -132,23 +139,28 @@ func (e *encodedBlobStore) DeleteEncodingResult(blobKey disperser.BlobKey, quoru
e.encodedResultSize -= getChunksSize(encodedResult)
}

// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results and deletes all the stale results
// GetNewAndDeleteStaleEncodingResults returns all the fresh encoded results that are pending dispersal, and deletes all the stale results that are older than the given block number
func (e *encodedBlobStore) GetNewAndDeleteStaleEncodingResults(blockNumber uint) []*EncodingResult {
e.mu.Lock()
defer e.mu.Unlock()
fetched := make([]*EncodingResult, 0)
staleCount := 0
pendingConfirmation := 0
for k, encodedResult := range e.encoded {
if encodedResult.ReferenceBlockNumber < blockNumber {
if encodedResult.Status == PendingConfirmation {
pendingConfirmation++
} else if encodedResult.ReferenceBlockNumber == blockNumber {
fetched = append(fetched, encodedResult)
} else if encodedResult.ReferenceBlockNumber < blockNumber {
// this is safe: https://go.dev/doc/effective_go#for
delete(e.encoded, k)
staleCount++
e.encodedResultSize -= getChunksSize(encodedResult)
} else {
Comment on lines +154 to 159

Collaborator:

Curious about whether this case could actually happen?

Possibilities:

  1. I guess since confirmation is happening in another thread, and we don't have locks (e.g. batcher.go, L288), it could be possible that the blob is marked as confirmed in the confirmation thread but not yet removed from the store. But not sure we need to handle that case here.
  2. A blob was encoded but somehow the confirmation transaction never took place. I guess this could probably happen somehow.

Contributor (Author):

I don't think encodedResult.ReferenceBlockNumber > blockNumber would ever be true, and it would be a bug in that case?

  1. Although there is no lock in batcher.go:288, it calls DeleteEncodingResult, which would lock the encoded blob store. In that case, though, encodedResult.Status would be PendingConfirmation, right?
  2. It depends on how the confirmation transaction ended up not taking place. If it failed somehow, handleFailure would remove the blob from the encoded blob store so that it can be re-encoded and retried.
fetched = append(fetched, encodedResult)
e.logger.Error("GetNewAndDeleteStaleEncodingResults: unexpected case", "refBlockNumber", encodedResult.ReferenceBlockNumber, "blockNumber", blockNumber, "status", encodedResult.Status)
}
}
e.logger.Trace("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "blockNumber", blockNumber, "encodedSize", e.encodedResultSize)
e.logger.Trace("consumed encoded results", "fetched", len(fetched), "stale", staleCount, "pendingConfirmation", pendingConfirmation, "blockNumber", blockNumber, "encodedSize", e.encodedResultSize)

return fetched
}
@@ -161,6 +173,19 @@ func (e *encodedBlobStore) GetEncodedResultSize() (int, uint64) {
return len(e.encoded), e.encodedResultSize
}

func (e *encodedBlobStore) MarkEncodedResultPendingConfirmation(blobKey disperser.BlobKey, quorumID core.QuorumID) error {
e.mu.Lock()
defer e.mu.Unlock()

requestID := getRequestID(blobKey, quorumID)
if _, ok := e.encoded[requestID]; !ok {
return fmt.Errorf("MarkEncodedBlobPendingConfirmation: no such key (%s) in encoded set", requestID)
}

e.encoded[requestID].Status = PendingConfirmation
return nil
}

func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID {
return requestID(fmt.Sprintf("%s-%d", key.String(), quorumID))
}
11 changes: 11 additions & 0 deletions disperser/batcher/encoding_streamer.go
@@ -359,6 +359,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
Commitment: commits,
Chunks: chunks,
Assignments: res.Assignments,
Status: PendingDispersal,
},
Err: nil,
}
@@ -545,6 +546,16 @@ func (e *EncodingStreamer) RemoveEncodedBlob(metadata *disperser.BlobMetadata) {
}
}

func (e *EncodingStreamer) MarkBlobPendingConfirmation(metadata *disperser.BlobMetadata) error {
for _, sp := range metadata.RequestMetadata.SecurityParams {
err := e.EncodedBlobstore.MarkEncodedResultPendingConfirmation(metadata.GetBlobKey(), sp.QuorumID)
if err != nil {
return fmt.Errorf("error marking blob pending confirmation: %w", err)
}
}
return nil
}

// getOperatorState returns the operator state for the blobs that have valid quorums
func (e *EncodingStreamer) getOperatorState(ctx context.Context, metadatas []*disperser.BlobMetadata, blockNumber uint) (*core.IndexedOperatorState, error) {
