Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[occ] Fix situation where no stores causes a panic #338

Merged
merged 4 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions baseapp/deliver_tx_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package baseapp
import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -12,15 +11,27 @@ import (

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func toInt(b []byte) int {
r, _ := strconv.Atoi(string(b))
return r
}
func anteHandler(capKey sdk.StoreKey, storeKey []byte) sdk.AnteHandler {
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
store := ctx.KVStore(capKey)
txTest := tx.(txTest)

if txTest.FailOnAnte {
return ctx, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}

val := getIntFromStore(store, storeKey)
setIntOnStore(store, storeKey, val+1)

func toByteArr(i int) []byte {
return []byte(fmt.Sprintf("%d", i))
ctx.EventManager().EmitEvents(
counterEvent("ante-val", val+1),
)

return ctx, nil
}
}

func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
Expand All @@ -40,12 +51,12 @@ func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
store := ctx.KVStore(capKey)

// increment per-tx key (no conflict)
val := toInt(store.Get(txKey))
store.Set(txKey, toByteArr(val+1))
val := getIntFromStore(store, txKey)
setIntOnStore(store, txKey, val+1)

// increment shared key
sharedVal := toInt(store.Get(sharedKey))
store.Set(sharedKey, toByteArr(sharedVal+1))
sharedVal := getIntFromStore(store, sharedKey)
setIntOnStore(store, sharedKey, sharedVal+1)

// Emit an event with the incremented value and the unique ID
ctx.EventManager().EmitEvent(
Expand Down Expand Up @@ -75,8 +86,11 @@ func requireAttribute(t *testing.T, evts []abci.Event, name string, val string)

func TestDeliverTxBatch(t *testing.T) {
// test increments in the ante
//anteKey := []byte("ante-key")
anteOpt := func(bapp *BaseApp) {}
anteKey := []byte("ante-key")

anteOpt := func(bapp *BaseApp) {
bapp.SetAnteHandler(anteHandler(capKey1, anteKey))
}

// test increments in the handler
routerOpt := func(bapp *BaseApp) {
Expand Down
31 changes: 17 additions & 14 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,24 +272,27 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
for _, task := range tasks {
// initialize the context
ctx = ctx.WithTxIndex(task.Index)

// non-blocking
cms := ctx.MultiStore().CacheMultiStore()
abortCh := make(chan occ.Abort, len(s.multiVersionStores))

// init version stores by store key
vs := make(map[store.StoreKey]*multiversion.VersionIndexedStore)
for storeKey, mvs := range s.multiVersionStores {
vs[storeKey] = mvs.VersionedIndexedStore(task.Index, task.Incarnation, abortCh)
}
// if there are no stores, don't try to wrap, because there's nothing to wrap
if len(s.multiVersionStores) > 0 {
// non-blocking
cms := ctx.MultiStore().CacheMultiStore()

// save off version store so we can ask it things later
task.VersionStores = vs
ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap {
return vs[k]
})
// init version stores by store key
vs := make(map[store.StoreKey]*multiversion.VersionIndexedStore)
for storeKey, mvs := range s.multiVersionStores {
vs[storeKey] = mvs.VersionedIndexedStore(task.Index, task.Incarnation, abortCh)
}

ctx = ctx.WithMultiStore(ms)
// save off version store so we can ask it things later
task.VersionStores = vs
ms := cms.SetKVStores(func(k store.StoreKey, kvs sdk.KVStore) store.CacheWrap {
return vs[k]
})

ctx = ctx.WithMultiStore(ms)
}

task.AbortCh = abortCh
task.Ctx = ctx
Expand Down
77 changes: 52 additions & 25 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"errors"
"fmt"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"testing"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand All @@ -31,14 +31,16 @@ func requestList(n int) []types.RequestDeliverTx {
return tasks
}

func initTestCtx() sdk.Context {
func initTestCtx(injectStores bool) sdk.Context {
ctx := sdk.Context{}.WithContext(context.Background())
db := dbm.NewMemDB()
mem := dbadapter.Store{DB: db}
stores := make(map[sdk.StoreKey]sdk.CacheWrapper)
stores[testStoreKey] = cachekv.NewStore(mem, testStoreKey, 1000)
keys := make(map[string]sdk.StoreKey)
keys[testStoreKey.Name()] = testStoreKey
stores := make(map[sdk.StoreKey]sdk.CacheWrapper)
db := dbm.NewMemDB()
if injectStores {
mem := dbadapter.Store{DB: db}
stores[testStoreKey] = cachekv.NewStore(mem, testStoreKey, 1000)
keys[testStoreKey.Name()] = testStoreKey
}
store := cachemulti.NewStore(db, stores, keys, nil, nil, nil)
ctx = ctx.WithMultiStore(&store)
return ctx
Expand All @@ -51,13 +53,16 @@ func TestProcessAll(t *testing.T) {
runs int
requests []types.RequestDeliverTx
deliverTxFunc mockDeliverTxFunc
addStores bool
expectedErr error
assertions func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx)
}{
{
name: "Test for conflicts",
workers: 50,
runs: 25,
requests: requestList(50),
name: "Test every tx accesses same key",
workers: 50,
runs: 25,
addStores: true,
requests: requestList(50),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
Expand All @@ -71,6 +76,38 @@ func TestProcessAll(t *testing.T) {
Info: val,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte("49"), latest)
},
expectedErr: nil,
},
{
name: "Test no stores on context should not panic",
workers: 50,
runs: 1,
addStores: false,
requests: requestList(50),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
return types.ResponseDeliverTx{
Info: fmt.Sprintf("%d", ctx.TxIndex()),
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
require.Equal(t, fmt.Sprintf("%d", idx), response.Info)
}
},
expectedErr: nil,
},
}
Expand All @@ -79,25 +116,15 @@ func TestProcessAll(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
for i := 0; i < tt.runs; i++ {
s := NewScheduler(tt.workers, tt.deliverTxFunc)
ctx := initTestCtx()
ctx := initTestCtx(tt.addStores)

res, err := s.ProcessAll(ctx, tt.requests)
require.Len(t, res, len(tt.requests))

if !errors.Is(err, tt.expectedErr) {
t.Errorf("Expected error %v, got %v", tt.expectedErr, err)
} else {
require.Len(t, res, len(tt.requests))
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
}
// confirm last write made it to the parent store
res := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte(fmt.Sprintf("%d", len(tt.requests)-1)), res)
tt.assertions(t, ctx, res)
}
}
})
Expand Down