Skip to content

Commit

Permalink
go/consensus/tendermint: Use MKVS for storing application state
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Feb 20, 2020
1 parent 0ca1065 commit 22af83b
Show file tree
Hide file tree
Showing 75 changed files with 2,391 additions and 2,020 deletions.
1 change: 1 addition & 0 deletions .changelog/1898.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Use MKVS for storing application state
2 changes: 1 addition & 1 deletion go/common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (
//
// NOTE: Any change in the major or minor versions are considered
// breaking changes for the protocol.
ConsensusProtocol = Version{Major: 0, Minor: 23, Patch: 0}
ConsensusProtocol = Version{Major: 0, Minor: 24, Patch: 0}

// Tendermint exposes the tendermint core version.
Tendermint = parseSemVerStr(version.TMCoreSemVer)
Expand Down
81 changes: 45 additions & 36 deletions go/consensus/tendermint/abci/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package abci
import (
"bytes"
"context"
"fmt"
"time"

"github.com/tendermint/iavl"
"github.com/tendermint/tendermint/abci/types"

"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/logging"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
mkvs "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel"
)

type contextKey struct{}
Expand Down Expand Up @@ -57,7 +58,7 @@ func (m ContextMode) String() string {

// Context is the context of processing a transaction/block.
type Context struct {
ctx context.Context
context.Context

mode ContextMode
currentTime time.Time
Expand All @@ -69,22 +70,13 @@ type Context struct {
txSigner signature.PublicKey

appState ApplicationState
state *iavl.MutableTree
state mkvs.Tree
blockHeight int64
blockCtx *BlockContext

logger *logging.Logger
}
stateCheckpoint *StateCheckpoint

// NewMockContext creates a new mock context for use in tests.
func NewMockContext(mode ContextMode, now time.Time) *Context {
return &Context{
ctx: context.Background(),
mode: mode,
currentTime: now,
gasAccountant: NewNopGasAccountant(),
logger: logging.GetLogger("consensus/tendermint/abci").With("mode", mode),
}
logger *logging.Logger
}

// FromCtx extracts an ABCI context from a context.Context if one has been
Expand All @@ -99,25 +91,26 @@ func FromCtx(ctx context.Context) *Context {
// After calling this method, the context should no longer be used.
func (c *Context) Close() {
if c.IsSimulation() {
c.state.Rollback()
if tree, ok := c.state.(mkvs.ClosableTree); ok {
tree.Close()
}
}

c.events = nil
c.appState = nil
c.state = nil
c.blockCtx = nil

if c.stateCheckpoint != nil {
panic("context: open checkpoint was never committed or discarded")
}
}

// Logger returns the logger associated with this context.
func (c *Context) Logger() *logging.Logger {
return c.logger
}

// Ctx returns a context.Context that is associated with this ABCI context.
func (c *Context) Ctx() context.Context {
return c.ctx
}

// Mode returns the context mode.
func (c *Context) Mode() ContextMode {
return c.mode
Expand Down Expand Up @@ -225,8 +218,11 @@ func (c *Context) Now() time.Time {
return c.currentTime
}

// State returns the mutable state tree.
func (c *Context) State() *iavl.MutableTree {
// State returns the state tree associated with this context.
func (c *Context) State() mkvs.KeyValueTree {
if c.stateCheckpoint != nil {
return c.stateCheckpoint.overlay
}
return c.state
}

Expand All @@ -253,34 +249,47 @@ func (c *Context) BlockContext() *BlockContext {
return c.blockCtx
}

// NewStateCheckpoint creates a new state checkpoint.
func (c *Context) NewStateCheckpoint() *StateCheckpoint {
return &StateCheckpoint{
ImmutableTree: *c.State().ImmutableTree,
ctx: c,
// StartCheckpoint starts a new state checkpoint. Any further updates to the context's state will
// be performed against the checkpoint and will only be committed in case of an explicit Commit.
//
// The caller must make sure to call either Close or Commit on the checkpoint, otherwise this will
// leak resources.
func (c *Context) StartCheckpoint() *StateCheckpoint {
if c.stateCheckpoint != nil {
panic("context: nested checkpoints are not allowed")
}
c.stateCheckpoint = &StateCheckpoint{
ctx: c,
overlay: mkvs.NewOverlay(c.state),
}
return c.stateCheckpoint
}

// StateCheckpoint is a state checkpoint that can be used to rollback state.
type StateCheckpoint struct {
iavl.ImmutableTree

ctx *Context
ctx *Context
overlay mkvs.OverlayTree
}

// Close releases resources associated with the checkpoint.
// Close releases resources associated with the checkpoint without committing it.
func (sc *StateCheckpoint) Close() {
if sc.ctx == nil {
return
}
sc.overlay.Close()
sc.ctx.stateCheckpoint = nil
sc.ctx = nil
}

// Rollback rolls back the active state to the one from the checkpoint.
func (sc *StateCheckpoint) Rollback() {
// Commit commits any changes performed since the checkpoint was created.
func (sc *StateCheckpoint) Commit() {
if sc.ctx == nil {
return
}
st := sc.ctx.State()
st.Rollback()
st.ImmutableTree = &sc.ImmutableTree
if err := sc.overlay.Commit(sc.ctx); err != nil {
panic(fmt.Errorf("context: failed to commit checkpoint: %w", err))
}
sc.Close()
}

// BlockContextKey is an interface for a block context key.
Expand Down
32 changes: 32 additions & 0 deletions go/consensus/tendermint/abci/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package abci

import (
"errors"
"fmt"
)

type errorUnavailableState struct {
inner error
}

func (e *errorUnavailableState) Error() string {
return fmt.Sprintf("unavailable/corrupted state: %s", e.inner.Error())
}

func (e *errorUnavailableState) Unwrap() error {
return e.inner
}

// UnavailableStateError wraps an error in an unavailable state error.
func UnavailableStateError(err error) error {
if err == nil {
return nil
}
return &errorUnavailableState{err}
}

// IsUnavailableStateError returns true if any error in err's chain is an unavailable state error.
func IsUnavailableStateError(err error) bool {
var e *errorUnavailableState
return errors.As(err, &e)
}
41 changes: 25 additions & 16 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

const (
stateKeyGenesisDigest = "OasisGenesisDigest"
stateKeyGenesisRequest = "OasisGenesisRequest"
stateKeyInitChainEvents = "OasisInitChainEvents"

metricsUpdateInterval = 10 * time.Second
Expand All @@ -57,6 +56,7 @@ var (
// ApplicationConfig is the configuration for the consensus application.
type ApplicationConfig struct {
DataDir string
StorageBackend string
Pruning PruneConfig
HaltEpochHeight epochtime.EpochTime
MinGasPrice uint64
Expand Down Expand Up @@ -218,11 +218,6 @@ func (a *ApplicationServer) RegisterHaltHook(hook func(ctx context.Context, bloc
a.mux.registerHaltHook(hook)
}

// Pruner returns the ABCI state pruner.
func (a *ApplicationServer) Pruner() StatePruner {
return a.mux.state.statePruner
}

// SetEpochtime sets the mux epochtime.
//
// Epochtime must be set before the multiplexer can be used.
Expand Down Expand Up @@ -396,7 +391,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
tmp := bytes.NewBuffer(nil)
_ = types.WriteMessage(&req, tmp)
genesisDigest := sha512.Sum512_256(tmp.Bytes())
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisDigest), genesisDigest[:])
err = mux.state.deliverTxTree.Insert(mux.state.ctx, []byte(stateKeyGenesisDigest), genesisDigest[:])
if err != nil {
panic(err)
}

resp := mux.BaseApplication.InitChain(req)

Expand All @@ -422,11 +420,6 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
mux.logger.Debug("Genesis hook dispatch complete")
}()

// TODO: remove stateKeyGenesisRequest here, see oasis-core#2426
b, _ = req.Marshal()
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisRequest), b)
mux.state.checkTxTree.Set([]byte(stateKeyGenesisRequest), b)

// Call InitChain() on all applications.
mux.logger.Debug("InitChain: initializing applications")

Expand All @@ -452,7 +445,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
// Since returning emitted events doesn't work for InitChain() response yet,
// we store those and return them in BeginBlock().
evBinary := cbor.Marshal(ctx.GetEvents())
mux.state.deliverTxTree.Set([]byte(stateKeyInitChainEvents), evBinary)
err = mux.state.deliverTxTree.Insert(mux.state.ctx, []byte(stateKeyInitChainEvents), evBinary)
if err != nil {
panic(err)
}

return resp
}
Expand Down Expand Up @@ -483,7 +479,7 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
ctx := mux.state.NewContext(ContextBeginBlock, mux.currentTime)
defer ctx.Close()

currentEpoch, err := mux.state.GetCurrentEpoch(ctx.Ctx())
currentEpoch, err := mux.state.GetCurrentEpoch(ctx)
if err != nil {
panic("mux: can't get current epoch in BeginBlock")
}
Expand Down Expand Up @@ -547,14 +543,19 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
// During the first block, also collect and prepend application events
// generated during InitChain to BeginBlock events.
if mux.state.BlockHeight() == 0 {
_, evBinary := mux.state.deliverTxTree.Get([]byte(stateKeyInitChainEvents))
evBinary, err := mux.state.deliverTxTree.Get(ctx, []byte(stateKeyInitChainEvents))
if err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to query init chain events: %w", err))
}
if evBinary != nil {
var events []types.Event
_ = cbor.Unmarshal(evBinary, &events)

response.Events = append(events, response.Events...)

mux.state.deliverTxTree.Remove([]byte(stateKeyInitChainEvents))
if err := mux.state.deliverTxTree.Remove(ctx, []byte(stateKeyInitChainEvents)); err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to remove init chain events: %w", err))
}
}
}

Expand Down Expand Up @@ -729,6 +730,14 @@ func (mux *abciMux) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverT
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
if IsUnavailableStateError(err) {
// Make sure to not commit any transactions which include results based on unavailable
// and/or corrupted state -- doing so can further corrupt state.
ctx.Logger().Error("unavailable and/or corrupted state detected during tx processing",
"err", err,
)
panic(err)
}
module, code := errors.Code(err)

return types.ResponseDeliverTx{
Expand Down
Loading

0 comments on commit 22af83b

Please sign in to comment.