Skip to content

Commit

Permalink
fix: wait for the alias to be processed in the API layer
Browse files Browse the repository at this point in the history
For objects with aliases, like agents, if the alias is created or
updated, the API may return stale data because the controller hasn't
processed the object yet. This change adds a mechanism for the API layer
to ensure that the alias controller has processed the object.

Signed-off-by: Donnie Adams <[email protected]>
  • Loading branch information
thedadams committed Nov 26, 2024
1 parent fd56256 commit f532350
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 53 deletions.
20 changes: 15 additions & 5 deletions pkg/api/handlers/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ func (a *AgentHandler) Update(req api.Context) error {
return err
}

resp, err := convertAgent(agent, req)
processedAgent, err := wait.For(req.Context(), req.Storage, &agent, func(agent *v1.Agent) bool {
return agent.Generation == agent.Status.AliasObservedGeneration
})
if err != nil {
return fmt.Errorf("failed to update agent: %w", err)
}

resp, err := convertAgent(*processedAgent, req)
if err != nil {
return err
}
Expand All @@ -75,7 +82,7 @@ func (a *AgentHandler) Create(req api.Context) error {
if err := req.Read(&manifest); err != nil {
return err
}
agent := v1.Agent{
agent := &v1.Agent{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.AgentPrefix,
Namespace: req.Namespace(),
Expand All @@ -85,11 +92,14 @@ func (a *AgentHandler) Create(req api.Context) error {
},
}

if err := req.Create(&agent); err != nil {
return err
agent, err := wait.For(req.Context(), req.Storage, agent, func(agent *v1.Agent) bool {
return agent.Generation == agent.Status.AliasObservedGeneration
}, wait.Option{Create: true})
if err != nil {
return fmt.Errorf("failed to create agent: %w", err)
}

resp, err := convertAgent(agent, req)
resp, err := convertAgent(*agent, req)
if err != nil {
return err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/api/handlers/emailreceiver.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package handlers

import (
"fmt"

"github.com/otto8-ai/otto8/apiclient/types"
"github.com/otto8-ai/otto8/pkg/alias"
"github.com/otto8-ai/otto8/pkg/api"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/otto8-ai/otto8/pkg/system"
"github.com/otto8-ai/otto8/pkg/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -35,12 +38,18 @@ func (e *EmailReceiverHandler) Update(req api.Context) error {
}

er.Spec.EmailReceiverManifest = manifest

if err := req.Update(&er); err != nil {
return err
}

return req.Write(convertEmailReceiver(er, e.hostname))
processedEr, err := wait.For(req.Context(), req.Storage, &er, func(er *v1.EmailReceiver) bool {
return er.Generation == er.Status.AliasObservedGeneration
})
if err != nil {
return fmt.Errorf("failed to update email receiver: %w", err)
}

return req.Write(convertEmailReceiver(*processedEr, e.hostname))
}

func (e *EmailReceiverHandler) Delete(req api.Context) error {
Expand All @@ -62,7 +71,7 @@ func (e *EmailReceiverHandler) Create(req api.Context) error {
return err
}

er := v1.EmailReceiver{
er := &v1.EmailReceiver{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.EmailReceiverPrefix,
Namespace: req.Namespace(),
Expand All @@ -72,11 +81,14 @@ func (e *EmailReceiverHandler) Create(req api.Context) error {
},
}

if err := req.Create(&er); err != nil {
return err
er, err := wait.For(req.Context(), req.Storage, er, func(er *v1.EmailReceiver) bool {
return er.Generation == er.Status.AliasObservedGeneration
}, wait.Option{Create: true})
if err != nil {
return fmt.Errorf("failed to create email receiver: %w", err)
}

return req.WriteCreated(convertEmailReceiver(er, e.hostname))
return req.WriteCreated(convertEmailReceiver(*er, e.hostname))
}

func convertEmailReceiver(emailReceiver v1.EmailReceiver, hostname string) *types.EmailReceiver {
Expand Down
23 changes: 17 additions & 6 deletions pkg/api/handlers/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/otto8-ai/otto8/pkg/storage/selectors"
"github.com/otto8-ai/otto8/pkg/system"
"github.com/otto8-ai/otto8/pkg/wait"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -78,7 +79,14 @@ func (a *ModelHandler) Update(req api.Context) error {
return err
}

resp, err := convertModel(req.Context(), req.Storage, existing)
processedModel, err := wait.For(req.Context(), req.Storage, &existing, func(model *v1.Model) bool {
return model.Generation == model.Status.AliasObservedGeneration
})
if err != nil {
return fmt.Errorf("failed to update model: %w", err)
}

resp, err := convertModel(req.Context(), req.Storage, *processedModel)
if err != nil {
return err
}
Expand All @@ -105,7 +113,7 @@ func (a *ModelHandler) Create(req api.Context) error {
return types.NewErrBadRequest("model provider %s must be of type %s not %s", modelManifest.ModelProvider, types.ToolReferenceTypeModelProvider, toolRef.Spec.Type)
}

model := v1.Model{
model := &v1.Model{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.ModelPrefix,
Namespace: req.Namespace(),
Expand All @@ -115,15 +123,18 @@ func (a *ModelHandler) Create(req api.Context) error {
},
}

if err := validateModelManifestAndSetDefaults(&model); err != nil {
if err := validateModelManifestAndSetDefaults(model); err != nil {
return err
}

if err := req.Create(&model); err != nil {
return err
model, err := wait.For(req.Context(), req.Storage, model, func(model *v1.Model) bool {
return model.Generation == model.Status.AliasObservedGeneration
}, wait.Option{Create: true})
if err != nil {
return fmt.Errorf("failed to create model: %w", err)
}

resp, err := convertModel(req.Context(), req.Storage, model)
resp, err := convertModel(req.Context(), req.Storage, *model)
if err != nil {
return err
}
Expand Down
21 changes: 16 additions & 5 deletions pkg/api/handlers/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/otto8-ai/otto8/pkg/api/server"
v1 "github.com/otto8-ai/otto8/pkg/storage/apis/otto.otto8.ai/v1"
"github.com/otto8-ai/otto8/pkg/system"
"github.com/otto8-ai/otto8/pkg/wait"
"golang.org/x/crypto/bcrypt"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -78,7 +79,14 @@ func (a *WebhookHandler) Update(req api.Context) error {
return err
}

return req.Write(convertWebhook(wh, server.GetURLPrefix(req)))
processedWh, err := wait.For(req.Context(), req.Storage, &wh, func(wh *v1.Webhook) bool {
return wh.Generation == wh.Status.AliasObservedGeneration
})
if err != nil {
return fmt.Errorf("failed to update webhook: %w", err)
}

return req.Write(convertWebhook(*processedWh, server.GetURLPrefix(req)))
}

func (a *WebhookHandler) Delete(req api.Context) error {
Expand All @@ -100,7 +108,7 @@ func (a *WebhookHandler) Create(req api.Context) error {
return err
}

wh := v1.Webhook{
wh := &v1.Webhook{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WebhookPrefix,
Namespace: req.Namespace(),
Expand All @@ -123,11 +131,14 @@ func (a *WebhookHandler) Create(req api.Context) error {
wh.Spec.Headers[i] = textproto.CanonicalMIMEHeaderKey(h)
}

if err := req.Create(&wh); err != nil {
return err
wh, err := wait.For(req.Context(), req.Storage, wh, func(wh *v1.Webhook) bool {
return wh.Generation == wh.Status.AliasObservedGeneration
}, wait.Option{Create: true})
if err != nil {
return fmt.Errorf("failed to create webhook: %w", err)
}

return req.WriteCreated(convertWebhook(wh, server.GetURLPrefix(req)))
return req.WriteCreated(convertWebhook(*wh, server.GetURLPrefix(req)))
}

func convertWebhook(webhook v1.Webhook, urlPrefix string) *types.Webhook {
Expand Down
21 changes: 16 additions & 5 deletions pkg/api/handlers/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,14 @@ func (a *WorkflowHandler) Update(req api.Context) error {
return err
}

resp, err := convertWorkflow(wf, req)
processedWf, err := wait.For(req.Context(), req.Storage, &wf, func(wf *v1.Workflow) bool {
return wf.Generation == wf.Status.AliasObservedGeneration
})
if err != nil {
return fmt.Errorf("failed to update workflow: %w", err)
}

resp, err := convertWorkflow(*processedWf, req)
if err != nil {
return err
}
Expand All @@ -116,8 +123,9 @@ func (a *WorkflowHandler) Create(req api.Context) error {
if err := req.Read(&manifest); err != nil {
return err
}

manifest = workflow.PopulateIDs(manifest)
workflow := v1.Workflow{
wf := &v1.Workflow{
ObjectMeta: metav1.ObjectMeta{
GenerateName: system.WorkflowPrefix,
Namespace: req.Namespace(),
Expand All @@ -127,11 +135,14 @@ func (a *WorkflowHandler) Create(req api.Context) error {
},
}

if err := req.Create(&workflow); err != nil {
return err
wf, err := wait.For(req.Context(), req.Storage, wf, func(wf *v1.Workflow) bool {
return wf.Generation == wf.Status.AliasObservedGeneration
}, wait.Option{Create: true})
if err != nil {
return fmt.Errorf("failed to create workflow: %w", err)
}

resp, err := convertWorkflow(workflow, req)
resp, err := convertWorkflow(*wf, req)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/handlers/alias/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func AssignAlias(req router.Request, _ router.Response) error {
aliasable := req.Object.(v1.Aliasable)

if aliasable.GetAliasName() == "" {
if aliasable.IsAssigned() {
if aliasable.IsAssigned() || aliasable.GetGeneration() != aliasable.GetAliasObservedGeneration() {
aliasable.SetAssigned(false)
aliasable.SetAliasObservedGeneration(aliasable.GetGeneration())
return req.Client.Status().Update(req.Ctx, req.Object)
}

Expand Down Expand Up @@ -60,8 +61,9 @@ func AssignAlias(req router.Request, _ router.Response) error {
return err
}

if assigned := matches(alias, req.Object); assigned != aliasable.IsAssigned() {
if assigned := matches(alias, req.Object); assigned != aliasable.IsAssigned() || aliasable.GetGeneration() != aliasable.GetAliasObservedGeneration() {
aliasable.SetAssigned(assigned)
aliasable.SetAliasObservedGeneration(aliasable.GetGeneration())
return req.Client.Status().Update(req.Ctx, req.Object)
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/storage/apis/otto.otto8.ai/v1/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ func (a *Agent) SetAssigned(assigned bool) {
a.Status.AliasAssigned = assigned
}

func (a *Agent) GetAliasObservedGeneration() int64 {
return a.Status.AliasObservedGeneration
}

func (a *Agent) SetAliasObservedGeneration(gen int64) {
a.Status.AliasObservedGeneration = gen
}

func (a *Agent) Has(field string) bool {
return a.Get(field) != ""
}
Expand Down Expand Up @@ -64,10 +72,11 @@ type AgentSpec struct {
}

type AgentStatus struct {
KnowledgeSetNames []string `json:"knowledgeSetNames,omitempty"`
WorkspaceName string `json:"workspaceName,omitempty"`
AliasAssigned bool `json:"aliasAssigned,omitempty"`
AuthStatus map[string]types.OAuthAppLoginAuthStatus `json:"authStatus,omitempty"`
KnowledgeSetNames []string `json:"knowledgeSetNames,omitempty"`
WorkspaceName string `json:"workspaceName,omitempty"`
AliasAssigned bool `json:"aliasAssigned,omitempty"`
AuthStatus map[string]types.OAuthAppLoginAuthStatus `json:"authStatus,omitempty"`
AliasObservedGeneration int64 `json:"aliasProcessed,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/apis/otto.otto8.ai/v1/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
Expand Down Expand Up @@ -42,9 +43,12 @@ type AliasSpec struct {
// +k8s:deepcopy-gen=false

type Aliasable interface {
kclient.Object
GetAliasName() string
SetAssigned(bool)
IsAssigned() bool
GetAliasObservedGeneration() int64
SetAliasObservedGeneration(int64)
}

// +k8s:deepcopy-gen=false
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/apis/otto.otto8.ai/v1/defaultmodelalias.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func (a *DefaultModelAlias) GetAliasScope() string {
return "Model"
}

func (a *DefaultModelAlias) GetAliasObservedGeneration() int64 {
return a.Generation
}

func (a *DefaultModelAlias) SetAliasObservedGeneration(int64) {}

type DefaultModelAliasSpec struct {
Manifest types.DefaultModelAliasManifest `json:"manifest"`
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/storage/apis/otto.otto8.ai/v1/emailaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func (in *EmailReceiver) IsAssigned() bool {
return in.Status.AliasAssigned
}

func (in *EmailReceiver) GetAliasObservedGeneration() int64 {
return in.Status.AliasObservedGeneration
}

func (in *EmailReceiver) SetAliasObservedGeneration(gen int64) {
in.Status.AliasObservedGeneration = gen
}

func (*EmailReceiver) GetColumns() [][]string {
return [][]string{
{"Name", "Name"},
Expand All @@ -42,10 +50,10 @@ func (*EmailReceiver) GetColumns() [][]string {
}
}

func (w *EmailReceiver) DeleteRefs() []Ref {
if system.IsWorkflowID(w.Spec.Workflow) {
func (in *EmailReceiver) DeleteRefs() []Ref {
if system.IsWorkflowID(in.Spec.Workflow) {
return []Ref{
{ObjType: new(Workflow), Name: w.Spec.Workflow},
{ObjType: new(Workflow), Name: in.Spec.Workflow},
}
}
return nil
Expand All @@ -56,7 +64,8 @@ type EmailReceiverSpec struct {
}

type EmailReceiverStatus struct {
AliasAssigned bool `json:"aliasAssigned,omitempty"`
AliasAssigned bool `json:"aliasAssigned,omitempty"`
AliasObservedGeneration int64 `json:"aliasProcessed,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
Loading

0 comments on commit f532350

Please sign in to comment.