diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 3b844a232a..917aa487a7 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -91,8 +91,9 @@ type CaptiveStellarCore struct { // cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger(). cachedMeta *xdr.LedgerCloseMeta - nextLedger uint32 // next ledger expected, error w/ restart if not seen - lastLedger *uint32 // end of current segment if offline, nil if online + nextLedger uint32 // next ledger expected, error w/ restart if not seen + lastLedger *uint32 // end of current segment if offline, nil if online + previousLedgerHash *string // waitIntervalPrepareRange defines a time to wait between checking if the buffer // is empty. Default 1s, lower in tests to make them faster. @@ -265,6 +266,18 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { c.nextLedger = nextLedger c.lastLedger = nil + + if c.ledgerHashStore != nil { + var exists bool + ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(nextLedger - 1) + if err != nil { + return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1) + } + if exists { + c.previousLedgerHash = &ledgerHash + } + } + c.blocking = false // read-ahead buffer @@ -486,10 +499,30 @@ loop: seq := result.LedgerCloseMeta.LedgerSequence() if seq != c.nextLedger { // We got something unexpected; close and reset - errOut = errors.Errorf("unexpected ledger (expected=%d actual=%d)", c.nextLedger, seq) + errOut = errors.Errorf( + "unexpected ledger sequence (expected=%d actual=%d)", + c.nextLedger, + seq, + ) break } + + newPreviousLedgerHash := result.LedgerCloseMeta.PreviousLedgerHash().HexString() + if c.previousLedgerHash != nil && *c.previousLedgerHash != newPreviousLedgerHash { + // We got something unexpected; close and reset + errOut = errors.Errorf( + "unexpected previous ledger hash for ledger %d (expected=%s actual=%s)", + seq, + *c.previousLedgerHash, + newPreviousLedgerHash, + ) + break + } + c.nextLedger++ + currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString() + c.previousLedgerHash = ¤tLedgerHash + if seq == sequence { // Found the requested seq c.cachedMeta = result.LedgerCloseMeta @@ -554,6 +587,7 @@ func (c *CaptiveStellarCore) Close() error { c.nextLedger = 0 c.lastLedger = nil + c.previousLedgerHash = nil return nil } diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 45f5993735..ff4729d8d6 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -55,7 +55,7 @@ func (m *stellarCoreRunnerMock) close() error { func (m *stellarCoreRunnerMock) setLogger(*log.Entry) {} -func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta { +func buildLedgerCloseMeta(header testLedgerHeader) xdr.LedgerCloseMeta { opResults := []xdr.OperationResult{} opMeta := []xdr.OperationMeta{} @@ -63,13 +63,33 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta { var hash [32]byte copy(hash[:], tmpHash) + var ledgerHash [32]byte + if header.hash != "" { + tmpHash, err := hex.DecodeString(header.hash) + if err != nil { + panic(err) + } + copy(ledgerHash[:], tmpHash) + } + + var previousLedgerHash [32]byte + if header.hash != "" { + tmpHash, err := hex.DecodeString(header.previousLedgerHash) + if err != nil { + panic(err) + } + copy(previousLedgerHash[:], tmpHash) + } + source := xdr.MustAddress("GAEJJMDDCRYF752PKIJICUVL7MROJBNXDV2ZB455T7BAFHU2LCLSE2LW") return xdr.LedgerCloseMeta{ V: 0, V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Hash: ledgerHash, Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(sequence), + LedgerSeq: xdr.Uint32(header.sequence), + PreviousLedgerHash: previousLedgerHash, }, }, TxSet: xdr.TransactionSet{ @@ -79,7 +99,7 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta { V1: &xdr.TransactionV1Envelope{ Tx: xdr.Transaction{ SourceAccount: source.ToMuxedAccount(), - Fee: xdr.Uint32(sequence), + Fee: xdr.Uint32(header.sequence), }, }, }, @@ -90,7 +110,7 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta { Result: xdr.TransactionResultPair{ TransactionHash: xdr.Hash(hash), Result: xdr.TransactionResult{ - FeeCharged: xdr.Int64(sequence), + FeeCharged: xdr.Int64(header.sequence), Result: xdr.TransactionResultResult{ Code: xdr.TransactionResultCodeTxSuccess, Results: &opResults, @@ -107,8 +127,14 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta { } -func writeLedgerHeader(w io.Writer, sequence uint32) { - err := xdr.MarshalFramed(w, buildLedgerCloseMeta(sequence)) +type testLedgerHeader struct { + sequence uint32 + hash string + previousLedgerHash string +} + +func writeLedgerHeader(w io.Writer, header testLedgerHeader) { + err := xdr.MarshalFramed(w, buildLedgerCloseMeta(header)) if err != nil { panic(err) } @@ -144,7 +170,7 @@ func TestCaptivePrepareRange(t *testing.T) { // Core will actually start with the last checkpoint before the from ledger // and then rewind to the `from` ledger. for i := 64; i <= 99; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } exitChan := make(chan struct{}) @@ -456,7 +482,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { var buf bytes.Buffer for i := 2; i <= 65; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } mockRunner := &stellarCoreRunnerMock{} @@ -497,7 +523,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { var buf bytes.Buffer for i := 2; i <= 200; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } exitChan := make(chan struct{}) @@ -561,7 +587,7 @@ func TestCaptiveGetLedger(t *testing.T) { tt := assert.New(t) var buf bytes.Buffer for i := 64; i <= 66; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } mockRunner := &stellarCoreRunnerMock{} @@ -624,10 +650,10 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) tt := assert.New(t) var buf bytes.Buffer for i := 64; i <= 65; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } - writeLedgerHeader(&buf, uint32(68)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(68)}) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) @@ -654,7 +680,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) assert.NoError(t, err) _, _, err = captiveBackend.GetLedger(66) - tt.EqualError(err, "unexpected ledger (expected=66 actual=68)") + tt.EqualError(err, "unexpected ledger sequence (expected=66 actual=68)") } func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { tt := assert.New(t) @@ -695,7 +721,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { tt := assert.New(t) var buf bytes.Buffer for i := 64; i <= 66; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } mockRunner := &stellarCoreRunnerMock{} @@ -734,7 +760,7 @@ func TestCaptiveGetLedger_BoundedGetLedgerAfterCoreExit(t *testing.T) { tt := assert.New(t) var buf bytes.Buffer for i := 64; i <= 70; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } mockRunner := &stellarCoreRunnerMock{} @@ -783,7 +809,7 @@ func TestCaptiveGetLedger_BoundedGetLedgerAfterCoreExit(t *testing.T) { func TestCaptiveGetLedger_CloseBufferFull(t *testing.T) { var buf bytes.Buffer for i := 2; i <= 200; i++ { - writeLedgerHeader(&buf, uint32(i)) + writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)}) } mockRunner := &stellarCoreRunnerMock{} @@ -843,9 +869,9 @@ func waitForBufferToFill(captiveBackend *CaptiveStellarCore) { func TestGetLedgerBoundsCheck(t *testing.T) { var buf bytes.Buffer - writeLedgerHeader(&buf, 128) - writeLedgerHeader(&buf, 129) - writeLedgerHeader(&buf, 130) + writeLedgerHeader(&buf, testLedgerHeader{sequence: 128}) + writeLedgerHeader(&buf, testLedgerHeader{sequence: 129}) + writeLedgerHeader(&buf, testLedgerHeader{sequence: 130}) mockRunner := &stellarCoreRunnerMock{} exitChan := make(chan struct{}) @@ -892,9 +918,9 @@ func TestGetLedgerBoundsCheck(t *testing.T) { mockRunner.AssertExpectations(t) buf.Reset() - writeLedgerHeader(&buf, 64) - writeLedgerHeader(&buf, 65) - writeLedgerHeader(&buf, 66) + writeLedgerHeader(&buf, testLedgerHeader{sequence: 64}) + writeLedgerHeader(&buf, testLedgerHeader{sequence: 65}) + writeLedgerHeader(&buf, testLedgerHeader{sequence: 66}) mockRunner.On("catchup", uint32(64), uint32(66)).Return(nil).Once() mockRunner.On("getProcessExitChan").Return(exitChan) @@ -941,7 +967,7 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) { }, } - go writeLedgerHeader(writer, 64) + go writeLedgerHeader(writer, testLedgerHeader{sequence: 64}) err := captiveBackend.PrepareRange(BoundedRange(64, 100)) assert.NoError(t, err) @@ -1109,3 +1135,85 @@ func TestCaptiveIsPrepared(t *testing.T) { }) } } + +// TestCaptivePreviousLedgerCheck checks if previousLedgerHash is set in PrepareRange +// and then checked and updated in GetLedger. +func TestCaptivePreviousLedgerCheck(t *testing.T) { + var buf bytes.Buffer + + h := 3 + for i := 192; i <= 300; i++ { + writeLedgerHeader(&buf, testLedgerHeader{ + sequence: uint32(i), + hash: fmt.Sprintf("%02x00000000000000000000000000000000000000000000000000000000000000", h), + previousLedgerHash: fmt.Sprintf("%02x00000000000000000000000000000000000000000000000000000000000000", h-1), + }) + h++ + } + + // Write invalid hash + writeLedgerHeader(&buf, testLedgerHeader{ + sequence: 301, + hash: "0000000000000000000000000000000000000000000000000000000000000000", + previousLedgerHash: "0000000000000000000000000000000000000000000000000000000000000000", + }) + + ch := make(chan struct{}) + mockRunner := &stellarCoreRunnerMock{} + mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once() + mockRunner.On("getMetaPipe").Return(&buf) + mockRunner.On("getProcessExitChan").Return(ch) + mockRunner.On("getProcessExitError").Return(nil).Maybe() + mockRunner.On("close").Run(func(args mock.Arguments) { + close(ch) + }).Return(nil) + defer mockRunner.AssertExpectations(t) + + mockArchive := &historyarchive.MockArchive{} + mockArchive. + On("GetRootHAS"). + Return(historyarchive.HistoryArchiveState{ + CurrentLedger: uint32(255), + }, nil) + mockArchive. + On("GetLedgerHeader", uint32(255)). + Return(xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + PreviousLedgerHash: xdr.Hash{1, 1, 1, 1}, + }, + }, nil).Once() + defer mockArchive.AssertExpectations(t) + + mockLedgerHashStore := &MockLedgerHashStore{} + mockLedgerHashStore.On("GetLedgerHash", uint32(254)). + Return("", false, nil).Once() + mockLedgerHashStore.On("GetLedgerHash", uint32(191)). + Return("0200000000000000000000000000000000000000000000000000000000000000", true, nil).Once() + defer mockLedgerHashStore.AssertExpectations(t) + + captiveBackend := CaptiveStellarCore{ + configPath: "stellar-core.cfg", + archive: mockArchive, + networkPassphrase: network.PublicNetworkPassphrase, + stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) { + return mockRunner, nil + }, + ledgerHashStore: mockLedgerHashStore, + } + + err := captiveBackend.PrepareRange(UnboundedRange(300)) + assert.NoError(t, err) + + exists, meta, err := captiveBackend.GetLedger(300) + assert.NoError(t, err) + assert.True(t, exists) + assert.NotNil(t, captiveBackend.previousLedgerHash) + assert.Equal(t, uint32(301), captiveBackend.nextLedger) + assert.Equal(t, meta.LedgerHash().HexString(), *captiveBackend.previousLedgerHash) + + _, _, err = captiveBackend.GetLedger(301) + assert.EqualError(t, err, "unexpected previous ledger hash for ledger 301 (expected=6f00000000000000000000000000000000000000000000000000000000000000 actual=0000000000000000000000000000000000000000000000000000000000000000)") + + err = captiveBackend.Close() + assert.NoError(t, err) +} diff --git a/xdr/hash.go b/xdr/hash.go new file mode 100644 index 0000000000..80a1800464 --- /dev/null +++ b/xdr/hash.go @@ -0,0 +1,7 @@ +package xdr + +import "encoding/hex" + +func (h Hash) HexString() string { + return hex.EncodeToString(h[:]) +} diff --git a/xdr/ledger_close_meta.go b/xdr/ledger_close_meta.go index 16f7499a63..d0ab5dfa08 100644 --- a/xdr/ledger_close_meta.go +++ b/xdr/ledger_close_meta.go @@ -3,3 +3,11 @@ package xdr func (l LedgerCloseMeta) LedgerSequence() uint32 { return uint32(l.MustV0().LedgerHeader.Header.LedgerSeq) } + +func (l LedgerCloseMeta) LedgerHash() Hash { + return l.MustV0().LedgerHeader.Hash +} + +func (l LedgerCloseMeta) PreviousLedgerHash() Hash { + return l.MustV0().LedgerHeader.Header.PreviousLedgerHash +}