Skip to content

Commit

Permalink
Merge pull request #2745 from OffchainLabs/persist-auctioneerbids-s3
Browse files Browse the repository at this point in the history
Persist Auctioneer Bid DB to S3
  • Loading branch information
Tristan-Wilson authored Dec 27, 2024
2 parents 77df3c7 + 7e47ce2 commit e4da075
Show file tree
Hide file tree
Showing 13 changed files with 681 additions and 55 deletions.
3 changes: 3 additions & 0 deletions cmd/autonomous-auctioneer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (c *AutonomousAuctioneerConfig) GetReloadInterval() time.Duration {
}

// Validate checks the configuration for errors. Currently the only
// validated component is the S3 storage settings of the auctioneer server.
func (c *AutonomousAuctioneerConfig) Validate() error {
	return c.AuctioneerServer.S3Storage.Validate()
}

Expand Down
40 changes: 6 additions & 34 deletions das/s3_storage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import (
"bytes"
"context"
"fmt"
"io"
"math"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
flag "github.com/spf13/pflag"
Expand All @@ -24,16 +21,9 @@ import (
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
"github.com/offchainlabs/nitro/util/s3client"
)

type S3Uploader interface {
Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error)
}

type S3Downloader interface {
Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
}

type S3StorageServiceConfig struct {
Enable bool `koanf:"enable"`
AccessKey string `koanf:"access-key"`
Expand All @@ -57,48 +47,30 @@ func S3ConfigAddOptions(prefix string, f *flag.FlagSet) {
}

type S3StorageService struct {
client *s3.Client
client s3client.FullClient
bucket string
objectPrefix string
uploader S3Uploader
downloader S3Downloader
discardAfterTimeout bool
}

func NewS3StorageService(config S3StorageServiceConfig) (StorageService, error) {
client, err := buildS3Client(config.AccessKey, config.SecretKey, config.Region)
client, err := s3client.NewS3FullClient(config.AccessKey, config.SecretKey, config.Region)
if err != nil {
return nil, err
}
return &S3StorageService{
client: client,
bucket: config.Bucket,
objectPrefix: config.ObjectPrefix,
uploader: manager.NewUploader(client),
downloader: manager.NewDownloader(client),
discardAfterTimeout: config.DiscardAfterTimeout,
}, nil
}

// buildS3Client constructs an S3 client for the given region using the AWS
// SDK's default credential chain (environment, shared config, IAM role, …).
// If both accessKey and secretKey are non-empty they override the chain with
// static credentials.
//
// Returns the configured client, or the error from loading the AWS config.
func buildS3Client(accessKey, secretKey, region string) (*s3.Client, error) {
cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), awsConfig.WithRegion(region), func(options *awsConfig.LoadOptions) error {
// remain backward compatible with accessKey and secretKey credentials provided via cli flags
if accessKey != "" && secretKey != "" {
options.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
}
return nil
})
if err != nil {
return nil, err
}
return s3.NewFromConfig(cfg), nil
}

