diff --git a/dex/msgjson/types.go b/dex/msgjson/types.go index e1ffba3d50..30ad1e0aae 100644 --- a/dex/msgjson/types.go +++ b/dex/msgjson/types.go @@ -187,6 +187,12 @@ const ( // FeeRateRoute is the client-originating request asking for the most // recently recorded transaction fee estimate for an asset. FeeRateRoute = "fee_rate" + // PriceFeedRoute is the client-originating request subscribing to the + // market overview feed. + PriceFeedRoute = "price_feed" + // PriceUpdateRoute is a dex-originating notification updating the current + // spot price for a market. + PriceUpdateRoute = "price_update" // CandlesRoute is the HTTP request to get the set of candlesticks // representing market activity history. CandlesRoute = "candles" diff --git a/server/market/bookrouter.go b/server/market/bookrouter.go index fd4f252fa9..a367ad7188 100644 --- a/server/market/bookrouter.go +++ b/server/market/bookrouter.go @@ -107,6 +107,7 @@ type sigDataEpochReport struct { epochIdx int64 epochDur int64 stats *matcher.MatchCycleStats + spot *msgjson.Spot baseFeeRate uint64 quoteFeeRate uint64 } @@ -271,6 +272,10 @@ func (book *msgBook) addBulkOrders(epoch int64, orderSets ...[]*order.LimitOrder type BookRouter struct { books map[string]*msgBook feeSource FeeSource + + priceFeeders *subscribers + spotsMtx sync.RWMutex + spots map[string]*msgjson.Spot } // NewBookRouter is a constructor for a BookRouter. Routes are registered with @@ -281,6 +286,10 @@ func NewBookRouter(sources map[string]BookSource, feeSource FeeSource) *BookRout router := &BookRouter{ books: make(map[string]*msgBook), feeSource: feeSource, + priceFeeders: &subscribers{ + conns: make(map[uint64]comms.Link), + }, + spots: make(map[string]*msgjson.Spot), } for mkt, src := range sources { subs := &subscribers{ @@ -299,6 +308,7 @@ func NewBookRouter(sources map[string]BookSource, feeSource FeeSource) *BookRout comms.Route(msgjson.OrderBookRoute, router.handleOrderBook) comms.Route(msgjson.UnsubOrderBookRoute, router.handleUnsubOrderBook) comms.Route(msgjson.FeeRateRoute, router.handleFeeRate) + comms.Route(msgjson.PriceFeedRoute, router.handlePriceFeeder) return router } @@ -348,6 +358,7 @@ out: // Prepare the book/unbook/epoch note. var note interface{} var route string + var spot *msgjson.Spot switch sigData := u.data.(type) { case sigDataNewEpoch: // New epoch index should be sent here by the market following @@ -400,6 +411,7 @@ out: startStamp := sigData.epochIdx * sigData.epochDur endStamp := startStamp + sigData.epochDur stats := sigData.stats + spot = sigData.spot note = &msgjson.EpochReportNote{ MarketID: book.name, @@ -502,6 +514,10 @@ out: } r.sendNote(route, subs, note) + + if spot != nil { + r.sendNote(msgjson.PriceUpdateRoute, r.priceFeeders, spot) + } case <-ctx.Done(): break out } @@ -658,6 +674,25 @@ func (r *BookRouter) handleFeeRate(conn comms.Link, msg *msgjson.Message) *msgjs return nil } +func (r *BookRouter) handlePriceFeeder(conn comms.Link, msg *msgjson.Message) *msgjson.Error { + r.spotsMtx.RLock() + msg, err := msgjson.NewResponse(msg.ID, r.spots, nil) + r.spotsMtx.RUnlock() + if err != nil { + return &msgjson.Error{ + Code: msgjson.RPCInternal, + Message: "encoding error", + } + } + r.priceFeeders.add(conn) + err = conn.Send(msg) + if err != nil { + log.Debugf("error sending price_feed response: %v", err) + } + + return nil +} + // sendNote sends a notification to the specified subscribers. func (r *BookRouter) sendNote(route string, subs *subscribers, note interface{}) { msg, err := msgjson.NewNotification(route, note) diff --git a/server/market/market.go b/server/market/market.go index 863b654ff1..cbfae675bd 100644 --- a/server/market/market.go +++ b/server/market/market.go @@ -2298,12 +2298,19 @@ func (m *Market) processReadyEpoch(epoch *readyEpoch, notifyChan chan<- *updateS } } + // Update the API data collector. + spot, err := m.dataCollector.ReportEpoch(m.Base(), m.Quote(), uint64(epoch.Epoch), stats) + if err != nil { + log.Errorf("Error updating API data collector: %v", err) + } + // Send "epoch_report" notifications. notifyChan <- &updateSignal{ action: epochReportAction, data: sigDataEpochReport{ epochIdx: epoch.Epoch, epochDur: epoch.Duration, + spot: spot, stats: stats, baseFeeRate: feeRateBase, quoteFeeRate: feeRateQuote, @@ -2316,12 +2323,6 @@ func (m *Market) processReadyEpoch(epoch *readyEpoch, notifyChan chan<- *updateS epoch.Epoch, epoch.Duration) m.swapper.Negotiate(matches) } - - // Update the API data collector. - _, err = m.dataCollector.ReportEpoch(m.Base(), m.Quote(), uint64(epoch.Epoch), stats) - if err != nil { - log.Errorf("Error updating API data collector: %v", err) - } } // validateOrder uses db.ValidateOrder to ensure that the provided order is diff --git a/server/market/market_test.go b/server/market/market_test.go index 8e467b19f8..b773755e95 100644 --- a/server/market/market_test.go +++ b/server/market/market_test.go @@ -1309,7 +1309,7 @@ func TestMarket_enqueueEpoch(t *testing.T) { {bookAction, sigDataBookedOrder{lo, epochIdx}}, {unbookAction, sigDataUnbookedOrder{bestBuy, epochIdx}}, {unbookAction, sigDataUnbookedOrder{bestSell, epochIdx}}, - {epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, 10, 10}}, + {epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, nil, 10, 10}}, }, }, { @@ -1317,7 +1317,7 @@ func TestMarket_enqueueEpoch(t *testing.T) { eq2, []*updateSignal{ {matchProofAction, sigDataMatchProof{mp2}}, - {epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, 10, 10}}, + {epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, nil, 10, 10}}, }, }, { @@ -1325,7 +1325,7 @@ func TestMarket_enqueueEpoch(t *testing.T) { NewEpoch(epochIdx, epochDur), []*updateSignal{ {matchProofAction, sigDataMatchProof{mp0}}, - {epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, 10, 10}}, + {epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, nil, 10, 10}}, }, }, } diff --git a/server/market/routers_test.go b/server/market/routers_test.go index b1da7016da..4c80d17b46 100644 --- a/server/market/routers_test.go +++ b/server/market/routers_test.go @@ -1904,3 +1904,51 @@ func TestBadMessages(t *testing.T) { rpcErr = router.handleUnsubOrderBook(link, unsub) checkErr("bad payload", rpcErr, msgjson.NotSubscribedError) } + +func TestPriceFeed(t *testing.T) { + mktID := "abc_123" + rig.router.spots[mktID] = &msgjson.Spot{Vol24: 54321} + + link := tNewLink() + sub, _ := msgjson.NewRequest(1, msgjson.PriceFeedRoute, nil) + if err := rig.router.handlePriceFeeder(link, sub); err != nil { + t.Fatalf("handlePriceFeeder: %v", err) + } + + primerMsg := link.getSend() + var spots map[string]*msgjson.Spot + err := primerMsg.UnmarshalResult(&spots) + if err != nil { + t.Fatalf("error unmarshaling initial price_feed response: %v", err) + } + + if len(spots) != 1 { + t.Fatalf("expected 1 spot, got %d", len(spots)) + } + + spot, found := spots[mktID] + if !found { + t.Fatal("spot not communicated") + } + + if spot.Vol24 != 54321 { + t.Fatal("spot volume not communicated") + } + + rig.source1.feed <- &updateSignal{ + action: epochReportAction, + data: sigDataEpochReport{ + spot: &msgjson.Spot{Vol24: 12345}, + stats: &matcher.MatchCycleStats{}, + }, + } + + update := link.getSend() + spot = new(msgjson.Spot) + if err := update.Unmarshal(spot); err != nil { + t.Fatalf("error unmarhsaling spot: %v", err) + } + if spot.Vol24 != 12345 { + t.Fatal("update volume not communicated") + } +}