Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighhorizon: Make indexing s3 bucket configurable #4507

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions exp/lighthorizon/index/backend/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ import (
types "github.com/stellar/go/exp/lighthorizon/index/types"
)

const BUCKET = "horizon-index"

type S3Backend struct {
s3Session *session.Session
downloader *s3manager.Downloader
uploader *s3manager.Uploader
parallel uint32
prefix string
pathPrefix string
bucket string
}

func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Backend, error) {
func NewS3Backend(awsConfig *aws.Config, bucket string, pathPrefix string, parallel uint32) (*S3Backend, error) {
s3Session, err := session.NewSession(awsConfig)
if err != nil {
return nil, err
Expand All @@ -40,7 +39,8 @@ func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Bac
downloader: s3manager.NewDownloader(s3Session),
uploader: s3manager.NewUploader(s3Session),
parallel: parallel,
prefix: prefix,
pathPrefix: pathPrefix,
bucket: bucket,
}, nil
}

Expand All @@ -52,10 +52,10 @@ func (s *S3Backend) FlushAccounts(accounts []string) error {
return err
}

path := filepath.Join(s.prefix, "accounts")
path := filepath.Join(s.pathPrefix, "accounts")

_, err = s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
Body: &buf,
})
Expand All @@ -81,7 +81,7 @@ func (s *S3Backend) writeBatch(b *batch) error {
path := s.path(b.account)

_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
Body: &buf,
})
Expand All @@ -97,7 +97,7 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error
var buf bytes.Buffer
for key, index := range indexes {
buf.Reset()
path := filepath.Join(s.prefix, "tx", key)
path := filepath.Join(s.pathPrefix, "tx", key)

zw := gzip.NewWriter(&buf)
if _, err := index.WriteTo(zw); err != nil {
Expand All @@ -111,7 +111,7 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error
}

_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
Body: &buf,
})
Expand All @@ -127,9 +127,9 @@ func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error
func (s *S3Backend) ReadAccounts() ([]string, error) {
log.Debugf("Downloading accounts list")
b := &aws.WriteAtBuffer{}
path := filepath.Join(s.prefix, "accounts")
path := filepath.Join(s.pathPrefix, "accounts")
n, err := s.downloader.Download(b, &s3.GetObjectInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
Expand All @@ -147,7 +147,7 @@ func (s *S3Backend) ReadAccounts() ([]string, error) {
}

func (s *S3Backend) path(account string) string {
return filepath.Join(s.prefix, account[:10], account)
return filepath.Join(s.pathPrefix, account[:10], account)
}

func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
Expand All @@ -159,7 +159,7 @@ func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
path := s.path(account)
var n int64
n, err = s.downloader.Download(b, &s3.GetObjectInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
Expand Down Expand Up @@ -189,9 +189,9 @@ func (s *S3Backend) ReadTransactions(prefix string) (*types.TrieIndex, error) {
// Check if index exists in S3
log.Debugf("Downloading index: %s", prefix)
b := &aws.WriteAtBuffer{}
path := filepath.Join(s.prefix, "tx", prefix)
path := filepath.Join(s.pathPrefix, "tx", prefix)
n, err := s.downloader.Download(b, &s3.GetObjectInput{
Bucket: aws.String(BUCKET),
Bucket: aws.String(s.bucket),
Key: aws.String(path),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func BuildIndices(
L := log.Ctx(ctx).WithField("service", "builder")

indexStore, err := ConnectWithConfig(StoreConfig{
Url: targetUrl,
URL: targetUrl,
Workers: uint32(workerCount),
Log: L.WithField("subservice", "index"),
})
Expand Down
21 changes: 9 additions & 12 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ import (
"path/filepath"

"github.com/aws/aws-sdk-go/aws"

backend "github.com/stellar/go/exp/lighthorizon/index/backend"
)

func Connect(backendUrl string) (Store, error) {
return ConnectWithConfig(StoreConfig{Url: backendUrl})
return ConnectWithConfig(StoreConfig{URL: backendUrl})
}

func ConnectWithConfig(config StoreConfig) (Store, error) {
if config.Workers <= 0 {
config.Workers = 1
}

parsed, err := url.Parse(config.Url)
parsed, err := url.Parse(config.URL)
if err != nil {
return nil, err
}
Expand All @@ -31,29 +30,27 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
awsConfig.Region = aws.String(region)
}

config.Url = parsed.Path
return NewS3Store(awsConfig, config)
return NewS3Store(awsConfig, parsed.Host, parsed.Path, config)

case "file":
config.Url = filepath.Join(parsed.Host, parsed.Path)
return NewFileStore(config)
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config)

default:
return nil, fmt.Errorf("unknown URL scheme: '%s' (from %s)",
parsed.Scheme, config.Url)
parsed.Scheme, config.URL)
}
}

func NewFileStore(config StoreConfig) (Store, error) {
backend, err := backend.NewFileBackend(config.Url, config.Workers)
func NewFileStore(prefix string, config StoreConfig) (Store, error) {
backend, err := backend.NewFileBackend(prefix, config.Workers)
if err != nil {
return nil, err
}
return NewStore(backend, config)
}

func NewS3Store(awsConfig *aws.Config, indexConfig StoreConfig) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, indexConfig.Url, indexConfig.Workers)
func NewS3Store(awsConfig *aws.Config, bucket string, prefix string, indexConfig StoreConfig) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, bucket, prefix, indexConfig.Workers)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Store interface {

type StoreConfig struct {
// init time config
Url string
URL string
Workers uint32

// runtime config
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ if left empty, uses a temporary directory`)

registry := prometheus.NewRegistry()
indexStore, err := index.ConnectWithConfig(index.StoreConfig{
Url: *indexesUrl,
URL: *indexesUrl,
Log: L.WithField("subservice", "index"),
Metrics: registry,
})
Expand Down
11 changes: 7 additions & 4 deletions exp/lighthorizon/services/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ func TestAccountTransactionCursorManager(t *testing.T) {
accountId := keypair.MustRandom().Address()

// Create an index and fill it with some checkpoint details.
store, err := index.NewFileStore(index.StoreConfig{
Url: "file://" + t.TempDir(),
Workers: 4,
})
tmp := t.TempDir()
store, err := index.NewFileStore(tmp,
index.StoreConfig{
URL: "file://" + tmp,
Workers: 4,
},
)
require.NoError(t, err)

for _, checkpoint := range []uint32{1, 5, 10} {
Expand Down