Skip to content

Commit

Permalink
pass pointer to zmq chan
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 committed Oct 29, 2022
1 parent b4b23e2 commit 5afdeec
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 93 deletions.
2 changes: 1 addition & 1 deletion btcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Client struct {
// channel for notifying new BTC blocks to reporter
BlockEventChan chan *types.BlockEvent

SeqMsgChan chan zmq.SequenceMsg
ZMQSequenceMsgChan chan *zmq.SequenceMsg
}

func (c *Client) Stop() {
Expand Down
11 changes: 3 additions & 8 deletions btcclient/client_block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,17 @@ func NewWithZMQSubscriber(cfg *config.BTCConfig, retrySleepTime, maxRetrySleepTi

client.Client = rpcClient

bc, err := zmq.New(zmq.Config{
RpcAddress: "localhost:18443",
RpcUser: "rpcuser",
RpcPassword: "rpcpass",
ZmqPubAddress: "tcp://127.0.0.1:29000",
})
zmqClient, err := zmq.New(cfg.ZMQPubAddress, cfg.ZMQSubChannelBufferSize)
if err != nil {
return nil, err
}

ch, _, err := bc.SubscribeSequence()
ch, _, err := zmqClient.SubscribeSequence()
if err != nil {
return nil, err
}

client.SeqMsgChan = ch
client.ZMQSequenceMsgChan = ch
log.Info("Successfully created the BTC client and connected to the BTC server")

return client, nil
Expand Down
41 changes: 27 additions & 14 deletions config/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ import (

// BTCConfig defines configuration for the Bitcoin client
type BTCConfig struct {
DisableClientTLS bool `mapstructure:"no-client-tls"`
CAFile string `mapstructure:"ca-file"`
Endpoint string `mapstructure:"endpoint"`
WalletEndpoint string `mapstructure:"wallet-endpoint"`
WalletPassword string `mapstructure:"wallet-password"`
WalletName string `mapstructure:"wallet-name"`
WalletCAFile string `mapstructure:"wallet-ca-file"`
WalletLockTime int64 `mapstructure:"wallet-lock-time"` // time duration in which the wallet remains unlocked, in seconds
TxFee btcutil.Amount `mapstructure:"tx-fee"` // BTC tx fee, in BTC
NetParams string `mapstructure:"net-params"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ReconnectAttempts int `mapstructure:"reconnect-attempts"`
EnableZMQ bool `mapstructure:"enable-ZMQ"` // enable ZMQ for block notifications
DisableClientTLS bool `mapstructure:"no-client-tls"`
CAFile string `mapstructure:"ca-file"`
Endpoint string `mapstructure:"endpoint"`
WalletEndpoint string `mapstructure:"wallet-endpoint"`
WalletPassword string `mapstructure:"wallet-password"`
WalletName string `mapstructure:"wallet-name"`
WalletCAFile string `mapstructure:"wallet-ca-file"`
WalletLockTime int64 `mapstructure:"wallet-lock-time"` // time duration in which the wallet remains unlocked, in seconds
TxFee btcutil.Amount `mapstructure:"tx-fee"` // BTC tx fee, in BTC
NetParams string `mapstructure:"net-params"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
ReconnectAttempts int `mapstructure:"reconnect-attempts"`
EnableZMQ bool `mapstructure:"enable-ZMQ"` // enable ZMQ for block notifications
ZMQPubAddress string `mapstructure:"ZMQ-pub-address"` // ZMQ publisher address
ZMQSubChannelBufferSize int `mapstructure:"ZMQ-sub-channel-buffer-size"` // ZMQ subscriber channel buffer size
}

func (cfg *BTCConfig) Validate() error {
Expand All @@ -33,6 +35,17 @@ func (cfg *BTCConfig) Validate() error {
if _, ok := types.GetValidNetParams()[cfg.NetParams]; !ok {
return errors.New("invalid net params")
}

if cfg.EnableZMQ {
if cfg.ZMQPubAddress == "" {
return errors.New("ZMQ publisher address must be set")
}

if cfg.ZMQSubChannelBufferSize < 0 {
return errors.New("ZMQ subscriber channel buffer size must be non-negative")
}
}

return nil
}

Expand Down
100 changes: 37 additions & 63 deletions zmq/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,11 @@ import (
"github.com/pebbe/zmq4"
)

const DefaultSubChannelBufferSize = 2

var (
ErrSubscribeDisabled = errors.New("subscribe disabled (ZmqPubAddress was not set)")
ErrSubscribeExited = errors.New("subscription backend has exited")
)

type Config struct {
// ZmqPubAddress is the public address that the zmq4 instance uses for zmqpub,
// corresponding to what is set when starting zmq4 through one or multiple of:
// {-zmqpubhashtx=address -zmqpubhashblock=address -zmqpubrawblock=address -zmqpubrawtx=address -zmqpubsequence=address}.
// Only a single address is supported in this client. Either use the same address for
// all desired topics when starting zmq4, or create a seperate client for each address.
//
// Example: "tcp://8.8.8.8:1234"
// More examples at: https://github.com/bitcoin/bitcoin/blob/master/doc/zmq.md (the host part in those examples
// are local IPs and should be replaced with public IPs here on the client side)
//
// If ZmqPubAddress is not set then the Subscribe functions will return ErrSubscribeDisabled when called.
ZmqPubAddress string

// SubChannelBufferSize sets the number of entries that a subscription channel can hold
// before dropping entries, if it is not drained fast enough.
// If not set (or set to zero) then defaults to DefaultSubChannelBufferSize.
SubChannelBufferSize int
}

// Client is a client that provides methods for interacting with zmq4.
// Must be created with New and destroyed with Close.
//
Expand All @@ -44,7 +22,8 @@ type Client struct {
wg sync.WaitGroup
quit chan struct{}

Cfg Config
zmqPubAddress string
subChannelBufferSize int

// ZMQ subscription related things.
zctx *zmq4.Context
Expand All @@ -62,52 +41,47 @@ type Client struct {
// will disable the Subscribe methods.
// New does not try using the RPC connection and can't detect if the ZMQ connection works,
// you need to call Ready in order to check connection health.
func New() (*Client, error) {
func New(zmqPubAddress string, subChannelBufferSize int) (*Client, error) {
bc := &Client{
quit: make(chan struct{}),
quit: make(chan struct{}),
subChannelBufferSize: subChannelBufferSize,
}

// ZMQ Subscribe.
if bc.Cfg.ZmqPubAddress != "" {
if bc.Cfg.SubChannelBufferSize == 0 {
bc.Cfg.SubChannelBufferSize = DefaultSubChannelBufferSize
}

zctx, err := zmq4.NewContext()
if err != nil {
return nil, err
}
zsub, err := zctx.NewSocket(zmq4.SUB)
if err != nil {
return nil, err
}
if err := zsub.Connect(bc.Cfg.ZmqPubAddress); err != nil {
return nil, err
}
zback, err := zctx.NewSocket(zmq4.PAIR)
if err != nil {
return nil, err
}
if err := zback.Bind("inproc://channel"); err != nil {
return nil, err
}
zfront, err := zctx.NewSocket(zmq4.PAIR)
if err != nil {
return nil, err
}
if err := zfront.Connect("inproc://channel"); err != nil {
return nil, err
}
zctx, err := zmq4.NewContext()
if err != nil {
return nil, err
}
zsub, err := zctx.NewSocket(zmq4.SUB)
if err != nil {
return nil, err
}
if err := zsub.Connect(zmqPubAddress); err != nil {
return nil, err
}
zback, err := zctx.NewSocket(zmq4.PAIR)
if err != nil {
return nil, err
}
if err := zback.Bind("inproc://channel"); err != nil {
return nil, err
}
zfront, err := zctx.NewSocket(zmq4.PAIR)
if err != nil {
return nil, err
}
if err := zfront.Connect("inproc://channel"); err != nil {
return nil, err
}

bc.zctx = zctx
bc.zsub = zsub
bc.subs.exited = make(chan struct{})
bc.subs.zfront = zfront
bc.zback = zback
bc.zctx = zctx
bc.zsub = zsub
bc.subs.exited = make(chan struct{})
bc.subs.zfront = zfront
bc.zback = zback

bc.wg.Add(1)
go bc.zmqHandler()
}
bc.wg.Add(1)
go bc.zmqHandler()

return bc, nil
}
Expand Down
14 changes: 7 additions & 7 deletions zmq/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ type subscriptions struct {
zfront *zmq.Socket
latestEvent time.Time

sequence []chan SequenceMsg
sequence []chan *SequenceMsg
}

// SubscribeSequence subscribes to the ZMQ "sequence" messages as SequenceMsg items pushed onto the channel.
//
// Call cancel to cancel the subscription and let the client release the resources. The channel is closed
// when the subscription is canceled or when the client is closed.
func (bc *Client) SubscribeSequence() (subCh chan SequenceMsg, cancel func(), err error) {
func (bc *Client) SubscribeSequence() (subCh chan *SequenceMsg, cancel func(), err error) {
if bc.zsub == nil {
err = ErrSubscribeDisabled
return
Expand All @@ -50,7 +50,7 @@ func (bc *Client) SubscribeSequence() (subCh chan SequenceMsg, cancel func(), er
return
}
}
subCh = make(chan SequenceMsg, bc.Cfg.SubChannelBufferSize)
subCh = make(chan *SequenceMsg, bc.subChannelBufferSize)
bc.subs.sequence = append(bc.subs.sequence, subCh)
bc.subs.Unlock()
cancel = func() {
Expand All @@ -63,7 +63,7 @@ func (bc *Client) SubscribeSequence() (subCh chan SequenceMsg, cancel func(), er
return
}

func (bc *Client) unsubscribeSequence(subCh chan SequenceMsg) (err error) {
func (bc *Client) unsubscribeSequence(subCh chan *SequenceMsg) (err error) {
bc.subs.Lock()
select {
case <-bc.subs.exited:
Expand Down Expand Up @@ -142,13 +142,13 @@ OUTER:
bc.subs.RLock()
for _, ch := range bc.subs.sequence {
select {
case ch <- sequenceMsg:
case ch <- &sequenceMsg:
default:
select {
// Pop the oldest item and push the newest item (the user will miss a message).
case _ = <-ch:
ch <- sequenceMsg
case ch <- sequenceMsg:
ch <- &sequenceMsg
case ch <- &sequenceMsg:
default:
}
}
Expand Down

0 comments on commit 5afdeec

Please sign in to comment.