diff --git a/apiclient/invoke.go b/apiclient/invoke.go index 689a77ca..3c9c1b1b 100644 --- a/apiclient/invoke.go +++ b/apiclient/invoke.go @@ -35,7 +35,10 @@ func (c *Client) Invoke(ctx context.Context, agentID string, input string, opts if opts.Async { _, _ = io.Copy(io.Discard, resp.Body) resp.Body.Close() + events := make(chan types.Progress) + close(events) return &types.InvokeResponse{ + Events: events, ThreadID: resp.Header.Get("X-Otto-Thread-Id"), }, nil } diff --git a/apiclient/types/agent.go b/apiclient/types/agent.go index e8d5d6dd..256c82be 100644 --- a/apiclient/types/agent.go +++ b/apiclient/types/agent.go @@ -43,6 +43,7 @@ type AgentManifest struct { MaxThreadTools int `json:"maxThreadTools"` Params map[string]string `json:"params"` Model string `json:"model"` + Env []EnvVar `json:"env"` } func (m AgentManifest) GetParams() *openapi3.Schema { diff --git a/apiclient/types/cronjob.go b/apiclient/types/cronjob.go index 0aafdbd6..ebedb6d9 100644 --- a/apiclient/types/cronjob.go +++ b/apiclient/types/cronjob.go @@ -13,7 +13,6 @@ type CronJobManifest struct { Schedule string `json:"schedule,omitempty"` Workflow string `json:"workflow,omitempty"` Input string `json:"input,omitempty"` - UserID string `json:"userID,omitempty"` TaskSchedule *Schedule `json:"taskSchedule,omitempty"` } diff --git a/apiclient/types/emailreceiver.go b/apiclient/types/emailreceiver.go index 366e2f63..b709b298 100644 --- a/apiclient/types/emailreceiver.go +++ b/apiclient/types/emailreceiver.go @@ -11,7 +11,6 @@ type EmailReceiverManifest struct { Name string `json:"name"` Description string `json:"description"` User string `json:"user,omitempty"` - UserID string `json:"userID,omitempty"` Workflow string `json:"workflow"` AllowedSenders []string `json:"allowedSenders,omitempty"` } diff --git a/apiclient/types/invoke.go b/apiclient/types/invoke.go index ac7a8be6..ba5136d7 100644 --- a/apiclient/types/invoke.go +++ b/apiclient/types/invoke.go @@ -18,6 +18,9 @@ type Progress struct { // If RunID is not populated, the event will not specify tied to any particular run RunID string `json:"runID,omitempty"` + // ParentRunID is the parent run of the run that is specified in the RunID field + ParentRunID string `json:"parentRunID,omitempty"` + // Time is the time the event was generated Time *Time `json:"time,omitempty"` @@ -92,6 +95,7 @@ type ToolCall struct { Name string `json:"name,omitempty"` Description string `json:"description,omitempty"` Input string `json:"input,omitempty"` + Output string `json:"output,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` } diff --git a/apiclient/types/tables.go b/apiclient/types/tables.go new file mode 100644 index 00000000..a2648587 --- /dev/null +++ b/apiclient/types/tables.go @@ -0,0 +1,20 @@ +package types + +type Table struct { + Name string `json:"name"` +} + +type TableList List[Table] + +// +k8s:deepcopy-gen=false + +// +k8s:openapi-gen=false +type TableRow struct { + Columns []string `json:"columns,omitempty"` + Values map[string]any `json:"values"` +} + +// +k8s:deepcopy-gen=false + +// +k8s:openapi-gen=false +type TableRowList List[TableRow] diff --git a/apiclient/types/tasks.go b/apiclient/types/tasks.go index e9f49434..135c8006 100644 --- a/apiclient/types/tasks.go +++ b/apiclient/types/tasks.go @@ -52,6 +52,7 @@ type TaskIf struct { type TaskRun struct { Metadata TaskID string `json:"taskID,omitempty"` + Input string `json:"input,omitempty"` Task TaskManifest `json:"task,omitempty"` StartTime *Time `json:"startTime,omitempty"` EndTime *Time `json:"endTime,omitempty"` diff --git a/apiclient/types/thread.go b/apiclient/types/thread.go index e4db8594..f9bba3ba 100644 --- a/apiclient/types/thread.go +++ b/apiclient/types/thread.go @@ -22,17 +22,18 @@ func (in WorkflowState) IsTerminal() bool { type Thread struct { Metadata ThreadManifest - AgentID string `json:"agentID,omitempty"` - WorkflowID string `json:"workflowID,omitempty"` - WebhookID string `json:"webhookID,omitempty"` - EmailReceiverID string `json:"emailReceiverID,omitempty"` - State string `json:"state,omitempty"` - LastRunID string `json:"lastRunID,omitempty"` - CurrentRunID string `json:"currentRunID,omitempty"` - ParentThreadID string `json:"parentThreadID,omitempty"` - UserID string `json:"userID,omitempty"` - AgentAlias string `json:"agentAlias,omitempty"` - Abort bool `json:"abort,omitempty"` + AgentID string `json:"agentID,omitempty"` + WorkflowID string `json:"workflowID,omitempty"` + WebhookID string `json:"webhookID,omitempty"` + EmailReceiverID string `json:"emailReceiverID,omitempty"` + State string `json:"state,omitempty"` + LastRunID string `json:"lastRunID,omitempty"` + CurrentRunID string `json:"currentRunID,omitempty"` + ParentThreadID string `json:"parentThreadID,omitempty"` + UserID string `json:"userID,omitempty"` + AgentAlias string `json:"agentAlias,omitempty"` + Abort bool `json:"abort,omitempty"` + Env []string `json:"env,omitempty"` } type ThreadList List[Thread] diff --git a/apiclient/types/webhook.go b/apiclient/types/webhook.go index 4fd07880..abf95b27 100644 --- a/apiclient/types/webhook.go +++ b/apiclient/types/webhook.go @@ -16,7 +16,6 @@ type WebhookManifest struct { Headers []string `json:"headers"` Secret string `json:"secret"` ValidationHeader string `json:"validationHeader"` - UserID string `json:"userID,omitempty"` } type WebhookList List[Webhook] diff --git a/apiclient/types/workflow.go b/apiclient/types/workflow.go index 3fe0dbfe..a351101c 100644 --- a/apiclient/types/workflow.go +++ b/apiclient/types/workflow.go @@ -14,13 +14,11 @@ type WorkflowList List[Workflow] type WorkflowManifest struct { AgentManifest `json:",inline"` - Credentials []string `json:"credentials"` - Env []WorkflowEnv `json:"env"` - Steps []Step `json:"steps"` - Output string `json:"output"` + Steps []Step `json:"steps"` + Output string `json:"output"` } -type WorkflowEnv struct { +type EnvVar struct { Name string `json:"name"` Value string `json:"value"` Description string `json:"description"` @@ -86,15 +84,15 @@ func (s Step) Display() string { } if s.While != nil { preamble.WriteString(" while ") - preamble.WriteString(oneline(s.While.Condition)) + preamble.WriteString(oneLine(s.While.Condition)) } if s.If != nil { preamble.WriteString(" if ") - preamble.WriteString(oneline(s.If.Condition)) + preamble.WriteString(oneLine(s.If.Condition)) } if s.Step != "" { preamble.WriteString(" ") - preamble.WriteString(oneline(s.Step)) + preamble.WriteString(oneLine(s.Step)) } return preamble.String() } @@ -116,7 +114,7 @@ type While struct { Steps []Step `json:"steps,omitempty"` } -func oneline(s string) string { +func oneLine(s string) string { l := strings.Split(s, "\n")[0] if len(l) > 80 { return l[:80] + "..." @@ -124,97 +122,6 @@ func oneline(s string) string { return l } -func DeleteStep(manifest *WorkflowManifest, id string) *WorkflowManifest { - if manifest == nil || id == "" { - return nil - } - - result := manifest.DeepCopy() - lookupID, _, _ := strings.Cut(id, "{") - result.Steps = deleteStep(manifest.Steps, lookupID) - return result -} - -func deleteStep(steps []Step, id string) []Step { - newSteps := make([]Step, 0, len(steps)) - for _, step := range steps { - if step.ID != id { - if step.While != nil { - step.While.Steps = deleteStep(step.While.Steps, id) - } - if step.If != nil { - step.If.Steps = deleteStep(step.If.Steps, id) - step.If.Else = deleteStep(step.If.Else, id) - } - newSteps = append(newSteps, step) - } - } - return newSteps -} - -func AppendStep(manifest *WorkflowManifest, parentID string, step Step) *WorkflowManifest { - if manifest == nil { - return nil - } - - parentID, addToElse := strings.CutSuffix(parentID, "::else") - - result := manifest.DeepCopy() - if parentID == "" { - result.Steps = append(result.Steps, step) - return result - } - - lookupID, _, _ := strings.Cut(parentID, "{") - result.Steps = appendStep(result.Steps, lookupID, addToElse, step) - return result -} - -func appendStep(steps []Step, id string, addToElse bool, stepToAdd Step) []Step { - result := make([]Step, 0, len(steps)) - - for _, step := range steps { - if step.ID != id { - if step.If != nil { - step.If.Steps = appendStep(step.If.Steps, id, addToElse, stepToAdd) - step.If.Else = appendStep(step.If.Else, id, addToElse, stepToAdd) - } - if step.While != nil { - step.While.Steps = appendStep(step.While.Steps, id, addToElse, stepToAdd) - } - result = append(result, step) - continue - } - - if step.If != nil { - if addToElse { - step.If.Else = append(step.If.Else, stepToAdd) - } else { - step.If.Steps = append(step.If.Steps, stepToAdd) - } - } else if step.While != nil { - step.While.Steps = append(step.While.Steps, stepToAdd) - } - - result = append(result, step) - } - - return result -} - -func SetStep(manifest *WorkflowManifest, step Step) { - id := step.ID - if manifest == nil || id == "" { - return - } - lookupID, _, _ := strings.Cut(id, "{") - found, _ := findInSteps("", manifest.Steps, lookupID) - if found != nil { - *found = step - } - return -} - func FindStep(manifest *WorkflowManifest, id string) (_ *Step, parentID string) { if manifest == nil || id == "" { return nil, "" diff --git a/apiclient/types/workflowexecution.go b/apiclient/types/workflowexecution.go new file mode 100644 index 00000000..b37b426c --- /dev/null +++ b/apiclient/types/workflowexecution.go @@ -0,0 +1,12 @@ +package types + +type WorkflowExecution struct { + Metadata + Workflow WorkflowManifest `json:"workflow,omitempty"` + StartTime Time `json:"startTime"` + EndTime *Time `json:"endTime"` + Input string `json:"input"` + Error string `json:"error,omitempty"` +} + +type WorkflowExecutionList List[WorkflowExecution] diff --git a/apiclient/types/zz_generated.deepcopy.go b/apiclient/types/zz_generated.deepcopy.go index 9b084b8a..9dcb05b5 100644 --- a/apiclient/types/zz_generated.deepcopy.go +++ b/apiclient/types/zz_generated.deepcopy.go @@ -129,6 +129,11 @@ func (in *AgentManifest) DeepCopyInto(out *AgentManifest) { (*out)[key] = val } } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]EnvVar, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AgentManifest. @@ -447,6 +452,21 @@ func (in *EmailReceiverManifest) DeepCopy() *EmailReceiverManifest { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EnvVar) DeepCopyInto(out *EnvVar) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EnvVar. +func (in *EnvVar) DeepCopy() *EnvVar { + if in == nil { + return nil + } + out := new(EnvVar) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ErrHTTP) DeepCopyInto(out *ErrHTTP) { *out = *in @@ -1286,6 +1306,41 @@ func (in *SubFlow) DeepCopy() *SubFlow { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Table) DeepCopyInto(out *Table) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Table. +func (in *Table) DeepCopy() *Table { + if in == nil { + return nil + } + out := new(Table) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TableList) DeepCopyInto(out *TableList) { + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Table, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TableList. +func (in *TableList) DeepCopy() *TableList { + if in == nil { + return nil + } + out := new(TableList) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Task) DeepCopyInto(out *Task) { *out = *in @@ -1542,6 +1597,11 @@ func (in *Thread) DeepCopyInto(out *Thread) { *out = *in in.Metadata.DeepCopyInto(&out.Metadata) in.ThreadManifest.DeepCopyInto(&out.ThreadManifest) + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Thread. @@ -1904,16 +1964,45 @@ func (in *WorkflowCall) DeepCopy() *WorkflowCall { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *WorkflowEnv) DeepCopyInto(out *WorkflowEnv) { +func (in *WorkflowExecution) DeepCopyInto(out *WorkflowExecution) { *out = *in + in.Metadata.DeepCopyInto(&out.Metadata) + in.Workflow.DeepCopyInto(&out.Workflow) + in.StartTime.DeepCopyInto(&out.StartTime) + if in.EndTime != nil { + in, out := &in.EndTime, &out.EndTime + *out = (*in).DeepCopy() + } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowEnv. -func (in *WorkflowEnv) DeepCopy() *WorkflowEnv { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowExecution. +func (in *WorkflowExecution) DeepCopy() *WorkflowExecution { if in == nil { return nil } - out := new(WorkflowEnv) + out := new(WorkflowExecution) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkflowExecutionList) DeepCopyInto(out *WorkflowExecutionList) { + *out = *in + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]WorkflowExecution, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowExecutionList. +func (in *WorkflowExecutionList) DeepCopy() *WorkflowExecutionList { + if in == nil { + return nil + } + out := new(WorkflowExecutionList) in.DeepCopyInto(out) return out } @@ -1944,16 +2033,6 @@ func (in *WorkflowList) DeepCopy() *WorkflowList { func (in *WorkflowManifest) DeepCopyInto(out *WorkflowManifest) { *out = *in in.AgentManifest.DeepCopyInto(&out.AgentManifest) - if in.Credentials != nil { - in, out := &in.Credentials, &out.Credentials - *out = make([]string, len(*in)) - copy(*out, *in) - } - if in.Env != nil { - in, out := &in.Env, &out.Env - *out = make([]WorkflowEnv, len(*in)) - copy(*out, *in) - } if in.Steps != nil { in, out := &in.Steps, &out.Steps *out = make([]Step, len(*in)) diff --git a/apiclient/workflow.go b/apiclient/workflow.go index e43157c6..e9204ae7 100644 --- a/apiclient/workflow.go +++ b/apiclient/workflow.go @@ -38,8 +38,35 @@ func (c *Client) CreateWorkflow(ctx context.Context, workflow types.WorkflowMani return toObject(resp, &types.Workflow{}) } +type ListWorkflowExecutionsOptions struct { + ThreadID string +} + +func (c *Client) ListWorkflowExecutions(ctx context.Context, workflowID string, opts ListWorkflowExecutionsOptions) (result types.WorkflowExecutionList, err error) { + defer func() { + sort.Slice(result.Items, func(i, j int) bool { + return result.Items[i].Metadata.Created.Time.Before(result.Items[j].Metadata.Created.Time) + }) + }() + + url := fmt.Sprintf("/workflows/%s/executions", workflowID) + if opts.ThreadID != "" { + url = fmt.Sprintf("/threads/%s/workflows/%s/executions", opts.ThreadID, workflowID) + } + + _, resp, err := c.doRequest(ctx, http.MethodGet, url, nil) + if err != nil { + return + } + defer resp.Body.Close() + + _, err = toObject(resp, &result) + return +} + type ListWorkflowsOptions struct { - Alias string + Alias string + ThreadID string } func (c *Client) ListWorkflows(ctx context.Context, opts ListWorkflowsOptions) (result types.WorkflowList, err error) { @@ -49,7 +76,12 @@ func (c *Client) ListWorkflows(ctx context.Context, opts ListWorkflowsOptions) ( }) }() - _, resp, err := c.doRequest(ctx, http.MethodGet, "/workflows", nil) + url := "/workflows" + if opts.ThreadID != "" { + url = fmt.Sprintf("/threads/%s/workflows", opts.ThreadID) + } + + _, resp, err := c.doRequest(ctx, http.MethodGet, url, nil) if err != nil { return } diff --git a/pkg/api/handlers/tables.go b/pkg/api/handlers/tables.go new file mode 100644 index 00000000..09b126af --- /dev/null +++ b/pkg/api/handlers/tables.go @@ -0,0 +1,120 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "regexp" + + "github.com/gptscript-ai/go-gptscript" + "github.com/otto8-ai/otto8/apiclient/types" + "github.com/otto8-ai/otto8/pkg/api" + v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" +) + +type TableHandler struct { + gptScript *gptscript.GPTScript +} + +func NewTableHandler(gptScript *gptscript.GPTScript) *TableHandler { + return &TableHandler{ + gptScript: gptScript, + } +} + +func (t *TableHandler) tables(req api.Context, workspaceID string) (string, error) { + var toolRef v1.ToolReference + if err := req.Get(&toolRef, "database-tables"); err != nil { + return "", err + } + run, err := t.gptScript.Run(req.Context(), toolRef.Status.Reference, gptscript.Options{ + Workspace: workspaceID, + }) + if err != nil { + return "", err + } + defer run.Close() + return run.Text() +} + +func (t *TableHandler) rows(req api.Context, workspaceID, tableName string) (string, error) { + var toolRef v1.ToolReference + if err := req.Get(&toolRef, "database-query"); err != nil { + return "", err + } + input, err := json.Marshal(map[string]string{ + "query": fmt.Sprintf("SELECT * FROM '%s';", tableName), + }) + if err != nil { + return "", err + } + run, err := t.gptScript.Run(req.Context(), toolRef.Status.Reference, gptscript.Options{ + Input: string(input), + Workspace: workspaceID, + }) + if err != nil { + return "", err + } + defer run.Close() + return run.Text() +} + +func (t *TableHandler) ListTables(req api.Context) error { + var ( + assistantID = req.PathValue("assistant_id") + result = types.TableList{ + Items: []types.Table{}, + } + ) + + thread, err := getUserThread(req, assistantID) + if err != nil { + return err + } + + if thread.Status.WorkspaceID == "" { + return req.Write(result) + } + + content, err := t.tables(req, thread.Status.WorkspaceID) + if err != nil { + return err + } + + req.ResponseWriter.Header().Set("Content-Type", "application/json") + _, err = req.ResponseWriter.Write([]byte(content)) + return err +} + +var validTableName = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) + +func (t *TableHandler) GetRows(req api.Context) error { + var ( + assistantID = req.PathValue("assistant_id") + tableName = req.PathValue("table_name") + result = types.TableRowList{ + Items: []types.TableRow{}, + } + ) + + if validTableName.MatchString(tableName) == false { + return types.NewErrBadRequest("invalid table name %s", tableName) + } + + thread, err := getUserThread(req, assistantID) + if err != nil { + return err + } + + if thread.Status.WorkspaceID == "" { + return req.Write(result) + } + + content, err := t.rows(req, thread.Status.WorkspaceID, tableName) + if err != nil { + return err + } + + req.ResponseWriter.Header().Set("Content-Type", "application/json") + _, err = req.ResponseWriter.Write([]byte(content)) + return err +} diff --git a/pkg/api/handlers/tasks.go b/pkg/api/handlers/tasks.go index 6259cb7b..6d5514eb 100644 --- a/pkg/api/handlers/tasks.go +++ b/pkg/api/handlers/tasks.go @@ -35,7 +35,7 @@ func NewTaskHandler(invoker *invoke.Invoker, events *events.Emitter) *TaskHandle func (t *TaskHandler) Abort(req api.Context) error { var taskRunID = req.PathValue("run_id") - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -56,6 +56,10 @@ func (t *TaskHandler) Abort(req api.Context) error { return err } + if wfe.Spec.ThreadName != userThread.Name && workflow.Name != wfe.Spec.WorkflowName { + return types.NewErrHttp(http.StatusForbidden, "task run does not belong to the thread") + } + var thread v1.Thread if err := req.Get(&thread, wfe.Status.ThreadName); err != nil { return err @@ -67,7 +71,7 @@ func (t *TaskHandler) Abort(req api.Context) error { func (t *TaskHandler) Events(req api.Context) error { var taskRunID = req.PathValue("run_id") - workflow, err := t.getTask(req) + workflow, thread, err := t.getTask(req) if err != nil { return err } @@ -91,7 +95,7 @@ func (t *TaskHandler) Events(req api.Context) error { return err } - if wfe.Spec.UserID != req.User.GetUID() && workflow.Name != wfe.Spec.WorkflowName { + if wfe.Spec.ThreadName != thread.Name && workflow.Name != wfe.Spec.WorkflowName { return types.NewErrHttp(http.StatusForbidden, "task run does not belong to the user") } @@ -113,8 +117,61 @@ func editorWFE(req api.Context, workflowName string) string { return name.SafeHashConcatName(system.ThreadPrefix, workflowName, req.User.GetUID()) } +func (t *TaskHandler) AbortRun(req api.Context) error { + workflow, _, err := t.getTask(req) + if err != nil { + return err + } + + var ( + wfe v1.WorkflowExecution + runID = req.PathValue("run_id") + ) + + if runID == "editor" { + runID = editorWFE(req, workflow.Name) + } + + if err := req.Get(&wfe, runID); err != nil { + return err + } + + if wfe.Spec.WorkflowName != workflow.Name { + return types.NewErrNotFound("task run not found") + } + + var thread v1.Thread + if err := req.Get(&thread, wfe.Status.ThreadName); err != nil { + return err + } + + return abortThread(req, &thread) +} + +func (t *TaskHandler) GetRun(req api.Context) error { + workflow, _, err := t.getTask(req) + if err != nil { + return err + } + + var ( + wfe v1.WorkflowExecution + runID = req.PathValue("run_id") + ) + if runID == "editor" { + runID = editorWFE(req, workflow.Name) + } + if err := req.Get(&wfe, runID); err != nil { + return err + } + if wfe.Spec.WorkflowName != workflow.Name { + return types.NewErrNotFound("task run not found") + } + return req.Write(convertTaskRun(workflow, &wfe)) +} + func (t *TaskHandler) DeleteRun(req api.Context) error { - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -126,7 +183,7 @@ func (t *TaskHandler) DeleteRun(req api.Context) error { return err } - if wfe.Spec.UserID != req.User.GetUID() || wfe.Spec.WorkflowName != workflow.Name { + if wfe.Spec.ThreadName != userThread.Name || wfe.Spec.WorkflowName != workflow.Name { return types.NewErrHttp(http.StatusForbidden, "task run does not belong to the user") } @@ -134,7 +191,7 @@ func (t *TaskHandler) DeleteRun(req api.Context) error { } func (t *TaskHandler) ListRuns(req api.Context) error { - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -142,7 +199,7 @@ func (t *TaskHandler) ListRuns(req api.Context) error { var wfeList v1.WorkflowExecutionList if err := req.List(&wfeList, kclient.MatchingFields{ "spec.workflowName": workflow.Name, - "spec.userID": req.User.GetUID(), + "spec.threadName": userThread.Name, }); err != nil { return err } @@ -180,7 +237,7 @@ func (t *TaskHandler) Run(req api.Context) error { input = nil } - workflow, err := t.getTask(req) + workflow, userThread, err := t.getTask(req) if err != nil { return err } @@ -195,7 +252,7 @@ func (t *TaskHandler) Run(req api.Context) error { }, Spec: v1.WorkflowExecutionSpec{ Input: string(input), - UserID: req.User.GetUID(), + ThreadName: userThread.Name, WorkflowName: workflow.Name, }, Status: v1.WorkflowExecutionStatus{}, @@ -206,7 +263,7 @@ func (t *TaskHandler) Run(req api.Context) error { } else { resp, err := t.invoker.Workflow(req.Context(), req.Storage, workflow, string(input), invoke.WorkflowOptions{ WorkflowExecutionName: editorWFE(req, workflow.Name), - UserID: req.User.GetUID(), + OwningThreadName: userThread.Name, StepID: stepID, }) if err != nil { @@ -219,15 +276,22 @@ func (t *TaskHandler) Run(req api.Context) error { } func convertTaskRun(workflow *v1.Workflow, wfe *v1.WorkflowExecution) types.TaskRun { + var endTime *types.Time + if wfe.Status.EndTime != nil { + endTime = types.NewTime(wfe.Status.EndTime.Time) + } return types.TaskRun{ - Metadata: MetadataFrom(wfe), - TaskID: workflow.Name, - Task: convertTask(*workflow, nil).TaskManifest, + Metadata: MetadataFrom(wfe), + TaskID: workflow.Name, + Input: wfe.Spec.Input, + Task: convertTaskManifest(wfe.Status.WorkflowManifest), + StartTime: types.NewTime(wfe.CreationTimestamp.Time), + EndTime: endTime, } } func (t *TaskHandler) Delete(req api.Context) error { - workflow, err := t.getTask(req) + workflow, _, err := t.getTask(req) if err != nil { if apierrors.IsNotFound(err) { return nil @@ -239,12 +303,12 @@ func (t *TaskHandler) Delete(req api.Context) error { } func (t *TaskHandler) Update(req api.Context) error { - workflow, err := t.getTask(req) + workflow, _, err := t.getTask(req) if err != nil { return err } - _, manifest, task, err := t.getAssistantAndManifestFromRequest(req) + _, _, manifest, task, err := t.getAssistantThreadAndManifestFromRequest(req) if err != nil { return err } @@ -345,8 +409,8 @@ func (t *TaskHandler) updateEmail(req api.Context, workflow *v1.Workflow, task t EmailReceiverManifest: types.EmailReceiverManifest{ User: workflow.Spec.Manifest.Alias, Workflow: workflow.Name, - UserID: req.User.GetUID(), }, + ThreadName: workflow.Spec.ThreadName, }, } if err := req.Create(&email); err != nil { @@ -383,8 +447,8 @@ func (t *TaskHandler) updateWebhook(req api.Context, workflow *v1.Workflow, task WebhookManifest: types.WebhookManifest{ Alias: workflow.Spec.Manifest.Alias, Workflow: workflow.Name, - UserID: req.User.GetUID(), }, + ThreadName: workflow.Spec.ThreadName, }, } if err := req.Create(&webhook); err != nil { @@ -420,9 +484,9 @@ func (t *TaskHandler) updateCron(req api.Context, workflow *v1.Workflow, task ty Spec: v1.CronJobSpec{ CronJobManifest: types.CronJobManifest{ Workflow: workflow.Name, - UserID: req.User.GetUID(), TaskSchedule: task.Schedule, }, + ThreadName: workflow.Spec.ThreadName, }, } if err := req.Create(&cron); err != nil { @@ -441,33 +505,49 @@ func (t *TaskHandler) updateCron(req api.Context, workflow *v1.Workflow, task ty return nil } -func (t *TaskHandler) getAssistantAndManifestFromRequest(req api.Context) (*v1.Agent, types.WorkflowManifest, types.TaskManifest, error) { +func (t *TaskHandler) getAssistantThreadAndManifestFromRequest(req api.Context) (*v1.Agent, *v1.Thread, types.WorkflowManifest, types.TaskManifest, error) { assistantID := req.PathValue("assistant_id") assistant, err := getAssistant(req, assistantID) if err != nil { - return nil, types.WorkflowManifest{}, types.TaskManifest{}, err + return nil, nil, types.WorkflowManifest{}, types.TaskManifest{}, err } thread, err := getUserThread(req, assistantID) if err != nil { - return nil, types.WorkflowManifest{}, types.TaskManifest{}, err + return nil, nil, types.WorkflowManifest{}, types.TaskManifest{}, err } var manifest types.TaskManifest if err := req.Read(&manifest); err != nil { - return nil, types.WorkflowManifest{}, types.TaskManifest{}, err + return nil, nil, types.WorkflowManifest{}, types.TaskManifest{}, err } - return assistant, toWorkflowManifest(assistant, thread, manifest), manifest, nil + return assistant, thread, toWorkflowManifest(assistant, thread, manifest), manifest, nil } func (t *TaskHandler) Create(req api.Context) error { - assistant, workflowManifest, taskManifest, err := t.getAssistantAndManifestFromRequest(req) + _, thread, workflowManifest, taskManifest, err := t.getAssistantThreadAndManifestFromRequest(req) if err != nil { return err } + var workspaces v1.WorkspaceList + err = req.List(&workspaces, kclient.MatchingFields{ + "status.workspaceID": thread.Status.WorkspaceID, + }) + if err != nil { + return err + } + + if len(workspaces.Items) == 0 { + return types.NewErrBadRequest("no workspace found for the thread") + } + + if len(workspaces.Items) != 1 { + return types.NewErrBadRequest("multiple workspaces found for the thread") + } + workflowManifest.Alias, err = randomtoken.Generate() if err != nil { return err @@ -480,9 +560,10 @@ func (t *TaskHandler) Create(req api.Context) error { Namespace: req.Namespace(), }, Spec: v1.WorkflowSpec{ - AgentName: assistant.Name, - UserID: req.User.GetUID(), - Manifest: workflowManifest, + ThreadName: thread.Name, + Manifest: workflowManifest, + KnowledgeSetNames: thread.Status.KnowledgeSetNames, + WorkspaceName: workspaces.Items[0].Name, }, } @@ -504,6 +585,11 @@ func toWorkflowManifest(agent *v1.Agent, thread *v1.Thread, manifest types.TaskM AgentManifest: agent.Spec.Manifest, } + workflowManifest.AgentManifest.Env = append(workflowManifest.AgentManifest.Env, types.EnvVar{ + Name: "DATABASE_WORKSPACE_ID", + Value: thread.Status.WorkspaceID, + }) + for _, tool := range thread.Spec.Manifest.Tools { if !slices.Contains(workflowManifest.Tools, tool) { workflowManifest.Tools = append(workflowManifest.Tools, tool) @@ -545,7 +631,7 @@ func toWorkflowIf(ifStep *types.TaskIf) *types.If { } func (t *TaskHandler) Get(req api.Context) error { - task, err := t.getTask(req) + task, _, err := t.getTask(req) if err != nil { return err } @@ -572,35 +658,37 @@ func (t *TaskHandler) Get(req api.Context) error { })) } -func (t *TaskHandler) getTask(req api.Context) (*v1.Workflow, error) { +func (t *TaskHandler) getTask(req api.Context) (*v1.Workflow, *v1.Thread, error) { assistantID := req.PathValue("assistant_id") var workflow v1.Workflow if err := req.Get(&workflow, req.PathValue("id")); err != nil { - return nil, err + return nil, nil, err } - assistant, err := getAssistant(req, assistantID) + thread, err := getUserThread(req, assistantID) if err != nil { - return nil, err + return nil, nil, err } - if workflow.Spec.AgentName != assistant.Name || workflow.Spec.UserID != req.User.GetUID() { - return nil, types.NewErrHttp(http.StatusForbidden, "task does not belong to the user") + if workflow.Spec.ThreadName != thread.Name { + return nil, nil, types.NewErrHttp(http.StatusForbidden, "task does not belong to the thread") } - return &workflow, nil + return &workflow, thread, nil } func (t *TaskHandler) List(req api.Context) error { - assistant, err := getAssistant(req, req.PathValue("assistant_id")) + assistantID := req.PathValue("assistant_id") + + thread, err := getUserThread(req, assistantID) if err != nil { return err } var crons v1.CronJobList if err := req.List(&crons, kclient.MatchingFields{ - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -612,7 +700,7 @@ func (t *TaskHandler) List(req api.Context) error { var webhooks v1.WebhookList if err := req.List(&webhooks, kclient.MatchingFields{ - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -624,7 +712,7 @@ func (t *TaskHandler) List(req api.Context) error { var emailReceivers v1.EmailReceiverList if err := req.List(&emailReceivers, kclient.MatchingFields{ - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -636,8 +724,7 @@ func (t *TaskHandler) List(req api.Context) error { var workflows v1.WorkflowList if err := req.List(&workflows, kclient.MatchingFields{ - "spec.agentName": assistant.Name, - "spec.userID": req.User.GetUID(), + "spec.threadName": thread.Name, }); err != nil { return err } @@ -655,16 +742,23 @@ func (t *TaskHandler) List(req api.Context) error { return req.Write(taskList) } +func convertTaskManifest(manifest *types.WorkflowManifest) types.TaskManifest { + if manifest == nil { + return types.TaskManifest{} + } + return types.TaskManifest{ + Name: manifest.Name, + Description: manifest.Description, + Steps: toTaskSteps(manifest.Steps), + } +} + func convertTask(workflow v1.Workflow, trigger *triggers) types.Task { task := types.Task{ - Metadata: MetadataFrom(&workflow), - TaskManifest: types.TaskManifest{ - Name: workflow.Spec.Manifest.Name, - Description: workflow.Spec.Manifest.Description, - }, - Alias: workflow.Spec.Manifest.Alias, + Metadata: MetadataFrom(&workflow), + TaskManifest: convertTaskManifest(&workflow.Spec.Manifest), + Alias: workflow.Namespace + "/" + workflow.Spec.Manifest.Alias, } - task.Steps = toTaskSteps(workflow.Spec.Manifest.Steps) if trigger != nil && trigger.CronJob != nil && trigger.CronJob.Name != "" { task.Schedule = trigger.CronJob.Spec.TaskSchedule } diff --git a/pkg/api/handlers/threads.go b/pkg/api/handlers/threads.go index 52ffdd5a..83f512d9 100644 --- a/pkg/api/handlers/threads.go +++ b/pkg/api/handlers/threads.go @@ -13,6 +13,7 @@ import ( "github.com/otto8-ai/otto8/pkg/events" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kclient "sigs.k8s.io/controller-runtime/pkg/client" ) const DefaultMaxUserThreadTools = 5 @@ -55,6 +56,7 @@ func convertThread(thread v1.Thread) types.Thread { AgentAlias: thread.Spec.AgentAlias, UserID: thread.Spec.UserUID, Abort: thread.Spec.Abort, + Env: thread.Spec.Env, } } @@ -75,6 +77,71 @@ func (a *ThreadHandler) Abort(req api.Context) error { return req.Write(thread) } +func (a *ThreadHandler) WorkflowExecutions(req api.Context) error { + var ( + id = req.PathValue("id") + workflowID = req.PathValue("workflow_id") + ) + + var workflowExecutions v1.WorkflowExecutionList + if err := req.List(&workflowExecutions, kclient.MatchingFields{ + "spec.threadName": id, + "spec.workflowName": workflowID, + }); err != nil { + return err + } + + var resp types.WorkflowExecutionList + for _, we := range workflowExecutions.Items { + resp.Items = append(resp.Items, convertWorkflowExecution(we)) + } + + return req.Write(resp) +} + +func convertWorkflowExecution(we v1.WorkflowExecution) types.WorkflowExecution { + var w types.WorkflowManifest + if we.Status.WorkflowManifest != nil { + w = *we.Status.WorkflowManifest + } + var endTime *types.Time + if we.Status.EndTime != nil { + endTime = types.NewTime(we.Status.EndTime.Time) + } + return types.WorkflowExecution{ + Metadata: MetadataFrom(&we), + Workflow: w, + Input: we.Spec.Input, + Error: we.Status.Error, + StartTime: *types.NewTime(we.CreationTimestamp.Time), + EndTime: endTime, + } +} + +func (a *ThreadHandler) Workflows(req api.Context) error { + var ( + id = req.PathValue("id") + ) + + var workflows v1.WorkflowList + if err := req.List(&workflows, kclient.MatchingFields{ + "spec.threadName": id, + }); err != nil { + return err + } + + var resp types.WorkflowList + for _, workflow := range workflows.Items { + wf, err := convertWorkflow(workflow, "", req.APIBaseURL) + if err != nil { + return err + } + resp.Items = append(resp.Items, *wf) + } + + return req.Write(resp) +} + func (a *ThreadHandler) Events(req api.Context) error { var ( id = req.PathValue("id") diff --git a/pkg/api/handlers/webhooks.go b/pkg/api/handlers/webhooks.go index 07a5bbfc..2a6cfa65 100644 --- a/pkg/api/handlers/webhooks.go +++ b/pkg/api/handlers/webhooks.go @@ -4,6 +4,7 @@ import ( "crypto/hmac" "crypto/sha256" "encoding/hex" + "encoding/json" "fmt" "net/http" "net/textproto" @@ -233,24 +234,28 @@ func (a *WebhookHandler) Execute(req api.Context) error { } } - var input strings.Builder - _, _ = input.WriteString("You are being called from a webhook.\n\n") - if len(body) > 0 { - _, _ = input.WriteString("Here is the payload of the webhook:\n") - _, _ = input.Write(body) + var input struct { + Type string `json:"type"` + Payload string `json:"payload"` + Headers map[string]string `json:"headers"` } - _, _ = input.WriteString("\nHere are the headers of the webhook:\n") + input.Type = "webhook" + input.Payload = string(body) + input.Headers = make(map[string]string) + allHeaders := slices.Contains(webhook.Spec.Headers, "*") for k := range req.Request.Header { if !allHeaders && !slices.Contains(webhook.Spec.Headers, k) { continue } - input.WriteString("\n") - input.WriteString(k) - input.WriteString(": ") - input.WriteString(req.Request.Header.Get(k)) + input.Headers[k] = req.Request.Header.Get(k) + } + + inputText, err := json.Marshal(input) + if err != nil { + return fmt.Errorf("failed to marshal input: %w", err) } var workflow v1.Workflow @@ -260,15 +265,14 @@ func (a *WebhookHandler) Execute(req api.Context) error { if err = req.Create(&v1.WorkflowExecution{ ObjectMeta: metav1.ObjectMeta{ - // The name here is the sha256 hash of the body to handle multiple executions of the same webhook. - // That is, if the webhook is called twice with the same body, it will only be executed once. - Name: system.WorkflowExecutionPrefix + fmt.Sprintf("%x", sha256.Sum256(body)), - Namespace: req.Namespace(), + GenerateName: system.WorkflowExecutionPrefix, + Namespace: req.Namespace(), }, Spec: v1.WorkflowExecutionSpec{ WorkflowName: workflow.Name, WebhookName: webhook.Name, - Input: input.String(), + ThreadName: webhook.Spec.ThreadName, + Input: string(inputText), }, }); err != nil && !apierrors.IsAlreadyExists(err) { return err diff --git a/pkg/api/handlers/workflows.go b/pkg/api/handlers/workflows.go index 2599e1d1..d7e02c0c 100644 --- a/pkg/api/handlers/workflows.go +++ b/pkg/api/handlers/workflows.go @@ -16,6 +16,7 @@ import ( "github.com/otto8-ai/otto8/pkg/system" "github.com/otto8-ai/otto8/pkg/wait" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kclient "sigs.k8s.io/controller-runtime/pkg/client" ) var log = mvl.Package() @@ -327,6 +328,26 @@ func (a *WorkflowHandler) EnsureCredentialForKnowledgeSource(req api.Context) er return req.WriteCreated(resp) } +func (a *WorkflowHandler) WorkflowExecutions(req api.Context) error { + var ( + id = req.PathValue("id") + ) + + var wfes v1.WorkflowExecutionList + if err := req.List(&wfes, kclient.MatchingFields{ + "spec.workflowName": id, + }); err != nil { + return err + } + + var resp types.WorkflowExecutionList + for _, we := range wfes.Items { + resp.Items = append(resp.Items, convertWorkflowExecution(we)) + } + + return req.Write(resp) +} + func (a *WorkflowHandler) Script(req api.Context) error { var ( id = req.Request.PathValue("id") diff --git a/pkg/api/router/router.go b/pkg/api/router/router.go index e903e8f1..8ab8d20e 100644 --- a/pkg/api/router/router.go +++ b/pkg/api/router/router.go @@ -28,6 +28,7 @@ func Router(services *services.Services) (http.Handler, error) { emailreceiver := handlers.NewEmailReceiverHandler(services.EmailServerName) defaultModelAliases := handlers.NewDefaultModelAliasHandler() version := handlers.NewVersionHandler(services.EmailServerName) + tables := handlers.NewTableHandler(services.GPTClient) // Version mux.HandleFunc("GET /api/version", version.GetVersion) @@ -72,12 +73,18 @@ func Router(services *services.Services) (http.Handler, error) { mux.HandleFunc("DELETE /api/assistants/{assistant_id}/tasks/{id}", tasks.Delete) mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/run", tasks.Run) mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/runs", tasks.ListRuns) + mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}", tasks.GetRun) + mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}/abort", tasks.AbortRun) mux.HandleFunc("DELETE /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}", tasks.DeleteRun) mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/events", tasks.Events) mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/events", tasks.Abort) mux.HandleFunc("GET /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}/events", tasks.Events) mux.HandleFunc("POST /api/assistants/{assistant_id}/tasks/{id}/runs/{run_id}/events", tasks.Abort) + // Tables + mux.HandleFunc("GET /api/assistants/{assistant_id}/tables", tables.ListTables) + mux.HandleFunc("GET /api/assistants/{assistant_id}/tables/{table_name}/rows", tables.GetRows) + // Agent files mux.HandleFunc("GET /api/agents/{id}/files", agents.ListFiles) mux.HandleFunc("POST /api/agents/{id}/files/{file}", agents.UploadFile) @@ -104,6 +111,7 @@ func Router(services *services.Services) (http.Handler, error) { // Workflows mux.HandleFunc("GET /api/workflows", workflows.List) mux.HandleFunc("GET /api/workflows/{id}", workflows.ByID) + mux.HandleFunc("GET /api/workflows/{id}/executions", workflows.WorkflowExecutions) mux.HandleFunc("GET /api/workflows/{id}/script", workflows.Script) mux.HandleFunc("GET /api/workflows/{id}/script.gpt", workflows.Script) mux.HandleFunc("GET /api/workflows/{id}/script/tool.gpt", workflows.Script) @@ -146,6 +154,8 @@ func Router(services *services.Services) (http.Handler, error) { mux.HandleFunc("GET /api/threads/{id}", threads.ByID) mux.HandleFunc("POST /api/threads/{id}/abort", threads.Abort) mux.HandleFunc("GET /api/threads/{id}/events", threads.Events) + mux.HandleFunc("GET /api/threads/{id}/workflows", threads.Workflows) + mux.HandleFunc("GET /api/threads/{id}/workflows/{workflow_id}/executions", threads.WorkflowExecutions) mux.HandleFunc("DELETE /api/threads/{id}", threads.Delete) mux.HandleFunc("PUT /api/threads/{id}", threads.Update) mux.HandleFunc("GET /api/agents/{agent}/threads", threads.List) diff --git a/pkg/cli/tools_register.go b/pkg/cli/tools_register.go index e0dce98a..0a4c6970 100644 --- a/pkg/cli/tools_register.go +++ b/pkg/cli/tools_register.go @@ -9,7 +9,7 @@ import ( type ToolRegister struct { root *Otto8 - Quiet bool `usage:"Only print IDs of created tool references:"q"` + Quiet bool `usage:"Only print IDs of created tool references:" short:"q"` } func (l *ToolRegister) Customize(cmd *cobra.Command) { diff --git a/pkg/controller/data/otto.yaml b/pkg/controller/data/otto.yaml index bb7d625e..31c15ea5 100644 --- a/pkg/controller/data/otto.yaml +++ b/pkg/controller/data/otto.yaml @@ -22,6 +22,8 @@ spec: tools: - workspace-files - time + - database + - knowledge defaultThreadTools: - github-bundle - google-docs-bundle diff --git a/pkg/controller/handlers/cronjob/cronjob.go b/pkg/controller/handlers/cronjob/cronjob.go index ca20e668..700fd9b3 100644 --- a/pkg/controller/handlers/cronjob/cronjob.go +++ b/pkg/controller/handlers/cronjob/cronjob.go @@ -53,7 +53,7 @@ func (h *Handler) Run(req router.Request, resp router.Response) error { WorkflowName: workflow.Name, Input: cj.Spec.Input, CronJobName: cj.Name, - UserID: cj.Spec.UserID, + ThreadName: cj.Spec.ThreadName, }, }, ); err != nil { diff --git a/pkg/controller/handlers/workflow/files.go b/pkg/controller/handlers/workflow/files.go index cdbc5e13..e925af84 100644 --- a/pkg/controller/handlers/workflow/files.go +++ b/pkg/controller/handlers/workflow/files.go @@ -17,6 +17,11 @@ func createWorkspace(ctx context.Context, c kclient.Client, workflow *v1.Workflo return nil } + if workflow.Spec.WorkspaceName != "" { + workflow.Status.WorkspaceName = workflow.Spec.WorkspaceName + return nil + } + ws := &v1.Workspace{ ObjectMeta: metav1.ObjectMeta{ Namespace: workflow.Namespace, @@ -40,6 +45,11 @@ func createKnowledgeSet(ctx context.Context, c kclient.Client, workflow *v1.Work return nil } + if len(workflow.Spec.KnowledgeSetNames) > 0 { + workflow.Status.KnowledgeSetNames = workflow.Spec.KnowledgeSetNames + return nil + } + ks := &v1.KnowledgeSet{ ObjectMeta: metav1.ObjectMeta{ Namespace: workflow.Namespace, diff --git a/pkg/controller/handlers/workflow/workflow.go b/pkg/controller/handlers/workflow/workflow.go index 8c5a8772..adcf5354 100644 --- a/pkg/controller/handlers/workflow/workflow.go +++ b/pkg/controller/handlers/workflow/workflow.go @@ -3,9 +3,7 @@ package workflow import ( "github.com/otto8-ai/nah/pkg/router" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" - "github.com/otto8-ai/otto8/pkg/system" "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func EnsureIDs(req router.Request, resp router.Response) error { @@ -17,26 +15,3 @@ func EnsureIDs(req router.Request, resp router.Response) error { } return nil } - -func WorkspaceObjects(req router.Request, _ router.Response) error { - workflow := req.Object.(*v1.Workflow) - if workflow.Status.WorkspaceName == "" { - ws := &v1.Workspace{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: req.Namespace, - GenerateName: system.WorkspacePrefix, - Finalizers: []string{v1.WorkspaceFinalizer}, - }, - Spec: v1.WorkspaceSpec{ - WorkflowName: workflow.Name, - }, - } - if err := req.Client.Create(req.Ctx, ws); err != nil { - return err - } - - workflow.Status.WorkspaceName = ws.Name - } - - return nil -} diff --git a/pkg/controller/handlers/workflowexecution/workflowexecution.go b/pkg/controller/handlers/workflowexecution/workflowexecution.go index 2e85ef5b..163d8fc5 100644 --- a/pkg/controller/handlers/workflowexecution/workflowexecution.go +++ b/pkg/controller/handlers/workflowexecution/workflowexecution.go @@ -142,3 +142,25 @@ func (h *Handler) newThread(ctx context.Context, c kclient.Client, wf *v1.Workfl return &thread, c.Create(ctx, &thread) } + +func (h *Handler) ReassignThread(req router.Request, _ router.Response) error { + var ( + wfe = req.Object.(*v1.WorkflowExecution) + ) + + if wfe.Status.ThreadName != "" || wfe.Spec.WorkflowName == "" { + return nil + } + + var we v1.Workflow + if err := req.Get(&we, wfe.Namespace, wfe.Spec.WorkflowName); err != nil { + return kclient.IgnoreNotFound(err) + } + + if we.Spec.ThreadName != "" { + wfe.Spec.ThreadName = we.Spec.ThreadName + return req.Client.Update(req.Ctx, wfe) + } + + return nil +} diff --git a/pkg/controller/routes.go b/pkg/controller/routes.go index 62972008..6dfdc971 100644 --- a/pkg/controller/routes.go +++ b/pkg/controller/routes.go @@ -49,14 +49,15 @@ func (c *Controller) setupRoutes() error { root.Type(&v1.Thread{}).HandlerFunc(threads.WorkflowState) // Workflows - root.Type(&v1.Workflow{}).HandlerFunc(workflow.WorkspaceObjects) root.Type(&v1.Workflow{}).HandlerFunc(workflow.EnsureIDs) root.Type(&v1.Workflow{}).HandlerFunc(workflow.CreateWorkspaceAndKnowledgeSet) root.Type(&v1.Workflow{}).HandlerFunc(workflow.BackPopulateAuthStatus) + root.Type(&v1.Workflow{}).HandlerFunc(cleanup.Cleanup) // WorkflowExecutions root.Type(&v1.WorkflowExecution{}).HandlerFunc(cleanup.Cleanup) root.Type(&v1.WorkflowExecution{}).HandlerFunc(workflowExecution.Run) + root.Type(&v1.WorkflowExecution{}).HandlerFunc(workflowExecution.ReassignThread) // Agents root.Type(&v1.Agent{}).HandlerFunc(agents.CreateWorkspaceAndKnowledgeSet) @@ -75,6 +76,7 @@ func (c *Controller) setupRoutes() error { // Reference root.Type(&v1.Agent{}).HandlerFunc(alias.AssignAlias) + root.Type(&v1.EmailReceiver{}).HandlerFunc(alias.AssignAlias) root.Type(&v1.Workflow{}).HandlerFunc(alias.AssignAlias) root.Type(&v1.Model{}).HandlerFunc(alias.AssignAlias) root.Type(&v1.DefaultModelAlias{}).HandlerFunc(alias.AssignAlias) diff --git a/pkg/events/events.go b/pkg/events/events.go index 8d207686..8d6737cb 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -260,10 +260,16 @@ func (e *Emitter) printRun(ctx context.Context, state *printState, run v1.Run, r return err } step, _ := types.FindStep(wfe.Status.WorkflowManifest, run.Spec.WorkflowStepID) + if run.Spec.WorkflowStepID != "" && step == nil { + step = &types.Step{ + ID: run.Spec.WorkflowStepID, + } + } result <- types.Progress{ - RunID: run.Name, - Time: types.NewTime(wfe.CreationTimestamp.Time), - Step: step, + RunID: run.Name, + ParentRunID: run.Spec.PreviousRunName, + Time: types.NewTime(wfe.CreationTimestamp.Time), + Step: step, } state.lastStepPrinted = run.Spec.WorkflowStepID } @@ -707,6 +713,7 @@ func (e *Emitter) printCall(ctx context.Context, namespace, runID string, prg *g Name: tool.Name, Description: tool.Description, Input: subCall.Input, + Output: getToolCallOutput(frames, callID), Metadata: tool.MetaData, } } else { @@ -743,6 +750,14 @@ func (e *Emitter) printCall(ctx context.Context, namespace, runID string, prg *g return nil } +func getToolCallOutput(frames gptscript.CallFrames, callID string) string { + out := frames[callID].Output + if len(out) == 1 { + return out[0].Content + } + return "" +} + func isSubCallTargetIDs(tool gptscript.Tool) (agentID string, workflowID string) { for _, line := range strings.Split(tool.Instructions, "\n") { suffix, ok := strings.CutPrefix(line, "#OTTO8_SUBCALL: TARGET: ") diff --git a/pkg/invoke/invoker.go b/pkg/invoke/invoker.go index 55a3a260..94cc5944 100644 --- a/pkg/invoke/invoker.go +++ b/pkg/invoke/invoker.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "maps" + "slices" "strings" "sync" "time" @@ -104,6 +105,10 @@ func (r *Response) Result(ctx context.Context) (TaskResult, error) { return TaskResult{}, err } + if runState.Name != r.Run.Name { + panic("runState doesnt match") + } + if runState.Spec.Error != "" { return TaskResult{}, ErrToolResult{ Message: runState.Spec.Error, @@ -233,7 +238,7 @@ func CreateThreadForAgent(ctx context.Context, c kclient.WithWatch, agent *v1.Ag return &thread, c.Create(ctx, &thread) } -func (i *Invoker) updateThreadFields(ctx context.Context, c kclient.WithWatch, agent *v1.Agent, thread *v1.Thread, opt Options) error { +func (i *Invoker) updateThreadFields(ctx context.Context, c kclient.WithWatch, agent *v1.Agent, thread *v1.Thread, extraEnv []string, opt Options) error { var updated bool if opt.AgentAlias != "" && thread.Spec.AgentAlias != opt.AgentAlias { thread.Spec.AgentAlias = opt.AgentAlias @@ -243,6 +248,10 @@ func (i *Invoker) updateThreadFields(ctx context.Context, c kclient.WithWatch, a thread.Spec.AgentName = agent.Name updated = true } + if !slices.Equal(thread.Spec.Env, extraEnv) { + thread.Spec.Env = extraEnv + updated = true + } if updated { return c.Status().Update(ctx, thread) } @@ -262,10 +271,6 @@ func (i *Invoker) Agent(ctx context.Context, c kclient.WithWatch, agent *v1.Agen return nil, err } - if err := i.updateThreadFields(ctx, c, agent, thread, opt); err != nil { - return nil, err - } - credContextIDs := []string{thread.Name} if opt.ThreadCredentialScope != nil && !*opt.ThreadCredentialScope { credContextIDs = nil @@ -284,6 +289,10 @@ func (i *Invoker) Agent(ctx context.Context, c kclient.WithWatch, agent *v1.Agen return nil, err } + if err := i.updateThreadFields(ctx, c, agent, thread, extraEnv, opt); err != nil { + return nil, err + } + if len(agent.Spec.Manifest.Params) == 0 { data := map[string]any{} if err := json.Unmarshal([]byte(input), &data); err == nil { diff --git a/pkg/invoke/step.go b/pkg/invoke/step.go index 092b7f7a..411f82de 100644 --- a/pkg/invoke/step.go +++ b/pkg/invoke/step.go @@ -3,6 +3,7 @@ package invoke import ( "context" "encoding/json" + "fmt" "github.com/otto8-ai/nah/pkg/router" "github.com/otto8-ai/otto8/apiclient/types" @@ -57,6 +58,9 @@ func (i *Invoker) toAgentFromStep(ctx context.Context, c kclient.Client, step *v if err := c.Get(ctx, router.Key(step.Namespace, wfe.Spec.WorkflowName), &wf); err != nil { return v1.Agent{}, err } + if wfe.Status.WorkflowManifest == nil { + return v1.Agent{}, fmt.Errorf("workflow execution %s has no manifest", wfe.Name) + } return i.toAgent(ctx, c, &wf, step, wfe.Spec.Input, *wfe.Status.WorkflowManifest) } diff --git a/pkg/invoke/workflow.go b/pkg/invoke/workflow.go index e0610ce2..8b5d42a1 100644 --- a/pkg/invoke/workflow.go +++ b/pkg/invoke/workflow.go @@ -20,7 +20,7 @@ import ( type WorkflowOptions struct { ThreadName string StepID string - UserID string + OwningThreadName string WorkflowExecutionName string Events bool } @@ -33,7 +33,7 @@ func (i *Invoker) startWorkflow(ctx context.Context, c kclient.WithWatch, wf *v1 Namespace: wf.Namespace, }, Spec: v1.WorkflowExecutionSpec{ - UserID: opt.UserID, + ThreadName: opt.OwningThreadName, Input: input, WorkflowName: wf.Name, }, @@ -183,7 +183,7 @@ func (i *Invoker) rerunThread(ctx context.Context, c kclient.WithWatch, wf *v1.W return nil, nil, err } - if input != "" && wfe.Spec.Input != input { + if wfe.Spec.Input != input { if stepID == "" { // If input doesn't match, delete all steps and rerun stepID = "*" diff --git a/pkg/render/render.go b/pkg/render/render.go index f51d7894..44a96ba6 100644 --- a/pkg/render/render.go +++ b/pkg/render/render.go @@ -11,7 +11,6 @@ import ( "github.com/gptscript-ai/go-gptscript" "github.com/otto8-ai/otto8/apiclient/types" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" - "github.com/otto8-ai/otto8/pkg/system" apierror "k8s.io/apimachinery/pkg/api/errors" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -25,6 +24,10 @@ type AgentOptions struct { } func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerURL string, opts AgentOptions) (_ []gptscript.ToolDef, extraEnv []string, _ error) { + defer func() { + sort.Strings(extraEnv) + }() + mainTool := gptscript.ToolDef{ Name: agent.Spec.Manifest.Name, Description: agent.Spec.Manifest.Description, @@ -40,6 +43,22 @@ func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerU extraEnv = append(extraEnv, agent.Spec.Env...) + for _, env := range agent.Spec.Manifest.Env { + if env.Name == "" { + continue + } + if !validEnv.MatchString(env.Name) { + return nil, nil, fmt.Errorf("invalid env var %s, must match %s", env.Name, validEnv.String()) + } + if env.Value == "" { + agent.Spec.Credentials = append(agent.Spec.Credentials, + fmt.Sprintf(`github.com/gptscript-ai/credential as %s with "%s" as message and "%s" as env and %s as field`, + env.Name, env.Description, env.Name, env.Name)) + } else { + extraEnv = append(extraEnv, fmt.Sprintf("%s=%s", env.Name, env.Value)) + } + } + if mainTool.Instructions == "" { mainTool.Instructions = v1.DefaultAgentPrompt } @@ -63,6 +82,14 @@ func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerU mainTool.Tools = append(mainTool.Tools, name) } + for _, tool := range agent.Spec.SystemTools { + name, err := ResolveToolReference(ctx, db, "", agent.Namespace, tool) + if err != nil { + return nil, nil, err + } + mainTool.Tools = append(mainTool.Tools, name) + } + mainTool, otherTools, err := addAgentTools(ctx, db, agent, mainTool, otherTools) if err != nil { return nil, nil, err @@ -73,7 +100,7 @@ func Agent(ctx context.Context, db kclient.Client, agent *v1.Agent, oauthServerU return nil, nil, err } - mainTool, otherTools, err = addKnowledgeTools(ctx, db, agent, opts.Thread, mainTool, otherTools) + extraEnv, err = addKnowledgeTools(ctx, db, agent, opts.Thread, extraEnv) if err != nil { return nil, nil, err } @@ -130,7 +157,7 @@ func OAuthAppEnv(ctx context.Context, db kclient.Client, oauthAppNames []string, return extraEnv, nil } -func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, thread *v1.Thread, mainTool gptscript.ToolDef, otherTools []gptscript.ToolDef) (_ gptscript.ToolDef, _ []gptscript.ToolDef, _ error) { +func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, thread *v1.Thread, extraEnv []string) ([]string, error) { var knowledgeSetNames []string knowledgeSetNames = append(knowledgeSetNames, agent.Status.KnowledgeSetNames...) if thread != nil { @@ -138,25 +165,15 @@ func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, } if len(knowledgeSetNames) == 0 { - return mainTool, otherTools, nil + return extraEnv, nil } - knowledgeTool, err := ResolveToolReference(ctx, db, types.ToolReferenceTypeSystem, agent.Namespace, system.KnowledgeRetrievalTool) - if err != nil { - return mainTool, nil, err - } - - resultFormatter, err := ResolveToolReference(ctx, db, types.ToolReferenceTypeSystem, agent.Namespace, system.ResultFormatterTool) - if err != nil { - return mainTool, nil, err - } - - for i, knowledgeSetName := range knowledgeSetNames { + for _, knowledgeSetName := range knowledgeSetNames { var ks v1.KnowledgeSet if err := db.Get(ctx, kclient.ObjectKey{Namespace: agent.Namespace, Name: knowledgeSetName}, &ks); apierror.IsNotFound(err) { continue } else if err != nil { - return mainTool, nil, err + return nil, err } if !ks.Status.HasContent { @@ -175,44 +192,13 @@ func addKnowledgeTools(ctx context.Context, db kclient.Client, agent *v1.Agent, dataDescription = "No data description available" } - toolName := "knowledge_set_query" - if i > 0 { - toolName = fmt.Sprintf("knowledge_set_%d_query", i) - } - - tool := gptscript.ToolDef{ - Name: toolName, - Description: fmt.Sprintf("Obtain search result from the knowledge set known as %s", ks.Name), - Instructions: "#!sys.echo", - Arguments: gptscript.ObjectSchema( - "query", "A search query that will be evaluated against the knowledge set"), - OutputFilters: []string{ - knowledgeTool + fmt.Sprintf(fmt.Sprintf(" with %s/%s as datasets and ${query} as query", ks.Namespace, ks.Name)), - resultFormatter, - }, - } - - contentTool := gptscript.ToolDef{ - Name: toolName + "_context", - Instructions: strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(` -#!sys.echo -# START INSTRUCTIONS: KNOWLEDGE SET: %n% - -Use the tool %k% to query knowledge set %n% to assist in Retrieval-Augmented Generation (RAG). -The knowledge set %n% contains data described as: - -%d% - -# END INSTRUCTIONS: KNOWLEDGE SET: %n% -`, "%k%", toolName), "%d%", dataDescription), "%n%", ks.Name), - } - - mainTool.Tools = append(mainTool.Tools, tool.Name) - mainTool.Context = append(mainTool.Context, contentTool.Name) - otherTools = append(otherTools, tool, contentTool) + return append(extraEnv, + fmt.Sprintf("KNOW_DATASETS=%s/%s", ks.Namespace, ks.Name), + fmt.Sprintf("KNOW_DATASET_DESCRIPTION=%s", dataDescription), + ), nil } - return mainTool, otherTools, nil + return extraEnv, nil } func addWorkflowTools(ctx context.Context, db kclient.Client, agent *v1.Agent, mainTool gptscript.ToolDef, otherTools []gptscript.ToolDef) (_ gptscript.ToolDef, _ []gptscript.ToolDef, _ error) { diff --git a/pkg/render/workflow.go b/pkg/render/workflow.go index 5ca9d841..11d735be 100644 --- a/pkg/render/workflow.go +++ b/pkg/render/workflow.go @@ -3,16 +3,20 @@ package render import ( "context" "fmt" + "regexp" "strings" "github.com/otto8-ai/nah/pkg/router" "github.com/otto8-ai/otto8/apiclient/types" v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1" + "github.com/otto8-ai/otto8/pkg/system" apierror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) +var validEnv = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$") + type WorkflowOptions struct { Step *types.Step ManifestOverride *types.WorkflowManifest @@ -58,7 +62,6 @@ func Workflow(ctx context.Context, c kclient.Client, wf *v1.Workflow, opts Workf }, Spec: v1.AgentSpec{ Manifest: agentManifest, - Credentials: wf.Spec.Manifest.Credentials, CredentialContextID: wf.Name, }, Status: v1.AgentStatus{ @@ -67,19 +70,6 @@ func Workflow(ctx context.Context, c kclient.Client, wf *v1.Workflow, opts Workf }, } - for _, env := range wf.Spec.Manifest.Env { - if env.Name == "" { - continue - } - if env.Value == "" { - agent.Spec.Credentials = append(agent.Spec.Credentials, - fmt.Sprintf(`github.com/gptscript-ai/credential as %s with "%s" as message and "%s" as env and %s as field`, - env.Name, env.Description, env.Name, env.Name)) - } else { - agent.Spec.Env = append(agent.Spec.Env, fmt.Sprintf("%s=%s", env.Name, env.Value)) - } - } - if step := opts.Step; step != nil { if step.Cache != nil { agent.Spec.Manifest.Cache = step.Cache @@ -104,9 +94,7 @@ func Workflow(ctx context.Context, c kclient.Client, wf *v1.Workflow, opts Workf agent.Spec.Manifest.Prompt = v1.DefaultWorkflowAgentPrompt } - if opts.Input != "" { - agent.Spec.Manifest.Prompt = fmt.Sprintf("WORKFLOW INPUT: %s\nEND WORKFLOW INPUT\n\n%s", opts.Input, agent.Spec.Manifest.Prompt) - } - + agent.Spec.Env = append(agent.Spec.Env, "WORKFLOW_INPUT="+opts.Input) + agent.Spec.SystemTools = append(agent.Spec.SystemTools, system.WorkflowTool) return &agent, nil } diff --git a/pkg/smtp/smtp.go b/pkg/smtp/smtp.go index d404c92d..fc190fb2 100644 --- a/pkg/smtp/smtp.go +++ b/pkg/smtp/smtp.go @@ -3,8 +3,8 @@ package smtp import ( "bytes" "context" - "crypto/sha256" "encoding/base64" + "encoding/json" "fmt" "io" "mime" @@ -87,8 +87,13 @@ func (s *Server) handler(_ net.Addr, from string, to []string, data []byte) erro continue } + ns, name, ok := strings.Cut(name, ".") + if !ok { + log.Infof("Skipping mail for %s: no namespace found", toAddr.Address) + } + var emailReceiver v1.EmailReceiver - if err := alias.Get(s.ctx, s.c, &emailReceiver, "", name); apierror.IsNotFound(err) { + if err := alias.Get(s.ctx, s.c, &emailReceiver, ns, name); apierror.IsNotFound(err) { log.Infof("Skipping mail for %s: no receiver found", toAddr.Address) continue } else if err != nil { @@ -108,7 +113,7 @@ func (s *Server) handler(_ net.Addr, from string, to []string, data []byte) erro } } - if err := s.dispatchEmail(emailReceiver, fromAddress.Address, body); err != nil { + if err := s.dispatchEmail(emailReceiver, body, message); err != nil { return fmt.Errorf("dispatch email: %w", err) } } @@ -160,34 +165,43 @@ func getBody(message *mail.Message) (string, error) { return "", fmt.Errorf("failed to find text/plain body: %s", mediaType) } -func (s *Server) dispatchEmail(email v1.EmailReceiver, from, body string) error { - var input strings.Builder - _, _ = input.WriteString("You are being called from an email from ") - _, _ = input.WriteString(from) - _, _ = input.WriteString(". With the body:\n\n") - _, _ = input.WriteString("START BODY\n") - _, _ = input.WriteString(body) - _, _ = input.WriteString("\nEND BODY\n\n") +func (s *Server) dispatchEmail(email v1.EmailReceiver, body string, message *mail.Message) error { + var input struct { + Type string `json:"type"` + From string `json:"from"` + To string `json:"to"` + Subject string `json:"subject"` + Body string `json:"body"` + } + + input.Type = "email" + input.From = message.Header.Get("From") + input.To = message.Header.Get("To") + input.Subject = message.Header.Get("Subject") + input.Body = body + + inputJSON, err := json.Marshal(input) + if err != nil { + return fmt.Errorf("marshal input: %w", err) + } var workflow v1.Workflow - if err := alias.Get(s.ctx, s.c, &workflow, "", email.Spec.Workflow); err != nil { + if err := alias.Get(s.ctx, s.c, &workflow, email.Namespace, email.Spec.Workflow); err != nil { return err } - err := s.c.Create(s.ctx, &v1.WorkflowExecution{ + return s.c.Create(s.ctx, &v1.WorkflowExecution{ ObjectMeta: metav1.ObjectMeta{ - // The name here is the sha256 hash of the body to handle multiple executions of the same webhook. - // That is, if the webhook is called twice with the same body, it will only be executed once. - Name: system.WorkflowExecutionPrefix + fmt.Sprintf("%x", sha256.Sum256([]byte(from+body))), - Namespace: workflow.Namespace, + GenerateName: system.WorkflowExecutionPrefix, + Namespace: workflow.Namespace, }, Spec: v1.WorkflowExecutionSpec{ WorkflowName: workflow.Name, EmailReceiverName: email.Name, - Input: input.String(), + ThreadName: workflow.Spec.ThreadName, + Input: string(inputJSON), }, }) - return kclient.IgnoreAlreadyExists(err) } func matches(address string, email v1.EmailReceiver) bool { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/agent.go b/pkg/storage/apis/otto.otto8.ai/v1/agent.go index 7c99b407..d8478f42 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/agent.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/agent.go @@ -65,6 +65,8 @@ func (a *Agent) FieldNames() []string { type AgentSpec struct { Manifest types.AgentManifest `json:"manifest,omitempty"` + SystemTools []string `json:"systemTools,omitempty"` + ContextInput string `json:"contextInput,omitempty"` InputFilters []string `json:"inputFilters,omitempty"` Credentials []string `json:"credentials,omitempty"` CredentialContextID string `json:"credentialContextID,omitempty"` diff --git a/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go b/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go index a36bae2c..416bb555 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/cronjob.go @@ -26,14 +26,14 @@ func (c *CronJob) Has(field string) (exists bool) { func (c *CronJob) Get(field string) (value string) { switch field { - case "spec.userID": - return c.Spec.UserID + case "spec.threadName": + return c.Spec.ThreadName } return "" } func (c *CronJob) FieldNames() []string { - return []string{"spec.userID"} + return []string{"spec.threadName"} } func (*CronJob) GetColumns() [][]string { @@ -59,6 +59,7 @@ func (c *CronJob) DeleteRefs() []Ref { type CronJobSpec struct { types.CronJobManifest `json:",inline"` + ThreadName string `json:"threadName,omitempty"` } type CronJobStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go b/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go index 3c0a30fb..f314d2ef 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go @@ -30,14 +30,14 @@ func (in *EmailReceiver) Has(field string) (exists bool) { func (in *EmailReceiver) Get(field string) (value string) { switch field { - case "spec.userID": - return in.Spec.UserID + case "spec.threadName": + return in.Spec.ThreadName } return "" } func (in *EmailReceiver) FieldNames() []string { - return []string{"spec.userID"} + return []string{"spec.threadName"} } func (in *EmailReceiver) GetAliasName() string { @@ -81,6 +81,7 @@ func (in *EmailReceiver) DeleteRefs() []Ref { type EmailReceiverSpec struct { types.EmailReceiverManifest `json:",inline"` + ThreadName string `json:"threadName,omitempty"` } type EmailReceiverStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/thread.go b/pkg/storage/apis/otto.otto8.ai/v1/thread.go index ebbffe85..5f545fd7 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/thread.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/thread.go @@ -66,6 +66,7 @@ type ThreadSpec struct { TextEmbeddingModel string `json:"textEmbeddingModel,omitempty"` SystemTask bool `json:"systemTask,omitempty"` Abort bool `json:"abort,omitempty"` + Env []string `json:"env,omitempty"` } func (in *Thread) DeleteRefs() []Ref { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/webhook.go b/pkg/storage/apis/otto.otto8.ai/v1/webhook.go index 56cca1f8..830c3899 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/webhook.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/webhook.go @@ -25,7 +25,7 @@ type Webhook struct { } func (w *Webhook) FieldNames() []string { - return []string{"spec.userID"} + return []string{"spec.threadName"} } func (w *Webhook) Has(field string) (exists bool) { @@ -34,8 +34,8 @@ func (w *Webhook) Has(field string) (exists bool) { func (w *Webhook) Get(field string) (value string) { switch field { - case "spec.userID": - return w.Spec.UserID + case "spec.threadName": + return w.Spec.ThreadName } return "" } @@ -83,6 +83,7 @@ func (w *Webhook) DeleteRefs() []Ref { type WebhookSpec struct { types.WebhookManifest `json:",inline"` TokenHash []byte `json:"tokenHash,omitempty"` + ThreadName string } type WebhookStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/workflow.go b/pkg/storage/apis/otto.otto8.ai/v1/workflow.go index cdc64618..db96a179 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/workflow.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/workflow.go @@ -30,18 +30,15 @@ func (in *Workflow) Has(field string) (exists bool) { func (in *Workflow) Get(field string) (value string) { switch field { - case "spec.agentName": - return in.Spec.AgentName - case "spec.userID": - return in.Spec.UserID + case "spec.threadName": + return in.Spec.ThreadName } return "" } func (in *Workflow) FieldNames() []string { return []string{ - "spec.agentName", - "spec.userID", + "spec.threadName", } } @@ -70,9 +67,21 @@ func (in *Workflow) SetAliasObservedGeneration(gen int64) { } type WorkflowSpec struct { - AgentName string `json:"agentName,omitempty"` - UserID string `json:"userID,omitempty"` - Manifest types.WorkflowManifest `json:"manifest,omitempty"` + ThreadName string `json:"threadName,omitempty"` + Manifest types.WorkflowManifest `json:"manifest,omitempty"` + KnowledgeSetNames []string `json:"knowledgeSetNames,omitempty"` + WorkspaceName string `json:"workspaceName,omitempty"` +} + +func (in *Workflow) DeleteRefs() []Ref { + refs := []Ref{ + {ObjType: &Thread{}, Name: in.Spec.ThreadName}, + {ObjType: &Workspace{}, Name: in.Spec.WorkspaceName}, + } + for _, name := range in.Spec.KnowledgeSetNames { + refs = append(refs, Ref{ObjType: &KnowledgeSet{}, Name: name}) + } + return refs } type WorkflowStatus struct { diff --git a/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go b/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go index 27832838..346fe789 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/workflowexecution.go @@ -27,8 +27,8 @@ func (in *WorkflowExecution) Has(field string) bool { func (in *WorkflowExecution) Get(field string) string { if in != nil { switch field { - case "spec.userID": - return in.Spec.UserID + case "spec.threadName": + return in.Spec.ThreadName case "spec.webhookName": return in.Spec.WebhookName case "spec.cronJobName": @@ -44,7 +44,13 @@ func (in *WorkflowExecution) Get(field string) string { } func (in *WorkflowExecution) FieldNames() []string { - return []string{"spec.userID", "spec.webhookName", "spec.cronJobName", "spec.workflowName", "spec.parentRunName"} + return []string{ + "spec.threadName", + "spec.webhookName", + "spec.cronJobName", + "spec.workflowName", + "spec.parentRunName", + } } func (in *WorkflowExecution) GetColumns() [][]string { @@ -59,8 +65,9 @@ func (in *WorkflowExecution) GetColumns() [][]string { } type WorkflowExecutionSpec struct { - Input string `json:"input,omitempty"` - UserID string `json:"userID,omitempty"` + Input string `json:"input,omitempty"` + // ThreadName is the name of the thread that owns this execution, which is the same as the owning thread of the workflow. + ThreadName string `json:"threadName,omitempty"` WorkflowName string `json:"workflowName,omitempty"` WebhookName string `json:"webhookName,omitempty"` EmailReceiverName string `json:"emailReceiverName,omitempty"` diff --git a/pkg/storage/apis/otto.otto8.ai/v1/workspace.go b/pkg/storage/apis/otto.otto8.ai/v1/workspace.go index 21dc5429..623ed758 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/workspace.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/workspace.go @@ -44,13 +44,15 @@ func (in *Workspace) Get(field string) string { return in.Spec.ThreadName case "spec.knowledgeSetName": return in.Spec.KnowledgeSetName + case "status.workspaceID": + return in.Status.WorkspaceID } return "" } func (*Workspace) FieldNames() []string { - return []string{"spec.agentName", "spec.workflowName", "spec.threadName", "spec.knowledgeSetName"} + return []string{"spec.agentName", "spec.workflowName", "spec.threadName", "spec.knowledgeSetName", "status.workspaceID"} } var _ fields.Fields = (*Workspace)(nil) diff --git a/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go b/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go index 44110f33..70fe51ea 100644 --- a/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go +++ b/pkg/storage/apis/otto.otto8.ai/v1/zz_generated.deepcopy.go @@ -73,6 +73,11 @@ func (in *AgentList) DeepCopyObject() runtime.Object { func (in *AgentSpec) DeepCopyInto(out *AgentSpec) { *out = *in in.Manifest.DeepCopyInto(&out.Manifest) + if in.SystemTools != nil { + in, out := &in.SystemTools, &out.SystemTools + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.InputFilters != nil { in, out := &in.InputFilters, &out.InputFilters *out = make([]string, len(*in)) @@ -1348,6 +1353,11 @@ func (in *ThreadSpec) DeepCopyInto(out *ThreadSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ThreadSpec. @@ -1769,6 +1779,11 @@ func (in *WorkflowList) DeepCopyObject() runtime.Object { func (in *WorkflowSpec) DeepCopyInto(out *WorkflowSpec) { *out = *in in.Manifest.DeepCopyInto(&out.Manifest) + if in.KnowledgeSetNames != nil { + in, out := &in.KnowledgeSetNames, &out.KnowledgeSetNames + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkflowSpec. diff --git a/pkg/storage/openapi/generated/openapi_generated.go b/pkg/storage/openapi/generated/openapi_generated.go index b8084731..e0b82433 100644 --- a/pkg/storage/openapi/generated/openapi_generated.go +++ b/pkg/storage/openapi/generated/openapi_generated.go @@ -35,6 +35,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.EmailReceiver": schema_otto8_ai_otto8_apiclient_types_EmailReceiver(ref), "github.com/otto8-ai/otto8/apiclient/types.EmailReceiverList": schema_otto8_ai_otto8_apiclient_types_EmailReceiverList(ref), "github.com/otto8-ai/otto8/apiclient/types.EmailReceiverManifest": schema_otto8_ai_otto8_apiclient_types_EmailReceiverManifest(ref), + "github.com/otto8-ai/otto8/apiclient/types.EnvVar": schema_otto8_ai_otto8_apiclient_types_EnvVar(ref), "github.com/otto8-ai/otto8/apiclient/types.ErrHTTP": schema_otto8_ai_otto8_apiclient_types_ErrHTTP(ref), "github.com/otto8-ai/otto8/apiclient/types.File": schema_otto8_ai_otto8_apiclient_types_File(ref), "github.com/otto8-ai/otto8/apiclient/types.FileList": schema_otto8_ai_otto8_apiclient_types_FileList(ref), @@ -70,6 +71,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.Step": schema_otto8_ai_otto8_apiclient_types_Step(ref), "github.com/otto8-ai/otto8/apiclient/types.StepTemplateInvoke": schema_otto8_ai_otto8_apiclient_types_StepTemplateInvoke(ref), "github.com/otto8-ai/otto8/apiclient/types.SubFlow": schema_otto8_ai_otto8_apiclient_types_SubFlow(ref), + "github.com/otto8-ai/otto8/apiclient/types.Table": schema_otto8_ai_otto8_apiclient_types_Table(ref), + "github.com/otto8-ai/otto8/apiclient/types.TableList": schema_otto8_ai_otto8_apiclient_types_TableList(ref), "github.com/otto8-ai/otto8/apiclient/types.Task": schema_otto8_ai_otto8_apiclient_types_Task(ref), "github.com/otto8-ai/otto8/apiclient/types.TaskEmail": schema_otto8_ai_otto8_apiclient_types_TaskEmail(ref), "github.com/otto8-ai/otto8/apiclient/types.TaskIf": schema_otto8_ai_otto8_apiclient_types_TaskIf(ref), @@ -99,7 +102,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/otto8-ai/otto8/apiclient/types.While": schema_otto8_ai_otto8_apiclient_types_While(ref), "github.com/otto8-ai/otto8/apiclient/types.Workflow": schema_otto8_ai_otto8_apiclient_types_Workflow(ref), "github.com/otto8-ai/otto8/apiclient/types.WorkflowCall": schema_otto8_ai_otto8_apiclient_types_WorkflowCall(ref), - "github.com/otto8-ai/otto8/apiclient/types.WorkflowEnv": schema_otto8_ai_otto8_apiclient_types_WorkflowEnv(ref), + "github.com/otto8-ai/otto8/apiclient/types.WorkflowExecution": schema_otto8_ai_otto8_apiclient_types_WorkflowExecution(ref), + "github.com/otto8-ai/otto8/apiclient/types.WorkflowExecutionList": schema_otto8_ai_otto8_apiclient_types_WorkflowExecutionList(ref), "github.com/otto8-ai/otto8/apiclient/types.WorkflowList": schema_otto8_ai_otto8_apiclient_types_WorkflowList(ref), "github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest": schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref), "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1.Agent": schema_storage_apis_ottootto8ai_v1_Agent(ref), @@ -539,12 +543,25 @@ func schema_otto8_ai_otto8_apiclient_types_AgentManifest(ref common.ReferenceCal Format: "", }, }, + "env": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.EnvVar"), + }, + }, + }, + }, + }, }, - Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model"}, + Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model", "env"}, }, }, Dependencies: []string{ - "github.com/otto8-ai/otto8/apiclient/types.AgentIcons"}, + "github.com/otto8-ai/otto8/apiclient/types.AgentIcons", "github.com/otto8-ai/otto8/apiclient/types.EnvVar"}, } } @@ -882,12 +899,6 @@ func schema_otto8_ai_otto8_apiclient_types_CronJobManifest(ref common.ReferenceC Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, "taskSchedule": { SchemaProps: spec.SchemaProps{ Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Schedule"), @@ -1070,12 +1081,6 @@ func schema_otto8_ai_otto8_apiclient_types_EmailReceiverManifest(ref common.Refe Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, "workflow": { SchemaProps: spec.SchemaProps{ Default: "", @@ -1104,6 +1109,40 @@ func schema_otto8_ai_otto8_apiclient_types_EmailReceiverManifest(ref common.Refe } } +func schema_otto8_ai_otto8_apiclient_types_EnvVar(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "name": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "value": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + "description": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"name", "value", "description"}, + }, + }, + } +} + func schema_otto8_ai_otto8_apiclient_types_ErrHTTP(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -2207,6 +2246,13 @@ func schema_otto8_ai_otto8_apiclient_types_Progress(ref common.ReferenceCallback Format: "", }, }, + "parentRunID": { + SchemaProps: spec.SchemaProps{ + Description: "ParentRunID is the parent run of the run that is specified in the RunID field", + Type: []string{"string"}, + Format: "", + }, + }, "time": { SchemaProps: spec.SchemaProps{ Description: "Time is the time the event was generated", @@ -2772,6 +2818,54 @@ func schema_otto8_ai_otto8_apiclient_types_SubFlow(ref common.ReferenceCallback) } } +func schema_otto8_ai_otto8_apiclient_types_Table(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "name": { + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"name"}, + }, + }, + } +} + +func schema_otto8_ai_otto8_apiclient_types_TableList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Table"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.Table"}, + } +} + func schema_otto8_ai_otto8_apiclient_types_Task(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -2996,6 +3090,12 @@ func schema_otto8_ai_otto8_apiclient_types_TaskRun(ref common.ReferenceCallback) Format: "", }, }, + "input": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "task": { SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, @@ -3207,6 +3307,20 @@ func schema_otto8_ai_otto8_apiclient_types_Thread(ref common.ReferenceCallback) Format: "", }, }, + "env": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, Required: []string{"Metadata", "ThreadManifest"}, }, @@ -3311,6 +3425,12 @@ func schema_otto8_ai_otto8_apiclient_types_ToolCall(ref common.ReferenceCallback Format: "", }, }, + "output": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "metadata": { SchemaProps: spec.SchemaProps{ Type: []string{"object"}, @@ -3723,12 +3843,6 @@ func schema_otto8_ai_otto8_apiclient_types_WebhookManifest(ref common.ReferenceC Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, }, Required: []string{"name", "description", "alias", "workflow", "headers", "secret", "validationHeader"}, }, @@ -3896,37 +4010,81 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowCall(ref common.ReferenceCall } } -func schema_otto8_ai_otto8_apiclient_types_WorkflowEnv(ref common.ReferenceCallback) common.OpenAPIDefinition { +func schema_otto8_ai_otto8_apiclient_types_WorkflowExecution(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ SchemaProps: spec.SchemaProps{ Type: []string{"object"}, Properties: map[string]spec.Schema{ - "name": { + "Metadata": { SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Metadata"), }, }, - "value": { + "workflow": { SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest"), }, }, - "description": { + "startTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Time"), + }, + }, + "endTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Time"), + }, + }, + "input": { SchemaProps: spec.SchemaProps{ Default: "", Type: []string{"string"}, Format: "", }, }, + "error": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, - Required: []string{"name", "value", "description"}, + Required: []string{"Metadata", "startTime", "endTime", "input"}, }, }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.Metadata", "github.com/otto8-ai/otto8/apiclient/types.Time", "github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest"}, + } +} + +func schema_otto8_ai_otto8_apiclient_types_WorkflowExecutionList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowExecution"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "github.com/otto8-ai/otto8/apiclient/types.WorkflowExecution"}, } } @@ -4129,20 +4287,6 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref common.Reference Format: "", }, }, - "credentials": { - SchemaProps: spec.SchemaProps{ - Type: []string{"array"}, - Items: &spec.SchemaOrArray{ - Schema: &spec.Schema{ - SchemaProps: spec.SchemaProps{ - Default: "", - Type: []string{"string"}, - Format: "", - }, - }, - }, - }, - }, "env": { SchemaProps: spec.SchemaProps{ Type: []string{"array"}, @@ -4150,7 +4294,7 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref common.Reference Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ Default: map[string]interface{}{}, - Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowEnv"), + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.EnvVar"), }, }, }, @@ -4177,11 +4321,11 @@ func schema_otto8_ai_otto8_apiclient_types_WorkflowManifest(ref common.Reference }, }, }, - Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model", "credentials", "env", "steps", "output"}, + Required: []string{"name", "icons", "description", "temperature", "cache", "alias", "prompt", "knowledgeDescription", "agents", "workflows", "tools", "availableThreadTools", "defaultThreadTools", "oauthApps", "maxThreadTools", "params", "model", "env", "steps", "output"}, }, }, Dependencies: []string{ - "github.com/otto8-ai/otto8/apiclient/types.AgentIcons", "github.com/otto8-ai/otto8/apiclient/types.Step", "github.com/otto8-ai/otto8/apiclient/types.WorkflowEnv"}, + "github.com/otto8-ai/otto8/apiclient/types.AgentIcons", "github.com/otto8-ai/otto8/apiclient/types.EnvVar", "github.com/otto8-ai/otto8/apiclient/types.Step"}, } } @@ -4291,6 +4435,26 @@ func schema_storage_apis_ottootto8ai_v1_AgentSpec(ref common.ReferenceCallback) Ref: ref("github.com/otto8-ai/otto8/apiclient/types.AgentManifest"), }, }, + "systemTools": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "contextInput": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, "inputFilters": { SchemaProps: spec.SchemaProps{ Type: []string{"array"}, @@ -4661,15 +4825,15 @@ func schema_storage_apis_ottootto8ai_v1_CronJobSpec(ref common.ReferenceCallback Format: "", }, }, - "userID": { + "taskSchedule": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Schedule"), }, }, - "taskSchedule": { + "threadName": { SchemaProps: spec.SchemaProps{ - Ref: ref("github.com/otto8-ai/otto8/apiclient/types.Schedule"), + Type: []string{"string"}, + Format: "", }, }, }, @@ -4960,12 +5124,6 @@ func schema_storage_apis_ottootto8ai_v1_EmailReceiverSpec(ref common.ReferenceCa Format: "", }, }, - "userID": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, "workflow": { SchemaProps: spec.SchemaProps{ Default: "", @@ -4987,6 +5145,12 @@ func schema_storage_apis_ottootto8ai_v1_EmailReceiverSpec(ref common.ReferenceCa }, }, }, + "threadName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"name", "description", "workflow"}, }, @@ -6791,6 +6955,20 @@ func schema_storage_apis_ottootto8ai_v1_ThreadSpec(ref common.ReferenceCallback) Format: "", }, }, + "env": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, }, }, @@ -7249,20 +7427,21 @@ func schema_storage_apis_ottootto8ai_v1_WebhookSpec(ref common.ReferenceCallback Format: "", }, }, - "userID": { + "tokenHash": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, - Format: "", + Format: "byte", }, }, - "tokenHash": { + "ThreadName": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "byte", + Default: "", + Type: []string{"string"}, + Format: "", }, }, }, - Required: []string{"name", "description", "alias", "workflow", "headers", "secret", "validationHeader"}, + Required: []string{"name", "description", "alias", "workflow", "headers", "secret", "validationHeader", "ThreadName"}, }, }, } @@ -7451,10 +7630,11 @@ func schema_storage_apis_ottootto8ai_v1_WorkflowExecutionSpec(ref common.Referen Format: "", }, }, - "userID": { + "threadName": { SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", + Description: "ThreadName is the name of the thread that owns this execution, which is the same as the owning thread of the workflow.", + Type: []string{"string"}, + Format: "", }, }, "workflowName": { @@ -7631,13 +7811,7 @@ func schema_storage_apis_ottootto8ai_v1_WorkflowSpec(ref common.ReferenceCallbac SchemaProps: spec.SchemaProps{ Type: []string{"object"}, Properties: map[string]spec.Schema{ - "agentName": { - SchemaProps: spec.SchemaProps{ - Type: []string{"string"}, - Format: "", - }, - }, - "userID": { + "threadName": { SchemaProps: spec.SchemaProps{ Type: []string{"string"}, Format: "", @@ -7649,6 +7823,26 @@ func schema_storage_apis_ottootto8ai_v1_WorkflowSpec(ref common.ReferenceCallbac Ref: ref("github.com/otto8-ai/otto8/apiclient/types.WorkflowManifest"), }, }, + "knowledgeSetNames": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "workspaceName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/system/tools.go b/pkg/system/tools.go index 47d1e264..49f17092 100644 --- a/pkg/system/tools.go +++ b/pkg/system/tools.go @@ -9,6 +9,7 @@ const ( WebsiteCleanTool = "website-cleaner" ResultFormatterTool = "result-formatter" ModelProviderTool = "otto8-model-provider" + WorkflowTool = "workflow" DefaultNamespace = "default" ) diff --git a/ui/user/src/lib/actions/textarea.ts b/ui/user/src/lib/actions/textarea.ts index f155c0cd..f134f39a 100644 --- a/ui/user/src/lib/actions/textarea.ts +++ b/ui/user/src/lib/actions/textarea.ts @@ -1,9 +1,6 @@ function resize(node: HTMLTextAreaElement) { node.style.height = 'auto'; - // not totally sure why 4 is needed here, otherwise the textarea is too small and we - // get a scrollbar - node.style.height = (node.scrollHeight < 44 ? 44 : node.scrollHeight) + 4 + 'px'; - console.log('resize', node.scrollHeight, node.style.height); + node.style.height = node.scrollHeight + 'px'; } export function autoHeight(node: HTMLTextAreaElement) { @@ -11,4 +8,5 @@ export function autoHeight(node: HTMLTextAreaElement) { node.onfocus = () => resize(node); node.oninput = () => resize(node); node.onresize = () => resize(node); + node.onchange = () => resize(node); } diff --git a/ui/user/src/lib/components/Editors.svelte b/ui/user/src/lib/components/Editors.svelte index cdaa0639..ee875e26 100644 --- a/ui/user/src/lib/components/Editors.svelte +++ b/ui/user/src/lib/components/Editors.svelte @@ -1,6 +1,7 @@
- {#if EditorService.items.length > 1 || !EditorService.items[0].task} + {#if EditorService.items.length > 1 || (!EditorService.items[0].task && !EditorService.items[0].table)}
@@ -76,6 +66,8 @@
{#if file.name.toLowerCase().endsWith('.md')} + {:else if file.table} + {:else if file.task} - import type { Messages, Progress } from '$lib/services'; - import { ChatService } from '$lib/services'; - import Message from '$lib/components/messages/Message.svelte'; - import { fade } from 'svelte/transition'; - import { newMessageSource } from '$lib/services/chat/messagesource.js'; - import { onDestroy } from 'svelte'; - import { currentAssistant } from '$lib/stores'; - - interface Props { - onMessages?: (messages: Messages) => void; - onError: (err: Error) => void; - onLoadFile: (filename: string) => void; - } - - let { onMessages, onError, onLoadFile }: Props = $props(); - - let progressEvents: Progress[] = []; - let replayComplete = false; - let messages: Messages = $state({ messages: [], inProgress: false }); - let close: () => void | undefined; - - $effect(() => { - if ($currentAssistant.id && !close) { - close = newMessageSource($currentAssistant.id, handleMessage, { - onError - }); - } - }); - - onDestroy(() => { - close?.(); - }); - - function handleMessage(progress: Progress) { - progressEvents = [...progressEvents, progress]; - if (!replayComplete) { - replayComplete = progressEvents.find((e) => e.replayComplete) !== undefined; - } - - if (!replayComplete) { - return; - } - - messages = ChatService.progressToMessages(progressEvents); - - // forward the messages to the parent component - onMessages?.(messages); - } - - -
- {#each messages.messages as msg} - {#if !msg.ignore} - - {/if} - {/each} -
diff --git a/ui/user/src/lib/components/Navbar.svelte b/ui/user/src/lib/components/Navbar.svelte index 28a3ccbf..6850a3e4 100644 --- a/ui/user/src/lib/components/Navbar.svelte +++ b/ui/user/src/lib/components/Navbar.svelte @@ -6,13 +6,26 @@ import Files from '$lib/components/navbar/Files.svelte'; import KnowledgeFile from '$lib/components/navbar/KnowledgeFiles.svelte'; import Tasks from '$lib/components/navbar/Tasks.svelte'; - import { onMount } from 'svelte'; + import Tables from '$lib/components/navbar/Tables.svelte'; + import { tools } from '$lib/stores'; - let showTasks = $state(false); + function hasOptionalTools() { + for (const tool of $tools.items) { + if (!tool.builtin) { + return true; + } + } + return false; + } - onMount(() => { - showTasks = window.location.href.split('?')[1]?.includes('tasks=1'); - }); + function hasTool(tool: string) { + for (const t of $tools.items) { + if (t.id === tool) { + return t.enabled || t.builtin; + } + } + return false; + }
+

{tableName}

+ +
+ { + await ChatService.invoke($currentAssistant.id, { + prompt: `In the table database '${tableName}' do the following instruction:\n${i.prompt}` + }); + }} + /> +
+ + + +
+
+ + + {#each data?.columns ?? [] as column} + + {/each} + + + + {#each data?.rows ?? [] as row} + + {#each data?.columns ?? [] as col} + + {/each} + + {/each} + +
{column}
{row[col]}
+
+
+ + diff --git a/ui/user/src/lib/components/messages/Input.svelte b/ui/user/src/lib/components/messages/Input.svelte index 985ae276..3733624c 100644 --- a/ui/user/src/lib/components/messages/Input.svelte +++ b/ui/user/src/lib/components/messages/Input.svelte @@ -1,24 +1,24 @@ - + {/if} +
{#if description}

{description}

{/if} diff --git a/ui/user/src/lib/components/navbar/Profile.svelte b/ui/user/src/lib/components/navbar/Profile.svelte index f8b6a3e4..3fdce142 100644 --- a/ui/user/src/lib/components/navbar/Profile.svelte +++ b/ui/user/src/lib/components/navbar/Profile.svelte @@ -1,101 +1,54 @@ - -
- - -
-
-

- {$profile.email || 'Anonymous'} -

-
-
- {#if credPromise !== undefined} - {#await credPromise} -

- Credentials -

- {:then credentials} -

Credentials

- {#if credentials?.items.length > 0} -
    - {#each credentials.items as cred} -
  • - {cred.name} - -
  • - {/each} -
- {:else} - No credentials - {/if} - {/await} + {/snippet} + {#snippet body()} +
+ {#if credentials && credentials?.items.length > 0} + Credentials + {#each credentials.items as cred} +
+ {cred.name} + +
+ {/each} + {:else} + No credentials {/if}
- -
-
+
+ {/snippet} + diff --git a/ui/user/src/lib/components/navbar/Tables.svelte b/ui/user/src/lib/components/navbar/Tables.svelte new file mode 100644 index 00000000..4594bd85 --- /dev/null +++ b/ui/user/src/lib/components/navbar/Tables.svelte @@ -0,0 +1,50 @@ + + + + {#snippet icon()} + + {/snippet} + {#snippet body()} + {#if tables.tables.length === 0} +

No files

+ {:else} +