Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Captive-Core fixes to support Stellar-Core 17.1.0 #3694

Merged
71 changes: 32 additions & 39 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@ type CaptiveStellarCore struct {
// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

prepared *Range // non-nil if any range is prepared
nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online
previousLedgerHash *string
@@ -213,6 +214,8 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error

// The next ledger should be the first ledger of the checkpoint containing
// the requested ledger
ran := BoundedRange(from, to)
c.prepared = &ran
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from)
c.lastLedger = &to
c.previousLedgerHash = nil
@@ -248,7 +251,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
c.stellarCoreRunner = runner
}

runFrom, ledgerHash, nextLedger, err := c.runFromParams(ctx, from)
runFrom, ledgerHash, err := c.runFromParams(ctx, from)
if err != nil {
return errors.Wrap(err, "error calculating ledger and hash for stelar-core run")
}
@@ -258,26 +261,20 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
return errors.Wrap(err, "error running stellar-core")
}

c.nextLedger = nextLedger
// In the online mode we update nextLedger after streaming the first ledger.
// This is to support versions before and after/including v17.1.0 that
// introduced minimal persistent DB.
c.nextLedger = 0
ran := UnboundedRange(from)
c.prepared = &ran
c.lastLedger = nil
c.previousLedgerHash = nil

if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, nextLedger-1)
if err != nil {
return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1)
}
if exists {
c.previousLedgerHash = &ledgerHash
}
}

return nil
}

// runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) {
func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) {
if from == 1 {
// Trying to start-from 1 results in an error from Stellar-Core:
// Target ledger 1 is not newer than last closed ledger 1 - nothing to do
@@ -288,28 +285,12 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
}

if from <= 63 {
// For ledgers before (and including) first checkpoint, get/wait the first
// checkpoint to get the ledger header. It will always start streaming
// from ledger 2.
nextLedger = 2
// The line below is to support a special case for streaming ledger 2
// that works for all other ledgers <= 63 (fast-forward).
// We can't set from=2 because Stellar-Core will not allow starting from 1.
// To solve this we start from 3 and exploit the fast that Stellar-Core
// will stream data from 2 for the first checkpoint.
from = 3
} else {
// For ledgers after the first checkpoint, start at the previous checkpoint
// and fast-forward from there.
if !c.checkpointManager.IsCheckpoint(from) {
from = c.checkpointManager.PrevCheckpoint(from)
}
// Streaming will start from the previous checkpoint + 1
nextLedger = from - 63
if nextLedger < 2 {
// Stellar-Core always streams from ledger 2 at min.
nextLedger = 2
}
}

runFrom = from - 1
@@ -334,6 +315,18 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru
return
}

// nextExpectedSequence returns nextLedger (if currently set) or start of
// prepared range. Otherwise it returns 0.
// This is done because `nextLedger` is 0 between the moment Stellar-Core is
// started and streaming the first ledger (in such case we return first ledger
// in requested range).
func (c *CaptiveStellarCore) nextExpectedSequence() uint32 {
if c.nextLedger == 0 && c.prepared != nil {
return c.prepared.from
}
return c.nextLedger
}

func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRange Range) (bool, error) {
c.stellarCoreLock.Lock()
defer c.stellarCoreLock.Unlock()
@@ -408,18 +401,18 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
cachedLedger = c.cachedMeta.LedgerSequence()
}

if c.nextLedger == 0 {
if c.prepared == nil {
return false
}

if lastLedger == 0 {
return c.nextLedger <= ledgerRange.from || cachedLedger == ledgerRange.from
return c.nextExpectedSequence() <= ledgerRange.from || cachedLedger == ledgerRange.from
}

// From now on: lastLedger != 0 so current range is bounded

if ledgerRange.bounded {
return (c.nextLedger <= ledgerRange.from || cachedLedger == ledgerRange.from) &&
return (c.nextExpectedSequence() <= ledgerRange.from || cachedLedger == ledgerRange.from) &&
lastLedger >= ledgerRange.to
}

@@ -458,11 +451,11 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
return xdr.LedgerCloseMeta{}, errors.New("session is closed, call PrepareRange first")
}

if sequence < c.nextLedger {
if sequence < c.nextExpectedSequence() {
return xdr.LedgerCloseMeta{}, errors.Errorf(
"requested ledger %d is behind the captive core stream (expected=%d)",
sequence,
c.nextLedger,
c.nextExpectedSequence(),
)
}

@@ -495,7 +488,7 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe
}

