Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New coordinator initial commit #289

Merged
merged 20 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 62 additions & 95 deletions llmstack/apps/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import uuid

import yaml
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async
from django.conf import settings
from django.core.validators import validate_email
from django.db.models import Q
from django.forms import ValidationError
from django.http import StreamingHttpResponse
from django.shortcuts import get_object_or_404
from django.shortcuts import aget_object_or_404, get_object_or_404
from django.utils.decorators import method_decorator
from django.views.decorators.cache import cache_page
from django.views.decorators.clickjacking import xframe_options_exempt
Expand All @@ -23,15 +24,13 @@
from rest_framework.response import Response as DRFResponse

from llmstack.apps.app_session_utils import create_app_session
from llmstack.apps.handlers.app_processor_runner import AppProcessorRunner
from llmstack.apps.handlers.app_runner_factory import AppRunnerFactory
from llmstack.apps.handlers.playground_runner import PlaygroundRunner
from llmstack.apps.integration_configs import (
DiscordIntegrationConfig,
SlackIntegrationConfig,
TwilioIntegrationConfig,
WebIntegrationConfig,
)
from llmstack.apps.runner.app_runner import AppRunner
from llmstack.apps.yaml_loader import (
get_app_template_by_slug,
get_app_templates_from_contrib,
Expand All @@ -41,7 +40,7 @@
from llmstack.connections.apis import ConnectionsViewSet
from llmstack.emails.sender import EmailSender
from llmstack.emails.templates.factory import EmailTemplateFactory
from llmstack.processors.providers.api_processors import ApiProcessorFactory
from llmstack.processors.providers.processors import ProcessorFactory

from .models import App, AppData, AppHub, AppType, AppVisibility
from .serializers import (
Expand Down Expand Up @@ -512,7 +511,7 @@ def patch(self, request, uid):
)
try:
for processor in processors_data:
processor_cls = ApiProcessorFactory.get_api_processor(
processor_cls = ProcessorFactory.get_api_processor(
processor["processor_slug"], processor["provider_slug"]
)
configuration_cls = processor_cls.get_configuration_cls()
Expand Down Expand Up @@ -708,12 +707,12 @@ def post(self, request):

@action(detail=True, methods=["post"])
@xframe_options_exempt
def run(self, request, uid, session_id=None, platform=None):
def run(self, request, app_uuid, session_id=None, platform=None):
stream = request.data.get("stream", False)
request_uuid = str(uuid.uuid4())
try:
result = self.run_app_internal(
uid,
app_uuid,
session_id,
request_uuid,
request,
Expand Down Expand Up @@ -746,10 +745,9 @@ async def init_app_async(self, uid):
return await database_sync_to_async(self.init_app)(uid)

def init_app(self, uid):
app = get_object_or_404(App, uuid=uuid.UUID(uid))
session_id = str(uuid.uuid4())

create_app_session(app, session_id)
create_app_session(session_id)

return session_id

Expand Down Expand Up @@ -822,12 +820,6 @@ def run_playground_internal(
preview=False,
disable_history=False,
):
from llmstack.apps.handlers.playground_runner import (
PlaygroundApp,
PlaygroundAppType,
)

stream = request.data.get("stream", True)
request_ip = request.headers.get("X-Forwarded-For", request.META.get("REMOTE_ADDR", "")).split(",")[
0
].strip() or request.META.get("HTTP_X_REAL_IP", "")
Expand All @@ -838,9 +830,6 @@ def run_playground_internal(
location = get_location(request_ip)
request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""

request_user_agent = request.META.get("HTTP_USER_AGENT", "")
request_content_type = request.META.get("CONTENT_TYPE", "")

if flag_enabled(
"HAS_EXCEEDED_MONTHLY_PROCESSOR_RUN_QUOTA",
request=request,
Expand All @@ -850,49 +839,9 @@ def run_playground_internal(
"You have exceeded your monthly processor run quota. Please upgrade your plan to continue using the platform.",
)

app_owner_profile = (
Profile.objects.get(user=request.user) if request.user.is_authenticated else AnonymousProfile()
)
owner_connections = get_connections(app_owner_profile) if request.user.is_authenticated else {}
processor_id = request.data["input"]["api_provider_slug"] + "_" + request.data["input"]["api_backend_slug"]
processor_cls = ApiProcessorFactory.get_api_processor(
request.data["input"]["api_backend_slug"],
request.data["input"]["api_provider_slug"],
)

app = PlaygroundApp(
id="",
uuid="ebb44d38-76b8-4735-b33b-5aadfe6470f9",
type=PlaygroundAppType(slug="playground"),
web_integration_config={},
is_published=True,
)
app_runner = PlaygroundRunner(
app=app,
app_data={
"processors": [
{
"id": processor_id,
"processor_slug": request.data["input"]["api_backend_slug"],
"provider_slug": request.data["input"]["api_provider_slug"],
"input": request.data["input"]["input"],
"config": request.data["input"]["config"],
}
],
"output_template": processor_cls.get_output_template(),
},
request_uuid=request_uuid,
request=request,
session_id=session_id,
app_owner=app_owner_profile,
stream=stream,
request_ip=request_ip,
request_location=request_location,
request_user_agent=request_user_agent,
request_content_type=request_content_type,
disable_history=False,
connections=owner_connections,
)
app_runner = None

return app_runner.run_app(processor_id=processor_id)

Expand Down Expand Up @@ -978,9 +927,58 @@ def run_platform_app_internal(

return app_runner.run_app()

async def get_app_runner_async(
self,
session_id,
app_uuid,
source,
request_user,
preview=False,
app_data=None,
):
runner_user = request_user
if not app_data:
app = await aget_object_or_404(App, uuid=uuid.UUID(app_uuid))
app_data_obj = (
await AppData.objects.filter(
app_uuid=app.uuid,
is_draft=preview,
)
.order_by("-created_at")
.afirst()
)
if not app_data_obj:
raise Exception("App data not found")
app_data = app_data_obj.data

if not runner_user:
runner_user = app.owner

app_run_user_profile = await Profile.objects.aget(user=runner_user)
vendor_env = {
"provider_configs": await database_sync_to_async(app_run_user_profile.get_merged_provider_configs)(),
}

return AppRunner(
session_id=session_id,
app_data=app_data,
source=source,
vendor_env=vendor_env,
)

def get_app_runner(self, session_id, app_uuid, source, request_user, preview=False, app_data=None):
return async_to_sync(self.get_app_runner_async)(
session_id,
app_uuid,
source,
request_user,
preview,
app_data,
)

def run_app_internal(
self,
uid,
app_uuid,
session_id,
request_uuid,
request,
Expand All @@ -991,12 +989,12 @@ def run_app_internal(
app_store_app_data=None,
):
app = (
get_object_or_404(App, uuid=uuid.UUID(uid))
get_object_or_404(App, uuid=uuid.UUID(app_uuid))
if not app_store_app_data
else App(
name=app_store_app_data.get("name", ""),
store_uuid=app_store_uuid,
uuid=uid if uid else app_store_uuid,
uuid=app_uuid if app_uuid else app_store_uuid,
owner=request.user,
type=AppType(slug=app_store_app_data.get("type_slug", "agent")),
is_published=True,
Expand Down Expand Up @@ -1069,16 +1067,6 @@ def run_app_internal(
)

app_runner_class = None
if platform == "discord":
app_runner_class = AppRunnerFactory.get_app_runner("discord")
elif platform == "slack":
app_runner_class = AppRunnerFactory.get_app_runner("slack")
elif platform == "twilio-sms":
app_runner_class = AppRunnerFactory.get_app_runner("twilio-sms")
elif platform == "twilio-voice":
app_runner_class = AppRunnerFactory.get_app_runner("twilio-voice")
else:
app_runner_class = AppRunnerFactory.get_app_runner(app.type.slug)

app_runner = app_runner_class(
app=app,
Expand Down Expand Up @@ -1110,9 +1098,6 @@ def run_processor_internal(
disable_history=False,
):
app = get_object_or_404(App, uuid=uuid.UUID(uid))
app_owner = get_object_or_404(Profile, user=app.owner)

stream = request.data.get("stream", False)

request_ip = request.headers.get(
"X-Forwarded-For",
Expand All @@ -1133,9 +1118,6 @@ def run_processor_internal(
location = get_location(request_ip)
request_location = f"{location.get('city', '')}, {location.get('country_code', '')}" if location else ""

request_user_agent = request.META.get("HTTP_USER_AGENT", "")
request_content_type = request.META.get("CONTENT_TYPE", "")

if flag_enabled(
"HAS_EXCEEDED_MONTHLY_PROCESSOR_RUN_QUOTA",
request=request,
Expand Down Expand Up @@ -1165,22 +1147,7 @@ def run_processor_internal(
.first()
)

owner_connections = get_connections(app_owner)
app_runner = AppProcessorRunner(
app=app,
app_data=app_data_obj.data if app_data_obj else None,
request_uuid=request_uuid,
request=request,
session_id=session_id,
app_owner=app_owner,
stream=stream,
request_ip=request_ip,
request_location=request_location,
request_user_agent=request_user_agent,
request_content_type=request_content_type,
disable_history=disable_history,
connections=owner_connections,
)
app_runner = None

return app_runner.run_app(processor_id=processor_id)

Expand Down
Loading
Loading