Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement scheduled destruct msgs feature in cron task. #2466

Merged
merged 35 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
1b56405
update protocol in go mod.
mo3et Jul 26, 2024
1e0d74f
add debug log in writePongMsg.
mo3et Jul 26, 2024
824392e
update log level.
mo3et Jul 26, 2024
d612429
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 26, 2024
b41823f
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 26, 2024
144a404
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 26, 2024
cac45e4
Merge branch 'openimsdk:main' into refactor/parse-token
mo3et Jul 26, 2024
7667af4
Merge branch 'refactor/parse-token' of github.com:mo3et/open-im-serve…
mo3et Jul 26, 2024
83dd8a4
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 26, 2024
34e1c60
Merge branch 'openimsdk:main' into refactor/parse-token
mo3et Jul 26, 2024
86e3e38
Merge branch 'refactor/parse-token' of github.com:mo3et/open-im-serve…
mo3et Jul 26, 2024
8a8b99a
add Warn log in writePongMsg.
mo3et Jul 28, 2024
567ceaf
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 28, 2024
a230f0e
add debug log.
mo3et Jul 28, 2024
61a417d
Merge branch 'openimsdk:main' into refactor/parse-token
mo3et Jul 29, 2024
291a318
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 29, 2024
da08d55
feat: update webhookBeforeMemberJoinGroup to batch method.
mo3et Jul 30, 2024
2eeb6aa
Merge branch 'refactor/parse-token' of github.com:mo3et/open-im-serve…
mo3et Jul 30, 2024
7cea317
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 30, 2024
ba170ba
feat: update version field implement.
mo3et Jul 30, 2024
a386110
update webhook implement contents.
mo3et Jul 30, 2024
ab268c5
update method field and contents.
mo3et Jul 30, 2024
4c70bff
update callbackCommand field.
mo3et Jul 30, 2024
96b7c8d
fix: add correct fields.
mo3et Jul 30, 2024
3307241
update struct tags.
mo3et Jul 30, 2024
e74fa5c
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Jul 31, 2024
1237549
refactor: rename friend module to relation.
mo3et Jul 31, 2024
ee428e6
Merge branch 'main' of github.com:openimsdk/open-im-server into refac…
mo3et Aug 1, 2024
bead363
feat: implement scheduled destruct msgs feature in cron task.
mo3et Aug 1, 2024
d416e97
update log contents.
mo3et Aug 1, 2024
3e54795
update func name and comments.
mo3et Aug 1, 2024
07eb0cb
update waitgroup to errgroup.
mo3et Aug 1, 2024
33abdda
update errgroup wait.
mo3et Aug 1, 2024
0f42b33
remove unnecessary contents.
mo3et Aug 1, 2024
031c0f1
update clearMsg logic.
mo3et Aug 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.47
github.com/openimsdk/protocol v0.0.69-alpha.50
github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.47 h1:WEpU7dHSzcpiyPoUkgSt1mC9HfQ6xSDNNZf4KWbZiFI=
github.com/openimsdk/protocol v0.0.69-alpha.47/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.69-alpha.50 h1:4r6vY9LsjFrR8AAwORFhijOGmq2vzDH3XTX4wBiw+2M=
github.com/openimsdk/protocol v0.0.69-alpha.50/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
Expand Down
89 changes: 73 additions & 16 deletions internal/rpc/conversation/conversaion.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ package conversation

import (
"context"
"sort"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/db/redisutil"
"sort"

"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
Expand All @@ -40,10 +43,11 @@ import (
)

type conversationServer struct {
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase

conversationNotificationSender *ConversationNotificationSender
config *Config
}
Expand Down Expand Up @@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
}

func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
}
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation})
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation})
if err != nil {
return nil, err
}
Expand All @@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
}
var unequal int
var conv tablerelation.Conversation
var conv dbModel.Conversation
if len(req.UserIDs) == 1 {
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
if err != nil {
Expand All @@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
conv = *cs[0]
}
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
conversation.ConversationID = req.Conversation.ConversationID
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
Expand Down Expand Up @@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
}
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
var conversations []*tablerelation.Conversation
var conversations []*dbModel.Conversation
for _, ownerUserID := range req.UserIDs {
conversation2 := conversation
conversation2.OwnerUserID = ownerUserID
Expand Down Expand Up @@ -340,30 +344,30 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
) (*pbconversation.CreateSingleChatConversationsResp, error) {
switch req.ConversationType {
case constant.SingleChatType:
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.SendID
conversation.UserID = req.RecvID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
}

conversation2 := conversation
conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2})
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
case constant.NotificationChatType:
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
Expand Down Expand Up @@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
if req.MaxSeq != nil {
m["max_seq"] = req.MaxSeq.Value
}
if req.LatestMsgDestructTime != nil {
m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value)
}
if len(m) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
return nil, err
Expand All @@ -602,3 +609,53 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
Conversations: convert.ConversationsDB2Pb(conversations),
}, nil
}

func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return nil, err
}
const batchNum = 100

if num == 0 {
return nil, errs.New("Need Destruct Msg is nil").Wrap()
}

maxPage := (num + batchNum - 1) / batchNum

temp := make([]*model.Conversation, 0, maxPage*batchNum)

for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}

conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}

log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}

conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}

for _, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
conversation.LatestMsgDestructTime.IsZero()) {
temp = append(temp, conversation)
}
}
}

return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}
72 changes: 68 additions & 4 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@ package msg

import (
"context"
"strings"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/protocol/conversation"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"strings"
"time"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
)

// hard delete in Database.
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
Expand All @@ -26,17 +34,20 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
)
clearMsg := func(ctx context.Context) (bool, error) {
conversationSeqs := make(map[string]struct{})

// update latest msg destruct time in conversation DB.
defer func() {
req := &conversation.UpdateConversationReq{
MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
}
for conversationID := range conversationSeqs {
req.ConversationID = conversationID
if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
if err := m.Conversation.UpdateConversation(ctx, req); err != nil {
log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
}
}
}()

msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
if err != nil {
return false, err
Expand All @@ -61,6 +72,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
}
return true, nil
}

for {
keep, err := clearMsg(ctx)
if err != nil {
Expand All @@ -75,3 +87,55 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
}
return &msg.ClearMsgResp{}, nil
}

// soft delete for self
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
temp := convert.ConversationsPb2DB(req.Conversations)

batchNum := 100

errg, _ := errgroup.WithContext(ctx)
errg.SetLimit(100)

for i := 0; i < len(temp); i += batchNum {
batch := temp[i:min(i+batchNum, len(temp))]

errg.Go(func() error {
for _, conversation := range batch {
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(handleCtx, "User MsgsDestruct",
"conversationID", conversation.ConversationID,
"ownerUserID", conversation.OwnerUserID,
"msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime)

seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}

if len(seqs) > 0 {
if err := m.Conversation.UpdateConversation(handleCtx,
&pbconversation.UpdateConversationReq{
UserIDs: []string{conversation.OwnerUserID},
ConversationID: conversation.ConversationID,
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}

// if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
return nil
})
}

if err := errg.Wait(); err != nil {
return nil, err
}

return nil, nil
}
5 changes: 5 additions & 0 deletions internal/rpc/msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msg

import (
"context"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
Expand Down Expand Up @@ -50,6 +51,7 @@ type (
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
}
Expand Down Expand Up @@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
}

s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))

msg.RegisterMsgServer(server, s)

return nil
}

Expand Down
Loading
Loading