Skip to content

Commit

Permalink
fix: PTS change not detected when Deployment is deleted or edited
Browse files Browse the repository at this point in the history
See https://issues.redhat.com/browse/IPT-1131

Additional fixes:
- Typo in the operand image tags after last release
- Initialization check being always unsuccesful
  • Loading branch information
jsenko committed Jun 14, 2024
1 parent abe54d2 commit f112323
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 65 deletions.
6 changes: 3 additions & 3 deletions config/manager/resources/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ spec:
- name: REGISTRY_VERSION
value: 2.x
- name: REGISTRY_IMAGE_MEM
value: "quay.io/apicurio/apicurio-registry-mem: latest-snapshot "
value: "quay.io/apicurio/apicurio-registry-mem:latest-snapshot"
- name: REGISTRY_IMAGE_KAFKASQL
value: "quay.io/apicurio/apicurio-registry-kafkasql: latest-snapshot "
value: "quay.io/apicurio/apicurio-registry-kafkasql:latest-snapshot"
- name: REGISTRY_IMAGE_SQL
value: "quay.io/apicurio/apicurio-registry-sql: latest-snapshot "
value: "quay.io/apicurio/apicurio-registry-sql:latest-snapshot"
- name: WATCH_NAMESPACE
valueFrom:
fieldRef:
Expand Down
2 changes: 1 addition & 1 deletion controllers/apicurioregistry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (this *ApicurioRegistryReconciler) Reconcile(_ go_ctx.Context, request reco
controlLoop.Run()

// Reschedule if requested
requeue, delay := controlLoop.GetContext().GetAndResetRequeue()
requeue, delay := controlLoop.GetContext().Finalize()
return reconcile.Result{Requeue: requeue, RequeueAfter: delay}, nil
}

Expand Down
98 changes: 76 additions & 22 deletions controllers/cf/cf_pod_template_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,31 @@ type PodTemplateSpecCF struct {

previousBasePodTemplateSpec *ar.ApicurioRegistryPodTemplateSpec
basePodTemplateSpec *ar.ApicurioRegistryPodTemplateSpec
valid bool
targetPodTemplateSpec *core.PodTemplateSpec

previousDeploymentPodSpec *core.PodTemplateSpec
deploymentPodSpec *core.PodTemplateSpec

valid bool
targetPodTemplateSpec *core.PodTemplateSpec

lastActedReconcileSequence int64
}

