diff --git a/tasks/scheduler.go b/tasks/scheduler.go index 4de36f166..d3f5d654e 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -72,7 +72,8 @@ type scheduler struct { multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore tracingInfo *tracing.Info allTasks []*deliverTxTask - workCh chan func() + executeCh chan func() + validateCh chan func() } // NewScheduler creates a new scheduler @@ -92,14 +93,14 @@ func (s *scheduler) invalidateTask(task *deliverTxTask) { } } -func (s *scheduler) Start(ctx context.Context, workers int) { +func start(ctx context.Context, ch chan func(), workers int) { for i := 0; i < workers; i++ { go func() { for { select { case <-ctx.Done(): return - case work := <-s.workCh: + case work := <-ch: work() } } @@ -107,8 +108,12 @@ func (s *scheduler) Start(ctx context.Context, workers int) { } } -func (s *scheduler) Do(work func()) { - s.workCh <- work +func (s *scheduler) DoValidate(work func()) { + s.validateCh <- work +} + +func (s *scheduler) DoExecute(work func()) { + s.executeCh <- work } func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) { @@ -200,7 +205,8 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t s.PrefillEstimates(ctx, reqs) tasks := toTasks(reqs) s.allTasks = tasks - s.workCh = make(chan func(), len(tasks)) + s.executeCh = make(chan func(), len(tasks)) + s.validateCh = make(chan func(), len(tasks)) workers := s.workers if s.workers < 1 { @@ -209,7 +215,8 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t workerCtx, cancel := context.WithCancel(ctx.Context()) defer cancel() - s.Start(workerCtx, workers) + start(workerCtx, s.executeCh, workers) + start(workerCtx, s.validateCh, workers) toExecute := tasks for !allValidated(tasks) { @@ -301,7 +308,7 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del for i := 0; i < len(tasks); i++ { t := tasks[i] wg.Add(1) - s.Do(func() { + s.DoValidate(func() { defer wg.Done() if !s.validateTask(ctx, t) { t.Reset() @@ -331,7 +338,7 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { for _, task := range tasks { t := task - s.Do(func() { + s.DoExecute(func() { s.prepareAndRunTask(validationWg, ctx, t) }) } @@ -347,7 +354,7 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task task.Ctx = ctx s.executeTask(task.Ctx, task) - s.Do(func() { + go func() { defer wg.Done() defer close(task.ValidateCh) // wait on previous task to finish validation @@ -358,7 +365,7 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task task.Reset() } task.ValidateCh <- struct{}{} - }) + }() } //func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) { diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index c6427a68b..42db778cb 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -68,7 +68,7 @@ func TestProcessAll(t *testing.T) { workers: 50, runs: 50, addStores: true, - requests: requestList(50), + requests: requestList(100), deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx { // all txs read and write to the same key to maximize conflicts kv := ctx.MultiStore().GetKVStore(testStoreKey)