diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index dd605edecd..a47e0ee85c 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -95,7 +95,7 @@ const ( var ( // kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs - kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`) + kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*([\w-]+)`) ) // Reconciler implements the Component Reconciler @@ -219,6 +219,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { log.V(1).Info("Reconciling") + log.Info("broker rack map", "brokerRackMap", r.kafkaBrokerAvailabilityZoneMap) + ctx := context.Background() if err := k8sutil.UpdateBrokerConfigurationBackup(r.Client, r.KafkaCluster); err != nil { log.Error(err, "failed to update broker configuration backup") @@ -926,6 +928,9 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo // Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed() terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items) + if len(terminatingOrPendingPods) > 0 { + log.Info("terminating or pending pods", "pods", terminatingOrPendingPods) + } if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index ba531ecbf2..5ebaa7065e 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1269,9 +1269,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { desiredPod: &corev1.Pod{}, currentPod: &corev1.Pod{ObjectMeta: 
metav1.ObjectMeta{Name: "kafka-201"}}, pods: []corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, errorExpected: true, }, @@ -1293,9 +1293,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { desiredPod: &corev1.Pod{}, currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, pods: []corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, errorExpected: true, }, @@ -1531,6 +1531,124 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { outOfSyncReplicas: []int32{101}, errorExpected: false, }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 
101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: 
"kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + outOfSyncReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + allOfflineReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az-3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + 
desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + errorExpected: true, + }, } mockCtrl := gomock.NewController(t) @@ -1557,7 +1675,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { // Mock kafka client mockedKafkaClient := new(mocks.KafkaClient) - mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil) + mockedKafkaClient.On("AllOfflineReplicas").Return(test.allOfflineReplicas, nil) mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil) mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil)