Skip to content

Commit

Permalink
feat: additional prom metrics (#738)
Browse files Browse the repository at this point in the history
* add update timestamps for cache + separate prom/client dependency

* add client latency requests
  • Loading branch information
aalu1418 authored Jun 11, 2024
1 parent 511abc3 commit 4ca9bcc
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 17 deletions.
5 changes: 4 additions & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
return ch.getClient()
}
ch.txm = txm.NewTxm(ch.id, tc, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, ch.Reader)
bc := func() (monitor.BalanceClient, error) {
return ch.getClient()
}
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
}

Expand Down
39 changes: 39 additions & 0 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

const (
Expand Down Expand Up @@ -51,6 +52,7 @@ type Writer interface {
var _ ReaderWriter = (*Client)(nil)

type Client struct {
url string
rpc *rpc.Client
skipPreflight bool // to enable or disable preflight checks
commitment rpc.CommitmentType
Expand All @@ -65,6 +67,7 @@ type Client struct {

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
url: endpoint,
rpc: rpc.New(endpoint),
skipPreflight: cfg.SkipPreflight(),
commitment: cfg.Commitment(),
Expand All @@ -76,7 +79,17 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration,
}, nil
}

// latency starts a timer for the request identified by name and returns a
// function that, when invoked, reports the time elapsed since latency was
// called to the client latency metric, labelled with name and the client's url.
// Intended usage is `done := c.latency("x"); defer done()`.
func (c *Client) latency(name string) func() {
	begin := time.Now()
	report := func() {
		elapsed := time.Since(begin)
		monitor.SetClientLatency(elapsed, name, c.url)
	}
	return report
}

func (c *Client) Balance(addr solana.PublicKey) (uint64, error) {
done := c.latency("balance")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()

Expand All @@ -95,6 +108,9 @@ func (c *Client) SlotHeight() (uint64, error) {
}

func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64, error) {
done := c.latency("slot_height")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetSlotHeight", func() (interface{}, error) {
Expand All @@ -104,13 +120,19 @@ func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64
}

// GetAccountInfoWithOpts fetches the account info for addr through the
// underlying RPC client.
//
// The Commitment field of opts is always overwritten with the client's
// configured commitment, so the caller's struct is modified. A nil opts is
// treated as an empty options struct instead of causing a nil-pointer panic.
// The call is bounded by the client's contextDuration in addition to any
// deadline already carried by ctx, and its latency is recorded under the
// "account_info" request label.
func (c *Client) GetAccountInfoWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetAccountInfoOpts) (*rpc.GetAccountInfoResult, error) {
	done := c.latency("account_info")
	defer done()

	ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
	defer cancel()

	if opts == nil {
		// a nil opts would otherwise panic on the assignment below
		opts = &rpc.GetAccountInfoOpts{}
	}
	opts.Commitment = c.commitment // overrides passed in value - use defined client commitment type
	return c.rpc.GetAccountInfoWithOpts(ctx, addr, opts)
}

func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
done := c.latency("latest_blockhash")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()

Expand All @@ -121,6 +143,9 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
}

