diff --git a/store/multiversion/store.go b/store/multiversion/store.go index e8e7a7156..693be21ec 100644 --- a/store/multiversion/store.go +++ b/store/multiversion/store.go @@ -364,9 +364,11 @@ func (s *Store) checkReadsetAtIndex(index int) (bool, []int) { if value != nil { // conflict // TODO: would we want to return early? + conflictSet[latestValue.Index()] = struct{}{} valid = false } } else if !bytes.Equal(latestValue.Value(), value) { + conflictSet[latestValue.Index()] = struct{}{} valid = false } } diff --git a/store/multiversion/store_test.go b/store/multiversion/store_test.go index 3b35e7702..8b989b37c 100644 --- a/store/multiversion/store_test.go +++ b/store/multiversion/store_test.go @@ -207,20 +207,20 @@ func TestMultiVersionStoreValidateState(t *testing.T) { "key3": []byte("value6"), }) - // expect failure with empty conflicts + // expect failure with conflict of tx 2 valid, conflicts = mvs.ValidateTransactionState(5) require.False(t, valid) - require.Empty(t, conflicts) + require.Equal(t, []int{2}, conflicts) // add a conflict due to deletion mvs.SetWriteset(3, 1, map[string][]byte{ "key1": nil, }) - // expect failure with empty conflicts + // expect failure with conflict of tx 2 and 3 valid, conflicts = mvs.ValidateTransactionState(5) require.False(t, valid) - require.Empty(t, conflicts) + require.Equal(t, []int{2, 3}, conflicts) // add a conflict due to estimate mvs.SetEstimatedWriteset(4, 1, map[string][]byte{ @@ -230,7 +230,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) { // expect index 4 to be returned valid, conflicts = mvs.ValidateTransactionState(5) require.False(t, valid) - require.Equal(t, []int{4}, conflicts) + require.Equal(t, []int{2, 3, 4}, conflicts) } func TestMultiVersionStoreParentValidationMismatch(t *testing.T) { @@ -436,10 +436,10 @@ func TestMVSIteratorValidationWithKeySwitch(t *testing.T) { writeset2["key3"] = []byte("valueX") mvs.SetWriteset(2, 2, writeset2) - // should be invalid + // should be invalid with conflict of 2 valid, conflicts := mvs.ValidateTransactionState(5) require.False(t, valid) - require.Empty(t, conflicts) + require.Equal(t, []int{2}, conflicts) } func TestMVSIteratorValidationWithKeyAdded(t *testing.T) { diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 22016f7a1..de5119e8c 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -36,8 +36,8 @@ const ( statusValidated status = "validated" // statusWaiting tasks are waiting for another tx to complete statusWaiting status = "waiting" - // maximumIncarnation before we revert to sequential (for high conflict rates) - maximumIncarnation = 5 + // maximumIterations before we revert to sequential (for high conflict rates) + maximumIterations = 10 ) type deliverTxTask struct { @@ -48,7 +48,6 @@ type deliverTxTask struct { Status status Dependencies map[int]struct{} Abort *occ.Abort - Index int Incarnation int Request types.RequestDeliverTx SdkTx sdk.Tx @@ -181,13 +180,12 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) { tasksMap := make(map[int]*deliverTxTask) allTasks := make([]*deliverTxTask, 0, len(reqs)) - for idx, r := range reqs { + for _, r := range reqs { task := &deliverTxTask{ Request: r.Request, SdkTx: r.SdkTx, Checksum: r.Checksum, AbsoluteIndex: r.AbsoluteIndex, - Index: idx, Status: statusPending, Dependencies: map[int]struct{}{}, } @@ -273,6 +271,8 @@ func (s *scheduler) emitMetrics() { } func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) { + var iterations int + // initialize mutli-version stores if they haven't been initialized yet s.tryInitMultiVersionStore(ctx) // prefill estimates @@ -302,35 +302,33 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t toExecute := tasks for !allValidated(tasks) { - // if the max incarnation >= 5, we should revert to synchronous - if s.maxIncarnation >= maximumIncarnation { + // if the max incarnation >= x, we should revert to synchronous + if iterations >= maximumIterations { // process synchronously s.synchronous = true - // execute all non-validated tasks (no more "waiting" status) - toExecute = filterTasks(tasks, func(t *deliverTxTask) bool { - return !t.IsStatus(statusValidated) - }) + startIdx, anyLeft := s.findFirstNonValidated() + if !anyLeft { + break + } + toExecute = tasks[startIdx:] } - var err error - // execute sets statuses of tasks to either executed or aborted - if len(toExecute) > 0 { - err = s.executeAll(ctx, toExecute) - if err != nil { - return nil, err - } + if err := s.executeAll(ctx, toExecute); err != nil { + return nil, err } // validate returns any that should be re-executed // note this processes ALL tasks, not just those recently executed - toExecute, err = s.validateAll(ctx, tasks) + toExecute, err := s.validateAll(ctx, tasks) if err != nil { return nil, err } // these are retries which apply to metrics s.metrics.retries += len(toExecute) + iterations++ } + for _, mv := range s.multiVersionStores { mv.WriteLatestToStore() } @@ -434,7 +432,12 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del // ExecuteAll executes all tasks concurrently func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { + if len(tasks) == 0 { + return nil + } + ctx, span := s.traceSpan(ctx, "SchedulerExecuteAll", nil) + span.SetAttributes(attribute.Bool("synchronous", s.synchronous)) defer span.End() // validationWg waits for all validations to complete @@ -467,7 +470,6 @@ func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) spanCtx, span := s.tracingInfo.StartWithContext(name, ctx.TraceSpanContext()) if task != nil { span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx)))) - span.SetAttributes(attribute.Int("txIndex", task.Index)) span.SetAttributes(attribute.Int("absoluteIndex", task.AbsoluteIndex)) span.SetAttributes(attribute.Int("txIncarnation", task.Incarnation)) } @@ -514,6 +516,17 @@ func (s *scheduler) executeTask(task *deliverTxTask) { defer dSpan.End() task.Ctx = dCtx + // in the synchronous case, we only want to re-execute tasks that need re-executing + // if already validated, then this does another validation + if s.synchronous && task.IsStatus(statusValidated) { + s.shouldRerun(task) + if task.IsStatus(statusValidated) { + return + } + task.Reset() + task.Increment() + } + s.prepareTask(task) resp := s.deliverTx(task.Ctx, task.Request, task.SdkTx, task.Checksum) @@ -528,10 +541,13 @@ func (s *scheduler) executeTask(task *deliverTxTask) { abort, ok := <-task.AbortCh if ok { // if there is an abort item that means we need to wait on the dependent tx - task.SetStatus(statusWaiting) + task.SetStatus(statusAborted) task.Abort = &abort task.AppendDependencies([]int{abort.DependentTxIdx}) } + for _, v := range task.VersionStores { + v.WriteEstimatesToMultiVersionStore() + } return } diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 63231d1ef..738ea2986 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" + "math/rand" "net/http" _ "net/http/pprof" "runtime" "testing" + "time" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/types" @@ -322,6 +324,77 @@ func TestProcessAll(t *testing.T) { }, expectedErr: nil, }, + { + name: "Test every tx accesses same key with estimated writesets", + workers: 50, + runs: 1, + addStores: true, + requests: requestListWithEstimatedWritesets(1000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + + // write to the store with this tx's index + kv.Set(itemKey, req.Tx) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: val, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + for idx, response := range res { + if idx == 0 { + require.Equal(t, "", response.Info) + } else { + // the info is what was read from the kv store by the tx + // each tx writes its own index, so the info should be the index of the previous tx + require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info) + } + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest) + }, + expectedErr: nil, + }, + { + name: "Test every tx accesses same key with delays", + workers: 50, + runs: 1, + addStores: true, + requests: requestList(1000), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) + wait := rand.Intn(10) + time.Sleep(time.Duration(wait) * time.Millisecond) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + time.Sleep(time.Duration(wait) * time.Millisecond) + // write to the store with this tx's index + newVal := val + fmt.Sprintf("%d", ctx.TxIndex()) + kv.Set(itemKey, []byte(newVal)) + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: newVal, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + expected := "" + for idx, response := range res { + expected = expected + fmt.Sprintf("%d", idx) + require.Equal(t, expected, response.Info) + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, expected, string(latest)) + }, + expectedErr: nil, + }, } for _, tt := range tests { @@ -343,7 +416,7 @@ func TestProcessAll(t *testing.T) { } res, err := s.ProcessAll(ctx, tt.requests) - require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIncarnation) + require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIterations) require.Len(t, res, len(tt.requests)) if !errors.Is(err, tt.expectedErr) {