// Using the Deployment directly to detemine whether the PTS has to be applied is a problem,
// because other CFs might modify the Deployment as well.
// We have no way of comparing the target PTS with the PTS in the Deployment,
// since we don't know which changes are from spec PTS and which from the CFs.
// To work around this, we will reconcile if:
// - The PTS in the spec has changed, or
// - The PTS in the Deployment has changed.
// If we are done, there will be no change in the Deployment between execution of this CF in subsequent reconciliations,
// so we don't have to update the PTS again. This may waste a few cycles, but I don't think we can do better than that.
func NewPodTemplateSpecCF(ctx context.LoopContext, services services.LoopServices) loop.ControlFunction {
res := &PodTemplateSpecCF{
ctx: ctx,
svcResourceCache: ctx.GetResourceCache(),
services: services,
ctx: ctx,
svcResourceCache: ctx.GetResourceCache(),
services: services,
lastActedReconcileSequence: -2,
}
res.log = ctx.GetLog().Sugar().With("cf", res.Describe())
return res
Expand All @@ -45,18 +61,36 @@ func (this *PodTemplateSpecCF) Describe() string {
}

func (this *PodTemplateSpecCF) Sense() {

this.log.Debugw("Sense",
"this.ctx.GetReconcileSequence()", this.ctx.GetReconcileSequence(),
"this.lastActedReconcileSequence", this.lastActedReconcileSequence,
)

if this.lastActedReconcileSequence+1 == this.ctx.GetReconcileSequence() {
this.log.Debugln("Sense", "We have acted in the previous loop, record the previous PTS from the Deployment, and reschedule")
// We have acted in the previous loop, record the previous PTS from the Deployment, and reschedule
if deploymentEntry, deploymentExists := this.svcResourceCache.Get(resources.RC_KEY_DEPLOYMENT); deploymentExists {
this.log.Debugln("Sense", "Setting this.previousDeploymentPodSpec")
this.previousDeploymentPodSpec = &deploymentEntry.GetValue().(*apps.Deployment).Spec.Template
this.previousDeploymentPodSpec = this.previousDeploymentPodSpec.DeepCopy() // Defensive copy
this.ctx.SetRequeueNow()
return
}
}

this.valid = false

if entry, exists := this.svcResourceCache.Get(resources.RC_KEY_SPEC); exists {

this.basePodTemplateSpec = &entry.GetValue().(*ar.ApicurioRegistry).Spec.Deployment.PodTemplateSpecPreview
this.basePodTemplateSpec = this.basePodTemplateSpec.DeepCopy() // Defensive copy so we don't update the spec
this.basePodTemplateSpec = this.basePodTemplateSpec.DeepCopy() // Defensive copy so we won't update the spec

if deploymentEntry, deploymentExists := this.svcResourceCache.Get(resources.RC_KEY_DEPLOYMENT); deploymentExists {
currentPodSpec := &deploymentEntry.GetValue().(*apps.Deployment).Spec.Template
currentPodSpec = currentPodSpec.DeepCopy()
this.deploymentPodSpec = &deploymentEntry.GetValue().(*apps.Deployment).Spec.Template
this.deploymentPodSpec = this.deploymentPodSpec.DeepCopy() // Defensive copy
factoryPodSpec := this.services.GetKubeFactory().CreateDeployment().Spec.Template
targetPodSpec, err := SanitizeBasePodSpec(this.log, this.basePodTemplateSpec, currentPodSpec, &factoryPodSpec)
targetPodSpec, err := SanitizeBasePodSpec(this.log, this.basePodTemplateSpec, this.deploymentPodSpec, &factoryPodSpec)
if err == nil {
this.targetPodTemplateSpec, err = ConvertToPodTemplateSpec(targetPodSpec)
if err == nil {
Expand All @@ -68,37 +102,57 @@ func (this *PodTemplateSpecCF) Sense() {
this.log.Errorw("an error has occurred when processing spec.deployment.podTemplateSpecPreview field", "error", err)
this.services.GetConditionManager().GetConfigurationErrorCondition().
TransitionInvalid(err.Error(), "spec.deployment.podTemplateSpecPreview")
// No need to transition to not ready, since we can just with the previous config
// No need to transition to not-ready
this.ctx.SetRequeueDelaySec(10)
}
}
}
}

func (this *PodTemplateSpecCF) Compare() bool {

if this.lastActedReconcileSequence == this.ctx.GetReconcileSequence() {
this.log.Debugln("Compare", "Act only once per reconciliation loop")
return false // Act only once per reconciliation loop
}
if this.lastActedReconcileSequence+1 == this.ctx.GetReconcileSequence() {
this.log.Debugln("Compare", "We have acted in the previous loop, recorded the previous PTS from the Deployment, and have to skip")
return false // We have acted in the reconciliation, recorded the previous PTS from the Deployment, and have to skip
}

this.log.Debugw("Compare", "valid", this.valid)

if this.previousBasePodTemplateSpec != nil { // common.LogNillable does not work TODO find out why
this.log.Debugw("Obsevation #1", "this.previousBasePodTemplateSpec", this.previousBasePodTemplateSpec)
this.log.Debugw("Compare", "this.previousBasePodTemplateSpec", this.previousBasePodTemplateSpec)
} else {
this.log.Debugw("Compare", "this.previousBasePodTemplateSpec", "<nil>")
}
this.log.Debugw("Compare", "this.basePodTemplateSpec", this.basePodTemplateSpec)

if this.previousDeploymentPodSpec != nil { // common.LogNillable does not work TODO find out why
this.log.Debugw("Compare", "this.previousDeploymentPodSpec", this.previousDeploymentPodSpec)
} else {
this.log.Debugw("Obsevation #1", "this.previousBasePodTemplateSpec", "<nil>")
this.log.Debugw("Compare", "this.previousDeploymentPodSpec", "<nil>")
}
this.log.Debugw("Obsevation #2", "this.basePodTemplateSpec", this.basePodTemplateSpec)
this.log.Debugw("Obsevation #3", "this.targetPodTemplateSpec", this.targetPodTemplateSpec)
this.log.Debugw("Compare", "this.deploymentPodSpec", this.deploymentPodSpec)

this.log.Debugw("Compare", "!reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec)", !reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec))

this.log.Debugw("Compare", "!reflect.DeepEqual(this.deploymentPodSpec, this.previousDeploymentPodSpec)", !reflect.DeepEqual(this.deploymentPodSpec, this.previousDeploymentPodSpec))

return this.valid &&
// We're only comparing changes to the podSpecPreview, not the real pod spec,
// so we do not overwrite changes by the other CFs, which would cause a loop panic
(this.previousBasePodTemplateSpec == nil || !reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec))
(!reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec) || !reflect.DeepEqual(this.deploymentPodSpec, this.previousDeploymentPodSpec))
}

