From b7fde588a61abdcb52b13b1b802b3e827202013b Mon Sep 17 00:00:00 2001 From: Paul Bellamy Date: Thu, 17 Feb 2022 22:58:12 +0000 Subject: [PATCH] services/horizon: Allow captive core to run with sqlite database (#4092) --- .circleci/config.yml | 13 +++ ingest/ledgerbackend/captive_core_backend.go | 42 ++++---- .../captive_core_backend_test.go | 7 +- ingest/ledgerbackend/stellar_core_runner.go | 58 ++++++++--- .../testdata/expected-offline-core.cfg | 1 + .../invalid-captive-core-database-field.cfg | 12 +++ ingest/ledgerbackend/toml.go | 12 +++ ingest/ledgerbackend/toml_test.go | 96 +++++++++++++++++++ integration.sh | 1 + services/horizon/CHANGELOG.md | 2 + services/horizon/cmd/db.go | 1 + services/horizon/cmd/ingest.go | 3 + services/horizon/internal/app.go | 5 - services/horizon/internal/config.go | 1 + .../internal/db2/history/ledger_test.go | 18 ++-- services/horizon/internal/flags.go | 24 +++++ services/horizon/internal/ingest/main.go | 3 +- services/horizon/internal/init.go | 1 + .../horizon/internal/integration/db_test.go | 2 + .../internal/test/integration/integration.go | 13 ++- 20 files changed, 262 insertions(+), 53 deletions(-) create mode 100644 ingest/ledgerbackend/testdata/invalid-captive-core-database-field.cfg diff --git a/.circleci/config.yml b/.circleci/config.yml index 50f99917e1..9ac4b8c1f8 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -430,6 +430,9 @@ jobs: enable-captive-core: type: boolean default: false + enable-captive-core-remote-storage: + type: boolean + default: false working_directory: ~/go/src/github.com/stellar/go machine: image: ubuntu-2004:202010-01 @@ -452,6 +455,12 @@ jobs: - run: name: Setting Captive Core env variables command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE=true" >> $BASH_ENV + - when: + condition: <> + steps: + - run: + name: Setting Captive Core Remote Storage env variable + command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB=true" >> $BASH_ENV - run: name: Run Horizon 
integration tests <<#parameters.enable-captive-core>>(With captive core)<> # Currently all integration tests are in a single directory. @@ -480,6 +489,10 @@ workflows: - test_horizon_integration: name: test_horizon_integration_with_captive_core enable-captive-core: true + - test_horizon_integration: + name: test_horizon_integration_with_captive_core_remote_storage + enable-captive-core: true + enable-captive-core-remote-storage: true - test_verify_range_docker_image: filters: # we use test_verify_range_docker_image with publish in master diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 22e5526f84..1154096a8f 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -124,6 +124,12 @@ type CaptiveCoreConfig struct { // stored. We always append /captive-core to this directory, since we clean // it up entirely on shutdown. StoragePath string + + // UseDB, when true, instructs the core invocation to use an external db url + // for ledger states rather than in memory(RAM). The external db url is determined by the presence + // of DATABASE parameter in the captive-core-config-path or if absent, the db will default to sqlite + // and the db file will be stored at location derived from StoragePath parameter. + UseDB bool } // NewCaptive returns a new CaptiveStellarCore instance. 
@@ -142,6 +148,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) { if parentCtx == nil { parentCtx = context.Background() } + var cancel context.CancelFunc config.Context, cancel = context.WithCancel(parentCtx) @@ -250,11 +257,8 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro var runner stellarCoreRunnerInterface if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil { return errors.Wrap(err, "error creating stellar-core runner") - } else { - // only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check - // see https://golang.org/doc/faq#nil_error - c.stellarCoreRunner = runner } + c.stellarCoreRunner = runner runFrom, ledgerHash, err := c.runFromParams(ctx, from) if err != nil { @@ -279,14 +283,15 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro } // runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash -func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) { +func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (uint32, string, error) { + if from == 1 { // Trying to start-from 1 results in an error from Stellar-Core: // Target ledger 1 is not newer than last closed ledger 1 - nothing to do // TODO maybe we can fix it by generating 1st ledger meta // like GenesisLedgerStateReader? 
- err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2") - return + err := errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2") + return 0, "", err } if from <= 63 { @@ -298,26 +303,25 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru from = 3 } - runFrom = from - 1 + runFrom := from - 1 if c.ledgerHashStore != nil { var exists bool - ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, runFrom) + ledgerHash, exists, err := c.ledgerHashStore.GetLedgerHash(ctx, runFrom) if err != nil { err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom) - return + return 0, "", err } if exists { - return + return runFrom, ledgerHash, nil } } - ledgerHeader, err2 := c.archive.GetLedgerHeader(from) - if err2 != nil { - err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from) - return + ledgerHeader, err := c.archive.GetLedgerHeader(from) + if err != nil { + return 0, "", errors.Wrapf(err, "error trying to read ledger header %d from HAS", from) } - ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:]) - return + ledgerHash := hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:]) + return runFrom, ledgerHash, nil } // nextExpectedSequence returns nextLedger (if currently set) or start of @@ -406,6 +410,10 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool { return false } + if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited { + return false + } + lastLedger := uint32(0) if c.lastLedger != nil { lastLedger = *c.lastLedger diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index e011c6e4a2..f73341370f 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -316,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { ctx := 
context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("close").Return(fmt.Errorf("transient error")) + mockRunner.On("getProcessExitError").Return(false, nil) mockRunner.On("context").Return(ctx) captiveBackend := CaptiveStellarCore{ @@ -490,6 +491,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) + mockRunner.On("getProcessExitError").Return(false, nil) mockArchive := &historyarchive.MockArchive{} mockArchive. @@ -497,6 +499,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { Return(historyarchive.HistoryArchiveState{ CurrentLedger: uint32(129), }, nil) + mockArchive. On("GetLedgerHeader", uint32(65)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) @@ -585,6 +588,7 @@ func TestCaptiveGetLedger(t *testing.T) { mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) + mockRunner.On("getProcessExitError").Return(false, nil) mockArchive := &historyarchive.MockArchive{} mockArchive. 
@@ -1288,6 +1292,7 @@ func TestCaptiveRunFromParams(t *testing.T) { func TestCaptiveIsPrepared(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("context").Return(context.Background()).Maybe() + mockRunner.On("getProcessExitError").Return(false, nil) // c.prepared == nil captiveBackend := CaptiveStellarCore{ @@ -1351,6 +1356,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} ctx, cancel := context.WithCancel(context.Background()) mockRunner.On("context").Return(ctx).Maybe() + mockRunner.On("getProcessExitError").Return(false, nil) rang := UnboundedRange(100) captiveBackend := CaptiveStellarCore{ @@ -1447,5 +1453,4 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { mockRunner.AssertExpectations(t) mockArchive.AssertExpectations(t) - mockLedgerHashStore.AssertExpectations(t) } diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index df9d8fc271..b5b59b9b30 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -66,6 +66,7 @@ type stellarCoreRunner struct { processExitError error storagePath string + useDB bool nonce string log *log.Entry @@ -122,6 +123,7 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode) ctx: ctx, cancel: cancel, storagePath: fullStoragePath, + useDB: config.UseDB, mode: mode, nonce: fmt.Sprintf( "captive-stellar-core-%x", @@ -261,11 +263,22 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { } rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) - r.cmd = r.createCmd( - "catchup", rangeArg, - "--metadata-output-stream", r.getPipeName(), - "--in-memory", - ) + params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()} + + // horizon operator has specified to use external storage for captive core ledger state + // instruct captive core invocation to not use memory, and in that case + // cc will look at 
DATABASE property in cfg toml for the external storage source to use. + // when using external storage of ledgers, use new-db to first set the state of + // remote db storage to genesis to purge any prior state and reset. + if r.useDB { + if err := r.createCmd("new-db").Run(); err != nil { + return errors.Wrap(err, "error initializing core db") + } + } else { + params = append(params, "--in-memory") + } + + r.cmd = r.createCmd(params...) var err error r.pipe, err = r.start(r.cmd) @@ -304,13 +317,34 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return errors.New("runner already started") } - r.cmd = r.createCmd( - "run", - "--in-memory", - "--start-at-ledger", fmt.Sprintf("%d", from), - "--start-at-hash", hash, - "--metadata-output-stream", r.getPipeName(), - ) + if r.useDB { + if err := r.createCmd("new-db").Run(); err != nil { + return errors.Wrap(err, "error initializing core db") + } + // Do a quick catch-up to set the LCL in core to be our expected starting + // point. 
+ if from > 2 { + if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil { + return errors.Wrap(err, "error runing stellar-core catchup") + } + } else if err := r.createCmd("catchup", "2/0").Run(); err != nil { + return errors.Wrap(err, "error runing stellar-core catchup") + } + + r.cmd = r.createCmd( + "run", + "--metadata-output-stream", + r.getPipeName(), + ) + } else { + r.cmd = r.createCmd( + "run", + "--in-memory", + "--start-at-ledger", fmt.Sprintf("%d", from), + "--start-at-hash", hash, + "--metadata-output-stream", r.getPipeName(), + ) + } var err error r.pipe, err = r.start(r.cmd) diff --git a/ingest/ledgerbackend/testdata/expected-offline-core.cfg b/ingest/ledgerbackend/testdata/expected-offline-core.cfg index 62aeeb6664..d6a80a628d 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-core.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-core.cfg @@ -1,4 +1,5 @@ # Generated file, do not edit +DATABASE = "sqlite3://stellar.db" FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "" diff --git a/ingest/ledgerbackend/testdata/invalid-captive-core-database-field.cfg b/ingest/ledgerbackend/testdata/invalid-captive-core-database-field.cfg new file mode 100644 index 0000000000..438bd99120 --- /dev/null +++ b/ingest/ledgerbackend/testdata/invalid-captive-core-database-field.cfg @@ -0,0 +1,12 @@ +# DATABASE limited to only be sqlite:// protocol +DATABASE="postgres://mydb" + +[[HOME_DOMAINS]] +HOME_DOMAIN="testnet.stellar.org" +QUALITY="MEDIUM" + +[[VALIDATORS]] +NAME="sdf_testnet_1" +HOME_DOMAIN="testnet.stellar.org" +PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y" +ADDRESS="localhost:123" diff --git a/ingest/ledgerbackend/toml.go b/ingest/ledgerbackend/toml.go index c6ac869a9f..f6fedfa4a4 100644 --- a/ingest/ledgerbackend/toml.go +++ b/ingest/ledgerbackend/toml.go @@ -61,6 +61,7 @@ type QuorumSet struct { } type captiveCoreTomlValues struct { + Database string `toml:"DATABASE,omitempty"` // we cannot 
omitempty because the empty string is a valid configuration for LOG_FILE_PATH // and the default is stellar-core.log LogFilePath string `toml:"LOG_FILE_PATH"` @@ -312,6 +313,8 @@ type CaptiveCoreTomlParams struct { LogPath *string // Strict is a flag which, if enabled, rejects Stellar Core toml fields which are not supported by captive core. Strict bool + // If true, specifies that captive core should be invoked with on-disk rather than in-memory option for ledger state + UseDB bool } // NewCaptiveCoreTomlFromFile constructs a new CaptiveCoreToml instance by merging configuration @@ -405,6 +408,11 @@ func (c *CaptiveCoreToml) CatchupToml() (*CaptiveCoreToml, error) { } func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) { + + if params.UseDB && !c.tree.Has("DATABASE") { + c.Database = "sqlite3://stellar.db" + } + if !c.tree.Has("NETWORK_PASSPHRASE") { c.NetworkPassphrase = params.NetworkPassphrase } @@ -549,5 +557,9 @@ func (c *CaptiveCoreToml) validate(params CaptiveCoreTomlParams) error { names[v.Name] = true } + if len(c.Database) > 0 && !strings.HasPrefix(c.Database, "sqlite3://") { + return fmt.Errorf("invalid DATABASE parameter: %s, for captive core config, must be valid sqlite3 db url", c.Database) + } + return nil } diff --git a/ingest/ledgerbackend/toml_test.go b/ingest/ledgerbackend/toml_test.go index b8da4de03c..35a52d67dd 100644 --- a/ingest/ledgerbackend/toml_test.go +++ b/ingest/ledgerbackend/toml_test.go @@ -186,6 +186,15 @@ func TestCaptiveCoreTomlValidation(t *testing.T) { logPath: nil, expectedError: "could not unmarshal captive core toml: these fields are not supported by captive core: [\"CATCHUP_RECENT\"]", }, + { + name: "database field was invalid for captive core", + networkPassphrase: "Public Global Stellar Network ; September 2015", + appendPath: filepath.Join("testdata", "invalid-captive-core-database-field.cfg"), + httpPort: nil, + peerPort: nil, + logPath: nil, + expectedError: `invalid captive core toml: invalid DATABASE 
parameter: postgres://mydb, for captive core config, must be valid sqlite3 db url`, + }, { name: "unexpected BUCKET_DIR_PATH", appendPath: filepath.Join("testdata", "appendix-with-bucket-dir-path.cfg"), @@ -216,6 +225,7 @@ func TestGenerateConfig(t *testing.T) { httpPort *uint peerPort *uint logPath *string + useDB bool }{ { name: "offline config with no appendix", @@ -225,6 +235,7 @@ func TestGenerateConfig(t *testing.T) { httpPort: newUint(6789), peerPort: newUint(12345), logPath: nil, + useDB: true, }, { name: "offline config with no peer port", @@ -300,6 +311,7 @@ func TestGenerateConfig(t *testing.T) { PeerPort: testCase.peerPort, LogPath: testCase.logPath, Strict: false, + UseDB: testCase.useDB, } if testCase.appendPath != "" { captiveCoreToml, err = NewCaptiveCoreTomlFromFile(testCase.appendPath, params) @@ -318,3 +330,87 @@ func TestGenerateConfig(t *testing.T) { }) } } + +func TestExternalStorageConfigUsesDatabaseToml(t *testing.T) { + var err error + var captiveCoreToml *CaptiveCoreToml + httpPort := uint(8000) + peerPort := uint(8000) + logPath := "logPath" + + params := CaptiveCoreTomlParams{ + NetworkPassphrase: "Public Global Stellar Network ; September 2015", + HistoryArchiveURLs: []string{"http://localhost:1170"}, + HTTPPort: &httpPort, + PeerPort: &peerPort, + LogPath: &logPath, + Strict: false, + } + + captiveCoreToml, err = NewCaptiveCoreToml(params) + assert.NoError(t, err) + captiveCoreToml.Database = "sqlite3:///etc/defaults/stellar.db" + + configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOffline) + + assert.NoError(t, err) + toml := CaptiveCoreToml{} + toml.unmarshal(configBytes, true) + assert.Equal(t, toml.Database, "sqlite3:///etc/defaults/stellar.db") +} + +func TestDBConfigDefaultsToSqlite(t *testing.T) { + var err error + var captiveCoreToml *CaptiveCoreToml + httpPort := uint(8000) + peerPort := uint(8000) + logPath := "logPath" + + params := CaptiveCoreTomlParams{ + NetworkPassphrase: "Public Global Stellar 
Network ; September 2015", + HistoryArchiveURLs: []string{"http://localhost:1170"}, + HTTPPort: &httpPort, + PeerPort: &peerPort, + LogPath: &logPath, + Strict: false, + UseDB: true, + } + + captiveCoreToml, err = NewCaptiveCoreToml(params) + assert.NoError(t, err) + + configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOffline) + + assert.NoError(t, err) + toml := CaptiveCoreToml{} + toml.unmarshal(configBytes, true) + assert.Equal(t, toml.Database, "sqlite3://stellar.db") +} + +func TestNonDBConfigDoesNotUpdateDatabase(t *testing.T) { + var err error + var captiveCoreToml *CaptiveCoreToml + httpPort := uint(8000) + peerPort := uint(8000) + logPath := "logPath" + + // UseDB not set, which means it's false + params := CaptiveCoreTomlParams{ + NetworkPassphrase: "Public Global Stellar Network ; September 2015", + HistoryArchiveURLs: []string{"http://localhost:1170"}, + HTTPPort: &httpPort, + PeerPort: &peerPort, + LogPath: &logPath, + Strict: false, + } + + captiveCoreToml, err = NewCaptiveCoreToml(params) + assert.NoError(t, err) + + configBytes, err := generateConfig(captiveCoreToml, stellarCoreRunnerModeOffline) + + assert.NoError(t, err) + toml := CaptiveCoreToml{} + toml.unmarshal(configBytes, true) + assert.Equal(t, toml.Database, "") +} diff --git a/integration.sh b/integration.sh index 9ce0cb9b69..1bd78a5c16 100755 --- a/integration.sh +++ b/integration.sh @@ -6,6 +6,7 @@ cd "$(dirname "${BASH_SOURCE[0]}")" export HORIZON_INTEGRATION_TESTS=true export HORIZON_INTEGRATION_ENABLE_CAP_35=${HORIZON_INTEGRATION_ENABLE_CAP_35:-} export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE=${HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE:-} +export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB=${HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB:-} export CAPTIVE_CORE_BIN=${CAPTIVE_CORE_BIN:-/usr/bin/stellar-core} export TRACY_NO_INVARIANT_CHECK=1 # This fails on my dev vm. 
- Paul diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index c0d20f1f4f..87c2cd42b3 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -5,6 +5,8 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased +* New feature, enable captive core based ingestion to use remote db persistence rather than in-memory for ledger states. Essentially moves what would have been stored in RAM to the external db instead. Recent profiling on the two approaches shows an approximate space usage of about 8GB for ledger states as of 02/2022 timeframe, but it will gradually continue to increase as more accounts/assets are added to network. Current horizon ingest behavior when configured for captive core usage will by default take this space from RAM, unless a new command line flag is specified `--captive-core-use-db=true`, which enables this space to be taken from the external db instead, and not RAM. The external db used is determined by setting the `DATABASE` parameter in the captive core cfg/.toml file. If no value is set, then by default it uses sqlite and the db file is stored in `--captive-core-storage-path` - ([4092](https://github.com/stellar/go/pull/4092)) + ## v2.14.0 * Restart Stellar-Core when it's context is cancelled. 
([4192](https://github.com/stellar/go/pull/4192)) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index d4f88b9aa7..37912f9e76 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -391,6 +391,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, ReingestRetryBackoffSeconds: int(retryBackoffSeconds), EnableCaptiveCore: config.EnableCaptiveCoreIngestion, CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath, + CaptiveCoreConfigUseDB: config.CaptiveCoreConfigUseDB, RemoteCaptiveCoreURL: config.RemoteCaptiveCoreURL, CaptiveCoreToml: config.CaptiveCoreToml, CaptiveCoreStoragePath: config.CaptiveCoreStoragePath, diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 2a7599921c..b0e0f19a10 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -109,6 +109,7 @@ var ingestVerifyRangeCmd = &cobra.Command{ HistoryArchiveURL: config.HistoryArchiveURLs[0], EnableCaptiveCore: config.EnableCaptiveCoreIngestion, CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath, + CaptiveCoreConfigUseDB: config.CaptiveCoreConfigUseDB, RemoteCaptiveCoreURL: config.RemoteCaptiveCoreURL, CheckpointFrequency: config.CheckpointFrequency, CaptiveCoreToml: config.CaptiveCoreToml, @@ -206,6 +207,7 @@ var ingestStressTestCmd = &cobra.Command{ if config.EnableCaptiveCoreIngestion { ingestConfig.CaptiveCoreBinaryPath = config.CaptiveCoreBinaryPath ingestConfig.RemoteCaptiveCoreURL = config.RemoteCaptiveCoreURL + ingestConfig.CaptiveCoreConfigUseDB = config.CaptiveCoreConfigUseDB } else { if config.StellarCoreDatabaseURL == "" { return fmt.Errorf("flag --%s cannot be empty", horizon.StellarCoreDBURLFlagName) @@ -295,6 +297,7 @@ var ingestInitGenesisStateCmd = &cobra.Command{ if config.EnableCaptiveCoreIngestion { ingestConfig.CaptiveCoreBinaryPath = config.CaptiveCoreBinaryPath + ingestConfig.CaptiveCoreConfigUseDB = config.CaptiveCoreConfigUseDB } else { if 
config.StellarCoreDatabaseURL == "" { return fmt.Errorf("flag --%s cannot be empty", horizon.StellarCoreDBURLFlagName) diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index f3fa68701f..178595c411 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -195,11 +195,6 @@ func (a *App) HistoryQ() *history.Q { return a.historyQ } -// Ingestion returns the ingestion system associated with this Horizon instance -func (a *App) Ingestion() ingest.System { - return a.ingester -} - // HorizonSession returns a new session that loads data from the horizon // database. func (a *App) HorizonSession() db.SessionInterface { diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index b9e69d3b36..19f979b84d 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -28,6 +28,7 @@ type Config struct { CaptiveCoreToml *ledgerbackend.CaptiveCoreToml CaptiveCoreStoragePath string CaptiveCoreReuseStoragePath bool + CaptiveCoreConfigUseDB bool StellarCoreDatabaseURL string StellarCoreURL string diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index 3b299ab8ba..4bf6d7b058 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/guregu/null" - "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" @@ -61,10 +60,11 @@ func TestInsertLedger(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - ledgerHashStore := ledgerbackend.NewHorizonDBLedgerHashStore(tt.HorizonSession()) - _, exists, err := ledgerHashStore.GetLedgerHash(tt.Ctx, 100) - tt.Assert.NoError(err) - tt.Assert.False(exists) + var ledgerFromDB Ledger + var ledgerHeaderBase64 string + var 
err error + err = q.LedgerBySequence(tt.Ctx, &ledgerFromDB, 69859) + tt.Assert.Error(err) expectedLedger := Ledger{ Sequence: 69859, @@ -115,7 +115,7 @@ func TestInsertLedger(t *testing.T) { }, }, } - ledgerHeaderBase64, err := xdr.MarshalBase64(ledgerEntry.Header) + ledgerHeaderBase64, err = xdr.MarshalBase64(ledgerEntry.Header) tt.Assert.NoError(err) expectedLedger.LedgerHeaderXDR = null.NewString(ledgerHeaderBase64, true) @@ -130,7 +130,6 @@ func TestInsertLedger(t *testing.T) { tt.Assert.NoError(err) tt.Assert.Equal(rowsAffected, int64(1)) - var ledgerFromDB Ledger err = q.LedgerBySequence(tt.Ctx, &ledgerFromDB, 69859) tt.Assert.NoError(err) @@ -145,11 +144,6 @@ func TestInsertLedger(t *testing.T) { expectedLedger.ClosedAt = ledgerFromDB.ClosedAt tt.Assert.Equal(expectedLedger, ledgerFromDB) - - hash, exists, err := ledgerHashStore.GetLedgerHash(tt.Ctx, uint32(expectedLedger.Sequence)) - tt.Assert.NoError(err) - tt.Assert.True(exists) - tt.Assert.Equal(expectedLedger.LedgerHash, hash) } func insertLedgerWithSequence(tt *test.T, q *Q, seq uint32) { diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 2d7d214631..b774fd4189 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -36,6 +36,8 @@ const ( captiveCoreConfigAppendPathName = "captive-core-config-append-path" // CaptiveCoreConfigPathName is the command line flag for configuring the path to the captive core configuration file CaptiveCoreConfigPathName = "captive-core-config-path" + // captive-core-use-db is the command line flag for enabling captive core runtime to use an external db url connection rather than RAM for ledger states + CaptiveCoreConfigUseDB = "captive-core-use-db" captiveCoreMigrationHint = "If you are migrating from Horizon 1.x.y, start with the Migration Guide here: https://developers.stellar.org/docs/run-api-server/migrating/" ) @@ -170,6 +172,25 @@ func Flags() (*Config, support.ConfigOptions) { return nil }, }, 
+ &support.ConfigOption{ + Name: CaptiveCoreConfigUseDB, + OptType: types.Bool, + FlagDefault: false, + Required: false, + Usage: `when enabled, Horizon ingestion will instruct the captive + core invocation to use an external db url for ledger states rather than in memory(RAM).\n + Will result in several GB of space shifting out of RAM and to the external db persistence.\n + The external db url is determined by the presence of DATABASE parameter in the captive-core-config-path or\n + or if absent, the db will default to sqlite and the db file will be stored at location derived from captive-core-storage-path parameter.`, + CustomSetValue: func(opt *support.ConfigOption) error { + if val := viper.GetBool(opt.Name); val { + config.CaptiveCoreConfigUseDB = val + config.CaptiveCoreTomlParams.UseDB = val + } + return nil + }, + ConfigKey: &config.CaptiveCoreConfigUseDB, + }, &support.ConfigOption{ Name: "enable-captive-core-ingestion", OptType: types.Bool, @@ -661,6 +682,9 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption if config.StellarCoreDatabaseURL != "" { return fmt.Errorf("Invalid config: --%s passed but --ingest not set. ", StellarCoreDBURLFlagName) } + if config.CaptiveCoreConfigUseDB { + return fmt.Errorf("Invalid config: --%s has been set, but --ingest not set. 
", CaptiveCoreConfigUseDB) + } } // Configure log file diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 53abbfa9e8..cf10deb129 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -74,6 +74,7 @@ type Config struct { CaptiveCoreBinaryPath string CaptiveCoreStoragePath string CaptiveCoreToml *ledgerbackend.CaptiveCoreToml + CaptiveCoreConfigUseDB bool RemoteCaptiveCoreURL string NetworkPassphrase string @@ -224,11 +225,11 @@ func NewSystem(config Config) (System, error) { ledgerbackend.CaptiveCoreConfig{ BinaryPath: config.CaptiveCoreBinaryPath, StoragePath: config.CaptiveCoreStoragePath, + UseDB: config.CaptiveCoreConfigUseDB, Toml: config.CaptiveCoreToml, NetworkPassphrase: config.NetworkPassphrase, HistoryArchiveURLs: []string{config.HistoryArchiveURL}, CheckpointFrequency: config.CheckpointFrequency, - LedgerHashStore: ledgerbackend.NewHorizonDBLedgerHashStore(config.HistorySession), Log: logger, Context: ctx, }, diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 7b904bd30d..e2f92f225b 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -95,6 +95,7 @@ func initIngester(app *App) { StellarCoreCursor: app.config.CursorName, CaptiveCoreBinaryPath: app.config.CaptiveCoreBinaryPath, CaptiveCoreStoragePath: app.config.CaptiveCoreStoragePath, + CaptiveCoreConfigUseDB: app.config.CaptiveCoreConfigUseDB, CaptiveCoreToml: app.config.CaptiveCoreToml, RemoteCaptiveCoreURL: app.config.RemoteCaptiveCoreURL, EnableCaptiveCore: app.config.EnableCaptiveCoreIngestion, diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index 86f0f77ad5..26c9b9b595 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -530,6 +530,8 @@ func command(horizonConfig horizon.Config, args ...string) 
[]string { horizonConfig.CaptiveCoreBinaryPath, "--captive-core-config-path", horizonConfig.CaptiveCoreConfigPath, + "--captive-core-use-db=" + + strconv.FormatBool(horizonConfig.CaptiveCoreConfigUseDB), "--enable-captive-core-ingestion=" + strconv.FormatBool(horizonConfig.EnableCaptiveCoreIngestion), "--network-passphrase", horizonConfig.NetworkPassphrase, diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index e5a265da97..622ec22f65 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -41,7 +41,8 @@ const ( ) var ( - RunWithCaptiveCore = os.Getenv("HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE") != "" + RunWithCaptiveCore = os.Getenv("HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE") != "" + RunWithCaptiveCoreUseDB = os.Getenv("HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB") != "" ) type Config struct { @@ -70,6 +71,7 @@ type Config struct { type CaptiveConfig struct { binaryPath string configPath string + useDB bool } type Test struct { @@ -153,6 +155,9 @@ func (i *Test) configureCaptiveCore() { composePath := findDockerComposePath() i.coreConfig.binaryPath = os.Getenv("CAPTIVE_CORE_BIN") i.coreConfig.configPath = filepath.Join(composePath, "captive-core-integration-tests.cfg") + if RunWithCaptiveCoreUseDB { + i.coreConfig.useDB = true + } } if value := i.getParameter( @@ -294,6 +299,7 @@ func (i *Test) StartHorizon() error { hostname := "localhost" coreBinaryPath := i.coreConfig.binaryPath captiveCoreConfigPath := i.coreConfig.configPath + captiveCoreUseDB := strconv.FormatBool(i.coreConfig.useDB) defaultArgs := map[string]string{ "stellar-core-url": i.coreClient.URL, @@ -306,6 +312,7 @@ func (i *Test) StartHorizon() error { "stellar-core-binary-path": coreBinaryPath, "captive-core-config-path": captiveCoreConfigPath, "captive-core-http-port": "21626", + "captive-core-use-db": captiveCoreUseDB, 
"enable-captive-core-ingestion": strconv.FormatBool(len(coreBinaryPath) > 0), "ingest": "true", "history-archive-urls": fmt.Sprintf("http://%s:%d", hostname, historyArchivePort), @@ -358,10 +365,6 @@ func (i *Test) StartHorizon() error { HorizonURL: fmt.Sprintf("http://%s:%s", hostname, horizonPort), } - if err = i.app.Ingestion().BuildGenesisState(); err != nil { - return errors.Wrap(err, "cannot build genesis state") - } - done := make(chan struct{}) go func() { i.app.Serve()