From 1b56405fe87a62dcbf129598633a2764670686a8 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 26 Jul 2024 15:18:40 +0800 Subject: [PATCH 01/20] update protocol in go mod. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index fe3f8afbc2..71301d2909 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.41 + github.com/openimsdk/protocol v0.0.69-alpha.42 github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index e59418cd80..53060b1984 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.41 h1:9hoQ6UHMBq+g58KXir90EpnnvwJ1bvDPixPSaODo4nY= -github.com/openimsdk/protocol v0.0.69-alpha.41/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.42 h1:Vwuru2NtyTHuqaM+1JGxcoGvP25QWjS92oI0zGJp+lM= +github.com/openimsdk/protocol v0.0.69-alpha.42/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= From 1e0d74f911f55674f58a05309122a327beb01bd6 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 26 Jul 2024 15:19:08 +0800 Subject: [PATCH 02/20] add debug log in writePongMsg. --- internal/msggateway/client.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 889e5c456e..d3836fd409 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -107,7 +107,6 @@ func (c *Client) pingHandler(appData string) error { } log.ZDebug(c.ctx, "ping Handler Success.", "appData", appData) - return c.writePongMsg(appData) } @@ -392,17 +391,24 @@ func (c *Client) writePingMsg() error { } func (c *Client) writePongMsg(appData string) error { + log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) if c.closed.Load() { return nil } + log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) c.w.Lock() defer c.w.Unlock() + log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) err := c.conn.SetWriteDeadline(writeWait) if err != nil { - return err + return errs.Wrap(err) + } + err = c.conn.WriteMessage(PongMessage, []byte(appData)) + if err != nil { + log.ZError(c.ctx, "Write Message is error", errs.Wrap(err), "Pong msg", PongMessage) } - return c.conn.WriteMessage(PongMessage, []byte(appData)) + return errs.Wrap(err) } From 824392e18c7db8c7397f5a58d6d51ca2104f8d48 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 26 Jul 2024 15:21:25 +0800 Subject: [PATCH 03/20] update log level. --- internal/msggateway/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index d3836fd409..ded830c43a 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -407,7 +407,7 @@ func (c *Client) writePongMsg(appData string) error { } err = c.conn.WriteMessage(PongMessage, []byte(appData)) if err != nil { - log.ZError(c.ctx, "Write Message is error", errs.Wrap(err), "Pong msg", PongMessage) + log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage) } return errs.Wrap(err) From 8a8b99a8bd438fe7f5c8da7531e30089a685761a Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Sun, 28 Jul 2024 10:06:38 +0800 Subject: [PATCH 04/20] add Warn log in writePongMsg. --- internal/msggateway/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index ded830c43a..ea52488385 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -393,6 +393,7 @@ func (c *Client) writePingMsg() error { func (c *Client) writePongMsg(appData string) error { log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) if c.closed.Load() { + log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData) return nil } @@ -403,6 +404,7 @@ func (c *Client) writePongMsg(appData string) error { log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) err := c.conn.SetWriteDeadline(writeWait) if err != nil { + log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData) return errs.Wrap(err) } err = c.conn.WriteMessage(PongMessage, []byte(appData)) From a230f0e98081c3170caefa5f3f6a268487819156 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Sun, 28 Jul 2024 10:57:17 +0800 Subject: [PATCH 05/20] add debug log. --- internal/msggateway/client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index ea52488385..9fd71d9893 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -393,7 +393,7 @@ func (c *Client) writePingMsg() error { func (c *Client) writePongMsg(appData string) error { log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) if c.closed.Load() { - log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData) + log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData, "closed err", c.closedErr) return nil } @@ -412,5 +412,6 @@ func (c *Client) writePongMsg(appData string) error { log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage) } + log.ZDebug(c.ctx, "write message is success", "appdata", appData, "closed err", c.closedErr) return errs.Wrap(err) } From da08d5561b54efd529ba6feb19c8c3a6c74c4bcc Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 15:42:11 +0800 Subject: [PATCH 06/20] feat: update webhookBeforeMemberJoinGroup to batch method. --- internal/msggateway/client.go | 3 -- internal/rpc/group/callback.go | 39 ++++++++++++++- internal/rpc/group/group.go | 35 +++++++------ pkg/callbackstruct/constant.go | 89 +++++++++++++++++----------------- pkg/callbackstruct/group.go | 24 +++++++++ 5 files changed, 124 insertions(+), 66 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 9fd71d9893..a4902570a6 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -397,11 +397,9 @@ func (c *Client) writePongMsg(appData string) error { return nil } - log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) c.w.Lock() defer c.w.Unlock() - log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData) err := c.conn.SetWriteDeadline(writeWait) if err != nil { log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData) @@ -412,6 +410,5 @@ func (c *Client) writePongMsg(appData string) error { log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage) } - log.ZDebug(c.ctx, "write message is success", "appdata", appData, "closed err", c.closedErr) return errs.Wrap(err) } diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index f31c4587c5..bc4d999f2a 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -16,6 +16,8 @@ package group import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -27,7 +29,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "time" ) // CallbackBeforeCreateGroup callback before create group. @@ -125,6 +126,42 @@ func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before * }) } +func (s *groupServer) webhookBeforeMemberJoinGroupBatch(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupEx string) error { + return webhook.WithCondition(ctx, before, func(ctx context.Context) error { + groupMembersMap := datautil.SliceToMap(groupMembers, func(e *model.GroupMember) string { + return e.UserID + }) + var groupMembersCallback []*callbackstruct.CallbackGroupMember + + for _, member := range groupMembers { + groupMembersCallback = append(groupMembersCallback, &callbackstruct.CallbackGroupMember{ + UserID: member.UserID, + Ex: member.Ex, + GroupEx: groupEx, + }) + } + + cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupBatchReq{ + CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupBatchCommand, + MembersList: groupMembersCallback, + } + resp := &callbackstruct.CallbackBeforeMemberJoinGroupBatchResp{} + + if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { + return err + } + + for _, memberCallbackResp := range resp.MemberCallbacks { + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].FaceURL, memberCallbackResp.FaceURL) + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Ex, memberCallbackResp.Ex) + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Nickname, memberCallbackResp.Nickname) + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].RoleLevel, memberCallbackResp.RoleLevel) + } + + return nil + }) +} + func (s *groupServer) webhookBeforeSetGroupMemberInfo(ctx context.Context, before *config.BeforeConfig, req *group.SetGroupMemberInfo) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { cbReq := callbackstruct.CallbackBeforeSetGroupMemberInfoReq{ diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 6790d09581..d16049d709 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -246,7 +246,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR return nil, err } - joinGroup := func(userID string, roleLevel int32) error { + joinGroupFunc := func(userID string, roleLevel int32) { groupMember := &model.GroupMember{ GroupID: group.GroupID, UserID: userID, @@ -258,25 +258,23 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR MuteEndTime: time.UnixMilli(0), } - if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { - return err - } groupMembers = append(groupMembers, groupMember) - return nil - } - if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil { - return nil, err } + + joinGroupFunc(req.OwnerUserID, constant.GroupOwner) + for _, userID := range req.AdminUserIDs { - if err := joinGroup(userID, constant.GroupAdmin); err != nil { - return nil, err - } + joinGroupFunc(userID, constant.GroupAdmin) } + for _, userID := range req.MemberUserIDs { - if err := joinGroup(userID, constant.GroupOrdinaryUsers); err != nil { - return nil, err - } + joinGroupFunc(userID, constant.GroupOrdinaryUsers) } + + if err := s.webhookBeforeMemberJoinGroupBatch(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + return nil, err + } + if err := s.db.CreateGroup(ctx, []*model.Group{group}, groupMembers); err != nil { return nil, err } @@ -442,12 +440,13 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite MuteEndTime: time.UnixMilli(0), } - if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { - return nil, err - } groupMembers = append(groupMembers, member) + } + if err := s.webhookBeforeMemberJoinGroupBatch(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + return nil, err } + if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil { return nil, err } @@ -811,7 +810,6 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup MuteEndTime: time.Unix(0, 0), InviterUserID: groupRequest.InviterUserID, OperatorUserID: mcontext.GetOpUserID(ctx), - Ex: groupRequest.Ex, } if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err @@ -898,6 +896,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) return &pbgroup.JoinGroupResp{}, nil } + groupRequest := model.GroupRequest{ UserID: req.InviterUserID, ReqMsg: req.ReqMessage, diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index 66e1598cda..6808a665da 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -15,48 +15,49 @@ package callbackstruct const ( - CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" - CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" - CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" - CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" - CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" - CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" - CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" - CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" - CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" - CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" - CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" - CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" - CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" - CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" - CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" - CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" - CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" - CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" - CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" - CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" - CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" - CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" - CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" - CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" - CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" - CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" - CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" - CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" - CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" - CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" - CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" - CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" - CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" - CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" - CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" - CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" - CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" - CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" - CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" - CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" - CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" - CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand" - CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" - CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" + CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" + CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" + CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" + CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" + CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" + CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" + CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" + CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" + CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" + CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" + CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" + CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" + CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" + CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" + CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" + CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" + CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" + CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" + CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" + CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" + CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" + CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" + CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" + CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" + CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" + CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" + CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" + CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" + CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" + CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" + CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" + CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" + CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" + CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" + CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" + CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" + CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" + CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand" + CallbackBeforeMemberJoinGroupBatchCommand = "callbackBeforeMemberJoinGroupBatchCommand" + CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" + CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" ) diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index e78d45ab4c..9db8ea53be 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -67,6 +67,30 @@ type CallbackBeforeMemberJoinGroupReq struct { GroupEx string `json:"groupEx"` } +type CallbackGroupMember struct { + UserID string `json:"userID"` + Ex string `json:"ex"` + GroupEx string `json:"groupEx"` +} + +type CallbackBeforeMemberJoinGroupBatchReq struct { + CallbackCommand `json:"callbackCommand"` + MembersList []*CallbackGroupMember +} + +type MemberJoinGroupCallBack struct { + UserID *string `json:"userID"` + Nickname *string `json:"nickname"` + FaceURL *string `json:"faceURL"` + RoleLevel *int32 `json:"roleLevel"` + MuteEndTime *int64 `json:"muteEndTime"` + Ex *string `json:"ex"` +} +type CallbackBeforeMemberJoinGroupBatchResp struct { + CommonCallbackResp + MemberCallbacks []*MemberJoinGroupCallBack +} + type CallbackBeforeMemberJoinGroupResp struct { CommonCallbackResp Nickname *string `json:"nickname"` From ba170badc3500a0e2ecb342b8c42be67003cd527 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 16:19:55 +0800 Subject: [PATCH 07/20] feat: update version field implement. --- pkg/common/cmd/api.go | 5 +++-- pkg/common/cmd/auth.go | 5 +++-- pkg/common/cmd/conversation.go | 5 +++-- pkg/common/cmd/cron_task.go | 5 +++-- pkg/common/cmd/friend.go | 5 +++-- pkg/common/cmd/group.go | 5 +++-- pkg/common/cmd/msg.go | 5 +++-- pkg/common/cmd/msg_gateway.go | 4 ++-- pkg/common/cmd/msg_transfer.go | 5 +++-- pkg/common/cmd/push.go | 5 +++-- pkg/common/cmd/root.go | 5 +++-- pkg/common/cmd/third.go | 5 +++-- pkg/common/cmd/user.go | 5 +++-- pkg/common/config/parse.go | 4 ---- pkg/common/config/version | 1 - version/version | 1 + version/version.go | 6 ++++++ 17 files changed, 45 insertions(+), 31 deletions(-) delete mode 100644 pkg/common/config/version create mode 100644 version/version create mode 100644 version/version.go diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index ecdb0dd3ad..4088ecd09d 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -16,8 +16,9 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/api" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -38,7 +39,7 @@ func NewApiCmd() *ApiCmd { DiscoveryConfigFilename: &apiConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index 7d75a7da65..b35a95f395 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/auth" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -40,7 +41,7 @@ func NewAuthRpcCmd() *AuthRpcCmd { DiscoveryConfigFilename: &authConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 57ffa52bc4..bdb4447f48 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/conversation" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -43,7 +44,7 @@ func NewConversationRpcCmd() *ConversationRpcCmd { DiscoveryConfigFilename: &conversationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index fd44475246..d6c5e472e1 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -16,8 +16,9 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/tools" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -38,7 +39,7 @@ func NewCronTaskCmd() *CronTaskCmd { DiscoveryConfigFilename: &cronTaskConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 8be1f77458..1bc9e6c543 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -44,7 +45,7 @@ func NewFriendRpcCmd() *FriendRpcCmd { DiscoveryConfigFilename: &friendConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 20124be957..9b0fbf8de3 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -16,10 +16,11 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/group" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -45,7 +46,7 @@ func NewGroupRpcCmd() *GroupRpcCmd { DiscoveryConfigFilename: &groupConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index 91f7931fbe..bfd29398ef 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -45,7 +46,7 @@ func NewMsgRpcCmd() *MsgRpcCmd { DiscoveryConfigFilename: &msgConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 29d3fba330..6363bfbf9e 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -16,9 +16,9 @@ package cmd import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/internal/msggateway" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" @@ -42,7 +42,7 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { DiscoveryConfigFilename: &msgGatewayConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 0d48281e50..3643934135 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -16,8 +16,9 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -42,7 +43,7 @@ func NewMsgTransferCmd() *MsgTransferCmd { DiscoveryConfigFilename: &msgTransferConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 6e6014021e..c9b8b1c245 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/push" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -45,7 +46,7 @@ func NewPushRpcCmd() *PushRpcCmd { DiscoveryConfigFilename: &pushConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { ret.pushConfig.FcmConfigPath = ret.ConfigPath() return ret.runE() diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 84e9856973..b43f86557f 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -19,6 +19,7 @@ import ( "path/filepath" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/spf13/cobra" @@ -138,13 +139,13 @@ func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error { r.log.StorageLocation, r.log.RemainRotationCount, r.log.RotationTime, - config.Version, + version.Version, r.log.IsSimplify, ) if err != nil { return errs.Wrap(err) } - return errs.Wrap(log.InitConsoleLogger(r.processName, r.log.RemainLogLevel, r.log.IsJson, config.Version)) + return errs.Wrap(log.InitConsoleLogger(r.processName, r.log.RemainLogLevel, r.log.IsJson, version.Version)) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index b6731f1ff4..a301b738fa 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/third" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -44,7 +45,7 @@ func NewThirdRpcCmd() *ThirdRpcCmd { DiscoveryConfigFilename: &thirdConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 674f9e3a6d..9a614afcab 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -16,9 +16,10 @@ package cmd import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/user" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" + "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" ) @@ -45,7 +46,7 @@ func NewUserRpcCmd() *UserRpcCmd { DiscoveryConfigFilename: &userConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) - ret.ctx = context.WithValue(context.Background(), "version", config.Version) + ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { return ret.runE() } diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 28e9f5db60..08f82ac7d3 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -15,7 +15,6 @@ package config import ( - _ "embed" "os" "path/filepath" @@ -26,9 +25,6 @@ import ( "gopkg.in/yaml.v3" ) -//go:embed version -var Version string - const ( FileName = "config.yaml" NotificationFileName = "notification.yaml" diff --git a/pkg/common/config/version b/pkg/common/config/version deleted file mode 100644 index 240bba9069..0000000000 --- a/pkg/common/config/version +++ /dev/null @@ -1 +0,0 @@ -3.7.0 \ No newline at end of file diff --git a/version/version b/version/version new file mode 100644 index 0000000000..0be1fc7d24 --- /dev/null +++ b/version/version @@ -0,0 +1 @@ +3.8.0 \ No newline at end of file diff --git a/version/version.go b/version/version.go new file mode 100644 index 0000000000..23b3a82f53 --- /dev/null +++ b/version/version.go @@ -0,0 +1,6 @@ +package version + +import _ "embed" + +//go:embed version +var Version string From a38611009adf54a2065e3687560e4ac5bde5d02b Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 16:52:09 +0800 Subject: [PATCH 08/20] update webhook implement contents. --- internal/rpc/group/callback.go | 62 +++++++++++++--------------------- internal/rpc/group/group.go | 9 ++--- pkg/callbackstruct/group.go | 28 ++++----------- 3 files changed, 34 insertions(+), 65 deletions(-) diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index bc4d999f2a..a685678955 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -101,32 +101,7 @@ func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after) } -func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *model.GroupMember, groupEx string) error { - return webhook.WithCondition(ctx, before, func(ctx context.Context) error { - cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{ - CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand, - GroupID: groupMember.GroupID, - UserID: groupMember.UserID, - Ex: groupMember.Ex, - GroupEx: groupEx, - } - resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{} - if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { - return err - } - - if resp.MuteEndTime != nil { - groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime) - } - datautil.NotNilReplace(&groupMember.FaceURL, resp.FaceURL) - datautil.NotNilReplace(&groupMember.Ex, resp.Ex) - datautil.NotNilReplace(&groupMember.Nickname, resp.Nickname) - datautil.NotNilReplace(&groupMember.RoleLevel, resp.RoleLevel) - return nil - }) -} - -func (s *groupServer) webhookBeforeMemberJoinGroupBatch(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupEx string) error { +func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupEx string) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { groupMembersMap := datautil.SliceToMap(groupMembers, func(e *model.GroupMember) string { return e.UserID @@ -135,27 +110,33 @@ func (s *groupServer) webhookBeforeMemberJoinGroupBatch(ctx context.Context, bef for _, member := range groupMembers { groupMembersCallback = append(groupMembersCallback, &callbackstruct.CallbackGroupMember{ - UserID: member.UserID, - Ex: member.Ex, - GroupEx: groupEx, + UserID: member.UserID, + Ex: member.Ex, }) } - cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupBatchReq{ + cbReq := &callbackstruct.CallbackBeforeMembersJoinGroupReq{ CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupBatchCommand, MembersList: groupMembersCallback, + GroupEx: groupEx, } - resp := &callbackstruct.CallbackBeforeMemberJoinGroupBatchResp{} + resp := &callbackstruct.CallbackBeforeMembersJoinGroupResp{} if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { return err } for _, memberCallbackResp := range resp.MemberCallbacks { - datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].FaceURL, memberCallbackResp.FaceURL) - datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Ex, memberCallbackResp.Ex) - datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Nickname, memberCallbackResp.Nickname) - datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].RoleLevel, memberCallbackResp.RoleLevel) + if _, ok := groupMembersMap[(*memberCallbackResp.UserID)]; ok { + if memberCallbackResp.MuteEndTime != nil { + groupMembersMap[(*memberCallbackResp.UserID)].MuteEndTime = time.UnixMilli(*memberCallbackResp.MuteEndTime) + } + + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].FaceURL, memberCallbackResp.FaceURL) + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Ex, memberCallbackResp.Ex) + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Nickname, memberCallbackResp.Nickname) + datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].RoleLevel, memberCallbackResp.RoleLevel) + } } return nil @@ -281,10 +262,13 @@ func (s *groupServer) webhookBeforeInviteUserToGroup(ctx context.Context, before return err } - if len(resp.RefusedMembersAccount) > 0 { - // Handle the scenario where certain members are refused - // You might want to update the req.Members list or handle it as per your business logic - } + // Handle the scenario where certain members are refused + // You might want to update the req.Members list or handle it as per your business logic + + // if len(resp.RefusedMembersAccount) > 0 { + // implement members are refused + // } + return nil }) } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index d16049d709..ead9957c95 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -271,7 +271,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR joinGroupFunc(userID, constant.GroupOrdinaryUsers) } - if err := s.webhookBeforeMemberJoinGroupBatch(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -443,7 +443,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite groupMembers = append(groupMembers, member) } - if err := s.webhookBeforeMemberJoinGroupBatch(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -811,7 +811,8 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup InviterUserID: groupRequest.InviterUserID, OperatorUserID: mcontext.GetOpUserID(ctx), } - if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{member}, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } } @@ -880,7 +881,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) MuteEndTime: time.UnixMilli(0), } - if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{groupMember}, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index 9db8ea53be..35d76ea449 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -59,23 +59,15 @@ type CallbackAfterCreateGroupResp struct { CommonCallbackResp } -type CallbackBeforeMemberJoinGroupReq struct { - CallbackCommand `json:"callbackCommand"` - GroupID string `json:"groupID"` - UserID string `json:"userID"` - Ex string `json:"ex"` - GroupEx string `json:"groupEx"` -} - type CallbackGroupMember struct { - UserID string `json:"userID"` - Ex string `json:"ex"` - GroupEx string `json:"groupEx"` + UserID string `json:"userID"` + Ex string `json:"ex"` } -type CallbackBeforeMemberJoinGroupBatchReq struct { +type CallbackBeforeMembersJoinGroupReq struct { CallbackCommand `json:"callbackCommand"` MembersList []*CallbackGroupMember + GroupEx string `json:"groupEx"` } type MemberJoinGroupCallBack struct { @@ -86,18 +78,10 @@ type MemberJoinGroupCallBack struct { MuteEndTime *int64 `json:"muteEndTime"` Ex *string `json:"ex"` } -type CallbackBeforeMemberJoinGroupBatchResp struct { - CommonCallbackResp - MemberCallbacks []*MemberJoinGroupCallBack -} -type CallbackBeforeMemberJoinGroupResp struct { +type CallbackBeforeMembersJoinGroupResp struct { CommonCallbackResp - Nickname *string `json:"nickname"` - FaceURL *string `json:"faceURL"` - RoleLevel *int32 `json:"roleLevel"` - MuteEndTime *int64 `json:"muteEndTime"` - Ex *string `json:"ex"` + MemberCallbacks []*MemberJoinGroupCallBack } type CallbackBeforeSetGroupMemberInfoReq struct { From ab268c5bcfb0e9dacb569da105c24e47a343609f Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 16:57:51 +0800 Subject: [PATCH 09/20] update method field and contents. --- internal/rpc/group/callback.go | 3 ++- internal/rpc/group/group.go | 8 ++++---- pkg/callbackstruct/group.go | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index a685678955..ccf24a2182 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -101,7 +101,7 @@ func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after) } -func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupEx string) error { +func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupID string, groupEx string) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { groupMembersMap := datautil.SliceToMap(groupMembers, func(e *model.GroupMember) string { return e.UserID @@ -117,6 +117,7 @@ func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before cbReq := &callbackstruct.CallbackBeforeMembersJoinGroupReq{ CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupBatchCommand, + GroupID: groupID, MembersList: groupMembersCallback, GroupEx: groupEx, } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index ead9957c95..aa12c9d0f1 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -271,7 +271,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR joinGroupFunc(userID, constant.GroupOrdinaryUsers) } - if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -443,7 +443,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite groupMembers = append(groupMembers, member) } - if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } @@ -812,7 +812,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup OperatorUserID: mcontext.GetOpUserID(ctx), } - if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{member}, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{member}, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } } @@ -881,7 +881,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) MuteEndTime: time.UnixMilli(0), } - if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{groupMember}, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { + if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{groupMember}, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index 35d76ea449..05c476cc33 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -66,6 +66,7 @@ type CallbackGroupMember struct { type CallbackBeforeMembersJoinGroupReq struct { CallbackCommand `json:"callbackCommand"` + GroupID string `json:"groupID"` MembersList []*CallbackGroupMember GroupEx string `json:"groupEx"` } From 4c70bff59bdecb78f2f00f4bad0a651319282c32 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 17:06:52 +0800 Subject: [PATCH 10/20] update callbackCommand field. --- internal/rpc/group/callback.go | 2 +- pkg/callbackstruct/constant.go | 89 +++++++++++++++++----------------- 2 files changed, 45 insertions(+), 46 deletions(-) diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index ccf24a2182..1051a4f777 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -116,7 +116,7 @@ func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before } cbReq := &callbackstruct.CallbackBeforeMembersJoinGroupReq{ - CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupBatchCommand, + CallbackCommand: callbackstruct.CallbackBeforeMembersJoinGroupCommand, GroupID: groupID, MembersList: groupMembersCallback, GroupEx: groupEx, diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index 6808a665da..ab393dd36e 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -15,49 +15,48 @@ package callbackstruct const ( - CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" - CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" - CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" - CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" - CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" - CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" - CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" - CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" - CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" - CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" - CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" - CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" - CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" - CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" - CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" - CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" - CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" - CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" - CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" - CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" - CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" - CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" - CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" - CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" - CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" - CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" - CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" - CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" - CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" - CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" - CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" - CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" - CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" - CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" - CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" - CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" - CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" - CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" - CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" - CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" - CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" - CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand" - CallbackBeforeMemberJoinGroupBatchCommand = "callbackBeforeMemberJoinGroupBatchCommand" - CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" - CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" + CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" + CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" + CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" + CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" + CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" + CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" + CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" + CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" + CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" + CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" + CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" + CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" + CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" + CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" + CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" + CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" + CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" + CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" + CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" + CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" + CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" + CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" + CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" + CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" + CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" + CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" + CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" + CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" + CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" + CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" + CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" + CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" + CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" + CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" + CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" + CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" + CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" + CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand" + CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" + CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" ) From 96b7c8dc283a044bd60c83a647e2de25f04859b6 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 17:56:05 +0800 Subject: [PATCH 11/20] fix: add correct fields. --- .env | 1 - pkg/callbackstruct/group.go | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.env b/.env index 3199b37149..a71332d5ee 100644 --- a/.env +++ b/.env @@ -1,4 +1,3 @@ - MONGO_IMAGE=mongo:6.0.2 REDIS_IMAGE=redis:7.0.0 ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8 diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index 05c476cc33..a912f5d6ac 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -66,9 +66,9 @@ type CallbackGroupMember struct { type CallbackBeforeMembersJoinGroupReq struct { CallbackCommand `json:"callbackCommand"` - GroupID string `json:"groupID"` - MembersList []*CallbackGroupMember - GroupEx string `json:"groupEx"` + GroupID string `json:"groupID"` + MembersList []*CallbackGroupMember `json:"memberList"` + GroupEx string `json:"groupEx"` } type MemberJoinGroupCallBack struct { From 3307241654a3c67c9dc29d53dd6f3e7147d98213 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 30 Jul 2024 18:07:19 +0800 Subject: [PATCH 12/20] update struct tags. --- internal/rpc/group/callback.go | 2 +- pkg/callbackstruct/group.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index 1051a4f777..f877aa64a8 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -127,7 +127,7 @@ func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before return err } - for _, memberCallbackResp := range resp.MemberCallbacks { + for _, memberCallbackResp := range resp.MemberCallbackList { if _, ok := groupMembersMap[(*memberCallbackResp.UserID)]; ok { if memberCallbackResp.MuteEndTime != nil { groupMembersMap[(*memberCallbackResp.UserID)].MuteEndTime = time.UnixMilli(*memberCallbackResp.MuteEndTime) diff --git a/pkg/callbackstruct/group.go b/pkg/callbackstruct/group.go index a912f5d6ac..23a73ebd23 100644 --- a/pkg/callbackstruct/group.go +++ b/pkg/callbackstruct/group.go @@ -82,7 +82,7 @@ type MemberJoinGroupCallBack struct { type CallbackBeforeMembersJoinGroupResp struct { CommonCallbackResp - MemberCallbacks []*MemberJoinGroupCallBack + MemberCallbackList []*MemberJoinGroupCallBack `json:"memberCallbackList"` } type CallbackBeforeSetGroupMemberInfoReq struct { From 1237549e25abe88642ab994c40ace3dc9d496f5e Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 31 Jul 2024 18:02:58 +0800 Subject: [PATCH 13/20] refactor: rename friend module to relation. --- internal/rpc/{friend => relation}/black.go | 2 +- internal/rpc/{friend => relation}/callback.go | 2 +- internal/rpc/{friend => relation}/friend.go | 3 +- .../rpc/{friend => relation}/notification.go | 2 +- internal/rpc/{friend => relation}/sync.go | 2 +- internal/rpc/user/user.go | 6 ++-- pkg/common/cmd/friend.go | 34 +++++++++---------- 7 files changed, 26 insertions(+), 25 deletions(-) rename internal/rpc/{friend => relation}/black.go (99%) rename internal/rpc/{friend => relation}/callback.go (99%) rename internal/rpc/{friend => relation}/friend.go (99%) rename internal/rpc/{friend => relation}/notification.go (99%) rename internal/rpc/{friend => relation}/sync.go (99%) diff --git a/internal/rpc/friend/black.go b/internal/rpc/relation/black.go similarity index 99% rename from internal/rpc/friend/black.go rename to internal/rpc/relation/black.go index 218d1e7f85..e149e31653 100644 --- a/internal/rpc/friend/black.go +++ b/internal/rpc/relation/black.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package friend +package relation import ( "context" diff --git a/internal/rpc/friend/callback.go b/internal/rpc/relation/callback.go similarity index 99% rename from internal/rpc/friend/callback.go rename to internal/rpc/relation/callback.go index 746ad21fa1..69c4c9e0ec 100644 --- a/internal/rpc/friend/callback.go +++ b/internal/rpc/relation/callback.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package friend +package relation import ( "context" diff --git a/internal/rpc/friend/friend.go b/internal/rpc/relation/friend.go similarity index 99% rename from internal/rpc/friend/friend.go rename to internal/rpc/relation/friend.go index bdb786bca4..3d29ad3379 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/relation/friend.go @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package friend +package relation import ( "context" + "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/open-im-server/v3/pkg/common/config" diff --git a/internal/rpc/friend/notification.go b/internal/rpc/relation/notification.go similarity index 99% rename from internal/rpc/friend/notification.go rename to internal/rpc/relation/notification.go index 5fb34577fc..83c5d2ca9d 100644 --- a/internal/rpc/friend/notification.go +++ b/internal/rpc/relation/notification.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package friend +package relation import ( "context" diff --git a/internal/rpc/friend/sync.go b/internal/rpc/relation/sync.go similarity index 99% rename from internal/rpc/friend/sync.go rename to internal/rpc/relation/sync.go index 902cc73037..0ad94fe825 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/relation/sync.go @@ -1,4 +1,4 @@ -package friend +package relation import ( "context" diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 1e534437df..a6952bd6db 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -17,7 +17,7 @@ package user import ( "context" "errors" - "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" + "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" @@ -54,7 +54,7 @@ import ( type userServer struct { online cache.OnlineCache db controller.UserDatabase - friendNotificationSender *friend.FriendNotificationSender + friendNotificationSender *relation.FriendNotificationSender userNotificationSender *UserNotificationSender friendRpcClient *rpcclient.FriendRpcClient groupRpcClient *rpcclient.GroupRpcClient @@ -105,7 +105,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi RegisterCenter: client, friendRpcClient: &friendRpcClient, groupRpcClient: &groupRpcClient, - friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)), + friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, relation.WithDBFunc(database.FindWithError)), userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 1bc9e6c543..a564facd06 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -17,7 +17,7 @@ package cmd import ( "context" - "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" + "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -26,23 +26,23 @@ import ( type FriendRpcCmd struct { *RootCmd - ctx context.Context - configMap map[string]any - friendConfig *friend.Config + ctx context.Context + configMap map[string]any + relationConfig *relation.Config } func NewFriendRpcCmd() *FriendRpcCmd { - var friendConfig friend.Config - ret := &FriendRpcCmd{friendConfig: &friendConfig} + var relationConfig relation.Config + ret := &FriendRpcCmd{relationConfig: &relationConfig} ret.configMap = map[string]any{ - OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig, - RedisConfigFileName: &friendConfig.RedisConfig, - MongodbConfigFileName: &friendConfig.MongodbConfig, - ShareFileName: &friendConfig.Share, - NotificationFileName: &friendConfig.NotificationConfig, - WebhooksConfigFileName: &friendConfig.WebhooksConfig, - LocalCacheConfigFileName: &friendConfig.LocalCacheConfig, - DiscoveryConfigFilename: &friendConfig.Discovery, + OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig, + RedisConfigFileName: &relationConfig.RedisConfig, + MongodbConfigFileName: &relationConfig.MongodbConfig, + ShareFileName: &relationConfig.Share, + NotificationFileName: &relationConfig.NotificationConfig, + WebhooksConfigFileName: &relationConfig.WebhooksConfig, + LocalCacheConfigFileName: &relationConfig.LocalCacheConfig, + DiscoveryConfigFilename: &relationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -57,7 +57,7 @@ func (a *FriendRpcCmd) Exec() error { } func (a *FriendRpcCmd) runE() error { - return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP, - a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports, - a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start) + return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, + a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports, + a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) } From bead363df14ee92afd775fd995c5dc56980bbd5b Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 14:25:07 +0800 Subject: [PATCH 14/20] feat: implement scheduled destruct msgs feature in cron task. --- go.mod | 2 +- go.sum | 4 +- internal/rpc/conversation/conversaion.go | 90 +++++++++++++++++++----- internal/rpc/msg/clear.go | 72 +++++++++++++++++-- internal/rpc/msg/server.go | 5 ++ internal/tools/cron_task.go | 54 +++++++++++--- pkg/common/convert/conversation.go | 4 +- pkg/rpcclient/conversation.go | 10 ++- 8 files changed, 206 insertions(+), 35 deletions(-) diff --git a/go.mod b/go.mod index 1a1cf36d2d..49254097a4 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.47 + github.com/openimsdk/protocol v0.0.69-alpha.50 github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 815afe8d21..ac525b2db6 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.47 h1:WEpU7dHSzcpiyPoUkgSt1mC9HfQ6xSDNNZf4KWbZiFI= -github.com/openimsdk/protocol v0.0.69-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.50 h1:4r6vY9LsjFrR8AAwORFhijOGmq2vzDH3XTX4wBiw+2M= +github.com/openimsdk/protocol v0.0.69-alpha.50/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 3047c376bb..0d21b3299b 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -16,13 +16,16 @@ package conversation import ( "context" + "sort" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" - tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/tools/db/redisutil" - "sort" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -40,10 +43,11 @@ import ( ) type conversationServer struct { - msgRpcClient *rpcclient.MessageRpcClient - user *rpcclient.UserRpcClient - groupRpcClient *rpcclient.GroupRpcClient - conversationDatabase controller.ConversationDatabase + msgRpcClient *rpcclient.MessageRpcClient + user *rpcclient.UserRpcClient + groupRpcClient *rpcclient.GroupRpcClient + conversationDatabase controller.ConversationDatabase + conversationNotificationSender *ConversationNotificationSender config *Config } @@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s } func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) { - var conversation tablerelation.Conversation + var conversation dbModel.Conversation if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil { return nil, err } - err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation}) + err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation}) if err != nil { return nil, err } @@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver } } var unequal int - var conv tablerelation.Conversation + var conv dbModel.Conversation if len(req.UserIDs) == 1 { cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID}) if err != nil { @@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver } conv = *cs[0] } - var conversation tablerelation.Conversation + var conversation dbModel.Conversation conversation.ConversationID = req.Conversation.ConversationID conversation.ConversationType = req.Conversation.ConversationType conversation.UserID = req.Conversation.UserID @@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver } } if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType { - var conversations []*tablerelation.Conversation + var conversations []*dbModel.Conversation for _, ownerUserID := range req.UserIDs { conversation2 := conversation conversation2.OwnerUserID = ownerUserID @@ -340,12 +344,12 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, ) (*pbconversation.CreateSingleChatConversationsResp, error) { switch req.ConversationType { case constant.SingleChatType: - var conversation tablerelation.Conversation + var conversation dbModel.Conversation conversation.ConversationID = req.ConversationID conversation.ConversationType = req.ConversationType conversation.OwnerUserID = req.SendID conversation.UserID = req.RecvID - err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation}) + err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) } @@ -353,17 +357,17 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, conversation2 := conversation conversation2.OwnerUserID = req.RecvID conversation2.UserID = req.SendID - err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2}) + err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) } case constant.NotificationChatType: - var conversation tablerelation.Conversation + var conversation dbModel.Conversation conversation.ConversationID = req.ConversationID conversation.ConversationType = req.ConversationType conversation.OwnerUserID = req.RecvID conversation.UserID = req.SendID - err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation}) + err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) } @@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv if req.MaxSeq != nil { m["max_seq"] = req.MaxSeq.Value } + if req.LatestMsgDestructTime != nil { + m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value) + } if len(m) > 0 { if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil { return nil, err @@ -602,3 +609,54 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco Conversations: convert.ConversationsDB2Pb(conversations), }, nil } + +func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { + log.ZInfo(ctx, "ConversationDestructMsgs cron start") + num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) + if err != nil { + log.ZError(ctx, "GetAllConversationIDsNumber failed", err) + return nil, err + } + const batchNum = 100 + + if num == 0 { + return nil, errs.New("Need Destruct Msg is nil").Wrap() + } + + maxPage := (num + batchNum - 1) / batchNum + + temp := make([]*model.Conversation, 0, maxPage*batchNum) + + for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ { + pagination := &sdkws.RequestPagination{ + PageNumber: int32(pageNumber), + ShowNumber: batchNum, + } + + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) + if err != nil { + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + continue + } + + log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + if len(conversationIDs) == 0 { + continue + } + + conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs) + if err != nil { + log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs) + continue + } + + for _, conversation := range conversations { + if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8 + conversation.LatestMsgDestructTime.IsZero()) { + temp = append(temp, conversation) + } + } + } + + return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil +} diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 774eae32cd..151dab861d 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -2,16 +2,25 @@ package msg import ( "context" + "strings" + "sync" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/conversation" + pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/wrapperspb" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "strings" - "time" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/idutil" + "github.com/openimsdk/tools/utils/stringutil" ) +// hard delete in Database. func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err @@ -26,17 +35,20 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. ) clearMsg := func(ctx context.Context) (bool, error) { conversationSeqs := make(map[string]struct{}) + + // update latest msg destruct time in conversation DB. defer func() { req := &conversation.UpdateConversationReq{ - MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), + LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), } for conversationID := range conversationSeqs { req.ConversationID = conversationID - if err := m.Conversation.UpdateConversations(ctx, req); err != nil { + if err := m.Conversation.UpdateConversation(ctx, req); err != nil { log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime) } } }() + msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100) if err != nil { return false, err @@ -61,6 +73,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. } return true, nil } + for { keep, err := clearMsg(ctx) if err != nil { @@ -75,3 +88,54 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. } return &msg.ClearMsgResp{}, nil } + +// soft delete for self +func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { + temp := convert.ConversationsPb2DB(req.Conversations) + + batchNum := 100 + var wg sync.WaitGroup + wg.Add((len(temp) + batchNum - 1) / batchNum) + + for i := 0; i < len(temp); i += batchNum { + batch := temp[i:min(i+batchNum, len(temp))] + + go func(batch []*model.Conversation) { + defer wg.Done() + + for _, conversation := range temp { + handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) + log.ZDebug(handleCtx, "User MsgsDestruct", + "conversationID", conversation.ConversationID, + "ownerUserID", conversation.OwnerUserID, + "msgDestructTime", conversation.MsgDestructTime, + "lastMsgDestructTime", conversation.LatestMsgDestructTime) + + seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + if err != nil { + log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + continue + } + + if len(seqs) > 0 { + if err := m.Conversation.UpdateConversation(handleCtx, + &pbconversation.UpdateConversationReq{ + UserIDs: []string{conversation.OwnerUserID}, + ConversationID: conversation.ConversationID, + LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil { + log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + continue + } + + // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) + } + } + }(batch) + + } + wg.Wait() + + return nil, nil +} + + diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index de0f698ea3..91f41f1b11 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -50,6 +51,7 @@ type ( ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. Handlers MessageInterceptorChain // Chain of handlers for processing messages. notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. + msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client } @@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) + s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) + msg.RegisterMsgServer(server, s) + return nil } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 1ef4943cd6..ffa54709d9 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -17,16 +17,19 @@ package tools import ( "context" "fmt" + "os" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "os" - "time" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -50,18 +53,34 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0]) - conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) + + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } - cli := msg.NewMsgClient(conn) + + thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err + } + + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) + if err != nil { + return err + } + + msgClient := msg.NewMsgClient(msgConn) + conversationClient := pbconversation.NewConversationClient(conversationConn) + thirdClient := third.NewThirdClient(thirdConn) + crontab := cron.New() + clearFunc := func() { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { + if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return } @@ -71,11 +90,27 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return errs.Wrap(err) } - tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) - if err != nil { - return err + msgDestructFunc := func() { + now := time.Now() + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) + log.ZInfo(ctx, "msg destruct ", "now", now) + + conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) + if err != nil { + log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) + return + } else { + _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations}) + if err != nil { + log.ZError(ctx, "Destruct Msgs failed.", err) + return + } + } + log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now)) + } + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil { + return errs.Wrap(err) } - thirdClient := third.NewThirdClient(tConn) deleteFunc := func() { now := time.Now() @@ -91,6 +126,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil { return errs.Wrap(err) } + log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() diff --git a/pkg/common/convert/conversation.go b/pkg/common/convert/conversation.go index a76d7d9f6d..9389b02524 100644 --- a/pkg/common/convert/conversation.go +++ b/pkg/common/convert/conversation.go @@ -22,7 +22,7 @@ import ( func ConversationDB2Pb(conversationDB *model.Conversation) *conversation.Conversation { conversationPB := &conversation.Conversation{} - conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix() + conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli() if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil { return nil } @@ -35,7 +35,7 @@ func ConversationsDB2Pb(conversationsDB []*model.Conversation) (conversationsPB if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil { continue } - conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix() + conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli() conversationsPB = append(conversationsPB, conversationPB) } return conversationsPB diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index e078f432b3..8f95f86a6c 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -82,7 +82,7 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs [] return err } -func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error { +func (c *ConversationRpcClient) UpdateConversation(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error { _, err := c.Client.UpdateConversation(ctx, conversation) return err } @@ -146,3 +146,11 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont } return resp.UserIDs, nil } + +func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) { + resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) + if err != nil { + return nil, err + } + return resp.Conversations, nil +} From d416e977aaa29c67a7a5b4989cfea577893e4102 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 14:29:36 +0800 Subject: [PATCH 15/20] update log contents. --- internal/rpc/conversation/conversaion.go | 1 - internal/tools/cron_task.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 0d21b3299b..4cf20f919c 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -611,7 +611,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco } func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { - log.ZInfo(ctx, "ConversationDestructMsgs cron start") num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { log.ZError(ctx, "GetAllConversationIDsNumber failed", err) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index ffa54709d9..3a9ce6da9d 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -93,7 +93,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { msgDestructFunc := func() { now := time.Now() ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) - log.ZInfo(ctx, "msg destruct ", "now", now) + log.ZInfo(ctx, "msg destruct cron start", "now", now) conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) if err != nil { From 3e547958b02acfeb790c279d4ecc9b0e718771c8 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 14:54:40 +0800 Subject: [PATCH 16/20] update func name and comments. --- internal/rpc/msg/clear.go | 3 +-- internal/tools/cron_task.go | 11 +++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 151dab861d..fd121318b9 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -127,6 +127,7 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) continue } + // if you need Notify SDK client userseq is update. // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) } } @@ -137,5 +138,3 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) return nil, nil } - - diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 3a9ce6da9d..b1d59800ce 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -75,7 +75,8 @@ func Start(ctx context.Context, config *CronTaskConfig) error { crontab := cron.New() - clearFunc := func() { + // scheduled hard delete outdated Msgs in specific time. + clearMsgFunc := func() { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) @@ -86,10 +87,11 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { return errs.Wrap(err) } + // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. msgDestructFunc := func() { now := time.Now() ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) @@ -112,7 +114,8 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return errs.Wrap(err) } - deleteFunc := func() { + // scheduled delete outdated file Objects and their datas in specific time. + deleteObjectFunc := func() { now := time.Now() deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) @@ -123,7 +126,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { } log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { return errs.Wrap(err) } From 07eb0cb387d1d24f825c20132d6d8cf79e1347b7 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 15:05:00 +0800 Subject: [PATCH 17/20] update waitgroup to errgroup. --- internal/rpc/msg/clear.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index fd121318b9..2b22ef5132 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -3,12 +3,10 @@ package msg import ( "context" "strings" - "sync" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" @@ -18,6 +16,7 @@ import ( "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" + "golang.org/x/sync/errgroup" ) // hard delete in Database. @@ -94,16 +93,15 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) temp := convert.ConversationsPb2DB(req.Conversations) batchNum := 100 - var wg sync.WaitGroup - wg.Add((len(temp) + batchNum - 1) / batchNum) + + errg, _ := errgroup.WithContext(ctx) + errg.SetLimit(100) for i := 0; i < len(temp); i += batchNum { batch := temp[i:min(i+batchNum, len(temp))] - go func(batch []*model.Conversation) { - defer wg.Done() - - for _, conversation := range temp { + errg.Go(func() error { + for _, conversation := range batch { handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) log.ZDebug(handleCtx, "User MsgsDestruct", "conversationID", conversation.ConversationID, @@ -131,10 +129,8 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) } } - }(batch) - + return nil + }) } - wg.Wait() - return nil, nil } From 33abddadb5150b3ead8ac27e601e907d657b9a65 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 15:10:15 +0800 Subject: [PATCH 18/20] update errgroup wait. --- internal/rpc/msg/clear.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 2b22ef5132..7864dbe215 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -132,5 +132,10 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) return nil }) } + + if err := errg.Wait(); err != nil { + return nil, err + } + return nil, nil } From 0f42b330edfc9ef8b20d8e3c75b878bef5c82547 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 15:51:17 +0800 Subject: [PATCH 19/20] remove unnecessary contents. --- internal/rpc/msg/clear.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 7864dbe215..09b38a139b 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -7,7 +7,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/wrapperspb" @@ -35,19 +34,6 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. clearMsg := func(ctx context.Context) (bool, error) { conversationSeqs := make(map[string]struct{}) - // update latest msg destruct time in conversation DB. - defer func() { - req := &conversation.UpdateConversationReq{ - LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), - } - for conversationID := range conversationSeqs { - req.ConversationID = conversationID - if err := m.Conversation.UpdateConversation(ctx, req); err != nil { - log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime) - } - } - }() - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100) if err != nil { return false, err From 031c0f1eb2fb5f08acb2c0c4164e6b330a6b23c4 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 1 Aug 2024 16:00:37 +0800 Subject: [PATCH 20/20] update clearMsg logic. --- internal/rpc/msg/clear.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 09b38a139b..6be551eada 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -2,7 +2,6 @@ package msg import ( "context" - "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -32,8 +31,6 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. start = time.Now() ) clearMsg := func(ctx context.Context) (bool, error) { - conversationSeqs := make(map[string]struct{}) - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100) if err != nil { return false, err @@ -41,6 +38,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. if len(msgs) == 0 { return false, nil } + for _, msg := range msgs { index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg) if err != nil { @@ -49,13 +47,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. if len(index) == 0 { return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed") } + docNum++ msgNum += len(index) - conversationID := msg.DocID[:strings.LastIndex(msg.DocID, ":")] - if _, ok := conversationSeqs[conversationID]; !ok { - conversationSeqs[conversationID] = struct{}{} - } } + return true, nil } @@ -69,6 +65,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) break } + log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) } return &msg.ClearMsgResp{}, nil