Skip to content

Commit

Permalink
exp/lighthorizon: Add an on-disk cache for frequently accessed ledger…
Browse files Browse the repository at this point in the history
…s. (#4457)

* Replace custom LRU solution with an off-the-shelf data structure.
* Add a filesystem cache in front of the ledger backend to lower latency
* Add cache size parameter; only setup cache if not file://
* Extract S3 region from the archive URL if it's applicable.
  • Loading branch information
Shaptic authored Jul 26, 2022
1 parent bceaf07 commit 9b93a36
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 183 deletions.
82 changes: 66 additions & 16 deletions exp/lighthorizon/archive/ingest_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,81 @@ package archive

import (
"context"
"fmt"
"net/url"

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/xdr"
)

type ArchiveConfig struct {
SourceUrl string
NetworkPassphrase string
CacheDir string
CacheSize int
}

func NewIngestArchive(config ArchiveConfig) (Archive, error) {
if config.CacheSize <= 0 {
return nil, fmt.Errorf("invalid cache size: %d", config.CacheSize)
}

parsed, err := url.Parse(config.SourceUrl)
if err != nil {
return nil, errors.Wrapf(err, "%s is not a valid URL", config.SourceUrl)
}

region := ""
needsCache := true
switch parsed.Scheme {
case "file":
// We should only avoid a cache if the ledgers are already local.
needsCache = false

case "s3":
// We need to extract the region if it's specified.
region = parsed.Query().Get("region")
}

// Now, set up a simple filesystem-like access to the backend and wrap it in
// a local on-disk LRU cache if we can.
source, err := historyarchive.ConnectBackend(
config.SourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: config.NetworkPassphrase,
S3Region: region,
},
)
if err != nil {
return nil, err
}

if needsCache {
cache, err := historyarchive.MakeFsCacheBackend(source,
config.CacheDir, uint(config.CacheSize))

if err != nil { // warn but continue w/o cache
log.WithField("path", config.CacheDir).
WithError(err).
Warnf("Failed to create cached ledger backend")
} else {
log.WithField("path", config.CacheDir).
Infof("On-disk cache configured")
source = cache
}
}

ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
return ingestArchive{ledgerBackend}, nil
}

// This is an implementation of LightHorizon Archive that uses the existing horizon ingestion backend.
type ingestArchive struct {
*ledgerbackend.HistoryArchiveBackend
Expand Down Expand Up @@ -82,20 +148,4 @@ func (adaptation *ingestTransactionReaderAdaption) Read() (LedgerTransaction, er
return tx, nil
}

func NewIngestArchive(sourceUrl string, networkPassphrase string) (Archive, error) {
// Simple file os access
source, err := historyarchive.ConnectBackend(
sourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: networkPassphrase,
},
)
if err != nil {
return nil, err
}
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
return ingestArchive{ledgerBackend}, nil
}

var _ Archive = (*ingestArchive)(nil) // ensure conformity to the interface
1 change: 1 addition & 0 deletions exp/lighthorizon/archive/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package archive

import (
"context"

"github.com/stellar/go/xdr"
)

Expand Down
21 changes: 17 additions & 4 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,40 @@ import (
"github.com/stellar/go/support/log"
)

const (
defaultCacheSize = (60 * 60 * 24) / 6 // 1 day of ledgers @ 6s each
)

func main() {
sourceUrl := flag.String("source", "gcs://horizon-archive-poc", "history archive url to read txmeta files")
indexesUrl := flag.String("indexes", "file://indexes", "url of the indexes")
networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
networkPassphrase := flag.String("network-passphrase", network.PublicNetworkPassphrase, "network passphrase")
cacheDir := flag.String("ledger-cache", "", `path to cache frequently-used ledgers;
if left empty, uses a temporary directory`)
cacheSize := flag.Int("ledger-cache-size", defaultCacheSize,
"number of ledgers to store in the cache")
flag.Parse()

L := log.WithField("service", "horizon-lite")
// L.SetLevel(log.DebugLevel)
L.SetLevel(log.InfoLevel)
L.Info("Starting lighthorizon!")

registry := prometheus.NewRegistry()
indexStore, err := index.ConnectWithConfig(index.StoreConfig{
Url: *indexesUrl,
Metrics: registry,
Log: L.WithField("subservice", "index"),
Metrics: registry,
})
if err != nil {
panic(err)
}

ingestArchive, err := archive.NewIngestArchive(*sourceUrl, *networkPassphrase)
ingestArchive, err := archive.NewIngestArchive(archive.ArchiveConfig{
SourceUrl: *sourceUrl,
NetworkPassphrase: *networkPassphrase,
CacheDir: *cacheDir,
CacheSize: *cacheSize,
})
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hashicorp/golang-lru v0.5.1
github.com/hpcloud/tail v1.0.0 // indirect
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down
Loading

0 comments on commit 9b93a36

Please sign in to comment.