From 94ee61cdb27391ff9d9523812cfeb2912a567aa7 Mon Sep 17 00:00:00 2001 From: ptrus Date: Thu, 14 Apr 2022 15:06:51 +0200 Subject: [PATCH] go/control/status: Add fields for quick overview of node status --- .changelog/4669.feature.md | 1 + go/consensus/api/api.go | 51 +++++++++ go/consensus/tendermint/full/full.go | 10 ++ go/consensus/tendermint/seed/seed.go | 1 + .../scenario/e2e/consensus_state_sync.go | 6 ++ .../scenario/e2e/early_query.go | 3 + .../scenario/e2e/runtime/node_shutdown.go | 13 +++ go/oasis-test-runner/scenario/e2e/seed_api.go | 3 + go/worker/common/api/api.go | 100 ++++++++++++++++++ go/worker/common/committee/node.go | 48 +++++++++ 10 files changed, 236 insertions(+) create mode 100644 .changelog/4669.feature.md diff --git a/.changelog/4669.feature.md b/.changelog/4669.feature.md new file mode 100644 index 00000000000..3988cc59a1d --- /dev/null +++ b/.changelog/4669.feature.md @@ -0,0 +1 @@ +go/control/status: Add fields for quick overview of node status diff --git a/go/consensus/api/api.go b/go/consensus/api/api.go index b67594fb20c..149feece21b 100644 --- a/go/consensus/api/api.go +++ b/go/consensus/api/api.go @@ -4,6 +4,7 @@ package api import ( "context" + "fmt" "strings" "time" @@ -202,8 +203,58 @@ type Vote struct { VotingPower uint64 `json:"voting_power"` } +// StatusState is the concise status state of the consensus backend. +type StatusState uint8 + +var ( + // StatusStateReady is the ready status state. + StatusStateReady StatusState = 0 + // StatusStateSyncing is the syncing status state. + StatusStateSyncing StatusState = 1 +) + +// String returns a string representation of a status state. +func (s StatusState) String() string { + switch s { + case StatusStateReady: + return "ready" + case StatusStateSyncing: + return "syncing" + default: + return "[invalid status state]" + } +} + +// MarshalText encodes a StatusState into text form. +func (s StatusState) MarshalText() ([]byte, error) { + switch s { + case StatusStateReady: + return []byte(StatusStateReady.String()), nil + case StatusStateSyncing: + return []byte(StatusStateSyncing.String()), nil + default: + return nil, fmt.Errorf("invalid StatusState: %d", s) + } +} + +// UnmarshalText decodes a text slice into a StatusState. +func (s *StatusState) UnmarshalText(text []byte) error { + switch string(text) { + case StatusStateReady.String(): + *s = StatusStateReady + case StatusStateSyncing.String(): + *s = StatusStateSyncing + default: + return fmt.Errorf("invalid StatusState: %s", string(text)) + } + return nil +} + // Status is the current status overview. type Status struct { // nolint: maligned + // Status is an concise status of the consensus backend. + Status StatusState `json:"status"` + // Version is the version of the consensus protocol that the node is using. Version version.Version `json:"version"` // Backend is the consensus backend identifier. diff --git a/go/consensus/tendermint/full/full.go b/go/consensus/tendermint/full/full.go index 0aa109968a4..ad0d9bafc97 100644 --- a/go/consensus/tendermint/full/full.go +++ b/go/consensus/tendermint/full/full.go @@ -766,6 +766,7 @@ func (t *fullService) GetUnconfirmedTransactions(ctx context.Context) ([][]byte, func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, error) { status := &consensusAPI.Status{ + Status: consensusAPI.StatusStateSyncing, Version: version.ConsensusProtocol, Backend: api.BackendName, Features: t.SupportedFeatures(), @@ -774,6 +775,15 @@ func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, erro status.ChainContext = t.genesis.ChainContext() status.GenesisHeight = t.genesis.Height if t.started() { + // Check if node is synced. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-t.Synced(): + status.Status = consensusAPI.StatusStateReady + default: + } + // Only attempt to fetch blocks in case the consensus service has started as otherwise // requests will block. genBlk, err := t.GetBlock(ctx, t.genesis.Height) diff --git a/go/consensus/tendermint/seed/seed.go b/go/consensus/tendermint/seed/seed.go index 529cbe7e679..2185025775e 100644 --- a/go/consensus/tendermint/seed/seed.go +++ b/go/consensus/tendermint/seed/seed.go @@ -131,6 +131,7 @@ func (srv *seedService) SupportedFeatures() consensus.FeatureMask { // Implements Backend. func (srv *seedService) GetStatus(ctx context.Context) (*consensus.Status, error) { status := &consensus.Status{ + Status: consensus.StatusStateReady, Version: version.ConsensusProtocol, Backend: api.BackendName, Features: srv.SupportedFeatures(), diff --git a/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go index 1ae7fe67e56..4e8982e84e9 100644 --- a/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go +++ b/go/oasis-test-runner/scenario/e2e/consensus_state_sync.go @@ -112,6 +112,9 @@ func (sc *consensusStateSyncImpl) Run(childEnv *env.Env) error { if err != nil { return fmt.Errorf("failed to get status for validator %s: %w", v.Name, err) } + if status.Consensus.Status != consensus.StatusStateReady { + return fmt.Errorf("validator %s not ready", v.Name) + } if status.Registration.Descriptor == nil { return fmt.Errorf("validator %s has not registered", v.Name) @@ -160,6 +163,9 @@ func (sc *consensusStateSyncImpl) Run(childEnv *env.Env) error { if err != nil { return fmt.Errorf("failed to fetch validator status: %w", err) } + if status.Consensus.Status != consensus.StatusStateReady { + return fmt.Errorf("synced validator not ready") + } // Make sure that the last retained height has been set correctly. if lrh := status.Consensus.LastRetainedHeight; lrh < 20 { diff --git a/go/oasis-test-runner/scenario/e2e/early_query.go b/go/oasis-test-runner/scenario/e2e/early_query.go index d7dde3b4c12..2683f216f56 100644 --- a/go/oasis-test-runner/scenario/e2e/early_query.go +++ b/go/oasis-test-runner/scenario/e2e/early_query.go @@ -95,6 +95,9 @@ func (sc *earlyQueryImpl) Run(childEnv *env.Env) error { if err != nil { return fmt.Errorf("failed to get status for node: %w", err) } + if status.Consensus.Status != consensus.StatusStateSyncing { + return fmt.Errorf("node reports as ready before chain is initialized") + } if status.Consensus.LatestHeight != 0 { return fmt.Errorf("node reports non-zero latest height before chain is initialized") } diff --git a/go/oasis-test-runner/scenario/e2e/runtime/node_shutdown.go b/go/oasis-test-runner/scenario/e2e/runtime/node_shutdown.go index 6a65a73d8b6..dd7449286d8 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/node_shutdown.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/node_shutdown.go @@ -7,6 +7,7 @@ import ( tlsCert "github.com/oasisprotocol/oasis-core/go/common/crypto/tls" "github.com/oasisprotocol/oasis-core/go/common/pubsub" + "github.com/oasisprotocol/oasis-core/go/worker/common/api" "github.com/oasisprotocol/oasis-core/go/common/identity" consensusAPI "github.com/oasisprotocol/oasis-core/go/consensus/api" @@ -77,6 +78,15 @@ func (sc *nodeShutdownImpl) Run(childEnv *env.Env) error { if err != nil { return fmt.Errorf("failed to get status for node: %w", err) } + if status.Consensus.Status != consensusAPI.StatusStateReady { + return fmt.Errorf("node consensus status should be '%s', got: '%s'", consensusAPI.StatusStateReady, status.Consensus.Status) + } + if status.Runtimes[runtimeID].Committee == nil { + return fmt.Errorf("node committee status missing") + } + if st := status.Runtimes[runtimeID].Committee.Status; st != api.StatusStateReady { + return fmt.Errorf("node comute worker status should be '%s', got: '%s'", api.StatusStateReady, st) + } if status.Registration.Descriptor == nil { return fmt.Errorf("node has not registered") } @@ -190,6 +200,9 @@ func (sc *nodeShutdownImpl) Run(childEnv *env.Env) error { if err != nil { return err } + if status.Consensus.Status != consensusAPI.StatusStateReady { + return fmt.Errorf("node consensus status should be '%s', got: '%s'", consensusAPI.StatusStateReady, status.Consensus.Status) + } if status.Registration.NodeStatus != nil { return fmt.Errorf("node should not be registered") } diff --git a/go/oasis-test-runner/scenario/e2e/seed_api.go b/go/oasis-test-runner/scenario/e2e/seed_api.go index 8ee956962fc..b255feba770 100644 --- a/go/oasis-test-runner/scenario/e2e/seed_api.go +++ b/go/oasis-test-runner/scenario/e2e/seed_api.go @@ -82,6 +82,9 @@ func (sc *seedAPI) Run(childEnv *env.Env) error { // nolint: gocyclo if err != nil { return fmt.Errorf("failed to get status for node: %w", err) } + if status.Consensus.Status != consensusAPI.StatusStateReady { + return fmt.Errorf("seed node consensus status should be '%s', got: '%s'", consensusAPI.StatusStateReady, status.Consensus.Status) + } if status.Consensus.LatestHeight != int64(0) { return fmt.Errorf("seed node latest height should be 0, got: %d", status.Consensus.LatestHeight) } diff --git a/go/worker/common/api/api.go b/go/worker/common/api/api.go index 7979e294382..89bfa0294f3 100644 --- a/go/worker/common/api/api.go +++ b/go/worker/common/api/api.go @@ -1,12 +1,112 @@ package api import ( + "fmt" + "github.com/oasisprotocol/oasis-core/go/common/version" scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" ) +// StatusState is the concise status state of the common runtime worker. +type StatusState uint8 + +const ( + // StatusStateReady is the ready status state. + StatusStateReady StatusState = 0 + // StatusStateWaitingConsensusSync is the waiting for consensus sync status state. + StatusStateWaitingConsensusSync StatusState = 1 + // StatusStateWaitingRuntimeRegistry is the waiting for runtime registry descriptor status state. + StatusStateWaitingRuntimeRegistry StatusState = 2 + // StatusStateWaitingKeymanager is the waiting for keymanager status state. + StatusStateWaitingKeymanager StatusState = 3 + // StatusStateWaitingHostedRuntime is the waiting for the hosted runtime status state. + StatusStateWaitingHostedRuntime StatusState = 4 + // StatusStateWaitingHistoryReindex is the waiting for runtime history reindex status state. + StatusStateWaitingHistoryReindex StatusState = 5 + // StatusStateWaitingWorkersInit is the waiting for workers to initialize status state. + StatusStateWaitingWorkersInit StatusState = 6 + // StatusStateRuntimeSuspended is the runtime suspended status state. + StatusStateRuntimeSuspended StatusState = 7 +) + +// String returns a string representation of a status state. +func (s StatusState) String() string { + switch s { + case StatusStateReady: + return "ready" + case StatusStateWaitingConsensusSync: + return "waiting for consensus sync" + case StatusStateWaitingRuntimeRegistry: + return "waiting for runtime registry descriptor" + case StatusStateWaitingKeymanager: + return "waiting for available keymanager" + case StatusStateWaitingHostedRuntime: + return "waiting for hosted runtime provision" + case StatusStateWaitingHistoryReindex: + return "waiting for history reindex" + case StatusStateWaitingWorkersInit: + return "waiting for workers to initialize" + case StatusStateRuntimeSuspended: + return "runtime suspended" + default: + return "[invalid status state]" + } +} + +// MarshalText encodes a StatusState into text form. +func (s StatusState) MarshalText() ([]byte, error) { + switch s { + case StatusStateReady: + return []byte(StatusStateReady.String()), nil + case StatusStateWaitingConsensusSync: + return []byte(StatusStateWaitingConsensusSync.String()), nil + case StatusStateWaitingRuntimeRegistry: + return []byte(StatusStateWaitingRuntimeRegistry.String()), nil + case StatusStateWaitingKeymanager: + return []byte(StatusStateWaitingKeymanager.String()), nil + case StatusStateWaitingHostedRuntime: + return []byte(StatusStateWaitingHostedRuntime.String()), nil + case StatusStateWaitingHistoryReindex: + return []byte(StatusStateWaitingHistoryReindex.String()), nil + case StatusStateWaitingWorkersInit: + return []byte(StatusStateWaitingWorkersInit.String()), nil + case StatusStateRuntimeSuspended: + return []byte(StatusStateRuntimeSuspended.String()), nil + default: + return nil, fmt.Errorf("invalid StatusState: %d", s) + } +} + +// UnmarshalText decodes a text slice into a StatusState. +func (s *StatusState) UnmarshalText(text []byte) error { + switch string(text) { + case StatusStateReady.String(): + *s = StatusStateReady + case StatusStateWaitingConsensusSync.String(): + *s = StatusStateWaitingConsensusSync + case StatusStateWaitingRuntimeRegistry.String(): + *s = StatusStateWaitingRuntimeRegistry + case StatusStateWaitingKeymanager.String(): + *s = StatusStateWaitingKeymanager + case StatusStateWaitingHostedRuntime.String(): + *s = StatusStateWaitingHostedRuntime + case StatusStateWaitingHistoryReindex.String(): + *s = StatusStateWaitingHistoryReindex + case StatusStateWaitingWorkersInit.String(): + *s = StatusStateWaitingWorkersInit + case StatusStateRuntimeSuspended.String(): + *s = StatusStateRuntimeSuspended + default: + return fmt.Errorf("invalid StatusState: %s", string(text)) + } + return nil +} + // Status is the common runtime worker status. type Status struct { + // Status is an concise status of the committee node. + Status StatusState `json:"status"` + // ActiveVersion is the currently active version. ActiveVersion *version.Version `json:"active_version"` diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 75020193622..db455efde47 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -174,6 +175,14 @@ type Node struct { hooks []NodeHooks + // Status states. + consensusSynced uint32 + runtimeRegistryDescriptor uint32 + keymanagerAvailable uint32 + hostedRuntimeProvisioned uint32 + historyReindexingDone uint32 + workersInitialized uint32 + // Mutable and shared between nodes' workers. // Guarded by .CrossNode. CrossNode sync.Mutex @@ -187,6 +196,33 @@ type Node struct { logger *logging.Logger } +func (n *Node) getStatusStateLocked() api.StatusState { + if atomic.LoadUint32(&n.consensusSynced) == 0 { + return api.StatusStateWaitingConsensusSync + } + if atomic.LoadUint32(&n.runtimeRegistryDescriptor) == 0 { + return api.StatusStateWaitingRuntimeRegistry + } + if atomic.LoadUint32(&n.keymanagerAvailable) == 0 { + return api.StatusStateWaitingKeymanager + } + if atomic.LoadUint32(&n.hostedRuntimeProvisioned) == 0 { + return api.StatusStateWaitingHostedRuntime + } + if atomic.LoadUint32(&n.historyReindexingDone) == 0 { + return api.StatusStateWaitingHistoryReindex + } + if atomic.LoadUint32(&n.workersInitialized) == 0 { + return api.StatusStateWaitingWorkersInit + } + // If resumeCh exists the runtime is suspended (safe to check since the cross node lock should be held). + if n.resumeCh != nil { + return api.StatusStateRuntimeSuspended + } + + return api.StatusStateReady +} + // Name returns the service name. func (n *Node) Name() string { return "committee node" @@ -250,6 +286,8 @@ func (n *Node) GetStatus(ctx context.Context) (*api.Status, error) { defer n.CrossNode.Unlock() var status api.Status + status.Status = n.getStatusStateLocked() + if n.CurrentBlock != nil { status.LatestRound = n.CurrentBlock.Header.Round status.LatestHeight = n.CurrentBlockHeight @@ -520,6 +558,9 @@ func (n *Node) handleNewEventLocked(ev *roothash.Event) { } func (n *Node) handleRuntimeHostEvent(ev *host.Event) { + if ev.Started != nil { + atomic.StoreUint32(&n.hostedRuntimeProvisioned, 1) + } for _, hooks := range n.hooks { hooks.HandleRuntimeHostEvent(ev) } @@ -539,6 +580,7 @@ func (n *Node) worker() { case <-n.Consensus.Synced(): } n.logger.Info("consensus has finished initial synchronization") + atomic.StoreUint32(&n.consensusSynced, 1) // Wait for the runtime. rt, err := n.Runtime.ActiveDescriptor(n.ctx) @@ -548,6 +590,7 @@ func (n *Node) worker() { ) return } + atomic.StoreUint32(&n.runtimeRegistryDescriptor, 1) n.CurrentEpoch, err = n.Consensus.Beacon().GetEpoch(n.ctx, consensus.HeightLatest) if err != nil { @@ -582,6 +625,7 @@ func (n *Node) worker() { n.logger.Info("runtime has a key manager available") } + atomic.StoreUint32(&n.keymanagerAvailable, 1) // Start watching consensus blocks. consensusBlocks, consensusBlocksSub, err := n.Consensus.WatchBlocks(n.ctx) @@ -649,7 +693,9 @@ func (n *Node) worker() { // Perform initial hosted runtime version update to ensure we have something even in cases where // initial block processing fails for any reason. + n.CrossNode.Lock() n.updateHostedRuntimeVersionLocked() + n.CrossNode.Unlock() initialized := false for { @@ -671,6 +717,7 @@ func (n *Node) worker() { // history reindexing has been completed. if !initialized { n.logger.Debug("common worker is initialized") + atomic.StoreUint32(&n.historyReindexingDone, 1) close(n.initCh) initialized = true @@ -686,6 +733,7 @@ func (n *Node) worker() { } } n.logger.Debug("all child workers are initialized") + atomic.StoreUint32(&n.workersInitialized, 1) } // Received a block (annotated).