Commit
Actually use parallel downloader when appropriate, preparing checkpoint chunks
Shaptic committed Sep 8, 2022
1 parent 9ccb85a commit 36cdf9e
Showing 6 changed files with 103 additions and 56 deletions.
47 changes: 2 additions & 45 deletions exp/lighthorizon/ingester/ingester.go
@@ -2,14 +2,9 @@ package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
@@ -30,43 +25,8 @@ type liteIngester struct {
networkPassphrase string
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error {
return nil
}

func (i *liteIngester) NewLedgerTransactionReader(
@@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader(
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
i.networkPassphrase,
ledgerCloseMeta.MustV0())
if err != nil {
return nil, err
}

return &liteLedgerTransactionReader{reader}, err
}
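The lite ingester's PrepareRange above is deliberately a no-op, since it fetches ledgers one at a time on demand. The parallel ingester this commit routes to is not shown in this diff, so the following is only a hedged sketch of what its PrepareRange could look like, assuming it fans GetLedger calls out to a bounded pool of workers; the type and field names are invented for illustration.

// Hypothetical sketch; not code from this commit. Assumes GetLedger is safe
// to call concurrently and that fetched ledgers land in a cache consulted by
// later GetLedger calls.
package ingester

import (
	"context"
	"sync"

	"github.com/stellar/go/historyarchive"
	"github.com/stellar/go/metaarchive"
)

type parallelIngesterSketch struct {
	metaarchive.MetaArchive
	workerCount uint
}

func (p *parallelIngesterSketch) PrepareRange(ctx context.Context, r historyarchive.Range) error {
	seqs := make(chan uint32)
	go func() {
		defer close(seqs)
		for seq := r.Low; seq <= r.High; seq++ {
			select {
			case seqs <- seq:
			case <-ctx.Done():
				return
			}
		}
	}()

	var wg sync.WaitGroup
	for i := uint(0); i < p.workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for seq := range seqs {
				// Warm the ledger; any real error resurfaces when the service
				// calls GetLedger for this sequence later.
				_, _ = p.GetLedger(ctx, seq)
			}
		}()
	}
	wg.Wait()
	return nil
}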
56 changes: 56 additions & 0 deletions exp/lighthorizon/ingester/main.go
@@ -1,8 +1,16 @@
package ingester

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/metaarchive"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)

@@ -15,6 +23,7 @@ import (
type Ingester interface {
metaarchive.MetaArchive

PrepareRange(ctx context.Context, r historyarchive.Range) error
NewLedgerTransactionReader(
ledgerCloseMeta xdr.SerializedLedgerCloseMeta,
) (LedgerTransactionReader, error)
@@ -29,3 +38,50 @@ type LedgerTransaction struct {
type LedgerTransactionReader interface {
Read() (LedgerTransaction, error)
}

func NewIngester(config IngesterConfig) (Ingester, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
storage.ConnectOptions{Context: context.Background()},
)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

if parsed.Scheme != "file" { // otherwise, already on-disk
cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))

if errr != nil { // non-fatal: warn but continue w/o cache
log.WithField("path", config.CacheDir).WithError(errr).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

if config.ParallelDownloads > 1 {
log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads)
return NewParallelIngester(
source,
config.NetworkPassphrase,
config.ParallelDownloads), nil
}

return &liteIngester{
MetaArchive: metaarchive.NewMetaArchive(source),
networkPassphrase: config.NetworkPassphrase,
}, nil
}
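For orientation, a minimal usage sketch showing when the parallel path above is selected; every value below (archive URL, passphrase, cache settings) is a placeholder, not configuration taken from this repository.

// Hypothetical caller; values are placeholders for illustration only.
package main

import (
	"context"
	"log"

	"github.com/stellar/go/exp/lighthorizon/ingester"
	"github.com/stellar/go/historyarchive"
)

func main() {
	ing, err := ingester.NewIngester(ingester.IngesterConfig{
		SourceUrl:         "file:///mnt/history-archive", // placeholder; file:// skips the on-disk cache
		NetworkPassphrase: "Test SDF Network ; September 2015",
		CacheDir:          "/tmp/lighthorizon-cache",
		CacheSize:         1000,
		ParallelDownloads: 4, // anything >1 returns the parallel ingester
	})
	if err != nil {
		log.Fatal(err)
	}

	// Prefetch one checkpoint's worth of ledgers before serving requests.
	r := historyarchive.Range{Low: 63, High: 127}
	if err := ing.PrepareRange(context.Background(), r); err != nil {
		log.Fatal(err)
	}
}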
2 changes: 2 additions & 0 deletions exp/lighthorizon/main.go
@@ -29,6 +29,7 @@ if left empty, uses a temporary directory`)
"number of ledgers to store in the cache")
logLevelParam := flag.String("log-level", "info",
"logging level, info, debug, warn, error, panic, fatal, trace, default is info")
downloadCount := flag.Uint("parallel-downloads", 1, "")
flag.Parse()

L := log.WithField("service", "horizon-lite")
@@ -55,6 +56,7 @@ if left empty, uses a temporary directory`)
NetworkPassphrase: *networkPassphrase,
CacheDir: *cacheDir,
CacheSize: *cacheSize,
ParallelDownloads: *downloadCount,
})
if err != nil {
panic(err)
30 changes: 23 additions & 7 deletions exp/lighthorizon/services/main.go
@@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context,
Infof("Fulfilled request for account %s at cursor %d", accountId, cursor)
}()

checkpointMgr := historyarchive.NewCheckpointManager(0)

for {
if checkpointMgr.IsCheckpoint(nextLedger) {
r := historyarchive.Range{
Low: nextLedger,
High: checkpointMgr.NextCheckpoint(nextLedger + 1),
}
log.Infof("prepare range %d, %d", r.Low, r.High)
if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil {
log.Errorf("failed to prepare ledger range [%d, %d]: %v",
r.Low, r.High, innerErr)
}
}

start := time.Now()
ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger)
if ledgerErr != nil {
return errors.Wrapf(ledgerErr,
"ledger export state is out of sync at ledger %d", nextLedger)
ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to retrieve ledger %d from archive", nextLedger)
}
count++
thisFetchDuration := time.Since(start)
if thisFetchDuration > slowFetchDurationThreshold {
log.WithField("duration", thisFetchDuration).
@@ -131,9 +146,10 @@
fetchDuration += thisFetchDuration

start = time.Now()
reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if readerErr != nil {
return readerErr
reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger)
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to read ledger %d", nextLedger)
}

for {
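The loop above relies on checkpoint arithmetic from the historyarchive package: history is organized into checkpoint-sized chunks, so each time the cursor lands on a checkpoint boundary the service prepares the range up to the next boundary and lets the ingester prefetch it. A small illustration follows, assuming the default 64-ledger checkpoint frequency (boundaries at ledgers 63, 127, 191, ...); the commented values reflect that assumption rather than captured output.

// Illustration of the checkpoint math behind PrepareRange, assuming the
// default checkpoint frequency of 64 ledgers.
package main

import (
	"fmt"

	"github.com/stellar/go/historyarchive"
)

func main() {
	mgr := historyarchive.NewCheckpointManager(0) // 0 selects the library default

	fmt.Println(mgr.IsCheckpoint(63))  // true: the first checkpoint boundary
	fmt.Println(mgr.IsCheckpoint(100)) // false: a mid-chunk ledger

	// With the cursor at checkpoint 63, the range prepared above becomes
	// [63, NextCheckpoint(64)] = [63, 127]: the current boundary plus the
	// whole next chunk, which the parallel ingester can fetch ahead of time.
	r := historyarchive.Range{Low: 63, High: mgr.NextCheckpoint(64)}
	fmt.Println(r.Low, r.High)
}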
6 changes: 5 additions & 1 deletion support/collections/set/set_test.go
@@ -7,10 +7,14 @@ import (
)

func TestSet(t *testing.T) {
s := Set[string]{}
s := NewSet[string](10)
s.Add("sanity")
require.True(t, s.Contains("sanity"))
require.False(t, s.Contains("check"))

s.AddSlice([]string{"a", "b", "c"})
require.True(t, s.Contains("b"))
require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice())
}

func TestSafeSet(t *testing.T) {
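The updated test now exercises NewSet with an initial capacity plus the AddSlice and Slice helpers, and the require.Equal assertion implies Slice returns elements in insertion order. Below is a minimal, insertion-ordered sketch that would satisfy exactly these calls; it is an illustration of the expected API, not the implementation that lives in support/collections/set.

// Illustrative generic set; not the package's actual implementation.
package set

type Set[T comparable] struct {
	index map[T]struct{}
	order []T
}

func NewSet[T comparable](capacity int) *Set[T] {
	return &Set[T]{index: make(map[T]struct{}, capacity)}
}

func (s *Set[T]) Add(item T) {
	if _, ok := s.index[item]; ok {
		return
	}
	s.index[item] = struct{}{}
	s.order = append(s.order, item)
}

func (s *Set[T]) AddSlice(items []T) {
	for _, item := range items {
		s.Add(item)
	}
}

func (s *Set[T]) Contains(item T) bool {
	_, ok := s.index[item]
	return ok
}

// Slice returns the elements in insertion order, matching the test's
// require.Equal assertion.
func (s *Set[T]) Slice() []T {
	return append([]T(nil), s.order...)
}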
18 changes: 15 additions & 3 deletions support/storage/ondisk_cache.go
@@ -9,7 +9,9 @@ import (
"github.com/stellar/go/support/log"
)

// OnDiskCache fronts another storage with a local filesystem cache
// OnDiskCache fronts another storage with a local filesystem cache. It's
// thread-safe, meaning you can be actively caching a file and retrieve it at
// the same time without corruption, because retrieval will wait for the fetch.
type OnDiskCache struct {
Storage
dir string
@@ -239,8 +241,18 @@ func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.R
return trc{
Reader: io.TeeReader(r, w),
close: func() error {
r.Close()
w.Close()
// Always run all closers, but return the first error
if err := r.Close(); err != nil {
w.Close()
onClose()
return err
}

if err := w.Close(); err != nil {
onClose()
return err
}

return onClose()
},
}
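teeReadCloser is internal to this package, so the sketch below only illustrates the pattern the revised close function hardens: the cache hands the caller a reader that simultaneously copies everything it reads into a local file, and Close now closes both ends (and still runs the bookkeeping callback) even when one of them fails, returning the first error. The helper name and signature below are assumptions for illustration.

// Hypothetical call site inside support/storage; not code from this commit.
package storage

import (
	"io"
	"os"
	"path/filepath"
)

func fetchThroughCache(backend Storage, dir, name string, done func() error) (io.ReadCloser, error) {
	upstream, err := backend.GetFile(name) // io.ReadCloser from the wrapped Storage
	if err != nil {
		return nil, err
	}

	local, err := os.Create(filepath.Join(dir, name))
	if err != nil {
		upstream.Close()
		return nil, err
	}

	// Reads are forwarded to the caller and duplicated into the cache file;
	// Close closes upstream, then local, then runs done, reporting the first
	// error encountered along the way.
	return teeReadCloser(upstream, local, done), nil
}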
