From 36cdf9ef8a1fe481b8e2ee2e0de8b0e388aac269 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 8 Sep 2022 11:56:00 -0700 Subject: [PATCH] Actually use parallel downloader when appropriate, preparing checkpoint chunks --- exp/lighthorizon/ingester/ingester.go | 47 +--------------------- exp/lighthorizon/ingester/main.go | 56 +++++++++++++++++++++++++++ exp/lighthorizon/main.go | 2 + exp/lighthorizon/services/main.go | 30 ++++++++++---- support/collections/set/set_test.go | 6 ++- support/storage/ondisk_cache.go | 18 +++++++-- 6 files changed, 103 insertions(+), 56 deletions(-) diff --git a/exp/lighthorizon/ingester/ingester.go b/exp/lighthorizon/ingester/ingester.go index 41132b434f..21bb400b50 100644 --- a/exp/lighthorizon/ingester/ingester.go +++ b/exp/lighthorizon/ingester/ingester.go @@ -2,14 +2,9 @@ package ingester import ( "context" - "fmt" - "net/url" "github.com/stellar/go/ingest" "github.com/stellar/go/metaarchive" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/log" - "github.com/stellar/go/support/storage" "github.com/stellar/go/historyarchive" "github.com/stellar/go/xdr" @@ -30,43 +25,8 @@ type liteIngester struct { networkPassphrase string } -func NewIngester(config IngesterConfig) (Ingester, error) { - if config.CacheSize <= 0 { - return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) - } - - // Now, set up a simple filesystem-like access to the backend and wrap it in - // a local on-disk LRU cache if we can. - source, err := historyarchive.ConnectBackend( - config.SourceUrl, - storage.ConnectOptions{Context: context.Background()}, - ) - if err != nil { - return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl) - } - - parsed, err := url.Parse(config.SourceUrl) - if err != nil { - return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) - } - - if parsed.Scheme != "file" { // otherwise, already on-disk - cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) - - if errr != nil { // non-fatal: warn but continue w/o cache - log.WithField("path", config.CacheDir).WithError(errr). - Warnf("Failed to create cached ledger backend") - } else { - log.WithField("path", config.CacheDir). - Infof("On-disk cache configured") - source = cache - } - } - - return &liteIngester{ - MetaArchive: metaarchive.NewMetaArchive(source), - networkPassphrase: config.NetworkPassphrase, - }, nil +func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + return nil } func (i *liteIngester) NewLedgerTransactionReader( @@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader( reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta( i.networkPassphrase, ledgerCloseMeta.MustV0()) - if err != nil { - return nil, err - } return &liteLedgerTransactionReader{reader}, err } diff --git a/exp/lighthorizon/ingester/main.go b/exp/lighthorizon/ingester/main.go index e43d77e7b1..35787e21ec 100644 --- a/exp/lighthorizon/ingester/main.go +++ b/exp/lighthorizon/ingester/main.go @@ -1,8 +1,16 @@ package ingester import ( + "context" + "fmt" + "net/url" + + "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" "github.com/stellar/go/metaarchive" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/storage" "github.com/stellar/go/xdr" ) @@ -15,6 +23,7 @@ import ( type Ingester interface { metaarchive.MetaArchive + PrepareRange(ctx context.Context, r historyarchive.Range) error NewLedgerTransactionReader( ledgerCloseMeta xdr.SerializedLedgerCloseMeta, ) (LedgerTransactionReader, error) @@ -29,3 +38,50 @@ type LedgerTransaction struct { type LedgerTransactionReader interface { Read() (LedgerTransaction, error) } + +func NewIngester(config IngesterConfig) (Ingester, error) { + if config.CacheSize <= 0 { + return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) + } + + // Now, set up a simple filesystem-like access to the backend and wrap it in + // a local on-disk LRU cache if we can. + source, err := historyarchive.ConnectBackend( + config.SourceUrl, + storage.ConnectOptions{Context: context.Background()}, + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl) + } + + parsed, err := url.Parse(config.SourceUrl) + if err != nil { + return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) + } + + if parsed.Scheme != "file" { // otherwise, already on-disk + cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) + + if errr != nil { // non-fatal: warn but continue w/o cache + log.WithField("path", config.CacheDir).WithError(errr). + Warnf("Failed to create cached ledger backend") + } else { + log.WithField("path", config.CacheDir). + Infof("On-disk cache configured") + source = cache + } + } + + if config.ParallelDownloads > 1 { + log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads) + return NewParallelIngester( + source, + config.NetworkPassphrase, + config.ParallelDownloads), nil + } + + return &liteIngester{ + MetaArchive: metaarchive.NewMetaArchive(source), + networkPassphrase: config.NetworkPassphrase, + }, nil +} diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index 16925f91cf..37818be835 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -29,6 +29,7 @@ if left empty, uses a temporary directory`) "number of ledgers to store in the cache") logLevelParam := flag.String("log-level", "info", "logging level, info, debug, warn, error, panic, fatal, trace, default is info") + downloadCount := flag.Uint("parallel-downloads", 1, "") flag.Parse() L := log.WithField("service", "horizon-lite") @@ -55,6 +56,7 @@ if left empty, uses a temporary directory`) NetworkPassphrase: *networkPassphrase, CacheDir: *cacheDir, CacheSize: *cacheSize, + ParallelDownloads: *downloadCount, }) if err != nil { panic(err) diff --git a/exp/lighthorizon/services/main.go b/exp/lighthorizon/services/main.go index abc262cf2c..d65e5420fd 100644 --- a/exp/lighthorizon/services/main.go +++ b/exp/lighthorizon/services/main.go @@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context, Infof("Fulfilled request for account %s at cursor %d", accountId, cursor) }() + checkpointMgr := historyarchive.NewCheckpointManager(0) + for { + if checkpointMgr.IsCheckpoint(nextLedger) { + r := historyarchive.Range{ + Low: nextLedger, + High: checkpointMgr.NextCheckpoint(nextLedger + 1), + } + log.Infof("prepare range %d, %d", r.Low, r.High) + if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil { + log.Errorf("failed to prepare ledger range [%d, %d]: %v", + r.Low, r.High, innerErr) + } + } + start := time.Now() - ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger) - if ledgerErr != nil { - return errors.Wrapf(ledgerErr, - "ledger export state is out of sync at ledger %d", nextLedger) + ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to retrieve ledger %d from archive", nextLedger) } + count++ thisFetchDuration := time.Since(start) if thisFetchDuration > slowFetchDurationThreshold { log.WithField("duration", thisFetchDuration). @@ -131,9 +146,10 @@ func searchAccountTransactions(ctx context.Context, fetchDuration += thisFetchDuration start = time.Now() - reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger) - if readerErr != nil { - return readerErr + reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to read ledger %d", nextLedger) } for { diff --git a/support/collections/set/set_test.go b/support/collections/set/set_test.go index 4618de0471..ee36666ac4 100644 --- a/support/collections/set/set_test.go +++ b/support/collections/set/set_test.go @@ -7,10 +7,14 @@ import ( ) func TestSet(t *testing.T) { - s := Set[string]{} + s := NewSet[string](10) s.Add("sanity") require.True(t, s.Contains("sanity")) require.False(t, s.Contains("check")) + + s.AddSlice([]string{"a", "b", "c"}) + require.True(t, s.Contains("b")) + require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice()) } func TestSafeSet(t *testing.T) { diff --git a/support/storage/ondisk_cache.go b/support/storage/ondisk_cache.go index 2f6c46a8f0..1804ebe9a3 100644 --- a/support/storage/ondisk_cache.go +++ b/support/storage/ondisk_cache.go @@ -9,7 +9,9 @@ import ( "github.com/stellar/go/support/log" ) -// OnDiskCache fronts another storage with a local filesystem cache +// OnDiskCache fronts another storage with a local filesystem cache. Its +// thread-safe, meaning you can be actively caching a file and retrieve it at +// the same time without corruption, because retrieval will wait for the fetch. type OnDiskCache struct { Storage dir string @@ -239,8 +241,18 @@ func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.R return trc{ Reader: io.TeeReader(r, w), close: func() error { - r.Close() - w.Close() + // Always run all closers, but return the first error + if err := r.Close(); err != nil { + w.Close() + onClose() + return err + } + + if err := w.Close(); err != nil { + onClose() + return err + } + return onClose() }, }