From 7fa60375eb09620ffc4c3f134c98e237a3324a3b Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Mon, 9 Sep 2024 12:25:53 -0400 Subject: [PATCH] feat: add agent and thread workspace and knowledge file CRUD Endpoints are added to CRUD files for a general workspace for both agents and threads. Endpoints are also added to CRUD files for knowledge in both agents and threads. When a file is added to or removed from knowledge, the controller will kick off ingestion of the knowledge. An endpoint is also added to kick off ingestion manually. When deleting an agent or a thread, the corresponding workspaces are cleaned up. For now, the `KNOWLEDGE_BIN` environment variable is required for ingestion. Signed-off-by: Donnie Adams --- .gitignore | 5 + pkg/agents/render.go | 35 ++++-- pkg/api/handlers/agent.go | 114 +++++++++++++++++- pkg/api/handlers/files.go | 52 ++++++++ pkg/api/handlers/threads.go | 114 ++++++++++++++++++ pkg/api/router/router.go | 26 +++- pkg/api/types/types.go | 2 + pkg/controller/handlers/agents/agents.go | 47 ++++++++ pkg/controller/handlers/threads/threads.go | 48 +++++++- pkg/controller/routes.go | 6 + pkg/invoke/invoker.go | 35 +++--- pkg/services/config.go | 6 +- .../apis/otto.gptscript.ai/v1/agent.go | 4 +- pkg/storage/apis/otto.gptscript.ai/v1/run.go | 13 +- .../apis/otto.gptscript.ai/v1/thread.go | 21 ++-- .../v1/zz_generated.deepcopy.go | 7 +- .../openapi/generated/openapi_generated.go | 44 +++++++ pkg/storage/services/config.go | 4 +- pkg/workspace/knowledge.go | 19 +++ pkg/workspace/workspace.go | 10 ++ 20 files changed, 562 insertions(+), 50 deletions(-) create mode 100644 pkg/api/handlers/files.go create mode 100644 pkg/controller/handlers/agents/agents.go create mode 100644 pkg/workspace/knowledge.go create mode 100644 pkg/workspace/workspace.go diff --git a/.gitignore b/.gitignore index 53fbd60c..8c54fd40 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,8 @@ vendor/ # Ignore OS specific files .DS_Store Thumbs.db + +# Ignore local DB 
files +otto.db +otto.db-* + diff --git a/pkg/agents/render.go b/pkg/agents/render.go index d7e2814c..a38acc17 100644 --- a/pkg/agents/render.go +++ b/pkg/agents/render.go @@ -8,6 +8,7 @@ import ( "github.com/gptscript-ai/go-gptscript" "github.com/gptscript-ai/otto/pkg/storage" "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -15,27 +16,37 @@ var DefaultAgentParams = []string{ "message", "Message to send to the agent", } -func Render(ctx context.Context, db storage.Client, namespace string, manifest v1.Manifest) ([]gptscript.ToolDef, error) { +func Render(ctx context.Context, db storage.Client, agent *v1.Agent, thread *v1.Thread, knowledgeTool, knowledgeBin string) ([]gptscript.ToolDef, []string, error) { + var extraEnv []string t := []gptscript.ToolDef{{ - Name: manifest.Name, - Description: manifest.Description, + Name: agent.Spec.Manifest.Name, + Description: agent.Spec.Manifest.Description, Chat: true, - Tools: manifest.Tools, - Arguments: manifest.GetParams(), - Instructions: manifest.Prompt, + Tools: agent.Spec.Manifest.Tools, + Arguments: agent.Spec.Manifest.GetParams(), + Instructions: agent.Spec.Manifest.Prompt, Type: "agent", }} - if len(manifest.Agents) == 0 { - return t, nil + if agent.Status.HasKnowledge || thread.Status.HasKnowledge { + t[0].Tools = append(t[0].Tools, knowledgeTool) + extraEnv = append(extraEnv, + fmt.Sprintf("KNOWLEDGE_BIN=%s", knowledgeBin), + fmt.Sprintf("GPTSCRIPT_SCRIPT_ID=%s", workspace.KnowledgeIDFromWorkspaceID(agent.Spec.KnowledgeWorkspaceID)), + fmt.Sprintf("GPTSCRIPT_THREAD_ID=%s", workspace.KnowledgeIDFromWorkspaceID(thread.Spec.KnowledgeWorkspaceID)), + ) } - agents, err := ByName(ctx, db, namespace) + if len(agent.Spec.Manifest.Agents) == 0 { + return t, extraEnv, nil + } + + agents, err := ByName(ctx, db, agent.Namespace) if err != nil { - return nil, err + return nil, nil, err } - for _, agentRef 
:= range manifest.Agents { + for _, agentRef := range agent.Spec.Manifest.Agents { agent, ok := agents[agentRef] if !ok { continue @@ -61,7 +72,7 @@ func Render(ctx context.Context, db storage.Client, namespace string, manifest v t = append(t, toolDef) } - return t, nil + return t, extraEnv, nil } func ByName(ctx context.Context, db storage.Client, namespace string) (map[string]v1.Agent, error) { diff --git a/pkg/api/handlers/agent.go b/pkg/api/handlers/agent.go index e1fb28f6..e5bf6afc 100644 --- a/pkg/api/handlers/agent.go +++ b/pkg/api/handlers/agent.go @@ -3,6 +3,7 @@ package handlers import ( "context" "errors" + "fmt" "net/http" "github.com/BurntSushi/toml" @@ -142,7 +143,7 @@ func (a *AgentHandler) Create(ctx context.Context, req api.Request) error { return err } - req.ResponseWriter.WriteHeader(http.StatusCreated) + req.WriteHeader(http.StatusCreated) return req.JSON(convertAgent(agent, api.GetURLPrefix(req))) } @@ -172,3 +173,114 @@ func (a *AgentHandler) List(_ context.Context, req api.Request) error { return req.JSON(resp) } + +func (a *AgentHandler) Files(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, agent.Spec.WorkspaceID) +} + +func (a *AgentHandler) UploadFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return uploadFile(ctx, req, a.WorkspaceClient, agent.Spec.WorkspaceID) +} + +func (a *AgentHandler) DeleteFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + agent v2.Agent + ) + + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to 
get agent with id %s: %w", id, err) + } + + return deleteFile(ctx, req, a.WorkspaceClient, agent.Spec.WorkspaceID, filename) +} + +func (a *AgentHandler) Knowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, agent.Spec.KnowledgeWorkspaceID) +} + +func (a *AgentHandler) UploadKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + if err := uploadFile(ctx, req, a.WorkspaceClient, agent.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + agent.Status.IngestKnowledge = true + agent.Status.HasKnowledge = true + return req.Storage.Status().Update(ctx, &agent) +} + +func (a *AgentHandler) DeleteKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + agent v2.Agent + ) + + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + if err := deleteFile(ctx, req, a.WorkspaceClient, agent.Spec.KnowledgeWorkspaceID, filename); err != nil { + return err + } + + agent.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &agent) +} + +func (a *AgentHandler) IngestKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + files, err := a.WorkspaceClient.Ls(ctx, agent.Spec.KnowledgeWorkspaceID) + if err != nil { + return err + } + + req.WriteHeader(http.StatusNoContent) + + if len(files) == 0 && 
!agent.Status.HasKnowledge { + return nil + } + + agent.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &agent) +} diff --git a/pkg/api/handlers/files.go b/pkg/api/handlers/files.go new file mode 100644 index 00000000..e9039d9a --- /dev/null +++ b/pkg/api/handlers/files.go @@ -0,0 +1,52 @@ +package handlers + +import ( + "context" + "fmt" + "io" + "net/http" + + "github.com/gptscript-ai/otto/pkg/api" + "github.com/gptscript-ai/otto/pkg/api/types" + wclient "github.com/thedadams/workspace-provider/pkg/client" +) + +func listFiles(ctx context.Context, req api.Request, wc *wclient.Client, workspaceID string) error { + files, err := wc.Ls(ctx, workspaceID) + if err != nil { + return fmt.Errorf("failed to list files in workspace %q: %w", workspaceID, err) + } + + return req.JSON(types.FileList{Items: files}) +} + +func uploadFile(ctx context.Context, req api.Request, wc *wclient.Client, workspaceID string) error { + file := req.Request.PathValue("file") + if file == "" { + return fmt.Errorf("file path parameter is required") + } + + writer, err := wc.WriteFile(ctx, workspaceID, file) + if err != nil { + return fmt.Errorf("failed to upload file %q to workspace %q: %w", file, workspaceID, err) + } + + _, err = io.Copy(writer, req.Request.Body) + if err != nil { + return fmt.Errorf("failed to write file %q to workspace %q: %w", file, workspaceID, err) + } + + req.WriteHeader(http.StatusCreated) + + return nil +} + +func deleteFile(ctx context.Context, req api.Request, wc *wclient.Client, workspaceID, filename string) error { + if err := wc.DeleteFile(ctx, workspaceID, filename); err != nil { + return fmt.Errorf("failed to delete file %q from workspace %q: %w", filename, workspaceID, err) + } + + req.WriteHeader(http.StatusNoContent) + + return nil +} diff --git a/pkg/api/handlers/threads.go b/pkg/api/handlers/threads.go index f0f6fb73..88a8d3e3 100644 --- a/pkg/api/handlers/threads.go +++ b/pkg/api/handlers/threads.go @@ -2,13 +2,17 @@ package 
handlers import ( "context" + "fmt" + "net/http" "github.com/gptscript-ai/otto/pkg/api" "github.com/gptscript-ai/otto/pkg/api/types" "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + wclient "github.com/thedadams/workspace-provider/pkg/client" ) type ThreadHandler struct { + WorkspaceClient *wclient.Client } func convertThread(thread v1.Thread) types.Thread { @@ -43,3 +47,113 @@ func (a *ThreadHandler) List(_ context.Context, req api.Request) error { return req.JSON(resp) } +func (a *ThreadHandler) Files(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, thread.Spec.WorkspaceID) +} + +func (a *ThreadHandler) UploadFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return uploadFile(ctx, req, a.WorkspaceClient, thread.Spec.WorkspaceID) +} + +func (a *ThreadHandler) DeleteFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + thread v1.Thread + ) + + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return deleteFile(ctx, req, a.WorkspaceClient, thread.Spec.WorkspaceID, filename) +} + +func (a *ThreadHandler) Knowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, thread.Spec.KnowledgeWorkspaceID) +} + +func (a *ThreadHandler) UploadKnowledge(ctx 
context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + if err := uploadFile(ctx, req, a.WorkspaceClient, thread.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + thread.Status.IngestKnowledge = true + thread.Status.HasKnowledge = true + return req.Storage.Status().Update(ctx, &thread) +} + +func (a *ThreadHandler) DeleteKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + thread v1.Thread + ) + + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + if err := deleteFile(ctx, req, a.WorkspaceClient, thread.Spec.KnowledgeWorkspaceID, filename); err != nil { + return err + } + + thread.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &thread) +} + +func (a *ThreadHandler) IngestKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + files, err := a.WorkspaceClient.Ls(ctx, thread.Spec.KnowledgeWorkspaceID) + if err != nil { + return err + } + + req.WriteHeader(http.StatusNoContent) + + if len(files) == 0 && !thread.Status.HasKnowledge { + return nil + } + + thread.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &thread) +} diff --git a/pkg/api/router/router.go b/pkg/api/router/router.go index 91252446..e4659bc8 100644 --- a/pkg/api/router/router.go +++ b/pkg/api/router/router.go @@ -15,8 +15,12 @@ func Router(services *services.Services) (http.Handler, error) { WorkspaceClient: services.WorkspaceClient, WorkspaceProvider: "directory", } - invoker := handlers.InvokeHandler{Invoker: services.Invoker} 
- threads := handlers.ThreadHandler{} + invoker := handlers.InvokeHandler{ + Invoker: services.Invoker, + } + threads := handlers.ThreadHandler{ + WorkspaceClient: services.WorkspaceClient, + } runs := handlers.RunHandler{} // Agents @@ -24,6 +28,15 @@ func Router(services *services.Services) (http.Handler, error) { mux.Handle("POST /agents", w(agents.Create)) mux.Handle("PUT /agents/{id}", w(agents.Update)) mux.Handle("DELETE /agents/{id}", w(agents.Delete)) + // Agent files + mux.Handle("GET /agents/{id}/files", w(agents.Files)) + mux.Handle("POST /agents/{id}/files/{file}", w(agents.UploadFile)) + mux.Handle("DELETE /agents/{id}/files/{file}", w(agents.DeleteFile)) + // Agent knowledge files + mux.Handle("GET /agents/{id}/knowledge", w(agents.Knowledge)) + mux.Handle("POST /agents/{id}/knowledge", w(agents.IngestKnowledge)) + mux.Handle("POST /agents/{id}/knowledge/{file}", w(agents.UploadKnowledge)) + mux.Handle("DELETE /agents/{id}/knowledge/{file}", w(agents.DeleteKnowledge)) // Invoker mux.Handle("POST /invoke/{agent}", w(invoker.Invoke)) @@ -32,6 +45,15 @@ func Router(services *services.Services) (http.Handler, error) { // Threads mux.Handle("GET /threads", w(threads.List)) mux.Handle("GET /agents/{agent}/threads", w(threads.List)) + // Thread files + mux.Handle("GET /threads/{id}/files", w(threads.Files)) + mux.Handle("POST /threads/{id}/files/{file}", w(threads.UploadFile)) + mux.Handle("DELETE /threads/{id}/files/{file}", w(threads.DeleteFile)) + // Thread knowledge files + mux.Handle("GET /threads/{id}/knowledge", w(threads.Knowledge)) + mux.Handle("POST /threads/{id}/knowledge", w(threads.IngestKnowledge)) + mux.Handle("POST /threads/{id}/knowledge/{file}", w(threads.UploadKnowledge)) + mux.Handle("DELETE /threads/{id}/knowledge/{file}", w(threads.DeleteKnowledge)) // Runs mux.Handle("GET /runs", w(runs.List)) diff --git a/pkg/api/types/types.go b/pkg/api/types/types.go index 1ea3ab73..ddef8fa1 100644 --- a/pkg/api/types/types.go +++ 
b/pkg/api/types/types.go @@ -36,6 +36,8 @@ type Thread struct { type ThreadList List[Thread] +type FileList List[string] + type Run struct { ID string `json:"id,omitempty"` Created time.Time `json:"created,omitempty"` diff --git a/pkg/controller/handlers/agents/agents.go b/pkg/controller/handlers/agents/agents.go new file mode 100644 index 00000000..81e38066 --- /dev/null +++ b/pkg/controller/handlers/agents/agents.go @@ -0,0 +1,47 @@ +package agents + +import ( + "fmt" + "os/exec" + + "github.com/acorn-io/baaah/pkg/router" + v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" + wclient "github.com/thedadams/workspace-provider/pkg/client" +) + +func RemoveWorkspaces(wc *wclient.Client, knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + agent := req.Object.(*v1.Agent) + if err := wc.Rm(req.Ctx, agent.Spec.WorkspaceID); err != nil { + return err + } + + if agent.Status.HasKnowledge { + if err := exec.Command(knowledgeBin, "delete-dataset", agent.Spec.KnowledgeWorkspaceID).Run(); err != nil { + return fmt.Errorf("failed to delete knowledge dataset: %w", err) + } + } + + if agent.Spec.KnowledgeWorkspaceID != "" { + return wc.Rm(req.Ctx, agent.Spec.KnowledgeWorkspaceID) + } + return nil + } +} + +func IngestKnowledge(knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + agent := req.Object.(*v1.Agent) + if !agent.Status.IngestKnowledge || !agent.Status.HasKnowledge { + return nil + } + + if err := workspace.IngestKnowledge(knowledgeBin, agent.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + agent.Status.IngestKnowledge = false + return nil + } +} diff --git a/pkg/controller/handlers/threads/threads.go b/pkg/controller/handlers/threads/threads.go index 4fc14d3d..f0f999a7 100644 --- a/pkg/controller/handlers/threads/threads.go +++ b/pkg/controller/handlers/threads/threads.go @@ -1,20 
+1,62 @@ package threads import ( + "fmt" + "os/exec" + "github.com/acorn-io/baaah/pkg/router" v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" + wclient "github.com/thedadams/workspace-provider/pkg/client" apierrors "k8s.io/apimachinery/pkg/api/errors" ) func Cleanup(req router.Request, resp router.Response) error { - run := req.Object.(*v1.Thread) + thread := req.Object.(*v1.Thread) var agent v1.Agent - if err := req.Get(&agent, run.Namespace, run.Spec.AgentName); apierrors.IsNotFound(err) { - return req.Client.Delete(req.Ctx, run) + if err := req.Get(&agent, thread.Namespace, thread.Spec.AgentName); apierrors.IsNotFound(err) { + return req.Client.Delete(req.Ctx, thread) } else if err != nil { return err } return nil } + +func RemoveWorkspace(wc *wclient.Client, knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + thread := req.Object.(*v1.Thread) + if err := wc.Rm(req.Ctx, thread.Spec.WorkspaceID); err != nil { + return err + } + + if thread.Status.HasKnowledge { + if err := exec.Command(knowledgeBin, "delete-dataset", thread.Spec.KnowledgeWorkspaceID).Run(); err != nil { + return fmt.Errorf("failed to delete knowledge dataset: %w", err) + } + } + + if thread.Spec.KnowledgeWorkspaceID != "" { + return wc.Rm(req.Ctx, thread.Spec.KnowledgeWorkspaceID) + } + + return nil + } +} + +func IngestKnowledge(knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + thread := req.Object.(*v1.Thread) + if !thread.Status.IngestKnowledge || !thread.Status.HasKnowledge { + return nil + } + + if err := workspace.IngestKnowledge(knowledgeBin, thread.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + thread.Status.IngestKnowledge = false + return nil + } +} diff --git a/pkg/controller/routes.go b/pkg/controller/routes.go index b199c3ba..77745213 100644 --- a/pkg/controller/routes.go +++ 
b/pkg/controller/routes.go @@ -4,6 +4,7 @@ import ( "github.com/acorn-io/baaah/pkg/apply" "github.com/acorn-io/baaah/pkg/conditions" "github.com/acorn-io/baaah/pkg/router" + "github.com/gptscript-ai/otto/pkg/controller/handlers/agents" "github.com/gptscript-ai/otto/pkg/controller/handlers/runs" "github.com/gptscript-ai/otto/pkg/controller/handlers/threads" "github.com/gptscript-ai/otto/pkg/services" @@ -16,8 +17,13 @@ func routes(router *router.Router, services *services.Services) error { root.Type(&v1.Run{}).FinalizeFunc(v1.RunFinalizer, runs.DeleteRunState) root.Type(&v1.Run{}).HandlerFunc(runs.Cleanup) + root.Type(&v1.Thread{}).FinalizeFunc(v1.ThreadFinalizer, threads.RemoveWorkspace(services.WorkspaceClient, services.KnowledgeBin)) + root.Type(&v1.Thread{}).HandlerFunc(threads.IngestKnowledge(services.KnowledgeBin)) root.Type(&v1.Thread{}).HandlerFunc(threads.Cleanup) + root.Type(&v1.Agent{}).FinalizeFunc(v1.AgentFinalizer, agents.RemoveWorkspaces(services.WorkspaceClient, services.KnowledgeBin)) + root.Type(&v1.Agent{}).HandlerFunc(agents.IngestKnowledge(services.KnowledgeBin)) + return nil } diff --git a/pkg/invoke/invoker.go b/pkg/invoke/invoker.go index 0da1326a..e8a810e2 100644 --- a/pkg/invoke/invoker.go +++ b/pkg/invoke/invoker.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "os" "strings" "sync" "time" @@ -17,6 +16,7 @@ import ( "github.com/gptscript-ai/otto/pkg/jwt" "github.com/gptscript-ai/otto/pkg/storage" v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" wclient "github.com/thedadams/workspace-provider/pkg/client" apierror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,15 +29,19 @@ type Invoker struct { tokenService *jwt.TokenService workspaceClient *wclient.Client threadWorkspaceProvider string + knowledgeTool string + knowledgeBin string } -func NewInvoker(storage storage.Client, gptClient *gptscript.GPTScript, 
tokenService *jwt.TokenService, workspaceClient *wclient.Client) *Invoker { +func NewInvoker(storage storage.Client, gptClient *gptscript.GPTScript, tokenService *jwt.TokenService, workspaceClient *wclient.Client, knowledgeTool, knowledgeBin string) *Invoker { return &Invoker{ storage: storage, gptClient: gptClient, tokenService: tokenService, workspaceClient: workspaceClient, threadWorkspaceProvider: "directory", + knowledgeTool: knowledgeTool, + knowledgeBin: knowledgeBin, } } @@ -51,11 +55,6 @@ type Options struct { ThreadName string } -func getWorkspace(thread *v1.Thread) string { - _, path, _ := strings.Cut(thread.Spec.WorkspaceID, "://") - return path -} - func (i *Invoker) getThread(ctx context.Context, agent *v1.Agent, input, threadName string) (*v1.Thread, error) { var ( thread v1.Thread @@ -75,6 +74,11 @@ func (i *Invoker) getThread(ctx context.Context, agent *v1.Agent, input, threadN return nil, err } + knowledgeWorkspaceID, err := i.workspaceClient.Create(ctx, i.threadWorkspaceProvider, agent.Spec.KnowledgeWorkspaceID) + if err != nil { + return nil, err + } + thread = v1.Thread{ ObjectMeta: metav1.ObjectMeta{ Name: createName, @@ -82,9 +86,10 @@ func (i *Invoker) getThread(ctx context.Context, agent *v1.Agent, input, threadN Namespace: agent.Namespace, }, Spec: v1.ThreadSpec{ - AgentName: agent.Name, - Input: input, - WorkspaceID: workspaceID, + AgentName: agent.Name, + Input: input, + WorkspaceID: workspaceID, + KnowledgeWorkspaceID: knowledgeWorkspaceID, }, } if err := i.storage.Create(ctx, &thread); err != nil { @@ -123,7 +128,7 @@ func (i *Invoker) Invoke(ctx context.Context, agent *v1.Agent, input string, opt return nil, err } - tools, err := agents.Render(ctx, i.storage, agent.Namespace, agent.Spec.Manifest) + tools, extraEnv, err := agents.Render(ctx, i.storage, agent, thread, i.knowledgeTool, i.knowledgeBin) if err != nil { return nil, err } @@ -148,6 +153,7 @@ func (i *Invoker) Invoke(ctx context.Context, agent *v1.Agent, input string, opt 
AgentName: agent.Name, PreviousRunName: thread.Status.LastRunName, Input: input, + ExtraEnv: extraEnv, }, } @@ -169,14 +175,15 @@ func (i *Invoker) Invoke(ctx context.Context, agent *v1.Agent, input string, opt runResp, err := i.gptClient.Evaluate(ctx, gptscript.Options{ GlobalOptions: gptscript.GlobalOptions{ - Env: append(os.Environ(), + Env: append(extraEnv, "OTTO_TOKEN="+token, "OTTO_RUN_ID="+run.Name, "OTTO_THREAD_ID="+thread.Name, - "OTTO_AGENT_ID="+agent.Name), + "OTTO_AGENT_ID="+agent.Name, + ), }, Input: input, - Workspace: getWorkspace(thread), + Workspace: workspace.GetDir(thread.Spec.WorkspaceID), ChatState: chatState, IncludeEvents: true, }, tools...) diff --git a/pkg/services/config.go b/pkg/services/config.go index 553d66fe..2360dba2 100644 --- a/pkg/services/config.go +++ b/pkg/services/config.go @@ -30,6 +30,8 @@ type Services struct { TokenServer *jwt.TokenService APIServer *api.Server WorkspaceClient *wclient.Client + KnowledgeBin string + KnowledgeTool string } func New(ctx context.Context, config Config) (*Services, error) { @@ -65,6 +67,8 @@ func New(ctx context.Context, config Config) (*Services, error) { APIServer: api.NewServer(storageClient, c, tokenServer), TokenServer: tokenServer, WorkspaceClient: workspaceClient, - Invoker: invoke.NewInvoker(storageClient, c, tokenServer, workspaceClient), + Invoker: invoke.NewInvoker(storageClient, c, tokenServer, workspaceClient, config.KnowledgeTool, config.KnowledgeBin), + KnowledgeBin: config.KnowledgeBin, + KnowledgeTool: config.KnowledgeTool, }, nil } diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/agent.go b/pkg/storage/apis/otto.gptscript.ai/v1/agent.go index 2550995a..a7b3a22e 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/agent.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/agent.go @@ -36,7 +36,9 @@ type AgentSpec struct { } type AgentStatus struct { - Conditions []metav1.Condition `json:"conditions,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` + 
HasKnowledge bool `json:"hasKnowledge,omitempty"` + IngestKnowledge bool `json:"ingestKnowledge,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/run.go b/pkg/storage/apis/otto.gptscript.ai/v1/run.go index ab714732..cef1a9dc 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/run.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/run.go @@ -11,7 +11,9 @@ var ( ) const ( - RunFinalizer = "otto.gptscript.ai/run" + RunFinalizer = "otto.gptscript.ai/run" + ThreadFinalizer = "otto.gptscript.ai/thread" + AgentFinalizer = "otto.gptscript.ai/agent" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -40,10 +42,11 @@ type ToolProgress struct { } type RunSpec struct { - ThreadName string `json:"threadName,omitempty"` - AgentName string `json:"agentName,omitempty"` - PreviousRunName string `json:"previousRunName,omitempty"` - Input string `json:"input"` + ThreadName string `json:"threadName,omitempty"` + AgentName string `json:"agentName,omitempty"` + PreviousRunName string `json:"previousRunName,omitempty"` + Input string `json:"input"` + ExtraEnv []string `json:"extraEnv,omitempty"` } type RunStatus struct { diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/thread.go b/pkg/storage/apis/otto.gptscript.ai/v1/thread.go index e974925f..c44cc41c 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/thread.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/thread.go @@ -25,18 +25,21 @@ func (in *Thread) GetConditions() *[]metav1.Condition { } type ThreadSpec struct { - Input string `json:"input,omitempty"` - AgentName string `json:"agentName,omitempty"` - WorkspaceID string `json:"workspaceID,omitempty"` + Input string `json:"input,omitempty"` + AgentName string `json:"agentName,omitempty"` + WorkspaceID string `json:"workspaceID,omitempty"` + KnowledgeWorkspaceID string `json:"knowledgeWorkspaceID,omitempty"` } type ThreadStatus struct { - Description string 
`json:"description,omitempty"` - LastRunName string `json:"lastRunName,omitempty"` - LastRunState gptscriptclient.RunState `json:"lastRunState,omitempty"` - LastRunOutput string `json:"lastRunOutput,omitempty"` - LastRunError string `json:"lastRunError,omitempty"` - Conditions []metav1.Condition `json:"conditions,omitempty"` + Description string `json:"description,omitempty"` + LastRunName string `json:"lastRunName,omitempty"` + LastRunState gptscriptclient.RunState `json:"lastRunState,omitempty"` + LastRunOutput string `json:"lastRunOutput,omitempty"` + LastRunError string `json:"lastRunError,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` + HasKnowledge bool `json:"hasKnowledge,omitempty"` + IngestKnowledge bool `json:"ingestKnowledge,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go index 7579df8f..7600fde7 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go @@ -160,7 +160,7 @@ func (in *Run) DeepCopyInto(out *Run) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -217,6 +217,11 @@ func (in *RunList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RunSpec) DeepCopyInto(out *RunSpec) { *out = *in + if in.ExtraEnv != nil { + in, out := &in.ExtraEnv, &out.ExtraEnv + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunSpec. 
diff --git a/pkg/storage/openapi/generated/openapi_generated.go b/pkg/storage/openapi/generated/openapi_generated.go index 2f265c97..1756e602 100644 --- a/pkg/storage/openapi/generated/openapi_generated.go +++ b/pkg/storage/openapi/generated/openapi_generated.go @@ -258,6 +258,18 @@ func schema_storage_apis_ottogptscriptai_v1_AgentStatus(ref common.ReferenceCall }, }, }, + "hasKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, + "ingestKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, @@ -510,6 +522,20 @@ func schema_storage_apis_ottogptscriptai_v1_RunSpec(ref common.ReferenceCallback Format: "", }, }, + "extraEnv": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, Required: []string{"input"}, }, @@ -970,6 +996,12 @@ func schema_storage_apis_ottogptscriptai_v1_ThreadSpec(ref common.ReferenceCallb Format: "", }, }, + "knowledgeWorkspaceID": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, @@ -1025,6 +1057,18 @@ func schema_storage_apis_ottogptscriptai_v1_ThreadStatus(ref common.ReferenceCal }, }, }, + "hasKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, + "ingestKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/storage/services/config.go b/pkg/storage/services/config.go index 3c6dab22..a735e424 100644 --- a/pkg/storage/services/config.go +++ b/pkg/storage/services/config.go @@ -15,7 +15,9 @@ type Config struct { StorageToken string `usage:"Token for storage access, will be generated if not passed"` //AuditLogPath string `usage:"Location of where to store audit logs"` //AuditLogPolicyFile string 
`usage:"Location of audit log policy file"` - DSN string `usage:"Database dsn in driver://connection_string format" default:"sqlite://file:otto.db?_journal=WAL&cache=shared&_busy_timeout=30000"` + DSN string `usage:"Database dsn in driver://connection_string format" default:"sqlite://file:otto.db?_journal=WAL&cache=shared&_busy_timeout=30000"` + KnowledgeBin string `usage:"Location of knowledge binary" default:"knowledge" env:"KNOWLEDGE_BIN"` + KnowledgeTool string `usage:"The knowledge tool to use" default:"github.com/gptscript-ai/knowledge/gateway@v0.4.14-rc.2" env:"KNOWLEDGE_TOOL"` } type Services struct { diff --git a/pkg/workspace/knowledge.go b/pkg/workspace/knowledge.go new file mode 100644 index 00000000..6361c377 --- /dev/null +++ b/pkg/workspace/knowledge.go @@ -0,0 +1,19 @@ +package workspace + +import ( + "fmt" + "os/exec" + "strings" +) + +func KnowledgeIDFromWorkspaceID(workspaceID string) string { + return strings.ReplaceAll(workspaceID, " ", "_") +} + +func IngestKnowledge(knowledgeBin, knowledgeWorkspaceID string) error { + if err := exec.Command(knowledgeBin, "ingest", "--prune", "--dataset", KnowledgeIDFromWorkspaceID(knowledgeWorkspaceID), GetDir(knowledgeWorkspaceID)).Run(); err != nil { + return fmt.Errorf("failed to ingest agent knowledge dataset: %w", err) + } + + return nil +} diff --git a/pkg/workspace/workspace.go b/pkg/workspace/workspace.go new file mode 100644 index 00000000..cc075943 --- /dev/null +++ b/pkg/workspace/workspace.go @@ -0,0 +1,10 @@ +package workspace + +import ( + "strings" +) + +func GetDir(workspaceID string) string { + _, path, _ := strings.Cut(workspaceID, "://") + return path +}