From 4c1a9df2cf54bac306f97fef1817abe3f580d5b8 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 24 Jun 2021 20:41:08 +0200 Subject: [PATCH 1/7] go/consensus: Expose consensus state checkpointer --- .changelog/4080.internal.1.md | 1 + go/consensus/api/api.go | 6 ++++++ go/consensus/tendermint/abci/state.go | 4 ++++ go/consensus/tendermint/api/state.go | 10 ++++++++++ go/consensus/tendermint/full/full.go | 5 +++++ go/consensus/tendermint/seed/seed.go | 6 ++++++ go/oasis-node/cmd/debug/dumpdb/dumpdb.go | 5 +++++ 7 files changed, 37 insertions(+) create mode 100644 .changelog/4080.internal.1.md diff --git a/.changelog/4080.internal.1.md b/.changelog/4080.internal.1.md new file mode 100644 index 00000000000..f843854fdb5 --- /dev/null +++ b/.changelog/4080.internal.1.md @@ -0,0 +1 @@ +go/consensus: Expose consensus state checkpointer diff --git a/go/consensus/api/api.go b/go/consensus/api/api.go index fc50298bec9..61024572910 100644 --- a/go/consensus/api/api.go +++ b/go/consensus/api/api.go @@ -24,6 +24,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" staking "github.com/oasisprotocol/oasis-core/go/staking/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" mkvsNode "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" ) @@ -228,6 +229,11 @@ type Backend interface { // GetAddresses returns the consensus backend addresses. GetAddresses() ([]node.ConsensusAddress, error) + + // Checkpointer returns the checkpointer associated with consensus state. + // + // This may be nil in case checkpoints are disabled. + Checkpointer() checkpoint.Checkpointer } // HaltHook is a function that gets called when consensus needs to halt for some reason. diff --git a/go/consensus/tendermint/abci/state.go b/go/consensus/tendermint/abci/state.go index 0374f5bb4b2..cf2cd8ef9f7 100644 --- a/go/consensus/tendermint/abci/state.go +++ b/go/consensus/tendermint/abci/state.go @@ -120,6 +120,10 @@ func (s *applicationState) Storage() storage.LocalBackend { return s.storage } +func (s *applicationState) Checkpointer() checkpoint.Checkpointer { + return s.checkpointer +} + func (s *applicationState) InitialHeight() int64 { return int64(s.initialHeight) } diff --git a/go/consensus/tendermint/api/state.go b/go/consensus/tendermint/api/state.go index b5ec64d5eff..61f78aa187e 100644 --- a/go/consensus/tendermint/api/state.go +++ b/go/consensus/tendermint/api/state.go @@ -17,6 +17,7 @@ import ( staking "github.com/oasisprotocol/oasis-core/go/staking/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api" ) @@ -77,6 +78,11 @@ type ApplicationQueryState interface { // Storage returns the storage backend. Storage() storage.LocalBackend + // Checkpointer returns the checkpointer associated with the application state. + // + // This may be nil in case checkpoints are disabled. + Checkpointer() checkpoint.Checkpointer + // BlockHeight returns the last committed block height. 
BlockHeight() int64 @@ -125,6 +131,10 @@ func (ms *mockApplicationState) Storage() storage.LocalBackend { panic("not implemented") } +func (ms *mockApplicationState) Checkpointer() checkpoint.Checkpointer { + return nil +} + func (ms *mockApplicationState) InitialHeight() int64 { return ms.cfg.Genesis.Height } diff --git a/go/consensus/tendermint/full/full.go b/go/consensus/tendermint/full/full.go index f739e01a6f9..f1e88f29083 100644 --- a/go/consensus/tendermint/full/full.go +++ b/go/consensus/tendermint/full/full.go @@ -72,6 +72,7 @@ import ( roothashAPI "github.com/oasisprotocol/oasis-core/go/roothash/api" schedulerAPI "github.com/oasisprotocol/oasis-core/go/scheduler/api" stakingAPI "github.com/oasisprotocol/oasis-core/go/staking/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" upgradeAPI "github.com/oasisprotocol/oasis-core/go/upgrade/api" ) @@ -318,6 +319,10 @@ func (t *fullService) GetAddresses() ([]node.ConsensusAddress, error) { return []node.ConsensusAddress{addr}, nil } +func (t *fullService) Checkpointer() checkpoint.Checkpointer { + return t.mux.State().Checkpointer() +} + func (t *fullService) StateToGenesis(ctx context.Context, blockHeight int64) (*genesisAPI.Document, error) { blk, err := t.GetTendermintBlock(ctx, blockHeight) if err != nil { diff --git a/go/consensus/tendermint/seed/seed.go b/go/consensus/tendermint/seed/seed.go index f1e268a0beb..2b5be9308c2 100644 --- a/go/consensus/tendermint/seed/seed.go +++ b/go/consensus/tendermint/seed/seed.go @@ -36,6 +36,7 @@ import ( roothash "github.com/oasisprotocol/oasis-core/go/roothash/api" scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" staking "github.com/oasisprotocol/oasis-core/go/staking/api" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer" ) @@ -173,6 +174,11 @@ func (srv *seedService) GetAddresses() ([]node.ConsensusAddress, error) { return []node.ConsensusAddress{addr}, nil } +// Implements Backend. +func (srv *seedService) Checkpointer() checkpoint.Checkpointer { + return nil +} + // Implements Backend. 
func (srv *seedService) SubmitEvidence(ctx context.Context, evidence *consensus.Evidence) error { return consensus.ErrUnsupported diff --git a/go/oasis-node/cmd/debug/dumpdb/dumpdb.go b/go/oasis-node/cmd/debug/dumpdb/dumpdb.go index 15151516f0e..4bf542a9045 100644 --- a/go/oasis-node/cmd/debug/dumpdb/dumpdb.go +++ b/go/oasis-node/cmd/debug/dumpdb/dumpdb.go @@ -39,6 +39,7 @@ import ( staking "github.com/oasisprotocol/oasis-core/go/staking/api" storage "github.com/oasisprotocol/oasis-core/go/storage/api" storageDB "github.com/oasisprotocol/oasis-core/go/storage/database" + "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" ) const ( @@ -385,6 +386,10 @@ func (qs *dumpQueryState) Storage() storage.LocalBackend { return qs.ldb } +func (qs *dumpQueryState) Checkpointer() checkpoint.Checkpointer { + return nil +} + func (qs *dumpQueryState) BlockHeight() int64 { return qs.height } From 26a0daa1d7a04b9d786ef300a0ec1bc9547cf0eb Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 24 Jun 2021 20:54:54 +0200 Subject: [PATCH 2/7] go/storage/mkvs/checkpoint: Add ForceCheckpoint and WatchCheckpoints --- .changelog/4080.internal.2.md | 1 + go/storage/mkvs/checkpoint/checkpointer.go | 99 +++++++++++++++---- .../mkvs/checkpoint/checkpointer_test.go | 47 ++++++++- 3 files changed, 125 insertions(+), 22 deletions(-) create mode 100644 .changelog/4080.internal.2.md diff --git a/.changelog/4080.internal.2.md b/.changelog/4080.internal.2.md new file mode 100644 index 00000000000..ffa262e255b --- /dev/null +++ b/.changelog/4080.internal.2.md @@ -0,0 +1 @@ +go/storage/mkvs/checkpoint: Add ForceCheckpoint and WatchCheckpoints diff --git a/go/storage/mkvs/checkpoint/checkpointer.go b/go/storage/mkvs/checkpoint/checkpointer.go index 1eab5f94bec..24d9e50eee4 100644 --- a/go/storage/mkvs/checkpoint/checkpointer.go +++ b/go/storage/mkvs/checkpoint/checkpointer.go @@ -10,6 +10,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/pubsub" db "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" ) @@ -61,6 +62,17 @@ type Checkpointer interface { // NotifyNewVersion notifies the checkpointer that a new version has been finalized. NotifyNewVersion(version uint64) + // ForceCheckpoint makes the checkpointer create a checkpoint of the given version even if it is + // outside the regular checkpoint schedule. In case the checkpoint at that version already + // exists, this will be a no-op. + // + // The checkpoint will be created asynchronously. + ForceCheckpoint(version uint64) + + // WatchCheckpoints returns a channel that produces a stream of checkpointed versions. The + // versions are emitted before the checkpointing process starts. + WatchCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error) + // Flush makes the checkpointer immediately process any notifications. 
Flush() @@ -73,19 +85,42 @@ type Checkpointer interface { type checkpointer struct { cfg CheckpointerConfig - ndb db.NodeDB - creator Creator - notifyCh *channels.RingChannel - flushCh *channels.RingChannel - statusCh chan struct{} - pausedCh chan bool + ndb db.NodeDB + creator Creator + notifyCh *channels.RingChannel + flushCh *channels.RingChannel + statusCh chan struct{} + pausedCh chan bool + cpNotifier *pubsub.Broker logger *logging.Logger } +type notifyNewVersion struct { + version uint64 +} + +type notifyForceCheckpoint struct { + version uint64 +} + // Implements Checkpointer. func (c *checkpointer) NotifyNewVersion(version uint64) { - c.notifyCh.In() <- version + c.notifyCh.In() <- notifyNewVersion{version} +} + +// Implements Checkpointer. +func (c *checkpointer) ForceCheckpoint(version uint64) { + c.notifyCh.In() <- notifyForceCheckpoint{version} +} + +// Implements Checkpointer. +func (c *checkpointer) WatchCheckpoints() (<-chan uint64, pubsub.ClosableSubscription, error) { + typedCh := make(chan uint64) + sub := c.cpNotifier.Subscribe() + sub.Unwrap(typedCh) + + return typedCh, sub, nil } // Implements Checkpointer. @@ -98,6 +133,9 @@ func (c *checkpointer) Pause(pause bool) { } func (c *checkpointer) checkpoint(ctx context.Context, version uint64, params *CreationParameters) (err error) { + // Notify watchers about the checkpoint we are about to make. + c.cpNotifier.Broadcast(version) + var roots []node.Root if c.cfg.GetRoots == nil { roots, err = c.ndb.GetRootsForVersion(ctx, version) @@ -258,15 +296,26 @@ func (c *checkpointer) worker(ctx context.Context) { continue } - var version uint64 + var ( + version uint64 + force bool + ) select { case <-ctx.Done(): return - case v := <-c.notifyCh.Out(): - version = v.(uint64) + case n := <-c.notifyCh.Out(): + switch nf := n.(type) { + case notifyNewVersion: + version = nf.version + case notifyForceCheckpoint: + version = nf.version + force = true + default: + panic(fmt.Errorf("unsupported checkpointer notification type: %T", nf)) + } } - if paused { + if paused && !force { continue } @@ -289,11 +338,18 @@ func (c *checkpointer) worker(ctx context.Context) { } // Don't checkpoint if checkpoints are disabled. 
- if params.Interval == 0 { + if params.Interval == 0 && !force { continue } - if err := c.maybeCheckpoint(ctx, version, params); err != nil { + var err error + switch force { + case false: + err = c.maybeCheckpoint(ctx, version, params) + case true: + err = c.checkpoint(ctx, version, params) + } + if err != nil { c.logger.Error("failed to checkpoint", "version", version, "err", err, @@ -318,14 +374,15 @@ func NewCheckpointer( cfg CheckpointerConfig, ) (Checkpointer, error) { c := &checkpointer{ - cfg: cfg, - ndb: ndb, - creator: creator, - notifyCh: channels.NewRingChannel(1), - flushCh: channels.NewRingChannel(1), - statusCh: make(chan struct{}), - pausedCh: make(chan bool), - logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), + cfg: cfg, + ndb: ndb, + creator: creator, + notifyCh: channels.NewRingChannel(1), + flushCh: channels.NewRingChannel(1), + statusCh: make(chan struct{}), + pausedCh: make(chan bool), + cpNotifier: pubsub.NewBroker(false), + logger: logging.GetLogger("storage/mkvs/checkpoint/"+cfg.Name).With("namespace", cfg.Namespace), } go c.worker(ctx) return c, nil diff --git a/go/storage/mkvs/checkpoint/checkpointer_test.go b/go/storage/mkvs/checkpoint/checkpointer_test.go index 7a7d54df71a..cf42217387b 100644 --- a/go/storage/mkvs/checkpoint/checkpointer_test.go +++ b/go/storage/mkvs/checkpoint/checkpointer_test.go @@ -91,8 +91,14 @@ func testCheckpointer(t *testing.T, earliestVersion, interval uint64, preExistin }) require.NoError(err, "NewCheckpointer") + // Start watching checkpoints. + cpCh, sub, err := cp.WatchCheckpoints() + require.NoError(err, "WatchCheckpoints") + defer sub.Close() + // Finalize a few rounds. - for round := earliestVersion; round < earliestVersion+(testNumKept+1)*interval; round++ { + var round uint64 + for round = earliestVersion; round < earliestVersion+(testNumKept+1)*interval; round++ { tree := mkvs.NewWithRoot(nil, ndb, root) err = tree.Insert(ctx, []byte(fmt.Sprintf("round %d", round)), []byte(fmt.Sprintf("value %d", round))) require.NoError(err, "Insert") @@ -121,8 +127,44 @@ func testCheckpointer(t *testing.T, earliestVersion, interval uint64, preExistin }) require.NoError(err, "GetCheckpoints") require.Len(cps, testNumKept, "incorrect number of live checkpoints") + + // Make sure checkpoint event was emitted. + select { + case v := <-cpCh: + require.Equal(cps[len(cps)-1].Root.Version, v, "checkpoint event should be correct") + case <-time.After(2 * testCheckInterval): + t.Fatalf("failed to wait for checkpointer to emit event") + } } } + + // Force a checkpoint at a version outside the regular interval. + if interval > 1 { + cpVersion := round - interval + 1 + cp.ForceCheckpoint(cpVersion) + + select { + case <-cp.(*checkpointer).statusCh: + case <-time.After(2 * testCheckInterval): + t.Fatalf("failed to wait for checkpointer to checkpoint") + } + + // Make sure that the correct checkpoint was created. 
+ cps, err := fc.GetCheckpoints(ctx, &GetCheckpointsRequest{ + Version: checkpointVersion, + Namespace: testNs, + }) + require.NoError(err, "GetCheckpoints") + + var found bool + for _, cpm := range cps { + if cpm.Root.Version == cpVersion { + found = true + break + } + } + require.True(found, "forced checkpoint should have been created") + } } func TestCheckpointer(t *testing.T) { @@ -138,4 +180,7 @@ func TestCheckpointer(t *testing.T) { t.Run("MaybeUnderflow", func(t *testing.T) { testCheckpointer(t, 5, 10, true) }) + t.Run("ForceCheckpoint", func(t *testing.T) { + testCheckpointer(t, 0, 10, false) + }) } From 2236a2fd2be683dba64324c31abcb249529ada25 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 24 Jun 2021 20:58:00 +0200 Subject: [PATCH 3/7] go/worker/storage: Force checkpoint sync when block info unavailable --- .changelog/4080.bugfix.md | 1 + go/worker/storage/committee/node.go | 34 +++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 .changelog/4080.bugfix.md diff --git a/.changelog/4080.bugfix.md b/.changelog/4080.bugfix.md new file mode 100644 index 00000000000..40ccb534631 --- /dev/null +++ b/.changelog/4080.bugfix.md @@ -0,0 +1 @@ +go/worker/storage: Force checkpoint sync when block info unavailable diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 2720a257f81..5bae1704ceb 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -922,6 +922,36 @@ func (n *Node) worker() { // nolint: gocyclo n.checkpointer.Flush() } + // Check if we are able to fetch the first block that we would be syncing if we used iterative + // syncing. In case we cannot (likely because we synced the consensus layer via state sync), we + // must wait for a later checkpoint to become available. + if !n.checkpointSyncForced { + // Determine what is the first round that we would need to sync. + iterativeSyncStart := cachedLastRound + if iterativeSyncStart == n.undefinedRound { + iterativeSyncStart++ + } + + // Check if we actually have information about that round. This assumes that any reindexing + // was already performed (the common node would not indicate being initialized otherwise). + _, err = n.commonNode.Runtime.History().GetBlock(n.ctx, iterativeSyncStart) + switch { + case err == nil: + case errors.Is(err, roothashApi.ErrNotFound): + // No information is available about this round, force checkpoint sync. + n.logger.Warn("forcing checkpoint sync as we don't have authoritative block info", + "round", iterativeSyncStart, + ) + n.checkpointSyncForced = true + default: + // Unknown error while fetching block information, abort. + n.logger.Error("failed to query block", + "err", err, + ) + return + } + } + n.logger.Info("worker initialized", "genesis_round", genesisBlock.Header.Round, "last_synced", cachedLastRound, @@ -952,8 +982,8 @@ func (n *Node) worker() { // nolint: gocyclo switch n.checkpointSyncForced { case true: // We have no other options but to perform a checkpoint sync as we are missing - // genesis state. - n.logger.Info("checkpoint sync required as we don't have genesis state, retrying", + // either state or authoritative blocks. 
+ n.logger.Info("checkpoint sync required, retrying", "err", err, "attempt", attempt, ) From a5d860f3ac19659328f49a66ad3d41ce6faad051 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Thu, 24 Jun 2021 20:58:20 +0200 Subject: [PATCH 4/7] go/worker/storage: Synchronize checkpoints with consensus layer --- .changelog/4080.feature.md | 1 + go/worker/storage/committee/node.go | 53 +++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 .changelog/4080.feature.md diff --git a/.changelog/4080.feature.md b/.changelog/4080.feature.md new file mode 100644 index 00000000000..ae5bb8568f9 --- /dev/null +++ b/.changelog/4080.feature.md @@ -0,0 +1 @@ +go/worker/storage: Synchronize checkpoints with consensus layer diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 5bae1704ceb..e06d257af71 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -363,6 +363,9 @@ func (n *Node) Name() string { func (n *Node) Start() error { go n.watchQuit() go n.worker() + if n.checkpointer != nil { + go n.consensusCheckpointSyncer() + } return nil } @@ -835,6 +838,56 @@ func (n *Node) watchQuit() { close(n.quitCh) } +func (n *Node) consensusCheckpointSyncer() { + // Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The + // reason why we do this is to make it faster for storage nodes that use consensus state sync + // to catch up as exactly the right checkpoint will be available. + consensusCp := n.commonNode.Consensus.Checkpointer() + if consensusCp == nil { + return + } + + ch, sub, err := consensusCp.WatchCheckpoints() + if err != nil { + n.logger.Error("failed to watch checkpoints", + "err", err, + ) + return + } + defer sub.Close() + + for { + select { + case <-n.quitCh: + return + case <-n.ctx.Done(): + return + case version := <-ch: + // Lookup what runtime round corresponds to the given consensus layer version and make + // sure we checkpoint it. + blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{ + RuntimeID: n.commonNode.Runtime.ID(), + Height: int64(version), + }) + if err != nil { + n.logger.Error("failed to get runtime block corresponding to consensus checkpoint", + "err", err, + "height", version, + ) + continue + } + + // Force runtime storage checkpointer to create a checkpoint at this round. + n.logger.Info("consensus checkpoint, force runtime checkpoint", + "height", version, + "round", blk.Header.Round, + ) + + n.checkpointer.ForceCheckpoint(blk.Header.Round) + } + } +} + // This is only called from the main worker goroutine, so no locking should be necessary. func (n *Node) nudgeAvailability(lastSynced, latest uint64) { if lastSynced == n.undefinedRound || latest == n.undefinedRound { From 4d7f3509742eac12119c18349b39895400b51b24 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 25 Jun 2021 12:58:33 +0200 Subject: [PATCH 5/7] go/runtime: Delay subscriptions until after consensus sync Previously if the node used consensus state sync it would fail to receive any updates for the various descriptors until the descriptors were updated after the state sync checkpoint. 
--- .changelog/4080.internal.3.md | 5 +++ go/keymanager/client/client.go | 15 +++---- go/oasis-node/node_test.go | 2 +- go/runtime/client/client.go | 2 +- go/runtime/nodes/runtime.go | 70 +++++++++++++++++------------ go/runtime/nodes/versioned.go | 41 ++++++++++------- go/runtime/registry/registry.go | 44 +++++++++--------- go/storage/client/init.go | 9 ++-- go/storage/client/tests/tests.go | 2 +- go/worker/common/committee/group.go | 2 +- go/worker/common/committee/node.go | 2 +- go/worker/keymanager/handler.go | 2 +- go/worker/keymanager/watcher.go | 14 +++--- go/worker/storage/committee/node.go | 6 +-- 14 files changed, 119 insertions(+), 97 deletions(-) create mode 100644 .changelog/4080.internal.3.md diff --git a/.changelog/4080.internal.3.md b/.changelog/4080.internal.3.md new file mode 100644 index 00000000000..f957e53a271 --- /dev/null +++ b/.changelog/4080.internal.3.md @@ -0,0 +1,5 @@ +go/runtime: Delay subscriptions until after consensus sync + +Previously if the node used consensus state sync it would fail to receive any +updates for the various descriptors until the descriptors were updated after +the state sync checkpoint. diff --git a/go/keymanager/client/client.go b/go/keymanager/client/client.go index 1fb8a68a6be..08b3d65507a 100644 --- a/go/keymanager/client/client.go +++ b/go/keymanager/client/client.go @@ -37,8 +37,7 @@ var ErrKeyManagerNotAvailable = errors.New("keymanager/client: key manager not a type Client struct { runtime runtimeRegistry.Runtime - backend api.Backend - registry registry.Backend + consensus consensus.Backend ctx context.Context initCh chan struct{} @@ -116,7 +115,7 @@ func (c *Client) CallRemote(ctx context.Context, data []byte) ([]byte, error) { } func (c *Client) worker() { - stCh, stSub := c.backend.WatchStatuses() + stCh, stSub := c.consensus.KeyManager().WatchStatuses() defer stSub.Close() rtCh, rtSub, err := c.runtime.WatchRegistryDescriptor() @@ -155,7 +154,7 @@ func (c *Client) worker() { } // Fetch current key manager status. 
- st, err := c.backend.GetStatus(c.ctx, ®istry.NamespaceQuery{ + st, err := c.consensus.KeyManager().GetStatus(c.ctx, ®istry.NamespaceQuery{ ID: *kmID, Height: consensus.HeightLatest, }) @@ -208,11 +207,10 @@ func (c *Client) updateState(status *api.Status) { func New( ctx context.Context, runtime runtimeRegistry.Runtime, - backend api.Backend, - registry registry.Backend, + consensus consensus.Backend, identity *identity.Identity, ) (*Client, error) { - committeeNodes, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, registry) + committeeNodes, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, consensus) if err != nil { return nil, fmt.Errorf("keymanager/client: failed to create node descriptor watcher: %w", err) } @@ -228,8 +226,7 @@ func New( c := &Client{ runtime: runtime, - backend: backend, - registry: registry, + consensus: consensus, ctx: ctx, initCh: make(chan struct{}), committeeWatcher: committeeNodes, diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index f76d36c0f8c..8941cb6eec2 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -519,7 +519,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) { require.NoError(t, err, "GetRuntime") localBackend := rt.Storage().(storageAPI.LocalBackend) - client, err := storageClient.NewStatic(ctx, node.Identity, node.Consensus.Registry(), node.Identity.NodeSigner.Public()) + client, err := storageClient.NewStatic(ctx, node.Identity, node.Consensus, node.Identity.NodeSigner.Public()) require.NoError(t, err, "NewStatic") // Determine the current round. This is required so that we can commit into diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index ce3f59ad5fd..ce990f05de3 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -550,7 +550,7 @@ func (c *runtimeClient) CallEnclave(ctx context.Context, request *enclaverpc.Cal if km = c.kmClients[rt.ID()]; km == nil { c.logger.Debug("creating new key manager client instance") - km, err = keymanager.New(c.common.ctx, rt, c.common.consensus.KeyManager(), c.common.consensus.Registry(), nil) + km, err = keymanager.New(c.common.ctx, rt, c.common.consensus, nil) if err != nil { c.Unlock() c.logger.Error("failed to create key manager client instance", diff --git a/go/runtime/nodes/runtime.go b/go/runtime/nodes/runtime.go index b554d73f4d1..982b436a70c 100644 --- a/go/runtime/nodes/runtime.go +++ b/go/runtime/nodes/runtime.go @@ -13,7 +13,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/pubsub" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" - registry "github.com/oasisprotocol/oasis-core/go/registry/api" ) const roleTagPrefix = "role" @@ -35,7 +34,7 @@ func TagsForRoleMask(nodeRoles node.RolesMask) (tags []string) { type runtimeNodesWatcher struct { // nolint: maligned sync.RWMutex - registry registry.Backend + consensus consensus.Backend runtimeID common.Namespace @@ -128,9 +127,46 @@ func (rw *runtimeNodesWatcher) removeLocked(n *node.Node) { }) } -func (rw *runtimeNodesWatcher) watchRuntimeNodeUpdates(ctx context.Context, ch <-chan *registry.NodeEvent, sub pubsub.ClosableSubscription) { +func (rw *runtimeNodesWatcher) watchRuntimeNodeUpdates(ctx context.Context) { + rw.logger.Debug("waiting consensus sync") + select { + case <-ctx.Done(): + return + case <-rw.consensus.Synced(): + } + rw.logger.Debug("consensus synced") + + ch, sub, err := rw.consensus.Registry().WatchNodes(ctx) + if err != nil { + 
rw.logger.Error("failed to watch nodes", + "err", err, + ) + return + } defer sub.Close() + // Setup initial state. + // This is needed since in case node is restarted, we won't be replaying + // old blocks and therefore won't receive the node registration update events + // for currently registered nodes. + nodes, err := rw.consensus.Registry().GetNodes(ctx, consensus.HeightLatest) + // If there's no committee blocks this is a fresh node so initial state is empty. + if err != nil && err != consensus.ErrNoCommittedBlocks { + rw.logger.Error("error querying registry for nodes", + "err", err, + ) + return + } + for _, n := range nodes { + if n.GetRuntime(rw.runtimeID) == nil { + continue + } + + rw.Lock() + rw.updateLocked(n) + rw.Unlock() + } + for { select { case <-ctx.Done(): @@ -159,11 +195,11 @@ func (rw *runtimeNodesWatcher) watchRuntimeNodeUpdates(ctx context.Context, ch < // Aditionally, watched nodes are tagged by node roles. func NewRuntimeNodeLookup( ctx context.Context, - registry registry.Backend, + consensus consensus.Backend, runtimeID common.Namespace, ) (NodeDescriptorLookup, error) { rw := &runtimeNodesWatcher{ - registry: registry, + consensus: consensus, runtimeID: runtimeID, nodes: make(map[signature.PublicKey]*node.Node), nodesByPeerID: make(map[signature.PublicKey]*node.Node), @@ -183,29 +219,7 @@ func NewRuntimeNodeLookup( } }) - ch, sub, err := registry.WatchNodes(ctx) - if err != nil { - return nil, fmt.Errorf("runtime/nodes/watcher: failed to watch nodes: %w", err) - } - - // Setup initial state. - // This is needed since in case node is restarted, we won't be replaying - // old blocks and therefore won't receive the node registration update events - // for currently registered nodes. - nodes, err := rw.registry.GetNodes(ctx, consensus.HeightLatest) - // If there's no committee blocks this is a fresh node so initial state is empty. - if err != nil && err != consensus.ErrNoCommittedBlocks { - return nil, fmt.Errorf("runtime/nodes/watcher: error querying registry for nodes: %w", err) - } - for _, n := range nodes { - if n.GetRuntime(rw.runtimeID) == nil { - continue - } - // NOTE: Nothing else is accessing this yet, so no lock needed here. - rw.updateLocked(n) - } - - go rw.watchRuntimeNodeUpdates(ctx, ch, sub) + go rw.watchRuntimeNodeUpdates(ctx) return rw, nil } diff --git a/go/runtime/nodes/versioned.go b/go/runtime/nodes/versioned.go index 1a7d1c2aa8b..b0000ace943 100644 --- a/go/runtime/nodes/versioned.go +++ b/go/runtime/nodes/versioned.go @@ -53,9 +53,7 @@ type VersionedNodeDescriptorWatcher interface { type versionedNodeDescriptorWatcher struct { sync.RWMutex - registry registry.Backend - - ctx context.Context + consensus consensus.Backend frozen bool version int64 @@ -133,7 +131,7 @@ func (nw *versionedNodeDescriptorWatcher) WatchNodeWithTag(ctx context.Context, } // Fetch the latest version of the node from registry. 
- n, err := nw.registry.GetNode(ctx, ®istry.IDQuery{ID: id, Height: consensus.HeightLatest}) + n, err := nw.consensus.Registry().GetNode(ctx, ®istry.IDQuery{ID: id, Height: consensus.HeightLatest}) if err != nil { return nil, fmt.Errorf("committee: failed to fetch node info: %w", err) } @@ -220,12 +218,28 @@ func (nw *versionedNodeDescriptorWatcher) WatchNodeUpdates() (<-chan *NodeUpdate return ch, sub, nil } -func (nw *versionedNodeDescriptorWatcher) worker(ch <-chan *registry.NodeEvent, sub pubsub.ClosableSubscription) { +func (nw *versionedNodeDescriptorWatcher) watchRuntimeNodeUpdates(ctx context.Context) { + nw.logger.Debug("waiting consensus sync") + select { + case <-ctx.Done(): + return + case <-nw.consensus.Synced(): + } + nw.logger.Debug("consensus synced") + + // Subscribe to node updates. + ch, sub, err := nw.consensus.Registry().WatchNodes(ctx) + if err != nil { + nw.logger.Error("failed to watch nodes", + "err", err, + ) + return + } defer sub.Close() for { select { - case <-nw.ctx.Done(): + case <-ctx.Done(): return case ev := <-ch: func() { @@ -256,17 +270,10 @@ func (nw *versionedNodeDescriptorWatcher) Versioned() bool { // // This watcher will only track nodes that will be explicitly marked to watch // via WatchNode/WatchNodeWithTags methods. -func NewVersionedNodeDescriptorWatcher(ctx context.Context, registry registry.Backend) (VersionedNodeDescriptorWatcher, error) { - // Subscribe to node updates. - ch, sub, err := registry.WatchNodes(ctx) - if err != nil { - return nil, fmt.Errorf("committee: failed to watch nodes: %w", err) - } - +func NewVersionedNodeDescriptorWatcher(ctx context.Context, consensus consensus.Backend) (VersionedNodeDescriptorWatcher, error) { nw := &versionedNodeDescriptorWatcher{ - registry: registry, - ctx: ctx, - logger: logging.GetLogger("runtime/committee/nodedescriptorwatcher"), + consensus: consensus, + logger: logging.GetLogger("runtime/committee/nodedescriptorwatcher"), } nw.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) { nw.RLock() @@ -282,7 +289,7 @@ func NewVersionedNodeDescriptorWatcher(ctx context.Context, registry registry.Ba }) nw.Reset() - go nw.worker(ch, sub) + go nw.watchRuntimeNodeUpdates(ctx) return nw, nil } diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index a55b886da69..0f75b289731 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -11,7 +11,6 @@ import ( "github.com/spf13/viper" - beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/identity" @@ -320,16 +319,7 @@ func (r *runtime) updateActiveDescriptor(ctx context.Context) bool { return true } -func (r *runtime) watchUpdates( - ctx context.Context, - epoCh <-chan beacon.EpochTime, - sub pubsub.ClosableSubscription, - regCh <-chan *registry.Runtime, - regSub pubsub.ClosableSubscription, -) { - defer sub.Close() - defer regSub.Close() - +func (r *runtime) watchUpdates(ctx context.Context) { r.logger.Debug("waiting consensus sync") select { case <-ctx.Done(): @@ -338,6 +328,26 @@ func (r *runtime) watchUpdates( } r.logger.Debug("consensus synced") + // Subscribe to epoch transitions. + epoCh, sub, err := r.consensus.Beacon().WatchEpochs(ctx) + if err != nil { + r.logger.Error("failed to watch epochs", + "err", err, + ) + return + } + defer sub.Close() + + // Subscribe to runtime updates. 
+ regCh, regSub, err := r.consensus.Registry().WatchRuntimes(ctx) + if err != nil { + r.logger.Error("failed to watch runtime updates", + "err", err, + ) + return + } + defer regSub.Close() + var regInitialized, activeInitialized bool for { select { @@ -385,7 +395,7 @@ func (r *runtime) finishInitialization(ctx context.Context, ident *identity.Iden defer r.Unlock() if r.storage == nil { - storageBackend, err := client.NewForPublicStorage(ctx, r.id, ident, r.consensus.Registry(), r) + storageBackend, err := client.NewForPublicStorage(ctx, r.id, ident, r.consensus, r) if err != nil { return fmt.Errorf("runtime/registry: cannot create storage for runtime %s: %w", r.id, err) } @@ -572,14 +582,6 @@ func newRuntime( consensus consensus.Backend, logger *logging.Logger, ) (*runtime, error) { - ch, sub, err := consensus.Beacon().WatchEpochs(ctx) - if err != nil { - return nil, fmt.Errorf("runtime/registry: failed to watch epochs %s: %w", id, err) - } - regCh, regSub, err := consensus.Registry().WatchRuntimes(ctx) - if err != nil { - return nil, fmt.Errorf("runtime/registry: failed to watch updates for runtime %s: %w", id, err) - } watchCtx, cancel := context.WithCancel(ctx) rt := &runtime{ @@ -592,7 +594,7 @@ func newRuntime( activeDescriptorNotifier: pubsub.NewBroker(true), logger: logger.With("runtime_id", id), } - go rt.watchUpdates(watchCtx, ch, sub, regCh, regSub) + go rt.watchUpdates(watchCtx) // Configure runtime host if needed. if cfg.Host != nil { diff --git a/go/storage/client/init.go b/go/storage/client/init.go index 953b01bff3e..a0dc6147305 100644 --- a/go/storage/client/init.go +++ b/go/storage/client/init.go @@ -9,6 +9,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/identity" "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-core/go/common/node" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" registry "github.com/oasisprotocol/oasis-core/go/registry/api" "github.com/oasisprotocol/oasis-core/go/runtime/nodes" "github.com/oasisprotocol/oasis-core/go/runtime/nodes/grpc" @@ -62,13 +63,13 @@ func NewForPublicStorage( ctx context.Context, namespace common.Namespace, ident *identity.Identity, - registryBackend registry.Backend, + consensus consensus.Backend, runtime registry.RuntimeDescriptorProvider, opts ...Option, ) (api.Backend, error) { nl, err := nodes.NewRuntimeNodeLookup( ctx, - registryBackend, + consensus, namespace, ) if err != nil { @@ -93,11 +94,11 @@ func NewForPublicStorage( func NewStatic( ctx context.Context, ident *identity.Identity, - registryBackend registry.Backend, + consensus consensus.Backend, nodeID signature.PublicKey, opts ...Option, ) (api.Backend, error) { - nw, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, registryBackend) + nw, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, consensus) if err != nil { return nil, fmt.Errorf("storage/client: failed to create node descriptor watcher: %w", err) } diff --git a/go/storage/client/tests/tests.go b/go/storage/client/tests/tests.go index 01c55e2c575..df9c8080be8 100644 --- a/go/storage/client/tests/tests.go +++ b/go/storage/client/tests/tests.go @@ -42,7 +42,7 @@ func ClientWorkerTests( ns := rt.Runtime.ID // Initialize storage client. - client, err := storageClient.NewForPublicStorage(ctx, ns, identity, consensus.Registry(), nil) + client, err := storageClient.NewForPublicStorage(ctx, ns, identity, consensus, nil) require.NoError(err, "NewStorageClient") // Create mock root hash. 
diff --git a/go/worker/common/committee/group.go b/go/worker/common/committee/group.go index ae9133b38b7..577aa1bd8b4 100644 --- a/go/worker/common/committee/group.go +++ b/go/worker/common/committee/group.go @@ -594,7 +594,7 @@ func NewGroup( consensus consensus.Backend, p2p *p2p.P2P, ) (*Group, error) { - nw, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, consensus.Registry()) + nw, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, consensus) if err != nil { return nil, fmt.Errorf("group: failed to create node watcher: %w", err) } diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 112f1a33218..642ae4b2418 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -362,7 +362,7 @@ func (n *Node) worker() { if rt.KeyManager != nil { n.logger.Info("runtime indicates a key manager is required, waiting for it to be ready") - n.KeyManagerClient, err = keymanagerClient.New(n.ctx, n.Runtime, n.KeyManager, n.Consensus.Registry(), n.Identity) + n.KeyManagerClient, err = keymanagerClient.New(n.ctx, n.Runtime, n.Consensus, n.Identity) if err != nil { n.logger.Error("failed to create key manager client", "err", err, diff --git a/go/worker/keymanager/handler.go b/go/worker/keymanager/handler.go index 4eb5e5f82ca..7b58ff56ff8 100644 --- a/go/worker/keymanager/handler.go +++ b/go/worker/keymanager/handler.go @@ -29,7 +29,7 @@ type hostHandler struct { } func (h *hostHandler) initRemoteClient(commonWorker *workerCommon.Worker) { - remoteClient, err := client.New(h.w.ctx, h.w.runtime, commonWorker.KeyManager, commonWorker.Consensus.Registry(), commonWorker.Identity) + remoteClient, err := client.New(h.w.ctx, h.w.runtime, commonWorker.Consensus, commonWorker.Identity) if err != nil { h.w.logger.Error("failed to create remote client", "err", err, diff --git a/go/worker/keymanager/watcher.go b/go/worker/keymanager/watcher.go index bf49b35261d..13924615603 100644 --- a/go/worker/keymanager/watcher.go +++ b/go/worker/keymanager/watcher.go @@ -4,24 +4,24 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/accessctl" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" "github.com/oasisprotocol/oasis-core/go/common/node" - registry "github.com/oasisprotocol/oasis-core/go/registry/api" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" "github.com/oasisprotocol/oasis-core/go/runtime/nodes" ) type kmNodeWatcher struct { - w *Worker - registry registry.Backend + w *Worker + consensus consensus.Backend } func newKmNodeWatcher(w *Worker) *kmNodeWatcher { return &kmNodeWatcher{ - w: w, - registry: w.commonWorker.Consensus.Registry(), + w: w, + consensus: w.commonWorker.Consensus, } } func (knw *kmNodeWatcher) watchNodes() { - nodesCh, nodesSub, err := knw.registry.WatchNodeList(knw.w.ctx) + nodesCh, nodesSub, err := knw.consensus.Registry().WatchNodeList(knw.w.ctx) if err != nil { knw.w.logger.Error("worker/keymanager: failed to watch node list", "err", err, @@ -30,7 +30,7 @@ func (knw *kmNodeWatcher) watchNodes() { } defer nodesSub.Close() - watcher, err := nodes.NewVersionedNodeDescriptorWatcher(knw.w.ctx, knw.registry) + watcher, err := nodes.NewVersionedNodeDescriptorWatcher(knw.w.ctx, knw.consensus) if err != nil { knw.w.logger.Error("worker/keymanager: failed to create node desc watcher", "err", err, diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index e06d257af71..0eb91e55ef3 100644 --- a/go/worker/storage/committee/node.go +++ 
b/go/worker/storage/committee/node.go @@ -254,11 +254,7 @@ func NewNode( // Create a new storage client that will be used for remote sync. // This storage client connects to all registered storage nodes for the runtime. - nl, err := nodes.NewRuntimeNodeLookup( - n.ctx, - n.commonNode.Consensus.Registry(), - rtID, - ) + nl, err := nodes.NewRuntimeNodeLookup(n.ctx, n.commonNode.Consensus, rtID) if err != nil { return nil, fmt.Errorf("group: failed to create runtime node watcher: %w", err) } From 91b4f79603322599b85785ab02e11cee37feeec5 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 25 Jun 2021 13:30:38 +0200 Subject: [PATCH 6/7] go/oasis-test-runner: Add consensus state sync to storage sync scenario --- .../scenario/e2e/runtime/storage_sync.go | 89 +++++++++++++++++-- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go b/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go index cdbb9450255..e36bc79aaee 100644 --- a/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go +++ b/go/oasis-test-runner/scenario/e2e/runtime/storage_sync.go @@ -2,10 +2,12 @@ package runtime import ( "context" + "encoding/hex" "fmt" "strings" "time" + control "github.com/oasisprotocol/oasis-core/go/control/api" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/log" "github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis" @@ -45,6 +47,11 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) { // new node registers. f.Network.SetMockEpoch() + // Enable consensus layer checkpoints. + f.Network.Consensus.Parameters.StateCheckpointInterval = 10 + f.Network.Consensus.Parameters.StateCheckpointNumKept = 2 + f.Network.Consensus.Parameters.StateCheckpointChunkSize = 1024 * 1024 + // Make the first storage worker check for checkpoints more often. f.StorageWorkers[0].CheckpointCheckInterval = 1 * time.Second // Configure runtime to allow a smaller replication factor as otherwise execution may fail when @@ -69,11 +76,22 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) { CheckpointSyncEnabled: true, LogWatcherHandlerFactories: []log.WatcherHandlerFactory{oasis.LogAssertCheckpointSync()}, }) + // And one more storage worker that will sync the consensus layer via state sync. + f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{ + Backend: database.BackendNameBadgerDB, + Entity: 1, + NoAutoStart: true, + CheckpointSyncEnabled: true, + LogWatcherHandlerFactories: []log.WatcherHandlerFactory{ + oasis.LogAssertCheckpointSync(), + oasis.LogEventABCIStateSyncComplete(), + }, + }) return f, nil } -func (sc *storageSyncImpl) Run(childEnv *env.Env) error { +func (sc *storageSyncImpl) Run(childEnv *env.Env) error { //nolint: gocyclo clientErrCh, cmd, err := sc.runtimeImpl.start(childEnv) if err != nil { return err @@ -196,15 +214,72 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error { } } - // Now spin up the last storage worker and check if it syncs with a checkpoint. + sc.Logger.Info("running first late storage worker") + + // Now spin up the first late storage worker and check if it syncs with a checkpoint. 
lateWorker := sc.Net.StorageWorkers()[3] - err = lateWorker.Start() - if err != nil { - return fmt.Errorf("can't start last storage worker: %w", err) + if err = lateWorker.Start(); err != nil { + return fmt.Errorf("can't start first late storage worker: %w", err) } - if err := lateWorker.WaitReady(ctx); err != nil { - return fmt.Errorf("error waiting for late storage worker to become ready: %w", err) + if err = lateWorker.WaitReady(ctx); err != nil { + return fmt.Errorf("error waiting for first late storage worker to become ready: %w", err) } + + sc.Logger.Info("running second late storage worker") + + // Get the TLS public key from the validators. + var ( + consensusNodes []string + trustHeight uint64 + trustHash string + ) + for _, v := range sc.Net.Validators() { + var ctrl *oasis.Controller + ctrl, err = oasis.NewController(v.SocketPath()) + if err != nil { + return fmt.Errorf("failed to create controller for validator %s: %w", v.Name, err) + } + + var status *control.Status + status, err = ctrl.GetStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get status for validator %s: %w", v.Name, err) + } + + if status.Registration.Descriptor == nil { + return fmt.Errorf("validator %s has not registered", v.Name) + } + if len(status.Registration.Descriptor.TLS.Addresses) == 0 { + return fmt.Errorf("validator %s has no TLS addresses", v.Name) + } + + var rawAddress []byte + tlsAddress := status.Registration.Descriptor.TLS.Addresses[0] + rawAddress, err = tlsAddress.MarshalText() + if err != nil { + return fmt.Errorf("failed to marshal TLS address: %w", err) + } + consensusNodes = append(consensusNodes, string(rawAddress)) + + trustHeight = uint64(status.Consensus.LatestHeight) + trustHash = hex.EncodeToString(status.Consensus.LatestHash) + } + + // Configure state sync for the last storage node. + lateWorker = sc.Net.StorageWorkers()[4] + lateWorker.SetConsensusStateSync(&oasis.ConsensusStateSyncCfg{ + ConsensusNodes: consensusNodes, + TrustHeight: trustHeight, + TrustHash: trustHash, + }) + + if err = lateWorker.Start(); err != nil { + return fmt.Errorf("can't start second late storage worker: %w", err) + } + if err = lateWorker.WaitReady(ctx); err != nil { + return fmt.Errorf("error waiting for second late storage worker to become ready: %w", err) + } + // Wait a bit to give the logger in the node time to sync; the message has already been // logged by this point, it just might not be on disk yet. 
<-time.After(1 * time.Second) From 384d90f91efd2f0e48366fc59fb35377d643101b Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 25 Jun 2021 13:44:43 +0200 Subject: [PATCH 7/7] go/runtime/committee: Remove unused package --- go/runtime/committee/committee.go | 237 ------------------------------ 1 file changed, 237 deletions(-) delete mode 100644 go/runtime/committee/committee.go diff --git a/go/runtime/committee/committee.go b/go/runtime/committee/committee.go deleted file mode 100644 index ead665c7acc..00000000000 --- a/go/runtime/committee/committee.go +++ /dev/null @@ -1,237 +0,0 @@ -package committee - -import ( - "bytes" - "context" - "fmt" - "sort" - - "github.com/oasisprotocol/oasis-core/go/common" - "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" - "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" - "github.com/oasisprotocol/oasis-core/go/common/logging" - "github.com/oasisprotocol/oasis-core/go/common/pubsub" - registry "github.com/oasisprotocol/oasis-core/go/registry/api" - "github.com/oasisprotocol/oasis-core/go/runtime/nodes" - scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api" -) - -// Watcher is the committee watcher interface. -type Watcher interface { - // Nodes returns a node descriptor lookup interface that watches all nodes in the committee. - Nodes() nodes.NodeDescriptorLookup - - // EpochTransition signals an epoch transition to the committee watcher. - EpochTransition(ctx context.Context, height int64) error -} - -type committeeWatcher struct { // nolint: maligned - nw nodes.VersionedNodeDescriptorWatcher - scheduler scheduler.Backend - - runtimeID common.Namespace - kind scheduler.CommitteeKind - - filters []Filter - autoEpoch bool - - lastCommitteeID hash.Hash - - logger *logging.Logger -} - -func (cw *committeeWatcher) Nodes() nodes.NodeDescriptorLookup { - return cw.nw -} - -func (cw *committeeWatcher) EpochTransition(ctx context.Context, height int64) (err error) { - if cw.autoEpoch { - return fmt.Errorf("committee: manual epoch transition not allowed when automatic is enabled") - } - - defer func() { - // Make sure to not watch any nodes in case we fail to update the committee. - if err != nil { - cw.lastCommitteeID.Empty() - cw.nw.Reset() - } - }() - - // TODO: Support request for only a specific committee kind. - var committees []*scheduler.Committee - committees, err = cw.scheduler.GetCommittees(ctx, &scheduler.GetCommitteesRequest{ - RuntimeID: cw.runtimeID, - Height: height, - }) - if err != nil { - return fmt.Errorf("committee: unable to fetch committees: %w", err) - } - - var committee *scheduler.Committee - for _, c := range committees { - if c.Kind != cw.kind { - continue - } - committee = c - break - } - if committee == nil { - return fmt.Errorf("committee: no committee of kind %s for runtime %s", cw.kind, cw.runtimeID) - } - - return cw.update(ctx, height, committee) -} - -func (cw *committeeWatcher) update(ctx context.Context, version int64, committee *scheduler.Committee) (err error) { - defer func() { - // Make sure to not watch any nodes in case we fail to update the committee. - if err != nil { - cw.lastCommitteeID.Empty() - cw.nw.Reset() - } - }() - - var filtered []*scheduler.CommitteeNode -Members: - for _, member := range committee.Members { - // Filter members. - for _, f := range cw.filters { - if !f(member) { - continue Members - } - } - - filtered = append(filtered, member) - } - // Sort list to ensure a canonical identifier. 
- sort.Slice(filtered, func(i, j int) bool { - return bytes.Compare(filtered[i].PublicKey[:], filtered[j].PublicKey[:]) < 0 - }) - - // If the set of (filtered) committee members did not change, there is no need to trigger a - // reset and recreate everything. Nodes will be updated anyway. - cid := hash.NewFrom(filtered) - - if cw.lastCommitteeID.Equal(&cid) { - cw.logger.Debug("not updating committee as members/roles have not changed", - "filtered_committee_id", cid, - ) - // Bump committee version as that might have changed. - cw.nw.BumpVersion(version) - return nil - } - - // Clear all previous nodes. - cw.nw.Reset() - - for _, member := range filtered { - if _, err = cw.nw.WatchNode(ctx, member.PublicKey); err != nil { - return fmt.Errorf("committee: failed to watch node: %w", err) - } - } - - // Freeze the node watcher as we will not be watching any additional nodes. - cw.nw.Freeze(version) - cw.lastCommitteeID = cid - - return nil -} - -func (cw *committeeWatcher) watchCommittees(ctx context.Context, ch <-chan *scheduler.Committee, sub pubsub.ClosableSubscription) { - defer sub.Close() - - for { - select { - case <-ctx.Done(): - return - case c := <-ch: - if c == nil { - return - } - if c.RuntimeID != cw.runtimeID { - continue - } - if c.Kind != cw.kind { - continue - } - - if err := cw.update(ctx, int64(c.ValidFor), c); err != nil { - cw.logger.Error("failed to update committee", - "err", err, - ) - } - } - } -} - -// WatcherOption is an option for NewWatcher. -type WatcherOption func(cw *committeeWatcher) - -// WithAutomaticEpochTransitions is an option for enabling automatic epoch transitions in the -// committee watcher. Committees will be updated whenever the scheduler elects new committees. -func WithAutomaticEpochTransitions() WatcherOption { - return func(cw *committeeWatcher) { - cw.autoEpoch = true - } -} - -// Filter is filter function for the committee watcher. It should return false for any members which -// should be excluded. -type Filter func(*scheduler.CommitteeNode) bool - -// IgnoreNodeFilter is a committee watcher filter that filters out nodes based on their public key. -func IgnoreNodeFilter(pk signature.PublicKey) Filter { - return func(cn *scheduler.CommitteeNode) bool { - return !cn.PublicKey.Equal(pk) - } -} - -// WithFilter is an option that adds a given filter to the committee watcher. -func WithFilter(f Filter) WatcherOption { - return func(cw *committeeWatcher) { - cw.filters = append(cw.filters, f) - } -} - -// NewWatcher creates a new committee watcher. -func NewWatcher( - ctx context.Context, - scheduler scheduler.Backend, - registry registry.Backend, - runtimeID common.Namespace, - kind scheduler.CommitteeKind, - options ...WatcherOption, -) (Watcher, error) { - nw, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, registry) - if err != nil { - return nil, fmt.Errorf("committee: failed to create node descriptor watcher: %w", err) - } - - cw := &committeeWatcher{ - nw: nw, - scheduler: scheduler, - runtimeID: runtimeID, - kind: kind, - logger: logging.GetLogger("runtime/committee/watcher").With( - "runtime_id", runtimeID, - "kind", kind, - ), - } - cw.lastCommitteeID.Empty() - - for _, o := range options { - o(cw) - } - - // If configured, subscribe to committee updates. - if cw.autoEpoch { - ch, sub, err := scheduler.WatchCommittees(ctx) - if err != nil { - return nil, fmt.Errorf("committee: failed to watch committees: %w", err) - } - - go cw.watchCommittees(ctx, ch, sub) - } - - return cw, nil -}
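As a closing illustration of the checkpointer API added earlier in this series,
the sketch below forces an out-of-schedule checkpoint and waits for the
checkpointer to start creating it. WatchCheckpoints and ForceCheckpoint are the
methods added in PATCH 2/7; the surrounding helper and its choice of version are
assumptions for the example.

    package example

    import (
        "context"

        "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
    )

    // forceAndWait requests a checkpoint of the given version and blocks until
    // the checkpointer announces that it has started creating it.
    func forceAndWait(ctx context.Context, cp checkpoint.Checkpointer, version uint64) error {
        ch, sub, err := cp.WatchCheckpoints()
        if err != nil {
            return err
        }
        defer sub.Close()

        // Checkpoint creation is asynchronous; this call returns immediately.
        cp.ForceCheckpoint(version)

        // Versions are emitted on the watch channel before the checkpointing
        // process starts, so wait for the requested one to appear.
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case v := <-ch:
                if v == version {
                    return nil
                }
            }
        }
    }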