diff --git a/cmd/autonomous-auctioneer/config.go b/cmd/autonomous-auctioneer/config.go
index bdb5479950..d3f96a8f85 100644
--- a/cmd/autonomous-auctioneer/config.go
+++ b/cmd/autonomous-auctioneer/config.go
@@ -131,6 +131,9 @@ func (c *AutonomousAuctioneerConfig) GetReloadInterval() time.Duration {
 }
 
 func (c *AutonomousAuctioneerConfig) Validate() error {
+	if err := c.AuctioneerServer.S3Storage.Validate(); err != nil {
+		return err
+	}
 	return nil
 }
 
diff --git a/das/s3_storage_service.go b/das/s3_storage_service.go
index 4c0dcaf5a3..7e4e227a54 100644
--- a/das/s3_storage_service.go
+++ b/das/s3_storage_service.go
@@ -7,13 +7,10 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"io"
 	"math"
 	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
-	awsConfig "github.com/aws/aws-sdk-go-v2/config"
-	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	flag "github.com/spf13/pflag"
@@ -24,16 +21,9 @@ import (
 	"github.com/offchainlabs/nitro/arbstate/daprovider"
 	"github.com/offchainlabs/nitro/das/dastree"
 	"github.com/offchainlabs/nitro/util/pretty"
+	"github.com/offchainlabs/nitro/util/s3client"
 )
 
-type S3Uploader interface {
-	Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error)
-}
-
-type S3Downloader interface {
-	Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
-}
-
 type S3StorageServiceConfig struct {
 	Enable    bool   `koanf:"enable"`
 	AccessKey string `koanf:"access-key"`
@@ -57,16 +47,14 @@ func S3ConfigAddOptions(prefix string, f *flag.FlagSet) {
 }
 
 type S3StorageService struct {
-	client              *s3.Client
+	client              s3client.FullClient
 	bucket              string
 	objectPrefix        string
-	uploader            S3Uploader
-	downloader          S3Downloader
 	discardAfterTimeout bool
 }
 
 func NewS3StorageService(config S3StorageServiceConfig) (StorageService, error) {
-	client, err := buildS3Client(config.AccessKey, config.SecretKey, config.Region)
+	client, err := s3client.NewS3FullClient(config.AccessKey, config.SecretKey, config.Region)
 	if err != nil {
 		return nil, err
 	}
@@ -74,31 +62,15 @@ func NewS3StorageService(config S3StorageServiceConfig) (StorageService, error)
 		client:              client,
 		bucket:              config.Bucket,
 		objectPrefix:        config.ObjectPrefix,
-		uploader:            manager.NewUploader(client),
-		downloader:          manager.NewDownloader(client),
 		discardAfterTimeout: config.DiscardAfterTimeout,
 	}, nil
 }
 
-func buildS3Client(accessKey, secretKey, region string) (*s3.Client, error) {
-	cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), awsConfig.WithRegion(region), func(options *awsConfig.LoadOptions) error {
-		// remain backward compatible with accessKey and secretKey credentials provided via cli flags
-		if accessKey != "" && secretKey != "" {
-			options.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
-		}
-		return nil
-	})
-	if err != nil {
-		return nil, err
-	}
-	return s3.NewFromConfig(cfg), nil
-}
-
 func (s3s *S3StorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
 	log.Trace("das.S3StorageService.GetByHash", "key", pretty.PrettyHash(key), "this", s3s)
 
 	buf := manager.NewWriteAtBuffer([]byte{})
-	_, err := s3s.downloader.Download(ctx, buf, &s3.GetObjectInput{
+	_, err := s3s.client.Download(ctx, buf, &s3.GetObjectInput{
 		Bucket: aws.String(s3s.bucket),
 		Key:    aws.String(s3s.objectPrefix + EncodeStorageServiceKey(key)),
 	})
@@ -116,7 +88,7 @@ func (s3s *S3StorageService) Put(ctx context.Context, value []byte, timeout uint
 		expires := time.Unix(int64(timeout), 0)
 		putObjectInput.Expires = &expires
 	}
-	_, err := s3s.uploader.Upload(ctx, &putObjectInput)
+	_, err := s3s.client.Upload(ctx, &putObjectInput)
 	if err != nil {
 		log.Error("das.S3StorageService.Store", "err", err)
 	}
@@ -143,6 +115,6 @@ func (s3s *S3StorageService) String() string {
 }
 
 func (s3s *S3StorageService) HealthCheck(ctx context.Context) error {
-	_, err := s3s.client.HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
+	_, err := s3s.client.Client().HeadBucket(ctx, &s3.HeadBucketInput{Bucket: aws.String(s3s.bucket)})
 	return err
 }
diff --git a/das/s3_storage_service_test.go b/das/s3_storage_service_test.go
index d7a715b435..f4bd6f3107 100644
--- a/das/s3_storage_service_test.go
+++ b/das/s3_storage_service_test.go
@@ -18,11 +18,15 @@ import (
 	"github.com/offchainlabs/nitro/das/dastree"
 )
 
-type mockS3Uploader struct {
+type mockS3FullClient struct {
 	mockStorageService StorageService
 }
 
-func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
+func (m *mockS3FullClient) Client() *s3.Client {
+	return nil
+}
+
+func (m *mockS3FullClient) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
 	buf := new(bytes.Buffer)
 	_, err := buf.ReadFrom(input.Body)
 	if err != nil {
@@ -33,11 +37,7 @@ func (m *mockS3Uploader) Upload(ctx context.Context, input *s3.PutObjectInput, o
 	return nil, err
 }
 
-type mockS3Downloader struct {
-	mockStorageService StorageService
-}
-
-func (m *mockS3Downloader) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
+func (m *mockS3FullClient) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
 	key, err := DecodeStorageServiceKey(*input.Key)
 	if err != nil {
 		return 0, err
@@ -56,10 +56,11 @@ func (m *mockS3Downloader) Download(ctx context.Context, w io.WriterAt, input *s
 
 func NewTestS3StorageService(ctx context.Context, s3Config genericconf.S3Config) (StorageService, error) {
 	mockStorageService := NewMemoryBackedStorageService(ctx)
+	s3FullClient := &mockS3FullClient{mockStorageService}
 	return &S3StorageService{
-		bucket:     s3Config.Bucket,
-		uploader:   &mockS3Uploader{mockStorageService},
-		downloader: &mockS3Downloader{mockStorageService}}, nil
+		bucket: s3Config.Bucket,
+		client: s3FullClient,
+	}, nil
 }
 
 func TestS3StorageService(t *testing.T) {
diff --git a/timeboost/auctioneer.go b/timeboost/auctioneer.go
index 1c66161252..5c40505a6b 100644
--- a/timeboost/auctioneer.go
+++ b/timeboost/auctioneer.go
@@ -69,6 +69,7 @@ type AuctioneerServerConfig struct {
 	AuctionContractAddress    string        `koanf:"auction-contract-address"`
 	DbDirectory               string        `koanf:"db-directory"`
 	AuctionResolutionWaitTime time.Duration `koanf:"auction-resolution-wait-time"`
+	S3Storage                 S3StorageServiceConfig `koanf:"s3-storage"`
 }
 
 var DefaultAuctioneerServerConfig = AuctioneerServerConfig{
@@ -77,6 +78,7 @@ var DefaultAuctioneerServerConfig = AuctioneerServerConfig{
 	ConsumerConfig:            pubsub.DefaultConsumerConfig,
 	StreamTimeout:             10 * time.Minute,
 	AuctionResolutionWaitTime: 2 * time.Second,
+	S3Storage:                 DefaultS3StorageServiceConfig,
 }
 
 var TestAuctioneerServerConfig = AuctioneerServerConfig{
@@ -98,6 +100,7 @@ func AuctioneerServerConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.String(prefix+".auction-contract-address", DefaultAuctioneerServerConfig.AuctionContractAddress, "express lane auction contract address")
 	f.String(prefix+".db-directory", DefaultAuctioneerServerConfig.DbDirectory, "path to database directory for persisting validated bids in a sqlite file")
 	f.Duration(prefix+".auction-resolution-wait-time", DefaultAuctioneerServerConfig.AuctionResolutionWaitTime, "wait time after auction closing before resolving the auction")
+	S3StorageServiceConfigAddOptions(prefix+".s3-storage", f)
 }
 
 // AuctioneerServer is a struct that represents an autonomous auctioneer.
@@ -117,6 +120,7 @@ type AuctioneerServer struct {
 	streamTimeout             time.Duration
 	auctionResolutionWaitTime time.Duration
 	database                  *SqliteDatabase
+	s3StorageService          *S3StorageService
 }
 
 // NewAuctioneerServer creates a new autonomous auctioneer struct.
@@ -138,6 +142,13 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
 	if err != nil {
 		return nil, err
 	}
+	var s3StorageService *S3StorageService
+	if cfg.S3Storage.Enable {
+		s3StorageService, err = NewS3StorageService(&cfg.S3Storage, database)
+		if err != nil {
+			return nil, err
+		}
+	}
 	auctionContractAddr := common.HexToAddress(cfg.AuctionContractAddress)
 	redisClient, err := redisutil.RedisClientFromURL(cfg.RedisURL)
 	if err != nil {
@@ -203,6 +214,7 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
 		chainId:                   chainId,
 		client:                    sequencerClient,
 		database:                  database,
+		s3StorageService:          s3StorageService,
 		consumer:                  c,
 		auctionContract:           auctionContract,
 		auctionContractAddr:       auctionContractAddr,
@@ -215,6 +227,10 @@ func NewAuctioneerServer(ctx context.Context, configFetcher AuctioneerServerConf
 
 func (a *AuctioneerServer) Start(ctx_in context.Context) {
 	a.StopWaiter.Start(ctx_in, a)
+	// Start S3 storage service to persist validated bids to s3
+	if a.s3StorageService != nil {
+		a.s3StorageService.Start(ctx_in)
+	}
 	// Channel that consumer uses to indicate its readiness.
 	readyStream := make(chan struct{}, 1)
 	a.consumer.Start(ctx_in)
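Note: a minimal sketch of how an operator might enable the new bid persistence through the `S3Storage` sub-config wired up above. The exported names come from this diff; the helper function and all field values are placeholders.

```go
package main

import (
	"time"

	"github.com/offchainlabs/nitro/timeboost"
)

// exampleConfig is a hypothetical helper, not part of this PR.
func exampleConfig() timeboost.AuctioneerServerConfig {
	cfg := timeboost.DefaultAuctioneerServerConfig
	cfg.S3Storage.Enable = true                    // NewAuctioneerServer only builds the service when this is set
	cfg.S3Storage.Bucket = "timeboost-bids"        // placeholder bucket
	cfg.S3Storage.Region = "us-east-1"             // placeholder region
	cfg.S3Storage.ObjectPrefix = "mainnet/"        // placeholder prefix prepended to every object key
	cfg.S3Storage.UploadInterval = 5 * time.Minute // default is 15m
	// Leaving AccessKey/SecretKey empty falls back to the AWS default credential chain.
	return cfg
}
```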
diff --git a/timeboost/db.go b/timeboost/db.go
index d5825166d6..8d71a510a8 100644
--- a/timeboost/db.go
+++ b/timeboost/db.go
@@ -129,6 +129,36 @@ func (d *SqliteDatabase) InsertBid(b *ValidatedBid) error {
 	return nil
 }
 
+func (d *SqliteDatabase) GetBids(maxDbRows int) ([]*SqliteDatabaseBid, uint64, error) {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	var maxRound uint64
+	query := `SELECT MAX(Round) FROM Bids`
+	err := d.sqlDB.Get(&maxRound, query)
+	if err != nil {
+		return nil, 0, fmt.Errorf("failed to fetch maxRound from bids: %w", err)
+	}
+	var sqlDBbids []*SqliteDatabaseBid
+	if maxDbRows == 0 {
+		if err := d.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids WHERE Round < ? ORDER BY Round ASC", maxRound); err != nil {
+			return nil, 0, err
+		}
+		return sqlDBbids, maxRound, nil
+	}
+	if err := d.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids WHERE Round < ? ORDER BY Round ASC LIMIT ?", maxRound, maxDbRows); err != nil {
+		return nil, 0, err
+	}
+	// We should return a contiguous set of bids
+	for i := len(sqlDBbids) - 1; i > 0; i-- {
+		if sqlDBbids[i].Round != sqlDBbids[i-1].Round {
+			return sqlDBbids[:i], sqlDBbids[i].Round, nil
+		}
+	}
+	// If we can't determine a contiguous set of bids, we abort and retry later.
+	// This saves us from cases where we might otherwise push the same batch data twice.
+	return nil, 0, nil
+}
+
 func (d *SqliteDatabase) DeleteBids(round uint64) error {
 	d.lock.Lock()
 	defer d.lock.Unlock()
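The backward walk at the end of GetBids is the subtle part: after the LIMIT cap, the last round in the slice may have been fetched only partially, so the function trims back to the last round boundary and reports the first trimmed round as the next boundary. A standalone sketch of that trim (hypothetical helper operating on round numbers only, not part of this PR):

```go
package main

import "fmt"

// trimToCompleteRounds mirrors the loop in GetBids: drop the trailing,
// possibly-truncated round and report the first dropped round.
func trimToCompleteRounds(rounds []uint64) (keep int, nextRound uint64, ok bool) {
	for i := len(rounds) - 1; i > 0; i-- {
		if rounds[i] != rounds[i-1] {
			return i, rounds[i], true
		}
	}
	// Every fetched row shares one round; we can't prove the round is
	// complete, so the caller aborts and retries on the next tick.
	return 0, 0, false
}

func main() {
	// LIMIT 5 may cut round 5 mid-way: keep the two round-4 rows, resume at round 5.
	fmt.Println(trimToCompleteRounds([]uint64{4, 4, 5, 5, 5})) // 2 5 true
}
```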
diff --git a/timeboost/db_test.go b/timeboost/db_test.go
index 600e5adc8e..7bfae9c61a 100644
--- a/timeboost/db_test.go
+++ b/timeboost/db_test.go
@@ -15,16 +15,6 @@ import (
 
 func TestInsertAndFetchBids(t *testing.T) {
 	t.Parallel()
-	type DatabaseBid struct {
-		Id                     uint64 `db:"Id"`
-		ChainId                string `db:"ChainId"`
-		Bidder                 string `db:"Bidder"`
-		ExpressLaneController  string `db:"ExpressLaneController"`
-		AuctionContractAddress string `db:"AuctionContractAddress"`
-		Round                  uint64 `db:"Round"`
-		Amount                 string `db:"Amount"`
-		Signature              string `db:"Signature"`
-	}
 
 	tmpDir, err := os.MkdirTemp("", "*")
 	require.NoError(t, err)
@@ -57,7 +47,7 @@ func TestInsertAndFetchBids(t *testing.T) {
 	for _, bid := range bids {
 		require.NoError(t, db.InsertBid(bid))
 	}
-	gotBids := make([]*DatabaseBid, 2)
+	gotBids := make([]*SqliteDatabaseBid, 2)
 	err = db.sqlDB.Select(&gotBids, "SELECT * FROM Bids ORDER BY Id")
 	require.NoError(t, err)
 	require.Equal(t, bids[0].Amount.String(), gotBids[0].Amount)
diff --git a/timeboost/s3_storage.go b/timeboost/s3_storage.go
new file mode 100644
index 0000000000..3235fa844b
--- /dev/null
+++ b/timeboost/s3_storage.go
@@ -0,0 +1,247 @@
+package timeboost
+
+import (
+	"bytes"
+	"context"
+	"encoding/csv"
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/spf13/pflag"
+
+	"github.com/ethereum/go-ethereum/log"
+
+	"github.com/offchainlabs/nitro/util/gzip"
+	"github.com/offchainlabs/nitro/util/s3client"
+	"github.com/offchainlabs/nitro/util/stopwaiter"
+)
+
+type S3StorageServiceConfig struct {
+	Enable         bool          `koanf:"enable"`
+	AccessKey      string        `koanf:"access-key"`
+	Bucket         string        `koanf:"bucket"`
+	ObjectPrefix   string        `koanf:"object-prefix"`
+	Region         string        `koanf:"region"`
+	SecretKey      string        `koanf:"secret-key"`
+	UploadInterval time.Duration `koanf:"upload-interval"`
+	MaxBatchSize   int           `koanf:"max-batch-size"`
+	MaxDbRows      int           `koanf:"max-db-rows"`
+}
+
+func (c *S3StorageServiceConfig) Validate() error {
+	if !c.Enable {
+		return nil
+	}
+	if c.MaxBatchSize < 0 {
+		return fmt.Errorf("invalid max-batch-size value for auctioneer's s3-storage config, it should be non-negative, got: %d", c.MaxBatchSize)
+	}
+	if c.MaxDbRows < 0 {
+		return fmt.Errorf("invalid max-db-rows value for auctioneer's s3-storage config, it should be non-negative, got: %d", c.MaxDbRows)
+	}
+	return nil
+}
+
+var DefaultS3StorageServiceConfig = S3StorageServiceConfig{
+	Enable:         false,
+	UploadInterval: 15 * time.Minute,
+	MaxBatchSize:   100000000,
+	MaxDbRows:      0, // Disabled by default
+}
+
+func S3StorageServiceConfigAddOptions(prefix string, f *pflag.FlagSet) {
+	f.Bool(prefix+".enable", DefaultS3StorageServiceConfig.Enable, "enable persisting of validated bids to an AWS S3 bucket")
+	f.String(prefix+".access-key", DefaultS3StorageServiceConfig.AccessKey, "S3 access key")
+	f.String(prefix+".bucket", DefaultS3StorageServiceConfig.Bucket, "S3 bucket")
+	f.String(prefix+".object-prefix", DefaultS3StorageServiceConfig.ObjectPrefix, "prefix to add to S3 objects")
+	f.String(prefix+".region", DefaultS3StorageServiceConfig.Region, "S3 region")
+	f.String(prefix+".secret-key", DefaultS3StorageServiceConfig.SecretKey, "S3 secret key")
+	f.Duration(prefix+".upload-interval", DefaultS3StorageServiceConfig.UploadInterval, "frequency at which batches are uploaded to S3")
+	f.Int(prefix+".max-batch-size", DefaultS3StorageServiceConfig.MaxBatchSize, "max size of uncompressed batch in bytes to be uploaded to S3")
+	f.Int(prefix+".max-db-rows", DefaultS3StorageServiceConfig.MaxDbRows, "when the sql db is very large, this enables reading the db in chunks instead of all at once, which might cause OOM")
+}
+
+type S3StorageService struct {
+	stopwaiter.StopWaiter
+	config                *S3StorageServiceConfig
+	client                s3client.FullClient
+	sqlDB                 *SqliteDatabase
+	bucket                string
+	objectPrefix          string
+	lastFailedDeleteRound uint64
+}
+
+func NewS3StorageService(config *S3StorageServiceConfig, sqlDB *SqliteDatabase) (*S3StorageService, error) {
+	client, err := s3client.NewS3FullClient(config.AccessKey, config.SecretKey, config.Region)
+	if err != nil {
+		return nil, err
+	}
+	return &S3StorageService{
+		config:       config,
+		client:       client,
+		sqlDB:        sqlDB,
+		bucket:       config.Bucket,
+		objectPrefix: config.ObjectPrefix,
+	}, nil
+}
+
+func (s *S3StorageService) Start(ctx context.Context) {
+	s.StopWaiter.Start(ctx, s)
+	if err := s.LaunchThreadSafe(func(ctx context.Context) {
+		ticker := time.NewTicker(s.config.UploadInterval)
+		defer ticker.Stop()
+		for {
+			interval := s.uploadBatches(ctx)
+			if ctx.Err() != nil {
+				return
+			}
+			if interval != s.config.UploadInterval { // Indicates an error case, so we'll retry sooner than upload-interval
+				time.Sleep(interval)
+				continue
+			}
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+		}
+	}); err != nil {
+		log.Error("Failed to launch s3-storage service of auctioneer", "err", err)
+	}
+}
+
+// Used to pad round numbers to a fixed length when naming the batch uploaded to s3.
+const fixedRoundStrLen = 7
+
+func (s *S3StorageService) getBatchName(firstRound, lastRound uint64) string {
+	padder := "%0" + strconv.Itoa(fixedRoundStrLen) + "d"
+	now := time.Now()
+	return fmt.Sprintf("%svalidated-timeboost-bids/%d/%02d/%02d/"+padder+"-"+padder+".csv.gzip", s.objectPrefix, now.Year(), now.Month(), now.Day(), firstRound, lastRound)
+}
+func (s *S3StorageService) uploadBatch(ctx context.Context, batch []byte, firstRound, lastRound uint64) error {
+	compressedData, err := gzip.CompressGzip(batch)
+	if err != nil {
+		return err
+	}
+	key := s.getBatchName(firstRound, lastRound)
+	putObjectInput := s3.PutObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(compressedData),
+	}
+	if _, err = s.client.Upload(ctx, &putObjectInput); err != nil {
+		return err
+	}
+	return nil
+}
+
+// downloadBatch is only used for testing purposes
+func (s *S3StorageService) downloadBatch(ctx context.Context, key string) ([]byte, error) {
+	buf := manager.NewWriteAtBuffer([]byte{})
+	if _, err := s.client.Download(ctx, buf, &s3.GetObjectInput{
+		Bucket: aws.String(s.bucket),
+		Key:    aws.String(key),
+	}); err != nil {
+		return nil, err
+	}
+	return gzip.DecompressGzip(buf.Bytes())
+}
+
+func csvRecordSize(record []string) int {
+	size := len(record) // accounts for the commas between fields plus the trailing newline
+	for _, entry := range record {
+		size += len(entry)
+	}
+	return size
+}
+
+func (s *S3StorageService) uploadBatches(ctx context.Context) time.Duration {
+	// Before doing anything else, first try to delete the previously uploaded bids that were not successfully erased from the sqlDB
+	if s.lastFailedDeleteRound != 0 {
+		if err := s.sqlDB.DeleteBids(s.lastFailedDeleteRound); err != nil {
+			log.Error("error deleting s3-persisted bids from sql db using lastFailedDeleteRound", "lastFailedDeleteRound", s.lastFailedDeleteRound, "err", err)
lastFailedDeleteRound", "lastFailedDeleteRound", s.lastFailedDeleteRound, "err", err) + return 5 * time.Second + } + s.lastFailedDeleteRound = 0 + } + + bids, round, err := s.sqlDB.GetBids(s.config.MaxDbRows) + if err != nil { + log.Error("Error fetching validated bids from sql DB", "round", round, "err", err) + return 5 * time.Second + } + // Nothing to persist or a contiguous set of bids wasn't found, so exit early + if len(bids) == 0 { + return s.config.UploadInterval + } + + var csvBuffer bytes.Buffer + var size int + var firstBidId int + csvWriter := csv.NewWriter(&csvBuffer) + uploadAndDeleteBids := func(firstRound, lastRound, deletRound uint64) error { + // End current batch when size exceeds MaxBatchSize and the current round ends + csvWriter.Flush() + if err := csvWriter.Error(); err != nil { + log.Error("Error flushing csv writer", "err", err) + return err + } + if err := s.uploadBatch(ctx, csvBuffer.Bytes(), firstRound, lastRound); err != nil { + log.Error("Error uploading batch to s3", "firstRound", firstRound, "lastRound", lastRound, "err", err) + return err + } + // After successful upload we should go ahead and delete the uploaded bids from DB to prevent duplicate uploads + // If the delete fails, we track the deleteRound until a future delete succeeds. + if err := s.sqlDB.DeleteBids(deletRound); err != nil { + log.Error("error deleting s3-persisted bids from sql db", "round", deletRound, "err", err) + s.lastFailedDeleteRound = deletRound + } else { + // Previously failed deletes dont matter anymore as the recent one (larger round number) succeeded + s.lastFailedDeleteRound = 0 + } + return nil + } + + header := []string{"ChainID", "Bidder", "ExpressLaneController", "AuctionContractAddress", "Round", "Amount", "Signature"} + if err := csvWriter.Write(header); err != nil { + log.Error("Error writing to csv writer", "err", err) + return 5 * time.Second + } + for index, bid := range bids { + record := []string{bid.ChainId, bid.Bidder, bid.ExpressLaneController, bid.AuctionContractAddress, fmt.Sprintf("%d", bid.Round), bid.Amount, bid.Signature} + if err := csvWriter.Write(record); err != nil { + log.Error("Error writing to csv writer", "err", err) + return 5 * time.Second + } + if s.config.MaxBatchSize != 0 { + size += csvRecordSize(record) + if size >= s.config.MaxBatchSize && index < len(bids)-1 && bid.Round != bids[index+1].Round { + if uploadAndDeleteBids(bids[firstBidId].Round, bid.Round, bids[index+1].Round) != nil { + return 5 * time.Second + } + // Reset csv for next batch + csvBuffer.Reset() + if err := csvWriter.Write(header); err != nil { + log.Error("Error writing to csv writer", "err", err) + return 5 * time.Second + } + size = 0 + firstBidId = index + 1 + } + } + } + if s.config.MaxBatchSize == 0 || size > 0 { + if uploadAndDeleteBids(bids[firstBidId].Round, bids[len(bids)-1].Round, round) != nil { + return 5 * time.Second + } + } + + if s.lastFailedDeleteRound != 0 { + return 5 * time.Second + } + + return s.config.UploadInterval +} diff --git a/timeboost/s3_storage_test.go b/timeboost/s3_storage_test.go new file mode 100644 index 0000000000..ae2d9a0c19 --- /dev/null +++ b/timeboost/s3_storage_test.go @@ -0,0 +1,238 @@ +package timeboost + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "math/big" + "testing" + + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common" +) + +type mockS3FullClient struct { + data map[string][]byte 
diff --git a/timeboost/s3_storage_test.go b/timeboost/s3_storage_test.go
new file mode 100644
index 0000000000..ae2d9a0c19
--- /dev/null
+++ b/timeboost/s3_storage_test.go
@@ -0,0 +1,238 @@
+package timeboost
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"math/big"
+	"testing"
+
+	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/stretchr/testify/require"
+
+	"github.com/ethereum/go-ethereum/common"
+)
+
+type mockS3FullClient struct {
+	data map[string][]byte
+}
+
+func newmockS3FullClient() *mockS3FullClient {
+	return &mockS3FullClient{make(map[string][]byte)}
+}
+
+func (m *mockS3FullClient) clear() {
+	m.data = make(map[string][]byte)
+}
+
+func (m *mockS3FullClient) Client() *s3.Client {
+	return nil
+}
+
+func (m *mockS3FullClient) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
+	buf := new(bytes.Buffer)
+	_, err := buf.ReadFrom(input.Body)
+	if err != nil {
+		return nil, err
+	}
+	m.data[*input.Key] = buf.Bytes()
+	return nil, nil
+}
+
+func (m *mockS3FullClient) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
+	if _, ok := m.data[*input.Key]; ok {
+		ret, err := w.WriteAt(m.data[*input.Key], 0)
+		if err != nil {
+			return 0, err
+		}
+		return int64(ret), nil
+	}
+	return 0, errors.New("key not found")
+}
+
+func TestS3StorageServiceUploadAndDownload(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	mockClient := newmockS3FullClient()
+	s3StorageService := &S3StorageService{
+		client: mockClient,
+		config: &S3StorageServiceConfig{MaxBatchSize: 0},
+	}
+
+	// Test upload and download of data
+	testData := []byte{1, 2, 3, 4}
+	require.NoError(t, s3StorageService.uploadBatch(ctx, testData, 10, 11))
+	key := s3StorageService.getBatchName(10, 11)
+	gotData, err := s3StorageService.downloadBatch(ctx, key)
+	require.NoError(t, err)
+	require.Equal(t, testData, gotData)
+
+	// Test interaction with sqlDB and upload of multiple batches
+	mockClient.clear()
+	db, err := NewDatabase(t.TempDir())
+	require.NoError(t, err)
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(2),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000001"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000002"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000003"),
+		Round:                  0,
+		Amount:                 big.NewInt(10),
+		Signature:              []byte("signature0"),
+	}))
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(1),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000001"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000002"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000003"),
+		Round:                  1,
+		Amount:                 big.NewInt(100),
+		Signature:              []byte("signature1"),
+	}))
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(2),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000004"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000005"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000006"),
+		Round:                  2,
+		Amount:                 big.NewInt(200),
+		Signature:              []byte("signature2"),
+	}))
+	s3StorageService.sqlDB = db
+
+	// Helper functions to verify correctness of batch uploads and to
+	// check that all the uploaded bids are removed from the sql DB
+	verifyBatchUploadCorrectness := func(firstRound, lastRound uint64, wantBatch []byte) {
+		key = s3StorageService.getBatchName(firstRound, lastRound)
+		data, err := s3StorageService.downloadBatch(ctx, key)
+		require.NoError(t, err)
+		require.Equal(t, wantBatch, data)
+	}
+	var sqlDBbids []*SqliteDatabaseBid
+	checkUploadedBidsRemoval := func(remainingRound uint64) {
+		require.NoError(t, db.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids"))
+		require.Equal(t, 1, len(sqlDBbids))
+		require.Equal(t, remainingRound, sqlDBbids[0].Round)
+	}
+
+	// uploadBatches should upload the bids from rounds 0 and 1; only one bid (round = 2) should remain in the sql database
+	s3StorageService.uploadBatches(ctx)
+	verifyBatchUploadCorrectness(0, 1, []byte(`ChainID,Bidder,ExpressLaneController,AuctionContractAddress,Round,Amount,Signature
+2,0x0000000000000000000000000000000000000003,0x0000000000000000000000000000000000000001,0x0000000000000000000000000000000000000002,0,10,signature0
+1,0x0000000000000000000000000000000000000003,0x0000000000000000000000000000000000000001,0x0000000000000000000000000000000000000002,1,100,signature1
+`))
+	checkUploadedBidsRemoval(2)
+
+	// uploadBatches should continue adding bids to the batch until the round ends, even if it's past MaxBatchSize
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(1),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000007"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000008"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000009"),
+		Round:                  2,
+		Amount:                 big.NewInt(150),
+		Signature:              []byte("signature3"),
+	}))
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(2),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000001"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000002"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000003"),
+		Round:                  3,
+		Amount:                 big.NewInt(250),
+		Signature:              []byte("signature4"),
+	}))
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(2),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000004"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000005"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000006"),
+		Round:                  4,
+		Amount:                 big.NewInt(350),
+		Signature:              []byte("signature5"),
+	}))
+	record := []string{sqlDBbids[0].ChainId, sqlDBbids[0].Bidder, sqlDBbids[0].ExpressLaneController, sqlDBbids[0].AuctionContractAddress, fmt.Sprintf("%d", sqlDBbids[0].Round), sqlDBbids[0].Amount, sqlDBbids[0].Signature}
+	s3StorageService.config.MaxBatchSize = csvRecordSize(record)
+
+	// Round 2 bids should all be in the same batch even though the resulting batch exceeds MaxBatchSize
+	s3StorageService.uploadBatches(ctx)
+	verifyBatchUploadCorrectness(2, 2, []byte(`ChainID,Bidder,ExpressLaneController,AuctionContractAddress,Round,Amount,Signature
+2,0x0000000000000000000000000000000000000006,0x0000000000000000000000000000000000000004,0x0000000000000000000000000000000000000005,2,200,signature2
+1,0x0000000000000000000000000000000000000009,0x0000000000000000000000000000000000000007,0x0000000000000000000000000000000000000008,2,150,signature3
+`))
+
+	// After batching Round 2 bids we end that batch and create a new batch for Round 3 bids to adhere to the MaxBatchSize rule
+	s3StorageService.uploadBatches(ctx)
+	verifyBatchUploadCorrectness(3, 3, []byte(`ChainID,Bidder,ExpressLaneController,AuctionContractAddress,Round,Amount,Signature
+2,0x0000000000000000000000000000000000000003,0x0000000000000000000000000000000000000001,0x0000000000000000000000000000000000000002,3,250,signature4
+`))
+	checkUploadedBidsRemoval(4)
+
+	// Verify chunked reading of the sql db
+	require.NoError(t, db.InsertBid(&ValidatedBid{
+		ChainId:                big.NewInt(1),
+		ExpressLaneController:  common.HexToAddress("0x0000000000000000000000000000000000000007"),
+		AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000008"),
+		Bidder:                 common.HexToAddress("0x0000000000000000000000000000000000000009"),
+		Round:                  4,
+		Amount:                 big.NewInt(450),
+		Signature:              []byte("signature6"),
+	}))
common.HexToAddress("0x0000000000000000000000000000000000000007"), + AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000008"), + Bidder: common.HexToAddress("0x0000000000000000000000000000000000000009"), + Round: 4, + Amount: big.NewInt(450), + Signature: []byte("signature6"), + })) + require.NoError(t, db.InsertBid(&ValidatedBid{ + ChainId: big.NewInt(2), + ExpressLaneController: common.HexToAddress("0x0000000000000000000000000000000000000001"), + AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000002"), + Bidder: common.HexToAddress("0x0000000000000000000000000000000000000003"), + Round: 5, + Amount: big.NewInt(550), + Signature: []byte("signature7"), + })) + require.NoError(t, db.InsertBid(&ValidatedBid{ + ChainId: big.NewInt(2), + ExpressLaneController: common.HexToAddress("0x0000000000000000000000000000000000000004"), + AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000005"), + Bidder: common.HexToAddress("0x0000000000000000000000000000000000000006"), + Round: 5, + Amount: big.NewInt(650), + Signature: []byte("signature8"), + })) + require.NoError(t, db.InsertBid(&ValidatedBid{ + ChainId: big.NewInt(2), + ExpressLaneController: common.HexToAddress("0x0000000000000000000000000000000000000004"), + AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000005"), + Bidder: common.HexToAddress("0x0000000000000000000000000000000000000006"), + Round: 6, + Amount: big.NewInt(750), + Signature: []byte("signature9"), + })) + require.NoError(t, db.InsertBid(&ValidatedBid{ + ChainId: big.NewInt(1), + ExpressLaneController: common.HexToAddress("0x0000000000000000000000000000000000000004"), + AuctionContractAddress: common.HexToAddress("0x0000000000000000000000000000000000000005"), + Bidder: common.HexToAddress("0x0000000000000000000000000000000000000006"), + Round: 7, + Amount: big.NewInt(850), + Signature: []byte("signature10"), + })) + s3StorageService.config.MaxDbRows = 5 + + // Since config.MaxBatchSize is kept same and config.MaxDbRows is 5, sqldb.GetBids would return all bids from round 4 and 5, with round used for DeletBids as 6 + // maxBatchSize would then batch bids from round 4 & 5 separately and uploads them to s3 + s3StorageService.uploadBatches(ctx) + verifyBatchUploadCorrectness(4, 4, []byte(`ChainID,Bidder,ExpressLaneController,AuctionContractAddress,Round,Amount,Signature +2,0x0000000000000000000000000000000000000006,0x0000000000000000000000000000000000000004,0x0000000000000000000000000000000000000005,4,350,signature5 +1,0x0000000000000000000000000000000000000009,0x0000000000000000000000000000000000000007,0x0000000000000000000000000000000000000008,4,450,signature6 +`)) + verifyBatchUploadCorrectness(5, 5, []byte(`ChainID,Bidder,ExpressLaneController,AuctionContractAddress,Round,Amount,Signature +2,0x0000000000000000000000000000000000000003,0x0000000000000000000000000000000000000001,0x0000000000000000000000000000000000000002,5,550,signature7 +2,0x0000000000000000000000000000000000000006,0x0000000000000000000000000000000000000004,0x0000000000000000000000000000000000000005,5,650,signature8 +`)) + require.NoError(t, db.sqlDB.Select(&sqlDBbids, "SELECT * FROM Bids ORDER BY Round ASC")) + require.Equal(t, 2, len(sqlDBbids)) + require.Equal(t, uint64(6), sqlDBbids[0].Round) + require.Equal(t, uint64(7), sqlDBbids[1].Round) +} diff --git a/timeboost/schema.go b/timeboost/schema.go index 94fc04d1f1..68a70aac63 100644 --- a/timeboost/schema.go +++ 
diff --git a/timeboost/schema.go b/timeboost/schema.go
index 94fc04d1f1..68a70aac63 100644
--- a/timeboost/schema.go
+++ b/timeboost/schema.go
@@ -19,6 +19,7 @@ CREATE TABLE IF NOT EXISTS Bids (
     Amount TEXT NOT NULL,
     Signature TEXT NOT NULL
 );
+CREATE INDEX IF NOT EXISTS idx_bids_round ON Bids(Round);
 `
 	schemaList = []string{version1}
 )
diff --git a/timeboost/types.go b/timeboost/types.go
index bf049629df..73e2e0d2b6 100644
--- a/timeboost/types.go
+++ b/timeboost/types.go
@@ -236,3 +236,14 @@ func padBigInt(bi *big.Int) []byte {
 	padded = append(padded, bb...)
 	return padded
 }
+
+type SqliteDatabaseBid struct {
+	Id                     uint64 `db:"Id"`
+	ChainId                string `db:"ChainId"`
+	Bidder                 string `db:"Bidder"`
+	ExpressLaneController  string `db:"ExpressLaneController"`
+	AuctionContractAddress string `db:"AuctionContractAddress"`
+	Round                  uint64 `db:"Round"`
+	Amount                 string `db:"Amount"`
+	Signature              string `db:"Signature"`
+}
diff --git a/util/gzip/gzip_compression.go b/util/gzip/gzip_compression.go
new file mode 100644
index 0000000000..4ad069767c
--- /dev/null
+++ b/util/gzip/gzip_compression.go
@@ -0,0 +1,34 @@
+package gzip
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"io"
+)
+
+func CompressGzip(data []byte) ([]byte, error) {
+	var buffer bytes.Buffer
+	gzipWriter := gzip.NewWriter(&buffer)
+	if _, err := gzipWriter.Write(data); err != nil {
+		return nil, fmt.Errorf("failed to write to gzip writer: %w", err)
+	}
+	if err := gzipWriter.Close(); err != nil {
+		return nil, fmt.Errorf("failed to close gzip writer: %w", err)
+	}
+	return buffer.Bytes(), nil
+}
+
+func DecompressGzip(data []byte) ([]byte, error) {
+	buffer := bytes.NewReader(data)
+	gzipReader, err := gzip.NewReader(buffer)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create gzip reader: %w", err)
+	}
+	defer gzipReader.Close()
+	decompressData, err := io.ReadAll(gzipReader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read decompressed data: %w", err)
+	}
+	return decompressData, nil
+}
diff --git a/util/gzip/gzip_compression_test.go b/util/gzip/gzip_compression_test.go
new file mode 100644
index 0000000000..c55dfb68c0
--- /dev/null
+++ b/util/gzip/gzip_compression_test.go
@@ -0,0 +1,21 @@
+package gzip
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestCompressDecompress(t *testing.T) {
+	sampleData := []byte{1, 2, 3, 4}
+	compressedData, err := CompressGzip(sampleData)
+	if err != nil {
+		t.Fatalf("got error gzip-compressing data: %v", err)
+	}
+	gotData, err := DecompressGzip(compressedData)
+	if err != nil {
+		t.Fatalf("got error gzip-decompressing data: %v", err)
+	}
+	if !bytes.Equal(sampleData, gotData) {
+		t.Fatal("original data and decompression of its compression don't match")
+	}
+}
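The batch payload is simply the CSV bytes run through util/gzip before the S3 put. A minimal round-trip sketch using the helpers above; the row values are illustrative:

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"

	"github.com/offchainlabs/nitro/util/gzip"
)

func main() {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	// Same header the auctioneer writes in uploadBatches.
	_ = w.Write([]string{"ChainID", "Bidder", "ExpressLaneController", "AuctionContractAddress", "Round", "Amount", "Signature"})
	_ = w.Write([]string{"1", "0xbidder", "0xcontroller", "0xauction", "7", "850", "signature10"}) // illustrative row
	w.Flush()

	compressed, err := gzip.CompressGzip(buf.Bytes())
	if err != nil {
		panic(err)
	}
	restored, err := gzip.DecompressGzip(compressed)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(buf.Bytes(), restored)) // true
}
```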
diff --git a/util/s3client/s3client.go b/util/s3client/s3client.go
new file mode 100644
index 0000000000..623107ea14
--- /dev/null
+++ b/util/s3client/s3client.go
@@ -0,0 +1,62 @@
+package s3client
+
+import (
+	"context"
+	"io"
+
+	awsConfig "github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+type Uploader interface {
+	Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error)
+}
+
+type Downloader interface {
+	Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error)
+}
+
+type FullClient interface {
+	Uploader
+	Downloader
+	Client() *s3.Client
+}
+
+type s3Client struct {
+	client     *s3.Client
+	uploader   Uploader
+	downloader Downloader
+}
+
+func NewS3FullClient(accessKey, secretKey, region string) (FullClient, error) {
+	cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), awsConfig.WithRegion(region), func(options *awsConfig.LoadOptions) error {
+		// remain backward compatible with accessKey and secretKey credentials provided via cli flags
+		if accessKey != "" && secretKey != "" {
+			options.Credentials = credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	client := s3.NewFromConfig(cfg)
+	return &s3Client{
+		client:     client,
+		uploader:   manager.NewUploader(client),
+		downloader: manager.NewDownloader(client),
+	}, nil
+}
+
+func (s *s3Client) Client() *s3.Client {
+	return s.client
+}
+
+func (s *s3Client) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
+	return s.uploader.Upload(ctx, input, opts...)
+}
+
+func (s *s3Client) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (n int64, err error) {
+	return s.downloader.Download(ctx, w, input, options...)
+}
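Finally, a sketch of how a caller outside das/timeboost might use the shared client. The bucket, key, and region are placeholders; empty credentials fall back to the AWS default chain, per the comment in NewS3FullClient:

```go
package main

import (
	"bytes"
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"

	"github.com/offchainlabs/nitro/util/s3client"
)

func main() {
	ctx := context.Background()
	client, err := s3client.NewS3FullClient("", "", "us-east-1") // placeholder region
	if err != nil {
		panic(err)
	}
	// Upload an object via the embedded manager.Uploader.
	if _, err := client.Upload(ctx, &s3.PutObjectInput{
		Bucket: aws.String("example-bucket"), // placeholder
		Key:    aws.String("example-key"),    // placeholder
		Body:   bytes.NewReader([]byte("payload")),
	}); err != nil {
		panic(err)
	}
	// Download it back into memory, mirroring downloadBatch above.
	buf := manager.NewWriteAtBuffer([]byte{})
	if _, err := client.Download(ctx, buf, &s3.GetObjectInput{
		Bucket: aws.String("example-bucket"),
		Key:    aws.String("example-key"),
	}); err != nil {
		panic(err)
	}
}
```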