#163: Error on stream ended unexpectedly
roma-glushko committed Mar 17, 2024
1 parent 739d960 commit 2198aa2
Showing 2 changed files with 71 additions and 10 deletions.
11 changes: 3 additions & 8 deletions pkg/providers/openai/chat_stream.go
@@ -60,20 +60,15 @@ func (s *ChatStream) Recv() (*schemas.ChatStreamChunk, error) {
 	for {
 		rawEvent, err := s.reader.ReadEvent()
 		if err != nil {
-			if err == io.EOF {
-				s.tel.L().Debug("Chat stream is over", zap.String("provider", providerName))
-
-				// TODO: This should be treated as an error probably (unexpected stream end)
-
-				return nil, io.EOF
-			}
-
 			s.tel.L().Warn(
 				"Chat stream is unexpectedly disconnected",
 				zap.String("provider", providerName),
 				zap.Error(err),
 			)
 
+			// if err is io.EOF, this still means that the stream is interrupted unexpectedly
+			// because the normal stream termination is done via finding out streamDoneMarker
+
 			return nil, clients.ErrProviderUnavailable
 		}
 
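The new comment relies on the stream announcing its own end: the provider sends a dedicated done marker, and only seeing that marker should let Recv finish cleanly, so a plain io.EOF from the reader means the connection was cut short. Below is a stdlib-only sketch of that distinction; the marker value "[DONE]" and all helper names are illustrative assumptions, not code from this repository.

package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// doneMarker plays the role of the streamDoneMarker constant referenced in
// chat_stream.go; "[DONE]" is OpenAI's documented end-of-stream sentinel, but
// matching it to the provider's constant is an assumption made for this sketch.
const doneMarker = "[DONE]"

var errInterrupted = errors.New("stream ended unexpectedly")

// drainSSE walks an SSE body line by line and reports how it ended:
// io.EOF only when the done marker was seen, errInterrupted otherwise,
// which is the same distinction Recv now makes.
func drainSSE(r io.Reader) error {
	scanner := bufio.NewScanner(r)

	for scanner.Scan() {
		if strings.TrimSpace(scanner.Text()) == "data: "+doneMarker {
			return io.EOF // normal termination
		}
	}

	// The reader ran dry before the done marker appeared.
	return errInterrupted
}

func main() {
	chunk := "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\"}}]}\n\n"

	fmt.Println(drainSSE(strings.NewReader(chunk)))                      // stream ended unexpectedly
	fmt.Println(drainSSE(strings.NewReader(chunk + "data: [DONE]\n\n"))) // EOF
}

In the real client that decision happens inside Recv, which is why the io.EOF special case above was dropped in favor of clients.ErrProviderUnavailable.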
70 changes: 68 additions & 2 deletions pkg/providers/openai/chat_stream_test.go
@@ -30,8 +30,6 @@ func TestOpenAIClient_ChatStreamSupported(t *testing.T) {
 func TestOpenAIClient_ChatStreamRequest(t *testing.T) {
 	tests := map[string]string{
 		"success stream": "./testdata/chat_stream.success.txt",
-		"success stream, but no last done message": "./testdata/chat_stream.nodone.txt",
-		"success stream, but with empty event":     "./testdata/chat_stream.empty.txt",
 	}
 
 	for name, streamFile := range tests {
@@ -95,3 +93,71 @@ func TestOpenAIClient_ChatStreamRequest(t *testing.T) {
 		})
 	}
 }
+
+func TestOpenAIClient_ChatStreamRequestInterrupted(t *testing.T) {
+	tests := map[string]string{
+		"success stream, but no last done message": "./testdata/chat_stream.nodone.txt",
+		"success stream, but with empty event":     "./testdata/chat_stream.empty.txt",
+	}
+
+	for name, streamFile := range tests {
+		t.Run(name, func(t *testing.T) {
+			openAIMock := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				rawPayload, _ := io.ReadAll(r.Body)
+
+				var data interface{}
+				// Parse the JSON body
+				err := json.Unmarshal(rawPayload, &data)
+				if err != nil {
+					t.Errorf("error decoding payload (%q): %v", string(rawPayload), err)
+				}
+
+				chatResponse, err := os.ReadFile(filepath.Clean(streamFile))
+				if err != nil {
+					t.Errorf("error reading openai chat mock response: %v", err)
+				}
+
+				w.Header().Set("Content-Type", "text/event-stream")
+
+				_, err = w.Write(chatResponse)
+				if err != nil {
+					t.Errorf("error on sending chat response: %v", err)
+				}
+			})
+
+			openAIServer := httptest.NewServer(openAIMock)
+			defer openAIServer.Close()
+
+			ctx := context.Background()
+			providerCfg := DefaultConfig()
+			clientCfg := clients.DefaultClientConfig()
+
+			providerCfg.BaseURL = openAIServer.URL
+
+			client, err := NewClient(providerCfg, clientCfg, telemetry.NewTelemetryMock())
+			require.NoError(t, err)
+
+			req := schemas.ChatRequest{Message: schemas.ChatMessage{
+				Role:    "user",
+				Content: "What's the capital of the United Kingdom?",
+			}}
+
+			stream, err := client.ChatStream(ctx, &req)
+			require.NoError(t, err)
+
+			err = stream.Open()
+			require.NoError(t, err)
+
+			for {
+				chunk, err := stream.Recv()
+				if err != nil {
+					require.ErrorIs(t, err, clients.ErrProviderUnavailable)
+					return
+				}
+
+				require.NoError(t, err)
+				require.NotNil(t, chunk)
+			}
+		})
+	}
+}
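For application code consuming the stream (as opposed to the test above), the sentinel can be checked with errors.Is. The sketch below is a rough consumer-side example under stated assumptions: the import paths are placeholders for the repository's actual module layout, and the telemetry mock plus default configs stand in for real wiring.

package main

import (
	"context"
	"errors"
	"log"

	// NOTE: these import paths are assumptions standing in for the real module
	// path of this repository; adjust them to the actual go.mod module name.
	"glide/pkg/api/schemas"
	"glide/pkg/providers/clients"
	"glide/pkg/providers/openai"
	"glide/pkg/telemetry"
)

func main() {
	client, err := openai.NewClient(openai.DefaultConfig(), clients.DefaultClientConfig(), telemetry.NewTelemetryMock())
	if err != nil {
		log.Fatal(err)
	}

	req := schemas.ChatRequest{Message: schemas.ChatMessage{
		Role:    "user",
		Content: "What's the capital of the United Kingdom?",
	}}

	stream, err := client.ChatStream(context.Background(), &req)
	if err != nil {
		log.Fatal(err)
	}

	if err := stream.Open(); err != nil {
		log.Fatal(err)
	}

	for {
		chunk, err := stream.Recv()
		if err != nil {
			if errors.Is(err, clients.ErrProviderUnavailable) {
				// The stream was cut off before the done marker: retry or fail over.
				log.Println("stream interrupted:", err)
			}

			return
		}

		log.Printf("chunk: %+v", chunk)
	}
}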
