diff --git a/tasks/scheduler.go b/tasks/scheduler.go
index 5445d724d..ab6c681b2 100644
--- a/tasks/scheduler.go
+++ b/tasks/scheduler.go
@@ -48,7 +48,6 @@ type deliverTxTask struct {
 	Status        status
 	Dependencies  map[int]struct{}
 	Abort         *occ.Abort
-	Index         int
 	Incarnation   int
 	Request       types.RequestDeliverTx
 	SdkTx         sdk.Tx
@@ -181,13 +180,12 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
 func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
 	tasksMap := make(map[int]*deliverTxTask)
 	allTasks := make([]*deliverTxTask, 0, len(reqs))
-	for idx, r := range reqs {
+	for _, r := range reqs {
 		task := &deliverTxTask{
 			Request:       r.Request,
 			SdkTx:         r.SdkTx,
 			Checksum:      r.Checksum,
 			AbsoluteIndex: r.AbsoluteIndex,
-			Index:         idx,
 			Status:        statusPending,
 			Dependencies:  map[int]struct{}{},
 		}
@@ -303,7 +301,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 
 	toExecute := tasks
 	for !allValidated(tasks) {
-		// if the max incarnation >= 5, we should revert to synchronous
+		// if the max incarnation >= x, we should revert to synchronous
		if iterations >= maximumIterations {
 			// process synchronously
 			s.synchronous = true
@@ -470,7 +468,6 @@ func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask)
 	spanCtx, span := s.tracingInfo.StartWithContext(name, ctx.TraceSpanContext())
 	if task != nil {
 		span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx))))
-		span.SetAttributes(attribute.Int("txIndex", task.Index))
 		span.SetAttributes(attribute.Int("absoluteIndex", task.AbsoluteIndex))
 		span.SetAttributes(attribute.Int("txIncarnation", task.Incarnation))
 	}
diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go
index 7776179b1..738ea2986 100644
--- a/tasks/scheduler_test.go
+++ b/tasks/scheduler_test.go
@@ -324,14 +324,50 @@ func TestProcessAll(t *testing.T) {
 			},
 			expectedErr: nil,
 		},
+		{
+			name:      "Test every tx accesses same key with estimated writesets",
+			workers:   50,
+			runs:      1,
+			addStores: true,
+			requests:  requestListWithEstimatedWritesets(1000),
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
+				defer abortRecoveryFunc(&res)
+				// all txs read and write to the same key to maximize conflicts
+				kv := ctx.MultiStore().GetKVStore(testStoreKey)
+				val := string(kv.Get(itemKey))
+
+				// write to the store with this tx's index
+				kv.Set(itemKey, req.Tx)
+
+				// return what was read from the store (final attempt should be index-1)
+				return types.ResponseDeliverTx{
+					Info: val,
+				}
+			},
+			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
+				for idx, response := range res {
+					if idx == 0 {
+						require.Equal(t, "", response.Info)
+					} else {
+						// the info is what was read from the kv store by the tx
+						// each tx writes its own index, so the info should be the index of the previous tx
+						require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
+					}
+				}
+				// confirm last write made it to the parent store
+				latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
+				require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
+			},
+			expectedErr: nil,
+		},
 		{
 			name:      "Test every tx accesses same key with delays",
 			workers:   50,
 			runs:      1,
 			addStores: true,
 			requests:  requestList(1000),
-			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) (response types.ResponseDeliverTx) {
-				defer abortRecoveryFunc(&response)
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
+				defer abortRecoveryFunc(&res)
 				wait := rand.Intn(10)
 				time.Sleep(time.Duration(wait) * time.Millisecond)
 				// all txs read and write to the same key to maximize conflicts
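
Note on the scheduler change above: the relative Index field (the task's position in the incoming slice) is dropped, and tasks are identified solely by the caller-supplied AbsoluteIndex. Below is a minimal, self-contained Go sketch of that pattern, not the repository's code: the stand-in types entry and task are hypothetical, and keying tasksMap by AbsoluteIndex is an assumption, since the hunk does not show the map assignment.

package main

import "fmt"

// entry stands in for the sdk.DeliverTxEntry fields used by toTasks.
type entry struct {
	AbsoluteIndex int
}

// task stands in for deliverTxTask after the Index field is removed.
type task struct {
	AbsoluteIndex int
	Status        string
}

// toTasks mirrors the post-change loop: no slice position is recorded, and the
// map is keyed by the block-wide AbsoluteIndex (assumed), so a task keeps the
// same identity even when the scheduler receives only a sub-range of a block.
func toTasks(reqs []*entry) ([]*task, map[int]*task) {
	tasksMap := make(map[int]*task)
	allTasks := make([]*task, 0, len(reqs))
	for _, r := range reqs {
		t := &task{AbsoluteIndex: r.AbsoluteIndex, Status: "pending"}
		tasksMap[r.AbsoluteIndex] = t
		allTasks = append(allTasks, t)
	}
	return allTasks, tasksMap
}

func main() {
	// A batch that starts mid-block: slice position 0 is absolute index 500.
	tasks, byIndex := toTasks([]*entry{{AbsoluteIndex: 500}, {AbsoluteIndex: 501}})
	fmt.Println(len(tasks), byIndex[500].Status) // 2 pending
}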