Apply upgrade proposals directly from governance consensus application #3870

Merged: 2 commits, Apr 20, 2021

4 changes: 4 additions & 0 deletions .changelog/3870.bugfix.md
@@ -0,0 +1,4 @@
Apply upgrade proposals directly from governance consensus application

This should make upgrade handling more robust in the presence of fast sync, as
the upgrade worker could otherwise miss upgrade events in certain edge cases.
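
For readers skimming the diff, the overall flow of the change is: the ABCI multiplexer publishes a state-sync-completed message once snapshot restore finishes, the governance application (which now subscribes to that message) re-submits any pending upgrade descriptors to the node's upgrader, and the standalone governance upgrade worker (go/worker/upgrade) is removed from the node. The following is a self-contained sketch of that publish/subscribe shape with all names and types simplified for illustration; it is not the actual oasis-core API.

```go
package main

import "fmt"

// messageKind mirrors the consensus-layer message kind added in this PR; the
// concrete value here is a stand-in for illustration only.
type messageKind uint8

const msgStateSyncCompleted messageKind = 0

// subscriber is anything that wants to react to dispatched messages.
type subscriber interface {
	ExecuteMessage(kind messageKind, msg interface{}) error
}

// dispatcher is a toy message dispatcher; the real one lives in the ABCI
// application multiplexer.
type dispatcher struct {
	subs map[messageKind][]subscriber
}

func (d *dispatcher) Subscribe(kind messageKind, s subscriber) {
	if d.subs == nil {
		d.subs = make(map[messageKind][]subscriber)
	}
	d.subs[kind] = append(d.subs[kind], s)
}

func (d *dispatcher) Publish(kind messageKind, msg interface{}) error {
	for _, s := range d.subs[kind] {
		if err := s.ExecuteMessage(kind, msg); err != nil {
			return err
		}
	}
	return nil
}

// governanceApp stands in for the governance application: on state sync
// completion it re-submits any pending upgrade descriptors locally.
type governanceApp struct {
	pendingUpgrades []string // upgrade descriptors, reduced to names
}

func (g *governanceApp) ExecuteMessage(kind messageKind, msg interface{}) error {
	switch kind {
	case msgStateSyncCompleted:
		for _, d := range g.pendingUpgrades {
			fmt.Println("locally applying pending upgrade:", d)
		}
		return nil
	default:
		return fmt.Errorf("governance: invalid message kind")
	}
}

func main() {
	app := &governanceApp{pendingUpgrades: []string{"v21.2"}}

	var md dispatcher
	md.Subscribe(msgStateSyncCompleted, app) // done in OnRegister in the real code

	// The multiplexer publishes this after ApplySnapshotChunk completes state sync.
	if err := md.Publish(msgStateSyncCompleted, nil); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```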
11 changes: 11 additions & 0 deletions go/consensus/tendermint/abci/mux.go
@@ -1047,6 +1047,17 @@ func (mux *abciMux) ApplySnapshotChunk(req types.RequestApplySnapshotChunk) type
"root", cp.Root,
logging.LogEvent, LogEventABCIStateSyncComplete,
)

// Notify applications that state has been synced.
ctx := mux.state.NewContext(api.ContextEndBlock, mux.currentTime)
defer ctx.Close()

if err = mux.md.Publish(ctx, api.MessageStateSyncCompleted, nil); err != nil {
mux.logger.Error("failed to dispatch state sync completed message",
"err", err,
)
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}
}

return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ACCEPT}
6 changes: 6 additions & 0 deletions go/consensus/tendermint/api/api.go
@@ -336,3 +336,9 @@ func (bpk BlockProposerKey) NewDefault() interface{} {
// multiplexer.
panic("no proposer address in block context")
}

type messageKind uint8

// MessageStateSyncCompleted is the message kind for when the node successfully performs a state
// sync. The message itself is nil.
var MessageStateSyncCompleted = messageKind(0)
54 changes: 52 additions & 2 deletions go/consensus/tendermint/apps/governance/governance.go
@@ -50,6 +50,9 @@ func (app *governanceApplication) Dependencies() []string {

func (app *governanceApplication) OnRegister(state api.ApplicationState, md api.MessageDispatcher) {
app.state = state

// Subscribe to messages emitted by other apps.
md.Subscribe(api.MessageStateSyncCompleted, app)
}

func (app *governanceApplication) OnCleanup() {
@@ -81,7 +84,34 @@ func (app *governanceApplication) ExecuteTx(ctx *api.Context, tx *transaction.Tr
}

func (app *governanceApplication) ExecuteMessage(ctx *api.Context, kind, msg interface{}) error {
return governance.ErrInvalidArgument
state := governanceState.NewMutableState(ctx.State())

switch kind {
case api.MessageStateSyncCompleted:
// State sync has just completed, check whether there are any pending upgrades to make
// sure we don't miss them after the sync.
pendingUpgrades, err := state.PendingUpgrades(ctx)
if err != nil {
return fmt.Errorf("tendermint/governance: couldn't get pending upgrades: %w", err)
}

// Apply all pending upgrades locally.
if upgrader := ctx.AppState().Upgrader(); upgrader != nil {
for _, pu := range pendingUpgrades {
switch err = upgrader.SubmitDescriptor(ctx, pu); err {
case nil, upgrade.ErrAlreadyPending:
default:
ctx.Logger().Error("failed to locally apply the upgrade descriptor",
"err", err,
"descriptor", pu,
)
}
}
}
return nil
default:
return governance.ErrInvalidArgument
}
}

func (app *governanceApplication) BeginBlock(ctx *api.Context, request types.RequestBeginBlock) error {
@@ -170,6 +200,16 @@ func (app *governanceApplication) executeProposal(ctx *api.Context, state *gover
if err != nil {
return fmt.Errorf("failed to set pending upgrade: %w", err)
}

// Locally apply the upgrade proposal.
if upgrader := ctx.AppState().Upgrader(); upgrader != nil {
if err = upgrader.SubmitDescriptor(ctx, &proposal.Content.Upgrade.Descriptor); err != nil {
ctx.Logger().Error("failed to locally apply the upgrade descriptor",
"err", err,
"descriptor", proposal.Content.Upgrade.Descriptor,
)
}
}
case proposal.Content.CancelUpgrade != nil:
cancelingProposal, err := state.Proposal(ctx, proposal.Content.CancelUpgrade.ProposalID)
if err != nil {
@@ -178,14 +218,24 @@ func (app *governanceApplication) executeProposal(ctx *api.Context, state *gover
if cancelingProposal.Content.Upgrade == nil {
return fmt.Errorf("%w: canceling proposal needs to be an upgrade proposal", governance.ErrNoSuchUpgrade)
}
_, err = state.PendingUpgradeProposal(ctx, cancelingProposal.ID)
upgradeProposal, err := state.PendingUpgradeProposal(ctx, cancelingProposal.ID)
if err != nil {
return fmt.Errorf("failed to get pending upgrade: %w", err)
}
err = state.RemovePendingUpgrade(ctx, cancelingProposal.Content.Upgrade.Epoch, cancelingProposal.ID)
if err != nil {
return fmt.Errorf("failed to remove pending upgrade: %w", err)
}

// Locally cancel the upgrade proposal.
if upgrader := ctx.AppState().Upgrader(); upgrader != nil {
if err = upgrader.CancelUpgrade(ctx, &upgradeProposal.Descriptor); err != nil {
ctx.Logger().Error("failed to locally cancel the upgrade",
"err", err,
"descriptor", upgradeProposal.Descriptor,
)
}
}
default:
return governance.ErrInvalidArgument
}
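One detail worth calling out in the hunks above: an upgrade descriptor can now reach the local upgrader twice, once when the proposal is executed and again after a state sync, so the state-sync path treats upgrade.ErrAlreadyPending the same as success, and in both paths other errors are only logged so block processing is not aborted. Below is a minimal sketch of that idempotent-submit pattern, using a mocked upgrader rather than the real interface.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyPending mirrors upgrade.ErrAlreadyPending: the descriptor was
// submitted before, which is fine when more than one code path may submit it.
var errAlreadyPending = errors.New("upgrade: already pending")

// upgrader is a mock of the node-local upgrade backend.
type upgrader struct {
	pending map[string]bool
}

func (u *upgrader) SubmitDescriptor(name string) error {
	if u.pending[name] {
		return errAlreadyPending
	}
	u.pending[name] = true
	return nil
}

func submit(u *upgrader, name string) {
	// Same shape as the governance application's handling: "already pending"
	// is not an error, and anything else is only logged so execution of the
	// proposal itself is not aborted.
	switch err := u.SubmitDescriptor(name); err {
	case nil, errAlreadyPending:
		fmt.Println("upgrade pending locally:", name)
	default:
		fmt.Println("failed to locally apply the upgrade descriptor:", err)
	}
}

func main() {
	u := &upgrader{pending: make(map[string]bool)}
	submit(u, "v21.2") // first submission, from proposal execution
	submit(u, "v21.2") // second submission, e.g. after state sync completes
}
```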
11 changes: 10 additions & 1 deletion go/consensus/tendermint/full/full.go
@@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"path/filepath"
"strings"
"sync"
@@ -120,6 +121,9 @@ const (
CfgConsensusStateSyncTrustHeight = "consensus.tendermint.state_sync.trust_height"
// CfgConsensusStateSyncTrustHash is the known trusted block header hash for the light client.
CfgConsensusStateSyncTrustHash = "consensus.tendermint.state_sync.trust_hash"

// CfgUpgradeStopDelay is the average amount of time to delay shutting down the node on upgrade.
CfgUpgradeStopDelay = "consensus.tendermint.upgrade.stop_delay"
)

const (
@@ -1329,7 +1333,10 @@ func (t *fullService) lazyInit() error {
go func() {
// Sleep another period so there is some time between when consensus shuts down and
// when all the other services start shutting down.
time.Sleep(waitPeriod)
//
// Randomize the period so that not all nodes shut down at the same time.
delay := getRandomValueFromInterval(0.5, rand.Float64(), viper.GetDuration(CfgUpgradeStopDelay))
time.Sleep(delay)

t.Logger.Info("stopping the node for upgrade")
t.Stop()
@@ -1537,6 +1544,8 @@ func init() {
Flags.Uint64(CfgConsensusStateSyncTrustHeight, 0, "state sync: light client trusted height")
Flags.String(CfgConsensusStateSyncTrustHash, "", "state sync: light client trusted consensus header hash")

Flags.Duration(CfgUpgradeStopDelay, 60*time.Second, "average amount of time to delay shutting down the node on upgrade")

_ = Flags.MarkHidden(CfgDebugUnsafeReplayRecoverCorruptedWAL)

_ = Flags.MarkHidden(CfgSupplementarySanityEnabled)
18 changes: 18 additions & 0 deletions go/consensus/tendermint/full/helpers.go
@@ -0,0 +1,18 @@
package full

import "time"

// Borrowed from https://github.com/cenkalti/backoff.

// Returns a random value from the following interval:
// [currentInterval - randomizationFactor * currentInterval, currentInterval + randomizationFactor * currentInterval].
func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
delta := randomizationFactor * float64(currentInterval)
minInterval := float64(currentInterval) - delta
maxInterval := float64(currentInterval) + delta

// Get a random value from the range [minInterval, maxInterval].
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
// we want a 33% chance for selecting either 1, 2 or 3.
return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
}
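
As a worked example of the new shutdown delay: with the default consensus.tendermint.upgrade.stop_delay of 60s and the 0.5 randomization factor used in full.go, the sleep before stopping is drawn roughly uniformly from [30s, 90s], so nodes spread their restarts over that window instead of stopping simultaneously. A small sketch exercising the helper (reproduced here so the example is self-contained):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// getRandomValueFromInterval is the helper added in helpers.go, copied here
// so this example runs on its own.
func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
	delta := randomizationFactor * float64(currentInterval)
	minInterval := float64(currentInterval) - delta
	maxInterval := float64(currentInterval) + delta
	return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
}

func main() {
	const stopDelay = 60 * time.Second // default CfgUpgradeStopDelay

	// Boundary values: random = 0 gives 30s, random close to 1 gives ~90s.
	fmt.Println(getRandomValueFromInterval(0.5, 0, stopDelay))
	fmt.Println(getRandomValueFromInterval(0.5, 0.999999, stopDelay))

	// What a node actually does on upgrade: pick a random point in [30s, 90s].
	fmt.Println(getRandomValueFromInterval(0.5, rand.Float64(), stopDelay))
}
```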
40 changes: 10 additions & 30 deletions go/oasis-node/cmd/node/node.go
@@ -66,7 +66,6 @@ import (
workerSentry "github.com/oasisprotocol/oasis-core/go/worker/sentry"
"github.com/oasisprotocol/oasis-core/go/worker/storage"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage"
workerGovernanceUpgrade "github.com/oasisprotocol/oasis-core/go/worker/upgrade"
)

// Flags has the configuration flags.
@@ -118,17 +117,16 @@ type Node struct {
RuntimeRegistry runtimeRegistry.Registry
RuntimeClient runtimeClientAPI.RuntimeClientService

CommonWorker *workerCommon.Worker
ExecutorWorker *executor.Worker
StorageWorker *workerStorage.Worker
SentryWorker *workerSentry.Worker
P2P *p2p.P2P
RegistrationWorker *registration.Worker
KeymanagerWorker *workerKeymanager.Worker
ConsensusWorker *workerConsensusRPC.Worker
BeaconWorker *workerBeacon.Worker
GovernanceUpgradeWorker *workerGovernanceUpgrade.Worker
readyCh chan struct{}
CommonWorker *workerCommon.Worker
ExecutorWorker *executor.Worker
StorageWorker *workerStorage.Worker
SentryWorker *workerSentry.Worker
P2P *p2p.P2P
RegistrationWorker *registration.Worker
KeymanagerWorker *workerKeymanager.Worker
ConsensusWorker *workerConsensusRPC.Worker
BeaconWorker *workerBeacon.Worker
readyCh chan struct{}

logger *logging.Logger
}
@@ -366,19 +364,6 @@ func (n *Node) initRuntimeWorkers() error {
}
n.svcMgr.Register(n.BeaconWorker)

// Initialize the consensus upgrade worker.
n.GovernanceUpgradeWorker, err = workerGovernanceUpgrade.New(
n.Consensus,
n.Upgrader,
)
if err != nil {
n.logger.Error("failed to initialize governance upgrade worker",
"err", err,
)
return err
}
n.svcMgr.Register(n.GovernanceUpgradeWorker)

// Initialize the storage worker.
n.StorageWorker, err = workerStorage.New(
n.grpcInternal,
@@ -478,11 +463,6 @@ func (n *Node) startRuntimeWorkers() error {
return err
}

// Start the consensus upgrade worker.
if err := n.GovernanceUpgradeWorker.Start(); err != nil {
return err
}

// Start the sentry worker.
if err := n.SentryWorker.Start(); err != nil {
return err
7 changes: 7 additions & 0 deletions go/oasis-test-runner/oasis/args.go
@@ -205,6 +205,13 @@ func (args *argBuilder) tendermintStateSync(
return args
}

func (args *argBuilder) tendermintUpgradeStopDelay(delay time.Duration) *argBuilder {
args.vec = append(args.vec,
"--"+tendermintFull.CfgUpgradeStopDelay, delay.String(),
)
return args
}

func (args *argBuilder) storageBackend(backend string) *argBuilder {
args.vec = append(args.vec, []string{
"--" + workerStorage.CfgBackend, backend,
3 changes: 2 additions & 1 deletion go/oasis-test-runner/oasis/oasis.go
@@ -850,7 +850,8 @@ func (net *Network) startOasisNode(
extraArgs = extraArgs.
appendIASProxy(net.iasProxy).
tendermintDebugAddrBookLenient().
tendermintDebugAllowDuplicateIP()
tendermintDebugAllowDuplicateIP().
tendermintUpgradeStopDelay(10 * time.Second)
}
if net.cfg.UseShortGrpcSocketPaths {
// Keep the socket, if it was already generated!