Skip to content

Commit

Permalink
go/upgrade: Implement node upgrading mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Feb 14, 2020
1 parent 0f833dc commit f0da65f
Show file tree
Hide file tree
Showing 26 changed files with 1,387 additions and 16 deletions.
9 changes: 9 additions & 0 deletions .changelog/2607.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Implement node upgrade mechanism

The node now accepts upgrade descriptors which describe the upgrade to carry
out. It can shut down at the appropriate epoch and then execute any required
migration handlers on the node itself and on the consensus layer. Once a
descriptor is submitted, the old node can be normally restarted and used until
the upgrade epoch is reached; the new binary can not be used at all until the
old binary has had a chance to reach the upgrade epoch. Once that is reached,
the old binary will refuse to start.
12 changes: 12 additions & 0 deletions go/common/persistent/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@ func (ss *ServiceStore) PutCBOR(key []byte, value interface{}) error {
})
}

// Delete removes the specified key from the service store.
func (ss *ServiceStore) Delete(key []byte) error {
return ss.store.db.Update(func(tx *badger.Txn) error {
switch err := tx.Delete(ss.dbKey(key)); err {
case badger.ErrKeyNotFound:
return ErrNotFound
default:
return err
}
})
}

func (ss *ServiceStore) dbKey(key []byte) []byte {
return bytes.Join([][]byte{ss.name, key}, []byte{'.'})
}
29 changes: 24 additions & 5 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/oasislabs/oasis-core/go/consensus/api/transaction"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
genesis "github.com/oasislabs/oasis-core/go/genesis/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

