From e860ddb66e7feeced8cdab5063b423db2f375e8d Mon Sep 17 00:00:00 2001
From: George Kudrayvtsev
Date: Thu, 8 Sep 2022 11:56:00 -0700
Subject: [PATCH] Actually add & use parallel downloads, preparing checkpoint chunks

---
 exp/lighthorizon/ingester/ingester.go |  47 +-----
 exp/lighthorizon/ingester/main.go     |  56 +++++++
 .../ingester/parallel_ingester.go     | 150 ++++++++++++++++++
 exp/lighthorizon/main.go              |   2 +
 exp/lighthorizon/services/main.go     |  30 +++-
 support/collections/set/set_test.go   |   6 +-
 support/storage/ondisk_cache.go       |  18 ++-
 7 files changed, 253 insertions(+), 56 deletions(-)
 create mode 100644 exp/lighthorizon/ingester/parallel_ingester.go

diff --git a/exp/lighthorizon/ingester/ingester.go b/exp/lighthorizon/ingester/ingester.go
index 41132b434f..21bb400b50 100644
--- a/exp/lighthorizon/ingester/ingester.go
+++ b/exp/lighthorizon/ingester/ingester.go
@@ -2,14 +2,9 @@ package ingester
 
 import (
 	"context"
-	"fmt"
-	"net/url"
 
 	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/metaarchive"
-	"github.com/stellar/go/support/errors"
-	"github.com/stellar/go/support/log"
-	"github.com/stellar/go/support/storage"
 
 	"github.com/stellar/go/historyarchive"
 	"github.com/stellar/go/xdr"
@@ -30,43 +25,8 @@ type liteIngester struct {
 	networkPassphrase string
 }
 
-func NewIngester(config IngesterConfig) (Ingester, error) {
-	if config.CacheSize <= 0 {
-		return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
-	}
-
-	// Now, set up a simple filesystem-like access to the backend and wrap it in
-	// a local on-disk LRU cache if we can.
-	source, err := historyarchive.ConnectBackend(
-		config.SourceUrl,
-		storage.ConnectOptions{Context: context.Background()},
-	)
-	if err != nil {
-		return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl)
-	}
-
-	parsed, err := url.Parse(config.SourceUrl)
-	if err != nil {
-		return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
-	}
-
-	if parsed.Scheme != "file" { // otherwise, already on-disk
-		cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize))
-
-		if errr != nil { // non-fatal: warn but continue w/o cache
-			log.WithField("path", config.CacheDir).WithError(errr).
-				Warnf("Failed to create cached ledger backend")
-		} else {
-			log.WithField("path", config.CacheDir).
- Infof("On-disk cache configured") - source = cache - } - } - - return &liteIngester{ - MetaArchive: metaarchive.NewMetaArchive(source), - networkPassphrase: config.NetworkPassphrase, - }, nil +func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + return nil } func (i *liteIngester) NewLedgerTransactionReader( @@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader( reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta( i.networkPassphrase, ledgerCloseMeta.MustV0()) - if err != nil { - return nil, err - } return &liteLedgerTransactionReader{reader}, err } diff --git a/exp/lighthorizon/ingester/main.go b/exp/lighthorizon/ingester/main.go index e43d77e7b1..35787e21ec 100644 --- a/exp/lighthorizon/ingester/main.go +++ b/exp/lighthorizon/ingester/main.go @@ -1,8 +1,16 @@ package ingester import ( + "context" + "fmt" + "net/url" + + "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" "github.com/stellar/go/metaarchive" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/storage" "github.com/stellar/go/xdr" ) @@ -15,6 +23,7 @@ import ( type Ingester interface { metaarchive.MetaArchive + PrepareRange(ctx context.Context, r historyarchive.Range) error NewLedgerTransactionReader( ledgerCloseMeta xdr.SerializedLedgerCloseMeta, ) (LedgerTransactionReader, error) @@ -29,3 +38,50 @@ type LedgerTransaction struct { type LedgerTransactionReader interface { Read() (LedgerTransaction, error) } + +func NewIngester(config IngesterConfig) (Ingester, error) { + if config.CacheSize <= 0 { + return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) + } + + // Now, set up a simple filesystem-like access to the backend and wrap it in + // a local on-disk LRU cache if we can. + source, err := historyarchive.ConnectBackend( + config.SourceUrl, + storage.ConnectOptions{Context: context.Background()}, + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl) + } + + parsed, err := url.Parse(config.SourceUrl) + if err != nil { + return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) + } + + if parsed.Scheme != "file" { // otherwise, already on-disk + cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) + + if errr != nil { // non-fatal: warn but continue w/o cache + log.WithField("path", config.CacheDir).WithError(errr). + Warnf("Failed to create cached ledger backend") + } else { + log.WithField("path", config.CacheDir). 
+ Infof("On-disk cache configured") + source = cache + } + } + + if config.ParallelDownloads > 1 { + log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads) + return NewParallelIngester( + source, + config.NetworkPassphrase, + config.ParallelDownloads), nil + } + + return &liteIngester{ + MetaArchive: metaarchive.NewMetaArchive(source), + networkPassphrase: config.NetworkPassphrase, + }, nil +} diff --git a/exp/lighthorizon/ingester/parallel_ingester.go b/exp/lighthorizon/ingester/parallel_ingester.go new file mode 100644 index 0000000000..ec37159896 --- /dev/null +++ b/exp/lighthorizon/ingester/parallel_ingester.go @@ -0,0 +1,150 @@ +package ingester + +import ( + "context" + "sync" + "time" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/metaarchive" + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/storage" + "github.com/stellar/go/xdr" +) + +type parallelIngester struct { + liteIngester + workerCount uint + + ledgerFeedLock sync.RWMutex + ledgerFeed map[uint32]downloadState + ledgerQueue set.Set[uint32] + + workQueue chan uint32 + signalChan chan error +} + +type downloadState struct { + ledger xdr.SerializedLedgerCloseMeta + err error +} + +// NewParallelIngester creates an ingester on the given `ledgerSource` using the +// given `networkPassphrase` that can download ledgers in parallel via +// `workerCount` workers. +func NewParallelIngester(ledgerSource storage.Storage, networkPassphrase string, workerCount uint) *parallelIngester { + self := ¶llelIngester{ + liteIngester: liteIngester{ + MetaArchive: metaarchive.NewMetaArchive(ledgerSource), + networkPassphrase: networkPassphrase, + }, + ledgerFeedLock: sync.RWMutex{}, + ledgerFeed: make(map[uint32]downloadState, 64), + ledgerQueue: set.NewSet[uint32](64), + workerCount: workerCount, + workQueue: make(chan uint32, workerCount), + signalChan: make(chan error), + } + + // These are the workers that download & store ledgers in memory. + for j := uint(0); j < workerCount; j++ { + go func() { + for ledgerSeq := range self.workQueue { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + txmeta, err := self.liteIngester.GetLedger(ctx, ledgerSeq) + cancel() + + log.WithField("duration", time.Since(start)).WithError(err). + Debugf("Downloaded ledger %d", ledgerSeq) + + self.ledgerFeedLock.Lock() + self.ledgerFeed[ledgerSeq] = downloadState{txmeta, err} + self.ledgerFeedLock.Unlock() + self.signalChan <- err + } + }() + } + + return self +} + +// PrepareRange will create a set of parallel worker routines that feed ledgers +// to a channel in the order they're downloaded and store the results in an +// array. You can use this to download ledgers in parallel to fetching them +// individually via `GetLedger()`. +// +// Note: The passed in range `r` is inclusive of the boundaries. +func (i *parallelIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + // The taskmaster adds ledger sequence numbers to the work queue. + go func() { + start := time.Now() + defer func() { + log.WithField("duration", time.Since(start)). + WithField("workers", i.workerCount). + WithError(ctx.Err()). 
+ Infof("Download of ledger range: [%d, %d] (%d ledgers) complete", + r.Low, r.High, r.Size()) + }() + + for seq := r.Low; seq <= r.High; seq++ { + if ctx.Err() != nil { + log.Warnf("Cancelling remaining downloads ([%d, %d]): %v", + seq, r.High, ctx.Err()) + break + } + + // Adding this to the "set of ledgers being downloaded in parallel" + // means that if a GetLedger() request happens in this range but + // outside of the realm of processing, it can be prioritized by the + // normal, direct download. + i.ledgerQueue.Add(seq) + + i.workQueue <- seq // blocks until there's an available worker + + // Similarly to before, removing it immediately after a worker takes + // it means that if someone requests the ledger again, it can be + // direct-downloaded (or pulled from the cache) without waiting on + // the busy workers. + i.ledgerQueue.Remove(seq) + } + }() + + return nil +} + +func (i *parallelIngester) GetLedger( + ctx context.Context, ledgerSeq uint32, +) (xdr.SerializedLedgerCloseMeta, error) { + // If the requested ledger is out of the queued up ranges, we can fall back + // to the default non-parallel download method. + if !i.ledgerQueue.Contains(ledgerSeq) { + return i.liteIngester.GetLedger(ctx, ledgerSeq) + } + + // If the ledger isn't available yet, wait for the download worker. + var err error + for err == nil { + i.ledgerFeedLock.RLock() + if state, ok := i.ledgerFeed[ledgerSeq]; ok { + i.ledgerFeedLock.RUnlock() // re-lock as a writer + i.ledgerFeedLock.Lock() + delete(i.ledgerFeed, ledgerSeq) + i.ledgerFeedLock.Unlock() + + return state.ledger, state.err + } + i.ledgerFeedLock.RUnlock() + + select { + case err = <-i.signalChan: // blocks until another ledger downloads + case <-ctx.Done(): + err = ctx.Err() + } + } + + return xdr.SerializedLedgerCloseMeta{}, err +} + +var _ Ingester = (*parallelIngester)(nil) // ensure conformity to the interface diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index 16925f91cf..37818be835 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -29,6 +29,7 @@ if left empty, uses a temporary directory`) "number of ledgers to store in the cache") logLevelParam := flag.String("log-level", "info", "logging level, info, debug, warn, error, panic, fatal, trace, default is info") + downloadCount := flag.Uint("parallel-downloads", 1, "") flag.Parse() L := log.WithField("service", "horizon-lite") @@ -55,6 +56,7 @@ if left empty, uses a temporary directory`) NetworkPassphrase: *networkPassphrase, CacheDir: *cacheDir, CacheSize: *cacheSize, + ParallelDownloads: *downloadCount, }) if err != nil { panic(err) diff --git a/exp/lighthorizon/services/main.go b/exp/lighthorizon/services/main.go index abc262cf2c..d65e5420fd 100644 --- a/exp/lighthorizon/services/main.go +++ b/exp/lighthorizon/services/main.go @@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context, Infof("Fulfilled request for account %s at cursor %d", accountId, cursor) }() + checkpointMgr := historyarchive.NewCheckpointManager(0) + for { + if checkpointMgr.IsCheckpoint(nextLedger) { + r := historyarchive.Range{ + Low: nextLedger, + High: checkpointMgr.NextCheckpoint(nextLedger + 1), + } + log.Infof("prepare range %d, %d", r.Low, r.High) + if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil { + log.Errorf("failed to prepare ledger range [%d, %d]: %v", + r.Low, r.High, innerErr) + } + } + start := time.Now() - ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger) - if ledgerErr != nil { - return errors.Wrapf(ledgerErr, - 
"ledger export state is out of sync at ledger %d", nextLedger) + ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to retrieve ledger %d from archive", nextLedger) } + count++ thisFetchDuration := time.Since(start) if thisFetchDuration > slowFetchDurationThreshold { log.WithField("duration", thisFetchDuration). @@ -131,9 +146,10 @@ func searchAccountTransactions(ctx context.Context, fetchDuration += thisFetchDuration start = time.Now() - reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger) - if readerErr != nil { - return readerErr + reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to read ledger %d", nextLedger) } for { diff --git a/support/collections/set/set_test.go b/support/collections/set/set_test.go index 4618de0471..ee36666ac4 100644 --- a/support/collections/set/set_test.go +++ b/support/collections/set/set_test.go @@ -7,10 +7,14 @@ import ( ) func TestSet(t *testing.T) { - s := Set[string]{} + s := NewSet[string](10) s.Add("sanity") require.True(t, s.Contains("sanity")) require.False(t, s.Contains("check")) + + s.AddSlice([]string{"a", "b", "c"}) + require.True(t, s.Contains("b")) + require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice()) } func TestSafeSet(t *testing.T) { diff --git a/support/storage/ondisk_cache.go b/support/storage/ondisk_cache.go index 2f6c46a8f0..1804ebe9a3 100644 --- a/support/storage/ondisk_cache.go +++ b/support/storage/ondisk_cache.go @@ -9,7 +9,9 @@ import ( "github.com/stellar/go/support/log" ) -// OnDiskCache fronts another storage with a local filesystem cache +// OnDiskCache fronts another storage with a local filesystem cache. Its +// thread-safe, meaning you can be actively caching a file and retrieve it at +// the same time without corruption, because retrieval will wait for the fetch. type OnDiskCache struct { Storage dir string @@ -239,8 +241,18 @@ func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.R return trc{ Reader: io.TeeReader(r, w), close: func() error { - r.Close() - w.Close() + // Always run all closers, but return the first error + if err := r.Close(); err != nil { + w.Close() + onClose() + return err + } + + if err := w.Close(); err != nil { + onClose() + return err + } + return onClose() }, }