From 19c089f73bbe9e5a16280a9856cb30c8e4aae623 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 12 Dec 2024 13:51:34 +0100 Subject: [PATCH 1/7] Streamline start.go --- cmd/zetaclientd/start.go | 219 +++++++-------------- cmd/zetaclientd/utils.go | 114 +++++++---- pkg/os/console.go | 4 +- zetaclient/chains/base/logger.go | 53 +++-- zetaclient/chains/interfaces/interfaces.go | 2 - zetaclient/config/config.go | 14 ++ zetaclient/zetacore/client.go | 4 - 7 files changed, 196 insertions(+), 214 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 9906d735fc..38182d1c13 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -6,8 +6,6 @@ import ( _ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional "os" "os/signal" - "path/filepath" - "strings" "syscall" "github.com/pkg/errors" @@ -15,7 +13,6 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "github.com/zeta-chain/node/pkg/authz" "github.com/zeta-chain/node/pkg/chains" "github.com/zeta-chain/node/pkg/constant" zetaos "github.com/zeta-chain/node/pkg/os" @@ -27,7 +24,6 @@ import ( "github.com/zeta-chain/node/zetaclient/metrics" "github.com/zeta-chain/node/zetaclient/orchestrator" zetatss "github.com/zeta-chain/node/zetaclient/tss" - "github.com/zeta-chain/node/zetaclient/zetacore" ) const ( @@ -36,119 +32,80 @@ const ( envPprofAddr = "PPROF_ADDR" ) -// Start starts zetaclientd process todo revamp -// https://github.com/zeta-chain/node/issues/3112 +// Start starts zetaclientd process func Start(_ *cobra.Command, _ []string) error { - // Prompt for Hotkey, TSS key-share and relayer key passwords - titles := []string{"HotKey", "TSS", "Solana Relayer Key"} - passwords, err := zetaos.PromptPasswords(titles) + // Load Config file given path + cfg, err := config.Load(globalOpts.ZetacoreHome) if err != nil { - return errors.Wrap(err, "unable to get passwords") - } - hotkeyPass, tssKeyPass, solanaKeyPass := passwords[0], passwords[1], passwords[2] - relayerKeyPasswords := map[string]string{ - chains.Network_solana.String(): solanaKeyPass, + return errors.Wrap(err, "unable to load config") } - // Load Config file given path - cfg, err := config.Load(globalOpts.ZetacoreHome) + dbPath, err := config.ResolveDBPath() if err != nil { - return err + return errors.Wrap(err, "unable to resolve db path") } - logger, err := base.InitLogger(cfg) + // Configure logger (also overrides the default log level) + logger, err := base.NewLogger(cfg) if err != nil { - return errors.Wrap(err, "initLogger failed") + return errors.Wrap(err, "unable to create logger") } - masterLogger := logger.Std - startLogger := logger.Std.With().Str("module", "startup").Logger() + passes, err := promptPasswords() + if err != nil { + return errors.Wrap(err, "unable to prompt for passwords") + } - appContext := zctx.New(cfg, relayerKeyPasswords, masterLogger) + appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std) ctx := zctx.WithAppContext(context.Background(), appContext) - // Wait until zetacore is up - waitForZetaCore(cfg, startLogger) - startLogger.Info().Msgf("Zetacore is ready, trying to connect to %s", cfg.Peer) - + // TODO graceful telemetryServer := metrics.NewTelemetryServer() go func() { err := telemetryServer.Start() if err != nil { - startLogger.Error().Err(err).Msg("telemetryServer error") - panic("telemetryServer error") + log.Fatal().Err(err).Msg("telemetryServer error") } }() - go runPprof(startLogger) - - // CreateZetacoreClient: zetacore client is used for all communication to zetacore , which this client connects to. - // Zetacore accumulates votes , and provides a centralized source of truth for all clients - zetacoreClient, err := createZetacoreClient(cfg, hotkeyPass, masterLogger) + m, err := metrics.NewMetrics() if err != nil { - return errors.Wrap(err, "unable to create zetacore client") + return errors.Wrap(err, "unable to create metrics") } + m.Start() - // Wait until zetacore is ready to create blocks - if err = waitForZetacoreToCreateBlocks(ctx, zetacoreClient, startLogger); err != nil { - startLogger.Error().Err(err).Msg("WaitForZetacoreToCreateBlocks error") - return err - } - startLogger.Info().Msgf("Zetacore client is ready") + metrics.Info.WithLabelValues(constant.Version).Set(1) + metrics.LastStartTime.SetToCurrentTime() - // Set grantee account number and sequence number - err = zetacoreClient.SetAccountNumber(authz.ZetaClientGranteeKey) - if err != nil { - startLogger.Error().Err(err).Msg("SetAccountNumber error") - return err - } + telemetryServer.SetIPAddress(cfg.PublicIP) - // cross-check chainid - res, err := zetacoreClient.GetNodeInfo(ctx) + // TODO graceful + go runPprof(logger.Std) + + // zetacore client is used for all communication to zeta node. + // it accumulates votes, and provides a source of truth for all clients + zetacoreClient, err := createZetacoreClient(cfg, passes.hotkey, logger.Std) if err != nil { - startLogger.Error().Err(err).Msg("GetNodeInfo error") - return err + return errors.Wrap(err, "unable to create zetacore client") } - if strings.Compare(res.GetDefaultNodeInfo().Network, cfg.ChainID) != 0 { - startLogger.Warn(). - Msgf("chain id mismatch, zetacore chain id %s, zetaclient configured chain id %s; reset zetaclient chain id", res.GetDefaultNodeInfo().Network, cfg.ChainID) - cfg.ChainID = res.GetDefaultNodeInfo().Network - err := zetacoreClient.UpdateChainID(cfg.ChainID) - if err != nil { - return err - } + // Wait until zetacore is ready to produce blocks + if err = waitForBlocks(ctx, zetacoreClient, logger.Std); err != nil { + return errors.Wrap(err, "zetacore unavailable") } - // CreateAuthzSigner : which is used to sign all authz messages . All votes broadcast to zetacore are wrapped in authz exec . - // This is to ensure that the user does not need to keep their operator key online , and can use a cold key to sign votes - signerAddress, err := zetacoreClient.GetKeys().GetAddress() - if err != nil { - return errors.Wrap(err, "error getting signer address") + if err = prepareZetacoreClient(ctx, zetacoreClient, &cfg, logger.Std); err != nil { + return errors.Wrap(err, "unable to prepare zetacore client") } - createAuthzSigner(zetacoreClient.GetKeys().GetOperatorAddress().String(), signerAddress) - startLogger.Debug().Msgf("createAuthzSigner is ready") - // Initialize core parameters from zetacore - if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, startLogger); err != nil { + if err = orchestrator.UpdateAppContext(ctx, appContext, zetacoreClient, logger.Std); err != nil { return errors.Wrap(err, "unable to update app context") } - startLogger.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked()) + log.Info().Msgf("Config is updated from zetacore\n %s", cfg.StringMasked()) - m, err := metrics.NewMetrics() - if err != nil { - return errors.Wrap(err, "unable to create metrics") - } - m.Start() - - metrics.Info.WithLabelValues(constant.Version).Set(1) - metrics.LastStartTime.SetToCurrentTime() - - telemetryServer.SetIPAddress(cfg.PublicIP) - - granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, hotkeyPass) + granteePubKeyBech32, err := resolveObserverPubKeyBech32(cfg, passes.hotkey) if err != nil { return errors.Wrap(err, "unable to resolve observer pub key bech32") } @@ -157,44 +114,30 @@ func Start(_ *cobra.Command, _ []string) error { Config: cfg, Zetacore: zetacoreClient, GranteePubKeyBech32: granteePubKeyBech32, - HotKeyPassword: hotkeyPass, - TSSKeyPassword: tssKeyPass, + HotKeyPassword: passes.hotkey, + TSSKeyPassword: passes.tss, BitcoinChainIDs: btcChainIDsFromContext(appContext), PostBlame: isEnvFlagEnabled(envFlagPostBlame), Telemetry: telemetryServer, } - tss, err := zetatss.Setup(ctx, tssSetupProps, startLogger) + tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std) if err != nil { return errors.Wrap(err, "unable to setup TSS service") } // Creating a channel to listen for os signals (or other signals) + // TODO graceful signalChannel := make(chan os.Signal, 1) signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) // Starts various background TSS listeners. // Shuts down zetaclientd if any is triggered. - maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() { - masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.") + maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() { + logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.") signalChannel <- syscall.SIGTERM }) - if len(appContext.ListChainIDs()) == 0 { - startLogger.Error().Interface("config", cfg).Msgf("No chains in updated config") - } - - isObserver, err := isObserverNode(ctx, zetacoreClient) - switch { - case err != nil: - startLogger.Error().Msgf("Unable to determine if node is an observer") - return err - case !isObserver: - addr := zetacoreClient.GetKeys().GetOperatorAddress().String() - startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0") - return nil - } - // CreateSignerMap: This creates a map of all signers for each chain. // Each signer is responsible for signing transactions for a particular chain signerMap, err := orchestrator.CreateSignerMap(ctx, tss, logger) @@ -203,16 +146,9 @@ func Start(_ *cobra.Command, _ []string) error { return err } - userDir, err := os.UserHomeDir() - if err != nil { - log.Error().Err(err).Msg("os.UserHomeDir") - return err - } - dbpath := filepath.Join(userDir, ".zetaclient/chainobserver") - // Creates a map of all chain observers for each chain. // Each chain observer is responsible for observing events on the chain and processing them. - observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer) + observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetryServer) if err != nil { return errors.Wrap(err, "unable to create chain observer map") } @@ -226,7 +162,7 @@ func Start(_ *cobra.Command, _ []string) error { signerMap, observerMap, tss, - dbpath, + dbPath, logger, telemetryServer, ) @@ -239,48 +175,17 @@ func Start(_ *cobra.Command, _ []string) error { return errors.Wrap(err, "unable to start orchestrator") } - // start zeta supply checker - // TODO: enable - // https://github.com/zeta-chain/node/issues/1354 - // NOTE: this is disabled for now because we need to determine the frequency on how to handle invalid check - // The method uses GRPC query to the node we might need to improve for performance - //zetaSupplyChecker, err := mc.NewZetaSupplyChecker(cfg, zetacoreClient, masterLogger) - //if err != nil { - // startLogger.Err(err).Msg("NewZetaSupplyChecker") - //} - //if err == nil { - // zetaSupplyChecker.Start() - // defer zetaSupplyChecker.Stop() - //} - - startLogger.Info().Msg("zetaclientd is running") + log.Info().Msg("zetaclientd is running") + // todo graceful sig := <-signalChannel - startLogger.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig) + log.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig) maestro.Stop() return nil } -// isObserverNode checks whether THIS node is an observer node. -func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) { - observers, err := client.GetObserverList(ctx) - if err != nil { - return false, errors.Wrap(err, "unable to get observers list") - } - - operatorAddress := client.GetKeys().GetOperatorAddress().String() - - for _, observer := range observers { - if observer == operatorAddress { - return true, nil - } - } - - return false, nil -} - func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) { // Get observer's public key ("grantee pub key") _, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword) @@ -307,3 +212,31 @@ func runPprof(logger zerolog.Logger) { logger.Error().Err(err).Msg("pprof http server error") } } + +type passwords struct { + hotkey string + tss string + solanaRelayerKey string +} + +// promptPasswords prompts for Hotkey, TSS key-share and relayer key passwords +func promptPasswords() (passwords, error) { + titles := []string{"HotKey", "TSS", "Solana Relayer Key"} + + res, err := zetaos.PromptPasswords(titles) + if err != nil { + return passwords{}, errors.Wrap(err, "unable to get passwords") + } + + return passwords{ + hotkey: res[0], + tss: res[1], + solanaRelayerKey: res[2], + }, nil +} + +func (p passwords) relayerKeys() map[string]string { + return map[string]string{ + chains.Network_solana.String(): p.solanaRelayerKey, + } +} diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index f7ef2f91bc..1da8393272 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -10,21 +10,16 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" "github.com/pkg/errors" "github.com/rs/zerolog" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + authz2 "github.com/zeta-chain/node/pkg/authz" + "github.com/zeta-chain/node/pkg/ticker" "github.com/zeta-chain/node/zetaclient/authz" - "github.com/zeta-chain/node/zetaclient/chains/interfaces" "github.com/zeta-chain/node/zetaclient/config" zctx "github.com/zeta-chain/node/zetaclient/context" "github.com/zeta-chain/node/zetaclient/keys" "github.com/zeta-chain/node/zetaclient/zetacore" ) -func createAuthzSigner(granter string, grantee sdk.AccAddress) { - authz.SetupAuthZSignerList(granter, grantee) -} - func createZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerolog.Logger) (*zetacore.Client, error) { hotKey := cfg.AuthzHotkey @@ -42,6 +37,15 @@ func createZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerol k := keys.NewKeysWithKeybase(kb, granterAddress, cfg.AuthzHotkey, hotkeyPassword) + // All votes broadcasts to zetacore are wrapped in authz. + // This is to ensure that the user does not need to keep their operator key online, and can use a cold key to sign votes + signerAddress, err := k.GetAddress() + if err != nil { + return nil, errors.Wrap(err, "failed to get signer address") + } + + authz.SetupAuthZSignerList(k.GetOperatorAddress().String(), signerAddress) + client, err := zetacore.NewClient(k, chainIP, hotKey, cfg.ChainID, logger) if err != nil { return nil, errors.Wrap(err, "failed to create zetacore client") @@ -50,31 +54,8 @@ func createZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerol return client, nil } -func waitForZetaCore(config config.Config, logger zerolog.Logger) { - const ( - port = 9090 - retry = 5 * time.Second - ) - - var ( - url = fmt.Sprintf("%s:%d", config.ZetaCoreURL, port) - opt = grpc.WithTransportCredentials(insecure.NewCredentials()) - ) - - // wait until zetacore is up - logger.Debug().Msgf("Waiting for zetacore to open %d port...", port) - - for { - if _, err := grpc.Dial(url, opt); err != nil { - logger.Warn().Err(err).Msg("grpc dial fail") - time.Sleep(retry) - } else { - break - } - } -} - -func waitForZetacoreToCreateBlocks(ctx context.Context, zc interfaces.ZetacoreClient, logger zerolog.Logger) error { +// waitForBlocks waits for zetacore to be ready (i.e. producing blocks) +func waitForBlocks(ctx context.Context, zc *zetacore.Client, logger zerolog.Logger) error { const ( interval = 5 * time.Second attempts = 15 @@ -85,10 +66,12 @@ func waitForZetacoreToCreateBlocks(ctx context.Context, zc interfaces.ZetacoreCl start = time.Now() ) - for { + task := func(ctx context.Context, t *ticker.Ticker) error { blockHeight, err := zc.GetBlockHeight(ctx) + if err == nil && blockHeight > 1 { - logger.Info().Msgf("Zeta block height: %d", blockHeight) + logger.Info().Msgf("Zetacore is ready, block height: %d", blockHeight) + t.Stop() return nil } @@ -97,9 +80,68 @@ func waitForZetacoreToCreateBlocks(ctx context.Context, zc interfaces.ZetacoreCl return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String()) } - logger.Info().Msgf("Failed to get block number, retry : %d/%d", retryCount, attempts) - time.Sleep(interval) + logger.Info().Msgf("Failed to get block number, retry: %d/%d", retryCount, attempts) + return nil } + + return ticker.Run(ctx, interval, task) +} + +// prepareZetacoreClient prepares the zetacore client for use. +// EXITS THE PROGRAM IF THIS NODE IS NOT AN OBSERVER. +func prepareZetacoreClient(ctx context.Context, zc *zetacore.Client, cfg *config.Config, logger zerolog.Logger) error { + // Set grantee account number and sequence number + if err := zc.SetAccountNumber(authz2.ZetaClientGranteeKey); err != nil { + return errors.Wrap(err, "failed to set account number") + } + + res, err := zc.GetNodeInfo(ctx) + if err != nil { + return errors.Wrap(err, "failed to get node info") + } + + network := res.GetDefaultNodeInfo().Network + if network != cfg.ChainID { + logger.Warn(). + Str("got", cfg.ChainID). + Str("want", network). + Msg("Zetacore chain id config mismatch. Forcing update from the network") + + cfg.ChainID = network + if err = zc.UpdateChainID(cfg.ChainID); err != nil { + return errors.Wrap(err, "failed to update chain id") + } + } + + isObserver, err := isObserverNode(ctx, zc) + switch { + case err != nil: + return errors.Wrap(err, "failed to check if this node is an observer") + case !isObserver: + addr := zc.GetKeys().GetOperatorAddress().String() + logger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0") + os.Exit(0) + } + + return nil +} + +// isObserverNode checks whether THIS node is an observer node. +func isObserverNode(ctx context.Context, zc *zetacore.Client) (bool, error) { + observers, err := zc.GetObserverList(ctx) + if err != nil { + return false, errors.Wrap(err, "unable to get observers list") + } + + operatorAddress := zc.GetKeys().GetOperatorAddress().String() + + for _, observer := range observers { + if observer == operatorAddress { + return true, nil + } + } + + return false, nil } func isEnvFlagEnabled(flag string) bool { diff --git a/pkg/os/console.go b/pkg/os/console.go index c4a7c505c7..6782bb6b43 100644 --- a/pkg/os/console.go +++ b/pkg/os/console.go @@ -33,11 +33,11 @@ func PromptPasswords(passwordTitles []string) ([]string, error) { // readPassword is a helper function that reads a password from bufio.Reader func readPassword(reader *bufio.Reader, passwordTitle string) (string, error) { - const delimitor = '\n' + const delimiter = '\n' // prompt for password fmt.Printf("%s Password: ", passwordTitle) - password, err := reader.ReadString(delimitor) + password, err := reader.ReadString(delimiter) if err != nil { return "", err } diff --git a/zetaclient/chains/base/logger.go b/zetaclient/chains/base/logger.go index c70c1fd738..d5ff2948af 100644 --- a/zetaclient/chains/base/logger.go +++ b/zetaclient/chains/base/logger.go @@ -1,6 +1,7 @@ package base import ( + "io" "os" "path/filepath" "time" @@ -11,9 +12,7 @@ import ( "github.com/zeta-chain/node/zetaclient/config" ) -const ( - ComplianceLogFile = "compliance.log" -) +const complianceLogFile = "compliance.log" // Logger contains the base loggers type Logger struct { @@ -21,7 +20,7 @@ type Logger struct { Compliance zerolog.Logger } -// DefaultLoggers creates default base loggers for tests +// DefaultLogger creates default base loggers for tests func DefaultLogger() Logger { return Logger{ Std: log.Logger, @@ -50,39 +49,38 @@ type ObserverLogger struct { Compliance zerolog.Logger } -// InitLogger initializes the base loggers -func InitLogger(cfg config.Config) (Logger, error) { +// NewLogger initializes the base loggers +func NewLogger(cfg config.Config) (Logger, error) { // open compliance log file - file, err := openComplianceLogFile(cfg) + complianceFile, err := openComplianceLogFile(cfg) if err != nil { - return DefaultLogger(), err + return Logger{}, err } - level := zerolog.Level(cfg.LogLevel) + augmentLogger := func(logger zerolog.Logger) zerolog.Logger { + level := zerolog.Level(cfg.LogLevel) + + return logger.Level(level).With().Timestamp().Logger() + } // create loggers based on configured level and format - var std zerolog.Logger - var compliance zerolog.Logger - switch cfg.LogFormat { - case "json": - std = zerolog.New(os.Stdout).Level(level).With().Timestamp().Logger() - compliance = zerolog.New(file).Level(level).With().Timestamp().Logger() - case "text": - std = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}). - Level(zerolog.Level(cfg.LogLevel)). - With(). - Timestamp(). - Logger() - compliance = zerolog.New(file).Level(level).With().Timestamp().Logger() - default: - std = zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339}) - compliance = zerolog.New(file).With().Timestamp().Logger() + var stdWriter io.Writer = os.Stdout + if cfg.LogFormat != "json" { + stdWriter = zerolog.ConsoleWriter{ + Out: os.Stdout, + TimeFormat: time.RFC3339, + } } + std := augmentLogger(zerolog.New(stdWriter)) + compliance := augmentLogger(zerolog.New(complianceFile)) + if cfg.LogSampler { std = std.Sample(&zerolog.BasicSampler{N: 5}) } - log.Logger = std // set global logger + + // set global logger + log.Logger = std return Logger{ Std: std, @@ -99,11 +97,12 @@ func openComplianceLogFile(cfg config.Config) (*os.File, error) { } // clean file name - name := filepath.Join(logPath, ComplianceLogFile) + name := filepath.Join(logPath, complianceLogFile) name, err := filepath.Abs(name) if err != nil { return nil, err } + name = filepath.Clean(name) // open (or create) compliance log file diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index cd195912bb..4f00b7a2bb 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -19,7 +19,6 @@ import ( "github.com/gagliardetto/solana-go" solrpc "github.com/gagliardetto/solana-go/rpc" "github.com/onrik/ethrpc" - "github.com/rs/zerolog" "gitlab.com/thorchain/tss/go-tss/blame" "github.com/zeta-chain/node/pkg/chains" @@ -102,7 +101,6 @@ type ZetacoreClient interface { ZetacoreVoter Chain() chains.Chain - GetLogger() *zerolog.Logger GetKeys() keyinterfaces.ObserverKeys GetSupportedChains(ctx context.Context) ([]chains.Chain, error) diff --git a/zetaclient/config/config.go b/zetaclient/config/config.go index 82cb3f97f8..8bd3e9eff9 100644 --- a/zetaclient/config/config.go +++ b/zetaclient/config/config.go @@ -7,6 +7,8 @@ import ( "os" "path/filepath" "strings" + + "github.com/pkg/errors" ) // restrictedAddressBook is a map of restricted addresses @@ -114,3 +116,15 @@ func ContainRestrictedAddress(addrs ...string) bool { } return false } + +// ResolveDBPath resolves the path to chain observer database +func ResolveDBPath() (string, error) { + const dbpath = ".zetaclient/chainobserver" + + userDir, err := os.UserHomeDir() + if err != nil { + return "", errors.Wrap(err, "unable to resolve user home directory") + } + + return filepath.Join(userDir, dbpath), nil +} diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index de54435c7e..df5b6dbeb6 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -254,10 +254,6 @@ func (c *Client) Chain() chains.Chain { return c.chain } -func (c *Client) GetLogger() *zerolog.Logger { - return &c.logger -} - func (c *Client) GetKeys() keyinterfaces.ObserverKeys { return c.keys } From 6df29a351de631ac56132155bbfe9b645e39f700 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:20:05 +0100 Subject: [PATCH 2/7] Implement graceful package --- pkg/graceful/graceful.go | 147 ++++++++++++++++++ pkg/graceful/graceful_test.go | 273 ++++++++++++++++++++++++++++++++++ 2 files changed, 420 insertions(+) create mode 100644 pkg/graceful/graceful.go create mode 100644 pkg/graceful/graceful_test.go diff --git a/pkg/graceful/graceful.go b/pkg/graceful/graceful.go new file mode 100644 index 0000000000..dc61118efc --- /dev/null +++ b/pkg/graceful/graceful.go @@ -0,0 +1,147 @@ +// Package graceful contains tools for graceful shutdown. +// GS refers to the process of shutting down a system in a controlled manner, allowing it to complete ongoing tasks, +// release resources, and perform necessary cleanup operations before terminating. +// This ensures that the system stops functioning without causing data loss, corruption, or other issues. +package graceful + +import ( + "context" + "os" + "os/signal" + "sync" + "time" + + "github.com/rs/zerolog" +) + +// Process represents "virtual" process that contains +// routines that can be started and stopped +// Note that ANY failure in starting a service will cause the process to shutdown +type Process struct { + stop <-chan os.Signal + stopStack []func() + + timeout time.Duration + mu sync.Mutex + stopped bool + + logger zerolog.Logger +} + +// Service represents abstract service. +type Service interface { + Start(ctx context.Context) error + Stop() +} + +// New Process constructor. +func New(timeout time.Duration, logger zerolog.Logger, stop <-chan os.Signal) *Process { + return &Process{ + stop: stop, + timeout: timeout, + logger: logger.With().Str("module", "graceful").Logger(), + } +} + +// AddService adds Service to the process. +func (p *Process) AddService(ctx context.Context, s Service) { + p.AddStarter(ctx, s.Start) + p.AddStopper(s.Stop) +} + +// AddStarter runs a function that starts something. +// This is a blocking call for blocking .Start() services +func (p *Process) AddStarter(ctx context.Context, fn func(ctx context.Context) error) { + go func() { + defer func() { + if r := recover(); r != nil { + p.logger.Error().Interface("panic", r).Msg("panic in service") + p.ShutdownNow() + } + }() + + if err := fn(ctx); err != nil { + p.logger.Error().Err(err).Msg("failed to start service") + p.ShutdownNow() + } + }() +} + +// AddStopper adds a function will be executed during shutdown. +func (p *Process) AddStopper(fn func()) { + p.mu.Lock() + defer p.mu.Unlock() + + p.stopStack = append(p.stopStack, fn) +} + +// WaitForShutdown blocks current routine until a shutdown signal is received +func (p *Process) WaitForShutdown() { + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + select { + case <-p.stop: + p.ShutdownNow() + return + case <-t.C: + // another goroutine already called ShutdownNow + // safe to read w/o mutex + if p.stopped { + return + } + } + } +} + +// ShutdownNow invokes shutdown of all services. +func (p *Process) ShutdownNow() { + p.mu.Lock() + defer p.mu.Unlock() + + // noop + if p.stopped { + return + } + + defer func() { + p.stopped = true + }() + + p.logger.Info().Msg("Shutting down") + + deadline := time.After(p.timeout) + done := make(chan struct{}) + + go func() { + defer func() { + if r := recover(); r != nil { + p.logger.Error().Interface("panic", r).Msg("panic during shutdown") + } + + // complete shutdown + close(done) + }() + + // stop services in the reverse order + for i := len(p.stopStack) - 1; i >= 0; i-- { + p.stopStack[i]() + } + }() + + select { + case <-deadline: + p.logger.Info().Msgf("Shutdown interrupted by timeout (%s)", p.timeout.String()) + case <-done: + p.logger.Info().Msg("Shutdown completed") + } +} + +// NewSigChan creates a new signal channel. +func NewSigChan(signals ...os.Signal) chan os.Signal { + out := make(chan os.Signal, 1) + signal.Notify(out, signals...) + + return out +} diff --git a/pkg/graceful/graceful_test.go b/pkg/graceful/graceful_test.go new file mode 100644 index 0000000000..23de33bca2 --- /dev/null +++ b/pkg/graceful/graceful_test.go @@ -0,0 +1,273 @@ +package graceful + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +func TestProcess(t *testing.T) { + const defaultTimeout = 2 * time.Second + + ctx := context.Background() + + t.Run("Service sync", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + // ACT + // Run service + ts.process.AddService(ctx, ts.mockService) + + start := time.Now() + + // And after 1 second someone presses ctrl+c + go func() { + time.Sleep(time.Second) + ts.mockSignal <- os.Interrupt + }() + + ts.process.WaitForShutdown() + + // ASSERT + // Check that service was stopped in a timely manner + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "mock is running in blocking mode") + }) + + t.Run("Service async", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, true) + + // Run service + ts.process.AddService(ctx, ts.mockService) + + // ACT + start := time.Now() + + // And after 700ms someone presses ctrl+c + go func() { + time.Sleep(700 * time.Millisecond) + ts.mockSignal <- os.Interrupt + }() + + ts.process.WaitForShutdown() + + // ASSERT + // Check that service was stopped in a timely manner + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "mock is running in non-blocking mode") + }) + + t.Run("Manual starters and stoppers", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + // Given one starter + ts.process.AddStarter(ctx, func(ctx context.Context) error { + ts.logger.Info().Msg("Hello world") + return nil + }) + + // And two stoppers + ts.process.AddStopper(func() { + time.Sleep(200 * time.Millisecond) + ts.logger.Info().Msg("Stopper 1") + }) + + ts.process.AddStopper(func() { + time.Sleep(300 * time.Millisecond) + ts.logger.Info().Msg("Stopper 2") + }) + + // ACT + start := time.Now() + + // And after 1s someone presses ctrl+c + go func() { + time.Sleep(time.Second) + ts.mockSignal <- os.Interrupt + }() + + ts.process.WaitForShutdown() + + // ASSERT + // Check that service was stopped in a timely manner + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "Stopper 1") + assert.Contains(t, ts.logBuffer.String(), "Stopper 2") + }) + + t.Run("Starter error", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + ts.mockService.errStart = fmt.Errorf("failed to start service") + + ts.process.AddService(ctx, ts.mockService) + + // ACT + start := time.Now() + ts.process.WaitForShutdown() + + // ASSERT + // Check that service had errors and was stopped + assert.Less(t, time.Since(start), defaultTimeout) + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "failed to start service") + }) + + t.Run("Panic handling", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + ts.process.AddStarter(ctx, func(ctx context.Context) error { + panic("oopsie") + return nil + }) + + // ACT + ts.process.WaitForShutdown() + + // ASSERT + // Check that service had errors and was stopped + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "panic in service") + }) + + t.Run("WaitForShutdown noop", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + ts.process.AddService(ctx, ts.mockService) + + // ACT + ts.process.ShutdownNow() + ts.process.WaitForShutdown() + + // ASSERT + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + }) + + t.Run("Shutdown timeout", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + // Given some slow stopper + const workDuration = defaultTimeout + 5*time.Second + + ts.process.AddStopper(func() { + ts.logger.Info().Msg("Stopping something") + time.Sleep(workDuration) + ts.logger.Info().Msg("Stopped something") + }) + + // ACT + ts.process.ShutdownNow() + + // ASSERT + assert.Contains(t, ts.logBuffer.String(), "Stopping something") + assert.Contains(t, ts.logBuffer.String(), "Shutdown interrupted by timeout") + + // log doesn't contain this line because it was interrupted + assert.NotContains(t, ts.logBuffer.String(), "Stopped something") + }) +} + +type testSuite struct { + process *Process + mockService *mockService + mockSignal chan os.Signal + + logger zerolog.Logger + logBuffer *bytes.Buffer +} + +func newTestSuite(t *testing.T, timeout time.Duration, async bool) *testSuite { + logBuffer := &bytes.Buffer{} + logger := zerolog.New(io.MultiWriter(zerolog.NewTestWriter(t), logBuffer)) + + stop := NewSigChan(os.Interrupt) + process := New(timeout, logger, stop) + + return &testSuite{ + mockSignal: stop, + process: process, + logger: logger, + logBuffer: logBuffer, + mockService: &mockService{ + async: async, + Logger: logger, + }, + } +} + +type mockService struct { + errStart error + async bool + running bool + zerolog.Logger +} + +func (m *mockService) Start(_ context.Context) error { + const interval = 300 * time.Millisecond + + m.running = true + + // emulate async started + if m.async { + go func() { + for { + if m.errStart != nil || !m.running { + return + } + + m.Info().Msg("mock is running in non-blocking mode") + time.Sleep(interval) + } + }() + + return nil + } + + for { + switch { + case m.errStart != nil: + m.running = false + return m.errStart + case !m.running: + return nil + default: + m.Info().Msg("mock is running in blocking mode") + time.Sleep(interval) + } + } +} + +func (m *mockService) Stop() { + m.running = false + m.Info().Msg("Stopping mock service") +} From 6f43ea0d00c571d9d662fcba06870a008fee5167 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:52:44 +0100 Subject: [PATCH 3/7] Apply graceful package to start.go --- cmd/zetaclientd/start.go | 120 +++++++++++--------------- cmd/zetaclientd/utils.go | 10 +++ pkg/graceful/graceful.go | 28 +++++- zetaclient/chains/base/logger_test.go | 2 +- zetaclient/metrics/metrics.go | 23 +++-- zetaclient/metrics/metrics_test.go | 3 +- zetaclient/metrics/telemetry.go | 14 +-- 7 files changed, 111 insertions(+), 89 deletions(-) diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 38182d1c13..397e61377a 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -5,21 +5,18 @@ import ( "net/http" _ "net/http/pprof" // #nosec G108 -- pprof enablement is intentional "os" - "os/signal" - "syscall" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/zeta-chain/node/pkg/chains" "github.com/zeta-chain/node/pkg/constant" + "github.com/zeta-chain/node/pkg/graceful" zetaos "github.com/zeta-chain/node/pkg/os" "github.com/zeta-chain/node/zetaclient/chains/base" "github.com/zeta-chain/node/zetaclient/config" zctx "github.com/zeta-chain/node/zetaclient/context" - "github.com/zeta-chain/node/zetaclient/keys" "github.com/zeta-chain/node/zetaclient/maintenance" "github.com/zeta-chain/node/zetaclient/metrics" "github.com/zeta-chain/node/zetaclient/orchestrator" @@ -59,28 +56,10 @@ func Start(_ *cobra.Command, _ []string) error { appContext := zctx.New(cfg, passes.relayerKeys(), logger.Std) ctx := zctx.WithAppContext(context.Background(), appContext) - // TODO graceful - telemetryServer := metrics.NewTelemetryServer() - go func() { - err := telemetryServer.Start() - if err != nil { - log.Fatal().Err(err).Msg("telemetryServer error") - } - }() - - m, err := metrics.NewMetrics() + telemetry, err := startTelemetry(ctx, cfg) if err != nil { - return errors.Wrap(err, "unable to create metrics") + return errors.Wrap(err, "unable to start telemetry") } - m.Start() - - metrics.Info.WithLabelValues(constant.Version).Set(1) - metrics.LastStartTime.SetToCurrentTime() - - telemetryServer.SetIPAddress(cfg.PublicIP) - - // TODO graceful - go runPprof(logger.Std) // zetacore client is used for all communication to zeta node. // it accumulates votes, and provides a source of truth for all clients @@ -118,7 +97,7 @@ func Start(_ *cobra.Command, _ []string) error { TSSKeyPassword: passes.tss, BitcoinChainIDs: btcChainIDsFromContext(appContext), PostBlame: isEnvFlagEnabled(envFlagPostBlame), - Telemetry: telemetryServer, + Telemetry: telemetry, } tss, err := zetatss.Setup(ctx, tssSetupProps, logger.Std) @@ -126,16 +105,11 @@ func Start(_ *cobra.Command, _ []string) error { return errors.Wrap(err, "unable to setup TSS service") } - // Creating a channel to listen for os signals (or other signals) - // TODO graceful - signalChannel := make(chan os.Signal, 1) - signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) - // Starts various background TSS listeners. // Shuts down zetaclientd if any is triggered. maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() { logger.Std.Info().Msg("TSS listener received an action to shutdown zetaclientd.") - signalChannel <- syscall.SIGTERM + graceful.ShutdownNow() }) // CreateSignerMap: This creates a map of all signers for each chain. @@ -148,7 +122,7 @@ func Start(_ *cobra.Command, _ []string) error { // Creates a map of all chain observers for each chain. // Each chain observer is responsible for observing events on the chain and processing them. - observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetryServer) + observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbPath, logger, telemetry) if err != nil { return errors.Wrap(err, "unable to create chain observer map") } @@ -164,55 +138,21 @@ func Start(_ *cobra.Command, _ []string) error { tss, dbPath, logger, - telemetryServer, + telemetry, ) if err != nil { return errors.Wrap(err, "unable to create orchestrator") } // Start orchestrator with all observers and signers - if err = maestro.Start(ctx); err != nil { - return errors.Wrap(err, "unable to start orchestrator") - } - - log.Info().Msg("zetaclientd is running") + graceful.AddService(ctx, maestro) - // todo graceful - sig := <-signalChannel - log.Info().Msgf("Stop signal received: %q. Stopping zetaclientd", sig) - - maestro.Stop() + // Block current routine until a shutdown signal is received + graceful.WaitForShutdown() return nil } -func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) { - // Get observer's public key ("grantee pub key") - _, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword) - if err != nil { - return "", errors.Wrap(err, "unable to get keyring key base") - } - - return granteePubKeyBech32, nil -} - -// runPprof run pprof http server -// zetacored/cometbft is already listening for runPprof on 6060 (by default) -func runPprof(logger zerolog.Logger) { - addr := os.Getenv(envPprofAddr) - if addr == "" { - addr = "localhost:6061" - } - - logger.Info().Str("addr", addr).Msg("starting pprof http server") - - // #nosec G114 -- timeouts unneeded - err := http.ListenAndServe(addr, nil) - if err != nil { - logger.Error().Err(err).Msg("pprof http server error") - } -} - type passwords struct { hotkey string tss string @@ -240,3 +180,43 @@ func (p passwords) relayerKeys() map[string]string { chains.Network_solana.String(): p.solanaRelayerKey, } } + +func startTelemetry(ctx context.Context, cfg config.Config) (*metrics.TelemetryServer, error) { + // 1. Init pprof http server + pprofServer := func(_ context.Context) error { + addr := os.Getenv(envPprofAddr) + if addr == "" { + addr = "localhost:6061" + } + + log.Info().Str("addr", addr).Msg("starting pprof http server") + + // #nosec G114 -- timeouts unneeded + err := http.ListenAndServe(addr, nil) + if err != nil { + log.Error().Err(err).Msg("pprof http server error") + } + + return nil + } + + // 2. Init metrics server + metricsServer, err := metrics.NewMetrics() + if err != nil { + return nil, errors.Wrap(err, "unable to create metrics") + } + + metrics.Info.WithLabelValues(constant.Version).Set(1) + metrics.LastStartTime.SetToCurrentTime() + + // 3. Init telemetry server + telemetry := metrics.NewTelemetryServer() + telemetry.SetIPAddress(cfg.PublicIP) + + // 4. Add services to the process + graceful.AddStarter(ctx, pprofServer) + graceful.AddService(ctx, metricsServer) + graceful.AddService(ctx, telemetry) + + return telemetry, nil +} diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index 1da8393272..c9b97dfdba 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -161,3 +161,13 @@ func btcChainIDsFromContext(app *zctx.AppContext) []int64 { return btcChainIDs } + +func resolveObserverPubKeyBech32(cfg config.Config, hotKeyPassword string) (string, error) { + // Get observer's public key ("grantee pub key") + _, granteePubKeyBech32, err := keys.GetKeyringKeybase(cfg, hotKeyPassword) + if err != nil { + return "", errors.Wrap(err, "unable to get keyring key base") + } + + return granteePubKeyBech32, nil +} diff --git a/pkg/graceful/graceful.go b/pkg/graceful/graceful.go index dc61118efc..1d2167b244 100644 --- a/pkg/graceful/graceful.go +++ b/pkg/graceful/graceful.go @@ -9,9 +9,11 @@ import ( "os" "os/signal" "sync" + "syscall" "time" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) // Process represents "virtual" process that contains @@ -34,6 +36,9 @@ type Service interface { Stop() } +// DefaultProcess is a process instance with some sane defaults. +var DefaultProcess = New(15*time.Second, log.Logger, NewSigChan(syscall.SIGINT, syscall.SIGTERM)) + // New Process constructor. func New(timeout time.Duration, logger zerolog.Logger, stop <-chan os.Signal) *Process { return &Process{ @@ -82,7 +87,8 @@ func (p *Process) WaitForShutdown() { for { select { - case <-p.stop: + case sig := <-p.stop: + p.logger.Info().Msgf("Received signal: %q", sig.String()) p.ShutdownNow() return case <-t.C: @@ -145,3 +151,23 @@ func NewSigChan(signals ...os.Signal) chan os.Signal { return out } + +func AddService(ctx context.Context, s Service) { + DefaultProcess.AddService(ctx, s) +} + +func AddStarter(ctx context.Context, fn func(ctx context.Context) error) { + DefaultProcess.AddStarter(ctx, fn) +} + +func AddStopper(fn func()) { + DefaultProcess.AddStopper(fn) +} + +func WaitForShutdown() { + DefaultProcess.WaitForShutdown() +} + +func ShutdownNow() { + DefaultProcess.ShutdownNow() +} diff --git a/zetaclient/chains/base/logger_test.go b/zetaclient/chains/base/logger_test.go index 571a84ee7d..d1b74bf0cb 100644 --- a/zetaclient/chains/base/logger_test.go +++ b/zetaclient/chains/base/logger_test.go @@ -79,7 +79,7 @@ func TestInitLogger(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // init logger - logger, err := base.InitLogger(tt.cfg) + logger, err := base.NewLogger(tt.cfg) // check if error is expected if tt.fail { diff --git a/zetaclient/metrics/metrics.go b/zetaclient/metrics/metrics.go index e614cbf676..1c2e03a21f 100644 --- a/zetaclient/metrics/metrics.go +++ b/zetaclient/metrics/metrics.go @@ -7,6 +7,7 @@ import ( "net/url" "time" + "cosmossdk.io/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -186,20 +187,24 @@ func NewMetrics() (*Metrics, error) { } // Start starts the metrics server -func (m *Metrics) Start() { +func (m *Metrics) Start(_ context.Context) error { log.Info().Msg("metrics server starting") - go func() { - if err := m.s.ListenAndServe(); err != nil { - log.Error().Err(err).Msg("fail to start metric server") - } - }() + + if err := m.s.ListenAndServe(); err != nil { + return errors.Wrap(err, "fail to start metric server") + } + + return nil } // Stop stops the metrics server -func (m *Metrics) Stop() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) +func (m *Metrics) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return m.s.Shutdown(ctx) + + if err := m.s.Shutdown(ctx); err != nil { + log.Error().Err(err).Msg("failed to shutdown metrics server") + } } // GetInstrumentedHTTPClient sets up a http client that emits prometheus metrics diff --git a/zetaclient/metrics/metrics_test.go b/zetaclient/metrics/metrics_test.go index dff41d94c8..d630b94b79 100644 --- a/zetaclient/metrics/metrics_test.go +++ b/zetaclient/metrics/metrics_test.go @@ -1,6 +1,7 @@ package metrics import ( + "context" "fmt" "io" "net/http" @@ -25,7 +26,7 @@ var _ = Suite(&MetricsSuite{}) func (ms *MetricsSuite) SetUpSuite(c *C) { m, err := NewMetrics() c.Assert(err, IsNil) - m.Start() + go m.Start(context.Background()) ms.m = m } diff --git a/zetaclient/metrics/telemetry.go b/zetaclient/metrics/telemetry.go index c78bcd565d..777d68019e 100644 --- a/zetaclient/metrics/telemetry.go +++ b/zetaclient/metrics/telemetry.go @@ -198,10 +198,11 @@ func (t *TelemetryServer) Handlers() http.Handler { } // Start starts telemetry server -func (t *TelemetryServer) Start() error { +func (t *TelemetryServer) Start(_ context.Context) error { if t.s == nil { return errors.New("invalid http server instance") } + if err := t.s.ListenAndServe(); err != nil { if !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("fail to start http server: %w", err) @@ -212,14 +213,13 @@ func (t *TelemetryServer) Start() error { } // Stop stops telemetry server -func (t *TelemetryServer) Stop() error { - c, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func (t *TelemetryServer) Stop() { + c, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - err := t.s.Shutdown(c) - if err != nil { - log.Error().Err(err).Msg("Failed to shutdown the HTTP server gracefully") + + if err := t.s.Shutdown(c); err != nil { + log.Error().Err(err).Msg("Failed to shutdown the TelemetryServer") } - return err } // pingHandler returns a 200 OK response From 9f78be024bcc3fa051bc3909540a12d005d3be2d Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 12 Dec 2024 20:55:55 +0100 Subject: [PATCH 4/7] Update changelog --- changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog.md b/changelog.md index 14baffeb92..6857ab4795 100644 --- a/changelog.md +++ b/changelog.md @@ -15,6 +15,7 @@ ## Refactor * [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient +* [3291](https://github.com/zeta-chain/node/pull/3291) - revamp zetaclient initialization (+ graceful shutdown) ### Fixes From 6a8c1356c68d41e18c2fdefe3836b7427efeed99 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:56:05 +0100 Subject: [PATCH 5/7] Move zetaclient bootstrapping to client_start. Move isObserver after TSS ceremony --- cmd/zetaclientd/start.go | 25 +++--- cmd/zetaclientd/utils.go | 113 ----------------------- zetaclient/zetacore/client_start.go | 134 ++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+), 124 deletions(-) create mode 100644 zetaclient/zetacore/client_start.go diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 397e61377a..e82baec911 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -21,6 +21,7 @@ import ( "github.com/zeta-chain/node/zetaclient/metrics" "github.com/zeta-chain/node/zetaclient/orchestrator" zetatss "github.com/zeta-chain/node/zetaclient/tss" + "github.com/zeta-chain/node/zetaclient/zetacore" ) const ( @@ -63,18 +64,11 @@ func Start(_ *cobra.Command, _ []string) error { // zetacore client is used for all communication to zeta node. // it accumulates votes, and provides a source of truth for all clients - zetacoreClient, err := createZetacoreClient(cfg, passes.hotkey, logger.Std) + // + // This call crated client, ensured block production, then prepares the client + zetacoreClient, err := zetacore.NewFromConfig(ctx, &cfg, passes.hotkey, logger.Std) if err != nil { - return errors.Wrap(err, "unable to create zetacore client") - } - - // Wait until zetacore is ready to produce blocks - if err = waitForBlocks(ctx, zetacoreClient, logger.Std); err != nil { - return errors.Wrap(err, "zetacore unavailable") - } - - if err = prepareZetacoreClient(ctx, zetacoreClient, &cfg, logger.Std); err != nil { - return errors.Wrap(err, "unable to prepare zetacore client") + return errors.Wrap(err, "unable to create zetacore client from config") } // Initialize core parameters from zetacore @@ -105,6 +99,15 @@ func Start(_ *cobra.Command, _ []string) error { return errors.Wrap(err, "unable to setup TSS service") } + isObserver, err := isObserverNode(ctx, zetacoreClient) + switch { + case err != nil: + return errors.Wrap(err, "unable to check if observer node") + case !isObserver: + logger.Std.Warn().Msg("This node is not an observer node. Exit 0") + return nil + } + // Starts various background TSS listeners. // Shuts down zetaclientd if any is triggered. maintenance.NewTSSListener(zetacoreClient, logger.Std).Listen(ctx, func() { diff --git a/cmd/zetaclientd/utils.go b/cmd/zetaclientd/utils.go index c9b97dfdba..60cb7c9361 100644 --- a/cmd/zetaclientd/utils.go +++ b/cmd/zetaclientd/utils.go @@ -2,130 +2,17 @@ package main import ( "context" - "fmt" "os" "strconv" - "time" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/pkg/errors" - "github.com/rs/zerolog" - authz2 "github.com/zeta-chain/node/pkg/authz" - "github.com/zeta-chain/node/pkg/ticker" - "github.com/zeta-chain/node/zetaclient/authz" "github.com/zeta-chain/node/zetaclient/config" zctx "github.com/zeta-chain/node/zetaclient/context" "github.com/zeta-chain/node/zetaclient/keys" "github.com/zeta-chain/node/zetaclient/zetacore" ) -func createZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerolog.Logger) (*zetacore.Client, error) { - hotKey := cfg.AuthzHotkey - - chainIP := cfg.ZetaCoreURL - - kb, _, err := keys.GetKeyringKeybase(cfg, hotkeyPassword) - if err != nil { - return nil, errors.Wrap(err, "failed to get keyring base") - } - - granterAddress, err := sdk.AccAddressFromBech32(cfg.AuthzGranter) - if err != nil { - return nil, errors.Wrap(err, "failed to get granter address") - } - - k := keys.NewKeysWithKeybase(kb, granterAddress, cfg.AuthzHotkey, hotkeyPassword) - - // All votes broadcasts to zetacore are wrapped in authz. - // This is to ensure that the user does not need to keep their operator key online, and can use a cold key to sign votes - signerAddress, err := k.GetAddress() - if err != nil { - return nil, errors.Wrap(err, "failed to get signer address") - } - - authz.SetupAuthZSignerList(k.GetOperatorAddress().String(), signerAddress) - - client, err := zetacore.NewClient(k, chainIP, hotKey, cfg.ChainID, logger) - if err != nil { - return nil, errors.Wrap(err, "failed to create zetacore client") - } - - return client, nil -} - -// waitForBlocks waits for zetacore to be ready (i.e. producing blocks) -func waitForBlocks(ctx context.Context, zc *zetacore.Client, logger zerolog.Logger) error { - const ( - interval = 5 * time.Second - attempts = 15 - ) - - var ( - retryCount = 0 - start = time.Now() - ) - - task := func(ctx context.Context, t *ticker.Ticker) error { - blockHeight, err := zc.GetBlockHeight(ctx) - - if err == nil && blockHeight > 1 { - logger.Info().Msgf("Zetacore is ready, block height: %d", blockHeight) - t.Stop() - return nil - } - - retryCount++ - if retryCount > attempts { - return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String()) - } - - logger.Info().Msgf("Failed to get block number, retry: %d/%d", retryCount, attempts) - return nil - } - - return ticker.Run(ctx, interval, task) -} - -// prepareZetacoreClient prepares the zetacore client for use. -// EXITS THE PROGRAM IF THIS NODE IS NOT AN OBSERVER. -func prepareZetacoreClient(ctx context.Context, zc *zetacore.Client, cfg *config.Config, logger zerolog.Logger) error { - // Set grantee account number and sequence number - if err := zc.SetAccountNumber(authz2.ZetaClientGranteeKey); err != nil { - return errors.Wrap(err, "failed to set account number") - } - - res, err := zc.GetNodeInfo(ctx) - if err != nil { - return errors.Wrap(err, "failed to get node info") - } - - network := res.GetDefaultNodeInfo().Network - if network != cfg.ChainID { - logger.Warn(). - Str("got", cfg.ChainID). - Str("want", network). - Msg("Zetacore chain id config mismatch. Forcing update from the network") - - cfg.ChainID = network - if err = zc.UpdateChainID(cfg.ChainID); err != nil { - return errors.Wrap(err, "failed to update chain id") - } - } - - isObserver, err := isObserverNode(ctx, zc) - switch { - case err != nil: - return errors.Wrap(err, "failed to check if this node is an observer") - case !isObserver: - addr := zc.GetKeys().GetOperatorAddress().String() - logger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0") - os.Exit(0) - } - - return nil -} - // isObserverNode checks whether THIS node is an observer node. func isObserverNode(ctx context.Context, zc *zetacore.Client) (bool, error) { observers, err := zc.GetObserverList(ctx) diff --git a/zetaclient/zetacore/client_start.go b/zetaclient/zetacore/client_start.go new file mode 100644 index 0000000000..0758278bd4 --- /dev/null +++ b/zetaclient/zetacore/client_start.go @@ -0,0 +1,134 @@ +package zetacore + +import ( + "context" + "fmt" + "time" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/pkg/errors" + "github.com/rs/zerolog" + + authz2 "github.com/zeta-chain/node/pkg/authz" + "github.com/zeta-chain/node/pkg/ticker" + "github.com/zeta-chain/node/zetaclient/authz" + "github.com/zeta-chain/node/zetaclient/config" + "github.com/zeta-chain/node/zetaclient/keys" +) + +// This file contains some high level logic for creating a zetacore client +// when starting zetaclientd in cmd/zetaclientd/start.go + +// NewFromConfig creates a new client from the given config. +// It also makes sure that the zetacore (i.e. the node) is ready to be used. +func NewFromConfig( + ctx context.Context, + cfg *config.Config, + hotkeyPassword string, + logger zerolog.Logger, +) (*Client, error) { + hotKey := cfg.AuthzHotkey + + chainIP := cfg.ZetaCoreURL + + kb, _, err := keys.GetKeyringKeybase(*cfg, hotkeyPassword) + if err != nil { + return nil, errors.Wrap(err, "failed to get keyring base") + } + + granterAddress, err := sdk.AccAddressFromBech32(cfg.AuthzGranter) + if err != nil { + return nil, errors.Wrap(err, "failed to get granter address") + } + + k := keys.NewKeysWithKeybase(kb, granterAddress, cfg.AuthzHotkey, hotkeyPassword) + + // All votes broadcasts to zetacore are wrapped in authz. + // This is to ensure that the user does not need to keep their operator key online, + // and can use a cold key to sign votes + signerAddress, err := k.GetAddress() + if err != nil { + return nil, errors.Wrap(err, "failed to get signer address") + } + + authz.SetupAuthZSignerList(k.GetOperatorAddress().String(), signerAddress) + + // Create client + client, err := NewClient(k, chainIP, hotKey, cfg.ChainID, logger) + if err != nil { + return nil, errors.Wrap(err, "failed to create the client") + } + + // Make sure that the node produces blocks + if err = ensureBlocksProduction(ctx, client); err != nil { + return nil, errors.Wrap(err, "zetacore unavailable") + } + + // Prepare the client + if err = prepareZetacoreClient(ctx, client, cfg); err != nil { + return nil, errors.Wrap(err, "failed to prepare the client") + } + + return client, nil +} + +// ensureBlocksProduction waits for zetacore to be ready (i.e. producing blocks) +func ensureBlocksProduction(ctx context.Context, zc *Client) error { + const ( + interval = 5 * time.Second + attempts = 15 + ) + + var ( + retryCount = 0 + start = time.Now() + ) + + task := func(ctx context.Context, t *ticker.Ticker) error { + blockHeight, err := zc.GetBlockHeight(ctx) + + if err == nil && blockHeight > 1 { + zc.logger.Info().Msgf("Zetacore is ready, block height: %d", blockHeight) + t.Stop() + return nil + } + + retryCount++ + if retryCount > attempts { + return fmt.Errorf("zetacore is not ready, timeout %s", time.Since(start).String()) + } + + zc.logger.Info().Msgf("Failed to get block number, retry: %d/%d", retryCount, attempts) + return nil + } + + return ticker.Run(ctx, interval, task) +} + +// prepareZetacoreClient prepares the zetacore client for use. +func prepareZetacoreClient(ctx context.Context, zc *Client, cfg *config.Config) error { + // Set grantee account number and sequence number + if err := zc.SetAccountNumber(authz2.ZetaClientGranteeKey); err != nil { + return errors.Wrap(err, "failed to set account number") + } + + res, err := zc.GetNodeInfo(ctx) + if err != nil { + return errors.Wrap(err, "failed to get node info") + } + + network := res.GetDefaultNodeInfo().Network + if network != cfg.ChainID { + zc.logger.Warn(). + Str("got", cfg.ChainID). + Str("want", network). + Msg("Zetacore chain id config mismatch. Forcing update from the network") + + cfg.ChainID = network + if err = zc.UpdateChainID(cfg.ChainID); err != nil { + return errors.Wrap(err, "failed to update chain id") + } + } + + return nil +} From e3c3587fc4ff8302450ca6b1c2537e84b5e6007f Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 13 Dec 2024 14:16:00 +0100 Subject: [PATCH 6/7] Locate panic line --- pkg/graceful/graceful.go | 21 +++++++++++++++++++-- pkg/graceful/graceful_test.go | 27 ++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/pkg/graceful/graceful.go b/pkg/graceful/graceful.go index 1d2167b244..6f9ffe4cf4 100644 --- a/pkg/graceful/graceful.go +++ b/pkg/graceful/graceful.go @@ -6,8 +6,11 @@ package graceful import ( "context" + "fmt" "os" "os/signal" + "runtime/debug" + "strings" "sync" "syscall" "time" @@ -60,7 +63,7 @@ func (p *Process) AddStarter(ctx context.Context, fn func(ctx context.Context) e go func() { defer func() { if r := recover(); r != nil { - p.logger.Error().Interface("panic", r).Msg("panic in service") + p.logger.Error().Err(panicToErr(r, 10)).Msg("panic in service") p.ShutdownNow() } }() @@ -123,7 +126,7 @@ func (p *Process) ShutdownNow() { go func() { defer func() { if r := recover(); r != nil { - p.logger.Error().Interface("panic", r).Msg("panic during shutdown") + p.logger.Error().Err(panicToErr(r, 10)).Msg("panic during shutdown") } // complete shutdown @@ -144,6 +147,20 @@ func (p *Process) ShutdownNow() { } } +// panicToErr converts panic to error WITH exact line of panic. +// Note the offset should be determined empirically. +func panicToErr(panic any, offset int) error { + stack := string(debug.Stack()) + lines := strings.Split(stack, "\n") + line := "" + + if len(lines) > offset { + line = strings.TrimSpace(lines[offset]) + } + + return fmt.Errorf("panic: %v at %s", panic, line) +} + // NewSigChan creates a new signal channel. func NewSigChan(signals ...os.Signal) chan os.Signal { out := make(chan os.Signal, 1) diff --git a/pkg/graceful/graceful_test.go b/pkg/graceful/graceful_test.go index 23de33bca2..37368cecea 100644 --- a/pkg/graceful/graceful_test.go +++ b/pkg/graceful/graceful_test.go @@ -135,7 +135,7 @@ func TestProcess(t *testing.T) { assert.Contains(t, ts.logBuffer.String(), "failed to start service") }) - t.Run("Panic handling", func(t *testing.T) { + t.Run("Panic handling during startup", func(t *testing.T) { t.Parallel() // ARRANGE @@ -153,6 +153,31 @@ func TestProcess(t *testing.T) { // Check that service had errors and was stopped assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") assert.Contains(t, ts.logBuffer.String(), "panic in service") + + // Check that error contains exact line of panic + assert.Contains(t, ts.logBuffer.String(), "graceful_test.go:145") + }) + + t.Run("Panic handling during shutdown", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t, defaultTimeout, false) + + ts.process.AddStopper(func() { + panic("bombarda maxima") + }) + + // ACT + ts.process.ShutdownNow() + + // ASSERT + // Check that service had errors and was stopped + assert.Contains(t, ts.logBuffer.String(), "Shutdown completed") + assert.Contains(t, ts.logBuffer.String(), "panic during shutdown") + + // Check that error contains exact line of panic + assert.Contains(t, ts.logBuffer.String(), "graceful_test.go:168") }) t.Run("WaitForShutdown noop", func(t *testing.T) { From b2b6cb600aabecea2401abfd72ba7c9234517bee Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Mon, 16 Dec 2024 12:26:17 +0100 Subject: [PATCH 7/7] Fix typo --- zetaclient/zetacore/client_start.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zetaclient/zetacore/client_start.go b/zetaclient/zetacore/client_start.go index 0758278bd4..af957e41c6 100644 --- a/zetaclient/zetacore/client_start.go +++ b/zetaclient/zetacore/client_start.go @@ -9,7 +9,7 @@ import ( "github.com/pkg/errors" "github.com/rs/zerolog" - authz2 "github.com/zeta-chain/node/pkg/authz" + zetaauthz "github.com/zeta-chain/node/pkg/authz" "github.com/zeta-chain/node/pkg/ticker" "github.com/zeta-chain/node/zetaclient/authz" "github.com/zeta-chain/node/zetaclient/config" @@ -108,7 +108,7 @@ func ensureBlocksProduction(ctx context.Context, zc *Client) error { // prepareZetacoreClient prepares the zetacore client for use. func prepareZetacoreClient(ctx context.Context, zc *Client, cfg *config.Config) error { // Set grantee account number and sequence number - if err := zc.SetAccountNumber(authz2.ZetaClientGranteeKey); err != nil { + if err := zc.SetAccountNumber(zetaauthz.ZetaClientGranteeKey); err != nil { return errors.Wrap(err, "failed to set account number") }