Skip to content

Commit

Permalink
go/upgrade: Implement node upgrading mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Feb 10, 2020
1 parent 9a249be commit fe47ce5
Show file tree
Hide file tree
Showing 19 changed files with 984 additions and 16 deletions.
7 changes: 7 additions & 0 deletions .changelog/2607.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Implement node upgrade mechanism.

The node now accepts upgrade descriptors which describe the upgrade to carry
out. It can shut down at the appropriate epoch and then execute any required
migration handlers on the node itself and on the consensus layer. Once a
descriptor is submitted, the node will refuse to start if it doesn't contain the
necessary handlers.
7 changes: 7 additions & 0 deletions go/common/persistent/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func (ss *ServiceStore) PutCBOR(key []byte, value interface{}) error {
})
}

// Delete removes the specified key from the service store.
func (ss *ServiceStore) Delete(key []byte) error {
return ss.store.db.Update(func(tx *badger.Txn) error {
return tx.Delete(ss.dbKey(key))
})
}

func (ss *ServiceStore) dbKey(key []byte) []byte {
return bytes.Join([][]byte{ss.name, key}, []byte{'.'})
}
29 changes: 24 additions & 5 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/oasislabs/oasis-core/go/consensus/api/transaction"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
genesis "github.com/oasislabs/oasis-core/go/genesis/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

