Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ffranr committed Feb 19, 2024
1 parent aed3714 commit 4b63ddd
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 92 deletions.
35 changes: 14 additions & 21 deletions rfq/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ type ManagerCfg struct {
// PriceOracle is the price oracle that the RFQ manager will use to
// determine whether a quote is accepted or rejected.
PriceOracle PriceOracle

// LightningSelfId is the public key of the lightning node that the RFQ
// manager is associated with.
//
// TODO(ffranr): The tapd node was receiving wire messages that it sent.
// This is a temporary fix to prevent the node from processing its own
// messages.
LightningSelfId route.Vertex
}

// Manager is a struct that manages the request for quote (RFQ) system.
Expand Down Expand Up @@ -144,12 +136,11 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
}

// Initialise and start the peer message stream handler.
streamHandlerCfg := StreamHandlerCfg{
PeerMessagePorter: m.cfg.PeerMessagePorter,
LightningSelfId: m.cfg.LightningSelfId,
}
m.streamHandler, err = NewStreamHandler(
ctx, streamHandlerCfg, m.incomingMessages,
ctx, StreamHandlerCfg{
PeerMessagePorter: m.cfg.PeerMessagePorter,
IncomingMessages: m.incomingMessages,
},
)
if err != nil {
return fmt.Errorf("error initializing RFQ subsystem service: "+
Expand All @@ -162,10 +153,12 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
}

// Initialise and start the quote negotiator.
negotiatorCfg := NegotiatorCfg{
PriceOracle: m.cfg.PriceOracle,
}
m.negotiator, err = NewNegotiator(negotiatorCfg, m.outgoingMessages)
m.negotiator, err = NewNegotiator(
NegotiatorCfg{
PriceOracle: m.cfg.PriceOracle,
OutgoingMessages: m.outgoingMessages,
},
)
if err != nil {
return fmt.Errorf("error initializing RFQ negotiator: %w",
err)
Expand Down Expand Up @@ -324,8 +317,8 @@ func (m *Manager) mainEventLoop() {
case incomingMsg := <-m.incomingMessages:
peer := incomingMsg.MsgPeer()
log.Debugf("Manager handling incoming message "+
"(msg_type=%T, origin_peer=%s)",
incomingMsg, peer)
"(msg_type=%T, origin_peer=%s)", incomingMsg,
peer)

err := m.handleIncomingMessage(incomingMsg)
if err != nil {
Expand All @@ -337,8 +330,8 @@ func (m *Manager) mainEventLoop() {
case outgoingMsg := <-m.outgoingMessages:
peer := outgoingMsg.MsgPeer()
log.Debugf("Manager handling outgoing message "+
"(msg_type=%T, dest_peer=%s)",
outgoingMsg, peer.String())
"(msg_type=%T, dest_peer=%s)", outgoingMsg,
peer.String())

err := m.handleOutgoingMessage(outgoingMsg)
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions rfq/negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type NegotiatorCfg struct {
// PriceOracle is the price oracle that the negotiator will use to
// determine whether a quote is accepted or rejected.
PriceOracle PriceOracle

// OutgoingMessages is a channel which is populated with outgoing peer
// messages.
OutgoingMessages chan<- rfqmsg.OutgoingMsg
}

// Negotiator is a struct that handles the negotiation of quotes. It is a RFQ
Expand All @@ -28,10 +32,6 @@ type Negotiator struct {
// cfg holds the configuration parameters for the negotiator.
cfg NegotiatorCfg

// outgoingMessages is a channel which is populated with outgoing peer
// messages.
outgoingMessages chan<- rfqmsg.OutgoingMsg

// assetSellOffers is a map (keyed on asset ID) that holds asset sell
// offers.
assetSellOffers map[string]SellOffer
Expand All @@ -50,13 +50,10 @@ type Negotiator struct {
}

// NewNegotiator creates a new quote negotiator.
func NewNegotiator(cfg NegotiatorCfg,
outgoingMessages chan<- rfqmsg.OutgoingMsg) (*Negotiator, error) {

func NewNegotiator(cfg NegotiatorCfg) (*Negotiator, error) {
return &Negotiator{
cfg: cfg,

outgoingMessages: outgoingMessages,
assetSellOffers: make(map[string]SellOffer),
assetGroupSellOffers: make(map[string]SellOffer),

Expand All @@ -68,9 +65,8 @@ func NewNegotiator(cfg NegotiatorCfg,
}, nil
}

// queryBidFromPriceOracle queries the price oracle for a bid price. It
// returns an appropriate outgoing response message which should be sent to the
// peer.
// queryBidFromPriceOracle queries the price oracle for a bid price. It returns
// an appropriate outgoing response message which should be sent to the peer.
func (n *Negotiator) queryBidFromPriceOracle(peer route.Vertex,
assetId *asset.ID, assetGroupKey *btcec.PublicKey,
assetAmount uint64) (rfqmsg.OutgoingMsg, error) {
Expand Down Expand Up @@ -137,7 +133,7 @@ func (n *Negotiator) RequestQuote(buyOrder BuyOrder) error {

// Send the response message to the outgoing messages channel.
sendSuccess := fn.SendOrQuit(
n.outgoingMessages, outgoingMsg, n.Quit,
n.cfg.OutgoingMessages, outgoingMsg, n.Quit,
)
if !sendSuccess {
err := fmt.Errorf("negotiator failed to add quote " +
Expand Down Expand Up @@ -205,7 +201,9 @@ func (n *Negotiator) HandleIncomingQuoteRequest(request rfqmsg.Request) error {
)
var msg rfqmsg.OutgoingMsg = &rejectMsg

sendSuccess := fn.SendOrQuit(n.outgoingMessages, msg, n.Quit)
sendSuccess := fn.SendOrQuit(
n.cfg.OutgoingMessages, msg, n.Quit,
)
if !sendSuccess {
return fmt.Errorf("negotiator failed to send reject " +
"message")
Expand All @@ -222,7 +220,9 @@ func (n *Negotiator) HandleIncomingQuoteRequest(request rfqmsg.Request) error {
)
var msg rfqmsg.OutgoingMsg = &rejectMsg

sendSuccess := fn.SendOrQuit(n.outgoingMessages, msg, n.Quit)
sendSuccess := fn.SendOrQuit(
n.cfg.OutgoingMessages, msg, n.Quit,
)
if !sendSuccess {
return fmt.Errorf("negotiator failed to send reject " +
"message")
Expand All @@ -248,7 +248,7 @@ func (n *Negotiator) HandleIncomingQuoteRequest(request rfqmsg.Request) error {

// Send the response message to the outgoing messages channel.
sendSuccess := fn.SendOrQuit(
n.outgoingMessages, outgoingMsgResponse, n.Quit,
n.cfg.OutgoingMessages, outgoingMsgResponse, n.Quit,
)
if !sendSuccess {
err = fmt.Errorf("negotiator failed to add message "+
Expand Down
6 changes: 4 additions & 2 deletions rfq/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func NewOrderHandler(cfg OrderHandlerCfg) (*OrderHandler, error) {

// handleIncomingHtlc handles an incoming HTLC.
//
// NOTE: This function must be thread safe.
// NOTE: This function must be thread safe. It is used by an external
// interceptor service.
func (h *OrderHandler) handleIncomingHtlc(_ context.Context,
htlc lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse,
error) {
Expand Down Expand Up @@ -160,7 +161,8 @@ func (h *OrderHandler) handleIncomingHtlc(_ context.Context,
}, nil
}

log.Debug("HTLC complies with channel remit.")
log.Debug("HTLC complies with channel remit. Broadcasting accept " +
"event.")
acceptHtlcEvent := NewAcceptHtlcEvent(htlc, *channelRemit)
h.cfg.AcceptHtlcEvents <- acceptHtlcEvent

Expand Down
58 changes: 11 additions & 47 deletions rfq/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/rfqmsg"
"github.com/lightningnetwork/lnd/routing/route"
)

// StreamHandlerCfg is a struct that holds the configuration parameters for the
Expand All @@ -19,13 +18,10 @@ type StreamHandlerCfg struct {
// peer messages.
PeerMessagePorter PeerMessagePorter

// LightningSelfId is the public key of the lightning node that the RFQ
// manager is associated with.
//
// TODO(ffranr): The tapd node was receiving wire messages that it sent.
// This is a temporary fix to prevent the node from processing its own
// messages.
LightningSelfId route.Vertex
// IncomingMessages is a channel which is populated with incoming
// (received) RFQ messages. These messages have been extracted from the
// raw peer wire messages by the stream handler service.
IncomingMessages chan<- rfqmsg.IncomingMsg
}

// StreamHandler is a struct that handles incoming and outgoing peer RFQ stream
Expand All @@ -50,15 +46,6 @@ type StreamHandler struct {
// the peer raw messages subscription.
errRecvRawMessages <-chan error

// incomingMessages is a channel which is populated with incoming
// (received) RFQ messages. These messages have been extracted from the
// raw peer wire messages by the stream handler service.
incomingMessages chan<- rfqmsg.IncomingMsg

seenMessages map[string]struct{}

seenMessagesMtx sync.Mutex

// ContextGuard provides a wait group and main quit channel that can be
// used to create guarded contexts.
*fn.ContextGuard
Expand All @@ -68,8 +55,8 @@ type StreamHandler struct {
//
// TODO(ffranr): Pass in a signer so that we can create a signature over output
// message fields.
func NewStreamHandler(ctx context.Context, cfg StreamHandlerCfg,
incomingMessages chan<- rfqmsg.IncomingMsg) (*StreamHandler, error) {
func NewStreamHandler(ctx context.Context,
cfg StreamHandlerCfg) (*StreamHandler, error) {

pPorter := cfg.PeerMessagePorter
msgChan, peerMsgErrChan, err := pPorter.SubscribeCustomMessages(ctx)
Expand All @@ -84,10 +71,6 @@ func NewStreamHandler(ctx context.Context, cfg StreamHandlerCfg,
recvRawMessages: msgChan,
errRecvRawMessages: peerMsgErrChan,

incomingMessages: incomingMessages,

seenMessages: make(map[string]struct{}),

ContextGuard: &fn.ContextGuard{
DefaultTimeout: DefaultTimeout,
Quit: make(chan struct{}),
Expand All @@ -105,32 +88,13 @@ func (h *StreamHandler) handleIncomingQuoteRequest(
"from a wire message: %w", err)
}

if quoteRequest.Peer == h.cfg.LightningSelfId {
return nil
}

// If we have already seen this message, we will ignore it.
//
// TODO(ffranr): Why do messages get sent twice?
h.seenMessagesMtx.Lock()

if _, ok := h.seenMessages[quoteRequest.ID.String()]; ok {
return nil
}

// Mark the message as seen.
h.seenMessages[quoteRequest.ID.String()] = struct{}{}

h.seenMessagesMtx.Unlock()

log.Debugf("Stream handling incoming message (msg_type=%T, "+
"msg_id=%s, origin_peer=%s, self=%s)", quoteRequest,
quoteRequest.ID.String(), quoteRequest.MsgPeer(),
h.cfg.LightningSelfId)
"msg_id=%s, origin_peer=%s)", quoteRequest,
quoteRequest.ID.String(), quoteRequest.MsgPeer())

// Send the quote request to the RFQ manager.
var msg rfqmsg.IncomingMsg = quoteRequest
sendSuccess := fn.SendOrQuit(h.incomingMessages, msg, h.Quit)
sendSuccess := fn.SendOrQuit(h.cfg.IncomingMessages, msg, h.Quit)
if !sendSuccess {
return fmt.Errorf("RFQ stream handler shutting down")
}
Expand All @@ -157,7 +121,7 @@ func (h *StreamHandler) handleIncomingQuoteAccept(

// Send the message to the RFQ manager.
var msg rfqmsg.IncomingMsg = quoteAccept
sendSuccess := fn.SendOrQuit(h.incomingMessages, msg, h.Quit)
sendSuccess := fn.SendOrQuit(h.cfg.IncomingMessages, msg, h.Quit)
if !sendSuccess {
return fmt.Errorf("RFQ stream handler shutting down")
}
Expand All @@ -184,7 +148,7 @@ func (h *StreamHandler) handleIncomingQuoteReject(

// Send the message to the RFQ manager.
var msg rfqmsg.IncomingMsg = quoteReject
sendSuccess := fn.SendOrQuit(h.incomingMessages, msg, h.Quit)
sendSuccess := fn.SendOrQuit(h.cfg.IncomingMessages, msg, h.Quit)
if !sendSuccess {
return fmt.Errorf("RFQ stream handler shutting down")
}
Expand Down
7 changes: 0 additions & 7 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,11 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,

// Construct the RFQ manager.
priceOracle := rfq.NewMockPriceOracle(3600)

lndInfo, err := chainBridge.GetInfo(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to get lnd info: %v", err)
}

rfqManager, err := rfq.NewManager(
rfq.ManagerCfg{
PeerMessagePorter: chainBridge,
HtlcInterceptor: chainBridge,
PriceOracle: priceOracle,
LightningSelfId: lndInfo.IdentityPubkey,
},
)
if err != nil {
Expand Down

0 comments on commit 4b63ddd

Please sign in to comment.