Skip to content

Commit

Permalink
Implement db backed ledger store for the captive core backend
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Nov 12, 2020
1 parent f80da5f commit a85196f
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 3 deletions.
1 change: 1 addition & 0 deletions exp/services/captivecore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Usage:
captivecore [flags]
Flags:
--db-url Horizon Postgres URL (optional) used to lookup the ledger hash for sequence numbers
--stellar-core-binary-path Path to stellar core binary
--stellar-core-config-path Path to stellar core config file
--history-archive-urls Comma-separated list of stellar history archives to connect with
Expand Down
29 changes: 27 additions & 2 deletions exp/services/captivecore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/db"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
)

func main() {
var port int
var networkPassphrase, binaryPath, configPath string
var networkPassphrase, binaryPath, configPath, dbURL string
var historyArchiveURLs []string
var logLevel logrus.Level
logger := supportlog.New()
Expand Down Expand Up @@ -85,6 +86,14 @@ func main() {
},
Usage: "minimum log severity (debug, info, warn, error) to log",
},
&config.ConfigOption{
Name: "db-url",
EnvVar: "DATABASE_URL",
ConfigKey: &dbURL,
OptType: types.String,
Required: false,
Usage: "horizon postgres database to connect with",
},
}
cmd := &cobra.Command{
Use: "captivecore",
Expand All @@ -94,7 +103,20 @@ func main() {
configOpts.SetValues()
logger.Level = logLevel

core, err := ledgerbackend.NewCaptive(binaryPath, configPath, networkPassphrase, historyArchiveURLs)
var ledgerStore ledgerbackend.LedgerStore
var dbConn *db.Session
if len(dbURL) == 0 {
ledgerStore = ledgerbackend.EmptyLedgerStore{}
} else {
var err error
dbConn, err = db.Open("postgres", dbURL)
if err != nil {
logger.WithError(err).Fatal("Could not create db connection instance")
}
ledgerStore = ledgerbackend.NewDBLedgerStore(dbConn)
}

core, err := ledgerbackend.NewCaptive(binaryPath, configPath, networkPassphrase, historyArchiveURLs, ledgerStore)
if err != nil {
logger.WithError(err).Fatal("Could not create captive core instance")
}
Expand All @@ -108,6 +130,9 @@ func main() {
},
OnStopping: func() {
api.Shutdown()
if dbConn != nil {
dbConn.Close()
}
},
})
},
Expand Down
1 change: 1 addition & 0 deletions exp/tools/captive-core-start-tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func check(ledger uint32) bool {
"stellar-core-standalone2.cfg",
"Standalone Network ; February 2017",
[]string{"http://localhost:1570"},
ledgerbackend.EmptyLedgerStore{},
)
if err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions ingest/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func Example_changes() {
"/opt/stellar-core.cfg",
networkPassphrase,
[]string{archiveURL},
ledgerbackend.EmptyLedgerStore{},
)
if err != nil {
panic(err)
Expand Down
32 changes: 31 additions & 1 deletion ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type CaptiveStellarCore struct {
networkPassphrase string
historyURLs []string
archive historyarchive.ArchiveInterface
ledgerStore LedgerStore

ledgerBuffer bufferedLedgerMetaReader

Expand Down Expand Up @@ -98,7 +99,7 @@ type CaptiveStellarCore struct {
//
// All parameters are required, except configPath which is not required when
// working with BoundedRanges only.
func NewCaptive(executablePath, configPath, networkPassphrase string, historyURLs []string) (*CaptiveStellarCore, error) {
func NewCaptive(executablePath, configPath, networkPassphrase string, historyURLs []string, ledgerStore LedgerStore) (*CaptiveStellarCore, error) {
archive, err := historyarchive.Connect(
historyURLs[0],
historyarchive.ConnectOptions{
Expand All @@ -116,6 +117,7 @@ func NewCaptive(executablePath, configPath, networkPassphrase string, historyURL
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
waitIntervalPrepareRange: time.Second,
ledgerStore: ledgerStore,
}
c.stellarCoreRunnerFactory = func(configPath2 string) (stellarCoreRunnerInterface, error) {
runner, innerErr := newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
Expand Down Expand Up @@ -268,6 +270,34 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH
return
}

// try to get ledger hash from a trusted source before using the history archive checkpoints
ledger, exists, err := c.ledgerStore.LastLedger(from)
if err != nil {
err = errors.Wrapf(err, "Cannot fetch ledgers preceding %v", from)
return
}
if exists &&
ledger.Sequence > 1 && // stellar-core cannot run from ledger 1
// if the last ledger is too far behind, let's use the history archive instead
(from-ledger.Sequence) < 2*historyarchive.CheckpointFreq {
runFrom = ledger.Sequence
ledgerHash = ledger.Hash
if ledger.Sequence < 63 {
// if we run from a sequence which occurs before the first checkpoint
// the first ledger we will stream is 2
nextLedger = 2
} else if historyarchive.IsCheckpoint(ledger.Sequence) {
// if we run from a checkpoint sequence
// the first ledger we will stream is the next ledger
nextLedger = ledger.Sequence + 1
} else {
// if we run from a non-checkpoint sequence
// the first ledger we will stream is the first ledger in the checkpoint range which spans ledger.Sequence
nextLedger = historyarchive.PrevCheckpoint(ledger.Sequence) + 1
}
return
}

if from <= 63 {
// For ledgers before (and including) first checkpoint, we start streaming
// without providing a hash, to avoid waiting for the checkpoint.
Expand Down
92 changes: 92 additions & 0 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestCaptiveNew(t *testing.T) {
configPath,
networkPassphrase,
historyURLs,
&MockLedgerStore{},
)

assert.NoError(t, err)
Expand Down Expand Up @@ -405,17 +406,23 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
On("GetLedgerHeader", uint32(127)).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)

mockLedgerStore := &MockLedgerStore{}
mockLedgerStore.On("LastLedger", uint32(128)).
Return(Ledger{}, false, nil).Once()

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
configPath: "foo",
ledgerStore: mockLedgerStore,
stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
},
}

err := captiveBackend.PrepareRange(UnboundedRange(128))
assert.EqualError(t, err, "opening subprocess: error running stellar-core: transient error")
mockLedgerStore.AssertExpectations(t)
}

func TestCaptivePrepareRangeUnboundedRange_ErrClosingExistingSession(t *testing.T) {
Expand Down Expand Up @@ -464,21 +471,26 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
On("GetLedgerHeader", uint32(63)).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)

mockLedgerStore := &MockLedgerStore{}
captiveBackend := CaptiveStellarCore{
archive: mockArchive,
ledgerStore: mockLedgerStore,
networkPassphrase: network.PublicNetworkPassphrase,
configPath: "foo",
stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) {
return mockRunner, nil
},
}

mockLedgerStore.On("LastLedger", uint32(65)).
Return(Ledger{}, false, nil).Once()
err := captiveBackend.PrepareRange(UnboundedRange(65))
assert.NoError(t, err)

captiveBackend.nextLedger = 64
err = captiveBackend.PrepareRange(UnboundedRange(65))
assert.NoError(t, err)
mockLedgerStore.AssertExpectations(t)
}

func TestGetLatestLedgerSequence(t *testing.T) {
Expand All @@ -505,7 +517,12 @@ func TestGetLatestLedgerSequence(t *testing.T) {
On("GetLedgerHeader", uint32(63)).
Return(xdr.LedgerHeaderHistoryEntry{}, nil)

mockLedgerStore := &MockLedgerStore{}
mockLedgerStore.On("LastLedger", uint32(64)).
Return(Ledger{}, false, nil).Once()

captiveBackend := CaptiveStellarCore{
ledgerStore: mockLedgerStore,
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
configPath: "foo",
Expand Down Expand Up @@ -539,6 +556,7 @@ func TestGetLatestLedgerSequence(t *testing.T) {
mockRunner.On("close").Return(nil).Once()
err = captiveBackend.Close()
assert.NoError(t, err)
mockLedgerStore.AssertExpectations(t)
}
func TestCaptiveGetLedger(t *testing.T) {
tt := assert.New(t)
Expand Down Expand Up @@ -836,6 +854,74 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) {
assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error")
}

func TestCaptiveUseOfLedgerStore(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockArchive := &historyarchive.MockArchive{}
mockArchive.
On("GetLedgerHeader", uint32(255)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{1, 1, 1, 1},
},
}, nil)

mockArchive.
On("GetLedgerHeader", uint32(3)).
Return(xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
PreviousLedgerHash: xdr.Hash{2},
},
}, nil)

mockLedgerStore := &MockLedgerStore{}
mockLedgerStore.On("LastLedger", uint32(300)).
Return(Ledger{Sequence: 19, Hash: "abc"}, true, nil).Once()
mockLedgerStore.On("LastLedger", uint32(24)).
Return(Ledger{Sequence: 19, Hash: "abc"}, true, nil).Once()
mockLedgerStore.On("LastLedger", uint32(14)).
Return(Ledger{}, true, fmt.Errorf("transient error")).Once()
mockLedgerStore.On("LastLedger", uint32(9)).
Return(Ledger{}, false, nil).Once()
mockLedgerStore.On("LastLedger", uint32(86)).
Return(Ledger{Sequence: 85, Hash: "cde"}, true, nil).Once()

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunner: mockRunner,
ledgerStore: mockLedgerStore,
}

runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24)
assert.NoError(t, err)
assert.Equal(t, uint32(19), runFrom)
assert.Equal(t, "abc", ledgerHash)
assert.Equal(t, uint32(2), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86)
assert.NoError(t, err)
assert.Equal(t, uint32(85), runFrom)
assert.Equal(t, "cde", ledgerHash)
assert.Equal(t, uint32(64), nextLedger)

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(14)
assert.EqualError(t, err, "Cannot fetch ledgers preceding 14: transient error")

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(300)
assert.NoError(t, err)
assert.Equal(t, uint32(254), runFrom, "runFrom")
assert.Equal(t, "0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
assert.Equal(t, uint32(192), nextLedger, "nextLedger")

runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(9)
assert.NoError(t, err)
assert.Equal(t, uint32(2), runFrom, "runFrom")
assert.Equal(t, "0200000000000000000000000000000000000000000000000000000000000000", ledgerHash)
assert.Equal(t, uint32(2), nextLedger, "nextLedger")

mockLedgerStore.AssertExpectations(t)
}

