Skip to content

Commit

Permalink
Fix: Synchronize inmem store to fix finalizer test (Layr-Labs#277)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Feb 23, 2024
1 parent 3ed5953 commit b744382
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
6 changes: 3 additions & 3 deletions disperser/batcher/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func (f *finalizer) FinalizeBlobs(ctx context.Context) error {
if err != nil {
return fmt.Errorf("FinalizeBlobs: error getting blob headers: %w", err)
}
metadatas := metadatas
f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metadatas), "finalizedBlockNumber", lastFinalBlock)
metas := metadatas
f.logger.Info("FinalizeBlobs: finalizing blobs", "numBlobs", len(metas), "finalizedBlockNumber", lastFinalBlock)
pool.Submit(func() {
f.updateBlobs(ctx, metadatas, lastFinalBlock)
f.updateBlobs(ctx, metas, lastFinalBlock)
})
totalProcessed += len(metadatas)

Expand Down
28 changes: 28 additions & 0 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"encoding/hex"
"sort"
"strconv"
"sync"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
)

// BlobStore is an in-memory implementation of the BlobStore interface
type BlobStore struct {
mu sync.RWMutex
Blobs map[disperser.BlobHash]*BlobHolder
Metadata map[disperser.BlobKey]*disperser.BlobMetadata
}
Expand All @@ -33,6 +35,8 @@ func NewBlobStore() disperser.BlobStore {
}

func (q *BlobStore) StoreBlob(ctx context.Context, blob *core.Blob, requestedAt uint64) (disperser.BlobKey, error) {
q.mu.Lock()
defer q.mu.Unlock()
blobKey := disperser.BlobKey{}
// Generate the blob key
blobHash, err := q.getNewBlobHash()
Expand Down Expand Up @@ -63,6 +67,8 @@ func (q *BlobStore) StoreBlob(ctx context.Context, blob *core.Blob, requestedAt
}

func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobHash) ([]byte, error) {
q.mu.RLock()
defer q.mu.RUnlock()
if holder, ok := q.Blobs[blobHash]; ok {
return holder.Data, nil
} else {
Expand All @@ -71,6 +77,8 @@ func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobH
}

func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) {
q.mu.Lock()
defer q.mu.Unlock()
// TODO (ian-shim): remove this check once we are sure that the metadata is never overwritten
refreshedMetadata, err := q.GetBlobMetadata(ctx, existingMetadata.GetBlobKey())
if err != nil {
Expand All @@ -92,6 +100,8 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis
}

func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existingMetadata *disperser.BlobMetadata, confirmationInfo *disperser.ConfirmationInfo) (*disperser.BlobMetadata, error) {
q.mu.Lock()
defer q.mu.Unlock()
blobKey := existingMetadata.GetBlobKey()
if _, ok := q.Metadata[blobKey]; !ok {
return nil, disperser.ErrBlobNotFound
Expand All @@ -104,6 +114,8 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing
}

func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.BlobKey) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
}
Expand All @@ -113,6 +125,8 @@ func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.Blo
}

func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.BlobKey) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
}
Expand All @@ -122,6 +136,8 @@ func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.Bl
}

func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKey) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
}
Expand All @@ -131,6 +147,8 @@ func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKe
}

func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadata *disperser.BlobMetadata) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
}
Expand All @@ -140,6 +158,8 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat
}

func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperser.BlobMetadata) (map[disperser.BlobKey]*core.Blob, error) {
q.mu.RLock()
defer q.mu.RUnlock()
blobs := make(map[disperser.BlobKey]*core.Blob)
for _, meta := range metadata {
if holder, ok := q.Blobs[meta.BlobHash]; ok {
Expand All @@ -155,6 +175,8 @@ func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperse
}

func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*disperser.BlobMetadata, error) {
q.mu.RLock()
defer q.mu.RUnlock()
metas := make([]*disperser.BlobMetadata, 0)
for _, meta := range q.Metadata {
if meta.BlobStatus == status {
Expand All @@ -165,6 +187,8 @@ func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperse
}

func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {
q.mu.RLock()
defer q.mu.RUnlock()
metas := make([]*disperser.BlobMetadata, 0)
foundStart := exclusiveStartKey == nil

Expand Down Expand Up @@ -206,6 +230,8 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s
}

func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) {
q.mu.RLock()
defer q.mu.RUnlock()
for _, meta := range q.Metadata {
if meta.ConfirmationInfo != nil && meta.ConfirmationInfo.BatchHeaderHash == batchHeaderHash && meta.ConfirmationInfo.BlobIndex == blobIndex {
return meta, nil
Expand All @@ -216,6 +242,8 @@ func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]
}

func (q *BlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) {
q.mu.RLock()
defer q.mu.RUnlock()
metas := make([]*disperser.BlobMetadata, 0)
for _, meta := range q.Metadata {
if meta.ConfirmationInfo != nil && meta.ConfirmationInfo.BatchHeaderHash == batchHeaderHash {
Expand Down

0 comments on commit b744382

Please sign in to comment.