Skip to content

Commit

Permalink
Merge pull request #148 from GuGoOrg/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
liaosunny123 authored Aug 24, 2023
2 parents ae3050b + 639ed0c commit ec60117
Show file tree
Hide file tree
Showing 29 changed files with 332 additions and 131 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ debug/
output/
log/
.env
docker/static
docker/basic/static
docker/rabbitmq/static
static/
File renamed without changes.
3 changes: 3 additions & 0 deletions docker/rabbitmq/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
FROM rabbitmq:3.12.3-management

COPY ./static /plugins
3 changes: 0 additions & 3 deletions src/constant/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,4 @@ const MessageRpcServerPort = ":37007"
const RelationRpcServerName = "GuGoTik-RelationService"
const RelationRpcServerPort = ":37008"

const LikeRpcServerName = "GuGoTik-likeService"
const LikeRpcServerPort = ":37009"

const VideoPicker = "GuGoTik-VideoPicker"
14 changes: 7 additions & 7 deletions src/services/favorite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func main() {
tp, err := tracing.SetTraceProvider(config.LikeRpcServerName)
tp, err := tracing.SetTraceProvider(config.FavoriteRpcServerName)

if err != nil {
logging.Logger.WithFields(logrus.Fields{
Expand All @@ -40,12 +40,12 @@ func main() {
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.LikeRpcServerName)
log := logging.LogService(config.FavoriteRpcServerName)

lis, err := net.Listen("tcp", config.LikeRpcServerPort)
lis, err := net.Listen("tcp", config.FavoriteRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.LikeRpcServerName, err)
log.Panicf("Rpc %s listen happens error: %v", config.FavoriteRpcServerName, err)
}

var srv FavoriteServiceServerImpl
Expand All @@ -55,12 +55,12 @@ func main() {

health.RegisterHealthServer(s, &probe)

if err := consul.RegisterConsul(config.LikeRpcServerName, config.LikeRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.LikeRpcServerName, err)
if err := consul.RegisterConsul(config.FavoriteRpcServerName, config.FavoriteRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err)
}

srv.New()
log.Infof("Rpc %s is running at %s now", config.LikeRpcServerName, config.LikeRpcServerPort)
log.Infof("Rpc %s is running at %s now", config.FavoriteRpcServerName, config.FavoriteRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.FavoriteRpcServerName, err)
}
Expand Down
6 changes: 2 additions & 4 deletions src/services/feed/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ func (s FeedServiceImpl) ListVideos(ctx context.Context, request *feed.ListFeedR
logger := logging.LogService("FeedService.ListVideos").WithContext(ctx)

now := uint32(time.Now().UnixMilli())

layout := "2006-01-02T15:04:05.999Z"
t, err := time.Parse(layout, *request.LatestTime)
latestTime := t.Unix()
latestTime, err := strconv.ParseInt(*request.LatestTime, 10, 64)
if err != nil {
var numError *strconv.NumError
if errors.As(err, &numError) {
Expand Down Expand Up @@ -258,6 +255,7 @@ func queryDetailed(
go func(i int, v *models.Video) {
defer wg.Done()
commentCount, localErr := CommentClient.ListComment(ctx, &comment.ListCommentRequest{
ActorId: actorId,
VideoId: v.ID,
})
if localErr != nil {
Expand Down
60 changes: 37 additions & 23 deletions src/services/message/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"fmt"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)

var UserClient user.UserServiceClient
Expand Down Expand Up @@ -47,8 +46,11 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action

if err != nil || userResponse.StatusCode != strings.ServiceOKCode {
logger.WithFields(logrus.Fields{
"err": err,
"cctor_id": request.ActorId,
"err": err,
"ActorId": request.ActorId,
"user_id": request.UserId,
"action_type": request.ActionType,
"content_text": request.Content,
}).Errorf("User service error")
logging.SetSpanError(span, err)

Expand All @@ -58,12 +60,14 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action
}, err
}

res, err = addMessage(ctx, logger, span, request.ActorId, request.UserId, request.Content)
res, err = addMessage(ctx, request.ActorId, request.UserId, request.Content)
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
"ActorId": request.ActorId,
}).Errorf("User service error")
"err": err,
"user_id": request.UserId,
"action_type": request.ActionType,
"content_text": request.Content,
}).Errorf("database insert error")
logging.SetSpanError(span, err)
return res, err
}
Expand All @@ -75,14 +79,15 @@ func (c MessageServiceImpl) ChatAction(ctx context.Context, request *chat.Action
return res, err
}

// Chat(context.Context, *ChatRequest) (*ChatResponse, error)
// Chat Chat(context.Context, *ChatRequest) (*ChatResponse, error)
func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest) (resp *chat.ChatResponse, err error) {
ctx, span := tracing.Tracer.Start(ctx, "ChatService")
defer span.End()
logger := logging.LogService("ChatService.chat").WithContext(ctx)
logger.WithFields(logrus.Fields{
"user_id": request.UserId,
"ActorId": request.ActorId,
"user_id": request.UserId,
"ActorId": request.ActorId,
"pre_msg_time": request.PreMsgTime,
}).Debugf("Process start")
toUserId := request.UserId
fromUserId := request.ActorId
Expand All @@ -95,14 +100,19 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest)
//这个地方应该取出多少条消息?
//TO DO 看怎么需要一下

