Skip to content

Commit

Permalink
better status tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
radiantspace committed May 18, 2024
1 parent 217e351 commit 9177a30
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 19 deletions.
73 changes: 71 additions & 2 deletions backend/app/db/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ type MongoClient interface {
GetUserIds(ctx context.Context, page int, pageSize int) ([]string, error)
GetUsersCount(ctx context.Context) (int64, error)
GetUsersCountForSubscription(ctx context.Context, subscription string) (int64, error)
GetUserIdsUsedSince(ctx context.Context, since time.Time, page int, pageSize int) ([]string, error)
GetUserIdsNotifiedBefore(ctx context.Context, before time.Time, page int, pageSize int) ([]string, error)
MigrateUsersToSubscription(ctx context.Context, from, to string) error
Ping(ctx context.Context, rp *readpref.ReadPref) error
UpdateUserContacts(ctx context.Context, name, phone, email string) error
UpdateUserSubscription(ctx context.Context, subscription models.MongoSubscription) error
UpdateUserUsage(ctx context.Context, userTotalCost float64) error
UpdateUserStripeCustomerId(ctx context.Context, stripeCustomerId string) error
UpdateUsersNotified(ctx context.Context, userIds []string) error
}

var MongoDBClient MongoClient
Expand Down Expand Up @@ -99,7 +102,7 @@ func (c *Client) UpdateUserSubscription(ctx context.Context, subscription models
update = bson.M{
"$set": bson.M{
"subscription": subscription,
"subscription_date": time.Now().Format("2006-01-02"),
"subscription_date": time.Now().UTC().Format("2006-01-02"),
},
}
} else {
Expand All @@ -121,7 +124,8 @@ func (c *Client) UpdateUserUsage(ctx context.Context, newUsage float64) error {
if newUsage != 0 {
update = bson.M{
"$set": bson.M{
"usage": newUsage,
"usage": newUsage,
"last_used_at": time.Now().UTC().Format("2006-01-02T15:04:05"),
},
}
} else {
Expand All @@ -133,6 +137,25 @@ func (c *Client) UpdateUserUsage(ctx context.Context, newUsage float64) error {
return err
}

func (c *Client) UpdateUsersNotified(ctx context.Context, userIds []string) error {
if len(userIds) == 0 {
return nil
}

collection := c.Database(config.CONFIG.MongoDBName).Collection("users")

filter := bson.M{"_id": bson.M{"$in": userIds}}
update := bson.M{
"$set": bson.M{
"last_notified_at": time.Now().UTC().Format("2006-01-02T15:04:05"),
},
}

options := options.Update().SetUpsert(true)
_, err := collection.UpdateMany(ctx, filter, update, options)
return err
}

func (c *Client) UpdateUserContacts(ctx context.Context, name, phone, email string) error {
userId := ctx.Value(models.UserContext{}).(string)
collection := c.Database(config.CONFIG.MongoDBName).Collection("users")
Expand Down Expand Up @@ -213,3 +236,49 @@ func (c *Client) GetUserIds(ctx context.Context, page int, pageSize int) ([]stri
}
return userIds, nil
}

func (c *Client) GetUserIdsUsedSince(ctx context.Context, since time.Time, page int, pageSize int) ([]string, error) {
collection := c.Database(config.CONFIG.MongoDBName).Collection("users")
findOptions := options.Find()
findOptions.SetSkip(int64(page * pageSize))
findOptions.SetLimit(int64(pageSize))
cursor, err := collection.Find(ctx, bson.M{"subscription_date": bson.M{"$gte": since.Format("2006-01-02T15:04:05")}}, findOptions)
if err != nil {
return nil, fmt.Errorf("GetUserIdsSince: failed to find users: %w", err)
}
defer cursor.Close(ctx)

var userIds []string
for cursor.Next(ctx) {
var user models.MongoUser
err := cursor.Decode(&user)
if err != nil {
return nil, fmt.Errorf("GetUserIdsSince: failed to decode user: %w", err)
}
userIds = append(userIds, user.ID)
}
return userIds, nil
}

func (c *Client) GetUserIdsNotifiedBefore(ctx context.Context, before time.Time, page int, pageSize int) ([]string, error) {
collection := c.Database(config.CONFIG.MongoDBName).Collection("users")
findOptions := options.Find()
findOptions.SetSkip(int64(page * pageSize))
findOptions.SetLimit(int64(pageSize))
cursor, err := collection.Find(ctx, bson.M{"last_notified_at": bson.M{"$lte": before.Format("2006-01-02T15:04:05")}}, findOptions)
if err != nil {
return nil, fmt.Errorf("GetUserIdsNotifiedBefore: failed to find users: %w", err)
}
defer cursor.Close(ctx)

var userIds []string
for cursor.Next(ctx) {
var user models.MongoUser
err := cursor.Decode(&user)
if err != nil {
return nil, fmt.Errorf("GetUserIdsNotifiedBefore: failed to decode user: %w", err)
}
userIds = append(userIds, user.ID)
}
return userIds, nil
}
2 changes: 2 additions & 0 deletions backend/app/models/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type MongoUser struct {
SubscriptionDate string `bson:"subscription_date"`
SubscriptionType MongoSubscription `bson:"subscription"`
Usage float64 `bson:"usage"`
LastUsedAt string `bson:"last_used_at"`
LastNotifiedAt string `bson:"last_notified_at"`
}

type MongoSubscription struct {
Expand Down
13 changes: 13 additions & 0 deletions backend/app/status/system_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ type SystemUsage struct {
TotalBasicUsers int64 `json:"total_basic_users"`
TotalTokens int64 `json:"total_tokens"`
TotalCost float64 `json:"total_cost"`
TotalImages int64 `json:"total_images"`
AudioDurationMinutes float64 `json:"audio_duration_minutes"`
WeekActiveUsers int64 `json:"week_active_users"`
MonthActiveUsers int64 `json:"month_active_users"`
}

// Status
Expand Down Expand Up @@ -100,6 +103,10 @@ func (h *SystemStatusHandler) GetSystemStatus() SystemStatus {
if audioDurationMinutes.Err() == nil {
status.Usage.AudioDurationMinutes, _ = audioDurationMinutes.Float64()
}
images := h.Redis.Get(context.Background(), "system_totals:images")
if images.Err() == nil {
status.Usage.TotalImages, _ = images.Int64()
}
}
if status.MongoDB.Available {
users, _ := h.MongoDB.GetUsersCount(context.Background())
Expand All @@ -108,6 +115,12 @@ func (h *SystemStatusHandler) GetSystemStatus() SystemStatus {
status.Usage.TotalFreePlusUsers = freePlusUsers
basicUsers, _ := h.MongoDB.GetUsersCountForSubscription(context.Background(), "basic")
status.Usage.TotalBasicUsers = basicUsers

weekActiveUsers, _ := h.MongoDB.GetUserIdsUsedSince(context.Background(), time.Now().AddDate(0, 0, -7), 0, 1000000)
status.Usage.WeekActiveUsers = int64(len(weekActiveUsers))

monthActiveUsers, _ := h.MongoDB.GetUserIdsUsedSince(context.Background(), time.Now().AddDate(0, -1, 0), 0, 1000000)
status.Usage.MonthActiveUsers = int64(len(monthActiveUsers))
}
return status
}
6 changes: 6 additions & 0 deletions backend/app/telegram/message_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func ChunkSendMessage(bot *telego.Bot, message *telego.Message, text string) {
log.Errorf("Failed to send message to telegram: %v, chatID: %s, threadID: %d", err, chatID, message.MessageThreadID)
}
}
time.Sleep(1 * time.Second)
}
}

Expand Down Expand Up @@ -508,13 +509,17 @@ func ChunkEditSendMessage(
params.ParseMode = ""
_, err = bot.EditMessageText(params)
}

time.Sleep(1 * time.Second) // sleep to prevent rate limiting
} else {
log.Debugf("[ChunkEditSendMessage] chunk %d (size %d) - sending new message in chat %s", i, len(chunk), chatID)
lastMessage, err = bot.SendMessage(tu.Message(chatID, chunk).WithParseMode("HTML").WithMessageThreadID(message.MessageThreadID).WithReplyMarkup(markup))

if err != nil && strings.Contains(err.Error(), "can't parse entities") {
lastMessage, err = bot.SendMessage(tu.Message(chatID, chunk).WithMessageThreadID(message.MessageThreadID).WithReplyMarkup(markup))
}

time.Sleep(1 * time.Second) // sleep to prevent rate limiting
}
if !last && voice {
ChunkSendVoice(ctx, bot, message, chunk, false)
Expand Down Expand Up @@ -572,6 +577,7 @@ func ChunkSendVoice(ctx context.Context, bot *telego.Bot, message *telego.Messag
voiceParams.ParseMode = ""
_, err = bot.SendVoice(voiceParams.WithReplyMarkup(getLikeDislikeReplyMarkup(message.MessageThreadID)))
}
time.Sleep(1 * time.Second) // sleep to prevent rate limiting
if err != nil {
log.Errorf("Failed to send voice message: %v in chatID: %d", err, chatID.ID)
continue
Expand Down
42 changes: 27 additions & 15 deletions backend/app/telegram/telegram_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,16 +303,26 @@ func handleSendMessageToUsers(ctx context.Context, bot *Bot, message *telego.Mes
pageSize := 10
sleepBetweenPages := 1 * time.Second
notifiedUsers := 0.0
skippedUsers := 0.0
skippedNonTelegramUsers := 0.0
skippedGroupUsers := 0.0
skippedErrorUsers := 0.0
notifiedUsersIds := []string{}

defer func() {
bot.SendMessage(tu.Message(SystemBOT.ChatID, fmt.Sprintf("Message sent to %.f users, %.f users skipped", notifiedUsers, skippedUsers)))
bot.SendMessage(tu.Message(SystemBOT.ChatID, fmt.Sprintf("Message sent to %.f users, %.f groups skipped, %.f error users skipped, %.f non-telegram users skipped", notifiedUsers, skippedGroupUsers, skippedErrorUsers, skippedNonTelegramUsers)))
config.CONFIG.DataDogClient.Incr("custom_message_sent_total", []string{}, notifiedUsers)
config.CONFIG.DataDogClient.Incr("custom_message_skipped_total", []string{}, skippedUsers)
config.CONFIG.DataDogClient.Incr("custom_message_sent_groups_skipped", []string{}, skippedGroupUsers)
config.CONFIG.DataDogClient.Incr("custom_message_sent_non_telegram_users_skipped", []string{}, skippedNonTelegramUsers)
config.CONFIG.DataDogClient.Incr("custom_message_sent_error_users_skipped", []string{}, skippedErrorUsers)

err := mongo.MongoDBClient.UpdateUsersNotified(context.Background(), notifiedUsersIds)
if err != nil {
log.Errorf("Failed to update users notified: %s", err)
}
}()

for {
users, err := mongo.MongoDBClient.GetUserIds(context.Background(), page, pageSize)
users, err := mongo.MongoDBClient.GetUserIdsNotifiedBefore(context.Background(), time.Now().AddDate(0, 0, -1), page, pageSize)
if err != nil {
bot.SendMessage(tu.Message(SystemBOT.ChatID, fmt.Sprintf("Failed to get users page %d (page size %d): %s", page, pageSize, err)))
return
Expand All @@ -321,30 +331,32 @@ func handleSendMessageToUsers(ctx context.Context, bot *Bot, message *telego.Mes
break
}
for _, user := range users {
if user == "" || strings.HasPrefix(user, "SYSTEM:STATUS") || strings.HasPrefix(user, "U") || strings.HasPrefix(user, "slack") || strings.HasPrefix(user, "-") {
// skip non telegram users, groups and channels
if user == "" || strings.HasPrefix(user, "SYSTEM:STATUS") || strings.HasPrefix(user, "U") || strings.HasPrefix(user, "slack") {
// skip non telegram users
skippedNonTelegramUsers++
continue
}

userId, err := strconv.ParseInt(user, 10, 64)
if err != nil {
log.Errorf("Failed to convert user id %s to int: %s", user, err)
skippedUsers++
if strings.HasPrefix(user, "-") {
// skip group users
skippedGroupUsers++
continue
}

if userId != SystemBOT.ChatID.ID {
// skip anything else but system user for now
skippedUsers++
userId, err := strconv.ParseInt(user, 10, 64)
if err != nil {
log.Errorf("Failed to convert user id %s to int: %s", user, err)
skippedErrorUsers++
continue
}

_, err = BOT.SendMessage(tu.Message(tu.ID(userId), messageText))
if err != nil {
skippedUsers++
log.Errorf("Failed to send message to user %d: %s", userId, err)
skippedErrorUsers++
log.Errorf("Failed to send message to user %d: %v", userId, err)
} else {
notifiedUsers++
notifiedUsersIds = append(notifiedUsersIds, user)
log.Infof("Custom message sent to user %d", userId)
}

Expand Down
7 changes: 5 additions & 2 deletions backend/app/workers/status/status_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,19 @@ func Run() {
func FetchStatus() (string, error) {
w := WORKER
systemStatus := status.New(mongo.MongoDBClient, redis.RedisClient, w.AI).GetSystemStatus()
config.CONFIG.DataDogClient.Gauge("status_worker.fireworks_ai_available", boolToFloat64(systemStatus.FireworksAI.Available), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.mongo_db_available", boolToFloat64(systemStatus.MongoDB.Available), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.redis_available", boolToFloat64(systemStatus.Redis.Available), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.open_ai_available", boolToFloat64(systemStatus.OpenAI.Available), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.fireworks_ai_available", boolToFloat64(systemStatus.FireworksAI.Available), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.redis_available", boolToFloat64(systemStatus.Redis.Available), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.audio_duration_minutes", systemStatus.Usage.AudioDurationMinutes, nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.total_cost", systemStatus.Usage.TotalCost, nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.total_tokens", float64(systemStatus.Usage.TotalTokens), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.total_users", float64(systemStatus.Usage.TotalUsers), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.total_free_plus_users", float64(systemStatus.Usage.TotalFreePlusUsers), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.total_basic_users", float64(systemStatus.Usage.TotalBasicUsers), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.total_images", float64(systemStatus.Usage.TotalImages), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.week_active_users", float64(systemStatus.Usage.WeekActiveUsers), nil, 1)
config.CONFIG.DataDogClient.Gauge("status_worker.month_active_users", float64(systemStatus.Usage.MonthActiveUsers), nil, 1)
if !systemStatus.MongoDB.Available {
reportUnavailableStatus(w.TelegramSystemBot, w.SystemTelegramChatID, w.MainBotName, "MongoDB")
}
Expand Down

0 comments on commit 9177a30

Please sign in to comment.