diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 58235ad1c..22016f7a1 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -101,6 +101,7 @@ type scheduler struct { workers int multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore tracingInfo *tracing.Info + allTasksMap map[int]*deliverTxTask allTasks []*deliverTxTask executeCh chan func() validateCh chan func() @@ -177,10 +178,11 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { return valid, conflicts } -func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { - res := make([]*deliverTxTask, 0, len(reqs)) +func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) { + tasksMap := make(map[int]*deliverTxTask) + allTasks := make([]*deliverTxTask, 0, len(reqs)) for idx, r := range reqs { - res = append(res, &deliverTxTask{ + task := &deliverTxTask{ Request: r.Request, SdkTx: r.SdkTx, Checksum: r.Checksum, @@ -188,9 +190,11 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { Index: idx, Status: statusPending, Dependencies: map[int]struct{}{}, - }) + } + tasksMap[r.AbsoluteIndex] = task + allTasks = append(allTasks, task) } - return res + return allTasks, tasksMap } func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { @@ -213,9 +217,11 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { s.multiVersionStores = mvs } -func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool { +func dependenciesValidated(tasksMap map[int]*deliverTxTask, deps map[int]struct{}) bool { for i := range deps { - if !tasks[i].IsStatus(statusValidated) { + // because idx contains absoluteIndices, we need to fetch from map + task := tasksMap[i] + if !task.IsStatus(statusValidated) { return false } } @@ -243,12 +249,12 @@ func allValidated(tasks []*deliverTxTask) bool { func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) { // iterate over TXs, update estimated writesets where applicable - for i, req := range reqs { + for _, req := range reqs { mappedWritesets := req.EstimatedWritesets // order shouldnt matter for storeKeys because each storeKey partitioned MVS is independent for storeKey, writeset := range mappedWritesets { // we use `-1` to indicate a prefill incarnation - s.multiVersionStores[storeKey].SetEstimatedWriteset(i, -1, writeset) + s.multiVersionStores[storeKey].SetEstimatedWriteset(req.AbsoluteIndex, -1, writeset) } } } @@ -270,9 +276,11 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t // initialize mutli-version stores if they haven't been initialized yet s.tryInitMultiVersionStore(ctx) // prefill estimates - s.PrefillEstimates(reqs) - tasks := toTasks(reqs) + // This "optimization" path is being disabled because we don't have a strong reason to have it given that it + // s.PrefillEstimates(reqs) + tasks, tasksMap := toTasks(reqs) s.allTasks = tasks + s.allTasksMap = tasksMap s.executeCh = make(chan func(), len(tasks)) s.validateCh = make(chan func(), len(tasks)) defer s.emitMetrics() @@ -346,7 +354,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { task.AppendDependencies(conflicts) // if the conflicts are now validated, then rerun this task - if dependenciesValidated(s.allTasks, task.Dependencies) { + if dependenciesValidated(s.allTasksMap, task.Dependencies) { return true } else { // otherwise, wait for completion @@ -363,7 +371,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { case statusWaiting: // if conflicts are done, then this task is ready to run again - return dependenciesValidated(s.allTasks, task.Dependencies) + return dependenciesValidated(s.allTasksMap, task.Dependencies) } panic("unexpected status: " + task.Status) } diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 0487c2a12..63231d1ef 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -18,6 +18,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/multiversion" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/types/occ" @@ -56,6 +57,25 @@ func abortRecoveryFunc(response *types.ResponseDeliverTx) { } } +func requestListWithEstimatedWritesets(n int) []*sdk.DeliverTxEntry { + tasks := make([]*sdk.DeliverTxEntry, n) + for i := 0; i < n; i++ { + tasks[i] = &sdk.DeliverTxEntry{ + Request: types.RequestDeliverTx{ + Tx: []byte(fmt.Sprintf("%d", i)), + }, + AbsoluteIndex: i, + EstimatedWritesets: sdk.MappedWritesets{ + testStoreKey: multiversion.WriteSet{ + string(itemKey): []byte("foo"), + }, + }, + } + + } + return tasks +} + func initTestCtx(injectStores bool) sdk.Context { ctx := sdk.Context{}.WithContext(context.Background()) keys := make(map[string]sdk.StoreKey) @@ -117,7 +137,7 @@ func TestProcessAll(t *testing.T) { } }, deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) kv := ctx.MultiStore().GetKVStore(testStoreKey) if ctx.TxIndex()%2 == 0 { // For even-indexed transactions, write to the store @@ -158,7 +178,7 @@ func TestProcessAll(t *testing.T) { addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) @@ -190,7 +210,43 @@ func TestProcessAll(t *testing.T) { addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + + // write to the store with this tx's index + kv.Set(itemKey, req.Tx) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: val, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + for idx, response := range res { + if idx == 0 { + require.Equal(t, "", response.Info) + } else { + // the info is what was read from the kv store by the tx + // each tx writes its own index, so the info should be the index of the previous tx + require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info) + } + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest) + }, + expectedErr: nil, + }, + { + name: "Test every tx accesses same key with estimated writesets", + workers: 50, + runs: 1, + addStores: true, + requests: requestListWithEstimatedWritesets(1000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) val := string(kv.Get(itemKey)) @@ -225,8 +281,8 @@ func TestProcessAll(t *testing.T) { runs: 1, addStores: true, requests: requestList(2000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) types.ResponseDeliverTx { - defer abortRecoveryFunc(&response) + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) if ctx.TxIndex()%10 != 0 { return types.ResponseDeliverTx{ Info: "none", @@ -254,7 +310,7 @@ func TestProcessAll(t *testing.T) { addStores: false, requests: requestList(10), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) return types.ResponseDeliverTx{ Info: fmt.Sprintf("%d", ctx.TxIndex()), }