From 0c4d460e42d12775250163a5a27aaa0b171db389 Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Tue, 10 Dec 2024 22:11:21 -0700 Subject: [PATCH] chore: tasks --- apiclient/invoke.go | 3 + apiclient/types/agent.go | 1 + apiclient/types/cronjob.go | 1 - apiclient/types/emailreceiver.go | 1 - apiclient/types/invoke.go | 4 + apiclient/types/tables.go | 20 + apiclient/types/tasks.go | 1 + apiclient/types/thread.go | 23 +- apiclient/types/webhook.go | 1 - apiclient/types/workflow.go | 107 +---- apiclient/types/workflowexecution.go | 12 + apiclient/types/zz_generated.deepcopy.go | 107 ++++- apiclient/workflow.go | 36 +- pkg/api/handlers/tables.go | 120 ++++++ pkg/api/handlers/tasks.go | 192 ++++++--- pkg/api/handlers/threads.go | 67 ++++ pkg/api/handlers/webhooks.go | 34 +- pkg/api/handlers/workflows.go | 21 + pkg/api/router/router.go | 10 + pkg/cli/tools_register.go | 2 +- pkg/controller/data/otto.yaml | 2 + pkg/controller/handlers/cronjob/cronjob.go | 2 +- pkg/controller/handlers/workflow/files.go | 10 + pkg/controller/handlers/workflow/workflow.go | 25 -- .../workflowexecution/workflowexecution.go | 22 + pkg/controller/routes.go | 4 +- pkg/events/events.go | 21 +- pkg/invoke/invoker.go | 19 +- pkg/invoke/step.go | 4 + pkg/invoke/workflow.go | 6 +- pkg/render/render.go | 90 ++--- pkg/render/workflow.go | 24 +- pkg/smtp/smtp.go | 52 ++- pkg/storage/apis/otto.otto8.ai/v1/agent.go | 2 + pkg/storage/apis/otto.otto8.ai/v1/cronjob.go | 7 +- .../apis/otto.otto8.ai/v1/emailaddress.go | 7 +- pkg/storage/apis/otto.otto8.ai/v1/thread.go | 1 + pkg/storage/apis/otto.otto8.ai/v1/webhook.go | 7 +- pkg/storage/apis/otto.otto8.ai/v1/workflow.go | 27 +- .../otto.otto8.ai/v1/workflowexecution.go | 17 +- .../apis/otto.otto8.ai/v1/workspace.go | 4 +- .../otto.otto8.ai/v1/zz_generated.deepcopy.go | 15 + .../openapi/generated/openapi_generated.go | 346 ++++++++++++---- pkg/system/tools.go | 1 + ui/user/src/lib/actions/textarea.ts | 6 +- ui/user/src/lib/components/Editors.svelte | 22 +- ui/user/src/lib/components/Messages.svelte | 58 --- ui/user/src/lib/components/Navbar.svelte | 40 +- ui/user/src/lib/components/Thread.svelte | 72 ++++ .../lib/components/editor/Codemirror.svelte | 4 +- .../src/lib/components/editor/Milkdown.svelte | 2 +- .../src/lib/components/editor/Table.svelte | 84 ++++ .../src/lib/components/messages/Input.svelte | 36 +- .../lib/components/messages/Message.svelte | 100 +++-- .../components/messages/MessageIcon.svelte | 2 +- ui/user/src/lib/components/navbar/Menu.svelte | 48 ++- .../src/lib/components/navbar/Profile.svelte | 113 ++---- .../src/lib/components/navbar/Tables.svelte | 50 +++ .../src/lib/components/navbar/Tasks.svelte | 4 +- .../src/lib/components/tasks/Dropdown.svelte | 78 ++-- ui/user/src/lib/components/tasks/Input.svelte | 162 ++++++++ .../src/lib/components/tasks/OnDemand.svelte | 81 ++-- ui/user/src/lib/components/tasks/Runs.svelte | 156 ++++++-- .../src/lib/components/tasks/Schedule.svelte | 2 +- ui/user/src/lib/components/tasks/Step.svelte | 377 ++++++------------ ui/user/src/lib/components/tasks/Steps.svelte | 165 +++++--- ui/user/src/lib/components/tasks/Table.svelte | 24 +- ui/user/src/lib/components/tasks/Task.svelte | 90 ++++- .../src/lib/components/tasks/Trigger.svelte | 99 +---- ui/user/src/lib/components/tasks/Type.svelte | 133 ++++++ ui/user/src/lib/services/chat/index.ts | 2 +- ui/user/src/lib/services/chat/messages.ts | 24 +- .../src/lib/services/chat/messagesource.ts | 57 --- ui/user/src/lib/services/chat/operations.ts | 44 +- .../src/lib/services/chat/thread.svelte.ts | 133 ++++++ ui/user/src/lib/services/chat/types.ts | 26 +- .../src/lib/services/editor/index.svelte.ts | 72 ++-- ui/user/src/lib/stores/assistants.ts | 29 +- ui/user/src/lib/stores/currentassistant.ts | 44 +- ui/user/src/lib/stores/editor.svelte.ts | 2 + ui/user/src/lib/stores/index.ts | 4 +- ui/user/src/lib/stores/preferredtheme.ts | 8 +- ui/user/src/lib/stores/profile.ts | 10 +- ui/user/src/lib/stores/stepmessages.svelte.ts | 60 --- ui/user/src/lib/stores/storeinit.ts | 61 --- ui/user/src/lib/stores/tasks.svelte.ts | 7 +- ui/user/src/lib/stores/theme.ts | 10 +- ui/user/src/lib/stores/tools.ts | 13 +- ui/user/src/lib/time.ts | 22 + ui/user/src/routes/[agent]/+page.svelte | 44 +- 90 files changed, 2616 insertions(+), 1542 deletions(-) create mode 100644 apiclient/types/tables.go create mode 100644 apiclient/types/workflowexecution.go create mode 100644 pkg/api/handlers/tables.go delete mode 100644 ui/user/src/lib/components/Messages.svelte create mode 100644 ui/user/src/lib/components/Thread.svelte create mode 100644 ui/user/src/lib/components/editor/Table.svelte create mode 100644 ui/user/src/lib/components/navbar/Tables.svelte create mode 100644 ui/user/src/lib/components/tasks/Input.svelte create mode 100644 ui/user/src/lib/components/tasks/Type.svelte delete mode 100644 ui/user/src/lib/services/chat/messagesource.ts create mode 100644 ui/user/src/lib/services/chat/thread.svelte.ts delete mode 100644 ui/user/src/lib/stores/stepmessages.svelte.ts delete mode 100644 ui/user/src/lib/stores/storeinit.ts create mode 100644 ui/user/src/lib/time.ts diff --git a/apiclient/invoke.go b/apiclient/invoke.go index 689a77ca..3c9c1b1b 100644 --- a/apiclient/invoke.go +++ b/apiclient/invoke.go @@ -35,7 +35,10 @@ func (c *Client) Invoke(ctx context.Context, agentID string, input string, opts if opts.Async { _, _ = io.Copy(io.Discard, resp.Body) resp.Body.Close() + events := make(chan types.Progress) + close(events) return &types.InvokeResponse{ + Events: events, ThreadID: resp.Header.Get("X-Otto-Thread-Id"), }, nil } diff --git a/apiclient/types/agent.go b/apiclient/types/agent.go index e8d5d6dd..256c82be 100644 --- a/apiclient/types/agent.go +++ b/apiclient/types/agent.go @@ -43,6 +43,7 @@ type AgentManifest struct { MaxThreadTools int `json:"maxThreadTools"` Params map[string]string `json:"params"` Model string `json:"model"` + Env []EnvVar `json:"env"` } func (m AgentManifest) GetParams() *openapi3.Schema { diff --git a/apiclient/types/cronjob.go b/apiclient/types/cronjob.go index 0aafdbd6..ebedb6d9 100644 --- a/apiclient/types/cronjob.go +++ b/apiclient/types/cronjob.go @@ -13,7 +13,6 @@ type CronJobManifest struct { Schedule string `json:"schedule,omitempty"` Workflow string `json:"workflow,omitempty"` Input string `json:"input,omitempty"` - UserID string `json:"userID,omitempty"` TaskSchedule *Schedule `json:"taskSchedule,omitempty"` } diff --git a/apiclient/types/emailreceiver.go b/apiclient/types/emailreceiver.go index 366e2f63..b709b298 100644 --- a/apiclient/types/emailreceiver.go +++ b/apiclient/types/emailreceiver.go @@ -11,7 +11,6 @@ type EmailReceiverManifest struct { Name string `json:"name"` Description string `json:"description"` User string `json:"user,omitempty"` - UserID string `json:"userID,omitempty"` Workflow string `json:"workflow"` AllowedSenders []string `json:"allowedSenders,omitempty"` } diff --git a/apiclient/types/invoke.go b/apiclient/types/invoke.go index ac7a8be6..ba5136d7 100644 --- a/apiclient/types/invoke.go +++ b/apiclient/types/invoke.go @@ -18,6 +18,9 @@ type Progress struct { // If RunID is not populated, the event will not specify tied to any particular run RunID string `json:"runID,omitempty"` + // ParentRunID is the parent run of the run that is specified in the RunID field + ParentRunID string `json:"parentRunID,omitempty"` + // Time is the time the event was generated Time *Time `json:"time,omitempty"` @@ -92,6 +95,7 @@ type ToolCall struct { Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` Input string `json:"input,omitempty"` + Output string `json:"output,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` } diff --git a/apiclient/types/tables.go b/apiclient/types/tables.go new file mode 100644 index 00000000..a2648587 --- /dev/null +++ b/apiclient/types/tables.go @@ -0,0 +1,20 @@ +package types + +type Table struct { + Name string `json:"name"` +} + +type TableList List[Table] + +// +k8s:deepcopy-gen=false + +// +k8s:openapi-gen=false +type TableRow struct { + Columns []string `json:"columns,omitempty"` + Values map[string]any `json:"values"` +} + +// +k8s:deepcopy-gen=false + +// +k8s:openapi-gen=false +type TableRowList List[TableRow] diff --git a/apiclient/types/tasks.go b/apiclient/types/tasks.go index e9f49434..135c8006 100644 --- a/apiclient/types/tasks.go +++ b/apiclient/types/tasks.go @@ -52,6 +52,7 @@ type TaskIf struct { type TaskRun struct { Metadata TaskID string `json:"taskID,omitempty"` + Input string `json:"input,omitempty"` Task TaskManifest `json:"task,omitempty"` StartTime *Time `json:"startTime,omitempty"` EndTime *Time `json:"endTime,omitempty"` diff --git a/apiclient/types/thread.go b/apiclient/types/thread.go index e4db8594..f9bba3ba 100644 --- a/apiclient/types/thread.go +++ b/apiclient/types/thread.go @@ -22,17 +22,18 @@ func (in WorkflowState) IsTerminal() bool { type Thread struct { Metadata ThreadManifest - AgentID string `json:"agentID,omitempty"` - WorkflowID string `json:"workflowID,omitempty"` - WebhookID string `json:"webhookID,omitempty"` - EmailReceiverID string `json:"emailReceiverID,omitempty"` - State string `json:"state,omitempty"` - LastRunID string `json:"lastRunID,omitempty"` - CurrentRunID string `json:"currentRunID,omitempty"` - ParentThreadID string `json:"parentThreadID,omitempty"` - UserID string `json:"userID,omitempty"` - AgentAlias string `json:"agentAlias,omitempty"` - Abort bool `json:"abort,omitempty"` + AgentID string `json:"agentID,omitempty"` + WorkflowID string `json:"workflowID,omitempty"` + WebhookID string `json:"webhookID,omitempty"` + EmailReceiverID string `json:"emailReceiverID,omitempty"` + State string `json:"state,omitempty"` + LastRunID string `json:"lastRunID,omitempty"` + CurrentRunID string `json:"currentRunID,omitempty"` + ParentThreadID string `json:"parentThreadID,omitempty"` + UserID string `json:"userID,omitempty"` + AgentAlias string `json:"agentAlias,omitempty"` + Abort bool `json:"abort,omitempty"` + Env []string `json:"env,omitempty"` } type ThreadList List[Thread] diff --git a/apiclient/types/webhook.go b/apiclient/types/webhook.go index 4fd07880..abf95b27 100644 --- a/apiclient/types/webhook.go +++ b/apiclient/types/webhook.go @@ -16,7 +16,6 @@ type WebhookManifest struct { Headers []string `json:"headers"` Secret string `json:"secret"` ValidationHeader string `json:"validationHeader"` - UserID string `json:"userID,omitempty"` } type WebhookList List[Webhook] diff --git a/apiclient/types/workflow.go b/apiclient/types/workflow.go index 3fe0dbfe..a351101c 100644 --- a/apiclient/types/workflow.go +++ b/apiclient/types/workflow.go @@ -14,13 +14,11 @@ type WorkflowList List[Workflow] type WorkflowManifest struct { AgentManifest `json:",inline"` - Credentials []string `json:"credentials"` - Env []WorkflowEnv `json:"env"` - Steps []Step `json:"steps"` - Output string `json:"output"` + Steps []Step `json:"steps"` + Output string `json:"output"` } -type WorkflowEnv struct { +type EnvVar struct { Name string `json:"name"` Value string `json:"value"` Description string `json:"description"` @@ -86,15 +84,15 @@ func (s Step) Display() string { } if s.While != nil { preamble.WriteString(" while ") - preamble.WriteString(oneline(s.While.Condition)) + preamble.WriteString(oneLine(s.While.Condition)) } if s.If != nil { preamble.WriteString(" if ") - preamble.WriteString(oneline(s.If.Condition)) + preamble.WriteString(oneLine(s.If.Condition)) } if s.Step != "" { preamble.WriteString(" ") - preamble.WriteString(oneline(s.Step)) + preamble.WriteString(oneLine(s.Step)) } return preamble.String() } @@ -116,7 +114,7 @@ type While struct { Steps []Step `json:"steps,omitempty"` } -func oneline(s string) string { +func oneLine(s string) string { l := strings.Split(s, "\n")[0] if len(l) > 80 { return l[:80] + "..." @@ -124,97 +122,6 @@ func oneline(s string) string { return l } -func DeleteStep(manifest *WorkflowManifest, id string) *WorkflowManifest { - if manifest == nil || id == "" { - return nil - } - - result := manifest.DeepCopy() - lookupID, _, _ := strings.Cut(id, "{") - result.Steps = deleteStep(manifest.Steps, lookupID) - return result -} - -func deleteStep(steps []Step, id string) []Step { - newSteps := make([]Step, 0, len(steps)) - for _, step := range steps { - if step.ID != id { - if step.While != nil { - step.While.Steps = deleteStep(step.While.Steps, id) - } - if step.If != nil { - step.If.Steps = deleteStep(step.If.Steps, id) - step.If.Else = deleteStep(step.If.Else, id) - } - newSteps = append(newSteps, step) - } - } - return newSteps -} - -func AppendStep(manifest *WorkflowManifest, parentID string, step Step) *WorkflowManifest { - if manifest == nil { - return nil - } - - parentID, addToElse := strings.CutSuffix(parentID, "::else") - - result := manifest.DeepCopy() - if parentID == "" { - result.Steps = append(result.Steps, step) - return result - } - - lookupID, _, _ := strings.Cut(parentID, "{") - result.Steps = appendStep(result.Steps, lookupID, addToElse, step) - return result -} - -func appendStep(steps []Step, id string, addToElse bool, stepToAdd Step) []Step { - result := make([]Step, 0, len(steps)) - - for _, step := range steps { - if step.ID != id { - if step.If != nil { - step.If.Steps = appendStep(step.If.Steps, id, addToElse, stepToAdd) - step.If.Else = appendStep(step.If.Else, id, addToElse, stepToAdd) - } - if step.While != nil { - step.While.Steps = appendStep(step.While.Steps, id, addToElse, stepToAdd) - } - result = append(result, step) - continue - } - - if step.If != nil { - if addToElse { - step.If.Else = append(step.If.Else, stepToAdd) - } else { - step.If.Steps = append(step.If.Steps, stepToAdd) - } - } else if step.While != nil { - step.While.Steps = append(step.While.Steps, stepToAdd) - } - - result = append(result, step) - } - - return result -} - -func SetStep(manifest *WorkflowManifest, step Step) { - id := step.ID - if manifest == nil || id == "" { - return - } - lookupID, _, _ := strings.Cut(id, "{") - found, _ := findInSteps("", manifest.Steps, lookupID) - if found != nil { - *found = step - } - return -} - func FindStep(manifest *WorkflowManifest, id string) (_ *Step, parentID string) { if manifest == nil || id == "" { return nil, "" diff --git a/apiclient/types/workflowexecution.go b/apiclient/types/workflowexecution.go new file mode 100644 index 00000000..b37b426c --- /dev/null +++ b/apiclient/types/workflowexecution.go @@ -0,0 +1,12 @@ +package types + +type WorkflowExecution struct { + Metadata + Workflow WorkflowManifest `json:"workflow,omitempty"` + StartTime Time `json:"startTime"` + EndTime *Time `json:"endTime"` + Input string `json:"input"` + Error string `json:"error,omitempty"` +} + +type WorkflowExecutionList List[WorkflowExecution] diff --git a/apiclient/types/zz_generated.deepcopy.go b/apiclient/types/zz_generated.deepcopy.go index 9b084b8a..9dcb05b5 100644 --- a/apiclient/types/zz_generated.deepcopy.go +++ b/apiclient/types/zz_generated.deepcopy.go @@ -129,6 +129,11 @@ func (in *AgentManifest) DeepCopyInto(out *AgentManifest) { (*out)[key] = val } } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]EnvVar, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AgentManifest. @@ -447,6 +452,21 @@ func (in *EmailReceiverManifest) DeepCopy() *EmailReceiverManifest { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EnvVar) DeepCopyInto(out *EnvVar) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EnvVar. +func (in *EnvVar) DeepCopy() *EnvVar { + if in == nil { + return nil + } + out := new(EnvVar) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ErrHTTP) DeepCopyInto(out *ErrHTTP) { *out = *in @@ -1286,6 +1306,41 @@ func (in *SubFlow) DeepCopy() *SubFlow { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Table) DeepCopyInto(out *Table) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Table. +func (in *Table) DeepCopy() *Table { + if in == nil { + return nil + } + out := new(Table) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TableList) DeepCopyInto(out *TableList) { + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Table, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TableList. +func (in *TableList) DeepCopy() *TableList { + if in == nil { + return nil + } + out := new(TableList) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Task) DeepCopyInto(out *Task) { *out = *in @@ -1542,6 +1597,11 @@ func (in *Thread) DeepCopyInto(out *Thread) { *out = *in in.Metadata.DeepCopyInto(&out.Metadata) in.ThreadManifest.DeepCopyInto(&out.ThreadManifest) + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Thread. @@ -1904,16 +1964,45 @@ func (in *WorkflowCall) DeepCopy() *WorkflowCall { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *WorkflowEnv) DeepCopyInto(out *WorkflowEnv) { +func (in *WorkflowExecution) DeepCopyInto(out *WorkflowExecution) { *out = *in + in.Metadata.DeepCopyInto(&out.Metadata) + in.Workflow.DeepCopyInto(&out.Workflow) + in.StartTime.DeepCopyInto(&out.StartTime) + if in.EndTime != nil { + in, out := &in.EndTime, &out.EndTime + *out = (*in).DeepCopy() + } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowEnv. -func (in *WorkflowEnv) DeepCopy() *WorkflowEnv { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowExecution. +func (in *WorkflowExecution) DeepCopy() *WorkflowExecution { if in == nil { return nil } - out := new(WorkflowEnv) + out := new(WorkflowExecution) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkflowExecutionList) DeepCopyInto(out *WorkflowExecutionList) { + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]WorkflowExecution, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowExecutionList. +func (in *WorkflowExecutionList) DeepCopy() *WorkflowExecutionList { + if in == nil { + return nil + } + out := new(WorkflowExecutionList) in.DeepCopyInto(out) return out } @@ -1944,16 +2033,6 @@ func (in *WorkflowList) DeepCopy() *WorkflowList { func (in *WorkflowManifest) DeepCopyInto(out *WorkflowManifest) { *out = *in in.AgentManifest.DeepCopyInto(&out.AgentManifest) - if in.Credentials != nil { - in, out := &in.Credentials, &out.Credentials - *out = make([]string, len(*in)) - copy(*out, *in) - } - if in.Env != nil { - in, out := &in.Env, &out.Env - *out = make([]WorkflowEnv, len(*in)) - copy(*out, *in) - } if in.Steps != nil { in, out := &in.Steps, &out.Steps *out = make([]Step, len(*in)) diff --git a/apiclient/workflow.go b/apiclient/workflow.go index e43157c6..e9204ae7 100644 --- a/apiclient/workflow.go +++ b/apiclient/workflow.go @@ -38,8 +38,35 @@ func (c *Client) CreateWorkflow(ctx context.Context, workflow types.WorkflowMani return toObject(resp, &types.Workflow{}) } +type ListWorkflowExecutionsOptions struct { + ThreadID string +} + +func (c *Client) ListWorkflowExecutions(ctx context.Context, workflowID string, opts ListWorkflowExecutionsOptions) (result types.WorkflowExecutionList, err error) { + defer func() { + sort.Slice(result.Items, func(i, j int) bool { + return result.Items[i].Metadata.Created.Time.Before(result.Items[j].Metadata.Created.Time) + }) + }() + + url := fmt.Sprintf("/workflows/%s/executions", workflowID) + if opts.ThreadID != "" { + url = fmt.Sprintf("/threads/%s/workflows/%s/executions", opts.ThreadID, workflowID) + } + + _, resp, err := c.doRequest(ctx, http.MethodGet, url, nil) + if err != nil { + return + } + defer resp.Body.Close() + + _, err = toObject(resp, &result) + return +} + type ListWorkflowsOptions struct { - Alias string + Alias string + ThreadID string } func (c *Client) ListWorkflows(ctx context.Context, opts ListWorkflowsOptions) (result types.WorkflowList, err error) { @@ -49,7 +76,12 @@ func (c *Client) ListWorkflows(ctx context.Context, opts ListWorkflowsOptions) ( }) }() - _, resp, err := c.doRequest(ctx, http.MethodGet, "/workflows", nil) + url := "/workflows" + if opts.ThreadID != "" { + url = fmt.Sprintf("/threads/%s/workflows", opts.ThreadID) + } + + _, resp, err := c.doRequest(ctx, http.MethodGet, url, nil) if err != nil { return } diff --git a/pkg/api/handlers/tables.go b/pkg/api/handlers/tables.go new file mode 100644 index 00000000..09b126af --- /dev/null +++ b/pkg/api/handlers/tables.go @@ -0,0 +1,120 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "regexp" + + "github.com/gptscript-ai/go-gptscript" + "github.com/otto8-ai/otto8/apiclient/types" + "github.com/otto8-ai/otto8/pkg/api" + v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" +) + +type TableHandler struct { + gptScript *gptscript.GPTScript +} + +func NewTableHandler(gptScript *gptscript.GPTScript) *TableHandler { + return &TableHandler{ + gptScript: gptScript, + } +} + +func (t *TableHandler) tables(req api.Context, workspaceID string) (string, error) { + var toolRef v1.ToolReference + if err := req.Get(&toolRef, "database-tables"); err != nil { + return "", err + } + run, err := t.gptScript.Run(req.Context(), toolRef.Status.Reference, gptscript.Options{ + Workspace: workspaceID, + }) + if err != nil { + return "", err + } + defer run.Close() + return run.Text() +} + +func (t *TableHandler) rows(req api.Context, workspaceID, tableName string) (string, error) { + var toolRef v1.ToolReference + if err := req.Get(&toolRef, "database-query"); err != nil { + return "", err + } + input, err := json.Marshal(map[string]string{ + "query": fmt.Sprintf("SELECT * FROM '%s';", tableName), + }) + if err != nil { + return "", err + } + run, err := t.gptScript.Run(req.Context(), toolRef.Status.Reference, gptscript.Options{ + Input: string(input), + Workspace: workspaceID, + }) + if err != nil { + return "", err + } + defer run.Close() + return run.Text() +} + +func (t *TableHandler) ListTables(req api.Context) error { + var ( + assistantID = req.PathValue("assistant_id") + result = types.TableList{ + Items: []types.Table{}, + } + ) + + thread, err := getUserThread(req, assistantID) + if err != nil { + return err + } + + if thread.Status.WorkspaceID == "" { + return req.Write(result) + } + + content, err := t.tables(req, thread.Status.WorkspaceID) + if err != nil { + return err + } + + req.ResponseWriter.Header().Set("Content-Type", "application/json") + _, err = req.ResponseWriter.Write([]byte(content)) + return err +} + +var validTableName = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) + +func (t *TableHandler) GetRows(req api.Context) error { + var ( + assistantID = req.PathValue("assistant_id") + tableName = req.PathValue("table_name") + result = types.TableRowList{ + Items: []types.TableRow{}, + } + ) + + if validTableName.MatchString(tableName) == false { + return types.NewErrBadRequest("invalid table name %s", tableName) + } + + thread, err := getUserThread(req, assistantID) + if err != nil { + return err + } + + if thread.Status.WorkspaceID == "" { + return req.Write(result) + } + + content, err := t.rows(req, thread.Status.WorkspaceID, tableName) + if err != nil { + return err + } + + req.ResponseWriter.Header().Set("Content-Type", "application/json") + _, err = req.ResponseWriter.Write([]byte(content)) + return err +} diff --git a/pkg/api/handlers/tasks.go b/pkg/api/handlers/tasks.go index 6259cb7b..6d5514eb 100644 --- a/pkg/api/handlers/tasks.go +++ b/pkg/api/handlers/tasks.go @@ -35,7 +35,7 @@ func NewTaskHandler(invoker *invoke.Invoker, events *events.Emitter) *TaskHandle func (t *TaskHandler) Abort(req api.Context) error { var taskRunID = req.PathValue("run_id") - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -56,6 +56,10 @@ func (t *TaskHandler) Abort(req api.Context) error { return err } + if wfe.Spec.ThreadName != userThread.Name && workflow.Name != wfe.Spec.WorkflowName { + return types.NewErrHttp(http.StatusForbidden, "task run does not belong to the thread") + } + var thread v1.Thread if err := req.Get(&thread, wfe.Status.ThreadName); err != nil { return err @@ -67,7 +71,7 @@ func (t *TaskHandler) Abort(req api.Context) error { func (t *TaskHandler) Events(req api.Context) error { var taskRunID = req.PathValue("run_id") - workflow, err := t.getTask(req) + workflow, thread, err := t.getTask(req) if err != nil { return err } @@ -91,7 +95,7 @@ func (t *TaskHandler) Events(req api.Context) error { return err } - if wfe.Spec.UserID != req.User.GetUID() && workflow.Name != wfe.Spec.WorkflowName { + if wfe.Spec.ThreadName != thread.Name && workflow.Name != wfe.Spec.WorkflowName { return types.NewErrHttp(http.StatusForbidden, "task run does not belong to the user") } @@ -113,8 +117,61 @@ func editorWFE(req api.Context, workflowName string) string { return name.SafeHashConcatName(system.ThreadPrefix, workflowName, req.User.GetUID()) } +func (t *TaskHandler) AbortRun(req api.Context) error { + workflow, _, err := t.getTask(req) + if err != nil { + return err + } + + var ( + wfe v1.WorkflowExecution + runID = req.PathValue("run_id") + ) + + if runID == "editor" { + runID = editorWFE(req, workflow.Name) + } + + if err := req.Get(&wfe, runID); err != nil { + return err + } + + if wfe.Spec.WorkflowName != workflow.Name { + return types.NewErrNotFound("task run not found") + } + + var thread v1.Thread + if err := req.Get(&thread, wfe.Status.ThreadName); err != nil { + return err + } + + return abortThread(req, &thread) +} + +func (t *TaskHandler) GetRun(req api.Context) error { + workflow, _, err := t.getTask(req) + if err != nil { + return err + } + + var ( + wfe v1.WorkflowExecution + runID = req.PathValue("run_id") + ) + if runID == "editor" { + runID = editorWFE(req, workflow.Name) + } + if err := req.Get(&wfe, runID); err != nil { + return err + } + if wfe.Spec.WorkflowName != workflow.Name { + return types.NewErrNotFound("task run not found") + } + return req.Write(convertTaskRun(workflow, &wfe)) +} + func (t *TaskHandler) DeleteRun(req api.Context) error { - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -126,7 +183,7 @@ func (t *TaskHandler) DeleteRun(req api.Context) error { return err } - if wfe.Spec.UserID != req.User.GetUID() || wfe.Spec.WorkflowName != workflow.Name { + if wfe.Spec.ThreadName != userThread.Name || wfe.Spec.WorkflowName != workflow.Name { return types.NewErrHttp(http.StatusForbidden, "task run does not belong to the user") } @@ -134,7 +191,7 @@ func (t *TaskHandler) DeleteRun(req api.Context) error { } func (t *TaskHandler) ListRuns(req api.Context) error { - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -142,7 +199,7 @@ func (t *TaskHandler) ListRuns(req api.Context) error { var wfeList v1.WorkflowExecutionList if err := req.List(&wfeList, kclient.MatchingFields{ "spec.workflowName": workflow.Name, - "spec.userID": req.User.GetUID(), + "spec.threadName": userThread.Name, }); err != nil { return err } @@ -180,7 +237,7 @@ func (t *TaskHandler) Run(req api.Context) error { input = nil } - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -195,7 +252,7 @@ func (t *TaskHandler) Run(req api.Context) error { }, Spec: v1.WorkflowExecutionSpec{ Input: string(input), - UserID: req.User.GetUID(), + ThreadName: userThread.Name, WorkflowName: workflow.Name, }, Status: v1.WorkflowExecutionStatus{}, @@ -206,7 +263,7 @@ func (t *TaskHandler) Run(req api.Context) error { } else { resp, err := t.invoker.Workflow(req.Context(), req.Storage, workflow, string(input), invoke.WorkflowOptions{ WorkflowExecutionName: editorWFE(req, workflow.Name), - UserID: req.User.GetUID(), + OwningThreadName: userThread.Name, StepID: stepID, }) if err != nil { @@ -219,15 +276,22 @@ func (t *TaskHandler) Run(req api.Context) error { } func convertTaskRun(workflow *v1.Workflow, wfe *v1.WorkflowExecution) types.TaskRun { + var endTime *types.Time + if wfe.Status.EndTime != nil { + endTime = types.NewTime(wfe.Status.EndTime.Time) + } return types.TaskRun{ - Metadata: MetadataFrom(wfe), - TaskID: workflow.Name, - Task: convertTask(*workflow, nil).TaskManifest, + Metadata: MetadataFrom(wfe), + TaskID: workflow.Name, + Input: wfe.Spec.Input, + Task: convertTaskManifest(wfe.Status.WorkflowManifest), + StartTime: types.NewTime(wfe.CreationTimestamp.Time), + EndTime: endTime, } } func (t *TaskHandler) Delete(req api.Context) error { - workflow, err := t.getTask(req) + workflow, _, err := t.getTask(req) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -239,12 +303,12 @@ func (t *TaskHandler) Delete(req api.Context) error { } func (t *TaskHandler) Update(req api.Context) error { - workflow, err := t.getTask(req) + workflow, _, err := t.getTask(req) if err != nil { return err } - _, manifest, task, err := t.getAssistantAndManifestFromRequest(req) + _, _, manifest, task, err := t.getAssistantThreadAndManifestFromRequest(req) if err != nil { return err } @@ -345,8 +409,8 @@ func (t *TaskHandler) updateEmail(req api.Context, workflow *v1.Workflow, task t EmailReceiverManifest: types.EmailReceiverManifest{ User: workflow.Spec.Manifest.Alias, Workflow: workflow.Name, - UserID: req.User.GetUID(), }, + ThreadName: workflow.Spec.ThreadName, }, } if err := req.Create(&email); err != nil { @@ -383,8 +447,8 @@ func (t *TaskHandler) updateWebhook(req api.Context, workflow *v1.Workflow, task WebhookManifest: types.WebhookManifest{ Alias: workflow.Spec.Manifest.Alias, Workflow: workflow.Name, - UserID: req.User.GetUID(), }, + ThreadName: workflow.Spec.ThreadName, }, } if err := req.Create(&webhook); err != nil { @@ -420,9 +484,9 @@ func (t *TaskHandler) updateCron(req api.Context, workflow *v1.Workflow, task ty Spec: v1.CronJobSpec{ CronJobManifest: types.CronJobManifest{ Workflow: workflow.Name, - UserID: req.User.GetUID(), TaskSchedule: task.Schedule, }, + ThreadName: workflow.Spec.ThreadName, }, } if err := req.Create(&cron); err != nil { @@ -441,33 +505,49 @@ func (t *TaskHandler) updateCron(req api.Context, workflow *v1.Workflow, task ty return nil } -func (t *TaskHandler) getAssistantAndManifestFromRequest(req api.Context) (*v1.Agent, types.WorkflowManifest, types.TaskManifest, error) { +func (t *TaskHandler) getAssistantThreadAndManifestFromRequest(req api.Context) (*v1.Agent, *v1.Thread, types.WorkflowManifest, types.TaskManifest, error) { assistantID := req.PathValue("assistant_id") assistant, err := getAssistant(req, assistantID) if err != nil { - return nil, types.WorkflowManifest{}, types.TaskManifest{}, err + return nil, nil, types.WorkflowManifest{}, types.TaskManifest{}, err } thread, err := getUserThread(req, assistantID) if err != nil { - return nil, types.WorkflowManifest{}, types.TaskManifest{}, err + return nil, nil, types.WorkflowManifest{}, types.TaskManifest{}, err } var manifest types.TaskManifest if err := req.Read(&manifest); err != nil { - return nil, types.WorkflowManifest{}, types.TaskManifest{}, err + return nil, nil, types.WorkflowManifest{}, types.TaskManifest{}, err } - return assistant, toWorkflowManifest(assistant, thread, manifest), manifest, nil + return assistant, thread, toWorkflowManifest(assistant, thread, manifest), manifest, nil } func (t *TaskHandler) Create(req api.Context) error { - assistant, workflowManifest, taskManifest, err := t.getAssistantAndManifestFromRequest(req) + _, thread, workflowManifest, taskManifest, err := t.getAssistantThreadAndManifestFromRequest(req) if err != nil { return err } + var workspaces v1.WorkspaceList + err = req.List(&workspaces, kclient.MatchingFields{ + "status.workspaceID": thread.Status.WorkspaceID, + }) + if err != nil { + return err + } + + if len(workspaces.Items) == 0 { + return types.NewErrBadRequest("no workspace found for the thread") + } + + if len(workspaces.Items) != 1 { + return types.NewErrBadRequest("multiple workspaces found for the thread") + } + workflowManifest.Alias, err = randomtoken.Generate() if err != nil { return err @@ -480,9 +560,10 @@ func (t *TaskHandler) Create(req api.Context) error { Namespace: req.Namespace(), }, Spec: v1.WorkflowSpec{ - AgentName: assistant.Name, - UserID: req.User.GetUID(), - Manifest: workflowManifest, + ThreadName: thread.Name, + Manifest: workflowManifest, + KnowledgeSetNames: thread.Status.KnowledgeSetNames, + WorkspaceName: workspaces.Items[0].Name, }, } @@ -504,6 +585,11 @@ func toWorkflowManifest(agent *v1.Agent, thread *v1.Thread, manifest types.TaskM AgentManifest: agent.Spec.Manifest, } + workflowManifest.AgentManifest.Env = append(workflowManifest.AgentManifest.Env, types.EnvVar{ + Name: "DATABASE_WORKSPACE_ID", + Value: thread.Status.WorkspaceID, + }) + for _, tool := range thread.Spec.Manifest.Tools { if !slices.Contains(workflowManifest.Tools, tool) { workflowManifest.Tools = append(workflowManifest.Tools, tool) @@ -545,7 +631,7 @@ func toWorkflowIf(ifStep *types.TaskIf) *types.If { } func (t *TaskHandler) Get(req api.Context) error { - task, err := t.getTask(req) + task, _, err := t.getTask(req) if err != nil { return err } @@ -572,35 +658,37 @@ func (t *TaskHandler) Get(req api.Context) error { })) } -func (t *TaskHandler) getTask(req api.Context) (*v1.Workflow, error) { +func (t *TaskHandler) getTask(req api.Context) (*v1.Workflow, *v1.Thread, error) { assistantID := req.PathValue("assistant_id") var workflow v1.Workflow if err := req.Get(&workflow, req.PathValue("id")); err != nil { - return nil, err + return nil, nil, err } - assistant, err := getAssistant(req, assistantID) + thread, err := getUserThread(req, assistantID) if err != nil { - return nil, err + return nil, nil, err } - if workflow.Spec.AgentName != assistant.Name || workflow.Spec.UserID != req.User.GetUID() { - return nil, types.NewErrHttp(http.StatusForbidden, "task does not belong to the user") + if workflow.Spec.ThreadName != thread.Name { + return nil, nil, types.NewErrHttp(http.StatusForbidden, "task does not belong to the thread") } - return &workflow, nil + return &workflow, thread, nil } func (t *TaskHandler) List(req api.Context) error { - assistant, err := getAssistant(req, req.PathValue("assistant_id")) + assistantID := req.PathValue("assistant_id") + + thread, err := getUserThread(req, assistantID) if err != nil { return err } var crons v1.CronJobList if err := req.List(&crons, kclient.MatchingFields{ - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -612,7 +700,7 @@ func (t *TaskHandler) List(req api.Context) error { var webhooks v1.WebhookList if err := req.List(&webhooks, kclient.MatchingFields{ - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -624,7 +712,7 @@ func (t *TaskHandler) List(req api.Context) error { var emailReceivers v1.EmailReceiverList if err := req.List(&emailReceivers, kclient.MatchingFields{ - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -636,8 +724,7 @@ func (t *TaskHandler) List(req api.Context) error { var workflows v1.WorkflowList if err := req.List(&workflows, kclient.MatchingFields{ - "spec.agentName": assistant.Name, - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -655,16 +742,23 @@ func (t *TaskHandler) List(req api.Context) error { return req.Write(taskList) } +func convertTaskManifest(manifest *types.WorkflowManifest) types.TaskManifest { + if manifest == nil { + return types.TaskManifest{} + } + return types.TaskManifest{ + Name: manifest.Name, + Description: manifest.Description, + Steps: toTaskSteps(manifest.Steps), + } +} + func convertTask(workflow v1.Workflow, trigger *triggers) types.Task { task := types.Task{ - Metadata: MetadataFrom(&workflow), - TaskManifest: types.TaskManifest{ - Name: workflow.Spec.Manifest.Name, - Description: workflow.Spec.Manifest.Description, - }, - Alias: workflow.Spec.Manifest.Alias, + Metadata: MetadataFrom(&workflow), + TaskManifest: convertTaskManifest(&workflow.Spec.Manifest), + Alias: workflow.Namespace + "/" + workflow.Spec.Manifest.Alias, } - task.Steps = toTaskSteps(workflow.Spec.Manifest.Steps) if trigger != nil && trigger.CronJob != nil && trigger.CronJob.Name != "" { task.Schedule = trigger.CronJob.Spec.TaskSchedule } diff --git a/pkg/api/handlers/threads.go b/pkg/api/handlers/threads.go index 52ffdd5a..83f512d9 100644 --- a/pkg/api/handlers/threads.go +++ b/pkg/api/handlers/threads.go @@ -13,6 +13,7 @@ import ( "github.com/otto8-ai/otto8/pkg/events" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kclient "sigs.k8s.io/controller-runtime/pkg/client" ) const DefaultMaxUserThreadTools = 5 @@ -55,6 +56,7 @@ func convertThread(thread v1.Thread) types.Thread { AgentAlias: thread.Spec.AgentAlias, UserID: thread.Spec.UserUID, Abort: thread.Spec.Abort, + Env: thread.Spec.Env, } } @@ -75,6 +77,71 @@ func (a *ThreadHandler) Abort(req api.Context) error { return req.Write(thread) } +func (a *ThreadHandler) WorkflowExecutions(req api.Context) error { + var ( + id = req.PathValue("id") + workflowID = req.PathValue("workflow_id") + ) + + var workflowExecutions v1.WorkflowExecutionList + if err := req.List(&workflowExecutions, kclient.MatchingFields{ + "spec.threadName": id, + "spec.workflowName": workflowID, + }); err != nil { + return err + } + + var resp types.WorkflowExecutionList + for _, we := range workflowExecutions.Items { + resp.Items = append(resp.Items, convertWorkflowExecution(we)) + } + + return req.Write(resp) +} + +func convertWorkflowExecution(we v1.WorkflowExecution) types.WorkflowExecution { + var w types.WorkflowManifest + if we.Status.WorkflowManifest != nil { + w = *we.Status.WorkflowManifest + } + var endTime *types.Time + if we.Status.EndTime != nil { + endTime = types.NewTime(we.Status.EndTime.Time) + } + return types.WorkflowExecution{ + Metadata: MetadataFrom(&we), + Workflow: w, + Input: we.Spec.Input, + Error: we.Status.Error, + StartTime: *types.NewTime(we.CreationTimestamp.Time), + EndTime: endTime, + } +} + +func (a *ThreadHandler) Workflows(req api.Context) error { + var ( + id = req.PathValue("id") + ) + + var workflows v1.WorkflowList + if err := req.List(&workflows, kclient.MatchingFields{ + "spec.threadName": id, + }); err != nil { + return err + } + + var resp types.WorkflowList + for _, workflow := range workflows.Items { + wf, err := convertWorkflow(workflow, "", req.APIBaseURL) + if err != nil { + return err + } + resp.Items = append(resp.Items, *wf) + } + + return req.Write(resp) +} + func (a *ThreadHandler) Events(req api.Context) error { var ( id = req.PathValue("id") diff --git a/pkg/api/handlers/webhooks.go b/pkg/api/handlers/webhooks.go index 07a5bbfc..2a6cfa65 100644 --- a/pkg/api/handlers/webhooks.go +++ b/pkg/api/handlers/webhooks.go @@ -4,6 +4,7 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/hex" + "encoding/json" "fmt" "net/http" "net/textproto" @@ -233,24 +234,28 @@ func (a *WebhookHandler) Execute(req api.Context) error { } } - var input strings.Builder - _, _ = input.WriteString("You are being called from a webhook.\n\n") - if len(body) > 0 { - _, _ = input.WriteString("Here is the payload of the webhook:\n") - _, _ = input.Write(body) + var input struct { + Type string `json:"type"` + Payload string `json:"payload"` + Headers map[string]string `json:"headers"` } - _, _ = input.WriteString("\nHere are the headers of the webhook:\n") + input.Type = "webhook" + input.Payload = string(body) + input.Headers = make(map[string]string) + allHeaders := slices.Contains(webhook.Spec.Headers, "*") for k := range req.Request.Header { if !allHeaders && !slices.Contains(webhook.Spec.Headers, k) { continue } - input.WriteString("\n") - input.WriteString(k) - input.WriteString(": ") - input.WriteString(req.Request.Header.Get(k)) + input.Headers[k] = req.Request.Header.Get(k) + } + + inputText, err := json.Marshal(input) + if err != nil { + return fmt.Errorf("failed to marshal input: %w", err) } var workflow v1.Workflow @@ -260,15 +265,14 @@ func (a *WebhookHandler) Execute(req api.Context) error { if err = req.Create(&v1.WorkflowExecution{ ObjectMeta: metav1.ObjectMeta{ - // The name here is the sha256 hash of the body to handle multiple executions of the same webhook. - // That is, if the webhook is called twice with the same body, it will only be executed once. - Name: system.WorkflowExecutionPrefix + fmt.Sprintf("%x", sha256.Sum256(body)), - Namespace: req.Namespace(), + GenerateName: system.WorkflowExecutionPrefix, + Namespace: req.Namespace(), }, Spec: v1.WorkflowExecutionSpec{ WorkflowName: workflow.Name, WebhookName: webhook.Name, - Input: input.String(), + ThreadName: webhook.Spec.ThreadName, + Input: string(inputText), }, }); err != nil && !apierrors.IsAlreadyExists(err) { return err diff --git a/pkg/api/handlers/workflows.go b/pkg/api/handlers/workflows.go index 2599e1d1..d7e02c0c 100644 --- a/pkg/api/handlers/workflows.go +++ b/pkg/api/handlers/workflows.go @@ -16,6 +16,7 @@ import ( "github.com/otto8-ai/otto8/pkg/system" "github.com/otto8-ai/otto8/pkg/wait" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kclient "sigs.k8s.io/controller-runtime/pkg/client" ) var log = mvl.Package() @@ -327,6 +328,26 @@ func (a *WorkflowHandler) EnsureCredentialForKnowledgeSource(req api.Context) er return req.WriteCreated(resp) } +func (a *WorkflowHandler) WorkflowExecutions(req api.Context) error { + var ( + id = req.PathValue("id") + ) + + var wfes v1.WorkflowExecutionList + if err := req.List(&wfes, kclient.MatchingFields{ + "spec.workflowName": id, + }); err != nil { + return err + } + + var resp types.WorkflowExecutionList + for _, we := range wfes.Items { + resp.Items = append(resp.Items, convertWorkflowExecution(we)) + } + + return req.Write(resp) +} + func (a *WorkflowHandler) Script(req api.Context) error { var ( id = req.Request.PathValue("id") diff --git a/pkg/api/router/router.go b/pkg/api/router/router.go index e903e8f1..8ab8d20e 100644 --- a/pkg/api/router/router.go +++ b/pkg/api/router/router.go @@ -28,6 +28,7 @@ func Router(services *services.Services) (http.Handler, error) { emailreceiver := handlers.NewEmailReceiverHandler(services.EmailServerName) defaultModelAliases := handlers.NewDefaultModelAliasHandler() version := handlers.NewVersionHandler(services.EmailServerName) + tables := handlers.NewTableHandler(services.GPTClient) // Version mux.HandleFunc("GET /api/version", version.GetVersion) @@ -72,12 +73,18 @@ func Router(services *services.Services) (http.Handler, error) { mux.HandleFunc("DELETE /api/assistants/{assistant_id}/tasks/{id}", tasks.Delete) mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/run", tasks.Run) mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/runs", tasks.ListRuns) + mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}", tasks.GetRun) + mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}/abort", tasks.AbortRun) mux.HandleFunc("DELETE /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}", tasks.DeleteRun) mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/events", tasks.Events) mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/events", tasks.Abort) mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}/events", tasks.Events) mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}/events", tasks.Abort) + // Tables + mux.HandleFunc("GET /api/assistants/{assistant_id}/tables", tables.ListTables) + mux.HandleFunc("GET /api/assistants/{assistant_id}/tables/{table_name}/rows", tables.GetRows) + // Agent files mux.HandleFunc("GET /api/agents/{id}/files", agents.ListFiles) mux.HandleFunc("POST /api/agents/{id}/files/{file}", agents.UploadFile) @@ -104,6 +111,7 @@ func Router(services *services.Services) (http.Handler, error) { // Workflows mux.HandleFunc("GET /api/workflows", workflows.List) mux.HandleFunc("GET /api/workflows/{id}", workflows.ByID) + mux.HandleFunc("GET /api/workflows/{id}/executions", workflows.WorkflowExecutions) mux.HandleFunc("GET /api/workflows/{id}/script", workflows.Script) mux.HandleFunc("GET /api/workflows/{id}/script.gpt", workflows.Script) mux.HandleFunc("GET /api/workflows/{id}/script/tool.gpt", workflows.Script) @@ -146,6 +154,8 @@ func Router(services *services.Services) (http.Handler, error) { mux.HandleFunc("GET /api/threads/{id}", threads.ByID) mux.HandleFunc("POST /api/threads/{id}/abort", threads.Abort) mux.HandleFunc("GET /api/threads/{id}/events", threads.Events) + mux.HandleFunc("GET /api/threads/{id}/workflows", threads.Workflows) + mux.HandleFunc("GET /api/threads/{id}/workflows/{workflow_id}/executions", threads.WorkflowExecutions) mux.HandleFunc("DELETE /api/threads/{id}", threads.Delete) mux.HandleFunc("PUT /api/threads/{id}", threads.Update) mux.HandleFunc("GET /api/agents/{agent}/threads", threads.List) diff --git a/pkg/cli/tools_register.go b/pkg/cli/tools_register.go index e0dce98a..0a4c6970 100644 --- a/pkg/cli/tools_register.go +++ b/pkg/cli/tools_register.go @@ -9,7 +9,7 @@ import ( type ToolRegister struct { root *Otto8 - Quiet bool `usage:"Only print IDs of created tool references:"q"` + Quiet bool `usage:"Only print IDs of created tool references:" short:"q"` } func (l *ToolRegister) Customize(cmd *cobra.Command) { diff --git a/pkg/controller/data/otto.yaml b/pkg/controller/data/otto.yaml index bb7d625e..31c15ea5 100644 --- a/pkg/controller/data/otto.yaml +++ b/pkg/controller/data/otto.yaml @@ -22,6 +22,8 @@ spec: tools: - workspace-files - time + - database + - knowledge defaultThreadTools: - github-bundle - google-docs-bundle diff --git a/pkg/controller/handlers/cronjob/cronjob.go b/pkg/controller/handlers/cronjob/cronjob.go index ca20e668..700fd9b3 100644 --- a/pkg/controller/handlers/cronjob/cronjob.go +++ b/pkg/controller/handlers/cronjob/cronjob.go @@ -53,7 +53,7 @@ func (h *Handler) Run(req router.Request, resp router.Response) error { WorkflowName: workflow.Name, Input: cj.Spec.Input, CronJobName: cj.Name, - UserID: cj.Spec.UserID, + ThreadName: cj.Spec.ThreadName, }, }, ); err != nil { diff --git a/pkg/controller/handlers/workflow/files.go b/pkg/controller/handlers/workflow/files.go index cdbc5e13..e925af84 100644 --- a/pkg/controller/handlers/workflow/files.go +++ b/pkg/controller/handlers/workflow/files.go @@ -17,6 +17,11 @@ func createWorkspace(ctx context.Context, c kclient.Client, workflow *v1.Workflo return nil } + if workflow.Spec.WorkspaceName != "" { + workflow.Status.WorkspaceName = workflow.Spec.WorkspaceName + return nil + } + ws := &v1.Workspace{ ObjectMeta: metav1.ObjectMeta{ Namespace: workflow.Namespace, @@ -40,6 +45,11 @@ func createKnowledgeSet(ctx context.Context, c kclient.Client, workflow *v1.Work return nil } + if len(workflow.Spec.KnowledgeSetNames) > 0 { + workflow.Status.KnowledgeSetNames = workflow.Spec.KnowledgeSetNames + return nil + } + ks := &v1.KnowledgeSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: workflow.Namespace, diff --git a/pkg/controller/handlers/workflow/workflow.go b/pkg/controller/handlers/workflow/workflow.go index 8c5a8772..adcf5354 100644 --- a/pkg/controller/handlers/workflow/workflow.go +++ b/pkg/controller/handlers/workflow/workflow.go @@ -3,9 +3,7 @@ package workflow import ( "github.com/otto8-ai/nah/pkg/router" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" - "github.com/otto8-ai/otto8/pkg/system" "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func EnsureIDs(req router.Request, resp router.Response) error { @@ -17,26 +15,3 @@ func EnsureIDs(req router.Request, resp router.Response) error { } return nil } - -func WorkspaceObjects(req router.Request, _ router.Response) error { - workflow := req.Object.(*v1.Workflow) - if workflow.Status.WorkspaceName == "" { - ws := &v1.Workspace{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: req.Namespace, - GenerateName: system.WorkspacePrefix, - Finalizers: []string{v1.WorkspaceFinalizer}, - }, - Spec: v1.WorkspaceSpec{ - WorkflowName: workflow.Name, - }, - } - if err := req.Client.Create(req.Ctx, ws); err != nil { - return err - } - - workflow.Status.WorkspaceName = ws.Name - } - - return nil -} diff --git a/pkg/controller/handlers/workflowexecution/workflowexecution.go b/pkg/controller/handlers/workflowexecution/workflowexecution.go index 2e85ef5b..163d8fc5 100644 --- a/pkg/controller/handlers/workflowexecution/workflowexecution.go +++ b/pkg/controller/handlers/workflowexecution/workflowexecution.go @@ -142,3 +142,25 @@ func (h *Handler) newThread(ctx context.Context, c kclient.Client, wf *v1.Workfl return &thread, c.Create(ctx, &thread) } + +func (h *Handler) ReassignThread(req router.Request, _ router.Response) error { + var ( + wfe = req.Object.(*v1.WorkflowExecution) + ) + + if wfe.Status.ThreadName != "" || wfe.Spec.WorkflowName == "" { + return nil + } + + var we v1.Workflow + if err := req.Get(&we, wfe.Namespace, wfe.Spec.WorkflowName); err != nil { + return kclient.IgnoreNotFound(err) + } + + if we.Spec.ThreadName != "" { + wfe.Spec.ThreadName = we.Spec.ThreadName + return req.Client.Update(req.Ctx, wfe) + } + + return nil +} diff --git a/pkg/controller/routes.go b/pkg/controller/routes.go index 62972008..6dfdc971 100644 --- a/pkg/controller/routes.go +++ b/pkg/controller/routes.go @@ -49,14 +49,15 @@ func (c *Controller) setupRoutes() error { root.Type(&v1.Thread{}).HandlerFunc(threads.WorkflowState) // Workflows - root.Type(&v1.Workflow{}).HandlerFunc(workflow.WorkspaceObjects) root.Type(&v1.Workflow{}).HandlerFunc(workflow.EnsureIDs) root.Type(&v1.Workflow{}).HandlerFunc(workflow.CreateWorkspaceAndKnowledgeSet) root.Type(&v1.Workflow{}).HandlerFunc(workflow.BackPopulateAuthStatus) + root.Type(&v1.Workflow{}).HandlerFunc(cleanup.Cleanup) // WorkflowExecutions root.Type(&v1.WorkflowExecution{}).HandlerFunc(cleanup.Cleanup) root.Type(&v1.WorkflowExecution{}).HandlerFunc(workflowExecution.Run) + root.Type(&v1.WorkflowExecution{}).HandlerFunc(workflowExecution.ReassignThread) // Agents root.Type(&v1.Agent{}).HandlerFunc(agents.CreateWorkspaceAndKnowledgeSet) @@ -75,6 +76,7 @@ func (c *Controller) setupRoutes() error { // Reference root.Type(&v1.Agent{}).HandlerFunc(alias.AssignAlias) + root.Type(&v1.EmailReceiver{}).HandlerFunc(alias.AssignAlias) root.Type(&v1.Workflow{}).HandlerFunc(alias.AssignAlias) root.Type(&v1.Model{}).HandlerFunc(alias.AssignAlias) root.Type(&v1.DefaultModelAlias{}).HandlerFunc(alias.AssignAlias) diff --git a/pkg/events/events.go b/pkg/events/events.go index 8d207686..8d6737cb 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -260,10 +260,16 @@ func (e *Emitter) printRun(ctx context.Context, state *printState, run v1.Run, r return err } step, _ := types.FindStep(wfe.Status.WorkflowManifest, run.Spec.WorkflowStepID) + if run.Spec.WorkflowStepID != "" && step == nil { + step = &types.Step{ + ID: run.Spec.WorkflowStepID, + } + } result <- types.Progress{ - RunID: run.Name, - Time: types.NewTime(wfe.CreationTimestamp.Time), - Step: step, + RunID: run.Name, + ParentRunID: run.Spec.PreviousRunName, + Time: types.NewTime(wfe.CreationTimestamp.Time), + Step: step, } state.lastStepPrinted = run.Spec.WorkflowStepID } @@ -707,6 +713,7 @@ func (e *Emitter) printCall(ctx context.Context, namespace, runID string, prg *g Name: tool.Name, Description: tool.Description, Input: subCall.Input, + Output: getToolCallOutput(frames, callID), Metadata: tool.MetaData, } } else { @@ -743,6 +750,14 @@ func (e *Emitter) printCall(ctx context.Context, namespace, runID string, prg *g return nil } +func getToolCallOutput(frames gptscript.CallFrames, callID string) string { + out := frames[callID].Output + if len(out) == 1 { + return out[0].Content + } + return "" +} + func isSubCallTargetIDs(tool gptscript.Tool) (agentID string, workflowID string) { for _, line := range strings.Split(tool.Instructions, "\n") { suffix, ok := strings.CutPrefix(line, "#OTTO8_SUBCALL: TARGET: ") diff --git a/pkg/invoke/invoker.go b/pkg/invoke/invoker.go index 55a3a260..94cc5944 100644 --- a/pkg/invoke/invoker.go +++ b/pkg/invoke/invoker.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "maps" + "slices" "strings" "sync" "time" @@ -104,6 +105,10 @@ func (r *Response) Result(ctx context.Context) (TaskResult, error) { return TaskResult{}, err } + if runState.Name != r.Run.Name { + panic("runState doesnt match") + } + if runState.Spec.Error != "" { return TaskResult{}, ErrToolResult{ Message: runState.Spec.Error, @@ -233,7 +238,7 @@ func CreateThreadForAgent(ctx context.Context, c kclient.WithWatch, agent *v1.Ag return &thread, c.Create(ctx, &thread) } -func (i *Invoker) updateThreadFields(ctx context.Context, c kclient.WithWatch, agent *v1.Agent, thread *v1.Thread, opt Options) error { +func (i *Invoker) updateThreadFields(ctx context.Context, c kclient.WithWatch, agent *v1.Agent, thread *v1.Thread, extraEnv []string, opt Options) error { var updated bool if opt.AgentAlias != "" && thread.Spec.AgentAlias != opt.AgentAlias { thread.Spec.AgentAlias = opt.AgentAlias @@ -243,6 +248,10 @@ func (i *Invoker) updateThreadFields(ctx context.Context, c kclient.WithWatch, a thread.Spec.AgentName = agent.Name updated = true } + if !slices.Equal(thread.Spec.Env, extraEnv) { + thread.Spec.Env = extraEnv + updated = true + } if updated { return c.Status().Update(ctx, thread) } @@ -262,10 +271,6 @@ func (i *Invoker) Agent(ctx context.Context, c kclient.WithWatch, agent *v1.Agen return nil, err } - if err := i.updateThreadFields(ctx, c, agent, thread, opt); err != nil { - return nil, err - } - credContextIDs := []string{thread.Name} if opt.ThreadCredentialScope != nil && !*opt.ThreadCredentialScope { credContextIDs = nil @@ -284,6 +289,10 @@ func (i *Invoker) Agent(ctx context.Context, c kclient.WithWatch, agent *v1.Agen return nil, err } + if err := i.updateThreadFields(ctx, c, agent, thread, extraEnv, opt); err != nil { + return nil, err + } + if len(agent.Spec.Manifest.Params) == 0 { data := map[string]any{} if err := json.Unmarshal([]byte(input), &data); err == nil { diff --git a/pkg/invoke/step.go b/pkg/invoke/step.go index 092b7f7a..411f82de 100644 --- a/pkg/invoke/step.go +++ b/pkg/invoke/step.go @@ -3,6 +3,7 @@ package invoke import ( "context" "encoding/json" + "fmt" "github.com/otto8-ai/nah/pkg/router" "github.com/otto8-ai/otto8/apiclient/types" @@ -57,6 +58,9 @@ func (i *Invoker) toAgentFromStep(ctx context.Context, c kclient.Client, step *v if err := c.Get(ctx, router.Key(step.Namespace, wfe.Spec.WorkflowName), &wf); err != nil { return v1.Agent{}, err } + if wfe.Status.WorkflowManifest == nil { + return v1.Agent{}, fmt.Errorf("workflow execution %s has no manifest", wfe.Name) + } return i.toAgent(ctx, c, &wf, step, wfe.Spec.Input, *wfe.Status.WorkflowManifest) } diff --git a/pkg/invoke/workflow.go b/pkg/invoke/workflow.go index e0610ce2..8b5d42a1 100644 --- a/pkg/invoke/workflow.go +++ b/pkg/invoke/workflow.go @@ -20,7 +20,7 @@ import ( type WorkflowOptions struct { ThreadName string StepID string - UserID string + OwningThreadName string WorkflowExecutionName string Events bool } @@ -33,7 +33,7 @@ func (i *Invoker) startWorkflow(ctx context.Context, c kclient.WithWatch, wf *v1 Namespace: wf.Namespace, }, Spec: v1.WorkflowExecutionSpec{ - UserID: opt.UserID, + ThreadName: opt.OwningThreadName, Input: input, WorkflowName: wf.Name, }, @@ -183,7 +183,7 @@ func (i *Invoker) rerunThread(ctx context.Context, c kclient.WithWatch, wf *v1.W return nil, nil, err } - if input != "" && wfe.Spec.Input != input { + if wfe.Spec.Input != input { if stepID == "" { // If input doesn't match, delete all steps and rerun stepID = "*" diff --git a/pkg/render/render.go b/pkg/render/render.go index f51d7894..44a96ba6 100644 --- a/pkg/render/render.go +++ b/pkg/render/render.go @@ -11,7 +11,6 @@ import ( "github.com/gptscript-ai/go-gptscript" "github.com/otto8-ai/otto8/apiclient/types" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" - "github.com/otto8-ai/otto8/pkg/system" apierror "k8s.io/apimachinery/pkg/api/errors" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,6 +24,10 @@ type AgentOptions struct { } func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerURL string, opts AgentOptions) (_ []gptscript.ToolDef, extraEnv []string, _ error) { + defer func() { + sort.Strings(extraEnv) + }() + mainTool := gptscript.ToolDef{ Name: agent.Spec.Manifest.Name, Description: agent.Spec.Manifest.Description, @@ -40,6 +43,22 @@ func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerU extraEnv = append(extraEnv, agent.Spec.Env...) + for _, env := range agent.Spec.Manifest.Env { + if env.Name == "" { + continue + } + if !validEnv.MatchString(env.Name) { + return nil, nil, fmt.Errorf("invalid env var %s, must match %s", env.Name, validEnv.String()) + } + if env.Value == "" { + agent.Spec.Credentials = append(agent.Spec.Credentials, + fmt.Sprintf(`github.com/gptscript-ai/credential as %s with "%s" as message and "%s" as env and %s as field`, + env.Name, env.Description, env.Name, env.Name)) + } else { + extraEnv = append(extraEnv, fmt.Sprintf("%s=%s", env.Name, env.Value)) + } + } + if mainTool.Instructions == "" { mainTool.Instructions = v1.DefaultAgentPrompt } @@ -63,6 +82,14 @@ func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerU mainTool.Tools = append(mainTool.Tools, name) } + for _, tool := range agent.Spec.SystemTools { + name, err := ResolveToolReference(ctx, db, "", agent.Namespace, tool) + if err != nil { + return nil, nil, err + } + mainTool.Tools = append(mainTool.Tools, name) + } + mainTool, otherTools, err := addAgentTools(ctx, db, agent, mainTool, otherTools) if err != nil { return nil, nil, err @@ -73,7 +100,7 @@ func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerU return nil, nil, err } - mainTool, otherTools, err = addKnowledgeTools(ctx, db, agent, opts.Thread, mainTool, otherTools) + extraEnv, err = addKnowledgeTools(ctx, db, agent, opts.Thread, extraEnv) if err != nil { return nil, nil, err } @@ -130,7 +157,7 @@ func OAuthAppEnv(ctx context.Context, db kclient.Client, oauthAppNames []string, return extraEnv, nil } -func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, thread *v1.Thread, mainTool gptscript.ToolDef, otherTools []gptscript.ToolDef) (_ gptscript.ToolDef, _ []gptscript.ToolDef, _ error) { +func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, thread *v1.Thread, extraEnv []string) ([]string, error) { var knowledgeSetNames []string knowledgeSetNames = append(knowledgeSetNames, agent.Status.KnowledgeSetNames...) if thread != nil { @@ -138,25 +165,15 @@ func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, } if len(knowledgeSetNames) == 0 { - return mainTool, otherTools, nil + return extraEnv, nil } - knowledgeTool, err := ResolveToolReference(ctx, db, types.ToolReferenceTypeSystem, agent.Namespace, system.KnowledgeRetrievalTool) - if err != nil { - return mainTool, nil, err - } - - resultFormatter, err := ResolveToolReference(ctx, db, types.ToolReferenceTypeSystem, agent.Namespace, system.ResultFormatterTool) - if err != nil { - return mainTool, nil, err - } - - for i, knowledgeSetName := range knowledgeSetNames { + for _, knowledgeSetName := range knowledgeSetNames { var ks v1.KnowledgeSet if err := db.Get(ctx, kclient.ObjectKey{Namespace: agent.Namespace, Name: knowledgeSetName}, &ks); apierror.IsNotFound(err) { continue } else if err != nil { - return mainTool, nil, err + return nil, err } if !ks.Status.HasContent { @@ -175,44 +192,13 @@ func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, dataDescription = "No data description available" } - toolName := "knowledge_set_query" - if i > 0 { - toolName = fmt.Sprintf("knowledge_set_%d_query", i) - } - - tool := gptscript.ToolDef{ - Name: toolName, - Description: fmt.Sprintf("Obtain search result from the knowledge set known as %s", ks.Name), - Instructions: "#!sys.echo", - Arguments: gptscript.ObjectSchema( - "query", "A search query that will be evaluated against the knowledge set"), - OutputFilters: []string{ - knowledgeTool + fmt.Sprintf(fmt.Sprintf(" with %s/%s as datasets and ${query} as query", ks.Namespace, ks.Name)), - resultFormatter, - }, - } - - contentTool := gptscript.ToolDef{ - Name: toolName + "_context", - Instructions: strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(` -#!sys.echo -# START INSTRUCTIONS: KNOWLEDGE SET: %n% - -Use the tool %k% to query knowledge set %n% to assist in Retrieval-Augmented Generation (RAG). -The knowledge set %n% contains data described as: - -%d% - -# END INSTRUCTIONS: KNOWLEDGE SET: %n% -`, "%k%", toolName), "%d%", dataDescription), "%n%", ks.Name), - } - - mainTool.Tools = append(mainTool.Tools, tool.Name) - mainTool.Context = append(mainTool.Context, contentTool.Name) - otherTools = append(otherTools, tool, contentTool) + return append(extraEnv, + fmt.Sprintf("KNOW_DATASETS=%s/%s", ks.Namespace, ks.Name), + fmt.Sprintf("KNOW_DATASET_DESCRIPTION=%s", dataDescription), + ), nil } - return mainTool, otherTools, nil + return extraEnv, nil } func addWorkflowTools(ctx context.Context, db kclient.Client, agent *v1.Agent, mainTool gptscript.ToolDef, otherTools []gptscript.ToolDef) (_ gptscript.ToolDef, _ []gptscript.ToolDef, _ error) { diff --git a/pkg/render/workflow.go b/pkg/render/workflow.go index 5ca9d841..11d735be 100644 --- a/pkg/render/workflow.go +++ b/pkg/render/workflow.go @@ -3,16 +3,20 @@ package render import ( "context" "fmt" + "regexp" "strings" "github.com/otto8-ai/nah/pkg/router" "github.com/otto8-ai/otto8/apiclient/types" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" + "github.com/otto8-ai/otto8/pkg/system" apierror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) +var validEnv = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$") + type WorkflowOptions struct { Step *types.Step ManifestOverride *types.WorkflowManifest @@ -58,7 +62,6 @@ func Workflow(ctx context.Context, c kclient.Client, wf *v1.Workflow, opts Workf }, Spec: v1.AgentSpec{ Manifest: agentManifest, - Credentials: wf.Spec.Manifest.Credentials, CredentialContextID: wf.Name, }, Status: v1.AgentStatus{ @@ -67,19 +70,6 @@ func Workflow(ctx context.Context, c kclient.Client, wf *v1.Workflow, opts Workf }, } - for _, env := range wf.Spec.Manifest.Env { - if env.Name == "" { - continue - } - if env.Value == "" { - agent.Spec.Credentials = append(agent.Spec.Credentials, - fmt.Sprintf(`github.com/gptscript-ai/credential as %s with "%s" as message and "%s" as env and %s as field`, - env.Name, env.Description, env.Name, env.Name)) - } else { - agent.Spec.Env = append(agent.Spec.Env, fmt.Sprintf("%s=%s", env.Name, env.Value)) - } - } - if step := opts.Step; step != nil { if step.Cache != nil { agent.Spec.Manifest.Cache = step.Cache @@ -104,9 +94,7 @@ func Workflow(ctx context.Context, c kclient.Client, wf *v1.Workflow, opts Workf agent.Spec.Manifest.Prompt = v1.DefaultWorkflowAgentPrompt } - if opts.Input != "" { - agent.Spec.Manifest.Prompt = fmt.Sprintf("WORKFLOW INPUT: %s\nEND WORKFLOW INPUT\n\n%s", opts.Input, agent.Spec.Manifest.Prompt) - } - + agent.Spec.Env = append(agent.Spec.Env, "WORKFLOW_INPUT="+opts.Input) + agent.Spec.SystemTools = append(agent.Spec.SystemTools, system.WorkflowTool) return &agent, nil } diff --git a/pkg/smtp/smtp.go b/pkg/smtp/smtp.go index d404c92d..fc190fb2 100644 --- a/pkg/smtp/smtp.go +++ b/pkg/smtp/smtp.go @@ -3,8 +3,8 @@ package smtp import ( "bytes" "context" - "crypto/sha256" "encoding/base64" + "encoding/json" "fmt" "io" "mime" @@ -87,8 +87,13 @@ func (s *Server) handler(_ net.Addr, from string, to []string, data []byte) erro continue } + ns, name, ok := strings.Cut(name, ".") + if !ok { + log.Infof("Skipping mail for %s: no namespace found", toAddr.Address) + } + var emailReceiver v1.EmailReceiver - if err := alias.Get(s.ctx, s.c, &emailReceiver, "", name); apierror.IsNotFound(err) { + if err := alias.Get(s.ctx, s.c, &emailReceiver, ns, name); apierror.IsNotFound(err) { log.Infof("Skipping mail for %s: no receiver found", toAddr.Address) continue } else if err != nil { @@ -108,7 +113,7 @@ func (s *Server) handler(_ net.Addr, from string, to []string, data []byte) erro } } - if err := s.dispatchEmail(emailReceiver, fromAddress.Address, body); err != nil { + if err := s.dispatchEmail(emailReceiver, body, message); err != nil { return fmt.Errorf("dispatch email: %w", err) } } @@ -160,34 +165,43 @@ func getBody(message *mail.Message) (string, error) { return "", fmt.Errorf("failed to find text/plain body: %s", mediaType) } -func (s *Server) dispatchEmail(email v1.EmailReceiver, from, body string) error { - var input strings.Builder - _, _ = input.WriteString("You are being called from an email from ") - _, _ = input.WriteString(from) - _, _ = input.WriteString(". With the body:\n\n") - _, _ = input.WriteString("START BODY\n") - _, _ = input.WriteString(body) - _, _ = input.WriteString("\nEND BODY\n\n") +func (s *Server) dispatchEmail(email v1.EmailReceiver, body string, message *mail.Message) error { + var input struct { + Type string `json:"type"` + From string `json:"from"` + To string `json:"to"` + Subject string `json:"subject"` + Body string `json:"body"` + } + + input.Type = "email" + input.From = message.Header.Get("From") + input.To = message.Header.Get("To") + input.Subject = message.Header.Get("Subject") + input.Body = body + + inputJSON, err := json.Marshal(input) + if err != nil { + return fmt.Errorf("marshal input: %w", err) + } var workflow v1.Workflow - if err := alias.Get(s.ctx, s.c, &workflow, "", email.Spec.Workflow); err != nil { + if err := alias.Get(s.ctx, s.c, &workflow, email.Namespace, email.Spec.Workflow); err != nil { return err } - err := s.c.Create(s.ctx, &v1.WorkflowExecution{ + return s.c.Create(s.ctx, &v1.WorkflowExecution{ ObjectMeta: metav1.ObjectMeta{ - // The name here is the sha256 hash of the body to handle multiple executions of the same webhook. - // That is, if the webhook is called twice with the same body, it will only be executed once. - Name: system.WorkflowExecutionPrefix + fmt.Sprintf("%x", sha256.Sum256([]byte(from+body))), - Namespace: workflow.Namespace, + GenerateName: system.WorkflowExecutionPrefix, + Namespace: workflow.Namespace, }, Spec: v1.WorkflowExecutionSpec{ WorkflowName: workflow.Name, EmailReceiverName: email.Name, - Input: input.String(), + ThreadName: workflow.Spec.ThreadName, + Input: string(inputJSON), }, }) - return kclient.IgnoreAlreadyExists(err) } func matches(address string, email v1.EmailReceiver) bool { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/agent.go b/pkg/storage/apis/otto.otto8.ai/v1/agent.go index 7c99b407..d8478f42 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/agent.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/agent.go @@ -65,6 +65,8 @@ func (a *Agent) FieldNames() []string { type AgentSpec struct { Manifest types.AgentManifest `json:"manifest,omitempty"` + SystemTools []string `json:"systemTools,omitempty"` + ContextInput string `json:"contextInput,omitempty"` InputFilters []string `json:"inputFilters,omitempty"` Credentials []string `json:"credentials,omitempty"` CredentialContextID string `json:"credentialContextID,omitempty"` diff --git a/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go b/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go index a36bae2c..416bb555 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go @@ -26,14 +26,14 @@ func (c *CronJob) Has(field string) (exists bool) { func (c *CronJob) Get(field string) (value string) { switch field { - case "spec.userID": - return c.Spec.UserID + case "spec.threadName": + return c.Spec.ThreadName } return "" } func (c *CronJob) FieldNames() []string { - return []string{"spec.userID"} + return []string{"spec.threadName"} } func (*CronJob) GetColumns() [][]string { @@ -59,6 +59,7 @@ func (c *CronJob) DeleteRefs() []Ref { type CronJobSpec struct { types.CronJobManifest `json:",inline"` + ThreadName string `json:"threadName,omitempty"` } type CronJobStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go b/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go index 3c0a30fb..f314d2ef 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go @@ -30,14 +30,14 @@ func (in *EmailReceiver) Has(field string) (exists bool) { func (in *EmailReceiver) Get(field string) (value string) { switch field { - case "spec.userID": - return in.Spec.UserID + case "spec.threadName": + return in.Spec.ThreadName } return "" } func (in *EmailReceiver) FieldNames() []string { - return []string{"spec.userID"} + return []string{"spec.threadName"} } func (in *EmailReceiver) GetAliasName() string { @@ -81,6 +81,7 @@ func (in *EmailReceiver) DeleteRefs() []Ref { type EmailReceiverSpec struct { types.EmailReceiverManifest `json:",inline"` + ThreadName string `json:"threadName,omitempty"` } type EmailReceiverStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/thread.go b/pkg/storage/apis/otto.otto8.ai/v1/thread.go index ebbffe85..5f545fd7 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/thread.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/thread.go @@ -66,6 +66,7 @@ type ThreadSpec struct { TextEmbeddingModel string `json:"textEmbeddingModel,omitempty"` SystemTask bool `json:"systemTask,omitempty"` Abort bool `json:"abort,omitempty"` + Env []string `json:"env,omitempty"` } func (in *Thread) DeleteRefs() []Ref { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/webhook.go b/pkg/storage/apis/otto.otto8.ai/v1/webhook.go index 56cca1f8..830c3899 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/webhook.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/webhook.go @@ -25,7 +25,7 @@ type Webhook struct { } func (w *Webhook) FieldNames() []string { - return []string{"spec.userID"} + return []string{"spec.threadName"} } func (w *Webhook) Has(field string) (exists bool) { @@ -34,8 +34,8 @@ func (w *Webhook) Has(field string) (exists bool) { func (w *Webhook) Get(field string) (value string) { switch field { - case "spec.userID": - return w.Spec.UserID + case "spec.threadName": + return w.Spec.ThreadName } return "" } @@ -83,6 +83,7 @@ func (w *Webhook) DeleteRefs() []Ref { type WebhookSpec struct { types.WebhookManifest `json:",inline"` TokenHash []byte `json:"tokenHash,omitempty"` + ThreadName string } type WebhookStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/workflow.go b/pkg/storage/apis/otto.otto8.ai/v1/workflow.go index cdc64618..db96a179 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/workflow.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/workflow.go @@ -30,18 +30,15 @@ func (in *Workflow) Has(field string) (exists bool) { func (in *Workflow) Get(field string) (value string) { switch field { - case "spec.agentName": - return in.Spec.AgentName - case "spec.userID": - return in.Spec.UserID + case "spec.threadName": + return in.Spec.ThreadName } return "" } func (in *Workflow) FieldNames() []string { return []string{ - "spec.agentName", - "spec.userID", + "spec.threadName", } } @@ -70,9 +67,21 @@ func (in *Workflow) SetAliasObservedGeneration(gen int64) { } type WorkflowSpec struct { - AgentName string `json:"agentName,omitempty"` - UserID string `json:"userID,omitempty"` - Manifest types.WorkflowManifest `json:"manifest,omitempty"` + ThreadName string `json:"threadName,omitempty"` + Manifest types.WorkflowManifest `json:"manifest,omitempty"` + KnowledgeSetNames []string `json:"knowledgeSetNames,omitempty"` + WorkspaceName string `json:"workspaceName,omitempty"` +} + +func (in *Workflow) DeleteRefs() []Ref { + refs := []Ref{ + {ObjType: &Thread{}, Name: in.Spec.ThreadName}, + {ObjType: &Workspace{}, Name: in.Spec.WorkspaceName}, + } + for _, name := range in.Spec.KnowledgeSetNames { + refs = append(refs, Ref{ObjType: &KnowledgeSet{}, Name: name}) + } + return refs } type WorkflowStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go b/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go index 27832838..346fe789 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go @@ -27,8 +27,8 @@ func (in *WorkflowExecution) Has(field string) bool { func (in *WorkflowExecution) Get(field string) string { if in != nil { switch field { - case "spec.userID": - return in.Spec.UserID + case "spec.threadName": + return in.Spec.ThreadName case "spec.webhookName": return in.Spec.WebhookName case "spec.cronJobName": @@ -44,7 +44,13 @@ func (in *WorkflowExecution) Get(field string) string { } func (in *WorkflowExecution) FieldNames() []string { - return []string{"spec.userID", "spec.webhookName", "spec.cronJobName", "spec.workflowName", "spec.parentRunName"} + return []string{ + "spec.threadName", + "spec.webhookName", + "spec.cronJobName", + "spec.workflowName", + "spec.parentRunName", + } } func (in *WorkflowExecution) GetColumns() [][]string { @@ -59,8 +65,9 @@ func (in *WorkflowExecution) GetColumns() [][]string { } type WorkflowExecutionSpec struct { - Input string `json:"input,omitempty"` - UserID string `json:"userID,omitempty"` + Input string `json:"input,omitempty"` + // ThreadName is the name of the thread that owns this execution, which is the same as the owning thread of the workflow. + ThreadName string `json:"threadName,omitempty"` WorkflowName string `json:"workflowName,omitempty"` WebhookName string `json:"webhookName,omitempty"` EmailReceiverName string `json:"emailReceiverName,omitempty"` diff --git a/pkg/storage/apis/otto.otto8.ai/v1/workspace.go b/pkg/storage/apis/otto.otto8.ai/v1/workspace.go index 21dc5429..623ed758 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/workspace.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/workspace.go @@ -44,13 +44,15 @@ func (in *Workspace) Get(field string) string { return in.Spec.ThreadName case "spec.knowledgeSetName": return in.Spec.KnowledgeSetName + case "status.workspaceID": + return in.Status.WorkspaceID } return "" } func (*Workspace) FieldNames() []string { - return []string{"spec.agentName", "spec.workflowName", "spec.threadName", "spec.knowledgeSetName"} + return []string{"spec.agentName", "spec.workflowName", "spec.threadName", "spec.knowledgeSetName", "status.workspaceID"} } var _ fields.Fields = (*Workspace)(nil) diff --git a/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go b/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go index 44110f33..70fe51ea 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go @@ -73,6 +73,11 @@ func (in *AgentList) DeepCopyObject() runtime.Object { func (in *AgentSpec) DeepCopyInto(out *AgentSpec) { *out = *in in.Manifest.DeepCopyInto(&out.Manifest) + if in.SystemTools != nil { + in, out := &in.SystemTools, &out.SystemTools + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.InputFilters != nil { in, out := &in.InputFilters, &out.InputFilters *out = make([]string, len(*in)) @@ -1348,6 +1353,11 @@ func (in *ThreadSpec) DeepCopyInto(out *ThreadSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ThreadSpec. @@ -1769,6 +1779,11 @@ func (in *WorkflowList) DeepCopyObject() runtime.Object { func (in *WorkflowSpec) DeepCopyInto(out *WorkflowSpec) { *out = *in in.Manifest.DeepCopyInto(&out.Manifest) + if in.KnowledgeSetNames != nil { + in, out := &in.KnowledgeSetNames, &out.KnowledgeSetNames + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSpec. diff --git a/pkg/storage/openapi/generated/openapi_generated.go b/pkg/storage/openapi/generated/openapi_generated.go index b8084731..e0b82433 100644 --- a/pkg/storage/openapi/generated/openapi_generated.go +++ b/pkg/storage/openapi/generated/openapi_generated.go @@ -35,6 +35,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.EmailReceiver": schema_otto8_ai_otto8_apiclient_types_EmailReceiver(ref), "github.com/otto8-ai/otto8/apiclient/types.EmailReceiverList": schema_otto8_ai_otto8_apiclient_types_EmailReceiverList(ref), "github.com/otto8-ai/otto8/apiclient/types.EmailReceiverManifest": schema_otto8_ai_otto8_apiclient_types_EmailReceiverManifest(ref), + "github.com/otto8-ai/otto8/apiclient/types.EnvVar": schema_otto8_ai_otto8_apiclient_types_EnvVar(ref), "github.com/otto8-ai/otto8/apiclient/types.ErrHTTP": schema_otto8_ai_otto8_apiclient_types_ErrHTTP(ref), "github.com/otto8-ai/otto8/apiclient/types.File": schema_otto8_ai_otto8_apiclient_types_File(ref), "github.com/otto8-ai/otto8/apiclient/types.FileList": schema_otto8_ai_otto8_apiclient_types_FileList(ref), @@ -70,6 +71,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.Step": schema_otto8_ai_otto8_apiclient_types_Step(ref), "github.com/otto8-ai/otto8/apiclient/types.StepTemplateInvoke": schema_otto8_ai_otto8_apiclient_types_StepTemplateInvoke(ref), "github.com/otto8-ai/otto8/apiclient/types.SubFlow": schema_otto8_ai_otto8_apiclient_types_SubFlow(ref), + "github.com/otto8-ai/otto8/apiclient/types.Table": schema_otto8_ai_otto8_apiclient_types_Table(ref), + "github.com/otto8-ai/otto8/apiclient/types.TableList": schema_otto8_ai_otto8_apiclient_types_TableList(ref), "github.com/otto8-ai/otto8/apiclient/types.Task": schema_otto8_ai_otto8_apiclient_types_Task(ref), "github.com/otto8-ai/otto8/apiclient/types.TaskEmail": schema_otto8_ai_otto8_apiclient_types_TaskEmail(ref), "github.com/otto8-ai/otto8/apiclient/types.TaskIf": schema_otto8_ai_otto8_apiclient_types_TaskIf(ref), @@ -99,7 +102,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.While": schema_otto8_ai_otto8_apiclient_types_While(ref), "github.com/otto8-ai/otto8/apiclient/types.Workflow": schema_otto8_ai_otto8_apiclient_types_Workflow(ref), "github.com/otto8-ai/otto8/apiclient/types.WorkflowCall": schema_otto8_ai_otto8_apiclient_types_WorkflowCall(ref), - "github.com/otto8-ai/otto8/apiclient/types.WorkflowEnv": schema_otto8_ai_otto8_apiclient_types_WorkflowEnv(ref), + "github.com/otto8-ai/otto8/apiclient/types.WorkflowExecution": schema_otto8_ai_otto8_apiclient_types_WorkflowExecution(ref), + "github.com/otto8-ai/otto8/apiclient/types.WorkflowExecutionList": schema_otto8_ai_otto8_apiclient_types_WorkflowExecutionList(ref), "github.com/otto8-ai/otto8/apiclient/types.WorkflowList": schema_otto8_ai_otto8_apiclient_types_WorkflowList(ref), "github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest": schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref), "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1.Agent": schema_storage_apis_ottootto8ai_v1_Agent(ref), @@ -539,12 +543,25 @@ func schema_otto8_ai_otto8_apiclient_types_AgentManifest(ref common.ReferenceCal Format: "", }, }, + "env": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.EnvVar"), + }, + }, + }, + }, + }, }, - Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model"}, + Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model", "env"}, }, }, Dependencies: []string{ - "github.com/otto8-ai/otto8/apiclient/types.AgentIcons"}, + "github.com/otto8-ai/otto8/apiclient/types.AgentIcons", "github.com/otto8-ai/otto8/apiclient/types.EnvVar"}, } } @@ -882,12 +899,6 @@ func schema_otto8_ai_otto8_apiclient_types_CronJobManifest(ref common.ReferenceC Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, "taskSchedule": { SchemaProps: spec.SchemaProps{ Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Schedule"), @@ -1070,12 +1081,6 @@ func schema_otto8_ai_otto8_apiclient_types_EmailReceiverManifest(ref common.Refe Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, "workflow": { SchemaProps: spec.SchemaProps{ Default: "", @@ -1104,6 +1109,40 @@ func schema_otto8_ai_otto8_apiclient_types_EmailReceiverManifest(ref common.Refe } } +func schema_otto8_ai_otto8_apiclient_types_EnvVar(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "name": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "value": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "description": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"name", "value", "description"}, + }, + }, + } +} + func schema_otto8_ai_otto8_apiclient_types_ErrHTTP(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -2207,6 +2246,13 @@ func schema_otto8_ai_otto8_apiclient_types_Progress(ref common.ReferenceCallback Format: "", }, }, + "parentRunID": { + SchemaProps: spec.SchemaProps{ + Description: "ParentRunID is the parent run of the run that is specified in the RunID field", + Type: []string{"string"}, + Format: "", + }, + }, "time": { SchemaProps: spec.SchemaProps{ Description: "Time is the time the event was generated", @@ -2772,6 +2818,54 @@ func schema_otto8_ai_otto8_apiclient_types_SubFlow(ref common.ReferenceCallback) } } +func schema_otto8_ai_otto8_apiclient_types_Table(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "name": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"name"}, + }, + }, + } +} + +func schema_otto8_ai_otto8_apiclient_types_TableList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Table"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.Table"}, + } +} + func schema_otto8_ai_otto8_apiclient_types_Task(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -2996,6 +3090,12 @@ func schema_otto8_ai_otto8_apiclient_types_TaskRun(ref common.ReferenceCallback) Format: "", }, }, + "input": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "task": { SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, @@ -3207,6 +3307,20 @@ func schema_otto8_ai_otto8_apiclient_types_Thread(ref common.ReferenceCallback) Format: "", }, }, + "env": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, Required: []string{"Metadata", "ThreadManifest"}, }, @@ -3311,6 +3425,12 @@ func schema_otto8_ai_otto8_apiclient_types_ToolCall(ref common.ReferenceCallback Format: "", }, }, + "output": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "metadata": { SchemaProps: spec.SchemaProps{ Type: []string{"object"}, @@ -3723,12 +3843,6 @@ func schema_otto8_ai_otto8_apiclient_types_WebhookManifest(ref common.ReferenceC Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, }, Required: []string{"name", "description", "alias", "workflow", "headers", "secret", "validationHeader"}, }, @@ -3896,37 +4010,81 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowCall(ref common.ReferenceCall } } -func schema_otto8_ai_otto8_apiclient_types_WorkflowEnv(ref common.ReferenceCallback) common.OpenAPIDefinition { +func schema_otto8_ai_otto8_apiclient_types_WorkflowExecution(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ Type: []string{"object"}, Properties: map[string]spec.Schema{ - "name": { + "Metadata": { SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Metadata"), }, }, - "value": { + "workflow": { SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest"), }, }, - "description": { + "startTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Time"), + }, + }, + "endTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Time"), + }, + }, + "input": { SchemaProps: spec.SchemaProps{ Default: "", Type: []string{"string"}, Format: "", }, }, + "error": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, - Required: []string{"name", "value", "description"}, + Required: []string{"Metadata", "startTime", "endTime", "input"}, }, }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.Metadata", "github.com/otto8-ai/otto8/apiclient/types.Time", "github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest"}, + } +} + +func schema_otto8_ai_otto8_apiclient_types_WorkflowExecutionList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowExecution"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.WorkflowExecution"}, } } @@ -4129,20 +4287,6 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref common.Reference Format: "", }, }, - "credentials": { - SchemaProps: spec.SchemaProps{ - Type: []string{"array"}, - Items: &spec.SchemaOrArray{ - Schema: &spec.Schema{ - SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", - }, - }, - }, - }, - }, "env": { SchemaProps: spec.SchemaProps{ Type: []string{"array"}, @@ -4150,7 +4294,7 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref common.Reference Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, - Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowEnv"), + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.EnvVar"), }, }, }, @@ -4177,11 +4321,11 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref common.Reference }, }, }, - Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model", "credentials", "env", "steps", "output"}, + Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model", "env", "steps", "output"}, }, }, Dependencies: []string{ - "github.com/otto8-ai/otto8/apiclient/types.AgentIcons", "github.com/otto8-ai/otto8/apiclient/types.Step", "github.com/otto8-ai/otto8/apiclient/types.WorkflowEnv"}, + "github.com/otto8-ai/otto8/apiclient/types.AgentIcons", "github.com/otto8-ai/otto8/apiclient/types.EnvVar", "github.com/otto8-ai/otto8/apiclient/types.Step"}, } } @@ -4291,6 +4435,26 @@ func schema_storage_apis_ottootto8ai_v1_AgentSpec(ref common.ReferenceCallback) Ref: ref("github.com/otto8-ai/otto8/apiclient/types.AgentManifest"), }, }, + "systemTools": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "contextInput": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "inputFilters": { SchemaProps: spec.SchemaProps{ Type: []string{"array"}, @@ -4661,15 +4825,15 @@ func schema_storage_apis_ottootto8ai_v1_CronJobSpec(ref common.ReferenceCallback Format: "", }, }, - "userID": { + "taskSchedule": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Schedule"), }, }, - "taskSchedule": { + "threadName": { SchemaProps: spec.SchemaProps{ - Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Schedule"), + Type: []string{"string"}, + Format: "", }, }, }, @@ -4960,12 +5124,6 @@ func schema_storage_apis_ottootto8ai_v1_EmailReceiverSpec(ref common.ReferenceCa Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, "workflow": { SchemaProps: spec.SchemaProps{ Default: "", @@ -4987,6 +5145,12 @@ func schema_storage_apis_ottootto8ai_v1_EmailReceiverSpec(ref common.ReferenceCa }, }, }, + "threadName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"name", "description", "workflow"}, }, @@ -6791,6 +6955,20 @@ func schema_storage_apis_ottootto8ai_v1_ThreadSpec(ref common.ReferenceCallback) Format: "", }, }, + "env": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, }, }, @@ -7249,20 +7427,21 @@ func schema_storage_apis_ottootto8ai_v1_WebhookSpec(ref common.ReferenceCallback Format: "", }, }, - "userID": { + "tokenHash": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, - Format: "", + Format: "byte", }, }, - "tokenHash": { + "ThreadName": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "byte", + Default: "", + Type: []string{"string"}, + Format: "", }, }, }, - Required: []string{"name", "description", "alias", "workflow", "headers", "secret", "validationHeader"}, + Required: []string{"name", "description", "alias", "workflow", "headers", "secret", "validationHeader", "ThreadName"}, }, }, } @@ -7451,10 +7630,11 @@ func schema_storage_apis_ottootto8ai_v1_WorkflowExecutionSpec(ref common.Referen Format: "", }, }, - "userID": { + "threadName": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Description: "ThreadName is the name of the thread that owns this execution, which is the same as the owning thread of the workflow.", + Type: []string{"string"}, + Format: "", }, }, "workflowName": { @@ -7631,13 +7811,7 @@ func schema_storage_apis_ottootto8ai_v1_WorkflowSpec(ref common.ReferenceCallbac SchemaProps: spec.SchemaProps{ Type: []string{"object"}, Properties: map[string]spec.Schema{ - "agentName": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, - "userID": { + "threadName": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, Format: "", @@ -7649,6 +7823,26 @@ func schema_storage_apis_ottootto8ai_v1_WorkflowSpec(ref common.ReferenceCallbac Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest"), }, }, + "knowledgeSetNames": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "workspaceName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/system/tools.go b/pkg/system/tools.go index 47d1e264..49f17092 100644 --- a/pkg/system/tools.go +++ b/pkg/system/tools.go @@ -9,6 +9,7 @@ const ( WebsiteCleanTool = "website-cleaner" ResultFormatterTool = "result-formatter" ModelProviderTool = "otto8-model-provider" + WorkflowTool = "workflow" DefaultNamespace = "default" ) diff --git a/ui/user/src/lib/actions/textarea.ts b/ui/user/src/lib/actions/textarea.ts index f155c0cd..f134f39a 100644 --- a/ui/user/src/lib/actions/textarea.ts +++ b/ui/user/src/lib/actions/textarea.ts @@ -1,9 +1,6 @@ function resize(node: HTMLTextAreaElement) { node.style.height = 'auto'; - // not totally sure why 4 is needed here, otherwise the textarea is too small and we - // get a scrollbar - node.style.height = (node.scrollHeight < 44 ? 44 : node.scrollHeight) + 4 + 'px'; - console.log('resize', node.scrollHeight, node.style.height); + node.style.height = node.scrollHeight + 'px'; } export function autoHeight(node: HTMLTextAreaElement) { @@ -11,4 +8,5 @@ export function autoHeight(node: HTMLTextAreaElement) { node.onfocus = () => resize(node); node.oninput = () => resize(node); node.onresize = () => resize(node); + node.onchange = () => resize(node); } diff --git a/ui/user/src/lib/components/Editors.svelte b/ui/user/src/lib/components/Editors.svelte index cdaa0639..ee875e26 100644 --- a/ui/user/src/lib/components/Editors.svelte +++ b/ui/user/src/lib/components/Editors.svelte @@ -1,6 +1,7 @@
- {#if EditorService.items.length > 1 || !EditorService.items[0].task} + {#if EditorService.items.length > 1 || (!EditorService.items[0].task && !EditorService.items[0].table)}
@@ -76,6 +66,8 @@
{#if file.name.toLowerCase().endsWith('.md')} + {:else if file.table} + {:else if file.task} - import type { Messages, Progress } from '$lib/services'; - import { ChatService } from '$lib/services'; - import Message from '$lib/components/messages/Message.svelte'; - import { fade } from 'svelte/transition'; - import { newMessageSource } from '$lib/services/chat/messagesource.js'; - import { onDestroy } from 'svelte'; - import { currentAssistant } from '$lib/stores'; - - interface Props { - onMessages?: (messages: Messages) => void; - onError: (err: Error) => void; - onLoadFile: (filename: string) => void; - } - - let { onMessages, onError, onLoadFile }: Props = $props(); - - let progressEvents: Progress[] = []; - let replayComplete = false; - let messages: Messages = $state({ messages: [], inProgress: false }); - let close: () => void | undefined; - - $effect(() => { - if ($currentAssistant.id && !close) { - close = newMessageSource($currentAssistant.id, handleMessage, { - onError - }); - } - }); - - onDestroy(() => { - close?.(); - }); - - function handleMessage(progress: Progress) { - progressEvents = [...progressEvents, progress]; - if (!replayComplete) { - replayComplete = progressEvents.find((e) => e.replayComplete) !== undefined; - } - - if (!replayComplete) { - return; - } - - messages = ChatService.progressToMessages(progressEvents); - - // forward the messages to the parent component - onMessages?.(messages); - } - - -
- {#each messages.messages as msg} - {#if !msg.ignore} - - {/if} - {/each} -
diff --git a/ui/user/src/lib/components/Navbar.svelte b/ui/user/src/lib/components/Navbar.svelte index 28a3ccbf..6850a3e4 100644 --- a/ui/user/src/lib/components/Navbar.svelte +++ b/ui/user/src/lib/components/Navbar.svelte @@ -6,13 +6,26 @@ import Files from '$lib/components/navbar/Files.svelte'; import KnowledgeFile from '$lib/components/navbar/KnowledgeFiles.svelte'; import Tasks from '$lib/components/navbar/Tasks.svelte'; - import { onMount } from 'svelte'; + import Tables from '$lib/components/navbar/Tables.svelte'; + import { tools } from '$lib/stores'; - let showTasks = $state(false); + function hasOptionalTools() { + for (const tool of $tools.items) { + if (!tool.builtin) { + return true; + } + } + return false; + } - onMount(() => { - showTasks = window.location.href.split('?')[1]?.includes('tasks=1'); - }); + function hasTool(tool: string) { + for (const t of $tools.items) { + if (t.id === tool) { + return t.enabled || t.builtin; + } + } + return false; + }
+

{tableName}

+ +
+ { + await ChatService.invoke($currentAssistant.id, { + prompt: `In the table database '${tableName}' do the following instruction:\n${i.prompt}` + }); + }} + /> +
+ + + +
+
+ + + {#each data?.columns ?? [] as column} + + {/each} + + + + {#each data?.rows ?? [] as row} + + {#each data?.columns ?? [] as col} + + {/each} + + {/each} + +
{column}
{row[col]}
+
+
+ + diff --git a/ui/user/src/lib/components/messages/Input.svelte b/ui/user/src/lib/components/messages/Input.svelte index 985ae276..3733624c 100644 --- a/ui/user/src/lib/components/messages/Input.svelte +++ b/ui/user/src/lib/components/messages/Input.svelte @@ -1,24 +1,24 @@ - + {/if} +
{#if description}

{description}

{/if} diff --git a/ui/user/src/lib/components/navbar/Profile.svelte b/ui/user/src/lib/components/navbar/Profile.svelte index f8b6a3e4..3fdce142 100644 --- a/ui/user/src/lib/components/navbar/Profile.svelte +++ b/ui/user/src/lib/components/navbar/Profile.svelte @@ -1,101 +1,54 @@ - -
- - -
-
-

- {$profile.email || 'Anonymous'} -

-
-
- {#if credPromise !== undefined} - {#await credPromise} -

- Credentials -

- {:then credentials} -

Credentials

- {#if credentials?.items.length > 0} -
    - {#each credentials.items as cred} -
  • - {cred.name} - -
  • - {/each} -
- {:else} - No credentials - {/if} - {/await} + {/snippet} + {#snippet body()} +
+ {#if credentials && credentials?.items.length > 0} + Credentials + {#each credentials.items as cred} +
+ {cred.name} + +
+ {/each} + {:else} + No credentials {/if}
- -
-
+
+ {/snippet} + diff --git a/ui/user/src/lib/components/navbar/Tables.svelte b/ui/user/src/lib/components/navbar/Tables.svelte new file mode 100644 index 00000000..4594bd85 --- /dev/null +++ b/ui/user/src/lib/components/navbar/Tables.svelte @@ -0,0 +1,50 @@ + + + + {#snippet icon()} + + {/snippet} + {#snippet body()} + {#if tables.tables.length === 0} +

No files

+ {:else} +
    + {#each tables.tables as table} +
  • +
    +
    +{:else} +
    + {@render runButton()} +
    {/if} + + { + deleteTask(toDelete); + toDelete = ''; + }} + oncancel={() => (toDelete = '')} +/> + + diff --git a/ui/user/src/lib/components/tasks/Schedule.svelte b/ui/user/src/lib/components/tasks/Schedule.svelte index 36b7c4fe..b325c377 100644 --- a/ui/user/src/lib/components/tasks/Schedule.svelte +++ b/ui/user/src/lib/components/tasks/Schedule.svelte @@ -21,10 +21,10 @@ }: Props = $props(); +

    Schedule

    import Self from './Step.svelte'; - import type { TaskStep } from '$lib/services'; - import type { StepMessages } from '$lib/stores'; + import { ChatService, type Messages, type Task, type TaskStep } from '$lib/services'; import Message from '$lib/components/messages/Message.svelte'; - import { X } from '$lib/icons'; - import { CheckCircle, RefreshCcw, Save, Undo, XCircle } from 'lucide-svelte'; + import { Plus, Trash } from '$lib/icons'; + import { LoaderCircle, OctagonX, Play, RefreshCcw, Save, Undo } from 'lucide-svelte'; import { tick } from 'svelte'; import { autoHeight } from '$lib/actions/textarea'; + import Modal from '$lib/components/Modal.svelte'; + import { currentAssistant } from '$lib/stores'; interface Props { parentStale?: boolean; onChange?: (steps: TaskStep[]) => void | Promise; run?: (step: TaskStep, steps?: TaskStep[]) => Promise; - steps: TaskStep[]; + task: Task; index: number; + pending?: boolean; editMode?: boolean; - stepMessages?: StepMessages; + stepMessages?: Map; } let { parentStale, onChange, run, - steps, + task, index, editMode = false, + pending, stepMessages }: Props = $props(); - let step = $derived(steps[index]); - let messages = $derived(stepMessages?.messages.get(step.id)?.messages ?? []); - let currentValue = $state(textValue(steps[index])); - let dirty = $derived(textValue(steps[index]) !== currentValue); - let stale: boolean = $derived(dirty || parentStale || false); - let isIf: boolean = $derived(currentValue?.startsWith('If ') || false); - let isThenPath: boolean = $derived.by(() => { - for (const message of messages) { - return message.message.join('').trim().toLowerCase() == 'true'; - } - return false; - }); - let isElsePath: boolean = $derived.by(() => { - for (const message of messages) { - return message.message.join('').trim().toLowerCase() == 'false'; - } - return false; - }); + let step = $derived(task.steps[index]); + let messages = $derived(stepMessages?.get(step.id)?.messages ?? []); + let lastSeenValue = $state(''); + let currentValue = $state(task.steps[index].step); + let dirty = $derived(task.steps[index].step !== currentValue); + let stale: boolean = $derived(dirty || parentStale || !parentMatches()); + let running = $derived(stepMessages?.get(step.id)?.inProgress ?? false); + let toDelete: boolean | undefined = $state(); + let nextStep: Self | undefined = $state(); $effect(() => { - if (!currentValue?.trimStart().toLowerCase().startsWith('if ')) { - return; - } - - const newValue = currentValue.trimStart().replace(/^[iI][fF]/, 'If'); - if (newValue !== currentValue) { - currentValue = newValue; + if (editMode) { + if (lastSeenValue !== step.step) { + currentValue = step.step; + lastSeenValue = currentValue ?? ''; + } + } else { + if (currentValue !== step.step) { + currentValue = step.step; + } + if (lastSeenValue !== '') { + lastSeenValue = ''; + } } + }); - if (steps.length < index + 2) { - console.log('if detected'); - steps.push(createStep()); + function parentMatches() { + if (running) { + return true; } - - if (!step.if) { - console.log('creating if'); - step.if = { - condition: '', - steps: [createStep()], - else: [createStep()] - }; + if (index === 0) { + return true; } - }); - - function textValue(step: TaskStep) { - return step.if ? 'If ' + step.if.condition : step.step; + const lastRun = stepMessages + ?.get(task.steps[index - 1].id) + ?.messages.findLast((msg) => msg.runID); + const currentRun = stepMessages + ?.get(task.steps[index].id) + ?.messages.find((msg) => msg.parentRunID); + return lastRun?.runID === currentRun?.parentRunID; } async function deleteStep() { - const newSteps = [...steps]; + toDelete = undefined; + const newSteps = [...task.steps]; newSteps.splice(index, 1); await onChange?.(newSteps); } @@ -88,56 +86,29 @@ } } - function firstLine(e: KeyboardEvent) { - return ( - e.target instanceof HTMLTextAreaElement && - e.target.value.lastIndexOf('\n', e.target.selectionStart - 1) === -1 && - e.target.selectionStart === e.target.selectionEnd - ); - } - - function lastLine(e: KeyboardEvent) { - return ( - e.target instanceof HTMLTextAreaElement && - e.target.value.indexOf('\n', e.target.selectionStart) === -1 && - e.target.selectionStart === e.target.selectionEnd - ); - } - - function lastChar(e: KeyboardEvent) { - return ( - e.target instanceof HTMLTextAreaElement && - e.target.selectionStart === e.target.value.length && - e.target.selectionStart === e.target.selectionEnd - ); - } - function synchronized(newSteps?: TaskStep[]): TaskStep[] | undefined { if (!newSteps && !dirty) { return; } - const retSteps = newSteps ?? [...steps]; + const retSteps = newSteps ?? [...task.steps]; if (dirty) { - const newStep = { ...step }; - if (currentValue?.startsWith('If ')) { - newStep.step = ''; - if (!newStep.if) { - newStep.if = { - condition: '' - }; - } - newStep.if.condition = currentValue.replace(/^If /, '').trim(); - } else { - newStep.step = currentValue; - newStep.if = undefined; - } - retSteps[index] = newStep; + retSteps[index] = { + ...step, + step: currentValue + }; } return retSteps; } + export async function saveAll() { + await save(); + if (nextStep) { + await nextStep.saveAll(); + } + } + async function save(steps?: TaskStep[]) { const newSteps = synchronized(steps); if (newSteps) { @@ -145,93 +116,47 @@ } } + async function addStep() { + const newStep = createStep(); + const newSteps = [...task.steps]; + newSteps.splice(index + 1, 0, newStep); + await save(newSteps); + await tick(); + document.getElementById('step' + newStep.id)?.focus(); + } + async function onkeydown(e: KeyboardEvent) { - if (e.key === 'Enter' && e.ctrlKey) { + if (e.key === 'Enter' && !e.ctrlKey && !e.shiftKey) { e.preventDefault(); await doRun(); - } else if (e.key === 'ArrowUp' && firstLine(e) && e.target instanceof HTMLTextAreaElement) { - const oldIndex = e.target.selectionStart; - setTimeout(() => { - if (e.target instanceof HTMLTextAreaElement && e.target.selectionStart === 0) { - const prevNode = document.getElementById('step' + steps[index - 1]?.id); - if (prevNode) { - e.target.selectionStart = oldIndex; - e.target.selectionEnd = oldIndex; - prevNode.focus(); - } - } - }); - } else if (e.key === 'ArrowDown' && lastLine(e) && e.target instanceof HTMLTextAreaElement) { - const oldIndex = e.target.selectionStart; - setTimeout(() => { - if ( - e.target instanceof HTMLTextAreaElement && - e.target.selectionStart === e.target.value.length - ) { - const nextNode = document.getElementById('step' + steps[index + 1]?.id); - if (nextNode) { - e.target.selectionStart = oldIndex; - e.target.selectionEnd = oldIndex; - nextNode.focus(); - } - } - }); - } else if ( - e.key === 'Enter' && - !e.ctrlKey && - !e.shiftKey && - lastChar(e) && - e.target instanceof HTMLTextAreaElement && - e.target.value.trim() !== '' - ) { - const newStep = createStep(); - const newSteps = [...steps]; - newSteps.splice(index + 1, 0, newStep); + } else if (e.key === 'Enter' && e.ctrlKey && !e.shiftKey) { e.preventDefault(); - await save(newSteps); - await tick(); - document.getElementById('step' + newStep.id)?.focus(); - } else if ( - e.key === 'Backspace' && - e.target instanceof HTMLTextAreaElement && - e.target.value === '' && - steps.length > 1 - ) { - e.preventDefault(); - await deleteStep(); - document.getElementById('step' + steps[index - 1]?.id)?.focus(); + await addStep(); } } - function setIfSteps(ifSteps?: TaskStep[], elseSteps?: TaskStep[]): TaskStep[] { - const newStep: TaskStep = { - ...step, - if: { - ...(step.if ?? { condition: '' }), - steps: ifSteps, - else: elseSteps - } - }; - const newSteps = [...steps]; - newSteps[index] = newStep; - return newSteps; - } - function createStep(): TaskStep { return { id: Math.random().toString(36).substring(7), step: '' }; } async function doRun() { + if ((running || pending) && editMode) { + await ChatService.abort($currentAssistant.id, { + taskID: task.id, + runID: 'editor' + }); + return; + } + if (running || pending || !currentValue || currentValue?.trim() === '') { + return; + } await run?.(step, synchronized()); } -
  • +
  • {#if editMode} - {#if isIf} - If - {/if}
    {#if dirty} -
    - {:else if isIf} -
    - If - {currentValue?.slice(3)} + {#if currentValue?.trim() !== ''} + + {/if}
    {:else} {currentValue} {/if}
    {#if messages.length > 0} -
    +
    {#each messages as msg} - + {#if !msg.sent} + + {/if} {/each} + {#if stale} +
    + {/if}
    {/if} - - {#if steps.length > index + 1 && isIf} - {#if step?.if?.steps?.length} - {#key step.id + ' then'} -
      - Then - {#if isThenPath} - - {/if} - { - await onChange?.(setIfSteps(steps, step?.if?.else)); - }} - run={async (step, steps) => { - if (steps) { - await run?.(step, setIfSteps(steps, step?.if?.else)); - } else { - await run?.(step, steps); - } - }} - {editMode} - steps={step.if.steps} - index={0} - {stepMessages} - parentStale={stale} - /> -
    - {/key} - {/if} - {#if step?.if?.else?.length} - {#key step.id + ' else'} -
      - Else - {#if isElsePath} - - {/if} - { - await onChange?.(setIfSteps(step?.if?.steps, steps)); - }} - run={async (step, steps) => { - if (steps) { - await run?.(step, setIfSteps(step?.if?.steps, steps)); - } else { - await run?.(step, steps); - } - }} - {editMode} - steps={step.if.else} - index={0} - {stepMessages} - parentStale={stale} - /> -
    - {/key} - {/if} - {/if}
  • -{#if steps.length > index + 1} - {#key steps[index + 1].id} +{#if task.steps.length > index + 1} + {#key task.steps[index + 1].id} - ol { - @apply list-[lower-alpha] pl-2; - } - - li { - @apply ms-6; - } - - li::marker { - @apply font-semibold; - } - - .keyword { - @apply rounded-md bg-blue px-2 text-white shadow-md; - } - - .then-else { - @apply mb-2 mt-3 p-4; - } - + (toDelete = undefined)} +/> diff --git a/ui/user/src/lib/components/tasks/Steps.svelte b/ui/user/src/lib/components/tasks/Steps.svelte index 9ea5f635..e643829a 100644 --- a/ui/user/src/lib/components/tasks/Steps.svelte +++ b/ui/user/src/lib/components/tasks/Steps.svelte @@ -1,100 +1,149 @@ -{#if editMode} -
    -

    Test Input

    - -
    -{/if} +

    Steps

    + {#if editMode} - +
    + +
    {/if}
      - {#if steps.length > 0} - {#key steps[0].id} - + {#if task.steps.length > 0} + {#key task.steps[0].id} + {/key} {/if}
    diff --git a/ui/user/src/lib/components/tasks/Table.svelte b/ui/user/src/lib/components/tasks/Table.svelte index d060bbf3..4cbf8e42 100644 --- a/ui/user/src/lib/components/tasks/Table.svelte +++ b/ui/user/src/lib/components/tasks/Table.svelte @@ -3,13 +3,14 @@ interface Props { header: string[]; + placeholders?: string[]; rows: string[][]; buttons?: Snippet<[string[]]>; onCellBlur?: (value: string, row: number, col: number) => void | Promise; editable?: boolean; } - let { header, rows, buttons, editable, onCellBlur }: Props = $props(); + let { header, placeholders, rows, buttons, editable, onCellBlur }: Props = $props(); {#snippet drawCell(value: string, row: number, col: number)} @@ -17,12 +18,13 @@ { if (e.target instanceof HTMLInputElement) { onCellBlur?.(e.target.value, row, col); } }} + placeholder={placeholders?.[col] ?? ''} onkeydown={(e) => { if (e.key === 'Enter' && e.target instanceof HTMLInputElement) { e.target.blur(); @@ -34,11 +36,11 @@ {/if} {/snippet} - - +
    + {#each header as key} - {/each} @@ -67,16 +69,4 @@
    + {key}
    diff --git a/ui/user/src/lib/components/tasks/Task.svelte b/ui/user/src/lib/components/tasks/Task.svelte index 8a04bac0..29ac37f6 100644 --- a/ui/user/src/lib/components/tasks/Task.svelte +++ b/ui/user/src/lib/components/tasks/Task.svelte @@ -1,13 +1,14 @@ -{#if editMode || selectedTrigger() !== 'onDemand' || Object.keys(task?.onDemand?.params ?? {}).length > 0} +{#if visible}
    - {#if editMode} -

    Trigger

    -
    - -
    - {:else} -

    {options[selectedTrigger()]}

    - {/if} {#if selectedTrigger() === 'schedule'} {#if selectedTrigger() === 'webhook'} -
    - URL +
    +

    Webhook URL

    {webhook} -
    {/if} {#if selectedTrigger() === 'email' && email} -
    - {#if editMode}Email{/if} - Address -
    - {email} - -
    +
    +

    + {#if editMode}Email{/if} + Address +

    + {email}
    {/if} {#if selectedTrigger() === 'onDemand'} diff --git a/ui/user/src/lib/components/tasks/Type.svelte b/ui/user/src/lib/components/tasks/Type.svelte new file mode 100644 index 00000000..b994281e --- /dev/null +++ b/ui/user/src/lib/components/tasks/Type.svelte @@ -0,0 +1,133 @@ + + +
    +
    + {#if editMode} + + {/if} +
    + {#if editMode || !inputVisible} + + {/if} +
    diff --git a/ui/user/src/lib/services/chat/index.ts b/ui/user/src/lib/services/chat/index.ts index 19c9292f..b4f89226 100644 --- a/ui/user/src/lib/services/chat/index.ts +++ b/ui/user/src/lib/services/chat/index.ts @@ -1,7 +1,7 @@ import { baseURL } from './http'; import { buildMessagesFromProgress } from './messages'; -import * as MessageSource from './messagesource'; import * as Operations from './operations'; +import * as MessageSource from './thread.svelte'; export default { progressToMessages: buildMessagesFromProgress, diff --git a/ui/user/src/lib/services/chat/messages.ts b/ui/user/src/lib/services/chat/messages.ts index ae41ec57..957607f9 100644 --- a/ui/user/src/lib/services/chat/messages.ts +++ b/ui/user/src/lib/services/chat/messages.ts @@ -54,6 +54,8 @@ function reformatInputMessage(msg: Message) { } else { msg.message = [input.prompt]; } + } else if (input.prompt === '') { + msg.message = ['']; } if (input.explain) { msg.explain = input.explain; @@ -67,8 +69,8 @@ function reformatInputMessage(msg: Message) { } function reformatWriteMessage(msg: Message, last: boolean) { - msg.icon = 'stock:Pencil'; - msg.done = !last || msg.toolCall; + msg.icon = 'Pencil'; + msg.done = !last || msg.toolCall !== undefined; msg.sourceName = msg.done ? 'Wrote to Workspace' : 'Writing to Workspace'; let content = msg.message.join('').trim(); if (!content.endsWith('"}')) { @@ -92,7 +94,7 @@ function reformatWriteMessage(msg: Message, last: boolean) { } if (last && msg.file?.filename && msg.file?.content) { - setFileContent(msg.file.filename, msg.file.content, msg.toolCall); + setFileContent(msg.file.filename, msg.file.content, msg.toolCall !== undefined); } } @@ -132,6 +134,7 @@ export function buildMessagesFromProgress(progresses: Progress[]): Messages { function toMessages(progresses: Progress[]): Messages { let messages: Message[] = []; let lastRunID: string | undefined; + let parentRunID: string | undefined; let inProgress = false; for (const [i, progress] of progresses.entries()) { @@ -155,6 +158,14 @@ function toMessages(progresses: Progress[]): Messages { continue; } + if (progress.parentRunID) { + if (progress.runID === lastRunID) { + parentRunID = progress.parentRunID; + } else { + parentRunID = undefined; + } + } + if (progress.runComplete) { lastRunID = progress.runID; inProgress = false; @@ -180,7 +191,7 @@ function toMessages(progresses: Progress[]): Messages { } else if (progress.input) { // delete the current runID, this is to avoid duplicate messages messages = messages.filter((m) => m.runID !== progress.runID); - messages.push(newInputMessage(progress)); + messages.push(newInputMessage(progress, parentRunID)); } else if (progress.content) { const found = messages.findLast( (m) => m.contentID === progress.contentID && progress.contentID @@ -220,9 +231,10 @@ function toMessages(progresses: Progress[]): Messages { }; } -function newInputMessage(progress: Progress): Message { +function newInputMessage(progress: Progress, parentRunID?: string): Message { return { runID: progress.runID || '', + parentRunID: parentRunID, time: new Date(progress.time), icon: profileIcon, sourceName: 'You', @@ -287,7 +299,7 @@ function newContentMessage(progress: Progress): Message { result.icon = progress.toolCall.metadata.icon; } result.message = progress.toolCall.input ? [progress.toolCall.input] : []; - result.toolCall = true; + result.toolCall = progress.toolCall; result.tool = true; } diff --git a/ui/user/src/lib/services/chat/messagesource.ts b/ui/user/src/lib/services/chat/messagesource.ts deleted file mode 100644 index 0bffffe1..00000000 --- a/ui/user/src/lib/services/chat/messagesource.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { newMessageEventSource } from './operations'; -import type { Progress } from './types'; - -export function newMessageSource( - assistant: string, - onProgress: (message: Progress) => void, - opts?: { - task?: { - id: string; - follow?: boolean; - }; - onError?: (error: Error) => void; - onClose?: () => void; - } -): () => void { - let replayComplete = false; - - const es = newMessageEventSource(assistant, { - task: opts?.task - }); - es.onmessage = handleMessage; - es.onopen = () => { - console.log('Message EventSource opened'); - }; - es.addEventListener('close', () => { - console.log('Message EventSource closed'); - opts?.onClose?.(); - es.close(); - }); - es.onerror = (e: Event) => { - if (e.eventPhase === EventSource.CLOSED) { - console.log('Message EventSource closed'); - } - }; - - function handleMessage(event: MessageEvent) { - const progress = JSON.parse(event.data) as Progress; - if (progress.replayComplete) { - replayComplete = true; - } - if (progress.error) { - if (progress.error.includes('abort')) { - onProgress(progress); - } else if (replayComplete && opts?.onError) { - opts.onError(new Error(progress.error)); - } - } else { - onProgress(progress); - } - } - - return () => { - if (es.readyState !== EventSource.CLOSED) { - es.close(); - } - }; -} diff --git a/ui/user/src/lib/services/chat/operations.ts b/ui/user/src/lib/services/chat/operations.ts index 2a01f4e3..b5f85358 100644 --- a/ui/user/src/lib/services/chat/operations.ts +++ b/ui/user/src/lib/services/chat/operations.ts @@ -12,7 +12,9 @@ import { type TaskList, type TaskRun, type TaskRunList, - type Version + type Version, + type TableList, + type Rows } from './types'; export async function getProfile(): Promise { @@ -105,7 +107,19 @@ export async function invoke(assistant: string, msg: string | InvokeInput) { await doPost(`/assistants/${assistant}/invoke`, msg); } -export async function abort(assistant: string) { +export async function abort( + assistant: string, + opts?: { + taskID?: string; + runID?: string; + } +) { + if (opts?.taskID && opts?.runID) { + return await doPost( + `/assistants/${assistant}/tasks/${opts.taskID}/runs/${opts.runID}/abort`, + {} + ); + } await doPost(`/assistants/${assistant}/abort`, {}); } @@ -160,13 +174,15 @@ export function newMessageEventSource( id: string; follow?: boolean; }; + runID?: string; } ): EventSource { if (opts?.task) { - return new EventSource( - baseURL + - `/assistants/${assistant}/tasks/${opts.task.id}/events${opts.task.follow ? '?follow=true' : ''}` - ); + let url = `/assistants/${assistant}/tasks/${opts.task.id}/events`; + if (opts.runID) { + url = `/assistants/${assistant}/tasks/${opts.task.id}/runs/${opts.runID}/events`; + } + return new EventSource(baseURL + `${url}${opts.task.follow ? '?follow=true' : ''}`); } return new EventSource(baseURL + `/assistants/${assistant}/events`); } @@ -196,6 +212,14 @@ export async function getTask(assistant: string, id: string): Promise { return (await doGet(`/assistants/${assistant}/tasks/${id}`)) as Task; } +export async function getTaskRun( + assistant: string, + taskID: string, + runID: string +): Promise { + return (await doGet(`/assistants/${assistant}/tasks/${taskID}/runs/${runID}`)) as TaskRun; +} + export async function deleteTaskRun(assistant: string, id: string, runID: string) { return doDelete(`/assistants/${assistant}/tasks/${id}/runs/${runID}`); } @@ -207,3 +231,11 @@ export async function listTaskRuns(assistant: string, id: string): Promise void) | undefined; + readonly #es: EventSource; + readonly #progresses: Progress[] = []; + + constructor( + assistant: string, + opts?: { + task?: { + id: string; + follow?: boolean; + }; + runID?: string; + onError?: (error: Error) => void; + onClose?: () => void; + } + ) { + const es = newMessageEventSource(assistant, { + task: opts?.task, + runID: opts?.runID + }); + es.onmessage = (e) => { + this.handleMessage(e); + }; + es.onopen = () => { + console.log('Message EventSource opened'); + }; + es.addEventListener('close', () => { + console.log('Message EventSource closed'); + opts?.onClose?.(); + es.close(); + }); + es.onerror = (e: Event) => { + if (e.eventPhase === EventSource.CLOSED) { + console.log('Message EventSource closed'); + } + }; + + this.#assistant = assistant; + this.#es = es; + this.#onError = opts?.onError; + } + + async abort() { + try { + await ChatAbort(this.#assistant); + } finally { + this.pending = false; + } + } + + async invoke(input: InvokeInput | string) { + this.pending = true; + await ChatInvoke(this.#assistant, input); + } + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onMessages(m: Messages) {} + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onStepMessages(stepID: string, m: Messages) {} + + #handleSteps() { + const newMessages = new Map(); + let stepID: string | undefined; + for (const progress of this.#progresses) { + if (progress.step?.id) { + stepID = progress.step?.id.split('{')[0]; + newMessages.delete(stepID); + } + if (stepID) { + if (!newMessages.has(stepID)) { + newMessages.set(stepID, []); + } + newMessages.get(stepID)?.push(progress); + } + } + + for (const [stepID, msgs] of newMessages) { + this.onStepMessages(stepID, buildMessagesFromProgress(msgs)); + } + } + + #onProgress(progress: Progress) { + this.#progresses.push(progress); + if (this.replayComplete) { + this.onMessages(buildMessagesFromProgress(this.#progresses)); + this.#handleSteps(); + } + } + + handleMessage(event: MessageEvent) { + const progress = JSON.parse(event.data) as Progress; + if (progress.replayComplete) { + this.replayComplete = true; + } + if (progress.error) { + if (progress.error.includes('abort')) { + this.#onProgress(progress); + } else if (this.replayComplete && this.#onError) { + this.#onError(new Error(progress.error)); + } + } else { + this.#onProgress(progress); + } + this.pending = false; + } + + async runTask( + taskID: string, + opts?: { + stepID?: string; + input?: string | object; + } + ): Promise { + this.pending = true; + return await ChatRunTask(this.#assistant, taskID, opts); + } + + close() { + if (this.#es.readyState !== EventSource.CLOSED) { + this.#es.close(); + } + } +} diff --git a/ui/user/src/lib/services/chat/types.ts b/ui/user/src/lib/services/chat/types.ts index b7c2dfe8..3deced5d 100644 --- a/ui/user/src/lib/services/chat/types.ts +++ b/ui/user/src/lib/services/chat/types.ts @@ -1,5 +1,6 @@ export interface Progress { runID?: string; + parentRunID?: string; time: string; content: string; contentID?: string; @@ -50,6 +51,7 @@ type ToolCall = { name?: string; description?: string; input?: string; + output?: string; metadata?: { [key: string]: string }; }; @@ -63,12 +65,13 @@ type WorkflowCall = { export interface Message { runID: string; + parentRunID?: string; time?: Date; sent?: boolean; aborted?: boolean; icon?: string; tool?: boolean; - toolCall?: boolean; + toolCall?: ToolCall; toolInput?: boolean; sourceName: string; sourceDescription?: string; @@ -195,13 +198,6 @@ export interface CredentialList { export interface TaskStep { id: string; step?: string; - if?: TaskIfStep; -} - -export interface TaskIfStep { - condition: string; - steps?: TaskStep[]; - else?: TaskStep[]; } export interface Task { @@ -239,8 +235,22 @@ export interface TaskRun { task: Task; startTime?: string; endTime?: string; + input?: string; } export interface TaskRunList { items: TaskRun[]; } + +export interface TableList { + tables: Table[]; +} + +export interface Table { + name: string; +} + +export interface Rows { + columns: string[]; + rows: Record[]; +} diff --git a/ui/user/src/lib/services/editor/index.svelte.ts b/ui/user/src/lib/services/editor/index.svelte.ts index cb711211..447a1459 100644 --- a/ui/user/src/lib/services/editor/index.svelte.ts +++ b/ui/user/src/lib/services/editor/index.svelte.ts @@ -1,14 +1,13 @@ import items, { type EditorItem } from '$lib/stores/editor.svelte'; import tasks from '$lib/stores/tasks.svelte'; import ChatService from '../chat'; -import { get, type Writable, writable } from 'svelte/store'; +import { type Writable, writable } from 'svelte/store'; const visible = writable(false); const maxSize = writable(false); const editor: Editor = { remove, - init, load, select, items, @@ -20,45 +19,52 @@ export interface Editor { load: (assistant: string, id: string) => Promise; remove: (name: string) => void; select: (name: string) => void; - init: (assistant: string) => Promise; items: EditorItem[]; visible: Writable; maxSize: Writable; } -async function init(assistant: string) { - let currentID = window.location.href.split('#editor:')[1]; - const maxSize = currentID?.search(',maxSize'); - currentID = currentID?.split(',maxSize')[0]; - if (maxSize > 0) { - editor.maxSize.set(true); - } - if (currentID && assistant) { - return load(assistant, currentID); - } -} - function hasItem(id: string): boolean { const item = items?.find((item) => item.id === id); return item !== undefined; } async function load(assistant: string, id: string) { + if (hasItem(id)) { + select(id); + visible.set(true); + return; + } if (id.startsWith('w1')) { await loadTask(assistant, id); visible.set(true); return; } + if (id.startsWith('table://')) { + await loadTable(id); + visible.set(true); + return; + } await loadFile(assistant, id); visible.set(true); } -async function loadTask(assistant: string, taskID: string) { - if (hasItem(taskID)) { - select(taskID); - return; - } +async function loadTable(id: string) { + const tableName = id.split('table://')[1]; + const targetFile: EditorItem = { + id: id, + name: tableName, + contents: '', + buffer: '', + modified: false, + selected: true, + table: tableName + }; + items.push(targetFile); + select(id); +} +async function loadTask(assistant: string, taskID: string) { try { let task = tasks.items.get(taskID); if (!task) { @@ -82,11 +88,6 @@ async function loadTask(assistant: string, taskID: string) { } async function loadFile(assistant: string, file: string) { - if (hasItem(file)) { - select(file); - return; - } - try { const contents = await ChatService.getFile(assistant, file); const targetFile = { @@ -114,13 +115,6 @@ function select(id: string) { if (item.id === id) { item.selected = true; matched = true; - if (typeof window !== 'undefined') { - if (get(maxSize)) { - window.location.href = `#editor:${item.id},maxSize`; - } else { - window.location.href = `#editor:${item.id}`; - } - } } else { item.selected = false; } @@ -134,15 +128,13 @@ function select(id: string) { function remove(id: string) { for (let i = 0; i < items.length; i++) { if (items[i].id === id) { - if (items[i].selected) { - if (i > 0) { - select(items[i - 1].id); - } else if (items.length > 1) { - select(items[i + 1].id); - } - items.splice(i, 1); - break; + if (i > 0) { + select(items[i - 1].id); + } else if (items.length > 1) { + select(items[i + 1].id); } + items.splice(i, 1); + break; } } diff --git a/ui/user/src/lib/stores/assistants.ts b/ui/user/src/lib/stores/assistants.ts index 290c9ed3..0a26d6f2 100644 --- a/ui/user/src/lib/stores/assistants.ts +++ b/ui/user/src/lib/stores/assistants.ts @@ -1,30 +1,13 @@ -import { page } from '$app/stores'; import { listAssistants } from '$lib/services/chat/operations'; import { type Assistant } from '$lib/services/chat/types'; -import { storeWithInit } from '$lib/stores/storeinit'; import { writable } from 'svelte/store'; -function assignSelected(assistants: Assistant[], selectedName: string): Assistant[] { - const result: Assistant[] = []; +const store = writable([]); - for (const assistant of assistants) { - assistant.current = selectedName !== '' && assistant.id === selectedName; - result.push(assistant); - } - - return result; +if (typeof window !== 'undefined') { + listAssistants().then((assistants) => { + store.set(assistants.items); + }); } -const store = writable(assignSelected([], '')); - -export default storeWithInit(store, async () => { - page.subscribe(async (value) => { - const selectedName = value.params?.agent ?? ''; - try { - const assistants = await listAssistants(); - store.set(assignSelected(assistants.items, selectedName)); - } catch { - // just ignore - } - }); -}); +export default store; diff --git a/ui/user/src/lib/stores/currentassistant.ts b/ui/user/src/lib/stores/currentassistant.ts index 6ac4945b..ffeca546 100644 --- a/ui/user/src/lib/stores/currentassistant.ts +++ b/ui/user/src/lib/stores/currentassistant.ts @@ -1,7 +1,7 @@ +import { page } from '$app/stores'; import type { Assistant } from '$lib/services'; import assistants from './assistants'; -import { storeWithInit } from './storeinit'; -import { writable } from 'svelte/store'; +import { get, writable } from 'svelte/store'; const def: Assistant = { id: '', @@ -11,14 +11,34 @@ const def: Assistant = { const store = writable(def); -export default storeWithInit(store, async () => { - assistants.subscribe(async (assistants) => { - for (const assistant of assistants) { - if (assistant.current) { - store.set(assistant); - return; - } +function assignSelected(currentAssistants: Assistant[], selectedName: string): Assistant { + let changed = false; + for (let i = 0; i < currentAssistants.length; i++) { + const assistant = currentAssistants[i]; + const isCurrent = selectedName !== '' && assistant.id === selectedName; + if (assistant.current != isCurrent) { + assistant.current = isCurrent; + changed = true; } - store.set(def); - }); -}); + } + if (changed) { + assistants.set(currentAssistants); + } + return currentAssistants.find((value) => value.current) ?? def; +} + +function init() { + const p = get(page); + const a = get(assistants); + if (p && a.length > 0) { + const selectedName = p.params?.agent ?? ''; + store.set(assignSelected(a, selectedName)); + } +} + +if (typeof window !== 'undefined') { + page.subscribe(init); + assistants.subscribe(init); +} + +export default store; diff --git a/ui/user/src/lib/stores/editor.svelte.ts b/ui/user/src/lib/stores/editor.svelte.ts index 1124cc6e..5658fdd0 100644 --- a/ui/user/src/lib/stores/editor.svelte.ts +++ b/ui/user/src/lib/stores/editor.svelte.ts @@ -8,7 +8,9 @@ export interface EditorItem { modified?: boolean; selected?: boolean; task?: Task; + table?: string; } const items = $state([]); + export default items; diff --git a/ui/user/src/lib/stores/index.ts b/ui/user/src/lib/stores/index.ts index cd09fd61..9046199b 100644 --- a/ui/user/src/lib/stores/index.ts +++ b/ui/user/src/lib/stores/index.ts @@ -4,11 +4,9 @@ export { default as theme } from './theme'; export { default as profile } from './profile'; export { default as errors } from './errors'; export { default as assistants } from './assistants'; -export { default as currentAssistant } from './currentassistant'; +export { default as currentAssistant } from './currentassistant.js'; export { default as tools } from './tools'; export { default as tasks } from './tasks.svelte'; export { default as files } from './files'; export { default as knowledgeFiles } from './knowledgefiles'; export { default as editor } from './editor.svelte'; -export { createStepMessages } from './stepmessages.svelte'; -export type { StepMessages } from './stepmessages.svelte'; diff --git a/ui/user/src/lib/stores/preferredtheme.ts b/ui/user/src/lib/stores/preferredtheme.ts index e1a09b4f..882bf1b3 100644 --- a/ui/user/src/lib/stores/preferredtheme.ts +++ b/ui/user/src/lib/stores/preferredtheme.ts @@ -1,11 +1,10 @@ -import { storeWithInit } from './storeinit'; import { writable } from 'svelte/store'; type PreferredTheme = 'light' | 'dark'; -const store = storeWithInit(writable('light' as PreferredTheme), init); +const store = writable('light' as PreferredTheme); -function init() { +if (typeof window !== 'undefined') { const mm = window.matchMedia('(prefers-color-scheme: dark)'); mm.addEventListener('change', (e) => { store.set(e.matches ? 'dark' : 'light'); @@ -15,6 +14,5 @@ function init() { // mask writable as readable export default { - subscribe: store.subscribe, - init + subscribe: store.subscribe }; diff --git a/ui/user/src/lib/stores/profile.ts b/ui/user/src/lib/stores/profile.ts index b0fe7a46..aaf59376 100644 --- a/ui/user/src/lib/stores/profile.ts +++ b/ui/user/src/lib/stores/profile.ts @@ -1,6 +1,5 @@ import { getProfile } from '$lib/services/chat/operations'; import { type Profile } from '$lib/services/chat/types'; -import { storeWithInit } from './storeinit'; import { writable } from 'svelte/store'; const store = writable({ @@ -25,4 +24,11 @@ async function init() { } } } -export default storeWithInit(store, init); + +if (typeof window !== 'undefined') { + init().then(() => console.log('Profile initialized')); +} + +export default { + subscribe: store.subscribe +}; diff --git a/ui/user/src/lib/stores/stepmessages.svelte.ts b/ui/user/src/lib/stores/stepmessages.svelte.ts deleted file mode 100644 index 3076d0ad..00000000 --- a/ui/user/src/lib/stores/stepmessages.svelte.ts +++ /dev/null @@ -1,60 +0,0 @@ -import type { Messages, Progress } from '$lib/services'; -import { buildMessagesFromProgress } from '$lib/services/chat/messages'; -import { newMessageSource } from '$lib/services/chat/messagesource'; -import { SvelteMap } from 'svelte/reactivity'; - -export interface StepMessages { - messages: Map; - close: () => void; -} - -export function createStepMessages( - assistant: string, - opts?: { - task?: { - id: string; - follow?: boolean; - }; - onClose?: () => void; - } -): StepMessages { - const progresses: Progress[] = []; - const result: Map = new SvelteMap(); - const close = newMessageSource(assistant, append, { - task: opts?.task, - onClose: () => { - close?.(); - opts?.onClose?.(); - } - }); - - function append(p: Progress) { - progresses.push(p); - const newMessages = new Map(); - let stepID: string | undefined; - for (const progress of progresses) { - if (progress.input) { - continue; - } - if (progress.step?.id) { - stepID = progress.step?.id.split('{')[0]; - newMessages.delete(stepID); - } - if (stepID) { - if (!newMessages.has(stepID)) { - newMessages.set(stepID, []); - } - newMessages.get(stepID)?.push(progress); - } - } - - for (const [stepID, msgs] of newMessages) { - result.set(stepID, buildMessagesFromProgress(msgs)); - } - } - - return { - messages: result, - close - }; -} diff --git a/ui/user/src/lib/stores/storeinit.ts b/ui/user/src/lib/stores/storeinit.ts deleted file mode 100644 index eb970e99..00000000 --- a/ui/user/src/lib/stores/storeinit.ts +++ /dev/null @@ -1,61 +0,0 @@ -import errorStore from './errors'; -import { type Readable, type Subscriber, type Unsubscriber, type Writable } from 'svelte/store'; - -function toError(error: unknown): Error { - if (error instanceof Error) { - return error; - } - return new Error(String(error)); -} - -export type Initializer = () => void | Promise; -export type ErrorCallback = (e: Error) => void; - -export function storeWithInit | Writable>( - target: T, - init: Initializer, - errorCallback?: ErrorCallback -): T { - let initialized = false; - let inflight = false; - - function handleError(e: unknown) { - if (errorCallback) { - errorCallback(toError(e)); - } - errorStore.append(toError(e)); - } - - function initialize(): void { - if (initialized || inflight || typeof window === 'undefined') { - return; - } - - try { - const promise = init(); - if (promise instanceof Promise) { - inflight = true; - promise - .then(() => { - initialized = true; - }) - .catch(handleError) - .finally(() => { - inflight = false; - }); - } else { - initialized = true; - } - } catch (e) { - handleError(e); - } - } - - return { - ...target, - subscribe(run: Subscriber, invalidate?: () => void): Unsubscriber { - initialize(); - return target.subscribe(run, invalidate); - } - }; -} diff --git a/ui/user/src/lib/stores/tasks.svelte.ts b/ui/user/src/lib/stores/tasks.svelte.ts index cb0fffe8..bc8711fc 100644 --- a/ui/user/src/lib/stores/tasks.svelte.ts +++ b/ui/user/src/lib/stores/tasks.svelte.ts @@ -27,13 +27,14 @@ async function remove(id: string) { items.delete(id); } -async function update(task: Task) { +async function update(task: Task): Promise { const assistantID = get(currentAssistant)?.id; if (!assistantID) { - return; + return task; } const newTask = await saveTask(assistantID, task); items.set(newTask.id, newTask); + return newTask; } async function create(): Promise { @@ -53,7 +54,7 @@ async function create(): Promise { export interface TaskStore { items: Map; reload: () => Promise; - update: (task: Task) => Promise; + update: (task: Task) => Promise; remove: (id: string) => Promise; create: () => Promise; } diff --git a/ui/user/src/lib/stores/theme.ts b/ui/user/src/lib/stores/theme.ts index a5212a41..d4ad6e96 100644 --- a/ui/user/src/lib/stores/theme.ts +++ b/ui/user/src/lib/stores/theme.ts @@ -1,4 +1,3 @@ -import { storeWithInit } from '$lib/stores/storeinit'; import { writable } from 'svelte/store'; export type Theme = 'light' | 'dark' | 'system'; @@ -21,7 +20,8 @@ function init() { }); } -export default { - ...storeWithInit(store, init), - init -}; +if (typeof window !== 'undefined') { + init(); +} + +export default store; diff --git a/ui/user/src/lib/stores/tools.ts b/ui/user/src/lib/stores/tools.ts index 65496ae8..f73d9e63 100644 --- a/ui/user/src/lib/stores/tools.ts +++ b/ui/user/src/lib/stores/tools.ts @@ -1,7 +1,6 @@ import { listTools } from '$lib/services/chat/operations'; import { type AssistantToolList } from '$lib/services/chat/types'; import assistants from '$lib/stores/assistants'; -import { storeWithInit } from '$lib/stores/storeinit'; import { writable } from 'svelte/store'; const store = writable({ @@ -9,13 +8,21 @@ const store = writable({ items: [] }); -export default storeWithInit(store, async () => { +let initialized = false; + +if (typeof window !== 'undefined') { assistants.subscribe(async (assistants) => { + if (initialized) { + return; + } for (const assistant of assistants) { if (assistant.current && assistant.id) { store.set(await listTools(assistant.id)); + initialized = true; break; } } }); -}); +} + +export default store; diff --git a/ui/user/src/lib/time.ts b/ui/user/src/lib/time.ts new file mode 100644 index 00000000..90609bcf --- /dev/null +++ b/ui/user/src/lib/time.ts @@ -0,0 +1,22 @@ +export function formatTime(time: Date | string) { + const now = new Date(); + if (typeof time === 'string') { + time = new Date(time); + } + if ( + time.getDate() == now.getDate() && + time.getMonth() == now.getMonth() && + time.getFullYear() == now.getFullYear() + ) { + return time.toLocaleTimeString(undefined, { + hour: 'numeric', + minute: 'numeric' + }); + } + return time.toLocaleDateString(undefined, { + month: 'short', + day: 'numeric', + hour: 'numeric', + minute: 'numeric' + }); +} diff --git a/ui/user/src/routes/[agent]/+page.svelte b/ui/user/src/routes/[agent]/+page.svelte index df368508..8f4f1103 100644 --- a/ui/user/src/routes/[agent]/+page.svelte +++ b/ui/user/src/routes/[agent]/+page.svelte @@ -1,33 +1,16 @@