Skip to content

Commit

Permalink
Fix indexesValidated and PrefillEstimates to operate on absolute idx (#…
Browse files Browse the repository at this point in the history
…454)

This is one component that was missed when refactoring to use absolute
indices for EVM changes. This change refactors such that prefill
estimates will appropriately fill the estimates by absolute Index and
indexes validated will similarly check via absolute indices instead of
relative.

Existing unit tests + loadtesting
  • Loading branch information
udpatil committed Mar 11, 2024
1 parent d9ebd3c commit db786e7
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 19 deletions.
34 changes: 21 additions & 13 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type scheduler struct {
workers int
multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore
tracingInfo *tracing.Info
allTasksMap map[int]*deliverTxTask
allTasks []*deliverTxTask
executeCh chan func()
validateCh chan func()
Expand Down Expand Up @@ -177,20 +178,23 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
return valid, conflicts
}

func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask {
res := make([]*deliverTxTask, 0, len(reqs))
func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
tasksMap := make(map[int]*deliverTxTask)
allTasks := make([]*deliverTxTask, 0, len(reqs))
for idx, r := range reqs {
res = append(res, &deliverTxTask{
task := &deliverTxTask{
Request: r.Request,
SdkTx: r.SdkTx,
Checksum: r.Checksum,
AbsoluteIndex: r.AbsoluteIndex,
Index: idx,
Status: statusPending,
Dependencies: map[int]struct{}{},
})
}
tasksMap[r.AbsoluteIndex] = task
allTasks = append(allTasks, task)
}
return res
return allTasks, tasksMap
}

func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
Expand All @@ -213,9 +217,11 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) {
s.multiVersionStores = mvs
}

func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool {
func dependenciesValidated(tasksMap map[int]*deliverTxTask, deps map[int]struct{}) bool {
for i := range deps {
if !tasks[i].IsStatus(statusValidated) {
// because idx contains absoluteIndices, we need to fetch from map
task := tasksMap[i]
if !task.IsStatus(statusValidated) {
return false
}
}
Expand Down Expand Up @@ -243,12 +249,12 @@ func allValidated(tasks []*deliverTxTask) bool {

func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) {
// iterate over TXs, update estimated writesets where applicable
for i, req := range reqs {
for _, req := range reqs {
mappedWritesets := req.EstimatedWritesets
// order shouldnt matter for storeKeys because each storeKey partitioned MVS is independent
for storeKey, writeset := range mappedWritesets {
// we use `-1` to indicate a prefill incarnation
s.multiVersionStores[storeKey].SetEstimatedWriteset(i, -1, writeset)
s.multiVersionStores[storeKey].SetEstimatedWriteset(req.AbsoluteIndex, -1, writeset)
}
}
}
Expand All @@ -270,9 +276,11 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
// initialize mutli-version stores if they haven't been initialized yet
s.tryInitMultiVersionStore(ctx)
// prefill estimates
s.PrefillEstimates(reqs)
tasks := toTasks(reqs)
// This "optimization" path is being disabled because we don't have a strong reason to have it given that it
// s.PrefillEstimates(reqs)
tasks, tasksMap := toTasks(reqs)
s.allTasks = tasks
s.allTasksMap = tasksMap
s.executeCh = make(chan func(), len(tasks))
s.validateCh = make(chan func(), len(tasks))
defer s.emitMetrics()
Expand Down Expand Up @@ -346,7 +354,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
task.AppendDependencies(conflicts)

// if the conflicts are now validated, then rerun this task
if dependenciesValidated(s.allTasks, task.Dependencies) {
if dependenciesValidated(s.allTasksMap, task.Dependencies) {
return true
} else {
// otherwise, wait for completion
Expand All @@ -363,7 +371,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {

case statusWaiting:
// if conflicts are done, then this task is ready to run again
return dependenciesValidated(s.allTasks, task.Dependencies)
return dependenciesValidated(s.allTasksMap, task.Dependencies)
}
panic("unexpected status: " + task.Status)
}
Expand Down
68 changes: 62 additions & 6 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/multiversion"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/occ"
Expand Down Expand Up @@ -56,6 +57,25 @@ func abortRecoveryFunc(response *types.ResponseDeliverTx) {
}
}

func requestListWithEstimatedWritesets(n int) []*sdk.DeliverTxEntry {
tasks := make([]*sdk.DeliverTxEntry, n)
for i := 0; i < n; i++ {
tasks[i] = &sdk.DeliverTxEntry{
Request: types.RequestDeliverTx{
Tx: []byte(fmt.Sprintf("%d", i)),
},
AbsoluteIndex: i,
EstimatedWritesets: sdk.MappedWritesets{
testStoreKey: multiversion.WriteSet{
string(itemKey): []byte("foo"),
},
},
}

}
return tasks
}

func initTestCtx(injectStores bool) sdk.Context {
ctx := sdk.Context{}.WithContext(context.Background())
keys := make(map[string]sdk.StoreKey)
Expand Down Expand Up @@ -117,7 +137,7 @@ func TestProcessAll(t *testing.T) {
}
},
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&response)
defer abortRecoveryFunc(&res)
kv := ctx.MultiStore().GetKVStore(testStoreKey)
if ctx.TxIndex()%2 == 0 {
// For even-indexed transactions, write to the store
Expand Down Expand Up @@ -158,7 +178,7 @@ func TestProcessAll(t *testing.T) {
addStores: true,
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&response)
defer abortRecoveryFunc(&res)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)

Expand Down Expand Up @@ -190,7 +210,43 @@ func TestProcessAll(t *testing.T) {
addStores: true,
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&response)
defer abortRecoveryFunc(&res)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))

// write to the store with this tx's index
kv.Set(itemKey, req.Tx)

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: val,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
},
expectedErr: nil,
},
{
name: "Test every tx accesses same key with estimated writesets",
workers: 50,
runs: 1,
addStores: true,
requests: requestListWithEstimatedWritesets(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
Expand Down Expand Up @@ -225,8 +281,8 @@ func TestProcessAll(t *testing.T) {
runs: 1,
addStores: true,
requests: requestList(2000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) types.ResponseDeliverTx {
defer abortRecoveryFunc(&response)
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
if ctx.TxIndex()%10 != 0 {
return types.ResponseDeliverTx{
Info: "none",
Expand Down Expand Up @@ -254,7 +310,7 @@ func TestProcessAll(t *testing.T) {
addStores: false,
requests: requestList(10),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&response)
defer abortRecoveryFunc(&res)
return types.ResponseDeliverTx{
Info: fmt.Sprintf("%d", ctx.TxIndex()),
}
Expand Down

0 comments on commit db786e7

Please sign in to comment.