diff --git a/exp/services/captivecore/README.md b/exp/services/captivecore/README.md index d428ab53a0..3d621008fe 100644 --- a/exp/services/captivecore/README.md +++ b/exp/services/captivecore/README.md @@ -87,6 +87,7 @@ Usage: captivecore [flags] Flags: + --db-url Horizon Postgres URL (optional) used to lookup the ledger hash for sequence numbers --stellar-core-binary-path Path to stellar core binary --stellar-core-config-path Path to stellar core config file --history-archive-urls Comma-separated list of stellar history archives to connect with diff --git a/exp/services/captivecore/main.go b/exp/services/captivecore/main.go index c9677520fe..f724fc4083 100644 --- a/exp/services/captivecore/main.go +++ b/exp/services/captivecore/main.go @@ -13,13 +13,14 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/network" "github.com/stellar/go/support/config" + "github.com/stellar/go/support/db" supporthttp "github.com/stellar/go/support/http" supportlog "github.com/stellar/go/support/log" ) func main() { var port int - var networkPassphrase, binaryPath, configPath string + var networkPassphrase, binaryPath, configPath, dbURL string var historyArchiveURLs []string var logLevel logrus.Level logger := supportlog.New() @@ -85,6 +86,14 @@ func main() { }, Usage: "minimum log severity (debug, info, warn, error) to log", }, + &config.ConfigOption{ + Name: "db-url", + EnvVar: "DATABASE_URL", + ConfigKey: &dbURL, + OptType: types.String, + Required: false, + Usage: "horizon postgres database to connect with", + }, } cmd := &cobra.Command{ Use: "captivecore", @@ -94,7 +103,20 @@ func main() { configOpts.SetValues() logger.Level = logLevel - core, err := ledgerbackend.NewCaptive(binaryPath, configPath, networkPassphrase, historyArchiveURLs) + var ledgerStore ledgerbackend.LedgerStore + var dbConn *db.Session + if len(dbURL) == 0 { + ledgerStore = ledgerbackend.EmptyLedgerStore{} + } else { + var err error + dbConn, err = db.Open("postgres", dbURL) + if err != nil { + logger.WithError(err).Fatal("Could not create db connection instance") + } + ledgerStore = ledgerbackend.NewDBLedgerStore(dbConn) + } + + core, err := ledgerbackend.NewCaptive(binaryPath, configPath, networkPassphrase, historyArchiveURLs, ledgerStore) if err != nil { logger.WithError(err).Fatal("Could not create captive core instance") } @@ -108,6 +130,9 @@ func main() { }, OnStopping: func() { api.Shutdown() + if dbConn != nil { + dbConn.Close() + } }, }) }, diff --git a/exp/tools/captive-core-start-tester/main.go b/exp/tools/captive-core-start-tester/main.go index 547942bb70..dba9a3fee5 100644 --- a/exp/tools/captive-core-start-tester/main.go +++ b/exp/tools/captive-core-start-tester/main.go @@ -28,6 +28,7 @@ func check(ledger uint32) bool { "stellar-core-standalone2.cfg", "Standalone Network ; February 2017", []string{"http://localhost:1570"}, + ledgerbackend.EmptyLedgerStore{}, ) if err != nil { panic(err) diff --git a/ingest/doc_test.go b/ingest/doc_test.go index 5c8b7a2bd7..b795eb952d 100644 --- a/ingest/doc_test.go +++ b/ingest/doc_test.go @@ -107,6 +107,7 @@ func Example_changes() { "/opt/stellar-core.cfg", networkPassphrase, []string{archiveURL}, + ledgerbackend.EmptyLedgerStore{}, ) if err != nil { panic(err) diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 404abc89ca..920f9bc215 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -67,6 +67,7 @@ type CaptiveStellarCore struct { networkPassphrase string historyURLs []string archive historyarchive.ArchiveInterface + ledgerStore LedgerStore ledgerBuffer bufferedLedgerMetaReader @@ -98,7 +99,7 @@ type CaptiveStellarCore struct { // // All parameters are required, except configPath which is not required when // working with BoundedRanges only. -func NewCaptive(executablePath, configPath, networkPassphrase string, historyURLs []string) (*CaptiveStellarCore, error) { +func NewCaptive(executablePath, configPath, networkPassphrase string, historyURLs []string, ledgerStore LedgerStore) (*CaptiveStellarCore, error) { archive, err := historyarchive.Connect( historyURLs[0], historyarchive.ConnectOptions{ @@ -116,6 +117,7 @@ func NewCaptive(executablePath, configPath, networkPassphrase string, historyURL historyURLs: historyURLs, networkPassphrase: networkPassphrase, waitIntervalPrepareRange: time.Second, + ledgerStore: ledgerStore, } c.stellarCoreRunnerFactory = func(configPath2 string) (stellarCoreRunnerInterface, error) { runner, innerErr := newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs) @@ -268,6 +270,34 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH return } + // try to get ledger hash from a trusted source before using the history archive checkpoints + ledger, exists, err := c.ledgerStore.LastLedger(from) + if err != nil { + err = errors.Wrapf(err, "Cannot fetch ledgers preceding %v", from) + return + } + if exists && + ledger.Sequence > 1 && // stellar-core cannot run from ledger 1 + // if the last ledger is too far behind, let's use the history archive instead + (from-ledger.Sequence) < 2*historyarchive.CheckpointFreq { + runFrom = ledger.Sequence + ledgerHash = ledger.Hash + if ledger.Sequence < 63 { + // if we run from a sequence which occurs before the first checkpoint + // the first ledger we will stream is 2 + nextLedger = 2 + } else if historyarchive.IsCheckpoint(ledger.Sequence) { + // if we run from a checkpoint sequence + // the first ledger we will stream is the next ledger + nextLedger = ledger.Sequence + 1 + } else { + // if we run from a non-checkpoint sequence + // the first ledger we will stream is the first ledger in the checkpoint range which spans ledger.Sequence + nextLedger = historyarchive.PrevCheckpoint(ledger.Sequence) + 1 + } + return + } + if from <= 63 { // For ledgers before (and including) first checkpoint, we start streaming // without providing a hash, to avoid waiting for the checkpoint. diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index ee91c9bbb2..3c682fea54 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -126,6 +126,7 @@ func TestCaptiveNew(t *testing.T) { configPath, networkPassphrase, historyURLs, + &MockLedgerStore{}, ) assert.NoError(t, err) @@ -405,10 +406,15 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { On("GetLedgerHeader", uint32(127)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) + mockLedgerStore := &MockLedgerStore{} + mockLedgerStore.On("LastLedger", uint32(128)). + Return(Ledger{}, false, nil).Once() + captiveBackend := CaptiveStellarCore{ archive: mockArchive, networkPassphrase: network.PublicNetworkPassphrase, configPath: "foo", + ledgerStore: mockLedgerStore, stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) { return mockRunner, nil }, @@ -416,6 +422,7 @@ func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) { err := captiveBackend.PrepareRange(UnboundedRange(128)) assert.EqualError(t, err, "opening subprocess: error running stellar-core: transient error") + mockLedgerStore.AssertExpectations(t) } func TestCaptivePrepareRangeUnboundedRange_ErrClosingExistingSession(t *testing.T) { @@ -464,8 +471,10 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { On("GetLedgerHeader", uint32(63)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) + mockLedgerStore := &MockLedgerStore{} captiveBackend := CaptiveStellarCore{ archive: mockArchive, + ledgerStore: mockLedgerStore, networkPassphrase: network.PublicNetworkPassphrase, configPath: "foo", stellarCoreRunnerFactory: func(configPath string) (stellarCoreRunnerInterface, error) { @@ -473,12 +482,15 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { }, } + mockLedgerStore.On("LastLedger", uint32(65)). + Return(Ledger{}, false, nil).Once() err := captiveBackend.PrepareRange(UnboundedRange(65)) assert.NoError(t, err) captiveBackend.nextLedger = 64 err = captiveBackend.PrepareRange(UnboundedRange(65)) assert.NoError(t, err) + mockLedgerStore.AssertExpectations(t) } func TestGetLatestLedgerSequence(t *testing.T) { @@ -505,7 +517,12 @@ func TestGetLatestLedgerSequence(t *testing.T) { On("GetLedgerHeader", uint32(63)). Return(xdr.LedgerHeaderHistoryEntry{}, nil) + mockLedgerStore := &MockLedgerStore{} + mockLedgerStore.On("LastLedger", uint32(64)). + Return(Ledger{}, false, nil).Once() + captiveBackend := CaptiveStellarCore{ + ledgerStore: mockLedgerStore, archive: mockArchive, networkPassphrase: network.PublicNetworkPassphrase, configPath: "foo", @@ -539,6 +556,7 @@ func TestGetLatestLedgerSequence(t *testing.T) { mockRunner.On("close").Return(nil).Once() err = captiveBackend.Close() assert.NoError(t, err) + mockLedgerStore.AssertExpectations(t) } func TestCaptiveGetLedger(t *testing.T) { tt := assert.New(t) @@ -836,6 +854,74 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) { assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error") } +func TestCaptiveUseOfLedgerStore(t *testing.T) { + mockRunner := &stellarCoreRunnerMock{} + mockArchive := &historyarchive.MockArchive{} + mockArchive. + On("GetLedgerHeader", uint32(255)). + Return(xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + PreviousLedgerHash: xdr.Hash{1, 1, 1, 1}, + }, + }, nil) + + mockArchive. + On("GetLedgerHeader", uint32(3)). + Return(xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + PreviousLedgerHash: xdr.Hash{2}, + }, + }, nil) + + mockLedgerStore := &MockLedgerStore{} + mockLedgerStore.On("LastLedger", uint32(300)). + Return(Ledger{Sequence: 19, Hash: "abc"}, true, nil).Once() + mockLedgerStore.On("LastLedger", uint32(24)). + Return(Ledger{Sequence: 19, Hash: "abc"}, true, nil).Once() + mockLedgerStore.On("LastLedger", uint32(14)). + Return(Ledger{}, true, fmt.Errorf("transient error")).Once() + mockLedgerStore.On("LastLedger", uint32(9)). + Return(Ledger{}, false, nil).Once() + mockLedgerStore.On("LastLedger", uint32(86)). + Return(Ledger{Sequence: 85, Hash: "cde"}, true, nil).Once() + + captiveBackend := CaptiveStellarCore{ + archive: mockArchive, + networkPassphrase: network.PublicNetworkPassphrase, + stellarCoreRunner: mockRunner, + ledgerStore: mockLedgerStore, + } + + runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(24) + assert.NoError(t, err) + assert.Equal(t, uint32(19), runFrom) + assert.Equal(t, "abc", ledgerHash) + assert.Equal(t, uint32(2), nextLedger) + + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(86) + assert.NoError(t, err) + assert.Equal(t, uint32(85), runFrom) + assert.Equal(t, "cde", ledgerHash) + assert.Equal(t, uint32(64), nextLedger) + + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(14) + assert.EqualError(t, err, "Cannot fetch ledgers preceding 14: transient error") + + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(300) + assert.NoError(t, err) + assert.Equal(t, uint32(254), runFrom, "runFrom") + assert.Equal(t, "0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) + assert.Equal(t, uint32(192), nextLedger, "nextLedger") + + runFrom, ledgerHash, nextLedger, err = captiveBackend.runFromParams(9) + assert.NoError(t, err) + assert.Equal(t, uint32(2), runFrom, "runFrom") + assert.Equal(t, "0200000000000000000000000000000000000000000000000000000000000000", ledgerHash) + assert.Equal(t, uint32(2), nextLedger, "nextLedger") + + mockLedgerStore.AssertExpectations(t) +} + func TestCaptiveRunFromParams(t *testing.T) { var tests = []struct { from uint32 @@ -876,10 +962,15 @@ func TestCaptiveRunFromParams(t *testing.T) { }, }, nil) + mockLedgerStore := &MockLedgerStore{} + mockLedgerStore.On("LastLedger", tc.from). + Return(Ledger{}, false, nil).Once() + captiveBackend := CaptiveStellarCore{ archive: mockArchive, networkPassphrase: network.PublicNetworkPassphrase, stellarCoreRunner: mockRunner, + ledgerStore: mockLedgerStore, } runFrom, ledgerHash, nextLedger, err := captiveBackend.runFromParams(tc.from) @@ -891,6 +982,7 @@ func TestCaptiveRunFromParams(t *testing.T) { tt.Equal("0101010100000000000000000000000000000000000000000000000000000000", ledgerHash) } tt.Equal(tc.nextLedger, nextLedger, "nextLedger") + mockLedgerStore.AssertExpectations(t) }) } } diff --git a/ingest/ledgerbackend/ledger_store.go b/ingest/ledgerbackend/ledger_store.go new file mode 100644 index 0000000000..77314e3069 --- /dev/null +++ b/ingest/ledgerbackend/ledger_store.go @@ -0,0 +1,54 @@ +package ledgerbackend + +import ( + sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/support/db" +) + +// Ledger contains information about a ledger (sequence number and hash) +type Ledger struct { + Sequence uint32 `db:"sequence"` + Hash string `db:"ledger_hash"` +} + +// LedgerStore is used to query ledger data from the Horizon DB +type LedgerStore interface { + // LastLedger returns the highest ledger which is less than `seq` if there exists such a ledger + LastLedger(seq uint32) (Ledger, bool, error) +} + +// EmptyLedgerStore is a ledger store which is empty +type EmptyLedgerStore struct{} + +// LastLedger always returns false indicating there is no ledger +func (e EmptyLedgerStore) LastLedger(seq uint32) (Ledger, bool, error) { + return Ledger{}, false, nil +} + +// DBLedgerStore is a ledger store backed by the Horizon database +type DBLedgerStore struct { + session *db.Session +} + +// NewDBLedgerStore constructs a new DBLedgerStore +func NewDBLedgerStore(session *db.Session) LedgerStore { + return DBLedgerStore{session: session} +} + +// LastLedger returns the highest ledger which is less than `seq` if there exists such a ledger +func (l DBLedgerStore) LastLedger(seq uint32) (Ledger, bool, error) { + sql := sq.Select( + "hl.sequence", + "hl.ledger_hash", + ).From("history_ledgers hl"). + Limit(1). + Where("sequence < ?", seq). + OrderBy("sequence desc") + + var dest Ledger + err := l.session.Get(&dest, sql) + if l.session.NoRows(err) { + return dest, false, nil + } + return dest, true, err +} diff --git a/ingest/ledgerbackend/mock_ledger_store.go b/ingest/ledgerbackend/mock_ledger_store.go new file mode 100644 index 0000000000..0372c0881f --- /dev/null +++ b/ingest/ledgerbackend/mock_ledger_store.go @@ -0,0 +1,14 @@ +package ledgerbackend + +import "github.com/stretchr/testify/mock" + +var _ LedgerStore = (*MockLedgerStore)(nil) + +type MockLedgerStore struct { + mock.Mock +} + +func (m *MockLedgerStore) LastLedger(seq uint32) (Ledger, bool, error) { + args := m.Called(seq) + return args.Get(0).(Ledger), args.Get(1).(bool), args.Error(2) +} diff --git a/services/horizon/internal/db2/history/ledger_test.go b/services/horizon/internal/db2/history/ledger_test.go index a3fb6c3b79..9a26a51e92 100644 --- a/services/horizon/internal/db2/history/ledger_test.go +++ b/services/horizon/internal/db2/history/ledger_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/guregu/null" + "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/services/horizon/internal/toid" "github.com/stellar/go/xdr" @@ -48,6 +49,26 @@ func TestLedgerQueries(t *testing.T) { tt.Assert.Contains(foundSeqs, int32(2)) tt.Assert.Contains(foundSeqs, int32(3)) } + + store := ledgerbackend.NewDBLedgerStore(tt.HorizonSession()) + ledger, exists, err := store.LastLedger(100) + tt.Assert.NoError(err) + tt.Assert.True(exists) + tt.Assert.Equal(ledger.Sequence, uint32(3)) + + ledger, exists, err = store.LastLedger(3) + tt.Assert.NoError(err) + tt.Assert.True(exists) + tt.Assert.Equal(ledger.Sequence, uint32(2)) + + ledger, exists, err = store.LastLedger(2) + tt.Assert.NoError(err) + tt.Assert.True(exists) + tt.Assert.Equal(ledger.Sequence, uint32(1)) + + ledger, exists, err = store.LastLedger(1) + tt.Assert.NoError(err) + tt.Assert.False(exists) } func TestInsertLedger(t *testing.T) { diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 3837b5030b..2d94a0bfc7 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -177,6 +177,7 @@ func NewSystem(config Config) (System, error) { config.StellarCoreConfigPath, config.NetworkPassphrase, []string{config.HistoryArchiveURL}, + ledgerbackend.NewDBLedgerStore(config.HistorySession), ) if err != nil { cancel()