🌱 Improve dry run for server side apply using an annotation #6709
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing `/approve` in a comment.
First round of review (everything except diff package)
Now continuing with the diff package
```diff
@@ -34,6 +34,20 @@ func (p Path) Append(k string) Path {
 	return append(p, k)
 }
 
+// Item returns a path for an item element, represented as <path> + []. Item elements are used as
+// a placeholder for schema definitions for list items or for map items.
```
Just so I get this right: the output of this func for the Path `"spec", "template", "spec"` is a Path `"spec", "template", "spec[]"`.

I somehow can't map the method name to the functionality. Isn't it rather something like `ToArray` or `AsArray`? `Path{"a","b","c"}.ToArray() => Path{"a","b","c[]"}`

Should we add a unit test?
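A hedged sketch of such a unit test, assuming `Item` stays a no-argument method behaving as described above (renaming it to `ToArray`/`AsArray` would only change the call):

```go
import (
	"testing"

	. "github.com/onsi/gomega"
)

func TestPath_Item(t *testing.T) {
	g := NewWithT(t)

	// Item appends the "[]" item placeholder to the last path segment.
	g.Expect(Path{"a", "b", "c"}.Item()).To(Equal(Path{"a", "b", "c[]"}))
	g.Expect(Path{"spec", "template", "spec"}.Item()).To(Equal(Path{"spec", "template", "spec[]"}))
}
```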
```diff
@@ -114,7 +115,8 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt
 	r.patchEngine = patches.NewEngine(r.RuntimeClient)
 	r.recorder = mgr.GetEventRecorderFor("topology/cluster")
 	if r.patchHelperFactory == nil {
-		r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client)
+		crdSchemaCache := diff.NewCRDSchemaCache(r.Client)
+		r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client, crdSchemaCache)
```
nit: Should we just do `diff.NewCRDSchemaCache(r.Client)` in `serverSideApplyPatchHelperFactory`? The func already has the client, and this seems like a good way to avoid having to create the CRDSchemaCache at all callsites.
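A minimal sketch of the nit, reusing the signatures visible in this diff (imports as in the surrounding file):

```go
// serverSideApplyPatchHelperFactory creates the CRDSchemaCache itself, so
// callsites only need to pass the client.
func serverSideApplyPatchHelperFactory(c client.Client) structuredmerge.PatchHelperFactoryFunc {
	crdSchemaCache := diff.NewCRDSchemaCache(c)
	return func(ctx context.Context, original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
		return structuredmerge.NewServerSidePatchHelper(ctx, original, modified, c, crdSchemaCache, opts...)
	}
}
```

With that, SetupWithManager goes back to the pre-change one-liner `r.patchHelperFactory = serverSideApplyPatchHelperFactory(r.Client)`.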
```diff
 	}
 }
 
 // dryRunPatchHelperFactory makes use of a two-way patch and is used in situations where we cannot rely on managed fields.
 func dryRunPatchHelperFactory(c client.Client) structuredmerge.PatchHelperFactoryFunc {
-	return func(original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
+	return func(ctx context.Context, original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
```
```diff
-	return func(ctx context.Context, original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
+	return func(_ context.Context, original, modified client.Object, opts ...structuredmerge.HelperOption) (structuredmerge.PatchHelper, error) {
```

nit
```diff
@@ -2520,6 +2537,13 @@ func TestReconcileMachineDeploymentMachineHealthCheck(t *testing.T) {
 	uidsByName[mdts.Object.Name] = mdts.Object.GetUID()
 
+	if mdts.MachineHealthCheck != nil {
+		// Adds the last applied intent annotation, assuming the current object originated from the topology controller without external changes.
+		currentIntent := &unstructured.Unstructured{}
+		g.Expect(env.Scheme().Convert(mdts.MachineHealthCheck, currentIntent, nil)).To(Succeed())
```
I assume this back and forth is because `AddCurrentIntentAnnotation` only supports `Unstructured`, correct?
```diff
@@ -41,7 +42,7 @@ type serverSidePatchHelper struct {
 }
 
 // NewServerSidePatchHelper returns a new PatchHelper using server side apply.
-func NewServerSidePatchHelper(original, modified client.Object, c client.Client, opts ...HelperOption) (PatchHelper, error) {
+func NewServerSidePatchHelper(ctx context.Context, original, modified client.Object, c client.Client, crdSchemaCache diff.CRDSchemaCache, opts ...HelperOption) (PatchHelper, error) {
```
nice! Can we please also propagate `ctx` into `cleanupLegacyManagedFields`?
```go
// Gets the schema for the modified object gvk.
// NOTE: this schema drives DryRunDiff operations; modified (current intent) and original (current object)
// are of the same gvk, given that we are always calling UpdateReferenceAPIContract when reading both of them.
// previousIntent instead could be of an older version, but this impacts dryRun only partially (see diff.isDroppingAnyIntent for more details).
```
I was initially thinking that the controller is always restarted after relevant CRDs change.
But I think this is only true for core CRDs (https://storage.googleapis.com/kubernetes-jenkins/logs/periodic-cluster-api-e2e-upgrade-v0-3-to-main/1539927677492793344/artifacts/clusters/clusterctl-upgrade-9ovu0z/clusterctl-upgrade.log).
After the core controller and its CRDs are updated, we update all the other providers. During that upgrade CRDs are patched; if providers add new fields to existing apiVersions, which happens frequently, we will probably have a stale cache here.
What about additionally using the CRD resourceVersion as part of the cache key to avoid this issue?
Potential follow-up issue: Do we care about evicting stale cache entries over time? I think it's not a factor as we assume that CAPI is updated frequently enough so we won't have too many stale entries to run into issues, right?
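A hedged sketch of the resourceVersion idea; the key type and field names are hypothetical:

```go
// schemaCacheKey keys cached schemas on the GVK plus the CRD's
// resourceVersion, so an entry computed from a since-patched CRD is never
// reused. The CRD would be re-read on every lookup, which is cheap compared
// to re-walking the OpenAPI schema.
type schemaCacheKey struct {
	gvk             schema.GroupVersionKind // k8s.io/apimachinery/pkg/runtime/schema
	resourceVersion string                  // CRD .metadata.resourceVersion at lookup time
}
```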
```go
}

crd := &apiextensionsv1.CustomResourceDefinition{}
crd.SetName(fmt.Sprintf("%s.%s", flect.Pluralize(strings.ToLower(gvk.Kind)), gvk.Group))
```
I'm kind of fine with making that assumption. But should we note somewhere that this might not work with all providers?
We wanted to check if all providers are naming their CRDs correctly in: #5686
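For illustration, a runnable check of the `<plural>.<group>` naming assumption the lookup makes (the GVK is just an example; providers deviating from this convention would not be found):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/gobuffalo/flect"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	gvk := schema.GroupVersionKind{
		Group:   "controlplane.cluster.x-k8s.io",
		Version: "v1beta1",
		Kind:    "KubeadmControlPlane",
	}
	// Same computation as in the hunk above.
	name := fmt.Sprintf("%s.%s", flect.Pluralize(strings.ToLower(gvk.Kind)), gvk.Group)
	fmt.Println(name) // kubeadmcontrolplanes.controlplane.cluster.x-k8s.io
}
```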
```diff
@@ -135,6 +152,12 @@ func (h *serverSidePatchHelper) Patch(ctx context.Context) error {
 	log := ctrl.LoggerFrom(ctx)
 	log.V(5).Info("Patching object", "Intent", h.modified)
 
+	// Stores che current intent as last applied intent.
```
```diff
-	// Stores che current intent as last applied intent.
+	// Stores the current intent as last applied intent.
```

nit
```diff
@@ -444,3 +462,23 @@ func TestServerSideApply_CleanupLegacyManagedFields(t *testing.T) {
 		g.Expect(gotSSAManager).To(BeTrue())
 	})
 }
 
+// getTopologyManagedFields returns metadata.managedFields entry tracking
```
Should we just stop caring about what happens to managed fields?
mid 2nd-round of review
```go
var previousIntentJSON bytes.Buffer
zr, err := gzip.NewReader(bytes.NewReader(previousIntentJSONGZIP))
if err != nil {
	return nil, errors.Wrap(err, "failed to create gzip reader forprevious intent")
```
return nil, errors.Wrap(err, "failed to create gzip reader forprevious intent") | |
return nil, errors.Wrap(err, "failed to create gzip reader for previous intent") |
nit
```go
}

if _, err := io.Copy(&previousIntentJSON, zr); err != nil { //nolint:gosec
	return nil, errors.Wrap(err, "failed to copy from gzip reader")
```
return nil, errors.Wrap(err, "failed to copy from gzip reader") | |
return nil, errors.Wrap(err, "failed to copy previous intent from gzip reader") |
nit
```go
}

if err := zr.Close(); err != nil {
	return nil, errors.Wrap(err, "failed to close gzip reader for managed fields")
```
return nil, errors.Wrap(err, "failed to close gzip reader for managed fields") | |
return nil, errors.Wrap(err, "failed to close gzip reader for previous intent") |
|
```go
}

previousIntentMap := make(map[string]interface{})
if err := json.Unmarshal(previousIntentJSON.Bytes(), &previousIntentMap); err != nil {
	return nil, errors.Wrap(err, "failed to unmarshal managed fields")
```
return nil, errors.Wrap(err, "failed to unmarshal managed fields") | |
return nil, errors.Wrap(err, "failed to unmarshal previous intent") |
return nil, errors.Wrap(err, "failed to unmarshal managed fields") | ||
} | ||
|
||
return &unstructured.Unstructured{Object: previousIntentMap}, nil |
Do we have to use the Kubernetes decoder here? Otherwise we'll run into the int64 vs float64 issue (xref: #6668).
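A hedged sketch of the swap, assuming the decompression code from the hunks above is pulled into one helper; `decodePreviousIntent` is a hypothetical name, and `k8s.io/apimachinery/pkg/util/json` is the decoder meant here (it keeps whole numbers as int64):

```go
import (
	"bytes"
	"compress/gzip"
	"io"

	"github.com/pkg/errors"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	kjson "k8s.io/apimachinery/pkg/util/json"
)

// decodePreviousIntent decompresses the annotation payload and decodes it
// with the apimachinery JSON util instead of encoding/json.
func decodePreviousIntent(previousIntentJSONGZIP []byte) (*unstructured.Unstructured, error) {
	zr, err := gzip.NewReader(bytes.NewReader(previousIntentJSONGZIP))
	if err != nil {
		return nil, errors.Wrap(err, "failed to create gzip reader for previous intent")
	}

	var previousIntentJSON bytes.Buffer
	if _, err := io.Copy(&previousIntentJSON, zr); err != nil { //nolint:gosec
		return nil, errors.Wrap(err, "failed to copy previous intent from gzip reader")
	}
	if err := zr.Close(); err != nil {
		return nil, errors.Wrap(err, "failed to close gzip reader for previous intent")
	}

	// kjson.Unmarshal decodes exact whole numbers into int64 rather than
	// float64, so numeric fields round-trip without type drift.
	previousIntentMap := make(map[string]interface{})
	if err := kjson.Unmarshal(previousIntentJSON.Bytes(), &previousIntentMap); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal previous intent")
	}

	return &unstructured.Unstructured{Object: previousIntentMap}, nil
}
```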
```go
}

// GetItemWithKeys returns the element with the given keys/values from the list.
func (ld *ListDef) GetItemWithKeys(list []interface{}, keys map[string]interface{}) map[string]interface{} {
```
Isn't keys actually a map?
```go
}

// LoadOrStore returns the CRDSchema value for the gvk, if present.
// Otherwise, if not present, it retrieves the corresponding CRD definition, infer the CRDSchema, stores it, returns it.
```
```diff
-// Otherwise, if not present, it retrieves the corresponding CRD definition, infer the CRDSchema, stores it, returns it.
+// Otherwise, if not present, it retrieves the corresponding CRD definition, infers the CRDSchema, stores it and returns it.
```
```go
if s, ok := sc.m[gvk]; ok {
	return s, nil
}
```
Repeating this here essentially means that calling `load` before is redundant. We could get rid of `load` and `store`, move the mutex up into `LoadOrStore`, and keep the schema calculation in a util func if we want; see the sketch below.
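A minimal sketch under those assumptions (`sc.lock`, `sc.m`, and `CRDSchema` as in the diff; `computeSchema` is a hypothetical util func holding the CRD get plus the schema calculation):

```go
func (sc *crdSchemaCache) LoadOrStore(ctx context.Context, gvk schema.GroupVersionKind) (CRDSchema, error) {
	sc.lock.Lock()
	defer sc.lock.Unlock()

	// Single lookup under the lock; no separate load() fast path needed.
	if s, ok := sc.m[gvk]; ok {
		return s, nil
	}

	s, err := sc.computeSchema(ctx, gvk)
	if err != nil {
		return nil, err
	}
	sc.m[gvk] = s
	return s, nil
}
```

One trade-off: holding the lock across the client `Get` serializes concurrent cache misses, which is probably acceptable given how rarely new GVKs show up.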
```go
crd := &apiextensionsv1.CustomResourceDefinition{}
crd.SetName(fmt.Sprintf("%s.%s", flect.Pluralize(strings.ToLower(gvk.Kind)), gvk.Group))
if err := sc.client.Get(ctx, client.ObjectKeyFromObject(crd), crd); err != nil {
	return nil, errors.Wrapf(err, "failed to get CRD for %s", gvk.String())
```
nit: should we just use the CRD name in the error? Seems easier to understand
```go
schema := CRDSchema{}
for _, v := range crd.Spec.Versions {
	if v.Name == gvk.Version && v.Schema != nil && v.Schema.OpenAPIV3Schema != nil {
		addToSchema(contract.Path{}, schema, v.Schema.OpenAPIV3Schema)
```
Q: Shouldn't it be an error if we can't find a schema for GVK?
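A hedged sketch of failing instead of silently returning an empty schema, reusing the names from the hunk above (`found` is new):

```go
schema := CRDSchema{}
found := false
for _, v := range crd.Spec.Versions {
	if v.Name == gvk.Version && v.Schema != nil && v.Schema.OpenAPIV3Schema != nil {
		addToSchema(contract.Path{}, schema, v.Schema.OpenAPIV3Schema)
		found = true
	}
}
if !found {
	return nil, errors.Errorf("CRD %s does not define a schema for version %s", crd.Name, gvk.Version)
}
```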
```diff
@@ -444,3 +462,23 @@ func TestServerSideApply_CleanupLegacyManagedFields(t *testing.T) {
 		g.Expect(gotSSAManager).To(BeTrue())
 	})
```
Some new / broken test cases, same for #6710 (and maybe main); these test cases should result in the object not getting created:
t.Run("Error on object which has another uid due to immutability", func(t *testing.T) {
g := NewWithT(t)
// Get the current object (assumes tests to be run in sequence).
original := obj.DeepCopy()
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(original), original)).To(Succeed())
// Create a patch helper for a modified object with some changes to what previously applied by th topology manager.
modified := obj.DeepCopy()
g.Expect(unstructured.SetNestedField(modified.Object, "changed", "spec", "controlPlaneEndpoint", "host")).To(Succeed())
g.Expect(unstructured.SetNestedField(modified.Object, "changed-by-topology-controller", "spec", "bar")).To(Succeed())
// Set an other uid to original
original.SetUID("a-wrong-one")
modified.SetUID("")
// Create a patch helper which should fail because original's real UID changed.
_, err := NewServerSidePatchHelper(ctx, original, modified, env.GetClient(), schemaCache)
g.Expect(err).To(HaveOccurred())
})
t.Run("Error on object which does not exist (anymore) but was expected to get updated", func(t *testing.T) {
original := builder.TestInfrastructureCluster(ns.Name, "obj3").WithSpecFields(map[string]interface{}{
"spec.controlPlaneEndpoint.host": "1.2.3.4",
"spec.controlPlaneEndpoint.port": int64(1234),
"spec.foo": "", // this field is then explicitly ignored by the patch helper
}).Build()
modified := original.DeepCopy()
// Set a not existing uid to the not existing original object
original.SetUID("does-not-exist")
// Create a patch helper which should fail because original does not exist.
_, err = NewServerSidePatchHelper(ctx, original, modified, env.GetClient(), schemaCache)
g.Expect(err).To(HaveOccurred())
})
/hold

@fabriziopandini: PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@fabriziopandini I think we can close this PR, given we chose SSA dry-run?

yes!

@fabriziopandini: Closed this PR. In response to this:

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
What this PR does / why we need it:
This proposes an improvement to how we implement dry run for server side apply by introducing an annotation.
The proposed implementation has the following advantages: