Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OCC] Fix deadlock #356

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ type scheduler struct {
multiVersionStores map[sdk.StoreKey]multiversion.MultiVersionStore
tracingInfo *tracing.Info
allTasks []*deliverTxTask
workCh chan func()
executeCh chan func()
validateCh chan func()
}

// NewScheduler creates a new scheduler
Expand All @@ -92,23 +93,27 @@ func (s *scheduler) invalidateTask(task *deliverTxTask) {
}
}

func (s *scheduler) Start(ctx context.Context, workers int) {
func start(ctx context.Context, ch chan func(), workers int) {
for i := 0; i < workers; i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case work := <-s.workCh:
case work := <-ch:
work()
}
}
}()
}
}

func (s *scheduler) Do(work func()) {
s.workCh <- work
func (s *scheduler) DoValidate(work func()) {
s.validateCh <- work
}

func (s *scheduler) DoExecute(work func()) {
s.executeCh <- work
}

func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
Expand Down Expand Up @@ -200,7 +205,8 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
s.PrefillEstimates(ctx, reqs)
tasks := toTasks(reqs)
s.allTasks = tasks
s.workCh = make(chan func(), len(tasks))
s.executeCh = make(chan func(), len(tasks))
s.validateCh = make(chan func(), len(tasks))

workers := s.workers
if s.workers < 1 {
Expand All @@ -209,7 +215,8 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t

workerCtx, cancel := context.WithCancel(ctx.Context())
defer cancel()
s.Start(workerCtx, workers)
start(workerCtx, s.executeCh, workers)
start(workerCtx, s.validateCh, workers)

toExecute := tasks
for !allValidated(tasks) {
Expand Down Expand Up @@ -301,7 +308,7 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del
for i := 0; i < len(tasks); i++ {
t := tasks[i]
wg.Add(1)
s.Do(func() {
s.DoValidate(func() {
defer wg.Done()
if !s.validateTask(ctx, t) {
t.Reset()
Expand Down Expand Up @@ -331,7 +338,7 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {

for _, task := range tasks {
t := task
s.Do(func() {
s.DoExecute(func() {
s.prepareAndRunTask(validationWg, ctx, t)
})
}
Expand All @@ -347,7 +354,7 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
task.Ctx = ctx

s.executeTask(task.Ctx, task)
s.Do(func() {
go func() {
defer wg.Done()
defer close(task.ValidateCh)
// wait on previous task to finish validation
Expand All @@ -358,7 +365,7 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
task.Reset()
}
task.ValidateCh <- struct{}{}
})
}()
}

//func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) {
Expand Down
2 changes: 1 addition & 1 deletion tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestProcessAll(t *testing.T) {
workers: 50,
runs: 50,
addStores: true,
requests: requestList(50),
requests: requestList(100),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
Expand Down
Loading