func TestCaptiveRunFromParams(t *testing.T) {
var tests = []struct {
from uint32
Expand Down Expand Up @@ -876,10 +962,15 @@ func TestCaptiveRunFromParams(t *testing.T) {
},
}, nil)

mockLedgerStore := &MockLedgerStore{}
mockLedgerStore.On("LastLedger", tc.from).
Return(Ledger{}, false, nil).Once()

captiveBackend := CaptiveStellarCore{
archive: mockArchive,
networkPassphrase: network.PublicNetworkPassphrase,
stellarCoreRunner: mockRunner,
ledgerStore: mockLedgerStore,
}

runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(tc.from)
Expand All @@ -891,6 +982,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash)
}
tt.Equal(tc.nextLedger, nextLedger, "nextLedger")
mockLedgerStore.AssertExpectations(t)
})
}
}
54 changes: 54 additions & 0 deletions ingest/ledgerbackend/ledger_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package ledgerbackend

import (
sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/support/db"
)

// Ledger contains information about a ledger (sequence number and hash)
type Ledger struct {
Sequence uint32 `db:"sequence"`
Hash string `db:"ledger_hash"`
}

// LedgerStore is used to query ledger data from the Horizon DB
type LedgerStore interface {
// LastLedger returns the highest ledger which is less than `seq` if there exists such a ledger
LastLedger(seq uint32) (Ledger, bool, error)
}

