Skip to content

Commit

Permalink
go/control/status: Add fields for quick overview of node status
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Apr 14, 2022
1 parent 89ed839 commit 025fc6e
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ type Vote struct {

// Status is the current status overview.
type Status struct { // nolint: maligned
// Status is an concise status of the consensus backend.
Status string `json:"string"`

// Version is the version of the consensus protocol that the node is using.
Version version.Version `json:"version"`
// Backend is the consensus backend identifier.
Expand Down
4 changes: 4 additions & 0 deletions go/consensus/tendermint/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, erro
status.ChainContext = t.genesis.ChainContext()
status.GenesisHeight = t.genesis.Height
if t.started() {
status.Status = "Ready"
// Only attempt to fetch blocks in case the consensus service has started as otherwise
// requests will block.
genBlk, err := t.GetBlock(ctx, t.genesis.Height)
Expand Down Expand Up @@ -845,6 +846,9 @@ func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, erro
consensusAddr := []byte(crypto.PublicKeyToTendermint(&consensusPk).Address())
status.IsValidator = vals.HasAddress(consensusAddr)
}
} else {
// If node is not started it is still syncing.
status.Status = "Syncing"
}

return status, nil
Expand Down
1 change: 1 addition & 0 deletions go/consensus/tendermint/seed/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (srv *seedService) SupportedFeatures() consensus.FeatureMask {
// Implements Backend.
func (srv *seedService) GetStatus(ctx context.Context) (*consensus.Status, error) {
status := &consensus.Status{
Status: "Ready",
Version: version.ConsensusProtocol,
Backend: api.BackendName,
Features: srv.SupportedFeatures(),
Expand Down
3 changes: 3 additions & 0 deletions go/oasis-test-runner/scenario/e2e/early_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func (sc *earlyQueryImpl) Run(childEnv *env.Env) error {
if err != nil {
return fmt.Errorf("failed to get status for node: %w", err)
}
if status.Consensus.Status != "Syncing" {
return fmt.Errorf("node doesn't report as 'Syncing', got: %s", status.Consensus.Status)
}
if status.Consensus.LatestHeight != 0 {
return fmt.Errorf("node reports non-zero latest height before chain is initialized")
}
Expand Down
3 changes: 3 additions & 0 deletions go/oasis-test-runner/scenario/e2e/seed_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (sc *seedAPI) Run(childEnv *env.Env) error { // nolint: gocyclo
if err != nil {
return fmt.Errorf("failed to get status for node: %w", err)
}
if status.Consensus.Status != "Ready" {
return fmt.Errorf("seed node consensus status should be 'Ready', got: %s", status.Consensus.Status)
}
if status.Consensus.LatestHeight != int64(0) {
return fmt.Errorf("seed node latest height should be 0, got: %d", status.Consensus.LatestHeight)
}
Expand Down
3 changes: 3 additions & 0 deletions go/worker/common/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (

// Status is the common runtime worker status.
type Status struct {
// Status is an concise status of the committee node.
Status string `json:"status"`

// ActiveVersion is the currently active version.
ActiveVersion *version.Version `json:"active_version"`

Expand Down
41 changes: 41 additions & 0 deletions go/worker/common/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -174,6 +175,14 @@ type Node struct {

hooks []NodeHooks

// Initialization steps.
consensusSynced uint32
runtimeRegistryDescriptor uint32
keymanagerAvailable uint32
hostedRuntimeProvisioned uint32
historyReindexingDone uint32
workersInitialized uint32

// Mutable and shared between nodes' workers.
// Guarded by .CrossNode.
CrossNode sync.Mutex
Expand All @@ -187,6 +196,28 @@ type Node struct {
logger *logging.Logger
}

func (n *Node) getInitStatus() string {
if atomic.LoadUint32(&n.consensusSynced) == 0 {
return "Waiting for consensus sync"
}
if atomic.LoadUint32(&n.runtimeRegistryDescriptor) == 0 {
return "Waiting for runtime registry descriptor"
}
if atomic.LoadUint32(&n.keymanagerAvailable) == 0 {
return "Waiting for available key manager"
}
if atomic.LoadUint32(&n.hostedRuntimeProvisioned) == 0 {
return "Waiting for hosted runtime provision"
}
if atomic.LoadUint32(&n.historyReindexingDone) == 0 {
return "Waiting for history reindex"
}
if atomic.LoadUint32(&n.workersInitialized) == 0 {
return "Waiting for workers to initialize"
}
return "Ready"
}

// Name returns the service name.
func (n *Node) Name() string {
return "committee node"
Expand Down Expand Up @@ -250,6 +281,8 @@ func (n *Node) GetStatus(ctx context.Context) (*api.Status, error) {
defer n.CrossNode.Unlock()

var status api.Status
status.Status = n.getInitStatus()

if n.CurrentBlock != nil {
status.LatestRound = n.CurrentBlock.Header.Round
status.LatestHeight = n.CurrentBlockHeight
Expand Down Expand Up @@ -539,6 +572,7 @@ func (n *Node) worker() {
case <-n.Consensus.Synced():
}
n.logger.Info("consensus has finished initial synchronization")
atomic.StoreUint32(&n.consensusSynced, 1)

// Wait for the runtime.
rt, err := n.Runtime.ActiveDescriptor(n.ctx)
Expand All @@ -548,6 +582,7 @@ func (n *Node) worker() {
)
return
}
atomic.StoreUint32(&n.runtimeRegistryDescriptor, 1)

n.CurrentEpoch, err = n.Consensus.Beacon().GetEpoch(n.ctx, consensus.HeightLatest)
if err != nil {
Expand Down Expand Up @@ -582,6 +617,7 @@ func (n *Node) worker() {

n.logger.Info("runtime has a key manager available")
}
atomic.StoreUint32(&n.keymanagerAvailable, 1)

// Start watching consensus blocks.
consensusBlocks, consensusBlocksSub, err := n.Consensus.WatchBlocks(n.ctx)
Expand Down Expand Up @@ -621,6 +657,7 @@ func (n *Node) worker() {
)
return
}
atomic.StoreUint32(&n.hostedRuntimeProvisioned, 1)

hrtEventCh, hrtSub, err := hrt.WatchEvents(n.ctx)
if err != nil {
Expand Down Expand Up @@ -649,7 +686,9 @@ func (n *Node) worker() {

// Perform initial hosted runtime version update to ensure we have something even in cases where
// initial block processing fails for any reason.
n.CrossNode.Lock()
n.updateHostedRuntimeVersionLocked()
n.CrossNode.Unlock()

initialized := false
for {
Expand All @@ -671,6 +710,7 @@ func (n *Node) worker() {
// history reindexing has been completed.
if !initialized {
n.logger.Debug("common worker is initialized")
atomic.StoreUint32(&n.historyReindexingDone, 1)

close(n.initCh)
initialized = true
Expand All @@ -686,6 +726,7 @@ func (n *Node) worker() {
}
}
n.logger.Debug("all child workers are initialized")
atomic.StoreUint32(&n.workersInitialized, 1)
}

// Received a block (annotated).
Expand Down

0 comments on commit 025fc6e

Please sign in to comment.