Skip to content

Commit

Permalink
go/consensus/tendermint: Use MKVS for storing application state
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Feb 14, 2020
1 parent 3b88346 commit 12e21e6
Show file tree
Hide file tree
Showing 72 changed files with 2,391 additions and 1,897 deletions.
1 change: 1 addition & 0 deletions .changelog/1898.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Use MKVS for storing application state
2 changes: 1 addition & 1 deletion go/common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (
//
// NOTE: Any change in the major or minor versions are considered
// breaking changes for the protocol.
ConsensusProtocol = Version{Major: 0, Minor: 23, Patch: 0}
ConsensusProtocol = Version{Major: 0, Minor: 24, Patch: 0}

// Tendermint exposes the tendermint core version.
Tendermint = parseSemVerStr(version.TMCoreSemVer)
Expand Down
38 changes: 9 additions & 29 deletions go/consensus/tendermint/abci/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"context"
"time"

"github.com/tendermint/iavl"
"github.com/tendermint/tendermint/abci/types"

"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
mkvs "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel"
)

type contextKey struct{}
Expand Down Expand Up @@ -57,7 +57,7 @@ func (m ContextMode) String() string {

// Context is the context of processing a transaction/block.
type Context struct {
ctx context.Context
context.Context

mode ContextMode
currentTime time.Time
Expand All @@ -69,24 +69,13 @@ type Context struct {
txSigner signature.PublicKey

appState ApplicationState
state *iavl.MutableTree
state *mkvs.Tree
blockHeight int64
blockCtx *BlockContext

logger *logging.Logger
}

// NewMockContext creates a new mock context for use in tests.
func NewMockContext(mode ContextMode, now time.Time) *Context {
return &Context{
ctx: context.Background(),
mode: mode,
currentTime: now,
gasAccountant: NewNopGasAccountant(),
logger: logging.GetLogger("consensus/tendermint/abci").With("mode", mode),
}
}

// FromCtx extracts an ABCI context from a context.Context if one has been
// set. Otherwise it returns nil.
func FromCtx(ctx context.Context) *Context {
Expand All @@ -99,7 +88,7 @@ func FromCtx(ctx context.Context) *Context {
// After calling this method, the context should no longer be used.
func (c *Context) Close() {
if c.IsSimulation() {
c.state.Rollback()
c.state.Close()
}

c.events = nil
Expand All @@ -113,11 +102,6 @@ func (c *Context) Logger() *logging.Logger {
return c.logger
}

// Ctx returns a context.Context that is associated with this ABCI context.
func (c *Context) Ctx() context.Context {
return c.ctx
}

// Mode returns the context mode.
func (c *Context) Mode() ContextMode {
return c.mode
Expand Down Expand Up @@ -225,8 +209,8 @@ func (c *Context) Now() time.Time {
return c.currentTime
}

// State returns the mutable state tree.
func (c *Context) State() *iavl.MutableTree {
// State returns the state tree associated with this context.
func (c *Context) State() *mkvs.Tree {
return c.state
}

Expand Down Expand Up @@ -255,16 +239,14 @@ func (c *Context) BlockContext() *BlockContext {

// NewStateCheckpoint creates a new state checkpoint.
func (c *Context) NewStateCheckpoint() *StateCheckpoint {
// XXX: Support state checkpoints.
return &StateCheckpoint{
ImmutableTree: *c.State().ImmutableTree,
ctx: c,
ctx: c,
}
}

// StateCheckpoint is a state checkpoint that can be used to rollback state.
type StateCheckpoint struct {
iavl.ImmutableTree

ctx *Context
}

Expand All @@ -278,9 +260,7 @@ func (sc *StateCheckpoint) Rollback() {
if sc.ctx == nil {
return
}
st := sc.ctx.State()
st.Rollback()
st.ImmutableTree = &sc.ImmutableTree
// XXX: Support rollback.
}

// BlockContextKey is an interface for a block context key.
Expand Down
32 changes: 32 additions & 0 deletions go/consensus/tendermint/abci/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package abci

import (
"errors"
"fmt"
)

type errorUnavailableState struct {
inner error
}

func (e *errorUnavailableState) Error() string {
return fmt.Sprintf("unavailable/corrupted state: %s", e.inner.Error())
}

func (e *errorUnavailableState) Unwrap() error {
return e.inner
}

// UnavailableStateError wraps an error in an unavailable state error.
func UnavailableStateError(err error) error {
if err == nil {
return nil
}
return &errorUnavailableState{err}
}

// IsUnavailableStateError returns true if any error in err's chain is an unavailable state error.
func IsUnavailableStateError(err error) bool {
var e *errorUnavailableState
return errors.As(err, &e)
}
39 changes: 24 additions & 15 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

const (
stateKeyGenesisDigest = "OasisGenesisDigest"
stateKeyGenesisRequest = "OasisGenesisRequest"
stateKeyInitChainEvents = "OasisInitChainEvents"

metricsUpdateInterval = 10 * time.Second
Expand All @@ -56,6 +55,7 @@ var (
// ApplicationConfig is the configuration for the consensus application.
type ApplicationConfig struct {
DataDir string
StorageBackend string
Pruning PruneConfig
HaltEpochHeight epochtime.EpochTime
MinGasPrice uint64
Expand Down Expand Up @@ -217,11 +217,6 @@ func (a *ApplicationServer) RegisterHaltHook(hook func(ctx context.Context, bloc
a.mux.registerHaltHook(hook)
}

// Pruner returns the ABCI state pruner.
func (a *ApplicationServer) Pruner() StatePruner {
return a.mux.state.statePruner
}

// SetEpochtime sets the mux epochtime.
//
// Epochtime must be set before the multiplexer can be used.
Expand Down Expand Up @@ -394,7 +389,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
tmp := bytes.NewBuffer(nil)
_ = types.WriteMessage(&req, tmp)
genesisDigest := sha512.Sum512_256(tmp.Bytes())
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisDigest), genesisDigest[:])
err = mux.state.deliverTxTree.Insert(mux.state.ctx, []byte(stateKeyGenesisDigest), genesisDigest[:])
if err != nil {
panic(err)
}

resp := mux.BaseApplication.InitChain(req)

Expand All @@ -420,11 +418,6 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
mux.logger.Debug("Genesis hook dispatch complete")
}()

// TODO: remove stateKeyGenesisRequest here, see oasis-core#2426
b, _ = req.Marshal()
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisRequest), b)
mux.state.checkTxTree.Set([]byte(stateKeyGenesisRequest), b)

// Call InitChain() on all applications.
mux.logger.Debug("InitChain: initializing applications")

Expand All @@ -450,7 +443,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
// Since returning emitted events doesn't work for InitChain() response yet,
// we store those and return them in BeginBlock().
evBinary := cbor.Marshal(ctx.GetEvents())
mux.state.deliverTxTree.Set([]byte(stateKeyInitChainEvents), evBinary)
err = mux.state.deliverTxTree.Insert(mux.state.ctx, []byte(stateKeyInitChainEvents), evBinary)
if err != nil {
panic(err)
}

return resp
}
Expand Down Expand Up @@ -529,14 +525,19 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
// During the first block, also collect and prepend application events
// generated during InitChain to BeginBlock events.
if mux.state.BlockHeight() == 0 {
_, evBinary := mux.state.deliverTxTree.Get([]byte(stateKeyInitChainEvents))
evBinary, err := mux.state.deliverTxTree.Get(ctx, []byte(stateKeyInitChainEvents))
if err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to query init chain events: %w", err))
}
if evBinary != nil {
var events []types.Event
_ = cbor.Unmarshal(evBinary, &events)

response.Events = append(events, response.Events...)

mux.state.deliverTxTree.Remove([]byte(stateKeyInitChainEvents))
if err := mux.state.deliverTxTree.Remove(ctx, []byte(stateKeyInitChainEvents)); err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to remove init chain events: %w", err))
}
}
}

Expand Down Expand Up @@ -711,6 +712,14 @@ func (mux *abciMux) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverT
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
if IsUnavailableStateError(err) {
// Make sure to not commit any transactions which include results based on unavailable
// and/or corrupted state -- doing so can further corrupt state.
ctx.Logger().Error("unavailable and/or corrupted state detected during tx processing",
"err", err,
)
panic(err)
}
module, code := errors.Code(err)

return types.ResponseDeliverTx{
Expand Down
Loading

0 comments on commit 12e21e6

Please sign in to comment.