From 81a3139b92e4b501acf7c4825e65dd198e8ab438 Mon Sep 17 00:00:00 2001 From: Nicolas Lenz Date: Tue, 17 Oct 2023 13:12:13 +0200 Subject: [PATCH] fix(openai): parsing of streaming completions --- ihp-openai/IHP/OpenAI.hs | 87 ++++++++++++++++++++++------------------ 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/ihp-openai/IHP/OpenAI.hs b/ihp-openai/IHP/OpenAI.hs index 3991d43e5..f5ba2daf1 100644 --- a/ihp-openai/IHP/OpenAI.hs +++ b/ihp-openai/IHP/OpenAI.hs @@ -120,45 +120,56 @@ streamCompletion secretKey completionRequest' onStart callback = do streamCompletionWithoutRetry :: ByteString -> CompletionRequest -> IO () -> (Text -> IO ()) -> IO (Either Text Text) streamCompletionWithoutRetry secretKey completionRequest' onStart callback = do - let completionRequest = enableStream completionRequest' - modifyContextSSL (\context -> do - SSL.contextSetVerificationMode context SSL.VerifyNone - pure context - ) - withOpenSSL do - withConnection (establishConnection "https://api.openai.com/v1/chat/completions") \connection -> do - let q = buildRequest1 do - http POST "/v1/chat/completions" - setContentType "application/json" - Network.Http.Client.setHeader "Authorization" ("Bearer " <> secretKey) - - sendRequest connection q (jsonBody completionRequest) + let completionRequest = enableStream completionRequest' + modifyContextSSL (\context -> do + SSL.contextSetVerificationMode context SSL.VerifyNone + pure context + ) + withOpenSSL do + withConnection (establishConnection "https://api.openai.com/v1/chat/completions") \connection -> do + let q = buildRequest1 do + http POST "/v1/chat/completions" + setContentType "application/json" + Network.Http.Client.setHeader "Authorization" ("Bearer " <> secretKey) + sendRequest connection q (jsonBody completionRequest) + onStart + receiveResponse connection handler - let handler = \p i -> do - let status = getStatusCode p - if status == 200 - then do - x <- Streams.foldM (parseResponseChunk callback) ("", "") i - return (Right (snd x)) - else do - x <- Streams.fold mappend mempty i - return (Left $ "an error happend: " <> Text.pack (show x)) - - onStart - receiveResponse connection handler where - parseResponseChunk :: (Text -> IO ()) -> (ByteString, Text) -> ByteString -> IO (ByteString, Text) - parseResponseChunk callback (curBuffer, chunk) input = do - case ByteString.stripPrefix "data: " (ByteString.strip (curBuffer <> input)) of - Just json -> do - case decodeStrict json of - Just CompletionResult { choices } -> do - let tokens :: Text = mconcat $ map (.text) choices - callback tokens - pure ("", chunk <> tokens) - otherwise -> do - pure (curBuffer <> json, chunk) - Nothing -> pure (curBuffer <> input, chunk) + handler :: Response -> Streams.InputStream ByteString -> IO (Either Text Text) + handler response stream = do + let status = getStatusCode response + if status == 200 + then do + {- + parse stream line by line as event stream format according to API spec: + https://platform.openai.com/docs/api-reference/chat/create#chat/create-stream + https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format + -} + (_, _, output) <- Streams.lines stream >>= Streams.foldM (parseResponseChunk callback) ("", False, "") + return (Right output) + else do + x :: ByteString <- Streams.fold mappend mempty stream + return (Left $ "an error happend: " <> Text.pack (show x)) + + parseResponseChunk :: (Text -> IO ()) -> (ByteString, Bool, Text) -> ByteString -> IO (ByteString, Bool, Text) + parseResponseChunk callback (curBuffer, emptyLineFound, chunk) input + -- input line is empty, but previous was not, append newline to buffer + | ByteString.null input && not emptyLineFound = pure (curBuffer <> "\n", True, chunk) + -- input line is empty, previous line was already empty: message ended, clear buffer + | ByteString.null input && emptyLineFound = pure ("", True, chunk) + -- lines starting with : are comments, ignore + | ":" `ByteString.isPrefixOf` input = pure (curBuffer, False, chunk) + -- try to parse line together with buffer otherwise + | otherwise = case ByteString.stripPrefix "data: " (ByteString.strip (curBuffer <> input)) of + Just json -> do + case eitherDecodeStrict json of + Right CompletionResult { choices } -> do + let tokens :: Text = mconcat $ map (.text) choices + callback tokens + pure ("", False, chunk <> tokens) + Left err -> pure (curBuffer <> json, False, chunk) + Nothing -> pure (curBuffer <> input, False, chunk) fetchCompletion :: ByteString -> CompletionRequest -> IO Text @@ -192,4 +203,4 @@ fetchCompletionWithoutRetry secretKey completionRequest = do pure (mconcat $ map (.text) completionResult.choices) enableStream :: CompletionRequest -> CompletionRequest -enableStream completionRequest = completionRequest { stream = True } \ No newline at end of file +enableStream completionRequest = completionRequest { stream = True }