Skip to content

Commit

Permalink
Make playground work with new coordinator model
Browse files Browse the repository at this point in the history
  • Loading branch information
vegito22 committed Oct 17, 2024
1 parent 76cbb23 commit 526ce79
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 18 deletions.
50 changes: 50 additions & 0 deletions llmstack/apps/apis.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import os
import uuid
Expand Down Expand Up @@ -1240,3 +1241,52 @@ def run_with_app_data(self, request):
)

return DRFResponse(data=response, status=200)


class PlaygroundViewSet(viewsets.ViewSet):
async def get_app_runner_async(self, session_id, source, request_user, input_data, config_data):
runner_user = request_user
processor_slug = source.processor_slug
provider_slug = source.provider_slug
app_run_user_profile = await Profile.objects.aget(user=runner_user)

vendor_env = {
"provider_configs": await database_sync_to_async(app_run_user_profile.get_merged_provider_configs)(),
}

processor_cls = ProcessorFactory.get_processor(processor_slug=processor_slug, provider_slug=provider_slug)
input_schema = json.loads(processor_cls.get_input_schema())
input_fields = []
for property in input_schema["properties"]:
input_fields.append({"name": property, "type": input_schema["properties"][property]["type"]})

app_data = {
"name": f"Processor {provider_slug}_{processor_slug}",
"config": {},
"type_slug": "",
"processors": [
{
"id": "processor",
"name": processor_cls.name(),
"input": input_data,
"config": config_data,
"description": processor_cls.description(),
"dependencies": ["input"],
"provider_slug": provider_slug,
"processor_slug": processor_slug,
"output_template": processor_cls.get_output_template().model_dump(),
}
],
"description": "",
"input_fields": input_fields,
"output_template": {"markdown": "{{processor}}"},
}
return AppRunner(
session_id=session_id,
app_data=app_data,
source=source,
vendor_env=vendor_env,
)

def get_app_runner(self, session_id, source, request_user, input_data, config_data):
return async_to_sync(self.get_app_runner_async)(session_id, source, request_user, input_data, config_data)
10 changes: 10 additions & 0 deletions llmstack/apps/runner/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ def id(self):
return self.app_uuid


class PlaygroundAppRunnerSource(AppRunnerSource):
type: AppRunnerSourceType = AppRunnerSourceType.PLAYGROUND
provider_slug: str
processor_slug: str

@property
def id(self):
return f"{self.provider_slug}-{self.processor_slug}"


class PlatformAppRunnerSource(AppRunnerSource):
type: AppRunnerSourceType = AppRunnerSourceType.PLATFORM
slug: str
Expand Down
105 changes: 87 additions & 18 deletions llmstack/server/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from llmstack.apps.runner.app_runner import (
AppRunnerRequest,
AppRunnerStreamingResponseType,
PlaygroundAppRunnerSource,
WebAppRunnerSource,
)
from llmstack.assets.utils import get_asset_by_objref
Expand Down Expand Up @@ -427,32 +428,100 @@ async def receive(self, text_data=None, bytes_data=None):
self.disconnect(1000)


class PlaygroundConsumer(AppConsumer):
# class PlaygroundConsumer(AppConsumer):
# async def connect(self):
# self.app_id = None
# self.preview = False
# self._session_id = None
# self._coordinator_ref = None
# await self.accept()

# async def _run_app(self, request_uuid, request, **kwargs):
# from llmstack.apps.apis import AppViewSet

# if is_usage_limited_fn(request, self._run_app):
# raise UsageLimitReached("Usage limit reached. Please login to continue.")

# if await _usage_limit_exceeded(request, request.user):
# raise OutOfCredits(
# "You have exceeded your usage credits. Please add credits to your account from settings to continue using the platform.",
# )

# return await AppViewSet().run_playground_internal_async(
# session_id=self._session_id,
# request_uuid=request_uuid,
# request=request,
# preview=self.preview,
# )


class PlaygroundConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.app_id = None
self.preview = False
self._session_id = None
self._coordinator_ref = None
headers = dict(self.scope["headers"])
request_ip = headers.get("X-Forwarded-For", self.scope.get("client", [""])[0] or "").split(",")[
0
].strip() or headers.get("X-Real-IP", "")
request_location = headers.get("X-Client-Geo-Location", "")
if not request_location:
location = get_location(request_ip)
request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""

request_user_email = self.scope.get("user", None).email if self.scope.get("user", None) else None

self._source = PlaygroundAppRunnerSource(
request_ip=request_ip,
request_location=request_location,
request_user_agent=headers.get("User-Agent", ""),
request_content_type=headers.get("Content-Type", ""),
request_user_email=request_user_email,
processor_slug="",
provider_slug="",
)
await self.accept()

async def _run_app(self, request_uuid, request, **kwargs):
from llmstack.apps.apis import AppViewSet
async def disconnect(self, close_code):
pass

if is_usage_limited_fn(request, self._run_app):
raise UsageLimitReached("Usage limit reached. Please login to continue.")
async def _respond_to_event(self, text_data):
from llmstack.apps.apis import PlaygroundViewSet

if await _usage_limit_exceeded(request, request.user):
raise OutOfCredits(
"You have exceeded your usage credits. Please add credits to your account from settings to continue using the platform.",
)
json_data = json.loads(text_data)
event_input = json_data.get("input", {})

processor_slug = event_input.get("api_backend_slug")
provider_slug = event_input.get("api_provider_slug")
input_data = event_input.get("input", {})
config_data = event_input.get("config", {})

session_id = str(uuid.uuid4())
source = self._source.model_copy(
update={
"session_id": session_id,
"processor_slug": processor_slug,
"provider_slug": provider_slug,
}
)

return await AppViewSet().run_playground_internal_async(
session_id=self._session_id,
request_uuid=request_uuid,
request=request,
preview=self.preview,
client_request_id = json_data.get("id", None)
app_runner_request = AppRunnerRequest(
client_request_id=client_request_id, session_id=session_id, input=input_data
)

app_runner = await PlaygroundViewSet().get_app_runner_async(
session_id, source, self.scope.get("user", None), input_data, config_data
)
try:
response_iterator = app_runner.run(app_runner_request)
async for response in response_iterator:
if response.type == AppRunnerStreamingResponseType.OUTPUT_STREAM_CHUNK:
await self.send(text_data=json.dumps(response.model_dump()))
except Exception as e:
logger.exception(f"Failed to run app: {e}")
await app_runner.stop()

async def receive(self, text_data):
run_coro_in_new_loop(self._respond_to_event(text_data))


class AppStoreAppConsumer(AppConsumer):
async def _run_app(self, request_uuid, request, **kwargs):
Expand Down

0 comments on commit 526ce79

Please sign in to comment.