From b735f98d75dc44d3d1cefead53db929ed2b1d206 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 4 Aug 2022 16:19:40 +0200 Subject: [PATCH] Make indexing s3 bucket configurable --- exp/lighthorizon/index/backend/s3.go | 32 ++++++++++++------------ exp/lighthorizon/index/builder.go | 2 +- exp/lighthorizon/index/connect.go | 21 +++++++--------- exp/lighthorizon/index/store.go | 2 +- exp/lighthorizon/main.go | 2 +- exp/lighthorizon/services/cursor_test.go | 11 +++++--- 6 files changed, 35 insertions(+), 35 deletions(-) diff --git a/exp/lighthorizon/index/backend/s3.go b/exp/lighthorizon/index/backend/s3.go index 59df939596..996b8f3cec 100644 --- a/exp/lighthorizon/index/backend/s3.go +++ b/exp/lighthorizon/index/backend/s3.go @@ -19,17 +19,16 @@ import ( types "github.com/stellar/go/exp/lighthorizon/index/types" ) -const BUCKET = "horizon-index" - type S3Backend struct { s3Session *session.Session downloader *s3manager.Downloader uploader *s3manager.Uploader parallel uint32 - prefix string + pathPrefix string + bucket string } -func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Backend, error) { +func NewS3Backend(awsConfig *aws.Config, bucket string, pathPrefix string, parallel uint32) (*S3Backend, error) { s3Session, err := session.NewSession(awsConfig) if err != nil { return nil, err @@ -40,7 +39,8 @@ func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Bac downloader: s3manager.NewDownloader(s3Session), uploader: s3manager.NewUploader(s3Session), parallel: parallel, - prefix: prefix, + pathPrefix: pathPrefix, + bucket: bucket, }, nil } @@ -52,10 +52,10 @@ func (s *S3Backend) FlushAccounts(accounts []string) error { return err } - path := filepath.Join(s.prefix, "accounts") + path := filepath.Join(s.pathPrefix, "accounts") _, err = s.uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(BUCKET), + Bucket: aws.String(s.bucket), Key: aws.String(path), Body: &buf, }) @@ -81,7 +81,7 @@ func (s *S3Backend) writeBatch(b *batch) error { path := s.path(b.account) _, err := s.uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(BUCKET), + Bucket: aws.String(s.bucket), Key: aws.String(path), Body: &buf, }) @@ -97,7 +97,7 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error var buf bytes.Buffer for key, index := range indexes { buf.Reset() - path := filepath.Join(s.prefix, "tx", key) + path := filepath.Join(s.pathPrefix, "tx", key) zw := gzip.NewWriter(&buf) if _, err := index.WriteTo(zw); err != nil { @@ -111,7 +111,7 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error } _, err := s.uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(BUCKET), + Bucket: aws.String(s.bucket), Key: aws.String(path), Body: &buf, }) @@ -127,9 +127,9 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error func (s *S3Backend) ReadAccounts() ([]string, error) { log.Debugf("Downloading accounts list") b := &aws.WriteAtBuffer{} - path := filepath.Join(s.prefix, "accounts") + path := filepath.Join(s.pathPrefix, "accounts") n, err := s.downloader.Download(b, &s3.GetObjectInput{ - Bucket: aws.String(BUCKET), + Bucket: aws.String(s.bucket), Key: aws.String(path), }) if err != nil { @@ -147,7 +147,7 @@ func (s *S3Backend) ReadAccounts() ([]string, error) { } func (s *S3Backend) path(account string) string { - return filepath.Join(s.prefix, account[:10], account) + return filepath.Join(s.pathPrefix, account[:10], account) } func (s *S3Backend) Read(account string) (types.NamedIndices, error) { @@ -159,7 +159,7 @@ func (s *S3Backend) Read(account string) (types.NamedIndices, error) { path := s.path(account) var n int64 n, err = s.downloader.Download(b, &s3.GetObjectInput{ - Bucket: aws.String(BUCKET), + Bucket: aws.String(s.bucket), Key: aws.String(path), }) if err != nil { @@ -189,9 +189,9 @@ func (s *S3Backend) ReadTransactions(prefix string) (*types.TrieIndex, error) { // Check if index exists in S3 log.Debugf("Downloading index: %s", prefix) b := &aws.WriteAtBuffer{} - path := filepath.Join(s.prefix, "tx", prefix) + path := filepath.Join(s.pathPrefix, "tx", prefix) n, err := s.downloader.Download(b, &s3.GetObjectInput{ - Bucket: aws.String(BUCKET), + Bucket: aws.String(s.bucket), Key: aws.String(path), }) if err != nil { diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go index 5079801a1d..b3ce5509f5 100644 --- a/exp/lighthorizon/index/builder.go +++ b/exp/lighthorizon/index/builder.go @@ -34,7 +34,7 @@ func BuildIndices( L := log.Ctx(ctx).WithField("service", "builder") indexStore, err := ConnectWithConfig(StoreConfig{ - Url: targetUrl, + URL: targetUrl, Workers: uint32(workerCount), Log: L.WithField("subservice", "index"), }) diff --git a/exp/lighthorizon/index/connect.go b/exp/lighthorizon/index/connect.go index 8c6d90173e..018f21ec48 100644 --- a/exp/lighthorizon/index/connect.go +++ b/exp/lighthorizon/index/connect.go @@ -6,12 +6,11 @@ import ( "path/filepath" "github.com/aws/aws-sdk-go/aws" - backend "github.com/stellar/go/exp/lighthorizon/index/backend" ) func Connect(backendUrl string) (Store, error) { - return ConnectWithConfig(StoreConfig{Url: backendUrl}) + return ConnectWithConfig(StoreConfig{URL: backendUrl}) } func ConnectWithConfig(config StoreConfig) (Store, error) { @@ -19,7 +18,7 @@ func ConnectWithConfig(config StoreConfig) (Store, error) { config.Workers = 1 } - parsed, err := url.Parse(config.Url) + parsed, err := url.Parse(config.URL) if err != nil { return nil, err } @@ -31,29 +30,27 @@ func ConnectWithConfig(config StoreConfig) (Store, error) { awsConfig.Region = aws.String(region) } - config.Url = parsed.Path - return NewS3Store(awsConfig, config) + return NewS3Store(awsConfig, parsed.Host, parsed.Path, config) case "file": - config.Url = filepath.Join(parsed.Host, parsed.Path) - return NewFileStore(config) + return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config) default: return nil, fmt.Errorf("unknown URL scheme: '%s' (from %s)", - parsed.Scheme, config.Url) + parsed.Scheme, config.URL) } } -func NewFileStore(config StoreConfig) (Store, error) { - backend, err := backend.NewFileBackend(config.Url, config.Workers) +func NewFileStore(prefix string, config StoreConfig) (Store, error) { + backend, err := backend.NewFileBackend(prefix, config.Workers) if err != nil { return nil, err } return NewStore(backend, config) } -func NewS3Store(awsConfig *aws.Config, indexConfig StoreConfig) (Store, error) { - backend, err := backend.NewS3Backend(awsConfig, indexConfig.Url, indexConfig.Workers) +func NewS3Store(awsConfig *aws.Config, bucket string, prefix string, indexConfig StoreConfig) (Store, error) { + backend, err := backend.NewS3Backend(awsConfig, bucket, prefix, indexConfig.Workers) if err != nil { return nil, err } diff --git a/exp/lighthorizon/index/store.go b/exp/lighthorizon/index/store.go index a5f975b8bb..ddea9e70f4 100644 --- a/exp/lighthorizon/index/store.go +++ b/exp/lighthorizon/index/store.go @@ -38,7 +38,7 @@ type Store interface { type StoreConfig struct { // init time config - Url string + URL string Workers uint32 // runtime config diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index ae62264b7d..4d61808243 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -34,7 +34,7 @@ if left empty, uses a temporary directory`) registry := prometheus.NewRegistry() indexStore, err := index.ConnectWithConfig(index.StoreConfig{ - Url: *indexesUrl, + URL: *indexesUrl, Log: L.WithField("subservice", "index"), Metrics: registry, }) diff --git a/exp/lighthorizon/services/cursor_test.go b/exp/lighthorizon/services/cursor_test.go index 4239c5105a..1fa07bd14a 100644 --- a/exp/lighthorizon/services/cursor_test.go +++ b/exp/lighthorizon/services/cursor_test.go @@ -21,10 +21,13 @@ func TestAccountTransactionCursorManager(t *testing.T) { accountId := keypair.MustRandom().Address() // Create an index and fill it with some checkpoint details. - store, err := index.NewFileStore(index.StoreConfig{ - Url: "file://" + t.TempDir(), - Workers: 4, - }) + tmp := t.TempDir() + store, err := index.NewFileStore(tmp, + index.StoreConfig{ + URL: "file://" + tmp, + Workers: 4, + }, + ) require.NoError(t, err) for _, checkpoint := range []uint32{1, 5, 10} {