diff --git a/cmd/extool/extool.go b/cmd/extool/extool.go
index 6572a09a..9480965b 100644
--- a/cmd/extool/extool.go
+++ b/cmd/extool/extool.go
@@ -37,7 +37,10 @@ func main() {
         log.Fatal("No address specified")
     }
 
-    c, err := electrumx.NewClient(address, 1, 1)
+    c, err := electrumx.NewClient(address, &electrumx.ClientOptions{
+        InitialConnections: 1,
+        MaxConnections:     1,
+    })
     if err != nil {
         log.Fatalf("Failed to create electrumx client: %v", err)
     }
diff --git a/docker/bfgd/Dockerfile b/docker/bfgd/Dockerfile
index 6097922f..d68a3de1 100644
--- a/docker/bfgd/Dockerfile
+++ b/docker/bfgd/Dockerfile
@@ -17,9 +17,13 @@ RUN addgroup --gid 65532 bfgd && \
     -G bfgd --uid 65532 bfgd

 WORKDIR /build/

-COPY . .
+COPY Makefile .
+COPY go.mod .
+COPY go.sum .
 RUN make deps
+
+COPY . .
 RUN GOOS=$(go env GOOS) GOARCH=$(go env GOARCH) CGO_ENABLED=0 GOGC=off make GO_LDFLAGS="$GO_LDFLAGS" bfgd

 # Run stage
diff --git a/docker/bssd/Dockerfile b/docker/bssd/Dockerfile
index de61921e..510e86bc 100644
--- a/docker/bssd/Dockerfile
+++ b/docker/bssd/Dockerfile
@@ -17,9 +17,13 @@ RUN addgroup --gid 65532 bssd && \
     -G bssd --uid 65532 bssd

 WORKDIR /build/

-COPY . .
+COPY Makefile .
+COPY go.mod .
+COPY go.sum .
 RUN make deps
+
+COPY . .
 RUN GOOS=$(go env GOOS) GOARCH=$(go env GOARCH) CGO_ENABLED=0 GOGC=off make GO_LDFLAGS="$GO_LDFLAGS" bssd

 # Run stage
diff --git a/docker/popmd/Dockerfile b/docker/popmd/Dockerfile
index af03d779..5938c462 100644
--- a/docker/popmd/Dockerfile
+++ b/docker/popmd/Dockerfile
@@ -17,9 +17,13 @@ RUN addgroup --gid 65532 popmd && \
     -G popmd --uid 65532 popmd

 WORKDIR /build/

-COPY . .
+COPY Makefile .
+COPY go.mod .
+COPY go.sum .
 RUN make deps
+
+COPY . .
 RUN GOOS=$(go env GOOS) GOARCH=$(go env GOARCH) CGO_ENABLED=0 GOGC=off make GO_LDFLAGS="$GO_LDFLAGS" popmd

 # Run stage
diff --git a/hemi/electrumx/conn.go b/hemi/electrumx/conn.go
index 10d15db6..38ed50ac 100644
--- a/hemi/electrumx/conn.go
+++ b/hemi/electrumx/conn.go
@@ -15,6 +15,8 @@ import (
     "net"
     "sync"
     "time"
+
+    "github.com/prometheus/client_golang/prometheus"
 )

 const (
@@ -30,14 +32,17 @@ type clientConn struct {
     conn      net.Conn
     requestID uint64

+    metrics *metrics
+
     closeCh chan struct{}
     onClose func(c *clientConn)
 }

 // newClientConn returns a new clientConn.
-func newClientConn(conn net.Conn, onClose func(c *clientConn)) *clientConn {
+func newClientConn(conn net.Conn, metrics *metrics, onClose func(c *clientConn)) *clientConn {
     c := &clientConn{
         conn:    conn,
+        metrics: metrics,
         closeCh: make(chan struct{}),
         onClose: onClose,
     }
@@ -56,6 +61,18 @@ func (c *clientConn) call(ctx context.Context, method string, params, result any
     defer c.mx.Unlock()
     c.requestID++

+    if c.metrics != nil {
+        start := time.Now()
+        defer func() {
+            c.metrics.rpcCallsDuration.With(prometheus.Labels{
+                "method": method,
+            }).Observe(time.Since(start).Seconds())
+        }()
+        c.metrics.rpcCallsTotal.With(prometheus.Labels{
+            "method": method,
+        }).Inc()
+    }
+
     req, err := NewJSONRPCRequest(c.requestID, method, params)
     if err != nil {
         return fmt.Errorf("create request: %w", err)
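Note: clientConn.call now increments the per-method counter before the request is written and records the elapsed time in a deferred histogram observation, so calls that fail are still counted and timed. A minimal, self-contained sketch of the same pattern outside this package; the variable and metric names below are illustrative only, not part of this change:

    package main

    import (
        "time"

        "github.com/prometheus/client_golang/prometheus"
    )

    var (
        // Illustrative stand-ins for the rpcCallsTotal and rpcCallsDuration
        // collectors added to the electrumx metrics struct.
        callsTotal = prometheus.NewCounterVec(
            prometheus.CounterOpts{Name: "rpc_calls_total"},
            []string{"method"},
        )
        callsDuration = prometheus.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "rpc_calls_duration_seconds",
                Buckets: prometheus.DefBuckets,
            },
            []string{"method"},
        )
    )

    // instrument increments the per-method counter up front and observes the
    // elapsed time when fn returns, mirroring the defer in clientConn.call.
    func instrument(method string, fn func() error) error {
        start := time.Now()
        defer func() {
            callsDuration.With(prometheus.Labels{"method": method}).Observe(time.Since(start).Seconds())
        }()
        callsTotal.With(prometheus.Labels{"method": method}).Inc()
        return fn()
    }

    func main() {
        prometheus.MustRegister(callsTotal, callsDuration)
        _ = instrument("server.ping", func() error { return nil })
    }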
diff --git a/hemi/electrumx/conn_pool.go b/hemi/electrumx/conn_pool.go
index 4cd49e5d..4d7b0ce3 100644
--- a/hemi/electrumx/conn_pool.go
+++ b/hemi/electrumx/conn_pool.go
@@ -28,22 +28,28 @@ type connPool struct {
     // TODO(joshuasing): This is used as a basic queue, however there are much
     // more performant queue implementations that could be used.
     pool []*clientConn
+
+    // metrics contains prometheus collectors used for collecting metrics.
+    metrics *metrics
 }

 // newConnPool creates a new connection pool.
-func newConnPool(network, address string, initial, max int) (*connPool, error) {
-    if initial > max {
-        return nil, errors.New("initial connections must be less than max connections")
+func newConnPool(network, address string, opts *ClientOptions, metrics *metrics) (*connPool, error) {
+    if opts.InitialConnections > opts.MaxConnections {
+        return nil, errors.New(
+            "initial connections must be less than or equal to max connections",
+        )
     }

     p := &connPool{
         network: network,
         address: address,
-        max:     max,
+        max:     opts.MaxConnections,
+        metrics: metrics,
     }

     // Add initial connections to the pool.
-    for range initial {
+    for range opts.InitialConnections {
         conn, err := p.newConn()
         if err != nil {
             return nil, fmt.Errorf("new initial connection: %w", err)
@@ -63,7 +69,11 @@ func (p *connPool) newConn() (*clientConn, error) {
     if err != nil {
         return nil, err
     }
-    return newClientConn(c, p.onClose), nil
+    if p.metrics != nil {
+        p.metrics.connsOpened.Inc()
+        p.metrics.connsOpen.Inc()
+    }
+    return newClientConn(c, p.metrics, p.onClose), nil
 }

 // onClose removes a connection from the pool if found.
@@ -74,10 +84,17 @@ func (p *connPool) onClose(conn *clientConn) {
     p.poolMx.Lock()

     // Remove the connection from the pool.
+    l := len(p.pool)
     p.pool = slices.DeleteFunc(p.pool, func(c *clientConn) bool {
         return c == conn
     })
+    removed := len(p.pool) != l
     p.poolMx.Unlock()
+
+    if p.metrics != nil && removed {
+        p.metrics.connsClosed.Inc()
+        p.metrics.connsOpen.Dec()
+    }
 }

 // acquireConn returns a connection from the pool.
@@ -93,14 +110,19 @@ func (p *connPool) acquireConn() (*clientConn, error) {
     }
     p.poolMx.Unlock()

-    if c == nil {
-        // The connection pool is empty, create a new connection.
-        var err error
-        if c, err = p.newConn(); err != nil {
-            return nil, fmt.Errorf("new connection: %w", err)
+    if c != nil {
+        // Successfully acquired a connection from the pool.
+        if c.metrics != nil {
+            c.metrics.connsIdle.Dec()
         }
+        return c, nil
     }

+    // The connection pool is empty, create a new connection.
+    var err error
+    if c, err = p.newConn(); err != nil {
+        return nil, fmt.Errorf("new connection: %w", err)
+    }
     return c, nil
 }

@@ -124,6 +146,9 @@ func (p *connPool) freeConn(conn *clientConn) {
     p.pool = append(p.pool, conn)
     p.poolMx.Unlock()
+
+    if p.metrics != nil {
+        p.metrics.connsIdle.Inc()
+    }
 }

 // size returns the number of connections in the pool.
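Note: the accounting is spread across newConn (connsOpened and connsOpen up), onClose (connsClosed up and connsOpen down, only when the connection was actually found in the pool), acquireConn (connsIdle down) and freeConn (connsIdle up). A hypothetical companion test, assuming the existing createMockServer helper and the testClientOpts value added below, could pin down the counters for the initial connections; this is a sketch, not part of this change:

    package electrumx

    import (
        "testing"

        "github.com/prometheus/client_golang/prometheus/testutil"
    )

    // TestConnPoolMetricsSketch is illustrative and not part of this change.
    func TestConnPoolMetricsSketch(t *testing.T) {
        server := createMockServer(t)
        defer server.Close()

        m := newMetrics("test")
        if _, err := newConnPool("tcp", server.address, testClientOpts, m); err != nil {
            t.Fatalf("failed to create connPool: %v", err)
        }

        // Every initial connection goes through newConn, which bumps both the
        // "opened" counter and the "open" gauge.
        if got := testutil.ToFloat64(m.connsOpened); got != clientInitialConnections {
            t.Errorf("connsOpened = %v, want %v", got, clientInitialConnections)
        }
        if got := testutil.ToFloat64(m.connsOpen); got != clientInitialConnections {
            t.Errorf("connsOpen = %v, want %v", got, clientInitialConnections)
        }
    }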
diff --git a/hemi/electrumx/conn_pool_test.go b/hemi/electrumx/conn_pool_test.go
index 3e72245e..710b918c 100644
--- a/hemi/electrumx/conn_pool_test.go
+++ b/hemi/electrumx/conn_pool_test.go
@@ -15,6 +15,11 @@ const (
     clientMaximumConnections = 5
 )

+var testClientOpts = &ClientOptions{
+    InitialConnections: clientInitialConnections,
+    MaxConnections:     clientMaximumConnections,
+}
+
 func TestConnPool(t *testing.T) {
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
     defer cancel()
@@ -22,8 +27,7 @@ func TestConnPool(t *testing.T) {
     server := createMockServer(t)
     defer server.Close()

-    pool, err := newConnPool("tcp", server.address,
-        clientInitialConnections, clientMaximumConnections)
+    pool, err := newConnPool("tcp", server.address, testClientOpts, nil)
     if err != nil {
         t.Fatalf("failed to create connPool: %v", err)
     }
diff --git a/hemi/electrumx/conn_test.go b/hemi/electrumx/conn_test.go
index 387941c7..11537388 100644
--- a/hemi/electrumx/conn_test.go
+++ b/hemi/electrumx/conn_test.go
@@ -34,7 +34,7 @@ func TestClientConn(t *testing.T) {
         t.Fatalf("failed to dial server: %v", err)
     }

-    c := newClientConn(conn, nil)
+    c := newClientConn(conn, nil, nil)
     defer c.Close()

     tests := []struct {
@@ -174,7 +174,7 @@ func TestClose(t *testing.T) {
         t.Fatalf("failed to dial server: %v", err)
     }

-    c := newClientConn(conn, nil)
+    c := newClientConn(conn, nil, nil)

     // Ping the server.
     if err := c.ping(); err != nil {
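Note: the updated tests exercise only the happy path with a nil *metrics. The new validation in newConnPool (InitialConnections greater than MaxConnections) returns before any dialing, so it could be covered by one more small case in conn_pool_test.go; the following is a hypothetical sketch, not part of this change:

    func TestConnPoolInvalidOptions(t *testing.T) {
        // Deliberately inverted limits; the address is never dialled.
        opts := &ClientOptions{InitialConnections: 2, MaxConnections: 1}
        if _, err := newConnPool("tcp", "127.0.0.1:0", opts, nil); err == nil {
            t.Fatal("expected error when InitialConnections > MaxConnections")
        }
    }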
diff --git a/hemi/electrumx/electrumx.go b/hemi/electrumx/electrumx.go
index f2b32470..6570676c 100644
--- a/hemi/electrumx/electrumx.go
+++ b/hemi/electrumx/electrumx.go
@@ -17,11 +17,17 @@ import (

     btcchainhash "github.com/btcsuite/btcd/chaincfg/chainhash"
     "github.com/juju/loggo"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/sethvargo/go-retry"

     "github.com/hemilabs/heminetwork/bitcoin"
 )

+var log = loggo.GetLogger("electrumx")
+
+// Prometheus subsystem name.
+const promSubsystem = "electrumx"
+
 // https://electrumx.readthedocs.io/en/latest/protocol-basics.html

 type JSONRPCError struct {
@@ -120,17 +126,118 @@ var (
 // Client implements an electrumx JSON RPC client.
 type Client struct {
     connPool *connPool
+    metrics  *metrics
 }

-var log = loggo.GetLogger("electrumx")
+var (
+    defaultInitialConnections = 2
+    defaultMaxConnections     = 10
+)
+
+type ClientOptions struct {
+    // InitialConnections is the number of initial ElectrumX connections to open
+    // and keep in the pool.
+    InitialConnections int
+
+    // MaxConnections is the maximum number of ElectrumX connections to keep in
+    // the pool.
+    //
+    // If adding a connection back to the pool would result in the pool having
+    // more connections than this value, the connection will be closed instead
+    // of being added to the pool.
+    MaxConnections int
+
+    // PromNamespace is the application Prometheus namespace.
+    PromNamespace string
+}
+
+type metrics struct {
+    connsOpen        prometheus.Gauge         // Number of open connections
+    connsIdle        prometheus.Gauge         // Number of idle connections
+    connsOpened      prometheus.Counter       // Total number of connections opened
+    connsClosed      prometheus.Counter       // Total number of connections closed
+    rpcCallsTotal    *prometheus.CounterVec   // Total number of RPC calls
+    rpcCallsDuration *prometheus.HistogramVec // RPC call durations in seconds
+}
+
+func newMetrics(namespace string) *metrics {
+    return &metrics{
+        connsOpen: prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: promSubsystem,
+            Name:      "connections_open",
+            Help:      "Number of open ElectrumX connections",
+        }),
+        connsIdle: prometheus.NewGauge(prometheus.GaugeOpts{
+            Namespace: namespace,
+            Subsystem: promSubsystem,
+            Name:      "connections_idle",
+            Help:      "Number of idle ElectrumX connections",
+        }),
+        connsOpened: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: promSubsystem,
+            Name:      "connections_opened_total",
+            Help:      "Total number of ElectrumX connections opened",
+        }),
+        connsClosed: prometheus.NewCounter(prometheus.CounterOpts{
+            Namespace: namespace,
+            Subsystem: promSubsystem,
+            Name:      "connections_closed_total",
+            Help:      "Total number of ElectrumX connections closed",
+        }),
+        rpcCallsTotal: prometheus.NewCounterVec(
+            prometheus.CounterOpts{
+                Namespace: namespace,
+                Subsystem: promSubsystem,
+                Name:      "rpc_calls_total",
+                Help:      "Total number of ElectrumX RPC calls",
+            },
+            []string{"method"},
+        ),
+        rpcCallsDuration: prometheus.NewHistogramVec(
+            prometheus.HistogramOpts{
+                Namespace: namespace,
+                Subsystem: promSubsystem,
+                Name:      "rpc_calls_duration_seconds",
+                Help:      "ElectrumX RPC call durations in seconds",
+                Buckets:   prometheus.DefBuckets,
+            },
+            []string{"method"},
+        ),
+    }
+}
+
+func (m *metrics) collectors() []prometheus.Collector {
+    return []prometheus.Collector{
+        m.connsOpen,
+        m.connsIdle,
+        m.connsOpened,
+        m.connsClosed,
+        m.rpcCallsTotal,
+        m.rpcCallsDuration,
+    }
+}

 // NewClient returns an initialised electrumx client.
-func NewClient(address string, initialConns, maxConns int) (*Client, error) {
-    c := &Client{}
+func NewClient(address string, opts *ClientOptions) (*Client, error) {
+    if opts == nil {
+        opts = new(ClientOptions)
+    }
+    if opts.InitialConnections == 0 {
+        opts.InitialConnections = defaultInitialConnections
+    }
+    if opts.MaxConnections == 0 {
+        opts.MaxConnections = defaultMaxConnections
+    }
+
+    c := &Client{
+        metrics: newMetrics(opts.PromNamespace),
+    }

     // The address may be empty during tests, ignore empty addresses.
     if address != "" {
-        pool, err := newConnPool("tcp", address, initialConns, maxConns)
+        pool, err := newConnPool("tcp", address, opts, c.metrics)
         if err != nil {
             return nil, fmt.Errorf("new connection pool: %w", err)
         }
@@ -140,6 +247,11 @@ func NewClient(address string, initialConns, maxConns int) (*Client, error) {
     return c, nil
 }

+// Metrics returns Prometheus metric collectors for the client.
+func (c *Client) Metrics() []prometheus.Collector {
+    return c.metrics.collectors()
+}
+
 func (c *Client) call(ctx context.Context, method string, params, result any) error {
     if c.connPool == nil {
         // connPool may be nil if the address given to NewClient is empty.
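Note: with the options struct, zero values fall back to defaultInitialConnections (2) and defaultMaxConnections (10), and a nil *ClientOptions is accepted. A rough usage sketch for a caller that wants the client's collectors exposed on its own registry; the address, namespace, and port below are placeholders:

    package main

    import (
        "log"
        "net/http"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"

        "github.com/hemilabs/heminetwork/hemi/electrumx"
    )

    func main() {
        c, err := electrumx.NewClient("localhost:50001", &electrumx.ClientOptions{
            InitialConnections: 2,
            MaxConnections:     10,
            PromNamespace:      "myapp",
        })
        if err != nil {
            log.Fatalf("create electrumx client: %v", err)
        }

        // Register the ElectrumX collectors next to any application metrics.
        reg := prometheus.NewRegistry()
        reg.MustRegister(c.Metrics()...)

        http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
        log.Fatal(http.ListenAndServe(":2112", nil))
    }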
diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go
index db6837eb..0765ccbd 100644
--- a/service/bfg/bfg.go
+++ b/service/bfg/bfg.go
@@ -74,6 +74,7 @@ func NewDefaultConfig() *Config {

 // XXX figure out if this needs to be moved out into the electrumx package.
 type btcClient interface {
+    Metrics() []prometheus.Collector
     Balance(ctx context.Context, scriptHash []byte) (*electrumx.Balance, error)
     Broadcast(ctx context.Context, rtx []byte) ([]byte, error)
     Height(ctx context.Context) (uint64, error)
@@ -217,7 +218,11 @@ func NewServer(cfg *Config) (*Server, error) {
     }

     var err error
-    s.btcClient, err = electrumx.NewClient(cfg.EXBTCAddress, cfg.EXBTCInitialConns, cfg.EXBTCMaxConns)
+    s.btcClient, err = electrumx.NewClient(cfg.EXBTCAddress, &electrumx.ClientOptions{
+        InitialConnections: cfg.EXBTCInitialConns,
+        MaxConnections:     cfg.EXBTCMaxConns,
+        PromNamespace:      promNamespace,
+    })
     if err != nil {
         return nil, fmt.Errorf("create electrumx client: %w", err)
     }
@@ -1561,7 +1566,8 @@ func (s *Server) Run(pctx context.Context) error {
         if err != nil {
             return fmt.Errorf("create prometheus server: %w", err)
         }
-        cs := append(s.metrics.collectors(),
+        cs := append(
+            append(s.metrics.collectors(), s.btcClient.Metrics()...),
             prometheus.NewGaugeFunc(prometheus.GaugeOpts{
                 Namespace: promNamespace,
                 Name:      "running",
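Note: adding Metrics() to the btcClient interface means any test double for it must gain the method as well; returning nil is enough, since Run only appends the returned collectors before registering. A hypothetical stub, with an illustrative type name that may not match the actual test fixtures in this package:

    // Metrics satisfies the expanded btcClient interface for a test double.
    // Returning nil is safe: appending a nil slice registers nothing extra.
    func (f *fakeBtcClient) Metrics() []prometheus.Collector {
        return nil
    }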