diff --git a/tasks/scheduler.go b/tasks/scheduler.go index bc83a1f62..7d840c60d 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -100,6 +100,7 @@ type scheduler struct { workers int multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore tracingInfo *tracing.Info + allTasksMap map[int]*deliverTxTask allTasks []*deliverTxTask executeCh chan func() validateCh chan func() @@ -176,10 +177,11 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { return valid, conflicts } -func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { - res := make([]*deliverTxTask, 0, len(reqs)) +func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) { + tasksMap := make(map[int]*deliverTxTask) + allTasks := make([]*deliverTxTask, 0, len(reqs)) for idx, r := range reqs { - res = append(res, &deliverTxTask{ + task := &deliverTxTask{ Request: r.Request, SdkTx: r.SdkTx, Checksum: r.Checksum, @@ -187,9 +189,11 @@ func toTasks(reqs []*sdk.DeliverTxEntry) []*deliverTxTask { Index: idx, Status: statusPending, Dependencies: map[int]struct{}{}, - }) + } + tasksMap[r.AbsoluteIndex] = task + allTasks = append(allTasks, task) } - return res + return allTasks, tasksMap } func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { @@ -212,9 +216,11 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) { s.multiVersionStores = mvs } -func dependenciesValidated(tasks []*deliverTxTask, deps map[int]struct{}) bool { +func dependenciesValidated(tasksMap map[int]*deliverTxTask, deps map[int]struct{}) bool { for i := range deps { - if !tasks[i].IsStatus(statusValidated) { + // because idx contains absoluteIndices, we need to fetch from map + task := tasksMap[i] + if !task.IsStatus(statusValidated) { return false } } @@ -242,12 +248,12 @@ func allValidated(tasks []*deliverTxTask) bool { func (s *scheduler) PrefillEstimates(reqs []*sdk.DeliverTxEntry) { // iterate over TXs, update estimated writesets where applicable - for i, req := range reqs { + for _, req := range reqs { mappedWritesets := req.EstimatedWritesets // order shouldnt matter for storeKeys because each storeKey partitioned MVS is independent for storeKey, writeset := range mappedWritesets { // we use `-1` to indicate a prefill incarnation - s.multiVersionStores[storeKey].SetEstimatedWriteset(i, -1, writeset) + s.multiVersionStores[storeKey].SetEstimatedWriteset(req.AbsoluteIndex, -1, writeset) } } } @@ -270,9 +276,11 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t // initialize mutli-version stores if they haven't been initialized yet s.tryInitMultiVersionStore(ctx) // prefill estimates - s.PrefillEstimates(reqs) - tasks := toTasks(reqs) + // This "optimization" path is being disabled because we don't have a strong reason to have it given that it + // s.PrefillEstimates(reqs) + tasks, tasksMap := toTasks(reqs) s.allTasks = tasks + s.allTasksMap = tasksMap s.executeCh = make(chan func(), len(tasks)) s.validateCh = make(chan func(), len(tasks)) defer s.emitMetrics() @@ -348,7 +356,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { task.AppendDependencies(conflicts) // if the conflicts are now validated, then rerun this task - if dependenciesValidated(s.allTasks, task.Dependencies) { + if dependenciesValidated(s.allTasksMap, task.Dependencies) { return true } else { // otherwise, wait for completion @@ -365,7 +373,7 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool { case statusWaiting: // if conflicts are done, then this task is ready to run again - return dependenciesValidated(s.allTasks, task.Dependencies) + return dependenciesValidated(s.allTasksMap, task.Dependencies) } panic("unexpected status: " + task.Status) } diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index f6358f0c3..e520a06df 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -21,6 +21,7 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/cachemulti" "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/multiversion" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/occ" "github.com/cosmos/cosmos-sdk/utils/tracing" @@ -57,6 +58,25 @@ func abortRecoveryFunc(response *types.ResponseDeliverTx) { } } +func requestListWithEstimatedWritesets(n int) []*sdk.DeliverTxEntry { + tasks := make([]*sdk.DeliverTxEntry, n) + for i := 0; i < n; i++ { + tasks[i] = &sdk.DeliverTxEntry{ + Request: types.RequestDeliverTx{ + Tx: []byte(fmt.Sprintf("%d", i)), + }, + AbsoluteIndex: i, + EstimatedWritesets: sdk.MappedWritesets{ + testStoreKey: multiversion.WriteSet{ + string(itemKey): []byte("foo"), + }, + }, + } + + } + return tasks +} + func initTestCtx(injectStores bool) sdk.Context { ctx := sdk.Context{}.WithContext(context.Background()) keys := make(map[string]sdk.StoreKey) @@ -119,7 +139,7 @@ func TestProcessAll(t *testing.T) { } }, deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) kv := ctx.MultiStore().GetKVStore(testStoreKey) if ctx.TxIndex()%2 == 0 { // For even-indexed transactions, write to the store @@ -160,7 +180,7 @@ func TestProcessAll(t *testing.T) { addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) @@ -192,7 +212,43 @@ func TestProcessAll(t *testing.T) { addStores: true, requests: requestList(1000), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + + // write to the store with this tx's index + kv.Set(itemKey, req.Tx) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: val, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + for idx, response := range res { + if idx == 0 { + require.Equal(t, "", response.Info) + } else { + // the info is what was read from the kv store by the tx + // each tx writes its own index, so the info should be the index of the previous tx + require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info) + } + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest) + }, + expectedErr: nil, + }, + { + name: "Test every tx accesses same key with estimated writesets", + workers: 50, + runs: 1, + addStores: true, + requests: requestListWithEstimatedWritesets(1000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey) val := string(kv.Get(itemKey)) @@ -227,8 +283,8 @@ func TestProcessAll(t *testing.T) { runs: 1, addStores: true, requests: requestList(2000), - deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) types.ResponseDeliverTx { - defer abortRecoveryFunc(&response) + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) if ctx.TxIndex()%10 != 0 { return types.ResponseDeliverTx{ Info: "none", @@ -256,7 +312,7 @@ func TestProcessAll(t *testing.T) { addStores: false, requests: requestList(10), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { - defer abortRecoveryFunc(&response) + defer abortRecoveryFunc(&res) return types.ResponseDeliverTx{ Info: fmt.Sprintf("%d", ctx.TxIndex()), }