Skip to content

Commit

Permalink
go/consensus/tendermint: Use MKVS for storing application state
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Mar 26, 2020
1 parent b4546d5 commit 0248ed8
Show file tree
Hide file tree
Showing 81 changed files with 3,161 additions and 2,475 deletions.
1 change: 1 addition & 0 deletions .buildkite/code.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ steps:
- coverage-misc.txt
- coverage-oasis-node.txt
- coverage-urkel.txt
- /tmp/oasis-node-test_*/test-node.log
plugins:
<<: *docker_plugin

Expand Down
1 change: 1 addition & 0 deletions .changelog/1898.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Use MKVS for storing application state
1 change: 1 addition & 0 deletions .changelog/2710.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Store consensus parameters in ABCI state
2 changes: 1 addition & 1 deletion go/common/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
//
// NOTE: Any change in the major or minor versions are considered
// breaking changes for the protocol.
ConsensusProtocol = Version{Major: 0, Minor: 23, Patch: 0}
ConsensusProtocol = Version{Major: 0, Minor: 24, Patch: 0}

// Tendermint exposes the tendermint core version.
Tendermint = parseSemVerStr(version.TMCoreSemVer)
Expand Down
4 changes: 4 additions & 0 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (

// ErrOversizedTx is the error returned when the given transaction is too big to be processed.
ErrOversizedTx = errors.New(moduleName, 2, "consensus: oversized transaction")

// ErrVersionNotFound is the error returned when the given version (height) cannot be found,
// possibly because it was pruned.
ErrVersionNotFound = errors.New(moduleName, 3, "consensus: version not found")
)

// ClientBackend is a limited consensus interface used by clients that
Expand Down
104 changes: 57 additions & 47 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (
"github.com/oasislabs/oasis-core/go/common/version"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/consensus/api/transaction"
abciState "github.com/oasislabs/oasis-core/go/consensus/tendermint/abci/state"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
genesis "github.com/oasislabs/oasis-core/go/genesis/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

const (
stateKeyGenesisDigest = "OasisGenesisDigest"
stateKeyGenesisRequest = "OasisGenesisRequest"
stateKeyInitChainEvents = "OasisInitChainEvents"

metricsUpdateInterval = 10 * time.Second
Expand All @@ -58,6 +59,7 @@ var (
// ApplicationConfig is the configuration for the consensus application.
type ApplicationConfig struct {
DataDir string
StorageBackend string
Pruning PruneConfig
HaltEpochHeight epochtime.EpochTime
MinGasPrice uint64
Expand All @@ -80,7 +82,7 @@ type TransactionAuthHandler interface {
// low.
//
// The context may be modified to configure a gas accountant.
AuthenticateTx(ctx *Context, tx *transaction.Transaction) error
AuthenticateTx(ctx *api.Context, tx *transaction.Transaction) error
}

// Application is the interface implemented by multiplexed Oasis-specific
Expand Down Expand Up @@ -113,46 +115,46 @@ type Application interface {

// OnRegister is the function that is called when the Application
// is registered with the multiplexer instance.
OnRegister(state ApplicationState)
OnRegister(state api.ApplicationState)

// OnCleanup is the function that is called when the ApplicationServer
// has been halted.
OnCleanup()

// ExecuteTx executes a transaction.
ExecuteTx(*Context, *transaction.Transaction) error
ExecuteTx(*api.Context, *transaction.Transaction) error

// ForeignExecuteTx delivers a transaction of another application for
// processing.
//
// This can be used to run post-tx hooks when dependencies exist
// between applications.
ForeignExecuteTx(*Context, Application, *transaction.Transaction) error
ForeignExecuteTx(*api.Context, Application, *transaction.Transaction) error

// InitChain initializes the blockchain with validators and other
// info from TendermintCore.
//
// Note: Errors are irrecoverable and will result in a panic.
InitChain(*Context, types.RequestInitChain, *genesis.Document) error
InitChain(*api.Context, types.RequestInitChain, *genesis.Document) error

// BeginBlock signals the beginning of a block.
//
// Returned tags will be added to the current block.
//
// Note: Errors are irrecoverable and will result in a panic.
BeginBlock(*Context, types.RequestBeginBlock) error
BeginBlock(*api.Context, types.RequestBeginBlock) error

// EndBlock signals the end of a block, returning changes to the
// validator set.
//
// Note: Errors are irrecoverable and will result in a panic.
EndBlock(*Context, types.RequestEndBlock) (types.ResponseEndBlock, error)
EndBlock(*api.Context, types.RequestEndBlock) (types.ResponseEndBlock, error)

// FireTimer is called within BeginBlock before any other processing
// takes place for each timer that should fire.
//
// Note: Errors are irrecoverable and will result in a panic.
FireTimer(*Context, *Timer) error
FireTimer(*api.Context, *Timer) error

// Commit is omitted because Applications will work on a cache of
// the state bound to the multiplexer.
Expand Down Expand Up @@ -220,11 +222,6 @@ func (a *ApplicationServer) RegisterHaltHook(hook func(ctx context.Context, bloc
a.mux.registerHaltHook(hook)
}

// Pruner returns the ABCI state pruner.
func (a *ApplicationServer) Pruner() StatePruner {
return a.mux.state.statePruner
}

// SetEpochtime sets the mux epochtime.
//
// Epochtime must be set before the multiplexer can be used.
Expand Down Expand Up @@ -397,7 +394,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
tmp := bytes.NewBuffer(nil)
_ = types.WriteMessage(&req, tmp)
genesisDigest := sha512.Sum512_256(tmp.Bytes())
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisDigest), genesisDigest[:])
err = mux.state.deliverTxTree.Insert(mux.state.ctx, []byte(stateKeyGenesisDigest), genesisDigest[:])
if err != nil {
panic(err)
}

resp := mux.BaseApplication.InitChain(req)

Expand All @@ -423,15 +423,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
mux.logger.Debug("Genesis hook dispatch complete")
}()

// TODO: remove stateKeyGenesisRequest here, see oasis-core#2426
b, _ = req.Marshal()
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisRequest), b)
mux.state.checkTxTree.Set([]byte(stateKeyGenesisRequest), b)

// Call InitChain() on all applications.
mux.logger.Debug("InitChain: initializing applications")

ctx := mux.state.NewContext(ContextInitChain, mux.currentTime)
ctx := mux.state.NewContext(api.ContextInitChain, mux.currentTime)
defer ctx.Close()

for _, app := range mux.appsByLexOrder {
Expand All @@ -453,11 +448,19 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
// Since returning emitted events doesn't work for InitChain() response yet,
// we store those and return them in BeginBlock().
evBinary := cbor.Marshal(ctx.GetEvents())
mux.state.deliverTxTree.Set([]byte(stateKeyInitChainEvents), evBinary)
err = ctx.State().Insert(ctx, []byte(stateKeyInitChainEvents), evBinary)
if err != nil {
panic(err)
}

// Refresh consensus parameters.
if err = mux.state.refreshConsensusParameters(); err != nil {
panic(fmt.Errorf("mux: failed to refresh consensus parameters: %w", err))
// Initialize consensus parameters.
state := abciState.NewMutableState(ctx.State())
if err = state.SetConsensusParameters(ctx, &st.Consensus.Parameters); err != nil {
panic(fmt.Errorf("mux: failed to set consensus parameters: %w", err))
}
// Since InitChain does not have a commit step, perform some state updates here.
if err = mux.state.doInitChain(st.Time); err != nil {
panic(fmt.Errorf("mux: failed to init chain state: %w", err))
}

return resp
Expand All @@ -478,23 +481,20 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
mux.lastBeginBlock = blockHeight
mux.currentTime = req.Header.Time

params, err := mux.state.ConsensusParameters()
if err != nil {
panic(fmt.Errorf("failed to fetch consensus parameters: %w", err))
}
params := mux.state.ConsensusParameters()

// Create empty block context.
mux.state.blockCtx = NewBlockContext()
mux.state.blockCtx = api.NewBlockContext()
if params.MaxBlockGas > 0 {
mux.state.blockCtx.Set(GasAccountantKey{}, NewGasAccountant(params.MaxBlockGas))
mux.state.blockCtx.Set(api.GasAccountantKey{}, api.NewGasAccountant(params.MaxBlockGas))
} else {
mux.state.blockCtx.Set(GasAccountantKey{}, NewNopGasAccountant())
mux.state.blockCtx.Set(api.GasAccountantKey{}, api.NewNopGasAccountant())
}
// Create BeginBlock context.
ctx := mux.state.NewContext(ContextBeginBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextBeginBlock, mux.currentTime)
defer ctx.Close()

currentEpoch, err := mux.state.GetCurrentEpoch(ctx.Ctx())
currentEpoch, err := mux.state.GetCurrentEpoch(ctx)
if err != nil {
panic("mux: can't get current epoch in BeginBlock")
}
Expand Down Expand Up @@ -558,30 +558,32 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
// During the first block, also collect and prepend application events
// generated during InitChain to BeginBlock events.
if mux.state.BlockHeight() == 0 {
_, evBinary := mux.state.deliverTxTree.Get([]byte(stateKeyInitChainEvents))
evBinary, err := mux.state.deliverTxTree.Get(ctx, []byte(stateKeyInitChainEvents))
if err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to query init chain events: %w", err))
}
if evBinary != nil {
var events []types.Event
_ = cbor.Unmarshal(evBinary, &events)

response.Events = append(events, response.Events...)

mux.state.deliverTxTree.Remove([]byte(stateKeyInitChainEvents))
if err := mux.state.deliverTxTree.Remove(ctx, []byte(stateKeyInitChainEvents)); err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to remove init chain events: %w", err))
}
}
}

return response
}

