Skip to content

Commit

Permalink
[occ] Add struct field and helpers for estimate prefills (#341)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
This adds in the ability to prefill estimates based on metadata passed
along with deliverTxBatch

## Testing performed to validate your change
Unit Test to verify that multiversion store initialization is now
idempotent, and works properly regardless of whether estimate prefill is
enabled
  • Loading branch information
udpatil authored Oct 24, 2023
1 parent 0b9193c commit 27484e4
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 22 deletions.
15 changes: 7 additions & 8 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/cosmos/cosmos-sdk/tasks"
"os"
"sort"
"strings"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/tasks"

"github.com/armon/go-metrics"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -237,15 +238,13 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc

// DeliverTxBatch executes multiple txs
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries))
for _, tx := range req.TxEntries {
reqList = append(reqList, tx.Request)
}

scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.DeliverTx)
txRes, err := scheduler.ProcessAll(ctx, reqList)
// This will basically no-op the actual prefill if the metadata for the txs is empty

// process all txs, this will also initializes the MVS if prefill estimates was disabled
txRes, err := scheduler.ProcessAll(ctx, req.TxEntries)
if err != nil {
//TODO: handle error
// TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
Expand Down
30 changes: 24 additions & 6 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (dt *deliverTxTask) Increment() {

// Scheduler processes tasks concurrently
type Scheduler interface {
ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error)
ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error)
}

type scheduler struct {
Expand Down Expand Up @@ -99,11 +99,11 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
return valid, conflicts
}

func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask {
func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
res := make([]*deliverTxTask, 0, len(reqs))
for idx, r := range reqs {
res = append(res, &deliverTxTask{
Request: r,
Request: r.Request,
Index: idx,
Status: statusPending,
})
Expand All @@ -119,7 +119,10 @@ func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
return res
}

func (s *scheduler) initMultiVersionStore(ctx sdk.Context) {
func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) {
if s.multiVersionStores != nil {
return
}
mvs := make(map[sdk.StoreKey]multiversion.MultiVersionStore)
keys := ctx.MultiStore().StoreKeys()
for _, sk := range keys {
Expand All @@ -146,8 +149,23 @@ func allValidated(tasks []*deliverTxTask) bool {
return true
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) {
s.initMultiVersionStore(ctx)
func (s *scheduler) PrefillEstimates(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) {
// iterate over TXs, update estimated writesets where applicable
for i, req := range reqs {
mappedWritesets := req.EstimatedWritesets
// order shouldnt matter for storeKeys because each storeKey partitioned MVS is independent
for storeKey, writeset := range mappedWritesets {
// we use `-1` to indicate a prefill incarnation
s.multiVersionStores[storeKey].SetEstimatedWriteset(i, -1, writeset)
}
}
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
// initialize mutli-version stores if they haven't been initialized yet
s.tryInitMultiVersionStore(ctx)
// prefill estimates
s.PrefillEstimates(ctx, reqs)
tasks := toTasks(reqs)
toExecute := tasks
for !allValidated(tasks) {
Expand Down
16 changes: 10 additions & 6 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"errors"
"fmt"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"testing"

"github.com/cosmos/cosmos-sdk/store/cachemulti"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"
Expand All @@ -21,12 +22,15 @@ type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.R
var testStoreKey = sdk.NewKVStoreKey("mock")
var itemKey = []byte("key")

func requestList(n int) []types.RequestDeliverTx {
tasks := make([]types.RequestDeliverTx, n)
func requestList(n int) []*sdk.DeliverTxEntry {
tasks := make([]*sdk.DeliverTxEntry, n)
for i := 0; i < n; i++ {
tasks[i] = types.RequestDeliverTx{
Tx: []byte(fmt.Sprintf("%d", i)),
tasks[i] = &sdk.DeliverTxEntry{
Request: types.RequestDeliverTx{
Tx: []byte(fmt.Sprintf("%d", i)),
},
}

}
return tasks
}
Expand All @@ -51,7 +55,7 @@ func TestProcessAll(t *testing.T) {
name string
workers int
runs int
requests []types.RequestDeliverTx
requests []*sdk.DeliverTxEntry
deliverTxFunc mockDeliverTxFunc
addStores bool
expectedErr error
Expand Down
11 changes: 9 additions & 2 deletions types/tx_batch.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package types

import abci "github.com/tendermint/tendermint/abci/types"
import (
"github.com/cosmos/cosmos-sdk/store/multiversion"
abci "github.com/tendermint/tendermint/abci/types"
)

// DeliverTxEntry represents an individual transaction's request within a batch.
// This can be extended to include tx-level tracing or metadata
type DeliverTxEntry struct {
Request abci.RequestDeliverTx
Request abci.RequestDeliverTx
EstimatedWritesets MappedWritesets
}

// EstimatedWritesets represents an estimated writeset for a transaction mapped by storekey to the writeset estimate.
type MappedWritesets map[StoreKey]multiversion.WriteSet

// DeliverTxBatchRequest represents a request object for a batch of transactions.
// This can be extended to include request-level tracing or metadata
type DeliverTxBatchRequest struct {
Expand Down

0 comments on commit 27484e4

Please sign in to comment.