Skip to content

Commit

Permalink
[OCC] Fix hang where abort channel blocks iterator (#379)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
- instead of assuming one thing will arrive to the abort channel, drain
it
## Testing performed to validate your change
- new unit test captures situation (tests iterator)
  • Loading branch information
stevenlanders authored and udpatil committed Jan 18, 2024
1 parent af75c70 commit 2c85e05
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 30 deletions.
48 changes: 39 additions & 9 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
defer eSpan.End()
task.Ctx = eCtx

s.prepareTask(task)
s.executeTask(task)

s.DoValidate(func() {
Expand Down Expand Up @@ -441,27 +440,58 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
task.Ctx = ctx
}

// executeTask executes a single task
func (s *scheduler) executeTask(task *deliverTxTask) {
dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerDeliverTx", task)
dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerExecuteTask", task)
defer dSpan.End()
task.Ctx = dCtx

resp := s.deliverTx(task.Ctx, task.Request)
s.prepareTask(task)

// Channel to signal the completion of deliverTx
doneCh := make(chan types.ResponseDeliverTx)

// Run deliverTx in a separate goroutine
go func() {
doneCh <- s.deliverTx(task.Ctx, task.Request)
}()

// Flag to mark if abort has happened
var abortOccurred bool

var wg sync.WaitGroup
wg.Add(1)

var abort *occ.Abort
// Drain the AbortCh in a non-blocking way
go func() {
defer wg.Done()
for abt := range task.AbortCh {
if !abortOccurred {
abortOccurred = true
abort = &abt
}
}
}()

// Wait for deliverTx to complete
resp := <-doneCh

close(task.AbortCh)

if abt, ok := <-task.AbortCh; ok {
wg.Wait()

// If abort has occurred, return, else set the response and status
if abortOccurred {
task.Status = statusAborted
task.Abort = &abt
task.Abort = abort
return
}

task.Status = statusExecuted
task.Response = &resp

// write from version store to multiversion stores
for _, v := range task.VersionStores {
v.WriteToMultiVersionStore()
}

task.Status = statusExecuted
task.Response = &resp
}
121 changes: 100 additions & 21 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"net/http"
_ "net/http/pprof"
"runtime"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -52,30 +55,106 @@ func initTestCtx(injectStores bool) sdk.Context {
return ctx
}

func generateTasks(count int) []*deliverTxTask {
var res []*deliverTxTask
for i := 0; i < count; i++ {
res = append(res, &deliverTxTask{Index: i})
}
return res
}

func TestProcessAll(t *testing.T) {
runtime.SetBlockProfileRate(1)

go func() {
http.ListenAndServe("localhost:6060", nil)
}()

tests := []struct {
name string
workers int
runs int
before func(ctx sdk.Context)
requests []*sdk.DeliverTxEntry
deliverTxFunc mockDeliverTxFunc
addStores bool
expectedErr error
assertions func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx)
}{
{
name: "Test every tx accesses same key",
name: "Test zero txs does not hang",
workers: 20,
runs: 10,
addStores: true,
requests: requestList(0),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
panic("should not deliver")
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
require.Len(t, res, 0)
},
expectedErr: nil,
},
{
name: "Test tx writing to a store that another tx is iterating",
workers: 50,
runs: 50,
runs: 1,
requests: requestList(500),
addStores: true,
requests: requestList(100),
before: func(ctx sdk.Context) {
kv := ctx.MultiStore().GetKVStore(testStoreKey)
// initialize 100 test values in the base kv store so iterating isn't too fast
for i := 0; i < 10; i++ {
kv.Set([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i)))
}
},
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
kv := ctx.MultiStore().GetKVStore(testStoreKey)
if ctx.TxIndex()%2 == 0 {
// For even-indexed transactions, write to the store
kv.Set(req.Tx, req.Tx)
return types.ResponseDeliverTx{
Info: "write",
}
} else {
// For odd-indexed transactions, iterate over the store

// just write so we have more writes going on
kv.Set(req.Tx, req.Tx)
iterator := kv.Iterator(nil, nil)
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
// Do nothing, just iterate
}
return types.ResponseDeliverTx{
Info: "iterate",
}
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
if idx%2 == 0 {
require.Equal(t, "write", response.Info)
} else {
require.Equal(t, "iterate", response.Info)
}
}
},
expectedErr: nil,
},
{
name: "Test no overlap txs",
workers: 20,
runs: 10,
addStores: true,
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))

// write to the store with this tx's index
kv.Set(itemKey, req.Tx)
kv.Set(req.Tx, req.Tx)
val := string(kv.Get(req.Tx))

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Expand All @@ -84,26 +163,22 @@ func TestProcessAll(t *testing.T) {
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
require.Equal(t, fmt.Sprintf("%d", idx), response.Info)
}
store := ctx.MultiStore().GetKVStore(testStoreKey)
for i := 0; i < len(res); i++ {
val := store.Get([]byte(fmt.Sprintf("%d", i)))
require.Equal(t, []byte(fmt.Sprintf("%d", i)), val)
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
},
expectedErr: nil,
},
{
name: "Test few workers many txs",
workers: 5,
runs: 10,
name: "Test every tx accesses same key",
workers: 50,
runs: 1,
addStores: true,
requests: requestList(50),
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
Expand Down Expand Up @@ -136,9 +211,9 @@ func TestProcessAll(t *testing.T) {
{
name: "Test no stores on context should not panic",
workers: 50,
runs: 1,
runs: 10,
addStores: false,
requests: requestList(50),
requests: requestList(10),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return types.ResponseDeliverTx{
Info: fmt.Sprintf("%d", ctx.TxIndex()),
Expand Down Expand Up @@ -167,6 +242,10 @@ func TestProcessAll(t *testing.T) {
s := NewScheduler(tt.workers, ti, tt.deliverTxFunc)
ctx := initTestCtx(tt.addStores)

if tt.before != nil {
tt.before(ctx)
}

res, err := s.ProcessAll(ctx, tt.requests)
require.Len(t, res, len(tt.requests))

Expand Down

0 comments on commit 2c85e05

Please sign in to comment.