Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rfq]: add validation to AddAssetBuyOrder and AddAssetSellOrder RPCs #1192

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 50 additions & 30 deletions itest/rfq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,35 @@ func testRfqAssetBuyHtlcIntercept(t *harnessTest) {
bidAmt := uint64(90000)
buyOrderExpiry := uint64(time.Now().Add(24 * time.Hour).Unix())

_, err = ts.CarolTapd.AddAssetBuyOrder(
ctxt, &rfqrpc.AddAssetBuyOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetId,
},
// We first try to add a buy order without specifying the asset skip
// flag. That should result in an error, since we only have a normal
// channel and not an asset channel.
buyReq := &rfqrpc.AddAssetBuyOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetId,
},
AssetMaxAmt: purchaseAssetAmt,
Expiry: buyOrderExpiry,
},
AssetMaxAmt: purchaseAssetAmt,
Expiry: buyOrderExpiry,

// Here we explicitly specify Bob as the destination
// peer for the buy order. This will prompt Carol's tapd
// node to send a request for quote message to Bob's
// node.
PeerPubKey: ts.BobLnd.PubKey[:],
// Here we explicitly specify Bob as the destination
// peer for the buy order. This will prompt Carol's tapd
// node to send a request for quote message to Bob's
// node.
PeerPubKey: ts.BobLnd.PubKey[:],

TimeoutSeconds: uint32(rfqTimeout.Seconds()),
},
TimeoutSeconds: uint32(rfqTimeout.Seconds()),
}
_, err = ts.AliceTapd.AddAssetBuyOrder(ctxt, buyReq)
require.ErrorContains(
t.t, err, "error checking peer channel: error checking asset "+
"channel",
)

// Now we set the skip flag and we shouldn't get an error anymore.
buyReq.SkipAssetChannelCheck = true
_, err = ts.CarolTapd.AddAssetBuyOrder(ctxt, buyReq)
require.NoError(t.t, err, "unable to upsert asset buy order")

// Wait until Carol receives an incoming quote accept message (sent from
Expand Down Expand Up @@ -266,25 +276,35 @@ func testRfqAssetSellHtlcIntercept(t *harnessTest) {
askAmt := uint64(42000)
sellOrderExpiry := uint64(time.Now().Add(24 * time.Hour).Unix())

_, err = ts.AliceTapd.AddAssetSellOrder(
ctxt, &rfqrpc.AddAssetSellOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetIdBytes,
},
// We first try to add a sell order without specifying the asset skip
// flag. That should result in an error, since we only have a normal
// channel and not an asset channel.
sellReq := &rfqrpc.AddAssetSellOrderRequest{
AssetSpecifier: &rfqrpc.AssetSpecifier{
Id: &rfqrpc.AssetSpecifier_AssetId{
AssetId: mintedAssetIdBytes,
},
PaymentMaxAmt: askAmt,
Expiry: sellOrderExpiry,
},
PaymentMaxAmt: askAmt,
Expiry: sellOrderExpiry,

// Here we explicitly specify Bob as the destination
// peer for the sell order. This will prompt Alice's
// tapd node to send a request for quote message to
// Bob's node.
PeerPubKey: ts.BobLnd.PubKey[:],
// Here we explicitly specify Bob as the destination
// peer for the sell order. This will prompt Alice's
// tapd node to send a request for quote message to
// Bob's node.
PeerPubKey: ts.BobLnd.PubKey[:],

TimeoutSeconds: uint32(rfqTimeout.Seconds()),
},
TimeoutSeconds: uint32(rfqTimeout.Seconds()),
}
_, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq)
require.ErrorContains(
t.t, err, "error checking peer channel: error checking asset "+
"channel",
)

// Now we set the skip flag and we shouldn't get an error anymore.
sellReq.SkipAssetChannelCheck = true
_, err = ts.AliceTapd.AddAssetSellOrder(ctxt, sellReq)
require.NoError(t.t, err, "unable to upsert asset sell order")

