Skip to content

Commit

Permalink
Merge pull request #8 from thedadams/knowledge-ingest-tool
Browse files Browse the repository at this point in the history
chore: use runs to ingest and delete knowledge
  • Loading branch information
ibuildthecloud authored Sep 11, 2024
2 parents 601493e + 2756733 commit fd3b303
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 78 deletions.
6 changes: 6 additions & 0 deletions pkg/api/handlers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,13 @@ func (a *AgentHandler) DeleteKnowledge(ctx context.Context, req api.Request) err
return err
}

files, err := a.WorkspaceClient.Ls(ctx, agent.Status.KnowledgeWorkspaceID)
if err != nil {
return fmt.Errorf("failed to list files in workspace %s: %w", agent.Status.KnowledgeWorkspaceID, err)
}

agent.Status.IngestKnowledge = true
agent.Status.HasKnowledge = len(files) > 0
return req.Storage.Status().Update(ctx, &agent)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/api/handlers/threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,13 @@ func (a *ThreadHandler) DeleteKnowledge(ctx context.Context, req api.Request) er
return err
}

files, err := a.WorkspaceClient.Ls(ctx, thread.Spec.KnowledgeWorkspaceID)
if err != nil {
return fmt.Errorf("failed to list files in workspace %s: %w", thread.Spec.KnowledgeWorkspaceID, err)
}

thread.Status.IngestKnowledge = true
thread.Status.HasKnowledge = len(files) > 0
return req.Storage.Status().Update(ctx, &thread)
}

Expand Down
13 changes: 5 additions & 8 deletions pkg/controller/handlers/agents/agents.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package agents

import (
"fmt"
"os/exec"

"github.com/acorn-io/baaah/pkg/router"
"github.com/gptscript-ai/otto/pkg/knowledge"
v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1"
"github.com/gptscript-ai/otto/pkg/workspace"
wclient "github.com/thedadams/workspace-provider/pkg/client"
)

type AgentHandler struct {
WorkspaceClient *wclient.Client
KnowledgeBin string
Ingester *knowledge.Ingester
WorkspaceProvider string
}

Expand Down Expand Up @@ -44,8 +41,8 @@ func (a *AgentHandler) RemoveWorkspaces(req router.Request, resp router.Response
}

if agent.Status.HasKnowledge {
if err := exec.Command(a.KnowledgeBin, "delete-dataset", agent.Status.KnowledgeWorkspaceID).Run(); err != nil {
return fmt.Errorf("failed to delete knowledge dataset: %w", err)
if err := a.Ingester.DeleteKnowledge(req.Ctx, agent.Namespace, agent.Status.KnowledgeWorkspaceID); err != nil {
return err
}
}

Expand All @@ -61,7 +58,7 @@ func (a *AgentHandler) IngestKnowledge(req router.Request, resp router.Response)
return nil
}

