Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spend transaction change outputs even if undelivered proof(s) #1074

Merged
merged 8 commits into from
Aug 14, 2024
242 changes: 242 additions & 0 deletions itest/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,248 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
wg.Wait()
}

// testSpendChangeOutputWhenProofTransferFail tests that a tapd node is able
// to spend a change output even if the proof transfer for the previous
// transaction fails.
func testSpendChangeOutputWhenProofTransferFail(t *harnessTest) {
var (
ctxb = context.Background()
wg sync.WaitGroup
)

// For this test we will use the universe server as the proof courier.
proofCourier := t.universeServer

// Make a new tapd node which will send an asset to a receiving tapd
// node.
sendTapd := setupTapdHarness(
t.t, t, t.lndHarness.Bob, t.universeServer,
func(params *tapdHarnessParams) {
params.expectErrExit = true
params.proofCourier = proofCourier
},
)
defer func() {
// Any node that has been started within an itest should be
// explicitly stopped within the same itest.
require.NoError(t.t, sendTapd.stop(!*noDelete))
}()

// Use the primary tapd node as the receiver node.
recvTapd := t.tapd

// Use the sending node to mint an asset for sending.
rpcAssets := MintAssetsConfirmBatch(
t.t, t.lndHarness.Miner.Client, sendTapd,
[]*mintrpc.MintAssetRequest{simpleAssets[0]},
)

genInfo := rpcAssets[0].AssetGenesis

// After minting an asset with the sending node, we need to synchronize
// the Universe state to ensure the receiving node is updated and aware
// of the asset.
t.syncUniverseState(sendTapd, recvTapd, len(rpcAssets))

// Create a new address for the receiver node. We will use the universe
// server as the proof courier.
proofCourierAddr := fmt.Sprintf(
"%s://%s", proof.UniverseRpcCourierType,
proofCourier.service.rpcHost(),
)
t.Logf("Proof courier address: %s", proofCourierAddr)

recvAddr, err := recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: 10,
ProofCourierAddr: proofCourierAddr,
})
require.NoError(t.t, err)
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)

// Soon we will be attempting to send an asset to the receiver node. We
// want any associated proof delivery attempt to fail. Therefore, we
// will take the proof courier service offline.
t.Log("Stopping proof courier service")
require.NoError(t.t, proofCourier.Stop())

// Now that the proof courier service is offline, the sending node's
// attempt to transfer the asset proof should fail.
//
// We will soon start the asset transfer process. However, before we
// start, we subscribe to the send events from the sending tapd node so
// that we can be sure that a proof delivery has been attempted
// unsuccessfully. We assert that at least a single proof delivery
// attempt has been made by identifying a backoff wait event.
events := SubscribeSendEvents(t.t, sendTapd)

wg.Add(1)
go func() {
jharveyb marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()

// Define a target event selector to match the backoff wait
// event. This function selects for a specific event type.
targetEventSelector := func(
event *tapdevrpc.SendAssetEvent) bool {

return AssertSendEventProofTransferBackoffWaitTypeSend(
t, event,
)
}

// Set the context timeout for detecting a single proof delivery
// attempt to something reasonable.
timeout := 2 * defaultProofTransferReceiverAckTimeout

assertAssetNtfsEvent(
t, events, timeout, targetEventSelector, 1,
)
}()

// Start asset transfer and then mine to confirm the associated on-chain
// tx. The on-chain tx should be mined successfully, but we expect the
// asset proof transfer to be unsuccessful.
sendAssetsToAddr(t, sendTapd, recvAddr)
MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// There may be a delay between mining the anchoring transaction and
// recognizing its on-chain confirmation. To handle this potential
// delay, we use require.Eventually to ensure the transfer details are
// correctly listed after confirmation.
require.Eventually(t.t, func() bool {
// Ensure that the transaction took place as expected.
listTransfersResp, err := sendTapd.ListTransfers(
ctxb, &taprpc.ListTransfersRequest{},
)
require.NoError(t.t, err)

require.Len(t.t, listTransfersResp.Transfers, 1)

firstTransfer := listTransfersResp.Transfers[0]
require.NotEqual(t.t, firstTransfer.AnchorTxHeightHint, 0)
require.NotEmpty(t.t, firstTransfer.AnchorTxBlockHash)

// Assert proof transfer status for each transfer output.
require.Len(t.t, firstTransfer.Outputs, 2)

// First output should have a proof delivery status of not
// applicable. This indicates that a proof will not be delivered
// for this output.
firstOutput := firstTransfer.Outputs[0]
require.Equal(
t.t, taprpc.ProofDeliveryStatusNotApplicable,
firstOutput.ProofDeliveryStatus,
)

// The second output should have a proof delivery status of
// pending. This indicates that the proof deliver has not yet
// completed successfully.
secondOutput := firstTransfer.Outputs[1]
require.Equal(
t.t, taprpc.ProofDeliveryStatusPending,
secondOutput.ProofDeliveryStatus,
)

return true
}, defaultWaitTimeout, 200*time.Millisecond)

