Skip to content

Commit

Permalink
get multiple blob metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Aug 15, 2024
1 parent 12f1a6f commit 6eac895
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 4 deletions.
2 changes: 2 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func (c *Client) GetItem(ctx context.Context, tableName string, key Key) (Item,
return resp.Item, nil
}

// GetItems returns the items for the given keys
// Note: ordering of items is not guaranteed
func (c *Client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys)
if err != nil {
Expand Down
34 changes: 30 additions & 4 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,18 @@ func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetada
return s.dynamoDBClient.PutItem(ctx, s.tableName, item)
}

func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey) (*disperser.BlobMetadata, error) {
func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*disperser.BlobMetadata, error) {
item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{
"BlobHash": &types.AttributeValueMemberS{
Value: metadataKey.BlobHash,
Value: blobKey.BlobHash,
},
"MetadataHash": &types.AttributeValueMemberS{
Value: metadataKey.MetadataHash,
Value: blobKey.MetadataHash,
},
})

if item == nil {
return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, metadataKey)
return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey)
}

if err != nil {
Expand All @@ -90,6 +90,32 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, metadataKey dis
return metadata, nil
}

// GetBulkBlobMetadata returns the metadata for the given blob keys
// Note: ordering of items is not guaranteed
func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) {
keys := make([]map[string]types.AttributeValue, len(blobKeys))
for i := 0; i < len(blobKeys); i += 1 {
keys[i] = map[string]types.AttributeValue{
"BlobHash": &types.AttributeValueMemberS{Value: blobKeys[i].BlobHash},
"MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash},
}
}
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
if err != nil {
return nil, err
}

metadata := make([]*disperser.BlobMetadata, len(items))
for i, item := range items {
metadata[i], err = UnmarshalBlobMetadata(item)
if err != nil {
return nil, err
}
}

return metadata, nil
}

// GetBlobMetadataByStatus returns all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
Expand Down
5 changes: 5 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, metadata2, fetchedMetadata)

fetchBulk, err := blobMetadataStore.GetBulkBlobMetadata(ctx, []disperser.BlobKey{blobKey1, blobKey2})
assert.NoError(t, err)
assert.Equal(t, metadata1, fetchBulk[0])
assert.Equal(t, metadata2, fetchBulk[1])

processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Len(t, processing, 1)
Expand Down
4 changes: 4 additions & 0 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (s *SharedBlobStore) GetBlobMetadata(ctx context.Context, metadataKey dispe
return s.blobMetadataStore.GetBlobMetadata(ctx, metadataKey)
}

func (s *SharedBlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) {
return s.blobMetadataStore.GetBulkBlobMetadata(ctx, blobKeys)
}

func (s *SharedBlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) {
if metadata.NumRetries < maxRetry {
if err := s.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,18 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK
return nil, disperser.ErrBlobNotFound
}

func (q *BlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) {
q.mu.RLock()
defer q.mu.RUnlock()
metas := make([]*disperser.BlobMetadata, len(blobKeys))
for i, key := range blobKeys {
if meta, ok := q.Metadata[key]; ok {
metas[i] = meta
}
}
return metas, nil
}

func (q *BlobStore) HandleBlobFailure(ctx context.Context, metadata *disperser.BlobMetadata, maxRetry uint) (bool, error) {
if metadata.NumRetries < maxRetry {
if err := q.MarkBlobProcessing(ctx, metadata.GetBlobKey()); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ type BlobStore interface {
GetAllBlobMetadataByBatchWithPagination(ctx context.Context, batchHeaderHash [32]byte, limit int32, exclusiveStartKey *BatchIndexExclusiveStartKey) ([]*BlobMetadata, *BatchIndexExclusiveStartKey, error)
// GetBlobMetadata returns a blob metadata given a metadata key
GetBlobMetadata(ctx context.Context, blobKey BlobKey) (*BlobMetadata, error)
// GetBulkBlobMetadata returns a list of blob metadata given a list of blob keys
GetBulkBlobMetadata(ctx context.Context, blobKeys []BlobKey) ([]*BlobMetadata, error)
// HandleBlobFailure handles a blob failure by either incrementing the retry count or marking the blob as failed
// Returns a boolean indicating whether the blob should be retried and an error
HandleBlobFailure(ctx context.Context, metadata *BlobMetadata, maxRetry uint) (bool, error)
Expand Down

0 comments on commit 6eac895

Please sign in to comment.