Skip to content

Commit

Permalink
exp/lighthorizon: Add parameters to preload ledger cache. (#4615)
Browse files Browse the repository at this point in the history
* Add ability to preload cache in parallel after launching webserver
* Default to 1 day of ledgers @ 6s each
* Add logging instead of silent failure
  • Loading branch information
Shaptic authored Oct 11, 2022
1 parent a088915 commit dc430d4
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 14 deletions.
6 changes: 4 additions & 2 deletions exp/lighthorizon/build/k8s/lighthorizon_web.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ metadata:
app: lighthorizon-pubnet-web
name: lighthorizon-pubnet-web-env
data:
TXMETA_SOURCE: "s3://horizon-ledgermeta-prodnet-test"
INDEXES_SOURCE: "s3://horizon-index-prodnet-test"
# TXMETA_SOURCE is the ledger metadata archive and INDEXES_SOURCE is the index
# store; the two bucket URLs were previously crossed.
# NOTE(review): pairing inferred from the bucket names — confirm against the
# actual S3 layout.
TXMETA_SOURCE: "s3://horizon-ledgermeta-pubnet"
INDEXES_SOURCE: "s3://horizon-indices-pubnet"
NETWORK_PASSPHRASE: "Public Global Stellar Network ; September 2015"
# Numeric settings are quoted so that every value is a YAML string.
# NOTE(review): assumes this manifest is a ConfigMap (its `kind` is not visible
# in this hunk); ConfigMap `data` only accepts string values.
MAX_PARALLEL_DOWNLOADS: "16"
CACHE_PATH: "/ledgercache"
CACHE_PRELOAD_START_LEDGER: "0"
CACHE_PRELOAD_COUNT: "14400"
---
apiVersion: v1
kind: Secret
Expand Down
7 changes: 5 additions & 2 deletions exp/lighthorizon/build/web/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ COPY --from=builder /go/bin/lighthorizon ./

ENTRYPOINT ./lighthorizon serve \
--network-passphrase "$NETWORK_PASSPHRASE" \
--ledger-cache "$CACHE_PATH" \
--parallel-downloads "$MAX_PARALLEL_DOWNLOADS" \
"$TXMETA_SOURCE" "$INDEXES_SOURCE" \
--ledger-cache "$CACHE_PATH" \
--ledger-cache-preload "$CACHE_PRELOAD_COUNT" \
--ledger-cache-preload-start "$CACHE_PRELOAD_START_LEDGER" \
--log-level debug \
"$TXMETA_SOURCE" "$INDEXES_SOURCE"
42 changes: 42 additions & 0 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"net/http"

"github.com/go-chi/chi"
Expand Down Expand Up @@ -96,6 +97,41 @@ break down accounts by active ledgers.`,
return
}

// Ask the ledger source for its most recent ledger. The value is logged and
// is the reference point for the default preload range below.
latestLedger, err := ingester.GetLatestLedgerSequence(context.Background())
if err != nil {
	log.Fatalf("Failed to retrieve latest ledger from %s: %v", sourceUrl, err)
	return
}
log.Infof("The latest ledger stored at %s is %d.", sourceUrl, latestLedger)

// Lookup errors are deliberately ignored: on failure the zero value is used,
// which leaves preloading disabled / falls back to the default start.
cachePreloadCount, _ := cmd.Flags().GetUint32("ledger-cache-preload")
cachePreloadStart, _ := cmd.Flags().GetUint32("ledger-cache-preload-start")
if cachePreloadCount > 0 {
	if cacheDir == "" {
		log.Fatalf("--ledger-cache-preload=%d specified but no "+
			"--ledger-cache directory provided.",
			cachePreloadCount)
		return
	}

	// Default to the most recent cachePreloadCount ledgers unless an explicit
	// start was requested. The arithmetic is done in int64 so that the
	// subtraction can go negative without wrapping, regardless of the
	// platform's int width.
	startLedger := int64(latestLedger) - int64(cachePreloadCount)
	if cachePreloadStart > 0 {
		startLedger = int64(cachePreloadStart)
	}
	if startLedger <= 0 {
		log.Warnf("Starting ledger invalid (%d), defaulting to 2.",
			startLedger)
		startLedger = 2
	}

	// Argument order must follow the format string: directory, count, start.
	log.Infof("Preloading cache at %s with %d ledgers, starting at ledger %d.",
		cacheDir, cachePreloadCount, startLedger)

	// Preload in the background so that the web server can start right away.
	// A failure is logged rather than silently dropped; it does not stop the
	// server.
	go func() {
		if err := tools.BuildCache(sourceUrl, cacheDir,
			uint32(startLedger), cachePreloadCount, false); err != nil {
			log.Errorf("Failed to preload ledger cache at %s: %v", cacheDir, err)
		}
	}()
}

Config := services.Config{
Ingester: ingester,
Passphrase: networkPassphrase,
Expand All @@ -118,6 +154,8 @@ break down accounts by active ledgers.`,
Version: HorizonLiteVersion,
LedgerSource: sourceUrl,
IndexSource: indexStoreUrl,

LatestLedger: latestLedger,
}))

