diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 97511d6df..d1fe03e4d 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -96,7 +96,7 @@ const ( var ( // kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs - kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`) + kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*([\w-]+)`) ) // Reconciler implements the Component Reconciler @@ -221,6 +221,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { log.V(1).Info("Reconciling") + log.Info("broker rack map", "kafkaBrokerAvailabilityZoneMap", r.kafkaBrokerAvailabilityZoneMap) + ctx := context.Background() if err := k8sutil.UpdateBrokerConfigurationBackup(r.Client, r.KafkaCluster); err != nil { log.Error(err, "failed to update broker configuration backup") @@ -927,6 +929,9 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo // Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed() terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items) + if len(terminatingOrPendingPods) > 0 { + log.Info("terminating or pending pods", "terminatingOrPendingPods", terminatingOrPendingPods) + } if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed { return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress") } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index 2843ec5fe..c91ee266d 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -1269,9 +1269,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { desiredPod: &corev1.Pod{}, currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, pods: []corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, errorExpected: true, }, @@ -1293,9 +1293,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { desiredPod: &corev1.Pod{}, currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, pods: []corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, - {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, }, errorExpected: true, }, @@ -1531,6 +1531,124 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { outOfSyncReplicas: []int32{101}, errorExpected: false, }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + outOfSyncReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + allOfflineReplicas: []int32{101}, + errorExpected: true, + }, + { + testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes", + kafkaCluster: v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + Spec: v1beta1.KafkaClusterSpec{ + Brokers: []v1beta1.Broker{ + {Id: 101, ReadOnlyConfig: "broker.rack=az-1"}, + {Id: 201, ReadOnlyConfig: "broker.rack=az-2"}, + {Id: 301, ReadOnlyConfig: "broker.rack=az-3"}, + }, + RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{ + FailureThreshold: 2, + ConcurrentBrokerRestartsAllowed: 2, + }, + }, + Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading}, + }, + desiredPod: &corev1.Pod{}, + currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + pods: []corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}}, + {ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}}, + }, + errorExpected: true, + }, } mockCtrl := gomock.NewController(t) @@ -1556,9 +1674,13 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) { } // Mock kafka client - mockedKafkaClient := new(mocks.KafkaClient) - mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil) - mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil) + mockedKafkaClient := mocks.NewMockKafkaClient(mockCtrl) + if test.allOfflineReplicas != nil { + mockedKafkaClient.EXPECT().AllOfflineReplicas().Return(test.allOfflineReplicas, nil) + } + if test.outOfSyncReplicas != nil { + mockedKafkaClient.EXPECT().OutOfSyncReplicas().Return(test.outOfSyncReplicas, nil) + } mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil) // Call the handleRollingUpgrade function with the provided test.desiredPod and test.currentPod