func (mux *abciMux) decodeTx(ctx *Context, rawTx []byte) (*transaction.Transaction, *transaction.SignedTransaction, error) {
func (mux *abciMux) decodeTx(ctx *api.Context, rawTx []byte) (*transaction.Transaction, *transaction.SignedTransaction, error) {
if mux.state.haltMode {
ctx.Logger().Debug("executeTx: in halt, rejecting all transactions")
return nil, nil, fmt.Errorf("halt mode, rejecting all transactions")
}

params, err := mux.state.ConsensusParameters()
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch consensus parameters: %w", err)
}
params := mux.state.ConsensusParameters()
if params.MaxTxSize > 0 && uint64(len(rawTx)) > params.MaxTxSize {
// This deliberately avoids logging the rawTx since spamming the
// logs is also bad.
Expand Down Expand Up @@ -616,7 +618,7 @@ func (mux *abciMux) decodeTx(ctx *Context, rawTx []byte) (*transaction.Transacti
return &tx, &sigTx, nil
}

func (mux *abciMux) processTx(ctx *Context, tx *transaction.Transaction) error {
func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction) error {
// Pass the transaction through the fee handler if configured.
if txAuthHandler := mux.state.txAuthHandler; txAuthHandler != nil {
if err := txAuthHandler.AuthenticateTx(ctx, tx); err != nil {
Expand Down Expand Up @@ -664,7 +666,7 @@ func (mux *abciMux) processTx(ctx *Context, tx *transaction.Transaction) error {
return nil
}

func (mux *abciMux) executeTx(ctx *Context, rawTx []byte) error {
func (mux *abciMux) executeTx(ctx *api.Context, rawTx []byte) error {
tx, sigTx, err := mux.decodeTx(ctx, rawTx)
if err != nil {
return err
Expand All @@ -681,7 +683,7 @@ func (mux *abciMux) EstimateGas(caller signature.PublicKey, tx *transaction.Tran
// be called in parallel to the consensus layer and to other invocations.
//
// For simulation mode, time will be filled in by NewContext from last block time.
ctx := mux.state.NewContext(ContextSimulateTx, time.Time{})
ctx := mux.state.NewContext(api.ContextSimulateTx, time.Time{})
defer ctx.Close()

ctx.SetTxSigner(caller)
Expand Down Expand Up @@ -738,7 +740,7 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
}
}

ctx := mux.state.NewContext(ContextCheckTx, mux.currentTime)
ctx := mux.state.NewContext(api.ContextCheckTx, mux.currentTime)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
Expand Down Expand Up @@ -774,10 +776,18 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
}

func (mux *abciMux) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverTx {
ctx := mux.state.NewContext(ContextDeliverTx, mux.currentTime)
ctx := mux.state.NewContext(api.ContextDeliverTx, mux.currentTime)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
if api.IsUnavailableStateError(err) {
// Make sure to not commit any transactions which include results based on unavailable
// and/or corrupted state -- doing so can further corrupt state.
ctx.Logger().Error("unavailable and/or corrupted state detected during tx processing",
"err", err,
)
panic(err)
}
module, code := errors.Code(err)

return types.ResponseDeliverTx{
Expand Down Expand Up @@ -809,7 +819,7 @@ func (mux *abciMux) EndBlock(req types.RequestEndBlock) types.ResponseEndBlock {
return types.ResponseEndBlock{}
}

ctx := mux.state.NewContext(ContextEndBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextEndBlock, mux.currentTime)
defer ctx.Close()

// Fire all application timers first.
Expand Down
Loading

0 comments on commit 0248ed8

Please sign in to comment.