Skip to content

Commit

Permalink
services/horizon: Removed LedgerBackend calls inside ingest critical …
Browse files Browse the repository at this point in the history
…section (#3518)

This commit moves calls to `LedgerBackend` in `buildState` and `resumeState`
outside critical section which is a DB transaction acquiring ingestion lock.

To clean the code `LedgerBackend.GetLedgerBlocking` method was introduced that
always blocks until the ledger is available in the backend. Additionally,
`GetLatestLedger` method name was changed to `GetLatestHistoryLedger` for
clarity and `ProcessorRunner` methods have changed and do not depend on
`LedgerBackend` anymore.

This is done to prevent situations in which `LedgerBackend` hangs (ex. broken
connection to Core DB or remote captive core) inside a critical section
ingestion will halt in entire cluster. With this change only a single ingesting
node will halt.
  • Loading branch information
bartekn authored Apr 12, 2021
1 parent b83e017 commit 3d97762
Show file tree
Hide file tree
Showing 25 changed files with 782 additions and 644 deletions.
16 changes: 16 additions & 0 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/xdr"
)

// ChangeReader provides convenient, streaming access to a sequence of Changes.
Expand Down Expand Up @@ -57,6 +58,21 @@ func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, networkPassphras
}, nil
}

// NewLedgerChangeReaderFromLedgerCloseMeta constructs a new LedgerChangeReader instance bound to the given ledger.
// Note that the returned LedgerChangeReader is not thread safe and should not be shared
// by multiple goroutines.
func NewLedgerChangeReaderFromLedgerCloseMeta(networkPassphrase string, ledger xdr.LedgerCloseMeta) (*LedgerChangeReader, error) {
transactionReader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledger)
if err != nil {
return nil, err
}

return &LedgerChangeReader{
LedgerTransactionReader: transactionReader,
state: feeChangesState,
}, nil
}

// Read returns the next change in the stream.
// If there are no changes remaining io.EOF is returned as an error.
func (r *LedgerChangeReader) Read() (Change, error) {
Expand Down
10 changes: 8 additions & 2 deletions ingest/ledger_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type LedgerTransactionReader struct {
}

// NewLedgerTransactionReader creates a new TransactionReader instance.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) {
exists, ledgerCloseMeta, err := backend.GetLedger(sequence)
if err != nil {
Expand All @@ -30,8 +30,14 @@ func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, networkPass
return nil, ErrNotFound
}

return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta)
}

// NewLedgerTransactionReaderFromXdr creates a new TransactionReader instance from xdr.LedgerCloseMeta.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta}
if err = reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
return nil, errors.Wrap(err, "error extracting transactions from ledger close meta")
}
return reader, nil
Expand Down
47 changes: 37 additions & 10 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ type CaptiveStellarCore struct {
// for scenarios when Horizon consumes ledgers faster than Stellar-Core produces them
// and using `time.Sleep` when ledger is not available can actually slow entire
// ingestion process.
blocking bool
// blockingLock locks access to blocking.
blockingLock sync.Mutex
blocking bool

// cachedMeta keeps that ledger data of the last fetched ledger. Updated in GetLedger().
cachedMeta *xdr.LedgerCloseMeta
Expand Down Expand Up @@ -233,7 +235,7 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
// the requested ledger
c.nextLedger = c.roundDownToFirstReplayAfterCheckpointStart(from)
c.lastLedger = &to
c.blocking = true
c.setBlocking(true)
c.previousLedgerHash = nil

return nil
Expand Down Expand Up @@ -292,7 +294,7 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error {
}
}

c.blocking = false
c.setBlocking(false)

return nil
}
Expand Down Expand Up @@ -398,10 +400,10 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error {
return nil
}

old := c.blocking
c.blocking = true
old := c.isBlocking()
c.setBlocking(true)
_, _, err := c.GetLedger(ledgerRange.from)
c.blocking = old
c.setBlocking(old)

if err != nil {
return errors.Wrapf(err, "Error fast-forwarding to %d", ledgerRange.from)
Expand Down Expand Up @@ -452,6 +454,18 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

// GetLedgerBlocking works as GetLedger but will block until the ledger is
// available in the backend (even for UnboundedRange).
// Please note that requesting a ledger sequence far after current ledger will
// block the execution for a long time.
func (c *CaptiveStellarCore) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
old := c.isBlocking()
c.setBlocking(true)
_, meta, err := c.GetLedger(sequence)
c.setBlocking(old)
return meta, err
}