func (this *PodTemplateSpecCF) Respond() {

this.log.Debugln("Respond", "start respond")
if entry, exists := this.svcResourceCache.Get(resources.RC_KEY_DEPLOYMENT); exists {
entry.ApplyPatch(func(value interface{}) interface{} {
deployment := value.(*apps.Deployment).DeepCopy()

deployment.Spec.Template = *this.targetPodTemplateSpec

this.previousBasePodTemplateSpec = this.basePodTemplateSpec

deployment.Spec.Template = *this.targetPodTemplateSpec
this.lastActedReconcileSequence = this.ctx.GetReconcileSequence()
this.log.Debugln("Respond", "responded")
return deployment
})
}
Expand Down
3 changes: 3 additions & 0 deletions controllers/cf/condition/cf_app_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ func (this *AppHealthCF) Compare() bool {
// Executing AFTER initialization,
// that part is handled by InitializingCF
// Prevent loop from getting stable by only executing once
this.log.Debugln("this.disabled", this.disabled)
this.log.Debugln("this.initializing", this.initializing)
this.log.Debugln("this.ctx.GetAttempts()", this.ctx.GetAttempts())
return !this.disabled && !this.initializing && this.ctx.GetAttempts() == 0
}

Expand Down
12 changes: 9 additions & 3 deletions controllers/cf/condition/cf_initializing.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ func (this *InitializingCF) Sense() {
this.requestOk = this.ctx.GetTestingSupport().GetMockCanMakeHTTPRequestToOperand(this.ctx.GetAppNamespace().Str())
} else {
if this.targetType == core.ServiceTypeClusterIP && this.targetIP != "" {
url := scheme + this.targetIP + ":" + port
// NOTE: The client will follow redirects, but I have found that there is a strange issue with a cyclic redirect:
// http://172.30.162.200:8080 -> http://172.30.162.200:8080/ui -> http://172.30.162.200:8080/ui
// that ends with the client returning status 404. Therefore, we are using /apis instead.
url := scheme + this.targetIP + ":" + port + "/apis"
res, err := this.httpClient.Get(url)
if err == nil {
defer res.Body.Close()
Expand All @@ -99,9 +102,9 @@ func (this *InitializingCF) Sense() {
this.log.Warnw("request to check that Apicurio Registry instance is available has failed with a status", "url", url, "status", res.StatusCode)
}
} else if os.IsTimeout(err) {
this.log.Warnw("request to check that Apicurio Registry instance is available has timed out", "url", url, "timeout", this.httpClient.Timeout)
this.log.Warnw("request to check that Apicurio Registry instance is available has timed out", "url", url, "timeout", this.httpClient.Timeout, "err", err)
} else {
this.log.Warnw("request to check that Apicurio Registry instance is available has failed", "url", url)
this.log.Warnw("request to check that Apicurio Registry instance is available has failed", "url", url, "err", err)
}
}
}
Expand All @@ -115,6 +118,9 @@ func (this *InitializingCF) Sense() {
func (this *InitializingCF) Compare() bool {
// Executing only when initializing
// Prevent loop from getting stable by only executing once
this.log.Debugln("this.disabled", this.disabled)
this.log.Debugln("this.initializing", this.initializing)
this.log.Debugln("this.ctx.GetAttempts()", this.ctx.GetAttempts())
return !this.disabled && this.initializing && this.ctx.GetAttempts() == 0
}

Expand Down
10 changes: 8 additions & 2 deletions controllers/cf/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestEnvCF(t *testing.T) {
},
}))
loop.Run()
ctx.Finalize()
c.AssertEquals(t, []corev1.EnvVar{
{
Name: "VAR_1_NAME",
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestEnvCF(t *testing.T) {
},
}))
loop.Run()
ctx.Finalize()
c.AssertEquals(t, []corev1.EnvVar{
{
Name: "VAR_3_NAME",
Expand Down Expand Up @@ -110,6 +112,7 @@ func TestEnvCF(t *testing.T) {
},
}))
loop.Run()
ctx.Finalize()
c.AssertEquals(t, []corev1.EnvVar{
{
Name: "VAR_3_NAME",
Expand Down Expand Up @@ -169,6 +172,7 @@ func TestEnvOrdering(t *testing.T) {
},
}))
loop.Run()
ctx.Finalize()
ctx.GetResourceCache().Set(resources.RC_KEY_SPEC, resources.NewResourceCacheEntry(ctx.GetAppName(), &v1.ApicurioRegistry{
Spec: v1.ApicurioRegistrySpec{
Configuration: v1.ApicurioRegistrySpecConfiguration{
Expand All @@ -186,6 +190,7 @@ func TestEnvOrdering(t *testing.T) {
},
}))
loop.Run()
ctx.Finalize()
sorted := ctx.GetEnvCache().GetSorted()
sortedI := convert(sorted)
c.AssertIsInOrder(t, sortedI,
Expand Down Expand Up @@ -230,13 +235,13 @@ func TestEnvPriority(t *testing.T) {
},
}))
loop.Run()

