From 785232b2cf67ab31fe15f317297ce792c2b0cd1a Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 15:48:22 +0200 Subject: [PATCH 1/8] aux_closer: skip amount allocation if no balance This commit fixes an issue with co-op closing channels where one side has no balance. --- tapchannel/aux_closer.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tapchannel/aux_closer.go b/tapchannel/aux_closer.go index 01345b171..affe412f4 100644 --- a/tapchannel/aux_closer.go +++ b/tapchannel/aux_closer.go @@ -258,7 +258,9 @@ func (a *AuxChanCloser) AuxCloseOutputs( // anchor amt, then we'll just drop this allocation, and modify // our asset allocation to match this value. if amtAfterAnchor <= o.DustLimit { - localAlloc.BtcAmount = btcAmt + if localAlloc != nil { + localAlloc.BtcAmount = btcAmt + } return } @@ -285,7 +287,10 @@ func (a *AuxChanCloser) AuxCloseOutputs( // anchor amt, then we'll just drop this allocation, and modify // our asset allocation to match this value. if amtAfterAnchor <= o.DustLimit { - remoteAlloc.BtcAmount = btcAmt + if remoteAlloc != nil { + remoteAlloc.BtcAmount = btcAmt + } + return } From c6d1e83d381bbbafbd526c9946fb1bf5db49a656 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 15:48:54 +0200 Subject: [PATCH 2/8] tapchannelmsg: fix empty msg formatting, add pending chans This commit returns an empty string if there is no custom data in a channel. That prevents the lncli output from showing: "custom_channel_data": "{\"assets\":null}", And instead just shows: "custom_channel_data: "", At the same time, we add the previously forgotten pending channels to the calls of RPC methods that we want to format the response for. --- tapchannelmsg/custom_channel_data.go | 36 ++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tapchannelmsg/custom_channel_data.go b/tapchannelmsg/custom_channel_data.go index 8c40496a8..c1dbf00aa 100644 --- a/tapchannelmsg/custom_channel_data.go +++ b/tapchannelmsg/custom_channel_data.go @@ -61,6 +61,10 @@ type ChannelCustomData struct { // AsJson returns the JSON representation of the channel custom data. func (c *ChannelCustomData) AsJson() ([]byte, error) { + if len(c.OpenChan.Assets()) == 0 { + return []byte{}, nil + } + resp := &rfqmsg.JsonAssetChannel{} for _, output := range c.OpenChan.Assets() { a := output.Proof.Val.Asset @@ -126,6 +130,10 @@ type BalanceCustomData struct { // AsJson returns the JSON representation of the channel balance data. func (b *BalanceCustomData) AsJson() ([]byte, error) { + if len(b.OpenChannels) == 0 && len(b.PendingChannels) == 0 { + return []byte{}, nil + } + resp := &rfqmsg.JsonAssetChannelBalances{ OpenChannels: make(map[string]*rfqmsg.JsonAssetBalance), PendingChannels: make(map[string]*rfqmsg.JsonAssetBalance), @@ -336,6 +344,34 @@ func ParseCustomChannelData(msg proto.Message) error { "data to JSON: %w", err) } + case *lnrpc.PendingChannelsResponse: + for idx := range m.PendingOpenChannels { + pendingOpen := m.PendingOpenChannels[idx] + rpcChannel := pendingOpen.Channel + + if rpcChannel == nil { + continue + } + + if rpcChannel.CustomChannelData == nil { + continue + } + + channelData, err := ReadChannelCustomData( + rpcChannel.CustomChannelData, + ) + if err != nil { + return fmt.Errorf("error reading custom "+ + "channel data: %w", err) + } + + rpcChannel.CustomChannelData, err = channelData.AsJson() + if err != nil { + return fmt.Errorf("error converting custom "+ + "channel data to JSON: %w", err) + } + } + case *lnrpc.CloseStatusUpdate: closeUpd, ok := m.Update.(*lnrpc.CloseStatusUpdate_ChanClose) if !ok { From 51337e29d25fcad7088662965c4eadd8b6a47cb2 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 15:51:34 +0200 Subject: [PATCH 3/8] rfq: add HasExpired method to policy --- rfq/order.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/rfq/order.go b/rfq/order.go index 7024d3616..d26d6a945 100644 --- a/rfq/order.go +++ b/rfq/order.go @@ -62,6 +62,9 @@ type Policy interface { // Expiry returns the policy's expiry time as a unix timestamp. Expiry() uint64 + // HasExpired returns true if the policy has expired. + HasExpired() bool + // Scid returns the serialised short channel ID (SCID) of the channel to // which the policy applies. Scid() uint64 @@ -141,6 +144,13 @@ func (c *AssetSalePolicy) Expiry() uint64 { return c.expiry } +// HasExpired returns true if the policy has expired. +func (c *AssetSalePolicy) HasExpired() bool { + expireTime := time.Unix(int64(c.expiry), 0).UTC() + + return time.Now().UTC().After(expireTime) +} + // Scid returns the serialised short channel ID (SCID) of the channel to which // the policy applies. func (c *AssetSalePolicy) Scid() uint64 { @@ -260,6 +270,13 @@ func (c *AssetPurchasePolicy) Expiry() uint64 { return c.expiry } +// HasExpired returns true if the policy has expired. +func (c *AssetPurchasePolicy) HasExpired() bool { + expireTime := time.Unix(int64(c.expiry), 0).UTC() + + return time.Now().UTC().After(expireTime) +} + // Scid returns the serialised short channel ID (SCID) of the channel to which // the policy applies. func (c *AssetPurchasePolicy) Scid() uint64 { @@ -536,11 +553,7 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy, policy := *foundPolicy scid := *foundScid - // If the policy has expired, return false and clear it from the cache. - expireTime := time.Unix(int64(policy.Expiry()), 0).UTC() - currentTime := time.Now().UTC() - - if currentTime.After(expireTime) { + if policy.HasExpired() { h.policies.Delete(scid) return nil, false, nil } @@ -555,10 +568,7 @@ func (h *OrderHandler) cleanupStalePolicies() { h.policies.ForEach( func(scid SerialisedScid, policy Policy) error { - expireTime := time.Unix(int64(policy.Expiry()), 0).UTC() - currentTime := time.Now().UTC() - - if currentTime.After(expireTime) { + if policy.HasExpired() { staleCounter++ h.policies.Delete(scid) } From a9f42b85f6a16d5a9ce0da0731a508e4ae75d544 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 15:52:38 +0200 Subject: [PATCH 4/8] rfq: improve logging, small fixes --- rfq/order.go | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/rfq/order.go b/rfq/order.go index d26d6a945..f06619b10 100644 --- a/rfq/order.go +++ b/rfq/order.go @@ -352,7 +352,11 @@ func (h *OrderHandler) handleIncomingHtlc(_ context.Context, htlc lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse, error) { - log.Debug("Handling incoming HTLC") + log.Debugf("Handling incoming HTLC, incoming channel ID: %v, "+ + "outgoing channel ID: %v (incoming amount: %v, outgoing "+ + "amount: %v)", htlc.IncomingCircuitKey.ChanID.ToUint64(), + htlc.OutgoingChannelID.ToUint64(), htlc.AmountInMsat, + htlc.AmountOutMsat) // Look up a policy for the HTLC. If a policy does not exist, we resume // the HTLC. This is because the HTLC may be relevant to another @@ -370,7 +374,8 @@ func (h *OrderHandler) handleIncomingHtlc(_ context.Context, }, nil } - log.Debugf("Fetched policy with SCID %v", policy.Scid()) + log.Debugf("Fetched policy with SCID %v of type %T", policy.Scid(), + policy) // At this point, we know that a policy exists and has not expired // whilst sitting in the local cache. We can now check that the HTLC @@ -386,8 +391,7 @@ func (h *OrderHandler) handleIncomingHtlc(_ context.Context, } log.Debug("HTLC complies with policy. Broadcasting accept event.") - acceptHtlcEvent := NewAcceptHtlcEvent(htlc, policy) - h.cfg.AcceptHtlcEvents <- acceptHtlcEvent + h.cfg.AcceptHtlcEvents <- NewAcceptHtlcEvent(htlc, policy) return policy.GenerateInterceptorResponse(htlc) } @@ -491,6 +495,17 @@ func (h *OrderHandler) RegisterAssetPurchasePolicy( func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy, bool, error) { + outScid := SerialisedScid(htlc.OutgoingChannelID.ToUint64()) + outPolicy, haveOutPolicy := h.policies.Load(outScid) + + inScid := SerialisedScid(htlc.IncomingCircuitKey.ChanID.ToUint64()) + inPolicy, haveInPolicy := h.policies.Load(inScid) + + log.Tracef("Have inbound policy: %v: %v", haveInPolicy, + spew.Sdump(inPolicy)) + log.Tracef("Have outbound policy: %v: %v", haveOutPolicy, + spew.Sdump(outPolicy)) + var ( foundPolicy *Policy foundScid *SerialisedScid @@ -525,24 +540,16 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy, // If no policy has been found so far, we attempt to look up a policy by // the outgoing channel SCID. - if foundPolicy == nil { - scid := SerialisedScid(htlc.OutgoingChannelID.ToUint64()) - policy, ok := h.policies.Load(scid) - if ok { - foundPolicy = &policy - foundScid = &scid - } + if foundPolicy == nil && haveOutPolicy { + foundPolicy = &outPolicy + foundScid = &outScid } // If no policy has been found so far, we attempt to look up a policy by // the incoming channel SCID. - if foundPolicy == nil { - scid := SerialisedScid(htlc.IncomingCircuitKey.ChanID.ToUint64()) - policy, ok := h.policies.Load(scid) - if ok { - foundPolicy = &policy - foundScid = &scid - } + if foundPolicy == nil && haveInPolicy { + foundPolicy = &inPolicy + foundScid = &inScid } // If no policy has been found, we return false. From 4f82b58c66b3f28d5d54dff4ba60437f7d762585 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 15:54:08 +0200 Subject: [PATCH 5/8] rfq: round up inbound amount in policy check To make sure the lnd forwarding algorithm doesn't reject an incoming HTLC because due to the asset resolution it appears they aren't paying enough fees, we always add a single asset unit. This doesn't actually mean we're sending an additional unit every time, it just means we might accept an HTLC even if it pays a couple of milli-satoshis too little fees. This can be remediated by increasing the asset resolution/precision. --- rfq/order.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/rfq/order.go b/rfq/order.go index f06619b10..613c3d5c0 100644 --- a/rfq/order.go +++ b/rfq/order.go @@ -285,10 +285,24 @@ func (c *AssetPurchasePolicy) Scid() uint64 { // GenerateInterceptorResponse generates an interceptor response for the policy. func (c *AssetPurchasePolicy) GenerateInterceptorResponse( - _ lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse, + htlc lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse, error) { - incomingValue := lnwire.MilliSatoshi(c.AssetAmount) * c.BidPrice + htlcRecord, err := parseHtlcCustomRecords(htlc.WireCustomRecords) + if err != nil { + return nil, fmt.Errorf("parsing HTLC custom records failed: %w", + err) + } + + // The incoming amount is just to signal to the fee logic in lnd that + // we have received enough to pay for the routing fees and the asset + // amount. Due to rounding errors, we may slightly underreport the + // incoming value of the asset. So we increase it by exactly one asset + // unit to ensure that the fee logic in lnd does not reject the HTLC. + const roundingCorrection = 1 + htlcAssetAmount := htlcRecord.Amounts.Val.Sum() + roundingCorrection + incomingValue := lnwire.MilliSatoshi(htlcAssetAmount) * c.BidPrice + return &lndclient.InterceptedHtlcResponse{ Action: lndclient.InterceptorActionResumeModified, IncomingAmount: incomingValue, From b923d6159166cb1ab149eed64720be3ec96b7742 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 15:56:40 +0200 Subject: [PATCH 6/8] rfq: add pure asset forwarding policy If a payment is coming in through one asset channel and is supposed to leave through another asset channel, we don't actually have to do any conversion to BTC but instead to the outgoing asset directly (this would be a single hop payment with asset channels on both sides). To allow that forwarding mode, we add a new policy that contains both an incoming and outgoing channel policy. --- rfq/order.go | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/rfq/order.go b/rfq/order.go index 613c3d5c0..66fe43afd 100644 --- a/rfq/order.go +++ b/rfq/order.go @@ -312,6 +312,118 @@ func (c *AssetPurchasePolicy) GenerateInterceptorResponse( // Ensure that AssetPurchasePolicy implements the Policy interface. var _ Policy = (*AssetPurchasePolicy)(nil) +// AssetForwardPolicy is a struct that holds the terms which determine whether a +// channel HTLC for an asset-to-asset forward is accepted or rejected. +type AssetForwardPolicy struct { + incomingPolicy *AssetPurchasePolicy + outgoingPolicy *AssetSalePolicy +} + +// NewAssetForwardPolicy creates a new asset forward policy. +func NewAssetForwardPolicy(incoming, outgoing Policy) (*AssetForwardPolicy, + error) { + + incomingPolicy, ok := incoming.(*AssetPurchasePolicy) + if !ok { + return nil, fmt.Errorf("incoming policy is not an asset "+ + "purchase policy, but %T", incoming) + } + + outgoingPolicy, ok := outgoing.(*AssetSalePolicy) + if !ok { + return nil, fmt.Errorf("outgoing policy is not an asset "+ + "sale policy, but %T", outgoing) + } + + return &AssetForwardPolicy{ + incomingPolicy: incomingPolicy, + outgoingPolicy: outgoingPolicy, + }, nil +} + +// CheckHtlcCompliance returns an error if the given HTLC intercept descriptor +// does not satisfy the subject policy. +func (a *AssetForwardPolicy) CheckHtlcCompliance( + htlc lndclient.InterceptedHtlc) error { + + if err := a.incomingPolicy.CheckHtlcCompliance(htlc); err != nil { + return fmt.Errorf("error checking forward policy, inbound "+ + "HTLC does not comply with policy: %w", err) + } + + if err := a.outgoingPolicy.CheckHtlcCompliance(htlc); err != nil { + return fmt.Errorf("error checking forward policy, outbound "+ + "HTLC does not comply with policy: %w", err) + } + + return nil +} + +// Expiry returns the policy's expiry time as a unix timestamp in seconds. The +// returned expiry time is the earliest expiry time of the incoming and outgoing +// policies. +func (a *AssetForwardPolicy) Expiry() uint64 { + if a.incomingPolicy.Expiry() < a.outgoingPolicy.Expiry() { + return a.incomingPolicy.Expiry() + } + + return a.outgoingPolicy.Expiry() +} + +// HasExpired returns true if the policy has expired. +func (a *AssetForwardPolicy) HasExpired() bool { + expireTime := time.Unix(int64(a.Expiry()), 0).UTC() + + return time.Now().UTC().After(expireTime) +} + +// Scid returns the serialised short channel ID (SCID) of the channel to which +// the policy applies. This is the SCID of the incoming policy. +func (a *AssetForwardPolicy) Scid() uint64 { + return a.incomingPolicy.Scid() +} + +// GenerateInterceptorResponse generates an interceptor response for the policy. +func (a *AssetForwardPolicy) GenerateInterceptorResponse( + htlc lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse, + error) { + + incomingResponse, err := a.incomingPolicy.GenerateInterceptorResponse( + htlc, + ) + if err != nil { + return nil, fmt.Errorf("error generating incoming interceptor "+ + "response: %w", err) + } + + outgoingResponse, err := a.outgoingPolicy.GenerateInterceptorResponse( + htlc, + ) + if err != nil { + return nil, fmt.Errorf("error generating outgoing interceptor "+ + "response: %w", err) + } + + return &lndclient.InterceptedHtlcResponse{ + // Both incoming and outgoing policies will resume with + // modifications. + Action: lndclient.InterceptorActionResumeModified, + + // The incoming policy will modify the incoming amount in order + // to satisfy the fee check in `lnd`. + IncomingAmount: incomingResponse.IncomingAmount, + + // The outgoing policy will modify the outgoing amount and add + // custom records in order to satisfy the terms of the receiving + // node. + OutgoingAmount: outgoingResponse.OutgoingAmount, + CustomRecords: outgoingResponse.CustomRecords, + }, nil +} + +// Ensure that AssetForwardPolicy implements the Policy interface. +var _ Policy = (*AssetForwardPolicy)(nil) + // OrderHandlerCfg is a struct that holds the configuration parameters for the // order handler service. type OrderHandlerCfg struct { @@ -552,6 +664,39 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy, }) } + // Here we handle a special case where we both have an incoming and + // outgoing policy. In this case, we need to create a forward policy. + if foundPolicy != nil && haveOutPolicy { + incomingPolicy := *foundPolicy + outgoingPolicy := outPolicy + + if incomingPolicy.HasExpired() { + scid := incomingPolicy.Scid() + h.policies.Delete(SerialisedScid(scid)) + } + if outgoingPolicy.HasExpired() { + scid := outgoingPolicy.Scid() + h.policies.Delete(SerialisedScid(scid)) + } + + // If either the incoming or outgoing policy has expired, we + // return false, as if we didn't find a policy. + if incomingPolicy.HasExpired() || outgoingPolicy.HasExpired() { + return nil, false, nil + } + + forwardPolicy, err := NewAssetForwardPolicy( + incomingPolicy, outgoingPolicy, + ) + if err != nil { + return nil, false, fmt.Errorf("error creating forward "+ + "policy: %w", err) + } + + return forwardPolicy, true, nil + + } + // If no policy has been found so far, we attempt to look up a policy by // the outgoing channel SCID. if foundPolicy == nil && haveOutPolicy { From 2d16c8afabf5a8eab09492eb462f72828c33c5cc Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 18:04:33 +0200 Subject: [PATCH 7/8] config: allow empty config --- config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config.go b/config.go index f50f3bab5..08166d68c 100644 --- a/config.go +++ b/config.go @@ -127,6 +127,9 @@ func ParseUniversePublicAccessStatus( case "w": return UniversePublicAccessStatusWrite, nil + case "": + return UniversePublicAccessStatusNone, nil + default: // This default case returns an error. It will capture the case // where the CLI argument is present but unset (empty value). From 080f22c1f866c9dd525205b3fb2ec2e987389e08 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Fri, 7 Jun 2024 18:21:12 +0200 Subject: [PATCH 8/8] rfq: add missing Godoc comments --- rfqmsg/accept.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rfqmsg/accept.go b/rfqmsg/accept.go index d81e17fb6..9fd68ec0e 100644 --- a/rfqmsg/accept.go +++ b/rfqmsg/accept.go @@ -42,8 +42,12 @@ type acceptWireMsgData struct { // Sig is a signature over the serialized contents of the message. Sig tlv.RecordT[tlv.TlvType3, [64]byte] + // InOutRateTick is the tick rate for the accept, defined in + // in_asset/out_asset. This is only set in a buy accept message. InOutRateTick acceptInOutRateTick + // OutInRateTick is the tick rate for the accept, defined in + // out_asset/in_asset. This is only set in a sell accept message. OutInRateTick acceptOutInRateTick }