Skip to content

Commit

Permalink
scratch
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed Oct 2, 2019
1 parent 3ae4357 commit 8a43a38
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 20 deletions.
19 changes: 18 additions & 1 deletion go/tendermint/apps/roothash/api.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package roothash

import (
"github.com/oasislabs/ekiden/go/roothash/api/block"
"github.com/pkg/errors"
"github.com/tendermint/iavl"

"github.com/oasislabs/ekiden/go/common/cbor"
"github.com/oasislabs/ekiden/go/common/crypto/signature"
roothash "github.com/oasislabs/ekiden/go/roothash/api"
"github.com/oasislabs/ekiden/go/roothash/api/commitment"
"github.com/oasislabs/ekiden/go/tendermint/abci"
"github.com/oasislabs/ekiden/go/tendermint/api"
)

Expand All @@ -14,7 +19,7 @@ const (
TransactionTag byte = 0x02

// AppName is the ABCI application name.
AppName string = "999_roothash"
AppName string = "300_roothash"
)

var (
Expand Down Expand Up @@ -125,3 +130,15 @@ func (v *ValueComputeDiscrepancyDetected) MarshalCBOR() []byte {
func (v *ValueComputeDiscrepancyDetected) UnmarshalCBOR(data []byte) error {
return cbor.Unmarshal(data, v)
}

// MessageHandler is the interface implemented by components that handle roothash messages.
type MessageHandler interface {
// Name returns the name of the component.
// We use this for creating a deterministic order of calling the components.
Name() string

// HandleRoothashMessages processes roothash messages.
// Return an error to indicate that the component is not satisfied with the messages.
// If a component is not satisfied, changes to the MutableTree will be discarded.
HandleRoothashMessages(*abci.Context, *iavl.MutableTree, *ValueFinalized, []*block.RoothashMessage) error
}
88 changes: 70 additions & 18 deletions go/tendermint/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/hex"
"sort"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -59,6 +60,9 @@ type rootHashApplication struct {
beacon beacon.Backend

roundTimeout time.Duration

messageHandlersByName map[string]MessageHandler
messageHandlersByLexOrdering []MessageHandler
}

func (app *rootHashApplication) Name() string {
Expand Down Expand Up @@ -651,7 +655,10 @@ func (app *rootHashApplication) commit(

// Try to finalize round.
if !ctx.IsCheckOnly() {
app.tryFinalizeMerge(ctx, runtime, rtState, false)
finalizedBlock := app.tryFinalizeMerge(ctx, runtime, rtState, false)
if finalizedBlock != nil {
app.postProcessFinalizedBlock(finalizedBlock)
}
}
case tx.TxComputeCommit != nil:
pools := make(map[*commitment.Pool]bool)
Expand Down Expand Up @@ -785,7 +792,7 @@ func (app *rootHashApplication) tryFinalizeMerge(
runtime *registry.Runtime,
rtState *runtimeState,
forced bool,
) {
) *block.Block {
latestBlock := rtState.CurrentBlock
blockNr := latestBlock.Header.Round
id, _ := runtime.ID.MarshalBinary()
Expand All @@ -796,7 +803,7 @@ func (app *rootHashApplication) tryFinalizeMerge(
app.logger.Error("attempted to finalize merge when block already finalized",
"round", blockNr,
)
return
return nil
}

commit, err := rtState.Round.MergePool.TryFinalize(ctx.Now(), app.roundTimeout, forced, true)
Expand All @@ -815,22 +822,15 @@ func (app *rootHashApplication) tryFinalizeMerge(
rtState.Round.MergePool.ResetCommitments()
rtState.Round.ComputePool.ResetCommitments()
rtState.Round.Finalized = true
rtState.CurrentBlock = blk

ctx.EmitTag(TagUpdate, TagUpdateValue)
tagV := ValueFinalized{
ID: id,
Round: blk.Header.Round,
}
ctx.EmitTag(TagFinalized, tagV.MarshalCBOR())
return
return blk
case commitment.ErrStillWaiting:
// Need more commits.
app.logger.Debug("insufficient commitments for finality, waiting",
"round", blockNr,
)

return
return nil
case commitment.ErrDiscrepancyDetected:
// Discrepancy has been detected.
app.logger.Warn("merge discrepancy detected",
Expand All @@ -843,7 +843,7 @@ func (app *rootHashApplication) tryFinalizeMerge(
Event: roothash.MergeDiscrepancyDetectedEvent{},
}
ctx.EmitTag(TagMergeDiscrepancyDetected, tagV.MarshalCBOR())
return
return nil
default:
}

Expand All @@ -854,6 +854,57 @@ func (app *rootHashApplication) tryFinalizeMerge(
)

app.emitEmptyBlock(ctx, rtState, block.RoundFailed)
return nil
}

func (app *rootHashApplication) RegisterMessageHandler(mh MessageHandler) error {
name := mh.Name()
if app.messageHandlersByName[name] != nil {
return errors.Errorf("message handler already registered: '%s'", name)
}

app.messageHandlersByName[name] = mh
app.rebuildMessageHandlerLexOrdering() // Inefficient but not a lot of apps.

app.logger.Debug("Registered new message handler",
"mh", name,
)

return nil
}

func (app *rootHashApplication) rebuildMessageHandlerLexOrdering() {
numMessageHandlers := len(app.messageHandlersByName)
messageHandlerOrder := make([]string, 0, numMessageHandlers)
for name := range app.messageHandlersByName {
messageHandlerOrder = append(messageHandlerOrder, name)
}
sort.Strings(messageHandlerOrder)

app.messageHandlersByLexOrdering = make([]MessageHandler, 0, numMessageHandlers)
for _, name := range messageHandlerOrder {
app.messageHandlersByLexOrdering = append(app.messageHandlersByLexOrdering, app.messageHandlersByName[name])
}
}

func XXXGetMessages(ctx *abci.Context, tree *iavl.MutableTree) (*ValueFinalized, []*block.RoothashMessage, error) {
vfRaw := ctx.GetTag(TagFinalized)
if vfRaw == nil {
return nil, nil, nil
}

var vf ValueFinalized
if err := cbor.Unmarshal(vfRaw, &vf); err != nil {
return nil, nil, errors.Wrapf(err, "failed to unmarshal %s value", TagFinalized)
}

state := newMutableState(tree)
runtime, err := state.getRuntimeState(vf.ID)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to load state for runtime %s", vf.ID)
}

return &vf, runtime.CurrentBlock.Header.RoothashMessages, nil
}

// New constructs a new roothash application instance.
Expand All @@ -864,10 +915,11 @@ func New(
roundTimeout time.Duration,
) abci.Application {
return &rootHashApplication{
ctx: ctx,
logger: logging.GetLogger("tendermint/roothash"),
timeSource: timeSource,
beacon: beacon,
roundTimeout: roundTimeout,
ctx: ctx,
logger: logging.GetLogger("tendermint/roothash"),
timeSource: timeSource,
beacon: beacon,
roundTimeout: roundTimeout,
messageHandlersByName: make(map[string]MessageHandler),
}
}
2 changes: 1 addition & 1 deletion go/tendermint/apps/staking/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
TransactionTag byte = 0x05

// AppName is the ABCI application name.
AppName string = "100_staking"
AppName string = "400_staking"
)

var (
Expand Down
36 changes: 36 additions & 0 deletions go/tendermint/apps/staking/staking.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package staking

import (
"encoding/hex"
"github.com/oasislabs/ekiden/go/roothash/api/block"

"github.com/pkg/errors"
"github.com/tendermint/iavl"
Expand All @@ -15,6 +16,7 @@ import (
staking "github.com/oasislabs/ekiden/go/staking/api"
"github.com/oasislabs/ekiden/go/tendermint/abci"
"github.com/oasislabs/ekiden/go/tendermint/api"
roothashapp "github.com/oasislabs/ekiden/go/tendermint/apps/roothash"
)

var (
Expand Down Expand Up @@ -42,6 +44,7 @@ func (app *stakingApplication) Blessed() bool {
}

func (app *stakingApplication) Dependencies() []string {
// Can process roothash messages. Not required though.
return nil
}

Expand Down Expand Up @@ -203,6 +206,7 @@ func (app *stakingApplication) DeliverTx(ctx *abci.Context, tx []byte) error {
}

func (app *stakingApplication) ForeignDeliverTx(ctx *abci.Context, other abci.Application, tx []byte) error {
app.processRoothashMessages(ctx, app.state.DeliverTxTree())
return nil
}

Expand All @@ -211,6 +215,7 @@ func (app *stakingApplication) EndBlock(request types.RequestEndBlock) (types.Re
}

func (app *stakingApplication) FireTimer(ctx *abci.Context, timer *abci.Timer) {
app.processRoothashMessages(ctx, app.state.DeliverTxTree())
}

func (app *stakingApplication) queryTotalSupply(s, r interface{}) ([]byte, error) {
Expand Down Expand Up @@ -494,6 +499,37 @@ func (app *stakingApplication) addEscrow(ctx *abci.Context, state *MutableState,
return nil
}

func (app *stakingApplication) handleRoothashMessages(ctx *abci.Context, tree *iavl.MutableTree, vf *roothashapp.ValueFinalized, messages []*block.RoothashMessage) error {
for _, message := range messages {
if message.DummyRoothashMessage != nil {
app.logger.Info("dummy message from roothash",
"from_runtime", vf.ID,
"greeting", message.DummyRoothashMessage.Greeting,
)
}
}

return nil
}

func (app *stakingApplication) processRoothashMessages(ctx *abci.Context, tree *iavl.MutableTree) {
vf, messages, err := roothashapp.GetMessages(ctx, tree)
if err != nil {
app.logger.Error("FireTimer: failed to get messages from roothash",
"err", err,
)
panic(err)
}

if vf == nil {
return
}

if err = app.handleRoothashMessages(ctx, tree, vf, messages); err != nil {
panic("TODO: revert roothash round instead of breaking the whole ledger")
}
}

func (app *stakingApplication) reclaimEscrow(ctx *abci.Context, state *MutableState, signedReclaim *staking.SignedReclaimEscrow) error {
var reclaim staking.ReclaimEscrow
if err := signedReclaim.Open(staking.ReclaimEscrowSignatureContext, &reclaim); err != nil {
Expand Down

0 comments on commit 8a43a38

Please sign in to comment.