func (s3s *S3StorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
log.Trace("das.S3StorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s3s)

buf := manager.NewWriteAtBuffer([]byte{})
_, err := s3s.downloader.Download(ctx, buf, &s3.GetObjectInput{
_, err := s3s.client.Download(ctx, buf, &s3.GetObjectInput{
Bucket: aws.String(s3s.bucket),
Key: aws.String(s3s.objectPrefix + EncodeStorageServiceKey(key)),
})
Expand All @@ -116,7 +88,7 @@ func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint
expires := time.Unix(int64(timeout), 0)
putObjectInput.Expires = &expires
}
_, err := s3s.uploader.Upload(ctx, &putObjectInput)
_, err := s3s.client.Upload(ctx, &putObjectInput)
if err != nil {
log.Error("das.S3StorageService.Store", "err", err)
}
Expand All @@ -143,6 +115,6 @@ func (s3s *S3StorageService) String() string {
}

func (s3s *S3StorageService) HealthCheck(ctx context.Context) error {
_, err := s3s.client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
_, err := s3s.client.Client().HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
return err
}
21 changes: 11 additions & 10 deletions das/s3_storage_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ import (
"github.com/offchainlabs/nitro/das/dastree"
)

type mockS3Uploader struct {
// mockS3FullClient is a test double for s3client.FullClient that routes
// uploads and downloads through an in-memory StorageService instead of S3.
type mockS3FullClient struct {
mockStorageService StorageService
}

func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
// Client returns nil: the mock never exposes a raw *s3.Client, so any code
// path that dereferences it (e.g. HealthCheck) is not exercised in tests.
func (m *mockS3FullClient) Client() *s3.Client {
return nil
}

func (m *mockS3FullClient) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(input.Body)
if err != nil {
Expand All @@ -33,11 +37,7 @@ func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, o
return nil, err
}

type mockS3Downloader struct {
mockStorageService StorageService
}

func (m *mockS3Downloader) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
func (m *mockS3FullClient) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
key, err := DecodeStorageServiceKey(*input.Key)
if err != nil {
return 0, err
Expand All @@ -56,10 +56,11 @@ func (m *mockS3Downloader) Download(ctx context.Context, w io.WriterAt, input *s

func NewTestS3StorageService(ctx context.Context, s3Config genericconf.S3Config) (StorageService, error) {
mockStorageService := NewMemoryBackedStorageService(ctx)
s3FullClient := &mockS3FullClient{mockStorageService}
return &S3StorageService{
bucket: s3Config.Bucket,
uploader: &mockS3Uploader{mockStorageService},
downloader: &mockS3Downloader{mockStorageService}}, nil
bucket: s3Config.Bucket,
client: s3FullClient,
}, nil
}

func TestS3StorageService(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions timeboost/auctioneer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type AuctioneerServerConfig struct {
AuctionContractAddress string `koanf:"auction-contract-address"`
DbDirectory string `koanf:"db-directory"`
AuctionResolutionWaitTime time.Duration `koanf:"auction-resolution-wait-time"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
}

var DefaultAuctioneerServerConfig = AuctioneerServerConfig{
Expand All @@ -77,6 +78,7 @@ var DefaultAuctioneerServerConfig = AuctioneerServerConfig{
ConsumerConfig: pubsub.DefaultConsumerConfig,
StreamTimeout: 10 * time.Minute,
AuctionResolutionWaitTime: 2 * time.Second,
S3Storage: DefaultS3StorageServiceConfig,
}

var TestAuctioneerServerConfig = AuctioneerServerConfig{
Expand All @@ -98,6 +100,7 @@ func AuctioneerServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.String(prefix+".auction-contract-address", DefaultAuctioneerServerConfig.AuctionContractAddress, "express lane auction contract address")
f.String(prefix+".db-directory", DefaultAuctioneerServerConfig.DbDirectory, "path to database directory for persisting validated bids in a sqlite file")
f.Duration(prefix+".auction-resolution-wait-time", DefaultAuctioneerServerConfig.AuctionResolutionWaitTime, "wait time after auction closing before resolving the auction")
S3StorageServiceConfigAddOptions(prefix+".s3-storage", f)
}

// AuctioneerServer is a struct that represents an autonomous auctioneer.
Expand All @@ -117,6 +120,7 @@ type AuctioneerServer struct {
streamTimeout time.Duration
auctionResolutionWaitTime time.Duration
database *SqliteDatabase
s3StorageService *S3StorageService
}

// NewAuctioneerServer creates a new autonomous auctioneer struct.
Expand All @@ -138,6 +142,13 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
if err != nil {
return nil, err
}
var s3StorageService *S3StorageService
if cfg.S3Storage.Enable {
s3StorageService, err = NewS3StorageService(&cfg.S3Storage, database)
if err != nil {
return nil, err
}
}
auctionContractAddr := common.HexToAddress(cfg.AuctionContractAddress)
redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL)
if err != nil {
Expand Down Expand Up @@ -203,6 +214,7 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
chainId: chainId,
client: sequencerClient,
database: database,
s3StorageService: s3StorageService,
consumer: c,
auctionContract: auctionContract,
auctionContractAddr: auctionContractAddr,
Expand All @@ -215,6 +227,10 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf

func (a *AuctioneerServer) Start(ctx_in context.Context) {
a.StopWaiter.Start(ctx_in, a)
// Start S3 storage service to persist validated bids to s3
if a.s3StorageService != nil {
a.s3StorageService.Start(ctx_in)
}
// Channel that consumer uses to indicate its readiness.
readyStream := make(chan struct{}, 1)
a.consumer.Start(ctx_in)
Expand Down
30 changes: 30 additions & 0 deletions timeboost/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,36 @@ func (d *SqliteDatabase) InsertBid(b *ValidatedBid) error {
return nil
}

// GetBids returns validated bids from rounds strictly below the highest round
// currently in the Bids table, ordered by round ascending, together with the
// round number the returned set runs up to (exclusive). Bids for the highest
// round are excluded because they may still be arriving.
//
// maxDbRows == 0 means no limit. When a limit is applied, the result is
// trimmed so that only complete (contiguous) rounds are returned; if that is
// not possible the call returns (nil, 0, nil) so the caller can retry later.
//
// NOTE(review): `SELECT MAX(Round)` on an empty Bids table yields NULL, which
// presumably makes sqlDB.Get fail to scan into uint64 — confirm callers
// tolerate that error on a fresh database.
func (d *SqliteDatabase) GetBids(maxDbRows int) ([]*SqliteDatabaseBid, uint64, error) {
d.lock.Lock()
defer d.lock.Unlock()
var maxRound uint64
query := `SELECT MAX(Round) FROM Bids`
err := d.sqlDB.Get(&maxRound, query)
if err != nil {
return nil, 0, fmt.Errorf("failed to fetch maxRound from bids: %w", err)
}
var sqlDBbids []*SqliteDatabaseBid
if maxDbRows == 0 {
// No row limit: everything below maxRound is complete by definition.
if err := d.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids WHERE Round < ? ORDER BY Round ASC", maxRound); err != nil {
return nil, 0, err
}
return sqlDBbids, maxRound, nil
}
if err := d.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids WHERE Round < ? ORDER BY Round ASC LIMIT ?", maxRound, maxDbRows); err != nil {
return nil, 0, err
}
// We should return a contiguous set of bids: the LIMIT may have cut the
// last round's bids mid-round, so walk backwards to the most recent round
// boundary and drop the (possibly partial) trailing round.
for i := len(sqlDBbids) - 1; i > 0; i-- {
if sqlDBbids[i].Round != sqlDBbids[i-1].Round {
return sqlDBbids[:i], sqlDBbids[i].Round, nil
}
}
// If we can't determine a contiguous set of bids, we abort and retry later.
// This saves us from cases where we might otherwise push the same batch
// data twice.
return nil, 0, nil
}

func (d *SqliteDatabase) DeleteBids(round uint64) error {
d.lock.Lock()
defer d.lock.Unlock()
Expand Down
12 changes: 1 addition & 11 deletions timeboost/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ import (

func TestInsertAndFetchBids(t *testing.T) {
t.Parallel()
type DatabaseBid struct {
Id uint64 `db:"Id"`
ChainId string `db:"ChainId"`
Bidder string `db:"Bidder"`
ExpressLaneController string `db:"ExpressLaneController"`
AuctionContractAddress string `db:"AuctionContractAddress"`
Round uint64 `db:"Round"`
Amount string `db:"Amount"`
Signature string `db:"Signature"`
}

tmpDir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
Expand Down Expand Up @@ -57,7 +47,7 @@ func TestInsertAndFetchBids(t *testing.T) {
for _, bid := range bids {
require.NoError(t, db.InsertBid(bid))
}
gotBids := make([]*DatabaseBid, 2)
gotBids := make([]*SqliteDatabaseBid, 2)
err = db.sqlDB.Select(&gotBids, "SELECT * FROM Bids ORDER BY Id")
require.NoError(t, err)
require.Equal(t, bids[0].Amount.String(), gotBids[0].Amount)
Expand Down
Loading

0 comments on commit e4da075

Please sign in to comment.