From 614b493eb22c01dc75c99f6bfbe3fbc59152a790 Mon Sep 17 00:00:00 2001
From: Ajay Chintala
Date: Wed, 16 Oct 2024 10:27:59 -0700
Subject: [PATCH] New coordinator initial commit (#289)

---
 llmstack/apps/apis.py                          | 157 ++++------
 llmstack/apps/app_session_utils.py             | 124 +++-----
 .../apps/handlers/app_processor_runner.py      |   4 +-
 llmstack/apps/handlers/app_runnner.py          |   6 +-
 llmstack/apps/handlers/playground_runner.py    |   4 +-
 llmstack/apps/runner/__init__.py               |   0
 llmstack/apps/runner/app_coordinator.py        | 132 ++++++++
 llmstack/apps/runner/app_runner.py             | 216 +++++++++++++
 llmstack/apps/runner/input_actor.py            |  47 +++
 llmstack/apps/runner/output_actor.py           | 128 ++++++++
 llmstack/apps/urls.py                          |   2 +-
 llmstack/client/package-lock.json              |   5 +-
 llmstack/client/package.json                   |   1 +
 .../client/src/components/apps/AppEditor.jsx   |   3 +-
 .../src/components/apps/ProcessorEditor.jsx    |  25 +-
 .../components/apps/renderer/AppRenderer.jsx   |  69 ++---
 .../src/components/apps/renderer/Messages.jsx  |   4 +
 llmstack/client/src/data/utils.js              |  54 ++++
 llmstack/client/src/pages/AppConsole.jsx       |   1 +
 llmstack/common/utils/liquid.py                |  26 ++
 llmstack/common/utils/utils.py                 |  66 ++--
 llmstack/play/actor.py                         |  96 +++---
 llmstack/play/actors/agent.py                  |  10 +-
 llmstack/play/actors/bookkeeping.py            |   5 -
 llmstack/play/actors/input.py                  |  87 ------
 llmstack/play/actors/output.py                 | 145 ---------
 llmstack/play/coordinator.py                   |  23 +-
 llmstack/play/messages.py                      |  57 ++++
 llmstack/play/output_stream.py                 | 290 +++++++++---------
 llmstack/play/utils.py                         |  54 ----
 .../providers/api_processor_interface.py       |  64 ++--
 .../{api_processors.py => processors.py}       |   6 +-
 llmstack/processors/serializers.py             |  42 +--
 llmstack/server/asgi.py                        |   4 +-
 llmstack/server/consumers.py                   | 142 ++++++---
 poetry.lock                                    |  21 +-
 pyproject.toml                                 |   1 +
 37 files changed, 1192 insertions(+), 929 deletions(-)
 create mode 100644 llmstack/apps/runner/__init__.py
 create mode 100644 llmstack/apps/runner/app_coordinator.py
 create mode 100644 llmstack/apps/runner/app_runner.py
 create mode 100644 llmstack/apps/runner/input_actor.py
 create mode 100644 llmstack/apps/runner/output_actor.py
 delete mode 100644 llmstack/play/actors/input.py
 delete mode 100644 llmstack/play/actors/output.py
 create mode 100644 llmstack/play/messages.py
 rename llmstack/processors/providers/{api_processors.py => processors.py} (86%)

diff --git a/llmstack/apps/apis.py b/llmstack/apps/apis.py
index 2a1af5e383b..00bebeaa71d 100644
--- a/llmstack/apps/apis.py
+++ b/llmstack/apps/apis.py
@@ -3,13 +3,14 @@
 import uuid
 
 import yaml
+from asgiref.sync import async_to_sync
 from channels.db import database_sync_to_async
 from django.conf import settings
 from django.core.validators import validate_email
 from django.db.models import Q
 from django.forms import ValidationError
 from django.http import StreamingHttpResponse
-from django.shortcuts import get_object_or_404
+from django.shortcuts import aget_object_or_404, get_object_or_404
 from django.utils.decorators import method_decorator
 from django.views.decorators.cache import cache_page
 from django.views.decorators.clickjacking import xframe_options_exempt
@@ -23,15 +24,13 @@
 from rest_framework.response import Response as DRFResponse
 
 from llmstack.apps.app_session_utils import create_app_session
-from llmstack.apps.handlers.app_processor_runner import AppProcessorRunner
-from llmstack.apps.handlers.app_runner_factory import AppRunnerFactory
-from llmstack.apps.handlers.playground_runner import PlaygroundRunner
 from llmstack.apps.integration_configs import (
     DiscordIntegrationConfig,
     SlackIntegrationConfig,
     TwilioIntegrationConfig,
     WebIntegrationConfig,
 )
+from llmstack.apps.runner.app_runner import AppRunner
 from llmstack.apps.yaml_loader import (
     get_app_template_by_slug,
     get_app_templates_from_contrib,
@@ -41,7 +40,7 @@
 from llmstack.connections.apis import ConnectionsViewSet
 from llmstack.emails.sender import EmailSender
 from llmstack.emails.templates.factory import EmailTemplateFactory
-from llmstack.processors.providers.api_processors import ApiProcessorFactory
+from llmstack.processors.providers.processors import ProcessorFactory
 
 from .models import App, AppData, AppHub, AppType, AppVisibility
 from .serializers import (
@@ -512,7 +511,7 @@ def patch(self, request, uid):
         )
         try:
             for processor in processors_data:
-                processor_cls = ApiProcessorFactory.get_api_processor(
+                processor_cls = ProcessorFactory.get_api_processor(
                     processor["processor_slug"], processor["provider_slug"]
                 )
                 configuration_cls = processor_cls.get_configuration_cls()
@@ -708,12 +707,12 @@ def post(self, request):
 
     @action(detail=True, methods=["post"])
     @xframe_options_exempt
-    def run(self, request, uid, session_id=None, platform=None):
+    def run(self, request, app_uuid, session_id=None, platform=None):
         stream = request.data.get("stream", False)
         request_uuid = str(uuid.uuid4())
         try:
             result = self.run_app_internal(
-                uid,
+                app_uuid,
                 session_id,
                 request_uuid,
                 request,
@@ -746,10 +745,9 @@ async def init_app_async(self, uid):
         return await database_sync_to_async(self.init_app)(uid)
 
     def init_app(self, uid):
-        app = get_object_or_404(App, uuid=uuid.UUID(uid))
         session_id = str(uuid.uuid4())
 
-        create_app_session(app, session_id)
+        create_app_session(session_id)
 
         return session_id
 
@@ -822,12 +820,6 @@ def run_playground_internal(
         preview=False,
         disable_history=False,
     ):
-        from llmstack.apps.handlers.playground_runner import (
-            PlaygroundApp,
-            PlaygroundAppType,
-        )
-
-        stream = request.data.get("stream", True)
         request_ip = request.headers.get("X-Forwarded-For", request.META.get("REMOTE_ADDR", "")).split(",")[
             0
         ].strip() or request.META.get("HTTP_X_REAL_IP", "")
@@ -838,9 +830,6 @@ def run_playground_internal(
         location = get_location(request_ip)
         request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""
 
-        request_user_agent = request.META.get("HTTP_USER_AGENT", "")
-        request_content_type = request.META.get("CONTENT_TYPE", "")
-
         if flag_enabled(
             "HAS_EXCEEDED_MONTHLY_PROCESSOR_RUN_QUOTA",
             request=request,
         ):
             raise Exception(
                 "You have exceeded your monthly processor run quota. Please upgrade your plan to continue using the platform.",
             )
 
-        app_owner_profile = (
-            Profile.objects.get(user=request.user) if request.user.is_authenticated else AnonymousProfile()
-        )
-        owner_connections = get_connections(app_owner_profile) if request.user.is_authenticated else {}
         processor_id = request.data["input"]["api_provider_slug"] + "_" + request.data["input"]["api_backend_slug"]
-        processor_cls = ApiProcessorFactory.get_api_processor(
-            request.data["input"]["api_backend_slug"],
-            request.data["input"]["api_provider_slug"],
-        )
-        app = PlaygroundApp(
-            id="",
-            uuid="ebb44d38-76b8-4735-b33b-5aadfe6470f9",
-            type=PlaygroundAppType(slug="playground"),
-            web_integration_config={},
-            is_published=True,
-        )
-        app_runner = PlaygroundRunner(
-            app=app,
-            app_data={
-                "processors": [
-                    {
-                        "id": processor_id,
-                        "processor_slug": request.data["input"]["api_backend_slug"],
-                        "provider_slug": request.data["input"]["api_provider_slug"],
-                        "input": request.data["input"]["input"],
-                        "config": request.data["input"]["config"],
-                    }
-                ],
-                "output_template": processor_cls.get_output_template(),
-            },
-            request_uuid=request_uuid,
-            request=request,
-            session_id=session_id,
-            app_owner=app_owner_profile,
-            stream=stream,
-            request_ip=request_ip,
-            request_location=request_location,
-            request_user_agent=request_user_agent,
-            request_content_type=request_content_type,
-            disable_history=False,
-            connections=owner_connections,
-        )
+        app_runner = None
 
         return app_runner.run_app(processor_id=processor_id)
@@ -978,9 +927,58 @@ def run_platform_app_internal(
 
         return app_runner.run_app()
 
+    async def get_app_runner_async(
+        self,
+        session_id,
+        app_uuid,
+        source,
+        request_user,
+        preview=False,
+        app_data=None,
+    ):
+        runner_user = request_user
+        if not app_data:
+            app = await aget_object_or_404(App, uuid=uuid.UUID(app_uuid))
+            app_data_obj = (
+                await AppData.objects.filter(
+                    app_uuid=app.uuid,
+                    is_draft=preview,
+                )
+                .order_by("-created_at")
+                .afirst()
+            )
+            if not app_data_obj:
+                raise Exception("App data not found")
+            app_data = app_data_obj.data
+
+        if not runner_user:
+            runner_user = app.owner
+
+        app_run_user_profile = await Profile.objects.aget(user=runner_user)
+        vendor_env = {
+            "provider_configs": await database_sync_to_async(app_run_user_profile.get_merged_provider_configs)(),
+        }
+
+        return AppRunner(
+            session_id=session_id,
+            app_data=app_data,
+            source=source,
+            vendor_env=vendor_env,
+        )
+
+    def get_app_runner(self, session_id, app_uuid, source, request_user, preview=False, app_data=None):
+        return async_to_sync(self.get_app_runner_async)(
+            session_id,
+            app_uuid,
+            source,
+            request_user,
+            preview,
+            app_data,
+        )
+
     def run_app_internal(
         self,
-        uid,
+        app_uuid,
         session_id,
         request_uuid,
         request,
@@ -991,12 +989,12 @@ def run_app_internal(
         app_store_app_data=None,
     ):
         app = (
-            get_object_or_404(App, uuid=uuid.UUID(uid))
+            get_object_or_404(App, uuid=uuid.UUID(app_uuid))
             if not app_store_app_data
             else App(
                 name=app_store_app_data.get("name", ""),
                 store_uuid=app_store_uuid,
-                uuid=uid if uid else app_store_uuid,
+                uuid=app_uuid if app_uuid else app_store_uuid,
                 owner=request.user,
                 type=AppType(slug=app_store_app_data.get("type_slug", "agent")),
                 is_published=True,
@@ -1069,16 +1067,6 @@ def run_app_internal(
         )
 
         app_runner_class = None
-        if platform == "discord":
-            app_runner_class = AppRunnerFactory.get_app_runner("discord")
-        elif platform == "slack":
-            app_runner_class = AppRunnerFactory.get_app_runner("slack")
-        elif platform == "twilio-sms":
-            app_runner_class = AppRunnerFactory.get_app_runner("twilio-sms")
-        elif platform == "twilio-voice":
-            app_runner_class = AppRunnerFactory.get_app_runner("twilio-voice")
-        else:
-            app_runner_class = AppRunnerFactory.get_app_runner(app.type.slug)
 
         app_runner = app_runner_class(
             app=app,
@@ -1110,9 +1098,6 @@ def run_processor_internal(
         disable_history=False,
     ):
         app = get_object_or_404(App, uuid=uuid.UUID(uid))
-        app_owner = get_object_or_404(Profile, user=app.owner)
-
-        stream = request.data.get("stream", False)
 
         request_ip = request.headers.get(
             "X-Forwarded-For",
@@ -1133,9 +1118,6 @@ def run_processor_internal(
         location = get_location(request_ip)
         request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""
 
-        request_user_agent = request.META.get("HTTP_USER_AGENT", "")
-        request_content_type = request.META.get("CONTENT_TYPE", "")
-
         if flag_enabled(
             "HAS_EXCEEDED_MONTHLY_PROCESSOR_RUN_QUOTA",
             request=request,
@@ -1165,22 +1147,7 @@ def run_processor_internal(
             .first()
         )
 
-        owner_connections = get_connections(app_owner)
-        app_runner = AppProcessorRunner(
-            app=app,
-            app_data=app_data_obj.data if app_data_obj else None,
-            request_uuid=request_uuid,
-            request=request,
-            session_id=session_id,
-            app_owner=app_owner,
-            stream=stream,
-            request_ip=request_ip,
-            request_location=request_location,
-            request_user_agent=request_user_agent,
-            request_content_type=request_content_type,
-            disable_history=disable_history,
-            connections=owner_connections,
-        )
+        app_runner = None
 
         return app_runner.run_app(processor_id=processor_id)
diff --git a/llmstack/apps/app_session_utils.py b/llmstack/apps/app_session_utils.py
index 05969688f61..17a3081663f 100644
--- a/llmstack/apps/app_session_utils.py
+++ b/llmstack/apps/app_session_utils.py
@@ -1,122 +1,72 @@
 import logging
-from datetime import datetime
+import uuid
+from datetime import datetime, timezone
 
 import orjson as json
 from django.conf import settings
 from django.core.cache import caches
 
-APP_SESSION_TIMEOUT = settings.APP_SESSION_TIMEOUT
-
 logger = logging.getLogger(__name__)
 
-app_session_store = caches["app_session"]
-app_session_data_store = caches["app_session_data"]
 
+def create_app_session(app_session_id=None):
+    app_session_store = caches["app_session"]
+    app_session_id = app_session_id or str(uuid.uuid4())
 
-def create_app_session(app, app_session_uuid):
+    current_time = datetime.now(timezone.utc).isoformat()
     app_session = {
-        "uuid": app_session_uuid,
-        "app": app.id if app else -1,
-        "created_at": str(datetime.now()),
-        "last_updated_at": str(datetime.now()),
+        "id": app_session_id,
+        "data": {},
+        "created_at": current_time,
+        "last_updated_at": current_time,
     }
+
     app_session_store.set(
-        f"app_session_{app_session_uuid}",
+        f"app_session_{app_session['id']}",
         json.dumps(app_session),
+        settings.APP_SESSION_TIMEOUT,
     )
+
     return app_session
 
 
-def get_app_session(app_session_uuid):
-    app_session = app_session_store.get(f"app_session_{app_session_uuid}")
-    if app_session is None:
+def get_app_session(app_session_id):
+    if not app_session_id:
         return None
-    return json.loads(app_session)
-
-
-def create_app_session_data(app_session, endpoint, data):
-    if not app_session:
+    app_session = caches["app_session"].get(f"app_session_{app_session_id}")
+    if app_session is None:
         return None
 
-    endpoint_id = endpoint["id"] if isinstance(endpoint, dict) else endpoint.id
-
-    app_session_data = {
-        "app_session": app_session,
-        "endpoint": endpoint_id,
-        "data": data,
-        "created_at": str(datetime.now()),
-        "last_updated_at": str(datetime.now()),
-    }
-    app_session_data_store.set(
-        f'app_session_data_{app_session["uuid"]}_{endpoint_id}',
-        json.dumps(
-            app_session_data,
-        ),
-        APP_SESSION_TIMEOUT,
-    )
-    return app_session_data
-
-
-def save_app_session_data(app_session_data):
-    app_session_data_store.set(
-        f'app_session_data_{app_session_data["app_session"]["uuid"]}_{app_session_data["endpoint"]}',
-        json.dumps(app_session_data),
-        APP_SESSION_TIMEOUT,
-    )
-    return app_session_data
+    return json.loads(app_session)
 
 
-def get_app_session_data(app_session, endpoint):
-    if not app_session:
-        return None
+def get_or_create_app_session(app_session_id=None):
+    app_session = get_app_session(app_session_id)
 
-    endpoint_id = endpoint["id"] if isinstance(endpoint, dict) else endpoint.id
+    if app_session is None:
+        app_session = create_app_session(app_session_id)
 
+    return app_session
 
-    app_session_data = app_session_data_store.get(
-        f'app_session_data_{app_session["uuid"]}_{endpoint_id}',
-    )
-    if app_session_data is None:
-        return None
-    return json.loads(app_session_data)
 
+def save_app_session_data(app_session_id, key, value):
+    app_session = get_or_create_app_session(app_session_id)
 
-def create_agent_app_session_data(app_session, data):
-    if not app_session:
-        return None
+    app_session["data"][key] = value
+    app_session["last_updated_at"] = datetime.now(timezone.utc).isoformat()
 
-    app_session_data = {
-        "app_session": app_session,
-        "data": data,
-        "created_at": str(datetime.now()),
-        "last_updated_at": str(datetime.now()),
-    }
-    app_session_data_store.set(
-        f'app_session_data_{app_session["uuid"]}_agent',
-        json.dumps(
-            app_session_data,
-        ),
-        APP_SESSION_TIMEOUT,
+    caches["app_session"].set(
+        f"app_session_{app_session['id']}",
+        json.dumps(app_session),
+        settings.APP_SESSION_TIMEOUT,
     )
-    return app_session_data
 
 
-def save_agent_app_session_data(app_session_data):
-    app_session_data_store.set(
-        f'app_session_data_{app_session_data["app_session"]["uuid"]}_agent',
-        json.dumps(app_session_data),
-        APP_SESSION_TIMEOUT,
-    )
-    return app_session_data
+def get_app_session_data(app_session_id, key):
+    app_session = get_app_session(app_session_id)
+    return app_session["data"].get(key, None) if app_session else None
 
 
-def get_agent_app_session_data(app_session):
-    if not app_session:
-        return None
-
-    app_session_data = app_session_data_store.get(
-        f'app_session_data_{app_session["uuid"]}_agent',
-    )
-    if app_session_data is None:
-        return None
-    return json.loads(app_session_data)
+def delete_app_session(app_session_id):
+    caches["app_session"].delete(f"app_session_{app_session_id}")
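The rewritten session helpers above collapse the old per-endpoint and agent-specific records into one cached dict with a flat `data` map, expiring via `APP_SESSION_TIMEOUT`. A small usage sketch of the resulting API (the key and value are arbitrary examples):

    session = get_or_create_app_session()              # generates an id if none is given
    save_app_session_data(session["id"], "agent", {"turns": []})
    assert get_app_session_data(session["id"], "agent") == {"turns": []}
    delete_app_session(session["id"])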
diff --git a/llmstack/apps/handlers/app_processor_runner.py b/llmstack/apps/handlers/app_processor_runner.py
index cb2998f9409..52f6cea0d4c 100644
--- a/llmstack/apps/handlers/app_processor_runner.py
+++ b/llmstack/apps/handlers/app_processor_runner.py
@@ -8,7 +8,7 @@
 from llmstack.play.actor import ActorConfig
 from llmstack.play.actors.input import InputActor
 from llmstack.play.actors.output import OutputActor
-from llmstack.processors.providers.api_processors import ApiProcessorFactory
+from llmstack.processors.providers.processors import ProcessorFactory
 
 logger = logging.getLogger(__name__)
 
@@ -54,7 +54,7 @@ def _get_processor_actor_configs(self, processor_id):
                 "processor_slug and provider_slug are required for each processor",
             )
 
-        processor_cls = ApiProcessorFactory.get_api_processor(
+        processor_cls = ProcessorFactory.api_processor(
             app_processor["processor_slug"],
             app_processor["provider_slug"],
         )
diff --git a/llmstack/apps/handlers/app_runnner.py b/llmstack/apps/handlers/app_runnner.py
index aff3eb44582..df97b668309 100644
--- a/llmstack/apps/handlers/app_runnner.py
+++ b/llmstack/apps/handlers/app_runnner.py
@@ -23,7 +23,7 @@
 from llmstack.play.coordinator import Coordinator
 from llmstack.play.utils import convert_template_vars_from_legacy_format
 from llmstack.processors.providers.api_processor_interface import ApiProcessorInterface
-from llmstack.processors.providers.api_processors import ApiProcessorFactory
+from llmstack.processors.providers.processors import ProcessorFactory
 
 logger = logging.getLogger(__name__)
 
@@ -207,7 +207,7 @@ def _get_processor_actor_configs(self):
                 "processor_slug and provider_slug are required for each processor",
             )
 
-            processor_cls = ApiProcessorFactory.get_api_processor(
+            processor_cls = ProcessorFactory.api_processor(
                 processor["processor_slug"],
                 processor["provider_slug"],
             )
@@ -280,7 +280,7 @@ def _get_processor_actor_configs(self):
                 len(processors) + 1,
             ),
         ):
-            processor_cls = ApiProcessorFactory.get_api_processor(
+            processor_cls = ProcessorFactory.api_processor(
                 processor.api_backend.slug,
                 processor.api_backend.api_provider.slug,
             )
diff --git a/llmstack/apps/handlers/playground_runner.py b/llmstack/apps/handlers/playground_runner.py
index fcf81177f4d..92e11c18d8c 100644
--- a/llmstack/apps/handlers/playground_runner.py
+++ b/llmstack/apps/handlers/playground_runner.py
@@ -8,7 +8,7 @@
 from llmstack.play.actors.bookkeeping import BookKeepingActor
 from llmstack.play.actors.input import InputActor
 from llmstack.play.actors.output import OutputActor
-from llmstack.processors.providers.api_processors import ApiProcessorFactory
+from llmstack.processors.providers.processors import ProcessorFactory
 
 logger = logging.getLogger(__name__)
 
@@ -67,7 +67,7 @@ def _get_processor_actor_configs(self, processor_id):
                 "processor_slug and provider_slug are required for each processor",
             )
 
-        processor_cls = ApiProcessorFactory.get_api_processor(
+        processor_cls = ProcessorFactory.api_processor(
             app_processor["processor_slug"],
             app_processor["provider_slug"],
         )
diff --git a/llmstack/apps/runner/__init__.py b/llmstack/apps/runner/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/llmstack/apps/runner/app_coordinator.py b/llmstack/apps/runner/app_coordinator.py
new file mode 100644
index 00000000000..199725d25d1
--- /dev/null
+++ b/llmstack/apps/runner/app_coordinator.py
@@ -0,0 +1,132 @@
+import logging
+import uuid
+from typing import Dict, List
+
+from pykka import ThreadingActor
+
+from llmstack.apps.runner.input_actor import InputActor
+from llmstack.apps.runner.output_actor import OutputActor
+from llmstack.play.actor import ActorConfig
+from llmstack.play.messages import ContentData
+from llmstack.play.output_stream import Message, MessageType
+
+logger = logging.getLogger(__name__)
+
+
+class AppCoordinator(ThreadingActor):
+    def __init__(self, actor_configs: List[ActorConfig], output_template: str = ""):
+        super().__init__()
+
+        # Make sure there are no duplicate names in actor_configs
+        assert len(set([actor_config.name for actor_config in actor_configs])) == len(
+            actor_configs,
+        )
+
+        # Make sure none of the actor_config names are in ["input", "output", "agent"]
+        for actor_config in actor_configs:
+            if actor_config.name in ["input", "output", "agent", "coordinator"]:
+                raise ValueError(f"Actor config name {actor_config.name} is reserved")
+
+        self._actor_configs_map = {actor_config.name: actor_config for actor_config in actor_configs}
+
+        # Map of actor id to actor
+        self.actors = {
+            "input": InputActor.start(
+                coordinator_urn=self.actor_urn,
+            ),
+            "output": OutputActor.start(
+                coordinator_urn=self.actor_urn,
+                dependencies=["input"] + list(self._actor_configs_map.keys()),
+                templates={"output": output_template},
+            ),
+        }
+
+        self._actor_dependencies = {ac.name: set(ac.dependencies) for ac in actor_configs}
+        self._actor_dependencies["output"] = set(["input"] + list(self._actor_configs_map.keys()))
+        self._actor_dependencies["input"] = set()
+
+        self._actor_dependents = {ac.name: set() for ac in actor_configs}
+        self._actor_dependents["input"] = set(list(self._actor_configs_map.keys()) + ["output"])
+        self._actor_dependents["output"] = set()
+
+        # Update dependents based on dependencies
+        for actor, deps in self._actor_dependencies.items():
+            for dep in deps:
+                if dep in self._actor_dependents:
+                    self._actor_dependents[dep].add(actor)
+
+        # Start actors that have no dependencies and send a BEGIN message
+        for actor_config in actor_configs:
+            if not self._actor_dependencies[actor_config.name]:
+                actor_id = actor_config.name
+                self.actors[actor_id] = actor_config.actor.start(
+                    id=actor_id,
+                    coordinator_urn=self.actor_urn,
+                    dependencies=actor_config.dependencies,
+                    **actor_config.kwargs,
+                )
+                self.tell_actor(actor_id, Message(type=MessageType.BEGIN))
+
+    def tell_actor(self, actor_id: str, message: Message):
+        if actor_id not in self.actors:
+            if "/" in actor_id:
+                actor_id_prefix = actor_id.split("/")[0]
+            else:
+                actor_id_prefix = actor_id
+
+            logger.info(f"Starting actor {actor_id}")
+            self.actors[actor_id] = self._actor_configs_map[actor_id_prefix].actor.start(
+                id=actor_id,
+                coordinator_urn=self.actor_urn,
+                dependencies=self._actor_configs_map[actor_id_prefix].dependencies,
+                **self._actor_configs_map[actor_id_prefix].kwargs,
+            )
+
+        self.actors[actor_id].tell(message)
+
+    def relay(self, message: Message):
+        logger.debug(f"Relaying message {message} to {self._actor_dependents.get(message.sender)}")
+
+        # Relay message to all dependents
+        for dependent in self._actor_dependents.get(message.sender, set()):
+            self.tell_actor(dependent, message)
+
+        # Send to message.receiver if we have not already sent to it
+        if message.receiver != "coordinator" and message.receiver not in self._actor_dependents.get(
+            message.sender, set()
+        ):
+            self.tell_actor(message.receiver, message)
+
+    def input(self, data: Dict):
+        message = Message(
+            id=str(uuid.uuid4()),
+            type=MessageType.CONTENT,
+            sender="coordinator",
+            receiver="input",
+            data=ContentData(content=data),
+        )
+
+        # Reset actors before handling new input
+        self.reset_actors()
+
+        self.tell_actor("input", message)
+
+    async def output(self):
+        return await self.actors["output"].proxy().get_output()
+
+    def bookkeeping_data(self):
+        return self.actors["output"].proxy().get_bookkeeping_data().get()
+
+    def on_stop(self):
+        logger.info("Coordinator is stopping")
+        self.stop_actors()
+        super().on_stop()
+
+    def stop_actors(self):
+        logger.info("Stopping actors")
+        for actor in self.actors.values():
+            actor.stop()
+
+    def reset_actors(self):
+        for actor in self.actors.values():
+            actor.proxy().reset().get()
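In the coordinator above, `input` and `output` actors are implicit, and each processor's `dependencies` list determines both when it starts and which messages are relayed to it. A hedged sketch of wiring a two-step chain — the processor actor classes here are illustrative stand-ins, not from this patch:

    # input -> proc1 -> proc2 -> output
    actor_configs = [
        ActorConfig(name="proc1", actor=SummarizeActor, dependencies=["input"]),
        ActorConfig(name="proc2", actor=TranslateActor, dependencies=["input", "proc1"]),
    ]
    coordinator = AppCoordinator.start(
        actor_configs=actor_configs,
        output_template="{{proc2.text}}",
    ).proxy()
    coordinator.input({"question": "..."})  # resets actors, then relays through the graph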
diff --git a/llmstack/apps/runner/app_runner.py b/llmstack/apps/runner/app_runner.py
new file mode 100644
index 00000000000..59f5de2722b
--- /dev/null
+++ b/llmstack/apps/runner/app_runner.py
@@ -0,0 +1,216 @@
+import asyncio
+import logging
+import uuid
+from enum import Enum
+from typing import Dict, List, Optional, Union
+
+from pydantic import BaseModel
+
+from llmstack.apps.runner.app_coordinator import AppCoordinator
+from llmstack.common.blocks.base.schema import StrEnum
+from llmstack.events.apis import EventsViewSet
+from llmstack.play.actor import ActorConfig
+from llmstack.processors.providers.processors import ProcessorFactory
+
+logger = logging.getLogger(__name__)
+
+
+class AppRunnerSourceType(str, Enum):
+    PLAYGROUND = "playground"
+    PLATFORM = "platform"
+    APP_STORE = "app_store"
+    SLACK = "slack"
+    TWILIO = "twilio"
+    DISCORD = "discord"
+    WEB = "web"
+
+    def __str__(self):
+        return str(self.value)
+
+    def __repr__(self):
+        return str(self)
+
+
+class AppRunnerSource(BaseModel):
+    type: AppRunnerSourceType
+    id: str
+
+
+class WebAppRunnerSource(AppRunnerSource):
+    type: AppRunnerSourceType = AppRunnerSourceType.WEB
+    request_ip: str
+    request_location: str
+    request_user_agent: str
+    request_content_type: str
+
+
+class PlatformAppRunnerSource(AppRunnerSource):
+    type: AppRunnerSourceType = AppRunnerSourceType.PLATFORM
+
+
+class AppStoreAppRunnerSource(AppRunnerSource):
+    type: AppRunnerSourceType = AppRunnerSourceType.APP_STORE
+
+
+class SlackAppRunnerSource(AppRunnerSource):
+    type: AppRunnerSourceType = AppRunnerSourceType.SLACK
+
+
+class TwilioAppRunnerSource(AppRunnerSource):
+    type: AppRunnerSourceType = AppRunnerSourceType.TWILIO
+
+
+class DiscordAppRunnerSource(AppRunnerSource):
+    type: AppRunnerSourceType = AppRunnerSourceType.DISCORD
+
+
+class AppRunnerStreamingResponseType(StrEnum):
+    ERRORS = "errors"
+    OUTPUT = "output"
+    OUTPUT_STREAM_CHUNK = "output_stream_chunk"
+    OUTPUT_STREAM_BEGIN = "output_stream_begin"
+    OUTPUT_STREAM_END = "output_stream_end"
+
+
+class AppRunnerResponseError(BaseModel):
+    code: Optional[int] = None
+    message: str
+
+
+class AppRunnerResponseData(BaseModel):
+    pass
+
+
+class AppRunnerResponseErrorsData(AppRunnerResponseData):
+    errors: List[AppRunnerResponseError]
+
+
+class AppRunnerResponseOutputData(AppRunnerResponseData):
+    output: Dict[str, str]
+    chunks: Optional[Dict] = None  # Stitched structured output from all the processors
+
+
+class AppRunnerResponseOutputChunkData(AppRunnerResponseData):
+    deltas: Optional[Dict[str, str]] = None  # delta to be applied to each actor's output
+    chunk: Optional[Dict] = None  # Structured output from each processor
+
+
+class AppRunnerRequest(BaseModel):
+    client_request_id: Optional[str] = None  # ID sent by the client
+    session_id: str
+    input: Dict
+
+
+class AppRunnerResponse(BaseModel):
+    id: str  # ID of the request
+    client_request_id: Optional[str] = None  # ID sent by the client
+    output: Optional[str] = None
+    chunks: Optional[Dict] = None  # Stitched structured output from all the processors
+    errors: Optional[List[AppRunnerResponseError]] = None
+
+
+class AppRunnerStreamingResponse(BaseModel):
+    id: str  # ID of the request
+    client_request_id: Optional[str] = None  # ID sent by the client
+    type: AppRunnerStreamingResponseType
+    data: Optional[
+        Union[AppRunnerResponseErrorsData, AppRunnerResponseOutputChunkData, AppRunnerResponseOutputData]
+    ] = None
+
+
+class AppRunner:
+    def _get_actor_configs_from_processors(
+        self, processors: List[Dict], session_id: str, is_agent: bool, vendor_env: Dict = {}
+    ):
+        actor_configs = []
+        for processor in processors:
+            if "processor_slug" not in processor or "provider_slug" not in processor:
+                logger.warning(
+                    "processor_slug and provider_slug are required for each processor",
+                )
+                continue
+
+            processor_cls = ProcessorFactory.get_processor(
+                processor["processor_slug"],
+                processor["provider_slug"],
+            )
+            actor_configs.append(
+                ActorConfig(
+                    name=processor["id"],
+                    actor=processor_cls,
+                    kwargs={
+                        "input": processor.get("input", {}),
+                        "config": processor.get("config", {}),
+                        "env": vendor_env,
+                        "session_id": session_id,
+                    },
+                    dependencies=processor.get("dependencies", []),
+                    output_template=processor.get("output_template", None) if is_agent else None,
+                ),
+            )
+        return actor_configs
+
+    def __init__(
+        self, session_id: str = None, app_data: Dict = {}, source: AppRunnerSource = None, vendor_env: Dict = {}
+    ):
+        self._session_id = session_id or str(uuid.uuid4())
+        self._app_data = app_data
+        self._source = source
+        self._is_agent = app_data.get("type_slug") == "agent"
+
+        actor_configs = self._get_actor_configs_from_processors(
+            app_data.get("processors", []), self._session_id, self._is_agent, vendor_env
+        )
+        output_template = app_data.get("output_template", {}).get("markdown", "")
+        self._coordinator = AppCoordinator.start(actor_configs=actor_configs, output_template=output_template).proxy()
+
+    async def stop(self):
+        await self._coordinator.stop()
+
+    async def run(self, request: AppRunnerRequest):
+        request_id = str(uuid.uuid4())
+
+        self._coordinator.input(request.input)
+
+        yield AppRunnerStreamingResponse(
+            id=request_id,
+            client_request_id=request.client_request_id,
+            type=AppRunnerStreamingResponseType.OUTPUT_STREAM_BEGIN,
+        )
+        async for output in await self._coordinator.output().get():
+            if "chunks" in output:
+                # Received all chunks
+                break
+
+            await asyncio.sleep(0.0001)
+
+            yield AppRunnerStreamingResponse(
+                id=request_id,
+                client_request_id=request.client_request_id,
+                type=AppRunnerStreamingResponseType.OUTPUT_STREAM_CHUNK,
+                data=AppRunnerResponseOutputChunkData(deltas=output["deltas"], chunk=output["chunk"]),
+            )
+
+        yield AppRunnerStreamingResponse(
+            id=request_id,
+            client_request_id=request.client_request_id,
+            type=AppRunnerStreamingResponseType.OUTPUT_STREAM_END,
+        )
+
+        # Send the final output
+        if "chunks" in output:
+            yield AppRunnerStreamingResponse(
+                id=request_id,
+                client_request_id=request.client_request_id,
+                type=AppRunnerStreamingResponseType.OUTPUT,
+                data=AppRunnerResponseOutputData(output=output["output"], chunks=output["chunks"]),
+            )
+
+        # Persist bookkeeping data
+        bookkeeping_data = self._coordinator.bookkeeping_data().get().get()
+        EventsViewSet().create(
+            "app.run.finished",
+            {
+                "bookkeeping_data_map": bookkeeping_data,
+            },
+        )
diff --git a/llmstack/apps/runner/input_actor.py b/llmstack/apps/runner/input_actor.py
new file mode 100644
index 00000000000..6273d6e9c91
--- /dev/null
+++ b/llmstack/apps/runner/input_actor.py
@@ -0,0 +1,47 @@
+import logging
+import time
+from typing import Any, NamedTuple
+
+from asgiref.sync import async_to_sync
+
+from llmstack.play.actor import Actor, BookKeepingData
+
+logger = logging.getLogger(__name__)
+
+
+class InputRequest(NamedTuple):
+    """
+    Input request
+    """
+
+    request_endpoint_uuid: str
+    request_app_uuid: str
+    request_app_session_key: str
+    request_owner: object
+    request_uuid: str
+    request_user_email: str
+    request_ip: str
+    request_location: str
+    request_user_agent: str
+    request_content_type: str
+    request_body: str
+    disable_history: bool = False
+    request_app_store_uuid: str = ""
+
+
+class InputActor(Actor):
+    def __init__(
+        self,
+        coordinator_urn,
+    ):
+        super().__init__(id="input", coordinator_urn=coordinator_urn, dependencies=["coordinator"])
+
+    def input(self, message: Any) -> Any:
+        async_to_sync(self._output_stream.write)(message["coordinator"])
+        self._output_stream.finalize()
+        self._output_stream.bookkeep(
+            BookKeepingData(
+                input=message["coordinator"],
+                timestamp=time.time(),
+            ),
+        )
diff --git a/llmstack/apps/runner/output_actor.py b/llmstack/apps/runner/output_actor.py
new file mode 100644
index 00000000000..513939b3fa4
--- /dev/null
+++ b/llmstack/apps/runner/output_actor.py
@@ -0,0 +1,128 @@
+import asyncio
+import logging
+from typing import Any, Dict, NamedTuple
+
+import pykka
+from diff_match_patch import diff_match_patch
+from pydantic import BaseModel
+
+from llmstack.common.utils.liquid import render_template
+from llmstack.play.actor import Actor
+from llmstack.play.messages import Message, MessageType
+from llmstack.play.output_stream import stitch_model_objects
+
+logger = logging.getLogger(__name__)
+
+SENTINEL = object()
+
+
+class OutputResponse(NamedTuple):
+    """
+    Output response
+    """
+
+    response_content_type: str
+    response_status: int
+    response_body: str
+    response_headers: dict
+
+
+class OutputActor(Actor):
+    def __init__(
+        self,
+        coordinator_urn,
+        dependencies,
+        templates: Dict[str, str] = {},
+    ):
+        super().__init__(id="output", coordinator_urn=coordinator_urn, dependencies=dependencies)
+        self._templates = templates
+        self.reset()
+        self._diff_match_patch = diff_match_patch()
+
+    def on_receive(self, message: Message) -> Any:
+        if message.type == MessageType.ERRORS:
+            self.on_error(message.sender, message.data.errors)
+            return
+
+        if message.type == MessageType.CONTENT_STREAM_CHUNK:
+            try:
+                self._stitched_data = stitch_model_objects(self._stitched_data, {message.sender: message.data.chunk})
+                new_int_output = render_template(self._templates["output"], self._stitched_data)
+                delta = self._diff_match_patch.diff_toDelta(
+                    self._diff_match_patch.diff_main(self._int_output.get("output", ""), new_int_output)
+                )
+                self._int_output["output"] = new_int_output
+
+                self._content_queue.put_nowait(
+                    {
+                        "deltas": {"output": delta},
+                        "chunk": {message.sender: message.data.chunk},
+                    }
+                )
+            except Exception as e:
+                logger.error(f"Error processing content stream chunk: {e}")
+
+        if message.type == MessageType.CONTENT:
+            self._messages[message.sender] = (
+                message.data.content.model_dump()
+                if isinstance(message.data.content, BaseModel)
+                else message.data.content
+            )
+
+            if set(self._dependencies) == set(self._messages.keys()):
+                self._content_queue.put_nowait(
+                    {
+                        "output": self._int_output,
+                        "chunks": self._messages,
+                    }
+                )
+
+        if message.type == MessageType.BOOKKEEPING:
+            self._bookkeeping_data_map[message.sender] = message.data
+
+            if set(self._dependencies) == set(self._bookkeeping_data_map.keys()):
+                self._bookkeeping_data_future.set(self._bookkeeping_data_map)
+
+    async def get_output(self):
+        try:
+            while True:
+                if not self._content_queue.empty():
+                    output = self._content_queue.get_nowait()
+                    yield output
+                    self._content_queue.task_done()
+                else:
+                    if self._error or self._stopped:
+                        break
+                    await asyncio.sleep(0.01)
+        except asyncio.CancelledError:
+            logger.info("Output stream cancelled")
+        finally:
+            if self._error:
+                yield {"errors": list(self._error.values())}
+            elif self._stopped and not self._data_sent:
+                yield {"errors": ["Output interrupted"]}
+
+            logger.info("Output stream completed")
+
+    def on_stop(self) -> None:
+        self._stopped = True
+        return super().on_stop()
+
+    def on_error(self, sender, error) -> None:
+        logger.error(f"Error in output actor: {error}")
+        self._error = error
+
+    def reset(self) -> None:
+        self._stitched_data = {}
+        self._int_output = {}
+        self._error = None
+        self._stopped = False
+        self._bookkeeping_data_map = {}
+        self._bookkeeping_data_future = pykka.ThreadingFuture()
+        self._content_queue = asyncio.Queue()
+        self._messages = {}
+        super().reset()
+
+    def get_bookkeeping_data(self):
+        return self._bookkeeping_data_future
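`OutputActor` above streams UI updates as diff-match-patch deltas over the rendered output template rather than re-sending the whole text on each chunk. The round trip, sketched with the same library the patch uses on both ends (the JS port performs the client half in `AppRenderer.jsx` below):

    from diff_match_patch import diff_match_patch

    dmp = diff_match_patch()
    old, new = "Hello", "Hello, world"

    # Server side, as in OutputActor.on_receive: encode the change as a compact delta
    delta = dmp.diff_toDelta(dmp.diff_main(old, new))

    # Client side: rebuild the new text from the previous text plus the delta
    assert dmp.diff_text2(dmp.diff_fromDelta(old, delta)) == new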
diff --git a/llmstack/apps/urls.py b/llmstack/apps/urls.py
index f87c04bc2f3..aa4892323b2 100644
--- a/llmstack/apps/urls.py
+++ b/llmstack/apps/urls.py
@@ -27,7 +27,7 @@
         "api/apps/<str:uid>/processors/<str:processor_id>/run",
         apis.AppViewSet.as_view({"post": "processor_run"}),
     ),
-    path("api/apps/<str:uid>/run", apis.AppViewSet.as_view({"post": "run"})),
+    path("api/apps/<str:app_uuid>/run", apis.AppViewSet.as_view({"post": "run"})),
     path(
         "api/apps/<str:uid>/versions",
         apis.AppViewSet.as_view({"get": "versions"}),
diff --git a/llmstack/client/package-lock.json b/llmstack/client/package-lock.json
index 68b2d5cf980..7e598cc5310 100644
--- a/llmstack/client/package-lock.json
+++ b/llmstack/client/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "llmstack",
-  "version": "0.2.4",
+  "version": "0.2.5",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "llmstack",
-      "version": "0.2.4",
+      "version": "0.2.5",
       "dependencies": {
         "@fontsource/lato": "^5.0.20",
         "@glideapps/glide-data-grid": "^6.0.3",
@@ -37,6 +37,7 @@
         "@testing-library/react": "^13.4.0",
         "@testing-library/user-event": "^13.5.0",
         "axios": "^1.5.1",
+        "diff-match-patch": "^1.0.5",
         "dompurify": "^3.1.6",
         "hast-util-is-element": "^3.0.0",
         "hast-util-to-html": "^9.0.0",
diff --git a/llmstack/client/package.json b/llmstack/client/package.json
index 3e746c9dde3..bc8e7289df7 100644
--- a/llmstack/client/package.json
+++ b/llmstack/client/package.json
@@ -34,6 +34,7 @@
     "@testing-library/react": "^13.4.0",
     "@testing-library/user-event": "^13.5.0",
     "axios": "^1.5.1",
+    "diff-match-patch": "^1.0.5",
    "dompurify": "^3.1.6",
     "hast-util-is-element": "^3.0.0",
     "hast-util-to-html": "^9.0.0",
Input", items: schema, pillPrefix: "[1] Input / ", - id: "_inputs0", + id: "input", }, ...processors.map((p, index) => { const processor = processorList.find( @@ -155,6 +155,7 @@ export function AppEditor(props) { input: null, config: null, output_template: processor?.output_template, + dependencies: [], }); setProcessors(newProcessors); setActiveStep(newProcessors.length + 1); diff --git a/llmstack/client/src/components/apps/ProcessorEditor.jsx b/llmstack/client/src/components/apps/ProcessorEditor.jsx index a3c7a7ff996..65a53dcabfe 100644 --- a/llmstack/client/src/components/apps/ProcessorEditor.jsx +++ b/llmstack/client/src/components/apps/ProcessorEditor.jsx @@ -7,7 +7,6 @@ import { CardContent, FormControlLabel, Checkbox, - Typography, } from "@mui/material"; import validator from "@rjsf/validator-ajv8"; import { lazy, useEffect, useRef, useState, useMemo } from "react"; @@ -15,6 +14,7 @@ import { useRecoilValue } from "recoil"; import { useValidationErrorsForAppComponents } from "../../data/appValidation"; import { processorsState } from "../../data/atoms"; import "./AppEditor.css"; +import { getTemplateKeysFromObject } from "../../data/utils"; const ThemedJsonForm = lazy(() => import("../ThemedJsonForm")); const AppStepCard = lazy(() => import("./AppStepCard")); @@ -256,6 +256,12 @@ export function ProcessorEditor({ formData={processors[index].input} onChange={({ formData }) => { processors[index].input = formData; + processors[index].dependencies = isTool + ? [] + : getTemplateKeysFromObject({ + input: processors[index]?.input, + config: processors[index]?.config, + }); setProcessors([...processors]); }} widgets={{ @@ -293,6 +299,12 @@ export function ProcessorEditor({ formData={processors[index].config} onChange={({ formData }) => { processors[index].config = formData; + processors[index].dependencies = isTool + ? 
[] + : getTemplateKeysFromObject({ + input: processors[index]?.input, + config: processors[index]?.config, + }); setProcessors([...processors]); }} widgets={{ @@ -337,17 +349,6 @@ export function ProcessorEditor({ )} - - } - aria-controls="transformer-content" - id="transformer-header" - style={{ backgroundColor: "#dce8fb", display: "none" }} - > - Data Transformer - - - ); diff --git a/llmstack/client/src/components/apps/renderer/AppRenderer.jsx b/llmstack/client/src/components/apps/renderer/AppRenderer.jsx index 6872abef93d..239354a1f9c 100644 --- a/llmstack/client/src/components/apps/renderer/AppRenderer.jsx +++ b/llmstack/client/src/components/apps/renderer/AppRenderer.jsx @@ -9,7 +9,7 @@ import React, { } from "react"; import ReactGA from "react-ga4"; import { useLocation } from "react-router-dom"; -import { stitchObjects } from "../../../data/utils"; +import { diff_match_patch } from "diff-match-patch"; import { AppMessage, AppErrorMessage, @@ -77,6 +77,7 @@ export default function AppRenderer({ app, ws, onEventDone = null }) { const messagesRef = useRef(new Messages()); const isLoggedIn = useRecoilValue(isLoggedInState); const setAppRunData = useSetRecoilState(appRunDataState); + const dmp = new diff_match_patch(); if (ws && ws.messageRef) { messagesRef.current = ws.messageRef; @@ -286,6 +287,32 @@ export default function AppRenderer({ app, ws, onEventDone = null }) { } } + if (message.type && message.type === "output_stream_chunk") { + const existingMessageContent = + messagesRef.current.getContent(message.id) || ""; + + const deltas = message.data.deltas; + const diffs = dmp.diff_fromDelta( + existingMessageContent, + deltas["output"], + ); + const newMessageContent = dmp.diff_text2(diffs); + + messagesRef.current.add( + new AppMessage( + message.id, + message.client_request_id, + newMessageContent, + ), + ); + + setAppRunData((prevState) => ({ + ...prevState, + messages: messagesRef.current.get(), + isStreaming: newMessageContent !== existingMessageContent, + })); + } + if (message.errors && message.errors.length > 0) { message.errors.forEach((error) => { messagesRef.current.add( @@ -302,46 +329,6 @@ export default function AppRenderer({ app, ws, onEventDone = null }) { })); chunkedOutput.current = {}; } - - // Merge the new output with the existing output - if (message.output) { - let newChunkedOutput = {}; - - if (message.output.agent) { - newChunkedOutput = stitchObjects(chunkedOutput.current, { - [message.output.agent.id]: message.output.agent.content, - }); - } else { - newChunkedOutput = stitchObjects( - chunkedOutput.current, - message.output, - ); - } - - chunkedOutput.current = newChunkedOutput; - } - - if (message.id && message.output) { - parseIncomingMessage( - message, - chunkedOutput.current, - outputTemplate, - outputTemplates.current, - app?.data?.type_slug, - ) - .then((newMessage) => { - messagesRef.current.add(newMessage); - - setAppRunData((prevState) => ({ - ...prevState, - messages: messagesRef.current.get(), - isStreaming: newMessage.content !== null, - })); - }) - .catch((error) => { - console.error("Failed to create message object from output", error); - }); - } }); } diff --git a/llmstack/client/src/components/apps/renderer/Messages.jsx b/llmstack/client/src/components/apps/renderer/Messages.jsx index 70a2f06432a..ad929112865 100644 --- a/llmstack/client/src/components/apps/renderer/Messages.jsx +++ b/llmstack/client/src/components/apps/renderer/Messages.jsx @@ -129,4 +129,8 @@ export class Messages { clear() { this.messages = {}; } + + getContent(id) { 
diff --git a/llmstack/client/src/components/apps/renderer/Messages.jsx b/llmstack/client/src/components/apps/renderer/Messages.jsx
index 70a2f06432a..ad929112865 100644
--- a/llmstack/client/src/components/apps/renderer/Messages.jsx
+++ b/llmstack/client/src/components/apps/renderer/Messages.jsx
@@ -129,4 +129,8 @@ export class Messages {
   clear() {
     this.messages = {};
   }
+
+  getContent(id) {
+    return this.messages[id]?.content;
+  }
 }
diff --git a/llmstack/client/src/data/utils.js b/llmstack/client/src/data/utils.js
index 8df938677c7..b88a03407db 100644
--- a/llmstack/client/src/data/utils.js
+++ b/llmstack/client/src/data/utils.js
@@ -201,3 +201,57 @@ export function getJSONSchemaFromInputFields(inputFields) {
 
   return { schema, uiSchema };
 }
+
+/**
+ * Takes an object, recursively iterates through all the leaf elements of type string,
+ * and extracts the unique liquid template keys.
+ *
+ * For example, if some of the string leaf values contain:
+ * - "{{_inputs0.test}}"
+ * - "{% for output in proc1.outputs %}{{output}}{% endfor %}"
+ * - "{% assign test = proc2.output %}{{test}}"
+ *
+ * This function will return ["inputs0", "proc1", "proc2"].
+ */
+export function getTemplateKeysFromObject(obj) {
+  let templateKeys = new Set();
+
+  function extractKeys(value) {
+    if (typeof value === "string") {
+      // Match Liquid variables: {{ variable }}
+      const variableMatches = value.match(/\{\{\s*([^}]+)\s*\}\}/g) || [];
+
+      // Match Liquid tags: {% tag %}
+      const tagMatches = value.match(/\{%\s*([^}]+)\s*%\}/g) || [];
+
+      // Combine all matches
+      const allMatches = [...variableMatches, ...tagMatches];
+
+      allMatches.forEach((match) => {
+        // Extract the key (first part before a dot or space)
+        const key = match.replace(/[{%}]/g, "").trim().split(/[\s.]/)[0];
+        if (
+          key &&
+          ![
+            "if",
+            "else",
+            "endif",
+            "for",
+            "endfor",
+            "assign",
+            "capture",
+            "endcapture",
+          ].includes(key)
+        ) {
+          templateKeys.add(key);
+        }
+      });
+    } else if (typeof value === "object" && value !== null) {
+      Object.values(value).forEach(extractKeys);
+    }
+  }
+
+  extractKeys(obj);
+  console.log(obj, templateKeys);
+  return Array.from(templateKeys);
+}
diff --git a/llmstack/client/src/pages/AppConsole.jsx b/llmstack/client/src/pages/AppConsole.jsx
index 049668cb411..8f86ac49a72 100644
--- a/llmstack/client/src/pages/AppConsole.jsx
+++ b/llmstack/client/src/pages/AppConsole.jsx
@@ -295,6 +295,7 @@ export default function AppConsolePage(props) {
           input: processor.input,
           input_fields: processor.input_fields,
           output_template: processor.output_template || {},
+          dependencies: processor.dependencies || [],
         })),
       };
diff --git a/llmstack/common/utils/liquid.py b/llmstack/common/utils/liquid.py
index cba4bb3c4d9..0b5febea6bc 100644
--- a/llmstack/common/utils/liquid.py
+++ b/llmstack/common/utils/liquid.py
@@ -6,6 +6,7 @@
 import lxml.etree as ET
 from liquid import Environment
 from lxml import html
+from pydantic import BaseModel
 
 # Add custom filters
 env = Environment()
@@ -111,3 +112,28 @@ def _serialize_xpath_result(result):
 
 def render_template(template, data):
     return env.from_string(template).render(**data)
+
+
+def hydrate_input(input, values):
+    def render(value):
+        if isinstance(value, str):
+            try:
+                return render_template(value, values)
+            except Exception:
+                logger.exception("Error rendering template when hydrating input")
+
+        return value
+
+    def traverse(obj):
+        if isinstance(obj, dict):
+            return {key: traverse(render(value)) for key, value in obj.items()}
+        elif isinstance(obj, list):
+            return [traverse(render(item)) for item in obj]
+        elif isinstance(obj, BaseModel):
+            cls = obj.__class__
+            return cls.model_validate(traverse(obj.model_dump()))
+        elif isinstance(obj, str):
+            return render(obj)
+        return obj
+
+    return traverse(input)
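`hydrate_input`, relocated into `liquid.py` above, renders every string leaf of a nested structure (dicts, lists, and pydantic models) against the supplied values. A small example of the behavior it implements:

    from llmstack.common.utils.liquid import hydrate_input

    hydrate_input(
        {"prompt": "Answer: {{input.question}}", "n": 1},
        {"input": {"question": "why?"}},
    )
    # -> {"prompt": "Answer: why?", "n": 1}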
diff --git a/llmstack/common/utils/utils.py b/llmstack/common/utils/utils.py
index 40c200d0b62..62c1c75cf17 100644
--- a/llmstack/common/utils/utils.py
+++ b/llmstack/common/utils/utils.py
@@ -22,33 +22,32 @@
     run_sitemap_spider_in_process,
     run_url_spider_in_process,
 )
-from llmstack.common.utils.liquid import render_template
 
 logger = logging.getLogger(__name__)
 
-city_loc_reader = (
-    geoip2.database.Reader(
-        settings.GEOIP_CITY_DB_PATH,
-    )
-    if hasattr(
-        settings,
-        "GEOIP_CITY_DB_PATH",
-    )
-    else None
-)
-country_loc_reader = (
-    geoip2.database.Reader(
-        settings.GEOIP_COUNTRY_DB_PATH,
+
+def get_location(ip):
+    city_loc_reader = (
+        geoip2.database.Reader(
+            settings.GEOIP_CITY_DB_PATH,
+        )
+        if hasattr(
+            settings,
+            "GEOIP_CITY_DB_PATH",
+        )
+        else None
     )
-    if hasattr(
-        settings,
-        "GEOIP_COUNTRY_DB_PATH",
+    country_loc_reader = (
+        geoip2.database.Reader(
+            settings.GEOIP_COUNTRY_DB_PATH,
+        )
+        if hasattr(
+            settings,
+            "GEOIP_COUNTRY_DB_PATH",
+        )
+        else None
     )
-    else None
-)
-
-def get_location(ip):
+
     if not ip or not city_loc_reader or not country_loc_reader:
         return {}
@@ -519,31 +518,6 @@ def vectorize_text(text):
     return vectors[0]
 
-def hydrate_input(input, values):
-    def render(value):
-        if isinstance(value, str):
-            try:
-                return render_template(value, values)
-            except Exception:
-                logger.exception("Error rendering template when hydrating input")
-
-        return value
-
-    def traverse(obj):
-        if isinstance(obj, dict):
-            return {key: traverse(render(value)) for key, value in obj.items()}
-        elif isinstance(obj, list):
-            return [traverse(render(item)) for item in obj]
-        elif isinstance(obj, BaseModel):
-            cls = obj.__class__
-            return cls.model_validate(traverse(obj.model_dump()))
-        elif isinstance(obj, str):
-            return render(obj)
-        return obj
-
-    return traverse(input)
-
-
 def retry_on_db_error(func=None, max_retries=3, delay=1):
     def decorator(f):
         @wraps(f)
diff --git a/llmstack/play/actor.py b/llmstack/play/actor.py
index 161a3d15c7e..cfc92541593 100644
--- a/llmstack/play/actor.py
+++ b/llmstack/play/actor.py
@@ -1,13 +1,13 @@
 import logging
 import time
-import uuid
 from types import TracebackType
-from typing import Any, Type
+from typing import Any, Optional, Type
 
 from pydantic import BaseModel, model_validator
 from pykka import ThreadingActor
 
-from llmstack.play.output_stream import Message, MessageType
+from llmstack.play.messages import Message, MessageType
+from llmstack.play.output_stream import OutputStream
 
 logger = logging.getLogger(__name__)
 
@@ -42,57 +42,56 @@ class ActorConfig(BaseModel):
     Configuration for the actor
     """
 
-    class Config:
-        arbitrary_types_allowed = True
-
     name: str
-    template_key: str = ""  # This is used to find other actors dependent on this actor
     actor: Type
     kwargs: dict = {}
-    dependencies: list = []
-    output_cls: Type = None
+    dependencies: list = []  # List of actor ids that this actor depends on
+    output_template: Optional[str] = None  # Output template for the actor
 
 
 class Actor(ThreadingActor):
-    def __init__(self, dependencies: list = [], all_dependencies: list = []):
+    def __init__(self, id: str, coordinator_urn: str, output_cls: Type = None, dependencies: list = []):
         super().__init__()
+        self._id = id
         self._dependencies = dependencies
-        self._all_dependencies = all_dependencies
-        self._messages = {}  # Holds messages while waiting for dependencies
-
-    def on_receive(self, message: Message) -> Any:
-        if message.message_type == MessageType.BEGIN:
-            self.input(message.message)
+        self._coordinator_urn = coordinator_urn
 
-        message_and_key = (
-            {
-                message.template_key: message.message,
-            }
-            if message.template_key
-            else message.message
+        self._messages = {}  # Holds messages while waiting for dependencies
+        self._output_stream = OutputStream(
+            stream_id=self._id,
+            coordinator_urn=self._coordinator_urn,
+            output_cls=output_cls,
         )
 
-        if message.message_type == MessageType.STREAM_ERROR:
-            self.on_error(message_and_key)
+    def on_receive(self, message: Message) -> Any:
+        if message.type == MessageType.ERRORS:
+            self.on_error(message.data.errors)
             return
 
-        if message.message_type == MessageType.STREAM_DATA:
-            self.input_stream(message_and_key)
-
-        if message.message_type == MessageType.STREAM_CLOSED:
-            self._messages = {**self._messages, **message_and_key}
-
-        # Call input only when all the dependencies are met
-        if message.message_type == MessageType.STREAM_CLOSED and set(
-            self.dependencies,
-        ) == set(self._messages.keys()):
-            self.input(self._messages)
+        if message.type == MessageType.CONTENT_STREAM_BEGIN:
+            pass
+
+        if message.type == MessageType.CONTENT_STREAM_END:
+            pass
+
+        if message.type == MessageType.CONTENT_STREAM_CHUNK:
+            self.input_stream({message.sender: message.data.chunk})
+
+        if message.type == MessageType.CONTENT:
+            self._messages = {
+                **self._messages,
+                **{
+                    message.sender: (
+                        message.data.content.model_dump()
+                        if isinstance(message.data.content, BaseModel)
+                        else message.data.content
+                    )
+                },
+            }
 
-        # If the message is for a tool, call the tool
-        if message.message_type == MessageType.TOOL_INVOKE:
-            self._output_stream.set_message_id(str(uuid.uuid4()))
-            self._output_stream.set_response_to(message.message_id)
-            self.invoke(message.message)
+            # Call input only when all the dependencies are met
+            if set(self._dependencies) == set(self._messages.keys()):
+                self.input(self._messages)
 
     def input(self, message: Any) -> Any:
         # Co-ordinator calls this when all the dependencies are met. This
@@ -104,19 +103,13 @@ def input_stream(self, message: Any) -> Any:
         # data
         raise NotImplementedError
 
-    def get_dependencies(self):
-        # Return a list of template_keys that this actor depends on
-        # TODO: This should be persisted in the endpoint or app config
-        return []
+    def reset(self):
+        # Resets the current state so we can reuse this actor with new input
+        self._messages = {}
 
     @property
     def dependencies(self):
-        return list(
-            filter(
-                lambda x: x in self._all_dependencies,
-                list(set(self._dependencies + self.get_dependencies())),
-            ),
-        )
+        return []
 
     def on_error(self, error: Any) -> None:
         # Co-ordinator calls this when any actor in the dependency chain has
@@ -136,5 +129,4 @@ def on_failure(
             f"Encountered {exception_type} in {type(self)}({self.actor_urn}): {exception_value}",
         )
 
-        # Send error to output stream
-        self._output_stream.error(exception_value)
+        # TODO: Send error to output stream
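Under the reworked `Actor` base above, `input()` fires only once a `CONTENT` message has arrived from every dependency, while partial results flow through `input_stream()`. A condensed sketch of the contract a processor actor now implements — the class and payload shape are illustrative, not from this patch:

    class EchoActor(Actor):
        def input(self, message):
            # message maps each dependency id to its final content,
            # e.g. {"input": {"question": "..."}, "proc1": {...}}
            async_to_sync(self._output_stream.write)({"text": message["input"]})
            self._output_stream.finalize()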
diff --git a/llmstack/play/actors/agent.py b/llmstack/play/actors/agent.py
index f58ea839899..35f799c6ae7 100644
--- a/llmstack/play/actors/agent.py
+++ b/llmstack/play/actors/agent.py
@@ -8,12 +8,11 @@
 from openai import OpenAI
 from pydantic import BaseModel
 
-from llmstack.apps.app_session_utils import save_agent_app_session_data
+from llmstack.apps.runner.output_actor import OutputResponse
 from llmstack.apps.types.agent import AgentModel
 from llmstack.common.utils.liquid import render_template
 from llmstack.common.utils.provider_config import get_matched_provider_config
 from llmstack.play.actor import Actor, BookKeepingData
-from llmstack.play.actors.output import OutputResponse
 from llmstack.play.output_stream import Message, MessageType
 from llmstack.processors.providers.config import ProviderConfigSource
 from llmstack.processors.providers.metrics import MetricType
@@ -501,7 +500,7 @@ def on_receive(self, message: Message) -> Any:
                     + self._agent_messages
                     + [{"role": "assistant", "content": full_content}],
                 }
-                save_agent_app_session_data(self._agent_app_session_data)
+                # save_agent_app_session_data(self._agent_app_session_data)
                 self._output_stream.bookkeep(bookkeeping_data)
                 self._output_stream.finalize()
@@ -574,8 +573,3 @@ def on_receive(self, message: Message) -> Any:
 
     def on_stop(self) -> None:
         super().on_stop()
-
-    def get_dependencies(self):
-        return list(
-            set([x["template_key"] for x in self._processor_configs.values()]),
-        )
diff --git a/llmstack/play/actors/bookkeeping.py b/llmstack/play/actors/bookkeeping.py
index 74d14c4332b..3cede224f2a 100644
--- a/llmstack/play/actors/bookkeeping.py
+++ b/llmstack/play/actors/bookkeeping.py
@@ -109,8 +109,3 @@ def on_stop(self) -> None:
         except Exception as e:
             logger.error(f"Error adding history persistence job: {e}")
         return super().on_stop()
-
-    def get_dependencies(self):
-        return list(
-            set([x["template_key"] for x in self._processor_configs.values()]),
-        )
diff --git a/llmstack/play/actors/input.py b/llmstack/play/actors/input.py
deleted file mode 100644
index 93a113bbc08..00000000000
--- a/llmstack/play/actors/input.py
+++ /dev/null
@@ -1,87 +0,0 @@
-import logging
-import time
-from types import TracebackType
-from typing import Any, NamedTuple, Type
-
-from asgiref.sync import async_to_sync
-
-from llmstack.play.actor import Actor, BookKeepingData
-
-logger = logging.getLogger(__name__)
-
-
-class InputRequest(NamedTuple):
-    """
-    Input request
-    """
-
-    request_endpoint_uuid: str
-    request_app_uuid: str
-    request_app_session_key: str
-    request_owner: object
-    request_uuid: str
-    request_user_email: str
-    request_ip: str
-    request_location: str
-    request_user_agent: str
-    request_content_type: str
-    request_body: str
-    disable_history: bool = False
-    request_app_store_uuid: str = ""
-
-
-class InputActor(Actor):
-    def __init__(
-        self,
-        output_stream,
-        input_request,
-        dependencies=[],
-        all_dependencies=[],
-    ):
-        super().__init__(dependencies=dependencies, all_dependencies=all_dependencies)
-        self.input_request = input_request
-        self.data = None
-        self.output_stream = output_stream
-        self.stream_started = False
-        self.stream_closed = False
-        self.sent_data = False
-
-    def write(self, message: Any) -> Any:
-        async_to_sync(self.output_stream.write)(message)
-        self.output_stream.finalize()
-        self.output_stream.bookkeep(
-            BookKeepingData(
-                input=message,
-                run_data={
-                    **self.input_request._asdict(),
-                },
-                timestamp=time.time(),
-                disable_history=self.input_request.disable_history,
-            ),
-        )
-
-    def get_output(self):
-        # Return an iter that yield whenever we get a new message
-        while True:
-            if not self.stream_started or self.sent_data and not self.stream_closed:
-                continue
-
-            self.sent_data = True
-
-            if self.stream_closed:
-                break
-
-            yield self.data.__dict__
-
-    def on_stop(self) -> None:
-        pass
-
-    def on_failure(
-        self,
-        exception_type: Type[BaseException],
-        exception_value: BaseException,
-        traceback: TracebackType,
-    ) -> None:
-        logger.error(
-            f"IOActor failed: {exception_type} {exception_value} {traceback}",
-        )
diff --git a/llmstack/play/actors/output.py b/llmstack/play/actors/output.py
deleted file mode 100644
index 091cb83c044..00000000000
--- a/llmstack/play/actors/output.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import logging
-import time
-from typing import Any, NamedTuple
-
-from llmstack.common.utils.liquid import render_template
-from llmstack.play.actor import Actor, BookKeepingData
-from llmstack.play.utils import extract_jinja2_variables
-
-logger = logging.getLogger(__name__)
-
-
-class OutputResponse(NamedTuple):
-    """
-    Output response
-    """
-
-    response_content_type: str
-    response_status: int
-    response_body: str
-    response_headers: dict
-
-
-class OutputActor(Actor):
-    def __init__(
-        self,
-        output_stream,
-        dependencies=[],
-        template=None,
-        all_dependencies=[],
-    ):
-        super().__init__(dependencies=dependencies, all_dependencies=all_dependencies)
-        self._output_stream = output_stream
-        self._data = None
-        self._data_chunk = None
-        self._data_chunks = []
-        self._data_sent = False
-        self._data_chunk_sent = False
-        self._data_chunks_sent = 0
-        self._data_done = False
-        self._template = template
-        self._error = None
-        self._stopped = False
-
-    def input(self, message: Any) -> Any:
-        if self._template:
-            try:
-                self._data = render_template(self._template, message)
-            except Exception as e:
-                logger.error(
-                    f"Error rendering template {self._template} with data {self._data}: {e}",
-                )
-        else:
-            self._data = message
-        self._data_done = True
-        self._output_stream.finalize()
-
-        output_response = OutputResponse(
-            response_content_type="application/json" if not self._template else "text/markdown",
-            response_status=200 if not self._error else 400,
-            response_body=self._data if not self._error else self._error,
-            response_headers={},
-        )
-
-        self._output_stream.bookkeep(
-            BookKeepingData(
-                run_data={**output_response._asdict()},
-                timestamp=time.time(),
-            ),
-        )
-
-    def input_stream(self, message: Any) -> Any:
-        self._data_chunks.append(message)
-
-    def get_output(self):
-        while True:
-            if (
-                self._error
-                or (self._stopped and self._data_sent and not self._data)
-                or (self._data_done and self._data_sent)
-            ):
-                break
-            if not self._data or self._data_sent:
-                continue
-
-            self._data_sent = True
-            yield {"output": self._data, "chunks": self._data_chunks}
-
-        if self._error:
-            yield {"errors": list(self._error.values())}
-
-        if self._stopped and not self._data_sent:
-            yield {"errors": ["Output interrupted"]}
-
-    def get_output_stream(self):
-        while True:
-            if self._error or self._stopped or (self._data_done and self._data_chunk_sent):
-                break
-
-            if self._data_chunks_sent < len(self._data_chunks):
-                self._data_chunk = self._data_chunks[self._data_chunks_sent]
-                self._data_chunks_sent += 1
-                self._data_chunk_sent = False
-
-            if not self._data_chunk or self._data_chunk_sent:
-                continue
-            self._data_chunk_sent = True
-
-            yield self._data_chunk
-
-        if self._error:
-            yield {"errors": list(self._error.values())}
-
-    def on_stop(self) -> None:
-        self._data_done = True
-        self._data_chunk_sent = True
-        self._stopped = True
-        return super().on_stop()
-
-    def on_error(self, error) -> None:
-        logger.info(f"Error in output actor: {error}")
-        self._error = error
-        self._data_done = True
-        self._output_stream.finalize()
-
-        output_response = OutputResponse(
-            response_content_type="application/json",
-            response_status=400,
-            response_body=self._error,
-            response_headers={},
-        )
-
-        self._output_stream.bookkeep(
-            BookKeepingData(
-                run_data={**output_response._asdict()},
-                timestamp=time.time(),
-            ),
-        )
-
-    def get_dependencies(self):
-        if not self._template:
-            return []
-
-        dependencies = [x.split(".")[0] for x in extract_jinja2_variables(self._template)]
-
-        return list(set(dependencies))
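The deleted input/output actors above are superseded by `llmstack/apps/runner/input_actor.py` and `output_actor.py`, and the ad-hoc stream states give way to the typed envelope defined in `llmstack/play/messages.py` below. A hedged construction example (the id and payload are placeholders):

    message = Message(
        id="a-uuid-string",            # placeholder
        type=MessageType.CONTENT,
        sender="coordinator",
        receiver="input",
        data=ContentData(content={"question": "why?"}),
    )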
from pykka import ActorDeadError, ThreadingActor from llmstack.play.actor import ActorConfig -from llmstack.play.output_stream import Message, MessageType, OutputStream +from llmstack.play.output_stream import Message, MessageType from llmstack.play.utils import ResettableTimer logger = logging.getLogger(__name__) @@ -13,9 +13,8 @@ class Coordinator(ThreadingActor): - def __init__(self, session_id, actor_configs: List[ActorConfig]): + def __init__(self, actor_configs: List[ActorConfig]): super().__init__() - self._session_id = session_id self._stream_errors = {} # Make sure there are not duplicate names or template_keys in @@ -24,28 +23,16 @@ def __init__(self, session_id, actor_configs: List[ActorConfig]): actor_configs, ) - assert len(set([actor_config.template_key for actor_config in actor_configs])) == len( - actor_configs, - ) - self._actor_configs = {actor_config.name: actor_config for actor_config in actor_configs} - # Create output_streams - self._output_streams = [] + all_dependencies = [actor_config.name for actor_config in actor_configs] - all_dependencies = [actor_config.template_key for actor_config in actor_configs] # Spawn actors self.actors = {} for actor_config in actor_configs: - self._output_streams.append( - OutputStream( - stream_id=actor_config.name, - coordinator_urn=self.actor_urn, - output_cls=actor_config.output_cls, - ), - ) actor = actor_config.actor.start( - output_stream=self._output_streams[-1], + id=actor_config.name, + coordinator_urn=self.actor_urn, dependencies=actor_config.dependencies, all_dependencies=all_dependencies, **actor_config.kwargs, diff --git a/llmstack/play/messages.py b/llmstack/play/messages.py new file mode 100644 index 00000000000..b17cb552dae --- /dev/null +++ b/llmstack/play/messages.py @@ -0,0 +1,57 @@ +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel + +from llmstack.common.blocks.base.schema import StrEnum + + +class MessageType(StrEnum): + BEGIN = "begin" + BOOKKEEPING = "bookkeeping" + CONTENT = "content" + CONTENT_STREAM_CHUNK = "content_stream_chunk" + CONTENT_STREAM_BEGIN = "content_stream_begin" + CONTENT_STREAM_END = "content_stream_end" + ERRORS = "errors" + + +class MessageData(BaseModel): + pass + + +class Error(BaseModel): + code: int = -1 + message: str + + +class ErrorsData(MessageData): + errors: List[Error] + + +class ContentData(MessageData): + content: Any + + +class ContentStreamChunkData(MessageData): + chunk: Any + + +class ContentStreamErrorsData(MessageData): + errors: List[Error] + + +class Message(BaseModel): + id: str + type: MessageType + sender: str + receiver: Optional[str] = None + reply_to: Optional[str] = None # Id of the message this is a reply to + data: Optional[ + Union[ + ContentData, + ContentStreamChunkData, + ContentStreamErrorsData, + ErrorsData, + Dict, + ] + ] = None diff --git a/llmstack/play/output_stream.py b/llmstack/play/output_stream.py index 702c899e776..900e7861178 100644 --- a/llmstack/play/output_stream.py +++ b/llmstack/play/output_stream.py @@ -4,13 +4,21 @@ import asyncio import logging +import uuid from collections import defaultdict -from typing import Any, Dict, Optional, Type +from typing import Any, Dict, Type from pydantic import BaseModel from pykka import ActorProxy, ActorRegistry -from llmstack.common.blocks.base.schema import StrEnum +from llmstack.play.messages import ( + ContentData, + ContentStreamChunkData, + ContentStreamErrorsData, + Error, + Message, + MessageType, +) __all__ = ["OutputStream"] @@ -27,25 +35,25 @@ def 
stitch_model_objects(obj1: Any, obj2: Any) -> Any: Returns: The stitched object. """ - from llmstack.play.actors.agent import AgentOutput - - if isinstance(obj1, dict) and isinstance(obj2, AgentOutput): - return { - **obj1, - **obj2.model_dump(), - **{ - "content": stitch_model_objects( - obj1.get( - "content", - {}, - ), - obj2.model_dump().get( - "content", - {}, - ), - ), - }, - } + # from llmstack.play.actors.agent import AgentOutput + + # if isinstance(obj1, dict) and isinstance(obj2, AgentOutput): + # return { + # **obj1, + # **obj2.model_dump(), + # **{ + # "content": stitch_model_objects( + # obj1.get( + # "content", + # {}, + # ), + # obj2.model_dump().get( + # "content", + # {}, + # ), + # ), + # }, + # } if isinstance(obj1, BaseModel): obj1 = obj1.model_dump() @@ -80,41 +88,40 @@ def stitch_fields( return obj2 if obj2 else obj1 -class MessageType(StrEnum): - """ - MessageType enum. - """ +# class MessageType(StrEnum): +# """ +# MessageType enum. +# """ + +# BEGIN = "begin" +# BOOKKEEPING = ("bookkeeping",) +# BOOKKEEPING_DONE = "bookkeeping_done" +# INPUT = "input" +# TOOL_INVOKE = "tool_invoke" +# AGENT_DONE = "agent_done" +# STREAM = "stream" +# STREAM_CLOSED = "stream_closed" +# STREAM_ERROR = "stream_error" +# STREAM_DATA = "stream_data" +# STREAM_FINALIZED = "stream_finalized" +# STREAM_FINALIZED_ERROR = "stream_finalized_error" +# STREAM_FINALIZED_DATA = "stream_finalized_data" +# STREAM_FINALIZED_CLOSED = "stream_finalized_closed" +# STREAM_FINALIZED_CLOSED_ERROR = "stream_finalized_closed_error" +# STREAM_FINALIZED_CLOSED_DATA = "stream_finalized_closed_data" + + +# class Message(BaseModel): +# message_id: Optional[str] = None +# message_type: MessageType = MessageType.BEGIN +# message_from: Optional[str] = None +# message_to: Optional[str] = None +# response_to: Optional[str] = None # This is used to send a response to a message +# message: Optional[Any] = None + - BEGIN = "begin" - BOOKKEEPING = ("bookkeeping",) - BOOKKEEPING_DONE = "bookkeeping_done" - INPUT = "input" - TOOL_INVOKE = "tool_invoke" - AGENT_DONE = "agent_done" - STREAM = "stream" - STREAM_CLOSED = "stream_closed" - STREAM_ERROR = "stream_error" - STREAM_DATA = "stream_data" - STREAM_FINALIZED = "stream_finalized" - STREAM_FINALIZED_ERROR = "stream_finalized_error" - STREAM_FINALIZED_DATA = "stream_finalized_data" - STREAM_FINALIZED_CLOSED = "stream_finalized_closed" - STREAM_FINALIZED_CLOSED_ERROR = "stream_finalized_closed_error" - STREAM_FINALIZED_CLOSED_DATA = "stream_finalized_closed_data" - - -class Message(BaseModel): - message_id: Optional[str] = None - message_type: MessageType = MessageType.BEGIN - message_from: Optional[str] = None - message_to: Optional[str] = None - response_to: Optional[str] = None # This is used to send a response to a message - message: Optional[Any] = None - template_key: Optional[str] = None - - -class StreamClosedException(Exception): - pass +# class StreamClosedException(Exception): +# pass class OutputStream: @@ -122,9 +129,9 @@ class OutputStream: OutputStream class. """ - class Status: - OPEN = "open" - CLOSED = "closed" + # class Status: + # OPEN = "open" + # CLOSED = "closed" def __init__( self, @@ -135,14 +142,12 @@ def __init__( """ Initializes the OutputStream class. 
""" + self._message_id = str(uuid.uuid4()) self._data = None self._output_cls = output_cls self._stream_id = stream_id self._coordinator_urn = coordinator_urn self._coordinator_proxy = None - self._message_id = None - self._response_to = None - self._status: OutputStream.Status = OutputStream.Status.OPEN @property def _coordinator(self) -> ActorProxy: @@ -155,44 +160,43 @@ def _coordinator(self) -> ActorProxy: self._coordinator_urn, ).proxy() except Exception as e: - logger.error(f"Failed to get coordinator proxy: {e}") + logger.error(f"Failed to get coordinator proxy for {self._coordinator_urn}: {e}") return self._coordinator_proxy - def set_message_id(self, message_id: str) -> None: - """ - Sets the current message id. - """ - self._message_id = message_id + # def set_message_id(self, message_id: str) -> None: + # """ + # Sets the current message id. + # """ + # self._message_id = message_id - def set_response_to(self, response_to: str) -> None: - """ - Sets the response to. - """ - self._response_to = response_to + # def set_response_to(self, response_to: str) -> None: + # """ + # Sets the response to. + # """ + # self._response_to = response_to - async def write(self, data: BaseModel, message_id=None, message_to=None, response_to=None) -> None: + async def write(self, data: Any) -> None: """ Stitches fields from data to _data. """ - if self._status == OutputStream.Status.CLOSED: - raise StreamClosedException("Output stream is closed.") self._coordinator.relay( Message( - message_id=message_id or self._message_id, - message_type=MessageType.STREAM_DATA, - message_from=self._stream_id, - message=( - data.model_dump() - if isinstance( - data, - BaseModel, - ) - else data + id=self._message_id, + type=MessageType.CONTENT_STREAM_CHUNK, + sender=self._stream_id, + receiver="coordinator", + data=ContentStreamChunkData( + chunk=( + data.model_dump() + if isinstance( + data, + BaseModel, + ) + else data + ), ), - message_to=message_to, - response_to=response_to or self._response_to, ), ) @@ -209,22 +213,22 @@ async def write(self, data: BaseModel, message_id=None, message_to=None, respons self._data = stitch_model_objects(self._data, data) await asyncio.sleep(0.0001) - async def write_raw(self, message: Message) -> None: - """ - Writes raw message to the output stream. - """ - if self._status == OutputStream.Status.CLOSED: - raise StreamClosedException("Output stream is closed.") + # async def write_raw(self, message: Message) -> None: + # """ + # Writes raw message to the output stream. + # """ + # if self._status == OutputStream.Status.CLOSED: + # raise StreamClosedException("Output stream is closed.") - self._coordinator.relay(message) + # self._coordinator.relay(message) - await asyncio.sleep(0.0001) + # await asyncio.sleep(0.0001) - def close(self) -> None: - """ - Closes the output stream. - """ - self._status = OutputStream.Status.CLOSED + # def close(self) -> None: + # """ + # Closes the output stream. + # """ + # self._status = OutputStream.Status.CLOSED def get_data(self) -> BaseModel: """ @@ -232,43 +236,53 @@ def get_data(self) -> BaseModel: """ return self._data - def get_status(self) -> Status: - """ - Returns the status. - """ - return self._status + # def get_status(self) -> Status: + # """ + # Returns the status. + # """ + # return self._status def finalize( self, - message_id=None, - message_to=None, - response_to=None, ) -> BaseModel: """ Closes the output stream and returns stitched data. 
""" - output = self._data + output = self._data if not self._output_cls else self._output_cls(**self._data) self._data = None + # Send the end message + self._coordinator.relay( + Message( + id=self._message_id, + type=MessageType.CONTENT_STREAM_END, + sender=self._stream_id, + receiver="coordinator", + ), + ) + + # Send the final data + self._coordinator.relay( Message( - message_id=message_id or self._message_id, - message_to=message_to, - message_type=MessageType.STREAM_CLOSED, - message_from=self._stream_id, - message=( - output.model_dump() - if isinstance( - output, - BaseModel, - ) - else output + id=self._message_id, + type=MessageType.CONTENT, + sender=self._stream_id, + receiver="coordinator", + data=ContentData( + content=( + output.model_dump() + if isinstance( + output, + BaseModel, + ) + else output + ), ), - response_to=response_to or self._response_to, ), ) - return output if not self._output_cls else self._output_cls(**output) + return output def bookkeep(self, data: BaseModel) -> None: """ @@ -276,37 +290,23 @@ def bookkeep(self, data: BaseModel) -> None: """ self._coordinator.relay( Message( - message_type=MessageType.BOOKKEEPING, - message_from=self._stream_id, - message=data.model_dump(), - message_id=self._message_id, + id=self._message_id, + type=MessageType.BOOKKEEPING, + sender=self._stream_id, + receiver="output", + data=data.model_dump(), ), ) - def bookkeep_done(self) -> None: - """ - Bookkeeping done. - """ - try: - self._coordinator.relay( - Message( - message_type=MessageType.BOOKKEEPING_DONE, - message_from=self._stream_id, - message=None, - ), - ) - except Exception as e: - # Coordinator may have already stopped - logger.info(f"Error sending bookkeeping done message: {e}") - def error(self, error: Exception) -> None: """ Error entry. """ self._coordinator.relay( Message( - message_type=MessageType.STREAM_ERROR, - message_from=self._stream_id, - message=str(error), + type=MessageType.ERRORS, + sender=self._stream_id, + receiver="coordinator", + data=ContentStreamErrorsData(errors=[Error(message=str(error))]), ), ) diff --git a/llmstack/play/utils.py b/llmstack/play/utils.py index 43d22054721..0fd048492bd 100644 --- a/llmstack/play/utils.py +++ b/llmstack/play/utils.py @@ -15,7 +15,6 @@ LoopExpression, Nil, ) -from pydantic import BaseModel def extract_nodes(node): @@ -114,59 +113,6 @@ def stop(self): self.condition.notify_all() -def extract_jinja2_variables(input_data): - def extract_from_string(s): - # Define regular expression patterns to match Jinja2 elements, including - # - variables: {{ variable_name }} or {{ variable_name | filter }} - # - tags: {% tag_name %} - variable_pattern = r"{{ *(.*?) *}}" - tag_pattern = r"{% *(.*?) 
*%}" - - variables = set() - - # Find all variable matches - variable_matches = re.findall(variable_pattern, s) - for match in variable_matches: - variables.add(match.strip().split("|")[0].strip()) - - # Find all tag matches - tag_matches = re.findall(tag_pattern, s) - for match in tag_matches: - # Split the tag content by space to determine its structure, - # and add the variable depending on the tag - split_tag = match.strip().split() - if split_tag[0] in {"if", "elif"}: - # In {% if x > y %}, {% if x == y %} or {% if x != y %}, - # extract both 'x' and 'y' as variables - variables.update(re.findall(r"\b\w+\b", split_tag[1])) - elif split_tag[0] == "for": - # In {% for item in items %}, extract 'items' as a variable - if len(split_tag) == 4 and split_tag[2] == "in": - variables.add(split_tag[3]) - - return variables - - variables = set() - - if isinstance(input_data, str): - variables.update(extract_from_string(input_data)) - elif isinstance(input_data, dict): - for key, value in input_data.items(): - if isinstance(value, str): - variables.update(extract_from_string(value)) - elif isinstance(value, dict): - variables.update(extract_jinja2_variables(value)) - elif isinstance(value, list): - variables.update(extract_jinja2_variables(value)) - elif isinstance(input_data, list): - for item in input_data: - variables.update(extract_jinja2_variables(item)) - elif isinstance(input_data, BaseModel): - variables.update(extract_jinja2_variables(input_data.__dict__)) - - return variables - - def extract_variables_from_liquid_template(liquid_template): variables = [] diff --git a/llmstack/processors/providers/api_processor_interface.py b/llmstack/processors/providers/api_processor_interface.py index 14303a6f6f8..8d293ea94d7 100644 --- a/llmstack/processors/providers/api_processor_interface.py +++ b/llmstack/processors/providers/api_processor_interface.py @@ -1,12 +1,13 @@ import logging import time from functools import cache -from typing import Any, Dict, Optional, TypeVar +from typing import Any, Dict, Optional import ujson as json from django import db from pydantic import BaseModel +from llmstack.apps.app_session_utils import get_app_session_data, save_app_session_data from llmstack.apps.schemas import OutputTemplate from llmstack.assets.utils import get_asset_by_objref from llmstack.common.blocks.base.processor import ( @@ -16,19 +17,15 @@ ProcessorInterface, ) from llmstack.common.blocks.base.schema import BaseSchema as _Schema +from llmstack.common.utils.liquid import hydrate_input from llmstack.common.utils.provider_config import get_matched_provider_config -from llmstack.common.utils.utils import hydrate_input from llmstack.play.actor import Actor, BookKeepingData from llmstack.play.actors.agent import ToolInvokeInput -from llmstack.play.utils import extract_jinja2_variables from llmstack.processors.providers.config import ProviderConfig, ProviderConfigSource from llmstack.processors.providers.metrics import MetricType logger = logging.getLogger(__name__) -ConfigurationSchemaType = TypeVar("ConfigurationSchemaType") -InputSchemaType = TypeVar("InputSchemaType") -OutputSchemaType = TypeVar("OutputSchemaType") TEXT_WIDGET_NAME = "output_text" IMAGE_WIDGET_NAME = "output_image" @@ -147,11 +144,10 @@ def __init__( input, config, env, - output_stream=None, + session_id="", + coordinator_urn=None, dependencies=[], - all_dependencies=[], metadata={}, - session_data=None, request=None, id=None, is_tool=False, @@ -159,22 +155,27 @@ def __init__( ): Actor.__init__( self, + id=id, + 
coordinator_urn=coordinator_urn, + output_cls=self._get_output_class(), dependencies=dependencies, - all_dependencies=all_dependencies, ) self._config = self._get_configuration_class()(**config) self._input = self._get_input_class()(**input) + self._config_template = self._get_configuration_class()(**config) + self._input_template = self._get_input_class()(**input) self._env = env - self._id = id - self._output_stream = output_stream + self._session_id = session_id self._is_tool = is_tool self._request = request self._metadata = metadata self._session_enabled = session_enabled self._usage_data = [("promptly/*/*/*", MetricType.INVOCATION, (ProviderConfigSource.PLATFORM_DEFAULT, 1))] - self.process_session_data(session_data if session_enabled else {}) + session_data = get_app_session_data(self._session_id, self._id) + if self._session_enabled: + self.process_session_data(session_data or {}) @classmethod def get_output_schema(cls) -> dict: @@ -311,18 +312,7 @@ def validate_and_process(self) -> str: raise Exception("Invalid result type") def get_bookkeeping_data(self) -> BookKeepingData: - None - - def get_dependencies(self): - # Iterate over string templates in values of input and config and - # extract dependencies - dependencies = [] - dependencies.extend(extract_jinja2_variables(self._input)) - dependencies.extend(extract_jinja2_variables(self._config)) - - # In case of _inputs0.xyz, extract _inputs0 as dependency - dependencies = [x.split(".")[0] for x in dependencies] - return list(set(dependencies)) + return None def input(self, message: Any) -> Any: # Hydrate the input and config before processing @@ -332,19 +322,19 @@ def input(self, message: Any) -> Any: try: self._input = ( hydrate_input( - self._input, + self._input_template, message, ) if message - else self._input + else self._input_template ) self._config = ( hydrate_input( - self._config, + self._config_template, message, ) if self._config and message - else self._config + else self._config_template ) output = self.process() except Exception as e: @@ -381,6 +371,9 @@ def input(self, message: Any) -> Any: if bookkeeping_data: bookkeeping_data.usage_data = self.usage_data() + # Persist session_data + save_app_session_data(self._session_id, self._id, bookkeeping_data.session_data) + self._output_stream.bookkeep(bookkeeping_data) def tool_invoke_input(self, tool_args: dict) -> ToolInvokeInput: @@ -389,23 +382,22 @@ def tool_invoke_input(self, tool_args: dict) -> ToolInvokeInput: ) def invoke(self, message: ToolInvokeInput) -> Any: - self._base_input = self._input try: self._input = ( hydrate_input( - self._input, + self._input_template, {**message.input, **message.tool_args}, ) if message - else self._input + else self._input_template ) self._config = ( hydrate_input( - self._config, + self._config_template, {**message.input, **message.tool_args}, ) - if self._config - else self._config + if self._config_template + else self._config_template ) # Merge tool args with input @@ -444,8 +436,6 @@ def invoke(self, message: ToolInvokeInput) -> Any: self._output_stream.bookkeep(bookkeeping_data) - self._input = self._base_input - def input_stream(self, message: Any) -> Any: # We do not support input stream for this processor pass diff --git a/llmstack/processors/providers/api_processors.py b/llmstack/processors/providers/processors.py similarity index 86% rename from llmstack/processors/providers/api_processors.py rename to llmstack/processors/providers/processors.py index 6cc196a49ee..5172e28708e 100644 --- 
a/llmstack/processors/providers/api_processors.py +++ b/llmstack/processors/providers/processors.py @@ -5,13 +5,13 @@ logger = logging.getLogger(__name__) -class ApiProcessorFactory: +class ProcessorFactory: """ - Factory class for API processors + Factory class for processors """ @staticmethod - def get_api_processor( + def get_processor( processor_slug, provider_slug=None, ) -> ApiProcessorInterface: diff --git a/llmstack/processors/serializers.py b/llmstack/processors/serializers.py index 55d4ccd77b7..53d28d1197e 100644 --- a/llmstack/processors/serializers.py +++ b/llmstack/processors/serializers.py @@ -3,9 +3,9 @@ from rest_framework import serializers -from llmstack.processors.providers.api_processors import ApiProcessorFactory +from llmstack.processors.providers.processors import ProcessorFactory -from .models import ApiBackend, ApiProvider, Endpoint, Feedback, RunEntry +from .models import ApiBackend, ApiProvider, Feedback, RunEntry class ApiProviderSerializer(serializers.ModelSerializer): @@ -25,7 +25,7 @@ class ApiBackendSerializer(serializers.ModelSerializer): output_template = serializers.SerializerMethodField() def get_config_schema(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -34,7 +34,7 @@ def get_config_schema(self, obj): return json.loads(processor_cls.get_configuration_schema()) def get_input_schema(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -43,7 +43,7 @@ def get_input_schema(self, obj): return json.loads(processor_cls.get_input_schema()) def get_output_schema(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -52,7 +52,7 @@ def get_output_schema(self, obj): return json.loads(processor_cls.get_output_schema()) def get_config_ui_schema(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -61,7 +61,7 @@ def get_config_ui_schema(self, obj): return processor_cls.get_configuration_ui_schema() def get_input_ui_schema(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -70,7 +70,7 @@ def get_input_ui_schema(self, obj): return processor_cls.get_input_ui_schema() def get_output_ui_schema(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -79,7 +79,7 @@ def get_output_ui_schema(self, obj): return processor_cls.get_output_ui_schema() def get_output_template(self, obj): - processor_cls = ApiProcessorFactory.get_api_processor( + processor_cls = ProcessorFactory.get_processor( obj.slug, obj.api_provider.slug, ) @@ -107,30 +107,6 @@ class Meta: ] -class EndpointSerializer(serializers.ModelSerializer): - api_backend = ApiBackendSerializer() - - class Meta: - model = Endpoint - fields = [ - "name", - "uuid", - "api_backend", - "param_values", - "post_processor", - "prompt", - "draft", - "is_live", - "parent_uuid", - "description", - "version", - "created_on", - "is_app", - "config", - "input", - ] - - class HistorySerializer(serializers.ModelSerializer): app_detail = serializers.SerializerMethodField() processor_runs = 
serializers.SerializerMethodField()
diff --git a/llmstack/server/asgi.py b/llmstack/server/asgi.py
index c5e4a1612ea..78ef34d3358 100644
--- a/llmstack/server/asgi.py
+++ b/llmstack/server/asgi.py
@@ -30,8 +30,8 @@
     "websocket": AuthMiddlewareStack(
         URLRouter(
             [
-                path("ws/apps/<str:app_id>", AppConsumer.as_asgi()),
-                path("ws/apps/<str:app_id>/<str:preview>", AppConsumer.as_asgi()),
+                path("ws/apps/<str:app_uuid>", AppConsumer.as_asgi()),
+                path("ws/apps/<str:app_uuid>/<str:preview>", AppConsumer.as_asgi()),
                 path("ws/assets//", AssetStreamConsumer.as_asgi()),
                 path(
                     "ws/connections//activate",
diff --git a/llmstack/server/consumers.py b/llmstack/server/consumers.py
index 4d8133a3e22..a2c6ec08719 100644
--- a/llmstack/server/consumers.py
+++ b/llmstack/server/consumers.py
@@ -10,10 +10,17 @@
 from django.conf import settings
 from django.core.exceptions import PermissionDenied
 from django.http import HttpRequest, QueryDict
-from django_ratelimit.exceptions import Ratelimited
+
+# from django_ratelimit.exceptions import Ratelimited
 from flags.state import flag_enabled
 
+from llmstack.apps.runner.app_runner import (
+    AppRunnerRequest,
+    AppRunnerStreamingResponseType,
+    WebAppRunnerSource,
+)
 from llmstack.assets.utils import get_asset_by_objref
+from llmstack.common.utils.utils import get_location
 from llmstack.connections.actors import ConnectionActivationActor
 from llmstack.connections.models import (
     Connection,
@@ -86,15 +93,43 @@ def _build_request_from_input(post_data, scope):
 
 class AppConsumer(AsyncWebsocketConsumer):
     async def connect(self):
-        self.app_id = self.scope["url_route"]["kwargs"]["app_id"]
-        self.preview = True if "preview" in self.scope["url_route"]["kwargs"] else False
-        self._session_id = None
-        self._coordinator_ref = None
+        from llmstack.apps.apis import AppViewSet
+
+        self._app_uuid = self.scope["url_route"]["kwargs"]["app_uuid"]
+        self._preview = True if "preview" in self.scope["url_route"]["kwargs"] else False
+        self._session_id = str(uuid.uuid4())
+
+        headers = dict(self.scope["headers"])
+        request_ip = headers.get(
+            "X-Forwarded-For",
+            self.scope.get("client", [""])[0] or "",
+        ).split(",")[
+            0
+        ].strip() or headers.get("X-Real-IP", "")
+        request_location = headers.get("X-Client-Geo-Location", "")
+        if not request_location:
+            location = get_location(request_ip)
+            request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""
+
+        self._source = WebAppRunnerSource(
+            id=self._session_id,
+            request_ip=request_ip,
+            request_location=request_location,
+            request_user_agent=headers.get("User-Agent", ""),
+            request_content_type=headers.get("Content-Type", ""),
+        )
+        self._app_runner = await AppViewSet().get_app_runner_async(
+            self._session_id,
+            self._app_uuid,
+            self._source,
+            self.scope.get("user", None),
+            self._preview,
+        )
         await self.accept()
 
     async def disconnect(self, close_code):
-        # TODO: Close the stream
-        pass
+        if self._app_runner:
+            await self._app_runner.stop()
 
     async def _run_app(self, request_uuid, request, **kwargs):
         from llmstack.apps.apis import AppViewSet
@@ -108,11 +143,26 @@
         )
 
     async def _respond_to_event(self, text_data):
+        json_data = json.loads(text_data)
+        client_request_id = json_data.get("id", None)
+        app_runner_request = AppRunnerRequest(
+            client_request_id=client_request_id,
+            session_id=self._session_id,
+            input=json_data.get("input", {}),
+        )
+        try:
+            response_iterator = self._app_runner.run(app_runner_request)
+            async for response in response_iterator:
+                if response.type == 
AppRunnerStreamingResponseType.OUTPUT_STREAM_CHUNK: + await self.send(text_data=json.dumps(response.model_dump())) + except Exception as e: + logger.exception(f"Failed to run app: {e}") + + async def _respond_to_event_old(self, text_data): from llmstack.apps.apis import AppViewSet from llmstack.apps.models import AppSessionFiles json_data = json.loads(text_data) - input = json_data.get("input", {}) id = json_data.get("id", None) event = json_data.get("event", None) request_uuid = str(uuid.uuid4()) @@ -123,44 +173,44 @@ async def _respond_to_event(self, text_data): self._user = self.scope.get("user", None) self._session = self.scope.get("session", None) - if event == "run": - try: - request = await _build_request_from_input({"input": input, "stream": True}, self.scope) - if is_ratelimited_fn(request, self._respond_to_event): - raise Ratelimited("Rate limit reached.") - - output_stream, self._coordinator_ref = await self._run_app(request_uuid=request_uuid, request=request) - # Generate a uuid for the response - response_id = str(uuid.uuid4()) - - async for output in output_stream: - if "errors" in output or "session" in output: - if "session" in output: - self._session_id = output["session"]["id"] - await self.send(text_data=json.dumps({**output, **{"reply_to": id}})) - else: - await self.send( - text_data=json.dumps( - {"output": output, "reply_to": id, "id": response_id, "request_id": request_uuid} - ) - ) - - await self.send( - text_data=json.dumps( - {"event": "done", "reply_to": id, "id": response_id, "request_id": request_uuid} - ) - ) - except Ratelimited: - await self.send( - text_data=json.dumps({"event": "ratelimited", "reply_to": id, "request_id": request_uuid}) - ) - except UsageLimitReached: - await self.send( - text_data=json.dumps({"event": "usagelimited", "reply_to": id, "request_id": request_uuid}) - ) - except Exception as e: - logger.exception(e) - await self.send(text_data=json.dumps({"errors": [str(e)], "reply_to": id, "request_id": request_uuid})) + # if event == "run": + # try: + # request = await _build_request_from_input({"input": input, "stream": True}, self.scope) + # if is_ratelimited_fn(request, self._respond_to_event): + # raise Ratelimited("Rate limit reached.") + + # output_stream, self._coordinator_ref = await self._run_app(request_uuid=request_uuid, request=request) + # # Generate a uuid for the response + # response_id = str(uuid.uuid4()) + + # async for output in output_stream: + # if "errors" in output or "session" in output: + # if "session" in output: + # self._session_id = output["session"]["id"] + # await self.send(text_data=json.dumps({**output, **{"reply_to": id}})) + # else: + # await self.send( + # text_data=json.dumps( + # {"output": output, "reply_to": id, "id": response_id, "request_id": request_uuid} + # ) + # ) + + # await self.send( + # text_data=json.dumps( + # {"event": "done", "reply_to": id, "id": response_id, "request_id": request_uuid} + # ) + # ) + # except Ratelimited: + # await self.send( + # text_data=json.dumps({"event": "ratelimited", "reply_to": id, "request_id": request_uuid}) + # ) + # except UsageLimitReached: + # await self.send( + # text_data=json.dumps({"event": "usagelimited", "reply_to": id, "request_id": request_uuid}) + # ) + # except Exception as e: + # logger.exception(e) + # await self.send(text_data=json.dumps({"errors": [str(e)], "reply_to": id, "request_id": request_uuid})) if event == "init": # Create a new session and return the session id diff --git a/poetry.lock b/poetry.lock index 495a885e129..43a43fb7309 
100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aiohttp" @@ -1363,6 +1363,20 @@ wrapt = ">=1.10,<2" [package.extras] dev = ["PyTest", "PyTest-Cov", "bump2version (<1)", "sphinx (<2)", "tox"] +[[package]] +name = "diff-match-patch" +version = "20230430" +description = "Diff Match and Patch" +optional = false +python-versions = ">=3.7" +files = [ + {file = "diff-match-patch-20230430.tar.gz", hash = "sha256:953019cdb9c9d2c9e47b5b12bcff3cf4746fc4598eb406076fa1fc27e6a1f15c"}, + {file = "diff_match_patch-20230430-py3-none-any.whl", hash = "sha256:dce43505fb7b1b317de7195579388df0746d90db07015ed47a85e5e44930ef93"}, +] + +[package.extras] +dev = ["attribution (==1.6.2)", "black (==23.3.0)", "flit (==3.8.0)", "mypy (==1.2.0)", "ufmt (==2.1.0)", "usort (==1.0.6)"] + [[package]] name = "dirtyjson" version = "1.0.8" @@ -4967,6 +4981,7 @@ optional = false python-versions = ">=3.9" files = [ {file = "pandas-2.2.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90c6fca2acf139569e74e8781709dccb6fe25940488755716d1d354d6bc58bce"}, + {file = "pandas-2.2.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c7adfc142dac335d8c1e0dcbd37eb8617eac386596eb9e1a1b77791cf2498238"}, {file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4abfe0be0d7221be4f12552995e58723c7422c80a659da13ca382697de830c08"}, {file = "pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8635c16bf3d99040fdf3ca3db669a7250ddf49c55dc4aa8fe0ae0fa8d6dcc1f0"}, {file = "pandas-2.2.2-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:40ae1dffb3967a52203105a077415a86044a2bea011b5f321c6aa64b379a3f51"}, @@ -4987,6 +5002,7 @@ files = [ {file = "pandas-2.2.2-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:43498c0bdb43d55cb162cdc8c06fac328ccb5d2eabe3cadeb3529ae6f0517c32"}, {file = "pandas-2.2.2-cp312-cp312-win_amd64.whl", hash = "sha256:d187d355ecec3629624fccb01d104da7d7f391db0311145817525281e2804d23"}, {file = "pandas-2.2.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0ca6377b8fca51815f382bd0b697a0814c8bda55115678cbc94c30aacbb6eff2"}, + {file = "pandas-2.2.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9057e6aa78a584bc93a13f0a9bf7e753a5e9770a30b4d758b8d5f2a62a9433cd"}, {file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:001910ad31abc7bf06f49dcc903755d2f7f3a9186c0c040b827e522e9cef0863"}, {file = "pandas-2.2.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:66b479b0bd07204e37583c191535505410daa8df638fd8e75ae1b383851fe921"}, {file = "pandas-2.2.2-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:a77e9d1c386196879aa5eb712e77461aaee433e54c68cf253053a73b7e49c33a"}, @@ -6323,6 +6339,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, 
{file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -9133,4 +9150,4 @@ networking = ["junos-eznc"] [metadata] lock-version = "2.0" python-versions = ">=3.10,<4" -content-hash = "cf015a53a700c0a0ab1b886bad0ef21ac193f7e955b17520975535a59533cb2c" +content-hash = "56d5aa90db79654efa0cbe90a801336e30dd3b50796a5ac0d598716fc61be238" diff --git a/pyproject.toml b/pyproject.toml index 1ad681abcb1..96a976f44e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ pydantic = "^2.7.4" daphne = "^4.1.2" striprtf = "^0.0.26" langrocks = "0.1.4" +diff-match-patch = "^20230430" [tool.poetry.group.faiss.dependencies] faiss-cpu = "^1.8.0"