diff --git a/baseapp/abci.go b/baseapp/abci.go index 6b1be63ba..ffe0dd49a 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/cosmos/cosmos-sdk/tasks" "os" "sort" "strings" @@ -239,13 +240,23 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc // DeliverTxBatch executes multiple txs // TODO: support occ logic with scheduling func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) { - // TODO: replace with actual scheduler logic - // This is stubbed so that it does something sensible - responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries)) + //TODO: inject multiversion store without import cycle (figure out right place for this) + // ctx = ctx.WithMultiVersionStore(multiversion.NewMultiVersionStore()) + + reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries)) for _, tx := range req.TxEntries { - responses = append(responses, &sdk.DeliverTxResult{ - Response: app.DeliverTx(ctx, tx.Request), - }) + reqList = append(reqList, tx.Request) + } + + scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.DeliverTx) + txRes, err := scheduler.ProcessAll(ctx, reqList) + if err != nil { + //TODO: handle error + } + + responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries)) + for _, tx := range txRes { + responses = append(responses, &sdk.DeliverTxResult{Response: tx}) } return sdk.DeliverTxBatchResponse{Results: responses} } @@ -256,7 +267,7 @@ func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchReques // Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant // gas execution context. // TODO: (occ) this is the function called from sei-chain to perform execution of a transaction. -// We'd likely replace this with an execution task that is scheduled by the OCC scheduler +// We'd likely replace this with an execution tasks that is scheduled by the OCC scheduler func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) { defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx") defer func() { diff --git a/tasks/scheduler.go b/tasks/scheduler.go new file mode 100644 index 000000000..c8b063fe2 --- /dev/null +++ b/tasks/scheduler.go @@ -0,0 +1,187 @@ +package tasks + +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/tendermint/tendermint/abci/types" + "golang.org/x/sync/errgroup" +) + +type status string + +const ( + // statusPending tasks are ready for execution + // all executing tasks are in pending state + statusPending status = "pending" + // statusExecuted tasks are ready for validation + // these tasks did not abort during execution + statusExecuted status = "executed" + // statusAborted means the task has been aborted + // these tasks transition to pending upon next execution + statusAborted status = "aborted" + // statusValidated means the task has been validated + // tasks in this status can be reset if an earlier task fails validation + statusValidated status = "validated" +) + +type deliverTxTask struct { + Status status + Index int + Incarnation int + Request types.RequestDeliverTx + Response *types.ResponseDeliverTx +} + +// Scheduler processes tasks concurrently +type Scheduler interface { + ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) +} + +type scheduler struct { + deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx) + workers int +} + +// NewScheduler creates a new scheduler +func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)) Scheduler { + return &scheduler{ + workers: workers, + deliverTx: deliverTxFunc, + } +} + +func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask { + res := make([]*deliverTxTask, 0, len(reqs)) + for idx, r := range reqs { + res = append(res, &deliverTxTask{ + Request: r, + Index: idx, + Status: statusPending, + }) + } + return res +} + +func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx { + res := make([]types.ResponseDeliverTx, 0, len(tasks)) + for _, t := range tasks { + res = append(res, *t.Response) + } + return res +} + +func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) { + tasks := toTasks(reqs) + toExecute := tasks + for len(toExecute) > 0 { + + // execute sets statuses of tasks to either executed or aborted + err := s.executeAll(ctx, toExecute) + if err != nil { + return nil, err + } + + // validate returns any that should be re-executed + // note this processes ALL tasks, not just those recently executed + toExecute, err = s.validateAll(ctx, tasks) + if err != nil { + return nil, err + } + for _, t := range toExecute { + t.Incarnation++ + t.Status = statusPending + t.Response = nil + //TODO: reset anything that needs resetting + } + } + return collectResponses(tasks), nil +} + +// TODO: validate each tasks +// TODO: return list of tasks that are invalid +func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) { + var res []*deliverTxTask + + // find first non-validated entry + var startIdx int + for idx, t := range tasks { + if t.Status != statusValidated { + startIdx = idx + break + } + } + + for i := startIdx; i < len(tasks); i++ { + // any aborted tx is known to be suspect here + if tasks[i].Status == statusAborted { + res = append(res, tasks[i]) + } else { + //TODO: validate the tasks and add it if invalid + //TODO: create and handle abort for validation + tasks[i].Status = statusValidated + } + } + return res, nil +} + +// ExecuteAll executes all tasks concurrently +// Tasks are updated with their status +// TODO: retries on aborted tasks +// TODO: error scenarios +func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error { + ch := make(chan *deliverTxTask, len(tasks)) + grp, gCtx := errgroup.WithContext(ctx.Context()) + + // a workers value < 1 means no limit + workers := s.workers + if s.workers < 1 { + workers = len(tasks) + } + + for i := 0; i < workers; i++ { + grp.Go(func() error { + for { + select { + case <-gCtx.Done(): + return gCtx.Err() + case task, ok := <-ch: + if !ok { + return nil + } + //TODO: ensure version multi store is on context + // buffered so it doesn't block on write + // abortCh := make(chan occ.Abort, 1) + + //TODO: consume from abort in non-blocking way (give it a length) + resp := s.deliverTx(ctx, task.Request) + + // close(abortCh) + + //if _, ok := <-abortCh; ok { + // tasks.status = TaskStatusAborted + // continue + //} + + task.Status = statusExecuted + task.Response = &resp + } + } + }) + } + grp.Go(func() error { + defer close(ch) + for _, task := range tasks { + select { + case <-gCtx.Done(): + return gCtx.Err() + case ch <- task: + } + } + return nil + }) + + if err := grp.Wait(); err != nil { + return err + } + + return nil +} diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go new file mode 100644 index 000000000..ba9d97846 --- /dev/null +++ b/tasks/scheduler_test.go @@ -0,0 +1,59 @@ +package tasks + +import ( + "context" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/abci/types" + "testing" +) + +type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx + +func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx { + return f(ctx, req) +} + +func requestList(n int) []types.RequestDeliverTx { + tasks := make([]types.RequestDeliverTx, n) + for i := 0; i < n; i++ { + tasks[i] = types.RequestDeliverTx{} + } + return tasks +} + +func TestProcessAll(t *testing.T) { + tests := []struct { + name string + workers int + requests []types.RequestDeliverTx + deliverTxFunc mockDeliverTxFunc + expectedErr error + }{ + { + name: "All tasks processed without aborts", + workers: 2, + requests: requestList(5), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx { + return types.ResponseDeliverTx{} + }, + expectedErr: nil, + }, + //TODO: Add more test cases + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx) + ctx := sdk.Context{}.WithContext(context.Background()) + + res, err := s.ProcessAll(ctx, tt.requests) + if err != tt.expectedErr { + t.Errorf("Expected error %v, got %v", tt.expectedErr, err) + } else { + // response for each request exists + assert.Len(t, res, len(tt.requests)) + } + }) + } +} diff --git a/types/occ/scheduler.go b/types/occ/types.go similarity index 76% rename from types/occ/scheduler.go rename to types/occ/types.go index 3905be395..de321b7cb 100644 --- a/types/occ/scheduler.go +++ b/types/occ/types.go @@ -1,12 +1,14 @@ -package scheduler +package occ -import "errors" +import ( + "errors" +) var ( ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting") ) -// define the return struct for abort due to conflict +// Abort contains the information for a transaction's conflict type Abort struct { DependentTxIdx int Err error