Skip to content

Commit

Permalink
go/consensus/tendermint: Use MKVS for storing application state
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Apr 1, 2020
1 parent 54a287b commit 7f27af0
Show file tree
Hide file tree
Showing 81 changed files with 3,183 additions and 2,498 deletions.
1 change: 1 addition & 0 deletions .buildkite/code.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ steps:
- coverage-misc.txt
- coverage-oasis-node.txt
- coverage-mkvs.txt
- /tmp/oasis-node-test_*/test-node.log
plugins:
<<: *docker_plugin

Expand Down
1 change: 1 addition & 0 deletions .changelog/1898.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Use MKVS for storing application state
1 change: 1 addition & 0 deletions .changelog/2710.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/consensus/tendermint: Store consensus parameters in ABCI state
4 changes: 4 additions & 0 deletions go/consensus/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (

// ErrOversizedTx is the error returned when the given transaction is too big to be processed.
ErrOversizedTx = errors.New(moduleName, 2, "consensus: oversized transaction")

// ErrVersionNotFound is the error returned when the given version (height) cannot be found,
// possibly because it was pruned.
ErrVersionNotFound = errors.New(moduleName, 3, "consensus: version not found")
)

// ClientBackend is a limited consensus interface used by clients that
Expand Down
116 changes: 59 additions & 57 deletions go/consensus/tendermint/abci/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import (
consensus "github.com/oasislabs/oasis-core/go/consensus/api"
"github.com/oasislabs/oasis-core/go/consensus/api/transaction"
consensusGenesis "github.com/oasislabs/oasis-core/go/consensus/genesis"
abciState "github.com/oasislabs/oasis-core/go/consensus/tendermint/abci/state"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/api"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
genesis "github.com/oasislabs/oasis-core/go/genesis/api"
upgrade "github.com/oasislabs/oasis-core/go/upgrade/api"
)

