From f611559323501efa7fc3a3967acd1313db4bfeb6 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Fri, 30 Aug 2019 17:16:35 +0200 Subject: [PATCH 1/4] Add /order_book ingestion endpoint --- exp/orderbook/graph.go | 32 + exp/orderbook/graph_test.go | 58 ++ .../horizon/internal/actions/helpers_test.go | 23 + .../horizon/internal/actions/offer_test.go | 75 +- .../horizon/internal/actions/orderbook.go | 147 ++++ .../internal/actions/orderbook_test.go | 642 ++++++++++++++++++ .../horizon/internal/actions_path_test.go | 1 + services/horizon/internal/app.go | 2 +- services/horizon/internal/handler.go | 56 ++ .../horizon/internal/ledger/ledger_source.go | 22 +- .../horizon/internal/stream_handler_test.go | 318 +++++++-- services/horizon/internal/web.go | 23 +- 12 files changed, 1271 insertions(+), 128 deletions(-) create mode 100644 services/horizon/internal/actions/orderbook.go create mode 100644 services/horizon/internal/actions/orderbook_test.go diff --git a/exp/orderbook/graph.go b/exp/orderbook/graph.go index beffc3e4f6..1aaa32f3d8 100644 --- a/exp/orderbook/graph.go +++ b/exp/orderbook/graph.go @@ -119,6 +119,38 @@ func (graph *OrderBookGraph) batch() *orderBookBatchedUpdates { } } +// FindOffers returns all offers for a given trading pair +// The offers will be sorted by price from cheapest to most expensive +// The returned offers will span at most `limit` price levels +func (graph *OrderBookGraph) FindOffers(selling, buying xdr.Asset, limit int) []xdr.OfferEntry { + results := []xdr.OfferEntry{} + buyingString := buying.String() + sellingString := selling.String() + + graph.lock.RLock() + defer graph.lock.RUnlock() + edges, ok := graph.edgesForSellingAsset[sellingString] + if !ok { + return results + } + offers, ok := edges[buyingString] + if !ok { + return results + } + + for _, offer := range offers { + if len(results) == 0 || results[len(results)-1].Price != offer.Price { + limit-- + } + if limit < 0 { + return results + } + + results = append(results, offer) + } + return results +} + // add inserts a given offer into the order book graph func (graph *OrderBookGraph) add(offer xdr.OfferEntry) error { if _, contains := graph.tradingPairForOffer[offer.OfferId]; contains { diff --git a/exp/orderbook/graph_test.go b/exp/orderbook/graph_test.go index c64c3f3e3f..9a5b056d16 100644 --- a/exp/orderbook/graph_test.go +++ b/exp/orderbook/graph_test.go @@ -760,6 +760,64 @@ func TestRemoveOfferOrderBook(t *testing.T) { } } +func TestFindOffers(t *testing.T) { + graph := NewOrderBookGraph() + + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + graph.FindOffers(nativeAsset, eurAsset, 0), + ) + + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + graph.FindOffers(nativeAsset, eurAsset, 5), + ) + + err := graph. + AddOffer(threeEurOffer). + AddOffer(eurOffer). + AddOffer(twoEurOffer). + Apply() + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + graph.FindOffers(nativeAsset, eurAsset, 0), + ) + assertOfferListEquals( + t, + []xdr.OfferEntry{eurOffer, twoEurOffer}, + graph.FindOffers(nativeAsset, eurAsset, 2), + ) + + extraTwoEurOffers := []xdr.OfferEntry{} + for i := 0; i < 4; i++ { + otherTwoEurOffer := twoEurOffer + otherTwoEurOffer.OfferId += xdr.Int64(i + 17) + graph.AddOffer(otherTwoEurOffer) + extraTwoEurOffers = append(extraTwoEurOffers, otherTwoEurOffer) + } + if err := graph.Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + + assertOfferListEquals( + t, + append([]xdr.OfferEntry{eurOffer, twoEurOffer}, extraTwoEurOffers...), + graph.FindOffers(nativeAsset, eurAsset, 2), + ) + assertOfferListEquals( + t, + append(append([]xdr.OfferEntry{eurOffer, twoEurOffer}, extraTwoEurOffers...), threeEurOffer), + graph.FindOffers(nativeAsset, eurAsset, 3), + ) +} + func TestConsumeOffersForSellingAsset(t *testing.T) { kp, err := keypair.Random() if err != nil { diff --git a/services/horizon/internal/actions/helpers_test.go b/services/horizon/internal/actions/helpers_test.go index d7e0cc8711..07479b6219 100644 --- a/services/horizon/internal/actions/helpers_test.go +++ b/services/horizon/internal/actions/helpers_test.go @@ -498,3 +498,26 @@ func testURLParams() map[string]string { "long_12_asset_issuer": "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", } } + +func makeRequest( + t *testing.T, + queryParams map[string]string, + routeParams map[string]string, +) *http.Request { + request, err := http.NewRequest("GET", "/", nil) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + query := url.Values{} + for key, value := range queryParams { + query.Set(key, value) + } + request.URL.RawQuery = query.Encode() + + chiRouteContext := chi.NewRouteContext() + for key, value := range routeParams { + chiRouteContext.URLParams.Add(key, value) + } + ctx := context.WithValue(context.Background(), chi.RouteCtxKey, chiRouteContext) + return request.WithContext(ctx) +} diff --git a/services/horizon/internal/actions/offer_test.go b/services/horizon/internal/actions/offer_test.go index 261feae7d1..dc6d26d313 100644 --- a/services/horizon/internal/actions/offer_test.go +++ b/services/horizon/internal/actions/offer_test.go @@ -1,13 +1,9 @@ package actions import ( - "context" - "net/http" - "net/url" "testing" "time" - "github.com/go-chi/chi" "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/services/horizon/internal/db2/core" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -63,50 +59,6 @@ var ( } ) -func makeOffersRequest(t *testing.T, queryParams map[string]string) *http.Request { - request, err := http.NewRequest("GET", "/", nil) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - query := url.Values{} - for key, value := range queryParams { - query.Set(key, value) - } - request.URL.RawQuery = query.Encode() - - ctx := context.WithValue(context.Background(), chi.RouteCtxKey, chi.NewRouteContext()) - return request.WithContext(ctx) -} - -func makeAccountOffersRequest( - t *testing.T, - accountID string, - queryParams map[string]string, -) *http.Request { - request, err := http.NewRequest("GET", "/", nil) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - query := url.Values{} - for key, value := range queryParams { - query.Set(key, value) - } - request.URL.RawQuery = query.Encode() - - chiRouteContext := chi.NewRouteContext() - chiRouteContext.URLParams.Add("account_id", accountID) - ctx := context.WithValue(context.Background(), chi.RouteCtxKey, chiRouteContext) - return request.WithContext(ctx) -} - -func pageableToOffers(t *testing.T, page []hal.Pageable) []horizon.Offer { - var offers []horizon.Offer - for _, entry := range page { - offers = append(offers, entry.(horizon.Offer)) - } - return offers -} - func TestGetOffersHandler(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -133,7 +85,7 @@ func TestGetOffersHandler(t *testing.T) { tt.Assert.NoError(q.UpsertOffer(usdOffer, 3)) t.Run("No filter", func(t *testing.T) { - records, err := handler.GetResourcePage(makeOffersRequest(t, map[string]string{})) + records, err := handler.GetResourcePage(makeRequest(t, map[string]string{}, map[string]string{})) tt.Assert.NoError(err) tt.Assert.Len(records, 3) @@ -149,11 +101,12 @@ func TestGetOffersHandler(t *testing.T) { }) t.Run("Filter by seller", func(t *testing.T) { - records, err := handler.GetResourcePage(makeOffersRequest( + records, err := handler.GetResourcePage(makeRequest( t, map[string]string{ "seller": issuer.Address(), }, + map[string]string{}, )) tt.Assert.NoError(err) tt.Assert.Len(records, 2) @@ -168,11 +121,12 @@ func TestGetOffersHandler(t *testing.T) { asset := horizon.Asset{} nativeAsset.Extract(&asset.Type, &asset.Code, &asset.Issuer) - records, err := handler.GetResourcePage(makeOffersRequest( + records, err := handler.GetResourcePage(makeRequest( t, map[string]string{ "selling_asset_type": asset.Type, }, + map[string]string{}, )) tt.Assert.NoError(err) tt.Assert.Len(records, 2) @@ -185,13 +139,14 @@ func TestGetOffersHandler(t *testing.T) { asset = horizon.Asset{} eurAsset.Extract(&asset.Type, &asset.Code, &asset.Issuer) - records, err = handler.GetResourcePage(makeOffersRequest( + records, err = handler.GetResourcePage(makeRequest( t, map[string]string{ "selling_asset_type": asset.Type, "selling_asset_code": asset.Code, "selling_asset_issuer": asset.Issuer, }, + map[string]string{}, )) tt.Assert.NoError(err) tt.Assert.Len(records, 1) @@ -204,13 +159,14 @@ func TestGetOffersHandler(t *testing.T) { asset := horizon.Asset{} eurAsset.Extract(&asset.Type, &asset.Code, &asset.Issuer) - records, err := handler.GetResourcePage(makeOffersRequest( + records, err := handler.GetResourcePage(makeRequest( t, map[string]string{ "buying_asset_type": asset.Type, "buying_asset_code": asset.Code, "buying_asset_issuer": asset.Issuer, }, + map[string]string{}, )) tt.Assert.NoError(err) tt.Assert.Len(records, 2) @@ -223,13 +179,14 @@ func TestGetOffersHandler(t *testing.T) { asset = horizon.Asset{} usdAsset.Extract(&asset.Type, &asset.Code, &asset.Issuer) - records, err = handler.GetResourcePage(makeOffersRequest( + records, err = handler.GetResourcePage(makeRequest( t, map[string]string{ "buying_asset_type": asset.Type, "buying_asset_code": asset.Code, "buying_asset_issuer": asset.Issuer, }, + map[string]string{}, )) tt.Assert.NoError(err) tt.Assert.Len(records, 1) @@ -256,7 +213,7 @@ func TestGetAccountOffersHandler(t *testing.T) { tt.Assert.NoError(q.UpsertOffer(usdOffer, 3)) records, err := handler.GetResourcePage( - makeAccountOffersRequest(t, issuer.Address(), map[string]string{}), + makeRequest(t, map[string]string{}, map[string]string{"account_id": issuer.Address()}), ) tt.Assert.NoError(err) tt.Assert.Len(records, 2) @@ -267,3 +224,11 @@ func TestGetAccountOffersHandler(t *testing.T) { tt.Assert.Equal(issuer.Address(), offer.Seller) } } + +func pageableToOffers(t *testing.T, page []hal.Pageable) []horizon.Offer { + var offers []horizon.Offer + for _, entry := range page { + offers = append(offers, entry.(horizon.Offer)) + } + return offers +} diff --git a/services/horizon/internal/actions/orderbook.go b/services/horizon/internal/actions/orderbook.go new file mode 100644 index 0000000000..e74c0b6b94 --- /dev/null +++ b/services/horizon/internal/actions/orderbook.go @@ -0,0 +1,147 @@ +package actions + +import ( + "context" + "net/http" + + "github.com/stellar/go/amount" + "github.com/stellar/go/exp/orderbook" + protocol "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/services/horizon/internal/resourceadapter" + "github.com/stellar/go/support/render/problem" + "github.com/stellar/go/xdr" +) + +// StreamableObjectResponse is an interface for objects returned by streamable object endpoints +// A streamable object endpoint is an SSE endpoint which returns a single JSON object response +// instead of a page of items. +type StreamableObjectResponse interface { + Equals(other StreamableObjectResponse) bool +} + +// OrderBookResponse is the response for the /orderbook_endpoint +// OrderBookResponse implements StreamableObjectResponse +type OrderBookResponse struct { + protocol.OrderBookSummary +} + +func priceLevelsEqual(a, b []protocol.PriceLevel) bool { + if len(a) != len(b) { + return false + } + + for i := range a { + if a[i] != b[i] { + return false + } + } + + return true +} + +// Equals returns true if the OrderBookResponse is equal to `other` +func (o OrderBookResponse) Equals(other StreamableObjectResponse) bool { + otherOrderBook, ok := other.(OrderBookResponse) + if !ok { + return false + } + return otherOrderBook.Selling == o.Selling && + otherOrderBook.Buying == o.Buying && + priceLevelsEqual(otherOrderBook.Bids, o.Bids) && + priceLevelsEqual(otherOrderBook.Asks, o.Asks) +} + +var invalidOrderBook = problem.P{ + Type: "invalid_order_book", + Title: "Invalid Order Book Parameters", + Status: http.StatusBadRequest, + Detail: "The parameters that specify what order book to view are invalid in some way. " + + "Please ensure that your type parameters (selling_asset_type and buying_asset_type) are one the " + + "following valid values: native, credit_alphanum4, credit_alphanum12. Also ensure that you " + + "have specified selling_asset_code and selling_asset_issuer if selling_asset_type is not 'native', as well " + + "as buying_asset_code and buying_asset_issuer if buying_asset_type is not 'native'", +} + +// GetOrderbookHandler is the action handler for the /order_book endpoint +type GetOrderbookHandler struct { + OrderBookGraph *orderbook.OrderBookGraph +} + +func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceLevel { + result := []protocol.PriceLevel{} + + amountForPrice := map[xdr.Price]xdr.Int64{} + for _, offer := range offers { + amountForPrice[offer.Price] += offer.Amount + } + for _, offer := range offers { + total, ok := amountForPrice[offer.Price] + if !ok { + continue + } + + offerPrice := offer.Price + if invert { + offerPrice.Invert() + } + + result = append(result, protocol.PriceLevel{ + PriceR: protocol.Price{ + N: int32(offerPrice.N), + D: int32(offerPrice.D), + }, + Price: offerPrice.String(), + Amount: amount.String(total), + }) + + delete(amountForPrice, offerPrice) + } + + return result +} + +func (handler GetOrderbookHandler) orderBookSummary( + ctx context.Context, selling, buying xdr.Asset, limit int, +) (protocol.OrderBookSummary, error) { + response := protocol.OrderBookSummary{} + if err := resourceadapter.PopulateAsset(ctx, &response.Selling, selling); err != nil { + return response, err + } + if err := resourceadapter.PopulateAsset(ctx, &response.Buying, buying); err != nil { + return response, err + } + + response.Asks = offersToPriceLevels( + handler.OrderBookGraph.FindOffers(selling, buying, limit), + false, + ) + response.Bids = offersToPriceLevels( + handler.OrderBookGraph.FindOffers(buying, selling, limit), + true, + ) + + return response, nil +} + +// GetResource implements the /order_book endpoint +func (handler GetOrderbookHandler) GetResource(r *http.Request) (StreamableObjectResponse, error) { + selling, err := GetAsset(r, "selling_") + if err != nil { + return nil, invalidOrderBook + } + buying, err := GetAsset(r, "buying_") + if err != nil { + return nil, invalidOrderBook + } + limit, err := GetLimit(r, "limit", 20, 200) + if err != nil { + return nil, invalidOrderBook + } + + summary, err := handler.orderBookSummary(r.Context(), selling, buying, int(limit)) + if err != nil { + return nil, err + } + + return OrderBookResponse{summary}, nil +} diff --git a/services/horizon/internal/actions/orderbook_test.go b/services/horizon/internal/actions/orderbook_test.go new file mode 100644 index 0000000000..b422f2999a --- /dev/null +++ b/services/horizon/internal/actions/orderbook_test.go @@ -0,0 +1,642 @@ +package actions + +import ( + "strconv" + "testing" + + "github.com/stellar/go/exp/orderbook" + protocol "github.com/stellar/go/protocols/horizon" +) + +type intObject int + +func (i intObject) Equals(other StreamableObjectResponse) bool { + return i == other.(intObject) +} + +func TestOrderBookResponseEquals(t *testing.T) { + for _, testCase := range []struct { + name string + response protocol.OrderBookSummary + other StreamableObjectResponse + expected bool + }{ + { + "empty orderbook summary", + protocol.OrderBookSummary{}, + OrderBookResponse{}, + true, + }, + { + "types don't match", + protocol.OrderBookSummary{}, + intObject(0), + false, + }, + { + "buying asset doesn't match", + protocol.OrderBookSummary{ + Buying: protocol.Asset{ + Type: "native", + }, + Selling: protocol.Asset{ + Type: "native", + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Buying: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Selling: protocol.Asset{ + Type: "native", + }, + }, + }, + false, + }, + { + "selling asset doesn't match", + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "native", + }, + Buying: protocol.Asset{ + Type: "native", + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + }, + }, + false, + }, + { + "bid lengths don't match", + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Bids: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "123", + }, + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Bids: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + }, + }, + }, + false, + }, + { + "ask lengths don't match", + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Asks: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "123", + }, + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Asks: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + }, + }, + }, + false, + }, + { + "bids don't match", + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Bids: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "123", + }, + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Bids: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 2, D: 1}, + Price: "2.0", + Amount: "123", + }, + }, + }, + }, + false, + }, + { + "asks don't match", + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Asks: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "123", + }, + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Asks: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "12", + }, + }, + }, + }, + false, + }, + { + "orderbook summaries match", + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Asks: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "123", + }, + }, + Bids: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 3}, + Price: "0.33", + Amount: "13", + }, + }, + }, + OrderBookResponse{ + protocol.OrderBookSummary{ + Selling: protocol.Asset{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU", + }, + Buying: protocol.Asset{ + Type: "native", + }, + Bids: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 3}, + Price: "0.33", + Amount: "13", + }, + }, + Asks: []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 2}, + Price: "0.5", + Amount: "123", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: 1, D: 1}, + Price: "1.0", + Amount: "123", + }, + }, + }, + }, + true, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + equals := (OrderBookResponse{testCase.response}).Equals(testCase.other) + if equals != testCase.expected { + t.Fatalf("expected %v but got %v", testCase.expected, equals) + } + }) + } +} + +func TestOrderbookGetResourceValidation(t *testing.T) { + graph := orderbook.NewOrderBookGraph() + handler := GetOrderbookHandler{ + OrderBookGraph: graph, + } + + var eurAssetType, eurAssetCode, eurAssetIssuer string + if err := eurAsset.Extract(&eurAssetType, &eurAssetCode, &eurAssetIssuer); err != nil { + t.Fatalf("cound not extract eur asset: %v", err) + } + var usdAssetType, usdAssetCode, usdAssetIssuer string + if err := eurAsset.Extract(&usdAssetType, &usdAssetCode, &usdAssetIssuer); err != nil { + t.Fatalf("cound not extract usd asset: %v", err) + } + + for _, testCase := range []struct { + name string + queryParams map[string]string + }{ + { + "missing all params", + map[string]string{}, + }, + { + "missing buying asset", + map[string]string{ + "selling_asset_type": eurAssetType, + "selling_asset_code": eurAssetCode, + "selling_asset_issuer": eurAssetIssuer, + "limit": "25", + }, + }, + { + "missing selling asset", + map[string]string{ + "buying_asset_type": eurAssetType, + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "limit": "25", + }, + }, + { + "invalid buying asset", + map[string]string{ + "buying_asset_type": "invalid", + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "selling_asset_type": usdAssetType, + "selling_asset_code": usdAssetCode, + "selling_asset_issuer": usdAssetIssuer, + "limit": "25", + }, + }, + { + "invalid selling asset", + map[string]string{ + "buying_asset_type": eurAssetType, + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "selling_asset_type": "invalid", + "selling_asset_code": usdAssetCode, + "selling_asset_issuer": usdAssetIssuer, + "limit": "25", + }, + }, + { + "limit is not a number", + map[string]string{ + "buying_asset_type": eurAssetType, + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "selling_asset_type": usdAssetType, + "selling_asset_code": usdAssetCode, + "selling_asset_issuer": usdAssetIssuer, + "limit": "avcdef", + }, + }, + { + "limit is negative", + map[string]string{ + "buying_asset_type": eurAssetType, + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "selling_asset_type": usdAssetType, + "selling_asset_code": usdAssetCode, + "selling_asset_issuer": usdAssetIssuer, + "limit": "-1", + }, + }, + { + "limit is too high", + map[string]string{ + "buying_asset_type": eurAssetType, + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "selling_asset_type": usdAssetType, + "selling_asset_code": usdAssetCode, + "selling_asset_issuer": usdAssetIssuer, + "limit": "20000", + }, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + r := makeRequest(t, testCase.queryParams, map[string]string{}) + _, err := handler.GetResource(r) + if err == nil || err.Error() != invalidOrderBook.Error() { + t.Fatalf("expected error %v but got %v", invalidOrderBook, err) + } + }) + } +} + +func TestOrderbookGetResource(t *testing.T) { + var eurAssetType, eurAssetCode, eurAssetIssuer string + if err := eurAsset.Extract(&eurAssetType, &eurAssetCode, &eurAssetIssuer); err != nil { + t.Fatalf("cound not extract eur asset: %v", err) + } + + empty := OrderBookResponse{ + OrderBookSummary: protocol.OrderBookSummary{ + Bids: []protocol.PriceLevel{}, + Asks: []protocol.PriceLevel{}, + Selling: protocol.Asset{ + Type: "native", + }, + Buying: protocol.Asset{ + Type: eurAssetType, + Code: eurAssetCode, + Issuer: eurAssetIssuer, + }, + }, + } + + asksButNoBidsGraph := orderbook.NewOrderBookGraph() + if err := asksButNoBidsGraph.AddOffer(twoEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + asksButNoBidsResponse := empty + asksButNoBidsResponse.Asks = []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(twoEurOffer.Price.N), D: int32(twoEurOffer.Price.D)}, + Price: "2.0000000", + Amount: "0.0000500", + }, + } + + sellEurOffer := twoEurOffer + sellEurOffer.Buying, sellEurOffer.Selling = sellEurOffer.Selling, sellEurOffer.Buying + sellEurOffer.OfferId = 15 + bidsButNoAsksGraph := orderbook.NewOrderBookGraph() + if err := bidsButNoAsksGraph.AddOffer(sellEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + bidsButNoAsksResponse := empty + bidsButNoAsksResponse.Bids = []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(sellEurOffer.Price.D), D: int32(sellEurOffer.Price.N)}, + Price: "0.5000000", + Amount: "0.0000500", + }, + } + + fullGraph := orderbook.NewOrderBookGraph() + if err := fullGraph.AddOffer(twoEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + otherEurOffer := twoEurOffer + otherEurOffer.Amount = 10000 + otherEurOffer.OfferId = 16 + if err := fullGraph.AddOffer(otherEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + threeEurOffer := twoEurOffer + threeEurOffer.Price.N = 3 + threeEurOffer.OfferId = 20 + if err := fullGraph.AddOffer(threeEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + + sellEurOffer.Price.N = 9 + sellEurOffer.Price.D = 10 + if err := fullGraph.AddOffer(sellEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + otherSellEurOffer := sellEurOffer + otherSellEurOffer.OfferId = 17 + otherSellEurOffer.Price.N *= 2 + if err := fullGraph.AddOffer(otherSellEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + + fullResponse := empty + fullResponse.Asks = []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(twoEurOffer.Price.N), D: int32(twoEurOffer.Price.D)}, + Price: "2.0000000", + Amount: "0.0010500", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(threeEurOffer.Price.N), D: int32(threeEurOffer.Price.D)}, + Price: "3.0000000", + Amount: "0.0000500", + }, + } + fullResponse.Bids = []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(sellEurOffer.Price.D), D: int32(sellEurOffer.Price.N)}, + Price: "1.1111111", + Amount: "0.0000500", + }, + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(otherSellEurOffer.Price.D), D: int32(otherSellEurOffer.Price.N)}, + Price: "0.5555556", + Amount: "0.0000500", + }, + } + + limitResponse := empty + limitResponse.Asks = []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(twoEurOffer.Price.N), D: int32(twoEurOffer.Price.D)}, + Price: "2.0000000", + Amount: "0.0010500", + }, + } + limitResponse.Bids = []protocol.PriceLevel{ + protocol.PriceLevel{ + PriceR: protocol.Price{N: int32(sellEurOffer.Price.D), D: int32(sellEurOffer.Price.N)}, + Price: "1.1111111", + Amount: "0.0000500", + }, + } + + for _, testCase := range []struct { + name string + graph *orderbook.OrderBookGraph + limit int + expected OrderBookResponse + }{ + { + "empty orderbook", + orderbook.NewOrderBookGraph(), + 10, + empty, + }, + { + "orderbook with asks but no bids", + asksButNoBidsGraph, + 10, + asksButNoBidsResponse, + }, + { + "orderbook with bids but no asks", + bidsButNoAsksGraph, + 10, + bidsButNoAsksResponse, + }, + { + "full orderbook", + fullGraph, + 10, + fullResponse, + }, + { + "limit request", + fullGraph, + 1, + limitResponse, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + handler := GetOrderbookHandler{ + OrderBookGraph: testCase.graph, + } + r := makeRequest( + t, + map[string]string{ + "buying_asset_type": eurAssetType, + "buying_asset_code": eurAssetCode, + "buying_asset_issuer": eurAssetIssuer, + "selling_asset_type": "native", + "limit": strconv.Itoa(testCase.limit), + }, + map[string]string{}, + ) + response, err := handler.GetResource(r) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if !response.Equals(testCase.expected) { + t.Fatalf("expected %v but got %v", testCase.expected, response) + } + }) + } +} diff --git a/services/horizon/internal/actions_path_test.go b/services/horizon/internal/actions_path_test.go index 9f319dfdff..c02fd1370b 100644 --- a/services/horizon/internal/actions_path_test.go +++ b/services/horizon/internal/actions_path_test.go @@ -121,6 +121,7 @@ func TestPathActionsStateInvalid(t *testing.T) { rh.App.web.mustInstallActions( rh.App.config, simplepath.NewInMemoryFinder(orderBookGraph), + orderBookGraph, ) rh.RH = test.NewRequestHelper(rh.App.web.router) diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index a82de98ba2..259c0424a8 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -432,7 +432,7 @@ func (a *App) init() { a.web.mustInstallMiddlewares(a, a.config.ConnectionTimeout) // web.actions - a.web.mustInstallActions(a.config, a.paths) + a.web.mustInstallActions(a.config, a.paths, orderBookGraph) // metrics and log.metrics a.metrics = metrics.NewRegistry() diff --git a/services/horizon/internal/handler.go b/services/horizon/internal/handler.go index 8bde0d6b20..14752443b8 100644 --- a/services/horizon/internal/handler.go +++ b/services/horizon/internal/handler.go @@ -403,6 +403,62 @@ func validateCursorWithinHistory(pq db2.PageQuery) error { return nil } +const singleObjectStreamLimit = 10 + +type streamableObjectAction interface { + GetResource(r *http.Request) (actions.StreamableObjectResponse, error) +} + +type streamableObjectActionHandler struct { + action streamableObjectAction + streamHandler sse.StreamHandler +} + +func (handler streamableObjectActionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch render.Negotiate(r) { + case render.MimeHal, render.MimeJSON: + response, err := handler.action.GetResource(r) + if err != nil { + problem.Render(r.Context(), w, err) + return + } + + httpjson.Render( + w, + response, + httpjson.HALJSON, + ) + return + case render.MimeEventStream: + handler.renderStream(w, r) + return + } + + problem.Render(r.Context(), w, hProblem.NotAcceptable) +} + +func (handler streamableObjectActionHandler) renderStream(w http.ResponseWriter, r *http.Request) { + var lastResponse actions.StreamableObjectResponse + + handler.streamHandler.ServeStream( + w, + r, + singleObjectStreamLimit, + func() ([]sse.Event, error) { + response, err := handler.action.GetResource(r) + if err != nil { + return nil, err + } + + if lastResponse == nil || !lastResponse.Equals(response) { + lastResponse = response + return []sse.Event{sse.Event{Data: response}}, nil + } + return []sse.Event{}, nil + }, + ) +} + type pageAction interface { GetResourcePage(r *http.Request) ([]hal.Pageable, error) } diff --git a/services/horizon/internal/ledger/ledger_source.go b/services/horizon/internal/ledger/ledger_source.go index c6ff13ee84..324ed7d5d2 100644 --- a/services/horizon/internal/ledger/ledger_source.go +++ b/services/horizon/internal/ledger/ledger_source.go @@ -2,6 +2,7 @@ package ledger import ( "context" + "sync" "time" ) @@ -62,6 +63,7 @@ func (source HistoryDBSource) NextLedger(currentSequence uint32) chan uint32 { type TestingSource struct { currentLedger uint32 newLedgers chan uint32 + lock *sync.RWMutex } // NewTestingSource returns a TestingSource. @@ -69,11 +71,14 @@ func NewTestingSource(currentLedger uint32) *TestingSource { return &TestingSource{ currentLedger: currentLedger, newLedgers: make(chan uint32), + lock: &sync.RWMutex{}, } } // CurrentLedger returns the current ledger. func (source *TestingSource) CurrentLedger() uint32 { + source.lock.RLock() + defer source.lock.RUnlock() return source.currentLedger } @@ -106,5 +111,20 @@ func (source *TestingSource) TryAddLedger( // NextLedger returns a channel which yields every time there is a new ledger. func (source *TestingSource) NextLedger(currentSequence uint32) chan uint32 { - return source.newLedgers + response := make(chan uint32, 1) + + go func() { + for { + nextLedger := <-source.newLedgers + if nextLedger > source.currentLedger { + source.lock.Lock() + defer source.lock.Unlock() + source.currentLedger = nextLedger + response <- nextLedger + return + } + } + }() + + return response } diff --git a/services/horizon/internal/stream_handler_test.go b/services/horizon/internal/stream_handler_test.go index f19ef37526..b14a893ae0 100644 --- a/services/horizon/internal/stream_handler_test.go +++ b/services/horizon/internal/stream_handler_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-chi/chi" + "github.com/stellar/go/services/horizon/internal/actions" "github.com/stellar/go/services/horizon/internal/ledger" "github.com/stellar/go/services/horizon/internal/render/sse" "github.com/stellar/go/support/render/hal" @@ -20,46 +21,78 @@ import ( // StreamTest utility struct to wrap SSE related tests. type StreamTest struct { - action pageAction - ledgerSource *ledger.TestingSource - cancel context.CancelFunc - wg *sync.WaitGroup - ctx context.Context + ledgerSource *ledger.TestingSource + cancel context.CancelFunc + wg *sync.WaitGroup + w *httptest.ResponseRecorder + checkResponse func(w *httptest.ResponseRecorder) + ctx context.Context } -// NewStreamTest executes an SSE related test, letting you simulate ledger closings via -// AddLedger. -func NewStreamTest( - action pageAction, - currentLedger uint32, +func newStreamTest( + handler http.HandlerFunc, + ledgerSource *ledger.TestingSource, request *http.Request, checkResponse func(w *httptest.ResponseRecorder), ) *StreamTest { s := &StreamTest{ - action: action, - ledgerSource: ledger.NewTestingSource(currentLedger), - wg: &sync.WaitGroup{}, + ledgerSource: ledgerSource, + w: httptest.NewRecorder(), + checkResponse: checkResponse, + wg: &sync.WaitGroup{}, } s.ctx, s.cancel = context.WithCancel(request.Context()) - streamHandler := sse.StreamHandler{ - LedgerSource: s.ledgerSource, - } - handler := streamablePageHandler(s.action, streamHandler) - s.wg.Add(1) go func() { - w := httptest.NewRecorder() - handler.renderStream(w, request.WithContext(s.ctx)) + handler(s.w, request.WithContext(s.ctx)) s.wg.Done() s.cancel() - - checkResponse(w) }() return s } +// NewstreamableObjectTest tests the SSE functionality of a pageAction +func NewStreamablePageTest( + action *testPageAction, + currentLedger uint32, + request *http.Request, + checkResponse func(w *httptest.ResponseRecorder), +) *StreamTest { + ledgerSource := ledger.NewTestingSource(currentLedger) + action.ledgerSource = ledgerSource + streamHandler := sse.StreamHandler{LedgerSource: ledgerSource} + handler := streamablePageHandler(action, streamHandler) + + return newStreamTest( + handler.renderStream, + ledgerSource, + request, + checkResponse, + ) +} + +// NewstreamableObjectTest tests the SSE functionality of a streamableObjectAction +func NewstreamableObjectTest( + action *testObjectAction, + currentLedger uint32, + request *http.Request, + checkResponse func(w *httptest.ResponseRecorder), +) *StreamTest { + ledgerSource := ledger.NewTestingSource(currentLedger) + action.ledgerSource = ledgerSource + streamHandler := sse.StreamHandler{LedgerSource: ledgerSource} + handler := streamableObjectActionHandler{action, streamHandler} + + return newStreamTest( + handler.renderStream, + ledgerSource, + request, + checkResponse, + ) +} + // AddLedger pushes a new ledger to the stream handler. AddLedger() will block until // the new ledger has been read by the stream handler func (s *StreamTest) AddLedger(sequence uint32) { @@ -81,10 +114,11 @@ func (s *StreamTest) Wait(expectLimitReached bool) { if !expectLimitReached { // first send a ledger to the stream handler so we can ensure that at least one // iteration of the stream loop has been executed - s.TryAddLedger(0) + s.TryAddLedger(s.ledgerSource.CurrentLedger() + 1) s.cancel() } s.wg.Wait() + s.checkResponse(s.w) } type testPage struct { @@ -97,19 +131,15 @@ func (p testPage) PagingToken() string { } type testPageAction struct { - objects []string - lock sync.Mutex -} - -func (action *testPageAction) appendObjects(objects ...string) { - action.lock.Lock() - defer action.lock.Unlock() - action.objects = append(action.objects, objects...) + objects map[uint32][]string + ledgerSource ledger.Source } func (action *testPageAction) GetResourcePage(r *http.Request) ([]hal.Pageable, error) { - action.lock.Lock() - defer action.lock.Unlock() + objects, ok := action.objects[action.ledgerSource.CurrentLedger()] + if !ok { + return nil, fmt.Errorf("unexpected ledger") + } cursor := r.Header.Get("Last-Event-ID") if cursor == "" { @@ -123,7 +153,7 @@ func (action *testPageAction) GetResourcePage(r *http.Request) ([]hal.Pageable, return nil, err } - limit := len(action.objects) + limit := len(objects) if limitParam := r.URL.Query().Get("limit"); limitParam != "" { limit, err = strconv.Atoi(limitParam) if err != nil { @@ -135,11 +165,12 @@ func (action *testPageAction) GetResourcePage(r *http.Request) ([]hal.Pageable, return nil, fmt.Errorf("cursor cannot be negative") } - if parsedCursor >= len(action.objects) { + if parsedCursor >= len(objects) { return []hal.Pageable{}, nil } + response := []hal.Pageable{} - for i, object := range action.objects[parsedCursor:] { + for i, object := range objects[parsedCursor:] { if len(response) >= limit { break } @@ -161,18 +192,31 @@ func streamRequest(t *testing.T, queryParams string) *http.Request { return request } -func expectResponse(t *testing.T, expectedResponse []string) func(*httptest.ResponseRecorder) { +func unmarashalPage(jsonString string) (string, error) { + var page testPage + err := json.Unmarshal([]byte(jsonString), &page) + return page.Value, err +} + +func expectResponse( + t *testing.T, + unmarshal func(string) (string, error), + expectedResponse []string, +) func(*httptest.ResponseRecorder) { return func(w *httptest.ResponseRecorder) { var response []string for _, line := range strings.Split(w.Body.String(), "\n") { - if strings.HasPrefix(line, "data: {") { + if line == "data: \"hello\"" || line == "data: \"byebye\"" { + continue + } + + if strings.HasPrefix(line, "data: ") { jsonString := line[len("data: "):] - var page testPage - err := json.Unmarshal([]byte(jsonString), &page) + value, err := unmarshal(jsonString) if err != nil { t.Fatalf("could not parse json %v", err) } - response = append(response, page.Value) + response = append(response, value) } } @@ -188,91 +232,227 @@ func expectResponse(t *testing.T, expectedResponse []string) func(*httptest.Resp } } -func TestRenderStream(t *testing.T) { - action := &testPageAction{ - objects: []string{"a", "b", "c"}, - } - +func TestPageStream(t *testing.T) { t.Run("without offset", func(t *testing.T) { request := streamRequest(t, "") - st := NewStreamTest( + action := &testPageAction{ + objects: map[uint32][]string{ + 3: []string{"a", "b", "c"}, + 4: []string{"a", "b", "c", "d", "e"}, + 6: []string{"a", "b", "c", "d", "e", "f"}, + 7: []string{"a", "b", "c", "d", "e", "f"}, + }, + } + st := NewStreamablePageTest( action, 3, request, - expectResponse(t, []string{"a", "b", "c", "d", "e", "f"}), + expectResponse(t, unmarashalPage, []string{"a", "b", "c", "d", "e", "f"}), ) st.AddLedger(4) - action.appendObjects("d", "e") - st.AddLedger(6) - action.appendObjects("f") st.Wait(false) }) - action.objects = []string{"a", "b", "c"} t.Run("with offset", func(t *testing.T) { request := streamRequest(t, "cursor=1") - st := NewStreamTest( + action := &testPageAction{ + objects: map[uint32][]string{ + 3: []string{"a", "b", "c"}, + 4: []string{"a", "b", "c", "d", "e"}, + 6: []string{"a", "b", "c", "d", "e", "f"}, + 7: []string{"a", "b", "c", "d", "e", "f"}, + }, + } + st := NewStreamablePageTest( action, 3, request, - expectResponse(t, []string{"b", "c", "d", "e", "f"}), + expectResponse(t, unmarashalPage, []string{"b", "c", "d", "e", "f"}), ) st.AddLedger(4) - action.appendObjects("d", "e") - st.AddLedger(6) - action.appendObjects("f") st.Wait(false) }) - action.objects = []string{"a", "b", "c"} t.Run("with limit", func(t *testing.T) { request := streamRequest(t, "limit=2") - st := NewStreamTest( + action := &testPageAction{ + objects: map[uint32][]string{ + 3: []string{"a", "b", "c"}, + }, + } + st := NewStreamablePageTest( action, 3, request, - expectResponse(t, []string{"a", "b"}), + expectResponse(t, unmarashalPage, []string{"a", "b"}), ) st.Wait(true) }) - action.objects = []string{"a", "b", "c", "d", "e"} t.Run("with limit and offset", func(t *testing.T) { request := streamRequest(t, "limit=2&cursor=1") - st := NewStreamTest( + action := &testPageAction{ + objects: map[uint32][]string{ + 3: []string{"a", "b", "c", "d", "e"}, + }, + } + st := NewStreamablePageTest( action, 3, request, - expectResponse(t, []string{"b", "c"}), + expectResponse(t, unmarashalPage, []string{"b", "c"}), ) st.Wait(true) }) - action.objects = []string{"a"} t.Run("reach limit after multiple iterations", func(t *testing.T) { request := streamRequest(t, "limit=3&cursor=1") - st := NewStreamTest( + action := &testPageAction{ + objects: map[uint32][]string{ + 3: []string{"a"}, + 4: []string{"a", "b"}, + 5: []string{"a", "b", "c", "d", "e", "f", "g"}, + }, + } + st := NewStreamablePageTest( action, 3, request, - expectResponse(t, []string{"b", "c", "d"}), + expectResponse(t, unmarashalPage, []string{"b", "c", "d"}), ) st.AddLedger(4) - action.appendObjects("b") + st.AddLedger(5) + + st.Wait(true) + }) +} +type stringObject string + +func (s stringObject) Equals(other actions.StreamableObjectResponse) bool { + otherString, ok := other.(stringObject) + if !ok { + return false + } + return s == otherString +} + +func unmarashalString(jsonString string) (string, error) { + var object stringObject + err := json.Unmarshal([]byte(jsonString), &object) + return string(object), err +} + +type testObjectAction struct { + objects map[uint32]stringObject + ledgerSource ledger.Source +} + +func (action *testObjectAction) GetResource(r *http.Request) (actions.StreamableObjectResponse, error) { + ledger := action.ledgerSource.CurrentLedger() + object, ok := action.objects[ledger] + if !ok { + return nil, fmt.Errorf("unexpected ledger") + } + + return object, nil +} + +func TestObjectStream(t *testing.T) { + t.Run("without interior duplicates", func(t *testing.T) { + request := streamRequest(t, "") + action := &testObjectAction{ + objects: map[uint32]stringObject{ + 3: "a", + 4: "b", + 5: "c", + 6: "c", + }, + } + + st := NewstreamableObjectTest( + action, + 3, + request, + expectResponse(t, unmarashalString, []string{"a", "b", "c"}), + ) + + st.AddLedger(4) + st.AddLedger(5) + st.Wait(false) + }) + + t.Run("with interior duplicates", func(t *testing.T) { + request := streamRequest(t, "") + action := &testObjectAction{ + objects: map[uint32]stringObject{ + 3: "a", + 4: "b", + 5: "b", + 6: "c", + 7: "c", + }, + } + + st := NewstreamableObjectTest( + action, + 3, + request, + expectResponse(t, unmarashalString, []string{"a", "b", "c"}), + ) + + st.AddLedger(4) st.AddLedger(5) - action.appendObjects("c", "d", "e", "f", "g") + st.AddLedger(6) + + st.Wait(false) + }) + + t.Run("limit reached", func(t *testing.T) { + request := streamRequest(t, "") + action := &testObjectAction{ + objects: map[uint32]stringObject{ + 1: "a", + 2: "b", + 3: "b", + 4: "c", + 5: "d", + 6: "e", + 7: "f", + 8: "g", + 9: "h", + 10: "i", + 11: "j", + 12: "k", + }, + } + + st := NewstreamableObjectTest( + action, + 1, + request, + expectResponse( + t, + unmarashalString, + []string{ + "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", + }, + ), + ) + + for i := uint32(1); i <= 11; i++ { + st.AddLedger(i) + } - st.TryAddLedger(0) st.Wait(true) }) } diff --git a/services/horizon/internal/web.go b/services/horizon/internal/web.go index 5ce49e0938..98df2ddf6c 100644 --- a/services/horizon/internal/web.go +++ b/services/horizon/internal/web.go @@ -14,6 +14,7 @@ import ( "github.com/rs/cors" "github.com/sebest/xff" + "github.com/stellar/go/exp/orderbook" "github.com/stellar/go/services/horizon/internal/actions" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/db2/core" @@ -152,7 +153,11 @@ func installAccountOfferRoute( // mustInstallActions installs the routing configuration of horizon onto the // provided app. All route registration should be implemented here. -func (w *web) mustInstallActions(config Config, pathFinder paths.Finder) { +func (w *web) mustInstallActions( + config Config, + pathFinder paths.Finder, + orderBookGraph *orderbook.OrderBookGraph, +) { if w == nil { log.Fatal("missing web instance for installing web actions") } @@ -242,7 +247,21 @@ func (w *web) mustInstallActions(config Config, pathFinder paths.Finder) { Get("/{id}", getOfferResource) r.Get("/{offer_id}/trades", TradeIndexAction{}.Handle) }) - r.Get("/order_book", OrderBookShowAction{}.Handle) + + if config.EnableExperimentalIngestion { + r.With(requiresExperimentalIngestion).Method( + http.MethodGet, + "/order_book", + streamableObjectActionHandler{ + streamHandler: streamHandler, + action: actions.GetOrderbookHandler{ + OrderBookGraph: orderBookGraph, + }, + }, + ) + } else { + r.Get("/order_book", OrderBookShowAction{}.Handle) + } // Transaction submission API r.Post("/transactions", TransactionCreateAction{}.Handle) From 9ce144abe98d0358bdcb467217244994e1c6c0da Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Wed, 25 Sep 2019 18:48:46 +0200 Subject: [PATCH 2/4] Code review fixes --- exp/orderbook/graph.go | 41 ++++++--- exp/orderbook/graph_test.go | 91 +++++++++++++++++-- .../horizon/internal/actions/orderbook.go | 14 +-- 3 files changed, 118 insertions(+), 28 deletions(-) diff --git a/exp/orderbook/graph.go b/exp/orderbook/graph.go index 1aaa32f3d8..d33fc339bb 100644 --- a/exp/orderbook/graph.go +++ b/exp/orderbook/graph.go @@ -119,30 +119,27 @@ func (graph *OrderBookGraph) batch() *orderBookBatchedUpdates { } } -// FindOffers returns all offers for a given trading pair +// findOffers returns all offers for a given trading pair // The offers will be sorted by price from cheapest to most expensive -// The returned offers will span at most `limit` price levels -func (graph *OrderBookGraph) FindOffers(selling, buying xdr.Asset, limit int) []xdr.OfferEntry { +// The returned offers will span at most `maxPriceLevels` price levels +func (graph *OrderBookGraph) findOffers( + selling, buying string, maxPriceLevels int, +) []xdr.OfferEntry { results := []xdr.OfferEntry{} - buyingString := buying.String() - sellingString := selling.String() - - graph.lock.RLock() - defer graph.lock.RUnlock() - edges, ok := graph.edgesForSellingAsset[sellingString] + edges, ok := graph.edgesForSellingAsset[selling] if !ok { return results } - offers, ok := edges[buyingString] + offers, ok := edges[buying] if !ok { return results } for _, offer := range offers { if len(results) == 0 || results[len(results)-1].Price != offer.Price { - limit-- + maxPriceLevels-- } - if limit < 0 { + if maxPriceLevels < 0 { return results } @@ -151,6 +148,26 @@ func (graph *OrderBookGraph) FindOffers(selling, buying xdr.Asset, limit int) [] return results } +// FindAsksAndBids returns all asks and bids for a given trading pair +// Asks consists of all offers which sell `selling` in exchange for `buying` sorted by +// price (in terms of `buying`) from cheapest to most expensive +// Bids consists of all offers which sell `buying` in exchange for `selling` sorted by +// price (in terms of `selling`) from cheapest to most expensive +// Both Asks and Bids will span at most `maxPriceLevels` price levels +func (graph *OrderBookGraph) FindAsksAndBids( + selling, buying xdr.Asset, maxPriceLevels int, +) ([]xdr.OfferEntry, []xdr.OfferEntry) { + buyingString := buying.String() + sellingString := selling.String() + + graph.lock.RLock() + defer graph.lock.RUnlock() + asks := graph.findOffers(sellingString, buyingString, maxPriceLevels) + bids := graph.findOffers(buyingString, sellingString, maxPriceLevels) + + return asks, bids +} + // add inserts a given offer into the order book graph func (graph *OrderBookGraph) add(offer xdr.OfferEntry) error { if _, contains := graph.tradingPairForOffer[offer.OfferId]; contains { diff --git a/exp/orderbook/graph_test.go b/exp/orderbook/graph_test.go index 9a5b056d16..8f6788c54d 100644 --- a/exp/orderbook/graph_test.go +++ b/exp/orderbook/graph_test.go @@ -766,13 +766,13 @@ func TestFindOffers(t *testing.T) { assertOfferListEquals( t, []xdr.OfferEntry{}, - graph.FindOffers(nativeAsset, eurAsset, 0), + graph.findOffers(nativeAsset.String(), eurAsset.String(), 0), ) assertOfferListEquals( t, []xdr.OfferEntry{}, - graph.FindOffers(nativeAsset, eurAsset, 5), + graph.findOffers(nativeAsset.String(), eurAsset.String(), 5), ) err := graph. @@ -787,12 +787,12 @@ func TestFindOffers(t *testing.T) { assertOfferListEquals( t, []xdr.OfferEntry{}, - graph.FindOffers(nativeAsset, eurAsset, 0), + graph.findOffers(nativeAsset.String(), eurAsset.String(), 0), ) assertOfferListEquals( t, []xdr.OfferEntry{eurOffer, twoEurOffer}, - graph.FindOffers(nativeAsset, eurAsset, 2), + graph.findOffers(nativeAsset.String(), eurAsset.String(), 2), ) extraTwoEurOffers := []xdr.OfferEntry{} @@ -809,12 +809,91 @@ func TestFindOffers(t *testing.T) { assertOfferListEquals( t, append([]xdr.OfferEntry{eurOffer, twoEurOffer}, extraTwoEurOffers...), - graph.FindOffers(nativeAsset, eurAsset, 2), + graph.findOffers(nativeAsset.String(), eurAsset.String(), 2), ) assertOfferListEquals( t, append(append([]xdr.OfferEntry{eurOffer, twoEurOffer}, extraTwoEurOffers...), threeEurOffer), - graph.FindOffers(nativeAsset, eurAsset, 3), + graph.findOffers(nativeAsset.String(), eurAsset.String(), 3), + ) +} + +func TestFindAsksAndBids(t *testing.T) { + graph := NewOrderBookGraph() + + asks, bids := graph.FindAsksAndBids(nativeAsset, eurAsset, 0) + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + asks, + ) + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + bids, + ) + + asks, bids = graph.FindAsksAndBids(nativeAsset, eurAsset, 5) + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + asks, + ) + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + bids, + ) + + err := graph. + AddOffer(threeEurOffer). + AddOffer(eurOffer). + AddOffer(twoEurOffer). + Apply() + if err != nil { + t.Fatalf("unexpected error %v", err) + } + + asks, bids = graph.FindAsksAndBids(nativeAsset, eurAsset, 0) + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + asks, + ) + assertOfferListEquals( + t, + []xdr.OfferEntry{}, + bids, + ) + + extraTwoEurOffers := []xdr.OfferEntry{} + for i := 0; i < 4; i++ { + otherTwoEurOffer := twoEurOffer + otherTwoEurOffer.OfferId += xdr.Int64(i + 17) + graph.AddOffer(otherTwoEurOffer) + extraTwoEurOffers = append(extraTwoEurOffers, otherTwoEurOffer) + } + if err := graph.Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + + sellEurOffer := twoEurOffer + sellEurOffer.Buying, sellEurOffer.Selling = sellEurOffer.Selling, sellEurOffer.Buying + sellEurOffer.OfferId = 35 + if err := graph.AddOffer(sellEurOffer).Apply(); err != nil { + t.Fatalf("unexpected error %v", err) + } + + asks, bids = graph.FindAsksAndBids(nativeAsset, eurAsset, 3) + assertOfferListEquals( + t, + append(append([]xdr.OfferEntry{eurOffer, twoEurOffer}, extraTwoEurOffers...), threeEurOffer), + asks, + ) + assertOfferListEquals( + t, + []xdr.OfferEntry{sellEurOffer}, + bids, ) } diff --git a/services/horizon/internal/actions/orderbook.go b/services/horizon/internal/actions/orderbook.go index e74c0b6b94..e622d7c149 100644 --- a/services/horizon/internal/actions/orderbook.go +++ b/services/horizon/internal/actions/orderbook.go @@ -79,6 +79,7 @@ func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceL if !ok { continue } + delete(amountForPrice, offer.Price) offerPrice := offer.Price if invert { @@ -93,8 +94,6 @@ func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceL Price: offerPrice.String(), Amount: amount.String(total), }) - - delete(amountForPrice, offerPrice) } return result @@ -111,14 +110,9 @@ func (handler GetOrderbookHandler) orderBookSummary( return response, err } - response.Asks = offersToPriceLevels( - handler.OrderBookGraph.FindOffers(selling, buying, limit), - false, - ) - response.Bids = offersToPriceLevels( - handler.OrderBookGraph.FindOffers(buying, selling, limit), - true, - ) + asks, bids := handler.OrderBookGraph.FindAsksAndBids(selling, buying, limit) + response.Asks = offersToPriceLevels(asks, false) + response.Bids = offersToPriceLevels(bids, true) return response, nil } From 6a3263e131a81a1078d10d2d89e9947f469101b9 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Wed, 25 Sep 2019 21:42:52 +0200 Subject: [PATCH 3/4] Fixed overflow error and struct comment --- amount/main.go | 7 +++++++ services/horizon/internal/actions/orderbook.go | 14 ++++++++++---- .../horizon/internal/actions/orderbook_test.go | 8 +++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/amount/main.go b/amount/main.go index b1b7bc1474..acebc16257 100644 --- a/amount/main.go +++ b/amount/main.go @@ -123,3 +123,10 @@ func StringFromInt64(v int64) string { r.Quo(r, bigOne) return r.FloatString(7) } + +// StringFromBigInt returns an "amount string" from the provided raw *big.Int value `v`. +func StringFromBigInt(v *big.Int) string { + r := new(big.Rat).SetInt(v) + r.Quo(r, bigOne) + return r.FloatString(7) +} diff --git a/services/horizon/internal/actions/orderbook.go b/services/horizon/internal/actions/orderbook.go index e622d7c149..ee34301b9e 100644 --- a/services/horizon/internal/actions/orderbook.go +++ b/services/horizon/internal/actions/orderbook.go @@ -2,6 +2,7 @@ package actions import ( "context" + "math/big" "net/http" "github.com/stellar/go/amount" @@ -19,7 +20,7 @@ type StreamableObjectResponse interface { Equals(other StreamableObjectResponse) bool } -// OrderBookResponse is the response for the /orderbook_endpoint +// OrderBookResponse is the response for the /order_book endpoint // OrderBookResponse implements StreamableObjectResponse type OrderBookResponse struct { protocol.OrderBookSummary @@ -70,9 +71,14 @@ type GetOrderbookHandler struct { func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceLevel { result := []protocol.PriceLevel{} - amountForPrice := map[xdr.Price]xdr.Int64{} + amountForPrice := map[xdr.Price]*big.Int{} for _, offer := range offers { - amountForPrice[offer.Price] += offer.Amount + offerAmount := big.NewInt(int64(offer.Amount)) + if amount, ok := amountForPrice[offer.Price]; ok { + amount.Add(amount, offerAmount) + } else { + amountForPrice[offer.Price] = offerAmount + } } for _, offer := range offers { total, ok := amountForPrice[offer.Price] @@ -92,7 +98,7 @@ func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceL D: int32(offerPrice.D), }, Price: offerPrice.String(), - Amount: amount.String(total), + Amount: amount.StringFromBigInt(total), }) } diff --git a/services/horizon/internal/actions/orderbook_test.go b/services/horizon/internal/actions/orderbook_test.go index b422f2999a..61b955b54a 100644 --- a/services/horizon/internal/actions/orderbook_test.go +++ b/services/horizon/internal/actions/orderbook_test.go @@ -1,11 +1,13 @@ package actions import ( + "math" "strconv" "testing" "github.com/stellar/go/exp/orderbook" protocol "github.com/stellar/go/protocols/horizon" + "github.com/stellar/go/xdr" ) type intObject int @@ -512,7 +514,7 @@ func TestOrderbookGetResource(t *testing.T) { t.Fatalf("unexpected error %v", err) } otherEurOffer := twoEurOffer - otherEurOffer.Amount = 10000 + otherEurOffer.Amount = xdr.Int64(math.MaxInt64) otherEurOffer.OfferId = 16 if err := fullGraph.AddOffer(otherEurOffer).Apply(); err != nil { t.Fatalf("unexpected error %v", err) @@ -541,7 +543,7 @@ func TestOrderbookGetResource(t *testing.T) { protocol.PriceLevel{ PriceR: protocol.Price{N: int32(twoEurOffer.Price.N), D: int32(twoEurOffer.Price.D)}, Price: "2.0000000", - Amount: "0.0010500", + Amount: "922337203685.4776307", }, protocol.PriceLevel{ PriceR: protocol.Price{N: int32(threeEurOffer.Price.N), D: int32(threeEurOffer.Price.D)}, @@ -567,7 +569,7 @@ func TestOrderbookGetResource(t *testing.T) { protocol.PriceLevel{ PriceR: protocol.Price{N: int32(twoEurOffer.Price.N), D: int32(twoEurOffer.Price.D)}, Price: "2.0000000", - Amount: "0.0010500", + Amount: "922337203685.4776307", }, } limitResponse.Bids = []protocol.PriceLevel{ From 73bd86d9e9c2703e883412928e8948dd4d392918 Mon Sep 17 00:00:00 2001 From: Tamir Sen Date: Wed, 25 Sep 2019 21:51:26 +0200 Subject: [PATCH 4/4] Use IntStringToAmount instead of StringFromBigInt --- amount/main.go | 7 ------- .../horizon/internal/actions/orderbook.go | 21 ++++++++++++++----- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/amount/main.go b/amount/main.go index acebc16257..b1b7bc1474 100644 --- a/amount/main.go +++ b/amount/main.go @@ -123,10 +123,3 @@ func StringFromInt64(v int64) string { r.Quo(r, bigOne) return r.FloatString(7) } - -// StringFromBigInt returns an "amount string" from the provided raw *big.Int value `v`. -func StringFromBigInt(v *big.Int) string { - r := new(big.Rat).SetInt(v) - r.Quo(r, bigOne) - return r.FloatString(7) -} diff --git a/services/horizon/internal/actions/orderbook.go b/services/horizon/internal/actions/orderbook.go index ee34301b9e..cbc4287555 100644 --- a/services/horizon/internal/actions/orderbook.go +++ b/services/horizon/internal/actions/orderbook.go @@ -68,7 +68,7 @@ type GetOrderbookHandler struct { OrderBookGraph *orderbook.OrderBookGraph } -func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceLevel { +func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) ([]protocol.PriceLevel, error) { result := []protocol.PriceLevel{} amountForPrice := map[xdr.Price]*big.Int{} @@ -92,17 +92,22 @@ func offersToPriceLevels(offers []xdr.OfferEntry, invert bool) []protocol.PriceL offerPrice.Invert() } + amountString, err := amount.IntStringToAmount(total.String()) + if err != nil { + return nil, err + } + result = append(result, protocol.PriceLevel{ PriceR: protocol.Price{ N: int32(offerPrice.N), D: int32(offerPrice.D), }, Price: offerPrice.String(), - Amount: amount.StringFromBigInt(total), + Amount: amountString, }) } - return result + return result, nil } func (handler GetOrderbookHandler) orderBookSummary( @@ -116,9 +121,15 @@ func (handler GetOrderbookHandler) orderBookSummary( return response, err } + var err error asks, bids := handler.OrderBookGraph.FindAsksAndBids(selling, buying, limit) - response.Asks = offersToPriceLevels(asks, false) - response.Bids = offersToPriceLevels(bids, true) + if response.Asks, err = offersToPriceLevels(asks, false); err != nil { + return response, err + } + + if response.Bids, err = offersToPriceLevels(bids, true); err != nil { + return response, err + } return response, nil }