Skip to content

Commit

Permalink
WIP: RFQ stream handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ffranr committed Jan 16, 2024
1 parent c44a9cd commit 7d36f26
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 0 deletions.
9 changes: 9 additions & 0 deletions chain_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,15 @@ func (l *LndRpcChainBridge) EstimateFee(ctx context.Context,
return l.lnd.WalletKit.EstimateFeeRate(ctx, int32(confTarget))
}

// SubscribeCustomMessages creates a subscription to custom messages received
// from our peers.
func (l *LndRpcChainBridge) SubscribeCustomMessages(
ctx context.Context) (<-chan lndclient.CustomMessage,
<-chan error, error) {

return l.lnd.Client.SubscribeCustomMessages(ctx)
}

// A compile time assertion to ensure LndRpcChainBridge meets the
// tapgarden.ChainBridge interface.
var _ tapgarden.ChainBridge = (*LndRpcChainBridge)(nil)
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lightninglabs/taproot-assets/address"
"github.com/lightninglabs/taproot-assets/monitoring"
"github.com/lightninglabs/taproot-assets/proof"
"github.com/lightninglabs/taproot-assets/rfq"
"github.com/lightninglabs/taproot-assets/tapdb"
"github.com/lightninglabs/taproot-assets/tapfreighter"
"github.com/lightninglabs/taproot-assets/tapgarden"
Expand Down Expand Up @@ -122,6 +123,8 @@ type Config struct {

UniverseFederation *universe.FederationEnvoy

RfqManager *rfq.Manager

UniverseStats universe.Telemetry

// UniversePublicAccess is flag which, If true, and the Universe server
Expand Down
26 changes: 26 additions & 0 deletions rfq/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package rfq

import (
"github.com/btcsuite/btclog"
)

// Subsystem defines the logging code for this subsystem.
const Subsystem = "RFQ"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log = btclog.Disabled

// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
124 changes: 124 additions & 0 deletions rfq/rfq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package rfq

import (
"context"
"fmt"
"sync"

"github.com/lightninglabs/taproot-assets/fn"
)

type ManagerCfg struct {
PeerMessagePorter PeerMessagePorter
}

// Manager is a struct that handles RFQ order management.
type Manager struct {
startOnce sync.Once
stopOnce sync.Once

cfg ManagerCfg

rfqStreamHandle *StreamHandler

// ContextGuard provides a wait group and main quit channel that can be
// used to create guarded contexts.
*fn.ContextGuard
}

func NewManager(cfg ManagerCfg) (Manager, error) {
return Manager{
cfg: cfg,
}, nil
}

// Start attempts to start a new RFQ manager.
func (m *Manager) Start() error {
var startErr error
m.startOnce.Do(func() {
ctx, cancel := m.WithCtxQuitNoTimeout()
defer cancel()

log.Info("Initializing RFQ subsystems")
err := m.initSubsystems(ctx)
if err != nil {
startErr = err
return
}

// Start the manager's main event loop in a separate goroutine.
m.Wg.Add(1)
go func() {
defer m.Wg.Done()

log.Info("Starting RFQ manager main event loop")
err = m.mainEventLoop()
if err != nil {
startErr = err
return
}
}()
})
return startErr
}

func (m *Manager) Stop() error {
var stopErr error

m.stopOnce.Do(func() {
log.Info("Stopping RFQ manager")

err := m.rfqStreamHandle.Stop()
if err != nil {
stopErr = fmt.Errorf("error stopping RFQ stream "+
"handler: %w", err)
return
}
})

return stopErr
}

func (m *Manager) initSubsystems(ctx context.Context) error {
var err error

// Initialise the RFQ raw message stream handler and start it in a
// separate goroutine.
m.rfqStreamHandle, err = NewStreamHandler(ctx, m.cfg.PeerMessagePorter)
if err != nil {
return fmt.Errorf("failed to create RFQ stream handler: %w",
err)
}

m.Wg.Add(1)
go func() {
defer m.Wg.Done()

log.Info("Starting RFQ stream handler")
err = m.rfqStreamHandle.Start()
if err != nil {
return
}
}()

return nil
}

func (m *Manager) mainEventLoop() error {
for {
select {
// Handle RFQ message stream events.
case quoteReq := <-m.rfqStreamHandle.QuoteRequests.NewItemCreated.ChanOut():
log.Debugf("Received RFQ quote request from message "+
"stream handler: %v", quoteReq)
// TODO(ffranr): send to negotiator (+ price oracle)

case errStream := <-m.rfqStreamHandle.ErrChan:
return fmt.Errorf("error received from RFQ stream "+
"handler: %w", errStream)

case <-m.Quit:
return nil
}
}
}
197 changes: 197 additions & 0 deletions rfq/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package rfq

import (
"bytes"
"context"
"encoding/binary"
"fmt"

"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/taproot-assets/fn"
)

// TapMessageTypeBaseOffset is the taproot-assets specific message type
// identifier base offset. All tap messages will have a type identifier that is
// greater than this value.
//
// This offset was chosen as the concatenation of the alphabetical index
// positions of the letters "t" (20), "a" (1), and "p" (16).
const TapMessageTypeBaseOffset uint32 = 20116

var (
// QuoteRequestMsgType is the message type identifier for a quote
// request message.
QuoteRequestMsgType = TapMessageTypeBaseOffset + 1
)

// QuoteRequest is a struct that represents a request for a quote (RFQ) from a
// peer.
type QuoteRequest struct {
RfqID [32]byte
AssetID [32]byte
AssetAmt uint64
SuggestedRateTick uint64
}

// Encode serializes the QuoteRequest struct into a byte slice.
func (q *QuoteRequest) Encode() ([]byte, error) {
buf := new(bytes.Buffer)

_, err := buf.Write(q.RfqID[:])
if err != nil {
return nil, err
}

_, err = buf.Write(q.AssetID[:])
if err != nil {
return nil, err
}

err = binary.Write(buf, binary.BigEndian, q.AssetAmt)
if err != nil {
return nil, err
}

err = binary.Write(buf, binary.BigEndian, q.SuggestedRateTick)
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

// Decode populates a QuoteRequest instance from a byte slice
func (q *QuoteRequest) Decode(data []byte) error {
if len(data) != 72 {
return fmt.Errorf("invalid data length")
}

copy(q.RfqID[:], data[:32])
copy(q.AssetID[:], data[32:64])
q.AssetAmt = binary.BigEndian.Uint64(data[64:72])
q.SuggestedRateTick = binary.BigEndian.Uint64(data[72:])

return nil
}

// StreamHandler is a struct that handles incoming and outgoing peer RFQ stream
// messages.
//
// This component subscribes to incoming raw peer messages (custom messages). It
// processes those messages with the aim of extracting relevant request for
// quotes (RFQs).
type StreamHandler struct {
// recvRawMessages is a channel that receives incoming raw peer
// messages.
recvRawMessages <-chan lndclient.CustomMessage

// errRecvRawMessages is a channel that receives errors emanating from
// the peer raw messages subscription.
errRecvRawMessages <-chan error

// QuoteRequests is a channel which is populated with received
// (incoming) and processed requests for quote (RFQ) messages.
QuoteRequests *fn.EventReceiver[QuoteRequest]

// ErrChan is the handle's error reporting channel.
ErrChan <-chan error

// ContextGuard provides a wait group and main quit channel that can be
// used to create guarded contexts.
*fn.ContextGuard
}

// NewStreamHandler creates a new RFQ stream handler.
func NewStreamHandler(ctx context.Context,
peerMessagePorter PeerMessagePorter) (*StreamHandler, error) {

msgChan, errChan, err := peerMessagePorter.SubscribeCustomMessages(ctx)
if err != nil {
return nil, fmt.Errorf("failed to subscribe to "+
"custom messages via message transfer handle: %w", err)
}

return &StreamHandler{
recvRawMessages: msgChan,
errRecvRawMessages: errChan,

ErrChan: make(<-chan error),
}, nil
}

// handleRecvRawMessage handles an incoming raw peer message.
func (h *StreamHandler) handleQuoteRequestMsg(msg lndclient.CustomMessage) error {
// Attempt to decode the message as a request for quote (RFQ) message.
var rfq QuoteRequest
err := rfq.Decode(msg.Data)
if err != nil {
return fmt.Errorf("unable to decode incoming RFQ message: %w",
err)
}

// TODO(ffranr): Determine whether to keep or discard the RFQ message.

// Send the decoded RFQ message to the RFQ manager.
//if !fn.SendOrQuit(p.exportReqs, req, p.Quit) {
// return nil, fmt.Errorf("ChainPorter shutting down")
//}

return nil
}

// handleRecvRawMessage handles an incoming raw peer message.
func (h *StreamHandler) handleRecvRawMessage(
msg lndclient.CustomMessage) error {

switch msg.MsgType {
case QuoteRequestMsgType:
err := h.handleQuoteRequestMsg(msg)
if err != nil {
return fmt.Errorf("unable to handle incoming quote "+
"request message: %w", err)
}

default:
// Silently disregard irrelevant messages based on message type.
return nil
}

return nil
}

// Start starts the RFQ stream handler.
func (h *StreamHandler) Start() error {
for {
select {
case msg := <-h.recvRawMessages:
err := h.handleRecvRawMessage(msg)
if err != nil {
log.Warnf("Error handling raw custom "+
"message recieve event: %v", err)
}

case errSubCustomMessages := <-h.errRecvRawMessages:
// If we receive an error from the peer message
// subscription, we'll terminate the stream handler.
return fmt.Errorf("error received from RFQ stream "+
"handler: %w", errSubCustomMessages)

case <-h.Quit:
return nil
}
}
}

// Stop stops the RFQ stream handler.
func (h *StreamHandler) Stop() error {
close(h.Quit)
return nil
}

// PeerMessagePorter is an interface that abstracts the peer message transport
// layer.
type PeerMessagePorter interface {
SubscribeCustomMessages(
ctx context.Context) (<-chan lndclient.CustomMessage,
<-chan error, error)
}
Loading

0 comments on commit 7d36f26

Please sign in to comment.