Skip to content

Commit

Permalink
exp/orderbook: Allow updates to the order book graph which span multi…
Browse files Browse the repository at this point in the history
…ple ledgers (#2649)

Before #2639 we were updating the order book graph via ingestion on
every single ledger. The order book graph was implemented with the
expectation that you must apply changes for every single ledger and no
ledgers should be skipped.

However, we can no longer maintain that invariant because now we are
applying updates to the order book graph via the OrderBookStream. The
OrderBookStream periodically polls for new / updated offers from the
Horizon DB. It is possible that ingestion has advanced by more than one
ledger since the last time OrderBookStream updated the order book graph.

Therefore we must relax the restrictions for how updates are applied to
an order book graph. We still require ledgers to be strictly increasing
when applying updates to an order book graph. However, now we allow
updates to span multiple ledgers.

We also allow you to remove an offer which is not present in the order
book. Consider the case where you apply updates from ledger x. Let's say
the next time we update the order book graph ingestion has already
advanced to x+2. It is possible that an offer was created in ledger x+1
and in ledger x+2 we are removing the offer. However, our graph was last
updated at ledger x, so when we try to remove the offer which was
created in ledger x+1 we will panic unless we deliberately skip over
offers which are not present in the order book.
  • Loading branch information
tamirms authored Jun 2, 2020
1 parent 7f5df5f commit 7aa0d78
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 36 deletions.
5 changes: 4 additions & 1 deletion exp/orderbook/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (tx *orderBookBatchedUpdates) apply(ledger uint32) error {
}
tx.committed = true

if tx.orderbook.lastLedger > 0 && ledger != tx.orderbook.lastLedger+1 {
if tx.orderbook.lastLedger > 0 && ledger <= tx.orderbook.lastLedger {
return errUnexpectedLedger
}

Expand All @@ -71,6 +71,9 @@ func (tx *orderBookBatchedUpdates) apply(ledger uint32) error {
panic(errors.Wrap(err, "could not apply update in batch"))
}
case removeOfferOperationType:
if _, ok := tx.orderbook.tradingPairForOffer[operation.offerID]; !ok {
continue
}
if err := tx.orderbook.remove(operation.offerID); err != nil {
panic(errors.Wrap(err, "could not apply update in batch"))
}
Expand Down
23 changes: 22 additions & 1 deletion exp/orderbook/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,23 @@ func TestApplyOutdatedLedger(t *testing.T) {
graph.Discard()

graph.AddOffer(eurOffer)
err = graph.Apply(4)
err = graph.Apply(2)
if err != errUnexpectedLedger {
t.Fatalf("expected error %v but got %v", errUnexpectedLedger, err)
}
if graph.lastLedger != 2 {
t.Fatalf("expected last ledger to be %v but got %v", 2, graph.lastLedger)
}

graph.Discard()

err = graph.Apply(4)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if graph.lastLedger != 4 {
t.Fatalf("expected last ledger to be %v but got %v", 4, graph.lastLedger)
}
}

func TestAddOfferOrderBook(t *testing.T) {
Expand Down Expand Up @@ -815,6 +825,17 @@ func TestRemoveOfferOrderBook(t *testing.T) {
t.Fatalf("expected last ledger to be %v but got %v", 3, graph.lastLedger)
}

// Skip over offer ids which are not present in the graph
err = graph.
RemoveOffer(988888).
Apply(5)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if graph.lastLedger != 5 {
t.Fatalf("expected last ledger to be %v but got %v", 3, graph.lastLedger)
}

expectedGraph.edgesForSellingAsset = map[string]edgeSet{}
expectedGraph.tradingPairForOffer = map[xdr.Int64]tradingPair{}
assertGraphEquals(t, graph, expectedGraph)
Expand Down
8 changes: 1 addition & 7 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

metrics "github.com/rcrowley/go-metrics"
"github.com/stellar/go/clients/stellarcore"
"github.com/stellar/go/exp/orderbook"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2/core"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -458,12 +457,7 @@ func (a *App) init() {
// expingester
initExpIngester(a)
}
orderBookGraph := orderbook.NewOrderBookGraph()
a.orderBookStream = &expingest.OrderBookStream{
OrderBookGraph: orderBookGraph,
HistoryQ: &history.Q{a.HorizonSession(a.ctx)},
}
initPathFinder(a, orderBookGraph)
initPathFinder(a)

// txsub
initSubmissionSystem(a)
Expand Down
62 changes: 39 additions & 23 deletions services/horizon/internal/expingest/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sort"
"time"

"github.com/rcrowley/go-metrics"

"github.com/stellar/go/exp/orderbook"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
Expand All @@ -25,10 +27,22 @@ const (
// in memory graph. However, it is safe for other go routines to use the
// in memory graph for read operations.
type OrderBookStream struct {
OrderBookGraph orderbook.OBGraph
HistoryQ history.IngestionQ
lastLedger uint32
lastUpdate time.Time
graph orderbook.OBGraph
historyQ history.IngestionQ
// LatestLedgerGauge exposes the local (order book graph)
// latest processed ledger
LatestLedgerGauge metrics.Gauge
lastLedger uint32
lastUpdate time.Time
}

// NewOrderBookStream constructs and initializes an OrderBookStream instance
func NewOrderBookStream(historyQ history.IngestionQ, graph orderbook.OBGraph) *OrderBookStream {
return &OrderBookStream{
graph: graph,
historyQ: historyQ,
LatestLedgerGauge: metrics.NewGauge(),
}
}

type ingestionStatus struct {
Expand All @@ -42,21 +56,21 @@ func (o *OrderBookStream) getIngestionStatus() (ingestionStatus, error) {
var status ingestionStatus
var err error

status.StateInvalid, err = o.HistoryQ.GetExpStateInvalid()
status.StateInvalid, err = o.historyQ.GetExpStateInvalid()
if err != nil {
return status, errors.Wrap(err, "Error from GetExpStateInvalid")
}

var lastHistoryLedger uint32
lastHistoryLedger, err = o.HistoryQ.GetLatestLedger()
lastHistoryLedger, err = o.historyQ.GetLatestLedger()
if err != nil {
return status, errors.Wrap(err, "Error from GetLatestLedger")
}
status.LastIngestedLedger, err = o.HistoryQ.GetLastLedgerExpIngestNonBlocking()
status.LastIngestedLedger, err = o.historyQ.GetLastLedgerExpIngestNonBlocking()
if err != nil {
return status, errors.Wrap(err, "Error from GetLastLedgerExpIngestNonBlocking")
}
status.LastOfferCompactionLedger, err = o.HistoryQ.GetOfferCompactionSequence()
status.LastOfferCompactionLedger, err = o.historyQ.GetOfferCompactionSequence()
if err != nil {
return status, errors.Wrap(err, "Error from GetOfferCompactionSequence")
}
Expand Down Expand Up @@ -103,7 +117,7 @@ func (o *OrderBookStream) update(status ingestionStatus) error {
}

if reset {
o.OrderBookGraph.Clear()
o.graph.Clear()
o.lastLedger = 0

// wait until offers in horizon db is valid before populating order book graph
Expand All @@ -113,55 +127,57 @@ func (o *OrderBookStream) update(status ingestionStatus) error {
return nil
}

defer o.OrderBookGraph.Discard()
defer o.graph.Discard()

offers, err := o.HistoryQ.GetAllOffers()
offers, err := o.historyQ.GetAllOffers()
if err != nil {
return errors.Wrap(err, "Error from GetAllOffers")
}

for _, offer := range offers {
addOfferToGraph(o.OrderBookGraph, offer)
addOfferToGraph(o.graph, offer)
}

if err := o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil {
if err := o.graph.Apply(status.LastIngestedLedger); err != nil {
return errors.Wrap(err, "Error applying changes to order book")
}

o.lastLedger = status.LastIngestedLedger
o.LatestLedgerGauge.Update(int64(status.LastIngestedLedger))
return nil
}

if status.LastIngestedLedger == o.lastLedger {
return nil
}

defer o.OrderBookGraph.Discard()
defer o.graph.Discard()

offers, err := o.HistoryQ.GetUpdatedOffers(o.lastLedger)
offers, err := o.historyQ.GetUpdatedOffers(o.lastLedger)
if err != nil {
return errors.Wrap(err, "Error from GetUpdatedOffers")
}
for _, offer := range offers {
if offer.Deleted {
o.OrderBookGraph.RemoveOffer(offer.OfferID)
o.graph.RemoveOffer(offer.OfferID)
} else {
addOfferToGraph(o.OrderBookGraph, offer)
addOfferToGraph(o.graph, offer)
}
}

if err = o.OrderBookGraph.Apply(status.LastIngestedLedger); err != nil {
if err = o.graph.Apply(status.LastIngestedLedger); err != nil {
return errors.Wrap(err, "Error applying changes to order book")
}

o.lastUpdate = time.Now()
o.lastLedger = status.LastIngestedLedger
o.LatestLedgerGauge.Update(int64(status.LastIngestedLedger))
return nil
}

func (o *OrderBookStream) verifyAllOffers() {
offers := o.OrderBookGraph.Offers()
ingestionOffers, err := o.HistoryQ.GetAllOffers()
offers := o.graph.Offers()
ingestionOffers, err := o.historyQ.GetAllOffers()
if err != nil {
// reset last update so that we retry verification on next update
o.lastUpdate = time.Now().Add(verificationFrequency * -2)
Expand Down Expand Up @@ -210,13 +226,13 @@ func (o *OrderBookStream) verifyAllOffers() {
// After calling this function, the the in memory order book graph should be consistent with the
// Horizon DB (assuming no error is returned).
func (o *OrderBookStream) Update() error {
if err := o.HistoryQ.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}); err != nil {
if err := o.historyQ.BeginTx(&sql.TxOptions{ReadOnly: true, Isolation: sql.LevelRepeatableRead}); err != nil {
return errors.Wrap(err, "Error starting repeatable read transaction")
}
defer o.HistoryQ.Rollback()
defer o.historyQ.Rollback()

// add 15 minute jitter so that not all horizon nodes are calling
// HistoryQ.GetAllOffers at the same time
// historyQ.GetAllOffers at the same time
jitter := time.Duration(rand.Int63n(int64(15 * time.Minute)))
requiresVerification := !o.lastUpdate.Equal(time.Time{}) &&
time.Since(o.lastUpdate) >= verificationFrequency+jitter
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/expingest/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestIngestionStatus(t *testing.T) {

func (t *IngestionStatusTestSuite) SetupTest() {
t.historyQ = &mockDBQ{}
t.stream = &OrderBookStream{HistoryQ: t.historyQ}
t.stream = NewOrderBookStream(t.historyQ, &mockOrderBookGraph{})
}

func (t *IngestionStatusTestSuite) TearDownTest() {
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestUpdateOrderBookStream(t *testing.T) {
func (t *UpdateOrderBookStreamTestSuite) SetupTest() {
t.historyQ = &mockDBQ{}
t.graph = &mockOrderBookGraph{}
t.stream = &OrderBookStream{OrderBookGraph: t.graph, HistoryQ: t.historyQ}
t.stream = NewOrderBookStream(t.historyQ, t.graph)
}

func (t *UpdateOrderBookStreamTestSuite) TearDownTest() {
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestVerifyOrderBookStream(t *testing.T) {
func (t *VerifyOrderBookStreamTestSuite) SetupTest() {
t.historyQ = &mockDBQ{}
t.graph = &mockOrderBookGraph{}
t.stream = &OrderBookStream{OrderBookGraph: t.graph, HistoryQ: t.historyQ}
t.stream = NewOrderBookStream(t.historyQ, t.graph)

sellerID := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
otherSellerID := "GAXI33UCLQTCKM2NMRBS7XYBR535LLEVAHL5YBN4FTCB4HZHT7ZA5CVK"
Expand Down
9 changes: 8 additions & 1 deletion services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ func initExpIngester(app *App) {
}
}

func initPathFinder(app *App, orderBookGraph *orderbook.OrderBookGraph) {
func initPathFinder(app *App) {
orderBookGraph := orderbook.NewOrderBookGraph()
app.orderBookStream = expingest.NewOrderBookStream(
&history.Q{app.HorizonSession(app.ctx)},
orderBookGraph,
)

app.paths = simplepath.NewInMemoryFinder(orderBookGraph)
}

Expand Down Expand Up @@ -144,6 +150,7 @@ func initDbMetrics(app *App) {
app.metrics.Register("history.latest_ledger", app.historyLatestLedgerGauge)
app.metrics.Register("history.elder_ledger", app.historyElderLedgerGauge)
app.metrics.Register("stellar_core.latest_ledger", app.coreLatestLedgerGauge)
app.metrics.Register("order_book_stream.latest_ledger", app.orderBookStream.LatestLedgerGauge)
app.metrics.Register("history.open_connections", app.horizonConnGauge)
app.metrics.Register("stellar_core.open_connections", app.coreConnGauge)
app.metrics.Register("goroutines", app.goroutineGauge)
Expand Down

0 comments on commit 7aa0d78

Please sign in to comment.