Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Removed LedgerBackend calls inside ingest critical section #3518

Merged
merged 18 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/xdr"
)

// ChangeReader provides convenient, streaming access to a sequence of Changes.
Expand Down Expand Up @@ -57,6 +58,21 @@ func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, networkPassphras
}, nil
}

// NewLedgerChangeReaderFromLedgerCloseMeta constructs a new LedgerChangeReader instance bound to the given ledger.
// Note that the returned LedgerChangeReader is not thread safe and should not be shared
// by multiple goroutines.
func NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase string, ledger xdr.LedgerCloseMeta) (*LedgerChangeReader, error) {
transactionReader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledger)
if err != nil {
return nil, err
}

return &LedgerChangeReader{
LedgerTransactionReader: transactionReader,
state: feeChangesState,
}, nil
}

// Read returns the next change in the stream.
// If there are no changes remaining io.EOF is returned as an error.
func (r *LedgerChangeReader) Read() (Change, error) {
Expand Down
10 changes: 8 additions & 2 deletions ingest/ledger_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type LedgerTransactionReader struct {
}

// NewLedgerTransactionReader creates a new TransactionReader instance.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) {
exists, ledgerCloseMeta, err := backend.GetLedger(sequence)
if err != nil {
Expand All @@ -30,8 +30,14 @@ func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, networkPass
return nil, ErrNotFound
}

return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta)
}

// NewLedgerTransactionReaderFromXdr creates a new TransactionReader instance from xdr.LedgerCloseMeta.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta}
if err = reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
return nil, errors.Wrap(err, "error extracting transactions from ledger close meta")
}
return reader, nil
Expand Down
47 changes: 37 additions & 10 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ type CaptiveStellarCore struct {
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them
// and using `time.Sleep` when ledger is not available can actually slow entire
// ingestion process.
blocking bool
// blockingLock locks access to blocking.
blockingLock sync.Mutex
blocking bool

// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta
Expand Down Expand Up @@ -233,7 +235,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
// the requested ledger
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from)
c.lastLedger = &to
c.blocking = true
c.setBlocking(true)
c.previousLedgerHash = nil

return nil
Expand Down Expand Up @@ -292,7 +294,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {
}
}

c.blocking = false
c.setBlocking(false)

return nil
}
Expand Down Expand Up @@ -398,10 +400,10 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
return nil
}

old := c.blocking
c.blocking = true
old := c.isBlocking()
c.setBlocking(true)
_, _, err := c.GetLedger(ledgerRange.from)
c.blocking = old
c.setBlocking(old)

if err != nil {
return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from)
Expand Down Expand Up @@ -452,6 +454,18 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

// GetLedgerBlocking works as GetLedger but will block until the ledger is
// available in the backend (even for UnboundedRange).
// Please note that requesting a ledger sequence far after current ledger will
// block the execution for a long time.
func (c *CaptiveStellarCore) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we have this function, should we make GetLedger() always be non blocking?

Copy link
Contributor

@paulbellamy paulbellamy Apr 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also looks like it's not really threadsafe with GetLedger(), given it modifies the underlying thing. Wouldn't "the go way" be to have the default way be blocking, and use a goroutine/channel when you want it async? (When do we want it async, anyway?)

Edit: Ah, GetLedger(), is more like MaybeGetLedger() in that it might not exist in the backend yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we probably should make GetLedger blocking by default but before doing this I think we should 1) ensure it doesn't cause unexpected issues (we should be able to tell after testing Horizon release with this PR), 2) we are modifying stable ingest package so we should do it via major release. Going to create an issue about it soon.

old := c.isBlocking()
c.setBlocking(true)
_, meta, err := c.GetLedger(sequence)
c.setBlocking(old)
return meta, err
}

