Skip to content

Commit

Permalink
ingest: Reuse Stellar-Core on-disk DB in online mode (#4471)
Browse files Browse the repository at this point in the history
This commit changes the behaviour of `stellarCoreRunner` when using an on-disk
DB in online mode to check if existing storage dir contains the DB in a state
that allows Captive Core to start without rebuilding Stellar-Core state. In
short, it checks (by using `stellar-core offline-info` command) if the LCL of
Stellar-Core matches the requested ledger in `startFrom`.

This was done because while applying state from buckets was relatively fast in
memory mode of Captive Core it can be extremely slow when using disk. This
change allows reusing existing state in most cases.

Close #4454.
  • Loading branch information
bartekn authored Jul 26, 2022
1 parent 0d47357 commit 4850d22
Showing 1 changed file with 81 additions and 25 deletions.
106 changes: 81 additions & 25 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ledgerbackend
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -18,6 +19,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/support/log"
)

Expand Down Expand Up @@ -66,6 +68,7 @@ type stellarCoreRunner struct {
processExitError error

storagePath string
toml *CaptiveCoreToml
useDB bool
nonce string

Expand Down Expand Up @@ -102,18 +105,9 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
fullStoragePath = path.Join(config.StoragePath, "captive-core")
}

info, err := os.Stat(fullStoragePath)
if os.IsNotExist(err) {
innerErr := os.MkdirAll(fullStoragePath, os.FileMode(int(0755))) // rwx|rx|rx
if innerErr != nil {
return nil, errors.Wrap(innerErr, fmt.Sprintf(
"failed to create storage directory (%s)", fullStoragePath))
}
} else if !info.IsDir() {
return nil, errors.New(fmt.Sprintf("%s is not a directory", fullStoragePath))
} else if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf(
"error accessing storage directory (%s)", fullStoragePath))
err := createCheckDirectory(fullStoragePath)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(config.Context)
Expand All @@ -129,10 +123,11 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
"captive-stellar-core-%x",
rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
),
log: config.Log,
log: config.Log,
toml: config.Toml,
}

if conf, err := writeConf(config.Toml, mode, runner.getConfFileName()); err != nil {
if conf, err := runner.writeConf(); err != nil {
return nil, errors.Wrap(err, "error writing configuration")
} else {
runner.log.Debugf("captive core config file contents:\n%s", conf)
Expand All @@ -141,13 +136,31 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
return runner, nil
}

func writeConf(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode, location string) (string, error) {
text, err := generateConfig(captiveCoreToml, mode)
func createCheckDirectory(fullStoragePath string) error {
info, err := os.Stat(fullStoragePath)
if os.IsNotExist(err) {
innerErr := os.MkdirAll(fullStoragePath, os.FileMode(int(0755))) // rwx|rx|rx
if innerErr != nil {
return errors.Wrap(innerErr, fmt.Sprintf(
"failed to create storage directory (%s)", fullStoragePath))
}
} else if !info.IsDir() {
return errors.New(fmt.Sprintf("%s is not a directory", fullStoragePath))
} else if err != nil {
return errors.Wrap(err, fmt.Sprintf(
"error accessing storage directory (%s)", fullStoragePath))
}

return nil
}

func (r *stellarCoreRunner) writeConf() (string, error) {
text, err := generateConfig(r.toml, r.mode)
if err != nil {
return "", err
}

return string(text), ioutil.WriteFile(location, text, 0644)
return string(text), ioutil.WriteFile(r.getConfFileName(), text, 0644)
}

func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) {
Expand Down Expand Up @@ -234,6 +247,22 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
return wr
}

func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) {
allParams := []string{"--conf", r.getConfFileName(), "offline-info"}
cmd := exec.Command(r.executablePath, allParams...)
cmd.Dir = r.storagePath
output, err := cmd.Output()
if err != nil {
return stellarcore.InfoResponse{}, errors.Wrap(err, "error executing offline-info cmd")
}
var info stellarcore.InfoResponse
err = json.Unmarshal(output, &info)
if err != nil {
return stellarcore.InfoResponse{}, errors.Wrap(err, "invalid output of offline-info cmd")
}
return info, nil
}

func (r *stellarCoreRunner) createCmd(params ...string) *exec.Cmd {
allParams := append([]string{"--conf", r.getConfFileName()}, params...)
cmd := exec.Command(r.executablePath, allParams...)
Expand Down Expand Up @@ -318,17 +347,44 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
}

if r.useDB {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
// Check if on-disk core DB exists and what's the LCL there. If not what
// we need remove storage dir and start from scratch.
removeStorageDir := false
info, err := r.offlineInfo()
if err != nil {
r.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err)
removeStorageDir = true
} else if uint32(info.Info.Ledger.Num) != from {
r.log.Infof("Unexpected LCL in Stellar-Core DB: %d (want: %d), removing existing storage-dir contents", info.Info.Ledger.Num, from)
removeStorageDir = true
}
// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {

if removeStorageDir {
if err = os.RemoveAll(r.storagePath); err != nil {
return errors.Wrap(err, "error removing existing storage-dir contents")
}

if err = createCheckDirectory(r.storagePath); err != nil {
return err
}

if _, err = r.writeConf(); err != nil {
return errors.Wrap(err, "error writing configuration")
}

if err = r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}

// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
if err = r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else if err = r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else if err := r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}

r.cmd = r.createCmd(
Expand Down

0 comments on commit 4850d22

Please sign in to comment.