Skip to content

Commit

Permalink
market,ws: avoid backups
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Jul 21, 2023
1 parent aa61db3 commit 7c48e81
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
9 changes: 7 additions & 2 deletions dex/ws/wslink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
// outBufferSize is the size of the WSLink's buffered channel for outgoing
// messages.
outBufferSize = 128
outBufferSize = 512
defaultReadLimit = 8192
writeWait = 5 * time.Second
// ErrPeerDisconnected will be returned if Send or Request is called on a
Expand Down Expand Up @@ -107,7 +107,8 @@ func (c *WSLink) Send(msg *msgjson.Message) error {

// SendRaw sends the passed bytes to the websocket peer. The actual writing of
// the message on the peer's link occurs asynchronously. As such, a nil error
// only indicates that the link is believed to be up.
// only indicates that the link is believed to be up, and the output buffer is
// not full.
func (c *WSLink) SendRaw(b []byte) error {
if c.Off() {
return ErrPeerDisconnected
Expand All @@ -134,6 +135,10 @@ func (c *WSLink) sendRaw(b []byte, writeErr chan<- error) error {
case c.outChan <- &sendData{b, writeErr}:
case <-c.stopped:
return ErrPeerDisconnected
default: // if we're backed up with queued outgoing messages, hang up on the peer
c.log.Errorf("Backed up with queued outgoing messages, hang up on the peer (%v)", c.addr)
c.stop()
return ErrPeerDisconnected
}

return nil
Expand Down
37 changes: 27 additions & 10 deletions server/market/bookrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ func (s *subscribers) add(conn comms.Link) {
s.conns[conn.ID()] = conn
}

func (s *subscribers) links() []comms.Link {
s.mtx.RLock()
defer s.mtx.RUnlock()
links := make([]comms.Link, 0, len(s.conns))
for _, link := range s.conns {
links = append(links, link)
}
return links
}

func (s *subscribers) remove(id uint64) bool {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down Expand Up @@ -212,6 +222,9 @@ func (book *msgBook) setEpoch(idx int64) {
}

func (book *msgBook) addRecentMatches(matches [][3]int64) {
if len(matches) == 0 {
return // don't realloc for no reason
}
book.mtx.Lock()
defer book.mtx.Unlock()

Expand Down Expand Up @@ -333,6 +346,9 @@ func (r *BookRouter) Run(ctx context.Context) {
wg.Add(1)
go func(b *msgBook) {
r.runBook(ctx, b)
if ctx.Err() == nil {
log.Warnf("router for book unexpectedly died: %v", b.name)
}
wg.Done()
}(b)
}
Expand Down Expand Up @@ -426,14 +442,16 @@ out:
stats := sigData.stats
spot = sigData.spot

matchesWithTimestamp := make([][3]int64, 0, len(sigData.matches))
for _, match := range sigData.matches {
matchesWithTimestamp = append(matchesWithTimestamp, [3]int64{
match[0],
match[1],
endStamp})
if len(sigData.matches) > 0 {
matchesWithTimestamp := make([][3]int64, 0, len(sigData.matches))
for _, match := range sigData.matches {
matchesWithTimestamp = append(matchesWithTimestamp, [3]int64{
match[0],
match[1],
endStamp})
}
book.addRecentMatches(matchesWithTimestamp)
}
book.addRecentMatches(matchesWithTimestamp)

note = &msgjson.EpochReportNote{
MarketID: book.name,
Expand Down Expand Up @@ -739,14 +757,13 @@ func (r *BookRouter) sendNote(route string, subs *subscribers, note interface{})
}

var deletes []uint64
subs.mtx.RLock()
for _, conn := range subs.conns {
for _, conn := range subs.links() {
err := conn.SendRaw(b)
if err != nil {
deletes = append(deletes, conn.ID())
}
}
subs.mtx.RUnlock()

if len(deletes) > 0 {
subs.mtx.Lock()
for _, id := range deletes {
Expand Down
17 changes: 14 additions & 3 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ func (m *Market) Quote() uint32 {
// market stops, channels are closed (invalidated), and new channels should be
// requested if the market starts again.
func (m *Market) OrderFeed() <-chan *updateSignal {
bookUpdates := make(chan *updateSignal, 1)
bookUpdates := make(chan *updateSignal, 32)
m.orderFeedMtx.Lock()
m.orderFeeds = append(m.orderFeeds, bookUpdates)
m.orderFeedMtx.Unlock()
Expand Down Expand Up @@ -720,7 +720,17 @@ func (m *Market) FeedDone(feed <-chan *updateSignal) bool {
func (m *Market) sendToFeeds(sig *updateSignal) {
m.orderFeedMtx.RLock()
for _, s := range m.orderFeeds {
s <- sig
select {
case s <- sig:
default: // BookRouter choking?
// This is bad, for the clients. There's a missing book update, but
// don't stall all of Market because of the outgoing notifications.
// We could conceivable close the feed, causing the BookRouter to
// shut down, the app is not rigged to restart dead subsystems. We
// just have to not block the incoming order and matching pipeline.
// Clients can resubscribe their book feeds, but we can't die.
log.Warnf("A receiver for updates on market %v (a BookRouter) is backed up!", m.marketInfo)
}
}
m.orderFeedMtx.RUnlock()
}
Expand Down Expand Up @@ -2306,7 +2316,7 @@ func (m *Market) unbookedOrder(lo *order.LimitOrder) {
}

// Send revoke_order notification to order owner.
m.sendRevokeOrderNote(oid, user)
go m.sendRevokeOrderNote(oid, user)

// Send "unbook" notification to order book subscribers.
m.sendToFeeds(&updateSignal{
Expand Down Expand Up @@ -2470,6 +2480,7 @@ func (m *Market) processReadyEpoch(epoch *readyEpoch, notifyChan chan<- *updateS
})
if err != nil {
// fatal backend error, do not begin new swaps.
log.Errorf("InsertEpoch failed: %v", err)
return // TODO: notify clients
}

Expand Down

0 comments on commit 7c48e81

Please sign in to comment.