diff --git a/.gitignore b/.gitignore index 53fbd60c3..8c54fd40f 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,8 @@ vendor/ # Ignore OS specific files .DS_Store Thumbs.db + +# Ignore local DB files +otto.db +otto.db-* + diff --git a/pkg/agents/render.go b/pkg/agents/render.go index d7e2814c6..a38acc179 100644 --- a/pkg/agents/render.go +++ b/pkg/agents/render.go @@ -8,6 +8,7 @@ import ( "github.com/gptscript-ai/go-gptscript" "github.com/gptscript-ai/otto/pkg/storage" "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -15,27 +16,37 @@ var DefaultAgentParams = []string{ "message", "Message to send to the agent", } -func Render(ctx context.Context, db storage.Client, namespace string, manifest v1.Manifest) ([]gptscript.ToolDef, error) { +func Render(ctx context.Context, db storage.Client, agent *v1.Agent, thread *v1.Thread, knowledgeTool, knowledgeBin string) ([]gptscript.ToolDef, []string, error) { + var extraEnv []string t := []gptscript.ToolDef{{ - Name: manifest.Name, - Description: manifest.Description, + Name: agent.Spec.Manifest.Name, + Description: agent.Spec.Manifest.Description, Chat: true, - Tools: manifest.Tools, - Arguments: manifest.GetParams(), - Instructions: manifest.Prompt, + Tools: agent.Spec.Manifest.Tools, + Arguments: agent.Spec.Manifest.GetParams(), + Instructions: agent.Spec.Manifest.Prompt, Type: "agent", }} - if len(manifest.Agents) == 0 { - return t, nil + if agent.Status.HasKnowledge || thread.Status.HasKnowledge { + t[0].Tools = append(t[0].Tools, knowledgeTool) + extraEnv = append(extraEnv, + fmt.Sprintf("KNOWLEDGE_BIN=%s", knowledgeBin), + fmt.Sprintf("GPTSCRIPT_SCRIPT_ID=%s", workspace.KnowledgeIDFromWorkspaceID(agent.Spec.KnowledgeWorkspaceID)), + fmt.Sprintf("GPTSCRIPT_THREAD_ID=%s", workspace.KnowledgeIDFromWorkspaceID(thread.Spec.KnowledgeWorkspaceID)), + ) } - agents, err := ByName(ctx, db, namespace) + if len(agent.Spec.Manifest.Agents) == 0 { + return t, extraEnv, nil + } + + agents, err := ByName(ctx, db, agent.Namespace) if err != nil { - return nil, err + return nil, nil, err } - for _, agentRef := range manifest.Agents { + for _, agentRef := range agent.Spec.Manifest.Agents { agent, ok := agents[agentRef] if !ok { continue @@ -61,7 +72,7 @@ func Render(ctx context.Context, db storage.Client, namespace string, manifest v t = append(t, toolDef) } - return t, nil + return t, extraEnv, nil } func ByName(ctx context.Context, db storage.Client, namespace string) (map[string]v1.Agent, error) { diff --git a/pkg/api/handlers/agent.go b/pkg/api/handlers/agent.go index e1fb28f6c..e5bf6afcf 100644 --- a/pkg/api/handlers/agent.go +++ b/pkg/api/handlers/agent.go @@ -3,6 +3,7 @@ package handlers import ( "context" "errors" + "fmt" "net/http" "github.com/BurntSushi/toml" @@ -142,7 +143,7 @@ func (a *AgentHandler) Create(ctx context.Context, req api.Request) error { return err } - req.ResponseWriter.WriteHeader(http.StatusCreated) + req.WriteHeader(http.StatusCreated) return req.JSON(convertAgent(agent, api.GetURLPrefix(req))) } @@ -172,3 +173,114 @@ func (a *AgentHandler) List(_ context.Context, req api.Request) error { return req.JSON(resp) } + +func (a *AgentHandler) Files(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, agent.Spec.WorkspaceID) +} + +func (a *AgentHandler) UploadFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return uploadFile(ctx, req, a.WorkspaceClient, agent.Spec.WorkspaceID) +} + +func (a *AgentHandler) DeleteFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + agent v2.Agent + ) + + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return deleteFile(ctx, req, a.WorkspaceClient, agent.Spec.WorkspaceID, filename) +} + +func (a *AgentHandler) Knowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, agent.Spec.KnowledgeWorkspaceID) +} + +func (a *AgentHandler) UploadKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + if err := uploadFile(ctx, req, a.WorkspaceClient, agent.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + agent.Status.IngestKnowledge = true + agent.Status.HasKnowledge = true + return req.Storage.Status().Update(ctx, &agent) +} + +func (a *AgentHandler) DeleteKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + agent v2.Agent + ) + + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + if err := deleteFile(ctx, req, a.WorkspaceClient, agent.Spec.KnowledgeWorkspaceID, filename); err != nil { + return err + } + + agent.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &agent) +} + +func (a *AgentHandler) IngestKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + agent v2.Agent + ) + if err := req.Get(&agent, id); err != nil { + return fmt.Errorf("failed to get agent with id %s: %w", id, err) + } + + files, err := a.WorkspaceClient.Ls(ctx, agent.Spec.KnowledgeWorkspaceID) + if err != nil { + return err + } + + req.WriteHeader(http.StatusNoContent) + + if len(files) == 0 && !agent.Status.HasKnowledge { + return nil + } + + agent.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &agent) +} diff --git a/pkg/api/handlers/files.go b/pkg/api/handlers/files.go new file mode 100644 index 000000000..e9039d9ad --- /dev/null +++ b/pkg/api/handlers/files.go @@ -0,0 +1,52 @@ +package handlers + +import ( + "context" + "fmt" + "io" + "net/http" + + "github.com/gptscript-ai/otto/pkg/api" + "github.com/gptscript-ai/otto/pkg/api/types" + wclient "github.com/thedadams/workspace-provider/pkg/client" +) + +func listFiles(ctx context.Context, req api.Request, wc *wclient.Client, workspaceID string) error { + files, err := wc.Ls(ctx, workspaceID) + if err != nil { + return fmt.Errorf("failed to list files in workspace %q: %w", workspaceID, err) + } + + return req.JSON(types.FileList{Items: files}) +} + +func uploadFile(ctx context.Context, req api.Request, wc *wclient.Client, workspaceID string) error { + file := req.Request.PathValue("file") + if file == "" { + return fmt.Errorf("file path parameter is required") + } + + writer, err := wc.WriteFile(ctx, workspaceID, file) + if err != nil { + return fmt.Errorf("failed to upload file %q to workspace %q: %w", file, workspaceID, err) + } + + _, err = io.Copy(writer, req.Request.Body) + if err != nil { + return fmt.Errorf("failed to write file %q to workspace %q: %w", file, workspaceID, err) + } + + req.WriteHeader(http.StatusCreated) + + return nil +} + +func deleteFile(ctx context.Context, req api.Request, wc *wclient.Client, workspaceID, filename string) error { + if err := wc.DeleteFile(ctx, workspaceID, filename); err != nil { + return fmt.Errorf("failed to delete file %q from workspace %q: %w", filename, workspaceID, err) + } + + req.WriteHeader(http.StatusNoContent) + + return nil +} diff --git a/pkg/api/handlers/threads.go b/pkg/api/handlers/threads.go index f0f6fb73a..88a8d3e34 100644 --- a/pkg/api/handlers/threads.go +++ b/pkg/api/handlers/threads.go @@ -2,13 +2,17 @@ package handlers import ( "context" + "fmt" + "net/http" "github.com/gptscript-ai/otto/pkg/api" "github.com/gptscript-ai/otto/pkg/api/types" "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + wclient "github.com/thedadams/workspace-provider/pkg/client" ) type ThreadHandler struct { + WorkspaceClient *wclient.Client } func convertThread(thread v1.Thread) types.Thread { @@ -43,3 +47,113 @@ func (a *ThreadHandler) List(_ context.Context, req api.Request) error { return req.JSON(resp) } +func (a *ThreadHandler) Files(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, thread.Spec.WorkspaceID) +} + +func (a *ThreadHandler) UploadFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return uploadFile(ctx, req, a.WorkspaceClient, thread.Spec.WorkspaceID) +} + +func (a *ThreadHandler) DeleteFile(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + thread v1.Thread + ) + + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return deleteFile(ctx, req, a.WorkspaceClient, thread.Spec.WorkspaceID, filename) +} + +func (a *ThreadHandler) Knowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + return listFiles(ctx, req, a.WorkspaceClient, thread.Spec.KnowledgeWorkspaceID) +} + +func (a *ThreadHandler) UploadKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + if err := uploadFile(ctx, req, a.WorkspaceClient, thread.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + thread.Status.IngestKnowledge = true + thread.Status.HasKnowledge = true + return req.Storage.Status().Update(ctx, &thread) +} + +func (a *ThreadHandler) DeleteKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + filename = req.Request.PathValue("file") + thread v1.Thread + ) + + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + if err := deleteFile(ctx, req, a.WorkspaceClient, thread.Spec.KnowledgeWorkspaceID, filename); err != nil { + return err + } + + thread.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &thread) +} + +func (a *ThreadHandler) IngestKnowledge(ctx context.Context, req api.Request) error { + var ( + id = req.Request.PathValue("id") + thread v1.Thread + ) + if err := req.Get(&thread, id); err != nil { + return fmt.Errorf("failed to get thread with id %s: %w", id, err) + } + + files, err := a.WorkspaceClient.Ls(ctx, thread.Spec.KnowledgeWorkspaceID) + if err != nil { + return err + } + + req.WriteHeader(http.StatusNoContent) + + if len(files) == 0 && !thread.Status.HasKnowledge { + return nil + } + + thread.Status.IngestKnowledge = true + return req.Storage.Status().Update(ctx, &thread) +} diff --git a/pkg/api/router/router.go b/pkg/api/router/router.go index 91252446d..e4659bc86 100644 --- a/pkg/api/router/router.go +++ b/pkg/api/router/router.go @@ -15,8 +15,12 @@ func Router(services *services.Services) (http.Handler, error) { WorkspaceClient: services.WorkspaceClient, WorkspaceProvider: "directory", } - invoker := handlers.InvokeHandler{Invoker: services.Invoker} - threads := handlers.ThreadHandler{} + invoker := handlers.InvokeHandler{ + Invoker: services.Invoker, + } + threads := handlers.ThreadHandler{ + WorkspaceClient: services.WorkspaceClient, + } runs := handlers.RunHandler{} // Agents @@ -24,6 +28,15 @@ func Router(services *services.Services) (http.Handler, error) { mux.Handle("POST /agents", w(agents.Create)) mux.Handle("PUT /agents/{id}", w(agents.Update)) mux.Handle("DELETE /agents/{id}", w(agents.Delete)) + // Agent files + mux.Handle("GET /agents/{id}/files", w(agents.Files)) + mux.Handle("POST /agents/{id}/files/{file}", w(agents.UploadFile)) + mux.Handle("DELETE /agents/{id}/files/{file}", w(agents.DeleteFile)) + // Agent knowledge files + mux.Handle("GET /agents/{id}/knowledge", w(agents.Knowledge)) + mux.Handle("POST /agents/{id}/knowledge", w(agents.IngestKnowledge)) + mux.Handle("POST /agents/{id}/knowledge/{file}", w(agents.UploadKnowledge)) + mux.Handle("DELETE /agents/{id}/knowledge/{file}", w(agents.DeleteKnowledge)) // Invoker mux.Handle("POST /invoke/{agent}", w(invoker.Invoke)) @@ -32,6 +45,15 @@ func Router(services *services.Services) (http.Handler, error) { // Threads mux.Handle("GET /threads", w(threads.List)) mux.Handle("GET /agents/{agent}/threads", w(threads.List)) + // Thread files + mux.Handle("GET /threads/{id}/files", w(threads.Files)) + mux.Handle("POST /threads/{id}/files/{file}", w(threads.UploadFile)) + mux.Handle("DELETE /threads/{id}/files/{file}", w(threads.DeleteFile)) + // Thread knowledge files + mux.Handle("GET /threads/{id}/knowledge", w(threads.Knowledge)) + mux.Handle("POST /threads/{id}/knowledge", w(threads.IngestKnowledge)) + mux.Handle("POST /threads/{id}/knowledge/{file}", w(threads.UploadKnowledge)) + mux.Handle("DELETE /threads/{id}/knowledge/{file}", w(threads.DeleteKnowledge)) // Runs mux.Handle("GET /runs", w(runs.List)) diff --git a/pkg/api/types/types.go b/pkg/api/types/types.go index 1ea3ab730..ddef8fa13 100644 --- a/pkg/api/types/types.go +++ b/pkg/api/types/types.go @@ -36,6 +36,8 @@ type Thread struct { type ThreadList List[Thread] +type FileList List[string] + type Run struct { ID string `json:"id,omitempty"` Created time.Time `json:"created,omitempty"` diff --git a/pkg/controller/handlers/agents/agents.go b/pkg/controller/handlers/agents/agents.go new file mode 100644 index 000000000..81e380667 --- /dev/null +++ b/pkg/controller/handlers/agents/agents.go @@ -0,0 +1,47 @@ +package agents + +import ( + "fmt" + "os/exec" + + "github.com/acorn-io/baaah/pkg/router" + v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" + wclient "github.com/thedadams/workspace-provider/pkg/client" +) + +func RemoveWorkspaces(wc *wclient.Client, knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + agent := req.Object.(*v1.Agent) + if err := wc.Rm(req.Ctx, agent.Spec.WorkspaceID); err != nil { + return err + } + + if agent.Status.HasKnowledge { + if err := exec.Command(knowledgeBin, "delete-dataset", agent.Spec.KnowledgeWorkspaceID).Run(); err != nil { + return fmt.Errorf("failed to delete knowledge dataset: %w", err) + } + } + + if agent.Spec.KnowledgeWorkspaceID != "" { + return wc.Rm(req.Ctx, agent.Spec.KnowledgeWorkspaceID) + } + return nil + } +} + +func IngestKnowledge(knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + agent := req.Object.(*v1.Agent) + if !agent.Status.IngestKnowledge || !agent.Status.HasKnowledge { + return nil + } + + if err := workspace.IngestKnowledge(knowledgeBin, agent.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + agent.Status.IngestKnowledge = false + return nil + } +} diff --git a/pkg/controller/handlers/threads/threads.go b/pkg/controller/handlers/threads/threads.go index 4fc14d3d9..f0f999a7e 100644 --- a/pkg/controller/handlers/threads/threads.go +++ b/pkg/controller/handlers/threads/threads.go @@ -1,20 +1,62 @@ package threads import ( + "fmt" + "os/exec" + "github.com/acorn-io/baaah/pkg/router" v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" + wclient "github.com/thedadams/workspace-provider/pkg/client" apierrors "k8s.io/apimachinery/pkg/api/errors" ) func Cleanup(req router.Request, resp router.Response) error { - run := req.Object.(*v1.Thread) + thread := req.Object.(*v1.Thread) var agent v1.Agent - if err := req.Get(&agent, run.Namespace, run.Spec.AgentName); apierrors.IsNotFound(err) { - return req.Client.Delete(req.Ctx, run) + if err := req.Get(&agent, thread.Namespace, thread.Spec.AgentName); apierrors.IsNotFound(err) { + return req.Client.Delete(req.Ctx, thread) } else if err != nil { return err } return nil } + +func RemoveWorkspace(wc *wclient.Client, knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + thread := req.Object.(*v1.Thread) + if err := wc.Rm(req.Ctx, thread.Spec.WorkspaceID); err != nil { + return err + } + + if thread.Status.HasKnowledge { + if err := exec.Command(knowledgeBin, "delete-dataset", thread.Spec.KnowledgeWorkspaceID).Run(); err != nil { + return fmt.Errorf("failed to delete knowledge dataset: %w", err) + } + } + + if thread.Spec.KnowledgeWorkspaceID != "" { + return wc.Rm(req.Ctx, thread.Spec.KnowledgeWorkspaceID) + } + + return nil + } +} + +func IngestKnowledge(knowledgeBin string) router.HandlerFunc { + return func(req router.Request, resp router.Response) error { + thread := req.Object.(*v1.Thread) + if !thread.Status.IngestKnowledge || !thread.Status.HasKnowledge { + return nil + } + + if err := workspace.IngestKnowledge(knowledgeBin, thread.Spec.KnowledgeWorkspaceID); err != nil { + return err + } + + thread.Status.IngestKnowledge = false + return nil + } +} diff --git a/pkg/controller/routes.go b/pkg/controller/routes.go index b199c3ba5..777452136 100644 --- a/pkg/controller/routes.go +++ b/pkg/controller/routes.go @@ -4,6 +4,7 @@ import ( "github.com/acorn-io/baaah/pkg/apply" "github.com/acorn-io/baaah/pkg/conditions" "github.com/acorn-io/baaah/pkg/router" + "github.com/gptscript-ai/otto/pkg/controller/handlers/agents" "github.com/gptscript-ai/otto/pkg/controller/handlers/runs" "github.com/gptscript-ai/otto/pkg/controller/handlers/threads" "github.com/gptscript-ai/otto/pkg/services" @@ -16,8 +17,13 @@ func routes(router *router.Router, services *services.Services) error { root.Type(&v1.Run{}).FinalizeFunc(v1.RunFinalizer, runs.DeleteRunState) root.Type(&v1.Run{}).HandlerFunc(runs.Cleanup) + root.Type(&v1.Thread{}).FinalizeFunc(v1.ThreadFinalizer, threads.RemoveWorkspace(services.WorkspaceClient, services.KnowledgeBin)) + root.Type(&v1.Thread{}).HandlerFunc(threads.IngestKnowledge(services.KnowledgeBin)) root.Type(&v1.Thread{}).HandlerFunc(threads.Cleanup) + root.Type(&v1.Agent{}).FinalizeFunc(v1.AgentFinalizer, agents.RemoveWorkspaces(services.WorkspaceClient, services.KnowledgeBin)) + root.Type(&v1.Agent{}).HandlerFunc(agents.IngestKnowledge(services.KnowledgeBin)) + return nil } diff --git a/pkg/invoke/invoker.go b/pkg/invoke/invoker.go index 0da1326aa..e8a810e2c 100644 --- a/pkg/invoke/invoker.go +++ b/pkg/invoke/invoker.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "os" "strings" "sync" "time" @@ -17,6 +16,7 @@ import ( "github.com/gptscript-ai/otto/pkg/jwt" "github.com/gptscript-ai/otto/pkg/storage" v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1" + "github.com/gptscript-ai/otto/pkg/workspace" wclient "github.com/thedadams/workspace-provider/pkg/client" apierror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -29,15 +29,19 @@ type Invoker struct { tokenService *jwt.TokenService workspaceClient *wclient.Client threadWorkspaceProvider string + knowledgeTool string + knowledgeBin string } -func NewInvoker(storage storage.Client, gptClient *gptscript.GPTScript, tokenService *jwt.TokenService, workspaceClient *wclient.Client) *Invoker { +func NewInvoker(storage storage.Client, gptClient *gptscript.GPTScript, tokenService *jwt.TokenService, workspaceClient *wclient.Client, knowledgeTool, knowledgeBin string) *Invoker { return &Invoker{ storage: storage, gptClient: gptClient, tokenService: tokenService, workspaceClient: workspaceClient, threadWorkspaceProvider: "directory", + knowledgeTool: knowledgeTool, + knowledgeBin: knowledgeBin, } } @@ -51,11 +55,6 @@ type Options struct { ThreadName string } -func getWorkspace(thread *v1.Thread) string { - _, path, _ := strings.Cut(thread.Spec.WorkspaceID, "://") - return path -} - func (i *Invoker) getThread(ctx context.Context, agent *v1.Agent, input, threadName string) (*v1.Thread, error) { var ( thread v1.Thread @@ -75,6 +74,11 @@ func (i *Invoker) getThread(ctx context.Context, agent *v1.Agent, input, threadN return nil, err } + knowledgeWorkspaceID, err := i.workspaceClient.Create(ctx, i.threadWorkspaceProvider, agent.Spec.KnowledgeWorkspaceID) + if err != nil { + return nil, err + } + thread = v1.Thread{ ObjectMeta: metav1.ObjectMeta{ Name: createName, @@ -82,9 +86,10 @@ func (i *Invoker) getThread(ctx context.Context, agent *v1.Agent, input, threadN Namespace: agent.Namespace, }, Spec: v1.ThreadSpec{ - AgentName: agent.Name, - Input: input, - WorkspaceID: workspaceID, + AgentName: agent.Name, + Input: input, + WorkspaceID: workspaceID, + KnowledgeWorkspaceID: knowledgeWorkspaceID, }, } if err := i.storage.Create(ctx, &thread); err != nil { @@ -123,7 +128,7 @@ func (i *Invoker) Invoke(ctx context.Context, agent *v1.Agent, input string, opt return nil, err } - tools, err := agents.Render(ctx, i.storage, agent.Namespace, agent.Spec.Manifest) + tools, extraEnv, err := agents.Render(ctx, i.storage, agent, thread, i.knowledgeTool, i.knowledgeBin) if err != nil { return nil, err } @@ -148,6 +153,7 @@ func (i *Invoker) Invoke(ctx context.Context, agent *v1.Agent, input string, opt AgentName: agent.Name, PreviousRunName: thread.Status.LastRunName, Input: input, + ExtraEnv: extraEnv, }, } @@ -169,14 +175,15 @@ func (i *Invoker) Invoke(ctx context.Context, agent *v1.Agent, input string, opt runResp, err := i.gptClient.Evaluate(ctx, gptscript.Options{ GlobalOptions: gptscript.GlobalOptions{ - Env: append(os.Environ(), + Env: append(extraEnv, "OTTO_TOKEN="+token, "OTTO_RUN_ID="+run.Name, "OTTO_THREAD_ID="+thread.Name, - "OTTO_AGENT_ID="+agent.Name), + "OTTO_AGENT_ID="+agent.Name, + ), }, Input: input, - Workspace: getWorkspace(thread), + Workspace: workspace.GetDir(thread.Spec.WorkspaceID), ChatState: chatState, IncludeEvents: true, }, tools...) diff --git a/pkg/services/config.go b/pkg/services/config.go index 553d66fe0..2360dba2d 100644 --- a/pkg/services/config.go +++ b/pkg/services/config.go @@ -30,6 +30,8 @@ type Services struct { TokenServer *jwt.TokenService APIServer *api.Server WorkspaceClient *wclient.Client + KnowledgeBin string + KnowledgeTool string } func New(ctx context.Context, config Config) (*Services, error) { @@ -65,6 +67,8 @@ func New(ctx context.Context, config Config) (*Services, error) { APIServer: api.NewServer(storageClient, c, tokenServer), TokenServer: tokenServer, WorkspaceClient: workspaceClient, - Invoker: invoke.NewInvoker(storageClient, c, tokenServer, workspaceClient), + Invoker: invoke.NewInvoker(storageClient, c, tokenServer, workspaceClient, config.KnowledgeTool, config.KnowledgeBin), + KnowledgeBin: config.KnowledgeBin, + KnowledgeTool: config.KnowledgeTool, }, nil } diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/agent.go b/pkg/storage/apis/otto.gptscript.ai/v1/agent.go index 2550995a9..a7b3a22e3 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/agent.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/agent.go @@ -36,7 +36,9 @@ type AgentSpec struct { } type AgentStatus struct { - Conditions []metav1.Condition `json:"conditions,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` + HasKnowledge bool `json:"hasKnowledge,omitempty"` + IngestKnowledge bool `json:"ingestKnowledge,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/run.go b/pkg/storage/apis/otto.gptscript.ai/v1/run.go index ab7147324..cef1a9dc5 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/run.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/run.go @@ -11,7 +11,9 @@ var ( ) const ( - RunFinalizer = "otto.gptscript.ai/run" + RunFinalizer = "otto.gptscript.ai/run" + ThreadFinalizer = "otto.gptscript.ai/thread" + AgentFinalizer = "otto.gptscript.ai/agent" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -40,10 +42,11 @@ type ToolProgress struct { } type RunSpec struct { - ThreadName string `json:"threadName,omitempty"` - AgentName string `json:"agentName,omitempty"` - PreviousRunName string `json:"previousRunName,omitempty"` - Input string `json:"input"` + ThreadName string `json:"threadName,omitempty"` + AgentName string `json:"agentName,omitempty"` + PreviousRunName string `json:"previousRunName,omitempty"` + Input string `json:"input"` + ExtraEnv []string `json:"extraEnv,omitempty"` } type RunStatus struct { diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/thread.go b/pkg/storage/apis/otto.gptscript.ai/v1/thread.go index e974925f0..c44cc41c2 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/thread.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/thread.go @@ -25,18 +25,21 @@ func (in *Thread) GetConditions() *[]metav1.Condition { } type ThreadSpec struct { - Input string `json:"input,omitempty"` - AgentName string `json:"agentName,omitempty"` - WorkspaceID string `json:"workspaceID,omitempty"` + Input string `json:"input,omitempty"` + AgentName string `json:"agentName,omitempty"` + WorkspaceID string `json:"workspaceID,omitempty"` + KnowledgeWorkspaceID string `json:"knowledgeWorkspaceID,omitempty"` } type ThreadStatus struct { - Description string `json:"description,omitempty"` - LastRunName string `json:"lastRunName,omitempty"` - LastRunState gptscriptclient.RunState `json:"lastRunState,omitempty"` - LastRunOutput string `json:"lastRunOutput,omitempty"` - LastRunError string `json:"lastRunError,omitempty"` - Conditions []metav1.Condition `json:"conditions,omitempty"` + Description string `json:"description,omitempty"` + LastRunName string `json:"lastRunName,omitempty"` + LastRunState gptscriptclient.RunState `json:"lastRunState,omitempty"` + LastRunOutput string `json:"lastRunOutput,omitempty"` + LastRunError string `json:"lastRunError,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` + HasKnowledge bool `json:"hasKnowledge,omitempty"` + IngestKnowledge bool `json:"ingestKnowledge,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go index 7579df8f7..7600fde75 100644 --- a/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go +++ b/pkg/storage/apis/otto.gptscript.ai/v1/zz_generated.deepcopy.go @@ -160,7 +160,7 @@ func (in *Run) DeepCopyInto(out *Run) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -217,6 +217,11 @@ func (in *RunList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RunSpec) DeepCopyInto(out *RunSpec) { *out = *in + if in.ExtraEnv != nil { + in, out := &in.ExtraEnv, &out.ExtraEnv + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunSpec. diff --git a/pkg/storage/openapi/generated/openapi_generated.go b/pkg/storage/openapi/generated/openapi_generated.go index 2f265c97c..1756e6027 100644 --- a/pkg/storage/openapi/generated/openapi_generated.go +++ b/pkg/storage/openapi/generated/openapi_generated.go @@ -258,6 +258,18 @@ func schema_storage_apis_ottogptscriptai_v1_AgentStatus(ref common.ReferenceCall }, }, }, + "hasKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, + "ingestKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, @@ -510,6 +522,20 @@ func schema_storage_apis_ottogptscriptai_v1_RunSpec(ref common.ReferenceCallback Format: "", }, }, + "extraEnv": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, Required: []string{"input"}, }, @@ -970,6 +996,12 @@ func schema_storage_apis_ottogptscriptai_v1_ThreadSpec(ref common.ReferenceCallb Format: "", }, }, + "knowledgeWorkspaceID": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, @@ -1025,6 +1057,18 @@ func schema_storage_apis_ottogptscriptai_v1_ThreadStatus(ref common.ReferenceCal }, }, }, + "hasKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, + "ingestKnowledge": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/storage/services/config.go b/pkg/storage/services/config.go index 3c6dab227..a735e424c 100644 --- a/pkg/storage/services/config.go +++ b/pkg/storage/services/config.go @@ -15,7 +15,9 @@ type Config struct { StorageToken string `usage:"Token for storage access, will be generated if not passed"` //AuditLogPath string `usage:"Location of where to store audit logs"` //AuditLogPolicyFile string `usage:"Location of audit log policy file"` - DSN string `usage:"Database dsn in driver://connection_string format" default:"sqlite://file:otto.db?_journal=WAL&cache=shared&_busy_timeout=30000"` + DSN string `usage:"Database dsn in driver://connection_string format" default:"sqlite://file:otto.db?_journal=WAL&cache=shared&_busy_timeout=30000"` + KnowledgeBin string `usage:"Location of knowledge binary" default:"knowledge" env:"KNOWLEDGE_BIN"` + KnowledgeTool string `usage:"The knowledge tool to use" default:"github.com/gptscript-ai/knowledge/gateway@v0.4.14-rc.2" env:"KNOWLEDGE_TOOL"` } type Services struct { diff --git a/pkg/workspace/knowledge.go b/pkg/workspace/knowledge.go new file mode 100644 index 000000000..6361c377e --- /dev/null +++ b/pkg/workspace/knowledge.go @@ -0,0 +1,19 @@ +package workspace + +import ( + "fmt" + "os/exec" + "strings" +) + +func KnowledgeIDFromWorkspaceID(workspaceID string) string { + return strings.ReplaceAll(workspaceID, " ", "_") +} + +func IngestKnowledge(knowledgeBin, knowledgeWorkspaceID string) error { + if err := exec.Command(knowledgeBin, "ingest", "--prune", "--dataset", KnowledgeIDFromWorkspaceID(knowledgeWorkspaceID), GetDir(knowledgeWorkspaceID)).Run(); err != nil { + return fmt.Errorf("failed to ingest agent knowledge dataset: %w", err) + } + + return nil +} diff --git a/pkg/workspace/workspace.go b/pkg/workspace/workspace.go new file mode 100644 index 000000000..cc075943d --- /dev/null +++ b/pkg/workspace/workspace.go @@ -0,0 +1,10 @@ +package workspace + +import ( + "strings" +) + +func GetDir(workspaceID string) string { + _, path, _ := strings.Cut(workspaceID, "://") + return path +}