Skip to content

Commit

Permalink
services/horizon: Allow captive core to run with sqlite database (#4092)
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Bellamy authored Feb 17, 2022
1 parent 2cce28d commit b7fde58
Show file tree
Hide file tree
Showing 20 changed files with 262 additions and 53 deletions.
13 changes: 13 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ jobs:
enable-captive-core:
type: boolean
default: false
enable-captive-core-remote-storage:
type: boolean
default: false
working_directory: ~/go/src/github.com/stellar/go
machine:
image: ubuntu-2004:202010-01
Expand All @@ -452,6 +455,12 @@ jobs:
- run:
name: Setting Captive Core env variables
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE=true" >> $BASH_ENV
- when:
condition: <<parameters.enable-captive-core-remote-storage>>
steps:
- run:
name: Setting Captive Core Remote Storage env variable
command: echo "export HORIZON_INTEGRATION_ENABLE_CAPTIVE_CORE_USE_DB=true" >> $BASH_ENV
- run:
name: Run Horizon integration tests <<#parameters.enable-captive-core>>(With captive core)<</parameters.enable-captive-core>>
# Currently all integration tests are in a single directory.
Expand Down Expand Up @@ -480,6 +489,10 @@ workflows:
- test_horizon_integration:
name: test_horizon_integration_with_captive_core
enable-captive-core: true
- test_horizon_integration:
name: test_horizon_integration_with_captive_core_remote_storage
enable-captive-core: true
enable-captive-core-remote-storage: true
- test_verify_range_docker_image:
filters:
# we use test_verify_range_docker_image with publish in master
Expand Down
42 changes: 25 additions & 17 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type CaptiveCoreConfig struct {
// stored. We always append /captive-core to this directory, since we clean
// it up entirely on shutdown.
StoragePath string

// UseDB, when true, instructs the core invocation to use an external db url
// for ledger states rather than in memory(RAM). The external db url is determined by the presence
// of DATABASE parameter in the captive-core-config-path or if absent, the db will default to sqlite
// and the db file will be stored at location derived from StoragePath parameter.
UseDB bool
}

// NewCaptive returns a new CaptiveStellarCore instance.
Expand All @@ -142,6 +148,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
if parentCtx == nil {
parentCtx = context.Background()
}

var cancel context.CancelFunc
config.Context, cancel = context.WithCancel(parentCtx)

Expand Down Expand Up @@ -250,11 +257,8 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
var runner stellarCoreRunnerInterface
if runner, err = c.stellarCoreRunnerFactory(stellarCoreRunnerModeOnline); err != nil {
return errors.Wrap(err, "error creating stellar-core runner")
} else {
// only assign c.stellarCoreRunner if runner is not nil to avoid nil interface check
// see https://golang.org/doc/faq#nil_error
c.stellarCoreRunner = runner
}
c.stellarCoreRunner = runner

runFrom, ledgerHash, err := c.runFromParams(ctx, from)
if err != nil {
Expand All @@ -279,14 +283,15 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
}

// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) {
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (uint32, string, error) {

if from == 1 {
// Trying to start-from 1 results in an error from Stellar-Core:
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
// TODO maybe we can fix it by generating 1st ledger meta
// like GenesisLedgerStateReader?
err = errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
return
err := errors.New("CaptiveCore is unable to start from ledger 1, start from ledger 2")
return 0, "", err
}

if from <= 63 {
Expand All @@ -298,26 +303,25 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
from = 3
}

runFrom = from - 1
runFrom := from - 1
if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
ledgerHash, exists, err := c.ledgerHashStore.GetLedgerHash(ctx, runFrom)
if err != nil {
err = errors.Wrapf(err, "error trying to read ledger hash %d", runFrom)
return
return 0, "", err
}
if exists {
return
return runFrom, ledgerHash, nil
}
}

ledgerHeader, err2 := c.archive.GetLedgerHeader(from)
if err2 != nil {
err = errors.Wrapf(err2, "error trying to read ledger header %d from HAS", from)
return
ledgerHeader, err := c.archive.GetLedgerHeader(from)
if err != nil {
return 0, "", errors.Wrapf(err, "error trying to read ledger header %d from HAS", from)
}
ledgerHash = hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
return
ledgerHash := hex.EncodeToString(ledgerHeader.Header.PreviousLedgerHash[:])
return runFrom, ledgerHash, nil
}

// nextExpectedSequence returns nextLedger (if currently set) or start of
Expand Down Expand Up @@ -406,6 +410,10 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited {
return false
}

lastLedger := uint32(0)
if c.lastLedger != nil {
lastLedger = *c.lastLedger
Expand Down
7 changes: 6 additions & 1 deletion ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -490,13 +491,15 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: uint32(129),
}, nil)