// Wait to ensure that the asset transfer proof deliver attempt has been
// made.
wg.Wait()

// Attempt to send the change output to the receiver node. This
// operation should select the change output from the previous
// transaction and transmit it to the receiver node, despite the fact
// that proof delivery for the previous transaction remains incomplete
// (due to the proof courier being shut down). We will generate a new
// address for this new transaction.
recvAddr, err = recvTapd.NewAddr(ctxb, &taprpc.NewAddrRequest{
AssetId: genInfo.AssetId,
Amt: 42,
ProofCourierAddr: proofCourierAddr,
})
require.NoError(t.t, err)
AssertAddrCreated(t.t, recvTapd, rpcAssets[0], recvAddr)

sendAssetsToAddr(t, sendTapd, recvAddr)
MineBlocks(t.t, t.lndHarness.Miner.Client, 1, 1)

// There may be a delay between mining the anchoring transaction and
// recognizing its on-chain confirmation. To handle this potential
// delay, we use require.Eventually to ensure the transfer details are
// correctly listed after confirmation.
require.Eventually(t.t, func() bool {
ffranr marked this conversation as resolved.
Show resolved Hide resolved
// Ensure that the transaction took place as expected.
listTransfersResp, err := sendTapd.ListTransfers(
ctxb, &taprpc.ListTransfersRequest{},
)
require.NoError(t.t, err)

require.Len(t.t, listTransfersResp.Transfers, 2)

// Inspect the first transfer.
firstTransfer := listTransfersResp.Transfers[0]
require.NotEqual(t.t, firstTransfer.AnchorTxHeightHint, 0)
require.NotEmpty(t.t, firstTransfer.AnchorTxBlockHash)

// Assert proof transfer status for each transfer output.
require.Len(t.t, firstTransfer.Outputs, 2)

// First output should have a proof delivery status of not
// applicable. This indicates that a proof will not be delivered
// for this output.
firstOutput := firstTransfer.Outputs[0]
require.Equal(
ffranr marked this conversation as resolved.
Show resolved Hide resolved
t.t, taprpc.ProofDeliveryStatusNotApplicable,
firstOutput.ProofDeliveryStatus,
)

// The second output should have a proof delivery status of
// pending. This indicates that the proof deliver has not yet
// completed successfully.
secondOutput := firstTransfer.Outputs[1]
require.Equal(
t.t, taprpc.ProofDeliveryStatusPending,
secondOutput.ProofDeliveryStatus,
)

// Inspect the second transfer.
secondTransfer := listTransfersResp.Transfers[1]
require.NotEqual(t.t, secondTransfer.AnchorTxHeightHint, 0)
require.NotEmpty(t.t, secondTransfer.AnchorTxBlockHash)

// Assert proof transfer status for each transfer output.
require.Len(t.t, secondTransfer.Outputs, 2)

// First output should have a proof delivery status of not
// applicable. This indicates that a proof will not be delivered
// for this output.
firstOutput = secondTransfer.Outputs[0]
require.Equal(
t.t, taprpc.ProofDeliveryStatusNotApplicable,
firstOutput.ProofDeliveryStatus,
)

// The second output should have a proof delivery status of
// pending. This indicates that the proof deliver has not yet
// completed successfully.
secondOutput = secondTransfer.Outputs[1]
require.Equal(
t.t, taprpc.ProofDeliveryStatusPending,
secondOutput.ProofDeliveryStatus,
)

return true
}, defaultWaitTimeout, 200*time.Millisecond)

// Restart the proof courier service.
t.Log("Starting proof courier service")
require.NoError(t.t, proofCourier.Start(nil))
guggero marked this conversation as resolved.
Show resolved Hide resolved

// TODO(ffranr): Assert proof transfer complete after proof courier
// restart.
}

// testReattemptFailedReceiveUniCourier ensures that a failed attempt to receive
// an asset proof is retried by the receiving Tapd node. This test focuses on
// the universe proof courier.
Expand Down
4 changes: 4 additions & 0 deletions itest/test_list_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ var testCases = []*testCase{
name: "reattempt proof transfer on tapd restart",
test: testReattemptProofTransferOnTapdRestart,
},
{
name: "spend change output when proof transfer fail",
test: testSpendChangeOutputWhenProofTransferFail,
},
{
name: "reattempt failed receive uni courier",
test: testReattemptFailedReceiveUniCourier,
Expand Down
29 changes: 29 additions & 0 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3364,6 +3364,9 @@ func marshalOutboundParcel(
return nil, err
}

// Marshall the proof delivery status.
proofDeliveryStatus := marshalOutputProofDeliveryStatus(out)

rpcOutputs[idx] = &taprpc.TransferOutput{
Anchor: rpcAnchor,
ScriptKey: scriptPubKey.SerializeCompressed(),
Expand All @@ -3375,20 +3378,46 @@ func marshalOutboundParcel(
SplitCommitRootHash: splitCommitRoot,
OutputType: rpcOutType,
AssetVersion: assetVersion,
ProofDeliveryStatus: proofDeliveryStatus,
}
}

anchorTxHash := parcel.AnchorTx.TxHash()

// Marshal the anchor tx block hash.
var anchorTxBlockHashBytes []byte
parcel.AnchorTxBlockHash.WhenSome(func(hash chainhash.Hash) {
anchorTxBlockHashBytes = hash[:]
})

return &taprpc.AssetTransfer{
TransferTimestamp: parcel.TransferTime.Unix(),
AnchorTxHash: anchorTxHash[:],
AnchorTxHeightHint: parcel.AnchorTxHeightHint,
AnchorTxChainFees: parcel.ChainFees,
AnchorTxBlockHash: anchorTxBlockHashBytes,
Inputs: rpcInputs,
Outputs: rpcOutputs,
}, nil
}

