From 0e292f3c35808e2719aefcd51d7e8a1d5e01b3b0 Mon Sep 17 00:00:00 2001 From: Gijs van Dam Date: Tue, 18 Jun 2024 17:00:10 +0200 Subject: [PATCH] multi: split `UpsertAssetProof` and use `mutex` `UpsertAssetProof` gave issues with a postgresql backend. See: #951 By separating the subquery from the upsert, we can error out if we see more than one asset ID (primary key from table assets) being returned from the database. This should return a more meaningful error than the original postgresql error. By also surrounding the usage with mutex we prevent the deadlocks we saw happening. --- itest/addrs_test.go | 2 +- tapdb/asset_minting.go | 50 +++++++++++---- tapdb/assets_store.go | 113 +++++++++++++++++++++++++++++----- tapdb/sqlc/assets.sql.go | 76 +++++++++++++---------- tapdb/sqlc/querier.go | 2 +- tapdb/sqlc/queries/assets.sql | 15 +---- 6 files changed, 182 insertions(+), 76 deletions(-) diff --git a/itest/addrs_test.go b/itest/addrs_test.go index 4822af145..7a96fc6f6 100644 --- a/itest/addrs_test.go +++ b/itest/addrs_test.go @@ -603,7 +603,7 @@ func runMultiSendTest(ctxt context.Context, t *harnessTest, alice, AssertAddrEvent(t.t, alice, aliceAddr2, 1, statusDetected) // Mine a block to make sure the events are marked as confirmed. - _ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)[0] + _ = MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1) // Eventually the events should be marked as confirmed. AssertAddrEventByStatus(t.t, bob, statusConfirmed, 2) diff --git a/tapdb/asset_minting.go b/tapdb/asset_minting.go index f7a230f39..8ede9b879 100644 --- a/tapdb/asset_minting.go +++ b/tapdb/asset_minting.go @@ -113,13 +113,14 @@ type ( // MintingBatchInit is used to create a new minting batch. MintingBatchInit = sqlc.NewMintingBatchParams - // ProofUpdate is used to update a proof file on disk. - ProofUpdate = sqlc.UpsertAssetProofParams - // ProofUpdateByID is used to update a proof file on disk by asset // database ID. ProofUpdateByID = sqlc.UpsertAssetProofByIDParams + // FetchAssetID is used to fetch the primary key ID of an asset, by + // outpoint and tweaked script key. + FetchAssetID = sqlc.FetchAssetIDParams + // NewScriptKey wraps the params needed to insert a new script key on // disk. NewScriptKey = sqlc.UpsertScriptKeyParams @@ -222,12 +223,15 @@ type PendingAssetStore interface { FetchAssetsForBatch(ctx context.Context, rawKey []byte) ([]AssetSprout, error) - // UpsertAssetProof inserts a new or updates an existing asset proof on - // disk. - // - // TODO(roasbeef): move somewhere else?? - UpsertAssetProof(ctx context.Context, - arg sqlc.UpsertAssetProofParams) error + // FetchAssetID fetches the `asset_id` (primary key) from the assets + // table for a given asset identified by `Outpoint` and + // `TweakedScriptKey` + FetchAssetID(ctx context.Context, + arg FetchAssetID) ([]int64, error) + + // UpsertAssetProofByID inserts a new or updates an existing asset + // proof on disk. + UpsertAssetProofByID(ctx context.Context, arg ProofUpdateByID) error // FetchAssetMetaForAsset fetches the asset meta for a given asset. FetchAssetMetaForAsset(ctx context.Context, @@ -1644,9 +1648,31 @@ func (a *AssetMintingStore) MarkBatchConfirmed(ctx context.Context, // As a final act, we'll now insert the proof files for each of // the assets that were fully confirmed with this block. for scriptKey, proofBlob := range mintingProofs { - err := q.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: scriptKey.CopyBytes(), - ProofFile: proofBlob, + // We need to fetch the table primary key `asset_id` + // first, as we need it to update the proof. + dbAssetIds, err := q.FetchAssetID( + ctx, + FetchAssetID{ + TweakedScriptKey: scriptKey.CopyBytes(), + }, + ) + if err != nil { + return err + } + + // We should not have more than one `asset_id`. + if len(dbAssetIds) > 1 { + return fmt.Errorf("expected 1 asset id, found"+ + " %d with asset ids %v", + len(dbAssetIds), dbAssetIds) + } + + // Upload proof by the dbAssetId, which is the _primary + // key_ of the asset in table assets, not the BIPS + // concept of `asset_id`. + err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: dbAssetIds[0], + ProofFile: proofBlob, }) if err != nil { return fmt.Errorf("unable to insert proof "+ diff --git a/tapdb/assets_store.go b/tapdb/assets_store.go index ccc23ee03..f943d59a1 100644 --- a/tapdb/assets_store.go +++ b/tapdb/assets_store.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "math" + "sync" "time" "github.com/btcsuite/btcd/btcec/v2" @@ -214,9 +215,11 @@ type ActiveAssetsStore interface { UpsertManagedUTXO(ctx context.Context, arg RawManagedUTXO) (int64, error) - // UpsertAssetProof inserts a new or updates an existing asset proof on - // disk. - UpsertAssetProof(ctx context.Context, arg ProofUpdate) error + // FetchAssetID fetches the `asset_id` (primary key) from the assets + // table for a given asset identified by `Outpoint` and + // `TweakedScriptKey` + FetchAssetID(ctx context.Context, + arg FetchAssetID) ([]int64, error) // UpsertAssetProofByID inserts a new or updates an existing asset // proof on disk. @@ -376,6 +379,8 @@ type AssetStore struct { clock clock.Clock txHeights *lru.Cache[chainhash.Hash, cacheableBlockHeight] + + proofInsertMtx sync.Mutex } // NewAssetStore creates a new AssetStore from the specified BatchedAssetStore @@ -1569,10 +1574,32 @@ func (a *AssetStore) importAssetFromProof(ctx context.Context, // As a final step, we'll insert the proof file we used to generate all // the above information. scriptKeyBytes := newAsset.ScriptKey.PubKey.SerializeCompressed() - return db.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: scriptKeyBytes, - Outpoint: anchorPoint, - ProofFile: proof.Blob, + + // We need to fetch the table primary key `asset_id` first, as we need + // it to update the proof. + dbAssetIds, err := db.FetchAssetID( + ctx, + FetchAssetID{ + TweakedScriptKey: scriptKeyBytes, + Outpoint: anchorPoint, + }, + ) + if err != nil { + return err + } + + // We should not have more than one `asset_id`. + if len(dbAssetIds) > 1 { + return fmt.Errorf("expected 1 asset id, found"+ + " %d with asset ids %v", + len(dbAssetIds), dbAssetIds) + } + + // Upload proof by the dbAssetId, which is the _primary key_ of the + // asset in table assets, not the BIPS concept of `asset_id`. + return db.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: dbAssetIds[0], + ProofFile: proof.Blob, }) } @@ -1615,10 +1642,33 @@ func (a *AssetStore) upsertAssetProof(ctx context.Context, // As a final step, we'll insert the proof file we used to generate all // the above information. scriptKeyBytes := proof.Asset.ScriptKey.PubKey.SerializeCompressed() - return db.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: scriptKeyBytes, - Outpoint: outpointBytes, - ProofFile: proof.Blob, + + // We need to fetch the table primary key `asset_id` first, as we need + // it to update the proof. We could do this in one query, this gave + // issues with a postgresql backend. See: + // https://github.com/lightninglabs/taproot-assets/issues/951 + dbAssetIds, err := db.FetchAssetID( + ctx, + FetchAssetID{ + TweakedScriptKey: scriptKeyBytes, + Outpoint: outpointBytes, + }, + ) + if err != nil { + return err + } + + // We should not have more than one `asset_id`. + if len(dbAssetIds) > 1 { + return fmt.Errorf("expected 1 asset id, found %d with asset"+ + " ids %v", len(dbAssetIds), dbAssetIds) + } + + // Upload proof by the dbAssetId, which is the _primary key_ of the + // asset in table assets, not the BIPS concept of `asset_id`. + return db.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: dbAssetIds[0], + ProofFile: proof.Blob, }) } @@ -1633,6 +1683,10 @@ func (a *AssetStore) ImportProofs(ctx context.Context, _ proof.HeaderVerifier, proofs ...*proof.AnnotatedProof) error { var writeTxOpts AssetStoreTxOptions + + a.proofInsertMtx.Lock() + defer a.proofInsertMtx.Unlock() + err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error { for _, p := range proofs { if replace { @@ -2674,6 +2728,10 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context, writeTxOpts AssetStoreTxOptions localProofKeys []asset.SerializedKey ) + + a.proofInsertMtx.Lock() + defer a.proofInsertMtx.Unlock() + err := a.db.ExecTx(ctx, &writeTxOpts, func(q ActiveAssetsStore) error { // First, we'll fetch the asset transfer based on its outpoint // bytes, so we can apply the delta it describes. @@ -2824,12 +2882,35 @@ func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context, localProofKeys = append(localProofKeys, scriptKey) // Now we can update the asset proof for the sender for - // this given delta. - err = q.UpsertAssetProof(ctx, ProofUpdate{ - TweakedScriptKey: out.ScriptKeyBytes, - Outpoint: out.AnchorOutpoint, - ProofFile: receiverProof.Blob, + // this given delta. We need to fetch the table primary + // key `asset_id` first, as we need it to update the + // proof. + dbAssetIds, err := q.FetchAssetID( + ctx, + FetchAssetID{ + TweakedScriptKey: out.ScriptKeyBytes, + Outpoint: out.AnchorOutpoint, + }, + ) + if err != nil { + return err + } + + // We should not have more than one `asset_id`. + if len(dbAssetIds) > 1 { + return fmt.Errorf("expected 1 asset id, found"+ + " %d with asset ids %v", + len(dbAssetIds), dbAssetIds) + } + + // Upload proof by the dbAssetId, which is the _primary + // key_ of the asset in table assets, not the BIPS + // concept of `asset_id`. + err = q.UpsertAssetProofByID(ctx, ProofUpdateByID{ + AssetID: dbAssetIds[0], + ProofFile: receiverProof.Blob, }) + if err != nil { return err } diff --git a/tapdb/sqlc/assets.sql.go b/tapdb/sqlc/assets.sql.go index 88cc429c1..1ebda3434 100644 --- a/tapdb/sqlc/assets.sql.go +++ b/tapdb/sqlc/assets.sql.go @@ -516,6 +516,48 @@ func (q *Queries) DeleteUTXOLease(ctx context.Context, outpoint []byte) error { return err } +const fetchAssetID = `-- name: FetchAssetID :many +SELECT asset_id + FROM assets + JOIN script_keys + ON assets.script_key_id = script_keys.script_key_id + JOIN managed_utxos utxos + ON assets.anchor_utxo_id = utxos.utxo_id + WHERE + (script_keys.tweaked_script_key = $1 + OR $1 IS NULL) + AND (utxos.outpoint = $2 + OR $2 IS NULL) +` + +type FetchAssetIDParams struct { + TweakedScriptKey []byte + Outpoint []byte +} + +func (q *Queries) FetchAssetID(ctx context.Context, arg FetchAssetIDParams) ([]int64, error) { + rows, err := q.db.QueryContext(ctx, fetchAssetID, arg.TweakedScriptKey, arg.Outpoint) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int64 + for rows.Next() { + var asset_id int64 + if err := rows.Scan(&asset_id); err != nil { + return nil, err + } + items = append(items, asset_id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const fetchAssetMeta = `-- name: FetchAssetMeta :one SELECT meta_data_hash, meta_data_blob, meta_data_type FROM assets_meta @@ -2509,40 +2551,6 @@ func (q *Queries) UpsertAssetMeta(ctx context.Context, arg UpsertAssetMetaParams return meta_id, err } -const upsertAssetProof = `-- name: UpsertAssetProof :exec -WITH target_asset(asset_id) AS ( - SELECT asset_id - FROM assets - JOIN script_keys - ON assets.script_key_id = script_keys.script_key_id - JOIN managed_utxos utxos - ON assets.anchor_utxo_id = utxos.utxo_id - WHERE - (script_keys.tweaked_script_key = $2 - OR $2 IS NULL) - AND (utxos.outpoint = $3 - OR $3 IS NULL) -) -INSERT INTO asset_proofs ( - asset_id, proof_file -) VALUES ( - (SELECT asset_id FROM target_asset), $1 -) ON CONFLICT (asset_id) - -- This is not a NOP, we always overwrite the proof with the new one. - DO UPDATE SET proof_file = EXCLUDED.proof_file -` - -type UpsertAssetProofParams struct { - ProofFile []byte - TweakedScriptKey []byte - Outpoint []byte -} - -func (q *Queries) UpsertAssetProof(ctx context.Context, arg UpsertAssetProofParams) error { - _, err := q.db.ExecContext(ctx, upsertAssetProof, arg.ProofFile, arg.TweakedScriptKey, arg.Outpoint) - return err -} - const upsertAssetProofByID = `-- name: UpsertAssetProofByID :exec INSERT INTO asset_proofs ( asset_id, proof_file diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index 340d49061..107c06cf6 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -44,6 +44,7 @@ type Querier interface { FetchAddrEventByAddrKeyAndOutpoint(ctx context.Context, arg FetchAddrEventByAddrKeyAndOutpointParams) (FetchAddrEventByAddrKeyAndOutpointRow, error) FetchAddrs(ctx context.Context, arg FetchAddrsParams) ([]FetchAddrsRow, error) FetchAllNodes(ctx context.Context) ([]MssmtNode, error) + FetchAssetID(ctx context.Context, arg FetchAssetIDParams) ([]int64, error) FetchAssetMeta(ctx context.Context, metaID int64) (FetchAssetMetaRow, error) FetchAssetMetaByHash(ctx context.Context, metaDataHash []byte) (FetchAssetMetaByHashRow, error) FetchAssetMetaForAsset(ctx context.Context, assetID []byte) (FetchAssetMetaForAssetRow, error) @@ -157,7 +158,6 @@ type Querier interface { UpsertAssetGroupKey(ctx context.Context, arg UpsertAssetGroupKeyParams) (int64, error) UpsertAssetGroupWitness(ctx context.Context, arg UpsertAssetGroupWitnessParams) (int64, error) UpsertAssetMeta(ctx context.Context, arg UpsertAssetMetaParams) (int64, error) - UpsertAssetProof(ctx context.Context, arg UpsertAssetProofParams) error UpsertAssetProofByID(ctx context.Context, arg UpsertAssetProofByIDParams) error UpsertAssetWitness(ctx context.Context, arg UpsertAssetWitnessParams) error UpsertChainTx(ctx context.Context, arg UpsertChainTxParams) (int64, error) diff --git a/tapdb/sqlc/queries/assets.sql b/tapdb/sqlc/queries/assets.sql index 333d64f38..5c5fb6d3c 100644 --- a/tapdb/sqlc/queries/assets.sql +++ b/tapdb/sqlc/queries/assets.sql @@ -669,9 +669,8 @@ UPDATE chain_txns SET block_height = $2, block_hash = $3, tx_index = $4 WHERE txn_id in (SELECT txn_id FROM target_txn); --- name: UpsertAssetProof :exec -WITH target_asset(asset_id) AS ( - SELECT asset_id +-- name: FetchAssetID :many +SELECT asset_id FROM assets JOIN script_keys ON assets.script_key_id = script_keys.script_key_id @@ -681,15 +680,7 @@ WITH target_asset(asset_id) AS ( (script_keys.tweaked_script_key = sqlc.narg('tweaked_script_key') OR sqlc.narg('tweaked_script_key') IS NULL) AND (utxos.outpoint = sqlc.narg('outpoint') - OR sqlc.narg('outpoint') IS NULL) -) -INSERT INTO asset_proofs ( - asset_id, proof_file -) VALUES ( - (SELECT asset_id FROM target_asset), @proof_file -) ON CONFLICT (asset_id) - -- This is not a NOP, we always overwrite the proof with the new one. - DO UPDATE SET proof_file = EXCLUDED.proof_file; + OR sqlc.narg('outpoint') IS NULL); -- name: UpsertAssetProofByID :exec INSERT INTO asset_proofs (