// GetLedger returns true when ledger is found and it's LedgerCloseMeta.
// Call PrepareRange first to instruct the backend which ledgers to fetch.
//
Expand Down Expand Up @@ -502,7 +516,7 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe
// Now loop along the range until we find the ledger we want.
var errOut error
for {
if !c.blocking && len(c.stellarCoreRunner.getMetaPipe()) == 0 {
if !c.isBlocking() && len(c.stellarCoreRunner.getMetaPipe()) == 0 {
return false, xdr.LedgerCloseMeta{}, nil
}

Expand Down Expand Up @@ -538,10 +552,11 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe
currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

if seq == sequence {
// Found the requested seq
c.cachedMeta = result.LedgerCloseMeta
// Update cache with the latest value because we incremented nextLedger.
// TODO add test for this case!
c.cachedMeta = result.LedgerCloseMeta

if seq == sequence {
// If we got the _last_ ledger in a segment, close before returning.
if c.lastLedger != nil && *c.lastLedger == seq {
if err := c.stellarCoreRunner.close(); err != nil {
Expand Down Expand Up @@ -613,6 +628,18 @@ func (c *CaptiveStellarCore) isClosed() bool {
return c.nextLedger == 0 || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
}

func (c *CaptiveStellarCore) isBlocking() bool {
c.blockingLock.Lock()
defer c.blockingLock.Unlock()
return c.blocking
}

func (c *CaptiveStellarCore) setBlocking(val bool) {
c.blockingLock.Lock()
c.blocking = val
c.blockingLock.Unlock()
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files. Note, once a CaptiveStellarCore instance is closed it can can no longer be used and
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
Expand Down
20 changes: 20 additions & 0 deletions ingest/ledgerbackend/database_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ledgerbackend
import (
"database/sql"
"sort"
"time"

"github.com/stellar/go/network"
"github.com/stellar/go/support/db"
Expand Down Expand Up @@ -117,6 +118,25 @@ func sortByHash(transactions []xdr.TransactionEnvelope, passphrase string) error
return nil
}

// GetLedgerBlocking works as GetLedger but will block until the ledger is
// available in the backend (even for UnaboundedRange).
// Please note that requesting a ledger sequence far after current ledger will
// block the execution for a long time.
func (dbb *DatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
for {
exists, meta, err := dbb.GetLedger(sequence)
if err != nil {
return xdr.LedgerCloseMeta{}, err
}

if exists {
return meta, nil
} else {
time.Sleep(time.Second)
}
}
}

// GetLedger returns the LedgerCloseMeta for the given ledger sequence number.
// The first returned value is false when the ledger does not exist in the database.
func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) {
Expand Down
2 changes: 2 additions & 0 deletions ingest/ledgerbackend/ledger_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type LedgerBackend interface {
GetLatestLedgerSequence() (sequence uint32, err error)
// The first returned value is false when the ledger does not exist in a backend.
GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error)
// Works like GetLedger but will block until the ledger is available.
GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error)
// PrepareRange prepares the given range (including from and to) to be loaded.
// Some backends (like captive stellar-core) need to initalize data to be
// able to stream ledgers. Blocks until the first ledger is available.
Expand Down
5 changes: 5 additions & 0 deletions ingest/ledgerbackend/mock_database_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (m *MockDatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseM
return args.Bool(0), args.Get(1).(xdr.LedgerCloseMeta), args.Error(2)
}

func (m *MockDatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
args := m.Called(sequence)
return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1)
}

func (m *MockDatabaseBackend) Close() error {
args := m.Called()
return args.Error(0)
Expand Down
15 changes: 15 additions & 0 deletions ingest/ledgerbackend/remote_captive_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,18 @@ func (c RemoteCaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCl

return parsed.Present, xdr.LedgerCloseMeta(parsed.Ledger), nil
}

func (c RemoteCaptiveStellarCore) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
for {
exists, meta, err := c.GetLedger(sequence)
if err != nil {
return xdr.LedgerCloseMeta{}, err
}

if exists {
return meta, nil
} else {
time.Sleep(time.Second)
}
}
}
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ type IngestionQ interface {
UpdateExpStateInvalid(bool) error
UpdateIngestVersion(int) error
GetExpStateInvalid() (bool, error)
GetLatestLedger() (uint32, error)
GetLatestHistoryLedger() (uint32, error)
GetOfferCompactionSequence() (uint32, error)
TruncateIngestStateTables() error
DeleteRangeAll(start, end int64) error
Expand Down Expand Up @@ -776,9 +776,9 @@ func (q *Q) ElderLedger(dest interface{}) error {
return q.GetRaw(dest, `SELECT COALESCE(MIN(sequence), 0) FROM history_ledgers`)
}

// GetLatestLedger loads the latest known ledger. Returns 0 if no ledgers in
// GetLatestHistoryLedger loads the latest known ledger. Returns 0 if no ledgers in
// `history_ledgers` table.
func (q *Q) GetLatestLedger() (uint32, error) {
func (q *Q) GetLatestHistoryLedger() (uint32, error) {
var value uint32
err := q.LatestLedger(&value)
return value, err
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestLatestLedgerSequenceClosedAt(t *testing.T) {
}
}

func TestGetLatestLedgerEmptyDB(t *testing.T) {
func TestGetLatestHistoryLedgerEmptyDB(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

value, err := q.GetLatestLedger()
value, err := q.GetLatestHistoryLedger()
tt.Assert.NoError(err)
tt.Assert.Equal(uint32(0), value)
}
Expand Down
Loading