diff --git a/config/manager/resources/manager.yaml b/config/manager/resources/manager.yaml index 65ada55d..fefa92e4 100644 --- a/config/manager/resources/manager.yaml +++ b/config/manager/resources/manager.yaml @@ -27,11 +27,11 @@ spec: - name: REGISTRY_VERSION value: 2.x - name: REGISTRY_IMAGE_MEM - value: "quay.io/apicurio/apicurio-registry-mem: latest-snapshot " + value: "quay.io/apicurio/apicurio-registry-mem:latest-snapshot" - name: REGISTRY_IMAGE_KAFKASQL - value: "quay.io/apicurio/apicurio-registry-kafkasql: latest-snapshot " + value: "quay.io/apicurio/apicurio-registry-kafkasql:latest-snapshot" - name: REGISTRY_IMAGE_SQL - value: "quay.io/apicurio/apicurio-registry-sql: latest-snapshot " + value: "quay.io/apicurio/apicurio-registry-sql:latest-snapshot" - name: WATCH_NAMESPACE valueFrom: fieldRef: diff --git a/controllers/apicurioregistry_controller.go b/controllers/apicurioregistry_controller.go index e81b07ec..ef33c8c2 100644 --- a/controllers/apicurioregistry_controller.go +++ b/controllers/apicurioregistry_controller.go @@ -201,7 +201,7 @@ func (this *ApicurioRegistryReconciler) Reconcile(_ go_ctx.Context, request reco controlLoop.Run() // Reschedule if requested - requeue, delay := controlLoop.GetContext().GetAndResetRequeue() + requeue, delay := controlLoop.GetContext().Finalize() return reconcile.Result{Requeue: requeue, RequeueAfter: delay}, nil } diff --git a/controllers/cf/cf_pod_template_spec.go b/controllers/cf/cf_pod_template_spec.go index 9cb456cb..a5dbdaa1 100644 --- a/controllers/cf/cf_pod_template_spec.go +++ b/controllers/cf/cf_pod_template_spec.go @@ -26,15 +26,31 @@ type PodTemplateSpecCF struct { previousBasePodTemplateSpec *ar.ApicurioRegistryPodTemplateSpec basePodTemplateSpec *ar.ApicurioRegistryPodTemplateSpec - valid bool - targetPodTemplateSpec *core.PodTemplateSpec + + previousDeploymentPodSpec *core.PodTemplateSpec + deploymentPodSpec *core.PodTemplateSpec + + valid bool + targetPodTemplateSpec *core.PodTemplateSpec + + lastActedReconcileSequence int64 } +// Using the Deployment directly to detemine whether the PTS has to be applied is a problem, +// because other CFs might modify the Deployment as well. +// We have no way of comparing the target PTS with the PTS in the Deployment, +// since we don't know which changes are from spec PTS and which from the CFs. +// To work around this, we will reconcile if: +// - The PTS in the spec has changed, or +// - The PTS in the Deployment has changed. +// If we are done, there will be no change in the Deployment between execution of this CF in subsequent reconciliations, +// so we don't have to update the PTS again. This may waste a few cycles, but I don't think we can do better than that. func NewPodTemplateSpecCF(ctx context.LoopContext, services services.LoopServices) loop.ControlFunction { res := &PodTemplateSpecCF{ - ctx: ctx, - svcResourceCache: ctx.GetResourceCache(), - services: services, + ctx: ctx, + svcResourceCache: ctx.GetResourceCache(), + services: services, + lastActedReconcileSequence: -2, } res.log = ctx.GetLog().Sugar().With("cf", res.Describe()) return res @@ -45,18 +61,36 @@ func (this *PodTemplateSpecCF) Describe() string { } func (this *PodTemplateSpecCF) Sense() { + + this.log.Debugw("Sense", + "this.ctx.GetReconcileSequence()", this.ctx.GetReconcileSequence(), + "this.lastActedReconcileSequence", this.lastActedReconcileSequence, + ) + + if this.lastActedReconcileSequence+1 == this.ctx.GetReconcileSequence() { + this.log.Debugln("Sense", "We have acted in the previous loop, record the previous PTS from the Deployment, and reschedule") + // We have acted in the previous loop, record the previous PTS from the Deployment, and reschedule + if deploymentEntry, deploymentExists := this.svcResourceCache.Get(resources.RC_KEY_DEPLOYMENT); deploymentExists { + this.log.Debugln("Sense", "Setting this.previousDeploymentPodSpec") + this.previousDeploymentPodSpec = &deploymentEntry.GetValue().(*apps.Deployment).Spec.Template + this.previousDeploymentPodSpec = this.previousDeploymentPodSpec.DeepCopy() // Defensive copy + this.ctx.SetRequeueNow() + return + } + } + this.valid = false if entry, exists := this.svcResourceCache.Get(resources.RC_KEY_SPEC); exists { this.basePodTemplateSpec = &entry.GetValue().(*ar.ApicurioRegistry).Spec.Deployment.PodTemplateSpecPreview - this.basePodTemplateSpec = this.basePodTemplateSpec.DeepCopy() // Defensive copy so we don't update the spec + this.basePodTemplateSpec = this.basePodTemplateSpec.DeepCopy() // Defensive copy so we won't update the spec if deploymentEntry, deploymentExists := this.svcResourceCache.Get(resources.RC_KEY_DEPLOYMENT); deploymentExists { - currentPodSpec := &deploymentEntry.GetValue().(*apps.Deployment).Spec.Template - currentPodSpec = currentPodSpec.DeepCopy() + this.deploymentPodSpec = &deploymentEntry.GetValue().(*apps.Deployment).Spec.Template + this.deploymentPodSpec = this.deploymentPodSpec.DeepCopy() // Defensive copy factoryPodSpec := this.services.GetKubeFactory().CreateDeployment().Spec.Template - targetPodSpec, err := SanitizeBasePodSpec(this.log, this.basePodTemplateSpec, currentPodSpec, &factoryPodSpec) + targetPodSpec, err := SanitizeBasePodSpec(this.log, this.basePodTemplateSpec, this.deploymentPodSpec, &factoryPodSpec) if err == nil { this.targetPodTemplateSpec, err = ConvertToPodTemplateSpec(targetPodSpec) if err == nil { @@ -68,7 +102,7 @@ func (this *PodTemplateSpecCF) Sense() { this.log.Errorw("an error has occurred when processing spec.deployment.podTemplateSpecPreview field", "error", err) this.services.GetConditionManager().GetConfigurationErrorCondition(). TransitionInvalid(err.Error(), "spec.deployment.podTemplateSpecPreview") - // No need to transition to not ready, since we can just with the previous config + // No need to transition to not-ready this.ctx.SetRequeueDelaySec(10) } } @@ -76,29 +110,49 @@ func (this *PodTemplateSpecCF) Sense() { } func (this *PodTemplateSpecCF) Compare() bool { + + if this.lastActedReconcileSequence == this.ctx.GetReconcileSequence() { + this.log.Debugln("Compare", "Act only once per reconciliation loop") + return false // Act only once per reconciliation loop + } + if this.lastActedReconcileSequence+1 == this.ctx.GetReconcileSequence() { + this.log.Debugln("Compare", "We have acted in the previous loop, recorded the previous PTS from the Deployment, and have to skip") + return false // We have acted in the reconciliation, recorded the previous PTS from the Deployment, and have to skip + } + + this.log.Debugw("Compare", "valid", this.valid) + if this.previousBasePodTemplateSpec != nil { // common.LogNillable does not work TODO find out why - this.log.Debugw("Obsevation #1", "this.previousBasePodTemplateSpec", this.previousBasePodTemplateSpec) + this.log.Debugw("Compare", "this.previousBasePodTemplateSpec", this.previousBasePodTemplateSpec) + } else { + this.log.Debugw("Compare", "this.previousBasePodTemplateSpec", "") + } + this.log.Debugw("Compare", "this.basePodTemplateSpec", this.basePodTemplateSpec) + + if this.previousDeploymentPodSpec != nil { // common.LogNillable does not work TODO find out why + this.log.Debugw("Compare", "this.previousDeploymentPodSpec", this.previousDeploymentPodSpec) } else { - this.log.Debugw("Obsevation #1", "this.previousBasePodTemplateSpec", "") + this.log.Debugw("Compare", "this.previousDeploymentPodSpec", "") } - this.log.Debugw("Obsevation #2", "this.basePodTemplateSpec", this.basePodTemplateSpec) - this.log.Debugw("Obsevation #3", "this.targetPodTemplateSpec", this.targetPodTemplateSpec) + this.log.Debugw("Compare", "this.deploymentPodSpec", this.deploymentPodSpec) + + this.log.Debugw("Compare", "!reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec)", !reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec)) + + this.log.Debugw("Compare", "!reflect.DeepEqual(this.deploymentPodSpec, this.previousDeploymentPodSpec)", !reflect.DeepEqual(this.deploymentPodSpec, this.previousDeploymentPodSpec)) + return this.valid && - // We're only comparing changes to the podSpecPreview, not the real pod spec, - // so we do not overwrite changes by the other CFs, which would cause a loop panic - (this.previousBasePodTemplateSpec == nil || !reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec)) + (!reflect.DeepEqual(this.basePodTemplateSpec, this.previousBasePodTemplateSpec) || !reflect.DeepEqual(this.deploymentPodSpec, this.previousDeploymentPodSpec)) } func (this *PodTemplateSpecCF) Respond() { - + this.log.Debugln("Respond", "start respond") if entry, exists := this.svcResourceCache.Get(resources.RC_KEY_DEPLOYMENT); exists { entry.ApplyPatch(func(value interface{}) interface{} { deployment := value.(*apps.Deployment).DeepCopy() - - deployment.Spec.Template = *this.targetPodTemplateSpec - this.previousBasePodTemplateSpec = this.basePodTemplateSpec - + deployment.Spec.Template = *this.targetPodTemplateSpec + this.lastActedReconcileSequence = this.ctx.GetReconcileSequence() + this.log.Debugln("Respond", "responded") return deployment }) } diff --git a/controllers/cf/condition/cf_app_health.go b/controllers/cf/condition/cf_app_health.go index 739bc863..23c5c57f 100644 --- a/controllers/cf/condition/cf_app_health.go +++ b/controllers/cf/condition/cf_app_health.go @@ -135,6 +135,9 @@ func (this *AppHealthCF) Compare() bool { // Executing AFTER initialization, // that part is handled by InitializingCF // Prevent loop from getting stable by only executing once + this.log.Debugln("this.disabled", this.disabled) + this.log.Debugln("this.initializing", this.initializing) + this.log.Debugln("this.ctx.GetAttempts()", this.ctx.GetAttempts()) return !this.disabled && !this.initializing && this.ctx.GetAttempts() == 0 } diff --git a/controllers/cf/condition/cf_initializing.go b/controllers/cf/condition/cf_initializing.go index 90399b0a..84763e03 100644 --- a/controllers/cf/condition/cf_initializing.go +++ b/controllers/cf/condition/cf_initializing.go @@ -89,7 +89,10 @@ func (this *InitializingCF) Sense() { this.requestOk = this.ctx.GetTestingSupport().GetMockCanMakeHTTPRequestToOperand(this.ctx.GetAppNamespace().Str()) } else { if this.targetType == core.ServiceTypeClusterIP && this.targetIP != "" { - url := scheme + this.targetIP + ":" + port + // NOTE: The client will follow redirects, but I have found that there is a strange issue with a cyclic redirect: + // http://172.30.162.200:8080 -> http://172.30.162.200:8080/ui -> http://172.30.162.200:8080/ui + // that ends with the client returning status 404. Therefore, we are using /apis instead. + url := scheme + this.targetIP + ":" + port + "/apis" res, err := this.httpClient.Get(url) if err == nil { defer res.Body.Close() @@ -99,9 +102,9 @@ func (this *InitializingCF) Sense() { this.log.Warnw("request to check that Apicurio Registry instance is available has failed with a status", "url", url, "status", res.StatusCode) } } else if os.IsTimeout(err) { - this.log.Warnw("request to check that Apicurio Registry instance is available has timed out", "url", url, "timeout", this.httpClient.Timeout) + this.log.Warnw("request to check that Apicurio Registry instance is available has timed out", "url", url, "timeout", this.httpClient.Timeout, "err", err) } else { - this.log.Warnw("request to check that Apicurio Registry instance is available has failed", "url", url) + this.log.Warnw("request to check that Apicurio Registry instance is available has failed", "url", url, "err", err) } } } @@ -115,6 +118,9 @@ func (this *InitializingCF) Sense() { func (this *InitializingCF) Compare() bool { // Executing only when initializing // Prevent loop from getting stable by only executing once + this.log.Debugln("this.disabled", this.disabled) + this.log.Debugln("this.initializing", this.initializing) + this.log.Debugln("this.ctx.GetAttempts()", this.ctx.GetAttempts()) return !this.disabled && this.initializing && this.ctx.GetAttempts() == 0 } diff --git a/controllers/cf/env_test.go b/controllers/cf/env_test.go index 1436722f..d32a369a 100644 --- a/controllers/cf/env_test.go +++ b/controllers/cf/env_test.go @@ -42,6 +42,7 @@ func TestEnvCF(t *testing.T) { }, })) loop.Run() + ctx.Finalize() c.AssertEquals(t, []corev1.EnvVar{ { Name: "VAR_1_NAME", @@ -78,6 +79,7 @@ func TestEnvCF(t *testing.T) { }, })) loop.Run() + ctx.Finalize() c.AssertEquals(t, []corev1.EnvVar{ { Name: "VAR_3_NAME", @@ -110,6 +112,7 @@ func TestEnvCF(t *testing.T) { }, })) loop.Run() + ctx.Finalize() c.AssertEquals(t, []corev1.EnvVar{ { Name: "VAR_3_NAME", @@ -169,6 +172,7 @@ func TestEnvOrdering(t *testing.T) { }, })) loop.Run() + ctx.Finalize() ctx.GetResourceCache().Set(resources.RC_KEY_SPEC, resources.NewResourceCacheEntry(ctx.GetAppName(), &v1.ApicurioRegistry{ Spec: v1.ApicurioRegistrySpec{ Configuration: v1.ApicurioRegistrySpecConfiguration{ @@ -186,6 +190,7 @@ func TestEnvOrdering(t *testing.T) { }, })) loop.Run() + ctx.Finalize() sorted := ctx.GetEnvCache().GetSorted() sortedI := convert(sorted) c.AssertIsInOrder(t, sortedI, @@ -230,13 +235,13 @@ func TestEnvPriority(t *testing.T) { }, })) loop.Run() - + ctx.Finalize() // TODO When overwriting an entry, previous dependencies are removed! // Operator Managed ctx.GetEnvCache().Set(env.NewSimpleEnvCacheEntryBuilder("VAR_2_NAME", "OPERATOR").Build()) ctx.GetEnvCache().Set(env.NewSimpleEnvCacheEntryBuilder("VAR_3_NAME", "OPERATOR").Build()) loop.Run() - + ctx.Finalize() // Deployment - User Managed ctx.GetResourceCache().Set(resources.RC_KEY_DEPLOYMENT, resources.NewResourceCacheEntry(ctx.GetAppName(), &apps.Deployment{ ObjectMeta: meta.ObjectMeta{ @@ -269,6 +274,7 @@ func TestEnvPriority(t *testing.T) { }, })) loop.Run() + ctx.Finalize() sorted := ctx.GetEnvCache().GetSorted() sortedI := convert(sorted) diff --git a/controllers/loop/context/context.go b/controllers/loop/context/context.go index 2026e119..b8aa2cac 100644 --- a/controllers/loop/context/context.go +++ b/controllers/loop/context/context.go @@ -16,7 +16,7 @@ type LoopContext interface { SetRequeueNow() SetRequeueDelaySoon() SetRequeueDelaySec(delay uint) - GetAndResetRequeue() (bool, time.Duration) + Finalize() (bool, time.Duration) GetClients() *client.Clients GetResourceCache() resources.ResourceCache GetEnvCache() env.EnvCache @@ -24,4 +24,5 @@ type LoopContext interface { GetAttempts() int GetTestingSupport() *c.TestSupport GetSupportedFeatures() *c.SupportedFeatures + GetReconcileSequence() int64 } diff --git a/controllers/loop/context/context_impl.go b/controllers/loop/context/context_impl.go index 038df849..89c3afa3 100644 --- a/controllers/loop/context/context_impl.go +++ b/controllers/loop/context/context_impl.go @@ -6,6 +6,7 @@ import ( "github.com/Apicurio/apicurio-registry-operator/controllers/svc/env" "github.com/Apicurio/apicurio-registry-operator/controllers/svc/resources" "go.uber.org/zap" + "math" "time" ) @@ -13,30 +14,32 @@ var _ LoopContext = &loopContext{} // A long-lived singleton container for shared, data only, 0 dependencies, components type loopContext struct { - appName c.Name - appNamespace c.Namespace - log *zap.Logger - requeue bool - requeueDelay time.Duration - resourceCache resources.ResourceCache - envCache env.EnvCache - attempts int - clients *client.Clients - testing *c.TestSupport - features *c.SupportedFeatures + appName c.Name + appNamespace c.Namespace + log *zap.Logger + requeue bool + requeueDelay time.Duration + resourceCache resources.ResourceCache + envCache env.EnvCache + attempts int + clients *client.Clients + testing *c.TestSupport + features *c.SupportedFeatures + reconcileSequence int64 } // Create a new context when the operator is deployed, provide mostly static data func NewLoopContext(appName c.Name, appNamespace c.Namespace, log *zap.Logger, clients *client.Clients, testing *c.TestSupport, features *c.SupportedFeatures) LoopContext { this := &loopContext{ - appName: appName, - appNamespace: appNamespace, - requeue: false, - requeueDelay: 0, - clients: clients, - testing: testing, - log: log, - features: features, + appName: appName, + appNamespace: appNamespace, + requeue: false, + requeueDelay: 0, + clients: clients, + testing: testing, + log: log, + features: features, + reconcileSequence: 0, } this.resourceCache = resources.NewResourceCache() this.envCache = env.NewEnvCache(log) @@ -64,6 +67,7 @@ func (this *loopContext) SetRequeueDelaySoon() { } func (this *loopContext) SetRequeueDelaySec(delay uint) { + this.log.Sugar().Debugln("SetRequeueDelaySec called with", delay) d := time.Duration(delay) * time.Second if this.requeue == false || d < this.requeueDelay { this.requeueDelay = d @@ -71,11 +75,15 @@ func (this *loopContext) SetRequeueDelaySec(delay uint) { } } -func (this *loopContext) GetAndResetRequeue() (bool, time.Duration) { +func (this *loopContext) Finalize() (bool, time.Duration) { defer func() { this.requeue = false this.requeueDelay = 0 }() + if this.reconcileSequence == math.MaxInt64 { + panic("int64 counter overflow. Restarting to reset.") // This will probably never happen + } + this.reconcileSequence += 1 return this.requeue, this.requeueDelay } @@ -106,3 +114,7 @@ func (this *loopContext) GetTestingSupport() *c.TestSupport { func (this *loopContext) GetSupportedFeatures() *c.SupportedFeatures { return this.features } + +func (this *loopContext) GetReconcileSequence() int64 { + return this.reconcileSequence +} diff --git a/controllers/loop/context/context_mock.go b/controllers/loop/context/context_mock.go index cee19e38..bef043a8 100644 --- a/controllers/loop/context/context_mock.go +++ b/controllers/loop/context/context_mock.go @@ -6,24 +6,27 @@ import ( "github.com/Apicurio/apicurio-registry-operator/controllers/svc/env" "github.com/Apicurio/apicurio-registry-operator/controllers/svc/resources" "go.uber.org/zap" + "math" "time" ) var _ LoopContext = &LoopContextMock{} type LoopContextMock struct { - appName c.Name - appNamespace c.Namespace - log *zap.Logger - resourceCache resources.ResourceCache - envCache env.EnvCache - attempts int + appName c.Name + appNamespace c.Namespace + log *zap.Logger + resourceCache resources.ResourceCache + envCache env.EnvCache + attempts int + reconcileSequence int64 } func NewLoopContextMock() *LoopContextMock { res := &LoopContextMock{ - appName: c.Name("mock"), - appNamespace: c.Namespace("mock"), + appName: c.Name("mock"), + appNamespace: c.Namespace("mock"), + reconcileSequence: 0, } res.log = c.GetRootLogger(true) res.resourceCache = resources.NewResourceCache() @@ -55,8 +58,12 @@ func (this *LoopContextMock) SetRequeueDelaySec(delay uint) { panic("Not implemented") } -func (this *LoopContextMock) GetAndResetRequeue() (bool, time.Duration) { - panic("Not implemented") +func (this *LoopContextMock) Finalize() (bool, time.Duration) { + if this.reconcileSequence == math.MaxInt64 { + panic("int64 counter overflow. Restarting to reset.") // This will probably never happen + } + this.reconcileSequence += 1 + return false, 0 } func (this *LoopContextMock) GetResourceCache() resources.ResourceCache { @@ -86,3 +93,7 @@ func (this *LoopContextMock) GetTestingSupport() *c.TestSupport { func (this *LoopContextMock) GetSupportedFeatures() *c.SupportedFeatures { panic("Not implemented") } + +func (this *LoopContextMock) GetReconcileSequence() int64 { + return this.reconcileSequence +} diff --git a/install/install.yaml b/install/install.yaml index 5e6dcd34..f160d545 100644 --- a/install/install.yaml +++ b/install/install.yaml @@ -5140,11 +5140,11 @@ spec: - name: REGISTRY_VERSION value: 2.x - name: REGISTRY_IMAGE_MEM - value: 'quay.io/apicurio/apicurio-registry-mem: latest-snapshot ' + value: 'quay.io/apicurio/apicurio-registry-mem:latest-snapshot' - name: REGISTRY_IMAGE_KAFKASQL - value: 'quay.io/apicurio/apicurio-registry-kafkasql: latest-snapshot ' + value: 'quay.io/apicurio/apicurio-registry-kafkasql:latest-snapshot' - name: REGISTRY_IMAGE_SQL - value: 'quay.io/apicurio/apicurio-registry-sql: latest-snapshot ' + value: 'quay.io/apicurio/apicurio-registry-sql:latest-snapshot' - name: WATCH_NAMESPACE valueFrom: fieldRef: