Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak due to emitting infinite events during snapshot #391

Merged
merged 3 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions store/cachekv/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,24 @@ import (
//
// TODO: Optimize by memoizing.
type cacheMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
eventManager *sdktypes.EventManager
storeKey sdktypes.StoreKey
parent types.Iterator
cache types.Iterator
ascending bool
storeKey sdktypes.StoreKey
}

var _ types.Iterator = (*cacheMergeIterator)(nil)

func NewCacheMergeIterator(
parent, cache types.Iterator,
ascending bool,
eventManager *sdktypes.EventManager,
storeKey sdktypes.StoreKey,
) *cacheMergeIterator {
iter := &cacheMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
eventManager: eventManager,
storeKey: storeKey,
parent: parent,
cache: cache,
ascending: ascending,
storeKey: storeKey,
}

return iter
Expand Down Expand Up @@ -138,14 +135,12 @@ func (iter *cacheMergeIterator) Value() []byte {
// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
value := iter.cache.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, iter.cache.Key(), value)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These events are causing memory leak during snapshot creation, it should be safe to delete since those events are not being used in critical path at all

return value
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, iter.parent.Key(), value)
return value
}

Expand All @@ -156,11 +151,9 @@ func (iter *cacheMergeIterator) Value() []byte {
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, keyP, value)
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, keyC, value)
return value
default:
panic("invalid comparison result")
Expand Down
33 changes: 8 additions & 25 deletions store/cachekv/mergeiterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"testing"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/dbadapter"
)

func TestEmitEventMangerInIterator(t *testing.T) {
func TestMangerIterator(t *testing.T) {
// initiate mock kvstore
mem := dbadapter.Store{DB: dbm.NewMemDB()}
kvstore := cachekv.NewStore(mem, types.NewKVStoreKey("CacheKvTest"), types.DefaultCacheSizeLimit)
Expand All @@ -25,32 +23,17 @@ func TestEmitEventMangerInIterator(t *testing.T) {
}

// initialize mock mergeIterator
eventManager := sdktypes.NewEventManager()
parent := kvstore.Iterator(keys[0], keys[2])
cache := kvstore.Iterator(nil, nil)
for ; cache.Valid(); cache.Next() {
}
iter := cachekv.NewCacheMergeIterator(parent, cache, true, eventManager, types.NewKVStoreKey("CacheKvTest"))

// get the next value
iter.Value()
iter := cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"))

// assert the resource access is still emitted correctly when the cache store is unavailable
require.Equal(t, "access_type", string(eventManager.Events()[0].Attributes[0].Key))
require.Equal(t, "read", string(eventManager.Events()[0].Attributes[0].Value))
require.Equal(t, "store_key", string(eventManager.Events()[0].Attributes[1].Key))
require.Equal(t, "CacheKvTest", string(eventManager.Events()[0].Attributes[1].Value))
// get the next value and it should not be nil
nextValue := iter.Value()
require.NotNil(t, nextValue)

// assert event emission when cache is available
cache = kvstore.Iterator(keys[1], keys[2])
iter = cachekv.NewCacheMergeIterator(parent, cache, true, eventManager, types.NewKVStoreKey("CacheKvTest"))

// get the next value
iter.Value()

// assert the resource access is still emitted correctly when the cache store is available
require.Equal(t, "access_type", string(eventManager.Events()[0].Attributes[0].Key))
require.Equal(t, "read", string(eventManager.Events()[0].Attributes[0].Value))
require.Equal(t, "store_key", string(eventManager.Events()[0].Attributes[1].Key))
require.Equal(t, "CacheKvTest", string(eventManager.Events()[0].Attributes[1].Value))
nextValue = iter.Value()
require.NotNil(t, nextValue)
}
2 changes: 1 addition & 1 deletion store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
}()
store.dirtyItems(start, end)
cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending, store.eventManager, store.storeKey)
return NewCacheMergeIterator(parent, cache, ascending, store.eventManager, store.storeKey)
return NewCacheMergeIterator(parent, cache, ascending, store.storeKey)
}

func findStartIndex(strL []string, startQ string) int {
Expand Down
Loading