var rMessageList []*chat.Message
result := database.Client.WithContext(ctx).Where("conversation_id=?", conversationId).
Order("created_at desc").Find(&rMessageList)
var pMessageList []models.Message
result := database.Client.WithContext(ctx).
Where("conversation_id=?", conversationId).
Order("created_at desc").
Find(&pMessageList)

if result.Error != nil {
logger.WithFields(logrus.Fields{
"err": result.Error,
}).Errorf("ChatServiceImpl list chat failed to response when listing message")
"err": result.Error,
"user_id": request.UserId,
"ActorId": request.ActorId,
"pre_msg_time": request.PreMsgTime,
}).Errorf("ChatServiceImpl list chat failed to response when listing message,database err")
logging.SetSpanError(span, err)

resp = &chat.ChatResponse{
Expand All @@ -112,6 +122,17 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest)
return
}

rMessageList := make([]*chat.Message, 0, len(pMessageList))
for _, pMessage := range pMessageList {
rMessageList = append(rMessageList, &chat.Message{
Id: pMessage.ID,
Content: pMessage.Content,
CreateTime: uint32(pMessage.CreatedAt.Unix()),
FromUserId: &pMessage.FromUserId,
ToUserId: &pMessage.ToUserId,
})
}

resp = &chat.ChatResponse{
StatusCode: strings.ServiceOKCode,
StatusMsg: strings.ServiceOK,
Expand All @@ -125,7 +146,7 @@ func (c MessageServiceImpl) Chat(ctx context.Context, request *chat.ChatRequest)
return
}