ctx.Finalize()
// TODO When overwriting an entry, previous dependencies are removed!
// Operator Managed
ctx.GetEnvCache().Set(env.NewSimpleEnvCacheEntryBuilder("VAR_2_NAME", "OPERATOR").Build())
ctx.GetEnvCache().Set(env.NewSimpleEnvCacheEntryBuilder("VAR_3_NAME", "OPERATOR").Build())
loop.Run()

ctx.Finalize()
// Deployment - User Managed
ctx.GetResourceCache().Set(resources.RC_KEY_DEPLOYMENT, resources.NewResourceCacheEntry(ctx.GetAppName(), &apps.Deployment{
ObjectMeta: meta.ObjectMeta{
Expand Down Expand Up @@ -269,6 +274,7 @@ func TestEnvPriority(t *testing.T) {
},
}))
loop.Run()
ctx.Finalize()

sorted := ctx.GetEnvCache().GetSorted()
sortedI := convert(sorted)
Expand Down
3 changes: 2 additions & 1 deletion controllers/loop/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ type LoopContext interface {
SetRequeueNow()
SetRequeueDelaySoon()
SetRequeueDelaySec(delay uint)
GetAndResetRequeue() (bool, time.Duration)
Finalize() (bool, time.Duration)
GetClients() *client.Clients
GetResourceCache() resources.ResourceCache
GetEnvCache() env.EnvCache
SetAttempts(attempts int)
GetAttempts() int
GetTestingSupport() *c.TestSupport
GetSupportedFeatures() *c.SupportedFeatures
GetReconcileSequence() int64
}
52 changes: 32 additions & 20 deletions controllers/loop/context/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,40 @@ import (
"github.com/Apicurio/apicurio-registry-operator/controllers/svc/env"
"github.com/Apicurio/apicurio-registry-operator/controllers/svc/resources"
"go.uber.org/zap"
"math"
"time"
)

var _ LoopContext = &loopContext{}

// A long-lived singleton container for shared, data only, 0 dependencies, components
type loopContext struct {
appName c.Name
appNamespace c.Namespace
log *zap.Logger
requeue bool
requeueDelay time.Duration
resourceCache resources.ResourceCache
envCache env.EnvCache
attempts int
clients *client.Clients
testing *c.TestSupport
features *c.SupportedFeatures
appName c.Name
appNamespace c.Namespace
log *zap.Logger
requeue bool
requeueDelay time.Duration
resourceCache resources.ResourceCache
envCache env.EnvCache
attempts int
clients *client.Clients
testing *c.TestSupport
features *c.SupportedFeatures
reconcileSequence int64
}

// Create a new context when the operator is deployed, provide mostly static data
func NewLoopContext(appName c.Name, appNamespace c.Namespace, log *zap.Logger, clients *client.Clients, testing *c.TestSupport, features *c.SupportedFeatures) LoopContext {
this := &loopContext{
appName: appName,
appNamespace: appNamespace,
requeue: false,
requeueDelay: 0,
clients: clients,
testing: testing,
log: log,
features: features,
appName: appName,
appNamespace: appNamespace,
requeue: false,
requeueDelay: 0,
clients: clients,
testing: testing,
log: log,
features: features,
reconcileSequence: 0,
}
this.resourceCache = resources.NewResourceCache()
this.envCache = env.NewEnvCache(log)
Expand Down Expand Up @@ -64,18 +67,23 @@ func (this *loopContext) SetRequeueDelaySoon() {
}

func (this *loopContext) SetRequeueDelaySec(delay uint) {
this.log.Sugar().Debugln("SetRequeueDelaySec called with", delay)
d := time.Duration(delay) * time.Second
if this.requeue == false || d < this.requeueDelay {
this.requeueDelay = d
this.requeue = true
}
}

func (this *loopContext) GetAndResetRequeue() (bool, time.Duration) {
func (this *loopContext) Finalize() (bool, time.Duration) {
defer func() {
this.requeue = false
this.requeueDelay = 0
}()
if this.reconcileSequence == math.MaxInt64 {
panic("int64 counter overflow. Restarting to reset.") // This will probably never happen
}
this.reconcileSequence += 1
return this.requeue, this.requeueDelay
}

Expand Down Expand Up @@ -106,3 +114,7 @@ func (this *loopContext) GetTestingSupport() *c.TestSupport {
func (this *loopContext) GetSupportedFeatures() *c.SupportedFeatures {
return this.features
}

func (this *loopContext) GetReconcileSequence() int64 {
return this.reconcileSequence
}
Loading

0 comments on commit f112323

Please sign in to comment.