Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement true batchGetIncrGroupMember RPC method and dependency methods. #2417

Merged
merged 14 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 1 addition & 41 deletions internal/api/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/tools/a2r"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/log"
)

type GroupApi rpcclient.Group
Expand Down Expand Up @@ -148,45 +146,7 @@ func (o *GroupApi) GetIncrementalGroupMember(c *gin.Context) {
}

func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) {
type BatchIncrementalReq struct {
UserID string `json:"user_id"`
List []*group.GetIncrementalGroupMemberReq `json:"list"`
}
type BatchIncrementalResp struct {
List map[string]*group.GetIncrementalGroupMemberResp `json:"list"`
}
req, err := a2r.ParseRequestNotCheck[BatchIncrementalReq](c)
if err != nil {
apiresp.GinError(c, err)
return
}
resp := &BatchIncrementalResp{
List: make(map[string]*group.GetIncrementalGroupMemberResp),
}
var (
changeCount int
)
for _, req := range req.List {
if _, ok := resp.List[req.GroupID]; ok {
continue
}
res, err := o.Client.GetIncrementalGroupMember(c, req)
if err != nil {
if len(resp.List) == 0 {
apiresp.GinError(c, err)
} else {
log.ZError(c, "group incr sync versopn", err, "groupID", req.GroupID, "success", len(resp.List))
apiresp.GinSuccess(c, resp)
}
return
}
resp.List[req.GroupID] = res
changeCount += len(res.Insert) + len(res.Delete) + len(res.Update)
if changeCount >= 200 {
break
}
}
apiresp.GinSuccess(c, resp)
a2r.Call(group.GroupClient.BatchGetIncrementalGroupMember, o.Client, c)
}

func (o *GroupApi) GetFullGroupMemberUserIDs(c *gin.Context) {
Expand Down
2 changes: 1 addition & 1 deletion internal/rpc/conversation/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package conversation

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
Expand Down Expand Up @@ -40,7 +41,6 @@ func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req
Find: func(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
return c.getConversations(ctx, req.UserID, conversationIDs)
},
ID: func(elem *conversation.Conversation) string { return elem.GroupID },
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*conversation.Conversation, full bool) *conversation.GetIncrementalConversationResp {
return &conversation.GetIncrementalConversationResp{
VersionID: version.ID.Hex(),
Expand Down
1 change: 0 additions & 1 deletion internal/rpc/friend/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
Find: func(ctx context.Context, ids []string) ([]*sdkws.FriendInfo, error) {
return s.getFriend(ctx, req.UserID, ids)
},
ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID },
Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
return &relation.GetIncrementalFriendsResp{
VersionID: version.ID.Hex(),
Expand Down
154 changes: 147 additions & 7 deletions internal/rpc/group/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package group

import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
Expand All @@ -10,13 +11,10 @@ import (
"github.com/openimsdk/protocol/constant"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
)

func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) {
//TODO implement me
panic("implement me")
}

func (s *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetFullGroupMemberUserIDsReq) (*pbgroup.GetFullGroupMemberUserIDsResp, error) {
vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
if err != nil {
Expand Down Expand Up @@ -104,7 +102,6 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) {
return s.getGroupMembersInfo(ctx, req.GroupID, ids)
},
ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID },
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
return &pbgroup.GetIncrementalGroupMemberResp{
VersionID: version.ID.Hex(),
Expand Down Expand Up @@ -135,6 +132,150 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
return resp, nil
}

func (s *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (resp *pbgroup.BatchGetIncrementalGroupMemberResp, err error) {
type VersionInfo struct {
GroupID string
VersionID string
VersionNumber uint64
}

var groupIDs []string

groupsVersionMap := make(map[string]*VersionInfo)
groupsMap := make(map[string]*model.Group)
hasGroupUpdateMap := make(map[string]bool)
sortVersionMap := make(map[string]uint64)

var targetKeys, versionIDs []string
var versionNumbers []uint64

var requestBodyLen int

for _, group := range req.ReqList {
groupsVersionMap[group.GroupID] = &VersionInfo{
GroupID: group.GroupID,
VersionID: group.VersionID,
VersionNumber: group.Version,
}

groupIDs = append(groupIDs, group.GroupID)
}

groups, err := s.db.FindGroup(ctx, groupIDs)
if err != nil {
return nil, errs.Wrap(err)
}

for _, group := range groups {
if group.Status == constant.GroupStatusDismissed {
err = servererrs.ErrDismissedAlready.Wrap()
log.ZError(ctx, "This group is Dismissed Already", err, "group is", group.GroupID)

delete(groupsVersionMap, group.GroupID)
} else {
groupsMap[group.GroupID] = group
}
}

for groupID, vInfo := range groupsVersionMap {
targetKeys = append(targetKeys, groupID)
versionIDs = append(versionIDs, vInfo.VersionID)
versionNumbers = append(versionNumbers, vInfo.VersionNumber)
}

opt := incrversion.BatchOption[[]*sdkws.GroupMemberFullInfo, pbgroup.BatchGetIncrementalGroupMemberResp]{
Ctx: ctx,
TargetKeys: targetKeys,
VersionIDs: versionIDs,
VersionNumbers: versionNumbers,
Versions: func(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) {
vLogs, err := s.db.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits)
if err != nil {
return nil, errs.Wrap(err)
}

for groupID, vlog := range vLogs {
vlogElems := make([]model.VersionLogElem, 0, len(vlog.Logs))
for i, log := range vlog.Logs {
switch log.EID {
case model.VersionGroupChangeID:
vlog.LogLen--
hasGroupUpdateMap[groupID] = true
case model.VersionSortChangeID:
vlog.LogLen--
sortVersionMap[groupID] = uint64(log.Version)
default:
vlogElems = append(vlogElems, vlog.Logs[i])
}
}
vlog.Logs = vlogElems
if vlog.LogLen > 0 {
hasGroupUpdateMap[groupID] = true
}
}

return vLogs, nil
},
CacheMaxVersions: s.db.BatchFindMaxGroupMemberVersionCache,
Find: func(ctx context.Context, groupID string, ids []string) ([]*sdkws.GroupMemberFullInfo, error) {
memberInfo, err := s.getGroupMembersInfo(ctx, groupID, ids)
if err != nil {
return nil, err
}

return memberInfo, err
},
Resp: func(versions map[string]*model.VersionLog, deleteIdsMap map[string][]string, insertListMap, updateListMap map[string][]*sdkws.GroupMemberFullInfo, fullMap map[string]bool) *pbgroup.BatchGetIncrementalGroupMemberResp {
resList := make(map[string]*pbgroup.GetIncrementalGroupMemberResp)

for groupID, versionLog := range versions {
resList[groupID] = &pbgroup.GetIncrementalGroupMemberResp{
VersionID: versionLog.ID.Hex(),
Version: uint64(versionLog.Version),
Full: fullMap[groupID],
Delete: deleteIdsMap[groupID],
Insert: insertListMap[groupID],
Update: updateListMap[groupID],
SortVersion: sortVersionMap[groupID],
}

requestBodyLen += len(insertListMap[groupID]) + len(updateListMap[groupID]) + len(deleteIdsMap[groupID])
if requestBodyLen > 200 {
break
}
}

return &pbgroup.BatchGetIncrementalGroupMemberResp{
RespList: resList,
}
},
}

resp, err = opt.Build()
if err != nil {
return nil, errs.Wrap(err)
}

for groupID, val := range resp.RespList {
if val.Full || hasGroupUpdateMap[groupID] {
count, err := s.db.FindGroupMemberNum(ctx, groupID)
if err != nil {
return nil, err
}

owner, err := s.db.TakeGroupOwner(ctx, groupID)
if err != nil {
return nil, err
}

resp.RespList[groupID].Group = s.groupDB2PB(groupsMap[groupID], owner.UserID, count)
}
}

return resp, nil

}

func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupReq) (*pbgroup.GetIncrementalJoinGroupResp, error) {
if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil {
return nil, err
Expand All @@ -147,7 +288,6 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.
Version: s.db.FindJoinIncrVersion,
CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache,
Find: s.getGroupsInfo,
ID: func(elem *sdkws.GroupInfo) string { return elem.GroupID },
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp {
return &pbgroup.GetIncrementalJoinGroupResp{
VersionID: version.ID.Hex(),
Expand Down
Loading
Loading