Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/lighthorizon: Add parameters to preload ledger cache. #4615

Merged
merged 5 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions exp/lighthorizon/build/k8s/lighthorizon_web.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ metadata:
app: lighthorizon-pubnet-web
name: lighthorizon-pubnet-web-env
data:
TXMETA_SOURCE: "s3://horizon-ledgermeta-prodnet-test"
INDEXES_SOURCE: "s3://horizon-index-prodnet-test"
# TXMETA_SOURCE is the ledger-metadata bucket and INDEXES_SOURCE is the index
# bucket; keep each variable paired with the matching bucket name.
TXMETA_SOURCE: "s3://horizon-ledgermeta-pubnet"
INDEXES_SOURCE: "s3://horizon-indices-pubnet"
NETWORK_PASSPHRASE: "Public Global Stellar Network ; September 2015"
MAX_PARALLEL_DOWNLOADS: 16
CACHE_PATH: "/ledgercache"
CACHE_PRELOAD_START_LEDGER: 0
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
CACHE_PRELOAD_COUNT: 14400
---
apiVersion: v1
kind: Secret
Expand Down
7 changes: 5 additions & 2 deletions exp/lighthorizon/build/web/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ COPY --from=builder /go/bin/lighthorizon ./

ENTRYPOINT ./lighthorizon serve \
--network-passphrase "$NETWORK_PASSPHRASE" \
--ledger-cache "$CACHE_PATH" \
--parallel-downloads "$MAX_PARALLEL_DOWNLOADS" \
"$TXMETA_SOURCE" "$INDEXES_SOURCE" \
--ledger-cache "$CACHE_PATH" \
--ledger-cache-preload "$CACHE_PRELOAD_COUNT" \
--ledger-cache-preload-start "$CACHE_PRELOAD_START_LEDGER" \
--log-level debug \
"$TXMETA_SOURCE" "$INDEXES_SOURCE"
37 changes: 37 additions & 0 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"net/http"

"github.com/go-chi/chi"
Expand Down Expand Up @@ -96,6 +97,36 @@ break down accounts by active ledgers.`,
return
}

latestLedger, err := ingester.GetLatestLedgerSequence(context.Background())
if err != nil {
log.Fatalf("Failed to retrieve latest ledger from %s: %v", sourceUrl, err)
return
}
log.Infof("The latest ledger stored at %s is %d.", sourceUrl, latestLedger)

cachePreloadCount, _ := cmd.Flags().GetUint32("ledger-cache-preload")
cachePreloadStart, _ := cmd.Flags().GetUint32("ledger-cache-preload-start")
// Optionally warm the on-disk ledger cache in the background before serving.
// --ledger-cache-preload gives the number of ledgers to fetch and
// --ledger-cache-preload-start optionally overrides where the range begins.
if cachePreloadCount > 0 {
	if cacheDir == "" {
		// Preloading needs a cache directory to write into; this is fatal,
		// so the message must not claim the flag is being ignored.
		log.Fatalf("--ledger-cache-preload=%d specified but no "+
			"--ledger-cache directory provided.",
			cachePreloadCount)
		return
	}

	// By default preload the newest cachePreloadCount ledgers, ending at
	// latestLedger inclusive (BuildCache covers [start, start+count-1]).
	// Both operands are uint32, so the subtraction is only performed when it
	// cannot wrap around.
	// NOTE(review): the fallback of 1 assumes ledger 1 is the lowest valid
	// sequence (0 is treated as "unset" by --ledger-cache-preload-start) —
	// confirm against the archive layout.
	startLedger := uint32(1)
	if cachePreloadCount < latestLedger {
		startLedger = latestLedger - cachePreloadCount + 1
	}
	if cachePreloadStart > 0 {
		startLedger = cachePreloadStart
	}

	// Operand order must match the verbs: directory, count, then start.
	log.Infof("Preloading cache at %s with %d ledgers, starting at ledger %d.",
		cacheDir, cachePreloadCount, startLedger)

	// Runs concurrently with server start-up. The error returned by
	// BuildCache is discarded, so a failed preload does not stop the server.
	go func() {
		tools.BuildCache(sourceUrl, cacheDir,
			startLedger, cachePreloadCount, false)
	}()
}

Config := services.Config{
Ingester: ingester,
Passphrase: networkPassphrase,
Expand All @@ -118,6 +149,8 @@ break down accounts by active ledgers.`,
Version: HorizonLiteVersion,
LedgerSource: sourceUrl,
IndexSource: indexStoreUrl,

LatestLedger: latestLedger,
}))

