From 7589b6738afa8e469e30347ee9e4456f548c8435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Ber=C4=8Di=C4=8D?= Date: Fri, 17 Jan 2020 16:20:49 +0100 Subject: [PATCH] go/upgrade: Implement node upgrading mechanism --- .changelog/2607.feature.md | 9 + go/common/persistent/persistent.go | 12 + go/consensus/tendermint/abci/mux.go | 29 +- go/consensus/tendermint/tendermint.go | 7 +- go/control/api/api.go | 9 + go/control/api/grpc.go | 63 ++++ go/control/control.go | 17 +- go/oasis-node/cmd/control/control.go | 64 ++++ .../cmd/debug/byzantine/tendermint.go | 2 +- go/oasis-node/cmd/node/node.go | 22 +- go/oasis-test-runner/oasis/byzantine.go | 1 + go/oasis-test-runner/oasis/compute.go | 1 + go/oasis-test-runner/oasis/keymanager.go | 1 + go/oasis-test-runner/oasis/log.go | 13 + go/oasis-test-runner/oasis/oasis.go | 3 +- go/oasis-test-runner/oasis/storage.go | 1 + go/oasis-test-runner/oasis/validator.go | 5 +- go/oasis-test-runner/scenario/e2e/upgrade.go | 335 ++++++++++++++++++ go/oasis-test-runner/test-runner.go | 2 + go/upgrade/api/api.go | 147 ++++++++ go/upgrade/migrations/dummy.go | 69 ++++ go/upgrade/migrations/migrations.go | 69 ++++ go/upgrade/upgrade.go | 282 +++++++++++++++ 23 files changed, 1147 insertions(+), 16 deletions(-) create mode 100644 .changelog/2607.feature.md create mode 100644 go/oasis-test-runner/scenario/e2e/upgrade.go create mode 100644 go/upgrade/api/api.go create mode 100644 go/upgrade/migrations/dummy.go create mode 100644 go/upgrade/migrations/migrations.go create mode 100644 go/upgrade/upgrade.go diff --git a/.changelog/2607.feature.md b/.changelog/2607.feature.md new file mode 100644 index 00000000000..3b27b761266 --- /dev/null +++ b/.changelog/2607.feature.md @@ -0,0 +1,9 @@ +Implement node upgrade mechanism. + +The node now accepts upgrade descriptors which describe the upgrade to carry +out. It can shut down at the appropriate epoch and then execute any required +migration handlers on the node itself and on the consensus layer. Once a +descriptor is submitted, the old node can be normally restarted and used until +the upgrade epoch is reached; the new binary can not be used at all until the +old binary has had a chance to reach the upgrade epoch. Once that is reached, +the old binary will refuse to start. diff --git a/go/common/persistent/persistent.go b/go/common/persistent/persistent.go index 388fe62bd10..fbc613573c6 100644 --- a/go/common/persistent/persistent.go +++ b/go/common/persistent/persistent.go @@ -105,6 +105,18 @@ func (ss *ServiceStore) PutCBOR(key []byte, value interface{}) error { }) } +// Delete removes the specified key from the service store. +func (ss *ServiceStore) Delete(key []byte) error { + return ss.store.db.Update(func(tx *badger.Txn) error { + switch err := tx.Delete(ss.dbKey(key)); err { + case badger.ErrKeyNotFound: + return ErrNotFound + default: + return err + } + }) +} + func (ss *ServiceStore) dbKey(key []byte) []byte { return bytes.Join([][]byte{ss.name, key}, []byte{'.'}) } diff --git a/go/consensus/tendermint/abci/mux.go b/go/consensus/tendermint/abci/mux.go index cbf7258a84d..963fa55de3f 100644 --- a/go/consensus/tendermint/abci/mux.go +++ b/go/consensus/tendermint/abci/mux.go @@ -27,6 +27,7 @@ import ( "github.com/oasislabs/oasis-core/go/consensus/api/transaction" epochtime "github.com/oasislabs/oasis-core/go/epochtime/api" genesis "github.com/oasislabs/oasis-core/go/genesis/api" + upgrade "github.com/oasislabs/oasis-core/go/upgrade/api" ) const ( @@ -264,12 +265,12 @@ func (a *ApplicationServer) EstimateGas(caller signature.PublicKey, tx *transact // NewApplicationServer returns a new ApplicationServer, using the provided // directory to persist state. -func NewApplicationServer(ctx context.Context, cfg *ApplicationConfig) (*ApplicationServer, error) { +func NewApplicationServer(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationConfig) (*ApplicationServer, error) { metricsOnce.Do(func() { prometheus.MustRegister(abciCollectors...) }) - mux, err := newABCIMux(ctx, cfg) + mux, err := newABCIMux(ctx, upgrader, cfg) if err != nil { return nil, err } @@ -284,8 +285,9 @@ type abciMux struct { sync.RWMutex types.BaseApplication - logger *logging.Logger - state *applicationState + logger *logging.Logger + upgrader upgrade.Backend + state *applicationState appsByName map[string]Application appsByMethod map[transaction.MethodName]Application @@ -457,6 +459,7 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock { blockHeight := mux.state.BlockHeight() + mux.logger.Debug("BeginBlock", "req", req, "block_height", blockHeight, @@ -480,6 +483,21 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB ctx := mux.state.NewContext(ContextBeginBlock, mux.currentTime) defer ctx.Close() + currentEpoch, err := mux.state.GetCurrentEpoch(ctx.Ctx()) + if err != nil { + panic("mux: can't get current epoch in BeginBlock") + } + + // Check if there are any upgrades pending or if we need to halt for an upgrade. + switch err = mux.upgrader.ConsensusUpgrade(ctx, currentEpoch, blockHeight); err { + case nil: + // Everything ok. + case upgrade.ErrStopForUpgrade: + panic("mux: reached upgrade epoch") + default: + panic(fmt.Sprintf("mux: error while trying to perform consensus upgrade: %v", err)) + } + switch mux.state.haltMode { case false: if !mux.state.inHaltEpoch(ctx) { @@ -867,7 +885,7 @@ func (mux *abciMux) checkDependencies() error { return nil } -func newABCIMux(ctx context.Context, cfg *ApplicationConfig) (*abciMux, error) { +func newABCIMux(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationConfig) (*abciMux, error) { state, err := newApplicationState(ctx, cfg) if err != nil { return nil, err @@ -875,6 +893,7 @@ func newABCIMux(ctx context.Context, cfg *ApplicationConfig) (*abciMux, error) { mux := &abciMux{ logger: logging.GetLogger("abci-mux"), + upgrader: upgrader, state: state, appsByName: make(map[string]Application), appsByMethod: make(map[transaction.MethodName]Application), diff --git a/go/consensus/tendermint/tendermint.go b/go/consensus/tendermint/tendermint.go index 2d86b762cee..d8f62084aaa 100644 --- a/go/consensus/tendermint/tendermint.go +++ b/go/consensus/tendermint/tendermint.go @@ -64,6 +64,7 @@ import ( roothashAPI "github.com/oasislabs/oasis-core/go/roothash/api" schedulerAPI "github.com/oasislabs/oasis-core/go/scheduler/api" stakingAPI "github.com/oasislabs/oasis-core/go/staking/api" + upgradeAPI "github.com/oasislabs/oasis-core/go/upgrade/api" ) const ( @@ -172,6 +173,7 @@ type tendermintService struct { ctx context.Context svcMgr *cmbackground.ServiceManager + upgrader upgradeAPI.Backend mux *abci.ApplicationServer node *tmnode.Node client tmcli.Client @@ -888,7 +890,7 @@ func (t *tendermintService) lazyInit() error { MinGasPrice: viper.GetUint64(CfgConsensusMinGasPrice), OwnTxSigner: t.nodeSigner.Public(), } - t.mux, err = abci.NewApplicationServer(t.ctx, appConfig) + t.mux, err = abci.NewApplicationServer(t.ctx, t.upgrader, appConfig) if err != nil { return err } @@ -1176,7 +1178,7 @@ func (t *tendermintService) worker() { } // New creates a new Tendermint service. -func New(ctx context.Context, dataDir string, identity *identity.Identity, genesisProvider genesisAPI.Provider) (service.TendermintService, error) { +func New(ctx context.Context, dataDir string, identity *identity.Identity, upgrader upgradeAPI.Backend, genesisProvider genesisAPI.Provider) (service.TendermintService, error) { // Retrive the genesis document early so that it is possible to // use it while initializing other things. genesisDoc, err := genesisProvider.GetGenesisDocument() @@ -1195,6 +1197,7 @@ func New(ctx context.Context, dataDir string, identity *identity.Identity, genes t := &tendermintService{ BaseBackgroundService: *cmservice.NewBaseBackgroundService("tendermint"), svcMgr: cmbackground.NewServiceManager(logging.GetLogger("tendermint/servicemanager")), + upgrader: upgrader, blockNotifier: pubsub.NewBroker(false), consensusSigner: identity.ConsensusSigner, nodeSigner: identity.NodeSigner, diff --git a/go/control/api/api.go b/go/control/api/api.go index 2f3f6da3772..1e2a9d7dc85 100644 --- a/go/control/api/api.go +++ b/go/control/api/api.go @@ -6,6 +6,7 @@ import ( "github.com/oasislabs/oasis-core/go/common/errors" epochtime "github.com/oasislabs/oasis-core/go/epochtime/api" + upgrade "github.com/oasislabs/oasis-core/go/upgrade/api" ) // NodeController is a node controller interface. @@ -23,6 +24,14 @@ type NodeController interface { // IsSynced checks whether the node has finished syncing. // TODO: These should be replaced with IsReady (see oasis-core#2130). IsSynced(ctx context.Context) (bool, error) + + // UpgradeBinary submits an upgrade descriptor to a running node. + // The node will wait for the appropriate epoch, then update its binaries + // and shut down. + UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error + + // CancelUpgrade cancels a pending upgrade, unless it is already in progress. + CancelUpgrade(ctx context.Context) error } // Shutdownable is an interface the node presents for shutting itself down. diff --git a/go/control/api/grpc.go b/go/control/api/grpc.go index 91d4f5313fc..bee69c02081 100644 --- a/go/control/api/grpc.go +++ b/go/control/api/grpc.go @@ -6,6 +6,7 @@ import ( "google.golang.org/grpc" cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc" + upgradeApi "github.com/oasislabs/oasis-core/go/upgrade/api" ) var ( @@ -18,6 +19,10 @@ var ( methodWaitSync = serviceName.NewMethod("WaitSync", nil) // methodIsSynced is the IsSynced method. methodIsSynced = serviceName.NewMethod("IsSynced", nil) + // methodUpgradeBinary is the UpgradeBinary method. + methodUpgradeBinary = serviceName.NewMethod("UpgradeBinary", upgradeApi.Descriptor{}) + // methodCancelUpgrade is the CancelUpgrade method. + methodCancelUpgrade = serviceName.NewMethod("CancelUpgrade", nil) // serviceDesc is the gRPC service descriptor. serviceDesc = grpc.ServiceDesc{ @@ -36,6 +41,14 @@ var ( MethodName: methodIsSynced.ShortName(), Handler: handlerIsSynced, }, + { + MethodName: methodUpgradeBinary.ShortName(), + Handler: handlerUpgradeBinary, + }, + { + MethodName: methodCancelUpgrade.ShortName(), + Handler: handlerCancelUpgrade, + }, }, Streams: []grpc.StreamDesc{}, } @@ -102,6 +115,48 @@ func handlerIsSynced( // nolint: golint return interceptor(ctx, nil, info, handler) } +func handlerUpgradeBinary( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var descriptor upgradeApi.Descriptor + if err := dec(&descriptor); err != nil { + return nil, err + } + if interceptor == nil { + return nil, srv.(NodeController).UpgradeBinary(ctx, &descriptor) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodUpgradeBinary.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, srv.(NodeController).UpgradeBinary(ctx, req.(*upgradeApi.Descriptor)) + } + return interceptor(ctx, &descriptor, info, handler) +} + +func handlerCancelUpgrade( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + if interceptor == nil { + return nil, srv.(NodeController).CancelUpgrade(ctx) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodCancelUpgrade.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, srv.(NodeController).CancelUpgrade(ctx) + } + return interceptor(ctx, nil, info, handler) +} + // RegisterService registers a new node controller service with the given gRPC server. func RegisterService(server *grpc.Server, service NodeController) { server.RegisterService(&serviceDesc, service) @@ -127,6 +182,14 @@ func (c *nodeControllerClient) IsSynced(ctx context.Context) (bool, error) { return rsp, nil } +func (c *nodeControllerClient) UpgradeBinary(ctx context.Context, descriptor *upgradeApi.Descriptor) error { + return c.conn.Invoke(ctx, methodUpgradeBinary.FullName(), descriptor, nil) +} + +func (c *nodeControllerClient) CancelUpgrade(ctx context.Context) error { + return c.conn.Invoke(ctx, methodCancelUpgrade.FullName(), nil, nil) +} + // NewNodeControllerClient creates a new gRPC node controller client service. func NewNodeControllerClient(c *grpc.ClientConn) NodeController { return &nodeControllerClient{c} diff --git a/go/control/control.go b/go/control/control.go index cb7d8eaf8c7..3530ef3bfbd 100644 --- a/go/control/control.go +++ b/go/control/control.go @@ -5,12 +5,14 @@ import ( "context" consensus "github.com/oasislabs/oasis-core/go/consensus/api" - "github.com/oasislabs/oasis-core/go/control/api" + control "github.com/oasislabs/oasis-core/go/control/api" + upgrade "github.com/oasislabs/oasis-core/go/upgrade/api" ) type nodeController struct { - node api.Shutdownable + node control.Shutdownable consensus consensus.Backend + upgrader upgrade.Backend } func (c *nodeController) RequestShutdown(ctx context.Context, wait bool) error { @@ -49,10 +51,19 @@ func (c *nodeController) IsSynced(ctx context.Context) (bool, error) { } } +func (c *nodeController) UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error { + return c.upgrader.SubmitDescriptor(ctx, descriptor) +} + +func (c *nodeController) CancelUpgrade(ctx context.Context) error { + return c.upgrader.CancelUpgrade(ctx) +} + // New creates a new oasis-node controller. -func New(node api.Shutdownable, consensus consensus.Backend) api.NodeController { +func New(node control.Shutdownable, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController { return &nodeController{ node: node, consensus: consensus, + upgrader: upgrader, } } diff --git a/go/oasis-node/cmd/control/control.go b/go/oasis-node/cmd/control/control.go index 0e2749b54e6..ae65fe0b6c9 100644 --- a/go/oasis-node/cmd/control/control.go +++ b/go/oasis-node/cmd/control/control.go @@ -3,7 +3,9 @@ package control import ( "context" + "encoding/json" "fmt" + "io/ioutil" "os" "github.com/spf13/cobra" @@ -13,6 +15,7 @@ import ( control "github.com/oasislabs/oasis-core/go/control/api" cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common" cmdGrpc "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/grpc" + upgrade "github.com/oasislabs/oasis-core/go/upgrade/api" ) var ( @@ -41,6 +44,19 @@ var ( Run: doShutdown, } + controlUpgradeBinaryCmd = &cobra.Command{ + Use: "upgrade-binary ", + Short: "submit an upgrade descriptor to the node and request shutdown", + Args: cobra.ExactArgs(1), + Run: doUpgradeBinary, + } + + controlCancelUpgradeCmd = &cobra.Command{ + Use: "cancel-upgrade", + Short: "cancel a pending upgrade unless it is already in progress", + Run: doCancelUpgrade, + } + logger = logging.GetLogger("cmd/control") ) @@ -115,6 +131,52 @@ func doShutdown(cmd *cobra.Command, args []string) { } } +func doUpgradeBinary(cmd *cobra.Command, args []string) { + conn, client := DoConnect(cmd) + defer conn.Close() + + descriptorBytes, err := ioutil.ReadFile(args[0]) + if err != nil { + logger.Error("failed to read upgrade descriptor", + "err", err, + ) + os.Exit(1) + } + + var desc upgrade.Descriptor + if err = json.Unmarshal(descriptorBytes, &desc); err != nil { + logger.Error("can't parse upgrade descriptor", + "err", err, + ) + os.Exit(1) + } + + if !desc.IsValid() { + logger.Error("submitted upgrade descriptor is not valid") + os.Exit(1) + } + + if err = client.UpgradeBinary(context.Background(), &desc); err != nil { + logger.Error("error while sending upgrade descriptor to the node", + "err", err, + ) + os.Exit(1) + } +} + +func doCancelUpgrade(cmd *cobra.Command, args []string) { + conn, client := DoConnect(cmd) + defer conn.Close() + + err := client.CancelUpgrade(context.Background()) + if err != nil { + logger.Error("failed to send upgrade cancellation request", + "err", err, + ) + os.Exit(1) + } +} + // Register registers the client sub-command and all of it's children. func Register(parentCmd *cobra.Command) { controlCmd.PersistentFlags().AddFlagSet(cmdGrpc.ClientFlags) @@ -124,5 +186,7 @@ func Register(parentCmd *cobra.Command) { controlCmd.AddCommand(controlIsSyncedCmd) controlCmd.AddCommand(controlWaitSyncCmd) controlCmd.AddCommand(controlShutdownCmd) + controlCmd.AddCommand(controlUpgradeBinaryCmd) + controlCmd.AddCommand(controlCancelUpgradeCmd) parentCmd.AddCommand(controlCmd) } diff --git a/go/oasis-node/cmd/debug/byzantine/tendermint.go b/go/oasis-node/cmd/debug/byzantine/tendermint.go index 0cee6f4af95..2c28f454444 100644 --- a/go/oasis-node/cmd/debug/byzantine/tendermint.go +++ b/go/oasis-node/cmd/debug/byzantine/tendermint.go @@ -37,7 +37,7 @@ func (ht *honestTendermint) start(id *identity.Identity, dataDir string) error { } genesisDoc.SetChainContext() - ht.service, err = tendermint.New(context.Background(), dataDir, id, genesis) + ht.service, err = tendermint.New(context.Background(), dataDir, id, nil, genesis) if err != nil { return errors.Wrap(err, "tendermint New") } diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go index 7bcfbf01550..84d9e270e43 100644 --- a/go/oasis-node/cmd/node/node.go +++ b/go/oasis-node/cmd/node/node.go @@ -54,6 +54,8 @@ import ( stakingAPI "github.com/oasislabs/oasis-core/go/staking/api" "github.com/oasislabs/oasis-core/go/storage" storageAPI "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/upgrade" + upgradeAPI "github.com/oasislabs/oasis-core/go/upgrade/api" workerCommon "github.com/oasislabs/oasis-core/go/worker/common" "github.com/oasislabs/oasis-core/go/worker/common/p2p" "github.com/oasislabs/oasis-core/go/worker/compute" @@ -107,6 +109,7 @@ type Node struct { Consensus consensusAPI.Backend + Upgrader upgradeAPI.Backend Genesis genesisAPI.Provider Identity *identity.Identity Beacon beacon.Backend @@ -509,6 +512,21 @@ func newNode(testNode bool) (*Node, error) { return nil, err } + // Initialize upgrader backend and check if we can even launch. + node.Upgrader, err = upgrade.New(node.commonStore, cmdCommon.DataDir()) + if err != nil { + logger.Error("failed to initialize upgrade backend", + "err", err, + ) + return nil, err + } + if err = node.Upgrader.StartupUpgrade(); err != nil { + logger.Error("error occurred during startup upgrade", + "err", err, + ) + return nil, err + } + // Generate/Load the node identity. // TODO/hsm: Configure factory dynamically. signerFactory := fileSigner.NewFactory(dataDir, signature.SignerNode, signature.SignerP2P, signature.SignerConsensus) @@ -593,7 +611,7 @@ func newNode(testNode bool) (*Node, error) { node.svcMgr.Register(node.svcTmntSeed) } else { // Initialize Tendermint service. - node.svcTmnt, err = tendermint.New(node.svcMgr.Ctx, dataDir, node.Identity, node.Genesis) + node.svcTmnt, err = tendermint.New(node.svcMgr.Ctx, dataDir, node.Identity, node.Upgrader, node.Genesis) if err != nil { logger.Error("failed to initialize tendermint service", "err", err, @@ -721,7 +739,7 @@ func newNode(testNode bool) (*Node, error) { } // Initialize and start the node controller. - node.NodeController = control.New(node, node.Consensus) + node.NodeController = control.New(node, node.Consensus, node.Upgrader) controlAPI.RegisterService(node.grpcInternal.Server(), node.NodeController) if flags.DebugDontBlameOasis() { // Initialize and start the debug controller if we are in debug mode. diff --git a/go/oasis-test-runner/oasis/byzantine.go b/go/oasis-test-runner/oasis/byzantine.go index 4f52c6b6261..920fe7e3d48 100644 --- a/go/oasis-test-runner/oasis/byzantine.go +++ b/go/oasis-test-runner/oasis/byzantine.go @@ -115,6 +115,7 @@ func (net *Network) NewByzantine(cfg *ByzantineCfg) (*Byzantine, error) { activationEpoch: cfg.ActivationEpoch, } worker.doStartNode = worker.startNode + copy(worker.NodeID[:], publicKey[:]) net.byzantine = append(net.byzantine, worker) net.nextNodePort += 2 diff --git a/go/oasis-test-runner/oasis/compute.go b/go/oasis-test-runner/oasis/compute.go index 9add4ed2d2f..01f9f0d3d38 100644 --- a/go/oasis-test-runner/oasis/compute.go +++ b/go/oasis-test-runner/oasis/compute.go @@ -158,6 +158,7 @@ func (net *Network) NewCompute(cfg *ComputeCfg) (*Compute, error) { p2pPort: net.nextNodePort + 2, } worker.doStartNode = worker.startNode + copy(worker.NodeID[:], publicKey[:]) net.computeWorkers = append(net.computeWorkers, worker) net.nextNodePort += 3 diff --git a/go/oasis-test-runner/oasis/keymanager.go b/go/oasis-test-runner/oasis/keymanager.go index 2daea5267f5..45220e3b4f8 100644 --- a/go/oasis-test-runner/oasis/keymanager.go +++ b/go/oasis-test-runner/oasis/keymanager.go @@ -267,6 +267,7 @@ func (net *Network) NewKeymanager(cfg *KeymanagerCfg) (*Keymanager, error) { workerClientPort: net.nextNodePort + 1, } km.doStartNode = km.startNode + copy(km.NodeID[:], publicKey[:]) net.keymanagers = append(net.keymanagers, km) net.nextNodePort += 2 diff --git a/go/oasis-test-runner/oasis/log.go b/go/oasis-test-runner/oasis/log.go index 67cc39859bc..542397af479 100644 --- a/go/oasis-test-runner/oasis/log.go +++ b/go/oasis-test-runner/oasis/log.go @@ -5,6 +5,7 @@ import ( tendermint "github.com/oasislabs/oasis-core/go/consensus/tendermint/api" "github.com/oasislabs/oasis-core/go/oasis-test-runner/log" roothash "github.com/oasislabs/oasis-core/go/roothash/api" + upgrade "github.com/oasislabs/oasis-core/go/upgrade/api" ) // LogAssertEvent returns a handler which checks whether a specific log event was @@ -66,3 +67,15 @@ func LogAssertNoMergeDiscrepancyDetected() log.WatcherHandlerFactory { func LogAssertPeerExchangeDisabled() log.WatcherHandlerFactory { return LogAssertEvent(tendermint.LogEventPeerExchangeDisabled, "peer exchange not disabled") } + +// LogAssertUpgradeStartup returns a handler which checks whether a startup migration +// handler was run based on JSON log output. +func LogAssertUpgradeStartup() log.WatcherHandlerFactory { + return LogAssertEvent(upgrade.LogEventStartupUpgrade, "expected startup upgrade did not run") +} + +// LogAssertUpgradeConsensus returns a handler which checks whether a consensus migration +// handler was run based on JSON log output. +func LogAssertUpgradeConsensus() log.WatcherHandlerFactory { + return LogAssertEvent(upgrade.LogEventConsensusUpgrade, "expected consensus upgrade did not run") +} diff --git a/go/oasis-test-runner/oasis/oasis.go b/go/oasis-test-runner/oasis/oasis.go index 128390b0be6..18a4916e1f8 100644 --- a/go/oasis-test-runner/oasis/oasis.go +++ b/go/oasis-test-runner/oasis/oasis.go @@ -50,7 +50,8 @@ const ( // Node defines the common fields for all node types. type Node struct { // nolint: maligned - Name string + Name string + NodeID signature.PublicKey net *Network dir *env.Dir diff --git a/go/oasis-test-runner/oasis/storage.go b/go/oasis-test-runner/oasis/storage.go index a2d2d02ced6..d2394f4c947 100644 --- a/go/oasis-test-runner/oasis/storage.go +++ b/go/oasis-test-runner/oasis/storage.go @@ -171,6 +171,7 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) { p2pPort: net.nextNodePort + 2, } worker.doStartNode = worker.startNode + copy(worker.NodeID[:], publicKey[:]) net.storageWorkers = append(net.storageWorkers, worker) net.nextNodePort += 3 diff --git a/go/oasis-test-runner/oasis/validator.go b/go/oasis-test-runner/oasis/validator.go index 45e2ba8cda3..46e9c93b066 100644 --- a/go/oasis-test-runner/oasis/validator.go +++ b/go/oasis-test-runner/oasis/validator.go @@ -169,8 +169,9 @@ func (net *Network) NewValidator(cfg *ValidatorCfg) (*Validator, error) { if err != nil { return nil, errors.Wrap(err, "oasis/validator: failed to provision node identity") } - val.tmAddress = crypto.PublicKeyToTendermint(&valPublicKey).Address().String() - if err = cfg.Entity.addNode(valPublicKey); err != nil { + copy(val.NodeID[:], valPublicKey[:]) + val.tmAddress = crypto.PublicKeyToTendermint(&val.NodeID).Address().String() + if err = cfg.Entity.addNode(val.NodeID); err != nil { return nil, err } diff --git a/go/oasis-test-runner/scenario/e2e/upgrade.go b/go/oasis-test-runner/scenario/e2e/upgrade.go new file mode 100644 index 00000000000..66f290052ab --- /dev/null +++ b/go/oasis-test-runner/scenario/e2e/upgrade.go @@ -0,0 +1,335 @@ +package e2e + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "path" + "sync" + "time" + + "github.com/spf13/viper" + + "github.com/oasislabs/oasis-core/go/common/crypto/hash" + "github.com/oasislabs/oasis-core/go/common/node" + "github.com/oasislabs/oasis-core/go/common/persistent" + "github.com/oasislabs/oasis-core/go/common/pubsub" + "github.com/oasislabs/oasis-core/go/common/sgx" + "github.com/oasislabs/oasis-core/go/common/sgx/ias" + consensus "github.com/oasislabs/oasis-core/go/consensus/api" + epoch "github.com/oasislabs/oasis-core/go/epochtime/api" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/env" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/log" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/oasis" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/oasis/cli" + "github.com/oasislabs/oasis-core/go/oasis-test-runner/scenario" + registry "github.com/oasislabs/oasis-core/go/registry/api" + "github.com/oasislabs/oasis-core/go/upgrade/migrations" +) + +var ( + // NodeUpgrade is the node upgrade scenario. + NodeUpgrade scenario.Scenario = newNodeUpgradeImpl() + + malformedDescriptor = []byte(`{ + "name": "nifty upgrade", + "epoch": 1, + "method": "nifty", + "identifier": "this is a hash. i repeat. this is a hash, not a string." + }`) + + // Warning: this string contains printf conversions, it's NOT directly usable as a descriptor. + nonexistentDescriptorTemplate = `{ + "name": "nonexistent-handler", + "epoch": %d, + "method": "internal", + "identifier": "0000000000000000000000000000000000000000000000000000000000000000" + }` + + // Warning: this string contains printf conversions, it's NOT directly usable as a descriptor. + validDescriptorTemplate = `{ + "name": "%v", + "epoch": %d, + "method": "internal", + "identifier": "%v" + }` +) + +type nodeUpgradeImpl struct { + basicImpl + + validator *oasis.Validator + controller *oasis.Controller + + nodeCh <-chan *registry.NodeEvent + + ctx context.Context + currentEpoch epoch.EpochTime +} + +func (sc *nodeUpgradeImpl) writeDescriptor(name string, content []byte) (string, error) { + filePath := path.Join(sc.net.BasePath(), "upgrade-"+name+".json") + if err := ioutil.WriteFile(filePath, content, 0644); err != nil { + sc.logger.Error("can't write descriptor to network directory", + "err", err, + "name", name, + ) + return "", err + } + return filePath, nil +} + +func (sc *nodeUpgradeImpl) nextEpoch() error { + sc.currentEpoch++ + if err := sc.net.Controller().SetEpoch(sc.ctx, sc.currentEpoch); err != nil { + return fmt.Errorf("failed to set epoch to %d: %w", sc.currentEpoch, err) + } + return nil +} + +func (sc *nodeUpgradeImpl) restart(wait bool) error { + sc.logger.Debug("restarting validator") + if err := sc.validator.Restart(); err != nil { + return fmt.Errorf("can't restart validator: %w", err) + } + + if !wait { + return nil + } + + for { + select { + case ev := <-sc.nodeCh: + if ev.IsRegistration && ev.Node.ID.Equal(sc.validator.NodeID) { + // Nothing else is restarted, so no need to check for specifics here. + _ = sc.controller.WaitSync(sc.ctx) + return nil + } + case <-time.After(60 * time.Second): + return fmt.Errorf("timed out waiting for validator to re-register") + } + } +} + +func newNodeUpgradeImpl() scenario.Scenario { + sc := &nodeUpgradeImpl{ + basicImpl: *newBasicImpl("node-upgrade", "", nil), + ctx: context.Background(), + } + return sc +} + +func (sc *nodeUpgradeImpl) Name() string { + return "node-upgrade" +} + +func (sc *nodeUpgradeImpl) Fixture() (*oasis.NetworkFixture, error) { + var tee node.TEEHardware + err := tee.FromString(viper.GetString(cfgTEEHardware)) + if err != nil { + return nil, err + } + var mrSigner *sgx.MrSigner + if tee == node.TEEHardwareIntelSGX { + mrSigner = &ias.FortanixTestMrSigner + } + + return &oasis.NetworkFixture{ + TEE: oasis.TEEFixture{ + Hardware: tee, + MrSigner: mrSigner, + }, + Network: oasis.NetworkCfg{ + NodeBinary: viper.GetString(cfgNodeBinary), + RuntimeLoaderBinary: viper.GetString(cfgRuntimeLoader), + EpochtimeMock: true, + DefaultLogWatcherHandlerFactories: []log.WatcherHandlerFactory{ + oasis.LogAssertUpgradeStartup(), + oasis.LogAssertUpgradeConsensus(), + }, + }, + Entities: []oasis.EntityCfg{ + oasis.EntityCfg{IsDebugTestEntity: true}, + oasis.EntityCfg{}, + }, + Validators: []oasis.ValidatorFixture{ + oasis.ValidatorFixture{Entity: 1, MinGasPrice: 1, Restartable: true}, + oasis.ValidatorFixture{Entity: 1, MinGasPrice: 1, Restartable: true}, + oasis.ValidatorFixture{Entity: 1, MinGasPrice: 1, Restartable: true}, + oasis.ValidatorFixture{Entity: 1, MinGasPrice: 1, Restartable: true}, + }, + }, nil +} + +func (sc *nodeUpgradeImpl) Run(childEnv *env.Env) error { + var err error + var descPath string + + if err = sc.net.Start(); err != nil { + return err + } + + sc.logger.Info("waiting for network to come up") + if err = sc.net.Controller().WaitNodesRegistered(sc.ctx, 4); err != nil { + return err + } + if err = sc.nextEpoch(); err != nil { + return err + } + + var nodeSub pubsub.ClosableSubscription + sc.nodeCh, nodeSub, err = sc.net.Controller().Registry.WatchNodes(sc.ctx) + if err != nil { + return fmt.Errorf("can't subscribe to registry node events: %w", err) + } + defer nodeSub.Close() + + sc.validator = sc.net.Validators()[1] // the network controller is on the first one + submitArgs := []string{ + "control", "upgrade-binary", + "--log.level", "debug", + "--wait", + "--address", "unix:" + sc.validator.SocketPath(), + } + + // Wait for the node to be ready since we didn't wait for any clients. + sc.controller, err = oasis.NewController(sc.validator.SocketPath()) + if err != nil { + return err + } + if err = sc.controller.WaitSync(sc.ctx); err != nil { + return err + } + + // Try submitting an invalid update descriptor. + // This should return immediately and the node should still be running. + sc.logger.Info("submitting invalid upgrade descriptor") + if descPath, err = sc.writeDescriptor("malformed", malformedDescriptor); err != nil { + return err + } + if err = cli.RunSubCommand(childEnv, sc.logger, "control-upgrade", sc.net.Config().NodeBinary, append(submitArgs, descPath)); err == nil { + sc.logger.Error("submitting malformed descriptor didn't result in an error. that's an error.") + return errors.New("there should be errors with malformed descriptor") + } + + // Try submitting a well formed descriptor but with an off hash, so no handlers are run. + // The node should exit immediately. + sc.logger.Info("submitting descriptor with nonexistent upgrade handler") + nonexistentDescriptor := fmt.Sprintf(nonexistentDescriptorTemplate, sc.currentEpoch+1) + if descPath, err = sc.writeDescriptor("nonexistent", []byte(nonexistentDescriptor)); err != nil { + return err + } + + if err = cli.RunSubCommand(childEnv, sc.logger, "control-upgrade", sc.net.Config().NodeBinary, append(submitArgs, descPath)); err != nil { + return fmt.Errorf("error submitting descriptor with nonexistent handler to node: %w", err) + } + + if err = sc.nextEpoch(); err != nil { + return err + } + <-sc.validator.Exit() + // The node will exit uncleanly due to the interesting consensus implementation. + // We don't need the error here. + + // Try restarting the node. It should exit immediately now; on paper it can't handle the upgrade + // described in the descriptor. + if err = sc.restart(false); err != nil { + return err + } + <-sc.validator.Exit() + + // Remove the stored descriptor so we can restart and submit a proper one. + sc.logger.Info("clearing stored upgrade descriptor") + store, err := persistent.NewCommonStore(sc.validator.DataDir()) + if err != nil { + return fmt.Errorf("can't open upgraded node's persistent store: %w", err) + } + svcStore, err := store.GetServiceStore("upgrade") + if err != nil { + store.Close() + return fmt.Errorf("can't open upgraded node's upgrade module storage: %w", err) + } + if err = svcStore.Delete([]byte("descriptor")); err != nil { + svcStore.Close() + store.Close() + return fmt.Errorf("can't delete descripotor from upgraded node's persistent store: %w", err) + } + svcStore.Close() + store.Close() + + // Generate a valid upgrade descriptor; this should exercise the test handlers in the node. + var nodeHash hash.Hash + nodeText, err := ioutil.ReadFile(sc.net.Config().NodeBinary) + if err != nil { + return fmt.Errorf("can't read node binary for hashing: %w", err) + } + nodeHash.FromBytes(nodeText) + + validDescriptor := fmt.Sprintf(validDescriptorTemplate, migrations.DummyUpgradeName, sc.currentEpoch+1, nodeHash.String()) + + if descPath, err = sc.writeDescriptor("valid", []byte(validDescriptor)); err != nil { + return err + } + + // Restart the node again, so we have the full set of validators. + if err = sc.restart(true); err != nil { + return err + } + + // Now submit the valid descriptor to all of the validators. + sc.logger.Info("submitting valid upgrade descriptor to all validators") + for i, val := range sc.net.Validators() { + submitArgs[len(submitArgs)-1] = "unix:" + val.SocketPath() + if err = cli.RunSubCommand(childEnv, sc.logger, "control-upgrade", sc.net.Config().NodeBinary, append(submitArgs, descPath)); err != nil { + return fmt.Errorf("failed to submit upgrade descriptor to validator %d: %w", i, err) + } + } + if err = sc.nextEpoch(); err != nil { + return err + } + + sc.logger.Info("restarting network") + errCh := make(chan error, len(sc.net.Validators())) + var group sync.WaitGroup + for i, val := range sc.net.Validators() { + group.Add(1) + go func(i int, val *oasis.Validator) { + defer group.Done() + sc.logger.Debug("waiting for validator to exit", "num", i) + <-val.Exit() + sc.logger.Debug("restarting validator", "num", i) + if restartError := val.Restart(); err != nil { + errCh <- restartError + } + }(i, val) + } + + group.Wait() + select { + case err = <-errCh: + return fmt.Errorf("can't restart upgraded validator for upgrade test: %w", err) + default: + } + + sc.logger.Info("waiting for network to come back up") + if err = sc.net.Controller().WaitNodesRegistered(sc.ctx, 4); err != nil { + return err + } + sc.logger.Info("final epoch advance") + if err = sc.nextEpoch(); err != nil { + return err + } + + // Check the entity set during consensus upgrade. + idQuery := ®istry.IDQuery{ + Height: consensus.HeightLatest, + ID: migrations.TestEntity.ID, + } + _, err = sc.net.Controller().Registry.GetEntity(sc.ctx, idQuery) + if err != nil { + return fmt.Errorf("can't get registered test entity: %w", err) + } + + return sc.net.CheckLogWatchers() +} diff --git a/go/oasis-test-runner/test-runner.go b/go/oasis-test-runner/test-runner.go index dc83e640505..8c96599b129 100644 --- a/go/oasis-test-runner/test-runner.go +++ b/go/oasis-test-runner/test-runner.go @@ -60,6 +60,8 @@ func main() { // Transaction source test. _ = cmd.Register(e2e.TxSourceTransferShort) _ = cmd.RegisterNondefault(e2e.TxSourceTransfer) + // Node upgrade test. + _ = cmd.Register(e2e.NodeUpgrade) // Execute the command, now that everything has been initialized. cmd.Execute() diff --git a/go/upgrade/api/api.go b/go/upgrade/api/api.go new file mode 100644 index 00000000000..922d123866e --- /dev/null +++ b/go/upgrade/api/api.go @@ -0,0 +1,147 @@ +// Package api defines the interface exporting the upgrade infrastructure's functionality. +package api + +import ( + "context" + + "github.com/oasislabs/oasis-core/go/common/errors" + epochtime "github.com/oasislabs/oasis-core/go/epochtime/api" +) + +const ( + // ModuleName is the upgrade module name. + ModuleName = "upgrade" + + // UpgradeMethInternal is the internal upgrade method, + // where the node binary itself has the migration code. + UpgradeMethInternal = "internal" + + // LogEventStartupUpgrade is a log event value that signals the startup upgrade handler was called. + LogEventStartupUpgrade = "dummy-migration/startup-upgrade" + // LogEventConsensusUpgrade is a log event value that signals the consensus upgrade handler was called. + LogEventConsensusUpgrade = "dummy-migration/consensus-upgrade" +) + +// UpgradeStage is used in the upgrade descriptor to store completed stages. +type UpgradeStage uint64 + +const ( + // UpgradeStageStartup is the startup upgrade stage, executed at the beginning of node startup. + UpgradeStageStartup UpgradeStage = 1 + + // UpgradeStageConsensus is the upgrade stage carried out during consensus events. + UpgradeStageConsensus UpgradeStage = 2 + + upgradeStageLast = UpgradeStageConsensus + + // InvalidUpgradeHeight means the upgrade epoch hasn't been reached yet. + InvalidUpgradeHeight = int64(0) +) + +var ( + // ErrStopForUpgrade is the error returned by the consensus upgrade function when it detects that + // the consensus layer has reached the scheduled shutdown epoch and should be interrupted. + ErrStopForUpgrade = errors.New(ModuleName, 1, "upgrade: reached upgrade epoch") + + // ErrUpgradePending is the error returned when there is a pending upgrade and the node detects that it is + // not the one performing it. + ErrUpgradePending = errors.New(ModuleName, 2, "upgrade: this binary is scheduled to be replaced") + + // ErrNewTooSoon is the error returned when the node started isn't the pre-upgrade version and the upgrade + // epoch hasn't been reached yet. + ErrNewTooSoon = errors.New(ModuleName, 3, "upgrade: running different binary before reaching the upgrade epoch") + + // ErrInvalidResumingVersion is the error returned when the running node's version is different from the one that + // started performing the upgrade. + ErrInvalidResumingVersion = errors.New(ModuleName, 4, "upgrade: node restarted mid-upgrade with different version") + + // ErrAlreadyPending is the error returned from SubmitDescriptor when there is already an upgrade pending. + ErrAlreadyPending = errors.New(ModuleName, 5, "upgrade: an upgrade is already pending, can not submit new descriptor") + + // ErrUpgradeInProgress is the error returned from CancelUpgrade when the upgrade being cancelled is already in progress. + ErrUpgradeInProgress = errors.New(ModuleName, 6, "upgrade: can not cancel upgrade in progress") +) + +// Descriptor describes an upgrade. +type Descriptor struct { + // Name is the name of the upgrade. It should be derived from the node version. + Name string `json:"name"` + // Method is the upgrade method that should be used for this upgrade. + Method string `json:"method"` + // Identifier is a hash of the binary to be used for upgrading. + // Upgrade methods other than "internal" may have differently formatted identifiers. + Identifier string `json:"identifier"` + // Epoch is the epoch at which the upgrade should happen. + Epoch epochtime.EpochTime `json:"epoch"` +} + +// IsValid checks if the upgrade descriptor is valid. +func (d Descriptor) IsValid() bool { + if d.Method != UpgradeMethInternal { + return false + } + if d.Epoch < 1 { + return false + } + return true +} + +// PendingUpgrade describes a currently pending upgrade and includes the +// submitted upgrade descriptor. +type PendingUpgrade struct { + // Descriptor is the upgrade descriptor describing the upgrade. + Descriptor *Descriptor `json:"descriptor"` + + // SubmittingVersion is the version of the node used to submit the descriptor. + SubmittingVersion string `json:"submitting_version"` + // RunningVersion is the version of the node trying to execute the descriptor. + RunningVersion string `json:"running_version"` + + // UpgradeHeight is the height at which the upgrade epoch was reached + // (or InvalidUpgradeHeight if it hasn't been reached yet). + UpgradeHeight int64 `json:"upgrade_height"` + + // LastCompletedStage is the last upgrade stage that was successfully completed. + LastCompletedStage UpgradeStage `json:"last_completed_stage"` +} + +// IsCompleted checks if all upgrade stages were already completed. +func (pu PendingUpgrade) IsCompleted() bool { + return pu.LastCompletedStage >= upgradeStageLast +} + +// HasAnyStages checks if any stages were completed at all. +func (pu PendingUpgrade) HasAnyStages() bool { + return pu.LastCompletedStage > 0 +} + +// HasStage checks if a given stage has been completed or not. +func (pu PendingUpgrade) HasStage(stage UpgradeStage) bool { + return pu.LastCompletedStage >= stage +} + +// PushStage marks the given stage as completed. +func (pu *PendingUpgrade) PushStage(stage UpgradeStage) { + if pu.LastCompletedStage+1 != stage { + panic("upgrade: out of order upgrade stage execution") + } + pu.LastCompletedStage = stage +} + +// Backend defines the interface for upgrade managers. +type Backend interface { + // SubmitDescriptor submits the serialized descriptor to the upgrade manager + // which then schedules and manages the upgrade. + SubmitDescriptor(context.Context, *Descriptor) error + + // CancelUpgrade cancels a pending upgrade, unless it is already in progress + CancelUpgrade(context.Context) error + + // StartupUpgrade performs the startup portion of the upgrade. + // It is idempotent with respect to the current upgrade descriptor. + StartupUpgrade() error + + // ConsensusUpgrade performs the consensus portion of the upgrade. + // It is idempotent with respect to the current upgrade descriptor. + ConsensusUpgrade(interface{}, epochtime.EpochTime, int64) error +} diff --git a/go/upgrade/migrations/dummy.go b/go/upgrade/migrations/dummy.go new file mode 100644 index 00000000000..839dc2daba4 --- /dev/null +++ b/go/upgrade/migrations/dummy.go @@ -0,0 +1,69 @@ +package migrations + +import ( + "github.com/oasislabs/oasis-core/go/common/crypto/signature" + "github.com/oasislabs/oasis-core/go/common/crypto/signature/signers/memory" + "github.com/oasislabs/oasis-core/go/common/entity" + "github.com/oasislabs/oasis-core/go/consensus/tendermint/abci" + registryState "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/registry/state" + stakingState "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/staking/state" + registry "github.com/oasislabs/oasis-core/go/registry/api" + staking "github.com/oasislabs/oasis-core/go/staking/api" +) + +const ( + // DummyUpgradeName is the name of the dummy upgrade, for use in the upgrade descriptor. + DummyUpgradeName = "__e2e-test-valid" + + testSigningSeed = "__e2e-test-migration-entity" +) + +var ( + _ Handler = (*dummyMigrationHandler)(nil) + + TestEntity entity.Entity + + entitySigner signature.Signer +) + +func init() { + entitySigner = memory.NewTestSigner(testSigningSeed) + TestEntity.ID = entitySigner.Public() +} + +type dummyMigrationHandler struct { +} + +func (th *dummyMigrationHandler) StartupUpgrade(ctx *Context) error { + return nil +} + +func (th *dummyMigrationHandler) ConsensusUpgrade(ctx *Context, privateCtx interface{}) error { + abciCtx := privateCtx.(*abci.Context) + regState := registryState.NewMutableState(abciCtx.State()) + stakeState := stakingState.NewMutableState(abciCtx.State()) + + sigEntity, err := entity.SignEntity(entitySigner, registry.RegisterEntitySignatureContext, &TestEntity) + if err != nil { + return err + } + + // Add a new entity to the registry. The test runner will check for its presence to verify + // the migration ran successfully. + regState.SetEntity(&TestEntity, sigEntity) + + // Set this entity's staking properly. + stakeState.SetAccount(TestEntity.ID, &staking.Account{ + Escrow: staking.EscrowAccount{ + StakeAccumulator: staking.StakeAccumulator{ + Claims: map[staking.StakeClaim][]staking.ThresholdKind{ + registry.StakeClaimRegisterEntity: []staking.ThresholdKind{ + staking.KindEntity, + }, + }, + }, + }, + }) + + return nil +} diff --git a/go/upgrade/migrations/migrations.go b/go/upgrade/migrations/migrations.go new file mode 100644 index 00000000000..e4200d9feff --- /dev/null +++ b/go/upgrade/migrations/migrations.go @@ -0,0 +1,69 @@ +// Package migrations implements upgrade migration handlers. +package migrations + +import ( + "github.com/oasislabs/oasis-core/go/common/logging" + upgradeApi "github.com/oasislabs/oasis-core/go/upgrade/api" +) + +const ( + // ModuleName is the migration module name. + ModuleName = "upgrade-migrations" +) + +var ( + registeredHandlers = map[string]Handler{ + DummyUpgradeName: &dummyMigrationHandler{}, + } +) + +// Handler is the interface used by migration handlers. +type Handler interface { + // StartupUpgrade is called by the upgrade manager to perform + // the node startup portion of the upgrade. + StartupUpgrade(*Context) error + + // ConsensusUpgrade is called by the upgrade manager to perform + // the consensus portion of the upgrade. The interface argument is + // a private structure passed to Backend.ConsensusUpgrade by the + // consensus backend. + ConsensusUpgrade(*Context, interface{}) error +} + +// Context defines the common context used by migration handlers. +type Context struct { + // Upgrade is the currently pending upgrade structure. + Upgrade *upgradeApi.PendingUpgrade + + // DataDir is the node's data directory. + DataDir string + + Logger *logging.Logger +} + +// Register registers a new migration handler, by upgrade name. +func Register(name string, handler Handler) { + registeredHandlers[name] = handler +} + +// NewContext returns a new upgrade migration context. +func NewContext(upgrade *upgradeApi.PendingUpgrade, dataDir string) *Context { + return &Context{ + Upgrade: upgrade, + DataDir: dataDir, + Logger: logging.GetLogger(ModuleName), + } +} + +// GetHandler returns the handler associated with the upgrade described in the context. +// If the handler does not exist, this is considered a severe programmer error and will result in a panic. +func GetHandler(ctx *Context) Handler { + handler, ok := registeredHandlers[ctx.Upgrade.Descriptor.Name] + if !ok { + // If we got here, that means the upgrade descriptor checked out, including the upgrader hash. + // Nothing left to do but bite the dust. + panic("unknown upgrade name, no way forward") + } + + return handler +} diff --git a/go/upgrade/upgrade.go b/go/upgrade/upgrade.go new file mode 100644 index 00000000000..7a10f97c792 --- /dev/null +++ b/go/upgrade/upgrade.go @@ -0,0 +1,282 @@ +// Package upgrade implements the node upgrade backend. +// +// After submitting an upgrade descriptor, the old node may continue +// running or be restarted up to the point when the consensus layer reaches +// the upgrade epoch. The new node may not be started until the old node has +// reached the upgrade epoch. +package upgrade + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "sync" + + "github.com/oasislabs/oasis-core/go/common/crypto/hash" + "github.com/oasislabs/oasis-core/go/common/logging" + "github.com/oasislabs/oasis-core/go/common/persistent" + "github.com/oasislabs/oasis-core/go/common/version" + epochtime "github.com/oasislabs/oasis-core/go/epochtime/api" + "github.com/oasislabs/oasis-core/go/upgrade/api" + "github.com/oasislabs/oasis-core/go/upgrade/migrations" +) + +var ( + _ api.Backend = (*upgradeManager)(nil) + + metadataStoreKey = []byte("descriptor") + + thisVersion = makeVersionString() +) + +func hashSelf() (*hash.Hash, error) { + path, err := os.Executable() + if err != nil { + return nil, err + } + + contents, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + + var hash hash.Hash + hash.FromBytes(contents) + return &hash, nil +} + +func makeVersionString() string { + return version.SoftwareVersion +} + +type upgradeManager struct { + store *persistent.ServiceStore + pending *api.PendingUpgrade + lock sync.Mutex + + ctx *migrations.Context + handler migrations.Handler + + logger *logging.Logger +} + +func (u *upgradeManager) SubmitDescriptor(ctx context.Context, descriptor *api.Descriptor) error { + u.lock.Lock() + defer u.lock.Unlock() + + if u.pending != nil { + return api.ErrAlreadyPending + } + + u.pending = &api.PendingUpgrade{ + Descriptor: descriptor, + } + u.pending.SubmittingVersion = thisVersion + + u.logger.Info("received upgrade descriptor, scheduling shutdown", + "name", u.pending.Descriptor.Name, + "epoch", u.pending.Descriptor.Epoch, + ) + + return u.flushDescriptor() +} + +func (u *upgradeManager) CancelUpgrade(ctx context.Context) error { + u.lock.Lock() + defer u.lock.Unlock() + + if u.pending == nil { + // Make sure nothing is saved. + return u.flushDescriptor() + } + + if u.pending.RunningVersion != "" || u.pending.UpgradeHeight != api.InvalidUpgradeHeight || u.pending.HasAnyStages() { + return api.ErrUpgradeInProgress + } + + oldPending := u.pending + u.pending = nil + if err := u.flushDescriptor(); err != nil { + u.pending = oldPending + return err + } + return nil +} + +func (u *upgradeManager) checkStatus() error { + var err error + + if err = u.store.GetCBOR(metadataStoreKey, &u.pending); err != nil { + if err == persistent.ErrNotFound { + // No upgrade pending, nothing to do. + u.logger.Debug("no pending descriptor, continuing startup") + return nil + } + return fmt.Errorf("can't decode stored upgrade descriptor: %w", err) + } + + if u.pending.IsCompleted() { + // This technically shouldn't happen, but isn't really an error either. + return u.flushDescriptor() + } + + // By this point, the descriptor is valid and still pending. + if u.pending.UpgradeHeight == api.InvalidUpgradeHeight { + // Only allow the old binary to run before the upgrade epoch. + if u.pending.SubmittingVersion != thisVersion { + return api.ErrNewTooSoon + } + return nil + } + + // Otherwise, the upgrade should proceed right now. Check that we're the right binary. + thisHash, err := hashSelf() + if err != nil { + return err + } + + var upgraderHash hash.Hash + if err = upgraderHash.UnmarshalHex(u.pending.Descriptor.Identifier); err != nil { + return fmt.Errorf("can't decode stored upgrade identifier: %w", err) + } + + if !thisHash.Equal(&upgraderHash) { + return api.ErrUpgradePending + } + + // In case the previous startup was e.g. interruptd during the second part of the + // upgrade, we need to make sure that we're the same version as the previous run. + if u.pending.RunningVersion != "" && u.pending.RunningVersion != thisVersion { + return api.ErrInvalidResumingVersion + } + + // Everything checks out, fill in the blanks. + u.pending.RunningVersion = thisVersion + _ = u.flushDescriptor() + u.logger.Info("loaded pending upgrade metadata", + "name", u.pending.Descriptor.Name, + "last_stage", u.pending.LastCompletedStage, + ) + return nil +} + +func (u *upgradeManager) flushDescriptor() error { + if u.pending == nil { + if err := u.store.Delete(metadataStoreKey); err != persistent.ErrNotFound { + return err + } + return nil + } + if u.pending.IsCompleted() { + u.logger.Info("upgrade completed, removing state", + "name", u.pending.Descriptor.Name, + ) + err := u.store.Delete(metadataStoreKey) + if err == nil { + u.pending = nil + } + return err + } + return u.store.PutCBOR(metadataStoreKey, &u.pending) +} + +func (u *upgradeManager) StartupUpgrade() error { + u.lock.Lock() + defer u.lock.Unlock() + + if u.pending == nil || u.pending.UpgradeHeight == api.InvalidUpgradeHeight { + return nil + } + if !u.pending.HasStage(api.UpgradeStageStartup) { + // Make sure we're in order (pushing will panic otherwise). + u.pending.PushStage(api.UpgradeStageStartup) + + u.logger.Warn("performing startup upgrade", + "name", u.pending.Descriptor.Name, + "submitted_by", u.pending.SubmittingVersion, + "version", u.pending.RunningVersion, + logging.LogEvent, api.LogEventStartupUpgrade, + ) + err := u.handler.StartupUpgrade(u.ctx) + if err == nil { + // Save the updated descriptor state. + err = u.flushDescriptor() + } + return err + } + u.logger.Warn("startup upgrade already performed, skipping", + "name", u.pending.Descriptor.Name, + "submitted_by", u.pending.SubmittingVersion, + "version", u.pending.RunningVersion, + ) + return nil +} + +func (u *upgradeManager) ConsensusUpgrade(privateCtx interface{}, currentEpoch epochtime.EpochTime, currentHeight int64) error { + u.lock.Lock() + defer u.lock.Unlock() + + if u.pending == nil { + return nil + } + + // If we haven't reached the upgrade epoch yet, we run normally; + // startup made sure we're an appropriate binary for that. + if u.pending.UpgradeHeight == api.InvalidUpgradeHeight { + if currentEpoch < u.pending.Descriptor.Epoch { + return nil + } + u.pending.UpgradeHeight = currentHeight + if err := u.flushDescriptor(); err != nil { + return err + } + return api.ErrStopForUpgrade + } + + // If we're already past the upgrade height, then everything must be complete. + if u.pending.UpgradeHeight < currentHeight { + u.pending.PushStage(api.UpgradeStageConsensus) + return u.flushDescriptor() + } + + if u.pending.UpgradeHeight > currentHeight { + panic("consensus upgrade: UpgradeHeight is in the future but upgrade epoch seen already") + } + + if !u.pending.HasStage(api.UpgradeStageConsensus) { + u.logger.Warn("performing consensus upgrade", + "name", u.pending.Descriptor.Name, + "submitted_by", u.pending.SubmittingVersion, + "version", u.pending.RunningVersion, + logging.LogEvent, api.LogEventConsensusUpgrade, + ) + return u.handler.ConsensusUpgrade(u.ctx, privateCtx) + } + return nil +} + +// New constructs and returns a new upgrade manager. It also checks for and loads any +// pending upgrade descriptors; if this node is not the one intended to be run according +// to the loaded descriptor, New will return an error. +func New(store *persistent.CommonStore, dataDir string) (api.Backend, error) { + svcStore, err := store.GetServiceStore(api.ModuleName) + if err != nil { + return nil, err + } + upgrader := &upgradeManager{ + store: svcStore, + logger: logging.GetLogger(api.ModuleName), + } + + if err := upgrader.checkStatus(); err != nil { + return nil, err + } + + if upgrader.pending != nil { + upgrader.ctx = migrations.NewContext(upgrader.pending, dataDir) + upgrader.handler = migrations.GetHandler(upgrader.ctx) + } + + return upgrader, nil +}