From 35d9587b773f0e5d5cbf8746031cc7c69f52e555 Mon Sep 17 00:00:00 2001 From: ajhai Date: Tue, 22 Oct 2024 00:39:05 -0700 Subject: [PATCH] Handle stop event in app consumer --- llmstack/server/consumers.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/llmstack/server/consumers.py b/llmstack/server/consumers.py index 99a51e19b33..e3435de73d5 100644 --- a/llmstack/server/consumers.py +++ b/llmstack/server/consumers.py @@ -151,20 +151,25 @@ async def _run_app(self, request_uuid, request, **kwargs): async def _respond_to_event(self, text_data): json_data = json.loads(text_data) client_request_id = json_data.get("id", None) - app_runner_request = AppRunnerRequest( - client_request_id=client_request_id, - session_id=self._session_id, - input=json_data.get("input", {}), - ) - try: - response_iterator = self._app_runner.run(app_runner_request) - async for response in response_iterator: - if response.type == AppRunnerStreamingResponseType.OUTPUT_STREAM_CHUNK: - await self.send(text_data=json.dumps(response.model_dump())) - elif response.type == AppRunnerStreamingResponseType.OUTPUT_STREAM_END: - await self.send(text_data=json.dumps({"event": "done", "request_id": client_request_id})) - except Exception as e: - logger.exception(f"Failed to run app: {e}") + event = json_data.get("event", None) + + if event == "run": + app_runner_request = AppRunnerRequest( + client_request_id=client_request_id, + session_id=self._session_id, + input=json_data.get("input", {}), + ) + try: + response_iterator = self._app_runner.run(app_runner_request) + async for response in response_iterator: + if response.type == AppRunnerStreamingResponseType.OUTPUT_STREAM_CHUNK: + await self.send(text_data=json.dumps(response.model_dump())) + elif response.type == AppRunnerStreamingResponseType.OUTPUT_STREAM_END: + await self.send(text_data=json.dumps({"event": "done", "request_id": client_request_id})) + except Exception as e: + logger.exception(f"Failed to run app: {e}") + elif event == "stop": + self.disconnect() async def _respond_to_event_old(self, text_data): from llmstack.apps.apis import AppViewSet