diff --git a/changelog.md b/changelog.md index 2b3667d36d..e14bd3935d 100644 --- a/changelog.md +++ b/changelog.md @@ -46,6 +46,11 @@ * [2681](https://github.com/zeta-chain/node/pull/2681) - implement `MsgUpdateERC20CustodyPauseStatus` to pause or unpause ERC20 Custody contract (to be used for the migration process for smart contract V2) * [2644](https://github.com/zeta-chain/node/pull/2644) - add created_timestamp to cctx status * [2673](https://github.com/zeta-chain/node/pull/2673) - add relayer key importer, encryption and decryption +* [2633](https://github.com/zeta-chain/node/pull/2633) - support for stateful precompiled contracts +* [2751](https://github.com/zeta-chain/node/pull/2751) - add RPC status check for Solana chain +* [2788](https://github.com/zeta-chain/node/pull/2788) - add common importable zetacored rpc package +* [2784](https://github.com/zeta-chain/node/pull/2784) - staking precompiled contract +* [2795](https://github.com/zeta-chain/node/pull/2795) - support restricted address in Solana * [2825](https://github.com/zeta-chain/node/pull/2825) - add Bitcoin inscriptions support ### Refactor diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 3ea4fb423e..020fd323f2 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -7,6 +7,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" @@ -60,6 +61,9 @@ type Observer struct { // lastTxScanned is the last transaction hash scanned by the observer lastTxScanned string + // rpcAlertLatency is the threshold of RPC latency to trigger an alert + rpcAlertLatency time.Duration + // blockCache is the cache for blocks blockCache *lru.Cache @@ -92,6 +96,7 @@ func NewObserver( tss interfaces.TSSSigner, blockCacheSize int, headerCacheSize int, + rpcAlertLatency int64, ts *metrics.TelemetryServer, database *db.DB, logger Logger, @@ -104,6 +109,7 @@ func NewObserver( lastBlock: 0, lastBlockScanned: 0, lastTxScanned: "", + rpcAlertLatency: time.Duration(rpcAlertLatency) * time.Second, ts: ts, db: database, mu: &sync.Mutex{}, @@ -452,6 +458,27 @@ func (ob *Observer) PostVoteInbound( return ballot, err } +// AlertOnRPCLatency prints an alert if the RPC latency exceeds the threshold. +// Returns true if the RPC latency is too high. +func (ob *Observer) AlertOnRPCLatency(latestBlockTime time.Time, defaultAlertLatency time.Duration) bool { + // use configured alert latency if set + alertLatency := defaultAlertLatency + if ob.rpcAlertLatency > 0 { + alertLatency = ob.rpcAlertLatency + } + + // latest block should not be too old + elapsedTime := time.Since(latestBlockTime) + if elapsedTime > alertLatency { + ob.logger.Chain.Error(). + Msgf("RPC is stale: latest block is %.0f seconds old, RPC down or chain stuck (check explorer)?", elapsedTime.Seconds()) + return true + } + + ob.logger.Chain.Info().Msgf("RPC is OK: latest block is %.0f seconds old", elapsedTime.Seconds()) + return false +} + // EnvVarLatestBlockByChain returns the environment variable for the last block by chain. func EnvVarLatestBlockByChain(chain chains.Chain) string { return fmt.Sprintf("CHAIN_%d_SCAN_FROM_BLOCK", chain.ChainId) diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index 15689b6788..01713f6c4c 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "testing" + "time" lru "github.com/hashicorp/golang-lru" "github.com/rs/zerolog" @@ -23,8 +24,13 @@ import ( "github.com/zeta-chain/node/zetaclient/testutils/mocks" ) +const ( + // defaultAlertLatency is the default alert latency (in seconds) for unit tests + defaultAlertLatency = 60 +) + // createObserver creates a new observer for testing -func createObserver(t *testing.T, chain chains.Chain) *base.Observer { +func createObserver(t *testing.T, chain chains.Chain, alertLatency int64) *base.Observer { // constructor parameters chainParams := *sample.ChainParams(chain.ChainId) zetacoreClient := mocks.NewZetacoreClient(t) @@ -41,6 +47,7 @@ func createObserver(t *testing.T, chain chains.Chain) *base.Observer { tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, + alertLatency, nil, database, logger, @@ -122,6 +129,7 @@ func TestNewObserver(t *testing.T) { tt.tss, tt.blockCacheSize, tt.headerCacheSize, + 60, nil, database, base.DefaultLogger(), @@ -141,7 +149,7 @@ func TestNewObserver(t *testing.T) { func TestStop(t *testing.T) { t.Run("should be able to stop observer", func(t *testing.T) { // create observer and initialize db - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // stop observer ob.Stop() @@ -152,63 +160,70 @@ func TestObserverGetterAndSetter(t *testing.T) { chain := chains.Ethereum t.Run("should be able to update chain", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update chain newChain := chains.BscMainnet ob = ob.WithChain(chains.BscMainnet) require.Equal(t, newChain, ob.Chain()) }) + t.Run("should be able to update chain params", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update chain params newChainParams := *sample.ChainParams(chains.BscMainnet.ChainId) ob = ob.WithChainParams(newChainParams) require.True(t, observertypes.ChainParamsEqual(newChainParams, ob.ChainParams())) }) + t.Run("should be able to update zetacore client", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update zetacore client newZetacoreClient := mocks.NewZetacoreClient(t) ob = ob.WithZetacoreClient(newZetacoreClient) require.Equal(t, newZetacoreClient, ob.ZetacoreClient()) }) + t.Run("should be able to update tss", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update tss newTSS := mocks.NewTSSAthens3() ob = ob.WithTSS(newTSS) require.Equal(t, newTSS, ob.TSS()) }) + t.Run("should be able to update last block", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last block newLastBlock := uint64(100) ob = ob.WithLastBlock(newLastBlock) require.Equal(t, newLastBlock, ob.LastBlock()) }) + t.Run("should be able to update last block scanned", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last block scanned newLastBlockScanned := uint64(100) ob = ob.WithLastBlockScanned(newLastBlockScanned) require.Equal(t, newLastBlockScanned, ob.LastBlockScanned()) }) + t.Run("should be able to update last tx scanned", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update last tx scanned newLastTxScanned := sample.EthAddress().String() ob = ob.WithLastTxScanned(newLastTxScanned) require.Equal(t, newLastTxScanned, ob.LastTxScanned()) }) + t.Run("should be able to replace block cache", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update block cache newBlockCache, err := lru.New(200) @@ -217,8 +232,9 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithBlockCache(newBlockCache) require.Equal(t, newBlockCache, ob.BlockCache()) }) + t.Run("should be able to replace header cache", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update headers cache newHeadersCache, err := lru.New(200) @@ -227,16 +243,18 @@ func TestObserverGetterAndSetter(t *testing.T) { ob = ob.WithHeaderCache(newHeadersCache) require.Equal(t, newHeadersCache, ob.HeaderCache()) }) + t.Run("should be able to update telemetry server", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // update telemetry server newServer := metrics.NewTelemetryServer() ob = ob.WithTelemetryServer(newServer) require.Equal(t, newServer, ob.TelemetryServer()) }) + t.Run("should be able to get logger", func(t *testing.T) { - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) logger := ob.Logger() // should be able to print log @@ -274,7 +292,7 @@ func TestOutboundID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // create observer - ob := createObserver(t, tt.chain) + ob := createObserver(t, tt.chain, defaultAlertLatency) ob = ob.WithTSS(tt.tss) // get outbound id @@ -296,7 +314,7 @@ func TestLoadLastBlockScanned(t *testing.T) { t.Run("should be able to load last block scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned err := ob.WriteLastBlockScannedToDB(100) @@ -307,18 +325,20 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 100, ob.LastBlockScanned()) }) + t.Run("latest block scanned should be 0 if not found in db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // read last block scanned err := ob.LoadLastBlockScanned(log.Logger) require.NoError(t, err) require.EqualValues(t, 0, ob.LastBlockScanned()) }) + t.Run("should overwrite last block scanned if env var is set", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned ob.WriteLastBlockScannedToDB(100) @@ -331,9 +351,10 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 101, ob.LastBlockScanned()) }) + t.Run("last block scanned should remain 0 if env var is set to latest", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write 100 as last block scanned ob.WriteLastBlockScannedToDB(100) @@ -346,9 +367,10 @@ func TestLoadLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 0, ob.LastBlockScanned()) }) + t.Run("should return error on invalid env var", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // set invalid env var os.Setenv(envvar, "invalid") @@ -362,7 +384,7 @@ func TestLoadLastBlockScanned(t *testing.T) { func TestSaveLastBlockScanned(t *testing.T) { t.Run("should be able to save last block scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // save 100 as last block scanned err := ob.SaveLastBlockScanned(100) @@ -382,7 +404,7 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { chain := chains.Ethereum t.Run("should be able to write and read last block scanned to db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // write last block scanned err := ob.WriteLastBlockScannedToDB(100) @@ -392,9 +414,10 @@ func TestReadWriteDBLastBlockScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 100, lastBlockScanned) }) + t.Run("should return error when last block scanned not found in db", func(t *testing.T) { // create empty db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) lastScannedBlock, err := ob.ReadLastBlockScannedFromDB() require.Error(t, err) @@ -408,7 +431,7 @@ func TestLoadLastTxScanned(t *testing.T) { t.Run("should be able to load last tx scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write sample hash as last tx scanned ob.WriteLastTxScannedToDB(lastTx) @@ -417,17 +440,19 @@ func TestLoadLastTxScanned(t *testing.T) { ob.LoadLastTxScanned() require.EqualValues(t, lastTx, ob.LastTxScanned()) }) + t.Run("latest tx scanned should be empty if not found in db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // read last tx scanned ob.LoadLastTxScanned() require.Empty(t, ob.LastTxScanned()) }) + t.Run("should overwrite last tx scanned if env var is set", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // create db and write sample hash as last tx scanned ob.WriteLastTxScannedToDB(lastTx) @@ -446,7 +471,7 @@ func TestSaveLastTxScanned(t *testing.T) { chain := chains.SolanaDevnet t.Run("should be able to save last tx scanned", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // save random tx hash lastSlot := uint64(100) @@ -469,7 +494,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { chain := chains.SolanaDevnet t.Run("should be able to write and read last tx scanned to db", func(t *testing.T) { // create observer and open db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) // write last tx scanned lastTx := "5LuQMorgd11p8GWEw6pmyHCDtA26NUyeNFhLWPNk2oBoM9pkag1LzhwGSRos3j4TJLhKjswFhZkGtvSGdLDkmqsk" @@ -480,9 +505,10 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { require.NoError(t, err) require.EqualValues(t, lastTx, lastTxScanned) }) + t.Run("should return error when last tx scanned not found in db", func(t *testing.T) { // create empty db - ob := createObserver(t, chain) + ob := createObserver(t, chain, defaultAlertLatency) lastTxScanned, err := ob.ReadLastTxScannedFromDB() require.Error(t, err) @@ -493,7 +519,7 @@ func TestReadWriteDBLastTxScanned(t *testing.T) { func TestPostVoteInbound(t *testing.T) { t.Run("should be able to post vote inbound", func(t *testing.T) { // create observer - ob := createObserver(t, chains.Ethereum) + ob := createObserver(t, chains.Ethereum, defaultAlertLatency) // create mock zetacore client zetacoreClient := mocks.NewZetacoreClient(t) @@ -508,6 +534,53 @@ func TestPostVoteInbound(t *testing.T) { }) } +func TestAlertOnRPCLatency(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + blockTime time.Time + alertLatency int64 + alerted bool + }{ + { + name: "should alert on high RPC latency", + blockTime: now.Add(-60 * time.Second), + alertLatency: 55, + alerted: true, + }, + { + name: "should not alert on normal RPC latency", + blockTime: now.Add(-60 * time.Second), + alertLatency: 65, + alerted: false, + }, + { + name: "should alert on higher RPC latency then default", + blockTime: now.Add(-65 * time.Second), + alertLatency: 0, // 0 means not set + alerted: true, + }, + { + name: "should not alert on normal RPC latency when compared to default", + blockTime: now.Add(-55 * time.Second), + alertLatency: 0, // 0 means not set + alerted: false, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create observer + ob := createObserver(t, chains.Ethereum, tt.alertLatency) + + alerted := ob.AlertOnRPCLatency(tt.blockTime, time.Duration(defaultAlertLatency)*time.Second) + require.Equal(t, tt.alerted, alerted) + }) + } +} + func createDatabase(t *testing.T) *db.DB { sqlDatabase, err := db.NewFromSqliteInMemory(true) require.NoError(t, err) diff --git a/zetaclient/chains/bitcoin/observer/observer.go b/zetaclient/chains/bitcoin/observer/observer.go index 4f3ad4ea28..50de502b79 100644 --- a/zetaclient/chains/bitcoin/observer/observer.go +++ b/zetaclient/chains/bitcoin/observer/observer.go @@ -9,7 +9,6 @@ import ( "math/big" "sort" "strings" - "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg" @@ -115,6 +114,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + rpcAlertLatency int64, database *db.DB, logger base.Logger, ts *metrics.TelemetryServer, @@ -127,6 +127,7 @@ func NewObserver( tss, btcBlocksPerDay, base.DefaultHeaderCacheSize, + rpcAlertLatency, ts, database, logger, @@ -221,71 +222,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) // watch the RPC status of the bitcoin chain - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) -} - -// WatchRPCStatus watches the RPC status of the Bitcoin chain -// TODO(revamp): move ticker related functions to a specific file -// TODO(revamp): move inner logic in a separate function -func (ob *Observer) WatchRPCStatus(_ context.Context) error { - ob.logger.Chain.Info().Msgf("RPCStatus is starting") - ticker := time.NewTicker(60 * time.Second) - - for { - select { - case <-ticker.C: - if !ob.GetChainParams().IsSupported { - continue - } - - bn, err := ob.btcClient.GetBlockCount() - if err != nil { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - hash, err := ob.btcClient.GetBlockHash(bn) - if err != nil { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - header, err := ob.btcClient.GetBlockHeader(hash) - if err != nil { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - blockTime := header.Timestamp - elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > 1200 { - ob.logger.Chain.Error().Err(err).Msg("RPC status check: RPC down? ") - continue - } - - tssAddr := ob.TSS().BTCAddressWitnessPubkeyHash() - res, err := ob.btcClient.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddr}) - if err != nil { - ob.logger.Chain.Error(). - Err(err). - Msg("RPC status check: can't list utxos of TSS address; wallet or loaded? TSS address is not imported? ") - continue - } - - if len(res) == 0 { - ob.logger.Chain.Error(). - Err(err). - Msg("RPC status check: TSS address has no utxos; TSS address is not imported? ") - continue - } - - ob.logger.Chain.Info(). - Msgf("[OK] RPC status check: latest block number %d, timestamp %s (%.fs ago), tss addr %s, #utxos: %d", bn, blockTime, elapsedSeconds, tssAddr, len(res)) - - case <-ob.StopChannel(): - return nil - } - } + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // GetPendingNonce returns the artificial pending nonce @@ -399,12 +336,12 @@ func (ob *Observer) PostGasPrice(ctx context.Context) error { // TODO(revamp): move in upper package to separate file (e.g., rpc.go) func GetSenderAddressByVin(rpcClient interfaces.BTCRPCClient, vin btcjson.Vin, net *chaincfg.Params) (string, error) { // query previous raw transaction by txid - // GetTransaction requires reconfiguring the bitcoin node (txindex=1), so we use GetRawTransaction instead hash, err := chainhash.NewHashFromStr(vin.Txid) if err != nil { return "", err } + // this requires running bitcoin node with 'txindex=1' tx, err := rpcClient.GetRawTransaction(hash) if err != nil { return "", errors.Wrapf(err, "error getting raw transaction %s", vin.Txid) diff --git a/zetaclient/chains/bitcoin/observer/observer_test.go b/zetaclient/chains/bitcoin/observer/observer_test.go index e4502d4b11..60686ff568 100644 --- a/zetaclient/chains/bitcoin/observer/observer_test.go +++ b/zetaclient/chains/bitcoin/observer/observer_test.go @@ -83,6 +83,7 @@ func MockBTCObserver( params, nil, nil, + 60, database, base.Logger{}, nil, @@ -169,6 +170,7 @@ func Test_NewObserver(t *testing.T) { tt.chainParams, tt.coreClient, tt.tss, + 60, database, tt.logger, tt.ts, diff --git a/zetaclient/chains/bitcoin/observer/outbound_test.go b/zetaclient/chains/bitcoin/observer/outbound_test.go index e9d30bd886..3cad907a19 100644 --- a/zetaclient/chains/bitcoin/observer/outbound_test.go +++ b/zetaclient/chains/bitcoin/observer/outbound_test.go @@ -32,7 +32,7 @@ func MockBTCObserverMainnet(t *testing.T) *Observer { require.NoError(t, err) // create Bitcoin observer - ob, err := NewObserver(chain, btcClient, params, nil, tss, database, base.Logger{}, nil) + ob, err := NewObserver(chain, btcClient, params, nil, tss, 60, database, base.Logger{}, nil) require.NoError(t, err) return ob diff --git a/zetaclient/chains/bitcoin/observer/rpc_status.go b/zetaclient/chains/bitcoin/observer/rpc_status.go new file mode 100644 index 0000000000..008d49aa59 --- /dev/null +++ b/zetaclient/chains/bitcoin/observer/rpc_status.go @@ -0,0 +1,41 @@ +package observer + +import ( + "context" + "time" + + "github.com/zeta-chain/node/zetaclient/chains/bitcoin/rpc" + "github.com/zeta-chain/node/zetaclient/common" +) + +// watchRPCStatus watches the RPC status of the Bitcoin chain +func (ob *Observer) watchRPCStatus(_ context.Context) error { + ob.Logger().Chain.Info().Msgf("WatchRPCStatus started for chain %d", ob.Chain().ChainId) + + ticker := time.NewTicker(common.RPCStatusCheckInterval) + for { + select { + case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } + + ob.checkRPCStatus() + case <-ob.StopChannel(): + return nil + } + } +} + +// checkRPCStatus checks the RPC status of the Bitcoin chain +func (ob *Observer) checkRPCStatus() { + tssAddress := ob.TSS().BTCAddressWitnessPubkeyHash() + blockTime, err := rpc.CheckRPCStatus(ob.btcClient, tssAddress) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/bitcoin/rpc/rpc.go b/zetaclient/chains/bitcoin/rpc/rpc.go index c70b7a4275..e9a0e2ddcc 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc.go +++ b/zetaclient/chains/bitcoin/rpc/rpc.go @@ -2,6 +2,7 @@ package rpc import ( "fmt" + "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -13,6 +14,12 @@ import ( "github.com/zeta-chain/node/zetaclient/config" ) +const ( + // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. + // Bitcoin block time is 10 minutes, 1200s (20 minutes) is a reasonable threshold for Bitcoin + RPCAlertLatency = time.Duration(1200) * time.Second +) + // NewRPCClient creates a new RPC client by the given config. func NewRPCClient(btcConfig config.BTCConfig) (*rpcclient.Client, error) { connCfg := &rpcclient.ConnConfig{ @@ -173,3 +180,37 @@ func GetTransactionFeeAndRate(rpcClient interfaces.BTCRPCClient, rawResult *btcj return fee, feeRate, nil } + +// CheckRPCStatus checks the RPC status of the bitcoin chain +func CheckRPCStatus(client interfaces.BTCRPCClient, tssAddress btcutil.Address) (time.Time, error) { + // query latest block number + bn, err := client.GetBlockCount() + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockCount, RPC down?") + } + + // query latest block header + hash, err := client.GetBlockHash(bn) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockHash, RPC down?") + } + + // query latest block header thru hash + header, err := client.GetBlockHeader(hash) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockHeader, RPC down?") + } + + // should be able to list utxos owned by TSS address + res, err := client.ListUnspentMinMaxAddresses(0, 1000000, []btcutil.Address{tssAddress}) + if err != nil { + return time.Time{}, errors.Wrap(err, "can't list utxos of TSS address; TSS address is not imported?") + } + + // TSS address should have utxos + if len(res) == 0 { + return time.Time{}, errors.New("TSS address has no utxos; TSS address is not imported?") + } + + return header.Timestamp, nil +} diff --git a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go index e2f44066c4..2eee4bf023 100644 --- a/zetaclient/chains/bitcoin/rpc/rpc_live_test.go +++ b/zetaclient/chains/bitcoin/rpc/rpc_live_test.go @@ -31,8 +31,8 @@ import ( // createRPCClient creates a new Bitcoin RPC client for given chainID func createRPCClient(chainID int64) (*rpcclient.Client, error) { var connCfg *rpcclient.ConnConfig - rpcMainnet := os.Getenv("BTC_RPC_MAINNET") - rpcTestnet := os.Getenv("BTC_RPC_TESTNET") + rpcMainnet := os.Getenv(common.EnvBtcRPCMainnet) + rpcTestnet := os.Getenv(common.EnvBtcRPCTestnet) // mainnet if chainID == chains.BitcoinMainnet.ChainId { @@ -177,17 +177,34 @@ func LiveTest_FilterAndParseIncomingTx_Nop(t *testing.T) { &chaincfg.TestNet3Params, 0.0, ) + require.NoError(t, err) require.Empty(t, inbounds) } +// TestBitcoinObserverLive is a phony test to run each live test individually +func TestBitcoinObserverLive(t *testing.T) { + if !common.LiveTestEnabled() { + return + } + + LiveTest_NewRPCClient(t) + LiveTest_CheckRPCStatus(t) + LiveTest_GetBlockHeightByHash(t) + LiveTest_BitcoinFeeRate(t) + LiveTest_AvgFeeRateMainnetMempoolSpace(t) + LiveTest_AvgFeeRateTestnetMempoolSpace(t) + LiveTest_GetRecentFeeRate(t) + LiveTest_GetSenderByVin(t) +} + // LiveTestNewRPCClient creates a new Bitcoin RPC client func LiveTest_NewRPCClient(t *testing.T) { btcConfig := config.BTCConfig{ RPCUsername: "user", RPCPassword: "pass", - RPCHost: "bitcoin.rpc.zetachain.com/6315704c-49bc-4649-8b9d-e9278a1dfeb3", - RPCParams: "mainnet", + RPCHost: os.Getenv(common.EnvBtcRPCTestnet), + RPCParams: "testnet3", } // create Bitcoin RPC client @@ -200,6 +217,22 @@ func LiveTest_NewRPCClient(t *testing.T) { require.Greater(t, bn, int64(0)) } +// Live_TestCheckRPCStatus checks the RPC status of the Bitcoin chain +func LiveTest_CheckRPCStatus(t *testing.T) { + // setup Bitcoin client + chainID := chains.BitcoinMainnet.ChainId + client, err := createRPCClient(chainID) + require.NoError(t, err) + + // decode tss address + tssAddress, err := chains.DecodeBtcAddress(testutils.TSSAddressBTCMainnet, chainID) + require.NoError(t, err) + + // check RPC status + _, err = rpc.CheckRPCStatus(client, tssAddress) + require.NoError(t, err) +} + // LiveTestGetBlockHeightByHash queries Bitcoin block height by hash func LiveTest_GetBlockHeightByHash(t *testing.T) { // setup Bitcoin client @@ -263,9 +296,11 @@ func LiveTest_BitcoinFeeRate(t *testing.T) { feeRateEconomical2.Uint64(), ) - // monitor fee rate every 5 minutes - for { - time.Sleep(time.Duration(5) * time.Minute) + // monitor fee rate every 5 minutes, adjust the iteration count as needed + for i := 0; i < 1; i++ { + // please uncomment this interval for long running test + //time.Sleep(time.Duration(5) * time.Minute) + bn, err = client.GetBlockCount() feeRateConservative1, errCon1 = getFeeRate(client, 1, &btcjson.EstimateModeConservative) feeRateEconomical1, errEco1 = getFeeRate(client, 1, &btcjson.EstimateModeEconomical) @@ -356,7 +391,7 @@ func LiveTest_AvgFeeRateMainnetMempoolSpace(t *testing.T) { // test against mempool.space API for 10000 blocks //startBlock := 210000 * 3 // 3rd halving startBlock := 829596 - endBlock := startBlock - 10000 + endBlock := startBlock - 1 // go back to whatever block as needed compareAvgFeeRate(t, client, startBlock, endBlock, false) } @@ -370,7 +405,7 @@ func LiveTest_AvgFeeRateTestnetMempoolSpace(t *testing.T) { // test against mempool.space API for 10000 blocks //startBlock := 210000 * 12 // 12th halving startBlock := 2577600 - endBlock := startBlock - 10000 + endBlock := startBlock - 1 // go back to whatever block as needed compareAvgFeeRate(t, client, startBlock, endBlock, true) } @@ -387,8 +422,8 @@ func LiveTest_GetRecentFeeRate(t *testing.T) { require.Greater(t, feeRate, uint64(0)) } -// LiveTestGetSenderByVin gets sender address for each vin and compares with mempool.space sender address -func LiveTestGetSenderByVin(t *testing.T) { +// LiveTest_GetSenderByVin gets sender address for each vin and compares with mempool.space sender address +func LiveTest_GetSenderByVin(t *testing.T) { // setup Bitcoin client chainID := chains.BitcoinMainnet.ChainId client, err := createRPCClient(chainID) @@ -405,7 +440,7 @@ func LiveTestGetSenderByVin(t *testing.T) { // calculates block range to test startBlock, err := client.GetBlockCount() require.NoError(t, err) - endBlock := startBlock - 5000 + endBlock := startBlock - 1 // go back to whatever block as needed // loop through mempool.space blocks backwards BLOCKLOOP: diff --git a/zetaclient/chains/evm/observer/observer.go b/zetaclient/chains/evm/observer/observer.go index 32952c28a2..c10ecbc54f 100644 --- a/zetaclient/chains/evm/observer/observer.go +++ b/zetaclient/chains/evm/observer/observer.go @@ -7,7 +7,6 @@ import ( "math" "math/big" "strings" - "time" ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -67,6 +66,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + rpcAlertLatency int64, database *db.DB, logger base.Logger, ts *metrics.TelemetryServer, @@ -79,6 +79,7 @@ func NewObserver( tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, + rpcAlertLatency, ts, database, logger, @@ -200,50 +201,7 @@ func (ob *Observer) Start(ctx context.Context) { bg.Work(ctx, ob.WatchOutbound, bg.WithName("WatchOutbound"), bg.WithLogger(ob.Logger().Outbound)) bg.Work(ctx, ob.WatchGasPrice, bg.WithName("WatchGasPrice"), bg.WithLogger(ob.Logger().GasPrice)) bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) - bg.Work(ctx, ob.WatchRPCStatus, bg.WithName("WatchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) -} - -// WatchRPCStatus watches the RPC status of the evm chain -// TODO(revamp): move ticker to ticker file -// TODO(revamp): move inner logic to a separate function -func (ob *Observer) WatchRPCStatus(ctx context.Context) error { - ob.Logger().Chain.Info().Msgf("Starting RPC status check for chain %d", ob.Chain().ChainId) - ticker := time.NewTicker(60 * time.Second) - for { - select { - case <-ticker.C: - if !ob.GetChainParams().IsSupported { - continue - } - bn, err := ob.evmClient.BlockNumber(ctx) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") - continue - } - gasPrice, err := ob.evmClient.SuggestGasPrice(ctx) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") - continue - } - header, err := ob.evmClient.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) - if err != nil { - ob.Logger().Chain.Error().Err(err).Msg("RPC Status Check error: RPC down?") - continue - } - // #nosec G115 always in range - blockTime := time.Unix(int64(header.Time), 0).UTC() - elapsedSeconds := time.Since(blockTime).Seconds() - if elapsedSeconds > 100 { - ob.Logger().Chain.Warn(). - Msgf("RPC Status Check warning: RPC stale or chain stuck (check explorer)? Latest block %d timestamp is %.0fs ago", bn, elapsedSeconds) - continue - } - ob.Logger().Chain.Info(). - Msgf("[OK] RPC status: latest block num %d, timestamp %s ( %.0fs ago), suggested gas price %d", header.Number, blockTime.String(), elapsedSeconds, gasPrice.Uint64()) - case <-ob.StopChannel(): - return nil - } - } + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // SetTxNReceipt sets the receipt and transaction in memory diff --git a/zetaclient/chains/evm/observer/observer_test.go b/zetaclient/chains/evm/observer/observer_test.go index 049845b79e..d30be608e5 100644 --- a/zetaclient/chains/evm/observer/observer_test.go +++ b/zetaclient/chains/evm/observer/observer_test.go @@ -138,6 +138,7 @@ func MockEVMObserver( params, zetacoreClient, tss, + 60, database, logger, nil, @@ -258,6 +259,7 @@ func Test_NewObserver(t *testing.T) { tt.chainParams, zetacoreClient, tt.tss, + 60, database, tt.logger, tt.ts, diff --git a/zetaclient/chains/evm/observer/rpc_status.go b/zetaclient/chains/evm/observer/rpc_status.go new file mode 100644 index 0000000000..c63e9e775a --- /dev/null +++ b/zetaclient/chains/evm/observer/rpc_status.go @@ -0,0 +1,41 @@ +// Package observer implements the EVM chain observer +package observer + +import ( + "context" + "time" + + "github.com/zeta-chain/node/zetaclient/chains/evm/rpc" + "github.com/zeta-chain/node/zetaclient/common" +) + +// watchRPCStatus watches the RPC status of the EVM chain +func (ob *Observer) watchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("watchRPCStatus started for chain %d", ob.Chain().ChainId) + + ticker := time.NewTicker(common.RPCStatusCheckInterval) + for { + select { + case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } + + ob.checkRPCStatus(ctx) + case <-ob.StopChannel(): + return nil + } + } +} + +// checkRPCStatus checks the RPC status of the EVM chain +func (ob *Observer) checkRPCStatus(ctx context.Context) { + blockTime, err := rpc.CheckRPCStatus(ctx, ob.evmClient) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/evm/rpc/rpc.go b/zetaclient/chains/evm/rpc/rpc.go index 17f39b3a1c..2d5dea77f5 100644 --- a/zetaclient/chains/evm/rpc/rpc.go +++ b/zetaclient/chains/evm/rpc/rpc.go @@ -2,6 +2,8 @@ package rpc import ( "context" + "math/big" + "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" @@ -9,6 +11,12 @@ import ( "github.com/zeta-chain/node/zetaclient/chains/interfaces" ) +const ( + // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. + // 100s is a reasonable threshold for most EVM chains + RPCAlertLatency = time.Duration(100) * time.Second +) + // IsTxConfirmed checks if the transaction is confirmed with given confirmations func IsTxConfirmed( ctx context.Context, @@ -50,3 +58,30 @@ func IsTxConfirmed( return blocks >= confirmations, nil } + +// CheckRPCStatus checks the RPC status of the evm chain +func CheckRPCStatus(ctx context.Context, client interfaces.EVMRPCClient) (time.Time, error) { + // query latest block number + bn, err := client.BlockNumber(ctx) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on BlockNumber, RPC down?") + } + + // query suggested gas price + _, err = client.SuggestGasPrice(ctx) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on SuggestGasPrice, RPC down?") + } + + // query latest block header + header, err := client.HeaderByNumber(ctx, new(big.Int).SetUint64(bn)) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on HeaderByNumber, RPC down?") + } + + // convert block time to UTC + // #nosec G115 always in range + blockTime := time.Unix(int64(header.Time), 0).UTC() + + return blockTime, nil +} diff --git a/zetaclient/chains/evm/rpc/rpc_live_test.go b/zetaclient/chains/evm/rpc/rpc_live_test.go index 584b089d31..ec99fe6ebd 100644 --- a/zetaclient/chains/evm/rpc/rpc_live_test.go +++ b/zetaclient/chains/evm/rpc/rpc_live_test.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/stretchr/testify/require" "github.com/zeta-chain/node/zetaclient/chains/evm/rpc" + "github.com/zeta-chain/node/zetaclient/common" "testing" ) @@ -20,7 +21,12 @@ const ( // Test_EVMRPCLive is a phony test to run each live test individually func Test_EVMRPCLive(t *testing.T) { - // LiveTest_IsTxConfirmed(t) + if !common.LiveTestEnabled() { + return + } + + LiveTest_IsTxConfirmed(t) + LiveTest_CheckRPCStatus(t) } func LiveTest_IsTxConfirmed(t *testing.T) { @@ -43,3 +49,12 @@ func LiveTest_IsTxConfirmed(t *testing.T) { require.False(t, confirmed) }) } + +func LiveTest_CheckRPCStatus(t *testing.T) { + client, err := ethclient.Dial(URLEthMainnet) + require.NoError(t, err) + + ctx := context.Background() + _, err = rpc.CheckRPCStatus(ctx, client) + require.NoError(t, err) +} diff --git a/zetaclient/chains/evm/signer/signer_test.go b/zetaclient/chains/evm/signer/signer_test.go index a128354a7f..b2c266b438 100644 --- a/zetaclient/chains/evm/signer/signer_test.go +++ b/zetaclient/chains/evm/signer/signer_test.go @@ -90,6 +90,7 @@ func getNewEvmChainObserver(t *testing.T, tss interfaces.TSSSigner) (*observer.O params, mocks.NewZetacoreClient(t), tss, + 60, database, logger, ts, diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index 6767711d5a..36f3fcad5b 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -194,6 +194,7 @@ type SolanaRPCClient interface { GetVersion(ctx context.Context) (*solrpc.GetVersionResult, error) GetHealth(ctx context.Context) (string, error) GetSlot(ctx context.Context, commitment solrpc.CommitmentType) (uint64, error) + GetBlockTime(ctx context.Context, block uint64) (*solana.UnixTimeSeconds, error) GetAccountInfo(ctx context.Context, account solana.PublicKey) (*solrpc.GetAccountInfoResult, error) GetBalance( ctx context.Context, diff --git a/zetaclient/chains/solana/observer/inbound_test.go b/zetaclient/chains/solana/observer/inbound_test.go index 0a4738738b..577c10ee9a 100644 --- a/zetaclient/chains/solana/observer/inbound_test.go +++ b/zetaclient/chains/solana/observer/inbound_test.go @@ -40,7 +40,17 @@ func Test_FilterInboundEventAndVote(t *testing.T) { zetacoreClient := mocks.NewZetacoreClient(t) zetacoreClient.WithKeys(&keys.Keys{}).WithZetaChain().WithPostVoteInbound("", "") - ob, err := observer.NewObserver(chain, nil, *chainParams, zetacoreClient, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver( + chain, + nil, + *chainParams, + zetacoreClient, + nil, + 60, + database, + base.DefaultLogger(), + nil, + ) require.NoError(t, err) t.Run("should filter inbound events and vote", func(t *testing.T) { @@ -63,7 +73,7 @@ func Test_FilterInboundEvents(t *testing.T) { chainParams := sample.ChainParams(chain.ChainId) chainParams.GatewayAddress = testutils.GatewayAddresses[chain.ChainId] - ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) // expected result @@ -103,7 +113,7 @@ func Test_BuildInboundVoteMsgFromEvent(t *testing.T) { database, err := db.NewFromSqliteInMemory(true) require.NoError(t, err) - ob, err := observer.NewObserver(chain, nil, *params, zetacoreClient, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, nil, *params, zetacoreClient, nil, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) // create test compliance config @@ -170,7 +180,7 @@ func Test_ParseInboundAsDeposit(t *testing.T) { // create observer chainParams := sample.ChainParams(chain.ChainId) chainParams.GatewayAddress = testutils.GatewayAddresses[chain.ChainId] - ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) // expected result diff --git a/zetaclient/chains/solana/observer/observer.go b/zetaclient/chains/solana/observer/observer.go index 721e93e1df..634bdfd635 100644 --- a/zetaclient/chains/solana/observer/observer.go +++ b/zetaclient/chains/solana/observer/observer.go @@ -44,6 +44,7 @@ func NewObserver( chainParams observertypes.ChainParams, zetacoreClient interfaces.ZetacoreClient, tss interfaces.TSSSigner, + rpcAlertLatency int64, db *db.DB, logger base.Logger, ts *metrics.TelemetryServer, @@ -56,6 +57,7 @@ func NewObserver( tss, base.DefaultBlockCacheSize, base.DefaultHeaderCacheSize, + rpcAlertLatency, ts, db, logger, @@ -130,6 +132,9 @@ func (ob *Observer) Start(ctx context.Context) { // watch zetacore for Solana inbound trackers bg.Work(ctx, ob.WatchInboundTracker, bg.WithName("WatchInboundTracker"), bg.WithLogger(ob.Logger().Inbound)) + + // watch RPC status of the Solana chain + bg.Work(ctx, ob.watchRPCStatus, bg.WithName("watchRPCStatus"), bg.WithLogger(ob.Logger().Chain)) } // LoadLastTxScanned loads the last scanned tx from the database. diff --git a/zetaclient/chains/solana/observer/observer_test.go b/zetaclient/chains/solana/observer/observer_test.go index 4e6ac48446..70b3d10090 100644 --- a/zetaclient/chains/solana/observer/observer_test.go +++ b/zetaclient/chains/solana/observer/observer_test.go @@ -44,6 +44,7 @@ func MockSolanaObserver( chainParams, zetacoreClient, tss, + 60, database, base.DefaultLogger(), nil, diff --git a/zetaclient/chains/solana/observer/outbound_test.go b/zetaclient/chains/solana/observer/outbound_test.go index abbcb2430b..5cb2b80a5c 100644 --- a/zetaclient/chains/solana/observer/outbound_test.go +++ b/zetaclient/chains/solana/observer/outbound_test.go @@ -50,7 +50,7 @@ func createTestObserver( // create observer chainParams := sample.ChainParams(chain.ChainId) chainParams.GatewayAddress = GatewayAddressTest - ob, err := observer.NewObserver(chain, solClient, *chainParams, nil, tss, database, base.DefaultLogger(), nil) + ob, err := observer.NewObserver(chain, solClient, *chainParams, nil, tss, 60, database, base.DefaultLogger(), nil) require.NoError(t, err) return ob diff --git a/zetaclient/chains/solana/observer/rpc_status.go b/zetaclient/chains/solana/observer/rpc_status.go new file mode 100644 index 0000000000..ff3d02f679 --- /dev/null +++ b/zetaclient/chains/solana/observer/rpc_status.go @@ -0,0 +1,45 @@ +package observer + +import ( + "context" + "time" + + "github.com/zeta-chain/node/pkg/chains" + "github.com/zeta-chain/node/zetaclient/chains/solana/rpc" + "github.com/zeta-chain/node/zetaclient/common" +) + +// watchRPCStatus watches the RPC status of the Solana chain +func (ob *Observer) watchRPCStatus(ctx context.Context) error { + ob.Logger().Chain.Info().Msgf("watchRPCStatus started for chain %d", ob.Chain().ChainId) + + ticker := time.NewTicker(common.RPCStatusCheckInterval) + for { + select { + case <-ticker.C: + if !ob.GetChainParams().IsSupported { + continue + } + + ob.checkRPCStatus(ctx) + case <-ob.StopChannel(): + return nil + } + } +} + +// checkRPCStatus checks the RPC status of the Solana chain +func (ob *Observer) checkRPCStatus(ctx context.Context) { + // Solana privnet doesn't have RPC 'GetHealth', need to differentiate + privnet := ob.Chain().NetworkType == chains.NetworkType_privnet + + // check the RPC status + blockTime, err := rpc.CheckRPCStatus(ctx, ob.solClient, privnet) + if err != nil { + ob.Logger().Chain.Error().Err(err).Msg("CheckRPCStatus failed") + return + } + + // alert if RPC latency is too high + ob.AlertOnRPCLatency(blockTime, rpc.RPCAlertLatency) +} diff --git a/zetaclient/chains/solana/rpc/rpc.go b/zetaclient/chains/solana/rpc/rpc.go index a689ebe7c8..c1fc0e1751 100644 --- a/zetaclient/chains/solana/rpc/rpc.go +++ b/zetaclient/chains/solana/rpc/rpc.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "time" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" @@ -13,6 +14,10 @@ import ( const ( // defaultPageLimit is the default number of signatures to fetch in one GetSignaturesForAddressWithOpts call DefaultPageLimit = 1000 + + // RPCAlertLatency is the default threshold for RPC latency to be considered unhealthy and trigger an alert. + // The 'HEALTH_CHECK_SLOT_DISTANCE' is default to 150 slots, which is 150 * 0.4s = 60s + RPCAlertLatency = time.Duration(60) * time.Second ) // GetFirstSignatureForAddress searches the first signature for the given address. @@ -116,3 +121,28 @@ func GetSignaturesForAddressUntil( return allSignatures, nil } + +// CheckRPCStatus checks the RPC status of the solana chain +func CheckRPCStatus(ctx context.Context, client interfaces.SolanaRPCClient, privnet bool) (time.Time, error) { + // query solana health (always return "ok" unless --trusted-validator is provided) + if !privnet { + _, err := client.GetHealth(ctx) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetHealth, RPC down?") + } + } + + // query latest slot + slot, err := client.GetSlot(ctx, rpc.CommitmentFinalized) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetSlot, RPC down?") + } + + // query latest block time + blockTime, err := client.GetBlockTime(ctx, slot) + if err != nil { + return time.Time{}, errors.Wrap(err, "RPC failed on GetBlockTime, RPC down?") + } + + return blockTime.Time(), nil +} diff --git a/zetaclient/chains/solana/rpc/rpc_live_test.go b/zetaclient/chains/solana/rpc/rpc_live_test.go index c04e13d171..7cbe98eeba 100644 --- a/zetaclient/chains/solana/rpc/rpc_live_test.go +++ b/zetaclient/chains/solana/rpc/rpc_live_test.go @@ -2,31 +2,35 @@ package rpc_test import ( "context" - "os" "testing" "github.com/gagliardetto/solana-go" solanarpc "github.com/gagliardetto/solana-go/rpc" "github.com/stretchr/testify/require" "github.com/zeta-chain/node/zetaclient/chains/solana/rpc" + "github.com/zeta-chain/node/zetaclient/common" ) // Test_SolanaRPCLive is a phony test to run all live tests func Test_SolanaRPCLive(t *testing.T) { - // LiveTest_GetFirstSignatureForAddress(t) - // LiveTest_GetSignaturesForAddressUntil(t) + if !common.LiveTestEnabled() { + return + } + + LiveTest_GetFirstSignatureForAddress(t) + LiveTest_GetSignaturesForAddressUntil(t) + LiveTest_CheckRPCStatus(t) } func LiveTest_GetFirstSignatureForAddress(t *testing.T) { // create a Solana devnet RPC client - urlDevnet := os.Getenv("TEST_SOL_URL_DEVNET") - client := solanarpc.New(urlDevnet) + client := solanarpc.New(solanarpc.DevNet_RPC) // program address address := solana.MustPublicKeyFromBase58("2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s") // get the first signature for the address (one by one) - sig, err := rpc.GetFirstSignatureForAddress(context.TODO(), client, address, 1) + sig, err := rpc.GetFirstSignatureForAddress(context.Background(), client, address, 1) require.NoError(t, err) // assert @@ -36,8 +40,7 @@ func LiveTest_GetFirstSignatureForAddress(t *testing.T) { func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { // create a Solana devnet RPC client - urlDevnet := os.Getenv("TEST_SOL_URL_DEVNET") - client := solanarpc.New(urlDevnet) + client := solanarpc.New(solanarpc.DevNet_RPC) // program address address := solana.MustPublicKeyFromBase58("2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s") @@ -45,8 +48,8 @@ func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { "2tUQtcrXxtNFtV9kZ4kQsmY7snnEoEEArmu9pUptr4UCy8UdbtjPD6UtfEtPJ2qk5CTzZTmLwsbmZdLymcwSUcHu", ) - // get all signatures for the address until the first signature (one by one) - sigs, err := rpc.GetSignaturesForAddressUntil(context.TODO(), client, address, untilSig, 1) + // get all signatures for the address until the first signature + sigs, err := rpc.GetSignaturesForAddressUntil(context.Background(), client, address, untilSig, 100) require.NoError(t, err) // assert @@ -57,3 +60,13 @@ func LiveTest_GetSignaturesForAddressUntil(t *testing.T) { require.NotEqual(t, untilSig, sig.Signature) } } + +func LiveTest_CheckRPCStatus(t *testing.T) { + // create a Solana devnet RPC client + client := solanarpc.New(solanarpc.DevNet_RPC) + + // check the RPC status + ctx := context.Background() + _, err := rpc.CheckRPCStatus(ctx, client, false) + require.NoError(t, err) +} diff --git a/zetaclient/common/constant.go b/zetaclient/common/constant.go index 64d906ca0d..c54acade92 100644 --- a/zetaclient/common/constant.go +++ b/zetaclient/common/constant.go @@ -1,5 +1,7 @@ package common +import "time" + const ( // DefaultGasPriceMultiplier is the default gas price multiplier for all chains DefaultGasPriceMultiplier = 1.0 @@ -9,4 +11,7 @@ const ( // BTCOutboundGasPriceMultiplier is the default gas price multiplier for BTC outbond txs BTCOutboundGasPriceMultiplier = 2.0 + + // RPCStatusCheckInterval is the interval to check RPC status, 1 minute + RPCStatusCheckInterval = time.Minute ) diff --git a/zetaclient/common/env.go b/zetaclient/common/env.go new file mode 100644 index 0000000000..f3e97110c6 --- /dev/null +++ b/zetaclient/common/env.go @@ -0,0 +1,30 @@ +package common + +import ( + "os" + "strconv" +) + +const ( + // EnvEnableLiveTest is the environment variable to enable zetaclient live tests + EnvEnableLiveTest = "ENABLE_LIVE_TEST" + + // EnvBtcRPCMainnet is the environment variable to enable mainnet for bitcoin rpc + EnvBtcRPCMainnet = "BTC_RPC_MAINNET" + + // EnvBtcRPCTestnet is the environment variable to enable testnet for bitcoin rpc + EnvBtcRPCTestnet = "BTC_RPC_TESTNET" +) + +// LiveTestEnabled returns true if live tests are enabled +func LiveTestEnabled() bool { + value := os.Getenv(EnvEnableLiveTest) + + // parse flag + enabled, err := strconv.ParseBool(value) + if err != nil { + return false + } + + return enabled +} diff --git a/zetaclient/config/config_chain.go b/zetaclient/config/config_chain.go index 194346168a..54d01baaf7 100644 --- a/zetaclient/config/config_chain.go +++ b/zetaclient/config/config_chain.go @@ -31,17 +31,19 @@ func New(setDefaults bool) Config { // bitcoinConfigRegnet contains Bitcoin config for regnet func bitcoinConfigRegnet() BTCConfig { return BTCConfig{ - RPCUsername: "smoketest", // smoketest is the previous name for E2E test, we keep this name for compatibility between client versions in upgrade test - RPCPassword: "123", - RPCHost: "bitcoin:18443", - RPCParams: "regtest", + RPCUsername: "smoketest", // smoketest is the previous name for E2E test, we keep this name for compatibility between client versions in upgrade test + RPCPassword: "123", + RPCHost: "bitcoin:18443", + RPCParams: "regtest", + RPCAlertLatency: 60, } } // solanaConfigLocalnet contains config for Solana localnet func solanaConfigLocalnet() SolanaConfig { return SolanaConfig{ - Endpoint: "http://solana:8899", + Endpoint: "http://solana:8899", + RPCAlertLatency: 60, } } @@ -72,8 +74,9 @@ func evmChainsConfigs() map[int64]EVMConfig { Endpoint: "", }, chains.GoerliLocalnet.ChainId: { - Chain: chains.GoerliLocalnet, - Endpoint: "http://eth:8545", + Chain: chains.GoerliLocalnet, + Endpoint: "http://eth:8545", + RPCAlertLatency: 60, }, } } diff --git a/zetaclient/config/types.go b/zetaclient/config/types.go index 4cafe0a9f7..9ef1a5d5a5 100644 --- a/zetaclient/config/types.go +++ b/zetaclient/config/types.go @@ -37,22 +37,25 @@ type ClientConfiguration struct { // EVMConfig is the config for EVM chain type EVMConfig struct { - Chain chains.Chain - Endpoint string + Chain chains.Chain + Endpoint string + RPCAlertLatency int64 } // BTCConfig is the config for Bitcoin chain type BTCConfig struct { // the following are rpcclient ConnConfig fields - RPCUsername string - RPCPassword string - RPCHost string - RPCParams string // "regtest", "mainnet", "testnet3" + RPCUsername string + RPCPassword string + RPCHost string + RPCParams string // "regtest", "mainnet", "testnet3" + RPCAlertLatency int64 } // SolanaConfig is the config for Solana chain type SolanaConfig struct { - Endpoint string + Endpoint string + RPCAlertLatency int64 } // ComplianceConfig is the config for compliance diff --git a/zetaclient/orchestrator/bootstrap.go b/zetaclient/orchestrator/bootstrap.go index 7dd4344b29..f7be6ad504 100644 --- a/zetaclient/orchestrator/bootstrap.go +++ b/zetaclient/orchestrator/bootstrap.go @@ -310,6 +310,7 @@ func syncObserverMap( *params, client, tss, + cfg.RPCAlertLatency, database, logger, ts, @@ -345,6 +346,7 @@ func syncObserverMap( *params, client, tss, + cfg.RPCAlertLatency, database, logger, ts, @@ -381,6 +383,7 @@ func syncObserverMap( *params, client, tss, + cfg.RPCAlertLatency, database, logger, ts, diff --git a/zetaclient/testutils/mocks/solana_rpc.go b/zetaclient/testutils/mocks/solana_rpc.go index 26b923abb3..b6333ff744 100644 --- a/zetaclient/testutils/mocks/solana_rpc.go +++ b/zetaclient/testutils/mocks/solana_rpc.go @@ -77,6 +77,36 @@ func (_m *SolanaRPCClient) GetBalance(ctx context.Context, account solana.Public return r0, r1 } +// GetBlockTime provides a mock function with given fields: ctx, block +func (_m *SolanaRPCClient) GetBlockTime(ctx context.Context, block uint64) (*solana.UnixTimeSeconds, error) { + ret := _m.Called(ctx, block) + + if len(ret) == 0 { + panic("no return value specified for GetBlockTime") + } + + var r0 *solana.UnixTimeSeconds + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) (*solana.UnixTimeSeconds, error)); ok { + return rf(ctx, block) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64) *solana.UnixTimeSeconds); ok { + r0 = rf(ctx, block) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*solana.UnixTimeSeconds) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = rf(ctx, block) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetConfirmedTransactionWithOpts provides a mock function with given fields: ctx, signature, opts func (_m *SolanaRPCClient) GetConfirmedTransactionWithOpts(ctx context.Context, signature solana.Signature, opts *rpc.GetTransactionOpts) (*rpc.TransactionWithMeta, error) { ret := _m.Called(ctx, signature, opts)