diff --git a/tasks/scheduler.go b/tasks/scheduler.go
index de0466f55..c0e7c143d 100644
--- a/tasks/scheduler.go
+++ b/tasks/scheduler.go
@@ -388,7 +388,6 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
 	defer eSpan.End()
 	task.Ctx = eCtx
 
-	s.prepareTask(task)
 	s.executeTask(task)
 
 	s.DoValidate(func() {
@@ -441,27 +440,58 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
 	task.Ctx = ctx
 }
 
-// executeTask executes a single task
 func (s *scheduler) executeTask(task *deliverTxTask) {
-	dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerDeliverTx", task)
+	dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerExecuteTask", task)
 	defer dSpan.End()
 	task.Ctx = dCtx
 
-	resp := s.deliverTx(task.Ctx, task.Request)
+	s.prepareTask(task)
+
+	// Channel to signal the completion of deliverTx
+	doneCh := make(chan types.ResponseDeliverTx)
+
+	// Run deliverTx in a separate goroutine
+	go func() {
+		doneCh <- s.deliverTx(task.Ctx, task.Request)
+	}()
+
+	// Flag to mark if an abort has happened
+	var abortOccurred bool
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	var abort *occ.Abort
+	// Drain the AbortCh concurrently, keeping only the first abort
+	go func() {
+		defer wg.Done()
+		for abt := range task.AbortCh {
+			if !abortOccurred {
+				abortOccurred = true
+				abort = &abt
+			}
+		}
+	}()
+
+	// Wait for deliverTx to complete
+	resp := <-doneCh
 
 	close(task.AbortCh)
 
-	if abt, ok := <-task.AbortCh; ok {
+	wg.Wait()
+
+	// If an abort occurred, record it and return; otherwise set the response and status
+	if abortOccurred {
 		task.Status = statusAborted
-		task.Abort = &abt
+		task.Abort = abort
 		return
 	}
 
+	task.Status = statusExecuted
+	task.Response = &resp
+
 	// write from version store to multiversion stores
 	for _, v := range task.VersionStores {
 		v.WriteToMultiVersionStore()
 	}
-
-	task.Status = statusExecuted
-	task.Response = &resp
 }
diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go
index 886bbe5ce..298143584 100644
--- a/tasks/scheduler_test.go
+++ b/tasks/scheduler_test.go
@@ -4,6 +4,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net/http"
+	_ "net/http/pprof"
+	"runtime"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -52,11 +55,26 @@ func initTestCtx(injectStores bool) sdk.Context {
 	return ctx
 }
 
+func generateTasks(count int) []*deliverTxTask {
+	var res []*deliverTxTask
+	for i := 0; i < count; i++ {
+		res = append(res, &deliverTxTask{Index: i})
+	}
+	return res
+}
+
 func TestProcessAll(t *testing.T) {
+	runtime.SetBlockProfileRate(1)
+
+	go func() {
+		http.ListenAndServe("localhost:6060", nil)
+	}()
+
 	tests := []struct {
 		name          string
 		workers       int
 		runs          int
+		before        func(ctx sdk.Context)
 		requests      []*sdk.DeliverTxEntry
 		deliverTxFunc mockDeliverTxFunc
 		addStores     bool
@@ -64,18 +82,79 @@ func TestProcessAll(t *testing.T) {
 		assertions    func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx)
 	}{
 		{
-			name:      "Test every tx accesses same key",
+			name:      "Test zero txs does not hang",
+			workers:   20,
+			runs:      10,
+			addStores: true,
+			requests:  requestList(0),
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
+				panic("should not deliver")
+			},
+			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
+				require.Len(t, res, 0)
+			},
+			expectedErr: nil,
+		},
+		{
+			name:      "Test tx writing to a store that another tx is iterating",
 			workers:   50,
-			runs:      50,
+			runs:      1,
+			requests:  requestList(500),
 			addStores: true,
-			requests:  requestList(100),
+			before: func(ctx sdk.Context) {
+				kv := ctx.MultiStore().GetKVStore(testStoreKey)
+				// seed the base kv store with test values so iterating isn't too fast
+				for i := 0; i < 10; i++ {
+					kv.Set([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i)))
+				}
+			},
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
+				kv := ctx.MultiStore().GetKVStore(testStoreKey)
+				if ctx.TxIndex()%2 == 0 {
+					// For even-indexed transactions, write to the store
+					kv.Set(req.Tx, req.Tx)
+					return types.ResponseDeliverTx{
+						Info: "write",
+					}
+				} else {
+					// For odd-indexed transactions, iterate over the store
+
+					// write as well so there is more write activity alongside the iteration
+					kv.Set(req.Tx, req.Tx)
+					iterator := kv.Iterator(nil, nil)
+					defer iterator.Close()
+					for ; iterator.Valid(); iterator.Next() {
+						// Do nothing, just iterate
+					}
+					return types.ResponseDeliverTx{
+						Info: "iterate",
+					}
+				}
+			},
+			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
+				for idx, response := range res {
+					if idx%2 == 0 {
+						require.Equal(t, "write", response.Info)
+					} else {
+						require.Equal(t, "iterate", response.Info)
+					}
+				}
+			},
+			expectedErr: nil,
+		},
+		{
+			name:      "Test no overlap txs",
+			workers:   20,
+			runs:      10,
+			addStores: true,
+			requests:  requestList(1000),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
 				// all txs read and write to the same key to maximize conflicts
 				kv := ctx.MultiStore().GetKVStore(testStoreKey)
-				val := string(kv.Get(itemKey))
 
 				// write to the store with this tx's index
-				kv.Set(itemKey, req.Tx)
+				kv.Set(req.Tx, req.Tx)
+				val := string(kv.Get(req.Tx))
 
 				// return what was read from the store (final attempt should be index-1)
 				return types.ResponseDeliverTx{
@@ -84,26 +163,22 @@ func TestProcessAll(t *testing.T) {
 			},
 			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
 				for idx, response := range res {
-					if idx == 0 {
-						require.Equal(t, "", response.Info)
-					} else {
-						// the info is what was read from the kv store by the tx
-						// each tx writes its own index, so the info should be the index of the previous tx
-						require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
-					}
+					require.Equal(t, fmt.Sprintf("%d", idx), response.Info)
+				}
+				store := ctx.MultiStore().GetKVStore(testStoreKey)
+				for i := 0; i < len(res); i++ {
+					val := store.Get([]byte(fmt.Sprintf("%d", i)))
+					require.Equal(t, []byte(fmt.Sprintf("%d", i)), val)
 				}
-				// confirm last write made it to the parent store
-				latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
-				require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
 			},
 			expectedErr: nil,
 		},
 		{
-			name:      "Test few workers many txs",
-			workers:   5,
-			runs:      10,
+			name:      "Test every tx accesses same key",
+			workers:   50,
+			runs:      1,
 			addStores: true,
-			requests:  requestList(50),
+			requests:  requestList(1000),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
 				// all txs read and write to the same key to maximize conflicts
 				kv := ctx.MultiStore().GetKVStore(testStoreKey)
@@ -136,9 +211,9 @@ func TestProcessAll(t *testing.T) {
 		{
 			name:      "Test no stores on context should not panic",
 			workers:   50,
-			runs:      1,
+			runs:      10,
 			addStores: false,
-			requests:  requestList(50),
+			requests:  requestList(10),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
 				return types.ResponseDeliverTx{
 					Info: fmt.Sprintf("%d", ctx.TxIndex()),
@@ -167,6 +242,10 @@ func TestProcessAll(t *testing.T) {
 			s := NewScheduler(tt.workers, ti, tt.deliverTxFunc)
 			ctx := initTestCtx(tt.addStores)
 
+			if tt.before != nil {
+				tt.before(ctx)
+			}
+
 			res, err := s.ProcessAll(ctx, tt.requests)
 			require.Len(t, res, len(tt.requests))
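// Illustrative sketch (not part of the patch): a minimal, self-contained Go
// example of the concurrency pattern the new executeTask uses above: run the
// work in one goroutine, drain an abort channel in another (keeping only the
// first abort), then close the channel, wait, and decide whether the abort or
// the response wins. The names runWithAbortDrain, abortSignal, and work are
// hypothetical and exist only for illustration; the real scheduler uses
// s.deliverTx, task.AbortCh, and occ.Abort.
package main

import (
	"fmt"
	"sync"
)

type abortSignal struct{ reason string }

// runWithAbortDrain executes work while draining abortCh in a separate
// goroutine. It returns the work result and, if any abort was observed,
// a pointer to the first one.
func runWithAbortDrain(work func() string, abortCh chan abortSignal) (string, *abortSignal) {
	doneCh := make(chan string)
	go func() { doneCh <- work() }()

	var (
		wg    sync.WaitGroup
		abort *abortSignal
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for a := range abortCh { // drains until abortCh is closed
			if abort == nil {
				first := a
				abort = &first // keep only the first abort
			}
		}
	}()

	resp := <-doneCh // wait for the work to finish
	close(abortCh)   // no further aborts can be sent
	wg.Wait()        // drain goroutine has exited, so abort is safe to read
	return resp, abort
}

func main() {
	abortCh := make(chan abortSignal, 1)
	resp, abort := runWithAbortDrain(func() string {
		abortCh <- abortSignal{reason: "estimate conflict"} // simulate an abort mid-execution
		return "executed"
	}, abortCh)
	if abort != nil {
		fmt.Println("aborted:", abort.reason)
		return
	}
	fmt.Println("response:", resp)
}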