diff --git a/src/go/k8s/config/e2e-tests/kustomization.yaml b/src/go/k8s/config/e2e-tests/kustomization.yaml index 0d8e67cfb268..d83c8cab14bf 100644 --- a/src/go/k8s/config/e2e-tests/kustomization.yaml +++ b/src/go/k8s/config/e2e-tests/kustomization.yaml @@ -1,9 +1,4 @@ -bases: -- ../default - -patchesStrategicMerge: -- manager.yaml - +--- apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: @@ -13,4 +8,7 @@ images: - name: vectorized/configurator newName: localhost/configurator newTag: dev - +resources: + - ../default +patches: + - path: manager.yaml diff --git a/src/go/k8s/config/e2e-tests/manager.yaml b/src/go/k8s/config/e2e-tests/manager.yaml index e4add0e8f239..a4773d506665 100644 --- a/src/go/k8s/config/e2e-tests/manager.yaml +++ b/src/go/k8s/config/e2e-tests/manager.yaml @@ -20,6 +20,7 @@ spec: - "--allow-pvc-deletion=true" - "--superusers-prefix=__redpanda_system__" - "--log-level=trace" + - "--unsafe-decommission-failed-brokers=true" livenessProbe: timeoutSeconds: 10 readinessProbe: diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 4b1632871dd2..5ebeb71e5aec 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -45,7 +45,6 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" - "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" ) @@ -76,6 +75,7 @@ type ClusterReconciler struct { MetricsTimeout time.Duration RestrictToRedpandaVersion string allowPVCDeletion bool + GhostDecommissioning bool } //+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete @@ -270,6 +270,9 @@ func (r *ClusterReconciler) Reconcile( if err := r.setPodNodeIDLabel(ctx, &vectorizedCluster, log, ar); err != nil { return ctrl.Result{}, fmt.Errorf("setting pod node_id label: %w", err) } + if err := r.decommissionGhostBrokers(ctx, &vectorizedCluster, log, ar); err != nil { + return ctrl.Result{}, fmt.Errorf("deleting ghost brokers: %w", err) + } // want: refactor above to resources (i.e. setInitialSuperUserPassword, reconcileConfiguration) // ensuring license must be at the end when condition ClusterConfigured=true and AdminAPI is ready @@ -962,6 +965,79 @@ func (r *ClusterReconciler) setInitialSuperUserPassword( return utilerrors.NewAggregate(errs) } +// decommissionGhostBrokers decommissions brokers that redpanda thinks exists, but aren't assigned to any pods +// This is not a reversible process. If creating a new broker due to an empty disk was a mistake, the data +// that was on that disk will be unusable. +func (r *ClusterReconciler) decommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error { + if r.GhostDecommissioning { + return r.doDecommissionGhostBrokers(c, vCluster, l, ar) + } + return nil +} + +// custom error to satisfy err113 +type missingBrokerIDError struct{} + +func (m *missingBrokerIDError) Error() string { + return "a pod is temporarily missing the broker-id annotation" +} + +func (r *ClusterReconciler) doDecommissionGhostBrokers(c context.Context, vCluster *vectorizedv1alpha1.Cluster, l logr.Logger, ar *attachedResources) error { + ctx, done := context.WithCancel(c) + defer done() + log := l.WithName("deleteGhostBrokers") + log.V(logger.DebugLevel).Info("deleting ghost brokers") + + pods, err := r.podList(ctx, vCluster) + if err != nil { + return fmt.Errorf("unable to fetch PodList: %w", err) + } + + pki, err := ar.getPKI() + if err != nil { + return fmt.Errorf("getting pki: %w", err) + } + + adminClient, err := r.AdminAPIClientFactory(ctx, r.Client, vCluster, ar.getHeadlessServiceFQDN(), pki.AdminAPIConfigProvider()) + if err != nil { + return fmt.Errorf("unable to create admin client: %w", err) + } + adminBrokers, err := adminClient.Brokers(ctx) + if err != nil { + return fmt.Errorf("unable to fetch brokers: %w", err) + } + + actualBrokerIDs := make(map[int]any, len(pods.Items)) + for i := range pods.Items { + pod := &pods.Items[i] + if pod.Annotations == nil { + return fmt.Errorf("requeuing: %w", &missingBrokerIDError{}) + } + nodeIDStrAnnotation, annotationExist := pod.Annotations[resources.PodAnnotationNodeIDKey] + if !annotationExist { + return fmt.Errorf("requeuing: %w", &missingBrokerIDError{}) + } + id, err := strconv.Atoi(nodeIDStrAnnotation) + if err != nil { + return fmt.Errorf("pod %s has an invalid broker-id annotation: %q: %w", pod.Name, nodeIDStrAnnotation, err) + } + actualBrokerIDs[id] = nil + } + + // if the admin API shows brokers that are not assigned to existing pods, decommission them + for i := range adminBrokers { + broker := adminBrokers[i] + if _, ok := actualBrokerIDs[broker.NodeID]; ok { + continue + } + if err := adminClient.DecommissionBroker(ctx, broker.NodeID); err != nil { + return fmt.Errorf("failed to decommission ghost broker: %w", err) + } + } + + return nil +} + // createUserOnAdminAPI will return to requeue only when api error occurred func createUserOnAdminAPI(ctx context.Context, adminAPI adminutils.AdminAPIClient, secret *corev1.Secret) error { username := string(secret.Data[corev1.BasicAuthUsernameKey]) @@ -1128,374 +1204,3 @@ func isRedpandaClusterVersionManaged( } return true } - -type attachedResources struct { - ctx context.Context - reconciler *ClusterReconciler - log logr.Logger - cluster *vectorizedv1alpha1.Cluster - items map[string]resources.Resource -} - -const ( - bootstrapService = "BootstrapService" - clusterRole = "ClusterRole" - clusterRoleBinding = "ClusterRoleBinding" - clusterService = "ClusterPorts" - configMap = "ConfigMap" - headlessService = "HeadlessService" - ingress = "Ingress" - nodeportService = "NodeportService" - pki = "PKI" - podDisruptionBudget = "PodDisruptionBudget" - proxySuperuser = "ProxySuperuser" - schemaRegistrySuperUser = "SchemaRegistrySuperUser" - serviceAccount = "ServiceAccount" - secret = "Secret" - statefulSet = "StatefulSet" -) - -func newAttachedResources(ctx context.Context, r *ClusterReconciler, log logr.Logger, cluster *vectorizedv1alpha1.Cluster) *attachedResources { - return &attachedResources{ - ctx: ctx, - reconciler: r, - log: log, - cluster: cluster, - items: map[string]resources.Resource{}, - } -} - -type resourceKey string - -func (a *attachedResources) Ensure() (ctrl.Result, error) { - result := ctrl.Result{} - var errs error - for key, resource := range a.items { - if resource == nil { - continue - } - err := resource.Ensure(context.WithValue(a.ctx, resourceKey("resource"), key)) - var e *resources.RequeueAfterError - if errors.As(err, &e) { - a.log.Info(e.Error()) - if result.RequeueAfter < e.RequeueAfter { - result = ctrl.Result{RequeueAfter: e.RequeueAfter} - } - } else if err != nil { - a.log.Error(err, "Failed to reconcile resource", "resource", key) - errs = errors.Join(errs, err) - } - } - return result, errs -} - -func (a *attachedResources) bootstrapService() { - // if already initialized, exit immediately - if _, ok := a.items[bootstrapService]; ok { - return - } - redpandaPorts := networking.NewRedpandaPorts(a.cluster) - loadbalancerPorts := collectLBPorts(redpandaPorts) - a.items[bootstrapService] = resources.NewLoadBalancerService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, loadbalancerPorts, true, a.log) -} - -func (a *attachedResources) getBootstrapService() *resources.LoadBalancerServiceResource { - a.bootstrapService() - return a.items[bootstrapService].(*resources.LoadBalancerServiceResource) -} - -func (a *attachedResources) getBootstrapServiceKey() types.NamespacedName { - return a.getBootstrapService().Key() -} - -func (a *attachedResources) clusterRole() { - // if already initialized, exit immediately - if _, ok := a.items[clusterRole]; ok { - return - } - a.items[clusterRole] = resources.NewClusterRole(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) -} - -func (a *attachedResources) clusterRoleBinding() { - // if already initialized, exit immediately - if _, ok := a.items[clusterRoleBinding]; ok { - return - } - a.items[clusterRoleBinding] = resources.NewClusterRoleBinding(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) -} - -func (a *attachedResources) getClusterRoleBinding() *resources.ClusterRoleBindingResource { - a.clusterRoleBinding() - return a.items[clusterRoleBinding].(*resources.ClusterRoleBindingResource) -} - -func (a *attachedResources) clusterService() { - // if already initialized, exit immediately - if _, ok := a.items[clusterService]; ok { - return - } - redpandaPorts := networking.NewRedpandaPorts(a.cluster) - clusterPorts := collectClusterPorts(redpandaPorts, a.cluster) - a.items[clusterService] = resources.NewClusterService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, clusterPorts, a.log) -} - -func (a *attachedResources) getClusterService() *resources.ClusterServiceResource { - a.clusterService() - return a.items[clusterService].(*resources.ClusterServiceResource) -} - -func (a *attachedResources) getClusterServiceName() string { - return a.getClusterService().Key().Name -} - -func (a *attachedResources) getClusterServiceFQDN() string { - return a.getClusterService().ServiceFQDN(a.reconciler.clusterDomain) -} - -func (a *attachedResources) configMap() error { - // if already initialized, exit immediately - if _, ok := a.items[configMap]; ok { - return nil - } - - proxySASLUserKey := a.getProxySuperUserKey() - schemaRegistrySASLUserKey := a.getSchemaRegistrySuperUserKey() - - err := a.pki() - if err != nil { - return err - } - pki := a.items[pki].(*certmanager.PkiReconciler) - - a.items[configMap] = resources.NewConfigMap(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.getHeadlessServiceFQDN(), proxySASLUserKey, schemaRegistrySASLUserKey, pki.BrokerTLSConfigProvider(), a.log) - return nil -} - -func (a *attachedResources) getConfigMap() (*resources.ConfigMapResource, error) { - err := a.configMap() - if err != nil { - return nil, err - } - return a.items[configMap].(*resources.ConfigMapResource), nil -} - -func (a *attachedResources) headlessService() { - // if already initialized, exit immediately - if _, ok := a.items[headlessService]; ok { - return - } - redpandaPorts := networking.NewRedpandaPorts(a.cluster) - headlessPorts := collectHeadlessPorts(redpandaPorts) - - a.items[headlessService] = resources.NewHeadlessService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, headlessPorts, a.log) -} - -func (a *attachedResources) getHeadlessService() *resources.HeadlessServiceResource { - a.headlessService() - return a.items[headlessService].(*resources.HeadlessServiceResource) -} - -func (a *attachedResources) getHeadlessServiceKey() types.NamespacedName { - return a.getHeadlessService().Key() -} - -func (a *attachedResources) getHeadlessServiceName() string { - return a.getHeadlessServiceKey().Name -} - -func (a *attachedResources) getHeadlessServiceFQDN() string { - return a.getHeadlessService().HeadlessServiceFQDN(a.reconciler.clusterDomain) -} - -func (a *attachedResources) ingress() { - // if already initialized, exit immediately - if _, ok := a.items[ingress]; ok { - return - } - clusterServiceName := a.getClusterServiceName() - - var pandaProxyIngressConfig *vectorizedv1alpha1.IngressConfig - subdomain := "" - proxyAPIExternal := a.cluster.PandaproxyAPIExternal() - if proxyAPIExternal != nil { - subdomain = proxyAPIExternal.External.Subdomain - pandaProxyIngressConfig = proxyAPIExternal.External.Ingress - } - - a.items[ingress] = resources.NewIngress( - a.reconciler.Client, - a.cluster, - a.reconciler.Scheme, - subdomain, - clusterServiceName, - resources.PandaproxyPortExternalName, - a.log).WithAnnotations(map[string]string{resources.SSLPassthroughAnnotation: "true"}).WithUserConfig(pandaProxyIngressConfig) -} - -func (a *attachedResources) nodeportService() { - // if already initialized, exit immediately - if _, ok := a.items[nodeportService]; ok { - return - } - redpandaPorts := networking.NewRedpandaPorts(a.cluster) - nodeports := collectNodePorts(redpandaPorts) - a.items[nodeportService] = resources.NewNodePortService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, nodeports, a.log) -} - -func (a *attachedResources) getNodeportService() *resources.NodePortServiceResource { - a.nodeportService() - return a.items[nodeportService].(*resources.NodePortServiceResource) -} - -func (a *attachedResources) getNodeportServiceKey() types.NamespacedName { - return a.getNodeportService().Key() -} - -func (a *attachedResources) pki() error { - // if already initialized, exit immediately - if _, ok := a.items[pki]; ok { - return nil - } - - newPKI, err := certmanager.NewPki(a.ctx, a.reconciler.Client, a.cluster, a.getHeadlessServiceFQDN(), a.getClusterServiceFQDN(), a.reconciler.Scheme, a.log) - if err != nil { - return fmt.Errorf("creating pki: %w", err) - } - - a.items[pki] = newPKI - return nil -} - -func (a *attachedResources) getPKI() (*certmanager.PkiReconciler, error) { - err := a.pki() - if err != nil { - return nil, err - } - return a.items[pki].(*certmanager.PkiReconciler), nil -} - -func (a *attachedResources) podDisruptionBudget() { - // if already initialized, exit immediately - if _, ok := a.items[podDisruptionBudget]; ok { - return - } - a.items[podDisruptionBudget] = resources.NewPDB(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) -} - -func (a *attachedResources) proxySuperuser() { - // if already initialized, exit immediately - if _, ok := a.items[proxySuperuser]; ok { - return - } - - var proxySASLUser *resources.SuperUsersResource - a.items[proxySuperuser] = proxySASLUser - if a.cluster.IsSASLOnInternalEnabled() && a.cluster.PandaproxyAPIInternal() != nil { - a.items[proxySuperuser] = resources.NewSuperUsers(a.reconciler.Client, a.cluster, a.reconciler.Scheme, resources.ScramPandaproxyUsername, resources.PandaProxySuffix, a.log) - } -} - -func (a *attachedResources) getProxySuperuser() *resources.SuperUsersResource { - a.proxySuperuser() - return a.items[proxySuperuser].(*resources.SuperUsersResource) -} - -func (a *attachedResources) getProxySuperUserKey() types.NamespacedName { - if a.getProxySuperuser() == nil { - return types.NamespacedName{} - } - return a.getProxySuperuser().Key() -} - -func (a *attachedResources) schemaRegistrySuperUser() { - // if already initialized, exit immediately - if _, ok := a.items[schemaRegistrySuperUser]; ok { - return - } - - var schemaRegistrySASLUser *resources.SuperUsersResource - a.items[schemaRegistrySuperUser] = schemaRegistrySASLUser - if a.cluster.IsSASLOnInternalEnabled() && a.cluster.Spec.Configuration.SchemaRegistry != nil { - a.items[schemaRegistrySuperUser] = resources.NewSuperUsers(a.reconciler.Client, a.cluster, a.reconciler.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, a.log) - } -} - -func (a *attachedResources) getSchemaRegistrySuperUser() *resources.SuperUsersResource { - a.schemaRegistrySuperUser() - return a.items[schemaRegistrySuperUser].(*resources.SuperUsersResource) -} - -func (a *attachedResources) getSchemaRegistrySuperUserKey() types.NamespacedName { - if a.getSchemaRegistrySuperUser() == nil { - return types.NamespacedName{} - } - return a.getSchemaRegistrySuperUser().Key() -} - -func (a *attachedResources) serviceAccount() { - // if already initialized, exit immediately - if _, ok := a.items[serviceAccount]; ok { - return - } - a.items[serviceAccount] = resources.NewServiceAccount(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) -} - -func (a *attachedResources) getServiceAccount() *resources.ServiceAccountResource { - a.serviceAccount() - return a.items[serviceAccount].(*resources.ServiceAccountResource) -} - -func (a *attachedResources) getServiceAccountKey() types.NamespacedName { - return a.getServiceAccount().Key() -} - -func (a *attachedResources) getServiceAccountName() string { - return a.getServiceAccountKey().Name -} - -func (a *attachedResources) secret() { - // if already initialized, exit immediately - if _, ok := a.items[secret]; ok { - return - } - a.items[secret] = resources.PreStartStopScriptSecret(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.getHeadlessServiceFQDN(), a.getProxySuperUserKey(), a.getSchemaRegistrySuperUserKey(), a.log) -} - -func (a *attachedResources) statefulSet() error { - // if already initialized, exit immediately - if _, ok := a.items[statefulSet]; ok { - return nil - } - pki, err := a.getPKI() - if err != nil { - return err - } - cm, err := a.getConfigMap() - if err != nil { - return err - } - a.items[statefulSet] = resources.NewStatefulSet( - a.reconciler.Client, - a.cluster, - a.reconciler.Scheme, - a.getHeadlessServiceFQDN(), - a.getHeadlessServiceName(), - a.getNodeportServiceKey(), - pki.StatefulSetVolumeProvider(), - pki.AdminAPIConfigProvider(), - a.getServiceAccountName(), - a.reconciler.configuratorSettings, - cm.GetNodeConfigHash, - a.reconciler.AdminAPIClientFactory, - a.reconciler.DecommissionWaitInterval, - a.log, - a.reconciler.MetricsTimeout) - return nil -} - -func (a *attachedResources) getStatefulSet() (*resources.StatefulSetResource, error) { - if err := a.statefulSet(); err != nil { - return nil, err - } - return a.items[statefulSet].(*resources.StatefulSetResource), nil -} diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_attached_resources.go b/src/go/k8s/controllers/redpanda/cluster_controller_attached_resources.go new file mode 100644 index 000000000000..ed5c7ca11390 --- /dev/null +++ b/src/go/k8s/controllers/redpanda/cluster_controller_attached_resources.go @@ -0,0 +1,386 @@ +package redpanda + +import ( + "context" + "errors" + "fmt" + + "github.com/go-logr/logr" + vectorizedv1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/vectorized/v1alpha1" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/networking" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/certmanager" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" +) + +type attachedResources struct { + ctx context.Context + reconciler *ClusterReconciler + log logr.Logger + cluster *vectorizedv1alpha1.Cluster + items map[string]resources.Resource +} + +const ( + bootstrapService = "BootstrapService" + clusterRole = "ClusterRole" + clusterRoleBinding = "ClusterRoleBinding" + clusterService = "ClusterPorts" + configMap = "ConfigMap" + headlessService = "HeadlessService" + ingress = "Ingress" + nodeportService = "NodeportService" + pki = "PKI" + podDisruptionBudget = "PodDisruptionBudget" + proxySuperuser = "ProxySuperuser" + schemaRegistrySuperUser = "SchemaRegistrySuperUser" + serviceAccount = "ServiceAccount" + secret = "Secret" + statefulSet = "StatefulSet" +) + +func newAttachedResources(ctx context.Context, r *ClusterReconciler, log logr.Logger, cluster *vectorizedv1alpha1.Cluster) *attachedResources { + return &attachedResources{ + ctx: ctx, + reconciler: r, + log: log, + cluster: cluster, + items: map[string]resources.Resource{}, + } +} + +type resourceKey string + +func (a *attachedResources) Ensure() (ctrl.Result, error) { + result := ctrl.Result{} + var errs error + for key, resource := range a.items { + if resource == nil { + continue + } + err := resource.Ensure(context.WithValue(a.ctx, resourceKey("resource"), key)) + var e *resources.RequeueAfterError + if errors.As(err, &e) { + a.log.Info(e.Error()) + if result.RequeueAfter < e.RequeueAfter { + result = ctrl.Result{RequeueAfter: e.RequeueAfter} + } + } else if err != nil { + a.log.Error(err, "Failed to reconcile resource", "resource", key) + errs = errors.Join(errs, err) + } + } + return result, errs +} + +func (a *attachedResources) bootstrapService() { + // if already initialized, exit immediately + if _, ok := a.items[bootstrapService]; ok { + return + } + redpandaPorts := networking.NewRedpandaPorts(a.cluster) + loadbalancerPorts := collectLBPorts(redpandaPorts) + a.items[bootstrapService] = resources.NewLoadBalancerService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, loadbalancerPorts, true, a.log) +} + +func (a *attachedResources) getBootstrapService() *resources.LoadBalancerServiceResource { + a.bootstrapService() + return a.items[bootstrapService].(*resources.LoadBalancerServiceResource) +} + +func (a *attachedResources) getBootstrapServiceKey() types.NamespacedName { + return a.getBootstrapService().Key() +} + +func (a *attachedResources) clusterRole() { + // if already initialized, exit immediately + if _, ok := a.items[clusterRole]; ok { + return + } + a.items[clusterRole] = resources.NewClusterRole(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) +} + +func (a *attachedResources) clusterRoleBinding() { + // if already initialized, exit immediately + if _, ok := a.items[clusterRoleBinding]; ok { + return + } + a.items[clusterRoleBinding] = resources.NewClusterRoleBinding(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) +} + +func (a *attachedResources) getClusterRoleBinding() *resources.ClusterRoleBindingResource { + a.clusterRoleBinding() + return a.items[clusterRoleBinding].(*resources.ClusterRoleBindingResource) +} + +func (a *attachedResources) clusterService() { + // if already initialized, exit immediately + if _, ok := a.items[clusterService]; ok { + return + } + redpandaPorts := networking.NewRedpandaPorts(a.cluster) + clusterPorts := collectClusterPorts(redpandaPorts, a.cluster) + a.items[clusterService] = resources.NewClusterService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, clusterPorts, a.log) +} + +func (a *attachedResources) getClusterService() *resources.ClusterServiceResource { + a.clusterService() + return a.items[clusterService].(*resources.ClusterServiceResource) +} + +func (a *attachedResources) getClusterServiceName() string { + return a.getClusterService().Key().Name +} + +func (a *attachedResources) getClusterServiceFQDN() string { + return a.getClusterService().ServiceFQDN(a.reconciler.clusterDomain) +} + +func (a *attachedResources) configMap() error { + // if already initialized, exit immediately + if _, ok := a.items[configMap]; ok { + return nil + } + + proxySASLUserKey := a.getProxySuperUserKey() + schemaRegistrySASLUserKey := a.getSchemaRegistrySuperUserKey() + + err := a.pki() + if err != nil { + return err + } + pki := a.items[pki].(*certmanager.PkiReconciler) + + a.items[configMap] = resources.NewConfigMap(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.getHeadlessServiceFQDN(), proxySASLUserKey, schemaRegistrySASLUserKey, pki.BrokerTLSConfigProvider(), a.log) + return nil +} + +func (a *attachedResources) getConfigMap() (*resources.ConfigMapResource, error) { + err := a.configMap() + if err != nil { + return nil, err + } + return a.items[configMap].(*resources.ConfigMapResource), nil +} + +func (a *attachedResources) headlessService() { + // if already initialized, exit immediately + if _, ok := a.items[headlessService]; ok { + return + } + redpandaPorts := networking.NewRedpandaPorts(a.cluster) + headlessPorts := collectHeadlessPorts(redpandaPorts) + + a.items[headlessService] = resources.NewHeadlessService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, headlessPorts, a.log) +} + +func (a *attachedResources) getHeadlessService() *resources.HeadlessServiceResource { + a.headlessService() + return a.items[headlessService].(*resources.HeadlessServiceResource) +} + +func (a *attachedResources) getHeadlessServiceKey() types.NamespacedName { + return a.getHeadlessService().Key() +} + +func (a *attachedResources) getHeadlessServiceName() string { + return a.getHeadlessServiceKey().Name +} + +func (a *attachedResources) getHeadlessServiceFQDN() string { + return a.getHeadlessService().HeadlessServiceFQDN(a.reconciler.clusterDomain) +} + +func (a *attachedResources) ingress() { + // if already initialized, exit immediately + if _, ok := a.items[ingress]; ok { + return + } + clusterServiceName := a.getClusterServiceName() + + var pandaProxyIngressConfig *vectorizedv1alpha1.IngressConfig + subdomain := "" + proxyAPIExternal := a.cluster.PandaproxyAPIExternal() + if proxyAPIExternal != nil { + subdomain = proxyAPIExternal.External.Subdomain + pandaProxyIngressConfig = proxyAPIExternal.External.Ingress + } + + a.items[ingress] = resources.NewIngress( + a.reconciler.Client, + a.cluster, + a.reconciler.Scheme, + subdomain, + clusterServiceName, + resources.PandaproxyPortExternalName, + a.log).WithAnnotations(map[string]string{resources.SSLPassthroughAnnotation: "true"}).WithUserConfig(pandaProxyIngressConfig) +} + +func (a *attachedResources) nodeportService() { + // if already initialized, exit immediately + if _, ok := a.items[nodeportService]; ok { + return + } + redpandaPorts := networking.NewRedpandaPorts(a.cluster) + nodeports := collectNodePorts(redpandaPorts) + a.items[nodeportService] = resources.NewNodePortService(a.reconciler.Client, a.cluster, a.reconciler.Scheme, nodeports, a.log) +} + +func (a *attachedResources) getNodeportService() *resources.NodePortServiceResource { + a.nodeportService() + return a.items[nodeportService].(*resources.NodePortServiceResource) +} + +func (a *attachedResources) getNodeportServiceKey() types.NamespacedName { + return a.getNodeportService().Key() +} + +func (a *attachedResources) pki() error { + // if already initialized, exit immediately + if _, ok := a.items[pki]; ok { + return nil + } + + newPKI, err := certmanager.NewPki(a.ctx, a.reconciler.Client, a.cluster, a.getHeadlessServiceFQDN(), a.getClusterServiceFQDN(), a.reconciler.Scheme, a.log) + if err != nil { + return fmt.Errorf("creating pki: %w", err) + } + + a.items[pki] = newPKI + return nil +} + +func (a *attachedResources) getPKI() (*certmanager.PkiReconciler, error) { + err := a.pki() + if err != nil { + return nil, err + } + return a.items[pki].(*certmanager.PkiReconciler), nil +} + +func (a *attachedResources) podDisruptionBudget() { + // if already initialized, exit immediately + if _, ok := a.items[podDisruptionBudget]; ok { + return + } + a.items[podDisruptionBudget] = resources.NewPDB(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) +} + +func (a *attachedResources) proxySuperuser() { + // if already initialized, exit immediately + if _, ok := a.items[proxySuperuser]; ok { + return + } + + var proxySASLUser *resources.SuperUsersResource + a.items[proxySuperuser] = proxySASLUser + if a.cluster.IsSASLOnInternalEnabled() && a.cluster.PandaproxyAPIInternal() != nil { + a.items[proxySuperuser] = resources.NewSuperUsers(a.reconciler.Client, a.cluster, a.reconciler.Scheme, resources.ScramPandaproxyUsername, resources.PandaProxySuffix, a.log) + } +} + +func (a *attachedResources) getProxySuperuser() *resources.SuperUsersResource { + a.proxySuperuser() + return a.items[proxySuperuser].(*resources.SuperUsersResource) +} + +func (a *attachedResources) getProxySuperUserKey() types.NamespacedName { + if a.getProxySuperuser() == nil { + return types.NamespacedName{} + } + return a.getProxySuperuser().Key() +} + +func (a *attachedResources) schemaRegistrySuperUser() { + // if already initialized, exit immediately + if _, ok := a.items[schemaRegistrySuperUser]; ok { + return + } + + var schemaRegistrySASLUser *resources.SuperUsersResource + a.items[schemaRegistrySuperUser] = schemaRegistrySASLUser + if a.cluster.IsSASLOnInternalEnabled() && a.cluster.Spec.Configuration.SchemaRegistry != nil { + a.items[schemaRegistrySuperUser] = resources.NewSuperUsers(a.reconciler.Client, a.cluster, a.reconciler.Scheme, resources.ScramSchemaRegistryUsername, resources.SchemaRegistrySuffix, a.log) + } +} + +func (a *attachedResources) getSchemaRegistrySuperUser() *resources.SuperUsersResource { + a.schemaRegistrySuperUser() + return a.items[schemaRegistrySuperUser].(*resources.SuperUsersResource) +} + +func (a *attachedResources) getSchemaRegistrySuperUserKey() types.NamespacedName { + if a.getSchemaRegistrySuperUser() == nil { + return types.NamespacedName{} + } + return a.getSchemaRegistrySuperUser().Key() +} + +func (a *attachedResources) serviceAccount() { + // if already initialized, exit immediately + if _, ok := a.items[serviceAccount]; ok { + return + } + a.items[serviceAccount] = resources.NewServiceAccount(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.log) +} + +func (a *attachedResources) getServiceAccount() *resources.ServiceAccountResource { + a.serviceAccount() + return a.items[serviceAccount].(*resources.ServiceAccountResource) +} + +func (a *attachedResources) getServiceAccountKey() types.NamespacedName { + return a.getServiceAccount().Key() +} + +func (a *attachedResources) getServiceAccountName() string { + return a.getServiceAccountKey().Name +} + +func (a *attachedResources) secret() { + // if already initialized, exit immediately + if _, ok := a.items[secret]; ok { + return + } + a.items[secret] = resources.PreStartStopScriptSecret(a.reconciler.Client, a.cluster, a.reconciler.Scheme, a.getHeadlessServiceFQDN(), a.getProxySuperUserKey(), a.getSchemaRegistrySuperUserKey(), a.log) +} + +func (a *attachedResources) statefulSet() error { + // if already initialized, exit immediately + if _, ok := a.items[statefulSet]; ok { + return nil + } + pki, err := a.getPKI() + if err != nil { + return err + } + cm, err := a.getConfigMap() + if err != nil { + return err + } + a.items[statefulSet] = resources.NewStatefulSet( + a.reconciler.Client, + a.cluster, + a.reconciler.Scheme, + a.getHeadlessServiceFQDN(), + a.getHeadlessServiceName(), + a.getNodeportServiceKey(), + pki.StatefulSetVolumeProvider(), + pki.AdminAPIConfigProvider(), + a.getServiceAccountName(), + a.reconciler.configuratorSettings, + cm.GetNodeConfigHash, + a.reconciler.AdminAPIClientFactory, + a.reconciler.DecommissionWaitInterval, + a.log, + a.reconciler.MetricsTimeout) + return nil +} + +func (a *attachedResources) getStatefulSet() (*resources.StatefulSetResource, error) { + if err := a.statefulSet(); err != nil { + return nil, err + } + return a.items[statefulSet].(*resources.StatefulSetResource), nil +} diff --git a/src/go/k8s/main.go b/src/go/k8s/main.go index aabc3226b0f0..b251190cb77d 100644 --- a/src/go/k8s/main.go +++ b/src/go/k8s/main.go @@ -109,6 +109,7 @@ func main() { // storage driver. allowPVCDeletion bool debug bool + ghostbuster bool ) flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.") @@ -132,6 +133,8 @@ func main() { flag.StringVar(&vectorizedv1alpha1.SuperUsersPrefix, "superusers-prefix", "", "Prefix to add in username of superusers managed by operator. This will only affect new clusters, enabling this will not add prefix to existing clusters (alpha feature)") flag.BoolVar(&debug, "debug", false, "Set to enable debugging") flag.StringVar(&namespace, "namespace", "", "If namespace is set to not empty value, it changes scope of Redpanda operator to work in single namespace") + flag.BoolVar(&ghostbuster, "unsafe-decommission-failed-brokers", false, "Set to enable decommissioning a failed broker that is configured but does not exist in the StatefulSet (ghost broker). This may result in invalidating valid data") + _ = flag.CommandLine.MarkHidden("unsafe-decommission-failed-brokers") logOptions.BindFlags(flag.CommandLine) clientOptions.BindFlags(flag.CommandLine) @@ -290,6 +293,7 @@ func main() { DecommissionWaitInterval: decommissionWaitInterval, MetricsTimeout: metricsTimeout, RestrictToRedpandaVersion: restrictToRedpandaVersion, + GhostDecommissioning: ghostbuster, }).WithClusterDomain(clusterDomain).WithConfiguratorSettings(configurator).WithAllowPVCDeletion(allowPVCDeletion).SetupWithManager(mgr); err != nil { setupLog.Error(err, "Unable to create controller", "controller", "Cluster") os.Exit(1)