Skip to content

Commit

Permalink
implement price feed service on server
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Oct 15, 2021
1 parent dad4a2e commit 84fbf95
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
6 changes: 6 additions & 0 deletions dex/msgjson/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ const (
// FeeRateRoute is the client-originating request asking for the most
// recently recorded transaction fee estimate for an asset.
FeeRateRoute = "fee_rate"
// PriceFeedRoute is the client-originating request subscribing to the
// market overview feed.
PriceFeedRoute = "price_feed"
// PriceUpdateRoute is a dex-originating notification updating the current
// spot price for a market.
PriceUpdateRoute = "price_update"
// CandlesRoute is the HTTP request to get the set of candlesticks
// representing market activity history.
CandlesRoute = "candles"
Expand Down
35 changes: 35 additions & 0 deletions server/market/bookrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type sigDataEpochReport struct {
epochIdx int64
epochDur int64
stats *matcher.MatchCycleStats
spot *msgjson.Spot
baseFeeRate uint64
quoteFeeRate uint64
}
Expand Down Expand Up @@ -271,6 +272,10 @@ func (book *msgBook) addBulkOrders(epoch int64, orderSets ...[]*order.LimitOrder
type BookRouter struct {
books map[string]*msgBook
feeSource FeeSource

priceFeeders *subscribers
spotsMtx sync.RWMutex
spots map[string]*msgjson.Spot
}

// NewBookRouter is a constructor for a BookRouter. Routes are registered with
Expand All @@ -281,6 +286,10 @@ func NewBookRouter(sources map[string]BookSource, feeSource FeeSource) *BookRout
router := &BookRouter{
books: make(map[string]*msgBook),
feeSource: feeSource,
priceFeeders: &subscribers{
conns: make(map[uint64]comms.Link),
},
spots: make(map[string]*msgjson.Spot),
}
for mkt, src := range sources {
subs := &subscribers{
Expand All @@ -299,6 +308,7 @@ func NewBookRouter(sources map[string]BookSource, feeSource FeeSource) *BookRout
comms.Route(msgjson.OrderBookRoute, router.handleOrderBook)
comms.Route(msgjson.UnsubOrderBookRoute, router.handleUnsubOrderBook)
comms.Route(msgjson.FeeRateRoute, router.handleFeeRate)
comms.Route(msgjson.PriceFeedRoute, router.handlePriceFeeder)

return router
}
Expand Down Expand Up @@ -348,6 +358,7 @@ out:
// Prepare the book/unbook/epoch note.
var note interface{}
var route string
var spot *msgjson.Spot
switch sigData := u.data.(type) {
case sigDataNewEpoch:
// New epoch index should be sent here by the market following
Expand Down Expand Up @@ -400,6 +411,7 @@ out:
startStamp := sigData.epochIdx * sigData.epochDur
endStamp := startStamp + sigData.epochDur
stats := sigData.stats
spot = sigData.spot

note = &msgjson.EpochReportNote{
MarketID: book.name,
Expand Down Expand Up @@ -502,6 +514,10 @@ out:
}

r.sendNote(route, subs, note)

if spot != nil {
r.sendNote(msgjson.PriceUpdateRoute, r.priceFeeders, spot)
}
case <-ctx.Done():
break out
}
Expand Down Expand Up @@ -658,6 +674,25 @@ func (r *BookRouter) handleFeeRate(conn comms.Link, msg *msgjson.Message) *msgjs
return nil
}

func (r *BookRouter) handlePriceFeeder(conn comms.Link, msg *msgjson.Message) *msgjson.Error {
r.spotsMtx.RLock()
msg, err := msgjson.NewResponse(msg.ID, r.spots, nil)
r.spotsMtx.RUnlock()
if err != nil {
return &msgjson.Error{
Code: msgjson.RPCInternal,
Message: "encoding error",
}
}
r.priceFeeders.add(conn)
err = conn.Send(msg)
if err != nil {
log.Debugf("error sending price_feed response: %v", err)
}

return nil
}

// sendNote sends a notification to the specified subscribers.
func (r *BookRouter) sendNote(route string, subs *subscribers, note interface{}) {
msg, err := msgjson.NewNotification(route, note)
Expand Down
13 changes: 7 additions & 6 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2298,12 +2298,19 @@ func (m *Market) processReadyEpoch(epoch *readyEpoch, notifyChan chan<- *updateS
}
}

// Update the API data collector.
spot, err := m.dataCollector.ReportEpoch(m.Base(), m.Quote(), uint64(epoch.Epoch), stats)
if err != nil {
log.Errorf("Error updating API data collector: %v", err)
}

// Send "epoch_report" notifications.
notifyChan <- &updateSignal{
action: epochReportAction,
data: sigDataEpochReport{
epochIdx: epoch.Epoch,
epochDur: epoch.Duration,
spot: spot,
stats: stats,
baseFeeRate: feeRateBase,
quoteFeeRate: feeRateQuote,
Expand All @@ -2316,12 +2323,6 @@ func (m *Market) processReadyEpoch(epoch *readyEpoch, notifyChan chan<- *updateS
epoch.Epoch, epoch.Duration)
m.swapper.Negotiate(matches)
}

// Update the API data collector.
_, err = m.dataCollector.ReportEpoch(m.Base(), m.Quote(), uint64(epoch.Epoch), stats)
if err != nil {
log.Errorf("Error updating API data collector: %v", err)
}
}

// validateOrder uses db.ValidateOrder to ensure that the provided order is
Expand Down
6 changes: 3 additions & 3 deletions server/market/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,23 +1309,23 @@ func TestMarket_enqueueEpoch(t *testing.T) {
{bookAction, sigDataBookedOrder{lo, epochIdx}},
{unbookAction, sigDataUnbookedOrder{bestBuy, epochIdx}},
{unbookAction, sigDataUnbookedOrder{bestSell, epochIdx}},
{epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, 10, 10}},
{epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, nil, 10, 10}},
},
},
{
"ok no matches or book updates, one miss",
eq2,
[]*updateSignal{
{matchProofAction, sigDataMatchProof{mp2}},
{epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, 10, 10}},
{epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, nil, 10, 10}},
},
},
{
"ok empty queue",
NewEpoch(epochIdx, epochDur),
[]*updateSignal{
{matchProofAction, sigDataMatchProof{mp0}},
{epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, 10, 10}},
{epochReportAction, sigDataEpochReport{epochIdx, epochDur, nil, nil, 10, 10}},
},
},
}
Expand Down
48 changes: 48 additions & 0 deletions server/market/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1904,3 +1904,51 @@ func TestBadMessages(t *testing.T) {
rpcErr = router.handleUnsubOrderBook(link, unsub)
checkErr("bad payload", rpcErr, msgjson.NotSubscribedError)
}

