From 9b93a36f52effdfc5e75feb99fc0ae4e2ae02f7d Mon Sep 17 00:00:00 2001 From: George Date: Tue, 26 Jul 2022 15:36:49 -0700 Subject: [PATCH] exp/lighthorizon: Add an on-disk cache for frequently accessed ledgers. (#4457) * Replace custom LRU solution with an off-the-shelf data structure. * Add a filesystem cache in front of the ledger backend to lower latency * Add cache size parameter; only setup cache if not file:// * Extract S3 region from the archive URL if it's applicable. --- exp/lighthorizon/archive/ingest_archive.go | 82 +++++- exp/lighthorizon/archive/main.go | 1 + exp/lighthorizon/main.go | 21 +- go.mod | 2 +- historyarchive/fs_cache.go | 325 +++++++++++---------- 5 files changed, 248 insertions(+), 183 deletions(-) diff --git a/exp/lighthorizon/archive/ingest_archive.go b/exp/lighthorizon/archive/ingest_archive.go index 01ac0b6158..002012c0db 100644 --- a/exp/lighthorizon/archive/ingest_archive.go +++ b/exp/lighthorizon/archive/ingest_archive.go @@ -2,15 +2,81 @@ package archive import ( "context" + "fmt" + "net/url" "github.com/stellar/go/exp/lighthorizon/index" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" "github.com/stellar/go/historyarchive" "github.com/stellar/go/xdr" ) +type ArchiveConfig struct { + SourceUrl string + NetworkPassphrase string + CacheDir string + CacheSize int +} + +func NewIngestArchive(config ArchiveConfig) (Archive, error) { + if config.CacheSize <= 0 { + return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize) + } + + parsed, err := url.Parse(config.SourceUrl) + if err != nil { + return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl) + } + + region := "" + needsCache := true + switch parsed.Scheme { + case "file": + // We should only avoid a cache if the ledgers are already local. + needsCache = false + + case "s3": + // We need to extract the region if it's specified. + region = parsed.Query().Get("region") + } + + // Now, set up a simple filesystem-like access to the backend and wrap it in + // a local on-disk LRU cache if we can. + source, err := historyarchive.ConnectBackend( + config.SourceUrl, + historyarchive.ConnectOptions{ + Context: context.Background(), + NetworkPassphrase: config.NetworkPassphrase, + S3Region: region, + }, + ) + if err != nil { + return nil, err + } + + if needsCache { + cache, err := historyarchive.MakeFsCacheBackend(source, + config.CacheDir, uint(config.CacheSize)) + + if err != nil { // warn but continue w/o cache + log.WithField("path", config.CacheDir). + WithError(err). + Warnf("Failed to create cached ledger backend") + } else { + log.WithField("path", config.CacheDir). + Infof("On-disk cache configured") + source = cache + } + } + + ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source) + return ingestArchive{ledgerBackend}, nil +} + // This is an implementation of LightHorizon Archive that uses the existing horizon ingestion backend. type ingestArchive struct { *ledgerbackend.HistoryArchiveBackend @@ -82,20 +148,4 @@ func (adaptation *ingestTransactionReaderAdaption) Read() (LedgerTransaction, er return tx, nil } -func NewIngestArchive(sourceUrl string, networkPassphrase string) (Archive, error) { - // Simple file os access - source, err := historyarchive.ConnectBackend( - sourceUrl, - historyarchive.ConnectOptions{ - Context: context.Background(), - NetworkPassphrase: networkPassphrase, - }, - ) - if err != nil { - return nil, err - } - ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source) - return ingestArchive{ledgerBackend}, nil -} - var _ Archive = (*ingestArchive)(nil) // ensure conformity to the interface diff --git a/exp/lighthorizon/archive/main.go b/exp/lighthorizon/archive/main.go index 1f46a68105..2079ca1040 100644 --- a/exp/lighthorizon/archive/main.go +++ b/exp/lighthorizon/archive/main.go @@ -2,6 +2,7 @@ package archive import ( "context" + "github.com/stellar/go/xdr" ) diff --git a/exp/lighthorizon/main.go b/exp/lighthorizon/main.go index 604aa49e81..11b7354c80 100644 --- a/exp/lighthorizon/main.go +++ b/exp/lighthorizon/main.go @@ -17,27 +17,40 @@ import ( "github.com/stellar/go/support/log" ) +const ( + defaultCacheSize = (60 * 60 * 24) / 6 // 1 day of ledgers @ 6s each +) + func main() { sourceUrl := flag.String("source", "gcs://horizon-archive-poc", "history archive url to read txmeta files") indexesUrl := flag.String("indexes", "file://indexes", "url of the indexes") - networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase") + networkPassphrase := flag.String("network-passphrase", network.PublicNetworkPassphrase, "network passphrase") + cacheDir := flag.String("ledger-cache", "", `path to cache frequently-used ledgers; +if left empty, uses a temporary directory`) + cacheSize := flag.Int("ledger-cache-size", defaultCacheSize, + "number of ledgers to store in the cache") flag.Parse() L := log.WithField("service", "horizon-lite") - // L.SetLevel(log.DebugLevel) + L.SetLevel(log.InfoLevel) L.Info("Starting lighthorizon!") registry := prometheus.NewRegistry() indexStore, err := index.ConnectWithConfig(index.StoreConfig{ Url: *indexesUrl, - Metrics: registry, Log: L.WithField("subservice", "index"), + Metrics: registry, }) if err != nil { panic(err) } - ingestArchive, err := archive.NewIngestArchive(*sourceUrl, *networkPassphrase) + ingestArchive, err := archive.NewIngestArchive(archive.ArchiveConfig{ + SourceUrl: *sourceUrl, + NetworkPassphrase: *networkPassphrase, + CacheDir: *cacheDir, + CacheSize: *cacheSize, + }) if err != nil { panic(err) } diff --git a/go.mod b/go.mod index 0949f16980..eb1351d495 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/google/go-cmp v0.5.6 // indirect github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 // indirect github.com/googleapis/gax-go/v2 v2.0.5 // indirect - github.com/hashicorp/golang-lru v0.5.1 // indirect + github.com/hashicorp/golang-lru v0.5.1 github.com/hpcloud/tail v1.0.0 // indirect github.com/imkira/go-interpol v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect diff --git a/historyarchive/fs_cache.go b/historyarchive/fs_cache.go index b206b385a8..3f4c6f5a2d 100644 --- a/historyarchive/fs_cache.go +++ b/historyarchive/fs_cache.go @@ -5,222 +5,223 @@ package historyarchive import ( - "container/heap" "io" "os" "path" - "time" - log "github.com/sirupsen/logrus" + lru "github.com/hashicorp/golang-lru" + "github.com/stellar/go/support/log" ) // FsCacheBackend fronts another backend with a local filesystem cache type FsCacheBackend struct { ArchiveBackend - dir string - //lint:ignore U1000 Ignore unused temporarily - knownFiles lruCache - maxFiles int - lru lruCache -} + dir string + maxFiles int + lru *lru.Cache -type trc struct { - io.Reader - close func() error + log *log.Entry } -func (t trc) Close() error { - return t.close() -} - -func teeReadCloser(r io.ReadCloser, w io.WriteCloser) io.ReadCloser { - return trc{ - Reader: io.TeeReader(r, w), - close: func() error { - r.Close() - return w.Close() - }, - } -} - -func (b *FsCacheBackend) GetFile(pth string) (r io.ReadCloser, err error) { - localPath := path.Join(b.dir, pth) - local, err := os.Open(localPath) - if err == nil { - b.updateLRU(localPath) - return local, nil +// MakeFsCacheBackend wraps an ArchiveBackend with a local filesystem cache in +// `dir`. If dir is blank, a temporary directory will be created. If `maxFiles` +// is zero, a default (90 days of ledgers) is used. +func MakeFsCacheBackend(upstream ArchiveBackend, dir string, maxFiles uint) (ArchiveBackend, error) { + if dir == "" { + tmp, err := os.MkdirTemp(os.TempDir(), "stellar-horizon-*") + if err != nil { + return nil, err + } + dir = tmp } - if !os.IsNotExist(err) { - // Some local fs error.. log and continue? - log.WithField("path", pth).WithError(err).Error("fs-cache: get file") + if maxFiles == 0 { + // A guess at a reasonable number of checkpoints. This is 90 days of + // ledgers. (90*86_400)/(5*64) = 24_300 + maxFiles = 24_300 } - remote, err := b.ArchiveBackend.GetFile(pth) - if err != nil { - return remote, err + backendLog := log. + WithField("subservice", "fs-cache"). + WithField("path", dir). + WithField("size", maxFiles) + backendLog.Info("Filesystem cache configured") + + backend := &FsCacheBackend{ + ArchiveBackend: upstream, + dir: dir, + maxFiles: int(maxFiles), + log: backendLog, } - local, err = b.createLocal(pth) + + cache, err := lru.NewWithEvict(int(maxFiles), backend.onEviction) if err != nil { - // Some local fs error.. log and continue? - log.WithField("path", pth).WithError(err).Error("fs-cache: get file") - return remote, nil + return nil, err } - return teeReadCloser(remote, local), nil -} -func (b *FsCacheBackend) createLocal(pth string) (*os.File, error) { - localPath := path.Join(b.dir, pth) + backend.lru = cache + return backend, nil +} + +// GetFile retrieves the file contents from the local cache if present. +// Otherwise, it returns the same result that the wrapped backend returns and +// adds that result into the local cache, if possible. +func (b *FsCacheBackend) GetFile(filepath string) (io.ReadCloser, error) { + L := b.log.WithField("key", filepath) + localPath := path.Join(b.dir, filepath) + + if _, ok := b.lru.Get(localPath); !ok { + // If it doesn't exist in the cache, it might still exist on the disk if + // we've restarted from an existing directory. + local, err := os.Open(localPath) + if err == nil { + L.Debug("found file on disk but not in cache, adding") + b.lru.Add(localPath, struct{}{}) + return local, nil + } - if err := os.MkdirAll(path.Dir(localPath), 0755); err != nil { - return nil, err + b.log.WithField("key", filepath). + Debug("retrieving file from remote backend") + + // Since it's not on-disk, pull it from the remote backend, shove it + // into the cache, and write it to disk. + remote, err := b.ArchiveBackend.GetFile(filepath) + if err != nil { + return remote, err + } + + local, err = b.createLocal(filepath) + if err != nil { + // If there's some local FS error, we can still continue with the + // remote version, so just log it and continue. + L.WithError(err).Error("caching ledger failed") + + return remote, nil + } + + return teeReadCloser(remote, local), nil } - local, err := os.Create(localPath) + // The cache claims it exists, so just give it a read and send it. + local, err := os.Open(localPath) if err != nil { - return nil, err + // Uh-oh, the cache and the disk are not in sync somehow? Let's evict + // this value and try again (recurse) w/ the remote version. + L.WithError(err).Warn("opening cached ledger failed") + b.lru.Remove(localPath) + return b.GetFile(filepath) } - b.updateLRU(localPath) - return local, err + + L.Debug("Found file in cache") + return local, nil } -func (b *FsCacheBackend) Exists(pth string) (bool, error) { - localPath := path.Join(b.dir, pth) - log.WithField("path", pth).Trace("fs-cache: check exists") - if _, err := os.Stat(localPath); err == nil { - b.updateLRU(localPath) +// Exists shortcuts an existence check by checking if it exists in the cache. +// Otherwise, it returns the same result as the wrapped backend. Note that in +// the latter case, the cache isn't modified. +func (b *FsCacheBackend) Exists(filepath string) (bool, error) { + localPath := path.Join(b.dir, filepath) + b.log.WithField("key", filepath).Debug("checking existence") + + if _, ok := b.lru.Get(localPath); ok { + // If the cache says it's there, we can definitively say that this path + // exists, even if we'd fail to `os.Stat()/Read()/etc.` it locally. return true, nil } - return b.ArchiveBackend.Exists(pth) -} -func (b *FsCacheBackend) Size(pth string) (int64, error) { - localPath := path.Join(b.dir, pth) - log.WithField("path", pth).Trace("fs-cache: check exists") - fi, err := os.Stat(localPath) - if err == nil { - log.WithField("path", pth).WithField("size", fi.Size()).Trace("fs-cache: got size") - b.updateLRU(localPath) - return fi.Size(), nil - } - log.WithField("path", pth).WithError(err).Error("fs-cache: get size") - return b.ArchiveBackend.Size(pth) + return b.ArchiveBackend.Exists(filepath) } -func (b *FsCacheBackend) PutFile(pth string, in io.ReadCloser) error { - log.WithField("path", pth).Trace("fs-cache: put file") - in = b.tryLocalPutFile(pth, in) - return b.ArchiveBackend.PutFile(pth, in) -} +// Size will return the size of the file found in the cache if possible. +// Otherwise, it returns the same result as the wrapped backend. Note that in +// the latter case, the cache isn't modified. +func (b *FsCacheBackend) Size(filepath string) (int64, error) { + localPath := path.Join(b.dir, filepath) + L := b.log.WithField("key", filepath) -// Best effort to tee the upload off to the local cache as well -func (b *FsCacheBackend) tryLocalPutFile(pth string, in io.ReadCloser) io.ReadCloser { - local, err := b.createLocal(pth) - if err != nil { - log.WithField("path", pth).WithError(err).Error("fs-cache: put file") - return in + L.Debug("retrieving size") + if _, ok := b.lru.Get(localPath); ok { + stats, err := os.Stat(localPath) + if err == nil { + L.Debugf("retrieved cached size: %d", stats.Size()) + return stats.Size(), nil + } + + L.WithError(err).Debug("retrieving size of cached ledger failed") + b.lru.Remove(localPath) // stale cache? } - // tee upload data into our local file - return teeReadCloser(in, local) + return b.ArchiveBackend.Size(filepath) } -func (b *FsCacheBackend) updateLRU(pth string) { - b.lru.bump(pth) - for i := b.lru.Len(); i > b.maxFiles; i-- { - item := b.lru.Pop().(*lruCacheItem) - if err := os.Remove(item.path); err != nil { - log.WithField("path", item.path).WithError(err).Error("fs-cache: evict") - } +// PutFile writes to the given `filepath` from the given `in` reader, also +// writing it to the local cache if possible. It returns the same result as the +// wrapped backend. +func (b *FsCacheBackend) PutFile(filepath string, in io.ReadCloser) error { + L := log.WithField("key", filepath) + L.Debug("putting file") + + // Best effort to tee the upload off to the local cache as well + local, err := b.createLocal(filepath) + if err != nil { + L.WithError(err).Error("failed to put file locally") + } else { + // tee upload data into our local file + in = teeReadCloser(in, local) } + + return b.ArchiveBackend.PutFile(filepath, in) } +// Close purges the cache, then forwards the call to the wrapped backend. func (b *FsCacheBackend) Close() error { + // We only purge the cache, leaving the filesystem untouched: + // https://github.com/stellar/go/pull/4457#discussion_r929352643 + b.lru.Purge() return b.ArchiveBackend.Close() } -// MakeFsCacheBackend, wraps an ArchiveBackend with a local filesystem cache in -// `dir`. If dir is blank, a temporary directory will be created. -func MakeFsCacheBackend(upstream ArchiveBackend, dir string, maxFiles uint) (ArchiveBackend, error) { - if dir == "" { - tmp, err := os.MkdirTemp(os.TempDir(), "stellar-horizon-*") - if err != nil { - return nil, err - } - dir = tmp - log.WithField("dir", dir).Info("fs-cache: temp dir") +func (b *FsCacheBackend) onEviction(key, value interface{}) { + path := key.(string) + if err := os.Remove(path); err != nil { // best effort removal + b.log.WithError(err). + WithField("key", path). + Error("removal failed after cache eviction") } - if maxFiles == 0 { - // A guess at a reasonable number of checkpoints. This is 90 days of - // ledgers. (90*86_400)/(5*64) = 24_300 - maxFiles = 24_300 - } - // Add 10 here, cause we need a bit of spare room for pending evictions. - var lru lruCache - heap.Init(&lru) - return &FsCacheBackend{ - ArchiveBackend: upstream, - dir: dir, - maxFiles: int(maxFiles), - lru: lru, - }, nil } -// lruCache is a heap-based LRU cache that we use to limit the on-disk size -type lruCache []*lruCacheItem - -type lruCacheItem struct { - path string - lastUsedAt time.Time - index int -} - -func (c lruCache) Len() int { return len(c) } +func (b *FsCacheBackend) createLocal(filepath string) (*os.File, error) { + localPath := path.Join(b.dir, filepath) + if err := os.MkdirAll(path.Dir(localPath), 0755 /* drwxr-xr-x */); err != nil { + return nil, err + } -func (c lruCache) Less(i, j int) bool { - // We want Pop to give us the oldest, so we use before than here. - return c[i].lastUsedAt.Before(c[j].lastUsedAt) -} + local, err := os.Create(localPath) /* mode -rw-rw-rw- */ + if err != nil { + return nil, err + } -func (c lruCache) Swap(i, j int) { - c[i], c[j] = c[j], c[i] - c[i].index = i - c[j].index = j + b.lru.Add(localPath, struct{}{}) // just use the cache as an array + return local, nil } -func (c *lruCache) Push(x interface{}) { - n := len(*c) - item := x.(*lruCacheItem) - item.index = n - *c = append(*c, item) -} +// The below is a helper interface so that we can use io.TeeReader to write +// data locally immediately as we read it remotely. -func (c *lruCache) Pop() interface{} { - old := *c - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.index = -1 // for safety - *c = old[0 : n-1] - return item +type trc struct { + io.Reader + close func() error } -func (c *lruCache) bump(pth string) { - c.upsert(pth, time.Now()) +func (t trc) Close() error { + return t.close() } -// upsert modifies the priority and value of an item in the heap, or inserts it. -func (c *lruCache) upsert(pth string, lastUsedAt time.Time) { - // Try to find by path and update - for _, item := range *c { - if item.path == pth { - item.lastUsedAt = lastUsedAt - heap.Fix(c, item.index) - return - } +func teeReadCloser(r io.ReadCloser, w io.WriteCloser) io.ReadCloser { + return trc{ + Reader: io.TeeReader(r, w), + close: func() error { + r.Close() + return w.Close() + }, } - // not found, add this item - heap.Push(c, &lruCacheItem{ - path: pth, - lastUsedAt: lastUsedAt, - }) }