Skip to content

Commit

Permalink
New coordinator initial commit (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajhai authored Oct 16, 2024
1 parent 43494ef commit 614b493
Show file tree
Hide file tree
Showing 37 changed files with 1,192 additions and 929 deletions.
157 changes: 62 additions & 95 deletions llmstack/apps/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import uuid

import yaml
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async
from django.conf import settings
from django.core.validators import validate_email
from django.db.models import Q
from django.forms import ValidationError
from django.http import StreamingHttpResponse
from django.shortcuts import get_object_or_404
from django.shortcuts import aget_object_or_404, get_object_or_404
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_page
from django.views.decorators.clickjacking import xframe_options_exempt
Expand All @@ -23,15 +24,13 @@
from rest_framework.response import Response as DRFResponse

from llmstack.apps.app_session_utils import create_app_session
from llmstack.apps.handlers.app_processor_runner import AppProcessorRunner
from llmstack.apps.handlers.app_runner_factory import AppRunnerFactory
from llmstack.apps.handlers.playground_runner import PlaygroundRunner
from llmstack.apps.integration_configs import (
DiscordIntegrationConfig,
SlackIntegrationConfig,
TwilioIntegrationConfig,
WebIntegrationConfig,
)
from llmstack.apps.runner.app_runner import AppRunner
from llmstack.apps.yaml_loader import (
get_app_template_by_slug,
get_app_templates_from_contrib,
Expand All @@ -41,7 +40,7 @@
from llmstack.connections.apis import ConnectionsViewSet
from llmstack.emails.sender import EmailSender
from llmstack.emails.templates.factory import EmailTemplateFactory
from llmstack.processors.providers.api_processors import ApiProcessorFactory
from llmstack.processors.providers.processors import ProcessorFactory

from .models import App, AppData, AppHub, AppType, AppVisibility
from .serializers import (
Expand Down Expand Up @@ -512,7 +511,7 @@ def patch(self, request, uid):
)
try:
for processor in processors_data:
processor_cls = ApiProcessorFactory.get_api_processor(
processor_cls = ProcessorFactory.get_api_processor(
processor["processor_slug"], processor["provider_slug"]
)
configuration_cls = processor_cls.get_configuration_cls()
Expand Down Expand Up @@ -708,12 +707,12 @@ def post(self, request):

@action(detail=True, methods=["post"])
@xframe_options_exempt
def run(self, request, uid, session_id=None, platform=None):
def run(self, request, app_uuid, session_id=None, platform=None):
stream = request.data.get("stream", False)
request_uuid = str(uuid.uuid4())
try:
result = self.run_app_internal(
uid,
app_uuid,
session_id,
request_uuid,
request,
Expand Down Expand Up @@ -746,10 +745,9 @@ async def init_app_async(self, uid):
return await database_sync_to_async(self.init_app)(uid)

def init_app(self, uid):
app = get_object_or_404(App, uuid=uuid.UUID(uid))
session_id = str(uuid.uuid4())

create_app_session(app, session_id)
create_app_session(session_id)

return session_id

Expand Down Expand Up @@ -822,12 +820,6 @@ def run_playground_internal(
preview=False,
disable_history=False,
):
from llmstack.apps.handlers.playground_runner import (
PlaygroundApp,
PlaygroundAppType,
)

stream = request.data.get("stream", True)
request_ip = request.headers.get("X-Forwarded-For", request.META.get("REMOTE_ADDR", "")).split(",")[
0
].strip() or request.META.get("HTTP_X_REAL_IP", "")
Expand All @@ -838,9 +830,6 @@ def run_playground_internal(
location = get_location(request_ip)
request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""

request_user_agent = request.META.get("HTTP_USER_AGENT", "")
request_content_type = request.META.get("CONTENT_TYPE", "")

if flag_enabled(
"HAS_EXCEEDED_MONTHLY_PROCESSOR_RUN_QUOTA",
request=request,
Expand All @@ -850,49 +839,9 @@ def run_playground_internal(
"You have exceeded your monthly processor run quota. Please upgrade your plan to continue using the platform.",
)

app_owner_profile = (
Profile.objects.get(user=request.user) if request.user.is_authenticated else AnonymousProfile()
)
owner_connections = get_connections(app_owner_profile) if request.user.is_authenticated else {}
processor_id = request.data["input"]["api_provider_slug"] + "_" + request.data["input"]["api_backend_slug"]
processor_cls = ApiProcessorFactory.get_api_processor(
request.data["input"]["api_backend_slug"],
request.data["input"]["api_provider_slug"],
)

app = PlaygroundApp(
id="",
uuid="ebb44d38-76b8-4735-b33b-5aadfe6470f9",
type=PlaygroundAppType(slug="playground"),
web_integration_config={},
is_published=True,
)
app_runner = PlaygroundRunner(
app=app,
app_data={
"processors": [
{
"id": processor_id,
"processor_slug": request.data["input"]["api_backend_slug"],
"provider_slug": request.data["input"]["api_provider_slug"],
"input": request.data["input"]["input"],
"config": request.data["input"]["config"],
}
],
"output_template": processor_cls.get_output_template(),
},
request_uuid=request_uuid,
request=request,
session_id=session_id,
app_owner=app_owner_profile,
stream=stream,
request_ip=request_ip,
request_location=request_location,
request_user_agent=request_user_agent,
request_content_type=request_content_type,
disable_history=False,
connections=owner_connections,
)
app_runner = None

return app_runner.run_app(processor_id=processor_id)

Expand Down Expand Up @@ -978,9 +927,58 @@ def run_platform_app_internal(

return app_runner.run_app()

async def get_app_runner_async(
self,
session_id,
app_uuid,
source,
request_user,
preview=False,
app_data=None,
):
runner_user = request_user
if not app_data:
app = await aget_object_or_404(App, uuid=uuid.UUID(app_uuid))
app_data_obj = (
await AppData.objects.filter(
app_uuid=app.uuid,
is_draft=preview,
)
.order_by("-created_at")
.afirst()
)
if not app_data_obj:
raise Exception("App data not found")
app_data = app_data_obj.data

if not runner_user:
runner_user = app.owner

app_run_user_profile = await Profile.objects.aget(user=runner_user)
vendor_env = {
"provider_configs": await database_sync_to_async(app_run_user_profile.get_merged_provider_configs)(),
}

return AppRunner(
session_id=session_id,
app_data=app_data,
source=source,
vendor_env=vendor_env,
)

def get_app_runner(self, session_id, app_uuid, source, request_user, preview=False, app_data=None):
return async_to_sync(self.get_app_runner_async)(
session_id,
app_uuid,
source,
request_user,
preview,
app_data,
)

def run_app_internal(
self,
uid,
app_uuid,
session_id,
request_uuid,
request,
Expand All @@ -991,12 +989,12 @@ def run_app_internal(
app_store_app_data=None,
):
app = (
get_object_or_404(App, uuid=uuid.UUID(uid))
get_object_or_404(App, uuid=uuid.UUID(app_uuid))
if not app_store_app_data
else App(
name=app_store_app_data.get("name", ""),
store_uuid=app_store_uuid,
uuid=uid if uid else app_store_uuid,
uuid=app_uuid if app_uuid else app_store_uuid,
owner=request.user,
type=AppType(slug=app_store_app_data.get("type_slug", "agent")),
is_published=True,
Expand Down Expand Up @@ -1069,16 +1067,6 @@ def run_app_internal(
)

app_runner_class = None
if platform == "discord":
app_runner_class = AppRunnerFactory.get_app_runner("discord")
elif platform == "slack":
app_runner_class = AppRunnerFactory.get_app_runner("slack")
elif platform == "twilio-sms":
app_runner_class = AppRunnerFactory.get_app_runner("twilio-sms")
elif platform == "twilio-voice":
app_runner_class = AppRunnerFactory.get_app_runner("twilio-voice")
else:
app_runner_class = AppRunnerFactory.get_app_runner(app.type.slug)

app_runner = app_runner_class(
app=app,
Expand Down Expand Up @@ -1110,9 +1098,6 @@ def run_processor_internal(
disable_history=False,
):
app = get_object_or_404(App, uuid=uuid.UUID(uid))
app_owner = get_object_or_404(Profile, user=app.owner)

stream = request.data.get("stream", False)

request_ip = request.headers.get(
"X-Forwarded-For",
Expand All @@ -1133,9 +1118,6 @@ def run_processor_internal(
location = get_location(request_ip)
request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""

request_user_agent = request.META.get("HTTP_USER_AGENT", "")
request_content_type = request.META.get("CONTENT_TYPE", "")

if flag_enabled(
"HAS_EXCEEDED_MONTHLY_PROCESSOR_RUN_QUOTA",
request=request,
Expand Down Expand Up @@ -1165,22 +1147,7 @@ def run_processor_internal(
.first()
)

owner_connections = get_connections(app_owner)
app_runner = AppProcessorRunner(
app=app,
app_data=app_data_obj.data if app_data_obj else None,
request_uuid=request_uuid,
request=request,
session_id=session_id,
app_owner=app_owner,
stream=stream,
request_ip=request_ip,
request_location=request_location,
request_user_agent=request_user_agent,
request_content_type=request_content_type,
disable_history=disable_history,
connections=owner_connections,
)
app_runner = None

return app_runner.run_app(processor_id=processor_id)

Expand Down
Loading

0 comments on commit 614b493

Please sign in to comment.