Skip to content

Commit

Permalink
exp/lighthorizon: Restructure index package into sensible sub-packages (
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaptic authored Jun 10, 2022
1 parent 6b47337 commit 33cbd42
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 70 deletions.
14 changes: 14 additions & 0 deletions exp/lighthorizon/index/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package index

import types "github.com/stellar/go/exp/lighthorizon/index/types"

// TODO: Use a more standardized filesystem-style backend, so we can re-use
// code
type Backend interface {
Flush(map[string]types.NamedIndices) error
FlushAccounts([]string) error
Read(account string) (types.NamedIndices, error)
ReadAccounts() ([]string, error)
FlushTransactions(map[string]*types.TrieIndex) error
ReadTransactions(prefix string) (*types.TrieIndex, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"

types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)
Expand All @@ -17,22 +18,14 @@ type FileBackend struct {
parallel uint32
}

func NewFileStore(dir string, parallel uint32) (Store, error) {
backend, err := NewFileBackend(dir, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}

func NewFileBackend(dir string, parallel uint32) (*FileBackend, error) {
return &FileBackend{
dir: dir,
parallel: parallel,
}, nil
}

func (s *FileBackend) Flush(indexes map[string]map[string]*CheckpointIndex) error {
func (s *FileBackend) Flush(indexes map[string]types.NamedIndices) error {
return parallelFlush(s.parallel, indexes, s.writeBatch)
}

Expand Down Expand Up @@ -88,7 +81,7 @@ func (s *FileBackend) writeBatch(b *batch) error {
return nil
}

func (s *FileBackend) FlushTransactions(indexes map[string]*TrieIndex) error {
func (s *FileBackend) FlushTransactions(indexes map[string]*types.TrieIndex) error {
// TODO: Parallelize this
for key, index := range indexes {
path := filepath.Join(s.dir, "tx", key)
Expand Down Expand Up @@ -125,7 +118,7 @@ func (s *FileBackend) FlushTransactions(indexes map[string]*TrieIndex) error {
return nil
}

func (s *FileBackend) Read(account string) (map[string]*CheckpointIndex, error) {
func (s *FileBackend) Read(account string) (types.NamedIndices, error) {
log.Debugf("Opening index: %s", account)
b, err := os.Open(filepath.Join(s.dir, account[:3], account))
if err != nil {
Expand Down Expand Up @@ -185,7 +178,7 @@ func (s *FileBackend) ReadAccounts() ([]string, error) {
return accounts, nil
}

func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) {
func (s *FileBackend) ReadTransactions(prefix string) (*types.TrieIndex, error) {
log.Debugf("Opening index: %s", prefix)
b, err := os.Open(filepath.Join(s.dir, "tx", prefix))
if err != nil {
Expand All @@ -198,7 +191,7 @@ func (s *FileBackend) ReadTransactions(prefix string) (*TrieIndex, error) {
return nil, os.ErrNotExist
}
defer zr.Close()
var index TrieIndex
var index types.TrieIndex
_, err = index.ReadFrom(zr)
if err != nil {
log.Errorf("Unable to parse %s: %v", prefix, err)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"compress/gzip"
"io"

types "github.com/stellar/go/exp/lighthorizon/index/types"
)

func writeGzippedTo(w io.Writer, indexes map[string]*CheckpointIndex) (int64, error) {
func writeGzippedTo(w io.Writer, indexes types.NamedIndices) (int64, error) {
zw := gzip.NewWriter(w)

var n int64
Expand All @@ -28,12 +30,12 @@ func writeGzippedTo(w io.Writer, indexes map[string]*CheckpointIndex) (int64, er
return n, nil
}

func readGzippedFrom(r io.Reader) (map[string]*CheckpointIndex, int64, error) {
func readGzippedFrom(r io.Reader) (types.NamedIndices, int64, error) {
zr, err := gzip.NewReader(r)
if err != nil {
return nil, 0, err
}
indexes := map[string]*CheckpointIndex{}
indexes := types.NamedIndices{}
var buf bytes.Buffer
var n int64
for {
Expand All @@ -45,7 +47,7 @@ func readGzippedFrom(r io.Reader) (map[string]*CheckpointIndex, int64, error) {
return nil, n, err
}

ind, err := NewCheckpointIndexFromBytes(buf.Bytes())
ind, err := types.NewCheckpointIndexFromBytes(buf.Bytes())
if err != nil {
return nil, n, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"bytes"
"testing"

types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stretchr/testify/require"
)

func TestGzipWriteReadRoundtrip(t *testing.T) {
indexes := map[string]*CheckpointIndex{}
index := &CheckpointIndex{}
indexes := types.NamedIndices{}
index := &types.CheckpointIndex{}
index.SetActive(5)
indexes["A"] = index

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import (
"sync/atomic"
"time"

types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/log"
)

type batch struct {
account string
indexes map[string]*CheckpointIndex
indexes types.NamedIndices
}

type flushBatch func(b *batch) error

func parallelFlush(parallel uint32, allIndexes map[string]map[string]*CheckpointIndex, f flushBatch) error {
func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f flushBatch) error {
var wg sync.WaitGroup

batches := make(chan *batch, parallel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"

types "github.com/stellar/go/exp/lighthorizon/index/types"
)

const BUCKET = "horizon-index"
Expand All @@ -27,14 +29,6 @@ type S3Backend struct {
prefix string
}

func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) {
backend, err := NewS3Backend(awsConfig, prefix, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}

func NewS3Backend(awsConfig *aws.Config, prefix string, parallel uint32) (*S3Backend, error) {
s3Session, err := session.NewSession(awsConfig)
if err != nil {
Expand Down Expand Up @@ -72,7 +66,7 @@ func (s *S3Backend) FlushAccounts(accounts []string) error {
return nil
}

func (s *S3Backend) Flush(indexes map[string]map[string]*CheckpointIndex) error {
func (s *S3Backend) Flush(indexes map[string]types.NamedIndices) error {
return parallelFlush(s.parallel, indexes, s.writeBatch)
}

Expand All @@ -98,7 +92,7 @@ func (s *S3Backend) writeBatch(b *batch) error {
return nil
}

func (s *S3Backend) FlushTransactions(indexes map[string]*TrieIndex) error {
func (s *S3Backend) FlushTransactions(indexes map[string]*types.TrieIndex) error {
// TODO: Parallelize this
var buf bytes.Buffer
for key, index := range indexes {
Expand Down Expand Up @@ -156,7 +150,7 @@ func (s *S3Backend) path(account string) string {
return filepath.Join(s.prefix, account[:10], account)
}

func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) {
func (s *S3Backend) Read(account string) (types.NamedIndices, error) {
// Check if index exists in S3
log.Debugf("Downloading index: %s", account)
var err error
Expand All @@ -179,7 +173,7 @@ func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) {
if n == 0 {
return nil, os.ErrNotExist
}
var indexes map[string]*CheckpointIndex
var indexes map[string]*types.CheckpointIndex
indexes, _, err = readGzippedFrom(bytes.NewReader(b.Bytes()))
if err != nil {
log.Errorf("Unable to parse %s: %v", account, err)
Expand All @@ -191,7 +185,7 @@ func (s *S3Backend) Read(account string) (map[string]*CheckpointIndex, error) {
return nil, err
}

func (s *S3Backend) ReadTransactions(prefix string) (*TrieIndex, error) {
func (s *S3Backend) ReadTransactions(prefix string) (*types.TrieIndex, error) {
// Check if index exists in S3
log.Debugf("Downloading index: %s", prefix)
b := &aws.WriteAtBuffer{}
Expand All @@ -216,7 +210,7 @@ func (s *S3Backend) ReadTransactions(prefix string) (*TrieIndex, error) {
}
defer zr.Close()

var index TrieIndex
var index types.TrieIndex
_, err = index.ReadFrom(zr)
if err != nil {
log.Errorf("Unable to parse %s: %v", prefix, err)
Expand Down
3 changes: 2 additions & 1 deletion exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"

"github.com/stellar/go/exp/lighthorizon/index"
types "github.com/stellar/go/exp/lighthorizon/index/types"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)
Expand Down Expand Up @@ -337,7 +338,7 @@ func (cfg *ReduceConfig) shouldProcessTx(txPrefix byte, routineIndex uint32) boo

// For every index that exists in `dest`, finds the corresponding index in
// `source` and merges it into `dest`'s version.
func mergeIndices(dest, source map[string]*index.CheckpointIndex) error {
func mergeIndices(dest, source map[string]*types.CheckpointIndex) error {
for name, index := range dest {
// The source doesn't contain this particular index.
//
Expand Down
18 changes: 18 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"path/filepath"

"github.com/aws/aws-sdk-go/aws"

backend "github.com/stellar/go/exp/lighthorizon/index/backend"
)

func Connect(backendUrl string) (Store, error) {
Expand All @@ -31,3 +33,19 @@ func Connect(backendUrl string) (Store, error) {
parsed.Scheme, backendUrl)
}
}

func NewFileStore(dir string, parallel uint32) (Store, error) {
backend, err := backend.NewFileBackend(dir, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}

func NewS3Store(awsConfig *aws.Config, prefix string, parallel uint32) (Store, error) {
backend, err := backend.NewS3Backend(awsConfig, prefix, parallel)
if err != nil {
return nil, err
}
return NewStore(backend)
}
Loading

0 comments on commit 33cbd42

Please sign in to comment.