log.Fatal(http.ListenAndServe(":8080", router))
Expand All @@ -131,6 +169,10 @@ break down accounts by active ledgers.`,
"if left empty, uses a temporary directory")
serve.Flags().Uint("ledger-cache-size", defaultCacheSize,
"number of ledgers to store in the cache")
serve.Flags().Uint32("ledger-cache-preload", 0,
"should the cache come preloaded with the latest <n> ledgers?")
serve.Flags().Uint32("ledger-cache-preload-start", 0,
"the preload should start at ledger <n>")
serve.Flags().Uint("parallel-downloads", 1,
"how many workers should download ledgers in parallel?")

Expand Down
7 changes: 6 additions & 1 deletion exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func searchAccountTransactions(ctx context.Context,
Low: nextLedger,
High: checkpointMgr.NextCheckpoint(nextLedger + 1),
}
log.Infof("prepare range %d, %d", r.Low, r.High)
log.Infof("Preparing ledger range [%d, %d]", r.Low, r.High)
if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil {
log.Errorf("failed to prepare ledger range [%d, %d]: %v",
r.Low, r.High, innerErr)
Expand All @@ -133,6 +133,11 @@ func searchAccountTransactions(ctx context.Context,

start := time.Now()
ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger)

// TODO: We should have helpful error messages when innerErr points to a
// 404 for that particular ledger, since that situation shouldn't happen
// under normal operations, but rather indicates a problem with the
// backing archive.
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to retrieve ledger %d from archive", nextLedger)
Expand Down
16 changes: 8 additions & 8 deletions exp/lighthorizon/tools/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ purge /tmp/example 1000 1005 # purge a ledger range`,
return cmd.Usage()
}

count, err := cmd.Flags().GetUint("count")
count, err := cmd.Flags().GetUint32("count")
if err != nil || count <= 0 {
cmd.Println("--count should be a positive integer")
cmd.Println("--count should be a positive 32-bit integer")
return cmd.Usage()
}
repair, _ := cmd.Flags().GetBool("repair")
Expand All @@ -115,7 +115,7 @@ purge /tmp/example 1000 1005 # purge a ledger range`,

build.Flags().Bool("repair", false, "attempt to purge the cache and retry ledgers that error")
build.Flags().Uint32("start", 0, "first ledger to cache (required)")
build.Flags().Uint("count", defaultCacheCount, "number of ledgers to cache")
build.Flags().Uint32("count", defaultCacheCount, "number of ledgers to cache")

cmd.AddCommand(build, purge, show)
if parent == nil {
Expand All @@ -126,7 +126,7 @@ purge /tmp/example 1000 1005 # purge a ledger range`,
return parent
}

func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair bool) error {
func BuildCache(ledgerSource, cacheDir string, start uint32, count uint32, repair bool) error {
fullStart := time.Now()
L := log.DefaultLogger
L.SetLevel(log.InfoLevel)
Expand All @@ -136,7 +136,7 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair
store, err := storage.ConnectBackend(ledgerSource, storage.ConnectOptions{
Context: ctx,
Wrap: func(store storage.Storage) (storage.Storage, error) {
return storage.MakeOnDiskCache(store, cacheDir, count)
return storage.MakeOnDiskCache(store, cacheDir, uint(count))
},
})
if err != nil {
Expand All @@ -151,10 +151,10 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair
source := metaarchive.NewMetaArchive(store)
log.Infof("Filling local cache of ledgers at %s...", cacheDir)
log.Infof("Ledger range: [%d, %d] (%d ledgers)",
start, uint(start)+count-1, count)
start, start+count-1, count)

successful := uint(0)
for i := uint(0); i < count; i++ {
for i := uint32(0); i < count; i++ {
ledgerSeq := start + uint32(i)

// do "best effort" caching, skipping if too slow
Expand Down Expand Up @@ -186,7 +186,7 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair
Warnf("Downloading ledger %d took a while.", ledgerSeq)
}

log = log.WithField("failures", 1+i-successful)
log = log.WithField("failures", 1+uint(i)-successful)
if successful%97 == 0 {
log.Infof("Cached %d/%d ledgers (%0.1f%%)", successful, count,
100*float64(successful)/float64(count))
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/tools/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ view s3:///indices GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGLQLMXZO
view file:///tmp/indices --limit=0 GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGLQLMXZO
view gcs://indices --limit=10 GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGLQLMXZO,GBUUWQDVEEXBJCUF5UL24YGXKJIP5EMM7KFWIAR33KQRJR34GN6HEDPV,GBYETUYNBK2ZO5MSYBJKSLDEA2ZHIXLCFL3MMWU6RHFVAUBKEWQORYKS`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) > 2 {
if len(args) < 1 || len(args) > 2 {
return cmd.Usage()
}

Expand Down

0 comments on commit dc430d4

Please sign in to comment.