diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index a7127c3110..174d86b1c5 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -88,6 +88,7 @@ type CaptiveStellarCore struct { // cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger(). cachedMeta *xdr.LedgerCloseMeta + prepared *Range // non-nil if any range is prepared nextLedger uint32 // next ledger expected, error w/ restart if not seen lastLedger *uint32 // end of current segment if offline, nil if online previousLedgerHash *string @@ -213,6 +214,8 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error // The next ledger should be the first ledger of the checkpoint containing // the requested ledger + ran := BoundedRange(from, to) + c.prepared = &ran c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from) c.lastLedger = &to c.previousLedgerHash = nil @@ -248,7 +251,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro c.stellarCoreRunner = runner } - runFrom, ledgerHash, nextLedger, err := c.runFromParams(ctx, from) + runFrom, ledgerHash, err := c.runFromParams(ctx, from) if err != nil { return errors.Wrap(err, "error calculating ledger and hash for stelar-core run") } @@ -258,26 +261,20 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro return errors.Wrap(err, "error running stellar-core") } - c.nextLedger = nextLedger + // In the online mode we update nextLedger after streaming the first ledger. + // This is to support versions before and after/including v17.1.0 that + // introduced minimal persistent DB. + c.nextLedger = 0 + ran := UnboundedRange(from) + c.prepared = &ran c.lastLedger = nil c.previousLedgerHash = nil - if c.ledgerHashStore != nil { - var exists bool - ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(ctx, nextLedger-1) - if err != nil { - return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1) - } - if exists { - c.previousLedgerHash = &ledgerHash - } - } - return nil } // runFromParams receives a ledger sequence and calculates the required values to call stellar-core run with --start-ledger and --start-hash -func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, nextLedger uint32, err error) { +func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (runFrom uint32, ledgerHash string, err error) { if from == 1 { // Trying to start-from 1 results in an error from Stellar-Core: // Target ledger 1 is not newer than last closed ledger 1 - nothing to do @@ -288,28 +285,12 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru } if from <= 63 { - // For ledgers before (and including) first checkpoint, get/wait the first - // checkpoint to get the ledger header. It will always start streaming - // from ledger 2. - nextLedger = 2 // The line below is to support a special case for streaming ledger 2 // that works for all other ledgers <= 63 (fast-forward). // We can't set from=2 because Stellar-Core will not allow starting from 1. // To solve this we start from 3 and exploit the fast that Stellar-Core // will stream data from 2 for the first checkpoint. from = 3 - } else { - // For ledgers after the first checkpoint, start at the previous checkpoint - // and fast-forward from there. - if !c.checkpointManager.IsCheckpoint(from) { - from = c.checkpointManager.PrevCheckpoint(from) - } - // Streaming will start from the previous checkpoint + 1 - nextLedger = from - 63 - if nextLedger < 2 { - // Stellar-Core always streams from ledger 2 at min. - nextLedger = 2 - } } runFrom = from - 1 @@ -334,6 +315,18 @@ func (c *CaptiveStellarCore) runFromParams(ctx context.Context, from uint32) (ru return } +// nextExpectedSequence returns nextLedger (if currently set) or start of +// prepared range. Otherwise it returns 0. +// This is done because `nextLedger` is 0 between the moment Stellar-Core is +// started and streaming the first ledger (in such case we return first ledger +// in requested range). +func (c *CaptiveStellarCore) nextExpectedSequence() uint32 { + if c.nextLedger == 0 && c.prepared != nil { + return c.prepared.from + } + return c.nextLedger +} + func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRange Range) (bool, error) { c.stellarCoreLock.Lock() defer c.stellarCoreLock.Unlock() @@ -408,18 +401,18 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool { cachedLedger = c.cachedMeta.LedgerSequence() } - if c.nextLedger == 0 { + if c.prepared == nil { return false } if lastLedger == 0 { - return c.nextLedger <= ledgerRange.from || cachedLedger == ledgerRange.from + return c.nextExpectedSequence() <= ledgerRange.from || cachedLedger == ledgerRange.from } // From now on: lastLedger != 0 so current range is bounded if ledgerRange.bounded { - return (c.nextLedger <= ledgerRange.from || cachedLedger == ledgerRange.from) && + return (c.nextExpectedSequence() <= ledgerRange.from || cachedLedger == ledgerRange.from) && lastLedger >= ledgerRange.to } @@ -458,11 +451,11 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd return xdr.LedgerCloseMeta{}, errors.New("session is closed, call PrepareRange first") } - if sequence < c.nextLedger { + if sequence < c.nextExpectedSequence() { return xdr.LedgerCloseMeta{}, errors.Errorf( "requested ledger %d is behind the captive core stream (expected=%d)", sequence, - c.nextLedger, + c.nextExpectedSequence(), ) } @@ -495,14 +488,22 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe } seq := result.LedgerCloseMeta.LedgerSequence() - if seq != c.nextLedger { - // We got something unexpected; close and reset + // If we got something unexpected; close and reset + if c.nextLedger != 0 && seq != c.nextLedger { c.stellarCoreRunner.close() return false, xdr.LedgerCloseMeta{}, errors.Errorf( "unexpected ledger sequence (expected=%d actual=%d)", c.nextLedger, seq, ) + } else if c.nextLedger == 0 && seq > c.prepared.from { + // First stream ledger is greater than prepared.from + c.stellarCoreRunner.close() + return false, xdr.LedgerCloseMeta{}, errors.Errorf( + "unexpected ledger sequence (expected=<=%d actual=%d)", + c.prepared.from, + seq, + ) } newPreviousLedgerHash := result.LedgerCloseMeta.PreviousLedgerHash().HexString() @@ -517,7 +518,7 @@ func (c *CaptiveStellarCore) handleMetaPipeResult(sequence uint32, result metaRe ) } - c.nextLedger++ + c.nextLedger = result.LedgerSequence() + 1 currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString() c.previousLedgerHash = ¤tLedgerHash @@ -583,13 +584,13 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3 } if c.lastLedger == nil { - return c.nextLedger - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil + return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil } return *c.lastLedger, nil } func (c *CaptiveStellarCore) isClosed() bool { - return c.nextLedger == 0 || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil + return c.prepared == nil || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil } // Close closes existing Stellar-Core process, streaming sessions and removes all diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 02b36c6314..6f46aadf3b 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -270,7 +270,6 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} mockRunner.On("close").Return(fmt.Errorf("transient error")) - mockRunner.On("context").Return(ctx) captiveBackend := CaptiveStellarCore{ nextLedger: 300, @@ -405,7 +404,7 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) { func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() + mockRunner.On("runFrom", uint32(127), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once() mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} @@ -416,7 +415,7 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { }, nil) mockArchive. - On("GetLedgerHeader", uint32(127)). + On("GetLedgerHeader", uint32(128)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) ctx := context.Background() @@ -457,7 +456,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) @@ -468,7 +467,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { CurrentLedger: uint32(129), }, nil) mockArchive. - On("GetLedgerHeader", uint32(63)). + On("GetLedgerHeader", uint32(65)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) captiveBackend := CaptiveStellarCore{ @@ -504,7 +503,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) @@ -516,7 +515,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { }, nil) mockArchive. - On("GetLedgerHeader", uint32(63)). + On("GetLedgerHeader", uint32(64)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) captiveBackend := CaptiveStellarCore{ @@ -635,7 +634,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(62), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) @@ -647,7 +646,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) { }, nil) mockArchive. - On("GetLedgerHeader", uint32(63)). + On("GetLedgerHeader", uint32(66)). Return(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ PreviousLedgerHash: xdr.Hash{1, 1, 1, 1}, @@ -723,6 +722,58 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) _, err = captiveBackend.GetLedger(ctx, 66) assert.EqualError(t, err, "unexpected ledger sequence (expected=66 actual=68)") + // TODO assertions should work - to be fixed in a separate PR. + // _, err = captiveBackend.GetLedger(ctx, 66) + // assert.EqualError(t, err, "session is closed, call PrepareRange first") + + mockArchive.AssertExpectations(t) + mockRunner.AssertExpectations(t) +} + +func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *testing.T) { + metaChan := make(chan metaResult, 100) + + for i := 66; i <= 66; i++ { + meta := buildLedgerCloseMeta(testLedgerHeader{sequence: uint32(i)}) + metaChan <- metaResult{ + LedgerCloseMeta: &meta, + } + } + + ctx := context.Background() + mockRunner := &stellarCoreRunnerMock{} + mockRunner.On("runFrom", uint32(64), mock.Anything).Return(nil) + mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) + mockRunner.On("context").Return(ctx) + mockRunner.On("close").Return(nil) + + mockArchive := &historyarchive.MockArchive{} + mockArchive. + On("GetRootHAS"). + Return(historyarchive.HistoryArchiveState{ + CurrentLedger: uint32(200), + }, nil) + + mockArchive. + On("GetLedgerHeader", uint32(65)). + Return(xdr.LedgerHeaderHistoryEntry{}, nil) + + captiveBackend := CaptiveStellarCore{ + archive: mockArchive, + stellarCoreRunnerFactory: func(_ stellarCoreRunnerMode) (stellarCoreRunnerInterface, error) { + return mockRunner, nil + }, + checkpointManager: historyarchive.NewCheckpointManager(64), + } + + err := captiveBackend.PrepareRange(ctx, UnboundedRange(65)) + assert.EqualError(t, err, "Error fast-forwarding to 65: unexpected ledger sequence (expected=<=65 actual=66)") + + // TODO assertions should work - to be fixed in a separate PR. + // prepared, err := captiveBackend.IsPrepared(ctx, UnboundedRange(65)) + // assert.NoError(t, err) + // assert.False(t, prepared) + mockArchive.AssertExpectations(t) mockRunner.AssertExpectations(t) } @@ -1080,7 +1131,7 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { ctx := context.Background() mockArchive := &historyarchive.MockArchive{} mockArchive. - On("GetLedgerHeader", uint32(255)). + On("GetLedgerHeader", uint32(300)). Return(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ PreviousLedgerHash: xdr.Hash{1, 1, 1, 1}, @@ -1088,13 +1139,13 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { }, nil) mockLedgerHashStore := &MockLedgerHashStore{} - mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(1022)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(1049)). Return("", false, fmt.Errorf("transient error")).Once() - mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(254)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(299)). Return("", false, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(62)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(85)). Return("cde", true, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(126)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(127)). Return("ghi", true, nil).Once() mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(2)). Return("mnb", true, nil).Once() @@ -1109,32 +1160,28 @@ func TestCaptiveUseOfLedgerHashStore(t *testing.T) { }), } - runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(ctx, 24) + runFrom, ledgerHash, err := captiveBackend.runFromParams(ctx, 24) assert.NoError(t, err) assert.Equal(t, uint32(2), runFrom) assert.Equal(t, "mnb", ledgerHash) - assert.Equal(t, uint32(2), nextLedger) - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 86) + runFrom, ledgerHash, err = captiveBackend.runFromParams(ctx, 86) assert.NoError(t, err) - assert.Equal(t, uint32(62), runFrom) + assert.Equal(t, uint32(85), runFrom) assert.Equal(t, "cde", ledgerHash) - assert.Equal(t, uint32(2), nextLedger) - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 128) + runFrom, ledgerHash, err = captiveBackend.runFromParams(ctx, 128) assert.NoError(t, err) - assert.Equal(t, uint32(126), runFrom) + assert.Equal(t, uint32(127), runFrom) assert.Equal(t, "ghi", ledgerHash) - assert.Equal(t, uint32(64), nextLedger) - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 1050) - assert.EqualError(t, err, "error trying to read ledger hash 1022: transient error") + _, _, err = captiveBackend.runFromParams(ctx, 1050) + assert.EqualError(t, err, "error trying to read ledger hash 1049: transient error") - runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(ctx, 300) + runFrom, ledgerHash, err = captiveBackend.runFromParams(ctx, 300) assert.NoError(t, err) - assert.Equal(t, uint32(254), runFrom, "runFrom") + assert.Equal(t, uint32(299), runFrom, "runFrom") assert.Equal(t, "0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) - assert.Equal(t, uint32(192), nextLedger, "nextLedger") mockLedgerHashStore.On("Close").Return(nil).Once() err = captiveBackend.Close() @@ -1149,26 +1196,25 @@ func TestCaptiveRunFromParams(t *testing.T) { from uint32 runFrom uint32 ledgerArchives uint32 - nextLedger uint32 }{ // Before and including 1st checkpoint: - {2, 2, 3, 2}, - {3, 2, 3, 2}, - {3, 2, 3, 2}, - {4, 2, 3, 2}, - {62, 2, 3, 2}, - {63, 2, 3, 2}, + {2, 2, 3}, + {3, 2, 3}, + {3, 2, 3}, + {4, 2, 3}, + {62, 2, 3}, + {63, 2, 3}, // Starting from 64 we go normal path: between 1st and 2nd checkpoint: - {64, 62, 63, 2}, - {65, 62, 63, 2}, - {66, 62, 63, 2}, - {126, 62, 63, 2}, + {64, 63, 64}, + {65, 64, 65}, + {66, 65, 66}, + {126, 125, 126}, // between 2nd and 3rd checkpoint... and so on. - {127, 126, 127, 64}, - {128, 126, 127, 64}, - {129, 126, 127, 64}, + {127, 126, 127}, + {128, 127, 128}, + {129, 128, 129}, } for _, tc := range tests { @@ -1189,11 +1235,10 @@ func TestCaptiveRunFromParams(t *testing.T) { } ctx := context.Background() - runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(ctx, tc.from) + runFrom, ledgerHash, err := captiveBackend.runFromParams(ctx, tc.from) tt.NoError(err) tt.Equal(tc.runFrom, runFrom, "runFrom") tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) - tt.Equal(tc.nextLedger, nextLedger, "nextLedger") mockArchive.AssertExpectations(t) }) @@ -1201,32 +1246,47 @@ func TestCaptiveRunFromParams(t *testing.T) { } func TestCaptiveIsPrepared(t *testing.T) { + mockRunner := &stellarCoreRunnerMock{} + mockRunner.On("context").Return(context.Background()).Maybe() + + // c.prepared == nil + captiveBackend := CaptiveStellarCore{ + nextLedger: 0, + } + + result := captiveBackend.isPrepared(UnboundedRange(100)) + assert.False(t, result) + + // c.prepared != nil: var tests = []struct { - nextLedger uint32 - lastLedger uint32 - cachedLedger uint32 - ledgerRange Range - result bool + nextLedger uint32 + lastLedger uint32 + cachedLedger uint32 + preparedRange Range + ledgerRange Range + result bool }{ - {0, 0, 0, UnboundedRange(100), false}, - {100, 0, 0, UnboundedRange(101), true}, - {101, 0, 100, UnboundedRange(100), true}, - {100, 200, 0, UnboundedRange(100), false}, - - {100, 200, 0, BoundedRange(100, 200), true}, - {100, 200, 0, BoundedRange(100, 201), false}, - {100, 201, 0, BoundedRange(100, 200), true}, - {101, 200, 100, BoundedRange(100, 200), true}, + // If nextLedger == 0, prepared range is checked + {0, 0, 0, UnboundedRange(100), UnboundedRange(100), true}, + {0, 0, 0, UnboundedRange(100), UnboundedRange(99), false}, + {0, 0, 0, UnboundedRange(100), BoundedRange(100, 200), true}, + + {100, 0, 0, UnboundedRange(99), UnboundedRange(101), true}, + {101, 0, 100, UnboundedRange(99), UnboundedRange(100), true}, + {100, 200, 0, BoundedRange(99, 200), UnboundedRange(100), false}, + + {100, 200, 0, BoundedRange(99, 200), BoundedRange(100, 200), true}, + {100, 200, 0, BoundedRange(99, 200), BoundedRange(100, 201), false}, + {100, 201, 0, BoundedRange(99, 201), BoundedRange(100, 200), true}, + {101, 200, 100, BoundedRange(99, 200), BoundedRange(100, 200), true}, } for _, tc := range tests { t.Run(fmt.Sprintf("next_%d_last_%d_cached_%d_range_%v", tc.nextLedger, tc.lastLedger, tc.cachedLedger, tc.ledgerRange), func(t *testing.T) { - mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("context").Return(context.Background()).Maybe() - captiveBackend := CaptiveStellarCore{ stellarCoreRunner: mockRunner, nextLedger: tc.nextLedger, + prepared: &tc.preparedRange, } if tc.lastLedger > 0 { captiveBackend.lastLedger = &tc.lastLedger @@ -1277,7 +1337,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { ctx := context.Background() mockRunner := &stellarCoreRunnerMock{} - mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("runFrom", uint32(299), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan)) mockRunner.On("context").Return(ctx) mockRunner.On("close").Return(nil).Once() @@ -1289,7 +1349,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { CurrentLedger: uint32(255), }, nil) mockArchive. - On("GetLedgerHeader", uint32(255)). + On("GetLedgerHeader", uint32(300)). Return(xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ PreviousLedgerHash: xdr.Hash{1, 1, 1, 1}, @@ -1297,10 +1357,8 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) { }, nil).Once() mockLedgerHashStore := &MockLedgerHashStore{} - mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(254)). + mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(299)). Return("", false, nil).Once() - mockLedgerHashStore.On("GetLedgerHash", ctx, uint32(191)). - Return("0200000000000000000000000000000000000000000000000000000000000000", true, nil).Once() captiveBackend := CaptiveStellarCore{ archive: mockArchive, diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 5baa980f80..143548d5dd 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -12,6 +12,7 @@ import ( "path" "path/filepath" "regexp" + "runtime" "strings" "sync" "time" @@ -52,11 +53,13 @@ type stellarCoreRunner struct { executablePath string started bool + cmd *exec.Cmd wg sync.WaitGroup ctx context.Context cancel context.CancelFunc ledgerBuffer *bufferedLedgerMetaReader pipe pipe + mode stellarCoreRunnerMode lock sync.Mutex processExited bool @@ -68,12 +71,35 @@ type stellarCoreRunner struct { log *log.Entry } +func createRandomHexString(n int) string { + hex := []rune("abcdef1234567890") + b := make([]rune, n) + for i := range b { + b[i] = hex[rand.Intn(len(hex))] + } + return string(b) +} + func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode) (*stellarCoreRunner, error) { - // Use the specified directory to store Captive Core's data: - // https://github.com/stellar/go/issues/3437 - // but be sure to re-use rather than replace it: - // https://github.com/stellar/go/issues/3631 - fullStoragePath := path.Join(config.StoragePath, "captive-core") + var fullStoragePath string + if runtime.GOOS != "windows" && mode != stellarCoreRunnerModeOffline { + // Use the specified directory to store Captive Core's data: + // https://github.com/stellar/go/issues/3437 + // but be sure to re-use rather than replace it: + // https://github.com/stellar/go/issues/3631 + fullStoragePath = path.Join(config.StoragePath, "captive-core") + } else { + // On Windows, first we ALWAYS append something to the base storage path, + // because we will delete the directory entirely when Horizon stops. We also + // add a random suffix in order to ensure that there aren't naming + // conflicts. + // This is done because it's impossible to send SIGINT on Windows so + // buckets can become corrupted. + // We also want to use random directories in offline mode (reingestion) + // because it's possible it's running multiple Stellar-Cores on a single + // machine. + fullStoragePath = path.Join(config.StoragePath, "captive-core-"+createRandomHexString(8)) + } info, err := os.Stat(fullStoragePath) if os.IsNotExist(err) { @@ -96,6 +122,7 @@ func newStellarCoreRunner(config CaptiveCoreConfig, mode stellarCoreRunnerMode) ctx: ctx, cancel: cancel, storagePath: fullStoragePath, + mode: mode, nonce: fmt.Sprintf( "captive-stellar-core-%x", rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(), @@ -206,7 +233,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer { func (r *stellarCoreRunner) createCmd(params ...string) *exec.Cmd { allParams := append([]string{"--conf", r.getConfFileName()}, params...) - cmd := exec.CommandContext(r.ctx, r.executablePath, allParams...) + cmd := exec.Command(r.executablePath, allParams...) cmd.Dir = r.storagePath cmd.Stdout = r.getLogLineWriter() cmd.Stderr = r.getLogLineWriter() @@ -250,16 +277,16 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { } rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) - cmd := r.createCmd( + r.cmd = r.createCmd( "catchup", rangeArg, "--metadata-output-stream", r.getPipeName(), "--in-memory", ) var err error - r.pipe, err = r.start(cmd) + r.pipe, err = r.start(r.cmd) if err != nil { - r.closeLogLineWriters(cmd) + r.closeLogLineWriters(r.cmd) return errors.Wrap(err, "error starting `stellar-core catchup` subprocess") } @@ -274,7 +301,7 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { } r.wg.Add(1) - go r.handleExit(cmd) + go r.handleExit() return nil } @@ -293,7 +320,7 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { return errors.New("runner already started") } - cmd := r.createCmd( + r.cmd = r.createCmd( "run", "--in-memory", "--start-at-ledger", fmt.Sprintf("%d", from), @@ -302,9 +329,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { ) var err error - r.pipe, err = r.start(cmd) + r.pipe, err = r.start(r.cmd) if err != nil { - r.closeLogLineWriters(cmd) + r.closeLogLineWriters(r.cmd) return errors.Wrap(err, "error starting `stellar-core run` subprocess") } @@ -319,15 +346,63 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { } r.wg.Add(1) - go r.handleExit(cmd) + go r.handleExit() return nil } -func (r *stellarCoreRunner) handleExit(cmd *exec.Cmd) { +func (r *stellarCoreRunner) handleExit() { defer r.wg.Done() - exitErr := cmd.Wait() - r.closeLogLineWriters(cmd) + + // Pattern recommended in: + // https://github.com/golang/go/blob/cacac8bdc5c93e7bc71df71981fdf32dded017bf/src/cmd/go/script_test.go#L1091-L1098 + var interrupt os.Signal = os.Interrupt + if runtime.GOOS == "windows" { + // Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on + // Windows; using it with os.Process.Signal will return an error.” + // Fall back to Kill instead. + interrupt = os.Kill + } + + errc := make(chan error) + go func() { + select { + case errc <- nil: + return + case <-r.ctx.Done(): + } + + err := r.cmd.Process.Signal(interrupt) + if err == nil { + err = r.ctx.Err() // Report ctx.Err() as the reason we interrupted. + } else if err.Error() == "os: process already finished" { + errc <- nil + return + } + + timer := time.NewTimer(10 * time.Second) + select { + // Report ctx.Err() as the reason we interrupted the process... + case errc <- r.ctx.Err(): + timer.Stop() + return + // ...but after killDelay has elapsed, fall back to a stronger signal. + case <-timer.C: + } + + // Wait still hasn't returned. + // Kill the process harder to make sure that it exits. + // + // Ignore any error: if cmd.Process has already terminated, we still + // want to send ctx.Err() (or the error from the Interrupt call) + // to properly attribute the signal that may have terminated it. + _ = r.cmd.Process.Kill() + + errc <- err + }() + + waitErr := r.cmd.Wait() + r.closeLogLineWriters(r.cmd) r.lock.Lock() defer r.lock.Unlock() @@ -340,7 +415,11 @@ func (r *stellarCoreRunner) handleExit(cmd *exec.Cmd) { } r.processExited = true - r.processExitError = exitErr + if interruptErr := <-errc; interruptErr != nil { + r.processExitError = interruptErr + } else { + r.processExitError = waitErr + } } // closeLogLineWriters closes the go routines created by getLogLineWriter() @@ -398,5 +477,16 @@ func (r *stellarCoreRunner) close() error { r.pipe.Reader.Close() } + if runtime.GOOS == "windows" || + (r.processExitError != nil && r.processExitError != context.Canceled) || + r.mode == stellarCoreRunnerModeOffline { + // It's impossible to send SIGINT on Windows so buckets can become + // corrupted. If we can't reuse it, then remove it. + // We also remove the storage path if there was an error terminating the + // process (files can be corrupted). + // We remove all files when reingesting to save disk space. + return os.RemoveAll(storagePath) + } + return nil } diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go index 36179bb976..18e6b0656a 100644 --- a/ingest/ledgerbackend/stellar_core_runner_test.go +++ b/ingest/ledgerbackend/stellar_core_runner_test.go @@ -5,12 +5,13 @@ import ( "os" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stellar/go/support/log" ) -func TestCloseBeforeStart(t *testing.T) { +func TestCloseBeforeStartOffline(t *testing.T) { captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) assert.NoError(t, err) @@ -29,7 +30,63 @@ func TestCloseBeforeStart(t *testing.T) { assert.NoError(t, runner.close()) + // Directory cleaned up on shutdown when reingesting to save space + _, err = os.Stat(tempDir) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no such file or directory") +} + +func TestCloseBeforeStartOnline(t *testing.T) { + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) + assert.NoError(t, err) + + captiveCoreToml.AddExamplePubnetValidators() + + runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + }, stellarCoreRunnerModeOnline) + assert.NoError(t, err) + + tempDir := runner.storagePath + info, err := os.Stat(tempDir) + assert.NoError(t, err) + assert.True(t, info.IsDir()) + + assert.NoError(t, runner.close()) + // Directory no longer cleaned up on shutdown (perf. bump in v2.5.0) _, err = os.Stat(tempDir) assert.NoError(t, err) } + +func TestCloseBeforeStartOnlineWithError(t *testing.T) { + captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{}) + assert.NoError(t, err) + + captiveCoreToml.AddExamplePubnetValidators() + + runner, err := newStellarCoreRunner(CaptiveCoreConfig{ + HistoryArchiveURLs: []string{"http://localhost"}, + Log: log.New(), + Context: context.Background(), + Toml: captiveCoreToml, + }, stellarCoreRunnerModeOnline) + assert.NoError(t, err) + + runner.processExitError = errors.New("some error") + + tempDir := runner.storagePath + info, err := os.Stat(tempDir) + assert.NoError(t, err) + assert.True(t, info.IsDir()) + + assert.NoError(t, runner.close()) + + // Directory cleaned up on shutdown with error (potentially corrupted files) + _, err = os.Stat(tempDir) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no such file or directory") +} diff --git a/ingest/ledgerbackend/testdata/appendix-with-fields.cfg b/ingest/ledgerbackend/testdata/appendix-with-fields.cfg index c77833b1b2..42c4d2459f 100644 --- a/ingest/ledgerbackend/testdata/appendix-with-fields.cfg +++ b/ingest/ledgerbackend/testdata/appendix-with-fields.cfg @@ -3,7 +3,6 @@ FAILURE_SAFETY=2 UNSAFE_QUORUM=false PUBLIC_HTTP_PORT=true RUN_STANDALONE=false -DISABLE_XDR_FSYNC=false BUCKET_DIR_PATH="test-buckets" HTTP_PORT = 6789 PEER_PORT = 12345 diff --git a/ingest/ledgerbackend/testdata/expected-offline-core.cfg b/ingest/ledgerbackend/testdata/expected-offline-core.cfg index 6fc2616d8f..62aeeb6664 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-core.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-core.cfg @@ -1,5 +1,4 @@ # Generated file, do not edit -DISABLE_XDR_FSYNC = true FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "" diff --git a/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg b/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg index d7edb3421b..124abc435b 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-with-appendix-core.cfg @@ -1,5 +1,4 @@ # Generated file, do not edit -DISABLE_XDR_FSYNC = true FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "" diff --git a/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg b/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg index 7b358cc079..9eca1ccad1 100644 --- a/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg +++ b/ingest/ledgerbackend/testdata/expected-offline-with-no-peer-port.cfg @@ -1,5 +1,4 @@ # Generated file, do not edit -DISABLE_XDR_FSYNC = true FAILURE_SAFETY = 0 HTTP_PORT = 0 LOG_FILE_PATH = "/var/stellar-core/test.log" diff --git a/ingest/ledgerbackend/testdata/expected-online-core.cfg b/ingest/ledgerbackend/testdata/expected-online-core.cfg index e5944587ca..57a5e7ff2c 100644 --- a/ingest/ledgerbackend/testdata/expected-online-core.cfg +++ b/ingest/ledgerbackend/testdata/expected-online-core.cfg @@ -1,5 +1,4 @@ # Generated file, do not edit -DISABLE_XDR_FSYNC = true FAILURE_SAFETY = -1 HTTP_PORT = 6789 LOG_FILE_PATH = "" diff --git a/ingest/ledgerbackend/testdata/expected-online-with-no-http-port.cfg b/ingest/ledgerbackend/testdata/expected-online-with-no-http-port.cfg index 249101f33c..89e1762757 100644 --- a/ingest/ledgerbackend/testdata/expected-online-with-no-http-port.cfg +++ b/ingest/ledgerbackend/testdata/expected-online-with-no-http-port.cfg @@ -1,5 +1,4 @@ # Generated file, do not edit -DISABLE_XDR_FSYNC = true FAILURE_SAFETY = -1 HTTP_PORT = 11626 LOG_FILE_PATH = "" diff --git a/ingest/ledgerbackend/testdata/expected-online-with-no-peer-port.cfg b/ingest/ledgerbackend/testdata/expected-online-with-no-peer-port.cfg index 356fc6c736..1b65c5f318 100644 --- a/ingest/ledgerbackend/testdata/expected-online-with-no-peer-port.cfg +++ b/ingest/ledgerbackend/testdata/expected-online-with-no-peer-port.cfg @@ -1,5 +1,4 @@ # Generated file, do not edit -DISABLE_XDR_FSYNC = true FAILURE_SAFETY = -1 HTTP_PORT = 6789 LOG_FILE_PATH = "/var/stellar-core/test.log" diff --git a/ingest/ledgerbackend/toml.go b/ingest/ledgerbackend/toml.go index 5d642e9466..e5f7529a43 100644 --- a/ingest/ledgerbackend/toml.go +++ b/ingest/ledgerbackend/toml.go @@ -21,10 +21,6 @@ const ( // if LOG_FILE_PATH is omitted stellar core actually defaults to "stellar-core.log" // however, we are overriding this default for captive core defaultLogFilePath = "" // by default we disable logging to a file - - // if DISABLE_XDR_FSYNC is omitted stellar core actually defaults to false - // however, we are overriding this default for captive core - defaultDisableXDRFsync = true ) var validQuality = map[string]bool{ @@ -82,7 +78,6 @@ type captiveCoreTomlValues struct { UnsafeQuorum bool `toml:"UNSAFE_QUORUM,omitempty"` RunStandalone bool `toml:"RUN_STANDALONE,omitempty"` ArtificiallyAccelerateTimeForTesting bool `toml:"ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING,omitempty"` - DisableXDRFsync bool `toml:"DISABLE_XDR_FSYNC,omitempty"` HomeDomains []HomeDomain `toml:"HOME_DOMAINS,omitempty"` Validators []Validator `toml:"VALIDATORS,omitempty"` HistoryEntries map[string]History `toml:"-"` @@ -180,6 +175,33 @@ func unflattenTables(text string, tablePlaceHolders *placeholders) string { }) } +// AddExamplePubnetQuorum adds example pubnet validators to toml file +func (c *CaptiveCoreToml) AddExamplePubnetValidators() { + c.captiveCoreTomlValues.Validators = []Validator{ + { + Name: "sdf_1", + HomeDomain: "stellar.org", + PublicKey: "GCGB2S2KGYARPVIA37HYZXVRM2YZUEXA6S33ZU5BUDC6THSB62LZSTYH", + Address: "core-live-a.stellar.org:11625", + History: "curl -sf https://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}", + }, + { + Name: "sdf_2", + HomeDomain: "stellar.org", + PublicKey: "GCM6QMP3DLRPTAZW2UZPCPX2LF3SXWXKPMP3GKFZBDSF3QZGV2G5QSTK", + Address: "core-live-b.stellar.org:11625", + History: "curl -sf https://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}", + }, + { + Name: "sdf_3", + HomeDomain: "stellar.org", + PublicKey: "GABMKJM6I25XI4K7U6XWMULOUQIQ27BCTMLS6BYYSOWKTBUXVRJSXHYQ", + Address: "core-live-c.stellar.org:11625", + History: "curl -sf https://history.stellar.org/prd/core-live/core_live_003/{0} -o {1}", + }, + } +} + // Marshal serializes the CaptiveCoreToml into a toml document. func (c *CaptiveCoreToml) Marshal() ([]byte, error) { var sb strings.Builder @@ -401,10 +423,6 @@ func (c *CaptiveCoreToml) setDefaults(params CaptiveCoreTomlParams) { if !c.tree.Has("FAILURE_SAFETY") { c.FailureSafety = defaultFailureSafety } - if !c.tree.Has("DISABLE_XDR_FSYNC") { - c.DisableXDRFsync = defaultDisableXDRFsync - } - if !c.HistoryIsConfigured() { c.HistoryEntries = map[string]History{} for i, val := range params.HistoryArchiveURLs { diff --git a/ingest/ledgerbackend/toml_test.go b/ingest/ledgerbackend/toml_test.go index 6349665b0a..5052c85ced 100644 --- a/ingest/ledgerbackend/toml_test.go +++ b/ingest/ledgerbackend/toml_test.go @@ -1,10 +1,11 @@ package ledgerbackend import ( - "github.com/stretchr/testify/assert" "io/ioutil" "path/filepath" "testing" + + "github.com/stretchr/testify/assert" ) func newUint(v uint) *uint {