Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swap,market: add TxMonitored #95

Merged
merged 7 commits into from
Jan 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/order"
"decred.org/dcrdex/server/account"
"decred.org/dcrdex/server/book"
"decred.org/dcrdex/server/coinlock"
"decred.org/dcrdex/server/db"
Expand Down Expand Up @@ -42,6 +43,7 @@ type Swapper interface {
Negotiate(matchSets []*order.MatchSet)
LockCoins(asset uint32, coins map[order.OrderID][]order.CoinID)
LockOrdersCoins(orders []order.Order)
TxMonitored(user account.AccountID, asset uint32, txid string) bool
Copy link
Member Author

@chappjc chappjc Dec 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems prudent to specify the asset given that an asset may be structured in a way that makes cloning a txid from another asset possible. The same reasoning is used for including asset ID in the CoinLocker functions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I actually have a comment to this effect in the review I'm working on for #85.

}

// Market is the market manager. It should not be overly involved with details
Expand Down Expand Up @@ -118,6 +120,12 @@ func (m *Market) SubmitOrder(rec *orderRecord) error {
return <-m.SubmitOrderAsync(rec)
}

// TxMonitored checks if a user's transaction for a certain asset is being
// monitored by the Swapper.
func (m *Market) TxMonitored(user account.AccountID, asset uint32, txid string) bool {
return m.swapper.TxMonitored(user, asset, txid)
}

// SubmitOrderAsync submits a new order for inclusion into the current epoch.
// When submission is completed, an error value will be sent on the channel.
// This is the asynchronous version of SubmitOrder.
Expand Down
7 changes: 5 additions & 2 deletions server/market/orderrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type MarketTunnel interface {
// TxMonitored determines whether the transaction for the given user is
// involved in a DEX-monitored trade. Change outputs from DEX-monitored trades
// can be used in other orders without waiting for fundConf confirmations.
TxMonitored(user account.AccountID, txid string) bool // TODO specify asset?
TxMonitored(user account.AccountID, asset uint32, txid string) bool
}

