From db786e77582a2fff6bbc88db489b6d1150c1083f Mon Sep 17 00:00:00 2001 From: Uday Patil Date: Mon, 11 Mar 2024 11:26:53 -0500 Subject: [PATCH] Fix indexesValidated and PrefillEstimates to operate on absolute idx (#454) This is one component that was missed when refactoring to use absolute indices for EVM changes. This change refactors such that prefill estimates will appropriately fill the estimates by absolute Index and indexes validated will similarly check via absolute indices instead of relative. Existing unit tests + loadtesting --- tasks/scheduler.go | 34 +++++++++++++-------- tasks/scheduler_test.go | 68 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 83 insertions(+), 19 deletions(-) diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 58235ad1c..22016f7a1 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -101,6 +101,7 @@ type scheduler struct { workers int multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore tracingInfo *tracing.Info + allTasksMap map[int]*deliverTxTask allTasks []*deliverTxTask executeCh chan func() validateCh chan func() @@ -177,10 +178,11 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { return valid, conflicts } -func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { - res := make([]*deliverTxTask, 0, len(reqs)) +func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) { + tasksMap := make(map[int]*deliverTxTask) + allTasks := make([]*deliverTxTask, 0, len(reqs)) for idx, r := range reqs { - res = append(res, &deliverTxTask{ + task := &deliverTxTask{ Request: r.Request, SdkTx: r.SdkTx, Checksum: r.Checksum, @@ -188,9 +190,11 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { Index: idx, Status: statusPending, Dependencies: map[int]struct{}{}, - }) + } + tasksMap[r.AbsoluteIndex] = task + allTasks = append(allTasks, task) } - return res + return allTasks, tasksMap } func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { @@ -213,9 +217,11 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { s.multiVersionStores = mvs } -func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool { +func dependenciesValidated(tasksMap map[int]*deliverTxTask, deps map[int]struct{}) bool { for i := range deps { - if !tasks[i].IsStatus(statusValidated) { + // because idx contains absoluteIndices, we need to fetch from map + task := tasksMap[i] + if !task.IsStatus(statusValidated) { return false } } @@ -243,12 +249,12 @@ func allValidated(tasks []*deliverTxTask) bool { func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) { // iterate over TXs, update estimated writesets where applicable - for i, req := range reqs { + for _, req := range reqs { mappedWritesets := req.EstimatedWritesets // order shouldnt matter for storeKeys because each storeKey partitioned MVS is independent for storeKey, writeset := range mappedWritesets { // we use `-1` to indicate a prefill incarnation - s.multiVersionStores[storeKey].SetEstimatedWriteset(i, -1, writeset) + s.multiVersionStores[storeKey].SetEstimatedWriteset(req.AbsoluteIndex, -1, writeset) } } } @@ -270,9 +276,11 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t // initialize mutli-version stores if they haven't been initialized yet s.tryInitMultiVersionStore(ctx) // prefill estimates - s.PrefillEstimates(reqs) - tasks := toTasks(reqs) + // This "optimization" path is being disabled because we don't have a strong reason to have it given that it + // s.PrefillEstimates(reqs) + tasks, tasksMap := toTasks(reqs) s.allTasks = tasks + s.allTasksMap = tasksMap s.executeCh = make(chan func(), len(tasks)) s.validateCh = make(chan func(), len(tasks)) defer s.emitMetrics() @@ -346,7 +354,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { task.AppendDependencies(conflicts) // if the conflicts are now validated, then rerun this task - if dependenciesValidated(s.allTasks, task.Dependencies) { + if dependenciesValidated(s.allTasksMap, task.Dependencies) { return true } else { // otherwise, wait for completion @@ -363,7 +371,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { case statusWaiting: // if conflicts are done, then this task is ready to run again - return dependenciesValidated(s.allTasks, task.Dependencies) + return dependenciesValidated(s.allTasksMap, task.Dependencies) } panic("unexpected status: " + task.Status) } diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 0487c2a12..63231d1ef 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -18,6 +18,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/multiversion" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/types/occ" @@ -56,6 +57,25 @@ func abortRecoveryFunc(response *types.ResponseDeliverTx) { } } +func requestListWithEstimatedWritesets(n int) []*sdk.DeliverTxEntry { + tasks := make([]*sdk.DeliverTxEntry, n) + for i := 0; i < n; i++ { + tasks[i] = &sdk.DeliverTxEntry{ + Request: types.RequestDeliverTx{ + Tx: []byte(fmt.Sprintf("%d", i)), + }, + AbsoluteIndex: i, + EstimatedWritesets: sdk.MappedWritesets{ + testStoreKey: multiversion.WriteSet{ + string(itemKey): []byte("foo"), + }, + }, + } + + } + return tasks +} + func initTestCtx(injectStores bool) sdk.Context { ctx := sdk.Context{}.WithContext(context.Background()) keys := make(map[string]sdk.StoreKey) @@ -117,7 +137,7 @@ func TestProcessAll(t *testing.T) { } }, deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) kv := ctx.MultiStore().GetKVStore(testStoreKey) if ctx.TxIndex()%2 == 0 { // For even-indexed transactions, write to the store @@ -158,7 +178,7 @@ func TestProcessAll(t *testing.T) { addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) @@ -190,7 +210,43 @@ func TestProcessAll(t *testing.T) { addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + + // write to the store with this tx's index + kv.Set(itemKey, req.Tx) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: val, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + for idx, response := range res { + if idx == 0 { + require.Equal(t, "", response.Info) + } else { + // the info is what was read from the kv store by the tx + // each tx writes its own index, so the info should be the index of the previous tx + require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info) + } + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest) + }, + expectedErr: nil, + }, + { + name: "Test every tx accesses same key with estimated writesets", + workers: 50, + runs: 1, + addStores: true, + requests: requestListWithEstimatedWritesets(1000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) val := string(kv.Get(itemKey)) @@ -225,8 +281,8 @@ func TestProcessAll(t *testing.T) { runs: 1, addStores: true, requests: requestList(2000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) types.ResponseDeliverTx { - defer abortRecoveryFunc(&response) + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) if ctx.TxIndex()%10 != 0 { return types.ResponseDeliverTx{ Info: "none", @@ -254,7 +310,7 @@ func TestProcessAll(t *testing.T) { addStores: false, requests: requestList(10), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) return types.ResponseDeliverTx{ Info: fmt.Sprintf("%d", ctx.TxIndex()), }