[OCC] Fix hang where abort channel blocks iterator #379

Merged 4 commits on Dec 13, 2023
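Background, summarized from the diff below rather than from any PR description: before this change, executeTask ran deliverTx synchronously and only received from task.AbortCh after it returned. An iterator inside deliverTx signals a conflict by sending on that same channel, so once the channel's buffer (if any) was full, the send blocked, deliverTx could never return, and the scheduler hung. A minimal sketch of that deadlock shape, with illustrative names rather than the scheduler's real API:

package main

// work stands in for deliverTx; the send stands in for an iterator
// signalling an abort mid-execution.
func work(abortCh chan struct{}) {
	abortCh <- struct{}{} // blocks: the only receive happens after work returns
}

func main() {
	abortCh := make(chan struct{})
	work(abortCh) // never returns
	<-abortCh     // never reached
}

The fix below moves deliverTx onto its own goroutine and drains AbortCh concurrently, so an abort send can never block the sender.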
48 changes: 39 additions & 9 deletions tasks/scheduler.go
@@ -388,7 +388,6 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
 	defer eSpan.End()
 	task.Ctx = eCtx
 
-	s.prepareTask(task)
 	s.executeTask(task)
 
 	s.DoValidate(func() {
@@ -441,27 +440,58 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
 	task.Ctx = ctx
 }
 
 // executeTask executes a single task
 func (s *scheduler) executeTask(task *deliverTxTask) {
-	dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerDeliverTx", task)
+	dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerExecuteTask", task)
 	defer dSpan.End()
 	task.Ctx = dCtx
 
-	resp := s.deliverTx(task.Ctx, task.Request)
+	s.prepareTask(task)
+
+	// Channel to signal the completion of deliverTx
+	doneCh := make(chan types.ResponseDeliverTx)
+
+	// Run deliverTx in a separate goroutine
+	go func() {
+		doneCh <- s.deliverTx(task.Ctx, task.Request)
+	}()
+
+	// Flag to mark if abort has happened
+	var abortOccurred bool
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	var abort *occ.Abort
+	// Drain the AbortCh in a non-blocking way
+	go func() {
+		defer wg.Done()
+		for abt := range task.AbortCh {
+			if !abortOccurred {
+				abortOccurred = true
+				abort = &abt
+			}
+		}
+	}()
+
+	// Wait for deliverTx to complete
+	resp := <-doneCh
 
 	close(task.AbortCh)
 
-	if abt, ok := <-task.AbortCh; ok {
+	wg.Wait()
+
+	// If abort has occurred, return, else set the response and status
+	if abortOccurred {
 		task.Status = statusAborted
-		task.Abort = &abt
+		task.Abort = abort
 		return
 	}
-
+	task.Status = statusExecuted
+	task.Response = &resp
+
 	// write from version store to multiversion stores
 	for _, v := range task.VersionStores {
 		v.WriteToMultiVersionStore()
 	}
-
-	task.Status = statusExecuted
-	task.Response = &resp
 }
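Two standard Go channel properties make the new shape safe (a general-language note, not something stated in the PR): a for range loop over a channel keeps yielding values that are still buffered after close and exits once the channel is closed and empty, and the close itself is only legal because deliverTx has already returned, so no further sends on task.AbortCh can occur. A self-contained illustration:

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch) // safe: no sends can happen after this point

	// range drains the remaining buffered values, then exits once the
	// closed channel is empty - the same reason the drain goroutine in
	// executeTask is guaranteed to finish, letting wg.Wait() return.
	for v := range ch {
		fmt.Println(v) // prints 1, then 2
	}
}

Note also that abortOccurred and abort are written only by the drain goroutine and read only after wg.Wait(), so the WaitGroup provides the happens-before edge that keeps those accesses race-free.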
121 changes: 100 additions & 21 deletions tasks/scheduler_test.go
@@ -4,6 +4,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net/http"
+	_ "net/http/pprof"
+	"runtime"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -52,30 +55,106 @@ func initTestCtx(injectStores bool) sdk.Context {
 	return ctx
 }
 
+func generateTasks(count int) []*deliverTxTask {
+	var res []*deliverTxTask
+	for i := 0; i < count; i++ {
+		res = append(res, &deliverTxTask{Index: i})
+	}
+	return res
+}
+
 func TestProcessAll(t *testing.T) {
+	runtime.SetBlockProfileRate(1)
+
+	go func() {
+		http.ListenAndServe("localhost:6060", nil)
+	}()
+
 	tests := []struct {
 		name          string
 		workers       int
 		runs          int
+		before        func(ctx sdk.Context)
 		requests      []*sdk.DeliverTxEntry
 		deliverTxFunc mockDeliverTxFunc
 		addStores     bool
 		expectedErr   error
 		assertions    func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx)
 	}{
 		{
-			name:      "Test every tx accesses same key",
+			name:      "Test zero txs does not hang",
+			workers:   20,
+			runs:      10,
+			addStores: true,
+			requests:  requestList(0),
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
+				panic("should not deliver")
+			},
+			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
+				require.Len(t, res, 0)
+			},
+			expectedErr: nil,
+		},
+		{
+			name:      "Test tx writing to a store that another tx is iterating",
 			workers:   50,
-			runs:      50,
+			runs:      1,
+			requests:  requestList(500),
 			addStores: true,
-			requests:  requestList(100),
+			before: func(ctx sdk.Context) {
+				kv := ctx.MultiStore().GetKVStore(testStoreKey)
+				// initialize 100 test values in the base kv store so iterating isn't too fast
+				for i := 0; i < 10; i++ {
+					kv.Set([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i)))
+				}
+			},
+			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
+				kv := ctx.MultiStore().GetKVStore(testStoreKey)
+				if ctx.TxIndex()%2 == 0 {
+					// For even-indexed transactions, write to the store
+					kv.Set(req.Tx, req.Tx)
+					return types.ResponseDeliverTx{
+						Info: "write",
+					}
+				} else {
+					// For odd-indexed transactions, iterate over the store
+
+					// just write so we have more writes going on
+					kv.Set(req.Tx, req.Tx)
+					iterator := kv.Iterator(nil, nil)
+					defer iterator.Close()
+					for ; iterator.Valid(); iterator.Next() {
+						// Do nothing, just iterate
+					}
+					return types.ResponseDeliverTx{
+						Info: "iterate",
+					}
+				}
+			},
+			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
+				for idx, response := range res {
+					if idx%2 == 0 {
+						require.Equal(t, "write", response.Info)
+					} else {
+						require.Equal(t, "iterate", response.Info)
+					}
+				}
+			},
+			expectedErr: nil,
+		},
+		{
+			name:      "Test no overlap txs",
+			workers:   20,
+			runs:      10,
+			addStores: true,
+			requests:  requestList(1000),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
 				// all txs read and write to the same key to maximize conflicts
 				kv := ctx.MultiStore().GetKVStore(testStoreKey)
-				val := string(kv.Get(itemKey))
 
 				// write to the store with this tx's index
-				kv.Set(itemKey, req.Tx)
+				kv.Set(req.Tx, req.Tx)
+				val := string(kv.Get(req.Tx))
 
 				// return what was read from the store (final attempt should be index-1)
 				return types.ResponseDeliverTx{
@@ -84,26 +163,22 @@ func TestProcessAll(t *testing.T) {
 			},
 			assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
 				for idx, response := range res {
-					if idx == 0 {
-						require.Equal(t, "", response.Info)
-					} else {
-						// the info is what was read from the kv store by the tx
-						// each tx writes its own index, so the info should be the index of the previous tx
-						require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
-					}
+					require.Equal(t, fmt.Sprintf("%d", idx), response.Info)
 				}
+				store := ctx.MultiStore().GetKVStore(testStoreKey)
+				for i := 0; i < len(res); i++ {
+					val := store.Get([]byte(fmt.Sprintf("%d", i)))
+					require.Equal(t, []byte(fmt.Sprintf("%d", i)), val)
+				}
-				// confirm last write made it to the parent store
-				latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
-				require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
 			},
 			expectedErr: nil,
 		},
 		{
-			name:      "Test few workers many txs",
-			workers:   5,
-			runs:      10,
+			name:      "Test every tx accesses same key",
+			workers:   50,
+			runs:      1,
 			addStores: true,
-			requests:  requestList(50),
+			requests:  requestList(1000),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
 				// all txs read and write to the same key to maximize conflicts
 				kv := ctx.MultiStore().GetKVStore(testStoreKey)
@@ -136,9 +211,9 @@
 		{
 			name:      "Test no stores on context should not panic",
 			workers:   50,
-			runs:      1,
+			runs:      10,
 			addStores: false,
-			requests:  requestList(50),
+			requests:  requestList(10),
 			deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
 				return types.ResponseDeliverTx{
 					Info: fmt.Sprintf("%d", ctx.TxIndex()),
@@ -167,6 +242,10 @@ func TestProcessAll(t *testing.T) {
 			s := NewScheduler(tt.workers, ti, tt.deliverTxFunc)
 			ctx := initTestCtx(tt.addStores)
 
+			if tt.before != nil {
+				tt.before(ctx)
+			}
+
 			res, err := s.ProcessAll(ctx, tt.requests)
 			require.Len(t, res, len(tt.requests))
 
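A usage note on the profiling hooks wired into TestProcessAll (standard Go tooling, assuming the test's default localhost:6060 listener): while the test runs, go tool pprof http://localhost:6060/debug/pprof/block fetches the block profile enabled by runtime.SetBlockProfileRate(1), and http://localhost:6060/debug/pprof/goroutine?debug=2 dumps every goroutine's stack, which is the quickest way to spot a goroutine parked on a channel send if the iterator hang ever regresses.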