Skip to content

Commit

Permalink
Make indexing s3 bucket configurable (#4507)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Aug 4, 2022
1 parent 2e50ba8 commit 7d26a32
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 35 deletions.
32 changes: 16 additions & 16 deletions exp/lighthorizon/index/backend/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ import (
types "github.com/stellar/go/exp/lighthorizon/index/types"
)

const BUCKET = "horizon-index"

type S3Backend struct {
s3Session *session.Session
downloader *s3manager.Downloader
uploader *s3manager.Uploader
parallel uint32
prefix string
pathPrefix string
bucket string
}

func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Backend, error) {
func NewS3Backend(awsConfig *aws.Config, bucket string, pathPrefix string, parallel uint32) (*S3Backend, error) {
s3Session, err := session.NewSession(awsConfig)
if err != nil {
return nil, err
Expand All @@ -40,7 +39,8 @@ func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Bac
downloader: s3manager.NewDownloader(s3Session),
uploader: s3manager.NewUploader(s3Session),
parallel: parallel,
prefix: prefix,
pathPrefix: pathPrefix,
bucket: bucket,
}, nil
}

Expand All @@ -52,10 +52,10 @@ func (s *S3Backend) FlushAccounts(accounts []string) error {
return err
}

path := filepath.Join(s.prefix, "accounts")
path := filepath.Join(s.pathPrefix, "accounts")

_, err = s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
Body: &buf,
})
Expand All @@ -81,7 +81,7 @@ func (s *S3Backend) writeBatch(b *batch) error {
path := s.path(b.account)

_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
Body: &buf,
})
Expand All @@ -97,7 +97,7 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error
var buf bytes.Buffer
for key, index := range indexes {
buf.Reset()
path := filepath.Join(s.prefix, "tx", key)
path := filepath.Join(s.pathPrefix, "tx", key)

zw := gzip.NewWriter(&buf)
if _, err := index.WriteTo(zw); err != nil {
Expand All @@ -111,7 +111,7 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error
}

_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
Body: &buf,
})
Expand All @@ -127,9 +127,9 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error
func (s *S3Backend) ReadAccounts() ([]string, error) {
log.Debugf("Downloading accounts list")
b := &aws.WriteAtBuffer{}
path := filepath.Join(s.prefix, "accounts")
path := filepath.Join(s.pathPrefix, "accounts")
n, err := s.downloader.Download(b, &s3.GetObjectInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
Expand All @@ -147,7 +147,7 @@ func (s *S3Backend) ReadAccounts() ([]string, error) {
}

func (s *S3Backend) path(account string) string {
return filepath.Join(s.prefix, account[:10], account)
return filepath.Join(s.pathPrefix, account[:10], account)
}

func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
Expand All @@ -159,7 +159,7 @@ func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
path := s.path(account)
var n int64
n, err = s.downloader.Download(b, &s3.GetObjectInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
Expand Down Expand Up @@ -189,9 +189,9 @@ func (s *S3Backend) ReadTransactions(prefix string) (*types.TrieIndex, error) {
// Check if index exists in S3
log.Debugf("Downloading index: %s", prefix)
b := &aws.WriteAtBuffer{}
path := filepath.Join(s.prefix, "tx", prefix)
path := filepath.Join(s.pathPrefix, "tx", prefix)
n, err := s.downloader.Download(b, &s3.GetObjectInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func BuildIndices(
L := log.Ctx(ctx).WithField("service", "builder")

indexStore, err := ConnectWithConfig(StoreConfig{
Url: targetUrl,
URL: targetUrl,
Workers: uint32(workerCount),
Log: L.WithField("subservice", "index"),
})
Expand Down
21 changes: 9 additions & 12 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ import (
"path/filepath"

"github.com/aws/aws-sdk-go/aws"

backend "github.com/stellar/go/exp/lighthorizon/index/backend"
)

func Connect(backendUrl string) (Store, error) {
return ConnectWithConfig(StoreConfig{Url: backendUrl})
return ConnectWithConfig(StoreConfig{URL: backendUrl})
}

func ConnectWithConfig(config StoreConfig) (Store, error) {
if config.Workers <= 0 {
config.Workers = 1
}

parsed, err := url.Parse(config.Url)
parsed, err := url.Parse(config.URL)
if err != nil {
return nil, err
}
Expand All @@ -31,29 +30,27 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
awsConfig.Region = aws.String(region)
}

config.Url = parsed.Path
return NewS3Store(awsConfig, config)
return NewS3Store(awsConfig, parsed.Host, parsed.Path, config)

case "file":
config.Url = filepath.Join(parsed.Host, parsed.Path)
return NewFileStore(config)
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config)

default:
return nil, fmt.Errorf("unknown URL scheme: '%s' (from %s)",
parsed.Scheme, config.Url)
parsed.Scheme, config.URL)
}
}

func NewFileStore(config StoreConfig) (Store, error) {
backend, err := backend.NewFileBackend(config.Url, config.Workers)
func NewFileStore(prefix string, config StoreConfig) (Store, error) {
backend, err := backend.NewFileBackend(prefix, config.Workers)
if err != nil {
return nil, err
}
return NewStore(backend, config)
}

func NewS3Store(awsConfig *aws.Config, indexConfig StoreConfig) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, indexConfig.Url, indexConfig.Workers)
func NewS3Store(awsConfig *aws.Config, bucket string, prefix string, indexConfig StoreConfig) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, bucket, prefix, indexConfig.Workers)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Store interface {

type StoreConfig struct {
// init time config
Url string
URL string
Workers uint32

// runtime config
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ if left empty, uses a temporary directory`)

registry := prometheus.NewRegistry()
indexStore, err := index.ConnectWithConfig(index.StoreConfig{
Url: *indexesUrl,
URL: *indexesUrl,
Log: L.WithField("subservice", "index"),
Metrics: registry,
})
Expand Down
11 changes: 7 additions & 4 deletions exp/lighthorizon/services/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ func TestAccountTransactionCursorManager(t *testing.T) {
accountId := keypair.MustRandom().Address()

// Create an index and fill it with some checkpoint details.
store, err := index.NewFileStore(index.StoreConfig{
Url: "file://" + t.TempDir(),
Workers: 4,
})
tmp := t.TempDir()
store, err := index.NewFileStore(tmp,
index.StoreConfig{
URL: "file://" + tmp,
Workers: 4,
},
)
require.NoError(t, err)

for _, checkpoint := range []uint32{1, 5, 10} {
Expand Down

0 comments on commit 7d26a32

Please sign in to comment.