From 934faf3ec78fbf7d615675e33ba34f6c12f9db7a Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Thu, 1 Sep 2022 17:36:19 -0700 Subject: [PATCH] wip --- exp/lighthorizon/http.go | 2 +- exp/lighthorizon/ingester/ingester.go | 47 +--------------------- exp/lighthorizon/ingester/main.go | 56 +++++++++++++++++++++++++++ exp/lighthorizon/main.go | 7 ++-- exp/lighthorizon/services/main.go | 30 ++++++++++---- exp/lighthorizon/tools/cache.go | 4 +- exp/lighthorizon/tools/explorer.go | 4 +- exp/lighthorizon/tools/main.go | 6 +-- support/collections/set/set_test.go | 6 ++- support/storage/ondisk_cache.go | 14 +++++-- 10 files changed, 109 insertions(+), 67 deletions(-) diff --git a/exp/lighthorizon/http.go b/exp/lighthorizon/http.go index b08e97ce34..fc9630c567 100644 --- a/exp/lighthorizon/http.go +++ b/exp/lighthorizon/http.go @@ -62,7 +62,7 @@ func lightHorizonHTTPHandler(registry *prometheus.Registry, lightHorizon service r.MethodFunc(http.MethodGet, "/operations", actions.NewOpsByAccountHandler(lightHorizon)) }) - router.MethodFunc(http.MethodGet, "/", actions.ApiDocs()) + router.MethodFunc(http.MethodGet, "/api", actions.ApiDocs()) router.Method(http.MethodGet, "/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) problem.RegisterHost("") diff --git a/exp/lighthorizon/ingester/ingester.go b/exp/lighthorizon/ingester/ingester.go index 41132b434f..21bb400b50 100644 --- a/exp/lighthorizon/ingester/ingester.go +++ b/exp/lighthorizon/ingester/ingester.go @@ -2,14 +2,9 @@ package ingester import ( "context" - "fmt" - "net/url" "github.com/stellar/go/ingest" "github.com/stellar/go/metaarchive" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/support/log" - "github.com/stellar/go/support/storage" "github.com/stellar/go/historyarchive" "github.com/stellar/go/xdr" @@ -30,43 +25,8 @@ type liteIngester struct { networkPassphrase string } -func NewIngester(config IngesterConfig) (Ingester, error) { - if config.CacheSize <= 0 { - return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) - } - - // Now, set up a simple filesystem-like access to the backend and wrap it in - // a local on-disk LRU cache if we can. - source, err := historyarchive.ConnectBackend( - config.SourceUrl, - storage.ConnectOptions{Context: context.Background()}, - ) - if err != nil { - return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl) - } - - parsed, err := url.Parse(config.SourceUrl) - if err != nil { - return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) - } - - if parsed.Scheme != "file" { // otherwise, already on-disk - cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) - - if errr != nil { // non-fatal: warn but continue w/o cache - log.WithField("path", config.CacheDir).WithError(errr). - Warnf("Failed to create cached ledger backend") - } else { - log.WithField("path", config.CacheDir). - Infof("On-disk cache configured") - source = cache - } - } - - return &liteIngester{ - MetaArchive: metaarchive.NewMetaArchive(source), - networkPassphrase: config.NetworkPassphrase, - }, nil +func (i *liteIngester) PrepareRange(ctx context.Context, r historyarchive.Range) error { + return nil } func (i *liteIngester) NewLedgerTransactionReader( @@ -75,9 +35,6 @@ func (i *liteIngester) NewLedgerTransactionReader( reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta( i.networkPassphrase, ledgerCloseMeta.MustV0()) - if err != nil { - return nil, err - } return &liteLedgerTransactionReader{reader}, err } diff --git a/exp/lighthorizon/ingester/main.go b/exp/lighthorizon/ingester/main.go index e43d77e7b1..35787e21ec 100644 --- a/exp/lighthorizon/ingester/main.go +++ b/exp/lighthorizon/ingester/main.go @@ -1,8 +1,16 @@ package ingester import ( + "context" + "fmt" + "net/url" + + "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" "github.com/stellar/go/metaarchive" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/support/storage" "github.com/stellar/go/xdr" ) @@ -15,6 +23,7 @@ import ( type Ingester interface { metaarchive.MetaArchive + PrepareRange(ctx context.Context, r historyarchive.Range) error NewLedgerTransactionReader( ledgerCloseMeta xdr.SerializedLedgerCloseMeta, ) (LedgerTransactionReader, error) @@ -29,3 +38,50 @@ type LedgerTransaction struct { type LedgerTransactionReader interface { Read() (LedgerTransaction, error) } + +func NewIngester(config IngesterConfig) (Ingester, error) { + if config.CacheSize <= 0 { + return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) + } + + // Now, set up a simple filesystem-like access to the backend and wrap it in + // a local on-disk LRU cache if we can. + source, err := historyarchive.ConnectBackend( + config.SourceUrl, + storage.ConnectOptions{Context: context.Background()}, + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to %s", config.SourceUrl) + } + + parsed, err := url.Parse(config.SourceUrl) + if err != nil { + return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) + } + + if parsed.Scheme != "file" { // otherwise, already on-disk + cache, errr := storage.MakeOnDiskCache(source, config.CacheDir, uint(config.CacheSize)) + + if errr != nil { // non-fatal: warn but continue w/o cache + log.WithField("path", config.CacheDir).WithError(errr). + Warnf("Failed to create cached ledger backend") + } else { + log.WithField("path", config.CacheDir). + Infof("On-disk cache configured") + source = cache + } + } + + if config.ParallelDownloads > 1 { + log.Infof("Enabling parallel ledger fetches with %d workers", config.ParallelDownloads) + return NewParallelIngester( + source, + config.NetworkPassphrase, + config.ParallelDownloads), nil + } + + return &liteIngester{ + MetaArchive: metaarchive.NewMetaArchive(source), + networkPassphrase: config.NetworkPassphrase, + }, nil +} diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index 5b7d6a791a..d6ec977d86 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -26,10 +26,11 @@ const ( ) func main() { + log.SetLevel(logrus.InfoLevel) // default for subcommands + cmd := &cobra.Command{ - Use: "lighthorizon ", - Short: "Horizon Lite suite", - Long: "Horizon Lite suite", + Use: "lighthorizon ", + Long: "Horizon Lite command suite", RunE: func(cmd *cobra.Command, args []string) error { return cmd.Usage() }, diff --git a/exp/lighthorizon/services/main.go b/exp/lighthorizon/services/main.go index abc262cf2c..d65e5420fd 100644 --- a/exp/lighthorizon/services/main.go +++ b/exp/lighthorizon/services/main.go @@ -116,13 +116,28 @@ func searchAccountTransactions(ctx context.Context, Infof("Fulfilled request for account %s at cursor %d", accountId, cursor) }() + checkpointMgr := historyarchive.NewCheckpointManager(0) + for { + if checkpointMgr.IsCheckpoint(nextLedger) { + r := historyarchive.Range{ + Low: nextLedger, + High: checkpointMgr.NextCheckpoint(nextLedger + 1), + } + log.Infof("prepare range %d, %d", r.Low, r.High) + if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil { + log.Errorf("failed to prepare ledger range [%d, %d]: %v", + r.Low, r.High, innerErr) + } + } + start := time.Now() - ledger, ledgerErr := config.Ingester.GetLedger(ctx, nextLedger) - if ledgerErr != nil { - return errors.Wrapf(ledgerErr, - "ledger export state is out of sync at ledger %d", nextLedger) + ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to retrieve ledger %d from archive", nextLedger) } + count++ thisFetchDuration := time.Since(start) if thisFetchDuration > slowFetchDurationThreshold { log.WithField("duration", thisFetchDuration). @@ -131,9 +146,10 @@ func searchAccountTransactions(ctx context.Context, fetchDuration += thisFetchDuration start = time.Now() - reader, readerErr := config.Ingester.NewLedgerTransactionReader(ledger) - if readerErr != nil { - return readerErr + reader, innerErr := config.Ingester.NewLedgerTransactionReader(ledger) + if innerErr != nil { + return errors.Wrapf(innerErr, + "failed to read ledger %d", nextLedger) } for { diff --git a/exp/lighthorizon/tools/cache.go b/exp/lighthorizon/tools/cache.go index 29f24f408c..2913499bac 100644 --- a/exp/lighthorizon/tools/cache.go +++ b/exp/lighthorizon/tools/cache.go @@ -1,4 +1,4 @@ -package main +package tools import ( "context" @@ -21,7 +21,7 @@ const ( defaultCacheCount = (60 * 60 * 24) / 5 // ~24hrs worth of ledgers ) -func addCacheCommands(parent *cobra.Command) *cobra.Command { +func AddCacheCommands(parent *cobra.Command) *cobra.Command { cmd := &cobra.Command{ Use: "cache", Long: "Manages the on-disk cache of ledgers.", diff --git a/exp/lighthorizon/tools/explorer.go b/exp/lighthorizon/tools/explorer.go index 1978236a20..d05175cb1b 100644 --- a/exp/lighthorizon/tools/explorer.go +++ b/exp/lighthorizon/tools/explorer.go @@ -1,4 +1,4 @@ -package main +package tools import ( "context" @@ -24,7 +24,7 @@ var ( checkpointMgr = historyarchive.NewCheckpointManager(0) ) -func addIndexCommands(parent *cobra.Command) *cobra.Command { +func AddIndexCommands(parent *cobra.Command) *cobra.Command { cmd := &cobra.Command{ Use: "index", Long: "Lets you view details about an index source.", diff --git a/exp/lighthorizon/tools/main.go b/exp/lighthorizon/tools/main.go index d57021becb..f01389d441 100644 --- a/exp/lighthorizon/tools/main.go +++ b/exp/lighthorizon/tools/main.go @@ -1,4 +1,4 @@ -package main +package tools import ( "github.com/spf13/cobra" @@ -19,7 +19,7 @@ func main() { }, } - cmd = addCacheCommands(cmd) - cmd = addIndexCommands(cmd) + cmd = AddCacheCommands(cmd) + cmd = AddIndexCommands(cmd) cmd.Execute() } diff --git a/support/collections/set/set_test.go b/support/collections/set/set_test.go index 798aeea7d0..42add005e6 100644 --- a/support/collections/set/set_test.go +++ b/support/collections/set/set_test.go @@ -7,8 +7,12 @@ import ( ) func TestSet(t *testing.T) { - s := Set[string]{} + s := NewSet[string](10) s.Add("sanity") require.True(t, s.Contains("sanity")) require.False(t, s.Contains("check")) + + s.AddSlice([]string{"a", "b", "c"}) + require.True(t, s.Contains("b")) + require.Equal(t, []string{"sanity", "a", "b", "c"}, s.Slice()) } diff --git a/support/storage/ondisk_cache.go b/support/storage/ondisk_cache.go index ad2a355097..67549ae77d 100644 --- a/support/storage/ondisk_cache.go +++ b/support/storage/ondisk_cache.go @@ -9,7 +9,9 @@ import ( "github.com/stellar/go/support/log" ) -// OnDiskCache fronts another storage with a local filesystem cache +// OnDiskCache fronts another storage with a local filesystem cache. Its +// thread-safe, meaning you can be actively caching a file and retrieve it at +// the same time without corruption, because retrieval will wait for the fetch. type OnDiskCache struct { Storage dir string @@ -85,6 +87,8 @@ func (b *OnDiskCache) GetFile(filepath string) (io.ReadCloser, error) { return remote, err } + filepath += ".lock" + local, err = b.createLocal(filepath) if err != nil { // If there's some local FS error, we can still continue with the @@ -219,12 +223,16 @@ func (t trc) Close() error { return t.close() } -func teeReadCloser(r io.ReadCloser, w io.WriteCloser) io.ReadCloser { +func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.ReadCloser { return trc{ Reader: io.TeeReader(r, w), close: func() error { r.Close() - return w.Close() + err := w.Close() + if err != nil { + return err + } + return onClose() }, } }