Skip to content

Commit

Permalink
ingest/ledgerbackend: Add prev ledger hash check to CaptiveStellarCore (
Browse files Browse the repository at this point in the history
#3257)

Adds an extra check in `GetLedger` checking if previously read ledger
header match the current ledger previous hash.

`CaptiveStellarCore` reads ledgers in a strict increasing order. It's
worth checking if the previous hash is match to ensure we receive
ledgers in correct order.
  • Loading branch information
bartekn authored Nov 27, 2020
1 parent 423317f commit 7fa5a54
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 26 deletions.
40 changes: 37 additions & 3 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type CaptiveStellarCore struct {
// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta

nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online
nextLedger uint32 // next ledger expected, error w/ restart if not seen
lastLedger *uint32 // end of current segment if offline, nil if online
previousLedgerHash *string

// waitIntervalPrepareRange defines a time to wait between checking if the buffer
// is empty. Default 1s, lower in tests to make them faster.
Expand Down Expand Up @@ -265,6 +266,18 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {

c.nextLedger = nextLedger
c.lastLedger = nil

if c.ledgerHashStore != nil {
var exists bool
ledgerHash, exists, err = c.ledgerHashStore.GetLedgerHash(nextLedger - 1)
if err != nil {
return errors.Wrapf(err, "error trying to read ledger hash %d", nextLedger-1)
}
if exists {
c.previousLedgerHash = &ledgerHash
}
}

c.blocking = false

// read-ahead buffer
Expand Down Expand Up @@ -486,10 +499,30 @@ loop:
seq := result.LedgerCloseMeta.LedgerSequence()
if seq != c.nextLedger {
// We got something unexpected; close and reset
errOut = errors.Errorf("unexpected ledger (expected=%d actual=%d)", c.nextLedger, seq)
errOut = errors.Errorf(
"unexpected ledger sequence (expected=%d actual=%d)",
c.nextLedger,
seq,
)
break
}

newPreviousLedgerHash := result.LedgerCloseMeta.PreviousLedgerHash().HexString()
if c.previousLedgerHash != nil && *c.previousLedgerHash != newPreviousLedgerHash {
// We got something unexpected; close and reset
errOut = errors.Errorf(
"unexpected previous ledger hash for ledger %d (expected=%s actual=%s)",
seq,
*c.previousLedgerHash,
newPreviousLedgerHash,
)
break
}

c.nextLedger++
currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

if seq == sequence {
// Found the requested seq
c.cachedMeta = result.LedgerCloseMeta
Expand Down Expand Up @@ -554,6 +587,7 @@ func (c *CaptiveStellarCore) Close() error {

c.nextLedger = 0
c.lastLedger = nil
c.previousLedgerHash = nil

return nil
}
Expand Down
154 changes: 131 additions & 23 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,41 @@ func (m *stellarCoreRunnerMock) close() error {

func (m *stellarCoreRunnerMock) setLogger(*log.Entry) {}

func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta {
func buildLedgerCloseMeta(header testLedgerHeader) xdr.LedgerCloseMeta {
opResults := []xdr.OperationResult{}
opMeta := []xdr.OperationMeta{}

tmpHash, _ := hex.DecodeString("cde54da3901f5b9c0331d24fbb06ac9c5c5de76de9fb2d4a7b86c09e46f11d8c")
var hash [32]byte
copy(hash[:], tmpHash)

var ledgerHash [32]byte
if header.hash != "" {
tmpHash, err := hex.DecodeString(header.hash)
if err != nil {
panic(err)
}
copy(ledgerHash[:], tmpHash)
}

var previousLedgerHash [32]byte
if header.hash != "" {
tmpHash, err := hex.DecodeString(header.previousLedgerHash)
if err != nil {
panic(err)
}
copy(previousLedgerHash[:], tmpHash)
}

source := xdr.MustAddress("GAEJJMDDCRYF752PKIJICUVL7MROJBNXDV2ZB455T7BAFHU2LCLSE2LW")
return xdr.LedgerCloseMeta{
V: 0,
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Hash: ledgerHash,
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(sequence),
LedgerSeq: xdr.Uint32(header.sequence),
PreviousLedgerHash: previousLedgerHash,
},
},
TxSet: xdr.TransactionSet{
Expand All @@ -79,7 +99,7 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta {
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
SourceAccount: source.ToMuxedAccount(),
Fee: xdr.Uint32(sequence),
Fee: xdr.Uint32(header.sequence),
},
},
},
Expand All @@ -90,7 +110,7 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta {
Result: xdr.TransactionResultPair{
TransactionHash: xdr.Hash(hash),
Result: xdr.TransactionResult{
FeeCharged: xdr.Int64(sequence),
FeeCharged: xdr.Int64(header.sequence),
Result: xdr.TransactionResultResult{
Code: xdr.TransactionResultCodeTxSuccess,
Results: &opResults,
Expand All @@ -107,8 +127,14 @@ func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta {

}

func writeLedgerHeader(w io.Writer, sequence uint32) {
err := xdr.MarshalFramed(w, buildLedgerCloseMeta(sequence))
type testLedgerHeader struct {
sequence uint32
hash string
previousLedgerHash string
}

func writeLedgerHeader(w io.Writer, header testLedgerHeader) {
err := xdr.MarshalFramed(w, buildLedgerCloseMeta(header))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -144,7 +170,7 @@ func TestCaptivePrepareRange(t *testing.T) {
// Core will actually start with the last checkpoint before the from ledger
// and then rewind to the `from` ledger.
for i := 64; i <= 99; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

exitChan := make(chan struct{})
Expand Down Expand Up @@ -456,7 +482,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
var buf bytes.Buffer

for i := 2; i <= 65; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

mockRunner := &stellarCoreRunnerMock{}
Expand Down Expand Up @@ -497,7 +523,7 @@ func TestGetLatestLedgerSequence(t *testing.T) {
var buf bytes.Buffer

for i := 2; i <= 200; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

exitChan := make(chan struct{})
Expand Down Expand Up @@ -561,7 +587,7 @@ func TestCaptiveGetLedger(t *testing.T) {
tt := assert.New(t)
var buf bytes.Buffer
for i := 64; i <= 66; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

mockRunner := &stellarCoreRunnerMock{}
Expand Down Expand Up @@ -624,10 +650,10 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T)
tt := assert.New(t)
var buf bytes.Buffer
for i := 64; i <= 65; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

writeLedgerHeader(&buf, uint32(68))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(68)})

mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
Expand All @@ -654,7 +680,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T)
assert.NoError(t, err)

_, _, err = captiveBackend.GetLedger(66)
tt.EqualError(err, "unexpected ledger (expected=66 actual=68)")
tt.EqualError(err, "unexpected ledger sequence (expected=66 actual=68)")
}
func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {
tt := assert.New(t)
Expand Down Expand Up @@ -695,7 +721,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) {
tt := assert.New(t)
var buf bytes.Buffer
for i := 64; i <= 66; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

mockRunner := &stellarCoreRunnerMock{}
Expand Down Expand Up @@ -734,7 +760,7 @@ func TestCaptiveGetLedger_BoundedGetLedgerAfterCoreExit(t *testing.T) {
tt := assert.New(t)
var buf bytes.Buffer
for i := 64; i <= 70; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

mockRunner := &stellarCoreRunnerMock{}
Expand Down Expand Up @@ -783,7 +809,7 @@ func TestCaptiveGetLedger_BoundedGetLedgerAfterCoreExit(t *testing.T) {
func TestCaptiveGetLedger_CloseBufferFull(t *testing.T) {
var buf bytes.Buffer
for i := 2; i <= 200; i++ {
writeLedgerHeader(&buf, uint32(i))
writeLedgerHeader(&buf, testLedgerHeader{sequence: uint32(i)})
}

mockRunner := &stellarCoreRunnerMock{}
Expand Down Expand Up @@ -843,9 +869,9 @@ func waitForBufferToFill(captiveBackend *CaptiveStellarCore) {

func TestGetLedgerBoundsCheck(t *testing.T) {
var buf bytes.Buffer
writeLedgerHeader(&buf, 128)
writeLedgerHeader(&buf, 129)
writeLedgerHeader(&buf, 130)
writeLedgerHeader(&buf, testLedgerHeader{sequence: 128})
writeLedgerHeader(&buf, testLedgerHeader{sequence: 129})
writeLedgerHeader(&buf, testLedgerHeader{sequence: 130})

mockRunner := &stellarCoreRunnerMock{}
exitChan := make(chan struct{})
Expand Down Expand Up @@ -892,9 +918,9 @@ func TestGetLedgerBoundsCheck(t *testing.T) {
mockRunner.AssertExpectations(t)

buf.Reset()
writeLedgerHeader(&buf, 64)
writeLedgerHeader(&buf, 65)
writeLedgerHeader(&buf, 66)
writeLedgerHeader(&buf, testLedgerHeader{sequence: 64})
writeLedgerHeader(&buf, testLedgerHeader{sequence: 65})
writeLedgerHeader(&buf, testLedgerHeader{sequence: 66})

mockRunner.On("catchup", uint32(64), uint32(66)).Return(nil).Once()
mockRunner.On("getProcessExitChan").Return(exitChan)
Expand Down Expand Up @@ -941,7 +967,7 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) {
},
}

go writeLedgerHeader(writer, 64)
go writeLedgerHeader(writer, testLedgerHeader{sequence: 64})
err := captiveBackend.PrepareRange(BoundedRange(64, 100))
assert.NoError(t, err)

Expand Down Expand Up @@ -1109,3 +1135,85 @@ func TestCaptiveIsPrepared(t *testing.T) {
})
}
}

// TestCaptivePreviousLedgerCheck checks if previousLedgerHash is set in PrepareRange
// and then checked and updated in GetLedger.
func TestCaptivePreviousLedgerCheck(t *testing.T) {
var buf bytes.Buffer

h := 3
for i := 192; i <= 300; i++ {
writeLedgerHeader(&buf, testLedgerHeader{
sequence: uint32(i),
hash: fmt.Sprintf("%02x00000000000000000000000000000000000000000000000000000000000000", h),
previousLedgerHash: fmt.Sprintf("%02x00000000000000000000000000000000000000000000000000000000000000", h-1),
})
h++
}

// Write invalid hash
writeLedgerHeader(&buf, testLedgerHeader{
sequence: 301,
hash: "0000000000000000000000000000000000000000000000000000000000000000",
previousLedgerHash: "0000000000000000000000000000000000000000000000000000000000000000",
})

ch := make(chan struct{})
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return(&buf)
mockRunner.On("getProcessExitChan").Return(ch)
mockRunner.On("getProcessExitError").Return(nil).Maybe()
mockRunner.On("close").Run(func(args mock.Arguments) {
close(ch)
}).Return(nil)
defer mockRunner.AssertExpectations(t)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetRootHAS").
Return(historyarchive.HistoryArchiveState{
CurrentLedger: uint32(255),
}, nil)
mockArchive.
On("GetLedgerHeader", uint32(255)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1, 1, 1, 1},
},
}, nil).Once()
defer mockArchive.AssertExpectations(t)

mockLedgerHashStore := &MockLedgerHashStore{}
mockLedgerHashStore.On("GetLedgerHash", uint32(254)).
Return("", false, nil).Once()
mockLedgerHashStore.On("GetLedgerHash", uint32(191)).
Return("0200000000000000000000000000000000000000000000000000000000000000", true, nil).Once()
defer mockLedgerHashStore.AssertExpectations(t)

captiveBackend := CaptiveStellarCore{
configPath: "stellar-core.cfg",
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
},
ledgerHashStore: mockLedgerHashStore,
}

err := captiveBackend.PrepareRange(UnboundedRange(300))
assert.NoError(t, err)

exists, meta, err := captiveBackend.GetLedger(300)
assert.NoError(t, err)
assert.True(t, exists)
assert.NotNil(t, captiveBackend.previousLedgerHash)
assert.Equal(t, uint32(301), captiveBackend.nextLedger)
assert.Equal(t, meta.LedgerHash().HexString(), *captiveBackend.previousLedgerHash)

_, _, err = captiveBackend.GetLedger(301)
assert.EqualError(t, err, "unexpected previous ledger hash for ledger 301 (expected=6f00000000000000000000000000000000000000000000000000000000000000 actual=0000000000000000000000000000000000000000000000000000000000000000)")

err = captiveBackend.Close()
assert.NoError(t, err)
}
7 changes: 7 additions & 0 deletions xdr/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package xdr

import "encoding/hex"

func (h Hash) HexString() string {
return hex.EncodeToString(h[:])
}
8 changes: 8 additions & 0 deletions xdr/ledger_close_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ package xdr
func (l LedgerCloseMeta) LedgerSequence() uint32 {
return uint32(l.MustV0().LedgerHeader.Header.LedgerSeq)
}

func (l LedgerCloseMeta) LedgerHash() Hash {
return l.MustV0().LedgerHeader.Hash
}

func (l LedgerCloseMeta) PreviousLedgerHash() Hash {
return l.MustV0().LedgerHeader.Header.PreviousLedgerHash
}

0 comments on commit 7fa5a54

Please sign in to comment.