diff --git a/go/tendermint/apps/roothash/api.go b/go/tendermint/apps/roothash/api.go index 5186a04c91d..0fd8da9f67b 100644 --- a/go/tendermint/apps/roothash/api.go +++ b/go/tendermint/apps/roothash/api.go @@ -1,10 +1,15 @@ package roothash import ( + "github.com/oasislabs/ekiden/go/roothash/api/block" + "github.com/pkg/errors" + "github.com/tendermint/iavl" + "github.com/oasislabs/ekiden/go/common/cbor" "github.com/oasislabs/ekiden/go/common/crypto/signature" roothash "github.com/oasislabs/ekiden/go/roothash/api" "github.com/oasislabs/ekiden/go/roothash/api/commitment" + "github.com/oasislabs/ekiden/go/tendermint/abci" "github.com/oasislabs/ekiden/go/tendermint/api" ) @@ -14,7 +19,7 @@ const ( TransactionTag byte = 0x02 // AppName is the ABCI application name. - AppName string = "999_roothash" + AppName string = "300_roothash" ) var ( @@ -125,3 +130,15 @@ func (v *ValueComputeDiscrepancyDetected) MarshalCBOR() []byte { func (v *ValueComputeDiscrepancyDetected) UnmarshalCBOR(data []byte) error { return cbor.Unmarshal(data, v) } + +// MessageHandler is the interface implemented by components that handle roothash messages. +type MessageHandler interface { + // Name returns the name of the component. + // We use this for creating a deterministic order of calling the components. + Name() string + + // HandleRoothashMessages processes roothash messages. + // Return an error to indicate that the component is not satisfied with the messages. + // If a component is not satisfied, changes to the MutableTree will be discarded. + HandleRoothashMessages(*abci.Context, *iavl.MutableTree, *ValueFinalized, []*block.RoothashMessage) error +} diff --git a/go/tendermint/apps/roothash/roothash.go b/go/tendermint/apps/roothash/roothash.go index 6d78f70ccda..42bd8f8bb96 100644 --- a/go/tendermint/apps/roothash/roothash.go +++ b/go/tendermint/apps/roothash/roothash.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "encoding/hex" + "sort" "time" "github.com/pkg/errors" @@ -59,6 +60,9 @@ type rootHashApplication struct { beacon beacon.Backend roundTimeout time.Duration + + messageHandlersByName map[string]MessageHandler + messageHandlersByLexOrdering []MessageHandler } func (app *rootHashApplication) Name() string { @@ -651,7 +655,10 @@ func (app *rootHashApplication) commit( // Try to finalize round. if !ctx.IsCheckOnly() { - app.tryFinalizeMerge(ctx, runtime, rtState, false) + finalizedBlock := app.tryFinalizeMerge(ctx, runtime, rtState, false) + if finalizedBlock != nil { + app.postProcessFinalizedBlock(finalizedBlock) + } } case tx.TxComputeCommit != nil: pools := make(map[*commitment.Pool]bool) @@ -785,7 +792,7 @@ func (app *rootHashApplication) tryFinalizeMerge( runtime *registry.Runtime, rtState *runtimeState, forced bool, -) { +) *block.Block { latestBlock := rtState.CurrentBlock blockNr := latestBlock.Header.Round id, _ := runtime.ID.MarshalBinary() @@ -796,7 +803,7 @@ func (app *rootHashApplication) tryFinalizeMerge( app.logger.Error("attempted to finalize merge when block already finalized", "round", blockNr, ) - return + return nil } commit, err := rtState.Round.MergePool.TryFinalize(ctx.Now(), app.roundTimeout, forced, true) @@ -815,22 +822,15 @@ func (app *rootHashApplication) tryFinalizeMerge( rtState.Round.MergePool.ResetCommitments() rtState.Round.ComputePool.ResetCommitments() rtState.Round.Finalized = true - rtState.CurrentBlock = blk - ctx.EmitTag(TagUpdate, TagUpdateValue) - tagV := ValueFinalized{ - ID: id, - Round: blk.Header.Round, - } - ctx.EmitTag(TagFinalized, tagV.MarshalCBOR()) - return + return blk case commitment.ErrStillWaiting: // Need more commits. app.logger.Debug("insufficient commitments for finality, waiting", "round", blockNr, ) - return + return nil case commitment.ErrDiscrepancyDetected: // Discrepancy has been detected. app.logger.Warn("merge discrepancy detected", @@ -843,7 +843,7 @@ func (app *rootHashApplication) tryFinalizeMerge( Event: roothash.MergeDiscrepancyDetectedEvent{}, } ctx.EmitTag(TagMergeDiscrepancyDetected, tagV.MarshalCBOR()) - return + return nil default: } @@ -854,6 +854,57 @@ func (app *rootHashApplication) tryFinalizeMerge( ) app.emitEmptyBlock(ctx, rtState, block.RoundFailed) + return nil +} + +func (app *rootHashApplication) RegisterMessageHandler(mh MessageHandler) error { + name := mh.Name() + if app.messageHandlersByName[name] != nil { + return errors.Errorf("message handler already registered: '%s'", name) + } + + app.messageHandlersByName[name] = mh + app.rebuildMessageHandlerLexOrdering() // Inefficient but not a lot of apps. + + app.logger.Debug("Registered new message handler", + "mh", name, + ) + + return nil +} + +func (app *rootHashApplication) rebuildMessageHandlerLexOrdering() { + numMessageHandlers := len(app.messageHandlersByName) + messageHandlerOrder := make([]string, 0, numMessageHandlers) + for name := range app.messageHandlersByName { + messageHandlerOrder = append(messageHandlerOrder, name) + } + sort.Strings(messageHandlerOrder) + + app.messageHandlersByLexOrdering = make([]MessageHandler, 0, numMessageHandlers) + for _, name := range messageHandlerOrder { + app.messageHandlersByLexOrdering = append(app.messageHandlersByLexOrdering, app.messageHandlersByName[name]) + } +} + +func XXXGetMessages(ctx *abci.Context, tree *iavl.MutableTree) (*ValueFinalized, []*block.RoothashMessage, error) { + vfRaw := ctx.GetTag(TagFinalized) + if vfRaw == nil { + return nil, nil, nil + } + + var vf ValueFinalized + if err := cbor.Unmarshal(vfRaw, &vf); err != nil { + return nil, nil, errors.Wrapf(err, "failed to unmarshal %s value", TagFinalized) + } + + state := newMutableState(tree) + runtime, err := state.getRuntimeState(vf.ID) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to load state for runtime %s", vf.ID) + } + + return &vf, runtime.CurrentBlock.Header.RoothashMessages, nil } // New constructs a new roothash application instance. @@ -864,10 +915,11 @@ func New( roundTimeout time.Duration, ) abci.Application { return &rootHashApplication{ - ctx: ctx, - logger: logging.GetLogger("tendermint/roothash"), - timeSource: timeSource, - beacon: beacon, - roundTimeout: roundTimeout, + ctx: ctx, + logger: logging.GetLogger("tendermint/roothash"), + timeSource: timeSource, + beacon: beacon, + roundTimeout: roundTimeout, + messageHandlersByName: make(map[string]MessageHandler), } } diff --git a/go/tendermint/apps/staking/api.go b/go/tendermint/apps/staking/api.go index a4bbc0fb5a6..843ee7e2adf 100644 --- a/go/tendermint/apps/staking/api.go +++ b/go/tendermint/apps/staking/api.go @@ -11,7 +11,7 @@ const ( TransactionTag byte = 0x05 // AppName is the ABCI application name. - AppName string = "100_staking" + AppName string = "400_staking" ) var ( diff --git a/go/tendermint/apps/staking/staking.go b/go/tendermint/apps/staking/staking.go index b386103f0a7..17e7bc3388b 100644 --- a/go/tendermint/apps/staking/staking.go +++ b/go/tendermint/apps/staking/staking.go @@ -3,6 +3,7 @@ package staking import ( "encoding/hex" + "github.com/oasislabs/ekiden/go/roothash/api/block" "github.com/pkg/errors" "github.com/tendermint/iavl" @@ -15,6 +16,7 @@ import ( staking "github.com/oasislabs/ekiden/go/staking/api" "github.com/oasislabs/ekiden/go/tendermint/abci" "github.com/oasislabs/ekiden/go/tendermint/api" + roothashapp "github.com/oasislabs/ekiden/go/tendermint/apps/roothash" ) var ( @@ -42,6 +44,7 @@ func (app *stakingApplication) Blessed() bool { } func (app *stakingApplication) Dependencies() []string { + // Can process roothash messages. Not required though. return nil } @@ -203,6 +206,7 @@ func (app *stakingApplication) DeliverTx(ctx *abci.Context, tx []byte) error { } func (app *stakingApplication) ForeignDeliverTx(ctx *abci.Context, other abci.Application, tx []byte) error { + app.processRoothashMessages(ctx, app.state.DeliverTxTree()) return nil } @@ -211,6 +215,7 @@ func (app *stakingApplication) EndBlock(request types.RequestEndBlock) (types.Re } func (app *stakingApplication) FireTimer(ctx *abci.Context, timer *abci.Timer) { + app.processRoothashMessages(ctx, app.state.DeliverTxTree()) } func (app *stakingApplication) queryTotalSupply(s, r interface{}) ([]byte, error) { @@ -494,6 +499,37 @@ func (app *stakingApplication) addEscrow(ctx *abci.Context, state *MutableState, return nil } +func (app *stakingApplication) handleRoothashMessages(ctx *abci.Context, tree *iavl.MutableTree, vf *roothashapp.ValueFinalized, messages []*block.RoothashMessage) error { + for _, message := range messages { + if message.DummyRoothashMessage != nil { + app.logger.Info("dummy message from roothash", + "from_runtime", vf.ID, + "greeting", message.DummyRoothashMessage.Greeting, + ) + } + } + + return nil +} + +func (app *stakingApplication) processRoothashMessages(ctx *abci.Context, tree *iavl.MutableTree) { + vf, messages, err := roothashapp.GetMessages(ctx, tree) + if err != nil { + app.logger.Error("FireTimer: failed to get messages from roothash", + "err", err, + ) + panic(err) + } + + if vf == nil { + return + } + + if err = app.handleRoothashMessages(ctx, tree, vf, messages); err != nil { + panic("TODO: revert roothash round instead of breaking the whole ledger") + } +} + func (app *stakingApplication) reclaimEscrow(ctx *abci.Context, state *MutableState, signedReclaim *staking.SignedReclaimEscrow) error { var reclaim staking.ReclaimEscrow if err := signedReclaim.Open(staking.ReclaimEscrowSignatureContext, &reclaim); err != nil {