Skip to content

Commit

Permalink
always load and track unfunded trades
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed Feb 10, 2021
1 parent 462fba0 commit fd8fe98
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 64 deletions.
142 changes: 98 additions & 44 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,23 @@ func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, pr
func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err error) {
tracker, _, _ := dc.findOrder(oid)
if tracker == nil {
return
return // false, nil
}
return true, c.tryCancelTrade(dc, tracker)
}

// tryCancelTrade attempts to cancel the order.
func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error {
oid := tracker.ID()
if lo, ok := tracker.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF {
return fmt.Errorf("cannot cancel %s order %s that is not a standing limit order", tracker.Type(), oid)
}
found = true

tracker.mtx.Lock()
defer tracker.mtx.Unlock()

if lo, ok := tracker.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF {
err = fmt.Errorf("cannot cancel %s order %s that is not a standing limit order", tracker.Type(), oid)
return
if status := tracker.metaData.Status; status != order.OrderStatusEpoch && status != order.OrderStatusBooked {
return fmt.Errorf("order %v not cancellable in status %v", oid, status)
}

if tracker.cancel != nil {
Expand All @@ -255,8 +262,8 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err
tracker.deleteStaleCancelOrder()

if tracker.cancel != nil {
err = fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. still waiting on cancel order %s to match", oid, tracker.cancel.ID())
return
return fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. "+
"still waiting on cancel order %s to match", oid, tracker.cancel.ID())
}
}

Expand All @@ -274,30 +281,28 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err
},
TargetOrderID: oid,
}
err = order.ValidateOrder(co, order.OrderStatusEpoch, 0)
err := order.ValidateOrder(co, order.OrderStatusEpoch, 0)
if err != nil {
return
return err
}

// Create and send the order message. Check the response before using it.
route, msgOrder := messageOrder(co, nil)
var result = new(msgjson.OrderResult)
err = dc.signAndRequest(msgOrder, route, result, DefaultResponseTimeout)
if err != nil {
return
return err
}
err = validateOrderResponse(dc, result, co, msgOrder)
if err != nil {
err = fmt.Errorf("Abandoning order. preimage: %x, server time: %d: %w",
return fmt.Errorf("Abandoning order. preimage: %x, server time: %d: %w",
preImg[:], result.ServerTime, err)
return
}

// Store the cancel order with the tracker.
err = tracker.cancelTrade(co, preImg)
if err != nil {
err = fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err)
return
return fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err)
}

// Now that the trackedTrade is updated, sync with the preimage request.
Expand All @@ -317,16 +322,15 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err
Order: co,
})
if err != nil {
err = fmt.Errorf("failed to store order in database: %w", err)
return
return fmt.Errorf("failed to store order in database: %w", err)
}

c.log.Infof("Cancel order %s targeting order %s at %s has been placed",
co.ID(), oid, dc.acct.host)

c.notify(newOrderNote(SubjectCancellingOrder, "A cancel order has been submitted for order "+tracker.token(), db.Poke, tracker.coreOrderInternal()))

return
return nil
}

// Synchronize with the preimage request, in case that request came before
Expand Down Expand Up @@ -386,7 +390,9 @@ type serverMatches struct {
cancel *msgjson.Match
}

