Skip to content

Commit

Permalink
[occ] Add basic worker task and scheduler shell (#328)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
- Adds a basic scheduler shell (see TODOs)
- Adds a basic task definition with request/response/index
- Listens to abort channel after an execution to determine conflict

## Testing performed to validate your change
- Compiles (holding off until shape is validated)
- Basic Unit Test for ProcessAll
  • Loading branch information
stevenlanders authored and udpatil committed Feb 27, 2024
1 parent bbe4ae6 commit 703e5ac
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 10 deletions.
25 changes: 18 additions & 7 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/cosmos/cosmos-sdk/tasks"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -239,13 +240,23 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
// DeliverTxBatch executes multiple txs
// TODO: support occ logic with scheduling
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
// TODO: replace with actual scheduler logic
// This is stubbed so that it does something sensible
responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
//TODO: inject multiversion store without import cycle (figure out right place for this)
// ctx = ctx.WithMultiVersionStore(multiversion.NewMultiVersionStore())

reqList := make([]abci.RequestDeliverTx, 0, len(req.TxEntries))
for _, tx := range req.TxEntries {
responses = append(responses, &sdk.DeliverTxResult{
Response: app.DeliverTx(ctx, tx.Request),
})
reqList = append(reqList, tx.Request)
}

scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.DeliverTx)
txRes, err := scheduler.ProcessAll(ctx, reqList)
if err != nil {
//TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
for _, tx := range txRes {
responses = append(responses, &sdk.DeliverTxResult{Response: tx})
}
return sdk.DeliverTxBatchResponse{Results: responses}
}
Expand All @@ -256,7 +267,7 @@ func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchReques
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
// TODO: (occ) this is the function called from sei-chain to perform execution of a transaction.
// We'd likely replace this with an execution task that is scheduled by the OCC scheduler
// We'd likely replace this with an execution tasks that is scheduled by the OCC scheduler
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
Expand Down
187 changes: 187 additions & 0 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package tasks

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/tendermint/tendermint/abci/types"
"golang.org/x/sync/errgroup"
)

type status string

const (
// statusPending tasks are ready for execution
// all executing tasks are in pending state
statusPending status = "pending"
// statusExecuted tasks are ready for validation
// these tasks did not abort during execution
statusExecuted status = "executed"
// statusAborted means the task has been aborted
// these tasks transition to pending upon next execution
statusAborted status = "aborted"
// statusValidated means the task has been validated
// tasks in this status can be reset if an earlier task fails validation
statusValidated status = "validated"
)

type deliverTxTask struct {
Status status
Index int
Incarnation int
Request types.RequestDeliverTx
Response *types.ResponseDeliverTx
}

// Scheduler processes tasks concurrently
type Scheduler interface {
ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error)
}

type scheduler struct {
deliverTx func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)
workers int
}

// NewScheduler creates a new scheduler
func NewScheduler(workers int, deliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) (res types.ResponseDeliverTx)) Scheduler {
return &scheduler{
workers: workers,
deliverTx: deliverTxFunc,
}
}

func toTasks(reqs []types.RequestDeliverTx) []*deliverTxTask {
res := make([]*deliverTxTask, 0, len(reqs))
for idx, r := range reqs {
res = append(res, &deliverTxTask{
Request: r,
Index: idx,
Status: statusPending,
})
}
return res
}

func collectResponses(tasks []*deliverTxTask) []types.ResponseDeliverTx {
res := make([]types.ResponseDeliverTx, 0, len(tasks))
for _, t := range tasks {
res = append(res, *t.Response)
}
return res
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []types.RequestDeliverTx) ([]types.ResponseDeliverTx, error) {
tasks := toTasks(reqs)
toExecute := tasks
for len(toExecute) > 0 {

// execute sets statuses of tasks to either executed or aborted
err := s.executeAll(ctx, toExecute)
if err != nil {
return nil, err
}

// validate returns any that should be re-executed
// note this processes ALL tasks, not just those recently executed
toExecute, err = s.validateAll(ctx, tasks)
if err != nil {
return nil, err
}
for _, t := range toExecute {
t.Incarnation++
t.Status = statusPending
t.Response = nil
//TODO: reset anything that needs resetting
}
}
return collectResponses(tasks), nil
}

