From 7d3474dffd56a43b4004719d1a9d6f8ab811d452 Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Sun, 10 Dec 2023 17:51:41 -0800 Subject: [PATCH] tmpnet: Separate node into orchestration, config and process This refactor is intended to improve maintainability by separating node into coherent constituent parts and minimizing the exported API. --- tests/e2e/faultinjection/duplicate_node_id.go | 2 +- tests/e2e/p/interchain_workflow.go | 2 +- tests/e2e/p/staking_rewards.go | 4 +- tests/fixture/e2e/env.go | 2 +- tests/fixture/e2e/helpers.go | 38 +- tests/fixture/tmpnet/README.md | 24 +- tests/fixture/tmpnet/cmd/main.go | 8 +- tests/fixture/tmpnet/defaults.go | 13 +- tests/fixture/tmpnet/network.go | 51 ++- tests/fixture/tmpnet/node.go | 383 +++++------------- tests/fixture/tmpnet/node_config.go | 100 +++++ tests/fixture/tmpnet/node_process.go | 258 ++++++++++++ tests/fixture/tmpnet/utils.go | 4 + tests/upgrade/upgrade_test.go | 7 +- 14 files changed, 543 insertions(+), 353 deletions(-) create mode 100644 tests/fixture/tmpnet/node_config.go create mode 100644 tests/fixture/tmpnet/node_process.go diff --git a/tests/e2e/faultinjection/duplicate_node_id.go b/tests/e2e/faultinjection/duplicate_node_id.go index 98e971f5bbf1..48bba147d8e0 100644 --- a/tests/e2e/faultinjection/duplicate_node_id.go +++ b/tests/e2e/faultinjection/duplicate_node_id.go @@ -49,7 +49,7 @@ var _ = ginkgo.Describe("Duplicate node handling", func() { require.ErrorIs(err, context.DeadlineExceeded) ginkgo.By("stopping the first new node") - require.NoError(node1.Stop()) + require.NoError(node1.Stop(e2e.DefaultContext())) ginkgo.By("checking that the second new node becomes healthy within timeout") e2e.WaitForHealthy(node2) diff --git a/tests/e2e/p/interchain_workflow.go b/tests/e2e/p/interchain_workflow.go index 755312ae8160..4f352c5222d8 100644 --- a/tests/e2e/p/interchain_workflow.go +++ b/tests/e2e/p/interchain_workflow.go @@ -220,7 +220,7 @@ var _ = e2e.DescribePChain("[Interchain Workflow]", ginkgo.Label(e2e.UsesCChainL require.Positive(balance.Cmp(big.NewInt(0))) ginkgo.By("stopping validator node to free up resources for a bootstrap check") - require.NoError(node.Stop()) + require.NoError(node.Stop(e2e.DefaultContext())) e2e.CheckBootstrapIsPossible(network) }) diff --git a/tests/e2e/p/staking_rewards.go b/tests/e2e/p/staking_rewards.go index c8ae29805ebf..34c178b57849 100644 --- a/tests/e2e/p/staking_rewards.go +++ b/tests/e2e/p/staking_rewards.go @@ -230,7 +230,7 @@ var _ = ginkgo.Describe("[Staking Rewards]", func() { }) ginkgo.By("stopping beta node to prevent it and its delegator from receiving a validation reward") - require.NoError(betaNode.Stop()) + require.NoError(betaNode.Stop(e2e.DefaultContext())) ginkgo.By("waiting until all validation periods are over") // The beta validator was the last added and so has the latest end time. The @@ -297,7 +297,7 @@ var _ = ginkgo.Describe("[Staking Rewards]", func() { } ginkgo.By("stopping alpha to free up resources for a bootstrap check") - require.NoError(alphaNode.Stop()) + require.NoError(alphaNode.Stop(e2e.DefaultContext())) e2e.CheckBootstrapIsPossible(network) }) diff --git a/tests/fixture/e2e/env.go b/tests/fixture/e2e/env.go index 397f6a1710d3..b1eb70905a08 100644 --- a/tests/fixture/e2e/env.go +++ b/tests/fixture/e2e/env.go @@ -132,5 +132,5 @@ func (te *TestEnvironment) NewPrivateNetwork() *tmpnet.Network { privateNetworksDir := filepath.Join(sharedNetwork.Dir, PrivateNetworksDirName) te.require.NoError(os.MkdirAll(privateNetworksDir, perms.ReadWriteExecute)) - return StartNetwork(sharedNetwork.ExecPath, privateNetworksDir) + return StartNetwork(sharedNetwork.AvalancheGoPath, privateNetworksDir) } diff --git a/tests/fixture/e2e/helpers.go b/tests/fixture/e2e/helpers.go index ba2e2cbb982a..8efc8edd0cb2 100644 --- a/tests/fixture/e2e/helpers.go +++ b/tests/fixture/e2e/helpers.go @@ -124,22 +124,21 @@ func Eventually(condition func() bool, waitFor time.Duration, tick time.Duration } } -// Add an ephemeral node that is only intended to be used by a single test. Its ID and -// URI are not intended to be returned from the Network instance to minimize -// accessibility from other tests. +// Adds an ephemeral node intended to be used by a single test. func AddEphemeralNode(network *tmpnet.Network, flags tmpnet.FlagsMap) *tmpnet.Node { require := require.New(ginkgo.GinkgoT()) - node, err := network.AddEphemeralNode(ginkgo.GinkgoWriter, flags) + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + node, err := network.AddEphemeralNode(ctx, ginkgo.GinkgoWriter, flags) require.NoError(err) - // Ensure node is stopped on teardown. It's configuration is not removed to enable - // collection in CI to aid in troubleshooting failures. ginkgo.DeferCleanup(func() { - tests.Outf("Shutting down ephemeral node %s\n", node.ID) - require.NoError(node.Stop()) + tests.Outf("shutting down ephemeral node %q\n", node.ID) + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + require.NoError(node.Stop(ctx)) }) - return node } @@ -197,26 +196,13 @@ func WithSuggestedGasPrice(ethClient ethclient.Client) common.Option { // Verify that a new node can bootstrap into the network. func CheckBootstrapIsPossible(network *tmpnet.Network) { - require := require.New(ginkgo.GinkgoT()) - if len(os.Getenv(SkipBootstrapChecksEnvName)) > 0 { tests.Outf("{{yellow}}Skipping bootstrap check due to the %s env var being set", SkipBootstrapChecksEnvName) return } ginkgo.By("checking if bootstrap is possible with the current network state") - // Call network.AddEphemeralNode instead of AddEphemeralNode to support - // checking for bootstrap implicitly on teardown via a function registered - // with ginkgo.DeferCleanup. It's not possible to call DeferCleanup from - // within a function called by DeferCleanup. - node, err := network.AddEphemeralNode(ginkgo.GinkgoWriter, tmpnet.FlagsMap{}) - require.NoError(err) - - defer func() { - tests.Outf("Shutting down ephemeral node %s\n", node.ID) - require.NoError(node.Stop()) - }() - + node := AddEphemeralNode(network, tmpnet.FlagsMap{}) WaitForHealthy(node) } @@ -230,7 +216,7 @@ func StartNetwork(avalancheGoExecPath string, networkDir string) *tmpnet.Network networkDir, &tmpnet.Network{ NodeRuntimeConfig: tmpnet.NodeRuntimeConfig{ - ExecPath: avalancheGoExecPath, + AvalancheGoPath: avalancheGoExecPath, }, }, tmpnet.DefaultNodeCount, @@ -239,7 +225,9 @@ func StartNetwork(avalancheGoExecPath string, networkDir string) *tmpnet.Network require.NoError(err) ginkgo.DeferCleanup(func() { tests.Outf("Shutting down network\n") - require.NoError(network.Stop()) + ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout) + defer cancel() + require.NoError(network.Stop(ctx)) }) tests.Outf("{{green}}Successfully started network{{/}}\n") diff --git a/tests/fixture/tmpnet/README.md b/tests/fixture/tmpnet/README.md index dfe430ea10a0..879c90a34e38 100644 --- a/tests/fixture/tmpnet/README.md +++ b/tests/fixture/tmpnet/README.md @@ -1,4 +1,4 @@ -# tmpnet (temporary network fixture) +# tmpnet - temporary network orchestration This package implements a simple orchestrator for the avalanchego nodes of a temporary network. Configuration is stored on disk, and @@ -31,6 +31,8 @@ the following non-test files: | genesis.go | | Creates test genesis | | network.go | Network | Orchestrates and configures temporary networks | | node.go | Node | Orchestrates and configures nodes | +| node_config.go | Node | Reads and writes node configuration | +| node_process.go | NodeProcess | Orchestrates node processes | | utils.go | | Defines shared utility functions | ## Usage @@ -147,9 +149,10 @@ HOME ├── NodeID-37E8UK3x2YFsHE3RdALmfWcppcZ1eTuj9 // The ID of a node is the name of its data dir │ ├── chainData │ │ └── ... - │ ├── config.json // Node flags + │ ├── config.json // Node runtime configuration │ ├── db │ │ └── ... + │ ├── flags.json // Node flags │ ├── logs │ │ └── ... │ ├── plugins @@ -160,11 +163,7 @@ HOME │ └── config.json // C-Chain config for all nodes ├── defaults.json // Default flags and configuration for network ├── genesis.json // Genesis for all nodes - ├── network.env // Sets network dir env to simplify use of network - └── ephemeral // Parent directory for ephemeral nodes (e.g. created by tests) - └─ NodeID-FdxnAvr4jK9XXAwsYZPgWAHW2QnwSZ // Data dir for an ephemeral node - └── ... - + └── network.env // Sets network dir env var to simplify network usage ``` ### Default flags and configuration @@ -210,12 +209,19 @@ The data dir for a node is set by default to non-default path by explicitly setting the `--data-dir` flag. +#### Runtime config + +The details required to configure a node's execution are written to +`[network-path]/[node-id]/config.json`. This file contains the +runtime-specific details like the path of the avalanchego binary to +start the node with. + #### Flags All flags used to configure a node are written to -`[network-path]/[node-id]/config.json` so that a node can be +`[network-path]/[node-id]/flags.json` so that a node can be configured with only a single argument: -`--config-file=/path/to/config.json`. This simplifies node launch and +`--config-file=/path/to/flags.json`. This simplifies node launch and ensures all parameters used to launch a node can be modified by editing the config file. diff --git a/tests/fixture/tmpnet/cmd/main.go b/tests/fixture/tmpnet/cmd/main.go index 0fe8435bb49e..610f400844ce 100644 --- a/tests/fixture/tmpnet/cmd/main.go +++ b/tests/fixture/tmpnet/cmd/main.go @@ -62,10 +62,10 @@ func main() { network := &tmpnet.Network{ NodeRuntimeConfig: tmpnet.NodeRuntimeConfig{ - ExecPath: execPath, + AvalancheGoPath: execPath, }, } - ctx, cancel := context.WithTimeout(context.Background(), tmpnet.DefaultNetworkStartTimeout) + ctx, cancel := context.WithTimeout(context.Background(), tmpnet.DefaultNetworkTimeout) defer cancel() network, err := tmpnet.StartNetwork(ctx, os.Stdout, rootDir, network, int(nodeCount), int(preFundedKeyCount)) if err != nil { @@ -105,7 +105,9 @@ func main() { if len(networkDir) == 0 { return errNetworkDirRequired } - if err := tmpnet.StopNetwork(networkDir); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), tmpnet.DefaultNetworkTimeout) + defer cancel() + if err := tmpnet.StopNetwork(ctx, networkDir); err != nil { return err } fmt.Fprintf(os.Stdout, "Stopped network configured at: %s\n", networkDir) diff --git a/tests/fixture/tmpnet/defaults.go b/tests/fixture/tmpnet/defaults.go index 1ca62e26bd4f..9e5d70916db7 100644 --- a/tests/fixture/tmpnet/defaults.go +++ b/tests/fixture/tmpnet/defaults.go @@ -12,13 +12,12 @@ import ( const ( // Constants defining the names of shell variables whose value can // configure temporary network orchestration. - AvalancheGoPathEnvName = "AVALANCHEGO_PATH" - NetworkDirEnvName = "TMPNET_NETWORK_DIR" - RootDirEnvName = "TMPNET_ROOT_DIR" + NetworkDirEnvName = "TMPNET_NETWORK_DIR" + RootDirEnvName = "TMPNET_ROOT_DIR" - DefaultNetworkStartTimeout = 2 * time.Minute - DefaultNodeInitTimeout = 10 * time.Second - DefaultNodeStopTimeout = 5 * time.Second + DefaultNetworkTimeout = 2 * time.Minute + DefaultNodeInitTimeout = 10 * time.Second + DefaultNodeStopTimeout = 5 * time.Second // Minimum required to ensure connectivity-based health checks will pass DefaultNodeCount = 2 @@ -28,6 +27,8 @@ const ( // A short minimum stake duration enables testing of staking logic. DefaultMinStakeDuration = time.Second + + defaultConfigFilename = "config.json" ) // A set of flags appropriate for testing. diff --git a/tests/fixture/tmpnet/network.go b/tests/fixture/tmpnet/network.go index d2cb6f97260f..9b406f7e316c 100644 --- a/tests/fixture/tmpnet/network.go +++ b/tests/fixture/tmpnet/network.go @@ -97,11 +97,11 @@ type Network struct { } // Adds a backend-agnostic ephemeral node to the network -func (n *Network) AddEphemeralNode(w io.Writer, flags FlagsMap) (*Node, error) { +func (n *Network) AddEphemeralNode(ctx context.Context, w io.Writer, flags FlagsMap) (*Node, error) { if flags == nil { flags = FlagsMap{} } - return n.AddNode(w, &Node{ + return n.AddNode(ctx, w, &Node{ Flags: flags, }, true /* isEphemeral */) } @@ -116,7 +116,7 @@ func StartNetwork( nodeCount int, keyCount int, ) (*Network, error) { - if _, err := fmt.Fprintf(w, "Preparing configuration for new temporary network with %s\n", network.ExecPath); err != nil { + if _, err := fmt.Fprintf(w, "Preparing configuration for new temporary network with %s\n", network.AvalancheGoPath); err != nil { return nil, err } @@ -198,12 +198,12 @@ func ReadNetwork(dir string) (*Network, error) { } // Stop the nodes of the network configured in the provided directory. -func StopNetwork(dir string) error { +func StopNetwork(ctx context.Context, dir string) error { network, err := ReadNetwork(dir) if err != nil { return err } - return network.Stop() + return network.Stop(ctx) } // Ensure the network has the configuration it needs to start. @@ -291,8 +291,15 @@ func (n *Network) PopulateNodeConfig(node *Node, nodeParentDir string) error { return err } + // Ensure the node is configured with a runtime config + if node.RuntimeConfig == nil { + node.RuntimeConfig = &NodeRuntimeConfig{ + AvalancheGoPath: n.AvalancheGoPath, + } + } + // Ensure the node's data dir is configured - dataDir := node.GetDataDir() + dataDir := node.getDataDir() if len(dataDir) == 0 { // NodeID will have been set by EnsureKeys dataDir = filepath.Join(nodeParentDir, node.ID.String()) @@ -331,7 +338,7 @@ func (n *Network) Start(w io.Writer) error { node.SetNetworkingConfig(0, 0, bootstrapIDs, bootstrapIPs) // Write configuration to disk in preparation for node start - if err := node.WriteConfig(); err != nil { + if err := node.Write(); err != nil { return err } @@ -340,7 +347,7 @@ func (n *Network) Start(w io.Writer) error { // its staking port. The network will start faster with this // synchronization due to the avoidance of exponential backoff // if a node tries to connect to a beacon that is not ready. - if err := node.Start(w, n.ExecPath); err != nil { + if err := node.Start(w); err != nil { return err } @@ -406,11 +413,11 @@ func (n *Network) GetURIs() []NodeURI { } // Stop all nodes in the network. -func (n *Network) Stop() error { +func (n *Network) Stop(ctx context.Context) error { var errs []error // Assume the nodes are loaded and the pids are current for _, node := range n.Nodes { - if err := node.Stop(); err != nil { + if err := node.Stop(ctx); err != nil { errs = append(errs, fmt.Errorf("failed to stop node %s: %w", node.ID, err)) } } @@ -476,9 +483,9 @@ func (n *Network) WriteCChainConfig() error { // Used to marshal/unmarshal persistent network defaults. type networkDefaults struct { - Flags FlagsMap - ExecPath string - PreFundedKeys []*secp256k1.PrivateKey + Flags FlagsMap + AvalancheGoPath string + PreFundedKeys []*secp256k1.PrivateKey } func (n *Network) GetDefaultsPath() string { @@ -495,16 +502,16 @@ func (n *Network) ReadDefaults() error { return fmt.Errorf("failed to unmarshal defaults: %w", err) } n.DefaultFlags = defaults.Flags - n.ExecPath = defaults.ExecPath + n.AvalancheGoPath = defaults.AvalancheGoPath n.PreFundedKeys = defaults.PreFundedKeys return nil } func (n *Network) WriteDefaults() error { defaults := networkDefaults{ - Flags: n.DefaultFlags, - ExecPath: n.ExecPath, - PreFundedKeys: n.PreFundedKeys, + Flags: n.DefaultFlags, + AvalancheGoPath: n.AvalancheGoPath, + PreFundedKeys: n.PreFundedKeys, } bytes, err := DefaultJSONMarshal(defaults) if err != nil { @@ -534,7 +541,7 @@ func (n *Network) WriteEnvFile() error { func (n *Network) WriteNodes() error { for _, node := range n.Nodes { - if err := node.WriteConfig(); err != nil { + if err := node.Write(); err != nil { return err } } @@ -611,7 +618,7 @@ func (n *Network) ReadAll() error { return n.ReadNodes() } -func (n *Network) AddNode(w io.Writer, node *Node, isEphemeral bool) (*Node, error) { +func (n *Network) AddNode(ctx context.Context, w io.Writer, node *Node, isEphemeral bool) (*Node, error) { // Assume network configuration has been written to disk and is current in memory if node == nil { @@ -650,15 +657,15 @@ func (n *Network) AddNode(w io.Writer, node *Node, isEphemeral bool) (*Node, err ) node.SetNetworkingConfig(httpPort, stakingPort, bootstrapIDs, bootstrapIPs) - if err := node.WriteConfig(); err != nil { + if err := node.Write(); err != nil { return nil, err } - err = node.Start(w, n.ExecPath) + err = node.Start(w) if err != nil { // Attempt to stop an unhealthy node to provide some assurance to the caller // that an error condition will not result in a lingering process. - stopErr := node.Stop() + stopErr := node.Stop(ctx) if stopErr != nil { err = errors.Join(err, stopErr) } diff --git a/tests/fixture/tmpnet/node.go b/tests/fixture/tmpnet/node.go index c25e16d403cd..36f2d03c292f 100644 --- a/tests/fixture/tmpnet/node.go +++ b/tests/fixture/tmpnet/node.go @@ -6,59 +6,74 @@ package tmpnet import ( "context" "encoding/base64" - "encoding/json" "errors" "fmt" "io" - "io/fs" - "net" "os" - "os/exec" "path/filepath" "strings" - "syscall" "time" "github.com/spf13/cast" - "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/config" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/node" "github.com/ava-labs/avalanchego/staking" "github.com/ava-labs/avalanchego/utils/crypto/bls" - "github.com/ava-labs/avalanchego/utils/perms" "github.com/ava-labs/avalanchego/vms/platformvm/signer" ) +// The Node type is defined in this file (node.go - orchestration) and +// node_config.go (reading/writing configuration). + +const ( + defaultNodeTickerInterval = 50 * time.Millisecond +) + var ( - errNodeAlreadyRunning = errors.New("failed to start node : node is already running") errMissingTLSKeyForNodeID = fmt.Errorf("failed to ensure node ID: missing value for %q", config.StakingTLSKeyContentKey) errMissingCertForNodeID = fmt.Errorf("failed to ensure node ID: missing value for %q", config.StakingCertContentKey) errInvalidKeypair = fmt.Errorf("%q and %q must be provided together or not at all", config.StakingTLSKeyContentKey, config.StakingCertContentKey) ) -// Defines configuration to execute a node. -// -// TODO(marun) Support persisting this configuration per-node when -// node restart is implemented. Currently it can be supplied for node -// start but won't survive restart. +// NodeRuntime defines the methods required to support running a node. +type NodeRuntime interface { + readState() error + Start(w io.Writer) error + InitiateStop() error + WaitForStopped(ctx context.Context) error + IsHealthy(ctx context.Context) (bool, error) +} + +// Configuration required to configure a node runtime. type NodeRuntimeConfig struct { - // Path to avalanchego binary - ExecPath string + AvalancheGoPath string } -// Stores the configuration and process details of a node in a temporary network. +// Node supports configuring and running a node participating in a temporary network. type Node struct { - NodeRuntimeConfig - node.NodeProcessContext + // Set by EnsureNodeID which is also called when the node is read. + ID ids.NodeID - ID ids.NodeID + // Flags that will be supplied to the node at startup Flags FlagsMap - // Configuration is intended to be stored at the path identified in NodeConfig.Flags[config.DataDirKey] + // An ephemeral node is not expected to be a persistent member of the network and + // should therefore not be used as for bootstrapping purposes. + IsEphemeral bool + + // The configuration used to initialize the node runtime. + RuntimeConfig *NodeRuntimeConfig + + // Runtime state, intended to be set by NodeRuntime + URI string + StakingAddress string + + // Initialized on demand + runtime NodeRuntime } +// Initializes a new node with only the data dir set func NewNode(dataDir string) *Node { return &Node{ Flags: FlagsMap{ @@ -67,272 +82,86 @@ func NewNode(dataDir string) *Node { } } -// Attempt to read configuration and process details for a node -// from the specified directory. +// Reads a node's configuration from the specified directory. func ReadNode(dataDir string) (*Node, error) { node := NewNode(dataDir) - if _, err := os.Stat(node.GetConfigPath()); err != nil { - return nil, fmt.Errorf("failed to read node config file: %w", err) - } - return node, node.ReadAll() -} - -func (n *Node) GetDataDir() string { - return cast.ToString(n.Flags[config.DataDirKey]) + return node, node.Read() } -func (n *Node) GetConfigPath() string { - return filepath.Join(n.GetDataDir(), "config.json") -} +// Reads nodes from the specified network directory. +func ReadNodes(networkDir string) ([]*Node, error) { + nodes := []*Node{} -func (n *Node) ReadConfig() error { - bytes, err := os.ReadFile(n.GetConfigPath()) + // Node configuration is stored in child directories + entries, err := os.ReadDir(networkDir) if err != nil { - return fmt.Errorf("failed to read node config: %w", err) - } - flags := FlagsMap{} - if err := json.Unmarshal(bytes, &flags); err != nil { - return fmt.Errorf("failed to unmarshal node config: %w", err) - } - n.Flags = flags - if err := n.EnsureNodeID(); err != nil { - return err + return nil, fmt.Errorf("failed to read dir: %w", err) } - return nil -} + for _, entry := range entries { + if !entry.IsDir() { + continue + } -func (n *Node) WriteConfig() error { - if err := os.MkdirAll(n.GetDataDir(), perms.ReadWriteExecute); err != nil { - return fmt.Errorf("failed to create node dir: %w", err) - } + nodeDir := filepath.Join(networkDir, entry.Name()) + node, err := ReadNode(nodeDir) + if errors.Is(err, os.ErrNotExist) { + // If no config file exists, assume this is not the path of a node + continue + } else if err != nil { + return nil, err + } - bytes, err := DefaultJSONMarshal(n.Flags) - if err != nil { - return fmt.Errorf("failed to marshal node config: %w", err) + nodes = append(nodes, node) } - if err := os.WriteFile(n.GetConfigPath(), bytes, perms.ReadWrite); err != nil { - return fmt.Errorf("failed to write node config: %w", err) - } - return nil + return nodes, nil } -func (n *Node) GetProcessContextPath() string { - return filepath.Join(n.GetDataDir(), config.DefaultProcessContextFilename) +// Retrieves the runtime for the node. +func (n *Node) getRuntime() NodeRuntime { + if n.runtime == nil { + n.runtime = &NodeProcess{ + node: n, + } + } + return n.runtime } -func (n *Node) ReadProcessContext() error { - path := n.GetProcessContextPath() - if _, err := os.Stat(path); errors.Is(err, fs.ErrNotExist) { - // The absence of the process context file indicates the node is not running - n.NodeProcessContext = node.NodeProcessContext{} - return nil - } +// Runtime methods - bytes, err := os.ReadFile(path) - if err != nil { - return fmt.Errorf("failed to read node process context: %w", err) - } - processContext := node.NodeProcessContext{} - if err := json.Unmarshal(bytes, &processContext); err != nil { - return fmt.Errorf("failed to unmarshal node process context: %w", err) - } - n.NodeProcessContext = processContext - return nil +func (n *Node) IsHealthy(ctx context.Context) (bool, error) { + return n.getRuntime().IsHealthy(ctx) } -func (n *Node) ReadAll() error { - if err := n.ReadConfig(); err != nil { - return err - } - return n.ReadProcessContext() +func (n *Node) Start(w io.Writer) error { + return n.getRuntime().Start(w) } -func (n *Node) Start(w io.Writer, defaultExecPath string) error { - // Avoid attempting to start an already running node. - proc, err := n.GetProcess() - if err != nil { - return fmt.Errorf("failed to start node: %w", err) - } - if proc != nil { - return errNodeAlreadyRunning - } - - // Ensure a stale process context file is removed so that the - // creation of a new file can indicate node start. - if err := os.Remove(n.GetProcessContextPath()); err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("failed to remove stale process context file: %w", err) - } - - execPath := n.ExecPath - if len(execPath) == 0 { - execPath = defaultExecPath - } - - cmd := exec.Command(execPath, "--config-file", n.GetConfigPath()) - if err := cmd.Start(); err != nil { - return err - } - - // Determine appropriate level of node description detail - nodeDescription := fmt.Sprintf("node %q", n.ID) - isEphemeralNode := filepath.Base(filepath.Dir(n.GetDataDir())) == defaultEphemeralDirName - if isEphemeralNode { - nodeDescription = "ephemeral " + nodeDescription - } - nonDefaultNodeDir := filepath.Base(n.GetDataDir()) != n.ID.String() - if nonDefaultNodeDir { - // Only include the data dir if its base is not the default (the node ID) - nodeDescription = fmt.Sprintf("%s with path: %s", nodeDescription, n.GetDataDir()) - } - - go func() { - if err := cmd.Wait(); err != nil { - if err.Error() != "signal: killed" { - _, _ = fmt.Fprintf(w, "%s finished with error: %v\n", nodeDescription, err) - } - } - _, _ = fmt.Fprintf(w, "%s exited\n", nodeDescription) - }() - - // A node writes a process context file on start. If the file is not - // found in a reasonable amount of time, the node is unlikely to have - // started successfully. - if err := n.WaitForProcessContext(context.Background()); err != nil { - return fmt.Errorf("failed to start node: %w", err) - } - - _, err = fmt.Fprintf(w, "Started %s\n", nodeDescription) - return err +func (n *Node) InitiateStop() error { + return n.getRuntime().InitiateStop() } -// Retrieve the node process if it is running. As part of determining -// process liveness, the node's process context will be refreshed if -// live or cleared if not running. -func (n *Node) GetProcess() (*os.Process, error) { - // Read the process context to ensure freshness. The node may have - // stopped or been restarted since last read. - if err := n.ReadProcessContext(); err != nil { - return nil, fmt.Errorf("failed to read process context: %w", err) - } - - if n.PID == 0 { - // Process is not running - return nil, nil - } - - proc, err := os.FindProcess(n.PID) - if err != nil { - return nil, fmt.Errorf("failed to find process: %w", err) - } - - // Sending 0 will not actually send a signal but will perform - // error checking. - err = proc.Signal(syscall.Signal(0)) - if err == nil { - // Process is running - return proc, nil - } - if errors.Is(err, os.ErrProcessDone) { - // Process is not running - return nil, nil - } - return nil, fmt.Errorf("failed to determine process status: %w", err) +func (n *Node) WaitForStopped(ctx context.Context) error { + return n.getRuntime().WaitForStopped(ctx) } -// Signals the node process to stop and waits for the node process to -// stop running. -func (n *Node) Stop() error { - proc, err := n.GetProcess() - if err != nil { - return fmt.Errorf("failed to retrieve process to stop: %w", err) - } - if proc == nil { - // Already stopped - return nil - } - if err := proc.Signal(syscall.SIGTERM); err != nil { - return fmt.Errorf("failed to send SIGTERM to pid %d: %w", n.PID, err) - } - - // Wait for the node process to stop - ticker := time.NewTicker(DefaultNodeTickerInterval) - defer ticker.Stop() - ctx, cancel := context.WithTimeout(context.Background(), DefaultNodeStopTimeout) - defer cancel() - for { - proc, err := n.GetProcess() - if err != nil { - return fmt.Errorf("failed to retrieve process: %w", err) - } - if proc == nil { - return nil - } - - select { - case <-ctx.Done(): - return fmt.Errorf("failed to see node process stop %q before timeout: %w", n.ID, ctx.Err()) - case <-ticker.C: - } - } +func (n *Node) readState() error { + return n.getRuntime().readState() } -func (n *Node) IsHealthy(ctx context.Context) (bool, error) { - // Check that the node process is running as a precondition for - // checking health. GetProcess will also ensure that the node's - // API URI is current. - proc, err := n.GetProcess() - if err != nil { - return false, fmt.Errorf("failed to determine process status: %w", err) - } - if proc == nil { - return false, ErrNotRunning - } - - // Check that the node is reporting healthy - health, err := health.NewClient(n.URI).Health(ctx, nil) - if err == nil { - return health.Healthy, nil - } - - switch t := err.(type) { - case *net.OpError: - if t.Op == "read" { - // Connection refused - potentially recoverable - return false, nil - } - case syscall.Errno: - if t == syscall.ECONNREFUSED { - // Connection refused - potentially recoverable - return false, nil - } - } - // Assume all other errors are not recoverable - return false, fmt.Errorf("failed to query node health: %w", err) +func (n *Node) getDataDir() string { + return cast.ToString(n.Flags[config.DataDirKey]) } -func (n *Node) WaitForProcessContext(ctx context.Context) error { - ticker := time.NewTicker(DefaultNodeTickerInterval) - defer ticker.Stop() - - ctx, cancel := context.WithTimeout(ctx, DefaultNodeInitTimeout) - defer cancel() - for len(n.URI) == 0 { - err := n.ReadProcessContext() - if err != nil { - return fmt.Errorf("failed to read process context for node %q: %w", n.ID, err) - } - - select { - case <-ctx.Done(): - return fmt.Errorf("failed to load process context for node %q before timeout: %w", n.ID, ctx.Err()) - case <-ticker.C: - } +// Initiates node shutdown and waits for the node to stop. +func (n *Node) Stop(ctx context.Context) error { + if err := n.InitiateStop(); err != nil { + return err } - return nil + return n.WaitForStopped(ctx) } -// Convenience method for setting networking flags. +// Sets networking configuration for the node. func (n *Node) SetNetworkingConfig( httpPort uint16, stakingPort uint16, @@ -354,28 +183,9 @@ func (n *Node) EnsureKeys() error { if err := n.EnsureStakingKeypair(); err != nil { return err } - // Once a staking keypair is guaranteed it is safe to derive the node ID return n.EnsureNodeID() } -// Derives the nodes proof-of-possession. Requires the node to have a -// BLS signing key. -func (n *Node) GetProofOfPossession() (*signer.ProofOfPossession, error) { - signingKey, err := n.Flags.GetStringVal(config.StakingSignerKeyContentKey) - if err != nil { - return nil, err - } - signingKeyBytes, err := base64.StdEncoding.DecodeString(signingKey) - if err != nil { - return nil, err - } - secretKey, err := bls.SecretKeyFromBytes(signingKeyBytes) - if err != nil { - return nil, err - } - return signer.NewProofOfPossession(secretKey), nil -} - // Ensures a BLS signing key is generated if not already present. func (n *Node) EnsureBLSSigningKey() error { // Attempt to retrieve an existing key @@ -425,15 +235,28 @@ func (n *Node) EnsureStakingKeypair() error { return errInvalidKeypair } - err = n.EnsureNodeID() + return nil +} + +// Derives the nodes proof-of-possession. Requires the node to have a +// BLS signing key. +func (n *Node) GetProofOfPossession() (*signer.ProofOfPossession, error) { + signingKey, err := n.Flags.GetStringVal(config.StakingSignerKeyContentKey) if err != nil { - return fmt.Errorf("failed to derive a node ID: %w", err) + return nil, err } - - return nil + signingKeyBytes, err := base64.StdEncoding.DecodeString(signingKey) + if err != nil { + return nil, err + } + secretKey, err := bls.SecretKeyFromBytes(signingKeyBytes) + if err != nil { + return nil, err + } + return signer.NewProofOfPossession(secretKey), nil } -// Attempt to derive the node ID from the node configuration. +// Derives the node ID. Requires that a tls keypair is present. func (n *Node) EnsureNodeID() error { keyKey := config.StakingTLSKeyContentKey certKey := config.StakingCertContentKey diff --git a/tests/fixture/tmpnet/node_config.go b/tests/fixture/tmpnet/node_config.go new file mode 100644 index 000000000000..8771ce48f2bd --- /dev/null +++ b/tests/fixture/tmpnet/node_config.go @@ -0,0 +1,100 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package tmpnet + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/ava-labs/avalanchego/utils/perms" +) + +// The Node type is defined in this file node_config.go +// (reading/writing configuration) and node.go (orchestration). + +func (n *Node) getFlagsPath() string { + return filepath.Join(n.getDataDir(), "flags.json") +} + +func (n *Node) readFlags() error { + bytes, err := os.ReadFile(n.getFlagsPath()) + if err != nil { + return fmt.Errorf("failed to read node flags: %w", err) + } + flags := FlagsMap{} + if err := json.Unmarshal(bytes, &flags); err != nil { + return fmt.Errorf("failed to unmarshal node flags: %w", err) + } + n.Flags = flags + return n.EnsureNodeID() +} + +func (n *Node) writeFlags() error { + bytes, err := DefaultJSONMarshal(n.Flags) + if err != nil { + return fmt.Errorf("failed to marshal node flags: %w", err) + } + if err := os.WriteFile(n.getFlagsPath(), bytes, perms.ReadWrite); err != nil { + return fmt.Errorf("failed to write node flags: %w", err) + } + return nil +} + +func (n *Node) getConfigPath() string { + return filepath.Join(n.getDataDir(), defaultConfigFilename) +} + +func (n *Node) readConfig() error { + bytes, err := os.ReadFile(n.getConfigPath()) + if err != nil { + return fmt.Errorf("failed to read node config: %w", err) + } + if err := json.Unmarshal(bytes, n); err != nil { + return fmt.Errorf("failed to unmarshal node config: %w", err) + } + return nil +} + +type serializedNodeConfig struct { + IsEphemeral bool + RuntimeConfig *NodeRuntimeConfig +} + +func (n *Node) writeConfig() error { + config := serializedNodeConfig{ + IsEphemeral: n.IsEphemeral, + RuntimeConfig: n.RuntimeConfig, + } + bytes, err := DefaultJSONMarshal(config) + if err != nil { + return fmt.Errorf("failed to marshal node config: %w", err) + } + if err := os.WriteFile(n.getConfigPath(), bytes, perms.ReadWrite); err != nil { + return fmt.Errorf("failed to write node config: %w", err) + } + return nil +} + +func (n *Node) Read() error { + if err := n.readFlags(); err != nil { + return err + } + if err := n.readConfig(); err != nil { + return err + } + return n.readState() +} + +func (n *Node) Write() error { + if err := os.MkdirAll(n.getDataDir(), perms.ReadWriteExecute); err != nil { + return fmt.Errorf("failed to create node dir: %w", err) + } + + if err := n.writeFlags(); err != nil { + return nil + } + return n.writeConfig() +} diff --git a/tests/fixture/tmpnet/node_process.go b/tests/fixture/tmpnet/node_process.go new file mode 100644 index 000000000000..b24bba37ae85 --- /dev/null +++ b/tests/fixture/tmpnet/node_process.go @@ -0,0 +1,258 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package tmpnet + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net" + "os" + "os/exec" + "path/filepath" + "syscall" + "time" + + "github.com/ava-labs/avalanchego/api/health" + "github.com/ava-labs/avalanchego/config" + "github.com/ava-labs/avalanchego/node" +) + +const ( + AvalancheGoPathEnvName = "AVALANCHEGO_PATH" + + defaultNodeInitTimeout = 10 * time.Second +) + +var errNodeAlreadyRunning = errors.New("failed to start node: node is already running") + +func checkNodeHealth(ctx context.Context, uri string) (bool, error) { + // Check that the node is reporting healthy + health, err := health.NewClient(uri).Health(ctx, nil) + if err == nil { + return health.Healthy, nil + } + + switch t := err.(type) { + case *net.OpError: + if t.Op == "read" { + // Connection refused - potentially recoverable + return false, nil + } + case syscall.Errno: + if t == syscall.ECONNREFUSED { + // Connection refused - potentially recoverable + return false, nil + } + } + // Assume all other errors are not recoverable + return false, fmt.Errorf("failed to query node health: %w", err) +} + +// Defines local-specific node configuration. Supports setting default +// and node-specific values. +type NodeProcess struct { + node *Node + + // PID of the node process + pid int +} + +func (p *NodeProcess) setProcessContext(processContext node.NodeProcessContext) { + p.pid = processContext.PID + p.node.URI = processContext.URI + p.node.StakingAddress = processContext.StakingAddress +} + +func (p *NodeProcess) readState() error { + path := p.getProcessContextPath() + if _, err := os.Stat(path); errors.Is(err, fs.ErrNotExist) { + // The absence of the process context file indicates the node is not running + p.setProcessContext(node.NodeProcessContext{}) + return nil + } + + bytes, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("failed to read node process context: %w", err) + } + processContext := node.NodeProcessContext{} + if err := json.Unmarshal(bytes, &processContext); err != nil { + return fmt.Errorf("failed to unmarshal node process context: %w", err) + } + p.setProcessContext(processContext) + return nil +} + +// Start waits for the process context to be written which +// indicates that the node will be accepting connections on +// its staking port. The network will start faster with this +// synchronization due to the avoidance of exponential backoff +// if a node tries to connect to a beacon that is not ready. +func (p *NodeProcess) Start(w io.Writer) error { + // Avoid attempting to start an already running node. + proc, err := p.getProcess() + if err != nil { + return fmt.Errorf("failed to start node process: %w", err) + } + if proc != nil { + return errNodeAlreadyRunning + } + + // Ensure a stale process context file is removed so that the + // creation of a new file can indicate node start. + if err := os.Remove(p.getProcessContextPath()); err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("failed to remove stale process context file: %w", err) + } + + cmd := exec.Command(p.node.RuntimeConfig.AvalancheGoPath, "--config-file", p.node.getFlagsPath()) // #nosec G204 + if err := cmd.Start(); err != nil { + return err + } + + // Determine appropriate level of node description detail + dataDir := p.node.getDataDir() + nodeDescription := fmt.Sprintf("node %q", p.node.ID) + if p.node.IsEphemeral { + nodeDescription = "ephemeral " + nodeDescription + } + nonDefaultNodeDir := filepath.Base(dataDir) != p.node.ID.String() + if nonDefaultNodeDir { + // Only include the data dir if its base is not the default (the node ID) + nodeDescription = fmt.Sprintf("%s with path: %s", nodeDescription, dataDir) + } + + go func() { + if err := cmd.Wait(); err != nil { + if err.Error() != "signal: killed" { + _, _ = fmt.Fprintf(w, "%s finished with error: %v\n", nodeDescription, err) + } + } + _, _ = fmt.Fprintf(w, "%s exited\n", nodeDescription) + }() + + // A node writes a process context file on start. If the file is not + // found in a reasonable amount of time, the node is unlikely to have + // started successfully. + if err := p.waitForProcessContext(context.Background()); err != nil { + return fmt.Errorf("failed to start local node: %w", err) + } + + _, err = fmt.Fprintf(w, "Started %s\n", nodeDescription) + return err +} + +// Signals the node process to stop. +func (p *NodeProcess) InitiateStop() error { + proc, err := p.getProcess() + if err != nil { + return fmt.Errorf("failed to retrieve process to stop: %w", err) + } + if proc == nil { + // Already stopped + return nil + } + if err := proc.Signal(syscall.SIGTERM); err != nil { + return fmt.Errorf("failed to send SIGTERM to pid %d: %w", p.pid, err) + } + return nil +} + +// Waits for the node process to stop. +func (p *NodeProcess) WaitForStopped(ctx context.Context) error { + ticker := time.NewTicker(defaultNodeTickerInterval) + defer ticker.Stop() + for { + proc, err := p.getProcess() + if err != nil { + return fmt.Errorf("failed to retrieve process: %w", err) + } + if proc == nil { + return nil + } + + select { + case <-ctx.Done(): + return fmt.Errorf("failed to see node process stop %q before timeout: %w", p.node.ID, ctx.Err()) + case <-ticker.C: + } + } +} + +func (p *NodeProcess) IsHealthy(ctx context.Context) (bool, error) { + // Check that the node process is running as a precondition for + // checking health. getProcess will also ensure that the node's + // API URI is current. + proc, err := p.getProcess() + if err != nil { + return false, fmt.Errorf("failed to determine process status: %w", err) + } + if proc == nil { + return false, ErrNotRunning + } + + return checkNodeHealth(ctx, p.node.URI) +} + +func (p *NodeProcess) getProcessContextPath() string { + return filepath.Join(p.node.getDataDir(), config.DefaultProcessContextFilename) +} + +func (p *NodeProcess) waitForProcessContext(ctx context.Context) error { + ticker := time.NewTicker(defaultNodeTickerInterval) + defer ticker.Stop() + + ctx, cancel := context.WithTimeout(ctx, defaultNodeInitTimeout) + defer cancel() + for len(p.node.URI) == 0 { + err := p.readState() + if err != nil { + return fmt.Errorf("failed to read process context for node %q: %w", p.node.ID, err) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("failed to load process context for node %q before timeout: %w", p.node.ID, ctx.Err()) + case <-ticker.C: + } + } + return nil +} + +// Retrieve the node process if it is running. As part of determining +// process liveness, the node's process context will be refreshed if +// live or cleared if not running. +func (p *NodeProcess) getProcess() (*os.Process, error) { + // Read the process context to ensure freshness. The node may have + // stopped or been restarted since last read. + if err := p.readState(); err != nil { + return nil, fmt.Errorf("failed to read process context: %w", err) + } + + if p.pid == 0 { + // Process is not running + return nil, nil + } + + proc, err := os.FindProcess(p.pid) + if err != nil { + return nil, fmt.Errorf("failed to find process: %w", err) + } + + // Sending 0 will not actually send a signal but will perform + // error checking. + err = proc.Signal(syscall.Signal(0)) + if err == nil { + // Process is running + return proc, nil + } + if errors.Is(err, os.ErrProcessDone) { + // Process is not running + return nil, nil + } + return nil, fmt.Errorf("failed to determine process status: %w", err) +} diff --git a/tests/fixture/tmpnet/utils.go b/tests/fixture/tmpnet/utils.go index 2e3bb82d4e1d..5fea866abe38 100644 --- a/tests/fixture/tmpnet/utils.go +++ b/tests/fixture/tmpnet/utils.go @@ -54,6 +54,10 @@ type NodeURI struct { func GetNodeURIs(nodes []*Node) []NodeURI { uris := make([]NodeURI, 0, len(nodes)) for _, node := range nodes { + if node.IsEphemeral { + // Avoid returning URIs for nodes whose lifespan is indeterminate + continue + } // Only append URIs that are not empty. A node may have an // empty URI if it is not currently running. if len(node.URI) > 0 { diff --git a/tests/upgrade/upgrade_test.go b/tests/upgrade/upgrade_test.go index 764fb6afa68d..f53c54c3d184 100644 --- a/tests/upgrade/upgrade_test.go +++ b/tests/upgrade/upgrade_test.go @@ -53,7 +53,7 @@ var _ = ginkgo.Describe("[Upgrade]", func() { ginkgo.By(fmt.Sprintf("restarting all nodes with %q binary", avalancheGoExecPathToUpgradeTo)) for _, node := range network.Nodes { ginkgo.By(fmt.Sprintf("restarting node %q with %q binary", node.ID, avalancheGoExecPathToUpgradeTo)) - require.NoError(node.Stop()) + require.NoError(node.Stop(e2e.DefaultContext())) // A node must start with sufficient bootstrap nodes to represent a quorum. Since the node's current // bootstrap configuration may not satisfy this requirement (i.e. if on network start the node was one of @@ -66,9 +66,10 @@ var _ = ginkgo.Describe("[Upgrade]", func() { require.NotEmpty(bootstrapIDs) node.Flags[config.BootstrapIDsKey] = strings.Join(bootstrapIDs, ",") node.Flags[config.BootstrapIPsKey] = strings.Join(bootstrapIPs, ",") - require.NoError(node.WriteConfig()) + node.RuntimeConfig.AvalancheGoPath = avalancheGoExecPath + require.NoError(node.Write()) - require.NoError(node.Start(ginkgo.GinkgoWriter, avalancheGoExecPath)) + require.NoError(node.Start(ginkgo.GinkgoWriter)) ginkgo.By(fmt.Sprintf("waiting for node %q to report healthy after restart", node.ID)) e2e.WaitForHealthy(node)