From 989ccca07498106e5b02700ec4c7a72091fea747 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Tue, 20 Aug 2024 00:33:24 -0500 Subject: [PATCH 1/7] add Solana RPC status check and refactor EVM and bitcoin RPC status check --- .../chains/bitcoin/observer/observer.go | 67 +------------------ .../chains/bitcoin/observer/rpc_status.go | 33 +++++++++ zetaclient/chains/bitcoin/rpc/rpc.go | 54 +++++++++++++++ .../chains/bitcoin/rpc/rpc_live_test.go | 19 +++++- zetaclient/chains/evm/observer/observer.go | 44 ------------ zetaclient/chains/evm/observer/rpc_status.go | 32 +++++++++ zetaclient/chains/evm/rpc/rpc.go | 46 +++++++++++++ zetaclient/chains/evm/rpc/rpc_live_test.go | 11 +++ zetaclient/chains/interfaces/interfaces.go | 1 + zetaclient/chains/solana/observer/observer.go | 3 + .../chains/solana/observer/rpc_status.go | 31 +++++++++ zetaclient/chains/solana/rpc/rpc.go | 41 ++++++++++++ zetaclient/chains/solana/rpc/rpc_live_test.go | 23 +++++-- zetaclient/common/constant.go | 5 ++ zetaclient/testutils/mocks/solana_rpc.go | 32 ++++++++- 15 files changed, 323 insertions(+), 119 deletions(-) create mode 100644 zetaclient/chains/bitcoin/observer/rpc_status.go create mode 100644 zetaclient/chains/evm/observer/rpc_status.go create mode 100644 zetaclient/chains/solana/observer/rpc_status.go diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 6a15173c33..9c5ac710a4 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -8,7 +8,6 @@ import ( "math" "math/big" "sort" - "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" @@ -224,70 +223,6 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } -// WatchRPCStatus watches the RPC status of the Bitcoin chain -// TODO(revamp): move ticker related functions to a specific file -// TODO(revamp): move inner logic in a separate function -func (ob *Observer) WatchRPCStatus(_ context.Context) error { - ob.logger.Chain.Info().Msgf("RPCStatus is starting") - ticker := time.NewTicker(60 * time.Second) - - for { - select { - case <-ticker.C: - if !ob.GetChainParams().IsSupported { - continue - } - - bn, err := ob.btcClient.GetBlockCount() - if err != nil { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - hash, err := ob.btcClient.GetBlockHash(bn) - if err != nil { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - header, err := ob.btcClient.GetBlockHeader(hash) - if err != nil { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - blockTime := header.Timestamp - elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > 1200 { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - tssAddr := ob.TSS().BTCAddressWitnessPubkeyHash() - res, err := ob.btcClient.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddr}) - if err != nil { - ob.logger.Chain.Error(). - Err(err). - Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ") - continue - } - - if len(res) == 0 { - ob.logger.Chain.Error(). - Err(err). - Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ") - continue - } - - ob.logger.Chain.Info(). - Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res)) - - case <-ob.StopChannel(): - return nil - } - } -} - // GetPendingNonce returns the artificial pending nonce // Note: pending nonce is accessed concurrently func (ob *Observer) GetPendingNonce() uint64 { @@ -399,12 +334,12 @@ func (ob *Observer) PostGasPrice(ctx context.Context) error { // TODO(revamp): move in upper package to separate file (e.g., rpc.go) func GetSenderAddressByVin(rpcClient interfaces.BTCRPCClient, vin btcjson.Vin, net *chaincfg.Params) (string, error) { // query previous raw transaction by txid - // GetTransaction requires reconfiguring the bitcoin node (txindex=1), so we use GetRawTransaction instead hash, err := chainhash.NewHashFromStr(vin.Txid) if err != nil { return "", err } + // this requires running bitcoin node with 'txindex=1' tx, err := rpcClient.GetRawTransaction(hash) if err != nil { return "", errors.Wrapf(err, "error getting raw transaction %s", vin.Txid) diff --git a/zetaclient/chains/bitcoin/observer/rpc_status.go b/zetaclient/chains/bitcoin/observer/rpc_status.go new file mode 100644 index 0000000000..51511268ca --- /dev/null +++ b/zetaclient/chains/bitcoin/observer/rpc_status.go @@ -0,0 +1,33 @@ +package observer + +import ( + "context" + "time" + + "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc" + "github.com/zeta-chain/zetacore/zetaclient/common" +) + +// WatchRPCStatus watches the RPC status of the Bitcoin chain +func (ob *Observer) WatchRPCStatus(_ context.Context) error { + ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) + + ticker := time.NewTicker(common.RPCStatusCheckInterval) + for { + select { + case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } + + tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() + err := rpc.CheckRPCStatus(ob.btcClient, tssAddress, ob.Logger().Chain) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") + } + + case <-ob.StopChannel(): + return nil + } + } +} diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index 079b6fa298..9f8101438b 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -2,12 +2,15 @@ package rpc import ( "fmt" + "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcutil" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" @@ -20,6 +23,10 @@ const ( // defaultTestnetFeeRate is the default fee rate for testnet, 10 sat/byte defaultTestnetFeeRate = 10 + + // rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy + // Bitcoin block time is 10 minutes, 1200s (20 minutes) is a reasonable threshold for Bitcoin + rpcLatencyThreshold = 1200 ) // NewRPCClient creates a new RPC client by the given config. @@ -157,3 +164,50 @@ func GetRecentFeeRate(rpcClient interfaces.BTCRPCClient, netParams *chaincfg.Par // #nosec G115 always in range return uint64(highestRate), nil } + +// CheckRPCStatus checks the RPC status of the evm chain +func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address, logger zerolog.Logger) error { + // query latest block number + bn, err := client.GetBlockCount() + if err != nil { + return errors.Wrap(err, "GetBlockCount error: RPC down?") + } + + // query latest block header + hash, err := client.GetBlockHash(bn) + if err != nil { + return errors.Wrap(err, "GetBlockHash error: RPC down?") + } + + // query latest block header thru hash + header, err := client.GetBlockHeader(hash) + if err != nil { + return errors.Wrap(err, "GetBlockHeader error: RPC down?") + } + + // latest block should not be too old + blockTime := header.Timestamp + elapsedSeconds := time.Since(blockTime).Seconds() + if elapsedSeconds > rpcLatencyThreshold { + return errors.Errorf( + "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", + bn, + elapsedSeconds, + ) + } + + // should be able to list utxos owned by TSS address + res, err := client.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddress}) + if err != nil { + return errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?") + } + + // TSS address should have utxos + if len(res) == 0 { + return errors.New("TSS address has no utxos; TSS address is not imported?") + } + + logger.Info(). + Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddress, len(res)) + return nil +} diff --git a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go index 54964d7403..818928f71f 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go +++ b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go @@ -219,6 +219,7 @@ func TestBitcoinObserverLive(t *testing.T) { // suite.Run(t, new(BitcoinClientTestSuite)) // LiveTestNewRPCClient(t) + // LiveTestCheckRPCStatus(t) // LiveTestGetBlockHeightByHash(t) // LiveTestBitcoinFeeRate(t) // LiveTestAvgFeeRateMainnetMempoolSpace(t) @@ -232,7 +233,7 @@ func LiveTestNewRPCClient(t *testing.T) { btcConfig := config.BTCConfig{ RPCUsername: "user", RPCPassword: "pass", - RPCHost: "bitcoin.rpc.zetachain.com/6315704c-49bc-4649-8b9d-e9278a1dfeb3", + RPCHost: os.Getenv("BTC_RPC_TESTNET"), RPCParams: "mainnet", } @@ -246,6 +247,22 @@ func LiveTestNewRPCClient(t *testing.T) { require.Greater(t, bn, int64(0)) } +// LiveTestCheckRPCStatus checks the RPC status of the Bitcoin chain +func LiveTestCheckRPCStatus(t *testing.T) { + // setup Bitcoin client + chainID := chains.BitcoinMainnet.ChainId + client, err := createRPCClient(chainID) + require.NoError(t, err) + + // decode tss address + tssAddress, err := chains.DecodeBtcAddress(testutils.TSSAddressBTCMainnet, chainID) + require.NoError(t, err) + + // check RPC status + err = rpc.CheckRPCStatus(client, tssAddress, log.Logger) + require.NoError(t, err) +} + // LiveTestGetBlockHeightByHash queries Bitcoin block height by hash func LiveTestGetBlockHeightByHash(t *testing.T) { // setup Bitcoin client diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 6dd1234018..8d451e5d01 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -7,7 +7,6 @@ import ( "math" "math/big" "strings" - "time" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -202,49 +201,6 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } -// WatchRPCStatus watches the RPC status of the evm chain -// TODO(revamp): move ticker to ticker file -// TODO(revamp): move inner logic to a separate function -func (ob *Observer) WatchRPCStatus(ctx context.Context) error { - ob.Logger().Chain.Info().Msgf("Starting RPC status check for chain %d", ob.Chain().ChainId) - ticker := time.NewTicker(60 * time.Second) - for { - select { - case <-ticker.C: - if !ob.GetChainParams().IsSupported { - continue - } - bn, err := ob.evmClient.BlockNumber(ctx) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") - continue - } - gasPrice, err := ob.evmClient.SuggestGasPrice(ctx) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") - continue - } - header, err := ob.evmClient.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") - continue - } - // #nosec G115 always in range - blockTime := time.Unix(int64(header.Time), 0).UTC() - elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > 100 { - ob.Logger().Chain.Warn(). - Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds) - continue - } - ob.Logger().Chain.Info(). - Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64()) - case <-ob.StopChannel(): - return nil - } - } -} - // SetTxNReceipt sets the receipt and transaction in memory func (ob *Observer) SetTxNReceipt(nonce uint64, receipt *ethtypes.Receipt, transaction *ethtypes.Transaction) { ob.Mu().Lock() diff --git a/zetaclient/chains/evm/observer/rpc_status.go b/zetaclient/chains/evm/observer/rpc_status.go new file mode 100644 index 0000000000..51ddc937d9 --- /dev/null +++ b/zetaclient/chains/evm/observer/rpc_status.go @@ -0,0 +1,32 @@ +// Package observer implements the EVM chain observer +package observer + +import ( + "context" + "time" + + "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" + "github.com/zeta-chain/zetacore/zetaclient/common" +) + +// WatchRPCStatus watches the RPC status of the evm chain +func (ob *Observer) WatchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) + + ticker := time.NewTicker(common.RPCStatusCheckInterval) + for { + select { + case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } + + err := rpc.CheckRPCStatus(ctx, ob.evmClient, ob.Logger().Chain) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") + } + case <-ob.StopChannel(): + return nil + } + } +} diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index 6fcc3d007c..f9e6d71c73 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -2,13 +2,22 @@ package rpc import ( "context" + "math/big" + "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" ) +const ( + // rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy + // 100s is a reasonable threshold for most EVM chains + rpcLatencyThreshold = 100 +) + // IsTxConfirmed checks if the transaction is confirmed with given confirmations func IsTxConfirmed( ctx context.Context, @@ -50,3 +59,40 @@ func IsTxConfirmed( return blocks >= confirmations, nil } + +// CheckRPCStatus checks the RPC status of the evm chain +func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient, logger zerolog.Logger) error { + // query latest block number + bn, err := client.BlockNumber(ctx) + if err != nil { + return errors.Wrap(err, "BlockNumber error: RPC down?") + } + + // query suggested gas price + gasPrice, err := client.SuggestGasPrice(ctx) + if err != nil { + return errors.Wrap(err, "SuggestGasPrice error: RPC down?") + } + + // query latest block header + header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) + if err != nil { + return errors.Wrap(err, "HeaderByNumber error: RPC down?") + } + + // latest block should not be too old + // #nosec G115 always in range + blockTime := time.Unix(int64(header.Time), 0).UTC() + elapsedSeconds := time.Since(blockTime).Seconds() + if elapsedSeconds > rpcLatencyThreshold { + return errors.Errorf( + "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", + bn, + elapsedSeconds, + ) + } + + logger.Info(). + Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.0fs ago), gas price %s", header.Number, blockTime.String(), elapsedSeconds, gasPrice.String()) + return nil +} diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go index 0c420c830e..0a97b00ab6 100644 --- a/zetaclient/chains/evm/rpc/rpc_live_test.go +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -5,6 +5,7 @@ import ( "math" "github.com/ethereum/go-ethereum/ethclient" + "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" @@ -21,6 +22,7 @@ const ( // Test_EVMRPCLive is a phony test to run each live test individually func Test_EVMRPCLive(t *testing.T) { // LiveTest_IsTxConfirmed(t) + // LiveTest_CheckRPCStatus(t) } func LiveTest_IsTxConfirmed(t *testing.T) { @@ -43,3 +45,12 @@ func LiveTest_IsTxConfirmed(t *testing.T) { require.False(t, confirmed) }) } + +func LiveTest_CheckRPCStatus(t *testing.T) { + client, err := ethclient.Dial(URLEthMainnet) + require.NoError(t, err) + + ctx := context.Background() + err = rpc.CheckRPCStatus(ctx, client, log.Logger) + require.NoError(t, err) +} diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index 5e1148f81b..15e9c395bd 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -194,6 +194,7 @@ type SolanaRPCClient interface { GetVersion(ctx context.Context) (*solrpc.GetVersionResult, error) GetHealth(ctx context.Context) (string, error) GetSlot(ctx context.Context, commitment solrpc.CommitmentType) (uint64, error) + GetBlockTime(ctx context.Context, block uint64) (*solana.UnixTimeSeconds, error) GetAccountInfo(ctx context.Context, account solana.PublicKey) (*solrpc.GetAccountInfoResult, error) GetBalance( ctx context.Context, diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index ad135aeecf..75f5b03dc6 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -130,6 +130,9 @@ func (ob *Observer) Start(ctx context.Context) { // watch zetacore for Solana inbound trackers bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) + + // watch RPC status of the Solana chain + bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // LoadLastTxScanned loads the last scanned tx from the database. diff --git a/zetaclient/chains/solana/observer/rpc_status.go b/zetaclient/chains/solana/observer/rpc_status.go new file mode 100644 index 0000000000..c7fe070bd5 --- /dev/null +++ b/zetaclient/chains/solana/observer/rpc_status.go @@ -0,0 +1,31 @@ +package observer + +import ( + "context" + "time" + + "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" + "github.com/zeta-chain/zetacore/zetaclient/common" +) + +// WatchRPCStatus watches the RPC status of the solana chain +func (ob *Observer) WatchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) + + ticker := time.NewTicker(common.RPCStatusCheckInterval) + for { + select { + case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } + + err := rpc.CheckRPCStatus(ctx, ob.solClient, ob.Logger().Chain) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") + } + case <-ob.StopChannel(): + return nil + } + } +} diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index d83846f1e1..bc761fec02 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -2,10 +2,12 @@ package rpc import ( "context" + "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" ) @@ -13,6 +15,10 @@ import ( const ( // defaultPageLimit is the default number of signatures to fetch in one GetSignaturesForAddressWithOpts call DefaultPageLimit = 1000 + + // rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy + // The 'HEALTH_CHECK_SLOT_DISTANCE' is default to 150 slots, which is 150 * 0.4s = 60s + rpcLatencyThreshold = 60 ) // GetFirstSignatureForAddress searches the first signature for the given address. @@ -116,3 +122,38 @@ func GetSignaturesForAddressUntil( return allSignatures, nil } + +// CheckRPCStatus checks the RPC status of the solana chain +func CheckRPCStatus(ctx context.Context, client interfaces.SolanaRPCClient, logger zerolog.Logger) error { + // query solana health (always return "ok" unless --trusted-validator is provided) + _, err := client.GetHealth(ctx) + if err != nil { + return errors.Wrap(err, "GetHealth error: RPC down?") + } + + // query latest slot + slot, err := client.GetSlot(ctx, rpc.CommitmentFinalized) + if err != nil { + return errors.Wrap(err, "GetSlot error: RPC down?") + } + + // query latest block time + blockTime, err := client.GetBlockTime(ctx, slot) + if err != nil { + return errors.Wrap(err, "GetBlockTime error: RPC down?") + } + + // latest block should not be too old + elapsedSeconds := time.Since(blockTime.Time()).Seconds() + if elapsedSeconds > rpcLatencyThreshold { + return errors.Errorf( + "Latest slot %d is %.0fs old, RPC stale or chain stuck (check explorer)?", + slot, + elapsedSeconds, + ) + } + + logger.Info(). + Msgf("RPC Status [OK]: latest slot %d, timestamp %s (%.0fs ago)", slot, blockTime.String(), elapsedSeconds) + return nil +} diff --git a/zetaclient/chains/solana/rpc/rpc_live_test.go b/zetaclient/chains/solana/rpc/rpc_live_test.go index c8455588ff..3b3cf9cc91 100644 --- a/zetaclient/chains/solana/rpc/rpc_live_test.go +++ b/zetaclient/chains/solana/rpc/rpc_live_test.go @@ -2,11 +2,11 @@ package rpc_test import ( "context" - "os" "testing" "github.com/gagliardetto/solana-go" solanarpc "github.com/gagliardetto/solana-go/rpc" + "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" ) @@ -15,18 +15,18 @@ import ( func Test_SolanaRPCLive(t *testing.T) { // LiveTest_GetFirstSignatureForAddress(t) // LiveTest_GetSignaturesForAddressUntil(t) + // LiveTest_CheckRPCStatus(t) } func LiveTest_GetFirstSignatureForAddress(t *testing.T) { // create a Solana devnet RPC client - urlDevnet := os.Getenv("TEST_SOL_URL_DEVNET") - client := solanarpc.New(urlDevnet) + client := solanarpc.New(solanarpc.DevNet_RPC) // program address address := solana.MustPublicKeyFromBase58("2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s") // get the first signature for the address (one by one) - sig, err := rpc.GetFirstSignatureForAddress(context.TODO(), client, address, 1) + sig, err := rpc.GetFirstSignatureForAddress(context.Background(), client, address, 1) require.NoError(t, err) // assert @@ -36,8 +36,7 @@ func LiveTest_GetFirstSignatureForAddress(t *testing.T) { func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { // create a Solana devnet RPC client - urlDevnet := os.Getenv("TEST_SOL_URL_DEVNET") - client := solanarpc.New(urlDevnet) + client := solanarpc.New(solanarpc.DevNet_RPC) // program address address := solana.MustPublicKeyFromBase58("2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s") @@ -46,7 +45,7 @@ func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { ) // get all signatures for the address until the first signature (one by one) - sigs, err := rpc.GetSignaturesForAddressUntil(context.TODO(), client, address, untilSig, 1) + sigs, err := rpc.GetSignaturesForAddressUntil(context.Background(), client, address, untilSig, 1) require.NoError(t, err) // assert @@ -57,3 +56,13 @@ func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { require.NotEqual(t, untilSig, sig.Signature) } } + +func LiveTest_CheckRPCStatus(t *testing.T) { + // create a Solana devnet RPC client + client := solanarpc.New(solanarpc.DevNet_RPC) + + // check the RPC status + ctx := context.Background() + err := rpc.CheckRPCStatus(ctx, client, log.Logger) + require.NoError(t, err) +} diff --git a/zetaclient/common/constant.go b/zetaclient/common/constant.go index 64d906ca0d..c54acade92 100644 --- a/zetaclient/common/constant.go +++ b/zetaclient/common/constant.go @@ -1,5 +1,7 @@ package common +import "time" + const ( // DefaultGasPriceMultiplier is the default gas price multiplier for all chains DefaultGasPriceMultiplier = 1.0 @@ -9,4 +11,7 @@ const ( // BTCOutboundGasPriceMultiplier is the default gas price multiplier for BTC outbond txs BTCOutboundGasPriceMultiplier = 2.0 + + // RPCStatusCheckInterval is the interval to check RPC status, 1 minute + RPCStatusCheckInterval = time.Minute ) diff --git a/zetaclient/testutils/mocks/solana_rpc.go b/zetaclient/testutils/mocks/solana_rpc.go index fad147037c..6c4d2829e3 100644 --- a/zetaclient/testutils/mocks/solana_rpc.go +++ b/zetaclient/testutils/mocks/solana_rpc.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. +// Code generated by mockery v2.42.2. DO NOT EDIT. package mocks @@ -77,6 +77,36 @@ func (_m *SolanaRPCClient) GetBalance(ctx context.Context, account solana.Public return r0, r1 } +// GetBlockTime provides a mock function with given fields: ctx, block +func (_m *SolanaRPCClient) GetBlockTime(ctx context.Context, block uint64) (*solana.UnixTimeSeconds, error) { + ret := _m.Called(ctx, block) + + if len(ret) == 0 { + panic("no return value specified for GetBlockTime") + } + + var r0 *solana.UnixTimeSeconds + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*solana.UnixTimeSeconds, error)); ok { + return rf(ctx, block) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *solana.UnixTimeSeconds); ok { + r0 = rf(ctx, block) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*solana.UnixTimeSeconds) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, block) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetConfirmedTransactionWithOpts provides a mock function with given fields: ctx, signature, opts func (_m *SolanaRPCClient) GetConfirmedTransactionWithOpts(ctx context.Context, signature solana.Signature, opts *rpc.GetTransactionOpts) (*rpc.TransactionWithMeta, error) { ret := _m.Called(ctx, signature, opts) From b0de92a34dbac160b1a119ab461cd72578b8e6be Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Tue, 20 Aug 2024 00:36:45 -0500 Subject: [PATCH 2/7] add changelog entry --- changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog.md b/changelog.md index d00252a146..b323e8b6c7 100644 --- a/changelog.md +++ b/changelog.md @@ -14,6 +14,7 @@ * [2681](https://github.com/zeta-chain/node/pull/2681) - implement `MsgUpdateERC20CustodyPauseStatus` to pause or unpause ERC20 Custody contract (to be used for the migration process for smart contract V2) * [2644](https://github.com/zeta-chain/node/pull/2644) - add created_timestamp to cctx status * [2673](https://github.com/zeta-chain/node/pull/2673) - add relayer key importer, encryption and decryption +* [2751](https://github.com/zeta-chain/node/pull/2751) - add RPC status check for Solana chain ### Refactor From fedd2fead3d17535cd748e8bac7cb97cbae906be Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Tue, 20 Aug 2024 17:22:50 -0500 Subject: [PATCH 3/7] add RPC alert latency to config file --- zetaclient/chains/base/observer.go | 10 +++++++ zetaclient/chains/base/observer_test.go | 27 +++++++++++++++++++ .../chains/bitcoin/observer/observer.go | 2 ++ .../chains/bitcoin/observer/observer_test.go | 2 ++ .../chains/bitcoin/observer/outbound_test.go | 2 +- .../chains/bitcoin/observer/rpc_status.go | 3 ++- zetaclient/chains/bitcoin/rpc/rpc.go | 18 ++++++++++--- .../chains/bitcoin/rpc/rpc_live_test.go | 4 +-- zetaclient/chains/evm/observer/observer.go | 1 + zetaclient/chains/evm/observer/rpc_status.go | 3 ++- zetaclient/chains/evm/rpc/rpc.go | 18 ++++++++++--- zetaclient/chains/evm/rpc/rpc_live_test.go | 2 +- .../chains/solana/observer/inbound_test.go | 18 ++++++++++--- zetaclient/chains/solana/observer/observer.go | 2 ++ .../chains/solana/observer/observer_test.go | 1 + .../chains/solana/observer/outbound_test.go | 2 +- .../chains/solana/observer/rpc_status.go | 3 ++- zetaclient/chains/solana/rpc/rpc.go | 18 ++++++++++--- zetaclient/chains/solana/rpc/rpc_live_test.go | 2 +- zetaclient/config/config_chain.go | 17 +++++++----- zetaclient/config/types.go | 17 +++++++----- zetaclient/orchestrator/bootstrap.go | 2 ++ 22 files changed, 135 insertions(+), 39 deletions(-) diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index cf5491366b..1778e2d202 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -60,6 +60,9 @@ type Observer struct { // lastTxScanned is the last transaction hash scanned by the observer lastTxScanned string + // rpcAlertLatency is the threshold of RPC latency (in seconds) to trigger an alert + rpcAlertLatency uint64 + // blockCache is the cache for blocks blockCache *lru.Cache @@ -92,6 +95,7 @@ func NewObserver( tss interfaces.TSSSigner, blockCacheSize int, headerCacheSize int, + rpcAlertLatency uint64, ts *metrics.TelemetryServer, database *db.DB, logger Logger, @@ -104,6 +108,7 @@ func NewObserver( lastBlock: 0, lastBlockScanned: 0, lastTxScanned: "", + rpcAlertLatency: rpcAlertLatency, ts: ts, db: database, mu: &sync.Mutex{}, @@ -246,6 +251,11 @@ func (ob *Observer) WithLastTxScanned(txHash string) *Observer { return ob } +// RPCAlertLatency returns the RPC alert latency for the observer. +func (ob *Observer) RPCAlertLatency() uint64 { + return ob.rpcAlertLatency +} + // BlockCache returns the block cache for the observer. func (ob *Observer) BlockCache() *lru.Cache { return ob.blockCache diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index 009d9a53cf..4db8a19bbc 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -41,6 +41,7 @@ func createObserver(t *testing.T, chain chains.Chain) *base.Observer { tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, + 60, nil, database, logger, @@ -122,6 +123,7 @@ func TestNewObserver(t *testing.T) { tt.tss, tt.blockCacheSize, tt.headerCacheSize, + 60, nil, database, base.DefaultLogger(), @@ -159,6 +161,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithChain(chains.BscMainnet) require.Equal(t, newChain, ob.Chain()) }) + t.Run("should be able to update chain params", func(t *testing.T) { ob := createObserver(t, chain) @@ -167,6 +170,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithChainParams(newChainParams) require.True(t, observertypes.ChainParamsEqual(newChainParams, ob.ChainParams())) }) + t.Run("should be able to update zetacore client", func(t *testing.T) { ob := createObserver(t, chain) @@ -175,6 +179,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithZetacoreClient(newZetacoreClient) require.Equal(t, newZetacoreClient, ob.ZetacoreClient()) }) + t.Run("should be able to update tss", func(t *testing.T) { ob := createObserver(t, chain) @@ -183,6 +188,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithTSS(newTSS) require.Equal(t, newTSS, ob.TSS()) }) + t.Run("should be able to update last block", func(t *testing.T) { ob := createObserver(t, chain) @@ -191,6 +197,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithLastBlock(newLastBlock) require.Equal(t, newLastBlock, ob.LastBlock()) }) + t.Run("should be able to update last block scanned", func(t *testing.T) { ob := createObserver(t, chain) @@ -199,6 +206,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithLastBlockScanned(newLastBlockScanned) require.Equal(t, newLastBlockScanned, ob.LastBlockScanned()) }) + t.Run("should be able to update last tx scanned", func(t *testing.T) { ob := createObserver(t, chain) @@ -207,6 +215,14 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithLastTxScanned(newLastTxScanned) require.Equal(t, newLastTxScanned, ob.LastTxScanned()) }) + + t.Run("should be able to get rpc alert latency", func(t *testing.T) { + ob := createObserver(t, chain) + + // get rpc alert latency + require.EqualValues(t, 60, ob.RPCAlertLatency()) + }) + t.Run("should be able to replace block cache", func(t *testing.T) { ob := createObserver(t, chain) @@ -217,6 +233,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithBlockCache(newBlockCache) require.Equal(t, newBlockCache, ob.BlockCache()) }) + t.Run("should be able to replace header cache", func(t *testing.T) { ob := createObserver(t, chain) @@ -227,6 +244,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithHeaderCache(newHeadersCache) require.Equal(t, newHeadersCache, ob.HeaderCache()) }) + t.Run("should be able to update telemetry server", func(t *testing.T) { ob := createObserver(t, chain) @@ -235,6 +253,7 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithTelemetryServer(newServer) require.Equal(t, newServer, ob.TelemetryServer()) }) + t.Run("should be able to get logger", func(t *testing.T) { ob := createObserver(t, chain) logger := ob.Logger() @@ -307,6 +326,7 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 100, ob.LastBlockScanned()) }) + t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) { // create observer and open db ob := createObserver(t, chain) @@ -316,6 +336,7 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 0, ob.LastBlockScanned()) }) + t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) { // create observer and open db ob := createObserver(t, chain) @@ -331,6 +352,7 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 101, ob.LastBlockScanned()) }) + t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) { // create observer and open db ob := createObserver(t, chain) @@ -346,6 +368,7 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 0, ob.LastBlockScanned()) }) + t.Run("should return error on invalid env var", func(t *testing.T) { // create observer and open db ob := createObserver(t, chain) @@ -392,6 +415,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 100, lastBlockScanned) }) + t.Run("should return error when last block scanned not found in db", func(t *testing.T) { // create empty db ob := createObserver(t, chain) @@ -417,6 +441,7 @@ func TestLoadLastTxScanned(t *testing.T) { ob.LoadLastTxScanned() require.EqualValues(t, lastTx, ob.LastTxScanned()) }) + t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) { // create observer and open db ob := createObserver(t, chain) @@ -425,6 +450,7 @@ func TestLoadLastTxScanned(t *testing.T) { ob.LoadLastTxScanned() require.Empty(t, ob.LastTxScanned()) }) + t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) { // create observer and open db ob := createObserver(t, chain) @@ -480,6 +506,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, lastTx, lastTxScanned) }) + t.Run("should return error when last tx scanned not found in db", func(t *testing.T) { // create empty db ob := createObserver(t, chain) diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 9c5ac710a4..65adf2f932 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -114,6 +114,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + rpcAlertLatency uint64, database *db.DB, logger base.Logger, ts *metrics.TelemetryServer, @@ -126,6 +127,7 @@ func NewObserver( tss, btcBlocksPerDay, base.DefaultHeaderCacheSize, + rpcAlertLatency, ts, database, logger, diff --git a/zetaclient/chains/bitcoin/observer/observer_test.go b/zetaclient/chains/bitcoin/observer/observer_test.go index c873dfb8d7..8b4469a9ab 100644 --- a/zetaclient/chains/bitcoin/observer/observer_test.go +++ b/zetaclient/chains/bitcoin/observer/observer_test.go @@ -83,6 +83,7 @@ func MockBTCObserver( params, nil, nil, + 60, database, base.Logger{}, nil, @@ -169,6 +170,7 @@ func Test_NewObserver(t *testing.T) { tt.chainParams, tt.coreClient, tt.tss, + 60, database, tt.logger, tt.ts, diff --git a/zetaclient/chains/bitcoin/observer/outbound_test.go b/zetaclient/chains/bitcoin/observer/outbound_test.go index b1f7ae309b..f3b1755e29 100644 --- a/zetaclient/chains/bitcoin/observer/outbound_test.go +++ b/zetaclient/chains/bitcoin/observer/outbound_test.go @@ -32,7 +32,7 @@ func MockBTCObserverMainnet(t *testing.T) *Observer { require.NoError(t, err) // create Bitcoin observer - ob, err := NewObserver(chain, btcClient, params, nil, tss, database, base.Logger{}, nil) + ob, err := NewObserver(chain, btcClient, params, nil, tss, 60, database, base.Logger{}, nil) require.NoError(t, err) return ob diff --git a/zetaclient/chains/bitcoin/observer/rpc_status.go b/zetaclient/chains/bitcoin/observer/rpc_status.go index 51511268ca..f9f6ad97b4 100644 --- a/zetaclient/chains/bitcoin/observer/rpc_status.go +++ b/zetaclient/chains/bitcoin/observer/rpc_status.go @@ -20,8 +20,9 @@ func (ob *Observer) WatchRPCStatus(_ context.Context) error { continue } + alertLatency := ob.RPCAlertLatency() tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() - err := rpc.CheckRPCStatus(ob.btcClient, tssAddress, ob.Logger().Chain) + err := rpc.CheckRPCStatus(ob.btcClient, alertLatency, tssAddress, ob.Logger().Chain) if err != nil { ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") } diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index 9f8101438b..d422ba66cb 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -24,9 +24,9 @@ const ( // defaultTestnetFeeRate is the default fee rate for testnet, 10 sat/byte defaultTestnetFeeRate = 10 - // rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy + // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. // Bitcoin block time is 10 minutes, 1200s (20 minutes) is a reasonable threshold for Bitcoin - rpcLatencyThreshold = 1200 + RPCAlertLatency = 1200 ) // NewRPCClient creates a new RPC client by the given config. @@ -166,7 +166,12 @@ func GetRecentFeeRate(rpcClient interfaces.BTCRPCClient, netParams *chaincfg.Par } // CheckRPCStatus checks the RPC status of the evm chain -func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address, logger zerolog.Logger) error { +func CheckRPCStatus( + client interfaces.BTCRPCClient, + alertLatency uint64, + tssAddress btcutil.Address, + logger zerolog.Logger, +) error { // query latest block number bn, err := client.GetBlockCount() if err != nil { @@ -185,10 +190,15 @@ func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address, return errors.Wrap(err, "GetBlockHeader error: RPC down?") } + // use default alert latency if not provided + if alertLatency == 0 { + alertLatency = RPCAlertLatency + } + // latest block should not be too old blockTime := header.Timestamp elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > rpcLatencyThreshold { + if elapsedSeconds > float64(alertLatency) { return errors.Errorf( "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", bn, diff --git a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go index 818928f71f..ebc8bf1a50 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go +++ b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go @@ -59,7 +59,7 @@ func (suite *BitcoinObserverTestSuite) SetupTest() { suite.Require().NoError(err) // create observer - ob, err := observer.NewObserver(chain, btcClient, params, nil, tss, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, btcClient, params, nil, tss, 60, database, base.DefaultLogger(), nil) suite.Require().NoError(err) suite.Require().NotNil(ob) @@ -259,7 +259,7 @@ func LiveTestCheckRPCStatus(t *testing.T) { require.NoError(t, err) // check RPC status - err = rpc.CheckRPCStatus(client, tssAddress, log.Logger) + err = rpc.CheckRPCStatus(client, rpc.RPCAlertLatency, tssAddress, log.Logger) require.NoError(t, err) } diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 8d451e5d01..fdc6efbe61 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -77,6 +77,7 @@ func NewObserver( tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, + evmCfg.RPCAlertLatency, ts, database, logger, diff --git a/zetaclient/chains/evm/observer/rpc_status.go b/zetaclient/chains/evm/observer/rpc_status.go index 51ddc937d9..525c20b1ba 100644 --- a/zetaclient/chains/evm/observer/rpc_status.go +++ b/zetaclient/chains/evm/observer/rpc_status.go @@ -21,7 +21,8 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { continue } - err := rpc.CheckRPCStatus(ctx, ob.evmClient, ob.Logger().Chain) + alertLatency := ob.RPCAlertLatency() + err := rpc.CheckRPCStatus(ctx, ob.evmClient, alertLatency, ob.Logger().Chain) if err != nil { ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") } diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index f9e6d71c73..6c9cb264d2 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -13,9 +13,9 @@ import ( ) const ( - // rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy + // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. // 100s is a reasonable threshold for most EVM chains - rpcLatencyThreshold = 100 + RPCAlertLatency = 100 ) // IsTxConfirmed checks if the transaction is confirmed with given confirmations @@ -61,7 +61,12 @@ func IsTxConfirmed( } // CheckRPCStatus checks the RPC status of the evm chain -func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient, logger zerolog.Logger) error { +func CheckRPCStatus( + ctx context.Context, + client interfaces.EVMRPCClient, + alertLatency uint64, + logger zerolog.Logger, +) error { // query latest block number bn, err := client.BlockNumber(ctx) if err != nil { @@ -80,11 +85,16 @@ func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient, logger return errors.Wrap(err, "HeaderByNumber error: RPC down?") } + // use default alert latency if not provided + if alertLatency == 0 { + alertLatency = RPCAlertLatency + } + // latest block should not be too old // #nosec G115 always in range blockTime := time.Unix(int64(header.Time), 0).UTC() elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > rpcLatencyThreshold { + if elapsedSeconds > float64(alertLatency) { return errors.Errorf( "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", bn, diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go index 0a97b00ab6..6ba0f3bbf7 100644 --- a/zetaclient/chains/evm/rpc/rpc_live_test.go +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -51,6 +51,6 @@ func LiveTest_CheckRPCStatus(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = rpc.CheckRPCStatus(ctx, client, log.Logger) + err = rpc.CheckRPCStatus(ctx, client, rpc.RPCAlertLatency, log.Logger) require.NoError(t, err) } diff --git a/zetaclient/chains/solana/observer/inbound_test.go b/zetaclient/chains/solana/observer/inbound_test.go index 40f53ce0bc..5652d68df3 100644 --- a/zetaclient/chains/solana/observer/inbound_test.go +++ b/zetaclient/chains/solana/observer/inbound_test.go @@ -40,7 +40,17 @@ func Test_FilterInboundEventAndVote(t *testing.T) { zetacoreClient := mocks.NewZetacoreClient(t) zetacoreClient.WithKeys(&keys.Keys{}).WithZetaChain().WithPostVoteInbound("", "") - ob, err := observer.NewObserver(chain, nil, *chainParams, zetacoreClient, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver( + chain, + nil, + *chainParams, + zetacoreClient, + nil, + 60, + database, + base.DefaultLogger(), + nil, + ) require.NoError(t, err) t.Run("should filter inbound events and vote", func(t *testing.T) { @@ -63,7 +73,7 @@ func Test_FilterInboundEvents(t *testing.T) { chainParams := sample.ChainParams(chain.ChainId) chainParams.GatewayAddress = GatewayAddressTest - ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) // expected result @@ -103,7 +113,7 @@ func Test_BuildInboundVoteMsgFromEvent(t *testing.T) { database, err := db.NewFromSqliteInMemory(true) require.NoError(t, err) - ob, err := observer.NewObserver(chain, nil, *params, zetacoreClient, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, nil, *params, zetacoreClient, nil, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) // create test compliance config @@ -170,7 +180,7 @@ func Test_ParseInboundAsDeposit(t *testing.T) { // create observer chainParams := sample.ChainParams(chain.ChainId) chainParams.GatewayAddress = GatewayAddressTest - ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) // expected result diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index 75f5b03dc6..0de212b30b 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -44,6 +44,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + rpcAlertLatency uint64, db *db.DB, logger base.Logger, ts *metrics.TelemetryServer, @@ -56,6 +57,7 @@ func NewObserver( tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, + rpcAlertLatency, ts, db, logger, diff --git a/zetaclient/chains/solana/observer/observer_test.go b/zetaclient/chains/solana/observer/observer_test.go index 5b2576e66d..7d8b3370b5 100644 --- a/zetaclient/chains/solana/observer/observer_test.go +++ b/zetaclient/chains/solana/observer/observer_test.go @@ -44,6 +44,7 @@ func MockSolanaObserver( chainParams, zetacoreClient, tss, + 60, database, base.DefaultLogger(), nil, diff --git a/zetaclient/chains/solana/observer/outbound_test.go b/zetaclient/chains/solana/observer/outbound_test.go index 654fa8f4ee..d0b6a7e064 100644 --- a/zetaclient/chains/solana/observer/outbound_test.go +++ b/zetaclient/chains/solana/observer/outbound_test.go @@ -50,7 +50,7 @@ func createTestObserver( // create observer chainParams := sample.ChainParams(chain.ChainId) chainParams.GatewayAddress = GatewayAddressTest - ob, err := observer.NewObserver(chain, solClient, *chainParams, nil, tss, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, solClient, *chainParams, nil, tss, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) return ob diff --git a/zetaclient/chains/solana/observer/rpc_status.go b/zetaclient/chains/solana/observer/rpc_status.go index c7fe070bd5..7910ff6f7e 100644 --- a/zetaclient/chains/solana/observer/rpc_status.go +++ b/zetaclient/chains/solana/observer/rpc_status.go @@ -20,7 +20,8 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { continue } - err := rpc.CheckRPCStatus(ctx, ob.solClient, ob.Logger().Chain) + alertLatency := ob.RPCAlertLatency() + err := rpc.CheckRPCStatus(ctx, ob.solClient, alertLatency, ob.Logger().Chain) if err != nil { ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") } diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index bc761fec02..00c2add253 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -16,9 +16,9 @@ const ( // defaultPageLimit is the default number of signatures to fetch in one GetSignaturesForAddressWithOpts call DefaultPageLimit = 1000 - // rpcLatencyThreshold is the threshold for RPC latency to be considered unhealthy + // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. // The 'HEALTH_CHECK_SLOT_DISTANCE' is default to 150 slots, which is 150 * 0.4s = 60s - rpcLatencyThreshold = 60 + RPCAlertLatency = 60 ) // GetFirstSignatureForAddress searches the first signature for the given address. @@ -124,7 +124,12 @@ func GetSignaturesForAddressUntil( } // CheckRPCStatus checks the RPC status of the solana chain -func CheckRPCStatus(ctx context.Context, client interfaces.SolanaRPCClient, logger zerolog.Logger) error { +func CheckRPCStatus( + ctx context.Context, + client interfaces.SolanaRPCClient, + alertLatency uint64, + logger zerolog.Logger, +) error { // query solana health (always return "ok" unless --trusted-validator is provided) _, err := client.GetHealth(ctx) if err != nil { @@ -143,9 +148,14 @@ func CheckRPCStatus(ctx context.Context, client interfaces.SolanaRPCClient, logg return errors.Wrap(err, "GetBlockTime error: RPC down?") } + // use default alert latency if not provided + if alertLatency == 0 { + alertLatency = RPCAlertLatency + } + // latest block should not be too old elapsedSeconds := time.Since(blockTime.Time()).Seconds() - if elapsedSeconds > rpcLatencyThreshold { + if elapsedSeconds > float64(alertLatency) { return errors.Errorf( "Latest slot %d is %.0fs old, RPC stale or chain stuck (check explorer)?", slot, diff --git a/zetaclient/chains/solana/rpc/rpc_live_test.go b/zetaclient/chains/solana/rpc/rpc_live_test.go index 3b3cf9cc91..b04352d944 100644 --- a/zetaclient/chains/solana/rpc/rpc_live_test.go +++ b/zetaclient/chains/solana/rpc/rpc_live_test.go @@ -63,6 +63,6 @@ func LiveTest_CheckRPCStatus(t *testing.T) { // check the RPC status ctx := context.Background() - err := rpc.CheckRPCStatus(ctx, client, log.Logger) + err := rpc.CheckRPCStatus(ctx, client, rpc.RPCAlertLatency, log.Logger) require.NoError(t, err) } diff --git a/zetaclient/config/config_chain.go b/zetaclient/config/config_chain.go index 826253d71e..76bddc335f 100644 --- a/zetaclient/config/config_chain.go +++ b/zetaclient/config/config_chain.go @@ -31,17 +31,19 @@ func New(setDefaults bool) Config { // bitcoinConfigRegnet contains Bitcoin config for regnet func bitcoinConfigRegnet() BTCConfig { return BTCConfig{ - RPCUsername: "smoketest", // smoketest is the previous name for E2E test, we keep this name for compatibility between client versions in upgrade test - RPCPassword: "123", - RPCHost: "bitcoin:18443", - RPCParams: "regtest", + RPCUsername: "smoketest", // smoketest is the previous name for E2E test, we keep this name for compatibility between client versions in upgrade test + RPCPassword: "123", + RPCHost: "bitcoin:18443", + RPCParams: "regtest", + RPCAlertLatency: 60, } } // solanaConfigLocalnet contains config for Solana localnet func solanaConfigLocalnet() SolanaConfig { return SolanaConfig{ - Endpoint: "http://solana:8899", + Endpoint: "http://solana:8899", + RPCAlertLatency: 60, } } @@ -72,8 +74,9 @@ func evmChainsConfigs() map[int64]EVMConfig { Endpoint: "", }, chains.GoerliLocalnet.ChainId: { - Chain: chains.GoerliLocalnet, - Endpoint: "http://eth:8545", + Chain: chains.GoerliLocalnet, + Endpoint: "http://eth:8545", + RPCAlertLatency: 60, }, } } diff --git a/zetaclient/config/types.go b/zetaclient/config/types.go index b43043e30e..37c8024b0f 100644 --- a/zetaclient/config/types.go +++ b/zetaclient/config/types.go @@ -34,22 +34,25 @@ type ClientConfiguration struct { // EVMConfig is the config for EVM chain type EVMConfig struct { - Chain chains.Chain - Endpoint string + Chain chains.Chain + Endpoint string + RPCAlertLatency uint64 } // BTCConfig is the config for Bitcoin chain type BTCConfig struct { // the following are rpcclient ConnConfig fields - RPCUsername string - RPCPassword string - RPCHost string - RPCParams string // "regtest", "mainnet", "testnet3" + RPCUsername string + RPCPassword string + RPCHost string + RPCParams string // "regtest", "mainnet", "testnet3" + RPCAlertLatency uint64 } // SolanaConfig is the config for Solana chain type SolanaConfig struct { - Endpoint string + Endpoint string + RPCAlertLatency uint64 } // ComplianceConfig is the config for compliance diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index 238652f931..e997eaeaa4 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -341,6 +341,7 @@ func syncObserverMap( *params, client, tss, + cfg.RPCAlertLatency, database, logger, ts, @@ -377,6 +378,7 @@ func syncObserverMap( *params, client, tss, + cfg.RPCAlertLatency, database, logger, ts, From 81cff28735b568cb40de3da6d1f6e00047c47bfb Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Wed, 21 Aug 2024 12:50:23 -0500 Subject: [PATCH 4/7] enable live test by ENABLE_LIVE_TEST environment variable --- zetaclient/chains/base/observer.go | 11 ++--- .../chains/bitcoin/observer/observer.go | 3 +- zetaclient/chains/bitcoin/rpc/rpc.go | 14 +++--- .../chains/bitcoin/rpc/rpc_live_test.go | 44 +++++++++++-------- zetaclient/chains/evm/observer/observer.go | 3 +- zetaclient/chains/evm/rpc/rpc.go | 14 +++--- zetaclient/chains/evm/rpc/rpc_live_test.go | 9 +++- zetaclient/chains/solana/observer/observer.go | 3 +- zetaclient/chains/solana/rpc/rpc.go | 14 +++--- zetaclient/chains/solana/rpc/rpc_live_test.go | 15 ++++--- zetaclient/common/env.go | 24 ++++++++++ zetaclient/config/types.go | 6 +-- zetaclient/orchestrator/bootstrap.go | 5 ++- 13 files changed, 106 insertions(+), 59 deletions(-) create mode 100644 zetaclient/common/env.go diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 1778e2d202..a948ac396f 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" @@ -60,8 +61,8 @@ type Observer struct { // lastTxScanned is the last transaction hash scanned by the observer lastTxScanned string - // rpcAlertLatency is the threshold of RPC latency (in seconds) to trigger an alert - rpcAlertLatency uint64 + // rpcAlertLatency is the threshold of RPC latency to trigger an alert + rpcAlertLatency time.Duration // blockCache is the cache for blocks blockCache *lru.Cache @@ -95,7 +96,7 @@ func NewObserver( tss interfaces.TSSSigner, blockCacheSize int, headerCacheSize int, - rpcAlertLatency uint64, + rpcAlertLatency time.Duration, ts *metrics.TelemetryServer, database *db.DB, logger Logger, @@ -108,7 +109,7 @@ func NewObserver( lastBlock: 0, lastBlockScanned: 0, lastTxScanned: "", - rpcAlertLatency: rpcAlertLatency, + rpcAlertLatency: rpcAlertLatency * time.Second, // latency in seconds ts: ts, db: database, mu: &sync.Mutex{}, @@ -252,7 +253,7 @@ func (ob *Observer) WithLastTxScanned(txHash string) *Observer { } // RPCAlertLatency returns the RPC alert latency for the observer. -func (ob *Observer) RPCAlertLatency() uint64 { +func (ob *Observer) RPCAlertLatency() time.Duration { return ob.rpcAlertLatency } diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 65adf2f932..7a74164fff 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -8,6 +8,7 @@ import ( "math" "math/big" "sort" + "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" @@ -114,7 +115,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, - rpcAlertLatency uint64, + rpcAlertLatency time.Duration, database *db.DB, logger base.Logger, ts *metrics.TelemetryServer, diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index d422ba66cb..c2f73364d8 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -26,7 +26,7 @@ const ( // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. // Bitcoin block time is 10 minutes, 1200s (20 minutes) is a reasonable threshold for Bitcoin - RPCAlertLatency = 1200 + RPCAlertLatency = time.Duration(1200) * time.Second ) // NewRPCClient creates a new RPC client by the given config. @@ -168,7 +168,7 @@ func GetRecentFeeRate(rpcClient interfaces.BTCRPCClient, netParams *chaincfg.Par // CheckRPCStatus checks the RPC status of the evm chain func CheckRPCStatus( client interfaces.BTCRPCClient, - alertLatency uint64, + alertLatency time.Duration, tssAddress btcutil.Address, logger zerolog.Logger, ) error { @@ -191,18 +191,18 @@ func CheckRPCStatus( } // use default alert latency if not provided - if alertLatency == 0 { + if alertLatency <= 0 { alertLatency = RPCAlertLatency } // latest block should not be too old blockTime := header.Timestamp - elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > float64(alertLatency) { + elapsedTime := time.Since(blockTime) + if elapsedTime > alertLatency { return errors.Errorf( "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", bn, - elapsedSeconds, + elapsedTime.Seconds(), ) } @@ -218,6 +218,6 @@ func CheckRPCStatus( } logger.Info(). - Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddress, len(res)) + Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedTime.Seconds(), tssAddress, len(res)) return nil } diff --git a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go index ebc8bf1a50..cde5fae2b7 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go +++ b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go @@ -26,6 +26,7 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin" "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer" "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc" + "github.com/zeta-chain/zetacore/zetaclient/common" "github.com/zeta-chain/zetacore/zetaclient/config" "github.com/zeta-chain/zetacore/zetaclient/testutils" "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" @@ -98,8 +99,8 @@ func (suite *BitcoinObserverTestSuite) TearDownSuite() { // createRPCClient creates a new Bitcoin RPC client for given chainID func createRPCClient(chainID int64) (*rpcclient.Client, error) { var connCfg *rpcclient.ConnConfig - rpcMainnet := os.Getenv("BTC_RPC_MAINNET") - rpcTestnet := os.Getenv("BTC_RPC_TESTNET") + rpcMainnet := os.Getenv(common.EnvBtcRPCMainnet) + rpcTestnet := os.Getenv(common.EnvBtcRPCTestnet) // mainnet if chainID == chains.BitcoinMainnet.ChainId { @@ -216,16 +217,21 @@ func (suite *BitcoinObserverTestSuite) Test2() { // TestBitcoinObserverLive is a phony test to run each live test individually func TestBitcoinObserverLive(t *testing.T) { + if !common.LiveTestEnabled() { + return + } + + // disable legacy live tests // suite.Run(t, new(BitcoinClientTestSuite)) - // LiveTestNewRPCClient(t) - // LiveTestCheckRPCStatus(t) - // LiveTestGetBlockHeightByHash(t) - // LiveTestBitcoinFeeRate(t) - // LiveTestAvgFeeRateMainnetMempoolSpace(t) - // LiveTestAvgFeeRateTestnetMempoolSpace(t) - // LiveTestGetRecentFeeRate(t) - // LiveTestGetSenderByVin(t) + LiveTestNewRPCClient(t) + LiveTestCheckRPCStatus(t) + LiveTestGetBlockHeightByHash(t) + LiveTestBitcoinFeeRate(t) + LiveTestAvgFeeRateMainnetMempoolSpace(t) + LiveTestAvgFeeRateTestnetMempoolSpace(t) + LiveTestGetRecentFeeRate(t) + LiveTestGetSenderByVin(t) } // LiveTestNewRPCClient creates a new Bitcoin RPC client @@ -233,8 +239,8 @@ func LiveTestNewRPCClient(t *testing.T) { btcConfig := config.BTCConfig{ RPCUsername: "user", RPCPassword: "pass", - RPCHost: os.Getenv("BTC_RPC_TESTNET"), - RPCParams: "mainnet", + RPCHost: os.Getenv(common.EnvBtcRPCTestnet), + RPCParams: "testnet3", } // create Bitcoin RPC client @@ -326,9 +332,11 @@ func LiveTestBitcoinFeeRate(t *testing.T) { feeRateEconomical2.Uint64(), ) - // monitor fee rate every 5 minutes - for { - time.Sleep(time.Duration(5) * time.Minute) + // monitor fee rate every 5 minutes, adjust the iteration count as needed + for i := 0; i < 1; i++ { + // please uncomment this interval for long running test + //time.Sleep(time.Duration(5) * time.Minute) + bn, err = client.GetBlockCount() feeRateConservative1, errCon1 = getFeeRate(client, 1, &btcjson.EstimateModeConservative) feeRateEconomical1, errEco1 = getFeeRate(client, 1, &btcjson.EstimateModeEconomical) @@ -419,7 +427,7 @@ func LiveTestAvgFeeRateMainnetMempoolSpace(t *testing.T) { // test against mempool.space API for 10000 blocks //startBlock := 210000 * 3 // 3rd halving startBlock := 829596 - endBlock := startBlock - 10000 + endBlock := startBlock - 1 // go back to whatever block as needed compareAvgFeeRate(t, client, startBlock, endBlock, false) } @@ -433,7 +441,7 @@ func LiveTestAvgFeeRateTestnetMempoolSpace(t *testing.T) { // test against mempool.space API for 10000 blocks //startBlock := 210000 * 12 // 12th halving startBlock := 2577600 - endBlock := startBlock - 10000 + endBlock := startBlock - 1 // go back to whatever block as needed compareAvgFeeRate(t, client, startBlock, endBlock, true) } @@ -468,7 +476,7 @@ func LiveTestGetSenderByVin(t *testing.T) { // calculates block range to test startBlock, err := client.GetBlockCount() require.NoError(t, err) - endBlock := startBlock - 5000 + endBlock := startBlock - 1 // go back to whatever block as needed // loop through mempool.space blocks in descending order BLOCKLOOP: diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index fdc6efbe61..5ed0acf136 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -7,6 +7,7 @@ import ( "math" "math/big" "strings" + "time" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -77,7 +78,7 @@ func NewObserver( tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, - evmCfg.RPCAlertLatency, + time.Duration(evmCfg.RPCAlertLatency), ts, database, logger, diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index 6c9cb264d2..538392adae 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -15,7 +15,7 @@ import ( const ( // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. // 100s is a reasonable threshold for most EVM chains - RPCAlertLatency = 100 + RPCAlertLatency = time.Duration(100) * time.Second ) // IsTxConfirmed checks if the transaction is confirmed with given confirmations @@ -64,7 +64,7 @@ func IsTxConfirmed( func CheckRPCStatus( ctx context.Context, client interfaces.EVMRPCClient, - alertLatency uint64, + alertLatency time.Duration, logger zerolog.Logger, ) error { // query latest block number @@ -86,23 +86,23 @@ func CheckRPCStatus( } // use default alert latency if not provided - if alertLatency == 0 { + if alertLatency <= 0 { alertLatency = RPCAlertLatency } // latest block should not be too old // #nosec G115 always in range blockTime := time.Unix(int64(header.Time), 0).UTC() - elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > float64(alertLatency) { + elapsedTime := time.Since(blockTime) + if elapsedTime > alertLatency { return errors.Errorf( "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", bn, - elapsedSeconds, + elapsedTime.Seconds(), ) } logger.Info(). - Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.0fs ago), gas price %s", header.Number, blockTime.String(), elapsedSeconds, gasPrice.String()) + Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.0fs ago), gas price %s", header.Number, blockTime.String(), elapsedTime.Seconds(), gasPrice.String()) return nil } diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go index 6ba0f3bbf7..c6f2ad2368 100644 --- a/zetaclient/chains/evm/rpc/rpc_live_test.go +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -8,6 +8,7 @@ import ( "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" + "github.com/zeta-chain/zetacore/zetaclient/common" "testing" ) @@ -21,8 +22,12 @@ const ( // Test_EVMRPCLive is a phony test to run each live test individually func Test_EVMRPCLive(t *testing.T) { - // LiveTest_IsTxConfirmed(t) - // LiveTest_CheckRPCStatus(t) + if !common.LiveTestEnabled() { + return + } + + LiveTest_IsTxConfirmed(t) + LiveTest_CheckRPCStatus(t) } func LiveTest_IsTxConfirmed(t *testing.T) { diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index 0de212b30b..cc2e3a8204 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -2,6 +2,7 @@ package observer import ( "context" + "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" @@ -44,7 +45,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, - rpcAlertLatency uint64, + rpcAlertLatency time.Duration, db *db.DB, logger base.Logger, ts *metrics.TelemetryServer, diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index 00c2add253..c62bcccb06 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -18,7 +18,7 @@ const ( // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. // The 'HEALTH_CHECK_SLOT_DISTANCE' is default to 150 slots, which is 150 * 0.4s = 60s - RPCAlertLatency = 60 + RPCAlertLatency = time.Duration(60) * time.Second ) // GetFirstSignatureForAddress searches the first signature for the given address. @@ -127,7 +127,7 @@ func GetSignaturesForAddressUntil( func CheckRPCStatus( ctx context.Context, client interfaces.SolanaRPCClient, - alertLatency uint64, + alertLatency time.Duration, logger zerolog.Logger, ) error { // query solana health (always return "ok" unless --trusted-validator is provided) @@ -149,21 +149,21 @@ func CheckRPCStatus( } // use default alert latency if not provided - if alertLatency == 0 { + if alertLatency <= 0 { alertLatency = RPCAlertLatency } // latest block should not be too old - elapsedSeconds := time.Since(blockTime.Time()).Seconds() - if elapsedSeconds > float64(alertLatency) { + elapsedTime := time.Since(blockTime.Time()) + if elapsedTime > alertLatency { return errors.Errorf( "Latest slot %d is %.0fs old, RPC stale or chain stuck (check explorer)?", slot, - elapsedSeconds, + elapsedTime.Seconds(), ) } logger.Info(). - Msgf("RPC Status [OK]: latest slot %d, timestamp %s (%.0fs ago)", slot, blockTime.String(), elapsedSeconds) + Msgf("RPC Status [OK]: latest slot %d, timestamp %s (%.0fs ago)", slot, blockTime.String(), elapsedTime.Seconds()) return nil } diff --git a/zetaclient/chains/solana/rpc/rpc_live_test.go b/zetaclient/chains/solana/rpc/rpc_live_test.go index b04352d944..35c1b6ae70 100644 --- a/zetaclient/chains/solana/rpc/rpc_live_test.go +++ b/zetaclient/chains/solana/rpc/rpc_live_test.go @@ -9,13 +9,18 @@ import ( "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" + "github.com/zeta-chain/zetacore/zetaclient/common" ) // Test_SolanaRPCLive is a phony test to run all live tests func Test_SolanaRPCLive(t *testing.T) { - // LiveTest_GetFirstSignatureForAddress(t) - // LiveTest_GetSignaturesForAddressUntil(t) - // LiveTest_CheckRPCStatus(t) + if !common.LiveTestEnabled() { + return + } + + LiveTest_GetFirstSignatureForAddress(t) + LiveTest_GetSignaturesForAddressUntil(t) + LiveTest_CheckRPCStatus(t) } func LiveTest_GetFirstSignatureForAddress(t *testing.T) { @@ -44,8 +49,8 @@ func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { "2tUQtcrXxtNFtV9kZ4kQsmY7snnEoEEArmu9pUptr4UCy8UdbtjPD6UtfEtPJ2qk5CTzZTmLwsbmZdLymcwSUcHu", ) - // get all signatures for the address until the first signature (one by one) - sigs, err := rpc.GetSignaturesForAddressUntil(context.Background(), client, address, untilSig, 1) + // get all signatures for the address until the first signature + sigs, err := rpc.GetSignaturesForAddressUntil(context.Background(), client, address, untilSig, 100) require.NoError(t, err) // assert diff --git a/zetaclient/common/env.go b/zetaclient/common/env.go new file mode 100644 index 0000000000..ca48ef7615 --- /dev/null +++ b/zetaclient/common/env.go @@ -0,0 +1,24 @@ +package common + +import ( + "os" + "strings" +) + +const ( + // EnvEnableLiveTest is the environment variable to enable zetaclient live tests + EnvEnableLiveTest = "ENABLE_LIVE_TEST" + + // EnvBtcRPCMainnet is the environment variable to enable mainnet for bitcoin rpc + EnvBtcRPCMainnet = "BTC_RPC_MAINNET" + + // EnvBtcRPCTestnet is the environment variable to enable testnet for bitcoin rpc + EnvBtcRPCTestnet = "BTC_RPC_TESTNET" +) + +// LiveTestEnabled returns true if live tests are enabled +func LiveTestEnabled() bool { + value := os.Getenv(EnvEnableLiveTest) + + return strings.ToLower(value) == "true" || value == "1" +} diff --git a/zetaclient/config/types.go b/zetaclient/config/types.go index 37c8024b0f..55164efb33 100644 --- a/zetaclient/config/types.go +++ b/zetaclient/config/types.go @@ -36,7 +36,7 @@ type ClientConfiguration struct { type EVMConfig struct { Chain chains.Chain Endpoint string - RPCAlertLatency uint64 + RPCAlertLatency int64 } // BTCConfig is the config for Bitcoin chain @@ -46,13 +46,13 @@ type BTCConfig struct { RPCPassword string RPCHost string RPCParams string // "regtest", "mainnet", "testnet3" - RPCAlertLatency uint64 + RPCAlertLatency int64 } // SolanaConfig is the config for Solana chain type SolanaConfig struct { Endpoint string - RPCAlertLatency uint64 + RPCAlertLatency int64 } // ComplianceConfig is the config for compliance diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index e997eaeaa4..0f91a83dc6 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -341,7 +342,7 @@ func syncObserverMap( *params, client, tss, - cfg.RPCAlertLatency, + time.Duration(cfg.RPCAlertLatency), database, logger, ts, @@ -378,7 +379,7 @@ func syncObserverMap( *params, client, tss, - cfg.RPCAlertLatency, + time.Duration(cfg.RPCAlertLatency), database, logger, ts, From 3d55b517435507acc2dbd433481d7eafdeaf8ac8 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Wed, 21 Aug 2024 13:06:54 -0500 Subject: [PATCH 5/7] unified RPC error alert message --- zetaclient/chains/bitcoin/rpc/rpc.go | 6 +++--- zetaclient/chains/evm/rpc/rpc.go | 6 +++--- zetaclient/chains/solana/rpc/rpc.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index c2f73364d8..a265dc45f3 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -175,19 +175,19 @@ func CheckRPCStatus( // query latest block number bn, err := client.GetBlockCount() if err != nil { - return errors.Wrap(err, "GetBlockCount error: RPC down?") + return errors.Wrap(err, "RPC error onGetBlockCount: RPC down?") } // query latest block header hash, err := client.GetBlockHash(bn) if err != nil { - return errors.Wrap(err, "GetBlockHash error: RPC down?") + return errors.Wrap(err, "RPC error onGetBlockHash: RPC down?") } // query latest block header thru hash header, err := client.GetBlockHeader(hash) if err != nil { - return errors.Wrap(err, "GetBlockHeader error: RPC down?") + return errors.Wrap(err, "RPC error onGetBlockHeader: RPC down?") } // use default alert latency if not provided diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index 538392adae..22bea55110 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -70,19 +70,19 @@ func CheckRPCStatus( // query latest block number bn, err := client.BlockNumber(ctx) if err != nil { - return errors.Wrap(err, "BlockNumber error: RPC down?") + return errors.Wrap(err, "RPC error onBlockNumber: RPC down?") } // query suggested gas price gasPrice, err := client.SuggestGasPrice(ctx) if err != nil { - return errors.Wrap(err, "SuggestGasPrice error: RPC down?") + return errors.Wrap(err, "RPC error onSuggestGasPrice: RPC down?") } // query latest block header header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) if err != nil { - return errors.Wrap(err, "HeaderByNumber error: RPC down?") + return errors.Wrap(err, "RPC error onHeaderByNumber: RPC down?") } // use default alert latency if not provided diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index c62bcccb06..c5cf6b0989 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -133,19 +133,19 @@ func CheckRPCStatus( // query solana health (always return "ok" unless --trusted-validator is provided) _, err := client.GetHealth(ctx) if err != nil { - return errors.Wrap(err, "GetHealth error: RPC down?") + return errors.Wrap(err, "RPC error onGetHealth: RPC down?") } // query latest slot slot, err := client.GetSlot(ctx, rpc.CommitmentFinalized) if err != nil { - return errors.Wrap(err, "GetSlot error: RPC down?") + return errors.Wrap(err, "RPC error onGetSlot: RPC down?") } // query latest block time blockTime, err := client.GetBlockTime(ctx, slot) if err != nil { - return errors.Wrap(err, "GetBlockTime error: RPC down?") + return errors.Wrap(err, "RPC error onGetBlockTime: RPC down?") } // use default alert latency if not provided From 826da7b36bc7995adba0a479e4ba110107b37a1d Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Thu, 29 Aug 2024 19:42:45 -0500 Subject: [PATCH 6/7] move rpcAlertLatency argument out of rpc methods --- zetaclient/chains/base/observer.go | 26 +++- zetaclient/chains/base/observer_test.go | 120 ++++++++++++------ .../chains/bitcoin/observer/observer.go | 2 +- .../chains/bitcoin/observer/rpc_status.go | 25 ++-- zetaclient/chains/bitcoin/rpc/rpc.go | 40 ++---- .../chains/bitcoin/rpc/rpc_live_test.go | 2 +- zetaclient/chains/evm/observer/observer.go | 2 +- zetaclient/chains/evm/observer/rpc_status.go | 24 ++-- zetaclient/chains/evm/rpc/rpc.go | 35 +---- zetaclient/chains/evm/rpc/rpc_live_test.go | 3 +- zetaclient/chains/solana/observer/observer.go | 2 +- .../chains/solana/observer/rpc_status.go | 29 +++-- zetaclient/chains/solana/rpc/rpc.go | 39 ++---- zetaclient/chains/solana/rpc/rpc_live_test.go | 3 +- 14 files changed, 187 insertions(+), 165 deletions(-) diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index a948ac396f..b213e63ee9 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -252,11 +252,6 @@ func (ob *Observer) WithLastTxScanned(txHash string) *Observer { return ob } -// RPCAlertLatency returns the RPC alert latency for the observer. -func (ob *Observer) RPCAlertLatency() time.Duration { - return ob.rpcAlertLatency -} - // BlockCache returns the block cache for the observer. func (ob *Observer) BlockCache() *lru.Cache { return ob.blockCache @@ -463,6 +458,27 @@ func (ob *Observer) PostVoteInbound( return ballot, err } +// AlertOnRPCLatency prints an alert if the RPC latency exceeds the threshold. +// Returns true if the RPC latency is too high. +func (ob *Observer) AlertOnRPCLatency(latestBlockTime time.Time, defaultAlertLatency time.Duration) bool { + // use configured alert latency if set + alertLatency := defaultAlertLatency + if ob.rpcAlertLatency > 0 { + alertLatency = ob.rpcAlertLatency + } + + // latest block should not be too old + elapsedTime := time.Since(latestBlockTime) + if elapsedTime > alertLatency { + ob.logger.Chain.Error(). + Msgf("RPC is stale: latest block is %.0f seconds old, RPC down or chain stuck (check explorer)?", elapsedTime.Seconds()) + return true + } + + ob.logger.Chain.Info().Msgf("RPC is OK: latest block is %.0f seconds old", elapsedTime.Seconds()) + return false +} + // EnvVarLatestBlockByChain returns the environment variable for the last block by chain. func EnvVarLatestBlockByChain(chain chains.Chain) string { return fmt.Sprintf("CHAIN_%d_SCAN_FROM_BLOCK", chain.ChainId) diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index 4db8a19bbc..88cd1f0cf5 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "testing" + "time" lru "github.com/hashicorp/golang-lru" "github.com/rs/zerolog" @@ -23,8 +24,13 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/testutils/mocks" ) +const ( + // defaultAlertLatency is the default alert latency for unit tests + defaultAlertLatency = 60 * time.Second +) + // createObserver creates a new observer for testing -func createObserver(t *testing.T, chain chains.Chain) *base.Observer { +func createObserver(t *testing.T, chain chains.Chain, alertLatency time.Duration) *base.Observer { // constructor parameters chainParams := *sample.ChainParams(chain.ChainId) zetacoreClient := mocks.NewZetacoreClient(t) @@ -41,7 +47,7 @@ func createObserver(t *testing.T, chain chains.Chain) *base.Observer { tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, - 60, + alertLatency, nil, database, logger, @@ -143,7 +149,7 @@ func TestNewObserver(t *testing.T) { func TestStop(t *testing.T) { t.Run("should be able to stop observer", func(t *testing.T) { // create observer and initialize db - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // stop observer ob.Stop() @@ -154,7 +160,7 @@ func TestObserverGetterAndSetter(t *testing.T) { chain := chains.Ethereum t.Run("should be able to update chain", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update chain newChain := chains.BscMainnet @@ -163,7 +169,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update chain params", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update chain params newChainParams := *sample.ChainParams(chains.BscMainnet.ChainId) @@ -172,7 +178,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update zetacore client", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update zetacore client newZetacoreClient := mocks.NewZetacoreClient(t) @@ -181,7 +187,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update tss", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update tss newTSS := mocks.NewTSSAthens3() @@ -190,7 +196,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update last block", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last block newLastBlock := uint64(100) @@ -199,7 +205,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update last block scanned", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last block scanned newLastBlockScanned := uint64(100) @@ -208,7 +214,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update last tx scanned", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last tx scanned newLastTxScanned := sample.EthAddress().String() @@ -216,15 +222,8 @@ func TestObserverGetterAndSetter(t *testing.T) { require.Equal(t, newLastTxScanned, ob.LastTxScanned()) }) - t.Run("should be able to get rpc alert latency", func(t *testing.T) { - ob := createObserver(t, chain) - - // get rpc alert latency - require.EqualValues(t, 60, ob.RPCAlertLatency()) - }) - t.Run("should be able to replace block cache", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update block cache newBlockCache, err := lru.New(200) @@ -235,7 +234,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to replace header cache", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update headers cache newHeadersCache, err := lru.New(200) @@ -246,7 +245,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to update telemetry server", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update telemetry server newServer := metrics.NewTelemetryServer() @@ -255,7 +254,7 @@ func TestObserverGetterAndSetter(t *testing.T) { }) t.Run("should be able to get logger", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) logger := ob.Logger() // should be able to print log @@ -293,7 +292,7 @@ func TestOutboundID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // create observer - ob := createObserver(t, tt.chain) + ob := createObserver(t, tt.chain, defaultAlertLatency) ob = ob.WithTSS(tt.tss) // get outbound id @@ -315,7 +314,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should be able to load last block scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned err := ob.WriteLastBlockScannedToDB(100) @@ -329,7 +328,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // read last block scanned err := ob.LoadLastBlockScanned(log.Logger) @@ -339,7 +338,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned ob.WriteLastBlockScannedToDB(100) @@ -355,7 +354,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned ob.WriteLastBlockScannedToDB(100) @@ -371,7 +370,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should return error on invalid env var", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // set invalid env var os.Setenv(envvar, "invalid") @@ -385,7 +384,7 @@ func TestLoadLastBlockScanned(t *testing.T) { func TestSaveLastBlockScanned(t *testing.T) { t.Run("should be able to save last block scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // save 100 as last block scanned err := ob.SaveLastBlockScanned(100) @@ -405,7 +404,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { chain := chains.Ethereum t.Run("should be able to write and read last block scanned to db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // write last block scanned err := ob.WriteLastBlockScannedToDB(100) @@ -418,7 +417,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { t.Run("should return error when last block scanned not found in db", func(t *testing.T) { // create empty db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) lastScannedBlock, err := ob.ReadLastBlockScannedFromDB() require.Error(t, err) @@ -432,7 +431,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("should be able to load last tx scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write sample hash as last tx scanned ob.WriteLastTxScannedToDB(lastTx) @@ -444,7 +443,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // read last tx scanned ob.LoadLastTxScanned() @@ -453,7 +452,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write sample hash as last tx scanned ob.WriteLastTxScannedToDB(lastTx) @@ -472,7 +471,7 @@ func TestSaveLastTxScanned(t *testing.T) { chain := chains.SolanaDevnet t.Run("should be able to save last tx scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // save random tx hash lastSlot := uint64(100) @@ -495,7 +494,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { chain := chains.SolanaDevnet t.Run("should be able to write and read last tx scanned to db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // write last tx scanned lastTx := "5LuQMorgd11p8GWEw6pmyHCDtA26NUyeNFhLWPNk2oBoM9pkag1LzhwGSRos3j4TJLhKjswFhZkGtvSGdLDkmqsk" @@ -509,7 +508,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { t.Run("should return error when last tx scanned not found in db", func(t *testing.T) { // create empty db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) lastTxScanned, err := ob.ReadLastTxScannedFromDB() require.Error(t, err) @@ -520,7 +519,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { func TestPostVoteInbound(t *testing.T) { t.Run("should be able to post vote inbound", func(t *testing.T) { // create observer - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // create mock zetacore client zetacoreClient := mocks.NewZetacoreClient(t) @@ -535,6 +534,53 @@ func TestPostVoteInbound(t *testing.T) { }) } +func TestAlertOnRPCLatency(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + blockTime time.Time + alertLatency time.Duration + alerted bool + }{ + { + name: "should alert on high RPC latency", + blockTime: now.Add(-60 * time.Second), + alertLatency: 55, + alerted: true, + }, + { + name: "should not alert on normal RPC latency", + blockTime: now.Add(-60 * time.Second), + alertLatency: 65, + alerted: false, + }, + { + name: "should alert on higher RPC latency then default", + blockTime: now.Add(-65 * time.Second), + alertLatency: 0, // 0 means not set + alerted: true, + }, + { + name: "should not alert on normal RPC latency compared to default", + blockTime: now.Add(-55 * time.Second), + alertLatency: 0, // 0 means not set + alerted: false, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create observer + ob := createObserver(t, chains.Ethereum, tt.alertLatency) + + alerted := ob.AlertOnRPCLatency(tt.blockTime, defaultAlertLatency) + require.Equal(t, tt.alerted, alerted) + }) + } +} + func createDatabase(t *testing.T) *db.DB { sqlDatabase, err := db.NewFromSqliteInMemory(true) require.NoError(t, err) diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 7a74164fff..6feeffbbe0 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -223,7 +223,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) // watch the RPC status of the bitcoin chain - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // GetPendingNonce returns the artificial pending nonce diff --git a/zetaclient/chains/bitcoin/observer/rpc_status.go b/zetaclient/chains/bitcoin/observer/rpc_status.go index f9f6ad97b4..6e99ebf024 100644 --- a/zetaclient/chains/bitcoin/observer/rpc_status.go +++ b/zetaclient/chains/bitcoin/observer/rpc_status.go @@ -8,8 +8,8 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/common" ) -// WatchRPCStatus watches the RPC status of the Bitcoin chain -func (ob *Observer) WatchRPCStatus(_ context.Context) error { +// watchRPCStatus watches the RPC status of the Bitcoin chain +func (ob *Observer) watchRPCStatus(_ context.Context) error { ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) ticker := time.NewTicker(common.RPCStatusCheckInterval) @@ -20,15 +20,22 @@ func (ob *Observer) WatchRPCStatus(_ context.Context) error { continue } - alertLatency := ob.RPCAlertLatency() - tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() - err := rpc.CheckRPCStatus(ob.btcClient, alertLatency, tssAddress, ob.Logger().Chain) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") - } - + ob.checkRPCStatus() case <-ob.StopChannel(): return nil } } } + +// checkRPCStatus checks the RPC status of the Bitcoin chain +func (ob *Observer) checkRPCStatus() { + tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() + blockTime, err := rpc.CheckRPCStatus(ob.btcClient, tssAddress) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index a265dc45f3..472e8b0e0f 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -10,7 +10,6 @@ import ( "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcutil" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" @@ -165,59 +164,36 @@ func GetRecentFeeRate(rpcClient interfaces.BTCRPCClient, netParams *chaincfg.Par return uint64(highestRate), nil } -// CheckRPCStatus checks the RPC status of the evm chain -func CheckRPCStatus( - client interfaces.BTCRPCClient, - alertLatency time.Duration, - tssAddress btcutil.Address, - logger zerolog.Logger, -) error { +// CheckRPCStatus checks the RPC status of the bitcoin chain +func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address) (time.Time, error) { // query latest block number bn, err := client.GetBlockCount() if err != nil { - return errors.Wrap(err, "RPC error onGetBlockCount: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockCount, RPC down?") } // query latest block header hash, err := client.GetBlockHash(bn) if err != nil { - return errors.Wrap(err, "RPC error onGetBlockHash: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockHash, RPC down?") } // query latest block header thru hash header, err := client.GetBlockHeader(hash) if err != nil { - return errors.Wrap(err, "RPC error onGetBlockHeader: RPC down?") - } - - // use default alert latency if not provided - if alertLatency <= 0 { - alertLatency = RPCAlertLatency - } - - // latest block should not be too old - blockTime := header.Timestamp - elapsedTime := time.Since(blockTime) - if elapsedTime > alertLatency { - return errors.Errorf( - "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", - bn, - elapsedTime.Seconds(), - ) + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockHeader, RPC down?") } // should be able to list utxos owned by TSS address res, err := client.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddress}) if err != nil { - return errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?") + return time.Time{}, errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?") } // TSS address should have utxos if len(res) == 0 { - return errors.New("TSS address has no utxos; TSS address is not imported?") + return time.Time{}, errors.New("TSS address has no utxos; TSS address is not imported?") } - logger.Info(). - Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedTime.Seconds(), tssAddress, len(res)) - return nil + return header.Timestamp, nil } diff --git a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go index cde5fae2b7..4e12f33507 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go +++ b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go @@ -265,7 +265,7 @@ func LiveTestCheckRPCStatus(t *testing.T) { require.NoError(t, err) // check RPC status - err = rpc.CheckRPCStatus(client, rpc.RPCAlertLatency, tssAddress, log.Logger) + _, err = rpc.CheckRPCStatus(client, tssAddress) require.NoError(t, err) } diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 5ed0acf136..3ac07cd375 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -200,7 +200,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchOutbound, bg.WithName("WatchOutbound"), bg.WithLogger(ob.Logger().Outbound)) bg.Work(ctx, ob.WatchGasPrice, bg.WithName("WatchGasPrice"), bg.WithLogger(ob.Logger().GasPrice)) bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // SetTxNReceipt sets the receipt and transaction in memory diff --git a/zetaclient/chains/evm/observer/rpc_status.go b/zetaclient/chains/evm/observer/rpc_status.go index 525c20b1ba..cd2ad4d917 100644 --- a/zetaclient/chains/evm/observer/rpc_status.go +++ b/zetaclient/chains/evm/observer/rpc_status.go @@ -9,9 +9,9 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/common" ) -// WatchRPCStatus watches the RPC status of the evm chain -func (ob *Observer) WatchRPCStatus(ctx context.Context) error { - ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) +// watchRPCStatus watches the RPC status of the EVM chain +func (ob *Observer) watchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("watchRPCStatus started for chain %d", ob.Chain().ChainId) ticker := time.NewTicker(common.RPCStatusCheckInterval) for { @@ -21,13 +21,21 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { continue } - alertLatency := ob.RPCAlertLatency() - err := rpc.CheckRPCStatus(ctx, ob.evmClient, alertLatency, ob.Logger().Chain) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") - } + ob.checkRPCStatus(ctx) case <-ob.StopChannel(): return nil } } } + +// checkRPCStatus checks the RPC status of the EVM chain +func (ob *Observer) checkRPCStatus(ctx context.Context) { + blockTime, err := rpc.CheckRPCStatus(ctx, ob.evmClient) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index 22bea55110..4ae5441fc8 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -7,7 +7,6 @@ import ( ethcommon "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" ) @@ -61,48 +60,28 @@ func IsTxConfirmed( } // CheckRPCStatus checks the RPC status of the evm chain -func CheckRPCStatus( - ctx context.Context, - client interfaces.EVMRPCClient, - alertLatency time.Duration, - logger zerolog.Logger, -) error { +func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient) (time.Time, error) { // query latest block number bn, err := client.BlockNumber(ctx) if err != nil { - return errors.Wrap(err, "RPC error onBlockNumber: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on BlockNumber, RPC down?") } // query suggested gas price - gasPrice, err := client.SuggestGasPrice(ctx) + _, err = client.SuggestGasPrice(ctx) if err != nil { - return errors.Wrap(err, "RPC error onSuggestGasPrice: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on SuggestGasPrice, RPC down?") } // query latest block header header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) if err != nil { - return errors.Wrap(err, "RPC error onHeaderByNumber: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on HeaderByNumber, RPC down?") } - // use default alert latency if not provided - if alertLatency <= 0 { - alertLatency = RPCAlertLatency - } - - // latest block should not be too old + // convert block time to UTC // #nosec G115 always in range blockTime := time.Unix(int64(header.Time), 0).UTC() - elapsedTime := time.Since(blockTime) - if elapsedTime > alertLatency { - return errors.Errorf( - "Latest block %d is %.0fs old, RPC stale or chain stuck (check explorer)?", - bn, - elapsedTime.Seconds(), - ) - } - logger.Info(). - Msgf("RPC Status [OK]: latest block %d, timestamp %s (%.0fs ago), gas price %s", header.Number, blockTime.String(), elapsedTime.Seconds(), gasPrice.String()) - return nil + return blockTime, nil } diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go index c6f2ad2368..5e0f7ca59a 100644 --- a/zetaclient/chains/evm/rpc/rpc_live_test.go +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -5,7 +5,6 @@ import ( "math" "github.com/ethereum/go-ethereum/ethclient" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/evm/rpc" "github.com/zeta-chain/zetacore/zetaclient/common" @@ -56,6 +55,6 @@ func LiveTest_CheckRPCStatus(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = rpc.CheckRPCStatus(ctx, client, rpc.RPCAlertLatency, log.Logger) + _, err = rpc.CheckRPCStatus(ctx, client) require.NoError(t, err) } diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index cc2e3a8204..d44f8e0803 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -135,7 +135,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) // watch RPC status of the Solana chain - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // LoadLastTxScanned loads the last scanned tx from the database. diff --git a/zetaclient/chains/solana/observer/rpc_status.go b/zetaclient/chains/solana/observer/rpc_status.go index 7910ff6f7e..a01d128683 100644 --- a/zetaclient/chains/solana/observer/rpc_status.go +++ b/zetaclient/chains/solana/observer/rpc_status.go @@ -4,13 +4,14 @@ import ( "context" "time" + "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" "github.com/zeta-chain/zetacore/zetaclient/common" ) -// WatchRPCStatus watches the RPC status of the solana chain -func (ob *Observer) WatchRPCStatus(ctx context.Context) error { - ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) +// watchRPCStatus watches the RPC status of the Solana chain +func (ob *Observer) watchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("watchRPCStatus started for chain %d", ob.Chain().ChainId) ticker := time.NewTicker(common.RPCStatusCheckInterval) for { @@ -20,13 +21,25 @@ func (ob *Observer) WatchRPCStatus(ctx context.Context) error { continue } - alertLatency := ob.RPCAlertLatency() - err := rpc.CheckRPCStatus(ctx, ob.solClient, alertLatency, ob.Logger().Chain) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status error") - } + ob.checkRPCStatus(ctx) case <-ob.StopChannel(): return nil } } } + +// checkRPCStatus checks the RPC status of the Solana chain +func (ob *Observer) checkRPCStatus(ctx context.Context) { + // Solana privnet doesn't have RPC 'GetHealth', need to differentiate + privnet := ob.Chain().NetworkType == chains.NetworkType_privnet + + // check the RPC status + blockTime, err := rpc.CheckRPCStatus(ctx, ob.solClient, privnet) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index c5cf6b0989..c1f378365b 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -7,7 +7,6 @@ import ( "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" ) @@ -124,46 +123,26 @@ func GetSignaturesForAddressUntil( } // CheckRPCStatus checks the RPC status of the solana chain -func CheckRPCStatus( - ctx context.Context, - client interfaces.SolanaRPCClient, - alertLatency time.Duration, - logger zerolog.Logger, -) error { +func CheckRPCStatus(ctx context.Context, client interfaces.SolanaRPCClient, privnet bool) (time.Time, error) { // query solana health (always return "ok" unless --trusted-validator is provided) - _, err := client.GetHealth(ctx) - if err != nil { - return errors.Wrap(err, "RPC error onGetHealth: RPC down?") + if !privnet { + _, err := client.GetHealth(ctx) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetHealth, RPC down?") + } } // query latest slot slot, err := client.GetSlot(ctx, rpc.CommitmentFinalized) if err != nil { - return errors.Wrap(err, "RPC error onGetSlot: RPC down?") + return time.Time{}, errors.Wrap(err, "RPC failed on GetSlot, RPC down?") } // query latest block time blockTime, err := client.GetBlockTime(ctx, slot) if err != nil { - return errors.Wrap(err, "RPC error onGetBlockTime: RPC down?") - } - - // use default alert latency if not provided - if alertLatency <= 0 { - alertLatency = RPCAlertLatency - } - - // latest block should not be too old - elapsedTime := time.Since(blockTime.Time()) - if elapsedTime > alertLatency { - return errors.Errorf( - "Latest slot %d is %.0fs old, RPC stale or chain stuck (check explorer)?", - slot, - elapsedTime.Seconds(), - ) + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockTime, RPC down?") } - logger.Info(). - Msgf("RPC Status [OK]: latest slot %d, timestamp %s (%.0fs ago)", slot, blockTime.String(), elapsedTime.Seconds()) - return nil + return blockTime.Time(), nil } diff --git a/zetaclient/chains/solana/rpc/rpc_live_test.go b/zetaclient/chains/solana/rpc/rpc_live_test.go index 35c1b6ae70..51c51c2f61 100644 --- a/zetaclient/chains/solana/rpc/rpc_live_test.go +++ b/zetaclient/chains/solana/rpc/rpc_live_test.go @@ -6,7 +6,6 @@ import ( "github.com/gagliardetto/solana-go" solanarpc "github.com/gagliardetto/solana-go/rpc" - "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc" "github.com/zeta-chain/zetacore/zetaclient/common" @@ -68,6 +67,6 @@ func LiveTest_CheckRPCStatus(t *testing.T) { // check the RPC status ctx := context.Background() - err := rpc.CheckRPCStatus(ctx, client, rpc.RPCAlertLatency, log.Logger) + _, err := rpc.CheckRPCStatus(ctx, client, false) require.NoError(t, err) } From 05139aa9dd5c14cde3fd81fe4ec3f1fb1202dbe6 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Fri, 13 Sep 2024 15:08:55 -0700 Subject: [PATCH 7/7] unified param 'rpcAlertLatency' as type int64 when passing cfg.RPCAlertLatency to NewObserver() --- zetaclient/chains/base/observer.go | 4 ++-- zetaclient/chains/base/observer_test.go | 12 ++++++------ zetaclient/chains/bitcoin/observer/observer.go | 3 +-- zetaclient/chains/evm/observer/observer.go | 4 ++-- zetaclient/chains/evm/observer/observer_test.go | 2 ++ zetaclient/chains/evm/signer/signer_test.go | 1 + zetaclient/chains/solana/observer/observer.go | 3 +-- zetaclient/common/env.go | 10 ++++++++-- zetaclient/orchestrator/bootstrap.go | 6 +++--- 9 files changed, 26 insertions(+), 19 deletions(-) diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 26939ee500..020fd323f2 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -96,7 +96,7 @@ func NewObserver( tss interfaces.TSSSigner, blockCacheSize int, headerCacheSize int, - rpcAlertLatency time.Duration, + rpcAlertLatency int64, ts *metrics.TelemetryServer, database *db.DB, logger Logger, @@ -109,7 +109,7 @@ func NewObserver( lastBlock: 0, lastBlockScanned: 0, lastTxScanned: "", - rpcAlertLatency: rpcAlertLatency * time.Second, // latency in seconds + rpcAlertLatency: time.Duration(rpcAlertLatency) * time.Second, ts: ts, db: database, mu: &sync.Mutex{}, diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index 9dd98ae0b6..01713f6c4c 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -25,12 +25,12 @@ import ( ) const ( - // defaultAlertLatency is the default alert latency for unit tests - defaultAlertLatency = 60 * time.Second + // defaultAlertLatency is the default alert latency (in seconds) for unit tests + defaultAlertLatency = 60 ) // createObserver creates a new observer for testing -func createObserver(t *testing.T, chain chains.Chain, alertLatency time.Duration) *base.Observer { +func createObserver(t *testing.T, chain chains.Chain, alertLatency int64) *base.Observer { // constructor parameters chainParams := *sample.ChainParams(chain.ChainId) zetacoreClient := mocks.NewZetacoreClient(t) @@ -540,7 +540,7 @@ func TestAlertOnRPCLatency(t *testing.T) { tests := []struct { name string blockTime time.Time - alertLatency time.Duration + alertLatency int64 alerted bool }{ { @@ -562,7 +562,7 @@ func TestAlertOnRPCLatency(t *testing.T) { alerted: true, }, { - name: "should not alert on normal RPC latency compared to default", + name: "should not alert on normal RPC latency when compared to default", blockTime: now.Add(-55 * time.Second), alertLatency: 0, // 0 means not set alerted: false, @@ -575,7 +575,7 @@ func TestAlertOnRPCLatency(t *testing.T) { // create observer ob := createObserver(t, chains.Ethereum, tt.alertLatency) - alerted := ob.AlertOnRPCLatency(tt.blockTime, defaultAlertLatency) + alerted := ob.AlertOnRPCLatency(tt.blockTime, time.Duration(defaultAlertLatency)*time.Second) require.Equal(t, tt.alerted, alerted) }) } diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 1122ed0d15..50de502b79 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -9,7 +9,6 @@ import ( "math/big" "sort" "strings" - "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" @@ -115,7 +114,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, - rpcAlertLatency time.Duration, + rpcAlertLatency int64, database *db.DB, logger base.Logger, ts *metrics.TelemetryServer, diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 3fab354afe..c10ecbc54f 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -7,7 +7,6 @@ import ( "math" "math/big" "strings" - "time" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -67,6 +66,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + rpcAlertLatency int64, database *db.DB, logger base.Logger, ts *metrics.TelemetryServer, @@ -79,7 +79,7 @@ func NewObserver( tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, - time.Duration(evmCfg.RPCAlertLatency), + rpcAlertLatency, ts, database, logger, diff --git a/zetaclient/chains/evm/observer/observer_test.go b/zetaclient/chains/evm/observer/observer_test.go index f89715022a..713148025d 100644 --- a/zetaclient/chains/evm/observer/observer_test.go +++ b/zetaclient/chains/evm/observer/observer_test.go @@ -135,6 +135,7 @@ func MockEVMObserver( params, zetacoreClient, tss, + 60, database, logger, nil, @@ -247,6 +248,7 @@ func Test_NewObserver(t *testing.T) { tt.chainParams, zetacoreClient, tt.tss, + 60, database, tt.logger, tt.ts, diff --git a/zetaclient/chains/evm/signer/signer_test.go b/zetaclient/chains/evm/signer/signer_test.go index 807d6b7819..cbff2a9c1f 100644 --- a/zetaclient/chains/evm/signer/signer_test.go +++ b/zetaclient/chains/evm/signer/signer_test.go @@ -88,6 +88,7 @@ func getNewEvmChainObserver(t *testing.T, tss interfaces.TSSSigner) (*observer.O params, mocks.NewZetacoreClient(t), tss, + 60, database, logger, ts, diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index 8ce374210e..634bdfd635 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -2,7 +2,6 @@ package observer import ( "context" - "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" @@ -45,7 +44,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, - rpcAlertLatency time.Duration, + rpcAlertLatency int64, db *db.DB, logger base.Logger, ts *metrics.TelemetryServer, diff --git a/zetaclient/common/env.go b/zetaclient/common/env.go index ca48ef7615..f3e97110c6 100644 --- a/zetaclient/common/env.go +++ b/zetaclient/common/env.go @@ -2,7 +2,7 @@ package common import ( "os" - "strings" + "strconv" ) const ( @@ -20,5 +20,11 @@ const ( func LiveTestEnabled() bool { value := os.Getenv(EnvEnableLiveTest) - return strings.ToLower(value) == "true" || value == "1" + // parse flag + enabled, err := strconv.ParseBool(value) + if err != nil { + return false + } + + return enabled } diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index e9d0cea251..f7be6ad504 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -2,7 +2,6 @@ package orchestrator import ( "context" - "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" @@ -311,6 +310,7 @@ func syncObserverMap( *params, client, tss, + cfg.RPCAlertLatency, database, logger, ts, @@ -346,7 +346,7 @@ func syncObserverMap( *params, client, tss, - time.Duration(cfg.RPCAlertLatency), + cfg.RPCAlertLatency, database, logger, ts, @@ -383,7 +383,7 @@ func syncObserverMap( *params, client, tss, - time.Duration(cfg.RPCAlertLatency), + cfg.RPCAlertLatency, database, logger, ts,