diff --git a/baseapp/abci.go b/baseapp/abci.go index 6ff92ffab..8ebdc39df 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -250,15 +250,13 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc // DeliverTxBatch executes multiple txs func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) { - reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries)) - for _, tx := range req.TxEntries { - reqList = append(reqList, tx.Request) - } - scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.DeliverTx) - txRes, err := scheduler.ProcessAll(ctx, reqList) + // This will basically no-op the actual prefill if the metadata for the txs is empty + + // process all txs, this will also initializes the MVS if prefill estimates was disabled + txRes, err := scheduler.ProcessAll(ctx, req.TxEntries) if err != nil { - //TODO: handle error + // TODO: handle error } responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries)) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 12179295e..095deb545 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -57,7 +57,7 @@ func (dt *deliverTxTask) Increment() { // Scheduler processes tasks concurrently type Scheduler interface { - ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) + ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) } type scheduler struct { @@ -99,11 +99,11 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { return valid, conflicts } -func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask { +func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { res := make([]*deliverTxTask, 0, len(reqs)) for idx, r := range reqs { res = append(res, &deliverTxTask{ - Request: r, + Request: r.Request, Index: idx, Status: statusPending, }) @@ -119,7 +119,10 @@ func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { return res } -func (s *scheduler) initMultiVersionStore(ctx sdk.Context) { +func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { + if s.multiVersionStores != nil { + return + } mvs := make(map[sdk.StoreKey]multiversion.MultiVersionStore) keys := ctx.MultiStore().StoreKeys() for _, sk := range keys { @@ -146,8 +149,23 @@ func allValidated(tasks []*deliverTxTask) bool { return true } -func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) { - s.initMultiVersionStore(ctx) +func (s *scheduler) PrefillEstimates(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) { + // iterate over TXs, update estimated writesets where applicable + for i, req := range reqs { + mappedWritesets := req.EstimatedWritesets + // order shouldnt matter for storeKeys because each storeKey partitioned MVS is independent + for storeKey, writeset := range mappedWritesets { + // we use `-1` to indicate a prefill incarnation + s.multiVersionStores[storeKey].SetEstimatedWriteset(i, -1, writeset) + } + } +} + +func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) { + // initialize mutli-version stores if they haven't been initialized yet + s.tryInitMultiVersionStore(ctx) + // prefill estimates + s.PrefillEstimates(ctx, reqs) tasks := toTasks(reqs) toExecute := tasks for !allValidated(tasks) { diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index a2c861f44..5cf2be6ba 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -4,9 +4,10 @@ import ( "context" "errors" "fmt" - "github.com/cosmos/cosmos-sdk/store/cachemulti" "testing" + "github.com/cosmos/cosmos-sdk/store/cachemulti" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/types" dbm "github.com/tendermint/tm-db" @@ -21,12 +22,15 @@ type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.R var testStoreKey = sdk.NewKVStoreKey("mock") var itemKey = []byte("key") -func requestList(n int) []types.RequestDeliverTx { - tasks := make([]types.RequestDeliverTx, n) +func requestList(n int) []*sdk.DeliverTxEntry { + tasks := make([]*sdk.DeliverTxEntry, n) for i := 0; i < n; i++ { - tasks[i] = types.RequestDeliverTx{ - Tx: []byte(fmt.Sprintf("%d", i)), + tasks[i] = &sdk.DeliverTxEntry{ + Request: types.RequestDeliverTx{ + Tx: []byte(fmt.Sprintf("%d", i)), + }, } + } return tasks } @@ -51,7 +55,7 @@ func TestProcessAll(t *testing.T) { name string workers int runs int - requests []types.RequestDeliverTx + requests []*sdk.DeliverTxEntry deliverTxFunc mockDeliverTxFunc addStores bool expectedErr error diff --git a/types/tx_batch.go b/types/tx_batch.go index a54742fae..b053aa5fa 100644 --- a/types/tx_batch.go +++ b/types/tx_batch.go @@ -1,13 +1,20 @@ package types -import abci "github.com/tendermint/tendermint/abci/types" +import ( + "github.com/cosmos/cosmos-sdk/store/multiversion" + abci "github.com/tendermint/tendermint/abci/types" +) // DeliverTxEntry represents an individual transaction's request within a batch. // This can be extended to include tx-level tracing or metadata type DeliverTxEntry struct { - Request abci.RequestDeliverTx + Request abci.RequestDeliverTx + EstimatedWritesets MappedWritesets } +// EstimatedWritesets represents an estimated writeset for a transaction mapped by storekey to the writeset estimate. +type MappedWritesets map[StoreKey]multiversion.WriteSet + // DeliverTxBatchRequest represents a request object for a batch of transactions. // This can be extended to include request-level tracing or metadata type DeliverTxBatchRequest struct {