Commit

Address PR comments - part 1

ctrlaltluc committed Jul 21, 2023
1 parent 46920e0 commit 43a982a
Showing 2 changed files with 144 additions and 34 deletions.
53 changes: 22 additions & 31 deletions pkg/resources/kafka/kafka.go
@@ -24,6 +24,7 @@ import (
"strings"

"emperror.dev/errors"
properties "github.com/banzaicloud/koperator/properties/pkg"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -87,9 +88,6 @@ const (
nonControllerBrokerReconcilePriority
// controllerBrokerReconcilePriority the priority assigned to the controller broker, defining its place in the reconciliation order
controllerBrokerReconcilePriority

// defaultConcurrentBrokerRestartsAllowed the default number of brokers that can be restarted in parallel
defaultConcurrentBrokerRestartsAllowed = 1
)

var (
@@ -100,10 +98,7 @@ var (
// Reconciler implements the Component Reconciler
type Reconciler struct {
resources.Reconciler
// kafkaClientProvider is used to create a new KafkaClient
kafkaClientProvider kafkaclient.Provider
// kafkaBrokerAvailabilityZoneMap is a map of broker id to availability zone used in concurrent broker restarts logic
kafkaBrokerAvailabilityZoneMap map[int32]string
}

// New creates a new reconciler for Kafka
@@ -114,17 +109,19 @@ func New(client client.Client, directClient client.Reader, cluster *v1beta1.Kafk
DirectClient: directClient,
KafkaCluster: cluster,
},
kafkaClientProvider: kafkaClientProvider,
kafkaBrokerAvailabilityZoneMap: getBrokerAzMap(cluster),
kafkaClientProvider: kafkaClientProvider,
}
}

func getBrokerAzMap(cluster *v1beta1.KafkaCluster) map[int32]string {
brokerAzMap := make(map[int32]string)
for _, broker := range cluster.Spec.Brokers {
brokerRack := getBrokerRack(broker.ReadOnlyConfig)
if brokerRack != "" {
brokerAzMap[broker.Id] = brokerRack
readOnlyConfigs, err := properties.NewFromString(broker.ReadOnlyConfig)
if err == nil {
brokerRack, brokerRackConfigFound := readOnlyConfigs.Get("broker.rack")
if brokerRackConfigFound {
brokerAzMap[broker.Id] = brokerRack.Value()
}
}
}
// if incomplete broker AZ information, consider all brokers as being in different AZs
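
For context, the hunk above replaces the old getBrokerRack string helper with parsing via the properties package (github.com/banzaicloud/koperator/properties/pkg). The following standalone sketch approximates that logic with plain string splitting; the fallback body for incomplete AZ information is truncated in this diff, so the unique-placeholder behavior below is an assumption based on the comment above, not the operator's exact code.

package main

import (
	"fmt"
	"strings"
)

// broker mirrors only the v1beta1.Broker fields relevant here (assumption:
// Id and ReadOnlyConfig are all the AZ map needs).
type broker struct {
	id             int32
	readOnlyConfig string
}

// getBrokerAzMap sketches the function above: collect broker.rack per broker;
// if any broker lacks a rack, treat every broker as its own AZ so concurrent
// restarts degrade to one broker at a time.
func getBrokerAzMap(brokers []broker) map[int32]string {
	azMap := make(map[int32]string)
	for _, b := range brokers {
		for _, line := range strings.Split(b.readOnlyConfig, "\n") {
			key, value, found := strings.Cut(strings.TrimSpace(line), "=")
			if found && key == "broker.rack" {
				azMap[b.id] = value
			}
		}
	}
	if len(azMap) != len(brokers) {
		for _, b := range brokers {
			// hypothetical placeholder; the real fallback body is not shown in this hunk
			azMap[b.id] = fmt.Sprintf("unique-az-%d", b.id)
		}
	}
	return azMap
}

func main() {
	fmt.Println(getBrokerAzMap([]broker{
		{101, "broker.rack=az-1"},
		{201, "broker.rack=az-2"},
	}))
	// map[101:az-1 201:az-2]
}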
@@ -916,18 +913,19 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
if err != nil {
return errors.WrapIf(err, "failed to reconcile resource")
}
// Check that all pods are present as in spec, before checking for terminating or pending pods, as we can have absent pods
if len(podList.Items) < len(r.KafkaCluster.Spec.Brokers) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod count differs from brokers spec"), "rolling upgrade in progress")
}

// Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once
concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed()
terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items)
if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
if len(terminatingOrPendingPods) >= r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
currentPodAz := r.getBrokerAz(currentPod)
if concurrentBrokerRestartsAllowed > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods) {
kafkaBrokerAvailabilityZoneMap := getBrokerAzMap(r.KafkaCluster)
currentPodAz := r.getBrokerAz(currentPod, kafkaBrokerAvailabilityZoneMap)
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.existsTerminatingPodFromAnotherAz(currentPodAz, terminatingOrPendingPods, kafkaBrokerAvailabilityZoneMap) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("pod is still terminating or creating from another AZ"), "rolling upgrade in progress")
}

@@ -965,8 +963,8 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}

// If multiple concurrent restarts and broker failures allowed, restart only brokers from the same AZ
if concurrentBrokerRestartsAllowed > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 {
if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas) {
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 && r.KafkaCluster.Spec.RollingUpgradeConfig.FailureThreshold > 1 {
if r.existsFailedBrokerFromAnotherRack(currentPodAz, impactedReplicas, kafkaBrokerAvailabilityZoneMap) {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New("broker is not healthy from another AZ"), "rolling upgrade in progress")
}
}
@@ -991,31 +989,24 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
return nil
}

func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}) bool {
func (r *Reconciler) existsFailedBrokerFromAnotherRack(currentPodAz string, impactedReplicas map[int32]struct{}, kafkaBrokerAvailabilityZoneMap map[int32]string) bool {
if currentPodAz == "" && len(impactedReplicas) > 0 {
return true
}
for brokerWithFailure := range impactedReplicas {
if currentPodAz != r.kafkaBrokerAvailabilityZoneMap[brokerWithFailure] {
if currentPodAz != kafkaBrokerAvailabilityZoneMap[brokerWithFailure] {
return true
}
}
return false
}

func (r *Reconciler) getConcurrentBrokerRestartsAllowed() int {
if r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack > 1 {
return r.KafkaCluster.Spec.RollingUpgradeConfig.ConcurrentBrokerRestartCountPerRack
}
return defaultConcurrentBrokerRestartsAllowed
}

func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod) bool {
func (r *Reconciler) existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingOrPendingPods []corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) bool {
if currentPodAz == "" && len(terminatingOrPendingPods) > 0 {
return true
}
for _, terminatingOrPendingPod := range terminatingOrPendingPods {
if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod) {
if currentPodAz != r.getBrokerAz(&terminatingOrPendingPod, kafkaBrokerAvailabilityZoneMap) {
return true
}
}
@@ -1385,12 +1376,12 @@ func getPodsInTerminatingOrPendingState(items []corev1.Pod) []corev1.Pod {
return pods
}

func (r *Reconciler) getBrokerAz(pod *corev1.Pod) string {
func (r *Reconciler) getBrokerAz(pod *corev1.Pod, kafkaBrokerAvailabilityZoneMap map[int32]string) string {
brokerId, err := strconv.ParseInt(pod.Labels[v1beta1.BrokerIdLabelKey], 10, 32)
if err != nil {
return ""
}
return r.kafkaBrokerAvailabilityZoneMap[int32(brokerId)]
return kafkaBrokerAvailabilityZoneMap[int32(brokerId)]
}

func getServiceFromExternalListener(client client.Client, cluster *v1beta1.KafkaCluster,
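To make the AZ-aware guards in handleRollingUpgrade easier to follow, here is a simplified, self-contained sketch of the two checks changed in this file. It substitutes broker ids for corev1.Pod values, so the signatures are illustrative rather than the operator's actual API; the rule itself matches the diff: a restart is blocked if any terminating/pending or unhealthy broker sits in a different AZ, or if the current pod's AZ is unknown.

package main

import "fmt"

// existsTerminatingPodFromAnotherAz mirrors the guard above in simplified
// form, using broker ids in place of pods.
func existsTerminatingPodFromAnotherAz(currentPodAz string, terminatingBrokers []int32, azMap map[int32]string) bool {
	if currentPodAz == "" && len(terminatingBrokers) > 0 {
		return true
	}
	for _, id := range terminatingBrokers {
		if currentPodAz != azMap[id] {
			return true
		}
	}
	return false
}

// existsFailedBrokerFromAnotherRack applies the same rule to brokers with
// out-of-sync or offline replicas.
func existsFailedBrokerFromAnotherRack(currentPodAz string, impacted map[int32]struct{}, azMap map[int32]string) bool {
	if currentPodAz == "" && len(impacted) > 0 {
		return true
	}
	for id := range impacted {
		if currentPodAz != azMap[id] {
			return true
		}
	}
	return false
}

func main() {
	azMap := map[int32]string{101: "az-1", 201: "az-2", 301: "az-3"}
	// Broker 101 (az-1) is restarting, so restarting 201 (az-2) is blocked...
	fmt.Println(existsTerminatingPodFromAnotherAz("az-2", []int32{101}, azMap)) // true
	// ...while another az-1 broker would pass this check.
	fmt.Println(existsTerminatingPodFromAnotherAz("az-1", []int32{101}, azMap)) // false
}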
125 changes: 122 additions & 3 deletions pkg/resources/kafka/kafka_test.go
@@ -1125,7 +1125,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
errorExpected: true,
},
{
testName: "Pod is deleted if allowed concurrent restarts is not specified and failure threshold is not reached",
testName: "Pod is deleted if allowed concurrent restarts is default and failure threshold is not reached",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
@@ -1140,7 +1140,8 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
{Id: 302, ReadOnlyConfig: "broker.rack=az3"}},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 1,
FailureThreshold: 1,
ConcurrentBrokerRestartCountPerRack: 1,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
@@ -1262,6 +1263,124 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
outOfSyncReplicas: []int32{101},
errorExpected: false,
},
{
testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartCountPerRack: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
{
testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartCountPerRack: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
outOfSyncReplicas: []int32{101},
errorExpected: true,
},
{
testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartCountPerRack: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
allOfflineReplicas: []int32{101},
errorExpected: true,
},
{
testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az-1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az-2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az-3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartCountPerRack: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
}

mockCtrl := gomock.NewController(t)
@@ -1288,7 +1407,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {

// Mock kafka client
mockedKafkaClient := new(mocks.KafkaClient)
mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil)
mockedKafkaClient.On("AllOfflineReplicas").Return(test.allOfflineReplicas, nil)
mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil)
mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil)

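The new cases follow the table-driven pattern used throughout kafka_test.go. A minimal sketch of that pattern is below, exercising the simplified existsTerminatingPodFromAnotherAz from the previous sketch; the test name and fixtures here are illustrative, not the operator's actual test code.

package main

import "testing"

// TestExistsTerminatingPodFromAnotherAz shows the table-driven style of the
// tests above, assuming the simplified helper from the previous sketch is
// in the same package.
func TestExistsTerminatingPodFromAnotherAz(t *testing.T) {
	azMap := map[int32]string{101: "az-1", 201: "az-2", 301: "az-3"}
	tests := []struct {
		testName     string
		currentPodAz string
		terminating  []int32
		want         bool
	}{
		{"blocked when a pod in another AZ is restarting", "az-2", []int32{101}, true},
		{"allowed when the restarting pod shares the AZ", "az-1", []int32{101}, false},
		{"blocked when the current pod's AZ is unknown", "", []int32{101}, true},
	}
	for _, test := range tests {
		t.Run(test.testName, func(t *testing.T) {
			if got := existsTerminatingPodFromAnotherAz(test.currentPodAz, test.terminating, azMap); got != test.want {
				t.Errorf("got %v, want %v", got, test.want)
			}
		})
	}
}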
