Skip to content

Commit

Permalink
services/horizon/internal/actions: Add latest ingested ledger header …
Browse files Browse the repository at this point in the history
…in experimental ingestion endpoints (stellar#1830)


Include a Latest-Ledger header with the sequence number of the last processed ledger by the
experimental ingestion system. Also, ensure the endpoint responses contains only data consistent with the sequence number included in the header
  • Loading branch information
tamirms authored Oct 15, 2019
1 parent 2ac805c commit 9161997
Showing 30 changed files with 648 additions and 222 deletions.
8 changes: 7 additions & 1 deletion exp/orderbook/batch.go
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ func (tx *orderBookBatchedUpdates) removeOffer(offerID xdr.Int64) *orderBookBatc
}

// apply will attempt to apply all the updates in the batch to the order book
func (tx *orderBookBatchedUpdates) apply() error {
func (tx *orderBookBatchedUpdates) apply(ledger uint32) error {
tx.orderbook.lock.Lock()
defer tx.orderbook.lock.Unlock()

@@ -60,6 +60,10 @@ func (tx *orderBookBatchedUpdates) apply() error {
}
tx.committed = true

if tx.orderbook.lastLedger > 0 && ledger != tx.orderbook.lastLedger+1 {
return errUnexpectedLedger
}

for _, operation := range tx.operations {
switch operation.operationType {
case addOfferOperationType:
@@ -75,5 +79,7 @@ func (tx *orderBookBatchedUpdates) apply() error {
}
}

tx.orderbook.lastLedger = ledger

return nil
}
27 changes: 17 additions & 10 deletions exp/orderbook/graph.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ var (
errEmptyOffers = errors.New("offers is empty")
errAssetAmountIsZero = errors.New("current asset amount is 0")
errBatchAlreadyApplied = errors.New("cannot apply batched updates more than once")
errUnexpectedLedger = errors.New("cannot apply unexpected ledger")
)

type sortByType string
@@ -46,6 +47,8 @@ type OrderBookGraph struct {
// batchedUpdates is internal batch of updates to this graph. Users can
// create multiple batches using `Batch()` method but sometimes only one
// batch is enough.
// the orderbook graph is accurate up to lastLedger
lastLedger uint32
batchedUpdates *orderBookBatchedUpdates
lock sync.RWMutex
}
@@ -85,8 +88,8 @@ func (graph *OrderBookGraph) Discard() {

// Apply will attempt to apply all the updates in the internal batch to the order book.
// When Apply is successful, a new empty, instance of internal batch will be created.
func (graph *OrderBookGraph) Apply() error {
err := graph.batchedUpdates.apply()
func (graph *OrderBookGraph) Apply(ledger uint32) error {
err := graph.batchedUpdates.apply(ledger)
if err != nil {
return err
}
@@ -156,7 +159,7 @@ func (graph *OrderBookGraph) findOffers(
// Both Asks and Bids will span at most `maxPriceLevels` price levels
func (graph *OrderBookGraph) FindAsksAndBids(
selling, buying xdr.Asset, maxPriceLevels int,
) ([]xdr.OfferEntry, []xdr.OfferEntry) {
) ([]xdr.OfferEntry, []xdr.OfferEntry, uint32) {
buyingString := buying.String()
sellingString := selling.String()

@@ -165,7 +168,7 @@ func (graph *OrderBookGraph) FindAsksAndBids(
asks := graph.findOffers(sellingString, buyingString, maxPriceLevels)
bids := graph.findOffers(buyingString, sellingString, maxPriceLevels)

return asks, bids
return asks, bids, graph.lastLedger
}

// add inserts a given offer into the order book graph
@@ -246,7 +249,7 @@ func (graph *OrderBookGraph) FindPaths(
sourceAssetBalances []xdr.Int64,
validateSourceBalance bool,
maxAssetsPerPath int,
) ([]Path, error) {
) ([]Path, uint32, error) {
destinationAssetString := destinationAsset.String()
sourceAssetsMap := map[string]xdr.Int64{}
for i, sourceAsset := range sourceAssets {
@@ -273,16 +276,18 @@ func (graph *OrderBookGraph) FindPaths(
destinationAsset,
destinationAmount,
)
lastLedger := graph.lastLedger
graph.lock.RUnlock()
if err != nil {
return nil, errors.Wrap(err, "could not determine paths")
return nil, lastLedger, errors.Wrap(err, "could not determine paths")
}

return sortAndFilterPaths(
paths, err := sortAndFilterPaths(
searchState.paths,
maxAssetsPerPath,
sortBySourceAsset,
)
return paths, lastLedger, err
}

// FindFixedPaths returns a list of payment paths where the source and destination
@@ -296,7 +301,7 @@ func (graph *OrderBookGraph) FindFixedPaths(
amountToSpend xdr.Int64,
destinationAssets []xdr.Asset,
maxAssetsPerPath int,
) ([]Path, error) {
) ([]Path, uint32, error) {
target := map[string]bool{}
for _, destinationAsset := range destinationAssets {
destinationAssetString := destinationAsset.String()
@@ -320,20 +325,22 @@ func (graph *OrderBookGraph) FindFixedPaths(
sourceAsset,
amountToSpend,
)
lastLedger := graph.lastLedger
graph.lock.RUnlock()
if err != nil {
return nil, errors.Wrap(err, "could not determine paths")
return nil, lastLedger, errors.Wrap(err, "could not determine paths")
}

sort.Slice(searchState.paths, func(i, j int) bool {
return searchState.paths[i].DestinationAmount > searchState.paths[j].DestinationAmount
})

return sortAndFilterPaths(
paths, err := sortAndFilterPaths(
searchState.paths,
maxAssetsPerPath,
sortByDestinationAsset,
)
return paths, lastLedger, err
}

// compareSourceAsset will group payment paths by `SourceAsset`
Loading

0 comments on commit 9161997

Please sign in to comment.