diff --git a/chain_bridge.go b/chain_bridge.go index 244ee04c1..78b78dafd 100644 --- a/chain_bridge.go +++ b/chain_bridge.go @@ -200,6 +200,15 @@ func (l *LndRpcChainBridge) EstimateFee(ctx context.Context, return l.lnd.WalletKit.EstimateFeeRate(ctx, int32(confTarget)) } +// SubscribeCustomMessages creates a subscription to custom messages received +// from our peers. +func (l *LndRpcChainBridge) SubscribeCustomMessages( + ctx context.Context) (<-chan lndclient.CustomMessage, + <-chan error, error) { + + return l.lnd.Client.SubscribeCustomMessages(ctx) +} + // A compile time assertion to ensure LndRpcChainBridge meets the // tapgarden.ChainBridge interface. var _ tapgarden.ChainBridge = (*LndRpcChainBridge)(nil) diff --git a/config.go b/config.go index bb5e233a9..be58e48e6 100644 --- a/config.go +++ b/config.go @@ -10,6 +10,7 @@ import ( "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/monitoring" "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/rfq" "github.com/lightninglabs/taproot-assets/tapdb" "github.com/lightninglabs/taproot-assets/tapfreighter" "github.com/lightninglabs/taproot-assets/tapgarden" @@ -122,6 +123,8 @@ type Config struct { UniverseFederation *universe.FederationEnvoy + RfqManager *rfq.Manager + UniverseStats universe.Telemetry // UniversePublicAccess is flag which, If true, and the Universe server diff --git a/rfq/log.go b/rfq/log.go new file mode 100644 index 000000000..9a772bb88 --- /dev/null +++ b/rfq/log.go @@ -0,0 +1,26 @@ +package rfq + +import ( + "github.com/btcsuite/btclog" +) + +// Subsystem defines the logging code for this subsystem. +const Subsystem = "RFQ" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log = btclog.Disabled + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/rfq/rfq.go b/rfq/rfq.go new file mode 100644 index 000000000..e5a1f1f3c --- /dev/null +++ b/rfq/rfq.go @@ -0,0 +1,124 @@ +package rfq + +import ( + "context" + "fmt" + "sync" + + "github.com/lightninglabs/taproot-assets/fn" +) + +type ManagerCfg struct { + PeerMessagePorter PeerMessagePorter +} + +// Manager is a struct that handles RFQ order management. +type Manager struct { + startOnce sync.Once + stopOnce sync.Once + + cfg ManagerCfg + + rfqStreamHandle *StreamHandler + + // ContextGuard provides a wait group and main quit channel that can be + // used to create guarded contexts. + *fn.ContextGuard +} + +func NewManager(cfg ManagerCfg) (Manager, error) { + return Manager{ + cfg: cfg, + }, nil +} + +// Start attempts to start a new RFQ manager. +func (m *Manager) Start() error { + var startErr error + m.startOnce.Do(func() { + ctx, cancel := m.WithCtxQuitNoTimeout() + defer cancel() + + log.Info("Initializing RFQ subsystems") + err := m.initSubsystems(ctx) + if err != nil { + startErr = err + return + } + + // Start the manager's main event loop in a separate goroutine. + m.Wg.Add(1) + go func() { + defer m.Wg.Done() + + log.Info("Starting RFQ manager main event loop") + err = m.mainEventLoop() + if err != nil { + startErr = err + return + } + }() + }) + return startErr +} + +func (m *Manager) Stop() error { + var stopErr error + + m.stopOnce.Do(func() { + log.Info("Stopping RFQ manager") + + err := m.rfqStreamHandle.Stop() + if err != nil { + stopErr = fmt.Errorf("error stopping RFQ stream "+ + "handler: %w", err) + return + } + }) + + return stopErr +} + +func (m *Manager) initSubsystems(ctx context.Context) error { + var err error + + // Initialise the RFQ raw message stream handler and start it in a + // separate goroutine. + m.rfqStreamHandle, err = NewStreamHandler(ctx, m.cfg.PeerMessagePorter) + if err != nil { + return fmt.Errorf("failed to create RFQ stream handler: %w", + err) + } + + m.Wg.Add(1) + go func() { + defer m.Wg.Done() + + log.Info("Starting RFQ stream handler") + err = m.rfqStreamHandle.Start() + if err != nil { + return + } + }() + + return nil +} + +func (m *Manager) mainEventLoop() error { + for { + select { + // Handle RFQ message stream events. + case quoteReq := <-m.rfqStreamHandle.QuoteRequests.NewItemCreated.ChanOut(): + log.Debugf("Received RFQ quote request from message "+ + "stream handler: %v", quoteReq) + // TODO(ffranr): send to negotiator (+ price oracle) + + case errStream := <-m.rfqStreamHandle.ErrChan: + return fmt.Errorf("error received from RFQ stream "+ + "handler: %w", errStream) + + case <-m.Quit: + return nil + } + } +} diff --git a/rfq/stream.go b/rfq/stream.go new file mode 100644 index 000000000..81fbd469a --- /dev/null +++ b/rfq/stream.go @@ -0,0 +1,197 @@ +package rfq + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + + "github.com/lightninglabs/lndclient" + "github.com/lightninglabs/taproot-assets/fn" +) + +// TapMessageTypeBaseOffset is the taproot-assets specific message type +// identifier base offset. All tap messages will have a type identifier that is +// greater than this value. +// +// This offset was chosen as the concatenation of the alphabetical index +// positions of the letters "t" (20), "a" (1), and "p" (16). +const TapMessageTypeBaseOffset uint32 = 20116 + +var ( + // QuoteRequestMsgType is the message type identifier for a quote + // request message. + QuoteRequestMsgType = TapMessageTypeBaseOffset + 1 +) + +// QuoteRequest is a struct that represents a request for a quote (RFQ) from a +// peer. +type QuoteRequest struct { + RfqID [32]byte + AssetID [32]byte + AssetAmt uint64 + SuggestedRateTick uint64 +} + +// Encode serializes the QuoteRequest struct into a byte slice. +func (q *QuoteRequest) Encode() ([]byte, error) { + buf := new(bytes.Buffer) + + _, err := buf.Write(q.RfqID[:]) + if err != nil { + return nil, err + } + + _, err = buf.Write(q.AssetID[:]) + if err != nil { + return nil, err + } + + err = binary.Write(buf, binary.BigEndian, q.AssetAmt) + if err != nil { + return nil, err + } + + err = binary.Write(buf, binary.BigEndian, q.SuggestedRateTick) + if err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// Decode populates a QuoteRequest instance from a byte slice +func (q *QuoteRequest) Decode(data []byte) error { + if len(data) != 72 { + return fmt.Errorf("invalid data length") + } + + copy(q.RfqID[:], data[:32]) + copy(q.AssetID[:], data[32:64]) + q.AssetAmt = binary.BigEndian.Uint64(data[64:72]) + q.SuggestedRateTick = binary.BigEndian.Uint64(data[72:]) + + return nil +} + +// StreamHandler is a struct that handles incoming and outgoing peer RFQ stream +// messages. +// +// This component subscribes to incoming raw peer messages (custom messages). It +// processes those messages with the aim of extracting relevant request for +// quotes (RFQs). +type StreamHandler struct { + // recvRawMessages is a channel that receives incoming raw peer + // messages. + recvRawMessages <-chan lndclient.CustomMessage + + // errRecvRawMessages is a channel that receives errors emanating from + // the peer raw messages subscription. + errRecvRawMessages <-chan error + + // QuoteRequests is a channel which is populated with received + // (incoming) and processed requests for quote (RFQ) messages. + QuoteRequests *fn.EventReceiver[QuoteRequest] + + // ErrChan is the handle's error reporting channel. + ErrChan <-chan error + + // ContextGuard provides a wait group and main quit channel that can be + // used to create guarded contexts. + *fn.ContextGuard +} + +// NewStreamHandler creates a new RFQ stream handler. +func NewStreamHandler(ctx context.Context, + peerMessagePorter PeerMessagePorter) (*StreamHandler, error) { + + msgChan, errChan, err := peerMessagePorter.SubscribeCustomMessages(ctx) + if err != nil { + return nil, fmt.Errorf("failed to subscribe to "+ + "custom messages via message transfer handle: %w", err) + } + + return &StreamHandler{ + recvRawMessages: msgChan, + errRecvRawMessages: errChan, + + ErrChan: make(<-chan error), + }, nil +} + +// handleRecvRawMessage handles an incoming raw peer message. +func (h *StreamHandler) handleQuoteRequestMsg(msg lndclient.CustomMessage) error { + // Attempt to decode the message as a request for quote (RFQ) message. + var rfq QuoteRequest + err := rfq.Decode(msg.Data) + if err != nil { + return fmt.Errorf("unable to decode incoming RFQ message: %w", + err) + } + + // TODO(ffranr): Determine whether to keep or discard the RFQ message. + + // Send the decoded RFQ message to the RFQ manager. + //if !fn.SendOrQuit(p.exportReqs, req, p.Quit) { + // return nil, fmt.Errorf("ChainPorter shutting down") + //} + + return nil +} + +// handleRecvRawMessage handles an incoming raw peer message. +func (h *StreamHandler) handleRecvRawMessage( + msg lndclient.CustomMessage) error { + + switch msg.MsgType { + case QuoteRequestMsgType: + err := h.handleQuoteRequestMsg(msg) + if err != nil { + return fmt.Errorf("unable to handle incoming quote "+ + "request message: %w", err) + } + + default: + // Silently disregard irrelevant messages based on message type. + return nil + } + + return nil +} + +// Start starts the RFQ stream handler. +func (h *StreamHandler) Start() error { + for { + select { + case msg := <-h.recvRawMessages: + err := h.handleRecvRawMessage(msg) + if err != nil { + log.Warnf("Error handling raw custom "+ + "message recieve event: %v", err) + } + + case errSubCustomMessages := <-h.errRecvRawMessages: + // If we receive an error from the peer message + // subscription, we'll terminate the stream handler. + return fmt.Errorf("error received from RFQ stream "+ + "handler: %w", errSubCustomMessages) + + case <-h.Quit: + return nil + } + } +} + +// Stop stops the RFQ stream handler. +func (h *StreamHandler) Stop() error { + close(h.Quit) + return nil +} + +// PeerMessagePorter is an interface that abstracts the peer message transport +// layer. +type PeerMessagePorter interface { + SubscribeCustomMessages( + ctx context.Context) (<-chan lndclient.CustomMessage, + <-chan error, error) +} diff --git a/server.go b/server.go index 73d03af5b..65a3fe4cf 100644 --- a/server.go +++ b/server.go @@ -158,6 +158,11 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error { "federation: %v", err) } + // Start the request for quote (RFQ) manager. + if err := s.cfg.RfqManager.Start(); err != nil { + return fmt.Errorf("unable to start RFQ manager: %v", err) + } + if s.cfg.UniversePublicAccess { err := s.cfg.UniverseFederation.SetAllowPublicAccess() if err != nil { @@ -586,6 +591,10 @@ func (s *Server) Stop() error { return err } + if err := s.cfg.RfqManager.Stop(); err != nil { + return err + } + if s.macaroonService != nil { err := s.macaroonService.Stop() if err != nil { diff --git a/tapcfg/server.go b/tapcfg/server.go index 862de4653..d903f46a3 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -14,6 +14,7 @@ import ( "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/rfq" "github.com/lightninglabs/taproot-assets/tapdb" "github.com/lightninglabs/taproot-assets/tapdb/sqlc" "github.com/lightninglabs/taproot-assets/tapfreighter" @@ -329,6 +330,15 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, multiNotifier := proof.NewMultiArchiveNotifier(assetStore, multiverse) + rfqManager, err := rfq.NewManager( + rfq.ManagerCfg{ + PeerMessagePorter: chainBridge, + }, + ) + if err != nil { + return nil, err + } + return &tap.Config{ DebugLevel: cfg.DebugLevel, RuntimeID: runtimeID, @@ -401,6 +411,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, UniversePublicAccess: cfg.Universe.PublicAccess, UniverseQueriesPerSecond: cfg.Universe.UniverseQueriesPerSecond, UniverseQueriesBurst: cfg.Universe.UniverseQueriesBurst, + RfqManager: &rfqManager, LogWriter: cfg.LogWriter, DatabaseConfig: &tap.DatabaseConfig{ RootKeyStore: tapdb.NewRootKeyStore(rksDB),