Skip to content

Commit

Permalink
multi: split UpsertAssetProof and use mutex
Browse files Browse the repository at this point in the history
`UpsertAssetProof` gave issues with a postgresql backend. See: #951

By separating the subquery from the upsert, we can error out if we see
more than one asset ID (primary key from table assets) being returned
from the database. This should return a more meaningful error than the
original postgresql error.

By also surrounding the usage with mutex we prevent the deadlocks we saw
happening.
  • Loading branch information
gijswijs committed Jun 21, 2024
1 parent 4d44446 commit 0e292f3
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 76 deletions.
2 changes: 1 addition & 1 deletion itest/addrs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func runMultiSendTest(ctxt context.Context, t *harnessTest, alice,
AssertAddrEvent(t.t, alice, aliceAddr2, 1, statusDetected)

// Mine a block to make sure the events are marked as confirmed.
_ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)[0]
_ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// Eventually the events should be marked as confirmed.
AssertAddrEventByStatus(t.t, bob, statusConfirmed, 2)
Expand Down
50 changes: 38 additions & 12 deletions tapdb/asset_minting.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,14 @@ type (
// MintingBatchInit is used to create a new minting batch.
MintingBatchInit = sqlc.NewMintingBatchParams

// ProofUpdate is used to update a proof file on disk.
ProofUpdate = sqlc.UpsertAssetProofParams

// ProofUpdateByID is used to update a proof file on disk by asset
// database ID.
ProofUpdateByID = sqlc.UpsertAssetProofByIDParams

// FetchAssetID is used to fetch the primary key ID of an asset, by
// outpoint and tweaked script key.
FetchAssetID = sqlc.FetchAssetIDParams

// NewScriptKey wraps the params needed to insert a new script key on
// disk.
NewScriptKey = sqlc.UpsertScriptKeyParams
Expand Down Expand Up @@ -222,12 +223,15 @@ type PendingAssetStore interface {
FetchAssetsForBatch(ctx context.Context, rawKey []byte) ([]AssetSprout,
error)

// UpsertAssetProof inserts a new or updates an existing asset proof on
// disk.
//
// TODO(roasbeef): move somewhere else??
UpsertAssetProof(ctx context.Context,
arg sqlc.UpsertAssetProofParams) error
// FetchAssetID fetches the `asset_id` (primary key) from the assets
// table for a given asset identified by `Outpoint` and
// `TweakedScriptKey`
FetchAssetID(ctx context.Context,
arg FetchAssetID) ([]int64, error)

// UpsertAssetProofByID inserts a new or updates an existing asset
// proof on disk.
UpsertAssetProofByID(ctx context.Context, arg ProofUpdateByID) error

// FetchAssetMetaForAsset fetches the asset meta for a given asset.
FetchAssetMetaForAsset(ctx context.Context,
Expand Down Expand Up @@ -1644,9 +1648,31 @@ func (a *AssetMintingStore) MarkBatchConfirmed(ctx context.Context,
// As a final act, we'll now insert the proof files for each of
// the assets that were fully confirmed with this block.
for scriptKey, proofBlob := range mintingProofs {
err := q.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: scriptKey.CopyBytes(),
ProofFile: proofBlob,
// We need to fetch the table primary key `asset_id`
// first, as we need it to update the proof.
dbAssetIds, err := q.FetchAssetID(
ctx,
FetchAssetID{
TweakedScriptKey: scriptKey.CopyBytes(),
},
)
if err != nil {
return err
}

// We should not have more than one `asset_id`.
if len(dbAssetIds) > 1 {
return fmt.Errorf("expected 1 asset id, found"+
" %d with asset ids %v",
len(dbAssetIds), dbAssetIds)
}

// Upload proof by the dbAssetId, which is the _primary
// key_ of the asset in table assets, not the BIPS
// concept of `asset_id`.
err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: dbAssetIds[0],
ProofFile: proofBlob,
})
if err != nil {
return fmt.Errorf("unable to insert proof "+
Expand Down
113 changes: 97 additions & 16 deletions tapdb/assets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"math"
"sync"
"time"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -214,9 +215,11 @@ type ActiveAssetsStore interface {
UpsertManagedUTXO(ctx context.Context, arg RawManagedUTXO) (int64,
error)

// UpsertAssetProof inserts a new or updates an existing asset proof on
// disk.
UpsertAssetProof(ctx context.Context, arg ProofUpdate) error
// FetchAssetID fetches the `asset_id` (primary key) from the assets
// table for a given asset identified by `Outpoint` and
// `TweakedScriptKey`
FetchAssetID(ctx context.Context,
arg FetchAssetID) ([]int64, error)

// UpsertAssetProofByID inserts a new or updates an existing asset
// proof on disk.
Expand Down Expand Up @@ -376,6 +379,8 @@ type AssetStore struct {
clock clock.Clock

txHeights *lru.Cache[chainhash.Hash, cacheableBlockHeight]

proofInsertMtx sync.Mutex
}

// NewAssetStore creates a new AssetStore from the specified BatchedAssetStore
Expand Down Expand Up @@ -1569,10 +1574,32 @@ func (a *AssetStore) importAssetFromProof(ctx context.Context,
// As a final step, we'll insert the proof file we used to generate all
// the above information.
scriptKeyBytes := newAsset.ScriptKey.PubKey.SerializeCompressed()
return db.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: scriptKeyBytes,
Outpoint: anchorPoint,
ProofFile: proof.Blob,

// We need to fetch the table primary key `asset_id` first, as we need
// it to update the proof.
dbAssetIds, err := db.FetchAssetID(
ctx,
FetchAssetID{
TweakedScriptKey: scriptKeyBytes,
Outpoint: anchorPoint,
},
)
if err != nil {
return err
}

// We should not have more than one `asset_id`.
if len(dbAssetIds) > 1 {
return fmt.Errorf("expected 1 asset id, found"+
" %d with asset ids %v",
len(dbAssetIds), dbAssetIds)
}

// Upload proof by the dbAssetId, which is the _primary key_ of the
// asset in table assets, not the BIPS concept of `asset_id`.
return db.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: dbAssetIds[0],
ProofFile: proof.Blob,
})
}

Expand Down Expand Up @@ -1615,10 +1642,33 @@ func (a *AssetStore) upsertAssetProof(ctx context.Context,
// As a final step, we'll insert the proof file we used to generate all
// the above information.
scriptKeyBytes := proof.Asset.ScriptKey.PubKey.SerializeCompressed()
return db.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: scriptKeyBytes,
Outpoint: outpointBytes,
ProofFile: proof.Blob,

// We need to fetch the table primary key `asset_id` first, as we need
// it to update the proof. We could do this in one query, this gave
// issues with a postgresql backend. See:
// https://github.com/lightninglabs/taproot-assets/issues/951
dbAssetIds, err := db.FetchAssetID(
ctx,
FetchAssetID{
TweakedScriptKey: scriptKeyBytes,
Outpoint: outpointBytes,
},
)
if err != nil {
return err
}

// We should not have more than one `asset_id`.
if len(dbAssetIds) > 1 {
return fmt.Errorf("expected 1 asset id, found %d with asset"+
" ids %v", len(dbAssetIds), dbAssetIds)
}

// Upload proof by the dbAssetId, which is the _primary key_ of the
// asset in table assets, not the BIPS concept of `asset_id`.
return db.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: dbAssetIds[0],
ProofFile: proof.Blob,
})
}

Expand All @@ -1633,6 +1683,10 @@ func (a *AssetStore) ImportProofs(ctx context.Context, _ proof.HeaderVerifier,
proofs ...*proof.AnnotatedProof) error {

var writeTxOpts AssetStoreTxOptions

a.proofInsertMtx.Lock()
defer a.proofInsertMtx.Unlock()

err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error {
for _, p := range proofs {
if replace {
Expand Down Expand Up @@ -2674,6 +2728,10 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context,
writeTxOpts AssetStoreTxOptions
localProofKeys []asset.SerializedKey
)

a.proofInsertMtx.Lock()
defer a.proofInsertMtx.Unlock()

err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error {
// First, we'll fetch the asset transfer based on its outpoint
// bytes, so we can apply the delta it describes.
Expand Down Expand Up @@ -2824,12 +2882,35 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context,
localProofKeys = append(localProofKeys, scriptKey)

// Now we can update the asset proof for the sender for
// this given delta.
err = q.UpsertAssetProof(ctx, ProofUpdate{
TweakedScriptKey: out.ScriptKeyBytes,
Outpoint: out.AnchorOutpoint,
ProofFile: receiverProof.Blob,
// this given delta. We need to fetch the table primary
// key `asset_id` first, as we need it to update the
// proof.
dbAssetIds, err := q.FetchAssetID(
ctx,
FetchAssetID{
TweakedScriptKey: out.ScriptKeyBytes,
Outpoint: out.AnchorOutpoint,
},
)
if err != nil {
return err
}

// We should not have more than one `asset_id`.
if len(dbAssetIds) > 1 {
return fmt.Errorf("expected 1 asset id, found"+
" %d with asset ids %v",
len(dbAssetIds), dbAssetIds)
}

// Upload proof by the dbAssetId, which is the _primary
// key_ of the asset in table assets, not the BIPS
// concept of `asset_id`.
err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{
AssetID: dbAssetIds[0],
ProofFile: receiverProof.Blob,
})

if err != nil {
return err
}
Expand Down
76 changes: 42 additions & 34 deletions tapdb/sqlc/assets.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tapdb/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 3 additions & 12 deletions tapdb/sqlc/queries/assets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,8 @@ UPDATE chain_txns
SET block_height = $2, block_hash = $3, tx_index = $4
WHERE txn_id in (SELECT txn_id FROM target_txn);

-- name: UpsertAssetProof :exec
WITH target_asset(asset_id) AS (
SELECT asset_id
-- name: FetchAssetID :many
SELECT asset_id
FROM assets
JOIN script_keys
ON assets.script_key_id = script_keys.script_key_id
Expand All @@ -681,15 +680,7 @@ WITH target_asset(asset_id) AS (
(script_keys.tweaked_script_key = sqlc.narg('tweaked_script_key')
OR sqlc.narg('tweaked_script_key') IS NULL)
AND (utxos.outpoint = sqlc.narg('outpoint')
OR sqlc.narg('outpoint') IS NULL)
)
INSERT INTO asset_proofs (
asset_id, proof_file
) VALUES (
(SELECT asset_id FROM target_asset), @proof_file
) ON CONFLICT (asset_id)
-- This is not a NOP, we always overwrite the proof with the new one.
DO UPDATE SET proof_file = EXCLUDED.proof_file;
OR sqlc.narg('outpoint') IS NULL);

-- name: UpsertAssetProofByID :exec
INSERT INTO asset_proofs (
Expand Down

0 comments on commit 0e292f3

Please sign in to comment.