Skip to content

Commit

Permalink
Add OrderBookStream tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed May 31, 2020
1 parent b84a859 commit e48e988
Show file tree
Hide file tree
Showing 5 changed files with 541 additions and 48 deletions.
2 changes: 1 addition & 1 deletion services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (a *App) init() {
orderBookGraph := orderbook.NewOrderBookGraph()
a.orderBookStream = &expingest.OrderBookStream{
OrderBookGraph: orderBookGraph,
HistorySession: a.historyQ.Clone(),
HistoryQ: &history.Q{a.HorizonSession(a.ctx)},
}
initPathFinder(a, orderBookGraph)
}
Expand Down
10 changes: 8 additions & 2 deletions services/horizon/internal/expingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,13 @@ func (r resumeState) run(s *System) (transition, error) {
// without doing any verification. the verification routine only works
// when we are updating both the db and order book stream at the same time
if s.verifyOrderBookStream {
_, _, err = s.orderBookStream.update(s.historyQ)
var status ingestionStatus
status, err = s.orderBookStream.getIngestionStatus()
if err != nil {
return retryResume(r), errors.Wrap(err, "Error obtaining ingestion status")
}

_, _, err = s.orderBookStream.update(status)
if err != nil {
return retryResume(r), errors.Wrap(err, "Error updating order book stream")
}
Expand Down Expand Up @@ -875,7 +881,7 @@ func (s *System) completeIngestion(ledger uint32) error {
}

if s.verifyOrderBookStream {
s.orderBookStream.updateAndVerify(ledger, s.historyQ, s.graph)
s.orderBookStream.updateAndVerify(s.graph, ledger)
}

if err := s.historyQ.Commit(); err != nil {
Expand Down
21 changes: 12 additions & 9 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,17 @@ func NewSystem(config Config) (*System, error) {
historyAdapter := adapters.MakeHistoryArchiveAdapter(archive)

system := &System{
ctx: ctx,
cancel: cancel,
historyAdapter: historyAdapter,
ledgerBackend: ledgerBackend,
config: config,
historyQ: historyQ,
graph: config.OrderBookGraph,
orderBookStream: &OrderBookStream{OrderBookGraph: orderbook.NewOrderBookGraph()},
ctx: ctx,
cancel: cancel,
historyAdapter: historyAdapter,
ledgerBackend: ledgerBackend,
config: config,
historyQ: historyQ,
graph: config.OrderBookGraph,
orderBookStream: &OrderBookStream{
HistoryQ: historyQ,
OrderBookGraph: orderbook.NewOrderBookGraph(),
},
verifyOrderBookStream: true,
disableStateVerification: config.DisableStateVerification,
maxStreamRetries: config.MaxStreamRetries,
Expand Down Expand Up @@ -365,7 +368,7 @@ func (s *System) loadOffersIntoMemory(sequence uint32) error {
}

if s.verifyOrderBookStream {
s.orderBookStream.updateAndVerify(sequence, s.historyQ, s.graph)
s.orderBookStream.updateAndVerify(s.graph, sequence)
}

if err := s.graphApply(sequence); err != nil {
Expand Down
76 changes: 40 additions & 36 deletions services/horizon/internal/expingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/stellar/go/exp/orderbook"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand All @@ -18,8 +17,8 @@ import (
// in memory graph. However, it is safe for other go routines to use the
// in memory graph for read operations.
type OrderBookStream struct {
OrderBookGraph *orderbook.OrderBookGraph
HistorySession *db.Session
OrderBookGraph orderbook.OBGraph
HistoryQ history.IngestionQ
lastLedger uint32
}

Expand All @@ -30,27 +29,27 @@ type ingestionStatus struct {
LastOfferCompactionLedger uint32
}

func (o *OrderBookStream) isValid(q history.IngestionQ) (ingestionStatus, error) {
func (o *OrderBookStream) getIngestionStatus() (ingestionStatus, error) {
var status ingestionStatus
var err error

status.StateInvalid, err = q.GetExpStateInvalid()
status.StateInvalid, err = o.HistoryQ.GetExpStateInvalid()
if err != nil {
return status, errors.Wrap(err, "error from GetExpStateInvalid")
return status, errors.Wrap(err, "Error from GetExpStateInvalid")
}

var lastHistoryLedger uint32
lastHistoryLedger, err = q.GetLatestLedger()
lastHistoryLedger, err = o.HistoryQ.GetLatestLedger()
if err != nil {
return status, errors.Wrap(err, "error from GetLatestLedger")
return status, errors.Wrap(err, "Error from GetLatestLedger")
}
status.LastIngestedLedger, err = q.GetLastLedgerExpIngestNonBlocking()
status.LastIngestedLedger, err = o.HistoryQ.GetLastLedgerExpIngestNonBlocking()
if err != nil {
return status, errors.Wrap(err, "error from GetLastLedgerExpIngestNonBlocking")
return status, errors.Wrap(err, "Error from GetLastLedgerExpIngestNonBlocking")
}
status.LastOfferCompactionLedger, err = q.GetOfferCompactionSequence()
status.LastOfferCompactionLedger, err = o.HistoryQ.GetOfferCompactionSequence()
if err != nil {
return status, errors.Wrap(err, "error from GetOfferCompactionSequence")
return status, errors.Wrap(err, "Error from GetOfferCompactionSequence")
}

status.HistoryConsistentWithState = (status.LastIngestedLedger == lastHistoryLedger) ||
Expand All @@ -61,12 +60,7 @@ func (o *OrderBookStream) isValid(q history.IngestionQ) (ingestionStatus, error)
return status, nil
}

func (o *OrderBookStream) update(q history.IngestionQ) ([]history.Offer, []xdr.Int64, error) {
status, err := o.isValid(q)
if err != nil {
return nil, nil, errors.Wrap(err, "error from isValid check")
}

func (o *OrderBookStream) update(status ingestionStatus) ([]history.Offer, []xdr.Int64, error) {
reset := o.lastLedger == 0
if status.StateInvalid || !status.HistoryConsistentWithState {
log.WithField("status", status).Warn("ingestion state is invalid")
Expand Down Expand Up @@ -95,14 +89,13 @@ func (o *OrderBookStream) update(q history.IngestionQ) ([]history.Offer, []xdr.I
}

defer o.OrderBookGraph.Discard()
var offers []history.Offer
offers, err = loadOffersIntoGraph(q, o.OrderBookGraph)
offers, err := loadOffersIntoGraph(o.HistoryQ, o.OrderBookGraph)
if err != nil {
return nil, nil, errors.Wrap(err, "error from loadOffersIntoGraph")
return nil, nil, errors.Wrap(err, "Error from loadOffersIntoGraph")
}

if err = o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil {
return nil, nil, err
return nil, nil, errors.Wrap(err, "Error applying changes to order book")
}

o.lastLedger = status.LastIngestedLedger
Expand All @@ -115,11 +108,11 @@ func (o *OrderBookStream) update(q history.IngestionQ) ([]history.Offer, []xdr.I

defer o.OrderBookGraph.Discard()

var updated, rows []history.Offer
var updated []history.Offer
var removed []xdr.Int64
rows, err = q.GetUpdatedOffers(o.lastLedger)
rows, err := o.HistoryQ.GetUpdatedOffers(o.lastLedger)
if err != nil {
return nil, nil, errors.Wrap(err, "error from GetUpdatedOffers")
return nil, nil, errors.Wrap(err, "Error from GetUpdatedOffers")
}
for _, row := range rows {
if row.Deleted {
Expand All @@ -135,7 +128,7 @@ func (o *OrderBookStream) update(q history.IngestionQ) ([]history.Offer, []xdr.I
}

if err = o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil {
return nil, nil, errors.Wrap(err, "could not apply changes to orderbook")
return nil, nil, errors.Wrap(err, "Error applying changes to order book")
}

o.lastLedger = status.LastIngestedLedger
Expand Down Expand Up @@ -197,27 +190,38 @@ func verifyRemovedOffers(ledger uint32, fromDB []xdr.Int64, fromIngestion []xdr.
}
}

func (o *OrderBookStream) updateAndVerify(sequence uint32, q history.IngestionQ, graph orderbook.OBGraph) {
dbUpdates, dbRemoved, err := o.update(q)
func (o *OrderBookStream) updateAndVerify(graph orderbook.OBGraph, sequence uint32) {
status, err := o.getIngestionStatus()
if err != nil {
log.WithError(err).WithField("sequence", sequence).Info("Error obtaining ingestion status")
return
}

dbUpdates, dbRemoved, err := o.update(status)
if err != nil {
log.WithError(err).WithField("sequence", sequence).Info("could not update order book ingester")
log.WithError(err).WithField("sequence", sequence).Info("Error consuming from order book stream")
return
}
ingestionUpdates, ingestionRemoved := graph.Pending()
verifyUpdatedOffers(sequence, dbUpdates, ingestionUpdates)
verifyRemovedOffers(sequence, dbRemoved, ingestionRemoved)
}

// Update will query the Horizon DB for updates and apply them to the in memory order book graph.
// After calling this function the the in memory order book graph should be consistent with the
// Update will query the Horizon DB for offers which have been created, removed, or updated since the
// last time Update() was called. Those changes will then be applied to the in memory order book graph.
// After calling this function, the the in memory order book graph should be consistent with the
// Horizon DB (assuming no error is returned).
func (o *OrderBookStream) Update() error {
q := &history.Q{o.HistorySession}
if err := q.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}); err != nil {
return errors.Wrap(err, "could not start repeatable read transaction")
if err := o.HistoryQ.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}); err != nil {
return errors.Wrap(err, "Error starting repeatable read transaction")
}
defer o.HistoryQ.Rollback()

status, err := o.getIngestionStatus()
if err != nil {
return errors.Wrap(err, "Error obtaining ingestion status")
}
defer q.Rollback()

_, _, err := o.update(q)
_, _, err = o.update(status)
return err
}
Loading

0 comments on commit e48e988

Please sign in to comment.