// parseMatches sorts the list of matches and associates them with a trade.
// parseMatches sorts the list of matches and associates them with a trade. This
// may be called from handleMatchRoute on receipt of a new 'match' request, or
// by authDEX with the list of active matches returned by the 'connect' request.
func (dc *dexConnection) parseMatches(msgMatches []*msgjson.Match, checkSigs bool) (map[order.OrderID]*serverMatches, []msgjson.Acknowledgement, error) {
var acks []msgjson.Acknowledgement
matches := make(map[order.OrderID]*serverMatches)
Expand Down Expand Up @@ -3124,6 +3130,31 @@ func (c *Core) authDEX(dc *dexConnection) error {
c.resolveMatchConflicts(dc, matchConflicts)
}

// List and cancel standing limit orders that are in epoch or booked status,
// but without funding coins for new matches. This should be done after the
// order status resolution done above.
var brokenTrades []*trackedTrade
dc.tradeMtx.RLock()
for _, trade := range dc.trades {
if lo, ok := trade.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF {
continue // only standing limit orders need to be canceled
}
trade.mtx.RLock()
status := trade.metaData.Status
if (status == order.OrderStatusEpoch || status == order.OrderStatusBooked) &&
!trade.hasFundingCoins() {
brokenTrades = append(brokenTrades, trade)
}
trade.mtx.RUnlock()
}
dc.tradeMtx.RUnlock()
for _, trade := range brokenTrades {
c.log.Warnf("Canceling unfunded standing limit order %v", trade.ID())
if err = c.tryCancelTrade(dc, trade); err != nil {
c.log.Warnf("Unable to cancel unfunded trade %v: %v", trade.ID(), err)
}
}

if len(updatedAssets) > 0 {
c.updateBalances(updatedAssets)
}
Expand Down Expand Up @@ -3484,25 +3515,29 @@ func (c *Core) loadDBTrades(dc *dexConnection, crypter encrypt.Crypter, failed m
if err != nil {
baseFailed = true
failed[base] = struct{}{}
c.log.Errorf("Connecting to wallet %v failed: %v", unbip(quote), err)
} else if !baseWallet.unlocked() {
err = baseWallet.Unlock(crypter)
if err != nil {
baseFailed = true
failed[base] = struct{}{}
}
c.log.Errorf("Unlock wallet %d failed: %v", unbip(quote), err)
}
}
if !baseFailed && !quoteFailed {
quoteWallet, err := c.connectedWallet(quote)
if err != nil {
quoteFailed = true
failed[quote] = struct{}{}
c.log.Errorf("Connecting to wallet %v failed: %v", unbip(quote), err)
} else if !quoteWallet.unlocked() {
err = quoteWallet.Unlock(crypter)
if err != nil {
quoteFailed = true
failed[quote] = struct{}{}
}
c.log.Errorf("Unlock wallet %d failed: %v", unbip(quote), err)
}
}
if baseFailed {
Expand All @@ -3528,19 +3563,24 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa
c.notify(newOrderNote(subject, detail, db.ErrorLevel, tracker.coreOrder()))
}

// When an active order fails to load funding coins, it must be revoked
// along with any matches that require funding coins. The trade should still
// enter the trades map so that any recovery of funds from other matches can
// take place before the order is retired.
revokeUnfunded := func(trade *trackedTrade, matches []*matchTracker) {
if trade.metaData.Status < order.OrderStatusExecuted {
trade.revoke()
}
// markUnfunded is used to allow an unfunded order to enter the trades map
// so that status resolution and match negotiation for unaffected matches
// may continue. By not self-revoking, the user may have the opportunity to
// resolve any wallet issues that may have lead to a failure to find the
// funding coins. Otherwise the server will (or already did) revoke some or
// all of the matches and the order itself.
markUnfunded := func(trade *trackedTrade, matches []*matchTracker) {
// Block negotiating new matches.
trade.changeLocked = false
trade.coinsLocked = false
// Block swaps txn attempts on matches needing funds.
for _, match := range matches {
if !match.MetaData.Proof.IsRevoked() {
_ = trade.revokeMatch(match.id, false) // it will be found
}
match.swapErr = errors.New("no funding coins for swap")
}
// Will not be retired until revoke of order and all matches, which may
// happen on status resolution after authenticating with the DEX server,
// or from a revoke_match/revoke_order notification after timeout.
// However, the order should be unconditionally canceled.
}

relocks := make(assetMap)
Expand Down Expand Up @@ -3657,7 +3697,7 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa
tracker.coins = map[string]asset.Coin{} // should already be
if len(coinIDs) == 0 {
notifyErr(SubjectOrderCoinError, "No funding coins recorded for active order %s", tracker.token())
revokeUnfunded(tracker, matchesNeedingCoins) // bug - no user resolution
markUnfunded(tracker, matchesNeedingCoins) // bug - no user resolution
} else {
byteIDs := make([]dex.Bytes, 0, len(coinIDs))
for _, cid := range coinIDs {
Expand All @@ -3667,16 +3707,15 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa
if err != nil || len(coins) == 0 {
notifyErr(SubjectOrderCoinError, "Source coins retrieval error for order %s (%s): %v",
tracker.token(), unbip(wallets.fromAsset.ID), err)
// Only revoke if requested so that the user can try again
// after investigating (wrong wallet connected?).
if c.cfg.RevokeUnfunded {
revokeUnfunded(tracker, matchesNeedingCoins)
} else {
c.log.Warnf("Check the status of your %s wallet and the coins logged above. "+
"Otherwise restart with --revokeunfunded to safely (and permanently) abandon this order.",
strings.ToUpper(unbip(wallets.fromAsset.ID)))
continue // omit this trade from the map
}
// Block matches needing funding coins.
markUnfunded(tracker, matchesNeedingCoins)
c.log.Warnf("Check the status of your %s wallet and the coins logged above! "+
"Resolve the wallet issue if possible and restart the DEX client.",
strings.ToUpper(unbip(wallets.fromAsset.ID)))
// Note: tracker is still added to trades map for (1) status
// resolution, (2) continued settlement of matches that no
// longer require funding coins, and (3) cancellation in
// authDEX if the order is booked.
} else {
// NOTE: change and changeLocked are not set even if the
// funding coins were loaded from the DB's ChangeCoin.
Expand Down Expand Up @@ -3728,7 +3767,7 @@ func generateDEXMaps(host string, cfg *msgjson.ConfigResult) (map[uint32]*dex.As
}

// runMatches runs the sorted matches returned from parseMatches.
func (c *Core) runMatches(dc *dexConnection, tradeMatches map[order.OrderID]*serverMatches) (assetMap, error) {
func (c *Core) runMatches(tradeMatches map[order.OrderID]*serverMatches) (assetMap, error) {
runMatch := func(sm *serverMatches) (assetMap, error) {
updatedAssets := make(assetMap)
tracker := sm.tracker
Expand Down Expand Up @@ -4387,21 +4426,36 @@ func handleMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error {
// requirement, which requires changes to the server's handling.
return err
}

// Warn about new matches for unfunded orders. We still must ack all the
// matches in the 'match' request for the server to accept it, although the
// server doesn't require match acks. See (*Swapper).processMatchAcks.
for oid, srvMatch := range matches {
if srvMatch.tracker.hasFundingCoins() {
c.log.Warnf("Received new match for unfunded order %v!", oid)
// In runMatches>tracker.negotiate we generate the matchTracker and
// set swapErr after updating order status and filled amount, and
// storing the match to the DB. It may still be possible for the
// user to recover if the issue is just that the wrong wallet is
// connected by fixing wallet config and restarting. p.s. Hopefully
// we are maker.
}
}

resp, err := msgjson.NewResponse(msg.ID, acks, nil)
if err != nil {
return err
}

// Send the match acknowledgments.
// TODO: Consider a "QueueSend" or similar, but do not bail on the matches.
err = dc.Send(resp)
if err != nil {
// Do not bail on the matches on error, just log it.
c.log.Errorf("Send match response: %v", err)
// dc.addPendingSend(resp) // e.g.
}

// Begin match negotiation.
updatedAssets, err := c.runMatches(dc, matches)
updatedAssets, err := c.runMatches(matches)
if len(updatedAssets) > 0 {
c.updateBalances(updatedAssets)
}
Expand Down
Loading

0 comments on commit fd8fe98

Please sign in to comment.