[v2] s3 Blob Store #841

Merged 1 commit on Oct 29, 2024
11 changes: 11 additions & 0 deletions common/aws/s3/client.go
@@ -72,6 +72,17 @@ func NewClient(ctx context.Context, cfg commonaws.ClientConfig, logger logging.L
return ref, err
}

func (s *client) CreateBucket(ctx context.Context, bucket string) error {
_, err := s.s3Client.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
return err
}

return nil
}

func (s *client) DownloadObject(ctx context.Context, bucket string, key string) ([]byte, error) {
var partMiBs int64 = 10
downloader := manager.NewDownloader(s.s3Client, func(d *manager.Downloader) {
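The CreateBucket wrapper added above surfaces the raw SDK error, so calling it twice for the same bucket fails on the second call. A minimal sketch of an idempotent variant, assuming the aws-sdk-go-v2 error types (createBucketIfMissing is a hypothetical helper, not part of this PR):

package s3util

import (
	"context"
	"errors"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// createBucketIfMissing treats "bucket already owned by you" as success,
// which makes repeated test setups safe to re-run against the same account.
func createBucketIfMissing(ctx context.Context, c *s3.Client, bucket string) error {
	_, err := c.CreateBucket(ctx, &s3.CreateBucketInput{
		Bucket: aws.String(bucket),
	})
	var owned *types.BucketAlreadyOwnedByYou
	if errors.As(err, &owned) {
		return nil // bucket already exists and belongs to this account
	}
	return err
}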
1 change: 1 addition & 0 deletions common/aws/s3/s3.go
@@ -3,6 +3,7 @@ package s3
import "context"

type Client interface {
CreateBucket(ctx context.Context, bucket string) error
DownloadObject(ctx context.Context, bucket string, key string) ([]byte, error)
UploadObject(ctx context.Context, bucket string, key string, data []byte) error
DeleteObject(ctx context.Context, bucket string, key string) error
4 changes: 4 additions & 0 deletions common/mock/s3_client.go
@@ -17,6 +17,10 @@ func NewS3Client() *S3Client {
return &S3Client{bucket: make(map[string][]byte)}
}

func (s *S3Client) CreateBucket(ctx context.Context, bucket string) error {
return nil
}

func (s *S3Client) DownloadObject(ctx context.Context, bucket string, key string) ([]byte, error) {
data, ok := s.bucket[key]
if !ok {
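The mock's CreateBucket above is a no-op, and objects live in a plain map keyed only by object key. A small sketch (not in this diff) of a compile-time guard that keeps the mock in sync with the interface as methods like CreateBucket are added; it assumes the import paths shown in the file headers and that *S3Client implements the remaining s3.Client methods:

package mock

import "github.com/Layr-Labs/eigenda/common/aws/s3"

// Fails to compile if *S3Client ever drifts from the s3.Client interface,
// e.g. when a new method is added to the interface but not the mock.
var _ s3.Client = (*S3Client)(nil)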
@@ -6,11 +6,11 @@ import (
"math/big"
"os"
"testing"
"time"

"github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/Layr-Labs/eigensdk-go/logging"
@@ -31,10 +31,13 @@
deployLocalStack bool
localStackPort = "4571"

s3Client s3.Client
dynamoClient *dynamodb.Client
blobStore *blobstore.BlobStore
blobMetadataStore *blobstore.BlobMetadataStore

UUID = uuid.New()
s3BucketName = "test-eigenda-blobstore"
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)

mockCommitment = encoding.BlobCommitments{}
@@ -82,7 +85,19 @@ func setup(m *testing.M) {
panic("failed to create dynamodb client: " + err.Error())
}

- blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour)
+ blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName)

s3Client, err = s3.NewClient(context.Background(), cfg, logger)
if err != nil {
teardown()
panic("failed to create s3 client: " + err.Error())
}
err = s3Client.CreateBucket(context.Background(), s3BucketName)
if err != nil {
teardown()
panic("failed to create s3 bucket: " + err.Error())
}
blobStore = blobstore.NewBlobStore(s3BucketName, s3Client, logger)

var X1, Y1 fp.Element
X1 = *X1.SetBigInt(big.NewInt(1))
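The setup above runs against LocalStack; when only the blob store is under test, the in-memory mock can stand in. A sketch of that wiring (setupWithMock is hypothetical; it assumes mock.S3Client satisfies s3.Client, as in the guard above):

package blobstore_test

import (
	"context"

	"github.com/Layr-Labs/eigenda/common/mock"
	"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
	"github.com/Layr-Labs/eigensdk-go/logging"
)

// setupWithMock builds a BlobStore backed by the in-memory mock client:
// CreateBucket is a no-op there, and uploads land in a map instead of S3.
func setupWithMock(logger logging.Logger) *blobstore.BlobStore {
	s3Client := mock.NewS3Client()
	_ = s3Client.CreateBucket(context.Background(), "test-eigenda-blobstore")
	return blobstore.NewBlobStore("test-eigenda-blobstore", s3Client, logger)
}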
@@ -32,16 +32,14 @@ type BlobMetadataStore struct {
dynamoDBClient *commondynamodb.Client
logger logging.Logger
tableName string
- ttl time.Duration
}

- func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore {
- logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl)
+ func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string) *BlobMetadataStore {
+ logger.Debugf("creating blob metadata store v2 with table %s", tableName)
return &BlobMetadataStore{
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "blobMetadataStoreV2"),
tableName: tableName,
- ttl: ttl,
}
}

42 changes: 42 additions & 0 deletions disperser/common/v2/blobstore/s3_blob_store.go
@@ -0,0 +1,42 @@
package blobstore

import (
"context"

"github.com/Layr-Labs/eigenda/common/aws/s3"
"github.com/Layr-Labs/eigensdk-go/logging"
)

type BlobStore struct {
bucketName string
s3Client s3.Client
logger logging.Logger
}

func NewBlobStore(s3BucketName string, s3Client s3.Client, logger logging.Logger) *BlobStore {
return &BlobStore{
bucketName: s3BucketName,
s3Client: s3Client,
logger: logger,
}
}

// StoreBlob adds a blob to the blob store
func (b *BlobStore) StoreBlob(ctx context.Context, blobKey string, data []byte) error {
err := b.s3Client.UploadObject(ctx, b.bucketName, blobKey, data)
if err != nil {
b.logger.Errorf("failed to upload blob in bucket %s: %v", b.bucketName, err)
return err
}
return nil
}

// GetBlob retrieves a blob from the blob store
func (b *BlobStore) GetBlob(ctx context.Context, blobKey string) ([]byte, error) {
data, err := b.s3Client.DownloadObject(ctx, b.bucketName, blobKey)
if err != nil {
b.logger.Errorf("failed to download blob from bucket %s: %v", b.bucketName, err)
return nil, err
}
return data, nil
}
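A short, hypothetical round-trip through the new wrapper, using the in-memory mock from common/mock and a no-op logger (assumes eigensdk-go's logging.NewNoopLogger):

package main

import (
	"context"
	"fmt"

	"github.com/Layr-Labs/eigenda/common/mock"
	"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
	"github.com/Layr-Labs/eigensdk-go/logging"
)

func main() {
	store := blobstore.NewBlobStore("example-bucket", mock.NewS3Client(), logging.NewNoopLogger())

	// Store a blob, then read it back under the same key.
	if err := store.StoreBlob(context.Background(), "blobKey", []byte("payload")); err != nil {
		panic(err)
	}
	data, err := store.GetBlob(context.Background(), "blobKey")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // payload
}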
Comment on lines +25 to +42
Contributor:
When we eventually merge the fragmented uploading utility, we will want to revisit this code. A prerequisite to using fragmented uploads will be to store extra metadata for each blob (i.e. fragment size + blob size needed to download without round trips).

No need to wait to merge this PR. The fact that blobs are fragmented in S3 should be invisible to consumers of this API. As long as we don't deploy this code to production environments, switching over to the fragmented strategy won't be that tricky.

Contributor (Author):
Not sure we want fragmented uploading in the blob store. This isn't the relay store that stores chunks; it's the one that stores whole blobs. I think this simple wrapper can serve the blob store in the long run too?

Contributor:
I'm ok waiting to have the discussion about how we want to store blobs. It's mostly a question of whether we want to try to improve latency for really big blobs. Since we don't really support super-big blobs currently, it might be something we can avoid dealing with for the MVP.
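To make the fragmented-upload discussion concrete, a hypothetical shape for the extra per-blob metadata the first comment mentions (names are illustrative, not from the codebase):

package blobstore

// blobObjectMetadata sketches what a fragmented layout would need to persist
// so a blob can be fetched without extra round trips: the total size plus the
// fragment size determines how many fragment objects to download.
type blobObjectMetadata struct {
	blobSizeBytes     uint64 // size of the reassembled blob
	fragmentSizeBytes uint64 // size of each fragment object (last may be shorter)
}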

22 changes: 22 additions & 0 deletions disperser/common/v2/blobstore/s3_blob_store_test.go
@@ -0,0 +1,22 @@
package blobstore_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestStoreGetBlob(t *testing.T) {
err := blobStore.StoreBlob(context.Background(), "testBlobKey", []byte("testBlobData"))
assert.NoError(t, err)
data, err := blobStore.GetBlob(context.Background(), "testBlobKey")
assert.NoError(t, err)
assert.Equal(t, []byte("testBlobData"), data)
}

func TestGetBlobNotFound(t *testing.T) {
data, err := blobStore.GetBlob(context.Background(), "nonExistentBlobKey")
assert.Error(t, err)
assert.Nil(t, data)
}