func TestPriceFeed(t *testing.T) {
mktID := "abc_123"
rig.router.spots[mktID] = &msgjson.Spot{Vol24: 54321}

link := tNewLink()
sub, _ := msgjson.NewRequest(1, msgjson.PriceFeedRoute, nil)
if err := rig.router.handlePriceFeeder(link, sub); err != nil {
t.Fatalf("handlePriceFeeder: %v", err)
}

primerMsg := link.getSend()
var spots map[string]*msgjson.Spot
err := primerMsg.UnmarshalResult(&spots)
if err != nil {
t.Fatalf("error unmarshaling initial price_feed response: %v", err)
}

if len(spots) != 1 {
t.Fatalf("expected 1 spot, got %d", len(spots))
}

spot, found := spots[mktID]
if !found {
t.Fatal("spot not communicated")
}

if spot.Vol24 != 54321 {
t.Fatal("spot volume not communicated")
}

rig.source1.feed <- &updateSignal{
action: epochReportAction,
data: sigDataEpochReport{
spot: &msgjson.Spot{Vol24: 12345},
stats: &matcher.MatchCycleStats{},
},
}

update := link.getSend()
spot = new(msgjson.Spot)
if err := update.Unmarshal(spot); err != nil {
t.Fatalf("error unmarhsaling spot: %v", err)
}
if spot.Vol24 != 12345 {
t.Fatal("update volume not communicated")
}
}

0 comments on commit 84fbf95

Please sign in to comment.