// marshalOutputProofDeliveryStatus turns the output proof delivery status into
// the RPC counterpart.
func marshalOutputProofDeliveryStatus(
out tapfreighter.TransferOutput) taprpc.ProofDeliveryStatus {

proofDeliveryStatus := taprpc.ProofDeliveryStatusNotApplicable
out.ProofDeliveryComplete.WhenSome(func(complete bool) {
if complete {
proofDeliveryStatus = taprpc.ProofDeliveryStatusComplete
} else {
proofDeliveryStatus = taprpc.ProofDeliveryStatusPending
}
})

return proofDeliveryStatus
}

// marshalOutputType turns the transfer output type into the RPC counterpart.
func marshalOutputType(outputType tappsbt.VOutputType) (taprpc.OutputType,
error) {
Expand Down
38 changes: 27 additions & 11 deletions tapdb/assets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2809,9 +2809,10 @@ func (a *AssetStore) ConfirmProofDelivery(ctx context.Context,
return nil
}

// ConfirmParcelDelivery marks a spend event on disk as confirmed. This updates
// the on-chain reference information on disk to point to this new spend.
func (a *AssetStore) ConfirmParcelDelivery(ctx context.Context,
// LogAnchorTxConfirm updates the send package state on disk to reflect the
// confirmation of the anchor transaction, ensuring the on-chain reference
// information is up to date.
func (a *AssetStore) LogAnchorTxConfirm(ctx context.Context,
conf *tapfreighter.AssetConfirmEvent) error {

var (
Expand Down Expand Up @@ -3128,9 +3129,13 @@ func (a *AssetStore) reAnchorPassiveAssets(ctx context.Context,
return nil
}

// PendingParcels returns the set of parcels that haven't yet been finalized.
// This can be used to query the set of unconfirmed
// transactions for re-broadcast.
// PendingParcels returns the set of parcels that have not yet been finalized.
// A parcel is considered finalized once the on-chain anchor transaction is
// included in a block, and all pending transfer output proofs have been
// delivered to their target peers.
//
// NOTE: This can be used to query the set of unconfirmed transactions for
// re-broadcast and for the set of undelivered proofs.
func (a *AssetStore) PendingParcels(
ctx context.Context) ([]*tapfreighter.OutboundParcel, error) {

Expand All @@ -3140,7 +3145,7 @@ func (a *AssetStore) PendingParcels(
// QueryParcels returns the set of confirmed or unconfirmed parcels.
func (a *AssetStore) QueryParcels(ctx context.Context,
anchorTxHash *chainhash.Hash,
unconfirmedTxOnly bool) ([]*tapfreighter.OutboundParcel, error) {
pendingTransfersOnly bool) ([]*tapfreighter.OutboundParcel, error) {

var (
outboundParcels []*tapfreighter.OutboundParcel
Expand All @@ -3157,10 +3162,8 @@ func (a *AssetStore) QueryParcels(ctx context.Context,
}

transferQuery := TransferQuery{
// If we want unconfirmed transfers only, we set the
// UnconfOnly field to true.
UnconfOnly: unconfirmedTxOnly,
AnchorTxHash: anchorTxHashBytes,
AnchorTxHash: anchorTxHashBytes,
PendingTransfersOnly: sqlBool(pendingTransfersOnly),
}

// Query for asset transfers.
Expand Down Expand Up @@ -3210,9 +3213,22 @@ func (a *AssetStore) QueryParcels(ctx context.Context,
"anchor tx: %w", err)
}

// Marshal anchor tx block hash from the database to a
// Hash type.
var anchorTxBlockHash fn.Option[chainhash.Hash]
if len(dbT.AnchorTxBlockHash) > 0 {
var blockHash chainhash.Hash
copy(blockHash[:], dbT.AnchorTxBlockHash)

anchorTxBlockHash = fn.Some[chainhash.Hash](
blockHash,
)
}

parcel := &tapfreighter.OutboundParcel{
AnchorTx: anchorTx,
AnchorTxHeightHint: uint32(dbT.HeightHint),
AnchorTxBlockHash: anchorTxBlockHash,
TransferTime: dbT.TransferTimeUnix.UTC(),
ChainFees: dbAnchorTx.ChainFees,
Inputs: inputs,
Expand Down
Loading
Loading