Skip to content

Commit

Permalink
client/db/bolt: omit copy of buffer used only in db txns
Browse files Browse the repository at this point in the history
try each upgrade in its own tx

excludeCancels option for db.MatchesForOrder and reloadMatchProofs

Cancel matches can be the lion's share of matches in some DBs, and in
most cases it is not necessary to process these.
- In db upgrades, there is no reason to reencode the MatchProof.
  Although it's not clear there is a reason to do this for trade matches
  either since any match proof version decodes automatically to the
  current MatchProof.
- In (*Core).dbTrackers, cancel order matches are not used.
- (*Core).coreOrderFromMetaOrder might be able to exclude cancel order
  matches, but the consumers are exported Core methods, so no change.

This also include a prealloc optional arg to ExtractPushes and
DecodeBlob, which is mainly helpful for the MatchProof buckets
that have 22 pushes.

do not set maxFeeRate for cancels v2 upgrade
  • Loading branch information
chappjc committed Apr 29, 2021
1 parent 736b005 commit bbd3bfd
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 56 deletions.
6 changes: 4 additions & 2 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2548,7 +2548,8 @@ func (c *Core) Orders(filter *OrderFilter) ([]*Order, error) {
func (c *Core) coreOrderFromMetaOrder(mOrd *db.MetaOrder) (*Order, error) {
corder := coreOrderFromTrade(mOrd.Order, mOrd.MetaData)
oid := mOrd.Order.ID()
matches, err := c.db.MatchesForOrder(oid)
excludeCancels := false // maybe don't include cancel order matches?
matches, err := c.db.MatchesForOrder(oid, excludeCancels)
if err != nil {
return nil, fmt.Errorf("MatchesForOrder error loading matches for %s: %w", oid, err)
}
Expand Down Expand Up @@ -3901,6 +3902,7 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e
}

trackers := make(map[order.OrderID]*trackedTrade, len(dbOrders))
excludeCancelMatches := true
for _, dbOrder := range dbOrders {
ord := dbOrder.Order
oid := ord.ID()
Expand All @@ -3923,7 +3925,7 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e
trackers[dbOrder.Order.ID()] = tracker

// Get matches.
dbMatches, err := c.db.MatchesForOrder(oid)
dbMatches, err := c.db.MatchesForOrder(oid, excludeCancelMatches)
if err != nil {
return nil, fmt.Errorf("error loading matches for order %s: %w", oid, err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (tdb *TDB) ActiveMatches() ([]*db.MetaMatch, error) {
return nil, nil
}

func (tdb *TDB) MatchesForOrder(oid order.OrderID) ([]*db.MetaMatch, error) {
func (tdb *TDB) MatchesForOrder(oid order.OrderID, excludeCancels bool) ([]*db.MetaMatch, error) {
return tdb.matchesForOID, tdb.matchesForOIDErr
}

Expand Down
33 changes: 24 additions & 9 deletions client/db/bolt/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,16 @@ func decodeOrderBucket(oid []byte, oBkt *bbolt.Bucket) (*dexdb.MetaOrder, error)
var linkedID order.OrderID
copy(linkedID[:], oBkt.Get(linkedKey))

// Old cancel orders may not have a maxFeeRate set since the v2 upgrade
// doesn't set it for cancel orders.
var maxFeeRate uint64
if maxFeeRateB := oBkt.Get(maxFeeRateKey); len(maxFeeRateB) == 8 {
maxFeeRate = intCoder.Uint64(maxFeeRateB)
} else if ord.Type() != order.CancelOrderType {
// Cancel orders should use zero, but trades need a non-zero value.
maxFeeRate = ^uint64(0) // should not happen for trade orders after v2 upgrade
}

return &dexdb.MetaOrder{
MetaData: &dexdb.OrderMetaData{
Proof: *proof,
Expand All @@ -712,7 +722,7 @@ func decodeOrderBucket(oid []byte, oBkt *bbolt.Bucket) (*dexdb.MetaOrder, error)
ChangeCoin: getCopy(oBkt, changeKey),
LinkedOrder: linkedID,
SwapFeesPaid: intCoder.Uint64(oBkt.Get(swapFeesKey)),
MaxFeeRate: intCoder.Uint64(oBkt.Get(maxFeeRateKey)),
MaxFeeRate: maxFeeRate,
RedemptionFeesPaid: intCoder.Uint64(oBkt.Get(redemptionFeesKey)),
},
Order: ord,
Expand Down Expand Up @@ -818,7 +828,7 @@ func (db *BoltDB) ActiveMatches() ([]*dexdb.MetaMatch, error) {
return db.filteredMatches(func(mBkt *bbolt.Bucket) bool {
status := mBkt.Get(statusKey)
return len(status) != 1 || status[0] != uint8(order.MatchComplete)
})
}, true) // cancel matches are immediately complete, never active, but the status filter will apply first anyway
}

// DEXOrdersWithActiveMatches retrieves order IDs for any order that has active
Expand Down Expand Up @@ -848,12 +858,12 @@ func (db *BoltDB) DEXOrdersWithActiveMatches(dex string) ([]order.OrderID, error
}

// Inactive if refunded.
proofB := getCopy(mBkt, proofKey)
proofB := mBkt.Get(proofKey)
if len(proofB) == 0 {
db.log.Errorf("empty match proof")
return nil
}
proof, errM := dexdb.DecodeMatchProof(proofB)
proof, _, errM := dexdb.DecodeMatchProof(proofB)
if errM != nil {
db.log.Errorf("error decoding proof: %v", errM)
return nil
Expand Down Expand Up @@ -912,18 +922,20 @@ func (db *BoltDB) DEXOrdersWithActiveMatches(dex string) ([]order.OrderID, error
}

// MatchesForOrder retrieves the matches for the specified order ID.
func (db *BoltDB) MatchesForOrder(oid order.OrderID) ([]*dexdb.MetaMatch, error) {
func (db *BoltDB) MatchesForOrder(oid order.OrderID, excludeCancels bool) ([]*dexdb.MetaMatch, error) {
oidB := oid[:]
return db.filteredMatches(func(mBkt *bbolt.Bucket) bool {
oid := mBkt.Get(orderIDKey)
return bytes.Equal(oid, oidB)
})
}, excludeCancels)
}

// filteredMatches gets all matches that pass the provided filter function. Each
// match's bucket is provided to the filter, and a boolean true return value
// indicates the match should be decoded and returned.
func (db *BoltDB) filteredMatches(filter func(*bbolt.Bucket) bool) ([]*dexdb.MetaMatch, error) {
// indicates the match should be decoded and returned. Matches with cancel
// orders may be excluded, a separate option so the filter function does not
// need to load and decode the matchKey value.
func (db *BoltDB) filteredMatches(filter func(*bbolt.Bucket) bool, excludeCancels bool) ([]*dexdb.MetaMatch, error) {
var matches []*dexdb.MetaMatch
return matches, db.matchesView(func(master *bbolt.Bucket) error {
return master.ForEach(func(k, _ []byte) error {
Expand All @@ -941,11 +953,14 @@ func (db *BoltDB) filteredMatches(filter func(*bbolt.Bucket) bool) ([]*dexdb.Met
if err != nil {
return fmt.Errorf("error decoding match %x: %w", k, err)
}
if excludeCancels && match.Address == "" {
return nil
}
proofB := getCopy(mBkt, proofKey)
if len(proofB) == 0 {
return fmt.Errorf("empty proof")
}
proof, err = dexdb.DecodeMatchProof(proofB)
proof, _, err = dexdb.DecodeMatchProof(proofB)
if err != nil {
return fmt.Errorf("error decoding proof: %w", err)
}
Expand Down
68 changes: 51 additions & 17 deletions client/db/bolt/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

dexdb "decred.org/dcrdex/client/db"
"decred.org/dcrdex/dex/encode"
"decred.org/dcrdex/dex/order"
"go.etcd.io/bbolt"
)

Expand Down Expand Up @@ -84,16 +85,18 @@ func (db *BoltDB) upgradeDB() error {
return fmt.Errorf("failed to backup DB prior to upgrade: %w", err)
}

return db.Update(func(tx *bbolt.Tx) error {
// Execute all necessary upgrades in order.
for i, upgrade := range upgrades[version:] {
err := doUpgrade(tx, upgrade, version+uint32(i)+1)
if err != nil {
return err
}
// Each upgrade its own tx, otherwise bolt eats too much RAM.
for i, upgrade := range upgrades[version:] {
newVersion := version + uint32(i) + 1
db.log.Debugf("Upgrading to version %d...", newVersion)
err = db.Update(func(tx *bbolt.Tx) error {
return doUpgrade(tx, upgrade, newVersion)
})
if err != nil {
return err
}
return nil
})
}
return nil
}

// Get the currently stored DB version.
Expand Down Expand Up @@ -129,13 +132,13 @@ func v1Upgrade(dbtx *bbolt.Tx) error {
if bkt == nil {
return fmt.Errorf("appBucket not found")
}

return reloadMatchProofs(dbtx)
skipCancels := true // cancel matches don't get revoked, only cancel orders
return reloadMatchProofs(dbtx, skipCancels)
}

// v2Upgrade adds a MaxFeeRate field to the OrderMetaData. The upgrade sets the
// MaxFeeRate field for all historical orders to the max uint64. This avoids any
// chance of rejecting a pre-existing active match.
// MaxFeeRate field for all historical trade orders to the max uint64. This
// avoids any chance of rejecting a pre-existing active match.
func v2Upgrade(dbtx *bbolt.Tx) error {
const oldVersion = 1

Expand All @@ -161,6 +164,18 @@ func v2Upgrade(dbtx *bbolt.Tx) error {
if oBkt == nil {
return fmt.Errorf("order %x bucket is not a bucket", oid)
}
// Cancel orders should be stored with a zero maxFeeRate, as done in
// (*Core).tryCancelTrade. Besides, the maxFeeRate should not be applied
// to cancel matches, as done in (*dexConnection).parseMatches.
oTypeB := oBkt.Get(typeKey)
if len(oTypeB) != 1 {
return fmt.Errorf("order %x type invalid: %x", oid, oTypeB)
}
if order.OrderType(oTypeB[0]) == order.CancelOrderType {
// Don't bother setting maxFeeRate for cancel orders.
// decodeOrderBucket will default to zero for cancels.
return nil
}
return oBkt.Put(maxFeeRateKey, maxFeeB)
})
}
Expand All @@ -175,7 +190,8 @@ func v3Upgrade(dbtx *bbolt.Tx) error {
// Upgrade the match proof. We just have to retrieve and re-store the
// buckets. The decoder will recognize the the old version and add the new
// field.
return reloadMatchProofs(dbtx)
skipCancels := true // cancel matches have no tx data
return reloadMatchProofs(dbtx, skipCancels)
}

func ensureVersion(tx *bbolt.Tx, ver uint32) error {
Expand All @@ -190,21 +206,39 @@ func ensureVersion(tx *bbolt.Tx, ver uint32) error {
return nil
}

func reloadMatchProofs(tx *bbolt.Tx) error {
// Note that reloadMatchProofs will rewrite the MatchProof with the current
// match proof encoding version. Thus, multiple upgrades in a row calling
// reloadMatchProofs may be no-ops. Matches with cancel orders may be skipped.
func reloadMatchProofs(tx *bbolt.Tx, skipCancels bool) error {
matches := tx.Bucket(matchesBucket)
return matches.ForEach(func(k, _ []byte) error {
mBkt := matches.Bucket(k)
if mBkt == nil {
return fmt.Errorf("match %x bucket is not a bucket", k)
}
proofB := getCopy(mBkt, proofKey)
proofB := mBkt.Get(proofKey)
if len(proofB) == 0 {
return fmt.Errorf("empty match proof")
}
proof, err := dexdb.DecodeMatchProof(proofB)
proof, ver, err := dexdb.DecodeMatchProof(proofB)
if err != nil {
return fmt.Errorf("error decoding proof: %w", err)
}
// No need to rewrite this if it was loaded from the current version.
if ver == dexdb.MatchProofVer {
return nil
}
// No Script, and MatchComplete status means this is a cancel match.
if skipCancels && len(proof.Script) == 0 {
statusB := mBkt.Get(statusKey)
if len(statusB) != 1 {
return fmt.Errorf("no match status")
}
if order.MatchStatus(statusB[0]) == order.MatchComplete {
return nil
}
}

err = mBkt.Put(proofKey, proof.Encode())
if err != nil {
return fmt.Errorf("error re-storing match proof: %w", err)
Expand Down
19 changes: 13 additions & 6 deletions client/db/bolt/upgrades_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var dbUpgradeTests = [...]struct {
filename string // in testdata directory
newVersion uint32
}{
// {"testnetbot", v1Upgrade, verifyV1Upgrade, "dexbot-testnet.db.gz", 3}, // only for TestUpgradeDB
{"upgradeFromV0", v1Upgrade, verifyV1Upgrade, "v0.db.gz", 1},
{"upgradeFromV1", v2Upgrade, verifyV2Upgrade, "v1.db.gz", 2},
{"upgradeFromV2", v3Upgrade, verifyV3Upgrade, "v2.db.gz", 3},
Expand Down Expand Up @@ -82,13 +83,15 @@ func TestUpgrades(t *testing.T) {

func TestUpgradeDB(t *testing.T) {
runUpgrade := func(archiveName string) error {
dbPath, close := unpack(t, archiveName)
defer close()
dbPath, cleanup := unpack(t, archiveName)
defer cleanup()
// NewDB runs upgradeDB.
dbi, err := NewDB(dbPath, tLogger)
if err != nil {
return fmt.Errorf("database initialization error: %w", err)
return fmt.Errorf("database initialization or upgrade error: %w", err)
}
db := dbi.(*BoltDB)
// Run upgradeDB again and it should be happy.
err = db.upgradeDB()
if err != nil {
return fmt.Errorf("upgradeDB error: %v", err)
Expand All @@ -113,6 +116,7 @@ func TestUpgradeDB(t *testing.T) {
}

func verifyV1Upgrade(t *testing.T, db *bbolt.DB) {
t.Helper()
err := db.View(func(dbtx *bbolt.Tx) error {
return checkVersion(dbtx, 1)
})
Expand All @@ -122,6 +126,7 @@ func verifyV1Upgrade(t *testing.T, db *bbolt.DB) {
}

func verifyV2Upgrade(t *testing.T, db *bbolt.DB) {
t.Helper()
maxFeeB := uint64Bytes(^uint64(0))

err := db.View(func(dbtx *bbolt.Tx) error {
Expand Down Expand Up @@ -153,6 +158,7 @@ func verifyV2Upgrade(t *testing.T, db *bbolt.DB) {
// Nothing to really check here. Any errors would have come out during the
// upgrade process itself, since we just added a default nil field.
func verifyV3Upgrade(t *testing.T, db *bbolt.DB) {
t.Helper()
err := db.View(func(dbtx *bbolt.Tx) error {
return checkVersion(dbtx, 3)
})
Expand All @@ -179,6 +185,7 @@ func checkVersion(dbtx *bbolt.Tx, expectedVersion uint32) error {
}

func unpack(t *testing.T, db string) (string, func()) {
t.Helper()
d, err := ioutil.TempDir("", "dcrdex_test_upgrades")
if err != nil {
t.Fatal(err)
Expand All @@ -200,13 +207,13 @@ func unpack(t *testing.T, db string) (string, func()) {
t.Fatal(err)
}
_, err = io.Copy(dbFile, r)

archive.Close()
dbFile.Close()
if err != nil {
os.RemoveAll(d)
t.Fatal(err)
}
return dbPath, func() {
dbFile.Close()
archive.Close()
os.RemoveAll(d)
}
}
2 changes: 1 addition & 1 deletion client/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type DB interface {
// state.
DEXOrdersWithActiveMatches(dex string) ([]order.OrderID, error)
// MatchesForOrder gets the matches for the order ID.
MatchesForOrder(oid order.OrderID) ([]*MetaMatch, error)
MatchesForOrder(oid order.OrderID, excludeCancels bool) ([]*MetaMatch, error)
// Update wallets adds a wallet to the database, or updates the wallet
// credentials if the wallet already exists. A wallet is specified by the
// pair (asset ID, account name).
Expand Down
5 changes: 4 additions & 1 deletion client/db/test/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ func TestMatchProof(t *testing.T) {
nTimes(spins, func(i int) {
proof := proofs[i]
proofB := proof.Encode()
reProof, err := db.DecodeMatchProof(proofB)
reProof, ver, err := db.DecodeMatchProof(proofB)
if err != nil {
t.Fatalf("match decode error: %v", err)
}
MustCompareMatchProof(t, proof, reProof)
if ver != db.MatchProofVer {
t.Errorf("wanted match proof ver %d, got %d", db.MatchProofVer, ver)
}
})
t.Logf("encoded, decoded, and compared %d MatchProof in %d ms", spins, time.Since(tStart)/time.Millisecond)
}
Expand Down
Loading

0 comments on commit bbd3bfd

Please sign in to comment.