Skip to content

Commit

Permalink
Use StrategicMergePatch when updating existing cluster resources (fly…
Browse files Browse the repository at this point in the history
katrogan authored Dec 12, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent ca735c4 commit c90f0cb
Showing 2 changed files with 48 additions and 3 deletions.
13 changes: 10 additions & 3 deletions pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
@@ -15,8 +15,6 @@ import (

"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

@@ -285,18 +283,27 @@ func (c *controller) syncNamespace(ctx context.Context, namespace NamespaceName,
}
for _, target := range c.executionCluster.GetAllValidTargets() {
k8sObjCopy := k8sObj.DeepCopyObject()
logger.Debugf(ctx, "Attempting to create resource [%+v](%+v) in cluster [%v] for namespace [%s]",
k8sObj.GetObjectKind().GroupVersionKind().Kind, k8sObj, target.ID, namespace)
err = target.Client.Create(ctx, k8sObjCopy)
if err != nil {
if k8serrors.IsAlreadyExists(err) {
logger.Debugf(ctx, "Resource [%+v] in namespace [%s] already exists - attempting update instead",
k8sObj.GetObjectKind().GroupVersionKind().Kind, namespace)
c.metrics.AppliedTemplateExists.Inc()
err = target.Client.Patch(ctx, k8sObjCopy, client.MergeFrom(k8sObjCopy))
// Use a strategic-merge-patch to mimic `kubectl apply` behavior.
// Kubectl defaults to using the StrategicMergePatch strategy.
// However the controller-runtime only has an implementation for MergePatch which we were formerly
// using but failed to actually always merge resources in the Patch call.
err = target.Client.Patch(ctx, k8sObjCopy, StrategicMergeFrom(k8sObjCopy))
if err != nil {
c.metrics.TemplateUpdateErrors.Inc()
logger.Infof(ctx, "Failed to update resource [%+v] in namespace [%s] with err :%v",
k8sObj.GetObjectKind().GroupVersionKind().Kind, namespace, err)
collectedErrs = append(collectedErrs, err)
} else {
logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]",
k8sObj.GetObjectKind().GroupVersionKind().Kind, namespace)
}
c.appliedTemplates[namespace][templateFile.Name()] = templateFile.ModTime()
} else {
38 changes: 38 additions & 0 deletions pkg/clusterresource/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package clusterresource

import (
jsonpatch "github.com/evanphx/json-patch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type strategicMergeFromPatch struct {
from runtime.Object
}

// Type implements patch.
func (s *strategicMergeFromPatch) Type() types.PatchType {
return types.StrategicMergePatchType
}

// Data implements Patch.
func (s *strategicMergeFromPatch) Data(obj runtime.Object) ([]byte, error) {
originalJSON, err := json.Marshal(s.from)
if err != nil {
return nil, err
}

modifiedJSON, err := json.Marshal(obj)
if err != nil {
return nil, err
}

return jsonpatch.CreateMergePatch(originalJSON, modifiedJSON)
}

// StrategicMergeFrom creates a Patch using the strategic-merge-patch strategy with the given object as base.
func StrategicMergeFrom(obj runtime.Object) client.Patch {
return &strategicMergeFromPatch{obj}
}

0 comments on commit c90f0cb

Please sign in to comment.