Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(baseapp): introduce mutex to state #18846

Merged
merged 4 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 41 additions & 41 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.ctx, *req.ConsensusParams)
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
Expand All @@ -82,28 +82,28 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.ctx = app.checkState.ctx.WithBlockHeader(initHeader).
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockHeader(initHeader).
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
}))
}()

if app.initChainer == nil {
return &abci.ResponseInitChain{}, nil
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(storetypes.NewInfiniteGasMeter())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))

res, err := app.initChainer(app.finalizeBlockState.ctx, req)
res, err := app.initChainer(app.finalizeBlockState.Context(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -410,7 +410,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
return nil, errors.New("PrepareProposal called with invalid height")
}

app.prepareProposalState.ctx = app.getContextForProposal(app.prepareProposalState.ctx, req.Height).
app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height).
WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithProposer(req.ProposerAddress).
Expand All @@ -425,11 +425,11 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))

app.prepareProposalState.ctx = app.prepareProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.ctx))
app.prepareProposalState.SetContext(app.prepareProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -444,7 +444,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
}
}()

resp, err = app.prepareProposal(app.prepareProposalState.ctx, req)
resp, err = app.prepareProposal(app.prepareProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err)
return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil
Expand Down Expand Up @@ -502,7 +502,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
app.setState(execModeFinalize, header)
}

app.processProposalState.ctx = app.getContextForProposal(app.processProposalState.ctx, req.Height).
app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height).
WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithHeaderHash(req.Hash).
Expand All @@ -519,11 +519,11 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))

app.processProposalState.ctx = app.processProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.processProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.ctx))
app.processProposalState.SetContext(app.processProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -538,7 +538,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
}
}()

resp, err = app.processProposal(app.processProposalState.ctx, req)
resp, err = app.processProposal(app.processProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
Expand Down Expand Up @@ -578,7 +578,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -654,7 +654,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -696,7 +696,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r

// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context()
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
var events []abci.Event
Expand Down Expand Up @@ -732,7 +732,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}

// Context is now updated with Header information.
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
Expand All @@ -742,24 +742,24 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.ctx)).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(corecomet.Info{
Evidence: sdk.ToSDKEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: sdk.ToSDKCommitInfo(req.DecidedLastCommit),
})
}))

// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))

if app.checkState != nil {
app.checkState.ctx = app.checkState.ctx.
app.checkState.SetContext(app.checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash)
WithHeaderHash(req.Hash))
}

if err := app.preBlock(req); err != nil {
Expand All @@ -783,8 +783,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
Expand Down Expand Up @@ -825,7 +825,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.ctx)
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
if err != nil {
return nil, err
}
Expand All @@ -839,7 +839,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())

return &abci.ResponseFinalizeBlock{
Events: events,
Expand Down Expand Up @@ -887,7 +887,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons

// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.ctx, *req, *res); err != nil {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
}
}
Expand Down Expand Up @@ -921,11 +921,11 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
header := app.finalizeBlockState.ctx.BlockHeader()
header := app.finalizeBlockState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.ctx)
app.precommiter(app.finalizeBlockState.Context())
}

rms, ok := app.cms.(*rootmulti.Store)
Expand All @@ -941,7 +941,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()

Expand All @@ -961,7 +961,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
app.finalizeBlockState = nil

if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.ctx)
app.prepareCheckStater(app.checkState.Context())
}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.ResponseQuery {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()

// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{})
Expand Down Expand Up @@ -1236,7 +1236,7 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e
ctx := sdk.NewContext(cacheMS, true, app.logger).
WithMinGasPrices(app.minGasPrices).
WithBlockHeight(height).
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(app.checkState.ctx.BlockHeader())
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(app.checkState.Context().BlockHeader())

if height != lastBlockHeight {
rms, ok := app.cms.(*rootmulti.Store)
Expand Down Expand Up @@ -1304,7 +1304,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}
Expand Down
12 changes: 6 additions & 6 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (app *BaseApp) setState(mode execMode, header cmtproto.Header) {

switch mode {
case execModeCheck:
baseState.ctx = baseState.ctx.WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)
baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices))
app.checkState = baseState

case execModePrepareProposal:
Expand Down Expand Up @@ -656,7 +656,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.ctx.
ctx := modeState.Context().
WithTxBytes(txBytes)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

Expand Down Expand Up @@ -696,7 +696,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context

func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
if app.preBlocker != nil {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
rsp, err := app.preBlocker(ctx, req)
if err != nil {
return err
Expand All @@ -708,7 +708,7 @@ func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.ctx = ctx
app.finalizeBlockState.SetContext(ctx)
}
}
return nil
Expand All @@ -721,7 +721,7 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
)

if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.ctx)
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
if err != nil {
return resp, err
}
Expand Down Expand Up @@ -783,7 +783,7 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock

if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.ctx)
eb, err := app.endBlocker(app.finalizeBlockState.Context())
if err != nil {
return endblock, err
}
Expand Down
15 changes: 14 additions & 1 deletion baseapp/state.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package baseapp

import (
"sync"

storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

type state struct {
ms storetypes.CacheMultiStore
ms storetypes.CacheMultiStore

mtx sync.RWMutex
ctx sdk.Context
}

Expand All @@ -17,7 +21,16 @@ func (st *state) CacheMultiStore() storetypes.CacheMultiStore {
return st.ms.CacheMultiStore()
}

// SetContext updates the state's context to the context provided.
func (st *state) SetContext(ctx sdk.Context) {
st.mtx.Lock()
defer st.mtx.Unlock()
st.ctx = ctx
}

// Context returns the Context of the state.
func (st *state) Context() sdk.Context {
st.mtx.RLock()
defer st.mtx.RUnlock()
return st.ctx
}
Loading