// TODO: validate each tasks
// TODO: return list of tasks that are invalid
func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*deliverTxTask, error) {
var res []*deliverTxTask

// find first non-validated entry
var startIdx int
for idx, t := range tasks {
if t.Status != statusValidated {
startIdx = idx
break
}
}

for i := startIdx; i < len(tasks); i++ {
// any aborted tx is known to be suspect here
if tasks[i].Status == statusAborted {
res = append(res, tasks[i])
} else {
//TODO: validate the tasks and add it if invalid
//TODO: create and handle abort for validation
tasks[i].Status = statusValidated
}
}
return res, nil
}

// ExecuteAll executes all tasks concurrently
// Tasks are updated with their status
// TODO: retries on aborted tasks
// TODO: error scenarios
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
ch := make(chan *deliverTxTask, len(tasks))
grp, gCtx := errgroup.WithContext(ctx.Context())

// a workers value < 1 means no limit
workers := s.workers
if s.workers < 1 {
workers = len(tasks)
}

for i := 0; i < workers; i++ {
grp.Go(func() error {
for {
select {
case <-gCtx.Done():
return gCtx.Err()
case task, ok := <-ch:
if !ok {
return nil
}
//TODO: ensure version multi store is on context
// buffered so it doesn't block on write
// abortCh := make(chan occ.Abort, 1)

//TODO: consume from abort in non-blocking way (give it a length)
resp := s.deliverTx(ctx, task.Request)

// close(abortCh)

//if _, ok := <-abortCh; ok {
// tasks.status = TaskStatusAborted
// continue
//}

task.Status = statusExecuted
task.Response = &resp
}
}
})
}
grp.Go(func() error {
defer close(ch)
for _, task := range tasks {
select {
case <-gCtx.Done():
return gCtx.Err()
case ch <- task:
}
}
return nil
})

if err := grp.Wait(); err != nil {
return err
}

return nil
}
59 changes: 59 additions & 0 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tasks

import (
"context"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/abci/types"
"testing"
)

type mockDeliverTxFunc func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx

func (f mockDeliverTxFunc) DeliverTx(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return f(ctx, req)
}

func requestList(n int) []types.RequestDeliverTx {
tasks := make([]types.RequestDeliverTx, n)
for i := 0; i < n; i++ {
tasks[i] = types.RequestDeliverTx{}
}
return tasks
}

func TestProcessAll(t *testing.T) {
tests := []struct {
name string
workers int
requests []types.RequestDeliverTx
deliverTxFunc mockDeliverTxFunc
expectedErr error
}{
{
name: "All tasks processed without aborts",
workers: 2,
requests: requestList(5),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return types.ResponseDeliverTx{}
},
expectedErr: nil,
},
//TODO: Add more test cases
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewScheduler(tt.workers, tt.deliverTxFunc.DeliverTx)
ctx := sdk.Context{}.WithContext(context.Background())

res, err := s.ProcessAll(ctx, tt.requests)
if err != tt.expectedErr {
t.Errorf("Expected error %v, got %v", tt.expectedErr, err)
} else {
// response for each request exists
assert.Len(t, res, len(tt.requests))
}
})
}
}
8 changes: 5 additions & 3 deletions types/occ/scheduler.go → types/occ/types.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package scheduler
package occ

import "errors"
import (
"errors"
)

var (
ErrReadEstimate = errors.New("multiversion store value contains estimate, cannot read, aborting")
)

// define the return struct for abort due to conflict
// Abort contains the information for a transaction's conflict
type Abort struct {
DependentTxIdx int
Err error
Expand Down

0 comments on commit 703e5ac

Please sign in to comment.