if err := workspace.IngestKnowledge(a.KnowledgeBin, agent.Status.KnowledgeWorkspaceID); err != nil {
if err := a.Ingester.IngestKnowledge(req.Ctx, agent.Namespace, agent.Status.KnowledgeWorkspaceID); err != nil {
return err
}

Expand Down
60 changes: 29 additions & 31 deletions pkg/controller/handlers/threads/threads.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package threads

import (
"fmt"
"os/exec"

"github.com/acorn-io/baaah/pkg/router"
"github.com/gptscript-ai/otto/pkg/knowledge"
v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1"
"github.com/gptscript-ai/otto/pkg/workspace"
wclient "github.com/thedadams/workspace-provider/pkg/client"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

func Cleanup(req router.Request, resp router.Response) error {
type ThreadHandler struct {
Workspace *wclient.Client
Ingester *knowledge.Ingester
}

func (t *ThreadHandler) Cleanup(req router.Request, resp router.Response) error {
thread := req.Object.(*v1.Thread)

if thread.Spec.AgentName != "" {
Expand All @@ -35,39 +37,35 @@ func Cleanup(req router.Request, resp router.Response) error {
return nil
}

func RemoveWorkspace(wc *wclient.Client, knowledgeBin string) router.HandlerFunc {
return func(req router.Request, resp router.Response) error {
thread := req.Object.(*v1.Thread)
if err := wc.Rm(req.Ctx, thread.Spec.WorkspaceID); err != nil {
func (t *ThreadHandler) RemoveWorkspaces(req router.Request, resp router.Response) error {
thread := req.Object.(*v1.Thread)
if err := t.Workspace.Rm(req.Ctx, thread.Spec.WorkspaceID); err != nil {
return err
}

if thread.Status.HasKnowledge {
if err := t.Ingester.DeleteKnowledge(req.Ctx, thread.Namespace, thread.Spec.KnowledgeWorkspaceID); err != nil {
return err
}
}

if thread.Status.HasKnowledge {
if err := exec.Command(knowledgeBin, "delete-dataset", thread.Spec.KnowledgeWorkspaceID).Run(); err != nil {
return fmt.Errorf("failed to delete knowledge dataset: %w", err)
}
}
if thread.Spec.KnowledgeWorkspaceID != "" {
return t.Workspace.Rm(req.Ctx, thread.Spec.KnowledgeWorkspaceID)
}

if thread.Spec.KnowledgeWorkspaceID != "" {
return wc.Rm(req.Ctx, thread.Spec.KnowledgeWorkspaceID)
}
return nil
}

func (t *ThreadHandler) IngestKnowledge(req router.Request, resp router.Response) error {
thread := req.Object.(*v1.Thread)
if !thread.Status.IngestKnowledge || !thread.Status.HasKnowledge {
return nil
}
}

func IngestKnowledge(knowledgeBin string) router.HandlerFunc {
return func(req router.Request, resp router.Response) error {
thread := req.Object.(*v1.Thread)
if !thread.Status.IngestKnowledge || !thread.Status.HasKnowledge {
return nil
}

if err := workspace.IngestKnowledge(knowledgeBin, thread.Spec.KnowledgeWorkspaceID); err != nil {
return err
}

thread.Status.IngestKnowledge = false
return nil
if err := t.Ingester.IngestKnowledge(req.Ctx, thread.Namespace, thread.Spec.KnowledgeWorkspaceID); err != nil {
return err
}

thread.Status.IngestKnowledge = false
return nil
}
12 changes: 9 additions & 3 deletions pkg/controller/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gptscript-ai/otto/pkg/controller/handlers/workflow"
"github.com/gptscript-ai/otto/pkg/controller/handlers/workflowexecution"
"github.com/gptscript-ai/otto/pkg/controller/handlers/workflowstep"
"github.com/gptscript-ai/otto/pkg/knowledge"
"github.com/gptscript-ai/otto/pkg/services"
v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1"
)
Expand All @@ -17,11 +18,16 @@ func routes(router *router.Router, services *services.Services) error {
workflows := workflow.New(services.WorkspaceClient, "directory")
workflowExecution := workflowexecution.New(services.WorkspaceClient)
workflowStep := workflowstep.New(services.Invoker)
ingester := knowledge.NewIngester(services.Invoker, services.KnowledgeTool)
agents := agents.AgentHandler{
WorkspaceClient: services.WorkspaceClient,
KnowledgeBin: services.KnowledgeBin,
Ingester: ingester,
WorkspaceProvider: "directory",
}
threads := threads.ThreadHandler{
Workspace: services.WorkspaceClient,
Ingester: ingester,
}

root := router.Middleware(conditions.ErrorMiddleware())

Expand All @@ -30,9 +36,9 @@ func routes(router *router.Router, services *services.Services) error {
root.Type(&v1.Run{}).HandlerFunc(runs.Cleanup)

// Threads
root.Type(&v1.Thread{}).FinalizeFunc(v1.ThreadFinalizer, threads.RemoveWorkspace(services.WorkspaceClient, services.KnowledgeBin))
root.Type(&v1.Thread{}).FinalizeFunc(v1.ThreadFinalizer, threads.RemoveWorkspaces)
root.Type(&v1.Thread{}).HandlerFunc(threads.Cleanup)
root.Type(&v1.Thread{}).HandlerFunc(threads.IngestKnowledge(services.KnowledgeBin))
root.Type(&v1.Thread{}).HandlerFunc(threads.IngestKnowledge)

// Workflows
root.Type(&v1.Workflow{}).FinalizeFunc(v1.WorkflowFinalizer, workflows.Finalize)
Expand Down
40 changes: 31 additions & 9 deletions pkg/invoke/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
Expand All @@ -29,18 +30,16 @@ type Invoker struct {
workspaceClient *wclient.Client
threadWorkspaceProvider string
knowledgeTool string
knowledgeBin string
}

func NewInvoker(storage storage.Client, gptClient *gptscript.GPTScript, tokenService *jwt.TokenService, workspaceClient *wclient.Client, knowledgeTool, knowledgeBin string) *Invoker {
func NewInvoker(storage storage.Client, gptClient *gptscript.GPTScript, tokenService *jwt.TokenService, workspaceClient *wclient.Client, knowledgeTool string) *Invoker {
return &Invoker{
storage: storage,
gptClient: gptClient,
tokenService: tokenService,
workspaceClient: workspaceClient,
threadWorkspaceProvider: "directory",
knowledgeTool: knowledgeTool,
knowledgeBin: knowledgeBin,
}
}

Expand Down Expand Up @@ -132,7 +131,7 @@ func (i *Invoker) Agent(ctx context.Context, agent *v1.Agent, input string, opts
return nil, err
}

tools, extraEnv, err := render.Agent(ctx, i.storage, agent, thread, i.knowledgeTool, i.knowledgeBin)
tools, extraEnv, err := render.Agent(ctx, i.storage, agent, thread, i.knowledgeTool)
if err != nil {
return nil, err
}
Expand All @@ -146,7 +145,7 @@ func (i *Invoker) Agent(ctx context.Context, agent *v1.Agent, input string, opts
}
}

return i.createRun(ctx, thread, tools, input, runOptions{
return i.createRunFromTools(ctx, thread, tools, input, runOptions{
AgentName: agent.Name,
Env: extraEnv,
})
Expand All @@ -159,7 +158,17 @@ type runOptions struct {
Env []string
}

func (i *Invoker) createRun(ctx context.Context, thread *v1.Thread, tools []gptscript.ToolDef, input string, opts runOptions) (*Response, error) {
func (i *Invoker) createRunFromTools(ctx context.Context, thread *v1.Thread, tools []gptscript.ToolDef, input string, opts runOptions) (*Response, error) {
return i.createRun(ctx, thread, input, opts, tools)
}

func (i *Invoker) createRunFromRemoteTool(ctx context.Context, thread *v1.Thread, tool, input string, opts runOptions) (*Response, error) {
return i.createRun(ctx, thread, input, opts, tool)
}

// createRun is a low-level method that creates a Run object from a list of tools or a remote tool.
// Callers should use createRunFromTools or createRunFromRemoteTool instead.
func (i *Invoker) createRun(ctx context.Context, thread *v1.Thread, input string, opts runOptions, tool any) (*Response, error) {
var run = v1.Run{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "r1",
Expand Down Expand Up @@ -194,22 +203,35 @@ func (i *Invoker) createRun(ctx context.Context, thread *v1.Thread, tools []gpts
Scope: thread.Namespace,
})

runResp, err := i.gptClient.Evaluate(ctx, gptscript.Options{
options := gptscript.Options{
GlobalOptions: gptscript.GlobalOptions{
Env: append(opts.Env,
"OTTO_TOKEN="+token,
"OTTO_RUN_ID="+run.Name,
"OTTO_THREAD_ID="+thread.Name,
"OTTO_WORKFLOW_ID="+opts.WorkflowName,
"OTTO_WORKFLOW_STEP_ID="+opts.WorkflowStepName,
"OTTO_AGENT_ID="+opts.AgentName),
"OTTO_AGENT_ID="+opts.AgentName,
),
},
Input: input,
Workspace: workspace.GetDir(thread.Spec.WorkspaceID),
ChatState: chatState,
IncludeEvents: true,
ForceSequential: true,
}, tools...)
}

var runResp *gptscript.Run
switch t := tool.(type) {
case gptscript.ToolDef:
runResp, err = i.gptClient.Evaluate(ctx, options, t)
case []gptscript.ToolDef:
runResp, err = i.gptClient.Evaluate(ctx, options, t...)
case string:
runResp, err = i.gptClient.Run(ctx, t, options)
default:
return nil, fmt.Errorf("invalid tool type: %T", tool)
}
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/invoke/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (i *Invoker) Step(ctx context.Context, step *v1.WorkflowStep, input string)

tools := render.Step(step)

return i.createRun(ctx, &thread, tools, input, runOptions{
return i.createRunFromTools(ctx, &thread, tools, input, runOptions{
WorkflowName: step.Spec.WorkflowName,
WorkflowStepName: step.Spec.AfterWorkflowStepName,
})
Expand Down
28 changes: 28 additions & 0 deletions pkg/invoke/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package invoke

import (
"context"

v1 "github.com/gptscript-ai/otto/pkg/storage/apis/otto.gptscript.ai/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (i *Invoker) SystemAction(ctx context.Context, generateName, namespace, tool, input string, env ...string) (*Response, error) {
thread := v1.Thread{
ObjectMeta: metav1.ObjectMeta{
GenerateName: generateName,
Namespace: namespace,
},
Spec: v1.ThreadSpec{
Input: input,
},
}

if err := i.storage.Create(ctx, &thread); err != nil {
return nil, err
}

return i.createRunFromRemoteTool(ctx, &thread, tool, input, runOptions{
Env: env,
})
}
58 changes: 58 additions & 0 deletions pkg/knowledge/knowledge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package knowledge

import (
"context"
"fmt"
"strings"

"github.com/gptscript-ai/otto/pkg/invoke"
"github.com/gptscript-ai/otto/pkg/workspace"
)

type Ingester struct {
invoker *invoke.Invoker
knowledgeTool string
}

func NewIngester(invoker *invoke.Invoker, knowledgeTool string) *Ingester {
return &Ingester{
invoker: invoker,
knowledgeTool: knowledgeTool,
}
}

func (i *Ingester) IngestKnowledge(ctx context.Context, namespace, knowledgeWorkspaceID string) error {
knowledgeTool, tag, ok := strings.Cut(i.knowledgeTool, "@")
if ok {
tag = "@" + tag
}

run, err := i.invoker.SystemAction(ctx, "ingest-", namespace, knowledgeTool+"/ingest.gpt"+tag, workspace.GetDir(knowledgeWorkspaceID), "GPTSCRIPT_DATASET="+workspace.KnowledgeIDFromWorkspaceID(knowledgeWorkspaceID))
if err != nil {
return err
}

run.Wait()
if run.Run.Status.Error != "" {
return fmt.Errorf("failed to ingest knowledge: %s", run.Run.Status.Error)
}
return nil
}

func (i *Ingester) DeleteKnowledge(ctx context.Context, namespace, knowledgeWorkspaceID string) error {
knowledgeTool, tag, ok := strings.Cut(i.knowledgeTool, "@")
if ok {
tag = "@" + tag
}

run, err := i.invoker.SystemAction(ctx, "ingest-delete-", namespace, knowledgeTool+"/delete.gpt"+tag, workspace.KnowledgeIDFromWorkspaceID(knowledgeWorkspaceID))
if err != nil {
return err
}

run.Wait()
if run.Run.Status.Error != "" {
return fmt.Errorf("failed to delete knowledge: %s", run.Run.Status.Error)
}
return nil
}
Loading

0 comments on commit fd3b303

Please sign in to comment.