log.Fatal(http.ListenAndServe(":8080", router))
Expand All @@ -131,6 +164,10 @@ break down accounts by active ledgers.`,
"if left empty, uses a temporary directory")
serve.Flags().Uint("ledger-cache-size", defaultCacheSize,
"number of ledgers to store in the cache")
serve.Flags().Uint32("ledger-cache-preload", 0,
"should the cache come preloaded with the latest <n> ledgers?")
serve.Flags().Uint32("ledger-cache-preload-start", 0,
"the preload should start at ledger <n>")
serve.Flags().Uint("parallel-downloads", 1,
"how many workers should download ledgers in parallel?")

Expand Down
7 changes: 6 additions & 1 deletion exp/lighthorizon/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func searchAccountTransactions(ctx context.Context,
Low: nextLedger,
High: checkpointMgr.NextCheckpoint(nextLedger + 1),
}
log.Infof("prepare range %d, %d", r.Low, r.High)
log.Infof("Preparing ledger range [%d, %d]", r.Low, r.High)
if innerErr := config.Ingester.PrepareRange(ctx, r); innerErr != nil {
log.Errorf("failed to prepare ledger range [%d, %d]: %v",
r.Low, r.High, innerErr)
Expand All @@ -133,6 +133,11 @@ func searchAccountTransactions(ctx context.Context,

start := time.Now()
ledger, innerErr := config.Ingester.GetLedger(ctx, nextLedger)

Shaptic marked this conversation as resolved.
Show resolved Hide resolved
// TODO: We should have helpful error messages when innerErr points to a
// 404 for that particular ledger, since that situation shouldn't happen
// under normal operations, but rather indicates a problem with the
// backing archive.
if innerErr != nil {
return errors.Wrapf(innerErr,
"failed to retrieve ledger %d from archive", nextLedger)
Expand Down
16 changes: 8 additions & 8 deletions exp/lighthorizon/tools/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ purge /tmp/example 1000 1005 # purge a ledger range`,
return cmd.Usage()
}

count, err := cmd.Flags().GetUint("count")
count, err := cmd.Flags().GetUint32("count")
if err != nil || count <= 0 {
cmd.Println("--count should be a positive integer")
cmd.Println("--count should be a positive 32-bit integer")
return cmd.Usage()
}
repair, _ := cmd.Flags().GetBool("repair")
Expand All @@ -115,7 +115,7 @@ purge /tmp/example 1000 1005 # purge a ledger range`,

build.Flags().Bool("repair", false, "attempt to purge the cache and retry ledgers that error")
build.Flags().Uint32("start", 0, "first ledger to cache (required)")
build.Flags().Uint("count", defaultCacheCount, "number of ledgers to cache")
build.Flags().Uint32("count", defaultCacheCount, "number of ledgers to cache")

cmd.AddCommand(build, purge, show)
if parent == nil {
Expand All @@ -126,7 +126,7 @@ purge /tmp/example 1000 1005 # purge a ledger range`,
return parent
}

func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair bool) error {
func BuildCache(ledgerSource, cacheDir string, start uint32, count uint32, repair bool) error {
fullStart := time.Now()
L := log.DefaultLogger
L.SetLevel(log.InfoLevel)
Expand All @@ -136,7 +136,7 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair
store, err := storage.ConnectBackend(ledgerSource, storage.ConnectOptions{
Context: ctx,
Wrap: func(store storage.Storage) (storage.Storage, error) {
return storage.MakeOnDiskCache(store, cacheDir, count)
return storage.MakeOnDiskCache(store, cacheDir, uint(count))
},
})
if err != nil {
Expand All @@ -151,10 +151,10 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair
source := metaarchive.NewMetaArchive(store)
log.Infof("Filling local cache of ledgers at %s...", cacheDir)
log.Infof("Ledger range: [%d, %d] (%d ledgers)",
start, uint(start)+count-1, count)
start, start+count-1, count)

successful := uint(0)
for i := uint(0); i < count; i++ {
for i := uint32(0); i < count; i++ {
ledgerSeq := start + uint32(i)

// do "best effort" caching, skipping if too slow
Expand Down Expand Up @@ -186,7 +186,7 @@ func BuildCache(ledgerSource, cacheDir string, start uint32, count uint, repair
Warnf("Downloading ledger %d took a while.", ledgerSeq)
}

log = log.WithField("failures", 1+i-successful)
log = log.WithField("failures", 1+uint(i)-successful)
if successful%97 == 0 {
log.Infof("Cached %d/%d ledgers (%0.1f%%)", successful, count,
100*float64(successful)/float64(count))
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/tools/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ view s3:///indices GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGLQLMXZO
view file:///tmp/indices --limit=0 GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGLQLMXZO
view gcs://indices --limit=10 GAXLQGKIUAIIUHAX4GJO3J7HFGLBCNF6ZCZSTLJE7EKO5IUHGLQLMXZO,GBUUWQDVEEXBJCUF5UL24YGXKJIP5EMM7KFWIAR33KQRJR34GN6HEDPV,GBYETUYNBK2ZO5MSYBJKSLDEA2ZHIXLCFL3MMWU6RHFVAUBKEWQORYKS`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) > 2 {
if len(args) < 1 || len(args) > 2 {
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
return cmd.Usage()
}

Expand Down