const (
Expand Down Expand Up @@ -264,12 +265,12 @@ func (a *ApplicationServer) EstimateGas(caller signature.PublicKey, tx *transact

// NewApplicationServer returns a new ApplicationServer, using the provided
// directory to persist state.
func NewApplicationServer(ctx context.Context, cfg *ApplicationConfig) (*ApplicationServer, error) {
func NewApplicationServer(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationConfig) (*ApplicationServer, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(abciCollectors...)
})

mux, err := newABCIMux(ctx, cfg)
mux, err := newABCIMux(ctx, upgrader, cfg)
if err != nil {
return nil, err
}
Expand All @@ -284,8 +285,9 @@ type abciMux struct {
sync.RWMutex
types.BaseApplication

logger *logging.Logger
state *applicationState
logger *logging.Logger
upgrader upgrade.Backend
state *applicationState

appsByName map[string]Application
appsByMethod map[transaction.MethodName]Application
Expand Down Expand Up @@ -457,6 +459,7 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai

func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
blockHeight := mux.state.BlockHeight()

mux.logger.Debug("BeginBlock",
"req", req,
"block_height", blockHeight,
Expand All @@ -480,6 +483,21 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
ctx := mux.state.NewContext(ContextBeginBlock, mux.currentTime)
defer ctx.Close()

currentEpoch, err := mux.state.GetCurrentEpoch(ctx.Ctx())
if err != nil {
panic("mux: can't get current epoch in BeginBlock")
}

// Check if there are any upgrades pending or if we need to halt for an upgrade.
switch mux.upgrader.ConsensusUpgrade(ctx, currentEpoch) {
case nil:
// Everything ok.
case upgrade.ErrStopForUpgrade:
panic("mux: reached upgrade epoch")
default:
panic("mux: error while trying to perform consensus upgrade")
}

switch mux.state.haltMode {
case false:
if !mux.state.inHaltEpoch(ctx) {
Expand Down Expand Up @@ -867,14 +885,15 @@ func (mux *abciMux) checkDependencies() error {
return nil
}

func newABCIMux(ctx context.Context, cfg *ApplicationConfig) (*abciMux, error) {
func newABCIMux(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationConfig) (*abciMux, error) {
state, err := newApplicationState(ctx, cfg)
if err != nil {
return nil, err
}

mux := &abciMux{
logger: logging.GetLogger("abci-mux"),
upgrader: upgrader,
state: state,
appsByName: make(map[string]Application),
appsByMethod: make(map[transaction.MethodName]Application),
Expand Down
7 changes: 5 additions & 2 deletions go/consensus/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
roothashAPI "github.com/oasislabs/oasis-core/go/roothash/api"
schedulerAPI "github.com/oasislabs/oasis-core/go/scheduler/api"
stakingAPI "github.com/oasislabs/oasis-core/go/staking/api"
upgradeAPI "github.com/oasislabs/oasis-core/go/upgrade/api"
)

const (
Expand Down Expand Up @@ -164,6 +165,7 @@ type tendermintService struct {

ctx context.Context
svcMgr *cmbackground.ServiceManager
upgrader upgradeAPI.Backend
mux *abci.ApplicationServer
node *tmnode.Node
client tmcli.Client
Expand Down Expand Up @@ -880,7 +882,7 @@ func (t *tendermintService) lazyInit() error {
MinGasPrice: viper.GetUint64(CfgConsensusMinGasPrice),
OwnTxSigner: t.nodeSigner.Public(),
}
t.mux, err = abci.NewApplicationServer(t.ctx, appConfig)
t.mux, err = abci.NewApplicationServer(t.ctx, t.upgrader, appConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -1164,7 +1166,7 @@ func (t *tendermintService) worker() {
}

// New creates a new Tendermint service.
func New(ctx context.Context, dataDir string, identity *identity.Identity, genesisProvider genesisAPI.Provider) (service.TendermintService, error) {
func New(ctx context.Context, dataDir string, identity *identity.Identity, upgrader upgradeAPI.Backend, genesisProvider genesisAPI.Provider) (service.TendermintService, error) {
// Retrive the genesis document early so that it is possible to
// use it while initializing other things.
genesisDoc, err := genesisProvider.GetGenesisDocument()
Expand All @@ -1183,6 +1185,7 @@ func New(ctx context.Context, dataDir string, identity *identity.Identity, genes
t := &tendermintService{
BaseBackgroundService: *cmservice.NewBaseBackgroundService("tendermint"),
svcMgr: cmbackground.NewServiceManager(logging.GetLogger("tendermint/servicemanager")),
upgrader: upgrader,
blockNotifier: pubsub.NewBroker(false),
consensusSigner: identity.ConsensusSigner,
nodeSigner: identity.NodeSigner,
Expand Down
6 changes: 6 additions & 0 deletions go/control/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/oasislabs/oasis-core/go/common/errors"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

// NodeController is a node controller interface.
Expand All @@ -23,6 +24,11 @@ type NodeController interface {
// IsSynced checks whether the node has finished syncing.
// TODO: These should be replaced with IsReady (see oasis-core#2130).
IsSynced(ctx context.Context) (bool, error)

// UpgradeBinary submits an upgrade descriptor to a running node.
// The node will wait for the appropriate epoch, then update its binaries
// and shut down.
UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error
}

// Shutdownable is an interface the node presents for shutting itself down.
Expand Down
34 changes: 34 additions & 0 deletions go/control/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"google.golang.org/grpc"

cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc"
upgradeApi "github.com/oasislabs/oasis-core/go/upgrade/api"
)

var (
Expand All @@ -18,6 +19,8 @@ var (
methodWaitSync = serviceName.NewMethod("WaitSync", nil)
// methodIsSynced is the IsSynced method.
methodIsSynced = serviceName.NewMethod("IsSynced", nil)
// methodUpgradeBinary is the UpgradeBinary method.
methodUpgradeBinary = serviceName.NewMethod("UpgradeBinary", upgradeApi.Descriptor{})

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
Expand All @@ -36,6 +39,10 @@ var (
MethodName: methodIsSynced.ShortName(),
Handler: handlerIsSynced,
},
{
MethodName: methodUpgradeBinary.ShortName(),
Handler: handlerUpgradeBinary,
},
},
Streams: []grpc.StreamDesc{},
}
Expand Down Expand Up @@ -102,6 +109,29 @@ func handlerIsSynced( // nolint: golint
return interceptor(ctx, nil, info, handler)
}

func handlerUpgradeBinary( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var descriptor upgradeApi.Descriptor
if err := dec(&descriptor); err != nil {
return nil, err
}
if interceptor == nil {
return nil, srv.(NodeController).UpgradeBinary(ctx, &descriptor)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodUpgradeBinary.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(NodeController).UpgradeBinary(ctx, req.(*upgradeApi.Descriptor))
}
return interceptor(ctx, &descriptor, info, handler)
}

// RegisterService registers a new node controller service with the given gRPC server.
func RegisterService(server *grpc.Server, service NodeController) {
server.RegisterService(&serviceDesc, service)
Expand All @@ -127,6 +157,10 @@ func (c *nodeControllerClient) IsSynced(ctx context.Context) (bool, error) {
return rsp, nil
}

func (c *nodeControllerClient) UpgradeBinary(ctx context.Context, descriptor *upgradeApi.Descriptor) error {
return c.conn.Invoke(ctx, methodUpgradeBinary.FullName(), descriptor, nil)
}

// NewNodeControllerClient creates a new gRPC node controller client service.
func NewNodeControllerClient(c *grpc.ClientConn) NodeController {
return &nodeControllerClient{c}
Expand Down
13 changes: 10 additions & 3 deletions go/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"context"

consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/control/api"
control "github.com/oasislabs/oasis-core/go/control/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

type nodeController struct {
node api.Shutdownable
node control.Shutdownable
consensus consensus.Backend
upgrader upgrade.Backend
}

func (c *nodeController) RequestShutdown(ctx context.Context, wait bool) error {
Expand Down Expand Up @@ -49,10 +51,15 @@ func (c *nodeController) IsSynced(ctx context.Context) (bool, error) {
}
}

func (c *nodeController) UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error {
return c.upgrader.SubmitDescriptor(ctx, descriptor)
}

// New creates a new oasis-node controller.
func New(node api.Shutdownable, consensus consensus.Backend) api.NodeController {
func New(node control.Shutdownable, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
return &nodeController{
node: node,
consensus: consensus,
upgrader: upgrader,
}
}
44 changes: 44 additions & 0 deletions go/oasis-node/cmd/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package control

import (
"context"
"encoding/json"
"io/ioutil"
"os"

"github.com/spf13/cobra"
Expand All @@ -12,6 +14,7 @@ import (
control "github.com/oasislabs/oasis-core/go/control/api"
cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common"
cmdGrpc "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/grpc"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

var (
Expand Down Expand Up @@ -40,6 +43,13 @@ var (
Run: doShutdown,
}

controlUpgradeBinaryCmd = &cobra.Command{
Use: "upgrade-binary <upgrade-descriptor>",
Short: "submit an upgrade descriptor to the node and request shutdown",
Args: cobra.ExactArgs(1),
Run: doUpgradeBinary,
}

logger = logging.GetLogger("cmd/control")
)

Expand Down Expand Up @@ -112,6 +122,39 @@ func doShutdown(cmd *cobra.Command, args []string) {
}
}

func doUpgradeBinary(cmd *cobra.Command, args []string) {
conn, client := DoConnect(cmd)
defer conn.Close()

descriptorBytes, err := ioutil.ReadFile(args[0])
if err != nil {
logger.Error("failed to read upgrade descriptor",
"err", err,
)
os.Exit(1)
}

var desc upgrade.Descriptor
if err = json.Unmarshal(descriptorBytes, &desc); err != nil {
logger.Error("can't parse upgrade descriptor",
"err", err,
)
os.Exit(1)
}

if !desc.IsValid() {
logger.Error("submitted upgrade descriptor is not valid")
os.Exit(1)
}

if err = client.UpgradeBinary(context.Background(), &desc); err != nil {
logger.Error("error while sending upgrade descriptor to the node",
"err", err,
)
os.Exit(1)
}
}

// Register registers the client sub-command and all of it's children.
func Register(parentCmd *cobra.Command) {
controlCmd.PersistentFlags().AddFlagSet(cmdGrpc.ClientFlags)
Expand All @@ -121,5 +164,6 @@ func Register(parentCmd *cobra.Command) {
controlCmd.AddCommand(controlIsSyncedCmd)
controlCmd.AddCommand(controlWaitSyncCmd)
controlCmd.AddCommand(controlShutdownCmd)
controlCmd.AddCommand(controlUpgradeBinaryCmd)
parentCmd.AddCommand(controlCmd)
}
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/byzantine/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (ht *honestTendermint) start(id *identity.Identity, dataDir string) error {
}
genesisDoc.SetChainContext()

ht.service, err = tendermint.New(context.Background(), dataDir, id, genesis)
ht.service, err = tendermint.New(context.Background(), dataDir, id, nil, genesis)
if err != nil {
return errors.Wrap(err, "tendermint New")
}
Expand Down
Loading

0 comments on commit fe47ce5

Please sign in to comment.