// Wait until Alice receives an incoming sell quote accept message (sent
Expand Down
100 changes: 86 additions & 14 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6340,7 +6340,7 @@ func unmarshalAssetBuyOrder(

// AddAssetBuyOrder upserts a new buy order for the given asset into the RFQ
// manager. If the order already exists for the given asset, it will be updated.
func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
func (r *rpcServer) AddAssetBuyOrder(ctx context.Context,
req *rfqrpc.AddAssetBuyOrderRequest) (*rfqrpc.AddAssetBuyOrderResponse,
error) {

Expand All @@ -6354,13 +6354,24 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,
return nil, fmt.Errorf("error unmarshalling buy order: %w", err)
}

peerStr := fn.MapOptionZ(
buyOrder.Peer, func(peerVertex route.Vertex) string {
return peerVertex.String()
},
// Currently, we require the peer to be specified in the buy order.
peer, err := buyOrder.Peer.UnwrapOrErr(
fmt.Errorf("buy order peer must be specified"),
)
if err != nil {
return nil, err
}

// Check if we have a channel with the peer.
err = r.checkPeerChannel(
ctx, peer, buyOrder.AssetSpecifier, req.SkipAssetChannelCheck,
)
if err != nil {
return nil, fmt.Errorf("error checking peer channel: %w", err)
}

rpcsLog.Debugf("[AddAssetBuyOrder]: upserting buy order "+
"(dest_peer=%s)", peerStr)
"(dest_peer=%s)", peer.String())

// Register an event listener before actually inserting the order, so we
// definitely don't miss any responses.
Expand Down Expand Up @@ -6402,11 +6413,61 @@ func (r *rpcServer) AddAssetBuyOrder(_ context.Context,

case <-timeout:
return nil, fmt.Errorf("timeout waiting for response "+
"(peer=%s)", peerStr)
"(peer=%s)", peer.String())
}
}
}

// checkPeerChannel checks if there is a channel with the given peer. If the
// asset channel check is enabled, it will also check if there is a channel with
// the given asset with the peer.
func (r *rpcServer) checkPeerChannel(ctx context.Context, peer route.Vertex,
specifier asset.Specifier, skipAssetChannelCheck bool) error {

// We want to make sure there is at least a channel between us and the
// peer, otherwise RFQ negotiation doesn't make sense.
switch {
// For integration tests, we can't create asset channels, so we allow
// the asset channel check to be skipped. In this case we simply check
// that we have any channel with the peer.
case skipAssetChannelCheck:
activeChannels, err := r.cfg.Lnd.Client.ListChannels(
ctx, true, false,
)
if err != nil {
return fmt.Errorf("unable to fetch channels: %w", err)
}
peerChannels := fn.Filter(
activeChannels, func(c lndclient.ChannelInfo) bool {
return c.PubKeyBytes == peer
},
)
if len(peerChannels) == 0 {
return fmt.Errorf("no active channel found with peer "+
"%x", peer[:])
}

// For any other case, we'll want to make sure there is a channel with
// a non-zero balance of the given asset to carry the order.
default:
assetID, err := specifier.UnwrapIdOrErr()
if err != nil {
return fmt.Errorf("cannot check asset channel, " +
"missing asset ID")
}

// If we don't get an error here, it means we do have an asset
// channel with the peer.
_, err = r.rfqChannel(ctx, assetID, &peer)
if err != nil {
return fmt.Errorf("error checking asset channel: %w",
err)
}
}

return nil
}

// unmarshalAssetSellOrder unmarshals an asset sell order from the RPC form.
func unmarshalAssetSellOrder(
req *rfqrpc.AddAssetSellOrderRequest) (*rfq.SellOrder, error) {
Expand Down Expand Up @@ -6457,7 +6518,7 @@ func unmarshalAssetSellOrder(

// AddAssetSellOrder upserts a new sell order for the given asset into the RFQ
// manager. If the order already exists for the given asset, it will be updated.
func (r *rpcServer) AddAssetSellOrder(_ context.Context,
func (r *rpcServer) AddAssetSellOrder(ctx context.Context,
req *rfqrpc.AddAssetSellOrderRequest) (*rfqrpc.AddAssetSellOrderResponse,
error) {

Expand All @@ -6472,13 +6533,24 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,
err)
}

// Extract peer identifier as a string for logging.
peerStr := fn.MapOptionZ(sellOrder.Peer, func(p route.Vertex) string {
return p.String()
})
// Currently, we require the peer to be specified in the buy order.
peer, err := sellOrder.Peer.UnwrapOrErr(
fmt.Errorf("sell order peer must be specified"),
)
if err != nil {
return nil, err
}

// Check if we have a channel with the peer.
err = r.checkPeerChannel(
ctx, peer, sellOrder.AssetSpecifier, req.SkipAssetChannelCheck,
)
if err != nil {
return nil, fmt.Errorf("error checking peer channel: %w", err)
}

rpcsLog.Debugf("[AddAssetSellOrder]: upserting sell order "+
"(dest_peer=%s)", peerStr)
"(dest_peer=%s)", peer.String())

// Register an event listener before actually inserting the order, so we
// definitely don't miss any responses.
Expand Down Expand Up @@ -6520,7 +6592,7 @@ func (r *rpcServer) AddAssetSellOrder(_ context.Context,

case <-timeout:
return nil, fmt.Errorf("timeout waiting for response "+
"from peer %s", peerStr)
"from peer %s", peer.String())
}
}
}
Expand Down
Loading
Loading