From 8043c7b659b35f868e382619902cb2cb6e2323aa Mon Sep 17 00:00:00 2001 From: aalu1418 <50029043+aalu1418@users.noreply.github.com> Date: Wed, 5 Jun 2024 15:31:22 -0600 Subject: [PATCH 1/2] add update timestamps for cache + separate prom/client dependency --- pkg/solana/chain.go | 5 ++++- pkg/solana/client/client.go | 10 ++++++++++ pkg/solana/monitor/balance.go | 16 ++++++++------- pkg/solana/monitor/prom.go | 33 ++++++++++++++++++++++++++++--- pkg/solana/relay.go | 4 ++-- pkg/solana/state_cache.go | 9 +++++++-- pkg/solana/transmissions_cache.go | 9 +++++++-- 7 files changed, 69 insertions(+), 17 deletions(-) diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index e62aa4531..4e03fd425 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -225,7 +225,10 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L return ch.getClient() } ch.txm = txm.NewTxm(ch.id, tc, cfg, ks, lggr) - ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, ch.Reader) + bc := func() (monitor.BalanceClient, error) { + return ch.getClient() + } + ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc) return &ch, nil } diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index ce329a919..f5a36a6fe 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" ) const ( @@ -51,6 +52,7 @@ type Writer interface { var _ ReaderWriter = (*Client)(nil) type Client struct { + url string rpc *rpc.Client skipPreflight bool // to enable or disable preflight checks commitment rpc.CommitmentType @@ -65,6 +67,7 @@ type Client struct { func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { return &Client{ + url: endpoint, rpc: rpc.New(endpoint), skipPreflight: cfg.SkipPreflight(), commitment: cfg.Commitment(), @@ -76,6 +79,13 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, }, nil } +func (c *Client) latency(name string) func() { + start := time.Now() + return func() { + monitor.SetClientLatency(time.Since(start), name, c.url) + } +} + func (c *Client) Balance(addr solana.PublicKey) (uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) defer cancel() diff --git a/pkg/solana/monitor/balance.go b/pkg/solana/monitor/balance.go index dd7be2344..00f873488 100644 --- a/pkg/solana/monitor/balance.go +++ b/pkg/solana/monitor/balance.go @@ -9,8 +9,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/utils" - - solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) // Config defines the monitor configuration. @@ -23,12 +21,16 @@ type Keystore interface { Accounts(ctx context.Context) ([]string, error) } +type BalanceClient interface { + Balance(addr solana.PublicKey) (uint64, error) +} + // NewBalanceMonitor returns a balance monitoring services.Service which reports the SOL balance of all ks keys to prometheus. -func NewBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (solanaClient.Reader, error)) services.Service { +func NewBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (BalanceClient, error)) services.Service { return newBalanceMonitor(chainID, cfg, lggr, ks, newReader) } -func newBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (solanaClient.Reader, error)) *balanceMonitor { +func newBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (BalanceClient, error)) *balanceMonitor { b := balanceMonitor{ chainID: chainID, cfg: cfg, @@ -48,10 +50,10 @@ type balanceMonitor struct { cfg Config lggr logger.Logger ks Keystore - newReader func() (solanaClient.Reader, error) + newReader func() (BalanceClient, error) updateFn func(acc solana.PublicKey, lamports uint64) // overridable for testing - reader solanaClient.Reader + reader BalanceClient stop services.StopChan done chan struct{} @@ -98,7 +100,7 @@ func (b *balanceMonitor) monitor() { } // getReader returns the cached solanaClient.Reader, or creates a new one if nil. -func (b *balanceMonitor) getReader() (solanaClient.Reader, error) { +func (b *balanceMonitor) getReader() (BalanceClient, error) { if b.reader == nil { var err error b.reader, err = b.newReader() diff --git a/pkg/solana/monitor/prom.go b/pkg/solana/monitor/prom.go index 1779b29fc..a87167b6b 100644 --- a/pkg/solana/monitor/prom.go +++ b/pkg/solana/monitor/prom.go @@ -1,6 +1,8 @@ package monitor import ( + "time" + "github.com/gagliardetto/solana-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -8,12 +10,37 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/internal" ) -var promSolanaBalance = promauto.NewGaugeVec( - prometheus.GaugeOpts{Name: "solana_balance", Help: "Solana account balances"}, - []string{"account", "chainID", "chainSet", "denomination"}, +var ( + promSolanaBalance = promauto.NewGaugeVec( + prometheus.GaugeOpts{Name: "solana_balance", Help: "Solana account balances"}, + []string{"account", "chainID", "chainSet", "denomination"}, + ) + promCacheTimestamp = promauto.NewGaugeVec( + prometheus.GaugeOpts{Name: "solana_cache_last_update_unix", Help: "Solana relayer cache last update timestamp"}, + []string{"type", "chainID", "account"}, + ) + promClientReq = promauto.NewGaugeVec( + prometheus.GaugeOpts{Name: "solana_client_latency_ms", Help: "Solana client request latency"}, + []string{"request", "url"}, + ) ) func (b *balanceMonitor) updateProm(acc solana.PublicKey, lamports uint64) { v := internal.LamportsToSol(lamports) // convert from lamports to SOL promSolanaBalance.WithLabelValues(acc.String(), b.chainID, "solana", "SOL").Set(v) } + +func SetCacheTimestamp(t time.Time, cacheType, chainID, account string) { + promCacheTimestamp.With(prometheus.Labels{ + "type": cacheType, + "chainID": chainID, + "account": account, + }).Set(float64(t.Unix())) +} + +func SetClientLatency(d time.Duration, request, url string) { + promClientReq.With(prometheus.Labels{ + "request": request, + "url": url, + }).Set(float64(d.Milliseconds())) +} diff --git a/pkg/solana/relay.go b/pkg/solana/relay.go index 6b1424626..ccf2a677c 100644 --- a/pkg/solana/relay.go +++ b/pkg/solana/relay.go @@ -122,7 +122,7 @@ func (r *Relayer) NewMedianProvider(rargs relaytypes.RelayArgs, pargs relaytypes } cfg := configWatcher.chain.Config() - transmissionsCache := NewTransmissionsCache(transmissionsID, cfg, configWatcher.reader, r.lggr) + transmissionsCache := NewTransmissionsCache(transmissionsID, relayConfig.ChainID, cfg, configWatcher.reader, r.lggr) return &medianProvider{ configProvider: configWatcher, transmissionsCache: transmissionsCache, @@ -194,7 +194,7 @@ func newConfigProvider(ctx context.Context, lggr logger.Logger, chain Chain, arg if err != nil { return nil, fmt.Errorf("error in NewMedianProvider.chain.Reader: %w", err) } - stateCache := NewStateCache(stateID, chain.Config(), reader, lggr) + stateCache := NewStateCache(stateID, relayConfig.ChainID, chain.Config(), reader, lggr) return &configProvider{ chainID: relayConfig.ChainID, stateID: stateID, diff --git a/pkg/solana/state_cache.go b/pkg/solana/state_cache.go index abd8d2fc7..54293e7e9 100644 --- a/pkg/solana/state_cache.go +++ b/pkg/solana/state_cache.go @@ -18,6 +18,7 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" ) var ( @@ -28,6 +29,7 @@ type StateCache struct { services.StateMachine // on-chain program + 2x state accounts (state + transmissions) StateID solana.PublicKey + chainID string stateLock sync.RWMutex state State @@ -43,9 +45,10 @@ type StateCache struct { stopCh services.StopChan } -func NewStateCache(stateID solana.PublicKey, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache { +func NewStateCache(stateID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache { return &StateCache{ StateID: stateID, + chainID: chainID, reader: reader, lggr: lggr, cfg: cfg, @@ -124,11 +127,13 @@ func (c *StateCache) fetchState(ctx context.Context) error { c.lggr.Debugf("state fetched for account: %s, result (config digest): %v", c.StateID, hex.EncodeToString(state.Config.LatestConfigDigest[:])) + timestamp := time.Now() + monitor.SetCacheTimestamp(timestamp, "ocr2_median_state", c.chainID, c.StateID.String()) // acquire lock and write to state c.stateLock.Lock() defer c.stateLock.Unlock() c.state = state - c.stateTime = time.Now() + c.stateTime = timestamp return nil } diff --git a/pkg/solana/transmissions_cache.go b/pkg/solana/transmissions_cache.go index c063cf489..8f1ceab5e 100644 --- a/pkg/solana/transmissions_cache.go +++ b/pkg/solana/transmissions_cache.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" ) type TransmissionsCache struct { @@ -24,6 +25,7 @@ type TransmissionsCache struct { // on-chain program + 2x state accounts (state + transmissions) TransmissionsID solana.PublicKey + chainID string ansLock sync.RWMutex answer Answer @@ -39,9 +41,10 @@ type TransmissionsCache struct { stopCh services.StopChan } -func NewTransmissionsCache(transmissionsID solana.PublicKey, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache { +func NewTransmissionsCache(transmissionsID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache { return &TransmissionsCache{ TransmissionsID: transmissionsID, + chainID: chainID, reader: reader, lggr: lggr, cfg: cfg, @@ -120,11 +123,13 @@ func (c *TransmissionsCache) fetchLatestTransmission(ctx context.Context) error } c.lggr.Debugf("latest transmission fetched for account: %s, result: %v", c.TransmissionsID, answer) + timestamp := time.Now() + monitor.SetCacheTimestamp(timestamp, "ocr2_median_transmissions", c.chainID, c.TransmissionsID.String()) // acquire lock and write to state c.ansLock.Lock() defer c.ansLock.Unlock() c.answer = answer - c.ansTime = time.Now() + c.ansTime = timestamp return nil } From 1b2aaff36dc1aa1b560cad5d55aa813994f1e266 Mon Sep 17 00:00:00 2001 From: aalu1418 <50029043+aalu1418@users.noreply.github.com> Date: Mon, 10 Jun 2024 12:21:45 -0600 Subject: [PATCH 2/2] add client latency requests --- pkg/solana/client/client.go | 29 +++++++++++++++++++++++++++++ pkg/solana/client/client_test.go | 22 ++++++++++++++++++++++ pkg/solana/monitor/prom.go | 7 +++++++ 3 files changed, 58 insertions(+) diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index f5a36a6fe..e51c93837 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -87,6 +87,9 @@ func (c *Client) latency(name string) func() { } func (c *Client) Balance(addr solana.PublicKey) (uint64, error) { + done := c.latency("balance") + defer done() + ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) defer cancel() @@ -105,6 +108,9 @@ func (c *Client) SlotHeight() (uint64, error) { } func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64, error) { + done := c.latency("slot_height") + defer done() + ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) defer cancel() v, err, _ := c.requestGroup.Do("GetSlotHeight", func() (interface{}, error) { @@ -114,6 +120,9 @@ func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64 } func (c *Client) GetAccountInfoWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetAccountInfoOpts) (*rpc.GetAccountInfoResult, error) { + done := c.latency("account_info") + defer done() + ctx, cancel := context.WithTimeout(ctx, c.contextDuration) defer cancel() opts.Commitment = c.commitment // overrides passed in value - use defined client commitment type @@ -121,6 +130,9 @@ func (c *Client) GetAccountInfoWithOpts(ctx context.Context, addr solana.PublicK } func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) { + done := c.latency("latest_blockhash") + defer done() + ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) defer cancel() @@ -131,6 +143,9 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) { } func (c *Client) ChainID() (string, error) { + done := c.latency("chain_id") + defer done() + ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) defer cancel() v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) { @@ -157,6 +172,9 @@ func (c *Client) ChainID() (string, error) { } func (c *Client) GetFeeForMessage(msg string) (uint64, error) { + done := c.latency("fee_for_message") + defer done() + // msg is base58 encoded data ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration) @@ -174,6 +192,9 @@ func (c *Client) GetFeeForMessage(msg string) (uint64, error) { // https://docs.solana.com/developing/clients/jsonrpc-api#getsignaturestatuses func (c *Client) SignatureStatuses(ctx context.Context, sigs []solana.Signature) ([]*rpc.SignatureStatusesResult, error) { + done := c.latency("signature_statuses") + defer done() + ctx, cancel := context.WithTimeout(ctx, c.contextDuration) defer cancel() @@ -192,6 +213,9 @@ func (c *Client) SignatureStatuses(ctx context.Context, sigs []solana.Signature) // https://docs.solana.com/developing/clients/jsonrpc-api#simulatetransaction // opts - (optional) use `nil` to use defaults func (c *Client) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *rpc.SimulateTransactionOpts) (*rpc.SimulateTransactionResult, error) { + done := c.latency("simulate_tx") + defer done() + ctx, cancel := context.WithTimeout(ctx, c.contextDuration) defer cancel() @@ -215,6 +239,9 @@ func (c *Client) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *r } func (c *Client) SendTx(ctx context.Context, tx *solana.Transaction) (solana.Signature, error) { + done := c.latency("send_tx") + defer done() + ctx, cancel := context.WithTimeout(ctx, c.txTimeout) defer cancel() @@ -235,6 +262,8 @@ func (c *Client) GetLatestBlock() (*rpc.GetBlockResult, error) { } // get block based on slot + done := c.latency("latest_block") + defer done() ctx, cancel := context.WithTimeout(context.Background(), c.txTimeout) defer cancel() v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) { diff --git a/pkg/solana/client/client_test.go b/pkg/solana/client/client_test.go index 4427330c4..ab9dba263 100644 --- a/pkg/solana/client/client_test.go +++ b/pkg/solana/client/client_test.go @@ -13,12 +13,15 @@ import ( "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/programs/system" "github.com/gagliardetto/solana-go/rpc" + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" ) func TestClient_Reader_Integration(t *testing.T) { @@ -292,3 +295,22 @@ func TestClient_SendTxDuplicates_Integration(t *testing.T) { assert.NoError(t, err) assert.Equal(t, uint64(5_000), initBal-endBal) } + +func TestClientLatency(t *testing.T) { + c := Client{} + v := 100 + n := t.Name() + uuid.NewString() + f := func() { + done := c.latency(n) + defer done() + time.Sleep(time.Duration(v) * time.Millisecond) + } + f() + g, err := monitor.GetClientLatency(n, c.url) + require.NoError(t, err) + val := testutil.ToFloat64(g) + + // check within expected range + assert.GreaterOrEqual(t, val, float64(v)) + assert.LessOrEqual(t, val, float64(v)*1.05) +} diff --git a/pkg/solana/monitor/prom.go b/pkg/solana/monitor/prom.go index a87167b6b..ea1ecadef 100644 --- a/pkg/solana/monitor/prom.go +++ b/pkg/solana/monitor/prom.go @@ -44,3 +44,10 @@ func SetClientLatency(d time.Duration, request, url string) { "url": url, }).Set(float64(d.Milliseconds())) } + +func GetClientLatency(request, url string) (prometheus.Gauge, error) { + return promClientReq.GetMetricWith(prometheus.Labels{ + "request": request, + "url": url, + }) +}