func (c *Client) ChainID() (string, error) {
done := c.latency("chain_id")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) {
Expand All @@ -147,6 +172,9 @@ func (c *Client) ChainID() (string, error) {
}

func (c *Client) GetFeeForMessage(msg string) (uint64, error) {
done := c.latency("fee_for_message")
defer done()

// msg is base58 encoded data

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
Expand All @@ -164,6 +192,9 @@ func (c *Client) GetFeeForMessage(msg string) (uint64, error) {

// https://docs.solana.com/developing/clients/jsonrpc-api#getsignaturestatuses
func (c *Client) SignatureStatuses(ctx context.Context, sigs []solana.Signature) ([]*rpc.SignatureStatusesResult, error) {
done := c.latency("signature_statuses")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()

Expand All @@ -182,6 +213,9 @@ func (c *Client) SignatureStatuses(ctx context.Context, sigs []solana.Signature)
// https://docs.solana.com/developing/clients/jsonrpc-api#simulatetransaction
// opts - (optional) use `nil` to use defaults
func (c *Client) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *rpc.SimulateTransactionOpts) (*rpc.SimulateTransactionResult, error) {
done := c.latency("simulate_tx")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()

Expand All @@ -205,6 +239,9 @@ func (c *Client) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *r
}

func (c *Client) SendTx(ctx context.Context, tx *solana.Transaction) (solana.Signature, error) {
done := c.latency("send_tx")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()

Expand All @@ -225,6 +262,8 @@ func (c *Client) GetLatestBlock() (*rpc.GetBlockResult, error) {
}

// get block based on slot
done := c.latency("latest_block")
defer done()
ctx, cancel := context.WithTimeout(context.Background(), c.txTimeout)
defer cancel()
v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import (
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/system"
"github.com/gagliardetto/solana-go/rpc"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

func TestClient_Reader_Integration(t *testing.T) {
Expand Down Expand Up @@ -292,3 +295,22 @@ func TestClient_SendTxDuplicates_Integration(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(5_000), initBal-endBal)
}

// TestClientLatency checks that the function returned by Client.latency
// records the elapsed time, in milliseconds, in the client latency gauge.
func TestClientLatency(t *testing.T) {
	const sleepMs = 100
	client := Client{}
	// unique request label so the gauge is not shared with any other test
	label := t.Name() + uuid.NewString()

	// time a fixed sleep through the latency helper
	func() {
		defer client.latency(label)()
		time.Sleep(sleepMs * time.Millisecond)
	}()

	gauge, err := monitor.GetClientLatency(label, client.url)
	require.NoError(t, err)
	got := testutil.ToFloat64(gauge)

	// check within expected range
	want := float64(sleepMs)
	assert.GreaterOrEqual(t, got, want)
	assert.LessOrEqual(t, got, want*1.05)
}
16 changes: 9 additions & 7 deletions pkg/solana/monitor/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
)

// Config defines the monitor configuration.
Expand All @@ -23,12 +21,16 @@ type Keystore interface {
Accounts(ctx context.Context) ([]string, error)
}

// BalanceClient is the only client capability the balance monitor needs.
// It is declared in this package so that monitor does not have to import the
// client package, which itself imports monitor to report metrics.
type BalanceClient interface {
// Balance returns the balance held by addr.
// NOTE(review): presumably denominated in lamports, matching updateFn's
// lamports parameter - confirm against the implementation.
Balance(addr solana.PublicKey) (uint64, error)
}

// NewBalanceMonitor returns a balance monitoring services.Service which reports the SOL balance of all ks keys to prometheus.
func NewBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (solanaClient.Reader, error)) services.Service {
func NewBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (BalanceClient, error)) services.Service {
return newBalanceMonitor(chainID, cfg, lggr, ks, newReader)
}

func newBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (solanaClient.Reader, error)) *balanceMonitor {
func newBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (BalanceClient, error)) *balanceMonitor {
b := balanceMonitor{
chainID: chainID,
cfg: cfg,
Expand All @@ -48,10 +50,10 @@ type balanceMonitor struct {
cfg Config
lggr logger.Logger
ks Keystore
newReader func() (solanaClient.Reader, error)
newReader func() (BalanceClient, error)
updateFn func(acc solana.PublicKey, lamports uint64) // overridable for testing

reader solanaClient.Reader
reader BalanceClient

stop services.StopChan
done chan struct{}
Expand Down Expand Up @@ -98,7 +100,7 @@ func (b *balanceMonitor) monitor() {
}

// getReader returns the cached BalanceClient, or creates a new one if nil.
func (b *balanceMonitor) getReader() (solanaClient.Reader, error) {
func (b *balanceMonitor) getReader() (BalanceClient, error) {
if b.reader == nil {
var err error
b.reader, err = b.newReader()
Expand Down
40 changes: 37 additions & 3 deletions pkg/solana/monitor/prom.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,53 @@
package monitor

import (
"time"

"github.com/gagliardetto/solana-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
)

var promSolanaBalance = promauto.NewGaugeVec(
prometheus.GaugeOpts{Name: "solana_balance", Help: "Solana account balances"},
[]string{"account", "chainID", "chainSet", "denomination"},
// Prometheus gauge vectors registered by the monitor package.
var (
	// promSolanaBalance holds account balances.
	promSolanaBalance = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "solana_balance",
			Help: "Solana account balances",
		},
		[]string{"account", "chainID", "chainSet", "denomination"},
	)

	// promCacheTimestamp holds the unix time of the last update of a relayer cache.
	promCacheTimestamp = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "solana_cache_last_update_unix",
			Help: "Solana relayer cache last update timestamp",
		},
		[]string{"type", "chainID", "account"},
	)

	// promClientReq holds the most recently observed latency, in milliseconds,
	// per request type and RPC url.
	promClientReq = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "solana_client_latency_ms",
			Help: "Solana client request latency",
		},
		[]string{"request", "url"},
	)
)

// updateProm publishes the balance of acc to the solana_balance gauge.
// The lamports amount is converted to SOL before being stored, and the sample
// is labelled with the account address, the monitor's chain ID, the "solana"
// chain set and the "SOL" denomination.
func (b *balanceMonitor) updateProm(acc solana.PublicKey, lamports uint64) {
	sol := internal.LamportsToSol(lamports)
	gauge := promSolanaBalance.WithLabelValues(acc.String(), b.chainID, "solana", "SOL")
	gauge.Set(sol)
}

func SetCacheTimestamp(t time.Time, cacheType, chainID, account string) {
promCacheTimestamp.With(prometheus.Labels{
"type": cacheType,
"chainID": chainID,
"account": account,
}).Set(float64(t.Unix()))
}

func SetClientLatency(d time.Duration, request, url string) {
promClientReq.With(prometheus.Labels{
"request": request,
"url": url,
}).Set(float64(d.Milliseconds()))
}

// GetClientLatency looks up the latency gauge for the given request type and
// RPC url, returning whatever error the gauge vector reports for the lookup.
func GetClientLatency(request, url string) (prometheus.Gauge, error) {
	labels := prometheus.Labels{
		"url":     url,
		"request": request,
	}
	return promClientReq.GetMetricWith(labels)
}
4 changes: 2 additions & 2 deletions pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (r *Relayer) NewMedianProvider(rargs relaytypes.RelayArgs, pargs relaytypes
}

cfg := configWatcher.chain.Config()
transmissionsCache := NewTransmissionsCache(transmissionsID, cfg, configWatcher.reader, r.lggr)
transmissionsCache := NewTransmissionsCache(transmissionsID, relayConfig.ChainID, cfg, configWatcher.reader, r.lggr)
return &medianProvider{
configProvider: configWatcher,
transmissionsCache: transmissionsCache,
Expand Down Expand Up @@ -194,7 +194,7 @@ func newConfigProvider(ctx context.Context, lggr logger.Logger, chain Chain, arg
if err != nil {
return nil, fmt.Errorf("error in NewMedianProvider.chain.Reader: %w", err)
}
stateCache := NewStateCache(stateID, chain.Config(), reader, lggr)
stateCache := NewStateCache(stateID, relayConfig.ChainID, chain.Config(), reader, lggr)
return &configProvider{
chainID: relayConfig.ChainID,
stateID: stateID,
Expand Down
9 changes: 7 additions & 2 deletions pkg/solana/state_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

var (
Expand All @@ -28,6 +29,7 @@ type StateCache struct {
services.StateMachine
// on-chain program + 2x state accounts (state + transmissions)
StateID solana.PublicKey
chainID string

stateLock sync.RWMutex
state State
Expand All @@ -43,9 +45,10 @@ type StateCache struct {
stopCh services.StopChan
}

func NewStateCache(stateID solana.PublicKey, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache {
func NewStateCache(stateID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache {
return &StateCache{
StateID: stateID,
chainID: chainID,
reader: reader,
lggr: lggr,
cfg: cfg,
Expand Down Expand Up @@ -124,11 +127,13 @@ func (c *StateCache) fetchState(ctx context.Context) error {

c.lggr.Debugf("state fetched for account: %s, result (config digest): %v", c.StateID, hex.EncodeToString(state.Config.LatestConfigDigest[:]))

timestamp := time.Now()
monitor.SetCacheTimestamp(timestamp, "ocr2_median_state", c.chainID, c.StateID.String())
// acquire lock and write to state
c.stateLock.Lock()
defer c.stateLock.Unlock()
c.state = state
c.stateTime = time.Now()
c.stateTime = timestamp
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/solana/transmissions_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

type TransmissionsCache struct {
services.StateMachine

// on-chain program + 2x state accounts (state + transmissions)
TransmissionsID solana.PublicKey
chainID string

ansLock sync.RWMutex
answer Answer
Expand All @@ -39,9 +41,10 @@ type TransmissionsCache struct {
stopCh services.StopChan
}

func NewTransmissionsCache(transmissionsID solana.PublicKey, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache {
func NewTransmissionsCache(transmissionsID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache {
return &TransmissionsCache{
TransmissionsID: transmissionsID,
chainID: chainID,
reader: reader,
lggr: lggr,
cfg: cfg,
Expand Down Expand Up @@ -120,11 +123,13 @@ func (c *TransmissionsCache) fetchLatestTransmission(ctx context.Context) error
}
c.lggr.Debugf("latest transmission fetched for account: %s, result: %v", c.TransmissionsID, answer)

timestamp := time.Now()
monitor.SetCacheTimestamp(timestamp, "ocr2_median_transmissions", c.chainID, c.TransmissionsID.String())
// acquire lock and write to state
c.ansLock.Lock()
defer c.ansLock.Unlock()
c.answer = answer
c.ansTime = time.Now()
c.ansTime = timestamp
return nil
}

Expand Down

0 comments on commit 4ca9bcc

Please sign in to comment.