Skip to content

Commit

Permalink
refactor: db refactor and cache key add. (#2320)
Browse files Browse the repository at this point in the history
* refactor: db refactor and cache key add.

* refactor: db refactor and cache key add.

* refactor: go version update.

* refactor: file name change.
  • Loading branch information
FGadvancer authored May 27, 2024
1 parent 1eef401 commit 76d9688
Show file tree
Hide file tree
Showing 132 changed files with 2,703 additions and 2,255 deletions.
10 changes: 5 additions & 5 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package msgtransfer
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
Expand All @@ -26,11 +28,9 @@ import (
"syscall"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -83,8 +83,8 @@ func Start(ctx context.Context, index int, config *Config) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
//todo MsgCacheTimeout
msgModel := cache.NewMsgCache(rdb, config.RedisConfig.EnablePipeline)
seqModel := cache.NewSeqCache(rdb)
msgModel := redis.NewMsgCache(rdb, config.RedisConfig.EnablePipeline)
seqModel := redis.NewSeqCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
Expand Down
2 changes: 1 addition & 1 deletion internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
Expand Down
2 changes: 1 addition & 1 deletion internal/push/offlinepush/fcm/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
firebase "firebase.google.com/go"
"firebase.google.com/go/messaging"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/errs"
"github.com/redis/go-redis/v9"
Expand Down
2 changes: 1 addition & 1 deletion internal/push/offlinepush/getui/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
Expand Down
2 changes: 1 addition & 1 deletion internal/push/offlinepush/offlinepusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
)

const (
Expand Down
6 changes: 3 additions & 3 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
Expand Down Expand Up @@ -49,7 +49,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
cacheModel := cache.NewThirdCache(rdb)
cacheModel := redis.NewThirdCache(rdb)
offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions internal/rpc/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ package auth
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
pbauth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
Expand Down Expand Up @@ -61,7 +61,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
userRpcClient: &userRpcClient,
RegisterCenter: client,
authDatabase: controller.NewAuthDatabase(
cache.NewTokenCacheModel(rdb),
redis2.NewTokenCacheModel(rdb),
config.Share.Secret,
config.RpcConfig.TokenPolicy.Expire,
),
Expand Down
34 changes: 18 additions & 16 deletions internal/rpc/conversation/conversaion.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/db/redisutil"
"sort"

"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
pbconversation "github.com/openimsdk/protocol/conversation"
Expand Down Expand Up @@ -73,13 +74,14 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
cache.InitLocalCache(&config.LocalCacheConfig)
localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &conversationServer{
msgRpcClient: &msgRpcClient,
user: &userRpcClient,
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, &msgRpcClient),
groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, &config.LocalCacheConfig, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, redis.GetRocksCacheOptions(), conversationDB), mgocli.GetTx()),
})
return nil
}
Expand Down Expand Up @@ -192,11 +194,11 @@ func (c *conversationServer) GetConversations(ctx context.Context, req *pbconver
}

func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
var conversation tablerelation.ConversationModel
var conversation tablerelation.Conversation
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
}
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.ConversationModel{&conversation})
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation})
if err != nil {
return nil, err
}
Expand All @@ -220,7 +222,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
}
var unequal int
var conv tablerelation.ConversationModel
var conv tablerelation.Conversation
if len(req.UserIDs) == 1 {
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
if err != nil {
Expand All @@ -231,7 +233,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
conv = *cs[0]
}
var conversation tablerelation.ConversationModel
var conversation tablerelation.Conversation
conversation.ConversationID = req.Conversation.ConversationID
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
Expand Down Expand Up @@ -280,7 +282,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
}
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
var conversations []*tablerelation.ConversationModel
var conversations []*tablerelation.Conversation
for _, ownerUserID := range req.UserIDs {
conversation2 := conversation
conversation2.OwnerUserID = ownerUserID
Expand Down Expand Up @@ -328,30 +330,30 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
) (*pbconversation.CreateSingleChatConversationsResp, error) {
switch req.ConversationType {
case constant.SingleChatType:
var conversation tablerelation.ConversationModel
var conversation tablerelation.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.SendID
conversation.UserID = req.RecvID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation})
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
}