// orderRecord contains the information necessary to respond to an order
Expand Down Expand Up @@ -521,7 +521,10 @@ func (r *OrderRouter) checkPrefixTrade(user account.AccountID, tunnel MarketTunn
return errSet(msgjson.FundingError,
fmt.Sprintf("coin confirmations error for %x: %v", coin.ID, err))
}
if confs < int64(assets.funding.FundConf) && !tunnel.TxMonitored(user, dexCoin.TxID()) {
// Valid coins have either confs >= FundConf, or come from a
// DEX-monitored transaction.
if confs < int64(assets.funding.FundConf) &&
!tunnel.TxMonitored(user, assets.funding.ID, dexCoin.TxID()) {
return errSet(msgjson.FundingError,
fmt.Sprintf("not enough confirmations for %x. require %d, have %d",
coin.ID, assets.funding.FundConf, confs))
Expand Down
2 changes: 1 addition & 1 deletion server/market/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (m *TMarketTunnel) CoinLocked(coinid order.CoinID, assetID uint32) bool {
return m.locked
}

func (m *TMarketTunnel) TxMonitored(user account.AccountID, txid string) bool {
func (m *TMarketTunnel) TxMonitored(user account.AccountID, asset uint32, txid string) bool {
return m.watched
}

Expand Down
105 changes: 91 additions & 14 deletions server/swap/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func (s *Swapper) tryConfirmSwap(status *swapStatus) {
// the appropriate flags in the swapStatus structures.
func (s *Swapper) processBlock(block *blockNotification) {
completions := make([]*matchTracker, 0)
s.matchMtx.RLock()
defer s.matchMtx.RUnlock()
s.matchMtx.Lock()
defer s.matchMtx.Unlock()
for _, match := range s.matches {
// If it's neither of the match assets, nothing to do.
if match.makerStatus.swapAsset != block.assetID && match.takerStatus.swapAsset != block.assetID {
Expand Down Expand Up @@ -409,37 +409,113 @@ func (s *Swapper) processBlock(block *blockNotification) {
case order.MatchComplete:
// Once both redemption transactions have SwapConf confirmations, the
// order is complete.
var makerRedeemed, takerRedeemed bool
mStatus, tStatus := match.makerStatus, match.takerStatus
if !mStatus.redeemTime.IsZero() {
confs, err := mStatus.redemption.Confirmations()
makerRedeemed = err == nil && confs >= int64(s.coins[tStatus.swapAsset].SwapConf)
}
if makerRedeemed && !tStatus.redeemTime.IsZero() {
confs, err := tStatus.redemption.Confirmations()
takerRedeemed = err == nil && confs >= int64(s.coins[mStatus.swapAsset].SwapConf)
}
makerRedeemed, takerRedeemed := s.redeemStatus(match)
// TODO: Can coins be unlocked now regardless of redemption?
if makerRedeemed && takerRedeemed {
completions = append(completions, match)
}
}
}

for _, match := range completions {
s.unlockOrderCoins(match.Taker)
s.unlockOrderCoins(match.Maker)
// Remove the completed match. Note that checkInaction may also remove
// matches, so this entire function must lock even if there are no
// completions.
delete(s.matches, match.ID().String())
}
}

func (s *Swapper) redeemStatus(match *matchTracker) (makerRedeemComplete, takerRedeemComplete bool) {
makerRedeemComplete = s.makerRedeemStatus(match)
tStatus := match.takerStatus
// Taker is only complete if the maker is complete because
// order.MatchComplete follows order.MakerRedeemed.
if makerRedeemComplete && !tStatus.redeemTime.IsZero() {
confs, err := tStatus.redemption.Confirmations()
if err != nil {
log.Errorf("Confirmations failed for taker redemption %v: err",
tStatus.redemption.TxID(), err)
return
}
takerRedeemComplete = confs >= int64(s.coins[match.makerStatus.swapAsset].SwapConf)
}
return
}

func (s *Swapper) makerRedeemStatus(match *matchTracker) (makerRedeemComplete bool) {
mStatus, tStatus := match.makerStatus, match.takerStatus
if !mStatus.redeemTime.IsZero() {
confs, err := mStatus.redemption.Confirmations()
if err != nil {
log.Errorf("Confirmations failed for maker redemption %v: err",
mStatus.redemption.TxID(), err) // Severity?
return
}
makerRedeemComplete = confs >= int64(s.coins[tStatus.swapAsset].SwapConf)
}
return
}

// TxMonitored determines whether the transaction for the given user is involved
// in a DEX-monitored trade. Note that the swap contract tx is considered
// monitored until the swap is complete, regardless of confirms. This allows
// change outputs from a dex-monitored swap contract to be used to fund
// additional swaps prior to FundConf. e.g. OrderRouter may allow coins to fund
// orders where: (coins.confs >= FundConf) OR TxMonitored(coins.tx).
func (s *Swapper) TxMonitored(user account.AccountID, asset uint32, txid string) bool {
s.matchMtx.RLock()
defer s.matchMtx.RUnlock()

for _, match := range s.matches {
// The swap contract of either the maker or taker must correspond to
// specified asset to be of interest.
switch asset {
case match.makerStatus.swapAsset:
// Maker's swap transaction is the asset of interest.
if user == match.Maker.User() && match.makerStatus.swap.TxID() == txid {
// The swap contract tx is considered monitored until the swap
// is complete, regardless of confirms.
return true
}

// Taker's redemption transaction is the asset of interest.
_, takerRedeemDone := s.redeemStatus(match)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The throwaway here is not ideal. Every call to (asset.Coin).Confirmations potentially requires an RPC call.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the next case, we can now do makerRedeemDone := s.makerRedeemStatus(match), but for taker redeem status, we need to also check maker redeem status.

if !takerRedeemDone && user == match.Taker.User() &&
match.takerStatus.redemption.TxID() == txid {
return true
}
case match.takerStatus.swapAsset:
// Taker's swap transaction is the asset of interest.
if user == match.Taker.User() && match.takerStatus.swap.TxID() == txid {
// The swap contract tx is considered monitored until the swap
// is complete, regardless of confirms.
return true
}

// Maker's redemption transaction is the asset of interest.
makerRedeemDone := s.makerRedeemStatus(match)
if !makerRedeemDone && user == match.Maker.User() &&
match.makerStatus.redemption.TxID() == txid {
return true
}
default:
continue
}
}

return false
}

// checkInaction scans the swapStatus structures relevant to the specified
// asset. If a client is found to have not acted when required, a match may be
// revoked and a penalty assigned to the user.
func (s *Swapper) checkInaction(assetID uint32) {
oldestAllowed := time.Now().Add(-s.bTimeout).UTC()
deletions := make([]string, 0)
s.matchMtx.RLock()
defer s.matchMtx.RUnlock()
s.matchMtx.Lock()
defer s.matchMtx.Unlock()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot going on in this locked section, also in processBlock, but to keep the read and subsequent possible writes (delete) atomic, I'm just using a single full lock.

Copy link
Member

@buck54321 buck54321 Jan 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be good. There are no RPC calls here anyway, so I expect this check to go quick. The only thing that might be remotely slow is the calls to Penalize, since there is a DB call there. revoke has only coin cache access and non-blocking requests. Is it worth running Penalize as a goroutine? Regardless, write lock is okay.

May want to poke here during load testing though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth running Penalize as a goroutine?

It may be, but it's not clear yet that it's worth the potential complications that come with that.

for _, match := range s.matches {
if match.makerStatus.swapAsset != assetID && match.takerStatus.swapAsset != assetID {
continue
Expand Down Expand Up @@ -505,6 +581,7 @@ func (s *Swapper) checkInaction(assetID uint32) {
// Nothing to do here right now.
}
}

for _, matchID := range deletions {
delete(s.matches, matchID)
}
Expand Down
138 changes: 137 additions & 1 deletion server/swap/swap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (coin *TCoin) AuditContract() (string, uint64, error) {

func (coin *TCoin) Auth(pubkeys, sigs [][]byte, msg []byte) error { return nil }
func (coin *TCoin) ID() []byte { return coin.id }
func (coin *TCoin) TxID() string { return "" }
func (coin *TCoin) TxID() string { return hex.EncodeToString(coin.id) }
func (coin *TCoin) Value() uint64 { return 0 }
func (coin *TCoin) SpendSize() uint32 { return 0 }

Expand Down Expand Up @@ -1612,3 +1612,139 @@ func TestCancel(t *testing.T) {
t.Fatalf("match ID mismatch. %s != %s", makerNote.MatchID, takerNote.MatchID)
}
}

func TestTxMonitored(t *testing.T) {
sendBlock := func(node *TAsset) {
node.bChan <- 1
tickMempool()
}

makerSell := true
set := tPerfectLimitLimit(uint64(1e8), uint64(1e8), makerSell)
matchInfo := set.matchInfos[0]
rig := tNewTestRig(matchInfo)
rig.swapper.Negotiate([]*order.MatchSet{set.matchSet})
ensureNilErr := makeEnsureNilErr(t)
maker, taker := matchInfo.maker, matchInfo.taker

var makerLockedAsset, takerLockedAsset uint32
if makerSell {
makerLockedAsset = matchInfo.match.Maker.Base() // maker sell locks base asset
takerLockedAsset = matchInfo.match.Taker.Quote() // taker buy locks quote asset
} else {
makerLockedAsset = matchInfo.match.Maker.Quote() // maker sell locks base asset
takerLockedAsset = matchInfo.match.Taker.Base() // taker buy locks quote asset
}

tracker := rig.getTracker()
if tracker.Status != order.NewlyMatched {
t.Fatalf("match not marked as NewlyMatched: %d", tracker.Status)
}

// Maker acks match and sends swap tx.
ensureNilErr(rig.ackMatch_maker(true))
ensureNilErr(rig.sendSwap_maker(true))
makerContractTx := rig.matchInfo.db.makerSwap.coin.TxID()
if !rig.swapper.TxMonitored(maker.acct, makerLockedAsset, makerContractTx) {
t.Errorf("maker contract %s (asset %d) was not monitored",
makerContractTx, makerLockedAsset)
}

if tracker.Status != order.MakerSwapCast {
t.Fatalf("match not marked as MakerSwapCast: %d", tracker.Status)
}
matchInfo.db.makerSwap.coin.setConfs(int64(rig.abc.SwapConf))
sendBlock(rig.abcNode)
sendBlock(rig.xyzNode)

// For the taker, there must be two acknowledgements before broadcasting the
// swap transaction, the match ack and the audit ack.
ensureNilErr(rig.ackMatch_taker(true))
ensureNilErr(rig.auditSwap_taker())
ensureNilErr(rig.ackAudit_taker(true))
ensureNilErr(rig.sendSwap_taker(true))

takerContractTx := rig.matchInfo.db.takerSwap.coin.TxID()
if !rig.swapper.TxMonitored(taker.acct, takerLockedAsset, takerContractTx) {
t.Errorf("taker contract %s (asset %d) was not monitored",
takerContractTx, takerLockedAsset)
}

if tracker.Status != order.TakerSwapCast {
t.Fatalf("match not marked as TakerSwapCast: %d", tracker.Status)
}
sendBlock(rig.abcNode)
sendBlock(rig.xyzNode)

ensureNilErr(rig.auditSwap_maker())
ensureNilErr(rig.ackAudit_maker(true))

// Set the number of confirmations on the contracts.
//tracker.makerStatus.swapTime
// tracker.makerStatus.swapConfirmed = time.Now()
// tracker.takerStatus.swapConfirmed = time.Now()
matchInfo.db.takerSwap.coin.setConfs(int64(rig.xyz.SwapConf))

// send a block through for either chain to trigger a swap check.
sendBlock(rig.abcNode)
sendBlock(rig.xyzNode)

// Now redeem

ensureNilErr(rig.redeem_maker(true))

makerRedeemTx := rig.matchInfo.db.makerRedeem.coin.TxID()
if !rig.swapper.TxMonitored(maker.acct, takerLockedAsset, makerRedeemTx) {
t.Errorf("maker redeem %s (asset %d) was not monitored",
makerRedeemTx, takerLockedAsset)
}

if tracker.Status != order.MakerRedeemed {
t.Fatalf("match not marked as MakerRedeemed: %d", tracker.Status)
}

ensureNilErr(rig.ackRedemption_taker(true))
ensureNilErr(rig.redeem_taker(true))

takerRedeemTx := rig.matchInfo.db.takerRedeem.coin.TxID()
if !rig.swapper.TxMonitored(taker.acct, makerLockedAsset, takerRedeemTx) {
t.Errorf("taker redeem %s (asset %d) was not monitored",
takerRedeemTx, makerLockedAsset)
}

if tracker.Status != order.MatchComplete {
t.Fatalf("match not marked as MatchComplete: %d", tracker.Status)
}

ensureNilErr(rig.ackRedemption_maker(true))

// Confirm both redeem txns up to SwapConf so they are no longer monitored.
matchInfo.db.makerRedeem.coin.setConfs(int64(rig.abc.SwapConf))
matchInfo.db.takerRedeem.coin.setConfs(int64(rig.xyz.SwapConf))

if rig.swapper.TxMonitored(taker.acct, makerLockedAsset, takerRedeemTx) {
t.Errorf("taker redeem %s (asset %d) was still monitored",
takerRedeemTx, makerLockedAsset)
}

if rig.swapper.TxMonitored(maker.acct, takerLockedAsset, makerRedeemTx) {
t.Errorf("maker redeem %s (asset %d) was still monitored",
makerRedeemTx, takerLockedAsset)
}

// The match should also be gone from matchTracker, so the contracts should
// no longer be monitored either.

sendBlock(rig.abcNode)

if rig.swapper.TxMonitored(maker.acct, makerLockedAsset, makerContractTx) {
t.Errorf("maker contract %s (asset %d) was still monitored",
makerContractTx, makerLockedAsset)
}

if rig.swapper.TxMonitored(taker.acct, takerLockedAsset, takerContractTx) {
t.Errorf("taker contract %s (asset %d) was still monitored",
takerContractTx, takerLockedAsset)
}

}