Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
radiantspace committed Mar 16, 2024
1 parent a9229de commit e010cc1
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 48 deletions.
82 changes: 45 additions & 37 deletions backend/app/telegram/message_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func pollThreadRun(ctx context.Context, threadId string, chatIDString string, ru
for {
select {
case <-ctx.Done():
log.Infof("Context cancelled, closing streaming connection in chat: %s", chatIDString)
log.Infof("[pollThreadRun] Context cancelled, closing streaming connection in chat: %s", chatIDString)
return nil, fmt.Errorf("context cancelled in chat %s", chatIDString)
case <-ticker.C:
threadRun, err := BOT.API.GetThreadRun(ctx, threadId, runId)
Expand Down Expand Up @@ -446,6 +446,38 @@ func ChunkSendMessage(bot *telego.Bot, chatID telego.ChatID, text string) {
}
}

// update current message and sends a new message in up to 4000 chars chunks
func ChunkEditSendMessage(ctx context.Context, bot *telego.Bot, chatID telego.ChatID, text string, messageID int, voice bool) (lastMessage *telego.Message, err error) {
if text == "" {
return nil, nil
}
chunks := util.ChunkString(text, 4000)
for i, chunk := range chunks {
last := false
markup := getLikeDislikeReplyMarkup()
if i == len(chunks)-1 {
markup = getPendingReplyMarkup()
last = true
}
if i == 0 {
log.Debugf("[ChunkEditSendMessage] chunk %d (size %d) - editing message %d in chat %s", i, len(chunk), messageID, chatID)
_, err = bot.EditMessageText(&telego.EditMessageTextParams{
ChatID: chatID,
MessageID: messageID,
Text: chunk,
ReplyMarkup: markup,
})
} else {
log.Debugf("[ChunkEditSendMessage] chunk %d (size %d) - sending new message in chat %s", i, len(chunk), chatID)
lastMessage, err = bot.SendMessage(tu.Message(chatID, chunk).WithReplyMarkup(markup))
}
if !last && voice {
ChunkSendVoice(ctx, bot, chatID, chunk, false)
}
}
return lastMessage, err
}

type NamedReader struct {
io.Reader
name string
Expand Down Expand Up @@ -552,17 +584,10 @@ func processMessageChannel(
log.Errorf("Failed to add reaction to message in chat: %s, %v", chatIDString, err)
}
} else {
if mode == lib.VoiceGPT {
ChunkSendVoice(ctx, bot, chatID, finalMessageString, false)
}

finalMessageParams := telego.EditMessageTextParams{
ChatID: chatID,
MessageID: responseMessage.MessageID,
Text: finalMessageString,
ReplyMarkup: getLikeDislikeReplyMarkup(),
_, err = ChunkEditSendMessage(ctx, bot, chatID, finalMessageString, responseMessage.MessageID, mode == lib.VoiceGPT)
if err != nil {
log.Errorf("Failed to ChunkEditSendMessage message in chat: %s, %v", chatIDString, err)
}
_, err = bot.EditMessageText(&finalMessageParams)
}
if err != nil {
log.Errorf("Failed to add reply markup to message in chat: %s, %v", chatIDString, err)
Expand All @@ -571,7 +596,7 @@ func processMessageChannel(
for {
select {
case <-ctx.Done():
log.Infof("Context cancelled, closing streaming connection in chat: %s", chatIDString)
log.Infof("[processMessageChannel] Context cancelled, closing streaming connection in chat: %s", chatIDString)
return
case <-ticker.C:
if previousMessageLength == len(responseText) {
Expand All @@ -581,42 +606,25 @@ func processMessageChannel(
trimmedResponseText := postprocessMessage(responseText, mode, userMessagePrimer)

var nextMessageObject *telego.Message
maxMessageSize := 4000
if mode == lib.VoiceGPT {
maxMessageSize = 1000
}
if len(trimmedResponseText) > maxMessageSize {
currentMessage := trimmedResponseText[:maxMessageSize]
responseText := trimmedResponseText[maxMessageSize:]
previousMessageLength = len(responseText)
if mode == lib.VoiceGPT {
ChunkSendVoice(ctx, bot, chatID, currentMessage, false)
}
nextMessageObject, err = bot.SendMessage(tu.Message(chatID, responseText).WithReplyMarkup(
getPendingReplyMarkup(),
))
if err != nil {
log.Errorf("Failed to send next message in chat: %s, %v", chatIDString, err)
}
trimmedResponseText = currentMessage
nextMessageObject, err = ChunkEditSendMessage(ctx, bot, chatID, trimmedResponseText, responseMessage.MessageID, mode == lib.VoiceGPT)
if err != nil {
log.Errorf("Failed to ChunkEditSendMessage message in chat: %s, %v", chatIDString, err)
}
_, err = bot.EditMessageText(&telego.EditMessageTextParams{
ChatID: chatID,
MessageID: responseMessage.MessageID,
Text: trimmedResponseText,
ReplyMarkup: getPendingReplyMarkup(),
})
if nextMessageObject != nil {
responseMessage = nextMessageObject
responseText = nextMessageObject.Text
nextMessageObject = nil
}
if err != nil {
log.Errorf("Failed to edit message in chat: %s, %v", chatIDString, err)
}
case message := <-messageChannel:
log.Debugf("Sending message: %s, in chat: %s", message, chatIDString)
if len(message) == 0 {
continue
}
responseText = strings.TrimPrefix(responseText, "...")
responseText += message
log.Debugf("Received message (new size %d, total size %d) in chat: %s", len(message), len(responseText), chatIDString)
}
}
}
86 changes: 85 additions & 1 deletion backend/app/telegram/message_processors_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,92 @@
package telegram

import (
"context"
"reflect"
"talk2robots/m/v2/app/db/mongo"
"talk2robots/m/v2/app/db/redis"
"talk2robots/m/v2/app/lib"
"talk2robots/m/v2/app/models"
"talk2robots/m/v2/app/openai"
"testing"
"time"

"github.com/mymmrac/telego"
log "github.com/sirupsen/logrus"
"github.com/undefinedlabs/go-mpatch"
)

func TestProcessChatCompleteStreamingMessage(t *testing.T) {
func init() {
setupTestDatadog()

redis.RedisClient = redis.NewMockRedisClient()

mongo.MongoDBClient = mongo.NewMockMongoDBClient(
models.MongoUser{
ID: "123",
Usage: 0.1,
},
)

setupTestBot()
setupCommandHandlers()

log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
ForceColors: true,
})
log.SetLevel(log.DebugLevel)
}

func TestProcessThreadedStreamingMessage(t *testing.T) {
message := telego.Message{
Chat: telego.Chat{
ID: 123,
Type: "private",
},
Text: "Tell me about 25 most famous Jedi and write two paragraphs about each of them",
}
ctx, cancelContext := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, models.UserContext{}, "123")
ctx = context.WithValue(ctx, models.ClientContext{}, "telegram")
ctx = context.WithValue(ctx, models.ChannelContext{}, "123")

// OpenAI API patch
openAIPatch, _ := mpatch.PatchInstanceMethodByName(
reflect.TypeOf(BOT.API),
"CreateThreadAndRunStreaming",
func(a *openai.API, ctx context.Context, assistantId string, model models.Engine, thread *models.Thread, cancelContext context.CancelFunc) (chan string, error) {
messages := make(chan string)
go func() {
defer close(messages)
defer cancelContext()
message := "This is one message of 256 characters. This is one message of ___ characters. This is one message of ___ characters. This is one message of ___ characters. This is one message of ___ characters. This is one message of ___ characters. This is one longgggg.\n"
for i := 0; i < 60; i++ {
// sleep for 1 second
<-time.After(60 * time.Millisecond)
messages <- message
}
}()
return messages, nil
},
)
defer openAIPatch.Unpatch()

// SendMessage patch
sendMessagePatch, _ := mpatch.PatchInstanceMethodByName(
reflect.TypeOf(BOT.Bot),
"SendMessage",
getSendMessageFuncAssertion(t, "...", 123),
)
defer sendMessagePatch.Unpatch()

// EditMessage patch
editMessagePatch, _ := mpatch.PatchInstanceMethodByName(
reflect.TypeOf(BOT.Bot),
"EditMessageText",
getEditMessageFuncAssertion(t, "This is one message of 256 characters", 123),
)
defer editMessagePatch.Unpatch()

ProcessThreadedStreamingMessage(ctx, BOT.Bot, &message, lib.ChatGPT, models.ChatGpt35Turbo, cancelContext)
}
48 changes: 38 additions & 10 deletions backend/app/telegram/telegram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@ import (
)

func init() {
testClient, err := statsd.New("127.0.0.1:8125", statsd.WithNamespace("tests."))
if err != nil {
log.Fatalf("error creating test DataDog client: %v", err)
}
config.CONFIG = &config.Config{
DataDogClient: testClient,
}
setupTestDatadog()

redis.RedisClient = redis.NewMockRedisClient()

Expand All @@ -33,21 +27,31 @@ func init() {
},
)

setupBot()
setupTestBot()
setupCommandHandlers()
}

func getTestBot() *telego.Bot {
return &telego.Bot{}
}

func setupBot() {
func setupTestBot() {
BOT = &Bot{
Name: "testbot",
Bot: getTestBot(),
}
}

func setupTestDatadog() {
testClient, err := statsd.New("127.0.0.1:8125", statsd.WithNamespace("tests."))
if err != nil {
log.Fatalf("error creating test DataDog client: %v", err)
}
config.CONFIG = &config.Config{
DataDogClient: testClient,
}
}

func getSendMessageFuncAssertion(t *testing.T, expectedRegex string, expectedChatID int64) func(bot *telego.Bot, params *telego.SendMessageParams) (*telego.Message, error) {
return func(bot *telego.Bot, params *telego.SendMessageParams) (*telego.Message, error) {
if params.ChatID.ID != expectedChatID {
Expand All @@ -62,7 +66,31 @@ func getSendMessageFuncAssertion(t *testing.T, expectedRegex string, expectedCha
t.Errorf("Expected message to match regex %s, got %s", expectedRegex, params.Text)
}

return &telego.Message{}, nil
return &telego.Message{
MessageID: 12345,
Text: params.Text,
}, nil
}
}

func getEditMessageFuncAssertion(t *testing.T, expectedRegex string, expectedChatID int64) func(bot *telego.Bot, params *telego.EditMessageTextParams) (*telego.Message, error) {
return func(bot *telego.Bot, params *telego.EditMessageTextParams) (*telego.Message, error) {
if params.ChatID.ID != expectedChatID {
t.Errorf("Expected chat ID %d, got %d", expectedChatID, params.ChatID.ID)
}

matched, err := regexp.MatchString(expectedRegex, params.Text)
if err != nil {
t.Errorf("Error matching regex: %v", err)
}
if !matched {
t.Errorf("Expected message to match regex %s, got %s", expectedRegex, params.Text)
}

return &telego.Message{
MessageID: params.MessageID,
Text: params.Text,
}, nil
}
}

Expand Down

0 comments on commit e010cc1

Please sign in to comment.