diff --git a/go.mod b/go.mod
index f3d11b5abc..1ca1ea801c 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@ require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/aws/aws-sdk-go v1.45.26
github.com/creachadair/jrpc2 v1.1.0
+ github.com/djherbis/fscache v0.10.1
github.com/elazarl/go-bindata-assetfs v1.0.1
github.com/getsentry/raven-go v0.2.0
github.com/go-chi/chi v4.1.2+incompatible
@@ -91,6 +92,8 @@ require (
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231211222908-989df2bf70f3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
+ gopkg.in/djherbis/atime.v1 v1.0.0 // indirect
+ gopkg.in/djherbis/stream.v1 v1.3.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
diff --git a/go.sum b/go.sum
index 9ca895487d..6e4158f9b7 100644
--- a/go.sum
+++ b/go.sum
@@ -104,6 +104,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/djherbis/fscache v0.10.1 h1:hDv+RGyvD+UDKyRYuLoVNbuRTnf2SrA2K3VyR1br9lk=
+github.com/djherbis/fscache v0.10.1/go.mod h1:yyPYtkNnnPXsW+81lAcQS6yab3G2CRfnPLotBvtbf0c=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
@@ -814,6 +816,10 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60=
+gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8=
+gopkg.in/djherbis/stream.v1 v1.3.1 h1:uGfmsOY1qqMjQQphhRBSGLyA9qumJ56exkRu9ASTjCw=
+gopkg.in/djherbis/stream.v1 v1.3.1/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0 h1:r5ptJ1tBxVAeqw4CrYWhXIMr0SybY3CDHuIbCg5CFVw=
diff --git a/historyarchive/archive.go b/historyarchive/archive.go
index ed05a4130d..b01b808690 100644
--- a/historyarchive/archive.go
+++ b/historyarchive/archive.go
@@ -11,12 +11,15 @@ import (
"fmt"
"io"
"net/url"
+ "os"
"path"
"regexp"
"strconv"
"strings"
"sync"
+ "time"
+ fscache "github.com/djherbis/fscache"
log "github.com/sirupsen/logrus"
"github.com/stellar/go/support/errors"
@@ -38,6 +41,8 @@ type CommandOptions struct {
}
type ArchiveOptions struct {
+ storage.ConnectOptions
+
// NetworkPassphrase defines the expected network of history archive. It is
// checked when getting HAS. If network passphrase does not match, error is
// returned.
@@ -45,9 +50,8 @@ type ArchiveOptions struct {
// CheckpointFrequency is the number of ledgers between checkpoints
// if unset, DefaultCheckpointFrequency will be used
CheckpointFrequency uint32
- storage.ConnectOptions
- // CacheConfig controls how/if bucket files are cached on the disk.
- CacheConfig CacheOptions
+	// CachePath controls where (and whether) bucket files are cached on disk.
+ CachePath string
}
type Ledger struct {
@@ -104,8 +108,16 @@ type Archive struct {
checkpointManager CheckpointManager
backend storage.Storage
- cache *ArchiveBucketCache
stats archiveStats
+
+ cache *archiveBucketCache
+}
+
+type archiveBucketCache struct {
+ fscache.Cache
+
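+	// path is the on-disk location of the cache; sizes tracks the byte size
+	// of each cached file so cache hits can be credited as saved bandwidth
+	// in the archive stats.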
+ path string
+ sizes sync.Map
}
func (arch *Archive) GetStats() []ArchiveStats {
@@ -119,8 +131,9 @@ func (arch *Archive) GetCheckpointManager() CheckpointManager {
func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {
var has HistoryArchiveState
rdr, err := a.backend.GetFile(path)
- // this is a query on the HA server state, not a data/bucket file download
- a.stats.incrementRequests()
+	// This is a query on the HA server state, not a data/bucket file
+	// download, so it counts as a request but not as a download.
+	a.stats.incrementRequests()
if err != nil {
return has, err
}
@@ -383,23 +396,79 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
}
func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
- if a.cache != nil {
- rdr, foundInCache, err := a.cache.GetFile(pth, a.backend)
- if !foundInCache {
- a.stats.incrementDownloads()
- } else {
- a.stats.incrementCacheHits()
- }
- if err == nil {
- return rdr, nil
+ if a.cache == nil {
+ a.stats.incrementDownloads()
+ return a.backend.GetFile(pth)
+ }
+
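+	// Tag all cache-related log lines with the file path and cache location.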
+ L := log.WithField("path", pth).WithField("cache", a.cache.path)
+
+ rdr, wrtr, err := a.cache.Get(pth)
+ if err != nil {
+ L.WithError(err).
+ WithField("remove", a.cache.Remove(pth)).
+ Warn("On-disk cache retrieval failed")
+ a.stats.incrementDownloads()
+ return a.backend.GetFile(pth)
+ }
+
+	// If a NEW key is being retrieved, fscache hands back a writer to which
+	// we're expected to copy the upstream, alongside a reader that streams
+	// whatever has been written to it so far.
+ if wrtr != nil {
+		L.Info("Caching file...")
+ a.stats.incrementDownloads()
+ upstreamReader, err := a.backend.GetFile(pth)
+ if err != nil {
+ writeErr := wrtr.Close()
+ readErr := rdr.Close()
+ removeErr := a.cache.Remove(pth)
+			// Close the handles in an explicit order before logging, rather
+			// than inline in the log-field expression below.
+ L.WithError(err).WithFields(log.Fields{
+ "write-close": writeErr,
+ "read-close": readErr,
+ "cache-rm": removeErr,
+ }).Warn("Download failed, purging from cache")
+ return nil, err
}
- // If there's an error, retry with the uncached backend.
- a.cache.Evict(pth)
+ // Start a goroutine to slurp up the upstream and feed
+ // it directly to the cache.
+ go func() {
+ written, err := io.Copy(wrtr, upstreamReader)
+ writeErr := wrtr.Close()
+ readErr := upstreamReader.Close()
+ fields := log.Fields{
+ "wr-close": writeErr,
+ "rd-close": readErr,
+ }
+
+ if err != nil {
+ L.WithFields(fields).WithError(err).
+ Warn("Failed to download and cache file")
+
+ // Removal must happen *after* handles close.
+ if removalErr := a.cache.Remove(pth); removalErr != nil {
+ L.WithError(removalErr).Warn("Removing cached file failed")
+ }
+ } else {
+ L.WithFields(fields).Infof("Cached %dKiB file", written/1024)
+
+				// Track how much bandwidth caching saves us by recording
+				// the size of the file we just downloaded.
+ a.cache.sizes.Store(pth, written)
+ }
+ }()
+ } else {
+ // Best-effort check to track bandwidth metrics
+ if written, found := a.cache.sizes.Load(pth); found {
+ a.stats.incrementCacheBandwidth(written.(int64))
+ }
+ a.stats.incrementCacheHits()
}
- a.stats.incrementDownloads()
- return a.backend.GetFile(pth)
+ return rdr, nil
}
func (a *Archive) cachedExists(pth string) (bool, error) {
@@ -439,13 +508,30 @@ func Connect(u string, opts ArchiveOptions) (*Archive, error) {
return &arch, err
}
- if opts.CacheConfig.Cache {
- cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig)
- if innerErr != nil {
- return &arch, innerErr
+ if opts.CachePath != "" {
+		// Set up an LRU cache for history archive files, capped at ~10GiB.
+ haunter := fscache.NewLRUHaunterStrategy(
+			fscache.NewLRUHaunter(0 /* no file-count limit */, 10<<30 /* ~10GiB */, time.Minute /* sweep interval */),
+ )
+
+ // Wipe any existing cache on startup
+ os.RemoveAll(opts.CachePath)
+ fs, err := fscache.NewFs(opts.CachePath, 0755 /* drwxr-xr-x */)
+
+ if err != nil {
+ return &arch, errors.Wrapf(err,
+ "creating cache at '%s' with mode 0755 failed",
+ opts.CachePath)
+ }
+
+ cache, err := fscache.NewCacheWithHaunter(fs, haunter)
+ if err != nil {
+ return &arch, errors.Wrapf(err,
+ "creating cache at '%s' failed",
+ opts.CachePath)
}
- arch.cache = cache
+ arch.cache = &archiveBucketCache{cache, opts.CachePath, sync.Map{}}
}
arch.stats = archiveStats{backendName: u}
@@ -467,6 +553,8 @@ func ConnectBackend(u string, opts storage.ConnectOptions) (storage.Storage, err
if parsed.Scheme == "mock" {
backend = makeMockBackend()
+ } else if parsed.Scheme == "fmock" {
+ backend = makeFailingMockBackend()
} else {
backend, err = storage.ConnectBackend(u, opts)
}
diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go
index 4cb5483f63..432e2af920 100644
--- a/historyarchive/archive_pool.go
+++ b/historyarchive/archive_pool.go
@@ -31,11 +31,7 @@ func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchivePool, err
// Try connecting to all of the listed archives, but only store valid ones.
var validArchives ArchivePool
for _, url := range archiveURLs {
- archive, err := Connect(
- url,
- opts,
- )
-
+ archive, err := Connect(url, opts)
if err != nil {
lastErr = errors.Wrapf(err, "Error connecting to history archive (%s)", url)
continue
diff --git a/historyarchive/archive_test.go b/historyarchive/archive_test.go
index de34c36f68..e5be8febbb 100644
--- a/historyarchive/archive_test.go
+++ b/historyarchive/archive_test.go
@@ -18,13 +18,18 @@ import (
"os"
"path/filepath"
"strings"
+ "sync"
"testing"
+ "time"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
+var cachePath = filepath.Join(os.TempDir(), "history-archive-test-cache")
+
func GetTestS3Archive() *Archive {
mx := big.NewInt(0xffffffff)
r, e := rand.Int(rand.Reader, mx)
@@ -49,13 +54,10 @@ func GetTestS3Archive() *Archive {
}
func GetTestMockArchive() *Archive {
- return MustConnect("mock://test",
- ArchiveOptions{CheckpointFrequency: 64,
- CacheConfig: CacheOptions{
- Cache: true,
- Path: filepath.Join(os.TempDir(), "history-archive-test-cache"),
- MaxFiles: 5,
- }})
+ return MustConnect("mock://test", ArchiveOptions{
+ CheckpointFrequency: 64,
+ CachePath: cachePath,
+ })
}
var tmpdirs []string
@@ -563,7 +565,95 @@ func TestGetLedgers(t *testing.T) {
assert.Equal(t, uint32(1), archive.GetStats()[0].GetRequests())
assert.Equal(t, uint32(0), archive.GetStats()[0].GetDownloads())
assert.EqualError(t, err, "checkpoint 1023 is not published")
+ ledgerHeaders, transactions, results := makeFakeArchive(t, archive)
+
+ stats := archive.GetStats()[0]
+ ledgers, err := archive.GetLedgers(1000, 1002)
+
+ assert.NoError(t, err)
+ assert.Len(t, ledgers, 3)
+ // it started at 1, incurred 6 requests total: 3 queries + 3 downloads
+ assert.EqualValues(t, 7, stats.GetRequests())
+ // started 0, incurred 3 file downloads
+ assert.EqualValues(t, 3, stats.GetDownloads())
+ assert.EqualValues(t, 0, stats.GetCacheHits())
+ for i, seq := range []uint32{1000, 1001, 1002} {
+ ledger := ledgers[seq]
+ assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
+ assertXdrEquals(t, transactions[i], ledger.Transaction)
+ assertXdrEquals(t, results[i], ledger.TransactionResult)
+ }
+
+ // Repeat the same check but ensure the cache was used
+ ledgers, err = archive.GetLedgers(1000, 1002) // all cached
+ assert.NoError(t, err)
+ assert.Len(t, ledgers, 3)
+
+ // downloads should not change because of the cache
+ assert.EqualValues(t, 3, stats.GetDownloads())
+ // but requests increase because of 3 fetches to categories
+ assert.EqualValues(t, 10, stats.GetRequests())
+ assert.EqualValues(t, 3, stats.GetCacheHits())
+ for i, seq := range []uint32{1000, 1001, 1002} {
+ ledger := ledgers[seq]
+ assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
+ assertXdrEquals(t, transactions[i], ledger.Transaction)
+ assertXdrEquals(t, results[i], ledger.TransactionResult)
+ }
+
+	// remove the cached files without informing the cache and ensure it fills up again
+ require.NoError(t, os.RemoveAll(cachePath))
+ ledgers, err = archive.GetLedgers(1000, 1002) // uncached, refetch
+ assert.NoError(t, err)
+ assert.Len(t, ledgers, 3)
+
+ // downloads should increase again
+ assert.EqualValues(t, 6, stats.GetDownloads())
+ assert.EqualValues(t, 3, stats.GetCacheHits())
+}
+
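+// TestStressfulGetLedgers hits the same ledger range from many goroutines at
+// once: the first reader populates the cache while the others stream from the
+// entry as it's written, so every reader should see identical data.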
+func TestStressfulGetLedgers(t *testing.T) {
+ archive := GetTestMockArchive()
+ ledgerHeaders, transactions, results := makeFakeArchive(t, archive)
+
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+
+ go func() {
+ time.Sleep(time.Millisecond) // encourage interleaved execution
+ ledgers, err := archive.GetLedgers(1000, 1002)
+ assert.NoError(t, err)
+ assert.Len(t, ledgers, 3)
+ for i, seq := range []uint32{1000, 1001, 1002} {
+ ledger := ledgers[seq]
+ assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
+ assertXdrEquals(t, transactions[i], ledger.Transaction)
+ assertXdrEquals(t, results[i], ledger.TransactionResult)
+ }
+
+ wg.Done()
+ }()
+ }
+
+ require.Eventually(t, func() bool { wg.Wait(); return true }, time.Minute, time.Second)
+}
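+
+// TestCacheDeadlocks ensures that a failed download propagates an error
+// rather than deadlocking later readers: the fmock:// backend fails every
+// file read.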
+func TestCacheDeadlocks(t *testing.T) {
+ archive := MustConnect("fmock://test", ArchiveOptions{
+ CheckpointFrequency: 64,
+ CachePath: cachePath,
+ })
+ makeFakeArchive(t, archive)
+ _, err := archive.GetLedgers(1000, 1002)
+ require.Error(t, err)
+}
+
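+// makeFakeArchive populates the mock archive with a checkpoint's worth of
+// ledger headers, transactions, and results, returning them so tests can
+// compare against what GetLedgers reads back.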
+func makeFakeArchive(t *testing.T, archive *Archive) (
+ []xdr.LedgerHeaderHistoryEntry,
+ []xdr.TransactionHistoryEntry,
+ []xdr.TransactionHistoryResultEntry,
+) {
ledgerHeaders := []xdr.LedgerHeaderHistoryEntry{
{
Hash: xdr.Hash{1},
@@ -646,36 +736,5 @@ func TestGetLedgers(t *testing.T) {
[]xdrEntry{results[0], results[1], results[2]},
)
- stats := archive.GetStats()[0]
- ledgers, err := archive.GetLedgers(1000, 1002)
-
- assert.NoError(t, err)
- assert.Len(t, ledgers, 3)
- // it started at 1, incurred 6 requests total, 3 queries, 3 downloads
- assert.EqualValues(t, 7, stats.GetRequests())
- // started 0, incurred 3 file downloads
- assert.EqualValues(t, 3, stats.GetDownloads())
- for i, seq := range []uint32{1000, 1001, 1002} {
- ledger := ledgers[seq]
- assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
- assertXdrEquals(t, transactions[i], ledger.Transaction)
- assertXdrEquals(t, results[i], ledger.TransactionResult)
- }
-
- // Repeat the same check but ensure the cache was used
- ledgers, err = archive.GetLedgers(1000, 1002) // all cached
- assert.NoError(t, err)
- assert.Len(t, ledgers, 3)
-
- // downloads should not change because of the cache
- assert.EqualValues(t, 3, stats.GetDownloads())
- // but requests increase because of 3 fetches to categories
- assert.EqualValues(t, 10, stats.GetRequests())
- assert.EqualValues(t, 3, stats.GetCacheHits())
- for i, seq := range []uint32{1000, 1001, 1002} {
- ledger := ledgers[seq]
- assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
- assertXdrEquals(t, transactions[i], ledger.Transaction)
- assertXdrEquals(t, results[i], ledger.TransactionResult)
- }
+ return ledgerHeaders, transactions, results
}
diff --git a/historyarchive/failing_mock_archive.go b/historyarchive/failing_mock_archive.go
new file mode 100644
index 0000000000..2ee09523de
--- /dev/null
+++ b/historyarchive/failing_mock_archive.go
@@ -0,0 +1,82 @@
+package historyarchive
+
+import (
+ "io"
+
+ "github.com/stellar/go/support/errors"
+ "github.com/stellar/go/support/storage"
+)
+
+// FailingMockArchiveBackend is a mock backend whose file reads always fail,
+// but which otherwise behaves like MockArchiveBackend.
+type FailingMockArchiveBackend struct {
+ files map[string][]byte
+}
+
+func (b *FailingMockArchiveBackend) Exists(pth string) (bool, error) {
+ _, ok := b.files[pth]
+ return ok, nil
+}
+
+func (b *FailingMockArchiveBackend) Size(pth string) (int64, error) {
+ f, ok := b.files[pth]
+ sz := int64(0)
+ if ok {
+ sz = int64(len(f))
+ }
+ return sz, nil
+}
+
+func (b *FailingMockArchiveBackend) GetFile(pth string) (io.ReadCloser, error) {
+ data, ok := b.files[pth]
+ if !ok {
+ return nil, errors.New("file does not exist")
+ }
+
+	fr := FakeReader{data: make([]byte, len(data))}
+	copy(fr.data, data)
+ return &fr, nil
+}
+
+func (b *FailingMockArchiveBackend) PutFile(pth string, in io.ReadCloser) error {
+ buf, e := io.ReadAll(in)
+ if e != nil {
+ return e
+ }
+ b.files[pth] = buf
+ return nil
+}
+
+func (b *FailingMockArchiveBackend) ListFiles(pth string) (chan string, chan error) {
+ return nil, nil
+}
+
+func (b *FailingMockArchiveBackend) CanListFiles() bool {
+ return false
+}
+
+func (b *FailingMockArchiveBackend) Close() error {
+ b.files = make(map[string][]byte)
+ return nil
+}
+
+func makeFailingMockBackend() storage.Storage {
+ b := new(FailingMockArchiveBackend)
+ b.files = make(map[string][]byte)
+ return b
+}
+
+type FakeReader struct {
+ data []byte
+}
+
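+// Read always fails, simulating an upstream connection that drops before any
+// bytes arrive.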
+func (fr *FakeReader) Read(b []byte) (int, error) {
+ return 0, io.ErrClosedPipe
+}
+
+func (fr *FakeReader) Close() error {
+ return nil
+}
+
+var _ io.ReadCloser = &FakeReader{}
diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go
index fe497ec36e..0138e66416 100644
--- a/historyarchive/mocks.go
+++ b/historyarchive/mocks.go
@@ -137,3 +137,7 @@ func (m *MockArchiveStats) GetCacheHits() uint32 {
a := m.Called()
return a.Get(0).(uint32)
}
+func (m *MockArchiveStats) GetCacheBandwidth() uint64 {
+ a := m.Called()
+ return a.Get(0).(uint64)
+}
diff --git a/historyarchive/stats.go b/historyarchive/stats.go
index c182853d1b..8f436cee84 100644
--- a/historyarchive/stats.go
+++ b/historyarchive/stats.go
@@ -8,6 +8,7 @@ type archiveStats struct {
fileDownloads atomic.Uint32
fileUploads atomic.Uint32
cacheHits atomic.Uint32
+ cacheBw atomic.Uint64
backendName string
}
@@ -16,6 +17,7 @@ type ArchiveStats interface {
GetDownloads() uint32
GetUploads() uint32
GetCacheHits() uint32
+ GetCacheBandwidth() uint64
GetBackendName() string
}
@@ -37,21 +39,25 @@ func (as *archiveStats) incrementCacheHits() {
as.cacheHits.Add(1)
}
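+// incrementCacheBandwidth records bytes that were served from the on-disk
+// cache rather than re-downloaded from the archive.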
+func (as *archiveStats) incrementCacheBandwidth(bytes int64) {
+ as.cacheBw.Add(uint64(bytes))
+}
+
func (as *archiveStats) GetRequests() uint32 {
return as.requests.Load()
}
-
func (as *archiveStats) GetDownloads() uint32 {
return as.fileDownloads.Load()
}
-
func (as *archiveStats) GetUploads() uint32 {
return as.fileUploads.Load()
}
-
func (as *archiveStats) GetBackendName() string {
return as.backendName
}
func (as *archiveStats) GetCacheHits() uint32 {
return as.cacheHits.Load()
}
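+// GetCacheBandwidth returns the cumulative number of bytes saved by cache
+// hits.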
+func (as *archiveStats) GetCacheBandwidth() uint64 {
+ return as.cacheBw.Load()
+}
diff --git a/historyarchive/xdrstream.go b/historyarchive/xdrstream.go
index de8efc3bb6..313c600f8b 100644
--- a/historyarchive/xdrstream.go
+++ b/historyarchive/xdrstream.go
@@ -107,7 +107,7 @@ func (x *XdrStream) ExpectedHash() ([sha256.Size]byte, bool) {
func (x *XdrStream) Close() error {
if x.validateHash {
// Read all remaining data from rdr
- _, err := io.Copy(ioutil.Discard, x.rdr)
+ _, err := io.Copy(io.Discard, x.rdr)
if err != nil {
// close the internal readers to avoid memory leaks
x.closeReaders()
diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md
index fc2b046a57..9e3a87135d 100644
--- a/services/horizon/CHANGELOG.md
+++ b/services/horizon/CHANGELOG.md
@@ -3,14 +3,17 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).
-## Unreleased
+## 2.28.2
+
+### Fixed
+- History archive caching would cause file corruption in certain environments ([5197](https://github.com/stellar/go/pull/5197))
+- Server error in claimable balance API when claimant, asset and cursor query params are supplied ([5200](https://github.com/stellar/go/pull/5200))
## 2.28.1
### Fixed
- Submitting transaction with a future gapped sequence number greater than 1 past current source account sequence, may result in delayed 60s timeout response, rather than expected HTTP 400 error response with `result_codes: {transaction: "tx_bad_seq"}` ([5191](https://github.com/stellar/go/pull/5191))
-
## 2.28.0
### Fixed
@@ -30,21 +33,21 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
* API `Operation` model for `InvokeHostFunctionOp` type, will have empty `asset_balance_changes`
### Breaking Changes
-- Deprecation of legacy, non-captive core ingestion([5158](https://github.com/stellar/go/pull/5158)):
+- Deprecation of legacy, non-captive core ingestion ([5158](https://github.com/stellar/go/pull/5158)):
* removed configuration flags `--stellar-core-url-db`, `--cursor-name` `--skip-cursor-update`, they are no longer usable.
* removed automatic updating of core cursor from ingestion background processing.
- **Note** for upgrading on existing horizon deployments - Since horizon will no longer maintain advancement of this cursor on core, it may require manual removal of the cursor from the core process that your horizon was using for captive core, otherwise that core process may un-necessarily retain older data in buckets on disk up to the last cursor ledger sequence set by prior horizon release.
-
+ **Note** for upgrading on existing horizon deployments - Since horizon will no longer maintain advancement of this cursor on core, it may require manual removal of the cursor from the core process that your horizon was using for captive core, otherwise that core process may un-necessarily retain older data in buckets on disk up to the last cursor ledger sequence set by prior horizon release.
+ **Note** for upgrading on existing horizon deployments - Since horizon will no longer maintain advancement of this cursor on core, it may require manual removal of the cursor from the core process that your horizon was using for captive core, otherwise that core process may unnecessarily retain older data in buckets on disk up to the last cursor ledger sequence set by the prior horizon release.
The captive core process to check and verify presence of cursor usage is determined by the horizon deployment, if `NETWORK` is present, or `STELLAR_CORE_URL` is present or `CAPTIVE-CORE-HTTP-PORT` is present and set to non-zero value, or `CAPTIVE-CORE_CONFIG_PATH` is used and the toml has `HTTP_PORT` set to non-zero and `PUBLIC_HTTP_PORT` is not set to false, then it is recommended to perform the following preventative measure on the machine hosting horizon after upgraded to 2.28.0 and process restarted:
```
$ curl http:///getcursor
# If there are no cursors reported, done, no need for any action
- # If any horizon cursors exist they need to be dropped by id.
- # By default horizon sets cursor id to "HORIZON" but if it was customized
+ # If any horizon cursors exist they need to be dropped by id.
+ # By default horizon sets cursor id to "HORIZON" but if it was customized
# using the --cursor-name flag the id might be different
$ curl http:///dropcursor?id=
- ```
-
+ ```
+
## 2.27.0
diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go
index 07bbf975fa..7d14ca314e 100644
--- a/services/horizon/cmd/db.go
+++ b/services/horizon/cmd/db.go
@@ -407,6 +407,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
ingestConfig := ingest.Config{
NetworkPassphrase: config.NetworkPassphrase,
HistoryArchiveURLs: config.HistoryArchiveURLs,
+ HistoryArchiveCaching: config.HistoryArchiveCaching,
CheckpointFrequency: config.CheckpointFrequency,
ReingestEnabled: true,
MaxReingestRetries: int(retries),
diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go
index 3833dba7fd..18452dc74a 100644
--- a/services/horizon/cmd/ingest.go
+++ b/services/horizon/cmd/ingest.go
@@ -128,6 +128,7 @@ var ingestVerifyRangeCmd = &cobra.Command{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
+ HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
CheckpointFrequency: globalConfig.CheckpointFrequency,
@@ -210,6 +211,7 @@ var ingestStressTestCmd = &cobra.Command{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
+ HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
@@ -349,6 +351,7 @@ var ingestBuildStateCmd = &cobra.Command{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
+ HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
CheckpointFrequency: globalConfig.CheckpointFrequency,
diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go
index 8fb31075b8..54f843b810 100644
--- a/services/horizon/internal/config.go
+++ b/services/horizon/internal/config.go
@@ -27,6 +27,7 @@ type Config struct {
CaptiveCoreStoragePath string
CaptiveCoreReuseStoragePath bool
CaptiveCoreConfigUseDB bool
+ HistoryArchiveCaching bool
StellarCoreURL string
diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go
index 5490bef11c..d45780a4c0 100644
--- a/services/horizon/internal/db2/history/claimable_balances.go
+++ b/services/horizon/internal/db2/history/claimable_balances.go
@@ -67,17 +67,17 @@ func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rC
case db2.OrderAscending:
if hasPagedLimit {
sql = sql.
- Where(sq.Expr("(last_modified_ledger, id) > (?, ?)", lCursor, rCursor))
+ Where(sq.Expr("(cb.last_modified_ledger, cb.id) > (?, ?)", lCursor, rCursor))
}
- sql = sql.OrderBy("last_modified_ledger asc, id asc")
+ sql = sql.OrderBy("cb.last_modified_ledger asc, cb.id asc")
case db2.OrderDescending:
if hasPagedLimit {
sql = sql.
- Where(sq.Expr("(last_modified_ledger, id) < (?, ?)", lCursor, rCursor))
+ Where(sq.Expr("(cb.last_modified_ledger, cb.id) < (?, ?)", lCursor, rCursor))
}
- sql = sql.OrderBy("last_modified_ledger desc, id desc")
+ sql = sql.OrderBy("cb.last_modified_ledger desc, cb.id desc")
default:
return sql, errors.Errorf("invalid order: %s", order)
}
diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go
index ca32975c62..769ab3bc13 100644
--- a/services/horizon/internal/db2/history/claimable_balances_test.go
+++ b/services/horizon/internal/db2/history/claimable_balances_test.go
@@ -219,6 +219,188 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
tt.Assert.Len(cbs, 1)
}
+func TestFindClaimableBalancesByCursor(t *testing.T) {
+ tt := test.Start(t)
+ defer tt.Finish()
+ test.ResetHorizonDB(t, tt.HorizonDB)
+ q := &Q{tt.HorizonSession()}
+
+ tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))
+ defer func() {
+ _ = q.Rollback()
+ }()
+
+ balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder()
+ claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()
+
+ dest1 := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
+ dest2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"
+
+ sponsor1 := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"
+ sponsor2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"
+
+ asset := xdr.MustNewCreditAsset("USD", dest1)
+ balanceID := xdr.ClaimableBalanceId{
+ Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+ V0: &xdr.Hash{1, 2, 3},
+ }
+ id, err := xdr.MarshalHex(balanceID)
+ tt.Assert.NoError(err)
+ cBalance := ClaimableBalance{
+ BalanceID: id,
+ Claimants: []Claimant{
+ {
+ Destination: dest1,
+ Predicate: xdr.ClaimPredicate{
+ Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
+ },
+ },
+ },
+ Asset: asset,
+ LastModifiedLedger: 123,
+ Amount: 10,
+ Sponsor: null.StringFrom(sponsor1),
+ }
+
+ tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))
+ tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance))
+
+ balanceID = xdr.ClaimableBalanceId{
+ Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+ V0: &xdr.Hash{3, 2, 1},
+ }
+ id, err = xdr.MarshalHex(balanceID)
+ tt.Assert.NoError(err)
+ cBalance = ClaimableBalance{
+ BalanceID: id,
+ Claimants: []Claimant{
+ {
+ Destination: dest1,
+ Predicate: xdr.ClaimPredicate{
+ Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
+ },
+ },
+ {
+ Destination: dest2,
+ Predicate: xdr.ClaimPredicate{
+ Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
+ },
+ },
+ },
+ Asset: asset,
+ LastModifiedLedger: 300,
+ Amount: 10,
+ Sponsor: null.StringFrom(sponsor2),
+ }
+
+ tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))
+ tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance))
+
+ tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))
+ tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx))
+
+ query := ClaimableBalancesQuery{
+ PageQuery: db2.MustPageQuery("", false, "", 10),
+ }
+
+ cbs, err := q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 2)
+
+ order := "" // default is "asc"
+ // this validates the cb query with claimant and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = xdr.MustAddressPtr(dest1)
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ // this validates the cb query with claimant, asset, sponsor and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = xdr.MustAddressPtr(dest1)
+ query.Asset = &asset
+ query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ // this validates the cb query with no claimant, asset, sponsor and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = nil
+ query.Asset = &asset
+ query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ order = "desc"
+ // claimant and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 301, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = xdr.MustAddressPtr(dest1)
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ // claimant, asset, sponsor and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 301, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = xdr.MustAddressPtr(dest1)
+ query.Asset = &asset
+ query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ // no claimant, asset, sponsor and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 301, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = nil
+ query.Asset = &asset
+ query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ order = "asc"
+ // claimant and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = xdr.MustAddressPtr(dest1)
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ // claimant, asset, sponsor and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = xdr.MustAddressPtr(dest1)
+ query.Asset = &asset
+ query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+ // no claimant, asset, sponsor and cb.id/ledger cursor parameters
+ query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+ query.Claimant = nil
+ query.Asset = &asset
+ query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+ cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+ tt.Assert.NoError(err)
+ tt.Assert.Len(cbs, 1)
+ tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+}
+
func insertClaimants(claimantsInsertBuilder ClaimableBalanceClaimantBatchInsertBuilder, cBalance ClaimableBalance) error {
for _, claimant := range cBalance.Claimants {
claimant := ClaimableBalanceClaimant{
diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go
index eb229c65b2..87deb28c48 100644
--- a/services/horizon/internal/flags.go
+++ b/services/horizon/internal/flags.go
@@ -51,6 +51,9 @@ const (
NetworkPassphraseFlagName = "network-passphrase"
// HistoryArchiveURLsFlagName is the command line flag for specifying the history archive URLs
HistoryArchiveURLsFlagName = "history-archive-urls"
+	// HistoryArchiveCachingFlagName is the command line flag for controlling
+	// whether or not there's an on-disk cache for history archive downloads
+	HistoryArchiveCachingFlagName = "history-archive-caching"
// NetworkFlagName is the command line flag for specifying the "network"
NetworkFlagName = "network"
// EnableIngestionFilteringFlagName is the command line flag for enabling the experimental ingestion filtering feature (now enabled by default)
@@ -236,11 +239,7 @@ func Flags() (*Config, support.ConfigOptions) {
OptType: types.Bool,
FlagDefault: true,
Required: false,
- Usage: `when enabled, Horizon ingestion will instruct the captive
- core invocation to use an external db url for ledger states rather than in memory(RAM).\n
- Will result in several GB of space shifting out of RAM and to the external db persistence.\n
- The external db url is determined by the presence of DATABASE parameter in the captive-core-config-path or\n
- or if absent, the db will default to sqlite and the db file will be stored at location derived from captive-core-storage-path parameter.`,
+		Usage: `when enabled, Horizon ingestion will instruct the captive core invocation to use an external db url for ledger states rather than in memory (RAM). This shifts several GB of space out of RAM and into external db persistence. The external db url is determined by the presence of the DATABASE parameter in the captive-core-config-path; if absent, the db defaults to sqlite, with the db file stored at a location derived from the captive-core-storage-path parameter.`,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetBool(opt.Name); val {
config.CaptiveCoreConfigUseDB = val
@@ -372,6 +371,14 @@ func Flags() (*Config, support.ConfigOptions) {
Usage: "comma-separated list of stellar history archives to connect with",
UsedInCommands: IngestionCommands,
},
+ &support.ConfigOption{
+ Name: HistoryArchiveCachingFlagName,
+ ConfigKey: &config.HistoryArchiveCaching,
+ OptType: types.Bool,
+ FlagDefault: true,
+			Usage:          "enables on-disk caching of history archive downloads (requires up to an additional 10GiB of disk space on mainnet)",
+ UsedInCommands: IngestionCommands,
+ },
&support.ConfigOption{
Name: "port",
ConfigKey: &config.Port,
diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go
index e0c667b033..892868e5b9 100644
--- a/services/horizon/internal/ingest/fsm.go
+++ b/services/horizon/internal/ingest/fsm.go
@@ -595,6 +595,11 @@ func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStat
"source": historyServerStat.GetBackendName(),
"type": "cache_hits"}).
Add(float64(historyServerStat.GetCacheHits()))
+ s.Metrics().HistoryArchiveStatsCounter.
+ With(prometheus.Labels{
+ "source": historyServerStat.GetBackendName(),
+ "type": "cache_bandwidth"}).
+ Add(float64(historyServerStat.GetCacheBandwidth()))
}
}
diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go
index 2726e02484..a8b43dcceb 100644
--- a/services/horizon/internal/ingest/main.go
+++ b/services/horizon/internal/ingest/main.go
@@ -88,8 +88,9 @@ type Config struct {
CaptiveCoreConfigUseDB bool
NetworkPassphrase string
- HistorySession db.SessionInterface
- HistoryArchiveURLs []string
+ HistorySession db.SessionInterface
+ HistoryArchiveURLs []string
+ HistoryArchiveCaching bool
DisableStateVerification bool
EnableReapLookupTables bool
@@ -223,19 +224,19 @@ type system struct {
func NewSystem(config Config) (System, error) {
ctx, cancel := context.WithCancel(context.Background())
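+	// An empty cache path disables the on-disk history archive cache
+	// (see historyarchive.Connect).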
+ cachingPath := ""
+ if config.HistoryArchiveCaching {
+ cachingPath = path.Join(config.CaptiveCoreStoragePath, "bucket-cache")
+ }
+
archive, err := historyarchive.NewArchivePool(
config.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
+ CachePath: cachingPath,
ConnectOptions: storage.ConnectOptions{
- Context: ctx,
- UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version())},
- CacheConfig: historyarchive.CacheOptions{
- Cache: true,
- Path: path.Join(config.CaptiveCoreStoragePath, "bucket-cache"),
- Log: log.WithField("subservice", "ha-cache"),
- MaxFiles: 150,
+ UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()),
},
},
)
diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go
index f1f8b2ce2a..985391883f 100644
--- a/services/horizon/internal/ingest/resume_state_test.go
+++ b/services/horizon/internal/ingest/resume_state_test.go
@@ -267,6 +267,7 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() {
mockStats.On("GetRequests").Return(uint32(0))
mockStats.On("GetUploads").Return(uint32(0))
mockStats.On("GetCacheHits").Return(uint32(0))
+ mockStats.On("GetCacheBandwidth").Return(uint64(0))
s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once()
s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")).
@@ -384,6 +385,7 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() {
mockStats.On("GetRequests").Return(uint32(0))
mockStats.On("GetUploads").Return(uint32(0))
mockStats.On("GetCacheHits").Return(uint32(0))
+ mockStats.On("GetCacheBandwidth").Return(uint64(0))
s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once()
// Reap lookup tables not executed
@@ -434,6 +436,7 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() {
mockStats.On("GetRequests").Return(uint32(0))
mockStats.On("GetUploads").Return(uint32(0))
mockStats.On("GetCacheHits").Return(uint32(0))
+ mockStats.On("GetCacheBandwidth").Return(uint64(0))
s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once()
next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system)
diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go
index d4b34f9f4d..60ba7b6c2a 100644
--- a/services/horizon/internal/init.go
+++ b/services/horizon/internal/init.go
@@ -97,6 +97,7 @@ func initIngester(app *App) {
),
NetworkPassphrase: app.config.NetworkPassphrase,
HistoryArchiveURLs: app.config.HistoryArchiveURLs,
+ HistoryArchiveCaching: app.config.HistoryArchiveCaching,
CheckpointFrequency: app.config.CheckpointFrequency,
StellarCoreURL: app.config.StellarCoreURL,
CaptiveCoreBinaryPath: app.config.CaptiveCoreBinaryPath,