diff --git a/backend/app/telegram/message_processors.go b/backend/app/telegram/message_processors.go index 124d206..18fa070 100644 --- a/backend/app/telegram/message_processors.go +++ b/backend/app/telegram/message_processors.go @@ -412,7 +412,7 @@ func pollThreadRun(ctx context.Context, threadId string, chatIDString string, ru for { select { case <-ctx.Done(): - log.Infof("Context cancelled, closing streaming connection in chat: %s", chatIDString) + log.Infof("[pollThreadRun] Context cancelled, closing streaming connection in chat: %s", chatIDString) return nil, fmt.Errorf("context cancelled in chat %s", chatIDString) case <-ticker.C: threadRun, err := BOT.API.GetThreadRun(ctx, threadId, runId) @@ -446,6 +446,38 @@ func ChunkSendMessage(bot *telego.Bot, chatID telego.ChatID, text string) { } } +// update current message and sends a new message in up to 4000 chars chunks +func ChunkEditSendMessage(ctx context.Context, bot *telego.Bot, chatID telego.ChatID, text string, messageID int, voice bool) (lastMessage *telego.Message, err error) { + if text == "" { + return nil, nil + } + chunks := util.ChunkString(text, 4000) + for i, chunk := range chunks { + last := false + markup := getLikeDislikeReplyMarkup() + if i == len(chunks)-1 { + markup = getPendingReplyMarkup() + last = true + } + if i == 0 { + log.Debugf("[ChunkEditSendMessage] chunk %d (size %d) - editing message %d in chat %s", i, len(chunk), messageID, chatID) + _, err = bot.EditMessageText(&telego.EditMessageTextParams{ + ChatID: chatID, + MessageID: messageID, + Text: chunk, + ReplyMarkup: markup, + }) + } else { + log.Debugf("[ChunkEditSendMessage] chunk %d (size %d) - sending new message in chat %s", i, len(chunk), chatID) + lastMessage, err = bot.SendMessage(tu.Message(chatID, chunk).WithReplyMarkup(markup)) + } + if !last && voice { + ChunkSendVoice(ctx, bot, chatID, chunk, false) + } + } + return lastMessage, err +} + type NamedReader struct { io.Reader name string @@ -552,17 +584,10 @@ func processMessageChannel( log.Errorf("Failed to add reaction to message in chat: %s, %v", chatIDString, err) } } else { - if mode == lib.VoiceGPT { - ChunkSendVoice(ctx, bot, chatID, finalMessageString, false) - } - - finalMessageParams := telego.EditMessageTextParams{ - ChatID: chatID, - MessageID: responseMessage.MessageID, - Text: finalMessageString, - ReplyMarkup: getLikeDislikeReplyMarkup(), + _, err = ChunkEditSendMessage(ctx, bot, chatID, finalMessageString, responseMessage.MessageID, mode == lib.VoiceGPT) + if err != nil { + log.Errorf("Failed to ChunkEditSendMessage message in chat: %s, %v", chatIDString, err) } - _, err = bot.EditMessageText(&finalMessageParams) } if err != nil { log.Errorf("Failed to add reply markup to message in chat: %s, %v", chatIDString, err) @@ -571,7 +596,7 @@ func processMessageChannel( for { select { case <-ctx.Done(): - log.Infof("Context cancelled, closing streaming connection in chat: %s", chatIDString) + log.Infof("[processMessageChannel] Context cancelled, closing streaming connection in chat: %s", chatIDString) return case <-ticker.C: if previousMessageLength == len(responseText) { @@ -581,42 +606,25 @@ func processMessageChannel( trimmedResponseText := postprocessMessage(responseText, mode, userMessagePrimer) var nextMessageObject *telego.Message - maxMessageSize := 4000 - if mode == lib.VoiceGPT { - maxMessageSize = 1000 - } - if len(trimmedResponseText) > maxMessageSize { - currentMessage := trimmedResponseText[:maxMessageSize] - responseText := trimmedResponseText[maxMessageSize:] - previousMessageLength = len(responseText) - if mode == lib.VoiceGPT { - ChunkSendVoice(ctx, bot, chatID, currentMessage, false) - } - nextMessageObject, err = bot.SendMessage(tu.Message(chatID, responseText).WithReplyMarkup( - getPendingReplyMarkup(), - )) - if err != nil { - log.Errorf("Failed to send next message in chat: %s, %v", chatIDString, err) - } - trimmedResponseText = currentMessage + nextMessageObject, err = ChunkEditSendMessage(ctx, bot, chatID, trimmedResponseText, responseMessage.MessageID, mode == lib.VoiceGPT) + if err != nil { + log.Errorf("Failed to ChunkEditSendMessage message in chat: %s, %v", chatIDString, err) } - _, err = bot.EditMessageText(&telego.EditMessageTextParams{ - ChatID: chatID, - MessageID: responseMessage.MessageID, - Text: trimmedResponseText, - ReplyMarkup: getPendingReplyMarkup(), - }) if nextMessageObject != nil { responseMessage = nextMessageObject + responseText = nextMessageObject.Text nextMessageObject = nil } if err != nil { log.Errorf("Failed to edit message in chat: %s, %v", chatIDString, err) } case message := <-messageChannel: - log.Debugf("Sending message: %s, in chat: %s", message, chatIDString) + if len(message) == 0 { + continue + } responseText = strings.TrimPrefix(responseText, "...") responseText += message + log.Debugf("Received message (new size %d, total size %d) in chat: %s", len(message), len(responseText), chatIDString) } } } diff --git a/backend/app/telegram/message_processors_test.go b/backend/app/telegram/message_processors_test.go index 2ba67c9..dbe79f2 100644 --- a/backend/app/telegram/message_processors_test.go +++ b/backend/app/telegram/message_processors_test.go @@ -1,8 +1,92 @@ package telegram import ( + "context" + "reflect" + "talk2robots/m/v2/app/db/mongo" + "talk2robots/m/v2/app/db/redis" + "talk2robots/m/v2/app/lib" + "talk2robots/m/v2/app/models" + "talk2robots/m/v2/app/openai" "testing" + "time" + + "github.com/mymmrac/telego" + log "github.com/sirupsen/logrus" + "github.com/undefinedlabs/go-mpatch" ) -func TestProcessChatCompleteStreamingMessage(t *testing.T) { +func init() { + setupTestDatadog() + + redis.RedisClient = redis.NewMockRedisClient() + + mongo.MongoDBClient = mongo.NewMockMongoDBClient( + models.MongoUser{ + ID: "123", + Usage: 0.1, + }, + ) + + setupTestBot() + setupCommandHandlers() + + log.SetFormatter(&log.TextFormatter{ + FullTimestamp: true, + ForceColors: true, + }) + log.SetLevel(log.DebugLevel) +} + +func TestProcessThreadedStreamingMessage(t *testing.T) { + message := telego.Message{ + Chat: telego.Chat{ + ID: 123, + Type: "private", + }, + Text: "Tell me about 25 most famous Jedi and write two paragraphs about each of them", + } + ctx, cancelContext := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, models.UserContext{}, "123") + ctx = context.WithValue(ctx, models.ClientContext{}, "telegram") + ctx = context.WithValue(ctx, models.ChannelContext{}, "123") + + // OpenAI API patch + openAIPatch, _ := mpatch.PatchInstanceMethodByName( + reflect.TypeOf(BOT.API), + "CreateThreadAndRunStreaming", + func(a *openai.API, ctx context.Context, assistantId string, model models.Engine, thread *models.Thread, cancelContext context.CancelFunc) (chan string, error) { + messages := make(chan string) + go func() { + defer close(messages) + defer cancelContext() + message := "This is one message of 256 characters. This is one message of ___ characters. This is one message of ___ characters. This is one message of ___ characters. This is one message of ___ characters. This is one message of ___ characters. This is one longgggg.\n" + for i := 0; i < 60; i++ { + // sleep for 1 second + <-time.After(60 * time.Millisecond) + messages <- message + } + }() + return messages, nil + }, + ) + defer openAIPatch.Unpatch() + + // SendMessage patch + sendMessagePatch, _ := mpatch.PatchInstanceMethodByName( + reflect.TypeOf(BOT.Bot), + "SendMessage", + getSendMessageFuncAssertion(t, "...", 123), + ) + defer sendMessagePatch.Unpatch() + + // EditMessage patch + editMessagePatch, _ := mpatch.PatchInstanceMethodByName( + reflect.TypeOf(BOT.Bot), + "EditMessageText", + getEditMessageFuncAssertion(t, "This is one message of 256 characters", 123), + ) + defer editMessagePatch.Unpatch() + + ProcessThreadedStreamingMessage(ctx, BOT.Bot, &message, lib.ChatGPT, models.ChatGpt35Turbo, cancelContext) } diff --git a/backend/app/telegram/telegram_test.go b/backend/app/telegram/telegram_test.go index 93eb581..8bd45da 100644 --- a/backend/app/telegram/telegram_test.go +++ b/backend/app/telegram/telegram_test.go @@ -16,13 +16,7 @@ import ( ) func init() { - testClient, err := statsd.New("127.0.0.1:8125", statsd.WithNamespace("tests.")) - if err != nil { - log.Fatalf("error creating test DataDog client: %v", err) - } - config.CONFIG = &config.Config{ - DataDogClient: testClient, - } + setupTestDatadog() redis.RedisClient = redis.NewMockRedisClient() @@ -33,7 +27,7 @@ func init() { }, ) - setupBot() + setupTestBot() setupCommandHandlers() } @@ -41,13 +35,23 @@ func getTestBot() *telego.Bot { return &telego.Bot{} } -func setupBot() { +func setupTestBot() { BOT = &Bot{ Name: "testbot", Bot: getTestBot(), } } +func setupTestDatadog() { + testClient, err := statsd.New("127.0.0.1:8125", statsd.WithNamespace("tests.")) + if err != nil { + log.Fatalf("error creating test DataDog client: %v", err) + } + config.CONFIG = &config.Config{ + DataDogClient: testClient, + } +} + func getSendMessageFuncAssertion(t *testing.T, expectedRegex string, expectedChatID int64) func(bot *telego.Bot, params *telego.SendMessageParams) (*telego.Message, error) { return func(bot *telego.Bot, params *telego.SendMessageParams) (*telego.Message, error) { if params.ChatID.ID != expectedChatID { @@ -62,7 +66,31 @@ func getSendMessageFuncAssertion(t *testing.T, expectedRegex string, expectedCha t.Errorf("Expected message to match regex %s, got %s", expectedRegex, params.Text) } - return &telego.Message{}, nil + return &telego.Message{ + MessageID: 12345, + Text: params.Text, + }, nil + } +} + +func getEditMessageFuncAssertion(t *testing.T, expectedRegex string, expectedChatID int64) func(bot *telego.Bot, params *telego.EditMessageTextParams) (*telego.Message, error) { + return func(bot *telego.Bot, params *telego.EditMessageTextParams) (*telego.Message, error) { + if params.ChatID.ID != expectedChatID { + t.Errorf("Expected chat ID %d, got %d", expectedChatID, params.ChatID.ID) + } + + matched, err := regexp.MatchString(expectedRegex, params.Text) + if err != nil { + t.Errorf("Error matching regex: %v", err) + } + if !matched { + t.Errorf("Expected message to match regex %s, got %s", expectedRegex, params.Text) + } + + return &telego.Message{ + MessageID: params.MessageID, + Text: params.Text, + }, nil } }