const (
stateKeyGenesisDigest = "OasisGenesisDigest"
stateKeyGenesisRequest = "OasisGenesisRequest"
stateKeyInitChainEvents = "OasisInitChainEvents"

metricsUpdateInterval = 10 * time.Second
Expand All @@ -60,6 +61,7 @@ var (
// ApplicationConfig is the configuration for the consensus application.
type ApplicationConfig struct {
DataDir string
StorageBackend string
Pruning PruneConfig
HaltEpochHeight epochtime.EpochTime
MinGasPrice uint64
Expand All @@ -82,7 +84,7 @@ type TransactionAuthHandler interface {
// low.
//
// The context may be modified to configure a gas accountant.
AuthenticateTx(ctx *Context, tx *transaction.Transaction) error
AuthenticateTx(ctx *api.Context, tx *transaction.Transaction) error
}

// Application is the interface implemented by multiplexed Oasis-specific
Expand Down Expand Up @@ -115,46 +117,46 @@ type Application interface {

// OnRegister is the function that is called when the Application
// is registered with the multiplexer instance.
OnRegister(state ApplicationState)
OnRegister(state api.ApplicationState)

// OnCleanup is the function that is called when the ApplicationServer
// has been halted.
OnCleanup()

// ExecuteTx executes a transaction.
ExecuteTx(*Context, *transaction.Transaction) error
ExecuteTx(*api.Context, *transaction.Transaction) error

// ForeignExecuteTx delivers a transaction of another application for
// processing.
//
// This can be used to run post-tx hooks when dependencies exist
// between applications.
ForeignExecuteTx(*Context, Application, *transaction.Transaction) error
ForeignExecuteTx(*api.Context, Application, *transaction.Transaction) error

// InitChain initializes the blockchain with validators and other
// info from TendermintCore.
//
// Note: Errors are irrecoverable and will result in a panic.
InitChain(*Context, types.RequestInitChain, *genesis.Document) error
InitChain(*api.Context, types.RequestInitChain, *genesis.Document) error

// BeginBlock signals the beginning of a block.
//
// Returned tags will be added to the current block.
//
// Note: Errors are irrecoverable and will result in a panic.
BeginBlock(*Context, types.RequestBeginBlock) error
BeginBlock(*api.Context, types.RequestBeginBlock) error

// EndBlock signals the end of a block, returning changes to the
// validator set.
//
// Note: Errors are irrecoverable and will result in a panic.
EndBlock(*Context, types.RequestEndBlock) (types.ResponseEndBlock, error)
EndBlock(*api.Context, types.RequestEndBlock) (types.ResponseEndBlock, error)

// FireTimer is called within BeginBlock before any other processing
// takes place for each timer that should fire.
//
// Note: Errors are irrecoverable and will result in a panic.
FireTimer(*Context, *Timer) error
FireTimer(*api.Context, *Timer) error

// Commit is omitted because Applications will work on a cache of
// the state bound to the multiplexer.
Expand Down Expand Up @@ -222,11 +224,6 @@ func (a *ApplicationServer) RegisterHaltHook(hook func(ctx context.Context, bloc
a.mux.registerHaltHook(hook)
}

// Pruner returns the ABCI state pruner.
func (a *ApplicationServer) Pruner() StatePruner {
return a.mux.state.statePruner
}

// SetEpochtime sets the mux epochtime.
//
// Epochtime must be set before the multiplexer can be used.
Expand Down Expand Up @@ -399,7 +396,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
tmp := bytes.NewBuffer(nil)
_ = types.WriteMessage(&req, tmp)
genesisDigest := sha512.Sum512_256(tmp.Bytes())
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisDigest), genesisDigest[:])
err = mux.state.deliverTxTree.Insert(mux.state.ctx, []byte(stateKeyGenesisDigest), genesisDigest[:])
if err != nil {
panic(err)
}

resp := mux.BaseApplication.InitChain(req)

Expand All @@ -425,20 +425,10 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
mux.logger.Debug("Genesis hook dispatch complete")
}()

// TODO: remove stateKeyGenesisRequest here, see oasis-core#2426
b, err = req.Marshal()
if err != nil {
// It's suspected this wouldn't happen normally, because ABCI is meant to go out
// through protobuf anyway, so the request ought to be representable.
panic(fmt.Sprintf("InitChain: marshalling RequestInitChain back to protobuf: %v", err))
}
mux.state.deliverTxTree.Set([]byte(stateKeyGenesisRequest), b)
mux.state.checkTxTree.Set([]byte(stateKeyGenesisRequest), b)

// Call InitChain() on all applications.
mux.logger.Debug("InitChain: initializing applications")

ctx := mux.state.NewContext(ContextInitChain, mux.currentTime)
ctx := mux.state.NewContext(api.ContextInitChain, mux.currentTime)
defer ctx.Close()

for _, app := range mux.appsByLexOrder {
Expand All @@ -460,11 +450,19 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
// Since returning emitted events doesn't work for InitChain() response yet,
// we store those and return them in BeginBlock().
evBinary := cbor.Marshal(ctx.GetEvents())
mux.state.deliverTxTree.Set([]byte(stateKeyInitChainEvents), evBinary)
err = ctx.State().Insert(ctx, []byte(stateKeyInitChainEvents), evBinary)
if err != nil {
panic(err)
}

// Refresh consensus parameters.
if err = mux.state.refreshConsensusParameters(); err != nil {
panic(fmt.Errorf("mux: failed to refresh consensus parameters: %w", err))
// Initialize consensus parameters.
state := abciState.NewMutableState(ctx.State())
if err = state.SetConsensusParameters(ctx, &st.Consensus.Parameters); err != nil {
panic(fmt.Errorf("mux: failed to set consensus parameters: %w", err))
}
// Since InitChain does not have a commit step, perform some state updates here.
if err = mux.state.doInitChain(st.Time); err != nil {
panic(fmt.Errorf("mux: failed to init chain state: %w", err))
}

return resp
Expand All @@ -485,23 +483,20 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
mux.lastBeginBlock = blockHeight
mux.currentTime = req.Header.Time

params, err := mux.state.ConsensusParameters()
if err != nil {
panic(fmt.Errorf("failed to fetch consensus parameters: %w", err))
}
params := mux.state.ConsensusParameters()

// Create empty block context.
mux.state.blockCtx = NewBlockContext()
mux.state.blockCtx = api.NewBlockContext()
if params.MaxBlockGas > 0 {
mux.state.blockCtx.Set(GasAccountantKey{}, NewGasAccountant(params.MaxBlockGas))
mux.state.blockCtx.Set(api.GasAccountantKey{}, api.NewGasAccountant(params.MaxBlockGas))
} else {
mux.state.blockCtx.Set(GasAccountantKey{}, NewNopGasAccountant())
mux.state.blockCtx.Set(api.GasAccountantKey{}, api.NewNopGasAccountant())
}
// Create BeginBlock context.
ctx := mux.state.NewContext(ContextBeginBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextBeginBlock, mux.currentTime)
defer ctx.Close()

currentEpoch, err := mux.state.GetCurrentEpoch(ctx.Ctx())
currentEpoch, err := mux.state.GetCurrentEpoch(ctx)
if err != nil {
panic("mux: can't get current epoch in BeginBlock")
}
Expand Down Expand Up @@ -565,30 +560,32 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
// During the first block, also collect and prepend application events
// generated during InitChain to BeginBlock events.
if mux.state.BlockHeight() == 0 {
_, evBinary := mux.state.deliverTxTree.Get([]byte(stateKeyInitChainEvents))
evBinary, err := mux.state.deliverTxTree.Get(ctx, []byte(stateKeyInitChainEvents))
if err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to query init chain events: %w", err))
}
if evBinary != nil {
var events []types.Event
_ = cbor.Unmarshal(evBinary, &events)

response.Events = append(events, response.Events...)

mux.state.deliverTxTree.Remove([]byte(stateKeyInitChainEvents))
if err := mux.state.deliverTxTree.Remove(ctx, []byte(stateKeyInitChainEvents)); err != nil {
panic(fmt.Errorf("mux: BeginBlock: failed to remove init chain events: %w", err))
}
}
}

return response
}

func (mux *abciMux) decodeTx(ctx *Context, rawTx []byte) (*transaction.Transaction, *transaction.SignedTransaction, error) {
func (mux *abciMux) decodeTx(ctx *api.Context, rawTx []byte) (*transaction.Transaction, *transaction.SignedTransaction, error) {
if mux.state.haltMode {
ctx.Logger().Debug("executeTx: in halt, rejecting all transactions")
return nil, nil, fmt.Errorf("halt mode, rejecting all transactions")
}

params, err := mux.state.ConsensusParameters()
if err != nil {
return nil, nil, fmt.Errorf("failed to fetch consensus parameters: %w", err)
}
params := mux.state.ConsensusParameters()
if params.MaxTxSize > 0 && uint64(len(rawTx)) > params.MaxTxSize {
// This deliberately avoids logging the rawTx since spamming the
// logs is also bad.
Expand Down Expand Up @@ -623,7 +620,7 @@ func (mux *abciMux) decodeTx(ctx *Context, rawTx []byte) (*transaction.Transacti
return &tx, &sigTx, nil
}

func (mux *abciMux) processTx(ctx *Context, tx *transaction.Transaction, txSize int) error {
func (mux *abciMux) processTx(ctx *api.Context, tx *transaction.Transaction, txSize int) error {
// Pass the transaction through the fee handler if configured.
if txAuthHandler := mux.state.txAuthHandler; txAuthHandler != nil {
if err := txAuthHandler.AuthenticateTx(ctx, tx); err != nil {
Expand All @@ -638,11 +635,8 @@ func (mux *abciMux) processTx(ctx *Context, tx *transaction.Transaction, txSize
}

// Charge gas based on the size of the transaction.
params, err := mux.state.ConsensusParameters()
if err != nil {
return fmt.Errorf("failed to fetch consensus parameters: %w", err)
}
if err = ctx.Gas().UseGas(txSize, consensusGenesis.GasOpTxByte, params.GasCosts); err != nil {
params := mux.state.ConsensusParameters()
if err := ctx.Gas().UseGas(txSize, consensusGenesis.GasOpTxByte, params.GasCosts); err != nil {
return err
}

Expand Down Expand Up @@ -680,7 +674,7 @@ func (mux *abciMux) processTx(ctx *Context, tx *transaction.Transaction, txSize
return nil
}

func (mux *abciMux) executeTx(ctx *Context, rawTx []byte) error {
func (mux *abciMux) executeTx(ctx *api.Context, rawTx []byte) error {
tx, sigTx, err := mux.decodeTx(ctx, rawTx)
if err != nil {
return err
Expand All @@ -697,7 +691,7 @@ func (mux *abciMux) EstimateGas(caller signature.PublicKey, tx *transaction.Tran
// be called in parallel to the consensus layer and to other invocations.
//
// For simulation mode, time will be filled in by NewContext from last block time.
ctx := mux.state.NewContext(ContextSimulateTx, time.Time{})
ctx := mux.state.NewContext(api.ContextSimulateTx, time.Time{})
defer ctx.Close()

// Modify transaction to include maximum possible gas in order to estimate the upper limit on
Expand Down Expand Up @@ -769,7 +763,7 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
}
}

ctx := mux.state.NewContext(ContextCheckTx, mux.currentTime)
ctx := mux.state.NewContext(api.ContextCheckTx, mux.currentTime)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
Expand Down Expand Up @@ -805,10 +799,18 @@ func (mux *abciMux) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
}

func (mux *abciMux) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverTx {
ctx := mux.state.NewContext(ContextDeliverTx, mux.currentTime)
ctx := mux.state.NewContext(api.ContextDeliverTx, mux.currentTime)
defer ctx.Close()

if err := mux.executeTx(ctx, req.Tx); err != nil {
if api.IsUnavailableStateError(err) {
// Make sure to not commit any transactions which include results based on unavailable
// and/or corrupted state -- doing so can further corrupt state.
ctx.Logger().Error("unavailable and/or corrupted state detected during tx processing",
"err", err,
)
panic(err)
}
module, code := errors.Code(err)

return types.ResponseDeliverTx{
Expand Down Expand Up @@ -840,7 +842,7 @@ func (mux *abciMux) EndBlock(req types.RequestEndBlock) types.ResponseEndBlock {
return types.ResponseEndBlock{}
}

ctx := mux.state.NewContext(ContextEndBlock, mux.currentTime)
ctx := mux.state.NewContext(api.ContextEndBlock, mux.currentTime)
defer ctx.Close()

// Fire all application timers first.
Expand Down
Loading

0 comments on commit 7f27af0

Please sign in to comment.