From e48e3123ffa111d1d903b0ade711f52ea0e1b06e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ber=C4=8Di=C4=8D?= Date: Thu, 12 Oct 2023 04:12:17 +0200 Subject: [PATCH 1/2] go: Add database syncing to consensus statuses --- go/consensus/api/api.go | 12 ++++++++---- go/consensus/cometbft/full/common.go | 13 +++++++++++-- go/consensus/cometbft/full/full.go | 6 +++++- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/go/consensus/api/api.go b/go/consensus/api/api.go index c1164d2efad..530c5860ad3 100644 --- a/go/consensus/api/api.go +++ b/go/consensus/api/api.go @@ -254,6 +254,8 @@ var ( StatusStateReady StatusState // StatusStateSyncing is the syncing status state. StatusStateSyncing StatusState = 1 + // StatusStateDBLoading is the status state when the database is loading. + StatusStateDBLoading StatusState = 2 ) // String returns a string representation of a status state. @@ -263,6 +265,8 @@ func (s StatusState) String() string { return "ready" case StatusStateSyncing: return "syncing" + case StatusStateDBLoading: + return "loading database" default: return "[invalid status state]" } @@ -271,10 +275,8 @@ func (s StatusState) String() string { // MarshalText encodes a StatusState into text form. func (s StatusState) MarshalText() ([]byte, error) { switch s { - case StatusStateReady: - return []byte(StatusStateReady.String()), nil - case StatusStateSyncing: - return []byte(StatusStateSyncing.String()), nil + case StatusStateReady, StatusStateSyncing, StatusStateDBLoading: + return []byte(s.String()), nil default: return nil, fmt.Errorf("invalid StatusState: %d", s) } @@ -287,6 +289,8 @@ func (s *StatusState) UnmarshalText(text []byte) error { *s = StatusStateReady case StatusStateSyncing.String(): *s = StatusStateSyncing + case StatusStateDBLoading.String(): + *s = StatusStateDBLoading default: return fmt.Errorf("invalid StatusState: %s", string(text)) } diff --git a/go/consensus/cometbft/full/common.go b/go/consensus/cometbft/full/common.go index 6b5fea20535..c2aef5de01a 100644 --- a/go/consensus/cometbft/full/common.go +++ b/go/consensus/cometbft/full/common.go @@ -103,14 +103,19 @@ type commonNode struct { const ( stateNotReady = 0 stateInitialized = 1 - stateStarted = 2 - stateStopping = 3 + stateDBLoaded = 2 + stateStarted = 3 + stateStopping = 4 ) func (n *commonNode) initialized() bool { return atomic.LoadUint32(&n.state) >= stateInitialized } +func (n *commonNode) dbLoaded() bool { + return atomic.LoadUint32(&n.state) >= stateDBLoaded +} + func (n *commonNode) started() bool { return atomic.LoadUint32(&n.state) >= stateStarted } @@ -148,6 +153,10 @@ func (n *commonNode) start() error { return n.mux.Start() } +func (n *commonNode) finishDBLoading() { + atomic.StoreUint32(&n.state, stateDBLoaded) +} + func (n *commonNode) finishStart() { atomic.StoreUint32(&n.state, stateStarted) close(n.startedCh) diff --git a/go/consensus/cometbft/full/full.go b/go/consensus/cometbft/full/full.go index 69efb59a3c3..dc1d00fd170 100644 --- a/go/consensus/cometbft/full/full.go +++ b/go/consensus/cometbft/full/full.go @@ -389,7 +389,10 @@ func (t *fullService) GetStatus(ctx context.Context) (*consensusAPI.Status, erro if err != nil { return nil, err } - status.Status = consensusAPI.StatusStateSyncing + status.Status = consensusAPI.StatusStateDBLoading + if t.dbLoaded() { + status.Status = consensusAPI.StatusStateSyncing + } status.P2P = &consensusAPI.P2PStatus{} status.P2P.PubKey = t.identity.P2PSigner.Public() @@ -679,6 +682,7 @@ func (t *fullService) lazyInit() error { // nolint: gocyclo default: } + t.finishDBLoading() return db, nil } From 1e986d6ae8566f9124ab0d677a4ff9f46ae5b12a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ber=C4=8Di=C4=8D?= Date: Tue, 17 Oct 2023 17:31:10 +0200 Subject: [PATCH 2/2] tmp --- go/runtime/registry/registry.go | 21 ++++++++++++ go/worker/storage/api/api.go | 1 + go/worker/storage/committee/node.go | 50 ++++++++++++++++++++++++----- go/worker/storage/worker.go | 26 +++++++++------ 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index 91b7115aa9c..8e3ce0c744f 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -88,6 +88,9 @@ type Runtime interface { // RegisterStorage sets the given local storage backend for the runtime. RegisterStorage(storage storageAPI.Backend) + // StorageInitFailed reports that storage initialization for the runtime failed. + StorageInitFailed(err error) + // AddRoles adds available node roles to the runtime. AddRoles(roles node.RolesMask) @@ -129,6 +132,7 @@ type runtime struct { // nolint: maligned consensus consensus.Backend storage storageAPI.Backend + storageErrCh chan error localStorage localstorage.LocalStorage history history.History @@ -204,9 +208,21 @@ func (r *runtime) RegisterStorage(storage storageAPI.Backend) { if r.storage != nil { panic("runtime storage backend already assigned") } + close(r.storageErrCh) r.storage = storage } +func (r *runtime) StorageInitFailed(err error) { + r.Lock() + defer r.Unlock() + + if r.storage != nil { + panic("runtime storage backend set already but reported init error") + } + r.storageErrCh <- err + close(r.storageErrCh) +} + func (r *runtime) AddRoles(roles node.RolesMask) { r.Lock() defer r.Unlock() @@ -394,6 +410,10 @@ func (r *runtime) watchUpdates(ctx context.Context) { } func (r *runtime) finishInitialization() error { + if err, ok := <-r.storageErrCh; ok && err != nil { + return err + } + r.Lock() defer r.Unlock() @@ -555,6 +575,7 @@ func newRuntime( id: id, dataDir: rtDataDir, consensus: consensus, + storageErrCh: make(chan error, 1), localStorage: localStorage, cancelCtx: cancel, registryDescriptorCh: make(chan struct{}), diff --git a/go/worker/storage/api/api.go b/go/worker/storage/api/api.go index a01791b9efc..f14519a94da 100644 --- a/go/worker/storage/api/api.go +++ b/go/worker/storage/api/api.go @@ -15,6 +15,7 @@ const ModuleName = "worker/storage" type StorageWorkerStatus string const ( + StatusDBLoading StorageWorkerStatus = "loading database" StatusInitializing StorageWorkerStatus = "initializing" StatusStarting StorageWorkerStatus = "starting" StatusStopping StorageWorkerStatus = "stopping" diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 974fc4de34e..f5a1415d6ea 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -148,7 +148,8 @@ type Node struct { // nolint: maligned quitCh chan struct{} workerQuitCh chan struct{} - initCh chan struct{} + storageInitCh chan error + initCh chan struct{} } func NewNode( @@ -157,7 +158,7 @@ func NewNode( roleProvider registration.RoleProvider, rpcRoleProvider registration.RoleProvider, workerCommonCfg workerCommon.Config, - localStorage storageApi.LocalBackend, + storageCtor func(context.Context) (storageApi.LocalBackend, error), checkpointerCfg *checkpoint.CheckpointerConfig, checkpointSyncCfg *CheckpointSyncConfig, ) (*Node, error) { @@ -173,13 +174,11 @@ func NewNode( workerCommonCfg: workerCommonCfg, - localStorage: localStorage, - fetchPool: fetchPool, checkpointSyncCfg: checkpointSyncCfg, - status: api.StatusInitializing, + status: api.StatusDBLoading, blockCh: channels.NewInfiniteChannel(), diffCh: make(chan *fetchedDiff), @@ -187,7 +186,9 @@ func NewNode( quitCh: make(chan struct{}), workerQuitCh: make(chan struct{}), - initCh: make(chan struct{}), + + storageInitCh: make(chan error, 1), + initCh: make(chan struct{}), } // Validate checkpoint sync configuration. @@ -200,6 +201,36 @@ func NewNode( n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + go func() { + defer close(n.storageInitCh) + localStorage, err := storageCtor(n.ctx) + if err != nil { + err = fmt.Errorf("error creating storage worker local backend: %w", err) + n.storageInitCh <- err + commonNode.Runtime.StorageInitFailed(err) + return + } + n.storageInitCh <- n.newBottomHalf(commonNode, rpcRoleProvider, localStorage, checkpointerCfg) + }() + + return n, nil +} + +func (n *Node) newBottomHalf( + commonNode *committee.Node, + rpcRoleProvider registration.RoleProvider, + localStorage storageApi.LocalBackend, + checkpointerCfg *checkpoint.CheckpointerConfig, +) error { + func() { + n.statusLock.Lock() + defer n.statusLock.Unlock() + + n.status = api.StatusInitializing + }() + n.localStorage = localStorage + commonNode.Runtime.RegisterStorage(localStorage) + // Create a new checkpointer if enabled. if checkpointerCfg != nil { checkpointerCfg = &checkpoint.CheckpointerConfig{ @@ -245,7 +276,7 @@ func NewNode( *checkpointerCfg, ) if err != nil { - return nil, fmt.Errorf("failed to create checkpointer: %w", err) + return fmt.Errorf("failed to create checkpointer: %w", err) } } @@ -264,7 +295,7 @@ func NewNode( commonNode.P2P.RegisterProtocolServer(storagePub.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } - return n, nil + return nil } // Service interface. @@ -276,6 +307,9 @@ func (n *Node) Name() string { // Start causes the worker to start responding to CometBFT new block events. func (n *Node) Start() error { + if err := <-n.storageInitCh; err != nil { + return err + } go n.watchQuit() go n.worker() if n.checkpointer != nil { diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index cdd13d0f9d8..e89ebb83033 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -1,6 +1,7 @@ package storage import ( + "context" "fmt" "github.com/oasisprotocol/oasis-core/go/common" @@ -9,6 +10,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/workerpool" "github.com/oasisprotocol/oasis-core/go/config" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee" @@ -95,9 +97,8 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC } } - localStorage, err := NewLocalBackend(commonNode.Runtime.DataDir(), id) - if err != nil { - return fmt.Errorf("can't create local storage backend: %w", err) + storageCtor := func(ctx context.Context) (storageApi.LocalBackend, error) { + return NewLocalBackend(commonNode.Runtime.DataDir(), id) } node, err := committee.NewNode( @@ -106,7 +107,7 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC rp, rpRPC, w.commonWorker.GetConfig(), - localStorage, + storageCtor, checkpointerCfg, &committee.CheckpointSyncConfig{ Disabled: config.GlobalConfig.Storage.CheckpointSyncDisabled, @@ -116,7 +117,6 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerC if err != nil { return err } - commonNode.Runtime.RegisterStorage(localStorage) commonNode.AddHooks(node) w.runtimes[id] = node @@ -167,13 +167,21 @@ func (w *Worker) Start() error { }() // Start all runtimes and wait for initialization. - go func() { - w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes)) + var err error - for _, r := range w.runtimes { - _ = r.Start() + w.logger.Info("starting storage sync services", "num_runtimes", len(w.runtimes)) + for id, r := range w.runtimes { + if err = r.Start(); err != nil { + w.logger.Error("committee node did not start successfully", "err", err, "runtime", id) } + defer func(r *committee.Node) { + if err != nil { + r.Stop() + } + }(r) + } + go func() { // Wait for runtimes to be initialized and the node to be registered. for _, r := range w.runtimes { <-r.Initialized()