-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client/{core,cmd/dexc}: load unfunded orders for recovery #967
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -237,16 +237,23 @@ func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, pr | |
func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err error) { | ||
tracker, _, _ := dc.findOrder(oid) | ||
if tracker == nil { | ||
return | ||
return // false, nil | ||
} | ||
return true, c.tryCancelTrade(dc, tracker) | ||
} | ||
|
||
// tryCancelTrade attempts to cancel the order. | ||
func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error { | ||
oid := tracker.ID() | ||
if lo, ok := tracker.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF { | ||
return fmt.Errorf("cannot cancel %s order %s that is not a standing limit order", tracker.Type(), oid) | ||
} | ||
found = true | ||
|
||
tracker.mtx.Lock() | ||
defer tracker.mtx.Unlock() | ||
|
||
if lo, ok := tracker.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF { | ||
err = fmt.Errorf("cannot cancel %s order %s that is not a standing limit order", tracker.Type(), oid) | ||
return | ||
if status := tracker.metaData.Status; status != order.OrderStatusEpoch && status != order.OrderStatusBooked { | ||
return fmt.Errorf("order %v not cancellable in status %v", oid, status) | ||
} | ||
|
||
if tracker.cancel != nil { | ||
|
@@ -255,8 +262,8 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err | |
tracker.deleteStaleCancelOrder() | ||
|
||
if tracker.cancel != nil { | ||
err = fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. still waiting on cancel order %s to match", oid, tracker.cancel.ID()) | ||
return | ||
return fmt.Errorf("order %s - only one cancel order can be submitted per order per epoch. "+ | ||
"still waiting on cancel order %s to match", oid, tracker.cancel.ID()) | ||
} | ||
} | ||
|
||
|
@@ -274,30 +281,28 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err | |
}, | ||
TargetOrderID: oid, | ||
} | ||
err = order.ValidateOrder(co, order.OrderStatusEpoch, 0) | ||
err := order.ValidateOrder(co, order.OrderStatusEpoch, 0) | ||
if err != nil { | ||
return | ||
return err | ||
} | ||
|
||
// Create and send the order message. Check the response before using it. | ||
route, msgOrder := messageOrder(co, nil) | ||
var result = new(msgjson.OrderResult) | ||
err = dc.signAndRequest(msgOrder, route, result, DefaultResponseTimeout) | ||
if err != nil { | ||
return | ||
return err | ||
} | ||
err = validateOrderResponse(dc, result, co, msgOrder) | ||
if err != nil { | ||
err = fmt.Errorf("Abandoning order. preimage: %x, server time: %d: %w", | ||
return fmt.Errorf("Abandoning order. preimage: %x, server time: %d: %w", | ||
preImg[:], result.ServerTime, err) | ||
return | ||
} | ||
|
||
// Store the cancel order with the tracker. | ||
err = tracker.cancelTrade(co, preImg) | ||
if err != nil { | ||
err = fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err) | ||
return | ||
return fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err) | ||
} | ||
|
||
// Now that the trackedTrade is updated, sync with the preimage request. | ||
|
@@ -317,16 +322,15 @@ func (c *Core) tryCancel(dc *dexConnection, oid order.OrderID) (found bool, err | |
Order: co, | ||
}) | ||
if err != nil { | ||
err = fmt.Errorf("failed to store order in database: %w", err) | ||
return | ||
return fmt.Errorf("failed to store order in database: %w", err) | ||
} | ||
|
||
c.log.Infof("Cancel order %s targeting order %s at %s has been placed", | ||
co.ID(), oid, dc.acct.host) | ||
|
||
c.notify(newOrderNote(SubjectCancellingOrder, "A cancel order has been submitted for order "+tracker.token(), db.Poke, tracker.coreOrderInternal())) | ||
|
||
return | ||
return nil | ||
} | ||
|
||
// Synchronize with the preimage request, in case that request came before | ||
|
@@ -386,7 +390,9 @@ type serverMatches struct { | |
cancel *msgjson.Match | ||
} | ||
|
||
// parseMatches sorts the list of matches and associates them with a trade. | ||
// parseMatches sorts the list of matches and associates them with a trade. This | ||
// may be called from handleMatchRoute on receipt of a new 'match' request, or | ||
// by authDEX with the list of active matches returned by the 'connect' request. | ||
func (dc *dexConnection) parseMatches(msgMatches []*msgjson.Match, checkSigs bool) (map[order.OrderID]*serverMatches, []msgjson.Acknowledgement, error) { | ||
var acks []msgjson.Acknowledgement | ||
matches := make(map[order.OrderID]*serverMatches) | ||
|
@@ -3154,6 +3160,31 @@ func (c *Core) authDEX(dc *dexConnection) error { | |
c.resolveMatchConflicts(dc, matchConflicts) | ||
} | ||
|
||
// List and cancel standing limit orders that are in epoch or booked status, | ||
// but without funding coins for new matches. This should be done after the | ||
// order status resolution done above. | ||
var brokenTrades []*trackedTrade | ||
dc.tradeMtx.RLock() | ||
for _, trade := range dc.trades { | ||
if lo, ok := trade.Order.(*order.LimitOrder); !ok || lo.Force != order.StandingTiF { | ||
continue // only standing limit orders need to be canceled | ||
} | ||
trade.mtx.RLock() | ||
status := trade.metaData.Status | ||
if (status == order.OrderStatusEpoch || status == order.OrderStatusBooked) && | ||
!trade.hasFundingCoins() { | ||
brokenTrades = append(brokenTrades, trade) | ||
} | ||
trade.mtx.RUnlock() | ||
} | ||
dc.tradeMtx.RUnlock() | ||
for _, trade := range brokenTrades { | ||
c.log.Warnf("Canceling unfunded standing limit order %v", trade.ID()) | ||
if err = c.tryCancelTrade(dc, trade); err != nil { | ||
c.log.Warnf("Unable to cancel unfunded trade %v: %v", trade.ID(), err) | ||
} | ||
chappjc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
if len(updatedAssets) > 0 { | ||
c.updateBalances(updatedAssets) | ||
} | ||
|
@@ -3520,11 +3551,13 @@ func (c *Core) loadDBTrades(dc *dexConnection, crypter encrypt.Crypter, failed m | |
if err != nil { | ||
baseFailed = true | ||
failed[base] = struct{}{} | ||
c.log.Errorf("Connecting to wallet %s failed: %v", unbip(base), err) | ||
} else if !baseWallet.unlocked() { | ||
err = baseWallet.Unlock(crypter) | ||
if err != nil { | ||
baseFailed = true | ||
failed[base] = struct{}{} | ||
c.log.Errorf("Unlock wallet %s failed: %v", unbip(base), err) | ||
} | ||
} | ||
} | ||
|
@@ -3533,11 +3566,13 @@ func (c *Core) loadDBTrades(dc *dexConnection, crypter encrypt.Crypter, failed m | |
if err != nil { | ||
quoteFailed = true | ||
failed[quote] = struct{}{} | ||
c.log.Errorf("Connecting to wallet %s failed: %v", unbip(quote), err) | ||
} else if !quoteWallet.unlocked() { | ||
err = quoteWallet.Unlock(crypter) | ||
if err != nil { | ||
quoteFailed = true | ||
failed[quote] = struct{}{} | ||
c.log.Errorf("Unlock wallet %s failed: %v", unbip(quote), err) | ||
} | ||
} | ||
} | ||
|
@@ -3563,6 +3598,27 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa | |
detail := fmt.Sprintf(s, a...) | ||
c.notify(newOrderNote(subject, detail, db.ErrorLevel, tracker.coreOrder())) | ||
} | ||
|
||
// markUnfunded is used to allow an unfunded order to enter the trades map | ||
// so that status resolution and match negotiation for unaffected matches | ||
// may continue. By not self-revoking, the user may have the opportunity to | ||
// resolve any wallet issues that may have lead to a failure to find the | ||
// funding coins. Otherwise the server will (or already did) revoke some or | ||
// all of the matches and the order itself. | ||
markUnfunded := func(trade *trackedTrade, matches []*matchTracker) { | ||
// Block negotiating new matches. | ||
trade.changeLocked = false | ||
trade.coinsLocked = false | ||
// Block swap txn attempts on matches needing funds. | ||
for _, match := range matches { | ||
match.swapErr = errors.New("no funding coins for swap") | ||
} | ||
// Will not be retired until revoke or cancel of the order and all | ||
// matches, which may happen on status resolution after authenticating | ||
// with the DEX server, or from a revoke_match/revoke_order notification | ||
// after timeout. However, the order should be unconditionally canceled. | ||
} | ||
|
||
relocks := make(assetMap) | ||
dc.tradeMtx.Lock() | ||
defer dc.tradeMtx.Unlock() | ||
|
@@ -3587,28 +3643,30 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa | |
// If matches haven't redeemed, but the counter-swap has been received, | ||
// reload the audit info. | ||
isActive := tracker.metaData.Status == order.OrderStatusBooked || tracker.metaData.Status == order.OrderStatusEpoch | ||
var needsCoins bool | ||
var matchesNeedingCoins []*matchTracker | ||
for _, match := range tracker.matches { | ||
dbMatch, metaData := match.Match, match.MetaData | ||
var needsAuditInfo bool | ||
var counterSwap []byte | ||
if dbMatch.Side == order.Maker { | ||
if dbMatch.Status < order.MakerSwapCast { | ||
needsCoins = true | ||
matchesNeedingCoins = append(matchesNeedingCoins, match) | ||
} | ||
if dbMatch.Status == order.TakerSwapCast { | ||
needsAuditInfo = true // maker needs AuditInfo for takers contract | ||
counterSwap = metaData.Proof.TakerSwap | ||
} | ||
} else { // Taker | ||
if dbMatch.Status < order.TakerSwapCast { | ||
needsCoins = true | ||
matchesNeedingCoins = append(matchesNeedingCoins, match) | ||
} | ||
if dbMatch.Status < order.MatchComplete && dbMatch.Status >= order.MakerSwapCast { | ||
needsAuditInfo = true // taker needs AuditInfo for maker's contract | ||
counterSwap = metaData.Proof.MakerSwap | ||
} | ||
} | ||
c.log.Tracef("Trade %v match %v needs coins = %v, needs audit info = %v", | ||
tracker.ID(), match.id, len(matchesNeedingCoins) > 0, needsAuditInfo) | ||
if needsAuditInfo { | ||
// Check for unresolvable states. | ||
if len(counterSwap) == 0 { | ||
|
@@ -3662,39 +3720,51 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa | |
} | ||
} | ||
|
||
// Active orders and orders with matches with unsent swaps need the funding | ||
// coin(s). | ||
// Active orders and orders with matches with unsent swaps need funding | ||
// coin(s). If they are not found, block new matches and swap attempts. | ||
needsCoins := len(matchesNeedingCoins) > 0 | ||
if isActive || needsCoins { | ||
coinIDs := trade.Coins | ||
if len(tracker.metaData.ChangeCoin) != 0 { | ||
coinIDs = []order.CoinID{tracker.metaData.ChangeCoin} | ||
} | ||
tracker.coins = map[string]asset.Coin{} // should already be | ||
if len(coinIDs) == 0 { | ||
notifyErr(SubjectNoFundingCoins, "Order %s has no %s funding coins", tracker.token(), unbip(wallets.fromAsset.ID)) | ||
continue | ||
} | ||
byteIDs := make([]dex.Bytes, 0, len(coinIDs)) | ||
for _, cid := range coinIDs { | ||
byteIDs = append(byteIDs, []byte(cid)) | ||
} | ||
if len(byteIDs) == 0 { | ||
notifyErr(SubjectOrderCoinError, "No coins for loaded order %s %s: %v", unbip(wallets.fromAsset.ID), tracker.token(), err) | ||
continue | ||
} | ||
coins, err := wallets.fromWallet.FundingCoins(byteIDs) | ||
if err != nil { | ||
notifyErr(SubjectOrderCoinError, "Source coins retrieval error for %s %s: %v", unbip(wallets.fromAsset.ID), tracker.token(), err) | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removal of these |
||
notifyErr(SubjectOrderCoinError, "No funding coins recorded for active order %s", tracker.token()) | ||
markUnfunded(tracker, matchesNeedingCoins) // bug - no user resolution | ||
} else { | ||
byteIDs := make([]dex.Bytes, 0, len(coinIDs)) | ||
for _, cid := range coinIDs { | ||
byteIDs = append(byteIDs, []byte(cid)) | ||
} | ||
coins, err := wallets.fromWallet.FundingCoins(byteIDs) | ||
if err != nil || len(coins) == 0 { | ||
notifyErr(SubjectOrderCoinError, "Source coins retrieval error for order %s (%s): %v", | ||
tracker.token(), unbip(wallets.fromAsset.ID), err) | ||
// Block matches needing funding coins. | ||
markUnfunded(tracker, matchesNeedingCoins) | ||
// Note: tracker is still added to trades map for (1) status | ||
// resolution, (2) continued settlement of matches that no | ||
// longer require funding coins, and (3) cancellation in | ||
// authDEX if the order is booked. | ||
c.log.Warnf("Check the status of your %s wallet and the coins logged above! "+ | ||
"Resolve the wallet issue if possible and restart the DEX client.", | ||
strings.ToUpper(unbip(wallets.fromAsset.ID))) | ||
c.log.Warnf("Unfunded order %v will be canceled on connect, but %d active matches need funding coins!", | ||
tracker.ID(), len(matchesNeedingCoins)) | ||
// If the funding coins are spent or inaccessible, the user | ||
// can only wait for match revocation. | ||
} else { | ||
// NOTE: change and changeLocked are not set even if the | ||
// funding coins were loaded from the DB's ChangeCoin. | ||
tracker.coinsLocked = true | ||
tracker.coins = mapifyCoins(coins) | ||
} | ||
} | ||
// NOTE: change and changeLocked are not set even if the funding | ||
// coins were loaded from the DB's ChangeCoin. | ||
tracker.coinsLocked = true | ||
tracker.coins = mapifyCoins(coins) | ||
} | ||
|
||
// Active orders and orders with matches with unsent swaps need the funding | ||
// coin(s). | ||
// Orders with sent but unspent swaps need to recompute contract-locked amts. | ||
// Balances should be updated for any orders with locked wallet coins, | ||
// or orders with funds locked in contracts. | ||
if isActive || needsCoins || tracker.unspentContractAmounts() > 0 { | ||
relocks.count(wallets.fromAsset.ID) | ||
} | ||
|
@@ -3735,7 +3805,7 @@ func generateDEXMaps(host string, cfg *msgjson.ConfigResult) (map[uint32]*dex.As | |
} | ||
|
||
// runMatches runs the sorted matches returned from parseMatches. | ||
func (c *Core) runMatches(dc *dexConnection, tradeMatches map[order.OrderID]*serverMatches) (assetMap, error) { | ||
func (c *Core) runMatches(tradeMatches map[order.OrderID]*serverMatches) (assetMap, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
runMatch := func(sm *serverMatches) (assetMap, error) { | ||
updatedAssets := make(assetMap) | ||
tracker := sm.tracker | ||
|
@@ -4394,21 +4464,36 @@ func handleMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error { | |
// requirement, which requires changes to the server's handling. | ||
return err | ||
} | ||
|
||
// Warn about new matches for unfunded orders. We still must ack all the | ||
// matches in the 'match' request for the server to accept it, although the | ||
// server doesn't require match acks. See (*Swapper).processMatchAcks. | ||
for oid, srvMatch := range matches { | ||
if !srvMatch.tracker.hasFundingCoins() { | ||
c.log.Warnf("Received new match for unfunded order %v!", oid) | ||
// In runMatches>tracker.negotiate we generate the matchTracker and | ||
// set swapErr after updating order status and filled amount, and | ||
// storing the match to the DB. It may still be possible for the | ||
// user to recover if the issue is just that the wrong wallet is | ||
// connected by fixing wallet config and restarting. p.s. Hopefully | ||
// we are maker. | ||
} | ||
} | ||
|
||
resp, err := msgjson.NewResponse(msg.ID, acks, nil) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Send the match acknowledgments. | ||
// TODO: Consider a "QueueSend" or similar, but do not bail on the matches. | ||
err = dc.Send(resp) | ||
if err != nil { | ||
// Do not bail on the matches on error, just log it. | ||
c.log.Errorf("Send match response: %v", err) | ||
// dc.addPendingSend(resp) // e.g. | ||
} | ||
|
||
// Begin match negotiation. | ||
updatedAssets, err := c.runMatches(dc, matches) | ||
updatedAssets, err := c.runMatches(matches) | ||
if len(updatedAssets) > 0 { | ||
c.updateBalances(updatedAssets) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We removed this from the
release-0.1
backport (already merged), but it's still on this master PR. What do you say to leaving @buck54321?