diff --git a/x/group/internal/orm/indexer.go b/x/group/internal/orm/indexer.go index 67b3f13d2cc1..a2dc8e6fed95 100644 --- a/x/group/internal/orm/indexer.go +++ b/x/group/internal/orm/indexer.go @@ -129,10 +129,8 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R return sdkerrors.Wrap(errors.ErrORMInvalidArgument, "empty index key") } - it := store.Iterator(PrefixRange(secondaryIndexKeyBytes)) - defer it.Close() - if it.Valid() { - return errors.ErrORMUniqueConstraint + if err := checkUniqueIndexKey(store, secondaryIndexKeyBytes); err != nil { + return err } indexKey, err := buildKeyFromParts([]interface{}{secondaryIndexKey, []byte(rowID)}) @@ -144,6 +142,16 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R return nil } +// checkUniqueIndexKey checks that the given secondary index key is unique +func checkUniqueIndexKey(store sdk.KVStore, secondaryIndexKeyBytes []byte) error { + it := store.Iterator(PrefixRange(secondaryIndexKeyBytes)) + defer it.Close() + if it.Valid() { + return errors.ErrORMUniqueConstraint + } + return nil +} + // multiKeyAddFunc allows multiple entries for a key func multiKeyAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID RowID) error { secondaryIndexKeyBytes, err := keyPartBytes(secondaryIndexKey, false) diff --git a/x/group/internal/orm/table.go b/x/group/internal/orm/table.go index a25615a965a4..4d6987bd73b2 100644 --- a/x/group/internal/orm/table.go +++ b/x/group/internal/orm/table.go @@ -170,9 +170,7 @@ func (a table) Has(store sdk.KVStore, key RowID) bool { return false } pStore := prefix.NewStore(store, a.prefix[:]) - it := pStore.Iterator(PrefixRange(key)) - defer it.Close() - return it.Valid() + return pStore.Has(key) } // GetOne load the object persisted for the given RowID into the dest parameter. @@ -252,11 +250,9 @@ func (a table) Export(store sdk.KVStore, dest ModelSlicePtr) (uint64, error) { // data should be a slice of structs that implement PrimaryKeyed. func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error { // Clear all data - pStore := prefix.NewStore(store, a.prefix[:]) - it := pStore.Iterator(nil, nil) - defer it.Close() - for ; it.Valid(); it.Next() { - if err := a.Delete(store, it.Key()); err != nil { + keys := a.keys(store) + for _, key := range keys { + if err := a.Delete(store, key); err != nil { return err } } @@ -282,6 +278,18 @@ func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error { return nil } +func (a table) keys(store sdk.KVStore) [][]byte { + pStore := prefix.NewStore(store, a.prefix[:]) + it := pStore.Iterator(nil, nil) + defer it.Close() + + var keys [][]byte + for ; it.Valid(); it.Next() { + keys = append(keys, it.Key()) + } + return keys +} + // typeSafeIterator is initialized with a type safe RowGetter only. type typeSafeIterator struct { store sdk.KVStore diff --git a/x/group/internal/orm/types.go b/x/group/internal/orm/types.go index b5ca93b0ee95..a98bcdee64ab 100644 --- a/x/group/internal/orm/types.go +++ b/x/group/internal/orm/types.go @@ -113,12 +113,12 @@ func NewTypeSafeRowGetter(prefixKey [2]byte, model reflect.Type, cdc codec.Codec } pStore := prefix.NewStore(store, prefixKey[:]) - it := pStore.Iterator(PrefixRange(rowID)) - defer it.Close() - if !it.Valid() { + bz := pStore.Get(rowID) + if len(bz) == 0 { return sdkerrors.ErrNotFound } - return cdc.Unmarshal(it.Value(), dest) + + return cdc.Unmarshal(bz, dest) } } diff --git a/x/group/keeper/keeper.go b/x/group/keeper/keeper.go index 93cd875da5a3..de2ecfcc58ab 100644 --- a/x/group/keeper/keeper.go +++ b/x/group/keeper/keeper.go @@ -223,12 +223,12 @@ func (k Keeper) GetGroupSequence(ctx sdk.Context) uint64 { return k.groupTable.Sequence().CurVal(ctx.KVStore(k.key)) } -// iterateProposalsByVPEnd iterates over all proposals whose voting_period_end is after the `endTime` time argument. -func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb func(proposal group.Proposal) (bool, error)) error { +// proposalsByVPEnd returns all proposals whose voting_period_end is after the `endTime` time argument. +func (k Keeper) proposalsByVPEnd(ctx sdk.Context, endTime time.Time) (proposals []group.Proposal, err error) { timeBytes := sdk.FormatTimeBytes(endTime) it, err := k.proposalsByVotingPeriodEnd.PrefixScan(ctx.KVStore(k.key), nil, timeBytes) if err != nil { - return err + return proposals, err } defer it.Close() @@ -248,19 +248,12 @@ func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb f break } if err != nil { - return err - } - - stop, err := cb(proposal) - if err != nil { - return err - } - if stop { - break + return proposals, err } + proposals = append(proposals, proposal) } - return nil + return proposals, nil } // pruneProposal deletes a proposal from state. @@ -279,12 +272,33 @@ func (k Keeper) pruneProposal(ctx sdk.Context, proposalID uint64) error { // abortProposals iterates through all proposals by group policy index // and marks submitted proposals as aborted. func (k Keeper) abortProposals(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) error { - proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes()) + proposals, err := k.proposalsByGroupPolicy(ctx, groupPolicyAddr) if err != nil { return err } + + for _, proposalInfo := range proposals { + // Mark all proposals still in the voting phase as aborted. + if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED { + proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED + + if err := k.proposalTable.Update(ctx.KVStore(k.key), proposalInfo.Id, &proposalInfo); err != nil { + return err + } + } + } + return nil +} + +// proposalsByGroupPolicy returns all proposals for a given group policy. +func (k Keeper) proposalsByGroupPolicy(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) ([]group.Proposal, error) { + proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes()) + if err != nil { + return nil, err + } defer proposalIt.Close() + var proposals []group.Proposal for { var proposalInfo group.Proposal _, err = proposalIt.LoadNext(&proposalInfo) @@ -292,30 +306,40 @@ func (k Keeper) abortProposals(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) break } if err != nil { - return err + return proposals, err } - // Mark all proposals still in the voting phase as aborted. - if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED { - proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED - - if err := k.proposalTable.Update(ctx.KVStore(k.key), proposalInfo.Id, &proposalInfo); err != nil { - return err - } - } + proposals = append(proposals, proposalInfo) } - return nil + return proposals, nil } // pruneVotes prunes all votes for a proposal from state. func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error { - store := ctx.KVStore(k.key) - it, err := k.voteByProposalIndex.Get(store, proposalID) + votes, err := k.votesByProposal(ctx, proposalID) if err != nil { return err } + + for _, v := range votes { + err = k.voteTable.Delete(ctx.KVStore(k.key), &v) + if err != nil { + return err + } + } + + return nil +} + +// votesByProposal returns all votes for a given proposal. +func (k Keeper) votesByProposal(ctx sdk.Context, proposalID uint64) ([]group.Vote, error) { + it, err := k.voteByProposalIndex.Get(ctx.KVStore(k.key), proposalID) + if err != nil { + return nil, err + } defer it.Close() + var votes []group.Vote for { var vote group.Vote _, err = it.LoadNext(&vote) @@ -323,32 +347,26 @@ func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error { break } if err != nil { - return err - } - - err = k.voteTable.Delete(store, &vote) - if err != nil { - return err + return votes, err } + votes = append(votes, vote) } - - return nil + return votes, nil } // PruneProposals prunes all proposals that are expired, i.e. whose // `voting_period + max_execution_period` is greater than the current block // time. func (k Keeper) PruneProposals(ctx sdk.Context) error { - err := k.iterateProposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod), func(proposal group.Proposal) (bool, error) { + proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod)) + if err != nil { + return nil + } + for _, proposal := range proposals { err := k.pruneProposal(ctx, proposal.Id) if err != nil { - return true, err + return err } - - return false, nil - }) - if err != nil { - return err } return nil @@ -358,36 +376,39 @@ func (k Keeper) PruneProposals(ctx sdk.Context) error { // has ended, tallies their votes, prunes them, and updates the proposal's // `FinalTallyResult` field. func (k Keeper) TallyProposalsAtVPEnd(ctx sdk.Context) error { - return k.iterateProposalsByVPEnd(ctx, ctx.BlockTime(), func(proposal group.Proposal) (bool, error) { + proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime()) + if err != nil { + return nil + } + for _, proposal := range proposals { policyInfo, err := k.getGroupPolicyInfo(ctx, proposal.GroupPolicyAddress) if err != nil { - return true, sdkerrors.Wrap(err, "group policy") + return sdkerrors.Wrap(err, "group policy") } electorate, err := k.getGroupInfo(ctx, policyInfo.GroupId) if err != nil { - return true, sdkerrors.Wrap(err, "group") + return sdkerrors.Wrap(err, "group") } - proposalId := proposal.Id if proposal.Status == group.PROPOSAL_STATUS_ABORTED || proposal.Status == group.PROPOSAL_STATUS_WITHDRAWN { - if err := k.pruneProposal(ctx, proposalId); err != nil { - return true, err + proposalID := proposal.Id + if err := k.pruneProposal(ctx, proposalID); err != nil { + return err } - if err := k.pruneVotes(ctx, proposalId); err != nil { - return true, err + if err := k.pruneVotes(ctx, proposalID); err != nil { + return err } } else { err = k.doTallyAndUpdate(ctx, &proposal, electorate, policyInfo) if err != nil { - return true, sdkerrors.Wrap(err, "doTallyAndUpdate") + return sdkerrors.Wrap(err, "doTallyAndUpdate") } if err := k.proposalTable.Update(ctx.KVStore(k.key), proposal.Id, &proposal); err != nil { - return true, sdkerrors.Wrap(err, "proposal update") + return sdkerrors.Wrap(err, "proposal update") } } - - return false, nil - }) + } + return nil } diff --git a/x/group/keeper/keeper_test.go b/x/group/keeper/keeper_test.go index 84ee34bcdf81..92e640906019 100644 --- a/x/group/keeper/keeper_test.go +++ b/x/group/keeper/keeper_test.go @@ -85,6 +85,40 @@ func TestKeeperTestSuite(t *testing.T) { suite.Run(t, new(TestSuite)) } +// Testing a deadlock issue when querying group members +// https://github.com/cosmos/cosmos-sdk/issues/12111 +func (s *TestSuite) TestCreateGroupWithLotsOfMembers() { + for i := 50; i < 70; i++ { + membersResp := s.createGroupAndGetMembers(i) + s.Require().Equal(len(membersResp), i) + } +} + +func (s *TestSuite) createGroupAndGetMembers(numMembers int) []*group.GroupMember { + addressPool := simapp.AddTestAddrsIncremental(s.app, s.sdkCtx, numMembers, sdk.NewInt(30000000)) + members := make([]group.MemberRequest, numMembers) + for i := 0; i < len(members); i++ { + members[i] = group.MemberRequest{ + Address: addressPool[i].String(), + Weight: "1", + } + } + + g, err := s.keeper.CreateGroup(s.ctx, &group.MsgCreateGroup{ + Admin: members[0].Address, + Members: members, + }) + s.Require().NoErrorf(err, "failed to create group with %d members", len(members)) + s.T().Logf("group %d created with %d members", g.GroupId, len(members)) + + groupMemberResp, err := s.keeper.GroupMembers(s.ctx, &group.QueryGroupMembersRequest{GroupId: g.GroupId}) + s.Require().NoError(err) + + s.T().Logf("got %d members from group %d", len(groupMemberResp.Members), g.GroupId) + + return groupMemberResp.Members +} + func (s *TestSuite) TestCreateGroup() { addrs := s.addrs addr1 := addrs[0]