From 9177a3021838653170941b66fafb926b91c49074 Mon Sep 17 00:00:00 2001 From: Yury Nevalenny Date: Sat, 18 May 2024 18:06:22 +0000 Subject: [PATCH] better status tracking --- backend/app/db/mongo/mongo.go | 73 ++++++++++++++++++++- backend/app/models/mongo.go | 2 + backend/app/status/system_status.go | 13 ++++ backend/app/telegram/message_processors.go | 6 ++ backend/app/telegram/telegram_system.go | 42 +++++++----- backend/app/workers/status/status_worker.go | 7 +- 6 files changed, 124 insertions(+), 19 deletions(-) diff --git a/backend/app/db/mongo/mongo.go b/backend/app/db/mongo/mongo.go index d5c0b09..da9d4c2 100644 --- a/backend/app/db/mongo/mongo.go +++ b/backend/app/db/mongo/mongo.go @@ -25,12 +25,15 @@ type MongoClient interface { GetUserIds(ctx context.Context, page int, pageSize int) ([]string, error) GetUsersCount(ctx context.Context) (int64, error) GetUsersCountForSubscription(ctx context.Context, subscription string) (int64, error) + GetUserIdsUsedSince(ctx context.Context, since time.Time, page int, pageSize int) ([]string, error) + GetUserIdsNotifiedBefore(ctx context.Context, before time.Time, page int, pageSize int) ([]string, error) MigrateUsersToSubscription(ctx context.Context, from, to string) error Ping(ctx context.Context, rp *readpref.ReadPref) error UpdateUserContacts(ctx context.Context, name, phone, email string) error UpdateUserSubscription(ctx context.Context, subscription models.MongoSubscription) error UpdateUserUsage(ctx context.Context, userTotalCost float64) error UpdateUserStripeCustomerId(ctx context.Context, stripeCustomerId string) error + UpdateUsersNotified(ctx context.Context, userIds []string) error } var MongoDBClient MongoClient @@ -99,7 +102,7 @@ func (c *Client) UpdateUserSubscription(ctx context.Context, subscription models update = bson.M{ "$set": bson.M{ "subscription": subscription, - "subscription_date": time.Now().Format("2006-01-02"), + "subscription_date": time.Now().UTC().Format("2006-01-02"), }, } } else { @@ -121,7 +124,8 @@ func (c *Client) UpdateUserUsage(ctx context.Context, newUsage float64) error { if newUsage != 0 { update = bson.M{ "$set": bson.M{ - "usage": newUsage, + "usage": newUsage, + "last_used_at": time.Now().UTC().Format("2006-01-02T15:04:05"), }, } } else { @@ -133,6 +137,25 @@ func (c *Client) UpdateUserUsage(ctx context.Context, newUsage float64) error { return err } +func (c *Client) UpdateUsersNotified(ctx context.Context, userIds []string) error { + if len(userIds) == 0 { + return nil + } + + collection := c.Database(config.CONFIG.MongoDBName).Collection("users") + + filter := bson.M{"_id": bson.M{"$in": userIds}} + update := bson.M{ + "$set": bson.M{ + "last_notified_at": time.Now().UTC().Format("2006-01-02T15:04:05"), + }, + } + + options := options.Update().SetUpsert(true) + _, err := collection.UpdateMany(ctx, filter, update, options) + return err +} + func (c *Client) UpdateUserContacts(ctx context.Context, name, phone, email string) error { userId := ctx.Value(models.UserContext{}).(string) collection := c.Database(config.CONFIG.MongoDBName).Collection("users") @@ -213,3 +236,49 @@ func (c *Client) GetUserIds(ctx context.Context, page int, pageSize int) ([]stri } return userIds, nil } + +func (c *Client) GetUserIdsUsedSince(ctx context.Context, since time.Time, page int, pageSize int) ([]string, error) { + collection := c.Database(config.CONFIG.MongoDBName).Collection("users") + findOptions := options.Find() + findOptions.SetSkip(int64(page * pageSize)) + findOptions.SetLimit(int64(pageSize)) + cursor, err := collection.Find(ctx, bson.M{"subscription_date": bson.M{"$gte": since.Format("2006-01-02T15:04:05")}}, findOptions) + if err != nil { + return nil, fmt.Errorf("GetUserIdsSince: failed to find users: %w", err) + } + defer cursor.Close(ctx) + + var userIds []string + for cursor.Next(ctx) { + var user models.MongoUser + err := cursor.Decode(&user) + if err != nil { + return nil, fmt.Errorf("GetUserIdsSince: failed to decode user: %w", err) + } + userIds = append(userIds, user.ID) + } + return userIds, nil +} + +func (c *Client) GetUserIdsNotifiedBefore(ctx context.Context, before time.Time, page int, pageSize int) ([]string, error) { + collection := c.Database(config.CONFIG.MongoDBName).Collection("users") + findOptions := options.Find() + findOptions.SetSkip(int64(page * pageSize)) + findOptions.SetLimit(int64(pageSize)) + cursor, err := collection.Find(ctx, bson.M{"last_notified_at": bson.M{"$lte": before.Format("2006-01-02T15:04:05")}}, findOptions) + if err != nil { + return nil, fmt.Errorf("GetUserIdsNotifiedBefore: failed to find users: %w", err) + } + defer cursor.Close(ctx) + + var userIds []string + for cursor.Next(ctx) { + var user models.MongoUser + err := cursor.Decode(&user) + if err != nil { + return nil, fmt.Errorf("GetUserIdsNotifiedBefore: failed to decode user: %w", err) + } + userIds = append(userIds, user.ID) + } + return userIds, nil +} diff --git a/backend/app/models/mongo.go b/backend/app/models/mongo.go index 82f34e7..b25aab7 100644 --- a/backend/app/models/mongo.go +++ b/backend/app/models/mongo.go @@ -9,6 +9,8 @@ type MongoUser struct { SubscriptionDate string `bson:"subscription_date"` SubscriptionType MongoSubscription `bson:"subscription"` Usage float64 `bson:"usage"` + LastUsedAt string `bson:"last_used_at"` + LastNotifiedAt string `bson:"last_notified_at"` } type MongoSubscription struct { diff --git a/backend/app/status/system_status.go b/backend/app/status/system_status.go index 224d2d6..89cbf56 100644 --- a/backend/app/status/system_status.go +++ b/backend/app/status/system_status.go @@ -26,7 +26,10 @@ type SystemUsage struct { TotalBasicUsers int64 `json:"total_basic_users"` TotalTokens int64 `json:"total_tokens"` TotalCost float64 `json:"total_cost"` + TotalImages int64 `json:"total_images"` AudioDurationMinutes float64 `json:"audio_duration_minutes"` + WeekActiveUsers int64 `json:"week_active_users"` + MonthActiveUsers int64 `json:"month_active_users"` } // Status @@ -100,6 +103,10 @@ func (h *SystemStatusHandler) GetSystemStatus() SystemStatus { if audioDurationMinutes.Err() == nil { status.Usage.AudioDurationMinutes, _ = audioDurationMinutes.Float64() } + images := h.Redis.Get(context.Background(), "system_totals:images") + if images.Err() == nil { + status.Usage.TotalImages, _ = images.Int64() + } } if status.MongoDB.Available { users, _ := h.MongoDB.GetUsersCount(context.Background()) @@ -108,6 +115,12 @@ func (h *SystemStatusHandler) GetSystemStatus() SystemStatus { status.Usage.TotalFreePlusUsers = freePlusUsers basicUsers, _ := h.MongoDB.GetUsersCountForSubscription(context.Background(), "basic") status.Usage.TotalBasicUsers = basicUsers + + weekActiveUsers, _ := h.MongoDB.GetUserIdsUsedSince(context.Background(), time.Now().AddDate(0, 0, -7), 0, 1000000) + status.Usage.WeekActiveUsers = int64(len(weekActiveUsers)) + + monthActiveUsers, _ := h.MongoDB.GetUserIdsUsedSince(context.Background(), time.Now().AddDate(0, -1, 0), 0, 1000000) + status.Usage.MonthActiveUsers = int64(len(monthActiveUsers)) } return status } diff --git a/backend/app/telegram/message_processors.go b/backend/app/telegram/message_processors.go index 37e204f..0e0b4eb 100644 --- a/backend/app/telegram/message_processors.go +++ b/backend/app/telegram/message_processors.go @@ -468,6 +468,7 @@ func ChunkSendMessage(bot *telego.Bot, message *telego.Message, text string) { log.Errorf("Failed to send message to telegram: %v, chatID: %s, threadID: %d", err, chatID, message.MessageThreadID) } } + time.Sleep(1 * time.Second) } } @@ -508,6 +509,8 @@ func ChunkEditSendMessage( params.ParseMode = "" _, err = bot.EditMessageText(params) } + + time.Sleep(1 * time.Second) // sleep to prevent rate limiting } else { log.Debugf("[ChunkEditSendMessage] chunk %d (size %d) - sending new message in chat %s", i, len(chunk), chatID) lastMessage, err = bot.SendMessage(tu.Message(chatID, chunk).WithParseMode("HTML").WithMessageThreadID(message.MessageThreadID).WithReplyMarkup(markup)) @@ -515,6 +518,8 @@ func ChunkEditSendMessage( if err != nil && strings.Contains(err.Error(), "can't parse entities") { lastMessage, err = bot.SendMessage(tu.Message(chatID, chunk).WithMessageThreadID(message.MessageThreadID).WithReplyMarkup(markup)) } + + time.Sleep(1 * time.Second) // sleep to prevent rate limiting } if !last && voice { ChunkSendVoice(ctx, bot, message, chunk, false) @@ -572,6 +577,7 @@ func ChunkSendVoice(ctx context.Context, bot *telego.Bot, message *telego.Messag voiceParams.ParseMode = "" _, err = bot.SendVoice(voiceParams.WithReplyMarkup(getLikeDislikeReplyMarkup(message.MessageThreadID))) } + time.Sleep(1 * time.Second) // sleep to prevent rate limiting if err != nil { log.Errorf("Failed to send voice message: %v in chatID: %d", err, chatID.ID) continue diff --git a/backend/app/telegram/telegram_system.go b/backend/app/telegram/telegram_system.go index 6e64350..dd71456 100644 --- a/backend/app/telegram/telegram_system.go +++ b/backend/app/telegram/telegram_system.go @@ -303,16 +303,26 @@ func handleSendMessageToUsers(ctx context.Context, bot *Bot, message *telego.Mes pageSize := 10 sleepBetweenPages := 1 * time.Second notifiedUsers := 0.0 - skippedUsers := 0.0 + skippedNonTelegramUsers := 0.0 + skippedGroupUsers := 0.0 + skippedErrorUsers := 0.0 + notifiedUsersIds := []string{} defer func() { - bot.SendMessage(tu.Message(SystemBOT.ChatID, fmt.Sprintf("Message sent to %.f users, %.f users skipped", notifiedUsers, skippedUsers))) + bot.SendMessage(tu.Message(SystemBOT.ChatID, fmt.Sprintf("Message sent to %.f users, %.f groups skipped, %.f error users skipped, %.f non-telegram users skipped", notifiedUsers, skippedGroupUsers, skippedErrorUsers, skippedNonTelegramUsers))) config.CONFIG.DataDogClient.Incr("custom_message_sent_total", []string{}, notifiedUsers) - config.CONFIG.DataDogClient.Incr("custom_message_skipped_total", []string{}, skippedUsers) + config.CONFIG.DataDogClient.Incr("custom_message_sent_groups_skipped", []string{}, skippedGroupUsers) + config.CONFIG.DataDogClient.Incr("custom_message_sent_non_telegram_users_skipped", []string{}, skippedNonTelegramUsers) + config.CONFIG.DataDogClient.Incr("custom_message_sent_error_users_skipped", []string{}, skippedErrorUsers) + + err := mongo.MongoDBClient.UpdateUsersNotified(context.Background(), notifiedUsersIds) + if err != nil { + log.Errorf("Failed to update users notified: %s", err) + } }() for { - users, err := mongo.MongoDBClient.GetUserIds(context.Background(), page, pageSize) + users, err := mongo.MongoDBClient.GetUserIdsNotifiedBefore(context.Background(), time.Now().AddDate(0, 0, -1), page, pageSize) if err != nil { bot.SendMessage(tu.Message(SystemBOT.ChatID, fmt.Sprintf("Failed to get users page %d (page size %d): %s", page, pageSize, err))) return @@ -321,30 +331,32 @@ func handleSendMessageToUsers(ctx context.Context, bot *Bot, message *telego.Mes break } for _, user := range users { - if user == "" || strings.HasPrefix(user, "SYSTEM:STATUS") || strings.HasPrefix(user, "U") || strings.HasPrefix(user, "slack") || strings.HasPrefix(user, "-") { - // skip non telegram users, groups and channels + if user == "" || strings.HasPrefix(user, "SYSTEM:STATUS") || strings.HasPrefix(user, "U") || strings.HasPrefix(user, "slack") { + // skip non telegram users + skippedNonTelegramUsers++ continue } - userId, err := strconv.ParseInt(user, 10, 64) - if err != nil { - log.Errorf("Failed to convert user id %s to int: %s", user, err) - skippedUsers++ + if strings.HasPrefix(user, "-") { + // skip group users + skippedGroupUsers++ continue } - if userId != SystemBOT.ChatID.ID { - // skip anything else but system user for now - skippedUsers++ + userId, err := strconv.ParseInt(user, 10, 64) + if err != nil { + log.Errorf("Failed to convert user id %s to int: %s", user, err) + skippedErrorUsers++ continue } _, err = BOT.SendMessage(tu.Message(tu.ID(userId), messageText)) if err != nil { - skippedUsers++ - log.Errorf("Failed to send message to user %d: %s", userId, err) + skippedErrorUsers++ + log.Errorf("Failed to send message to user %d: %v", userId, err) } else { notifiedUsers++ + notifiedUsersIds = append(notifiedUsersIds, user) log.Infof("Custom message sent to user %d", userId) } diff --git a/backend/app/workers/status/status_worker.go b/backend/app/workers/status/status_worker.go index cfa15e2..8530fb7 100644 --- a/backend/app/workers/status/status_worker.go +++ b/backend/app/workers/status/status_worker.go @@ -30,16 +30,19 @@ func Run() { func FetchStatus() (string, error) { w := WORKER systemStatus := status.New(mongo.MongoDBClient, redis.RedisClient, w.AI).GetSystemStatus() + config.CONFIG.DataDogClient.Gauge("status_worker.fireworks_ai_available", boolToFloat64(systemStatus.FireworksAI.Available), nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.mongo_db_available", boolToFloat64(systemStatus.MongoDB.Available), nil, 1) - config.CONFIG.DataDogClient.Gauge("status_worker.redis_available", boolToFloat64(systemStatus.Redis.Available), nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.open_ai_available", boolToFloat64(systemStatus.OpenAI.Available), nil, 1) - config.CONFIG.DataDogClient.Gauge("status_worker.fireworks_ai_available", boolToFloat64(systemStatus.FireworksAI.Available), nil, 1) + config.CONFIG.DataDogClient.Gauge("status_worker.redis_available", boolToFloat64(systemStatus.Redis.Available), nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.audio_duration_minutes", systemStatus.Usage.AudioDurationMinutes, nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.total_cost", systemStatus.Usage.TotalCost, nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.total_tokens", float64(systemStatus.Usage.TotalTokens), nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.total_users", float64(systemStatus.Usage.TotalUsers), nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.total_free_plus_users", float64(systemStatus.Usage.TotalFreePlusUsers), nil, 1) config.CONFIG.DataDogClient.Gauge("status_worker.total_basic_users", float64(systemStatus.Usage.TotalBasicUsers), nil, 1) + config.CONFIG.DataDogClient.Gauge("status_worker.total_images", float64(systemStatus.Usage.TotalImages), nil, 1) + config.CONFIG.DataDogClient.Gauge("status_worker.week_active_users", float64(systemStatus.Usage.WeekActiveUsers), nil, 1) + config.CONFIG.DataDogClient.Gauge("status_worker.month_active_users", float64(systemStatus.Usage.MonthActiveUsers), nil, 1) if !systemStatus.MongoDB.Available { reportUnavailableStatus(w.TelegramSystemBot, w.SystemTelegramChatID, w.MainBotName, "MongoDB") }