seq := result.LedgerCloseMeta.LedgerSequence()
if seq != c.nextLedger {
if c.nextLedger != 0 && seq != c.nextLedger {
bartekn marked this conversation as resolved.
Show resolved Hide resolved
// We got something unexpected; close and reset
c.stellarCoreRunner.close()
return false, xdr.LedgerCloseMeta{}, errors.Errorf(
@@ -517,7 +510,7 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe
)
}

c.nextLedger++
c.nextLedger = result.LedgerSequence() + 1
currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

@@ -583,13 +576,13 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
}

if c.lastLedger == nil {
return c.nextLedger - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
}
return *c.lastLedger, nil
}

func (c *CaptiveStellarCore) isClosed() bool {
return c.nextLedger == 0 || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
return c.prepared == nil || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
116 changes: 98 additions & 18 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"path"
"path/filepath"
"regexp"
"runtime"
"strings"
"sync"
"time"
@@ -52,6 +53,7 @@ type stellarCoreRunner struct {
executablePath string

started bool
cmd *exec.Cmd
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
@@ -68,12 +70,32 @@ type stellarCoreRunner struct {
log *log.Entry
}

func createRandomHexString(n int) string {
hex := []rune("abcdef1234567890")
b := make([]rune, n)
for i := range b {
b[i] = hex[rand.Intn(len(hex))]
}
return string(b)
}

func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode) (*stellarCoreRunner, error) {
// Use the specified directory to store Captive Core's data:
// https://github.com/stellar/go/issues/3437
// but be sure to re-use rather than replace it:
// https://github.com/stellar/go/issues/3631
fullStoragePath := path.Join(config.StoragePath, "captive-core")
var fullStoragePath string
if runtime.GOOS != "windows" {
// Use the specified directory to store Captive Core's data:
// https://github.com/stellar/go/issues/3437
// but be sure to re-use rather than replace it:
// https://github.com/stellar/go/issues/3631
fullStoragePath = path.Join(config.StoragePath, "captive-core")
} else {
// On Windows, first we ALWAYS append something to the base storage path,
// because we will delete the directory entirely when Horizon stops. We also
// add a random suffix in order to ensure that there aren't naming
// conflicts.
// This is done because it's impossible to send SIGINT on Windows so
// buckets can become corrupted.
fullStoragePath = path.Join(config.StoragePath, "captive-core-"+createRandomHexString(8))
}

info, err := os.Stat(fullStoragePath)
if os.IsNotExist(err) {
@@ -206,7 +228,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {

func (r *stellarCoreRunner) createCmd(params ...string) *exec.Cmd {
allParams := append([]string{"--conf", r.getConfFileName()}, params...)
cmd := exec.CommandContext(r.ctx, r.executablePath, allParams...)
cmd := exec.Command(r.executablePath, allParams...)
cmd.Dir = r.storagePath
cmd.Stdout = r.getLogLineWriter()
cmd.Stderr = r.getLogLineWriter()
@@ -250,16 +272,16 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
}

rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
cmd := r.createCmd(
r.cmd = r.createCmd(
"catchup", rangeArg,
"--metadata-output-stream", r.getPipeName(),
"--in-memory",
)

var err error
r.pipe, err = r.start(cmd)
r.pipe, err = r.start(r.cmd)
if err != nil {
r.closeLogLineWriters(cmd)
r.closeLogLineWriters(r.cmd)
return errors.Wrap(err, "error starting `stellar-core catchup` subprocess")
}

@@ -274,7 +296,7 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
}

r.wg.Add(1)
go r.handleExit(cmd)
bartekn marked this conversation as resolved.
Show resolved Hide resolved
go r.handleExit()

return nil
}
@@ -293,7 +315,7 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
return errors.New("runner already started")
}

cmd := r.createCmd(
r.cmd = r.createCmd(
"run",
"--in-memory",
"--start-at-ledger", fmt.Sprintf("%d", from),
@@ -302,9 +324,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
)

var err error
r.pipe, err = r.start(cmd)
r.pipe, err = r.start(r.cmd)
if err != nil {
r.closeLogLineWriters(cmd)
r.closeLogLineWriters(r.cmd)
return errors.Wrap(err, "error starting `stellar-core run` subprocess")
}

@@ -319,15 +341,63 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
}

r.wg.Add(1)
go r.handleExit(cmd)
go r.handleExit()

return nil
}

func (r *stellarCoreRunner) handleExit(cmd *exec.Cmd) {
func (r *stellarCoreRunner) handleExit() {
defer r.wg.Done()
exitErr := cmd.Wait()
r.closeLogLineWriters(cmd)

// Pattern recommended in:
// https://github.com/golang/go/blob/cacac8bdc5c93e7bc71df71981fdf32dded017bf/src/cmd/go/script_test.go#L1091-L1098
var interrupt os.Signal = os.Interrupt
if runtime.GOOS == "windows" {
// Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on
// Windows; using it with os.Process.Signal will return an error.”
// Fall back to Kill instead.
interrupt = os.Kill
}

errc := make(chan error)
go func() {
select {
case errc <- nil:
return
case <-r.ctx.Done():
}

err := r.cmd.Process.Signal(interrupt)
if err == nil {
err = r.ctx.Err() // Report ctx.Err() as the reason we interrupted.
} else if err.Error() == "os: process already finished" {
errc <- nil
return
}

timer := time.NewTimer(10 * time.Second)
select {
// Report ctx.Err() as the reason we interrupted the process...
case errc <- r.ctx.Err():
timer.Stop()
return
// ...but after killDelay has elapsed, fall back to a stronger signal.
case <-timer.C:
}

// Wait still hasn't returned.
// Kill the process harder to make sure that it exits.
//
// Ignore any error: if cmd.Process has already terminated, we still
// want to send ctx.Err() (or the error from the Interrupt call)
// to properly attribute the signal that may have terminated it.
_ = r.cmd.Process.Kill()

errc <- err
}()

waitErr := r.cmd.Wait()
r.closeLogLineWriters(r.cmd)

r.lock.Lock()
defer r.lock.Unlock()
@@ -340,7 +410,11 @@ func (r *stellarCoreRunner) handleExit(cmd *exec.Cmd) {
}

r.processExited = true
r.processExitError = exitErr
if interruptErr := <-errc; interruptErr != nil {
r.processExitError = interruptErr
} else {
r.processExitError = waitErr
}
}

// closeLogLineWriters closes the go routines created by getLogLineWriter()
@@ -398,5 +472,11 @@ func (r *stellarCoreRunner) close() error {
r.pipe.Reader.Close()
}

if runtime.GOOS == "windows" {
// It's impossible to send SIGINT on Windows so buckets can become
// corrupted. If we can't reuse it, then remove it.
return os.RemoveAll(storagePath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the buckets are corrupted what happens if we don't remove the directory? will captive core be unable to start at all? should we also remove the directory on linux in the scenario that captive core does not shutdown gracefully and we have to use sigkill?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I believe the change in 3d28e9e should fix it (remove folder if there was an error terminating the process).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how r.processExitError != nil implies that the process must have been terminated by sigkill. Isn't the scenario below possible?

  1. context is cancelled
  2. we send sigint to captive core
  3. captive core terminates cleanly before the 10 second timeout (we don't need to send sigkill)
  4. r.processExitError is assigned the context error which is non nil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot about context.Cancelled - does 3200baf look good now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bartekn I ran it locally and it seems to work. it might be worth adding an assertion in the integration tests here:

https://github.com/stellar/go/blob/master/services/horizon/internal/test/integration/integration.go#L252

if we're running the integration tests on windows with captive core then we expect that the buckets directory to still exist after horizon has shut down

}

return nil
}
1 change: 0 additions & 1 deletion ingest/ledgerbackend/testdata/appendix-with-fields.cfg
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ FAILURE_SAFETY=2
UNSAFE_QUORUM=false
PUBLIC_HTTP_PORT=true
RUN_STANDALONE=false
DISABLE_XDR_FSYNC=false
BUCKET_DIR_PATH="test-buckets"
HTTP_PORT = 6789
PEER_PORT = 12345
1 change: 0 additions & 1 deletion ingest/ledgerbackend/testdata/expected-offline-core.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Generated file, do not edit
DISABLE_XDR_FSYNC = true
FAILURE_SAFETY = 0
HTTP_PORT = 0
LOG_FILE_PATH = ""
Loading