conversation2 := conversation
conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation2})
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
case constant.NotificationChatType:
var conversation tablerelation.ConversationModel
var conversation tablerelation.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.ConversationModel{&conversation})
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/rpc/friend/black.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package friend

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
pbfriend "github.com/openimsdk/protocol/friend"
"github.com/openimsdk/tools/mcontext"
)
Expand Down Expand Up @@ -58,7 +58,7 @@ func (s *friendServer) RemoveBlack(ctx context.Context, req *pbfriend.RemoveBlac
return nil, err
}

if err := s.blackDatabase.Delete(ctx, []*relation.BlackModel{{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID}}); err != nil {
if err := s.blackDatabase.Delete(ctx, []*model.Black{{OwnerUserID: req.OwnerUserID, BlockUserID: req.BlackUserID}}); err != nil {
return nil, err
}

Expand All @@ -75,15 +75,15 @@ func (s *friendServer) AddBlack(ctx context.Context, req *pbfriend.AddBlackReq)
if err != nil {
return nil, err
}
black := relation.BlackModel{
black := model.Black{
OwnerUserID: req.OwnerUserID,
BlockUserID: req.BlackUserID,
OperatorUserID: mcontext.GetOpUserID(ctx),
CreateTime: time.Now(),
Ex: req.Ex,
}

if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil {
if err := s.blackDatabase.Create(ctx, []*model.Black{&black}); err != nil {
return nil, err
}
s.notificationSender.BlackAddedNotification(ctx, req)
Expand Down
21 changes: 11 additions & 10 deletions internal/rpc/friend/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ package friend
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/db/redisutil"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
pbfriend "github.com/openimsdk/protocol/friend"
Expand Down Expand Up @@ -96,19 +97,19 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
&msgRpcClient,
WithRpcFunc(userRpcClient.GetUsersInfo),
)
cache.InitLocalCache(&config.LocalCacheConfig)
localcache.InitLocalCache(&config.LocalCacheConfig)

// Register Friend server with refactored MongoDB and Redis integrations
pbfriend.RegisterFriendServer(server, &friendServer{
friendDatabase: controller.NewFriendDatabase(
friendMongoDB,
friendRequestMongoDB,
cache.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, cache.GetDefaultOpt()),
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()),
mgocli.GetTx(),
),
blackDatabase: controller.NewBlackDatabase(
blackMongoDB,
cache.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, cache.GetDefaultOpt()),
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()),
),
userRpcClient: &userRpcClient,
notificationSender: notificationSender,
Expand Down Expand Up @@ -193,7 +194,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
return nil, err
}

friendRequest := tablerelation.FriendRequestModel{
friendRequest := model.FriendRequest{
FromUserID: req.FromUserID,
ToUserID: req.ToUserID,
HandleMsg: req.HandleMsg,
Expand Down Expand Up @@ -384,10 +385,10 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
if err != nil {
return nil, err
}
friendMap := datautil.SliceToMap(friends, func(e *tablerelation.FriendModel) string {
friendMap := datautil.SliceToMap(friends, func(e *model.Friend) string {
return e.FriendUserID
})
blackMap := datautil.SliceToMap(blacks, func(e *tablerelation.BlackModel) string {
blackMap := datautil.SliceToMap(blacks, func(e *model.Black) string {
return e.BlockUserID
})
resp := &pbfriend.GetSpecifiedFriendsInfoResp{
Expand Down
6 changes: 3 additions & 3 deletions internal/rpc/friend/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package friend

import (
"context"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/protocol/constant"
Expand All @@ -46,7 +46,7 @@ func WithFriendDB(db controller.FriendDatabase) friendNotificationSenderOptions
}

func WithDBFunc(
fn func(ctx context.Context, userIDs []string) (users []*relationtb.UserModel, err error),
fn func(ctx context.Context, userIDs []string) (users []*relationtb.User, err error),
) friendNotificationSenderOptions {
return func(s *FriendNotificationSender) {
f := func(ctx context.Context, userIDs []string) (result []notification.CommonUser, err error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/rpc/group/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
Expand Down Expand Up @@ -100,7 +100,7 @@ func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after)
}

func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *relation.GroupMemberModel, groupEx string) error {
func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *model.GroupMember, groupEx string) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand,
Expand Down
Loading

0 comments on commit 76d9688

Please sign in to comment.