Skip to content

Commit

Permalink
excludeCancels option for db.MatchesForOrder and reloadMatchProofs
Browse files Browse the repository at this point in the history
Cancel matches can be the lion's share of matches in some DBs, and in
most cases it is not necessary to process these.
- In db upgrades, there is no reason to reencode the MatchProof.
  Although it's not clear there is a reason to do this for trade matches
  either since any match proof version decodes automatically to the
  current MatchProof.
- In (*Core).dbTrackers, cancel order matches are not used.
- (*Core).coreOrderFromMetaOrder might be able to exclude cancel order
  matches, but the consumers are exported Core methods, so no change.

This also include a prealloc optional arg to ExtractPushes and
DecodeBlob, which is mainly helpful for the MatchProof buckets
that have 22 pushes.
  • Loading branch information
chappjc committed Apr 28, 2021
1 parent 914d140 commit d977162
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 50 deletions.
6 changes: 4 additions & 2 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2548,7 +2548,8 @@ func (c *Core) Orders(filter *OrderFilter) ([]*Order, error) {
func (c *Core) coreOrderFromMetaOrder(mOrd *db.MetaOrder) (*Order, error) {
corder := coreOrderFromTrade(mOrd.Order, mOrd.MetaData)
oid := mOrd.Order.ID()
matches, err := c.db.MatchesForOrder(oid)
excludeCancels := false // maybe don't include cancel order matches?
matches, err := c.db.MatchesForOrder(oid, excludeCancels)
if err != nil {
return nil, fmt.Errorf("MatchesForOrder error loading matches for %s: %w", oid, err)
}
Expand Down Expand Up @@ -3901,6 +3902,7 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e
}

trackers := make(map[order.OrderID]*trackedTrade, len(dbOrders))
excludeCancelMatches := true
for _, dbOrder := range dbOrders {
ord := dbOrder.Order
oid := ord.ID()
Expand All @@ -3923,7 +3925,7 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e
trackers[dbOrder.Order.ID()] = tracker

// Get matches.
dbMatches, err := c.db.MatchesForOrder(oid)
dbMatches, err := c.db.MatchesForOrder(oid, excludeCancelMatches)
if err != nil {
return nil, fmt.Errorf("error loading matches for order %s: %w", oid, err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (tdb *TDB) ActiveMatches() ([]*db.MetaMatch, error) {
return nil, nil
}

func (tdb *TDB) MatchesForOrder(oid order.OrderID) ([]*db.MetaMatch, error) {
func (tdb *TDB) MatchesForOrder(oid order.OrderID, excludeCancels bool) ([]*db.MetaMatch, error) {
return tdb.matchesForOID, tdb.matchesForOIDErr
}

Expand Down
19 changes: 12 additions & 7 deletions client/db/bolt/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func (db *BoltDB) ActiveMatches() ([]*dexdb.MetaMatch, error) {
return db.filteredMatches(func(mBkt *bbolt.Bucket) bool {
status := mBkt.Get(statusKey)
return len(status) != 1 || status[0] != uint8(order.MatchComplete)
})
}, true) // cancel matches are immediately complete, never active, but the status filter will apply first anyway
}

// DEXOrdersWithActiveMatches retrieves order IDs for any order that has active
Expand Down Expand Up @@ -853,7 +853,7 @@ func (db *BoltDB) DEXOrdersWithActiveMatches(dex string) ([]order.OrderID, error
db.log.Errorf("empty match proof")
return nil
}
proof, errM := dexdb.DecodeMatchProof(proofB)
proof, _, errM := dexdb.DecodeMatchProof(proofB)
if errM != nil {
db.log.Errorf("error decoding proof: %v", errM)
return nil
Expand Down Expand Up @@ -912,18 +912,20 @@ func (db *BoltDB) DEXOrdersWithActiveMatches(dex string) ([]order.OrderID, error
}

// MatchesForOrder retrieves the matches for the specified order ID.
func (db *BoltDB) MatchesForOrder(oid order.OrderID) ([]*dexdb.MetaMatch, error) {
func (db *BoltDB) MatchesForOrder(oid order.OrderID, excludeCancels bool) ([]*dexdb.MetaMatch, error) {
oidB := oid[:]
return db.filteredMatches(func(mBkt *bbolt.Bucket) bool {
oid := mBkt.Get(orderIDKey)
return bytes.Equal(oid, oidB)
})
}, excludeCancels)
}

// filteredMatches gets all matches that pass the provided filter function. Each
// match's bucket is provided to the filter, and a boolean true return value
// indicates the match should be decoded and returned.
func (db *BoltDB) filteredMatches(filter func(*bbolt.Bucket) bool) ([]*dexdb.MetaMatch, error) {
// indicates the match should be decoded and returned. Matches with cancel
// orders may be excluded, a separate option so the filter function does not
// need to load and decode the matchKey value.
func (db *BoltDB) filteredMatches(filter func(*bbolt.Bucket) bool, excludeCancels bool) ([]*dexdb.MetaMatch, error) {
var matches []*dexdb.MetaMatch
return matches, db.matchesView(func(master *bbolt.Bucket) error {
return master.ForEach(func(k, _ []byte) error {
Expand All @@ -941,11 +943,14 @@ func (db *BoltDB) filteredMatches(filter func(*bbolt.Bucket) bool) ([]*dexdb.Met
if err != nil {
return fmt.Errorf("error decoding match %x: %w", k, err)
}
if excludeCancels && match.Address == "" {
return nil
}
proofB := getCopy(mBkt, proofKey)
if len(proofB) == 0 {
return fmt.Errorf("empty proof")
}
proof, err = dexdb.DecodeMatchProof(proofB)
proof, _, err = dexdb.DecodeMatchProof(proofB)
if err != nil {
return fmt.Errorf("error decoding proof: %w", err)
}
Expand Down
48 changes: 29 additions & 19 deletions client/db/bolt/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

dexdb "decred.org/dcrdex/client/db"
"decred.org/dcrdex/dex/encode"
"decred.org/dcrdex/dex/order"
"go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -84,28 +85,18 @@ func (db *BoltDB) upgradeDB() error {
return fmt.Errorf("failed to backup DB prior to upgrade: %w", err)
}

// Each upgrade its own tx.
// Each upgrade its own tx, otherwise bolt eats too much RAM.
for i, upgrade := range upgrades[version:] {
newVersion := version + uint32(i) + 1
db.log.Debugf("Upgrading to version %d...", newVersion)
err = db.Update(func(tx *bbolt.Tx) error {
return doUpgrade(tx, upgrade, version+uint32(i)+1)
return doUpgrade(tx, upgrade, newVersion)
})
if err != nil {
return err
}
}
return nil

// All upgrades in a single tx.
// return db.Update(func(tx *bbolt.Tx) error {
// // Execute all necessary upgrades in order.
// for i, upgrade := range upgrades[version:] {
// err := doUpgrade(tx, upgrade, version+uint32(i)+1)
// if err != nil {
// return err
// }
// }
// return nil
// })
}

// Get the currently stored DB version.
Expand Down Expand Up @@ -141,8 +132,8 @@ func v1Upgrade(dbtx *bbolt.Tx) error {
if bkt == nil {
return fmt.Errorf("appBucket not found")
}

return reloadMatchProofs(dbtx)
skipCancels := true // cancel matches don't get revoked, only cancel orders
return reloadMatchProofs(dbtx, skipCancels)
}

// v2Upgrade adds a MaxFeeRate field to the OrderMetaData. The upgrade sets the
Expand Down Expand Up @@ -187,7 +178,8 @@ func v3Upgrade(dbtx *bbolt.Tx) error {
// Upgrade the match proof. We just have to retrieve and re-store the
// buckets. The decoder will recognize the the old version and add the new
// field.
return reloadMatchProofs(dbtx)
skipCancels := true // cancel matches have no tx data
return reloadMatchProofs(dbtx, skipCancels)
}

func ensureVersion(tx *bbolt.Tx, ver uint32) error {
Expand All @@ -202,7 +194,10 @@ func ensureVersion(tx *bbolt.Tx, ver uint32) error {
return nil
}

func reloadMatchProofs(tx *bbolt.Tx) error {
// Note that reloadMatchProofs will rewrite the MatchProof with the current
// match proof encoding version. Thus, multiple upgrades in a row calling
// reloadMatchProofs may be no-ops. Matches with cancel orders may be skipped.
func reloadMatchProofs(tx *bbolt.Tx, skipCancels bool) error {
matches := tx.Bucket(matchesBucket)
return matches.ForEach(func(k, _ []byte) error {
mBkt := matches.Bucket(k)
Expand All @@ -213,10 +208,25 @@ func reloadMatchProofs(tx *bbolt.Tx) error {
if len(proofB) == 0 {
return fmt.Errorf("empty match proof")
}
proof, err := dexdb.DecodeMatchProof(proofB)
proof, ver, err := dexdb.DecodeMatchProof(proofB)
if err != nil {
return fmt.Errorf("error decoding proof: %w", err)
}
// No need to rewrite this if it was loaded from the current version.
if ver == dexdb.MatchProofVer {
return nil
}
// No Script, and MatchComplete status means this is a cancel match.
if skipCancels && len(proof.Script) == 0 {
statusB := mBkt.Get(statusKey)
if len(statusB) != 1 {
return fmt.Errorf("no match status")
}
if order.MatchStatus(statusB[0]) == order.MatchComplete {
return nil
}
}

err = mBkt.Put(proofKey, proof.Encode())
if err != nil {
return fmt.Errorf("error re-storing match proof: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion client/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type DB interface {
// state.
DEXOrdersWithActiveMatches(dex string) ([]order.OrderID, error)
// MatchesForOrder gets the matches for the order ID.
MatchesForOrder(oid order.OrderID) ([]*MetaMatch, error)
MatchesForOrder(oid order.OrderID, excludeCancels bool) ([]*MetaMatch, error)
// Update wallets adds a wallet to the database, or updates the wallet
// credentials if the wallet already exists. A wallet is specified by the
// pair (asset ID, account name).
Expand Down
5 changes: 4 additions & 1 deletion client/db/test/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ func TestMatchProof(t *testing.T) {
nTimes(spins, func(i int) {
proof := proofs[i]
proofB := proof.Encode()
reProof, err := db.DecodeMatchProof(proofB)
reProof, ver, err := db.DecodeMatchProof(proofB)
if err != nil {
t.Fatalf("match decode error: %v", err)
}
MustCompareMatchProof(t, proof, reProof)
if ver != db.MatchProofVer {
t.Errorf("wanted match proof ver %d, got %d", db.MatchProofVer, ver)
}
})
t.Logf("encoded, decoded, and compared %d MatchProof in %d ms", spins, time.Since(tStart)/time.Millisecond)
}
Expand Down
32 changes: 21 additions & 11 deletions client/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ type MatchProof struct {
SelfRevoked bool
}

// MatchProofVer is the current serialization version of a MatchProof.
const (
MatchProofVer = 2
matchProofPushes = 22
)

// Encode encodes the MatchProof to a versioned blob.
func (p *MatchProof) Encode() []byte {
auth := p.Auth
Expand All @@ -266,7 +272,7 @@ func (p *MatchProof) Encode() []byte {
selfRevoked = encode.ByteTrue
}

return dbBytes{2}.
return dbBytes{MatchProofVer}.
AddData(p.Script).
AddData(p.CounterContract).
AddData(p.SecretHash).
Expand All @@ -292,20 +298,23 @@ func (p *MatchProof) Encode() []byte {
}

// DecodeMatchProof decodes the versioned blob to a *MatchProof.
func DecodeMatchProof(b []byte) (*MatchProof, error) {
ver, pushes, err := encode.DecodeBlob(b)
func DecodeMatchProof(b []byte) (*MatchProof, uint8, error) {
ver, pushes, err := encode.DecodeBlob(b, matchProofPushes)
if err != nil {
return nil, err
return nil, 0, err
}
switch ver {
case 2:
return decodeMatchProof_v2(pushes)
case 2: // MatchProofVer
proof, err := decodeMatchProof_v2(pushes)
return proof, ver, err
case 1:
return decodeMatchProof_v1(pushes)
proof, err := decodeMatchProof_v1(pushes)
return proof, ver, err
case 0:
return decodeMatchProof_v0(pushes)
proof, err := decodeMatchProof_v0(pushes)
return proof, ver, err
}
return nil, fmt.Errorf("unknown MatchProof version %d", ver)
return nil, ver, fmt.Errorf("unknown MatchProof version %d", ver)
}

func decodeMatchProof_v0(pushes [][]byte) (*MatchProof, error) {
Expand All @@ -319,8 +328,9 @@ func decodeMatchProof_v1(pushes [][]byte) (*MatchProof, error) {
}

func decodeMatchProof_v2(pushes [][]byte) (*MatchProof, error) {
if len(pushes) != 22 {
return nil, fmt.Errorf("DecodeMatchProof: expected 21 pushes, got %d", len(pushes))
if len(pushes) != matchProofPushes {
return nil, fmt.Errorf("DecodeMatchProof: expected %d pushes, got %d",
matchProofPushes, len(pushes))
}
return &MatchProof{
Script: pushes[0],
Expand Down
12 changes: 8 additions & 4 deletions dex/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ func DecodeUTime(b []byte) time.Time {

// ExtractPushes parses the linearly-encoded 2D byte slice into a slice of
// slices. Empty pushes are nil slices.
func ExtractPushes(b []byte) ([][]byte, error) {
pushes := make([][]byte, 0)
func ExtractPushes(b []byte, preAlloc ...int) ([][]byte, error) {
allocPushes := 2
if len(preAlloc) > 0 {
allocPushes = preAlloc[0]
}
pushes := make([][]byte, 0, allocPushes)
for {
if len(b) == 0 {
break
Expand Down Expand Up @@ -139,13 +143,13 @@ func ExtractPushes(b []byte) ([][]byte, error) {

// DecodeBlob decodes a versioned blob into its version and the pushes extracted
// from its data. Empty pushes will be nil.
func DecodeBlob(b []byte) (byte, [][]byte, error) {
func DecodeBlob(b []byte, preAlloc ...int) (byte, [][]byte, error) {
if len(b) == 0 {
return 0, nil, fmt.Errorf("zero length blob not allowed")
}
ver := b[0]
b = b[1:]
pushes, err := ExtractPushes(b)
pushes, err := ExtractPushes(b, preAlloc...)
return ver, pushes, err
}

Expand Down
8 changes: 4 additions & 4 deletions dex/order/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func EncodePrefix(p *Prefix) []byte {

// DecodePrefix decodes the versioned blob to a *Prefix.
func DecodePrefix(b []byte) (prefix *Prefix, err error) {
ver, pushes, err := encode.DecodeBlob(b)
ver, pushes, err := encode.DecodeBlob(b, 7)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func EncodeTrade(ord *Trade) []byte {
// DecodeTrade decodes the versioned-blob market order, but does not populate
// the embedded Prefix.
func DecodeTrade(b []byte) (trade *Trade, err error) {
ver, pushes, err := encode.DecodeBlob(b)
ver, pushes, err := encode.DecodeBlob(b, 5)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func EncodeMatch(match *UserMatch) []byte {

// DecodeMatch decodes the versioned blob into a UserMatch.
func DecodeMatch(b []byte) (match *UserMatch, err error) {
ver, pushes, err := encode.DecodeBlob(b)
ver, pushes, err := encode.DecodeBlob(b, 8)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func EncodeOrder(ord Order) []byte {
// DecodeOrder decodes the byte-encoded order. DecodeOrder accepts any type of
// order.
func DecodeOrder(b []byte) (ord Order, err error) {
ver, pushes, err := encode.DecodeBlob(b)
ver, pushes, err := encode.DecodeBlob(b, 4) // pushes actually depends on order type
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d977162

Please sign in to comment.