const (
Expand Down Expand Up @@ -264,12 +265,12 @@ func (a *ApplicationServer) EstimateGas(caller signature.PublicKey, tx *transact

// NewApplicationServer returns a new ApplicationServer, using the provided
// directory to persist state.
func NewApplicationServer(ctx context.Context, cfg *ApplicationConfig) (*ApplicationServer, error) {
func NewApplicationServer(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationConfig) (*ApplicationServer, error) {
metricsOnce.Do(func() {
prometheus.MustRegister(abciCollectors...)
})

mux, err := newABCIMux(ctx, cfg)
mux, err := newABCIMux(ctx, upgrader, cfg)
if err != nil {
return nil, err
}
Expand All @@ -284,8 +285,9 @@ type abciMux struct {
sync.RWMutex
types.BaseApplication

logger *logging.Logger
state *applicationState
logger *logging.Logger
upgrader upgrade.Backend
state *applicationState

appsByName map[string]Application
appsByMethod map[transaction.MethodName]Application
Expand Down Expand Up @@ -457,6 +459,7 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai

func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
blockHeight := mux.state.BlockHeight()

mux.logger.Debug("BeginBlock",
"req", req,
"block_height", blockHeight,
Expand All @@ -480,6 +483,21 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
ctx := mux.state.NewContext(ContextBeginBlock, mux.currentTime)
defer ctx.Close()

currentEpoch, err := mux.state.GetCurrentEpoch(ctx.Ctx())
if err != nil {
panic("mux: can't get current epoch in BeginBlock")
}

// Check if there are any upgrades pending or if we need to halt for an upgrade.
switch err = mux.upgrader.ConsensusUpgrade(ctx, currentEpoch, blockHeight); err {
case nil:
// Everything ok.
case upgrade.ErrStopForUpgrade:
panic("mux: reached upgrade epoch")
default:
panic(fmt.Sprintf("mux: error while trying to perform consensus upgrade: %v", err))
}

switch mux.state.haltMode {
case false:
if !mux.state.inHaltEpoch(ctx) {
Expand Down Expand Up @@ -867,14 +885,15 @@ func (mux *abciMux) checkDependencies() error {
return nil
}

func newABCIMux(ctx context.Context, cfg *ApplicationConfig) (*abciMux, error) {
func newABCIMux(ctx context.Context, upgrader upgrade.Backend, cfg *ApplicationConfig) (*abciMux, error) {
state, err := newApplicationState(ctx, cfg)
if err != nil {
return nil, err
}

mux := &abciMux{
logger: logging.GetLogger("abci-mux"),
upgrader: upgrader,
state: state,
appsByName: make(map[string]Application),
appsByMethod: make(map[transaction.MethodName]Application),
Expand Down
7 changes: 5 additions & 2 deletions go/consensus/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
roothashAPI "github.com/oasislabs/oasis-core/go/roothash/api"
schedulerAPI "github.com/oasislabs/oasis-core/go/scheduler/api"
stakingAPI "github.com/oasislabs/oasis-core/go/staking/api"
upgradeAPI "github.com/oasislabs/oasis-core/go/upgrade/api"
)

const (
Expand Down Expand Up @@ -172,6 +173,7 @@ type tendermintService struct {

ctx context.Context
svcMgr *cmbackground.ServiceManager
upgrader upgradeAPI.Backend
mux *abci.ApplicationServer
node *tmnode.Node
client tmcli.Client
Expand Down Expand Up @@ -888,7 +890,7 @@ func (t *tendermintService) lazyInit() error {
MinGasPrice: viper.GetUint64(CfgConsensusMinGasPrice),
OwnTxSigner: t.nodeSigner.Public(),
}
t.mux, err = abci.NewApplicationServer(t.ctx, appConfig)
t.mux, err = abci.NewApplicationServer(t.ctx, t.upgrader, appConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -1169,7 +1171,7 @@ func (t *tendermintService) worker() {
}

// New creates a new Tendermint service.
func New(ctx context.Context, dataDir string, identity *identity.Identity, genesisProvider genesisAPI.Provider) (service.TendermintService, error) {
func New(ctx context.Context, dataDir string, identity *identity.Identity, upgrader upgradeAPI.Backend, genesisProvider genesisAPI.Provider) (service.TendermintService, error) {
// Retrive the genesis document early so that it is possible to
// use it while initializing other things.
genesisDoc, err := genesisProvider.GetGenesisDocument()
Expand All @@ -1188,6 +1190,7 @@ func New(ctx context.Context, dataDir string, identity *identity.Identity, genes
t := &tendermintService{
BaseBackgroundService: *cmservice.NewBaseBackgroundService("tendermint"),
svcMgr: cmbackground.NewServiceManager(logging.GetLogger("tendermint/servicemanager")),
upgrader: upgrader,
blockNotifier: pubsub.NewBroker(false),
consensusSigner: identity.ConsensusSigner,
nodeSigner: identity.NodeSigner,
Expand Down
9 changes: 9 additions & 0 deletions go/control/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/oasislabs/oasis-core/go/common/errors"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

// NodeController is a node controller interface.
Expand All @@ -23,6 +24,14 @@ type NodeController interface {
// IsSynced checks whether the node has finished syncing.
// TODO: These should be replaced with IsReady (see oasis-core#2130).
IsSynced(ctx context.Context) (bool, error)

// UpgradeBinary submits an upgrade descriptor to a running node.
// The node will wait for the appropriate epoch, then update its binaries
// and shut down.
UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error

// CancelUpgrade cancels a pending upgrade, unless it is already in progress.
CancelUpgrade(ctx context.Context) error
}

// Shutdownable is an interface the node presents for shutting itself down.
Expand Down
63 changes: 63 additions & 0 deletions go/control/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"google.golang.org/grpc"

cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc"
upgradeApi "github.com/oasislabs/oasis-core/go/upgrade/api"
)

var (
Expand All @@ -18,6 +19,10 @@ var (
methodWaitSync = serviceName.NewMethod("WaitSync", nil)
// methodIsSynced is the IsSynced method.
methodIsSynced = serviceName.NewMethod("IsSynced", nil)
// methodUpgradeBinary is the UpgradeBinary method.
methodUpgradeBinary = serviceName.NewMethod("UpgradeBinary", upgradeApi.Descriptor{})
// methodCancelUpgrade is the CancelUpgrade method.
methodCancelUpgrade = serviceName.NewMethod("CancelUpgrade", nil)

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
Expand All @@ -36,6 +41,14 @@ var (
MethodName: methodIsSynced.ShortName(),
Handler: handlerIsSynced,
},
{
MethodName: methodUpgradeBinary.ShortName(),
Handler: handlerUpgradeBinary,
},
{
MethodName: methodCancelUpgrade.ShortName(),
Handler: handlerCancelUpgrade,
},
},
Streams: []grpc.StreamDesc{},
}
Expand Down Expand Up @@ -102,6 +115,48 @@ func handlerIsSynced( // nolint: golint
return interceptor(ctx, nil, info, handler)
}

func handlerUpgradeBinary( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
var descriptor upgradeApi.Descriptor
if err := dec(&descriptor); err != nil {
return nil, err
}
if interceptor == nil {
return nil, srv.(NodeController).UpgradeBinary(ctx, &descriptor)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodUpgradeBinary.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(NodeController).UpgradeBinary(ctx, req.(*upgradeApi.Descriptor))
}
return interceptor(ctx, &descriptor, info, handler)
}

func handlerCancelUpgrade( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
if interceptor == nil {
return nil, srv.(NodeController).CancelUpgrade(ctx)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodCancelUpgrade.FullName(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return nil, srv.(NodeController).CancelUpgrade(ctx)
}
return interceptor(ctx, nil, info, handler)
}

// RegisterService registers a new node controller service with the given gRPC server.
func RegisterService(server *grpc.Server, service NodeController) {
server.RegisterService(&serviceDesc, service)
Expand All @@ -127,6 +182,14 @@ func (c *nodeControllerClient) IsSynced(ctx context.Context) (bool, error) {
return rsp, nil
}

func (c *nodeControllerClient) UpgradeBinary(ctx context.Context, descriptor *upgradeApi.Descriptor) error {
return c.conn.Invoke(ctx, methodUpgradeBinary.FullName(), descriptor, nil)
}

func (c *nodeControllerClient) CancelUpgrade(ctx context.Context) error {
return c.conn.Invoke(ctx, methodCancelUpgrade.FullName(), nil, nil)
}

// NewNodeControllerClient creates a new gRPC node controller client service.
func NewNodeControllerClient(c *grpc.ClientConn) NodeController {
return &nodeControllerClient{c}
Expand Down
17 changes: 14 additions & 3 deletions go/control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"context"

consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/control/api"
control "github.com/oasislabs/oasis-core/go/control/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

type nodeController struct {
node api.Shutdownable
node control.Shutdownable
consensus consensus.Backend
upgrader upgrade.Backend
}

func (c *nodeController) RequestShutdown(ctx context.Context, wait bool) error {
Expand Down Expand Up @@ -49,10 +51,19 @@ func (c *nodeController) IsSynced(ctx context.Context) (bool, error) {
}
}

func (c *nodeController) UpgradeBinary(ctx context.Context, descriptor *upgrade.Descriptor) error {
return c.upgrader.SubmitDescriptor(ctx, descriptor)
}

func (c *nodeController) CancelUpgrade(ctx context.Context) error {
return c.upgrader.CancelUpgrade(ctx)
}

// New creates a new oasis-node controller.
func New(node api.Shutdownable, consensus consensus.Backend) api.NodeController {
func New(node control.Shutdownable, consensus consensus.Backend, upgrader upgrade.Backend) control.NodeController {
return &nodeController{
node: node,
consensus: consensus,
upgrader: upgrader,
}
}
Loading

0 comments on commit f0da65f

Please sign in to comment.