// EmptyLedgerStore is a ledger store which is empty
type EmptyLedgerStore struct{}

// LastLedger always returns false indicating there is no ledger
func (e EmptyLedgerStore) LastLedger(seq uint32) (Ledger, bool, error) {
return Ledger{}, false, nil
}

// DBLedgerStore is a ledger store backed by the Horizon database
type DBLedgerStore struct {
session *db.Session
}

// NewDBLedgerStore constructs a new DBLedgerStore
func NewDBLedgerStore(session *db.Session) LedgerStore {
return DBLedgerStore{session: session}
}

// LastLedger returns the highest ledger which is less than `seq` if there exists such a ledger
func (l DBLedgerStore) LastLedger(seq uint32) (Ledger, bool, error) {
sql := sq.Select(
"hl.sequence",
"hl.ledger_hash",
).From("history_ledgers hl").
Limit(1).
Where("sequence < ?", seq).
OrderBy("sequence desc")

var dest Ledger
err := l.session.Get(&dest, sql)
if l.session.NoRows(err) {
return dest, false, nil
}
return dest, true, err
}
14 changes: 14 additions & 0 deletions ingest/ledgerbackend/mock_ledger_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ledgerbackend

import "github.com/stretchr/testify/mock"

var _ LedgerStore = (*MockLedgerStore)(nil)

type MockLedgerStore struct {
mock.Mock
}

func (m *MockLedgerStore) LastLedger(seq uint32) (Ledger, bool, error) {
args := m.Called(seq)
return args.Get(0).(Ledger), args.Get(1).(bool), args.Error(2)
}
Loading

0 comments on commit a85196f

Please sign in to comment.