func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, fromUserId uint32, toUserId uint32, Context string) (resp *chat.ActionResponse, err error) {
func addMessage(ctx context.Context, fromUserId uint32, toUserId uint32, Context string) (resp *chat.ActionResponse, err error) {
conversationId := fmt.Sprintf("%d_%d", toUserId, fromUserId)

if toUserId > fromUserId {
Expand All @@ -142,13 +163,6 @@ func addMessage(ctx context.Context, logger *logrus.Entry, span trace.Span, from
result := database.Client.WithContext(ctx).Create(&message)

if result.Error != nil {
//TO_DO 错误 替换
logger.WithFields(logrus.Fields{
"err": result.Error,
"id": message.ID,
"ActorId": message.FromUserId,
"to_id": message.ToUserId,
}).Errorf("send message failed when insert to database")

resp = &chat.ActionResponse{
StatusCode: strings.UnableToAddMessageErrorCode,
Expand Down
63 changes: 62 additions & 1 deletion src/services/user/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"GuGoTik/src/constant/strings"
"GuGoTik/src/extra/tracing"
"GuGoTik/src/models"
"GuGoTik/src/rpc/favorite"
"GuGoTik/src/rpc/publish"
"GuGoTik/src/rpc/relation"
"GuGoTik/src/rpc/user"
Expand All @@ -25,12 +26,17 @@ var relationClient relation.RelationServiceClient

var publishClient publish.PublishServiceClient

var favoriteClient favorite.FavoriteServiceClient

func (a UserServiceImpl) New() {
relationConn := grpc2.Connect(config.RelationRpcServerName)
relationClient = relation.NewRelationServiceClient(relationConn)

publishConn := grpc2.Connect(config.PublishRpcServerName)
publishClient = publish.NewPublishServiceClient(publishConn)

favoriteConn := grpc2.Connect(config.FavoriteRpcServerName)
favoriteClient = favorite.NewFavoriteServiceClient(favoriteConn)
}

func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequest) (resp *user.UserResponse, err error) {
Expand Down Expand Up @@ -82,7 +88,7 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ
}

var wg sync.WaitGroup
wg.Add(4)
wg.Add(6)
isErr := false

go func() {
Expand Down Expand Up @@ -192,6 +198,61 @@ func (a UserServiceImpl) GetUserInfo(ctx context.Context, request *user.UserRequ
resp.User.WorkCount = &rResp.Count
}()

go func() {
defer wg.Done()
rResp, err := favoriteClient.CountUserTotalFavorited(ctx, &favorite.CountUserTotalFavoritedRequest{
ActorId: request.ActorId,
UserId: request.UserId,
})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
"userId": request.UserId,
}).Errorf("Error when user service get toal favorited")
isErr = true
return
}

if rResp != nil && rResp.StatusCode == strings.ServiceOKCode {
if err != nil {
logger.WithFields(logrus.Fields{
"errMsg": rResp.StatusMsg,
"userId": request.UserId,
}).Errorf("Error when user service get toal favorited")
isErr = true
return
}
}

resp.User.TotalFavorited = &rResp.Count
}()

go func() {
defer wg.Done()
rResp, err := favoriteClient.CountUserFavorite(ctx, &favorite.CountUserFavoriteRequest{UserId: request.UserId})
if err != nil {
logger.WithFields(logrus.Fields{
"err": err,
"userId": request.UserId,
}).Errorf("Error when user service get favorite")
isErr = true
return
}

if rResp != nil && rResp.StatusCode == strings.ServiceOKCode {
if err != nil {
logger.WithFields(logrus.Fields{
"errMsg": rResp.StatusMsg,
"userId": request.UserId,
}).Errorf("Error when user service get favorite")
isErr = true
return
}
}

resp.User.FavoriteCount = &rResp.Count
}()

wg.Wait()

if isErr {
Expand Down
9 changes: 5 additions & 4 deletions src/web/auth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
grpc2 "GuGoTik/src/utils/grpc"
"GuGoTik/src/utils/logging"
"GuGoTik/src/web/models"
"GuGoTik/src/web/utils"
"github.com/gin-gonic/gin"
_ "github.com/mbobakov/grpc-consul-resolver"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -40,7 +41,7 @@ func LoginHandle(c *gin.Context) {
logger.WithFields(logrus.Fields{
"Username": req.UserName,
}).Warnf("Error when trying to connect with AuthService")
c.JSON(http.StatusOK, res)
c.Render(http.StatusOK, utils.CustomJSON{Data: res, Context: c})
return
}

Expand All @@ -50,7 +51,7 @@ func LoginHandle(c *gin.Context) {
"UserId": res.UserId,
}).Infof("User log in")

c.JSON(http.StatusOK, res)
c.Render(http.StatusOK, utils.CustomJSON{Data: res, Context: c})
}

func RegisterHandle(c *gin.Context) {
Expand All @@ -77,7 +78,7 @@ func RegisterHandle(c *gin.Context) {
logger.WithFields(logrus.Fields{
"Username": req.UserName,
}).Warnf("Error when trying to connect with AuthService")
c.JSON(http.StatusOK, res)
c.Render(http.StatusOK, utils.CustomJSON{Data: res, Context: c})
return
}

Expand All @@ -87,7 +88,7 @@ func RegisterHandle(c *gin.Context) {
"UserId": res.UserId,
}).Infof("User register in")

c.JSON(http.StatusOK, res)
c.Render(http.StatusOK, utils.CustomJSON{Data: res, Context: c})
}

func init() {
Expand Down
Loading

0 comments on commit ec60117

Please sign in to comment.