Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: Reuse Stellar-Core on-disk DB in online mode #4471

Merged
merged 4 commits into from
Jul 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 81 additions & 25 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ledgerbackend
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -18,6 +19,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/support/log"
)

Expand Down Expand Up @@ -66,6 +68,7 @@ type stellarCoreRunner struct {
processExitError error

storagePath string
toml *CaptiveCoreToml
useDB bool
nonce string

Expand Down Expand Up @@ -102,18 +105,9 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
fullStoragePath = path.Join(config.StoragePath, "captive-core")
}

info, err := os.Stat(fullStoragePath)
if os.IsNotExist(err) {
innerErr := os.MkdirAll(fullStoragePath, os.FileMode(int(0755))) // rwx|rx|rx
if innerErr != nil {
return nil, errors.Wrap(innerErr, fmt.Sprintf(
"failed to create storage directory (%s)", fullStoragePath))
}
} else if !info.IsDir() {
return nil, errors.New(fmt.Sprintf("%s is not a directory", fullStoragePath))
} else if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf(
"error accessing storage directory (%s)", fullStoragePath))
err := createCheckDirectory(fullStoragePath)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(config.Context)
Expand All @@ -129,10 +123,11 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
"captive-stellar-core-%x",
rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
),
log: config.Log,
log: config.Log,
toml: config.Toml,
}

if conf, err := writeConf(config.Toml, mode, runner.getConfFileName()); err != nil {
if conf, err := runner.writeConf(); err != nil {
return nil, errors.Wrap(err, "error writing configuration")
} else {
runner.log.Debugf("captive core config file contents:\n%s", conf)
Expand All @@ -141,13 +136,31 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode)
return runner, nil
}

func writeConf(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode, location string) (string, error) {
text, err := generateConfig(captiveCoreToml, mode)
func createCheckDirectory(fullStoragePath string) error {
info, err := os.Stat(fullStoragePath)
if os.IsNotExist(err) {
innerErr := os.MkdirAll(fullStoragePath, os.FileMode(int(0755))) // rwx|rx|rx
if innerErr != nil {
return errors.Wrap(innerErr, fmt.Sprintf(
"failed to create storage directory (%s)", fullStoragePath))
}
} else if !info.IsDir() {
return errors.New(fmt.Sprintf("%s is not a directory", fullStoragePath))
} else if err != nil {
return errors.Wrap(err, fmt.Sprintf(
"error accessing storage directory (%s)", fullStoragePath))
}

return nil
}

func (r *stellarCoreRunner) writeConf() (string, error) {
text, err := generateConfig(r.toml, r.mode)
if err != nil {
return "", err
}

return string(text), ioutil.WriteFile(location, text, 0644)
return string(text), ioutil.WriteFile(r.getConfFileName(), text, 0644)
}

func generateConfig(captiveCoreToml *CaptiveCoreToml, mode stellarCoreRunnerMode) ([]byte, error) {
Expand Down Expand Up @@ -234,6 +247,22 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
return wr
}

func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) {
allParams := []string{"--conf", r.getConfFileName(), "offline-info"}
cmd := exec.Command(r.executablePath, allParams...)
cmd.Dir = r.storagePath
output, err := cmd.Output()
if err != nil {
return stellarcore.InfoResponse{}, errors.Wrap(err, "error executing offline-info cmd")
}
var info stellarcore.InfoResponse
err = json.Unmarshal(output, &info)
if err != nil {
return stellarcore.InfoResponse{}, errors.Wrap(err, "invalid output of offline-info cmd")
}
return info, nil
}

func (r *stellarCoreRunner) createCmd(params ...string) *exec.Cmd {
allParams := append([]string{"--conf", r.getConfFileName()}, params...)
cmd := exec.Command(r.executablePath, allParams...)
Expand Down Expand Up @@ -318,17 +347,44 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
}

if r.useDB {
if err := r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
// Check if on-disk core DB exists and what's the LCL there. If not what
// we need remove storage dir and start from scratch.
removeStorageDir := false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worthwhile to add a unit test in stellar_core_runner_test.go to assert this new outcome?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick update on this: I'm working on refactoring stellarCoreRunner to allow writing better unit tests. I'll have a new commit ready by the end of today.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually while refactoring I changed some other parts of stellarCoreRunner that seemed inconsistent. Would you mind 👍 this PR (if there is nothing else that requires changes) and I'll open another PR with refactoring and tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sreuland follow up PR: #4480

info, err := r.offlineInfo()
if err != nil {
r.log.Infof("Error running offline-info: %v, removing existing storage-dir contents", err)
removeStorageDir = true
} else if uint32(info.Info.Ledger.Num) != from {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know how core maintains info.Info.Ledger.Num , i.e. does it only bump it when it knows the meta record for that sequence was read off the pipe? wondering if info.Info.Ledger.Num will tend to be farther ahead than from which represents the last sequence that horizon read off the pipe(and serialized to history), if it does drift asynchronously from meta pipe reader activity(horizon), then this condition won't get hit much, right, result being it ends up in same routine of new-db/catchup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my best knowledge and some experimenting it seems that Stellar-Core only closes the ledger once it's read from meta pipe. This leaves us with two cases:

  • Horizon is catching up (after restart or state build) - it this case bufferedLedgerMetaReader can read ledgers from meta pipe upfront which will make the Horizon to be behind. In this case, when Horizon is stopped with ledgers in the buffer the solution in this PR will not work because the ledger sequences in will not match on restart. We can try removing bufferedLedgerMetaReader in online mode but I'm not sure about performance of this change. We can explore it in a separate PR.
  • Horizon is ingesting latest ledgers - in this case the bufferedLedgerMetaReader will contain up to one ledger but if Horizon is shutdown gracefully it will process this ledger before shutting down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, that's interesting, meaning there's only one ledger of data present in that pipe at any time, sounds like core writer blocks until it's empty, which is the signal that prior ledger was read, but, this at least recovers from any out-of-sync case and worst outcome is it does the same as current day of full removal first and init first.

r.log.Infof("Unexpected LCL in Stellar-Core DB: %d (want: %d), removing existing storage-dir contents", info.Info.Ledger.Num, from)
removeStorageDir = true
}
// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
if err := r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {

if removeStorageDir {
if err = os.RemoveAll(r.storagePath); err != nil {
return errors.Wrap(err, "error removing existing storage-dir contents")
}

if err = createCheckDirectory(r.storagePath); err != nil {
return err
}

if _, err = r.writeConf(); err != nil {
return errors.Wrap(err, "error writing configuration")
}

if err = r.createCmd("new-db").Run(); err != nil {
return errors.Wrap(err, "error initializing core db")
}

// Do a quick catch-up to set the LCL in core to be our expected starting
// point.
if from > 2 {
if err = r.createCmd("catchup", fmt.Sprintf("%d/0", from-1)).Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else if err = r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}
} else if err := r.createCmd("catchup", "2/0").Run(); err != nil {
return errors.Wrap(err, "error runing stellar-core catchup")
}

r.cmd = r.createCmd(
Expand Down