// GetLedger returns true when ledger is found and it's LedgerCloseMeta.
// Call PrepareRange first to instruct the backend which ledgers to fetch.
//
Expand Down Expand Up @@ -502,7 +516,7 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe
// Now loop along the range until we find the ledger we want.
var errOut error
for {
if !c.blocking && len(c.stellarCoreRunner.getMetaPipe()) == 0 {
if !c.isBlocking() && len(c.stellarCoreRunner.getMetaPipe()) == 0 {
return false, xdr.LedgerCloseMeta{}, nil
}

Expand Down Expand Up @@ -538,10 +552,11 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe
currentLedgerHash := result.LedgerCloseMeta.LedgerHash().HexString()
c.previousLedgerHash = &currentLedgerHash

if seq == sequence {
// Found the requested seq
c.cachedMeta = result.LedgerCloseMeta
// Update cache with the latest value because we incremented nextLedger.
// TODO add test for this case!
c.cachedMeta = result.LedgerCloseMeta

if seq == sequence {
// If we got the _last_ ledger in a segment, close before returning.
if c.lastLedger != nil && *c.lastLedger == seq {
if err := c.stellarCoreRunner.close(); err != nil {
Expand Down Expand Up @@ -613,6 +628,18 @@ func (c *CaptiveStellarCore) isClosed() bool {
return c.nextLedger == 0 || c.stellarCoreRunner == nil || c.stellarCoreRunner.context().Err() != nil
}

func (c *CaptiveStellarCore) isBlocking() bool {
c.blockingLock.Lock()
defer c.blockingLock.Unlock()
return c.blocking
}

func (c *CaptiveStellarCore) setBlocking(val bool) {
c.blockingLock.Lock()
c.blocking = val
c.blockingLock.Unlock()
}

// Close closes existing Stellar-Core process, streaming sessions and removes all
// temporary files. Note, once a CaptiveStellarCore instance is closed it can can no longer be used and
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
Expand Down
20 changes: 20 additions & 0 deletions ingest/ledgerbackend/database_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ledgerbackend
import (
"database/sql"
"sort"
"time"

"github.com/stellar/go/network"
"github.com/stellar/go/support/db"
Expand Down Expand Up @@ -117,6 +118,25 @@ func sortByHash(transactions []xdr.TransactionEnvelope, passphrase string) error
return nil
}

// GetLedgerBlocking works as GetLedger but will block until the ledger is
// available in the backend (even for UnaboundedRange).
// Please note that requesting a ledger sequence far after current ledger will
// block the execution for a long time.
func (dbb *DatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
for {
exists, meta, err := dbb.GetLedger(sequence)
if err != nil {
return xdr.LedgerCloseMeta{}, err
}

if exists {
return meta, nil
} else {
time.Sleep(time.Second)
}
}
}

// GetLedger returns the LedgerCloseMeta for the given ledger sequence number.
// The first returned value is false when the ledger does not exist in the database.
func (dbb *DatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error) {
Expand Down
2 changes: 2 additions & 0 deletions ingest/ledgerbackend/ledger_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type LedgerBackend interface {
GetLatestLedgerSequence() (sequence uint32, err error)
// The first returned value is false when the ledger does not exist in a backend.
GetLedger(sequence uint32) (bool, xdr.LedgerCloseMeta, error)
// Works like GetLedger but will block until the ledger is available.
GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error)
// PrepareRange prepares the given range (including from and to) to be loaded.
// Some backends (like captive stellar-core) need to initalize data to be
// able to stream ledgers. Blocks until the first ledger is available.
Expand Down
5 changes: 5 additions & 0 deletions ingest/ledgerbackend/mock_database_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (m *MockDatabaseBackend) GetLedger(sequence uint32) (bool, xdr.LedgerCloseM
return args.Bool(0), args.Get(1).(xdr.LedgerCloseMeta), args.Error(2)
}

func (m *MockDatabaseBackend) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
args := m.Called(sequence)
return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1)
}

func (m *MockDatabaseBackend) Close() error {
args := m.Called()
return args.Error(0)
Expand Down
15 changes: 15 additions & 0 deletions ingest/ledgerbackend/remote_captive_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,18 @@ func (c RemoteCaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCl

return parsed.Present, xdr.LedgerCloseMeta(parsed.Ledger), nil
}

func (c RemoteCaptiveStellarCore) GetLedgerBlocking(sequence uint32) (xdr.LedgerCloseMeta, error) {
for {
exists, meta, err := c.GetLedger(sequence)
if err != nil {
return xdr.LedgerCloseMeta{}, err
}

if exists {
return meta, nil
} else {
time.Sleep(time.Second)
}
}
}
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ type IngestionQ interface {
UpdateExpStateInvalid(bool) error
UpdateIngestVersion(int) error
GetExpStateInvalid() (bool, error)
GetLatestLedger() (uint32, error)
GetLatestHistoryLedger() (uint32, error)
GetOfferCompactionSequence() (uint32, error)
TruncateIngestStateTables() error
DeleteRangeAll(start, end int64) error
Expand Down Expand Up @@ -776,9 +776,9 @@ func (q *Q) ElderLedger(dest interface{}) error {
return q.GetRaw(dest, `SELECT COALESCE(MIN(sequence), 0) FROM history_ledgers`)
}

// GetLatestLedger loads the latest known ledger. Returns 0 if no ledgers in
// GetLatestHistoryLedger loads the latest known ledger. Returns 0 if no ledgers in
// `history_ledgers` table.
func (q *Q) GetLatestLedger() (uint32, error) {
func (q *Q) GetLatestHistoryLedger() (uint32, error) {
var value uint32
err := q.LatestLedger(&value)
return value, err
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestLatestLedgerSequenceClosedAt(t *testing.T) {
}
}

func TestGetLatestLedgerEmptyDB(t *testing.T) {
func TestGetLatestHistoryLedgerEmptyDB(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

value, err := q.GetLatestLedger()
value, err := q.GetLatestHistoryLedger()
tt.Assert.NoError(err)
tt.Assert.Equal(uint32(0), value)
}
Expand Down
Loading

0 comments on commit 3d97762

Please sign in to comment.