Skip to content

Commit

Permalink
[occ] Fix situation where no stores causes a panic (#338)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
Some tests from sei-chain don't inject a store, and while I'm not sure
if that's a valid scenario I made the scheduler.go tolerant to the
situation to avoid introducing this assumption to the system.

## Testing performed to validate your change
New unit test confirming lack of crash
  • Loading branch information
stevenlanders authored and udpatil committed Jan 31, 2024
1 parent 60b2113 commit 1178e0b
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 52 deletions.
40 changes: 27 additions & 13 deletions baseapp/deliver_tx_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package baseapp
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -12,15 +11,27 @@ import (

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func toInt(b []byte) int {
r, _ := strconv.Atoi(string(b))
return r
}
func anteHandler(capKey sdk.StoreKey, storeKey []byte) sdk.AnteHandler {
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
store := ctx.KVStore(capKey)
txTest := tx.(txTest)

if txTest.FailOnAnte {
return ctx, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}

val := getIntFromStore(store, storeKey)
setIntOnStore(store, storeKey, val+1)

func toByteArr(i int) []byte {
return []byte(fmt.Sprintf("%d", i))
ctx.EventManager().EmitEvents(
counterEvent("ante-val", val+1),
)

return ctx, nil
}
}

func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
Expand All @@ -40,12 +51,12 @@ func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
store := ctx.KVStore(capKey)

// increment per-tx key (no conflict)
val := toInt(store.Get(txKey))
store.Set(txKey, toByteArr(val+1))
val := getIntFromStore(store, txKey)
setIntOnStore(store, txKey, val+1)

// increment shared key
sharedVal := toInt(store.Get(sharedKey))
store.Set(sharedKey, toByteArr(sharedVal+1))
sharedVal := getIntFromStore(store, sharedKey)
setIntOnStore(store, sharedKey, sharedVal+1)

// Emit an event with the incremented value and the unique ID
ctx.EventManager().EmitEvent(
Expand Down Expand Up @@ -75,8 +86,11 @@ func requireAttribute(t *testing.T, evts []abci.Event, name string, val string)

func TestDeliverTxBatch(t *testing.T) {
// test increments in the ante
//anteKey := []byte("ante-key")
anteOpt := func(bapp *BaseApp) {}
anteKey := []byte("ante-key")

anteOpt := func(bapp *BaseApp) {
bapp.SetAnteHandler(anteHandler(capKey1, anteKey))
}

// test increments in the handler
routerOpt := func(bapp *BaseApp) {
Expand Down
31 changes: 17 additions & 14 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,24 +272,27 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
for _, task := range tasks {
// initialize the context
ctx = ctx.WithTxIndex(task.Index)

// non-blocking
cms := ctx.MultiStore().CacheMultiStore()
abortCh := make(chan occ.Abort, len(s.multiVersionStores))

// init version stores by store key
vs := make(map[store.StoreKey]*multiversion.VersionIndexedStore)
for storeKey, mvs := range s.multiVersionStores {
vs[storeKey] = mvs.VersionedIndexedStore(task.Index, task.Incarnation, abortCh)
}
// if there are no stores, don't try to wrap, because there's nothing to wrap
if len(s.multiVersionStores) > 0 {
// non-blocking
cms := ctx.MultiStore().CacheMultiStore()

// save off version store so we can ask it things later
task.VersionStores = vs
ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap {
return vs[k]
})
// init version stores by store key
vs := make(map[store.StoreKey]*multiversion.VersionIndexedStore)
for storeKey, mvs := range s.multiVersionStores {
vs[storeKey] = mvs.VersionedIndexedStore(task.Index, task.Incarnation, abortCh)
}

ctx = ctx.WithMultiStore(ms)
// save off version store so we can ask it things later
task.VersionStores = vs
ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap {
return vs[k]
})

ctx = ctx.WithMultiStore(ms)
}

task.AbortCh = abortCh
task.Ctx = ctx
Expand Down
77 changes: 52 additions & 25 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"fmt"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"testing"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand All @@ -31,14 +31,16 @@ func requestList(n int) []types.RequestDeliverTx {
return tasks
}

func initTestCtx() sdk.Context {
func initTestCtx(injectStores bool) sdk.Context {
ctx := sdk.Context{}.WithContext(context.Background())
db := dbm.NewMemDB()
mem := dbadapter.Store{DB: db}
stores := make(map[sdk.StoreKey]sdk.CacheWrapper)
stores[testStoreKey] = cachekv.NewStore(mem, testStoreKey, 1000)
keys := make(map[string]sdk.StoreKey)
keys[testStoreKey.Name()] = testStoreKey
stores := make(map[sdk.StoreKey]sdk.CacheWrapper)
db := dbm.NewMemDB()
if injectStores {
mem := dbadapter.Store{DB: db}
stores[testStoreKey] = cachekv.NewStore(mem, testStoreKey, 1000)
keys[testStoreKey.Name()] = testStoreKey
}
store := cachemulti.NewStore(db, stores, keys, nil, nil, nil)
ctx = ctx.WithMultiStore(&store)
return ctx
Expand All @@ -51,13 +53,16 @@ func TestProcessAll(t *testing.T) {
runs int
requests []types.RequestDeliverTx
deliverTxFunc mockDeliverTxFunc
addStores bool
expectedErr error
assertions func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx)
}{
{
name: "Test for conflicts",
workers: 50,
runs: 25,
requests: requestList(50),
name: "Test every tx accesses same key",
workers: 50,
runs: 25,
addStores: true,
requests: requestList(50),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
Expand All @@ -71,6 +76,38 @@ func TestProcessAll(t *testing.T) {
Info: val,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte("49"), latest)
},
expectedErr: nil,
},
{
name: "Test no stores on context should not panic",
workers: 50,
runs: 1,
addStores: false,
requests: requestList(50),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return types.ResponseDeliverTx{
Info: fmt.Sprintf("%d", ctx.TxIndex()),
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
require.Equal(t, fmt.Sprintf("%d", idx), response.Info)
}
},
expectedErr: nil,
},
}
Expand All @@ -79,25 +116,15 @@ func TestProcessAll(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
for i := 0; i < tt.runs; i++ {
s := NewScheduler(tt.workers, tt.deliverTxFunc)
ctx := initTestCtx()
ctx := initTestCtx(tt.addStores)

res, err := s.ProcessAll(ctx, tt.requests)
require.Len(t, res, len(tt.requests))

if !errors.Is(err, tt.expectedErr) {
t.Errorf("Expected error %v, got %v", tt.expectedErr, err)
} else {
require.Len(t, res, len(tt.requests))
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
}
// confirm last write made it to the parent store
res := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte(fmt.Sprintf("%d", len(tt.requests)-1)), res)
tt.assertions(t, ctx, res)
}
}
})
Expand Down

0 comments on commit 1178e0b

Please sign in to comment.