mockArchive.
On("GetLedgerHeader", uint32(65)).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)
Expand Down Expand Up @@ -585,6 +588,7 @@ func TestCaptiveGetLedger(t *testing.T) {
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -1288,6 +1292,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
func TestCaptiveIsPrepared(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("context").Return(context.Background()).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)

// c.prepared == nil
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1351,6 +1356,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1447,5 +1453,4 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {

mockRunner.AssertExpectations(t)
mockArchive.AssertExpectations(t)
mockLedgerHashStore.AssertExpectations(t)
}
58 changes: 46 additions & 12 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type stellarCoreRunner struct {
processExitError error

storagePath string
useDB bool
nonce string

log *log.Entry
Expand Down Expand Up @@ -122,6 +123,7 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
ctx: ctx,
cancel: cancel,
storagePath: fullStoragePath,
useDB: config.UseDB,
mode: mode,
nonce: fmt.Sprintf(
"captive-stellar-core-%x",
Expand Down Expand Up @@ -261,11 +263,22 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
}

rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
r.cmd = r.createCmd(
"catchup", rangeArg,
"--metadata-output-stream", r.getPipeName(),
"--in-memory",
)
params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()}

// horizon operator has specified to use external storage for captive core ledger state
// instruct captive core invocation to not use memory, and in that case
// cc will look at DATABASE property in cfg toml for the external storage source to use.
// when using external storage of ledgers, use new-db to first set the state of
// remote db storage to genesis to purge any prior state and reset.
if r.useDB {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}
} else {
params = append(params, "--in-memory")
}

r.cmd = r.createCmd(params...)

var err error
r.pipe, err = r.start(r.cmd)
Expand Down Expand Up @@ -304,13 +317,34 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return errors.New("runner already started")
}

r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
"--start-at-hash", hash,
"--metadata-output-stream", r.getPipeName(),
)
if r.useDB {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}
// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else if err := r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}

r.cmd = r.createCmd(
"run",
"--metadata-output-stream",
r.getPipeName(),
)
} else {
r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
"--start-at-hash", hash,
"--metadata-output-stream", r.getPipeName(),
)
}

var err error
r.pipe, err = r.start(r.cmd)
Expand Down
1 change: 1 addition & 0 deletions ingest/ledgerbackend/testdata/expected-offline-core.cfg
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Generated file, do not edit
DATABASE = "sqlite3://stellar.db"
FAILURE_SAFETY = 0
HTTP_PORT = 0
LOG_FILE_PATH = ""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# DATABASE limited to only be sqlite:// protocol
DATABASE="postgres://mydb"

[[HOME_DOMAINS]]
HOME_DOMAIN="testnet.stellar.org"
QUALITY="MEDIUM"

[[VALIDATORS]]
NAME="sdf_testnet_1"
HOME_DOMAIN="testnet.stellar.org"
PUBLIC_KEY="GDKXE2OZMJIPOSLNA6N6F2BVCI3O777I2OOC4BV7VOYUEHYX7RTRYA7Y"
ADDRESS="localhost:123"
12 changes: 12 additions & 0 deletions ingest/ledgerbackend/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type QuorumSet struct {
}

type captiveCoreTomlValues struct {
Database string `toml:"DATABASE,omitempty"`
// we cannot omitempty because the empty string is a valid configuration for LOG_FILE_PATH
// and the default is stellar-core.log
LogFilePath string `toml:"LOG_FILE_PATH"`
Expand Down Expand Up @@ -312,6 +313,8 @@ type CaptiveCoreTomlParams struct {
LogPath *string
// Strict is a flag which, if enabled, rejects Stellar Core toml fields which are not supported by captive core.
Strict bool
// If true, specifies that captive core should be invoked with on-disk rather than in-memory option for ledger state
UseDB bool
}

// NewCaptiveCoreTomlFromFile constructs a new CaptiveCoreToml instance by merging configuration
Expand Down Expand Up @@ -405,6 +408,11 @@ func (c *CaptiveCoreToml) CatchupToml() (*CaptiveCoreToml, error) {
}

func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) {

if params.UseDB && !c.tree.Has("DATABASE") {
c.Database = "sqlite3://stellar.db"
}

if !c.tree.Has("NETWORK_PASSPHRASE") {
c.NetworkPassphrase = params.NetworkPassphrase
}
Expand Down Expand Up @@ -549,5 +557,9 @@ func (c *CaptiveCoreToml) validate(params CaptiveCoreTomlParams) error {
names[v.Name] = true
}

if len(c.Database) > 0 && !strings.HasPrefix(c.Database, "sqlite3://") {
return fmt.Errorf("invalid DATABASE parameter: %s, for captive core config, must be valid sqlite3 db url", c.Database)
}

return nil
}
Loading

0 comments on commit b7fde58

Please sign in to comment.