From 06668185fde71925a0d77a49aa42868b4541c836 Mon Sep 17 00:00:00 2001 From: George Date: Thu, 17 Nov 2022 09:41:47 -0800 Subject: [PATCH] services/horizon: Propogate *all* history archives through the ingestion interface. (#4687) --- services/horizon/CHANGELOG.md | 8 ++++++++ services/horizon/cmd/db.go | 2 +- services/horizon/cmd/ingest.go | 8 ++++---- .../horizon/internal/ingest/db_integration_test.go | 2 +- services/horizon/internal/ingest/main.go | 10 +++++----- services/horizon/internal/ingest/main_test.go | 2 +- services/horizon/internal/init.go | 7 ++----- 7 files changed, 22 insertions(+), 17 deletions(-) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 0091ee6634..566fa1046a 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,14 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). + +## Unreleased + +### Fixes + +* The ingestion subsystem will now properly use a pool of history archives if more than one is provided ([#4687](https://github.com/stellar/go/pull/4687)). + + ## 2.22.1 **Upgrading to this version from <= v2.8.3 will trigger a state rebuild. During this process (which will take at least 10 minutes), Horizon will not ingest new ledgers.** diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index b29535c4b5..37a166533f 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -401,7 +401,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + HistoryArchiveURLs: config.HistoryArchiveURLs, CheckpointFrequency: config.CheckpointFrequency, ReingestEnabled: true, MaxReingestRetries: int(retries), diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 2cae1fea99..c0fdaf7545 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -127,7 +127,7 @@ var ingestVerifyRangeCmd = &cobra.Command{ ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistorySession: horizonSession, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + HistoryArchiveURLs: config.HistoryArchiveURLs, EnableCaptiveCore: config.EnableCaptiveCoreIngestion, CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: config.CaptiveCoreConfigUseDB, @@ -223,7 +223,7 @@ var ingestStressTestCmd = &cobra.Command{ ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistorySession: horizonSession, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + HistoryArchiveURLs: config.HistoryArchiveURLs, EnableCaptiveCore: config.EnableCaptiveCoreIngestion, RoundingSlippageFilter: config.RoundingSlippageFilter, } @@ -314,7 +314,7 @@ var ingestInitGenesisStateCmd = &cobra.Command{ ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistorySession: horizonSession, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + HistoryArchiveURLs: config.HistoryArchiveURLs, EnableCaptiveCore: config.EnableCaptiveCoreIngestion, CheckpointFrequency: config.CheckpointFrequency, RoundingSlippageFilter: config.RoundingSlippageFilter, @@ -391,7 +391,7 @@ var ingestBuildStateCmd = &cobra.Command{ ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistorySession: horizonSession, - HistoryArchiveURL: config.HistoryArchiveURLs[0], + HistoryArchiveURLs: config.HistoryArchiveURLs, EnableCaptiveCore: config.EnableCaptiveCoreIngestion, CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: config.CaptiveCoreConfigUseDB, diff --git a/services/horizon/internal/ingest/db_integration_test.go b/services/horizon/internal/ingest/db_integration_test.go index 2857a277ef..86576db137 100644 --- a/services/horizon/internal/ingest/db_integration_test.go +++ b/services/horizon/internal/ingest/db_integration_test.go @@ -83,7 +83,7 @@ func (s *DBTestSuite) SetupTest() { sIface, err := NewSystem(Config{ CoreSession: s.tt.CoreSession(), HistorySession: s.tt.HorizonSession(), - HistoryArchiveURL: "http://ignore.test", + HistoryArchiveURLs: []string{"http://ignore.test"}, DisableStateVerification: false, CheckpointFrequency: 64, }) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 4f1de4a2d2..465965def3 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -81,8 +81,8 @@ type Config struct { RemoteCaptiveCoreURL string NetworkPassphrase string - HistorySession db.SessionInterface - HistoryArchiveURL string + HistorySession db.SessionInterface + HistoryArchiveURLs []string DisableStateVerification bool EnableReapLookupTables bool @@ -214,8 +214,8 @@ type system struct { func NewSystem(config Config) (System, error) { ctx, cancel := context.WithCancel(context.Background()) - archive, err := historyarchive.Connect( - config.HistoryArchiveURL, + archive, err := historyarchive.NewArchivePool( + config.HistoryArchiveURLs, historyarchive.ConnectOptions{ Context: ctx, NetworkPassphrase: config.NetworkPassphrase, @@ -245,7 +245,7 @@ func NewSystem(config Config) (System, error) { UseDB: config.CaptiveCoreConfigUseDB, Toml: config.CaptiveCoreToml, NetworkPassphrase: config.NetworkPassphrase, - HistoryArchiveURLs: []string{config.HistoryArchiveURL}, + HistoryArchiveURLs: config.HistoryArchiveURLs, CheckpointFrequency: config.CheckpointFrequency, LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), Log: logger, diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 0ee2a80bbc..c8e72022f4 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -84,7 +84,7 @@ func TestNewSystem(t *testing.T) { CoreSession: &db.Session{DB: &sqlx.DB{}}, HistorySession: &db.Session{DB: &sqlx.DB{}}, DisableStateVerification: true, - HistoryArchiveURL: "https://history.stellar.org/prd/core-live/core_live_001", + HistoryArchiveURLs: []string{"https://history.stellar.org/prd/core-live/core_live_001"}, CheckpointFrequency: 64, } diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 4150a1e2a7..33cee8ee7c 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -102,11 +102,8 @@ func initIngester(app *App) { HistorySession: mustNewDBSession( db.IngestSubservice, app.config.DatabaseURL, ingest.MaxDBConnections, ingest.MaxDBConnections, app.prometheusRegistry, ), - NetworkPassphrase: app.config.NetworkPassphrase, - // TODO: - // Use the first archive for now. We don't have a mechanism to - // use multiple archives at the same time currently. - HistoryArchiveURL: app.config.HistoryArchiveURLs[0], + NetworkPassphrase: app.config.NetworkPassphrase, + HistoryArchiveURLs: app.config.HistoryArchiveURLs, CheckpointFrequency: app.config.CheckpointFrequency, StellarCoreURL: app.config.StellarCoreURL, StellarCoreCursor: app.config.CursorName,