Skip to content

Commit

Permalink
electrumx,bfg: add electrumx prometheus metrics (#226)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuasing authored Aug 27, 2024
1 parent f831628 commit 7cd520d
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 26 deletions.
5 changes: 4 additions & 1 deletion cmd/extool/extool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ func main() {
log.Fatal("No address specified")
}

c, err := electrumx.NewClient(address, 1, 1)
c, err := electrumx.NewClient(address, &electrumx.ClientOptions{
InitialConnections: 1,
MaxConnections: 1,
})
if err != nil {
log.Fatalf("Failed to create electrumx client: %v", err)
}
Expand Down
6 changes: 5 additions & 1 deletion docker/bfgd/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ RUN addgroup --gid 65532 bfgd && \
-G bfgd --uid 65532 bfgd

WORKDIR /build/
COPY . .

COPY Makefile .
COPY go.mod .
COPY go.sum .
RUN make deps

COPY . .
RUN GOOS=$(go env GOOS) GOARCH=$(go env GOARCH) CGO_ENABLED=0 GOGC=off make GO_LDFLAGS="$GO_LDFLAGS" bfgd

# Run stage
Expand Down
6 changes: 5 additions & 1 deletion docker/bssd/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ RUN addgroup --gid 65532 bssd && \
-G bssd --uid 65532 bssd

WORKDIR /build/
COPY . .

COPY Makefile .
COPY go.mod .
COPY go.sum .
RUN make deps

COPY . .
RUN GOOS=$(go env GOOS) GOARCH=$(go env GOARCH) CGO_ENABLED=0 GOGC=off make GO_LDFLAGS="$GO_LDFLAGS" bssd

# Run stage
Expand Down
6 changes: 5 additions & 1 deletion docker/popmd/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ RUN addgroup --gid 65532 popmd && \
-G popmd --uid 65532 popmd

WORKDIR /build/
COPY . .

COPY Makefile .
COPY go.mod .
COPY go.sum .
RUN make deps

COPY . .
RUN GOOS=$(go env GOOS) GOARCH=$(go env GOARCH) CGO_ENABLED=0 GOGC=off make GO_LDFLAGS="$GO_LDFLAGS" popmd

# Run stage
Expand Down
19 changes: 18 additions & 1 deletion hemi/electrumx/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"net"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
Expand All @@ -30,14 +32,17 @@ type clientConn struct {
conn net.Conn
requestID uint64

metrics *metrics

closeCh chan struct{}
onClose func(c *clientConn)
}

// newClientConn returns a new clientConn.
func newClientConn(conn net.Conn, onClose func(c *clientConn)) *clientConn {
func newClientConn(conn net.Conn, metrics *metrics, onClose func(c *clientConn)) *clientConn {
c := &clientConn{
conn: conn,
metrics: metrics,
closeCh: make(chan struct{}),
onClose: onClose,
}
Expand All @@ -56,6 +61,18 @@ func (c *clientConn) call(ctx context.Context, method string, params, result any
defer c.mx.Unlock()
c.requestID++

if c.metrics != nil {
start := time.Now()
defer func() {
c.metrics.rpcCallsDuration.With(prometheus.Labels{
"method": method,
}).Observe(time.Since(start).Seconds())
}()
c.metrics.rpcCallsTotal.With(prometheus.Labels{
"method": method,
}).Inc()
}

req, err := NewJSONRPCRequest(c.requestID, method, params)
if err != nil {
return fmt.Errorf("create request: %w", err)
Expand Down
47 changes: 36 additions & 11 deletions hemi/electrumx/conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,28 @@ type connPool struct {
// TODO(joshuasing): This is used as a basic queue, however there are much
// more performant queue implementations that could be used.
pool []*clientConn

// metrics contains prometheus collectors used for collecting metrics.
metrics *metrics
}

// newConnPool creates a new connection pool.
func newConnPool(network, address string, initial, max int) (*connPool, error) {
if initial > max {
return nil, errors.New("initial connections must be less than max connections")
func newConnPool(network, address string, opts *ClientOptions, metrics *metrics) (*connPool, error) {
if opts.InitialConnections > opts.MaxConnections {
return nil, errors.New(
"initial connections must be less than or equal to max connections",
)
}

p := &connPool{
network: network,
address: address,
max: max,
max: opts.MaxConnections,
metrics: metrics,
}

// Add initial connections to the pool.
for range initial {
for range opts.InitialConnections {
conn, err := p.newConn()
if err != nil {
return nil, fmt.Errorf("new initial connection: %w", err)
Expand All @@ -63,7 +69,11 @@ func (p *connPool) newConn() (*clientConn, error) {
if err != nil {
return nil, err
}
return newClientConn(c, p.onClose), nil
if p.metrics != nil {
p.metrics.connsOpened.Inc()
p.metrics.connsOpen.Inc()
}
return newClientConn(c, p.metrics, p.onClose), nil
}

// onClose removes a connection from the pool if found.
Expand All @@ -74,10 +84,17 @@ func (p *connPool) onClose(conn *clientConn) {

p.poolMx.Lock()
// Remove the connection from the pool.
l := len(p.pool)
p.pool = slices.DeleteFunc(p.pool, func(c *clientConn) bool {
return c == conn
})
removed := len(p.pool) != l
p.poolMx.Unlock()

if p.metrics != nil && removed {
p.metrics.connsClosed.Inc()
p.metrics.connsOpen.Dec()
}
}

// acquireConn returns a connection from the pool.
Expand All @@ -93,14 +110,19 @@ func (p *connPool) acquireConn() (*clientConn, error) {
}
p.poolMx.Unlock()

if c == nil {
// The connection pool is empty, create a new connection.
var err error
if c, err = p.newConn(); err != nil {
return nil, fmt.Errorf("new connection: %w", err)
if c != nil {
// Successfully acquired a connection from the pool.
if c.metrics != nil {
c.metrics.connsIdle.Dec()
}
return c, nil
}

// The connection pool is empty, create a new connection.
var err error
if c, err = p.newConn(); err != nil {
return nil, fmt.Errorf("new connection: %w", err)
}
return c, nil
}

Expand All @@ -124,6 +146,9 @@ func (p *connPool) freeConn(conn *clientConn) {

p.pool = append(p.pool, conn)
p.poolMx.Unlock()
if p.metrics != nil {
p.metrics.connsIdle.Inc()
}
}

// size returns the number of connections in the pool.
Expand Down
8 changes: 6 additions & 2 deletions hemi/electrumx/conn_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ const (
clientMaximumConnections = 5
)

var testClientOpts = &ClientOptions{
InitialConnections: clientInitialConnections,
MaxConnections: clientMaximumConnections,
}

func TestConnPool(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

server := createMockServer(t)
defer server.Close()

pool, err := newConnPool("tcp", server.address,
clientInitialConnections, clientMaximumConnections)
pool, err := newConnPool("tcp", server.address, testClientOpts, nil)
if err != nil {
t.Fatalf("failed to create connPool: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions hemi/electrumx/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestClientConn(t *testing.T) {
t.Fatalf("failed to dial server: %v", err)
}

c := newClientConn(conn, nil)
c := newClientConn(conn, nil, nil)
defer c.Close()

tests := []struct {
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestClose(t *testing.T) {
t.Fatalf("failed to dial server: %v", err)
}

c := newClientConn(conn, nil)
c := newClientConn(conn, nil, nil)

// Ping the server.
if err := c.ping(); err != nil {
Expand Down
120 changes: 116 additions & 4 deletions hemi/electrumx/electrumx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ import (

btcchainhash "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/juju/loggo"
"github.com/prometheus/client_golang/prometheus"
"github.com/sethvargo/go-retry"

"github.com/hemilabs/heminetwork/bitcoin"
)

var log = loggo.GetLogger("electrumx")

// Prometheus subsystem name.
const promSubsystem = "electrumx"

// https://electrumx.readthedocs.io/en/latest/protocol-basics.html

type JSONRPCError struct {
Expand Down Expand Up @@ -120,17 +126,118 @@ var (
// Client implements an electrumx JSON RPC client.
type Client struct {
connPool *connPool
metrics *metrics
}

var log = loggo.GetLogger("electrumx")
var (
defaultInitialConnections = 2
defaultMaxConnections = 10
)

type ClientOptions struct {
// InitialConnections is the number of initial ElectrumX connections to open
// and keep in the pool.
InitialConnections int

// MaxConnections is the maximum number of ElectrumX connections to keep in
// the pool.
//
// If adding a connection back to the pool would result in the pool having
// more connections than this value, the connection will be closed instead
// of being added to the pool.
MaxConnections int

// PromNamespace is the application Prometheus namespace.
PromNamespace string
}

type metrics struct {
connsOpen prometheus.Gauge // Number of open connections
connsIdle prometheus.Gauge // Number of idle connections
connsOpened prometheus.Counter // Total number of connections opened
connsClosed prometheus.Counter // Total number of connections closed
rpcCallsTotal *prometheus.CounterVec // Total number of RPC calls
rpcCallsDuration *prometheus.HistogramVec // RPC call durations in seconds
}

func newMetrics(namespace string) *metrics {
return &metrics{
connsOpen: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: promSubsystem,
Name: "connections_open",
Help: "Number of open ElectrumX connections",
}),
connsIdle: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: promSubsystem,
Name: "connections_idle",
Help: "Number of idle ElectrumX connections",
}),
connsOpened: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: promSubsystem,
Name: "connections_opened_total",
Help: "Total number of ElectrumX connections opened",
}),
connsClosed: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: promSubsystem,
Name: "connections_closed_total",
Help: "Total number of ElectrumX connections closed",
}),
rpcCallsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: promSubsystem,
Name: "rpc_calls_total",
Help: "Total number of ElectrumX RPC calls",
},
[]string{"method"},
),
rpcCallsDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: promSubsystem,
Name: "rpc_calls_duration_seconds",
Help: "ElectrumX RPC call durations in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method"},
),
}
}

func (m *metrics) collectors() []prometheus.Collector {
return []prometheus.Collector{
m.connsOpen,
m.connsIdle,
m.connsOpened,
m.connsClosed,
m.rpcCallsTotal,
m.rpcCallsDuration,
}
}

// NewClient returns an initialised electrumx client.
func NewClient(address string, initialConns, maxConns int) (*Client, error) {
c := &Client{}
func NewClient(address string, opts *ClientOptions) (*Client, error) {
if opts == nil {
opts = new(ClientOptions)
}
if opts.InitialConnections == 0 {
opts.InitialConnections = defaultInitialConnections
}
if opts.MaxConnections == 0 {
opts.MaxConnections = defaultMaxConnections
}

c := &Client{
metrics: newMetrics(opts.PromNamespace),
}

// The address may be empty during tests, ignore empty addresses.
if address != "" {
pool, err := newConnPool("tcp", address, initialConns, maxConns)
pool, err := newConnPool("tcp", address, opts, c.metrics)
if err != nil {
return nil, fmt.Errorf("new connection pool: %w", err)
}
Expand All @@ -140,6 +247,11 @@ func NewClient(address string, initialConns, maxConns int) (*Client, error) {
return c, nil
}

// Metrics returns Prometheus metric collectors for the client.
func (c *Client) Metrics() []prometheus.Collector {
return c.metrics.collectors()
}

func (c *Client) call(ctx context.Context, method string, params, result any) error {
if c.connPool == nil {
// connPool may be nil if the address given to NewClient is empty.
Expand Down
Loading

0 comments on commit 7cd520d

Please sign in to comment.